致使kafka的重复消费问题缘由在于,已经消费了数据,可是offset没来得及提交(好比Kafka没有或者不知道该数据已经被消费)。
总结如下场景致使Kakfa重复消费:java
缘由1:强行kill线程,致使消费后的数据,offset没有提交(消费系统宕机、重启等)。
缘由2:设置offset为自动提交,关闭kafka时,若是在close以前,调用 consumer.unsubscribe() 则有可能部分offset没提交,下次重启会重复消费。
例如:spring
try { consumer.unsubscribe(); } catch (Exception e) { } try { consumer.close(); } catch (Exception e) { }
上面代码会致使部分offset没提交,下次启动时会重复消费。apache
解决方法:设置offset自动提交为falsebootstrap
整合了Spring配置的修改以下配置
spring配置:session
spring.kafka.consumer.enable-auto-commit=false spring.kafka.consumer.auto-offset-reset=latest
整合了API方式的修改enable.auto.commit为false
API配置:并发
Properties props = new Properties(); props.put("bootstrap.servers", "localhost:9092"); props.put("group.id", "test"); props.put("enable.auto.commit", "false");
一旦设置了 enable.auto.commit 为 true,Kafka 会保证在开始调用 poll 方法时,提交上次 poll 返回的全部消息。从顺序上来讲,poll 方法的逻辑是先提交上一批消息的位移,再处理下一批消息,所以它能保证不出现消费丢失的状况。oop
缘由3:(重复消费最多见的缘由):消费后的数据,当offset尚未提交时,partition就断开链接。好比,一般会遇到消费的数据,处理很耗时,致使超过了Kafka的session timeout时间(0.10.x版本默认是30秒),那么就会re-blance重平衡,此时有必定概率offset没提交,会致使重平衡后重复消费。 this
缘由4:当消费者从新分配partition的时候,可能出现从头开始消费的状况,致使重发问题。 线程
缘由5:当消费者消费的速度很慢的时候,可能在一个session周期内还未完成,致使心跳机制检测报告出问题。日志
缘由6:并发很大,可能在规定的时间(session.time.out默认30s)内没有消费完,就会可能致使reblance重平衡,致使一部分offset自动提交失败,而后重平衡后重复消费
问题描述:
咱们系统压测过程当中出现下面问题:异常rebalance,并且平均间隔3到5分钟就会触发rebalance,分析日志发现比较严重。错误日志以下:
08-09 11:01:11 131 pool-7-thread-3 ERROR [] - commit failed org.apache.kafka.clients.consumer.CommitFailedException: Commit cannot be completed since the group has already rebalanced and assigned the partitions to another member. This means that the time between subsequent calls to poll() was longer than the configured max.poll.interval.ms, which typically implies that the poll loop is spending too much time message processing. You can address this either by increasing the session timeout or by reducing the maximum size of batches returned in poll() with max.poll.records. at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.sendOffsetCommitRequest(ConsumerCoordinator.java:713) ~[MsgAgent-jar-with-dependencies.jar:na] at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.commitOffsetsSync(ConsumerCoordinator.java:596) ~[MsgAgent-jar-with-dependencies.jar:na] at org.apache.kafka.clients.consumer.KafkaConsumer.commitSync(KafkaConsumer.java:1218) ~[MsgAgent-jar-with-dependencies.jar:na] at com.today.eventbus.common.MsgConsumer.run(MsgConsumer.java:121) ~[MsgAgent-jar-with-dependencies.jar:na] at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) [na:1.8.0_161] at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) [na:1.8.0_161] at java.lang.Thread.run(Thread.java:748) [na:1.8.0_161]
这个错误的意思是,消费者在处理完一批poll的消息后,在同步提交偏移量给broker时报的错。初步分析日志是因为当前消费者线程消费的分区已经被broker给回收了,由于kafka认为这个消费者死了,那么为何呢?
问题分析:
这里就涉及到问题是消费者在建立时会有一个属性max.poll.interval.ms(默认间隔时间为300s),
该属性意思为kafka消费者在每一轮poll()调用之间的最大延迟,消费者在获取更多记录以前能够空闲的时间量的上限。若是此超时时间期满以前poll()没有被再次调用,则消费者被视为失败,而且分组将从新平衡,以便将分区从新分配给别的成员。
由于offset此时已经不许确,生产环境不能直接去修改offset偏移量。
因此从新指定了一个消费组(group.id=order_consumer_group),而后指定auto-offset-reset=latest这样我就只须要重启个人服务了,而不须要动kafka和zookeeper了!
#consumer spring.kafka.consumer.group-id=order_consumer_group spring.kafka.consumer.key-deserializer=org.apache.kafka.common.serialization.StringDeserializer spring.kafka.consumer.value-deserializer=org.apache.kafka.common.serialization.StringDeserializer spring.kafka.consumer.enable-auto-commit=false spring.kafka.consumer.auto-offset-reset=latest
注:若是你想要消费者从头开始消费某个topic的全量数据,能够从新指定一个全新的group.id=new_group,而后指定auto-offset-reset=earliest便可