kafka笔记4(2)

提交和偏移量数据库

每次调用poll 方法,老是返回生产者写入Kafka但尚未被消费者读取过的记录咱们所以能够追踪到哪些记录时被群组里的哪一个消费者读取过的。安全

咱们把更新分区当前位置的操做叫作提交。服务器

那么消费者时如何提交偏移量的呢?消费者往一个叫作_consumer_offset的特殊主题发送消息,消息里包含每一个分区的偏移量。若是消费者一直处于运行状态,那么偏移量没有什么用处。不过若是消费者发生崩溃或者有新的消费者加入群组,就会触发在均衡,完成再均衡以后,每一个消费者可能分配到新的分区,而不是以前那一个,为了可以继续以前的工做,消费者须要读取每一个分区最后一次提交的偏移量,而后从偏移量指定的地方继续处理。异步

若是提交的偏移量小于客户端处理的最后一个消息的偏移量,那么处于两个偏移量之间的消息就会被重复处理。线程

若是提交的偏移量大于客户端处理的最后一个消息的偏移量,那么处于两个偏移量之间的消息将会丢失。事务

KafkaConsumer API提供了不少方式来提交偏移量。开发

自动提交it

最简单的方式时让消费者本身提交偏移量,若是enable.auto.commit被设为true,那么每过5s,消费者自动把从poll方法接收到的最大偏移量提交上去。提交间隔时间由auto.commit.interval.ms控制,默认值时5s,若是这个5s内发生了再均衡,那么就会有消息被重复处理。自动提交虽然方便,可是并无为开发者留有余地来避免重复处理消息。io

手动提交List

开发者能够在必要的时候提交当前偏移量,而不是基于时间间隔。把enable.auto.commit设为false,让应用程序决定什么时候提交偏移量。使用commitSync方法提交偏移量最简单也最可靠。这个API会提交由poll方法返回的最新偏移量,提交成功后立刻返回,若是提交失败就会抛出异常。

commitSync方法将会提交由poll方法返回的最新偏移量,因此处理完全部记录后要确保调用了commitSync,不然仍是会有丢失消息的风险。若是发生了再均衡,从最近一批消息到发生在均衡之间的全部消息都将会被重复处理。

异步提交

手动提交有一个不足之处,在broker对提交请求做出回应以前,应用程序会一直阻塞,这会影响应用程序的吞吐量,咱们能够经过下降提交频率来提高吞吐量,但若是发生了再均衡,会增长重复消息的数量。

这个时候能够考虑使用异步提交API,咱们只管发送提交请求,无需等待broker的响应。

在成功提交或碰到没法恢复的错误以前,commitSync方法会一直重试,可是commitAsync不会,他之因此不进行重试,是由于它在收到服务器响应的时候,可能有一个更大的偏移量已经提交成功。

咱们之因此提到这个问题的复杂性和提交顺序的重要性,是由于commitAsync方法支持回调,在broker做出响应时会执行回调。回调常常被用于记录提交错误或者生成度量指标,不过若是用于重试,必定注意提交的顺序。

咱们可使用一个单调递增的序列号来维护异步提交的顺序,在每次提交偏移量以后或在回调里提交偏移量时递增序列号。在进行重试前,先检查回调的序列号和即将提交的偏移量是否相等,若是相等,说明没有新的提交,那么能够安全地进行重试,若是序列号比较大,说明有一个新的提交意见发送,应该中止重试。

提交特定的偏移量

提交偏移量的频率和处理消息的频率时同样的,但若是向频繁提交怎么办?

消费者API容许在调用commitSync和commitAsync方法时传进去但愿提交的分区和偏移量map。

再均衡监听器

在为消费者分配新分区或者移除旧分区时,能够经过消费者API执行一些应用程序代码,在调用subscribe方法时传进去一个ConsumerRebalanceListener实例就能够了,它有两个须要实现的方法:

onPartitionRevoked方法会在再均衡开始以前和消费者中止读取消息以后被调用。若是在这里提交偏移量,下一个接管分区的消费者就知道该从哪里开始读取。

onPartitionAssigned方法会在从新分配分区以后和消费者开始读取消息以前被调用。

从特定偏移量开始处理记录

有时候须要从特定的偏移量开始读取消息。若是想从分区的起始位置开始读取消息,或者直接跳到分区的末尾开始读取消息,可使用seekToBeginning方法和seekToEnd方法。

若是保存记录和偏移量能够在一个原子操做里完成,就能够避免重复处理。若是在同一个事务里把记录和偏移量都写到数据库里,那么咱们就会知道记录和偏移量要么都成功,要么都失败,而后从新处理记录。

如今的问题时:若是偏移量时保存在数据库里而不是Kafka里,那么消费者在获得新分区时怎么知道该从哪里开始读取?这个时候可使用seek方法,在消费者启动或分配到新分区时,可使用seek方法查找保存到数据库里的偏移量。如何退出

若是肯定要退出循环,须要经过另一个线程调用consumer.wakeup方法。

相关文章
相关标签/搜索