前言
与生产者对应的是消费者,应用程序能够经过KafkaConsumer来订阅主题,并从订阅的主题中拉取消息。不过在使用KafkaConsumer消费消息以前须要先了解消费者和消费组的概念,不然没法理解如何使用KafkaConsumer。apache
<!--more-->json
Consumer
- 消费者(Consumer)负责订阅Kafka中的主题(Topic),而且从订阅的主题上拉取消息。与其余一些消息中间件不一样的是:在Kafka的消费理念中还有一层消费组(Consumer Group)的概念,每一个消费者都有一个对应的消费组。
当消息发布到主题后,只会被投递给订阅它的每一个消费组中的一个消费者。如图所示,某个主题中共有4个分区(Partition):P0、P一、P二、P3。有两个消费组A和B都订阅了这个主题,消费组A中有4个消费者(C0、C一、C2和C3),消费组B中有2个消费者(C4和C5)。按照Kafka默认的规则,最后的分配结果是消费组A中的每个消费者分配到1个分区,消费组B中的每个消费者分配到2个分区,两个消费组之间互不影响。每一个消费者只能消费所分配到的分区中的消息。换言之,每个分区只能被一个消费组中的一个消费者所消费。bootstrap
分区分配的演变(Rebalance)
咱们再来看一下消费组内的消费者个数变化时所对应的分区分配的演变。假设目前某消费组内只有一个消费者C0,订阅了一个主题,这个主题包含 7 个分区:P0、P一、P二、P三、P四、P五、P6。也就是说,这个消费者C0订阅了7个分区,具体分配情形如图。服务器
消费者与消费组此时消费组内又加入了一个新的消费者C1,按照既定的逻辑,须要将原来消费者C0的部分分区分配给消费者C1消费,以下图所示。消费者C0和C1各自负责消费所分配到的分区,彼此之间并没有逻辑上的干扰。ide
紧接着消费组内又加入了一个新的消费者C2,消费者C0、C1和C2按照下图方式各自负责消费所分配到的分区。.net
消费者与消费组这种模型可让总体的消费能力具有横向伸缩性,咱们能够增长(或减小)消费者的个数来提升(或下降)总体的消费能力。对于分区数固定的状况,一味地增长消费者并不会让消费能力一直获得提高,若是消费者过多,出现了消费者的个数大于分区个数的状况,就会有消费者分配不到任何分区。参考图以下,一共有8个消费者,7个分区,那么最后的消费者C7因为分配不到任何分区而没法消费任何消息。线程
投递模式
以上分配逻辑都是基于默认的分区分配策略进行分析的,能够经过消费者客户端参数partition.assignment.strategy 来设置消费者与订阅主题之间的分区分配策略,有关分区分配的更多细节能够再接下来的系列继续聊。翻译
对于消息中间件而言,通常有两种消息投递模式:debug
点对点(P2P,Point-to-Point)模式: 点对点模式是基于队列的,消息生产者发送消息到队列,消息消费者从队列中接收消息。code
发布/订阅(Pub/Sub)模式: 发布订阅模式定义了如何向一个内容节点发布和订阅消息,这个内容节点称为主题(Topic),主题能够认为是消息传递的中介,消息发布者将消息发布到某个主题,而消息订阅者从主题中订阅消息。主题使得消息的订阅者和发布者互相保持独立,不须要进行接触便可保证消息的传递,发布/订阅模式在消息的一对多广播时采用。
Kafka 同时支持两种消息投递模式,而这正是得益于消费者与消费组模型的契合:
-
若是全部的消费者都隶属于同一个消费组,那么全部的消息都会被均衡地投递给每个消费者,即每条消息只会被一个消费者处理,这就至关于点对点模式的应用。
-
若是全部的消费者都隶属于不一样的消费组,那么全部的消息都会被广播给全部的消费者,即每条消息会被全部的消费者处理,这就至关于发布/订阅模式的应用
消费组是一个逻辑上的概念,它将旗下的消费者归为一类,每个消费者只隶属于一个消费组。每个消费组都会有一个固定的名称,消费者在进行消费前须要指定其所属消费组的名称,这个能够经过消费者客户端参数group.id来配置,默认值为空字符串。消费者并不是逻辑上的概念,它是实际的应用实例,它能够是一个线程,也能够是一个进程。同一个消费组内的消费者既能够部署在同一台机器上,也能够部署在不一样的机器上。
建立一个Kafka消费者
- 如下代码段显示了如何建立KafkaConsumer:
Properties props = new Properties(); props.put("bootstrap.servers", "broker1:9092,broker2:9092"); props.put("group.id", "CountryCounter"); props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(props);
- 订阅主题
consumer.subscribe(Collections.singletonList("customerCountries"));
- 要订阅全部test主题,咱们能够:
consumer.subscribe(Pattern.compile("test.*"));
- 轮询循环
消费者API的核心是一个简单的循环,用于轮询服务器以获取更多数据。 一旦用户订阅了主题,轮询循环便会处理协调,分区从新平衡,心跳和数据获取的全部详细信息,从而为开发人员提供了一个干净的API,该API仅从分配的分区中返回可用数据。 消费者的主体以下所示
try { while (true) { 1 ConsumerRecords<String, String> records = consumer.poll(100); 2 for (ConsumerRecord<String, String> record : records) 3 { log.debug("topic = %s, partition = %d, offset = %d," customer = %s, country = %s\n", record.topic(), record.partition(), record.offset(), record.key(), record.value()); int updatedCount = 1; if (custCountryMap.countainsKey(record.value())) { updatedCount = custCountryMap.get(record.value()) + 1; } custCountryMap.put(record.value(), updatedCount) JSONObject json = new JSONObject(custCountryMap); System.out.println(json.toString(4)) 4 } } } finally { consumer.close(); 5 }
- 反序列化
public class StringDeserializer implements Deserializer<String> { private String encoding = "UTF8"; @Override public void configure(Map<String, ?> configs, boolean isKey) { String propertyName = isKey ? "key.deserializer.encoding" : "value.deserializer.encoding"; Object encodingValue = configs.get(propertyName); if (encodingValue == null) encodingValue = configs.get("deserializer.encoding"); if (encodingValue instanceof String) encoding = (String) encodingValue; } @Override public String deserialize(String topic, byte[] data) { try { if (data == null) return null; else return new String(data, encoding); } catch (UnsupportedEncodingException e) { throw new SerializationException("Error when deserializing byte[] to string due to unsupported encoding " + encoding); } } }
- 消息消费
Kafka中的消费是基于拉模式的。消息的消费通常有两种模式:推模式和拉模式。推模式是服务端主动将消息推送给消费者,而拉模式是消费者主动向服务端发起请求来拉取消息。从轮询循环代码清单中能够看出,Kafka中的消息消费是一个不断轮询的过程,消费者所要作的就是重复地调用poll()方法,而poll()方法返回的是所订阅的主题(分区)上的一组消息。对于poll()方法而言,若是某些分区中没有可供消费的消息,那么此分区对应的消息拉取的结果就为空;若是订阅的全部分区中都没有可供消费的消息,那么poll()方法返回为空的消息集合。
poll(long)方法中timeout的时间单位固定为毫秒,而poll(Duration)方法能够根据Duration中的ofMillis()、ofSeconds()、ofMinutes()、ofHours()等多种不一样的方法指定不一样的时间单位,灵活性更强。而且 poll(long)方法也已经被标注为@Deprecated,虽然目前还可使用,若是条件容许的话,仍是推荐使用poll(Duration)的方式。
咱们在消费消息的时候能够直接对 ConsumerRecord 中感兴趣的字段进行具体的业务逻辑处理。
poll()方法的返回值类型是 ConsumerRecords,它用来表示一次拉取操做所得到的消息集,内部包含了若干ConsumerRecord,它提供了一个iterator()方法来循环遍历消息集内部的消息,iterator()方法的定义以下:
@Override public Iterator<ConsumerRecord<K, V>> iterator() { return new ConcatenatedIterable<>(records.values()).iterator(); }
在 ConsumerRecords 类中还提供了几个方法来方便开发人员对消息集进行处理:count()方法用来计算出消息集中的消息个数,返回类型是int;isEmpty()方法用来判断消息集是否为空,返回类型是boolean;empty()方法用来获取一个空的消息集,返回类型是ConsumerRecord<K,V>。
到目前为止,能够简单地认为poll()方法只是拉取一下消息而已,但就其内部逻辑而言并不简单,它涉及消费位移、消费者协调器、组协调器、消费者的选举、分区分配的分发、再均衡的逻辑、心跳等内容
- 位移提交
对于Kafka中的分区而言,它的每条消息都有惟一的offset,用来表示消息在分区中对应的位置。对于消费者而言,它也有一个offset的概念,消费者使用offset来表示消费到分区中某个消息所在的位置。单词“offset”能够翻译为“偏移量”,也能够翻译为“位移”,不少同窗可能并无过多地在乎这一点:在不少中文资料中都会交叉使用“偏移量”和“位移”这两个词,并无很严谨地进行区分。
我对offset作了一些区分:对于消息在分区中的位置,咱们将offset称为“偏移量”;对于消费者消费到的位置,将 offset 称为“位移”,有时候也会更明确地称之为“消费位移”。作这一区分的目的是让读者在遇到 offset 的时候能够很容易甄别出是在讲分区存储层面的内容,仍是在讲消费层面的内容
在每次调用poll()方法时,它返回的是尚未被消费过的消息集(固然这个前提是消息已经存储在Kafka 中了,而且暂不考虑异常状况的发生),在旧消费者客户端中,消费位移是存储在ZooKeeper中的。而在新消费者客户端中,消费位移存储在Kafka内部的主题__consumer_offsets中。这里把将消费位移存储起来(持久化)的动做称为“提交”,消费者在消费完消息以后须要执行消费位移的提交。
- 指定位移消费
正是有了消费位移的持久化,才使消费者在关闭、崩溃或者在遇到再均衡的时候,可让接替的消费者可以根据存储的消费位移继续进行消费 ,但是有一个问题则是 _consumer_offsets 位移信息过时而被删除后,它也没有能够查找的消费位移 ,这个时候就会根据消费者客户端参数auto.offset.reset的配置来决定从何处开始进行消费
除了查找不到消费位移,位移越界也会触发 auto.offset.reset 参数的执行 ,然而有些时候,咱们须要一种更细粒度的掌控,可让咱们从特定的位移处开始拉取消息,哎 !这个时候 KafkaConsumer 中的 seek()方法正好提供了这个功能,让咱们得以追前消费或回溯消费。seek()方法的具体定义以下:
public void seek(TopicPartition partition, long offset) {}
seek()方法为咱们提供了从特定位置读取消息的能力,咱们能够经过这个方法来向前跳过若干消息,也能够经过这个方法来向后回溯若干消息,这样为消息的消费提供了很大的灵活性
原创不易,若是以为有点用的话,请绝不留情点个赞,转发一下,这将是我持续输出优质文章的最强动力。