>舒适提示:整个 Kafka 专栏基于 kafka-2.2.1 版本。java
根据 KafkaConsumer 类上的注释上来看 KafkaConsumer 具备以下特征:算法
在 Kafka 中 KafkaConsumer 是线程不安全的。apache
2.2.1 版本的KafkaConsumer 兼容 kafka 0.10.0 和 0.11.0 等低版本。bootstrap
消息偏移量与消费偏移量(消息消费进度) Kafka 为分区中的每一条消息维护一个偏移量,即消息偏移量。这个偏移量充当该分区内记录的惟一标识符。消费偏移量(消息消费进度)存储的是消费组当前的处理进度。消息消费进度的提交在 kafka 中能够定时自动提交也能够手动提交。手动提交能够调用 ommitSync() 或 commitAsync 方法。安全
消费组 与 订阅关系 多个消费这能够同属于一个消费组,消费组内的全部消费者共同消费主题下的全部消息。一个消费组能够订阅多个主题。服务器
队列负载机制 既然同一个消费组内的消费者共同承担主题下全部队列的消费,那他们如何进行分工呢?默认状况下采起平均分配,例如一个消费组有两个消费者c一、c2,一个 topic 的分区数为6,那 c1 会负责3个分区的消费,一样 c2 会负责另外3个分区的分配。网络
那若是其中一个消费者宕机或新增一个消费者,那队列能动态调整吗?session
答案是会从新再次平衡,例如若是新增一个消费者 c3,则c1,c2,c3都会负责2个分区的消息消费,分区重平衡会在后续文章中重点介绍。消费者也能够经过 assign 方法手动指定分区,此时会禁用默认的自动分配机制。架构
消费者故障检测机制 当经过 subscribe 方法订阅某些主题时,此时该消费者还未真正加入到订阅组,只有当 consumeer#poll 方法被调用后,而且会向 broker 定时发送心跳包,若是 broker 在 session.timeout.ms 时间内未收到心跳包,则 broker 会任务该消费者已宕机,会将其剔除,并触发消费端的分区重平衡。并发
消费者也有可能遇到“活体锁”的状况,即它继续发送心跳,但没有任何进展。在这种状况下,为了防止消费者无限期地占用它的分区,能够使用max.poll.interval.ms 设置提供了一个活性检测机制。基本上,若是您调用轮询的频率低于配置的最大间隔,那么客户机将主动离开组,以便另外一个消费者能够接管它的分区。当这种状况发生时,您可能会看到一个偏移提交失败(由调用{@link #commitSync()}抛出的{@link CommitFailedException}表示)。
kafka 对 poll loop 行为的控制参数 Kafka 提供了以下两个参数来控制 poll 的行为:
对于消息处理时间不可预测的状况下上述两个参数可能不够用,那将如何是好呢?
一般的建议将消息拉取与消息消费分开,一个线程负责 poll 消息,处理这些消息使用另外的线程,这里就须要手动提交消费进度。为了控制消息拉起的过快,您可能会须要用到 Consumer#pause(Collection) 方法,暂时中止向该分区拉起消息。RocketMQ 的推模式就是采用了这种策略。若是你们有兴趣的话,能够从笔者所著的《RocketMQ技术内幕》一书中详细了解。
public static void testConsumer1() { Properties props = new Properties(); props.setProperty("bootstrap.servers", "localhost:9092,localhost:9082,localhost:9072"); props.setProperty("group.id", "C_ODS_ORDERCONSUME_01"); props.setProperty("enable.auto.commit", "true"); props.setProperty("auto.commit.interval.ms", "1000"); props.setProperty("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); props.setProperty("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); KafkaConsumer<string, string> consumer = new KafkaConsumer<>(props); consumer.subscribe(Arrays.asList("TOPIC_ORDER")); while (true) { ConsumerRecords<string, string> records = consumer.poll(Duration.ofMillis(100)); for (ConsumerRecord<string, string> record : records) { System.out.println("消息消费中"); System.out.printf("offset = %d, key = %s, value = %s%n", record.offset(), record.key(), record.value()); } } }
public static void testConsumer2() { Properties props = new Properties(); props.setProperty("bootstrap.servers", "localhost:9092"); props.setProperty("group.id", "test"); props.setProperty("enable.auto.commit", "false"); props.setProperty("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); props.setProperty("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); KafkaConsumer<string, string> consumer = new KafkaConsumer<>(props); consumer.subscribe(Arrays.asList("foo", "bar")); final int minBatchSize = 200; List<consumerrecord<string, string>> buffer = new ArrayList<>(); while (true) { ConsumerRecords<string, string> records = consumer.poll(Duration.ofMillis(100)); for (ConsumerRecord<string, string> record : records) { buffer.add(record); } if (buffer.size() >= minBatchSize) { // insertIntoDb(buffer); // 省略处理逻辑 consumer.commitSync(); buffer.clear(); } } }
要认识 Kafka 的消费者,我的认为最好的办法就是从它的类图着手,下面给出 Consumer 接口的类图。
接下来对起重点方法进行一个初步的介绍,从下篇文章开始将对其进行详细设计。
接下来笔者根据其构造函数,对一一介绍其核心属性的含义,为接下来说解其核心方法打下基础。
Kafka Consumer 消费者就介绍到这里了,从下篇文章开始将开始详细介绍 Kafka 关于消息消费的方方面面。
做者介绍:丁威,《RocketMQ技术内幕》做者,RocketMQ 社区布道师,公众号:中间件兴趣圈 维护者,目前已陆续发表源码分析Java集合、Java 并发包(JUC)、Netty、Mycat、Dubbo、RocketMQ、Mybatis等源码专栏。能够点击连接:中间件知识星球,一块儿探讨高并发、分布式服务架构,交流源码。
</topicpartition,></topicpartition,></topicpartition,></topicpartition,></topicpartition,></topicpartition,></topicpartition,></topicpartition,></string,></string,></metricname,></topicpartition,></topicpartition,></k,></string,></string,></consumerrecord<string,></string,></string,></string,></string,>