Kafka中位移提交那些事儿

Kafka中位移提交那些事儿.png

本文已收录GitHub,更有互联网大厂面试真题,面试攻略,高效学习资料等git


以前咱们说过,Consumer 端有个位移的概念,它和消息在分区中的位移不是一回事儿,虽然它们的英文都是 Offset。今天咱们要聊的位移是 Consumer 的消费位移,它记录了Consumer 要消费的下一条消息的位移。这可能和你之前了解的有些出入,不过切记是下一条消息的位移,而不是目前最新消费消息的位移。github

我来举个例子说明一下。假设一个分区中有 10 条消息,位移分别是 0 到 9。某个Consumer 应用已消费了 5 条消息,这就说明该 Consumer 消费了位移为 0 到 4 的 5 条消息,此时 Consumer 的位移是 5,指向了下一条消息的位移。面试

Consumer 须要向 Kafka 汇报本身的位移数据,这个汇报过程被称为提交位移(Committing Offsets)。由于 Consumer 可以同时消费多个分区的数据,因此位移的提交其实是在分区粒度上进行的,即Consumer 须要为分配给它的每一个分区提交各自的位移数据。数据库

提交位移主要是为了表征 Consumer 的消费进度,这样当 Consumer 发生故障重启以后,就可以从 Kafka 中读取以前提交的位移值,而后从相应的位移处继续消费,从而避免整个消费过程重来一遍。换句话说,位移提交是 Kafka 提供给你的一个工具或语义保障,你负责维持这个语义保障,即若是你提交了位移 X,那么 Kafka 会认为全部位移值小于 X 的消息你都已经成功消费了。apache

这一点特别关键。由于位移提交很是灵活,你彻底能够提交任何位移值,但由此产生的后果你也要一并承担。假设你的 Consumer 消费了 10 条消息,你提交的位移值倒是 20,那么从理论上讲,位移介于 11~19 之间的消息是有可能丢失的;相反地,若是你提交的位移值是 5,那么位移介于 5~9 之间的消息就有可能被重复消费。因此,我想再强调一下,位移提交的语义保障是由你来负责的,Kafka 只会“无脑”地接受你提交的位移。你对位移提交的管理直接影响了你的 Consumer 所能提供的消息语义保障。bootstrap

鉴于位移提交甚至是位移管理对 Consumer 端的巨大影响,Kafka,特别是KafkaConsumer API,提供了多种提交位移的方法。从用户的角度来讲,位移提交分为自动提交和手动提交;从 Consumer 端的角度来讲,位移提交分为同步提交和异步提交网络

咱们先来讲说自动提交和手动提交。所谓自动提交,就是指 Kafka Consumer 在后台默默地为你提交位移,做为用户的你彻底没必要操心这些事;而手动提交,则是指你要本身提交位移,Kafka Consumer 压根无论。异步

开启自动提交位移的方法很简单。Consumer 端有个参数 enable.auto.commit,把它设置为 true 或者压根不设置它就能够了。由于它的默认值就是 true,即 Java Consumer 默认就是自动提交位移的。若是启用了自动提交,Consumer 端还有个参数就派上用场了:auto.commit.interval.ms。它的默认值是 5 秒,代表 Kafka 每 5 秒会为你自动提交一次位移。ide

为了把这个问题说清楚,我给出了完整的 Java 代码。这段代码展现了设置自动提交位移的方法。有了这段代码作基础,今天后面的讲解我就再也不展现完整的代码了。函数

Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("group.id", "test");
props.put("enable.auto.commit", "true");
props.put("auto.commit.interval.ms", "2000");
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserialprops.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeseriKafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
consumer.subscribe(Arrays.asList("foo", "bar"));
while(true) {
ConsumerRecords<String, String> records = consumer.poll(100);
for (ConsumerRecord<String, String> record : records)
System.out.printf("offset = %d, key = %s, value = %s%n", record.offset(),
}

上面的橙色粗体部分,就是开启自动提交位移的方法。整体来讲,仍是很简单的吧。

和自动提交相反的,就是手动提交了。开启手动提交位移的方法就是设置enable.auto.commit 为 false。可是,仅仅设置它为 false 还不够,由于你只是告诉Kafka Consumer 不要自动提交位移而已,你还须要调用相应的 API 手动提交位移。

最简单的 API 就是KafkaConsumer#commitSync()。该方法会提交KafkaConsumer#poll() 返回的最新位移。从名字上来看,它是一个同步操做,即该方法会一直等待,直到位移被成功提交才会返回。若是提交过程当中出现异常,该方法会将异常信息抛出。下面这段代码展现了 commitSync() 的使用方法:

while(true) {
ConsumerRecords<String, String>records=
consumer.poll(Duration.ofSeconds(1));
process(records);
//处理消息
try {
 consumer.commitSync();
}
catch (CommitFailedException e) {
 handle(e);
 //处理提交失败异常
}
}

可见,调用 consumer.commitSync() 方法的时机,是在你处理完了 poll() 方法返回的全部消息以后。若是你莽撞地过早提交了位移,就可能会出现消费数据丢失的状况。那么你可能会问,自动提交位移就不会出现消费数据丢失的状况了吗?它能恰到好处地把握时机进行位移提交吗?为了搞清楚这个问题,咱们必需要深刻地了解一下自动提交位移的顺序。

一旦设置了 enable.auto.commit 为 true,Kafka 会保证在开始调用 poll 方法时,提交上次 poll 返回的全部消息。从顺序上来讲,poll 方法的逻辑是先提交上一批消息的位移,再处理下一批消息,所以它能保证不出现消费丢失的状况。但自动提交位移的一个问题在于,它可能会出现重复消费

在默认状况下,Consumer 每 5 秒自动提交一次位移。如今,咱们假设提交位移以后的 3秒发生了 Rebalance 操做。在 Rebalance 以后,全部 Consumer 从上一次提交的位移处继续消费,但该位移已是 3 秒前的位移数据了,故在 Rebalance 发生前 3 秒消费的全部数据都要从新再消费一次。虽然你可以经过减小 auto.commit.interval.ms 的值来提升提交频率,但这么作只能缩小重复消费的时间窗口,不可能彻底消除它。这是自动提交机制的一个缺陷。

反观手动提交位移,它的好处就在于更加灵活,你彻底可以把控位移提交的时机和频率。可是,它也有一个缺陷,就是在调用 commitSync() 时,Consumer 程序会处于阻塞状态,直到远端的 Broker 返回提交结果,这个状态才会结束。在任何系统中,由于程序而非资源限制而致使的阻塞均可能是系统的瓶颈,会影响整个应用程序的 TPS。固然,你能够选择拉长提交间隔,但这样作的后果是 Consumer 的提交频率降低,在下次 Consumer 重启回来后,会有更多的消息被从新消费。

鉴于这个问题,Kafka 社区为手动提交位移提供了另外一个 API 方法:

KafkaConsumer#commitAsync()。从名字上来看它就不是同步的,而是一个异步操做。调用 commitAsync() 以后,它会当即返回,不会阻塞,所以不会影响 Consumer 应用的 TPS。因为它是异步的,Kafka 提供了回调函数(callback),供你实现提交以后的逻辑,好比记录日志或处理异常等。下面这段代码展现了调用 commitAsync() 的方法:

while(true) {
ConsumerRecords<String, String>records=
consumer.poll(Duration.ofSeconds(1));
process(records);
//处理消息
consumer.commitAsync((offsets, exception) -> {
 if(exception != null)
   handle(exception);
}
);

commitAsync 是否可以替代 commitSync 呢?答案是不能。commitAsync 的问题在于,出现问题时它不会自动重试。由于它是异步操做,假若提交失败后自动重试,那么它重试时提交的位移值可能早已经“过时”或不是最新值了。所以,异步提交的重试其实没有意义,因此 commitAsync 是不会重试的。

显然,若是是手动提交,咱们须要将 commitSync 和 commitAsync 组合使用才能到达最理想的效果,缘由有两个:

  1. 咱们能够利用 commitSync 的自动重试来规避那些瞬时错误,好比网络的瞬时抖动,Broker 端 GC 等。由于这些问题都是短暂的,自动重试一般都会成功,所以,咱们不想本身重试,而是但愿 Kafka Consumer 帮咱们作这件事。

  2. 咱们不但愿程序总处于阻塞状态,影响 TPS。

咱们来看一下下面这段代码,它展现的是如何将两个 API 方法结合使用进行手动提交。

try{
while(true) {
 ConsumerRecords<String, String> records =
 consumer.poll(Duration.ofSeconds(1));
 process(records);
 //处理消息
 commitAysnc();
 //使用异步提交规避阻塞
}
}
catch(Exception e) {
handle(e);
//处理异常
}
finally{
try{
 consumer.commitSync();
 //最后一次提交使用同步阻塞式提交
}
finally{
 consumer.close();

这段代码同时使用了 commitSync() 和 commitAsync()。对于常规性、阶段性的手动提交,咱们调用 commitAsync() 避免程序阻塞,而在 Consumer 要关闭前,咱们调用commitSync() 方法执行同步阻塞式的位移提交,以确保 Consumer 关闭前可以保存正确的位移数据。将二者结合后,咱们既实现了异步无阻塞式的位移管理,也确保了Consumer 位移的正确性,因此,若是你须要自行编写代码开发一套 Kafka Consumer 应用,那么我推荐你使用上面的代码范例来实现手动的位移提交。

咱们说了自动提交和手动提交,也说了同步提交和异步提交,这些就是 Kafka 位移提交的所有了吗?其实,咱们还差一部分。

实际上,Kafka Consumer API 还提供了一组更为方便的方法,能够帮助你实现更精细化的位移管理功能。刚刚咱们聊到的全部位移提交,都是提交 poll 方法返回的全部消息的位移,好比 poll 方法一次返回了 500 条消息,当你处理完这 500 条消息以后,前面咱们提到的各类方法会一次性地将这 500 条消息的位移一并处理。简单来讲,就是直接提交最新一条消息的位移。但若是我想更加细粒度化地提交位移,该怎么办呢?

设想这样一个场景:你的 poll 方法返回的不是 500 条消息,而是 5000 条。那么,你确定不想把这 5000 条消息都处理完以后再提交位移,由于一旦中间出现差错,以前处理的所有都要重来一遍。这相似于咱们数据库中的事务处理。不少时候,咱们但愿将一个大事务分割成若干个小事务分别提交,这可以有效减小错误恢复的时间。

在 Kafka 中也是相同的道理。对于一次要处理不少消息的 Consumer 而言,它会关心社区有没有方法容许它在消费的中间进行位移提交。好比前面这个 5000 条消息的例子,你可能但愿每处理完 100 条消息就提交一次位移,这样可以避免大批量的消息从新消费。

庆幸的是,Kafka Consumer API 为手动提交提供了这样的方法:

commitSync(Map<TopicPartition, OffsetAndMetadata>) 和commitAsync(Map<TopicPartition, OffsetAndMetadata>)。它们的参数是一个 Map对象,键就是 TopicPartition,即消费的分区,而值是一个 OffsetAndMetadata 对象,保存的主要是位移数据。

就拿刚刚提过的那个例子来讲,如何每处理 100 条消息就提交一次位移呢?在这里,我以commitAsync 为例,展现一段代码,实际上,commitSync 的调用方法和它是如出一辙的。

privateMap<TopicPartition, OffsetAndMetadata> offsets =newHashMap<>();
int count = 0;
……
while(true) {
ConsumerRecords<String, String> records =
consumer.poll(Duration.ofSeconds(1));
for (ConsumerRecord<String, String> record: records) {
 process(record);
 //处理消息
 offsets.put(newTopicPartition(record.topic(), record.partition
 newOffsetAndMetadata(record.offset() + 1);
 if(count % 100 == 0)
 consumer.commitAsync(offsets, null);
 //回调处理逻辑是
 count++;
}
}

简单解释一下这段代码。程序先是建立了一个 Map 对象,用于保存 Consumer 消费处理过程当中要提交的分区位移,以后开始逐条处理消息,并构造要提交的位移值。还记得以前我说过要提交下一条消息的位移吗?这就是这里构造 OffsetAndMetadata 对象时,使用当前消息位移加 1 的缘由。代码的最后部分是作位移的提交。我在这里设置了一个计数器,每累计 100 条消息就统一提交一次位移。与调用无参的 commitAsync 不一样,这里调用了带 Map 对象参数的 commitAsync 进行细粒度的位移提交。这样,这段代码就可以实现每处理 100 条消息就提交一次位移,不用再受 poll 方法返回的消息总数的限制了。

总结

Kafka Consumer 的位移提交,是实现 Consumer 端语义保障的重要手段。位移提交分为自动提交和手动提交,而手动提交又分为同步提交和异步提交。在实际使用过程当中,推荐你使用手动提交机制,由于它更加可控,也更加灵活。另外,建议你同时采用同步提交和异步提交两种方式,这样既不影响 TPS,又支持自动重试,改善 Consumer 应用的高可用性。总之,Kafka Consumer API 提供了多种灵活的提交方法,方便你根据本身的业务场景定制你的提交策略。

相关文章
相关标签/搜索