Kafka是一个分布式流处理系统,流处理系统使它能够像消息队列同样publish或者subscribe消息,分布式提供了容错性,并发处理消息的机制。
Kafka的基本概念python
kafka运行在集群上,集群包含一个或多个服务器。kafka把消息存在topic中,每一条消息包含键值(key),值(value)和时间戳(timestamp)。
kafka有如下一些基本概念:bootstrap
消息生产者,就是向kafka broker发消息的客户端。
消息消费者,是消息的使用方,负责消费Kafka服务器上的消息。
主题,由用户定义并配置在Kafka服务器,用于创建Producer和Consumer之间的订阅关系。生产者发送消息到指定的Topic下,消息者从这个Topic下消费消息。
消息分区,一个topic能够分为多个 partition,每一个partition是一个有序的队列。partition中的每条消息都会被分配一个有序的id(offset)。
一台kafka服务器就是一个broker。一个集群由多个broker组成。一个broker能够容纳多个topic。
消费组,用于归组同类消费者。每一个consumer属于一个特定的consumer group,多个消费组能够共同消息一个Topic下的消息,每消费组中的消费者消费Topic的部分消息,这些消费者就组成了一个分组。
消息在partition中的偏移量。每一条消息在partition都有惟一的偏移量,消息者能够指定偏移量来指定要消费的消息。
如上图所示,kafka将topic中的消息存在不一样的partition中。若是存在键值(key),消息按照键值(key)作分类存在不一样的partiition中,若是不存在键值(key),消息按照轮询(Round Robin)机制存在不一样的partition中。默认状况下,键值(key)决定了一条消息会被存在哪一个partition中。服务器
partition中的消息序列是有序的消息序列。kafka在partition使用偏移量(offset)来指定消息的位置。一个topic的一个partition只能被一个consumer group中的一个consumer消费,多个consumer消费同一个partition中的数据是不容许的,可是一个consumer能够消费多个partition中的数据。架构
kafka将partition的数据复制到不一样的broker,提供了partition数据的备份。每个partition都有一个broker做为leader,若干个broker做为follower。全部的数据读写都经过leader所在的服务器进行,而且leader在不一样broker之间复制数据。并发
上图中,对于Partition 0,broker 1是它的leader,broker 2和broker 3是follower。对于Partition 1,broker 2是它的leader,broker 1和broker 3是follower。分布式
在上图中,当有Client(也就是Producer)要写入数据到Partition 0时,会写入到leader Broker 1,Broker 1再将数据复制到follower Broker 2和Broker 3。spa
在上图中,Client向Partition 1中写入数据时,会写入到Broker 2,由于Broker 2是Partition 1的Leader,而后Broker 2再将数据复制到follower Broker 1和Broker 3中。3d
上图中的topic一共有3个partition,对每一个partition的读写都由不一样的broker处理,所以总的吞吐量获得了提高。code
kafka-python是一个python的Kafka客户端,能够用来向kafka的topic发送消息、消费消息。server
这个实验会实现一个producer和一个consumer,producer向kafka发送消息,consumer从topic中消费消息。结构以下图
producer代码
#-*- coding: utf-8 -*- from kafka import KafkaProducer import time producer = KafkaProducer(bootstrap_servers='localhost:9092') i = 1000 while True: ts = int(time.time() * 1000) producer.send(topic='py_test', value=str(i), key=str(i), timestamp_ms=ts) producer.flush() print i i += 1 time.sleep(1)
consumer代码
#-*- coding: utf-8 -*- from kafka import KafkaConsumer consumer = KafkaConsumer('py_test', bootstrap_servers=["localhost:9092"]) for message in consumer: print message
接下来建立test topic
kafka-topics --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic test
打开两个窗口中,咱们在window1中运行producer,以下:
在window2中运行consumer,以下:
这个实验将展现消费组的容错性的特色。这个实验中将建立一个有2个partition的topic,和2个consumer,这2个consumer共同消费同一个topic中的数据。结构以下所示
producer部分代码和实验一相同,这里再也不重复。consumer须要指定所属的consumer group,代码以下
#-*- coding: utf-8 -*- from kafka import KafkaConsumer consumer = KafkaConsumer('py_test', group_id='testgt', bootstrap_servers=["localhost:9092"]) for message in consumer: print message
接下来咱们建立topic,名字test,设置partition数量为2
打开三个窗口,一个窗口运行producer,还有两个窗口运行consumer。
运行consumer的两个窗口的输出以下:
能够看到两个consumer同时运行的状况下,它们分别消费不一样partition中的数据。window1中的consumer消费partition 0中的数据,window2中的consumer消费parition 1中的数据。
咱们尝试关闭window1中的consumer,能够看到以下结果
刚开始window2中的consumer只消费partition1中的数据,当window1中的consumer退出后,window2中的consumer中也开始消费partition 0中的数据了。
kafka容许consumer将当前消费的消息的offset提交到kafka中,这样若是consumer因异常退出后,下次启动仍然能够从上次记录的offset开始向后继续消费消息。
这个实验的结构和实验一的结构是同样的,使用一个producer,一个consumer,test topic的partition数量设为1。
producer的代码和实验一中的同样,这里再也不重复。consumer的代码稍做修改,这里consumer中打印出下一个要被消费的消息的offset。consumer代码以下
#-*- coding: utf-8 -*- from kafka import KafkaConsumer, TopicPartition tp = TopicPartition('py_test', 0) consumer = KafkaConsumer(bootstrap_servers=["localhost:9092"], group_id='test_g', auto_offset_reset='earliest', enable_auto_commit=False) consumer.assign([tp]) print "starting offset is", consumer.position(tp) for message in consumer: # pass print message.value
auto.offset.reset值含义解释 earliest 当各分区下有已提交的offset时,从提交的offset开始消费;无提交的offset时,从头开始消费 latest 当各分区下有已提交的offset时,从提交的offset开始消费;无提交的offset时,消费新产生的该分区下的数据 none topic各分区都存在已提交的offset时,从offset后开始消费;只要有一个分区不存在已提交的offset,则抛出异常
在一个窗口中启动producer,在另外一个窗口而且启动consumer。consumer的输出以下
能够尝试退出consumer,再启动consumer。每一次从新启动,consumer都是从offset=98的消息开始消费的。
修改consumer的代码以下 在consumer消费每一条消息后将offset提交回kafka
#-*- coding: utf-8 -*- from kafka import KafkaConsumer, TopicPartition tp = TopicPartition('py_test', 0) consumer = KafkaConsumer(bootstrap_servers=["localhost:9092"], group_id='test', auto_offset_reset='earliest', enable_auto_commit=True) consumer.assign([tp]) print "starting offset is", consumer.position(tp) for message in consumer: print message.offset # consumer.commit() 也能够主动提交offset
启动consumer
能够看到consumer从offset=98的消息开始消费,到offset=829时,咱们Ctrl+C退出consumer。
咱们再次启动consumer
能够看到从新启动后,consumer从上一次记录的offset开始继续消费消息。以后每一次consumer从新启动,consumer都会从上一次中止的地方继续开始消费。
#!/usr/bin/env python # -*- coding: utf-8 -*- from kafka import KafkaConsumer, TopicPartition tp = TopicPartition('py_test', 0) consumer = KafkaConsumer(bootstrap_servers=["localhost:9092"], group_id='test_1', auto_offset_reset='earliest', enable_auto_commit=False) consumer.assign([tp]) print "starting offset is", consumer.position(tp) for message in consumer: print message.offset # consumer.commit()
换一个group_id test_1,会从starting offset is 0开始输出:
starting offset is 0 0