Kafka消费者之提交消息的偏移量

原文连接:https://cloud.tencent.com/developer/article/1462432javascript

1、概述

在新消费者客户端中,消费位移是存储在Kafka内部的主题 __consumer_offsets 中。把消费位移存储起来(持久化)的动做称为 “提交” ,消费者在消费完消息以后须要执行消费位移的提交java

参考下图的消费位移,x 表示某一次拉取操做中此分区消息的最大偏移量,假设当前消费者已经消费了 x 位置的消息,那么咱们就能够说消费者的消费位移为 x ,图中也用了 lastConsumedOffset 这个单词来标识它。git

不过须要很是明确的是,当前消费者须要提交的消费位移并非 x ,而是 x+1 ,对应上图中的 position ,它表示下一条须要拉取的消息的位置。github

KafkaConsumer 类提供了 partition(TopicPartition) 和 committed(TopicPartition) 两个方法来分别获取上面所说的 postion 和 committed offset 的值。这两个方法的定义以下所示:异步

  • public long position(TopicPartition partition)
  • public OffsetAndMetadata committed(TopicPartition partition)

可经过 TestOffsetAndPosition.java 来测试consumed offset、committed offset、position之间的关系。该 TestOffsetAndPosition.java 文件的地址为:函数

https://github.com/841809077/hdpproject/blob/master/src/main/java/com/hdp/project/kafka/consumer/TestOffsetAndPosition.javapost

2、offset 提交的两种方式

一、自动提交

在 Kafka 中默认的消费位移的提交方式为自动提交,这个由消费者客户端参数 enable.auto.commit 配置,默认值为 true 。这个默认的自动提交不是每消费一条消息就提交一次,而是按期提交,这个按期的周期时间由客户端 auto.commit.interval.ms 配置,默认值为 5 秒,此参数生效的前提是 enable.auto.commit 参数为 true 。性能

在默认的配置下,消费者每隔 5 秒会将拉取到的每一个分区中最大的消息位移进行提交。自动位移提交的动做是在 poll() 方法的逻辑里完成的,在每次真正向服务端发起拉取请求以前会检查是否能够进行位移提交,若是能够,那么就会提交上一次轮询的位移。测试

二、手动提交

Kafka 自动提交消费位移的方式很是简便,它免去了复杂的位移提交逻辑,但并无为开发者留有余地来处理重复消费消息丢失的问题。自动位移提交没法作到精确的位移管理,因此Kafka还提供了手动位移提交的方式,这样就可使得开发人员对消费位移的管理控制更加灵活。开启手动提交功能的前提是消费者客户端参数 enable.auto.commit 配置为 false 。示例以下:spa

props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);

手动提交又分为同步提交异步提交,对应于 KafkaConsumer 中的 commitSync() 和 commitAsync() 两种类型的方法。

2.一、同步提交

消费者能够调用 commitSync() 方法,来实现位移的同步提交。

commitSync() 方法会根据 poll() 方法拉取的最新位移来进行提交,只要没有发生不可回复的错误,它就会阻塞消费者线程直至位移提交完成。

对于采用 commitSync() 的无参方法而言,它提交消费位移的频率和拉取批次消息、处理批次消息的频率是同样的。若是想寻求更细粒度的、更精准的提交,那么就须要使用 commitSync() 的另外一个含参方法,具体定义以下:

public void commitSync(Map<TopicPartition, OffsetAndMetadata> offsets)

该方法提供了一个 offsets 参数,用来提交指定分区的位移。

2.二、异步提交

与 commitSync() 方法相反,异步提交的方式在执行的时候消费者线程不会被阻塞,能够在提交消费位移的结果还未返回以前就开始新一次的拉取操做。异步提交可使消费者的性能获得必定的加强。commitAsync() 方法有三个不一样的重载方法:

public void commitAsync() public void commitAsync(OffsetCommitCallback callback) public void commitAsync(Map<TopicPartition, OffsetAndMetadata> offsets, OffsetCommitCallback callback) 

第一个无参方法和第三个方法中的 offsets 都很好理解,对照 commitSync() 方法便可。关键是第二个方法与第三个方法的 callback 参数,它提供了一个异步提交的回调方法,当位移提交完成后会回调 OffsetCommitCallback 中的 onComplete() 方法。以下图所示:

发送提交请求后能够继续作其它事情。若是提交失败,错误信息和偏移量会被记录下来。

3、同步和异步组合提交

通常状况下,针对偶尔出现的提交失败,不进行重试不会有太大问题,由于若是提交失败是由于临时问题致使的,那么后续的提交总会有成功的。但若是这是发生在 关闭消费者 或 再均衡(分区的所属权从一个消费者转移到另外一个消费者的行为) 前的最后一次提交,就要确保可以提交成功。

所以,在消费者关闭前通常会组合使用 commitAsync() 和 commitSync() 。使用 commitAsync() 方式来作每条消费信息的提交(由于该种方式速度更快),最后再使用 commitSync() 方式来作位移提交最后的保证。

try { while (true) { // 消费者poll而且执行一些操做 // ... // 异步提交,也可以使用有回调函数的异步提交。较同步提交速度更快。 consumer.commitAsync(); } } catch (Exception e) { logger.error("Unexpected error" , e); } finally { try { // 同步提交,来作位移提交最后的保证。 consumer.commitSync(); } finally { consumer.close(); } }

4、总结

本文主要讲解了消费者提交消息位移的两种方式,分为:

  • 自动提交
  • 手动提交

而 手动提交 又分为:

  • 同步提交
  • 异步提交

而在通常状况下,建议使用手动的方式:异步和同步组合提交消息位移。由于异步提交不须要等待提交的反馈结果,便可进行新一次的拉取消息操做,速度较同步提交更快。但在最后一次提交消息位移以前,为了保证位移提交成功,仍是须要再作一次同步提交操做。

本文参考《Kafka权威指南》与《深刻理解Kafka:核心设计与实践原理》,也推荐你们阅读这两本书。

相关文章
相关标签/搜索