One requirement in our project is to process data by consuming messages from Kafka, but with a delayed-consumption effect. My idea was to manage Kafka's commits myself by setting `enable.auto.commit` to False. The expectation was that if a message is consumed but not committed, Kafka would put it back onto the queue and deliver it again later, so it would keep being re-consumed until the configured delay had passed, at which point it would finally be processed and committed.

So I wrote a demo to verify this, and it turned out the option does not behave the way I wanted.

The producer sends one message per second to the Kafka topic.
```python
#!/usr/bin/env python
# -*- coding:utf-8 -*-
import time

from confluent_kafka import Producer

p = Producer({'bootstrap.servers': 'localhost:9092, localhost:9093, localhost:9094'})
topic = 'nico-test'
msg_tpl = 'hello kafka:{0}'

while True:
    msg = msg_tpl.format(time.time())
    p.produce(topic, msg)   # asynchronously enqueue the message
    p.flush()               # block until it is actually delivered
    print('Produce msg:{0}'.format(msg))
    time.sleep(1)
```
The consumer sets the `enable.auto.commit` option to False.
```python
#!/usr/bin/env python
# -*- coding:utf-8 -*-
from confluent_kafka import Consumer

c = Consumer({
    'bootstrap.servers': 'localhost:9092, localhost:9093, localhost:9094',
    'group.id': 'nico-test',
    'auto.offset.reset': 'earliest',
    'enable.auto.commit': False
})

topic = 'nico-test'
c.subscribe([topic])

# Print some cluster metadata to confirm the connection works
cd = c.list_topics()
print(cd.cluster_id)
print(cd.controller_id)
print(cd.brokers)
print(cd.topics)
print(cd.orig_broker_id)
print(cd.orig_broker_name)

while True:
    msg = c.poll(1.0)
    if msg is None:
        continue
    if msg.error():
        print('Consumer error: {0}'.format(msg.error()))
        continue
    print('topic:{topic}, partition:{partition}, offset:{offset}, headers:{headers}, '
          'key:{key}, msg:{msg}, timestamp:{timestamp}'.format(
              topic=msg.topic(), msg=msg.value(), headers=msg.headers(), key=msg.key(),
              offset=msg.offset(), partition=msg.partition(), timestamp=msg.timestamp()))
```
The result: after starting, the consumer keeps consuming messages in order, and the messages are never put back onto the queue; however, every time the consumer is killed and restarted, it starts consuming from the very beginning again. To sum up, what this option actually controls is offset committing: when it is true, the consumer automatically commits the offset of fetched messages (stored in Kafka's internal `__consumer_offsets` topic; older versions kept it in ZooKeeper); when it is false, nothing is ever committed, so after a restart there is no committed offset for the group and the consumer falls back to `auto.offset.reset`, i.e. the earliest offset.
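In other words, with `enable.auto.commit` set to False the application is expected to commit offsets itself. Below is a minimal sketch of that pattern, assuming the same topic and brokers as the demo above (the group id is made up for illustration); it uses the `Consumer.commit()` call from confluent-kafka.

```python
#!/usr/bin/env python
# -*- coding:utf-8 -*-
# Minimal sketch: with enable.auto.commit=False, the group's offset only
# advances on the broker when the application calls commit() itself.
from confluent_kafka import Consumer

c = Consumer({
    'bootstrap.servers': 'localhost:9092, localhost:9093, localhost:9094',
    'group.id': 'nico-test-manual-commit',   # hypothetical group id
    'auto.offset.reset': 'earliest',
    'enable.auto.commit': False
})
c.subscribe(['nico-test'])

while True:
    msg = c.poll(1.0)
    if msg is None or msg.error():
        continue
    # ... process the message here ...
    # Synchronously commit this message's offset; after a restart the group
    # resumes from the last committed offset instead of falling back to
    # auto.offset.reset.
    c.commit(message=msg, asynchronous=False)
```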
Finally, I thought about it a bit more. The reason delayed consumption cannot be achieved this way has a lot to do with how Kafka is implemented: Kafka appends messages directly to log files on disk, so to "replay" a message by putting it back onto the queue, the message would have to be deleted from the log and re-inserted, which goes against Kafka's design.
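One way to get a delay effect without putting anything back is to have the consumer itself wait until a message is "old enough" before processing and committing it. The sketch below is only an illustration of that idea, not part of the original demo; `DELAY_SECONDS` and the group id are assumptions, and sleeping inside the poll loop for longer than `max.poll.interval.ms` would get the consumer kicked out of the group, so it only works for short delays as written.

```python
#!/usr/bin/env python
# -*- coding:utf-8 -*-
# Sketch of a possible workaround: wait until the message's timestamp is at
# least DELAY_SECONDS old, then process and manually commit it.
import time
from confluent_kafka import Consumer

DELAY_SECONDS = 60  # assumed delay before a message may be processed

c = Consumer({
    'bootstrap.servers': 'localhost:9092, localhost:9093, localhost:9094',
    'group.id': 'nico-test-delayed',          # hypothetical group id
    'auto.offset.reset': 'earliest',
    'enable.auto.commit': False
})
c.subscribe(['nico-test'])

while True:
    msg = c.poll(1.0)
    if msg is None or msg.error():
        continue
    ts_type, ts_ms = msg.timestamp()          # message timestamp in milliseconds
    wait = DELAY_SECONDS - (time.time() - ts_ms / 1000.0)
    if wait > 0:
        time.sleep(wait)                      # block until the delay has elapsed
    # ... process the message here ...
    c.commit(message=msg, asynchronous=False)
```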