为何须要consumer group?算法
consumer group是用于实现高伸缩性、高容错性的consumer机制。组内的多个实例能够同时读取消息(不一样的消息),而一旦某个consumer挂了,group会把这个实例的任务马上交给其余的consumer负责,不会丢失数据。这个过程叫作重平衡。数据库
kafka实际上同时支持两种消息引擎,基于队列和基于发布/订阅。缓存
- 全部的consumer实例都同属于一个group——实现了队列模型,每一个消息只被一个consumer处理。
- consumer属于不一样的group——实现了基于发布/订阅模型。极端状况下,每一个group只有一个consumer,那么就至关于kafka把消息广播到全部的consumer。
这里的位移指的是consumer端的offset,不是parititon那个。每一个consumer都会为它消费的分区维护属于本身的位置信息,记录当前消费到该patition的哪一个位置。安全
kafka中,采用consumer group保存消费者端的offset,同时还引入了checkpoint机制按期对offset进行持久化。多线程
下图展现了consumer端的offset保存方式,kafka consumer内部是使用一个map来保存其订阅topic所属分区的位移。app
consumer客户端须要按期向kafka集群汇报本身消费数据的进度,这个过程称为位移提交。旧版本(0.9.0.0以前)的kafka consumer把位移提交到zookeeper。而以后的版本把位移提交到kafka的一个内部topic(__consumer_offsets)上,不依赖zk保存位移信息,因此在开发新版本的consumer时也不须要链接到zk。__consumer_offset 主题是kafka自行建立的,用户不要擅自删除。它保存的是consumer的位移信息,每条消息格式大体以下:socket
它本质上是一种协议,规定一个consumer group下全部consumer如何达成一致来分配订阅topic的分区。好比一个topic有100个partition,有一个group订阅该topic,其中有50个consumer,那么consumer group会为每一个consumer平均分配两个分区,即每一个consumer负责两个分区的数据读取。这个过程就成为rebalance。ide
在Kafka1.0.0版本中,Java Consumer是一个多线程或者说是一个双线程的Java进程——建立KafkaConsumer的为用户主线程,同时consumer后台还有一个心跳线程。KafkaConsumer的poll()方法运行在主线程。这代表:消费者组执行rebalance、消息获取、coordinator管理等操做都运行在主线程。spa
每次poll方法返回的都是订阅分区上的一组消息,若是某些分区没有准备好,可能会返回空。线程
try { while (true) { // 一次poll()能够拿到不少数据,不足1s时会阻塞,1000ms是最大阻塞时间 ConsumerRecords<String, String> records = consumer.poll(1000); for (ConsumerRecord<String, String> r : records) { System.out.printf("offset=%d, key=%s, value=%s, partition=%d\n", r.offset(), r.key(), r.value(), r.partition()); } } } finally { consumer.close(); }
poll()方法根据当前consumer消费位移返回消息集合。
若是poll方法没有给定参数,那么consumer端会阻塞以等待数据不断积累并最终知足consumer的需求(好比要一次至少获取1m的数据);
若是给定了参数,那么等待时间超过了指定超时时间就返回。
Java Consumer是非线程安全的,若是把它用到多线程中,会抛出KafkaConsumer is not safe for multi-threaded access异常。
超时参数的用处:
假设用户除了获取数据之外还须要按期执行其余的常规任务(每隔10s须要把消费状况记录到日志中),用户就能够设置consumer.poll(10000),让consumer在等待kafka消息的同时还能够按期执行其余任务。
若是程序惟一的任务是从kafka获取消息而后处理,那么能够采用如下方法
try { while (true) { ConsumerRecords<String, String> records = consumer.poll(Long.MAX_VALUE); for (ConsumerRecord<String, String> r : records) { System.out.printf("offset=%d, key=%s, value=%s, partition=%d\n", r.offset(), r.key(), r.value(), r.partition()); } } } catch (WakeupException e) { // 忽略异常处理 }finally { consumer.close(); }
这段代码可让consumer无限等待,而后在另一个线程中调用consumer.wakeup()来触发异常,注意,用户能够安全地在另外一个线程中调用consumer.wakeup(),这时特例,其余方法都是不安全的。
总结以上,poll()的使用方法:
consumer须要为它要读取的分区保存消费进度,即分区中当前最新消费消息的位置,这个位置就成为offset。consuemr须要按期向kafka集群提交本身的位置信息,这里的位移值一般指下一条待消费的消息位置。
offset是实现消息交付语义的保证,以下:
若是consumer在消费前就提交了位移,那么能够实现at most once语义;若是在消费以后提交了位移,可实现at least once语义。
consumer中的位置信息不少,下面要给出区别:
consumer会在kafka集群的全部broker中选择一个broker做为consumer group的coordinator,用于实现组成员管理、消费分配方案指定以及位移提交等。
consumer group首次启动时,因为没有初始的位移信息,coordinator必须为其肯定初始值,这就是consumer参数 auto.offset.reset 的做用,一般,要么从最先(earliest)(从头开始消费),要么从最新(latest)(从最新append到partition日志的位置开始消费)开始。
当consumer运行了一段时间以后,它必须提交本身的位移。若是consumer崩溃或被关闭,它负责的分区就会分配给其余的consumer。
consumer提交位移的主要机制是经过向所属的coordinator发送位移提交请求来实现,每一个位移提交请求都会往__consumer_offsets对应的分区追加一条消息。
默认状况下,consumer是自动提交位移的,间隔是5秒(CDH5.14 1.0.1+kafka-3.1.0 是60秒)。能够经过 auto.commit.interval.ms 设置。
手动提交:由用户自行确当消息什么时候被真正处理完毕并提交位移。以下面的例子:
final int minBatchSize = 500; // 缓存 List<ConsumerRecord<String,String>> buffer = new ArrayList<>(minBatchSize); try { while (true) { ConsumerRecords<String,String> records = consumer.poll(1000); records.forEach(buffer::add); if (buffer.size() >= minBatchSize){ // 插入数据库 insertIntoDb(buffer); // 等数据插入数据库以后,再同步提交位移 consumer.commitSync(); // 若是提交位移失败了,那么重启consumer后会重复消费以前的数据,再次插入到数据库中 // 清空缓冲区 buffer.clear(); } } } finally { consumer.close(); }
若是要进行更加细粒度的控制,能够进行分区层的手动提交位移:
try { while (true) { ConsumerRecords<String, String> records = consumer.poll(1000); // 处理每一个分区的记录 records.partitions().forEach(p -> { List<ConsumerRecord<String, String>> partitionRecords = records.records(p); partitionRecords.forEach(pr -> { System.out.println(pr.offset() + ": " + pr.value()); }); // 获取该partition最后一个消息的位移 long lastOffset = partitionRecords.get(partitionRecords.size() - 1).offset(); // 提交的offset应该是下一次插入消息的位置 consumer.commitSync(Collections.singletonMap(p,new OffsetAndMetadata(lastOffset+1))); }); } } finally { consumer.close(); }
总结如下,自动提交和手动提交的区别:
使用方法 | 优点 | 劣势 | 交付语义保证 | 使用场景 | |
自动提交 | 默认或显式配置enable.auto.commit=true | 简单 | 没法精确控制,提交失败后不易处理 | 最少一次 | 对消息交付语义无需求,可容忍必定的消息丢失 |
手动提交 | 设置enable.auto.commit=false,并调用consumer.Sync()或consumer.Async()提交 | 可精确控制位移提交 | 额外开发成本,须自行提交 | 易实现最少一次,依赖外部状态能够实现精确一次 | 消息处理逻辑重,不容许消息丢失 |
consumer group的rebalance本质上是一组协议,规定了consumer group是如何达成一致来分配订阅topic的全部分区的。
对于每一个组,kafka的某个broker会被选举为组协调者(group coordinatior)。coordinatior负责对组的状态进行管理,它的主要职责就是当新成员到达时,促成组内的全部成员达成新的分区分配方案,即coordinator负责对组执行rebalance。
有三个:
consumer默认提供了3种分配策略:
下面给一个简单的例子,假设目前某个consumer group有2个consumer A和B,当C加入时,触发了rebalance条件,coordinator会进行rebalance,根据range策略从新分配了partition。
rebalance generation用于标识某次rebalance。它是一个整数,从0开始。它主要是为了保护consumer group的,好比上一届的consumer因为某些缘由延迟提交了offset,但rebalance以后该group产生了新的一届成员,而此次延迟的offset提交的是旧的generation信息,所以会被consumer group拒绝。
group与coordinator共同使用rebalance协议来完成rebance操做,kafka提供了下面5个协议:
consumer group在rebalance以前必须肯定coordinator所在的broker,并建立与之通讯的socket。
肯定coordinator位置的算法以下:
创建socket以后,开始进行rebalance。主要有2步:
这个监听器的主要做用是在coordinator开启一轮rebalance的先后进行一些操做,好比,要在rebalance前手动提交位移到第三方存储。
要使用监听器,要在consumer.subscribe()方法的第二个参数新建一个回调接口ConsumerRebalanceListener,里面封装了相关的逻辑,咱们须要实现onPartitionsRevoked(rebalace前调用)和onPartitionAssigned(rebalance后调用)方法。
consumer.subscribe(Arrays.asList("test1"), new ConsumerRebalanceListener() { //rebalance监听器 //在coordinator开启新一轮rebalance前调用 @Override public void onPartitionsRevoked(Collection<TopicPartition> partitions) { //这里能够进行一些操做,好比把手动提交的位移存储到第三方 partitions.forEach(tp -> saveOffsetInExternalStore(consumer.position(tp))); joinStart.set(System.currentTimeMillis()); } //在rebalance完成后调用 @Override public void onPartitionsAssigned(Collection<TopicPartition> partitions) { totalRebalanceTimeMs.addAndGet(System.currentTimeMillis() - joinStart.get()); //更新总的rebalance时长 // 从外部存储读取每一个topicPartition的位移,而后移动当前consumer的位移到该位置 partitions.forEach(tp -> consumer.seek(tp,readOffsetFromExternalStore(tp))); } private long readOffsetFromExternalStore(TopicPartition tp) { } // 保存到数据库 private void saveOffsetInExternalStore(long position) { } }); // 位移处理后就能够从上面移动到的位置开始读取了 try { while (true) { ConsumerRecords<String, String> consumerRecords = consumer.poll(1000); consumerRecords.forEach(r -> System.out.printf("offset=%d, key=%s, value=%s, partition=%d\n", r.offset(), r.key(), r.value(), r.partition())); } } finally { System.out.println("totoalRebalanceTimeMs: " + totalRebalanceTimeMs); consumer.close(); }
KafkaConsumer是非线程安全的,多个线程中要避免共用一个KafkaConsumer。
那么如何实现多线程的consumer消费呢?有两种方法
1.每一个线程维护一个KafkaConsumer,每一个consumer消费固定数目的分区。
2.单个KafkaConsumer实例+多worker线程。仅由一个consumer实例接收消息,而后马上交给其余的工做线程进行消息的处理。
前面讨论的consumer都是以consumer group的形式存在的,group自动帮用户执行分区分配和rebalance。
standalone consumer间彼此独立工做互不干扰,任何一个consumer崩溃都不会影响其余standalone consumer的工做。
使用standalone consumer的方法就是调用KafkaConsumer的assign方法。这个方法接收一个分区列表,直接赋予该consumer访问这些分区的权力。
List<TopicPartition> partitions = new ArrayList<>(10); consumer.partitionsFor("test1").forEach(partitionInfo -> partitions.add(new TopicPartition(partitionInfo.topic(), 0))); //只订阅分区0的消息 //赋予consumer访问分区的能力 consumer.assign(partitions);
注意:assign和subscribe不能混用。