关于一次Kafka重复消费问题排查记录的闲谈

  前段时间上线一个新服务,咱们的运营在测试的时候,导入了一批数据,结果目标表里的数据是预期数量的2倍,有大量的重复数据,一开始我认为多是我没有过滤数据类型致使的,我所消费的数据是经过监听数据库的binlog解析后推送到Kafka的数据,我收到kafka消息通过反序列化获得多条数据库表的变更记录,每一条记录都有一个类型:INSERTUPDATEDELETE,其实也就是对应SQL的类型,我一开始并无判断这个类型,而是收到的全部数据都进行后续的处理,运营也说多是有UPDATE操做的,因而我加了过滤,只处理INSERT类型数据,改好以后想着确定没问题了,因而让运营再删掉原有数据从新导入一遍,次日再看结果。html

  然而次日,问题依旧......这下可难到我了,想了一会猜想会不会是前面的环节推送的数据就是重复的呢?可是也不能瞎猜,因而我在处理一条数据以前打印上offset,发版后开始观察日志,不一下子库里出现了重复数据了,我拿到重复的订单号以后去日志中搜索,结果,两条数据的offset是同样的?数据库

img

  看来这真是我消费的问题了,可是以前相似的项目也是一样的消费方式,历来没出现太重复消费呀,这就让我很是纳闷,因而找了几条重复数据,观察了一下插入时间,发现时间间隔还挺有规律,基本都是五分钟左右重复插入一次,在日志上发现重复消费是两台机器交替进行的,我所消费的Topic是只有一个分区的,因此只会有一台机器消费,从日志上看出来两台机器在交替消费,于是产生了重复消费,可是为何呢,因而再搜了下Exception关键词,发现了一个CommitFailedExceptionapache

img

  能够看出来,是当前消费端被踢出了消费组,随后offset提交失败,而后换了一台机器从新消费这个offset,致使了重复消费,可是为何被被剔除出消费组我仍然没法解释,由于日志上再也没其余错误信息了,好在后来又让我发现了点蛛丝马迹,那会恰好使用了tail -f命令,不经意间发现过了很久,接收到的数据offset都是同样的,一条kafka消息解压后有这么多?(后来在本地试了下,果真一条Kafka消息,反序列化后都有3000到9000条不等,这也太多了吧。。)通过一番Google以后注意到了Kafka的两个配置项:max.poll.interval.msmax.poll.records分别表明拉取消息的间隔时间和拉取的最大条数,个人配置是:缓存

max.poll.interval.ms = 600000 # 默认5分钟
max.poll.records = 20         # 这个是我最开始写的20条

  也就是说,最快要5分钟内处理完20个offset,不然将认为客户端不在消费了,也就产生了上面的异常,被踢出消费组,然后又commit失败。从打印的日志来看,这个时间明显不够,处理一个offset的消息都要好久,更别说最大20个了。知道缘由后,我改了参数:运维

max.poll.interval.ms = 1200000
max.poll.records = 1

  也就是一条十分钟的一个的下限速度,可是后来证实,这个时间依旧远远不够(最后这个时间已经改成了1个小时...),通过一段时间的观察,一条Kafka消息反序列化最多会有10000条数据,处理时间最长大约40多分钟,我最开始确实没有想到这个Topic中的一条消息包含的数据会这么多,致使了这一系列的问题,时间改成1小时后连续两天再没出现太重复消费offset的问题。性能

  可是可是,这个程序的处理速度慢,也是致使此问题的一个缘由,固然后来发现代码中,处理一条数据,平均查询与插入数据库的次数有五六十次!这固然快不起来啊,因而我在大部分的SQL查询部分使用WeakHashMap进行了数据缓存,由于这部分查询的数据基本是不会有变更的,极大的减小了数据库查询次数,处理速度提高了将近10倍!再后来,因为这个Topic只有一个partition,彻底没办法用到多台机器的性能,并且据运维反馈,这个TiDB binlog监听的中间件只支持一个分区发送, 因而我本身在程序中增长了一道转发,即消费到Kafka消息反序列化以后,将反序列化以后的数据先不作处理,直接一条一条转发至RocketMQ,这个过程是很是快的,rocketMQ再发送至各个机器上(也可新建一个多partition的Kafka Topic用于转发),这样就能充分利用集群的优点,进一步极大地提升处理速度,这一块说这么多其实偏题了,属于后续的一个优化。测试

  后来发现,其实每一次在消费端即将离开以前,都会有一条日志:fetch

img

  提示向服务端发送了离开消费组的请求,由于客户端poll操做已超时,并建议增大最大拉取间隔时间或者减少最大拉取数量(这个不行,我都改到1了 T_T )。优化

  而后在处理完一个offset提交的时候会提示请求失败,当前消费者再也不是此消费组的一部分。指针

img

  可是日志级别是INFO,确实不太容易发现,至此问题已彻底解决,缘由也彻底清楚了。

  总的来讲这次遇到的Kafka重复消费的缘由,第一是Kafka的消息太大(后来解析到最大有包含25万条数据的一条消息。。。这都是使用protobuf序列化的消息),可是这部分咱们没法变更,第二是一开始处理速度也比较慢,默认间隔时间彻底不够,综合致使频繁从新消费。

  通过这次问题对Kafka参数有了更深的一些认识,除上面两个用到的同时也是比较重要参数以外,还有请求的超时时间、会话超时时间、心跳检测事件、拉取消息的超时时间等,在本地Debug期间还遇到过一个老是拉取不到消息而后报空指针异常的问题,而后查到缘由是拉取Kafka消息超时了,一条消息可能有十几兆,那会恰好我电脑的网速很是慢,就超时了,后来加大了fetch.max.wait.msrequest.timeout.ms就没问题了。颇有意义的一次Kafka问题排查经历

附一个:Kafka Consumer配置官方文档

相关文章
相关标签/搜索