在消费Kafka中分区的数据时,咱们须要跟踪哪些消息是读取过的、哪些是没有读取过的。这是读取消息不丢失的关键所在。数据库
Kafka是经过offset顺序读取事件的。若是一个消费者退出,再重启的时候,它知道从哪儿继续读取消息进行处理。因此,消费者须要「提交」属于它们本身的偏移量。若是消费者已经提交了偏移量,但消息没有获得有效处理,此时就会形成消费者消息丢失。因此,咱们应该重视偏移量提交的时间点以及提交的方式。缓存
一、group.id性能
二、auto.offset.reset线程
三、enable.auto.commit事件
四、auto.commit.interval.ms事务
若是咱们但愿可以更有效地控制偏移量提交的时间点,就须要显示地提交偏移量。同步
一、老是在处理完事件后再提交偏移量it
若是全部的处理都是在轮询里完成,无需在轮询之间维护状态,那么可使用自动提交,或者在轮询结束后进行手动提交。ast
二、提交频率是性能和重复消息数量之间的权衡test
这个意思是:提交频率越高,重复消息处理的数量越少,性能也是比较低的。提交频率越低,重复消息处理的数量越多,性能是比较好的。因此,要根据实际的状况,来衡量在什么时机,来提交偏移量。即便是在最简单的场景你,也须要在一个循环中屡次提交偏移量。
三、确保对提交的偏移量内心有数
必定要在处理完消息后,再提交偏移量,不然会出现某些消息会被处理。
四、消费者可能须要重试
但处理消息出现问题时,例如:把Kafka中的数据写入到HBase中,此时HBase临时不可用。咱们想要重试。假设这条消息是:#30,#30处理失败了。那你们想一想?#31能提交吗?
显然是不能的,若是#31提交了,那么#31以前的全部数据,都不会被处理了。咱们可使用如下几种模式来处理:
模式一
① 但遇到可重试错误时,提交最后一个处理成功的偏移量
② 把没有处理好的消息保存到缓冲区
③ 调用 pause() 方法,确保其余的轮询不会返回数据
④ 尝试从新处理缓存中的数据,若是重试成功,或者重试次数达到上限并决定放弃,把错误记录下来并丢弃消息
⑤ 调用 resume() 方法让消费者继续从轮询里获取新数据
模式二
① 遇到可重试错误时,把错误写入一个独立的主题,而后继续
② 用一个独立的消费者组负责从该主题上读取错误消息,并进行重试
五、长时间处理
有时候要进行比较复杂的处理,暂停轮询的时间不能超过几秒钟。要保持轮询,由于只有在轮询过程当中,才能往broker发送心跳。可使用一个线程池来处理数据,可让轮询不获取新的数据,直到工做县好吃呢个处理完成。消费者一直保持轮询,心跳正常,就不会发生再均衡。
八、仅一次传递
有的程序不只是须要“至少一次”(at least-once语义)(意味着没有数据丢失),还须要仅一次(exactly-once)语义。实现一次性语义,最经常使用的办法就是把结果写入到一个支持惟一键的系统里,好比:k-v存储、关系数据库、ES或者其余数据存储。可使用主题、分区和偏移量来做为主键,这样,能够碰巧读取到同一个相同的消息,直接覆盖写入就能够了。这种称为幂等性写入。
还有一种,就是使用关系型数据库,HDFS中一些被定义过的原子操做也常常用来达到相同的目的。把消息和偏移量放在同一个事务里,这样让它们保持同步。消费者启动,获取最近处理过的偏移量,调用seek()方法从偏移量位置继续读取数据
参考文件:
「Kafka权威指南」