Properties prop = new Properties(); prop.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName()); prop.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName()); prop.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "kafka:9092"); prop.put(ConsumerConfig.GROUP_ID_CONFIG, "testConsumer"); prop.put(ConsumerConfig.CLIENT_ID_CONFIG, "consumerDemo"); KafkaConsumer<String, String> consumer = new KafkaConsumer<>(prop); consumer.subscribe(Collections.singleton("test")); while (true) { ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(1000)); for (ConsumerRecord<String, String> record : records) { String key = record.key(); String value = record.value(); System.err.println(record.toString()); } }
kafka
消费者是以 组为基本单位 进行消费的。消费的模型以下java
1 个 topic
容许被多个 消费组
消费。再次强调,kafka
消费是以组为单位。正则表达式
prop.put(ConsumerConfig.GROUP_ID_CONFIG, "testConsumer");
以上这行代码设置了消费组。算法
partition
分配topic
为逻辑上的概念,partition
才是物理上的概念。那么看完这个以上的消费模型图。你可能会很疑惑。当一个组下有多个消费者时,每一个消费者是如何消费的?安全
先说明:partition
的分配为平均分配多线程
假设一:topic1
下面有 3 个分区。分别以下:p1 - p3。那么 groupA
下的三个消费者消费的对应 partition
为以下ide
instance1: p1 instance2: p2 instance3: p3
假设二:topic1
下面有 8 个分区。分别为 p1 - p8。那么 groupA
中每一个消费者分配到的 partition
就以下函数
instance1: p1,p2,p3 instance2: p4,p5,p6 instance3: p7,p8
partition
重分配假设三:topic1
下面有 8 个分区:P1 - P8。groupA
有三个消费者:c1,c2,c3。此时分配的 partition
以下fetch
c1: p1,p2,p3 c2: p4,p5,p6 c3: p7,p8
若是此时,又有一个新的消费者加入到 groupA
会发生什么呢? partition
会被从新分配大数据
c1: p1,p2 c2: p3,p4 c3: p5,p6 c4: p7,p8
API
介绍void subscribe(Collection<String> topics); void subscribe(Collection<String> topics, ConsumerRebalanceListener callback);
从方法上看 kafka
容许一个消费者订阅多个 topic
。spa
void subscribe(Pattern pattern); void subscribe(Pattern pattern, ConsumerRebalanceListener callback);
入参 Pattern
则表示,可使用正则表达式匹配多个 topic
. 实例代码以下
Pattern pattern = Pattern.compile("test?"); consumer.subscribe(pattern);
能够订阅主题,那么天然也能够取消订阅主题
consumer.unsubscribe();
固然,也能够直接获取到消费组订阅的主题
Set<String> topics = consumer.subscription();
一个主题下面有多个 partition
, 那么是否能够指定要消费的队列呢?答案是能够的
TopicPartition p1 = new TopicPartition("test1", 0); TopicPartition p2 = new TopicPartition("test1", 1); consumer.assign(Arrays.asList(p1, p2));
不过须要注意的是,若是指定了消费的分区,那么是消费者是没法自动 rebanlance
的。
ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(1000));
从消费者端的这行代码,咱们能够看出,kafka
消息消费采用的是拉取模式。当未拉取到消息时,会阻塞线程。
poll
方法返回的 ConsumerRecords
实现 Iterable
接口,是 ConsumerRecord
的迭代器。ConsumerRecord
属性相对简单
public class ConsumerRecord<K, V> { private final String topic; // 主题 private final int partition; // 分区 private final long offset; // 消息所属分区偏移量 private final long timestamp; // 时间戳 private final TimestampType timestampType; // 二者类型,消息建立时间戳及消息追加到日志的时间戳 private final int serializedKeySize; private final int serializedValueSize; private final Headers headers; // 发送的header private final K key; // 发送的 key private final V value; // 发送的内容 private volatile Long checksum; // CRC32 校验值 }
对于分区而言,消息会有一个惟一 offset
, 表示消息在分区中的位置,称之为 偏移量
。对于消息消费而言,也有消费进度的 offset
,称之为 位移
。kafka
将消息的消费进度存储在 kafka
内部主题 __onsumer_offset
中。kafka
中默认每隔 5s
保存消息的消费进度。可经过 auto.commit.interval.ms
进行配置。
kafka
提供手动提交的 API
,下面演示一下。
Properties prop = new Properties(); prop.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName()); prop.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName()); prop.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); prop.put(ConsumerConfig.GROUP_ID_CONFIG, "testConsumer"); prop.put(ConsumerConfig.CLIENT_ID_CONFIG, "consumerDemo"); prop.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false"); KafkaConsumer<String, String> consumer = new KafkaConsumer<>(prop); consumer.subscribe(Collections.singleton("test")); while (true) { ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(1000)); for (ConsumerRecord<String, String> record : records) { System.out.println("消费:" + record.toString()); } consumer.commitSync(); }
须要注意的是,须要将 enable.auto.commit
设置为 true
.
kafka
设置 新消费组 从哪一个位置开始消费的配置为:auto.offset.reset
该配置有如下 3 个配置项
latest
(默认配置)默认从最新的位置,开始消费。
earliest
从最先的位置开始消费。当配置为该参数时,kafka
会打印以下日志:Resetting offset for partition
none
当消费组,没有对应消费进度时,会直接抛 NoOffsetForPartitionException
异常
kafka
还提供了 seek(TopicPartition partition, long offset)
方法,容许新的消费者,设置从哪一个位置开始消费。
// 由于分配 分区的动做,发生在 pool 中,所以在设置消费偏移量时,须要先拉取消息 Set<TopicPartition> assignment = new HashSet<>(); while (assignment.size() == 0) { consumer.poll(Duration.ofMillis(100)); assignment = consumer.assignment(); } for (TopicPartition tp : assignment) { consumer.seek(tp, 50); } while (true) { ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(1000)); for (ConsumerRecord<String, String> record : records) { System.err.println("消费:" + record.toString()); } }
更多状况下,咱们可能会指定消费组从指定的时间点开始消费
Map<TopicPartition, Long> timestampToSearch = new HashMap<>(); for (TopicPartition tp : assignment) { // 指定从一天前开始消费 timestampToSearch.put(tp, System.currentTimeMillis() - 1 * 24 * 3600 * 1000); } Map<TopicPartition, OffsetAndTimestamp> offsets = consumer.offsetsForTimes(timestampToSearch); for (TopicPartition tp : assignment) { OffsetAndTimestamp timestamp = offsets.get(tp); if (null != timestamp) { consumer.seek(tp, timestamp.offset()); } }
在分区再均衡期间,消费组内的消费者是没法读取消息的。而且若是以前的消费者没有及时提交消费进度,那么会形成重复消费。
kafka
在 subscribe
的时候,提供了回调函数,容许咱们在触发再均衡时,作控制
void subscribe(Collection<String> topics, ConsumerRebalanceListener listener)
看一下 ConsumerRebalanceListener
定义的接口
// 再均衡开始以前和消费者中止读取消息以前被调用,可利用该会掉,提交消费位移 void onPartitionsRevoked(Collection<TopicPartition> partitions); // 从新分区后,消费者开始读取消息以前被调用 void onPartitionsAssigned(Collection<TopicPartition> partitions);
下面演示,如何在再均衡以前,提交消费偏移
consumer.subscribe(Collections.singleton("test"), new ConsumerRebalanceListener() { // 在再均衡开始以前和消费者中止读取消息以前被调用,可利用该会掉,提交消费位移 @Override public void onPartitionsRevoked(Collection<TopicPartition> partitions) { // 提交消费偏移 consumer.commitSync(); } // 从新分区后,消费者开始读取消息以前被调用 @Override public void onPartitionsAssigned(Collection<TopicPartition> partitions) { } });
消费者,容许在 消费以前,消费偏移提交以后,关闭以前,进行控制,多个拦截器则组成拦截器链, 且多个拦截器以前须要用 ',' 号隔开。
先看拦截器定义的接口
public interface ConsumerInterceptor<K, V> extends Configurable { // 消息消费以前 ConsumerRecords<K, V> onConsume(ConsumerRecords<K, V> records); // 提交以后调用 void onCommit(Map<TopicPartition, OffsetAndMetadata> offsets); // 关闭以前调用 void close(); }
Properties prop = new Properties(); prop.put(ConsumerConfig.INTERCEPTOR_CLASSES_CONFIG, MyConsumerInterceptor.class.getName() + "," + MyConsumerInterceptor2.class.getName());
fetch.min.bytes
默认 1B
,poll
时,拉取的最小数据量。
fetch.max.bytes
默认 5242880B
,50MB,poll
时,拉取的最大数据量。
fetch.max.wait.ms
默认 500ms
,若是 kafka
一直没有触发 poll
动做,那么最多等待 fetch.max.wait.ms
。
max.partition.fetch.bytes
默认 1048576B
,1MB,分区拉取时的最大数据量
max.poll.records
默认 500条
,拉取的最大消息条数
connections.max.idle.ms
默认 540000ms
, 9分钟,多久关闭闲置的链接
receive.buffer.bytes
默认 65536B
,64KB
,SOCKET
接受消息的缓冲区(SO_RECBUF
)
request.timeout.ms
默认 30000ms
,配置 consumer
等待请求响应的最长时间
metadata.max.age.ms
默认 300000ms
,5 分钟,配置元数据过时时间。元数据在限定的时间内,没有更新,会被强制更新
reconnect.backoff.ms
默认 50ms
,配置尝试链接指定主机以前的等待时间,避免频繁链接主机
retry.backoff.ms
默认 100ms
,发送失败时,2次的间隔时间
kafka
消费以组为单位,且容许一个消费组订阅多个 topic
partition
重分配算法,为平均算法KafkaConsumer
为线程不安全。所以 poll()
只有当前线程在拉取消息。kafka
要实现多线程拉取相对麻烦kafka
消费者端,提供的 API
很是灵活,容许从指定的位置消费,容许手动提交某个分区的消费偏移kafka
提供消费者拦截器链,容许在 消费以前,提交消费偏移以后 控制。RocketMQ
建议 1 个消费组只消费一个 topic
, 且在实际开发中,若是消费者订阅多个 topic
会没法正常工做。kafka
中 1 个消费者能够订阅多个 topic
。RocketMQ
能够确保消费时,消息不丢失,kafka
没法保证。RocketMQ
在消费者端,实现了多线程消费,kafka
则没有kafka
默认每 5s
持久化消费进度,RocketMQ
也是。不过 RocketMQ
会提交偏移量最小的消息。好比,线程 A 消费了 20 的消息。线程 B 消费了 10 的消息。当线程 A 提交消费进度的时候,会提交 10,而不会提交20。这也是 RocketMQ
能够确保消息消费时不丢的缘由。RocketMQ
发生 rebalance
,即 kafka
的再分配。默认和 kafka
一致,采用的是 平均分配算法
。不过 RocketMQ
容许自定义再分配算法,且提供了丰富算法支持。RocketMQ
与 kafka
一致,都存在重复消费问题。API
来看,kafka
客户端会比 RocketMQ
更加灵活。kafka
设置 新的消费组 从哪一个位置开始消费,没有额外的条件限制;RocketMQ
只有当旧消息堆积很是多时,才会有效。