While working through Li Yue's backend storage course on Geek Time recently, I came across something quite interesting: using Kafka to store click-stream data and process it repeatedly. In my previous experience, Kafka was just a transport for messages; once a message was consumed, it could not be consumed again. This new knowledge conflicted with that impression, hence this article: how Kafka data can be consumed repeatedly.
First, I went to the official site to correct my overall understanding of Kafka.
The official description of Kafka is: a distributed streaming platform. I can only blame my own shallow learning.
Next, I revisited how a Kafka consumer actually consumes: the consumer pulls messages from the broker via poll() (Kafka consumers always pull; the broker does not push), processes them, then commits the successful consumption either manually or automatically, and the broker advances the group's current offset according to those commits.
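The normal poll-then-commit cycle described above can be sketched as follows. This is a minimal sketch, not the author's code; the broker address `localhost:9092`, the topic name `clicks`, and the group id `click-group` are placeholder assumptions.

```java
import java.time.Duration;
import java.util.Collections;
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;

public class PollCommitSketch {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // assumed broker
        props.setProperty(ConsumerConfig.GROUP_ID_CONFIG, "click-group");             // assumed group id
        props.setProperty(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");         // commit manually
        props.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        props.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());

        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
            consumer.subscribe(Collections.singletonList("clicks")); // assumed topic
            while (true) {
                // pull a batch of records from the broker
                ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(1));
                for (ConsumerRecord<String, String> record : records) {
                    System.out.printf("offset=%d value=%s%n", record.offset(), record.value());
                }
                // advance the group's offsets only after the batch is processed
                consumer.commitSync();
            }
        }
    }
}
```

Because the offset only moves forward when a commit happens, "consumed" is really just "committed" bookkeeping on the broker side, which is what makes resetting it possible.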
A Kafka consumer decides where to read by its offset, so if I can manually reset the offset to the starting position, repeated consumption should be possible. Worth a try.
The key is how to set the offset manually.
The crux of the code is calling the offset-seeking APIs; the rest is boilerplate.
Note that the code calls several offset-setting methods with different effects, purely for demonstration; pick whichever you need.
Finally, when consuming the messages, I poll only once with the default fetch settings; adjust as needed.
```java
/**
 * repeat kafka message
 * @param host kafka host
 * @param groupId kafka consumer group id
 * @param autoCommit whether to auto commit offsets
 * @param topic topic to consume
 * @param consumeTimeOut consume timeout in seconds
 */
private void textResetOffset(String host, String groupId, Boolean autoCommit, String topic, Long consumeTimeOut){
    //build properties for the new consumer
    Properties properties = new Properties();
    properties.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, host);
    properties.setProperty(ConsumerConfig.GROUP_ID_CONFIG, groupId);
    properties.setProperty(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, autoCommit.toString());
    properties.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
    properties.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
    KafkaConsumer<String, String> consumer = new KafkaConsumer<>(properties);

    //subscribe to the incoming topic
    consumer.subscribe(Collections.singletonList(topic));

    //collect the topic's partitions
    List<PartitionInfo> partitionInfos = consumer.partitionsFor(topic);
    List<TopicPartition> topicPartitions = new ArrayList<>();
    for(PartitionInfo partitionInfo : partitionInfos){
        topicPartitions.add(new TopicPartition(partitionInfo.topic(), partitionInfo.partition()));
    }

    //poll once first so the consumer receives a partition assignment,
    //because seek is a lazy operation
    consumer.poll(Duration.ofSeconds(consumeTimeOut));

    //reset offset to the beginning
    consumer.seekToBeginning(topicPartitions);

    //reset a designated partition to a designated offset
    int offset = 20;
    consumer.seek(topicPartitions.get(0), offset);

    //reset offset to the end
    consumer.seekToEnd(topicPartitions);

    //consume messages as usual
    ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(1));
    for (ConsumerRecord<String, String> record : records) {
        log.info("consume data: {}", record.value());
    }
}
```
While setting the offset manually, I ran into an exception:
```
java.lang.IllegalStateException: No current assignment for partition test-0
```
After digging through Stack Overflow and the official documentation, I learned that seeking is a lazy operation. The official explanation is as follows:
> Seek to the first offset for each of the given partitions. This function evaluates lazily, seeking to the first offset in all partitions only when poll(long) or position(TopicPartition) are called. If no partitions are provided, seek to the first offset for all of the currently assigned partitions.
So I perform one poll() first, so the consumer receives its partition assignment, and only then set the offset.
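An alternative worth noting: the priming poll() is only needed because subscribe() assigns partitions lazily through the group protocol. If the consumer is assigned partitions manually with assign(), the assignment exists immediately and seek() works right away. This is a minimal sketch under assumed names (broker `localhost:9092`, topic `test`, group `repeat-group`), not the article's original approach.

```java
import java.time.Duration;
import java.util.List;
import java.util.Properties;
import java.util.stream.Collectors;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.serialization.StringDeserializer;

public class AssignSeekSketch {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // assumed broker
        props.setProperty(ConsumerConfig.GROUP_ID_CONFIG, "repeat-group");            // assumed group id
        props.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        props.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());

        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
            // manually assign every partition of the topic instead of subscribing,
            // so there is an assignment immediately and no priming poll() is needed
            List<TopicPartition> partitions = consumer.partitionsFor("test").stream() // assumed topic
                    .map(p -> new TopicPartition(p.topic(), p.partition()))
                    .collect(Collectors.toList());
            consumer.assign(partitions);

            // seek no longer throws IllegalStateException; it takes effect on the next poll
            consumer.seekToBeginning(partitions);

            ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(1));
            records.forEach(r -> System.out.printf("offset=%d value=%s%n", r.offset(), r.value()));
        }
    }
}
```

The trade-off is that assign() bypasses the consumer group's automatic partition balancing, so it suits one-off replay jobs rather than long-running group consumers.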
This article was first published on cartoon's blog.
Please credit the source when reposting: https://cartoonyu.github.io/cartoon-blog/post/message-queue/kafka数据如何被重复消费/