kafka数据如何被重复消费

近段时间学习极客时间李玥老师的后端存储实战课时,看到一个不少意思的东西:用kafka存储点击流的数据,并重复处理。在以往的使用中,kafka只是一个消息传输的载体,消息被消费后就不能再次消费。新知识与印象相冲突,因而就有了本篇文章:kafka数据如何被重复消费。html

前期理论了解

首先我先去官网纠正了我对kafka的总体了解。java

官网对kafka的描述是:一个分布式流平台。怪本身的学艺不精。git

其次,我从新看了一下kafka消费者的消费过程:kafka首先经过push/poll(默认为poll)获取消息,接收消息处理完成后手动/自动提交消费成功,kafka服务器则根据提交状况决定是否移动当前偏移量。github

方案肯定

kafka消费者读取数据的位置是经过偏移量判断,那若是我能将偏移量手动设置为起始位置,就能实现重复消费?这个有搞头。apache

如何手动设置偏移量是关键。后端

show me the code

代码的关键主要在于偏移量设置 api 的调用,其他没什么特别。api

要注意的是,代码中我分别调用了做用不一样的设置偏移量,仅做为展现,可按需取用。服务器

最后消费者消息消息时,我只使用默认的拉取条数设置消费一次,可按需进行修改。分布式

 /**  
  * repeat kafka message  
  * @param host kafka host  
  * @param groupId kafka consumer group id  
  * @param autoCommit whether auto commit consume  
  * @param topic consume topic  
  * @param consumeTimeOut consume time out  
 */  
  private void textResetOffset(String host, String groupId, Boolean autoCommit, String topic, Long consumeTimeOut){  
  //form a properties to new consumer  
  Properties properties = new Properties();  
  properties.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, host);  
  properties.setProperty(ConsumerConfig.GROUP_ID_CONFIG, groupId);  
  properties.setProperty(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, autoCommit.toString());  
  properties.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());  
  properties.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());  
  KafkaConsumer<String, String> consumer = new KafkaConsumer<>(properties);  
  //subscribe incoming topic  
  consumer.subscribe(Collections.singletonList(topic));  
  //get consumer consume partitions  
  List<PartitionInfo> partitionInfos = consumer.partitionsFor(topic);  
  List<TopicPartition> topicPartitions = new ArrayList<>();  
  for(PartitionInfo partitionInfo : partitionInfos){  
  TopicPartition topicPartition = new TopicPartition(partitionInfo.topic(), partitionInfo.partition());  
  topicPartitions.add(topicPartition);  
  }  
  // poll data from kafka server to prevent lazy operation  
  consumer.poll(Duration.ofSeconds(consumeTimeOut));  
  //reset offset from beginning  
  consumer.seekToBeginning(topicPartitions);  
  //reset designated partition offset by designated spot  
  int offset = 20;  
  consumer.seek(topicPartitions.get(0), offset);  
  //reset offset to end  
  consumer.seekToEnd(topicPartitions);  
  //consume message as usual  
  ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(1));  
  Iterator<ConsumerRecord<String, String>> iterator = records.iterator();  
  while (iterator.hasNext()){  
  ConsumerRecord<String, String> record = iterator.next();  
  log.info("consume data: {}", record.value());  
  }  
  }
运行结果

需注意的点

在手动设置偏移量时,遇到了一个exceptionide

 java.lang.IllegalStateException: No current assignment for partition test-0

翻了一下stackoverflow以及官方文档后,才了解到设置偏移量是一个lazy operation,官网的解释以下。

Seek to the first offset for each of the given partitions. This function evaluates lazily, seeking to the first offset in all partitions only when poll(long)) or position(TopicPartition)) are called. If no partition is provided, seek to the first offset for all of the currently assigned partitions.

因而我先进行一次 poll 操做后再设置偏移量。

本文首发于 cartoon的博客
转载请注明出处:https://cartoonyu.github.io/cartoon-blog/post/message-queue/kafka数据如何被重复消费/