Kafka重复消费和丢失数据研究


Kafka重复消费缘由 redis

底层根本缘由:已经消费了数据,可是offset没提交。 服务器

缘由1:强行kill线程,致使消费后的数据,offset没有提交。 分布式

缘由2:设置offset为自动提交,关闭kafka时,若是在close以前,调用 consumer.unsubscribe() 则有可能部分offset没提交,下次重启会重复消费。例如: spa

try { 线程

consumer.unsubscribe(); 内存

} catch (Exception e) { kafka

} it

try { io

consumer.close(); 集群

} catch (Exception e) {

}

上面代码会致使部分offset没提交,下次启动时会重复消费。


Kafka Consumer丢失数据缘由

猜想:设置offset为自动定时提交,当offset被自动定时提交时,数据还在内存中未处理,此时恰好把线程kill掉,那么offset已经提交,可是数据未处理,致使这部份内存中的数据丢失。



 

记录offset和恢复offset的方案

 

理论上记录offset,下一个group consumer能够接着记录的offset位置继续消费。

offset记录方案:

每次消费时更新每一个topic+partition位置的offset在内存中,

Map<key, value>,key=topic+'-'+partition,value=offset

当调用关闭consumer线程时,把上面Map的offset数据记录到 文件中*(分布式集群可能要记录到redis中)。

 

下一次启动consumer,须要读取上一次的offset信息,方法是 以当前的topic+partition为key,从上次的Map中去寻找offset。

而后使用consumer.seek()方法指定到上次的offset位置。

 

说明:

一、该方案针对单台服务器比较简单,直接把offset记录到本地文件中便可,可是对于多台服务器集群,offset也要记录到同一个地方,而且须要作去重处理。

      若是线上程序是由多台服务器组成的集群,是否能够用一台服务器来支撑?应该能够,只是消费慢一点,没多大影响。


二、如何保证接着offset消费的数据正确性

为了确保consumer消费的数据必定是接着上一次consumer消费的数据,

consumer消费时,记录第一次取出的数据,将其offset和上次consumer最后消费的offset进行对比,若是相同则继续消费。若是不一样,则中止消费,检查缘由。

相关文章
相关标签/搜索