应用程序使用KafkaConsul'le 「向Kafka 订阅主题,并从订阅的主题上接收消息。Kafka的消息读取不一样于从其余消息系统读取数据,它涉及了一些独特的概念和想法。java
单个的消费者就跟前面的消息系统的消费者同样,建立一个消费者对象,而后订阅一个主题并开始接受消息,而后作本身的业务逻辑,可是Kafka天生就是支持体量很大的数据消费,若是只是使用单个的消费者消费消息,当生产者写入消息的速度远远大于了消费者的速度,大量消息堆积在消费者上可能会致使性能反而下降或撑爆消费者,因此横向伸缩是颇有必要的,就想多个生产者能够向相同的主题写消息同样,咱们也可使用多个消费者从同一个主题读取消息,对消息进行分流,这多个消费者就从属于一个消费者群组。一个群组里的消费者订阅的是同一个主题,每一个消费者接收主题一部分分区的消息。node
假设主题T1有四个分区,咱们建立了消费者群组G1,建立了一个消费者C1从属于G1,它是G1里的惟一的消费者,此时订阅主题状况为,C1将会接收到主题中四个分区中的消息,如图:正则表达式
此时咱们在消费者群组中新增一个消费者C2,那么每一个消费者将分别从两个分区接受消息,如图:apache
若是咱们有四个消费者时,将会每一个消费者都分到一个分区。bootstrap
若是群组中的消费者超过了主题的分区数,那么有一部分消费者就会被闲置,不会接收任何消息。如图:缓存
往群组里增长消费者是横向伸缩消费能力的主要方式。服务器
对于多个群组来讲,每一个群组都会从Kafka中接收到全部的消息,而且各个群组之间是互不干扰的。因此横向伸缩Kafka消费者和消费者群组并不会对性能形成负面影响。简而言之就是,为每个须要获取一个或多个主题所有消息的应用程序建立一个消费者群组,而后往群组里添加消费者来伸缩读取能力和处理能力,群组里的每一个消费者只处理一部分消息。如图:网络
一个新的消费者加入群组时,它读取的是本来由其余消费者读取的消息。当一个消费者被关闭或发生奔溃时,它就离开群组,本来由它读取的分区将由群组里的其余消费者来读取。在主题发生变化时, 好比管理员添加了新的分区,会发生分区重分配。分区的全部权从一个消费者变成了里另外一个消费者,这样的行为被称为再均衡。再均衡很是重要, 它为消费者群组带来了高可用性和伸缩性(咱们能够放心地添加或移除消费者),不过在正常状况下,咱们并不但愿发生这样的行为。在再均衡期间,消费者没法读取消息,形成整个群组一小段时间的不可用。另外,当分区被从新分配给另外一个消费者时,消费者当前的读取状态会丢失,它有可能还须要去刷新缓存,在它从新恢复状态以前会拖慢应用程序。session
消费者经过向被指派为群组协调器的broker (不一样的群组能够有不一样的协调器)发送心跳来维持它们和群组的从属关系以及它们对分区的全部权关系。只要消费者以正常的时间间隔发送心跳,就被认为是活跃的,说明它还在读取分区里的消息。消费者会在轮询消息(为了获取消息)或提交偏移量时发送心跳。若是消费者中止发送心跳的时间足够长,会话就会过时,群组协调器认为它已经死亡,就会触发一次再均衡。若是一个消费者发生崩溃,井中止读取消息,群组协调器会等待几秒钟,确认它死亡了才会触发再均衡。在这几秒钟时间里,死掉的消费者不会读取分区里的消息。在清理消费者时,消费者会通知协调器它将要离开群组,协调器会当即触发一次再均衡,尽可能下降处理停顿。socket
在建立KafkaConsumer以前,须要将消费者想要的属性存放到Properties中,而后再将properties传给KafkaConsumer。
Consuer也有三个必须的属性。bootstrap.servers,这里跟Producer同样,另外两个key.deserializer和value.deserializer也与Producer相似,不过一个是序列化,一个是反序列化而已。
还有一个group.id不是必须的,可是咱们一般都会指定改消费者属于哪一个群组,因此也能够认为是必须的。
设置Properties的代码片断以下:
Properties kafkaPropertie = new Properties(); //配置broker地址,配置多个容错 kafkaPropertie.put("bootstrap.servers", "node2:9092,node1:9092,node1:9093"); //配置key-value容许使用参数化类型,反序列化 kafkaPropertie.put("key.deserializer","org.apache.kafka.common.serialization.StringDeserializer"); kafkaPropertie.put("value.deserializer","org.apache.kafka.common.serialization.StringDeserializer"); //指定消费者所属的群组 kafkaPropertie.put("group.id","one");
接下来建立消费者,将Properties对象传入到消费者,而后订阅主题,以下:
KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(kafkaPropertie); /*订阅主题,这里使用的是最简单的订阅testTopic主题,这里也能够出入正则表达式,来区分想要订阅的多个指定的主题,如: *Pattern pattern = new Pattern.compile("testTopic"); * consumer.subscribe(pattern); */ consumer.subscribe(Collections.singletonList("testTopic"));
接下来轮询消息,以下:
//轮询消息 while (true) { //获取ConsumerRecords,一秒钟轮训一次 ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(1000)); //消费消息,遍历records for (ConsumerRecord<String, String> r : records) { LOGGER.error("partition:", r.partition()); LOGGER.error("topic:", r.topic()); LOGGER.error("offset:", r.offset()); System.out.println(r.key() + ":" + r.value()); } Thread.sleep(1000); }
生产者发送消息,而后查看消费者打印状况:
KafkaConsuerDemo - partition: KafkaConsuerDemo - topic: KafkaConsuerDemo - offset: key1:hello world0 KafkaConsuerDemo - partition: KafkaConsuerDemo - topic: KafkaConsuerDemo - offset: key1:hello world1 KafkaConsuerDemo - partition: KafkaConsuerDemo - topic: KafkaConsuerDemo - offset: key1:hello world2 KafkaConsuerDemo - partition: KafkaConsuerDemo - topic: KafkaConsuerDemo - offset: key1:hello world3 KafkaConsuerDemo - partition: KafkaConsuerDemo - topic: KafkaConsuerDemo - offset: key1:hello world4 KafkaConsuerDemo - partition: KafkaConsuerDemo - topic: KafkaConsuerDemo - offset: key1:hello world5 KafkaConsuerDemo - partition: KafkaConsuerDemo - topic: KafkaConsuerDemo - offset: key1:hello world6 KafkaConsuerDemo - partition: KafkaConsuerDemo - topic: KafkaConsuerDemo - offset: key1:hello world7 KafkaConsuerDemo - partition: KafkaConsuerDemo - topic: KafkaConsuerDemo - offset: key1:hello world8 KafkaConsuerDemo - partition: KafkaConsuerDemo - topic: KafkaConsuerDemo - offset: key1:hello world9
只存在一个组群和一个消费者时:
当咱们启动两个消费者,同一个组群,并在Topic上建立两个Partition(分区),发送消息
final ProducerRecord<String, String> record = new ProducerRecord<String, String>("one",i % 2,"key3","hello world" + i);
将消息分发到0和1两个partition
此时两个消费者消费的消息总和等于发送的消息的总和,使用不一样的群组的不一样的订阅同一个topic,每一个消费者群组都能收到全部的消息。
轮询不仅是获取数据那么简单。在第一次调用新消费者的poll ()方法时,它会负责查找GroupCoordinator , 而后加入群组,接受分配的分区。若是发生了再均衡,整个过程也是在轮询期间进行的。固然,心跳也是从轮询里发送出去的。因此,咱们要确保在轮询期间所作的任何处理工做都应该尽快完成。
消费者完整代码以下:
package com.wangx.kafka.client; import org.apache.kafka.clients.consumer.ConsumerRecord; import org.apache.kafka.clients.consumer.ConsumerRecords; import org.apache.kafka.clients.consumer.KafkaConsumer; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.time.Duration; import java.util.Collections; import java.util.Properties; public class KafkaConsuerDemo { private static final Logger LOGGER = LoggerFactory.getLogger(KafkaConsuerDemo.class); public static void main(String[] args) throws InterruptedException { Properties kafkaPropertie = new Properties(); //配置broker地址,配置多个容错 kafkaPropertie.put("bootstrap.servers", "node2:9092,node1:9092,node1:9093"); //配置key-value容许使用参数化类型,反序列化 kafkaPropertie.put("key.deserializer","org.apache.kafka.common.serialization.StringDeserializer"); kafkaPropertie.put("value.deserializer","org.apache.kafka.common.serialization.StringDeserializer"); //指定消费者所属的群组 kafkaPropertie.put("group.id","1"); //建立KafkaConsumer,将kafkaPropertie传入。 KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(kafkaPropertie); /*订阅主题,这里使用的是最简单的订阅testTopic主题,这里也能够出入正则表达式,来区分想要订阅的多个指定的主题,如: *Pattern pattern = new Pattern.compile("testTopic"); * consumer.subscribe(pattern); */ consumer.subscribe(Collections.singletonList("one")); //轮询消息 while (true) { //获取ConsumerRecords,一秒钟轮训一次 ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(1000)); //消费消息,遍历records for (ConsumerRecord<String, String> r : records) { LOGGER.error("partition:", r.partition()); LOGGER.error("topic:", r.topic()); LOGGER.error("offset:", r.offset()); System.out.println(r.key() + ":" + r.value()); } Thread.sleep(1000); } } }
1. fetch.min.bytes: 该属性指定了消费者从服务器获取记录的最小字节数。
2. fetch.max.wait.ms:咱们经过 fetch.min.byte告诉Kafka ,等到有足够的数据时才把它返回给消费者。
而 fetch.max.wait.ms则用于指定broker 的等待时间
3. max.partition.fetch.bytes:默认值是1MB,该属性指定了服务器从每一个分区里返回给消费者的最大字节数.
4. session.timeout.ms: 默认3s,该属性指定了消费者在被认为死亡以前能够与服务器断开链接的时
5. auto.offset.reset:该属性指定了消费者在读取一个没有偏移量的分区或者偏移量无效的状况下(因消费者长时间失效,包含偏移量的记录已通过时井被删除)该做何处
6. enable.auto.commit:该属性指定了消费者是否自动提交偏移量,默认值是true。
7. partition.assignment.strategy: 分区分配给消费者群组的分配策略,有以下两种策略:
Range:该策略会把主题的若干个连续的分区分配给消费者.
RoundRobin:该策略把主题的全部分区逐个分配给消费.
8. client.id:该属性能够是任意字符串, broker 用它来标识从客户端发送过来的消息,一般被用在日志、度量指标和配额里。
9. max.poll.records: 该属性用于控制单次调用call () 方法可以返回的记录数量,能够帮你控制在轮询里须要处理的数据量。
10. receive.buffer.bytes 和send.buffer.bytes: socket 在读写数据时用到的TCP 缓冲区也能够设置大小。若是它们被设为-1,就使用操做系统的默认值。若是生产者或消费者与broker处于不一样的数据中心内,能够适当增大这些值,由于跨数据中心的网络通常都有比较高的延迟和比较低的带宽。