上一篇文章了解了kafka的重要组件zookeeper,用来保存broker、consumer等相关信息,作到平滑扩展。这篇文章就实际操做部署下kafka,用几个简单的例子加深对kafka的理解,学会基本使用kafka。html
我将会在本地部署一个三台机器的zookeeper集群,和一个2台机器的kafka集群。python
zookeeper的搭建能够看个人上一篇文章分布式系统中zookeeper实现配置管理+集群管理,按照步骤,一步步能够很容易的搭建3太服务器的zookeeper集群。跟以前同样,我仍是在本地的3个端口搭建了3台服务器,地址以下所示:apache
192.168.0.105:2181 192.168.0.105:2182 192.168.0.105:2183
这三台服务器一下子会在kafka配置中用到。json
第一步. 下载kafkabootstrap
到kafka官网下载apache kafka,解压到/path/to/kafka
目录。服务器
第二步. 修改配置文件
复制/path/to/kafka/config/server.properties
,到/path/to/kafka/config/server-1.properties
和/path/to/kafka/config/server-2.properties
异步
配置文件中修改的差别内容以下所示:
server-1.properties
:分布式
broker.id=1 listeners=PLAINTEXT://:9093 log.dirs=/tmp/kafka-logs-1 zookeeper.connect=192.168.0.105:2181,192.168.0.105:2182,192.168.0.105:2183
server-2.properties
:函数
broker.id=2 listeners=PLAINTEXT://:9094 log.dirs=/tmp/kafka-logs-2 zookeeper.connect=192.168.0.105:2181,192.168.0.105:2182,192.168.0.105:2183
其中broker.id
是broker的惟一标示,集群中的broker标识必须惟一。
listeners
是broker监听的地址和端口,advertised.listeners
用于和producer、consumer交互,后者未配置会默认使用前者,listeners的完整格式是listeners = listener_name://host_name:port
,其中PLAINTEXT
是协议,还有一种是SSL
,具体还没太搞明白(TODO)。
log.dirs
是日志数据的存放目录,也就是producer产生的数据存放的目录。
zookeeper.connect
配置是zookeeper的集群,broker启动以后将信息注册到zookeeper集群中。性能
第三步. 启动服务器
cd /path/to/kafka bin/kafka-server-start.sh -daemon config/server-1.properties bin/kafka-server-start.sh -daemon config/server-2.properties
使用jps
命令能够看见2个kafka进程,证实启动成功了。
第四步. 建立topic
建立topic通常使用kafka自带的脚本建立:
bin/kafka-topics.sh --create --zookeeper 192.168.0.105:2181,192.168.0.105:2182,192.168.0.105:2183 --replication-factor 2 --partitions 10 --topic user-event
其中--zookeeper
就是后面就是咱们上面配置的zookeeper集群,--replication-factor
表明每一个分区在集群中复制的份数,后面的值要小于kafka集群中服务器数量,--partitions
表示建立主题的分区数量,通常分区越大,性能越好,--topic
后边儿就是建立主题的名字,运行成功以后会看到Created topic "user-event".
字样,表示建立成功,会在kafka配置的日志目录下建立主题信息,好比下面的:
ll /tmp/kafka-logs-1
drwxr-xr-x 7 ritoyan wheel 224 6 3 21:21 clock-tick-0 drwxr-xr-x 7 ritoyan wheel 224 6 3 21:21 clock-tick-2 drwxr-xr-x 6 ritoyan wheel 192 6 3 21:26 user-event-0 drwxr-xr-x 6 ritoyan wheel 192 6 3 21:26 user-event-1 drwxr-xr-x 6 ritoyan wheel 192 6 3 21:26 user-event-2 drwxr-xr-x 6 ritoyan wheel 192 6 3 21:26 user-event-3 drwxr-xr-x 6 ritoyan wheel 192 6 3 21:26 user-event-4 drwxr-xr-x 6 ritoyan wheel 192 6 3 21:26 user-event-5 drwxr-xr-x 6 ritoyan wheel 192 6 3 21:26 user-event-6 drwxr-xr-x 6 ritoyan wheel 192 6 3 21:26 user-event-7 drwxr-xr-x 6 ritoyan wheel 192 6 3 21:26 user-event-8 drwxr-xr-x 6 ritoyan wheel 192 6 3 21:26 user-event-9
ll /tmp/kafka-logs-2
drwxr-xr-x 7 ritoyan wheel 224 6 3 21:21 clock-tick-1 drwxr-xr-x 6 ritoyan wheel 192 6 3 21:26 user-event-0 drwxr-xr-x 6 ritoyan wheel 192 6 3 21:26 user-event-1 drwxr-xr-x 6 ritoyan wheel 192 6 3 21:26 user-event-2 drwxr-xr-x 6 ritoyan wheel 192 6 3 21:26 user-event-3 drwxr-xr-x 6 ritoyan wheel 192 6 3 21:26 user-event-4 drwxr-xr-x 6 ritoyan wheel 192 6 3 21:26 user-event-5 drwxr-xr-x 6 ritoyan wheel 192 6 3 21:26 user-event-6 drwxr-xr-x 6 ritoyan wheel 192 6 3 21:26 user-event-7 drwxr-xr-x 6 ritoyan wheel 192 6 3 21:26 user-event-8 drwxr-xr-x 6 ritoyan wheel 192 6 3 21:26 user-event-9
能够看到两个broker中都建立了主题user-event
的10个分区。可能也有人要问了,clock-tick
这个主题怎么在broker1中有2个分区,broker2中有1个分区,这个是我以前建立的一个分区,用了下面的命令bin/kafka-topics.sh --create --zookeeper 192.168.0.105:2181,192.168.0.105:2182,192.168.0.105:2183 --replication-factor 1 --partitions 3 --topic clock-tick
,只有一份日志记录,3个分区,分区会均匀的分布在全部broker上。
至此kafka环境配置好了,西面咱们看看如何使用。
安装kafka-python
,用来操做kafka,pip3 install kafka-python
,这里是他的文档,文档写的不错,简洁易懂kafka-python
bootstrap_servers
是kafka集群地址信息,下面事项主题user-event
发送一条消息,send
发送消息是异步的,会立刻返回,所以咱们要经过阻塞的方式等待消息发送成功(或者flush()
也能够,flush会阻塞知道全部log都发送成功),不然消息可能会发送失败,但也不会有提示,关于上面这个能够经过删除send以后的语句试试,会发现broker不会收到消息,而后在send后加上time.sleep(10)
以后,会看到broker收到消息。
from kafka import KafkaProducer from kafka.errors import KafkaError producer = KafkaProducer( bootstrap_servers=[ "localhost:9093", "localhost:9094" ] ) future = producer.send("user-event", b'I am rito yan') try: record_metadata = future.get(timeout=10) print_r(record_metadata) except KafkaError as e: print(e)
阻塞等待发送成功以后,会看到返回插入记录的信息:
RecordMetadata(topic='user-event', partition=7, topic_partition=TopicPartition(topic='user-event', partition=7), offset=1, timestamp=1528034253757, checksum=None, serialized_key_size=-1, serialized_value_size=13)
,里面包括了插入log的主题、分区等信息。
建立producer的时候能够经过value_serializer
指定格式化函数,好比咱们数据是个dict,能够指定格式化函数,将dict转化为byte:
import json producer = KafkaProducer( bootstrap_servers=[ "localhost:9093", "localhost:9094" ], value_serializer=lambda m: json.dumps(m).encode('ascii') ) future = producer.send("user-event", { "name": "燕睿涛", "age": 26, "friends": [ "ritoyan", "luluyrt" ] })
这样就能够将格式化以后的信息发送给broker,不用每次发送的时候都本身格式化,真是不要太好用。
建立一个consumer,其中group_id
是分组,broker中的每个数据只能被consumer组中的一个consumer消费。
from kafka import KafkaConsumer consumer = KafkaConsumer( "user-event", group_id = "user-event-test", bootstrap_servers = [ "localhost:9093", "localhost:9094" ] ) for message in consumer: print("%s:%d:%d: key=%s value=%s" % (message.topic, message.partition, message.offset, message.key, message.value))
启动以后,进程会一直阻塞在哪里,等broker中有消息的时候就会去消费,启动多个进程,只要保证group_id一致,就能够保证消息只被组内的一个consumer消费,上面的程序会输出:
user-event:8:2: key=None value=b'{"name": "\\u71d5\\u777f\\u6d9b", "age": 26, "friends": ["ritoyan", "luluyrt"]}'
一样,进入的时候有value_serializer
,出来的时候对应的也有value_deserializer
,消费者能够配置value_deserializer
来格式化内容,跟producer对应起来
consumer = KafkaConsumer( "user-event", group_id = "user-event-test", bootstrap_servers = [ "localhost:9093", "localhost:9094" ], value_deserializer=lambda m: json.loads(m.decode('ascii')) )
输出内容user-event:8:3: key=None value={'name': '燕睿涛', 'age': 26, 'friends': ['ritoyan', 'luluyrt']}
咱们的consumer可能有不少分组,能够经过西面的命令查看分组信息:
cd /path/to/kafka bin/kafka-consumer-groups.sh --bootstrap-server localhost:9093,localhost:9094 --list
能够看到我使用中的分组有4个,分别以下所示
clock-tick-test3 user-event-test clock-tick-test2 clock-tick-test
能够经过bin/kafka-consumer-groups.sh --bootstrap-server 127.0.0.1:9093 --group user-event-test --describe
,查看分组user-event-test
的信息,能够看到西面的信息,包含消费的主题、分区信息,以及consumer在分区中的offset和分区的总offset。(为了格式化显示,删了部分列的部分字母)
TOPIC PARTITION CURRENT-OFFSET LOG-END-OFFSET LAG CONSUMER-ID HOST CLIENT-ID user-event 3 0 0 0 kafka-python-154b2 /127.0.0.1 kafka-python user-event 0 0 0 0 kafka-python-154b2 /127.0.0.1 kafka-python user-event 1 1 1 0 kafka-python-154b2 /127.0.0.1 kafka-python user-event 2 1 1 0 kafka-python-154b2 /127.0.0.1 kafka-python user-event 4 0 0 0 kafka-python-154b2 /127.0.0.1 kafka-python user-event 9 1 1 0 kafka-python-78517 /127.0.0.1 kafka-python user-event 8 4 4 0 kafka-python-78517 /127.0.0.1 kafka-python user-event 7 2 2 0 kafka-python-78517 /127.0.0.1 kafka-python user-event 6 1 1 0 kafka-python-78517 /127.0.0.1 kafka-python user-event 5 0 0 0 kafka-python-78517 /127.0.0.1 kafka-python
至此,kafka的基本使用算是掌握了,之后要是有机会在项目中实践就行了,在实际工程中的各类问题能够更加深入的理解其中的原理。