kafka的消费者API提供从kafka服务端拉取消息的能力,kafka引入了消费者组的概念,不一样消费者组之间互不影响,独自拥有一份数据,而同一个消费者组内的消费者则有以下规律:apache
分区数=消费者数:一个消费者拉取一个分区的数据bootstrap
分区数>消费者数:同一个消费者可能拉取不一样分区的数据oop
分区数<消费者数:一个消费者拉取一个分区的数据,多余的消费者不参与工做,当正在工做的消费者挂了之 后,这些闲着的消费者会顶替它干活,但会出现重复消费数据的状况spa
全部提交的offset都在kafka内建的一个消息队列中存在的,有50个分区,可使用以下命令查看.net
查看全部topicdebug
./kafka-topics.sh --zookeeper hadoop01:2181 --listcode
查看某个消费者组订阅的topic的当前offset和滞后进度server
./kafka-consumer-groups.sh --bootstrap-server hadoop01:9092 --describe --group my_groupblog
1.偏移量-自动提交队列
/* 消费者拉取数据以后自动提交偏移量,不关心后续对消息的处理是否正确 优势:消费快,适用于数据一致性弱的业务场景 缺点:消息很容易丢失 */ @Test public void autoCommit() { Properties props = new Properties(); //设置kafka集群的地址 props.put("bootstrap.servers", "hadoop01:9092,hadoop02:9092,hadoop03:9092"); //设置消费者组,组名字自定义,组名字相同的消费者在一个组 props.put("group.id", "my_group"); //开启offset自动提交 props.put("enable.auto.commit", "true"); //自动提交时间间隔 props.put("auto.commit.interval.ms", "1000"); //序列化器 props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); //实例化一个消费者 KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props); //消费者订阅主题,能够订阅多个主题 consumer.subscribe(Arrays.asList("mytopic1")); //死循环不停的从broker中拿数据 while (true) { ConsumerRecords<String, String> records = consumer.poll(100); for (ConsumerRecord<String, String> record : records) System.out.printf("offset = %d, key = %s, value = %s%n", record.offset(), record.key(), record.value()); } }
运行上面的程序输出结果:
使用以下命令查看offset提交后当前位置
./kafka-consumer-groups.sh --bootstrap-server hadoop01:9092 --describe --group my_group
比较上面两张图,最后一次消费的OFFSET=216493,下一个要消费的OFFSET=216494
一般从Kafka拿到的消息是要作业务处理,并且业务处理完成才算真正消费成功,因此须要客户端控制offset提交时间
@Test public void munualCommit() { Properties props = new Properties(); //设置kafka集群的地址 props.put("bootstrap.servers", "hadoop01:9092,hadoop02:9092,hadoop03:9092"); //设置消费者组,组名字自定义,组名字相同的消费者在一个组 props.put("group.id", "my_group"); //开启offset自动提交 props.put("enable.auto.commit", "false"); //序列化器 props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); //实例化一个消费者 KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props); //消费者订阅主题,能够订阅多个主题 consumer.subscribe(Arrays.asList("mytopic1")); final int minBatchSize = 50; List<ConsumerRecord<String, String>> buffer = new ArrayList<>(); while (true) { ConsumerRecords<String, String> records = consumer.poll(100); for (ConsumerRecord<String, String> record : records) { buffer.add(record); } if (buffer.size() >= minBatchSize) { //insertIntoDb(buffer); for (ConsumerRecord bf : buffer) { System.out.printf("offset = %d, key = %s, value = %s%n", bf.offset(), bf.key(), bf.value()); } consumer.commitSync(); buffer.clear(); } } }
在munualCommit的基础上更细粒度的提交数据,按照每一个分区手动提交偏移量
这里实现了按照分区取数据,所以能够从分区入手,不一样的分区能够作不一样的操做,能够灵活实现一些功能
为了验证手动提交偏移量,有两种方式:
1.debug的时候,在poll数据以后,手动提交前偏移量以前终止程序,再次启动看数据是否重复被拉取 2.debug的时候,在poll数据以后,手动提交前偏移量以前终止程序,登陆Linux 主机执行以下命令:
/kafka-consumer-groups.sh --bootstrap-server hadoop01:9092 --describe --group my_group
命令的输出结果能够看到当前topic每一个区分被提交后的当前偏移量、还未被消费的最大偏移量、二者之间的差等信息
@Test public void munualCommitByPartition() { Properties props = new Properties(); //设置kafka集群的地址 props.put("bootstrap.servers", "hadoop01:9092,hadoop02:9092,hadoop03:9092"); //设置消费者组,组名字自定义,组名字相同的消费者在一个组 props.put("group.id", "my_group"); //开启offset自动提交 props.put("enable.auto.commit", "false"); //序列化器 props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); //实例化一个消费者 KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props); //消费者订阅主题,能够订阅多个主题 consumer.subscribe(Arrays.asList("mytopic3")); try { while (true) { ConsumerRecords<String, String> records = consumer.poll(Long.MAX_VALUE); for (TopicPartition partition : records.partitions()) { List<ConsumerRecord<String, String>> partitionRecords = records.records(partition); for (ConsumerRecord<String, String> record : partitionRecords) { System.out.println("partition: " + partition.partition() + " , " + record.offset() + ": " + record.value()); } long lastOffset = partitionRecords.get(partitionRecords.size() - 1).offset(); /* 提交的偏移量应该始终是您的应用程序将要读取的下一条消息的偏移量。所以,在调用commitSync()时, offset应该是处理的最后一条消息的偏移量加1 为何这里要加上面不加喃?由于上面Kafka可以自动帮咱们维护全部分区的偏移量设置,有兴趣的同窗能够看看SubscriptionState.allConsumed()就知道 */ consumer.commitSync(Collections.singletonMap(partition, new OffsetAndMetadata(lastOffset + 1))); } } } finally { consumer.close(); } }
消费只读取特定分区数据,这种方式比上面的更加灵活,在实际应用场景中会常常使用
由于分区的数据是有序的,利用这个特性能够用于数据到达有前后顺序的业务,好比一个用户将订单提交,紧接着又取消订单,那么取消的订单必定要后于提交的订单到达某一个分区,这样保证业务处理的正确性
一旦指定了分区,要注意如下两点:
a.kafka提供的消费者组内的协调功能就再也不有效
b.这样的写法可能出现不一样消费者分配了相同的分区,为了不偏移量提交冲突,每一个消费者实例的group_id要不重复
@Test public void munualPollByPartition() { Properties props = new Properties(); //设置kafka集群的地址 props.put("bootstrap.servers", "hadoop01:9092,hadoop02:9092,hadoop03:9092"); //设置消费者组,组名字自定义,组名字相同的消费者在一个组 props.put("group.id", "my_group"); //开启offset自动提交 props.put("enable.auto.commit", "false"); //序列化器 props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); //实例化一个消费者 KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props); //消费者订阅主题,并设置要拉取的分区 TopicPartition partition0 = new TopicPartition("mytopic3", 0); //TopicPartition partition1 = new TopicPartition("mytopic2", 1); //consumer.assign(Arrays.asList(partition0, partition1)); consumer.assign(Arrays.asList(partition0)); try { while (true) { ConsumerRecords<String, String> records = consumer.poll(Long.MAX_VALUE); for (TopicPartition partition : records.partitions()) { List<ConsumerRecord<String, String>> partitionRecords = records.records(partition); for (ConsumerRecord<String, String> record : partitionRecords) { System.out.println("partition: " + partition.partition() + " , " + record.offset() + ": " + record.value()); } long lastOffset = partitionRecords.get(partitionRecords.size() - 1).offset(); consumer.commitSync(Collections.singletonMap(partition, new OffsetAndMetadata(lastOffset + 1))); } } } finally { consumer.close(); } }