https://blog.csdn.net/varyall/article/details/79220745

https://www.cnblogs.com/xiohao/p/12774526.html

参数 值类型 说明 有效值
–topic string 被消费的topic
–whitelist string 正则表达式,指定要包含以供使用的主题的白名单
–partition integer 指定分区
除非指定’–offset’,否则从分区结束(latest)开始消费
–offset string 执行消费的起始offset位置
默认值:latest latest
earliest
<offset>
–consumer-property string 将用户定义的属性以key=value的形式传递给使用者
–consumer.config string 消费者配置属性文件
请注意,[consumer-property]优先于此配置
–formatter string 用于格式化kafka消息以供显示的类的名称
默认值:kafka.tools.DefaultMessageFormatter kafka.tools.DefaultMessageFormatter
kafka.tools.LoggingMessageFormatter
kafka.tools.NoOpMessageFormatter
kafka.tools.ChecksumMessageFormatter
–property string 初始化消息格式化程序的属性 print.timestamp=true|false
print.key=true|false
print.value=true|false
key.separator=<key.separator>
line.separator=<line.separator>
key.deserializer=<key.deserializer>
value.deserializer=<value.deserializer>
–from-beginning 从存在的最早消息开始,而不是从最新消息开始
–max-messages integer 消费的最大数据量,若不指定,则持续消费下去
–timeout-ms integer 在指定时间间隔内没有消息可用时退出
–skip-message-on-error 如果处理消息时出错,请跳过它而不是暂停
–bootstrap-server string 必需(除非使用旧版本的消费者),要连接的服务器
–key-deserializer string
–value-deserializer string
–enable-systest-events 除记录消费的消息外,还记录消费者的生命周期
(用于系统测试)
–isolation-level string 设置为read_committed以过滤掉未提交的事务性消息
设置为read_uncommitted以读取所有消息
默认值:read_uncommitted
–group string 指定消费者所属组的ID
–blacklist string 要从消费中排除的主题黑名单
–csv-reporter-enabled 如果设置,将启用csv metrics报告器
–delete-consumer-offsets 如果指定,则启动时删除zookeeper中的消费者信息
–metrics-dir string 输出csv度量值
需与[csv-reporter-enable]配合使用
–zookeeper string 必需(仅当使用旧的使用者时)连接zookeeper的字符串。
可以给出多个URL以允许故障转移

————————————————
版权声明:本文为CSDN博主「Ernest.Wu」的原创文章,遵循CC 4.0 BY-SA版权协议,转载请附上原文出处链接及本声明。
原文链接:https://blog.csdn.net/qq_29116427/article/details/80206125

 

kafka auto.offset.reset latest earliest 详解

auto.offset.reset关乎kafka数据的读取,是一个非常重要的设置。常用的二个值是latest和earliest,默认是latest。

 

一,latest和earliest区别

1,earliest 当各分区下有已提交的offset时,从提交的offset开始消费;无提交的offset时,从头开始消费

2,latest 当各分区下有已提交的offset时,从提交的offset开始消费;无提交的offset时,消费新产生的该分区下的数据

提交过offset,latest和earliest没有区别,但是在没有提交offset情况下,用latest直接会导致无法读取旧数据。

二,创建topic

  1. # bin/kafka-topics.sh –create –zookeeper bigserver1:2181,bigserver2:2181,testing:2181 –replication-factor 2 –partitions 3 –topic tank
  2. Created topic “tank”.  
  3. # bin/kafka-topics.sh –describe –zookeeper bigserver1:2181,bigserver2:2181,testing:2181 –topic tank
  4. Topic:tank PartitionCount:3 ReplicationFactor:2 Configs:
  5.  Topic: tank Partition: 0 Leader: 0 Replicas: 0,2 Isr: 0,2
  6.  Topic: tank Partition: 1 Leader: 1 Replicas: 1,0 Isr: 1,0
  7.  Topic: tank Partition: 2 Leader: 2 Replicas: 2,1 Isr: 2,1

三,生产数据和接收生产数据

  1. [root@bigserver1 kafka]# bin/kafka-console-producer.sh –broker-list bigserver1:9092,bigserver2:9092,testing:9092 –topic tank
  2. >1
  3. >2
  4. >3
  5. >4
  6. >5
  7. >6
  8. 。。。。。。。。。省略。。。。。。。。。
  9. [root@bigserver1 kafka]# bin/kafka-console-consumer.sh –bootstrap-server bigserver1:9092,bigserver2:9092,testing:9092 –topic tank –from-beginning
  10. 1
  11. 2
  12. 3
  13. 4
  14. 5
  15. 6
  16. 。。。。。。。。省略。。。。。。。。

四,测试代码

  1. object tank {
  2.     def main(args: Array[String]): Unit = {
  3.         val pros: Properties = new Properties  
  4.         pros.put(“bootstrap.servers”, “bigserver1:9092,bigserver2:9092,testing:9092”)  
  5.         /*分组由消费者决定,完全自定义,没有要求*/  
  6.         pros.put(“group.id”, “tank”)  
  7.         //设置为true 表示offset自动托管到kafka内部的一个特定名称为__consumer_offsets的topic  
  8.         pros.put(“enable.auto.commit”, “false”)  
  9.         pros.put(“auto.commit.interval.ms”, “1000”)  
  10.         pros.put(“max.poll.records”, “5”)  
  11.         pros.put(“session.timeout.ms”, “30000”)  
  12.         //只有当offset不存在的时候,才用latest或者earliest  
  13.         pros.put(“auto.offset.reset”, “latest”)  
  14.         pros.put(“key.deserializer”, “org.apache.kafka.common.serialization.StringDeserializer”)  
  15.         pros.put(“value.deserializer”, “org.apache.kafka.common.serialization.StringDeserializer”)  
  16.         val consumer: KafkaConsumer[String, String] = new KafkaConsumer[String, String](pros)  
  17.         /*这里填写主题名称*/  
  18.         consumer.subscribe(util.Arrays.asList(“tank”))  
  19.         val system = akka.actor.ActorSystem(“system”)  
  20.         system.scheduler.schedule(0 seconds, 30 seconds)(tankTest.saveData(args,consumer))
  21.     }
  22.     object tankTest {
  23.         def saveData(args: Array[String],consumer: KafkaConsumer[String,String]): Unit = {
  24.             val records: ConsumerRecords[String, String] = consumer.poll(Duration.ofSeconds(3))
  25.             if (!records.isEmpty) {  
  26.                 for (record <- records) {  
  27.                     if (record.value != null && !record.value.equals(“”)) {  
  28.                         myLog.syncLog(record.value + “\t准备开启消费者出列数据”, “kafka”, “get”)  
  29.                     }
  30.                 }
  31.                 consumer.commitSync()
  32.             }
  33.         }
  34.     }
  35. }

五,测试1,过程如下

1,查看offset

  1. # bin/kafka-consumer-groups.sh –bootstrap-server bigserver1:9092,bigserver2:9092,testing:9092 –group tank –describe
  2. Error: Consumer group ‘tank’ does not exist.  

在没有提交offset的情况,会报这个错误

2,latest模式运行,拉取不到数据

2019-04-28 16:22:55 INFO Fetcher:583 – [Consumer clientId=consumer-1, groupId=tank] Resetting offset for partition tank-1 to offset 11.
2019-04-28 16:22:55 INFO Fetcher:583 – [Consumer clientId=consumer-1, groupId=tank] Resetting offset for partition tank-0 to offset 11.
2019-04-28 16:22:55 INFO Fetcher:583 – [Consumer clientId=consumer-1, groupId=tank] Resetting offset for partition tank-2 to offset 11.

3,再用kafka-console-producer.sh生产数据,latest是可以拉到的,并且是拉取最新的数据(程序运行以后的数据),以前提交的数据是拉取不到的。

4,查看offset不报错了

  1. # bin/kafka-consumer-groups.sh –bootstrap-server bigserver1:9092,bigserver2:9092,testing:9092 –group tank –describe
  2. Consumer group ‘tank’ has no active members.  
  3. TOPIC           PARTITION  CURRENT-OFFSET  LOG-END-OFFSET  LAG             CONSUMER-ID     HOST            CLIENT-ID  
  4. tank            1          12              14              2               –               –               –
  5. tank            0          12              14              2               –               –               –
  6. tank            2          13              15              2               –               –               –

5,将auto.offset.reset设置成earliest,第一次生产的数据也取不到

在这里要注意:如果kafka只接收数据,从来没来消费过,程序一开始不要用latest,不然以前的数据就接收不到了。应当先earliest,然后二都都可以

六,测试2

1,重新创建topic,重复上面的第二,第三步

2,代码端先earliest,最早提交的数据是可以获取到的,再生产数据也是可以获取到的。

3,将auto.offset.reset设置成latest,再生产数据也是可以获取到的。

七,结论

虽然auto.offset.reset默认是latest,但是建议使用earliest。

发表评论

邮箱地址不会被公开。 必填项已用*标注