python消费和生产kafka消息

-------------------------------------------------------------------------------
 kafka = KafkaProducer(bootstrap_servers==: 10: : : : : : =, msg, partition=-------------------------------------------------------------------------------- kafka = KafkaConsumer(, bootstrap_servers=[ msg =  %--------------------------------------------------------------------------------
二、若是想要完成负载均衡,就须要知道kafka的分区机制,同一个主题,能够为其分区,在生产者不指定分区的状况,kafka会将多个消息分发到不一样的分区,消费者订阅时候若是不指定服务组,
会收到全部分区的消息,若是指定了服务组,则同一服务组的消费者会消费不一样的分区,若是2个分区两个消费者的消费者组消费,则,每一个消费者消费一个分区,若是有三个消费者的服务组,
则会出现一个消费者消费不到数据;若是想要消费同一分区,则须要用不一样的服务组。以此为原理,咱们对消费者作以下修改:


 kafka = KafkaConsumer(, group_id=, bootstrap_servers=[ msg =  %------------------------------------------------------------------------------------

三、kafka提供了偏移量的概念,容许消费者根据偏移量消费以前遗漏的内容,这基于kafka名义上的全量存储,能够保留大量的历史数据,历史保存时间是可配置的,通常是7天,若是偏移量定位到了已删除的位置那也会有问题,可是这种状况可能很小;每一个保存的数据文件都是以偏移量命名的,当前要查的偏移量减去文件名就是数据在该文件的相对位置。要指定偏移量消费数据,须要指定该消费者要消费的分区,不然代码会找不到分区而没法消费,代码以下:bootstrap

 kafka  kafka.structs = KafkaConsumer(group_id=, bootstrap_servers=[=, partition=0), TopicPartition(topic=, partition=1 consumer.partitions_for_topic()  =, partition= msg =  %-----------------------------------------------------------------------------------

四、有时候,咱们并不须要实时获取数据,由于这样可能会形成性能瓶颈,咱们只须要定时去获取队列里的数据而后批量处理就能够,这种状况,咱们能够选择主动拉取数据负载均衡


 kafka = KafkaConsumer(group_id=, bootstrap_servers=[=(== consumer.poll(timeout_ms=5)  
    2+= 1
  %-----------------------------------------------------------------------------------
相关文章
相关标签/搜索