关于Kafka 的 consumer 消费者手动提交详解

前言

在上一篇 Kafka使用Java实现数据的生产和消费demo 中介绍如何简单的使用kafka进行数据传输。本篇则重点介绍kafka中的 consumer 消费者的讲解。html

应用场景

在上一篇kafka的consumer消费者,咱们使用的是自动提交offset下标。
可是offset下标自动提交其实在不少场景都不适用,由于自动提交是在kafka拉取到数据以后就直接提交,这样很容易丢失数据,尤为是在须要事物控制的时候。
不少状况下咱们须要从kafka成功拉取数据以后,对数据进行相应的处理以后再进行提交。如拉取数据以后进行写入mysql这种 , 因此这时咱们就须要进行手动提交kafka的offset下标。mysql

这里顺便说下offset具体是什么。
offset:指的是kafka的topic中的每一个消费组消费的下标。
简单的来讲就是一条消息对应一个offset下标,每次消费数据的时候若是提交offset,那么下次消费就会从提交的offset加一那里开始消费。
好比一个topic中有100条数据,我消费了50条而且提交了,那么此时的kafka服务端记录提交的offset就是49(offset从0开始),那么下次消费的时候offset就从50开始消费。linux

测试

说了这么,那么咱们开始进行手动提交测试。
首先,使用kafka 的producer 程序往kafka集群发送了100条测试数据。git

这里写图片描述
程序打印中已经成功发送了,这里咱们在kafka服务器使用命令中来查看是否成功发送.
命令以下:github

kafka-console-consumer.sh  --zookeeper master:2181  --topic KAFKA_TEST2 --from-beginning

这里写图片描述

注:
1.master 是我在linux中作了IP映射的关系,实际能够换成IP。
2.由于kafka是集群,因此也能够在集群的其余机器进行消费。sql

能够看到已经成功发送了100条。数据库

成功发送消息以后,咱们再使用kafka的consumer 进行数据消费。服务器

由于是用来测试手动提交
因此 将 enable.auto.commit 改为 false 进行手动提交
而且设置每次拉取最大10条测试

props.put("enable.auto.commit", "false");
props.put("max.poll.records", 10);

将提交方式改为false以后
须要手动提交只需加上这段代码spa

consumer.commitSync();

那么首先尝试消费不提交,测试能不能重复消费。
右键运行main方法进行消费,不提交offset下标。
这里写图片描述

成功消费以后,结束程序,再次运行main方法进行消费,也不提交offset下标。
这里写图片描述

并未手动进行提交,并且并未更改消费组名,可是能够看到已经重复消费了!

接下来,开始测试手动提交。

  1. 测试目的:
    1.测试手动提交以后的offset,能不能再次消费。
    2.测试未提交的offset,能不能再次进行消费。
  2. 测试方法: 当消费到50条的时候,进行手动提交,而后剩下的50条不进行提交。
  3. 但愿达成的目的: 手动提交的offset不能再次消费,未提交的能够再次进行消费。

为了达到上述目的,咱们测试只需添加以下代码便可:

if(list.size()==50){
    consumer.commitSync();
}

更改代码以后,开始运行程序
测试示例图以下:
这里写图片描述

简单的一看,和以前未提交的同样,貌似没有什么问题。
可是正常来讲,未提交的下标不该该重复进行消费,直到它提交为止吗?
由于要进行重复消费,可是messageNo 会一直累加,只会手动的提交前50条offset,
后面的50条offset会一直没法消费,因此打印的条数不该该是100,而是应该一直打印。

那么测试的结果和预想的为何不一致呢?
以前不是已经测试过能够重复消费未提交的offset吗?
其实这点能够根据两次启动方式的不一样而得出结论。
开始测试未提交重复消费的时候,实际我是启动-暂停-启动,那么本地的consumer实际是被初始化过两次。
而刚刚测试的实际consumer只有初始化一次。
至于为何初始化一次就不行呢?
由于kafka的offset下标的记录实际会有两份,服务端会本身记录一份,本地的消费者客户端也会记录一份,提交的offset会告诉服务端已经消费到这了,可是本地的并不会所以而改变offset进行再次消费。

简单的来讲假若有10条数据,在第5条的时候进行提交了offset下标,那么服务端就知道该组消费的下标到第5条了,若是同组其余的consumer进行消费的时候就会从第6条开始进行消费。可是本地的消费者客户端并不会所以而改变,它仍是会继续消费下去,并不会再次从第6条开始消费,因此会出现上图状况。

可是项目中运行以后,是不会所以而重启的,因此这时咱们能够换一种思路。
就是若是触发某个条件,因此致使offset未提交,咱们就能够关闭以前的consumer,而后新new一个consumer,这样就能够再次进行消费了! 固然配置要和以前的同样。

那么将以前的提交代码更改以下:

if(list.size()==50){
    consumer.commitSync();
}else if(list.size()>50){
    consumer.close();
    init();
    list.clear();
    list2.clear();
}

注:这里由于是测试,为了简单明了,因此条件我写的很简单。实际状况请根据我的的为准。

示例图以下:
这里写图片描述
说明:
1.由于每次是拉取10条,因此在60条的时候kafka的配置初始化了,而后又重新拉取了50-60条的数据,可是没有提交,因此并不会影响实际结果。
2.这里为了方便截图展现,因此打印条件改了,可是不影响程序!

从测试结果中,咱们达到了以前想要测试的目的,未提交的offset能够重复进行消费。
这种作法通常也能够知足大部分需求。
例如从kafka获取数据入库,若是一批数据入库成功,就提交offset,不然不提交,而后再次拉取。
可是这种作法并不能最大的保证数据的完整性。好比在运行的时候,程序挂了之类的。
因此还有一种方法是手动的指定offset下标进行获取数据,直到kafka的数据处理成功以后,将offset记录下来,好比写在数据库中。那么这种作法,等到下一篇再进行尝试吧!

该项目我放在github上了,有兴趣的能够看看!
地址:https://github.com/xuwujing/kafka

到此,本文结束,谢谢阅读!

相关文章
相关标签/搜索