【问题处理】Spring Boot中kafka消息能力不足现象及问题解决

【本文首发于个人网址:liumoran.cn

kfaka中主要包含生产者与消费都对象,生产者往队列中推送消息,而消费者则从队列中读取并处理消息。生产者推送的过程较为简单,而对于消费者来说,情况复杂且容易受各种因素影响。

1. 消费过程

消费过程如下所示:

Created with Raphaël 2.2.0 开始 是否有消息? 读取消息 处理消息 提交offset 是否停止? 结束 等待 等待超时 yes no yes no

其中最关键的两点就是消息处理和提交offset这块。

分区与消费者
在这里插入图片描述
消费者在消费过程中,先调用服务器端接口注册消费者,然后服务器端会根据消费者情况及这个消费者所要消费的队列及其分区情况来分配分区给该消费者。需要注意的是分区与消费者是多对一的关系,也就是说,一个分区最多分配给一个消费都,但一个消费者可以消费多个分区的数据;这也导致一个限制:消费者的数目最多配置到它所消费的队列的分区数总和;当消费者数目比分区数还要大时是没有意义的。

那为什么要有这么一个限制?这应该和kafka所采用的机制有关系。当某个消费者读取一批消息时,服务端需要等待消费都处理完成并提交offset,表示当前已经处理到队列中的哪个消息了。如果可以多个消费者消费同一个分区,那么管理offset的工作将会变得极为复杂,而且需要的异常处理更多且出问题时排查难度非常复杂。

那这样又产生了另外一个问题,如果一个消费者消费不来某个分区的数据怎么办?其实结论非常简单:加分区和消费者呗!

消费者不断的监听对应的队列分区,当读取到消息时,即对读取到的消息进行处理,如进行数据加工后存关系型数据库等;处理完成后,提交offset到服务器端。

这个是正常的流程,对于多数情况来说当然没有问题。

但系统运行过程中最不缺少的就是异常的情况,如果事先不考虑好异常情况的处理措施,它所带来的结果往往是灾难性的。

2. 异常场景分析

设想一个场景,当一个分区的数据分配给某个消费者后,消费者出了某些故障down了,或者其它一些网络的原因,服务器长时间未收到消费者本该提交offset的请求;这时候服务端应该做什么处理?服务端显然不能一直等待,否则不仅仅是之前消费者读取的那批消息不会被处理,甚至会导致这个分区的数据也不会被处理;这种情况对于一个分布式系统来说是无法忍受的。因此,针对这种情况,kafka会在消息读取一定时间内,如果发现消费者还未提交offset时,就会将这批消息置为未处理,这样他们又可以继续被新的消费者消费了。

那如果是每条数据需要处理的时间过长,消费者消费能力有限而不能在指定时间内处理完成,会有什么结果?刚刚说到对于这种情况服务端会将这批消息置成未处理;当消费者处理完成后去提交offset时,会提示相关的失败消息。这时候这个消费都继续去读消息,就会重复读取到之前实际已经处理过的那批消息继续进行处理了。这就出现了一个死循环,消费者一直在消费同一批数据,但每次都提交不成功! 服务端对于这种情况有没有处理尚不清楚,但在Kafka的Java实现中,如果这种情况持续一定次数或者时间后,消费者将会从监听中退出,不再处理任何消息了。出现了这种情况,只能修改topic的名称,然后重启消费者重新注册监听。这种情况对于生产系统来说也是非常严重的问题!

那如何避免这种情况?

3. 消费能力不足问题处理

有以下几种方式可以用来处理消费者消费能力不足的问题:

  1. 调整kafka参数,减少每次读取的记录数(max.poll.records),调高超时时间(max.poll.interval.ms);
  2. 提高消费者处理能力,优化代码效率,或者使用多线程来处理消息。
  3. 增加消费者,如果消费者数目已经与分区数一致,那么就需要同时增加分区数和消费者数目了;

如果使用多线程来处理消息,控制上会较为复杂。我们启动多个线程来处理消费,可能会将数据加工后插入到数据库中,主线程必须等待所有线程执行成功后再提交offset,否则如果提前将offset提交,那么如果多线程处理失败,这批消息就没有办法被正常处理了。即使等待所有线程都执行完成也可能会有问题,某个线程在处理某条消息时可能失败,这个时候如何处理可能与具体业务相关,是忽略这条数据的错误还是整批数据都要被重新处理?如果忽略的话还比较简单,如果整批数据都要重新处理,那么就需要处理多个线程的事务或者其它业务回滚了。这个控制非常复杂,视具体的业务场景而定。

相对来说第1、3条是处理的最佳方式了。当然这些参数调到最优也必然是一个根据实际情况不断调整的过程。


个人主页:liumoran.cn