估计运维年前没有祭拜服务器,Nginx的问题修复了,Kafka又不行了。今天,原本想再睡会,结果,电话又响了。仍是运营,“喂,冰河,到公司了吗?赶忙看看服务器吧,又出问题了“。“在路上了,运维那哥们儿还没上班吗”? “还在休假。。。”, 我:“。。。”。哎,这哥们儿是跑路了吗?先无论他,问题仍是要解决。java
到公司后,放下我专用的双肩包,拿出个人利器——笔记本电脑,打开后迅速登陆监控系统,发现主要业务系统没啥问题。一个非核心服务发出了告警,而且监控系统中显示这个服务频繁的抛出以下异常。spring
2021-02-28 22:03:05 131 pool-7-thread-3 ERROR [] - commit failed org.apache.kafka.clients.consumer.CommitFailedException: Commit cannot be completed since the group has already rebalanced and assigned the partitions to another member. This means that the time between subsequent calls to poll() was longer than the configured max.poll.interval.ms, which typically implies that the poll loop is spending too much time message processing. You can address this either by increasing the session timeout or by reducing the maximum size of batches returned in poll() with max.poll.records. at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.sendOffsetCommitRequest(ConsumerCoordinator.java:713) ~[MsgAgent-jar-with-dependencies.jar:na] at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.commitOffsetsSync(ConsumerCoordinator.java:596) ~[MsgAgent-jar-with-dependencies.jar:na] at org.apache.kafka.clients.consumer.KafkaConsumer.commitSync(KafkaConsumer.java:1218) ~[MsgAgent-jar-with-dependencies.jar:na] at com.today.eventbus.common.MsgConsumer.run(MsgConsumer.java:121) ~[MsgAgent-jar-with-dependencies.jar:na] at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) [na:1.8.0_161] at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) [na:1.8.0_161] at java.lang.Thread.run(Thread.java:748) [na:1.8.0_161]
从上面输出的异常信息,大概能够判断出系统出现的问题:Kafka消费者在处理完一批poll消息后,在同步提交偏移量给broker时报错了。大概就是由于当前消费者线程的分区被broker给回收了,由于Kafka认为这个消费者挂掉了,咱们能够从下面的输出信息中能够看出这一点。apache
Commit cannot be completed since the group has already rebalanced and assigned the partitions to another member. This means that the time between subsequent calls to poll() was longer than the configured max.poll.interval.ms, which typically implies that the poll loop is spending too much time message processing. You can address this either by increasing the session timeout or by reducing the maximum size of batches returned in poll() with max.poll.records.
Kafka内部触发了Rebalance机制,明确了问题,接下来,咱们就开始分析问题了。编程
既然Kafka触发了Rebalance机制,那我就来讲说Kafka触发Rebalance的时机。bash
举个具体点的例子,好比某个分组下有10个Consumer实例,这个分组订阅了一个50个分区的主题。正常状况下,Kafka会为每一个消费者分配5个分区。这个分配的过程就是Rebalance。服务器
当Kafka中知足以下条件时,会触发Rebalance:微信
后面两种状况咱们能够人为的避免,在实际工做过程当中,对于Kafka发生Rebalance最多见的缘由是消费组成员的变化。网络
消费者成员正常的添加和停掉致使Rebalance,这种状况没法避免,可是时在某些状况下,Consumer 实例会被 Coordinator 错误地认为 “已中止” 从而被“踢出”Group,致使Rebalance。session
当 Consumer Group 完成 Rebalance 以后,每一个 Consumer 实例都会按期地向 Coordinator 发送心跳请求,代表它还存活着。若是某个 Consumer 实例不能及时地发送这些心跳请求,Coordinator 就会认为该 Consumer 已经 “死” 了,从而将其从 Group 中移除,而后开启新一轮 Rebalance。这个时间能够经过Consumer 端的参数 session.timeout.ms
进行配置。默认值是 10 秒。架构
除了这个参数,Consumer 还提供了一个控制发送心跳请求频率的参数,就是 heartbeat.interval.ms
。这个值设置得越小,Consumer 实例发送心跳请求的频率就越高。频繁地发送心跳请求会额外消耗带宽资源,但好处是可以更加快速地知晓当前是否开启 Rebalance,由于,目前 Coordinator 通知各个 Consumer 实例开启 Rebalance 的方法,就是将 REBALANCE_NEEDED
标志封装进心跳请求的响应体中。
除了以上两个参数,Consumer 端还有一个参数,用于控制 Consumer 实际消费能力对 Rebalance 的影响,即 max.poll.interval.ms
参数。它限定了 Consumer 端应用程序两次调用 poll 方法的最大时间间隔。它的默认值是 5 分钟,表示 Consumer 程序若是在 5 分钟以内没法消费完 poll 方法返回的消息,那么 Consumer 会主动发起 “离开组” 的请求,Coordinator 也会开启新一轮 Rebalance。
经过上面的分析,咱们能够看一下那些rebalance是能够避免的:
第一类非必要 Rebalance 是由于未能及时发送心跳,致使 Consumer 被 “踢出”Group 而引起的。这种状况下咱们能够设置 session.timeout.ms 和 heartbeat.interval.ms 的值,来尽可能避免rebalance的出现。(如下的配置是在网上找到的最佳实践,暂时还没测试过)
session.timeout.ms >= 3 * heartbeat.interval.ms
。将 session.timeout.ms 设置成 6s 主要是为了让 Coordinator 可以更快地定位已经挂掉的 Consumer,早日把它们踢出 Group。
第二类非必要 Rebalance 是 Consumer 消费时间过长致使的。此时,max.poll.interval.ms
参数值的设置显得尤其关键。若是要避免非预期的 Rebalance,最好将该参数值设置得大一点,比下游最大处理时间稍长一点。
总之,要为业务处理逻辑留下充足的时间。这样,Consumer 就不会由于处理这些消息的时间太长而引起 Rebalance 。
kafka
的偏移量(offset
)是由消费者进行管理的,偏移量有两种,拉取偏移量
(position)与提交偏移量
(committed)。拉取偏移量表明当前消费者分区消费进度。每次消息消费后,须要提交偏移量。在提交偏移量时,kafka
会使用拉取偏移量
的值做为分区的提交偏移量
发送给协调者。
若是没有提交偏移量,下一次消费者从新与broker链接后,会从当前消费者group已提交到broker的偏移量处开始消费。
因此,问题就在这里,当咱们处理消息时间太长时,已经被broker剔除,提交偏移量又会报错。因此拉取偏移量没有提交到broker,分区又rebalance。下一次从新分配分区时,消费者会从最新的已提交偏移量处开始消费。这里就出现了重复消费的问题。
其实,说了这么多,Kafka消费者输出的异常日志中也给出了相应的解决方案。
接下来,咱们说说Kafka中的拉取偏移量和提交偏移量。
其实,从输出的日志信息中,也大概给出了解决问题的方式,简单点来讲,就是能够经过增长 max.poll.interval.ms
时长和 session.timeout.ms
时长,减小 max.poll.records
的配置值,而且消费端在处理完消息时要及时提交偏移量。
经过以前的分析,咱们应该知道如何解决这个问题了。这里须要说一下的是,我在集成Kafka的时候,使用的是SpringBoot和Kafka消费监听器,消费端的主要代码结构以下所示。
@KafkaListener(topicPartitions = {@TopicPartition(topic = KafkaConstants.TOPIC_LOGS, partitions = { "0" }) }, groupId = "kafka-consumer", containerFactory = "kafkaListenerContainerFactory") public void consumerReceive (ConsumerRecord<?, ?> record, Acknowledgment ack){ logger.info("topic is {}, offset is {}, value is {} n", record.topic(), record.offset(), record.value()); try { Object value = record.value(); logger.info(value.toString()); ack.acknowledge(); } catch (Exception e) { logger.error("日志消费端异常: {}", e); } }
上述代码逻辑比较简单,就是获取到Kafka中的消息后直接打印输出到日志文件中。
这里,我先根据异常日志的提示信息进行配置,因此,我在SpringBoot的application.yml文件中新增了以下配置信息。
spring: kafka: consumer: properties: max.poll.interval.ms: 3600000 max.poll.records: 50 session.timeout.ms: 60000 heartbeat.interval.ms: 3000
配置完成后,再次测试消费者逻辑,发现仍是抛出Rebalance异常。
咱们从另外一个角度来看下Kafka消费者所产生的问题:一个Consumer在生产消息,另外一个Consumer在消费它的消息,它们不能在同一个groupId 下面,更改其中一个的groupId 便可。
这里,咱们的业务项目是分模块和子系统进行开发的,例如模块A在生产消息,模块B消费模块A生产的消息。此时,修改配置参数,例如 session.timeout.ms: 60000
,根本不起做用,仍是抛出Rebalance
异常。
此时,我尝试修改下消费者分组的groupId,将下面的代码
@KafkaListener(topicPartitions = {@TopicPartition(topic = KafkaConstants.TOPIC_LOGS, partitions = { "0" }) }, groupId = "kafka-consumer", containerFactory = "kafkaListenerContainerFactory") public void consumerReceive (ConsumerRecord<?, ?> record, Acknowledgment ack){
修改成以下所示的代码。
@KafkaListener(topicPartitions = {@TopicPartition(topic = KafkaConstants.TOPIC_LOGS, partitions = { "0" }) }, groupId = "kafka-consumer-logs", containerFactory = "kafkaListenerContainerFactory") public void consumerReceive (ConsumerRecord<?, ?> record, Acknowledgment ack){
再次测试,问题解决~~
此次解决的问题真是个奇葩啊!!接下来写个【Kafka系列】专题,详细介绍Kafka的原理、源码解析和实战等内容,小伙伴们大家以为呢?欢迎文末留言讨论~~
好了,今天就到这儿吧,我是冰河,你们有啥问题能够在下方留言,也能够加我微信:sun_shine_lyz,我拉你进群,一块儿交流技术,一块儿进阶,一块儿牛逼~~