最近遇到一个kafka方面的问题,大体就是因为consumer处理业务超时,致使没法正常提交Offset,进而致使没法消费新消息的问题。下面我想从如下几个方面对这次故障排查进行复盘分析:业务背景、问题描述、排查思路、经验教训。spring
先简单描述一下业务背景吧。咱们有个业务须要严格按顺序消费Topic消息,因此针对该topic设置了惟一的partition,以及惟一的副本。当同一个消费组的多个consumer启动时,只会有一个consumer订阅到该Topic,进行消费,保证同一个消费组内的消费顺序。
注:消费组的groupId名称为“smart-building-consumer-group”,订阅的Topic名称为“gate_contact_modify”。apache
有一天咱们忽然收到一个问题反馈:producer侧的业务产生消息后,consumer侧并无获得预期的结果。通过排查,排除了业务逻辑出现问题的可能性,咱们判断最有多是由于kafka消息没有被消费到。为了印证这个猜想,咱们查看了consumer消费日志,发现日志中存在这样几处问题:
(1)日志偶尔会打印出一条Kafka的警告日志,内容为:org.springframework.kafka.KafkaListenerEndpointContainer#2-0-C-1 org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.maybeAutoCommitOffsetsSync:648 - Auto-commit of offsets {gate_contact_modify-0=OffsetAndMetadata{offset=2801, metadata=''}} failed for group smart-building-consumer-group: Commit cannot be completed since the group has already rebalanced and assigned the partitions to another member. This means that the time between subsequent calls to poll() was longer than the configured max.poll.interval.ms, which typically implies that the poll loop is spending too much time message processing. You can address this either by increasing the session timeout or by reducing the maximum size of batches returned in poll() with max.poll.records.
(2)接着进行了一次rebalance;
(3)consumer侧输出了Topic消费者的业务日志,代表正常获取到了Topic消息。
接着咱们查看kafka 消费组中该Topic对应的Offset的变化状况,发现Offset一直没有变化。session
日志中的异常信息很明确的告知咱们,topic消息消费完成后,因为group发生了一次rebalance,致使Commit没有被提交,这代表两次poll消息的间隔时间超过了max.poll.interval.ms定义的最大间隔,这也意味着一次poll后处理消息的过程超时了,正是因为poll间隔时间超时,致使了一次rebalance。同时建议咱们要么增长间隔时间,要么减小每次拉取的最大消息数。
另外,因为Commit没有被提交,致使OffSet值没有变化,那么每次拉取到的消息都是同一批重复消息。具体的异常流程以下图:ide
根据上述信息,咱们进一步检查了consumer的max.poll.records配置、max.poll.interval.ms配置,并统计了每条Topic消息的处理耗时,发现max.poll.records使用了默认配置值500,max.poll.interval.ms使用了默认配置值为300s,而每条Topic消息的处理耗时为10S。这进一步证明了咱们的推论:
因为每次拉取的消息数太多,而每条消息处理时间又较长,致使每次消息处理时间超过了拉取时间间隔,从而使得group进行了一次rebalance,致使commit失败,并最终致使下次拉取重复的消息、继续处理超时,进入一个死循环状态。
知道问题根源后,咱们结合业务特色,更改了max.poll.records=1,每次仅拉取一条消息进行处理,最终解决了这个问题。oop
此次故障排查,使咱们对Kafka消息poll机制、rebalance和commit之间的相互影响等有了更深的理解。
(1)kafka每次poll能够指定批量消息数,以提升消费效率,但批量的大小要结合poll间隔超时时间和每条消息的处理时间进行权衡;
(2)一旦两次poll的间隔时间超过阈值,group会认为当前consumer可能存在故障点,会触发一次rebalance,从新分配Topic的partition;
(3)若是在commit以前进行了一次rebalance,那么本次commit将会失败,下次poll会拉取到旧的数据(重复消费),所以要保证好消息处理的幂等性;ui