3: 各个消费者按组协调消费this
(1)使用一个全新的"group.id"(就是以前没有被任何消费者使用过); (2)使用assign来订阅; # 例如 groupId @KafkaListener(topics = "test-syn",groupId = "test-2") public void send(ConsumerRecord<?, ?> record) { Optional<?> kafkaMessage = Optional.ofNullable(record.value()); if (kafkaMessage.isPresent()) { Object messge = kafkaMessage.get(); log.info("【KafkaListener监听到消息】" + messge); } }
注意:若是把 "enable.auto.commit" 设为 "false",使用 consumer.commitAsync(currentOffsets, null) 手动提交 offset ,是不能从头开始消费的spa
auto.offset.reset值含义解释: code
也就是说不管哪一种设置,只要 kafka 中相同 group、partition 中已经有提交的 offset,则都没法从开始消费。blog
参考论坛:服务器重启了,那么该group是否会从新消费服务器里面全部的消息ip
KafkaConsumer.subscribe() : 为consumer自动分配partition,get
有内部算法保证topic-partition以最优的方式均匀分配给同group下的不一样consumer。若是有多个partition且只有一个消费者,则按顺序消费全部分区。不会重复消费。kafka
KafkaConsumer.assign() : 为consumer手动、显示的指定须要消费的topic-partitions,it
不受group.id限制,不提交offset,至关与指定的group无效(this method does not use the consumer's group management)。能够重复消费。
或者,这样作:
目前就 high level API 而言,offset 是存于 Zookeeper 中的,没法存于 HDFS,而 low level API 的 offset 是由本身去维护的,能够将之存于 HDFS 中。
# groupId 将多个消费者分配到同一个组下面 @KafkaListener(topics = "test-syn",groupId = "test-1") public void send(ConsumerRecord<?, ?> record) { Optional<?> kafkaMessage = Optional.ofNullable(record.value()); if (kafkaMessage.isPresent()) { Object messge = kafkaMessage.get(); log.info("【KafkaListener监听到消息】" + messge); } } @KafkaListener(topics = "test-syn",groupId = "test-1") public void send(ConsumerRecord<?, ?> record) { Optional<?> kafkaMessage = Optional.ofNullable(record.value()); if (kafkaMessage.isPresent()) { Object messge = kafkaMessage.get(); log.info("【KafkaListener监听到消息】" + messge); } }
@KafkaListener(topics = "test-syn",groupId = "test-1") public void send(ConsumerRecord<?, ?> record) { Optional<?> kafkaMessage = Optional.ofNullable(record.value()); if (kafkaMessage.isPresent()) { Object messge = kafkaMessage.get(); log.info("【KafkaListener监听到消息】1" + messge); } } @KafkaListener(topics = "test-syn",groupId = "test-2") public void send2(ConsumerRecord<?, ?> record) { Optional<?> kafkaMessage = Optional.ofNullable(record.value()); if (kafkaMessage.isPresent()) { Object messge = kafkaMessage.get(); log.info("【KafkaListener监听到消息】2" + messge); } } @KafkaListener(topics = "test-syn",groupId = "test-3") public void send(ConsumerRecord<?, ?> record) { Optional<?> kafkaMessage = Optional.ofNullable(record.value()); if (kafkaMessage.isPresent()) { Object messge = kafkaMessage.get(); log.info("【KafkaListener监听到消息】1" + messge); } } @KafkaListener(topics = "test-syn",groupId = "test-2") public void send2(ConsumerRecord<?, ?> record) { Optional<?> kafkaMessage = Optional.ofNullable(record.value()); if (kafkaMessage.isPresent()) { Object messge = kafkaMessage.get(); log.info("【KafkaListener监听到消息】2" + messge); } } # 上面 1 2 3 收到相同的消费message 2 2 收到不一样的message