操做系统:Cent OS 7html
Kafka版本:0.9.0.0java
Kafka官网下载:请点击apache
JDK版本:1.7.0_51windows
SSH Secure Shell版本:XShell 5网络
下载:curl
curl -L -O http://mirrors.cnnic.cn/apache/kafka/0.9.0.0/kafka_2.10-0.9.0.0.tgz
解压:socket
tar zxvf kafka_2.10-0.9.0.0.tgz
/bin 操做kafka的可执行脚本,还包含windows下脚本tcp
/config 配置文件所在目录分布式
/libs 依赖库目录
/logs 日志数据目录,目录kafka把server端日志分为5种类型,分为:server,request,state,log-cleaner,controller
配置zookeeper
请参考zookeeper
进入kafka安装工程根目录编辑config/server.properties
kafka最为重要三个配置依次为:broker.id、log.dir、zookeeper.connect,kafka server端config/server.properties参数说明和解释以下:
启动
进入kafka目录,敲入命令 bin/kafka-server-start.sh config/server.properties &
检测2181与9092端口
netstat -tunlp|egrep "(2181|9092)"
tcp 0 0 :::2181 :::* LISTEN 19787/java
tcp 0 0 :::9092 :::* LISTEN 28094/java
说明:
Kafka的进程ID为28094,占用端口为9092
QuorumPeerMain为对应的zookeeper实例,进程ID为19787,在2181端口监听
启动2个XSHELL客户端,一个用于生产者发送消息,一个用于消费者接受消息。
运行producer,随机敲入几个字符,至关于把这个敲入的字符消息发送给队列。
bin/kafka-console-producer.sh --broker-list 192.168.1.181:9092 --topic test
说明:早版本的Kafka,–broker-list 192.168.1.181:9092需改成–zookeeper 192.168.1.181:2181
运行consumer,能够看到刚才发送的消息列表。
bin/kafka-console-consumer.sh --zookeeper 192.168.1.181:2181 --topic test --from-beginning
注意:
producer,指定的Socket(192.168.1.181+9092),说明生产者的消息要发往kafka,也便是broker
consumer, 指定的Socket(192.168.1.181+2181),说明消费者的消息来自zookeeper(协调转发)
上面的只是一个单个的broker,下面咱们来实验一个多broker的集群。
刚才只是启动了单个broker,如今启动有3个broker组成的集群,这些broker节点也都是在本机上。
咱们先看看config/server0.properties配置信息:
broker.id=0
listeners=PLAINTEXT://:9092
port=9092
host.name=192.168.1.181
num.network.threads=4
num.io.threads=8
socket.send.buffer.bytes=102400
socket.receive.buffer.bytes=102400
socket.request.max.bytes=104857600
log.dirs=/tmp/kafka-logs
num.partitions=5
num.recovery.threads.per.data.dir=1
log.retention.hours=168
log.segment.bytes=1073741824
log.retention.check.interval.ms=300000
log.cleaner.enable=false
zookeeper.connect=192.168.1.181:2181
zookeeper.connection.timeout.ms=6000
queued.max.requests =500
log.cleanup.policy = delete
说明:
broker.id为集群中惟一的标注一个节点,由于在同一个机器上,因此必须指定不一样的端口和日志文件,避免数据被覆盖。
在上面单个broker的实验中,为何kafka的端口为9092,这里能够看得很清楚。
kafka cluster怎么同zookeeper交互的,配置信息中也有体现。
那么下面,咱们仿照上面的配置文件,提供2个broker的配置文件:
server1.properties:
broker.id=1
listeners=PLAINTEXT://:9093
port=9093
host.name=192.168.1.181
num.network.threads=4
num.io.threads=8
socket.send.buffer.bytes=102400
socket.receive.buffer.bytes=102400
socket.request.max.bytes=104857600
log.dirs=/tmp/kafka-logs1
num.partitions=5
num.recovery.threads.per.data.dir=1
log.retention.hours=168
log.segment.bytes=1073741824
log.retention.check.interval.ms=300000
log.cleaner.enable=false
zookeeper.connect=192.168.1.181:2181
zookeeper.connection.timeout.ms=6000
queued.max.requests =500
log.cleanup.policy = delete
server2.properties:
broker.id=2
listeners=PLAINTEXT://:9094
port=9094
host.name=192.168.1.181
num.network.threads=4
num.io.threads=8
socket.send.buffer.bytes=102400
socket.receive.buffer.bytes=102400
socket.request.max.bytes=104857600
log.dirs=/tmp/kafka-logs2
num.partitions=5
num.recovery.threads.per.data.dir=1
log.retention.hours=168
log.segment.bytes=1073741824
log.retention.check.interval.ms=300000
log.cleaner.enable=false
zookeeper.connect=192.168.1.181:2181
zookeeper.connection.timeout.ms=6000
queued.max.requests =500
log.cleanup.policy = delete
命令以下:
bin/kafka-server-start.sh config/server0.properties & #启动broker0
bin/kafka-server-start.sh config/server1.properties & #启动broker1
bin/kafka-server-start.sh config/server2.properties & #启动broker2
查看218一、909二、909三、9094端口
netstat -tunlp|egrep "(2181|9092|9093|9094)"
tcp 0 0 :::9093 :::* LISTEN 29725/java
tcp 0 0 :::2181 :::* LISTEN 19787/java
tcp 0 0 :::9094 :::* LISTEN 29800/java
tcp 0 0 :::9092 :::* LISTEN 29572/java
一个zookeeper在2181端口上监听,3个kafka cluster(broker)分别在端口9092,9093,9094监听。
bin/kafka-topics.sh --create --topic topic_1 --partitions 1 --replication-factor 3 \--zookeeper localhost:2181
bin/kafka-topics.sh --create --topic topic_2 --partitions 1 --replication-factor 3 \--zookeeper localhost:2181
bin/kafka-topics.sh --create --topic topic_3 --partitions 1 --replication-factor 3 \--zookeeper localhost:2181
查看topic建立状况:
bin/kafka-topics.sh --list --zookeeper localhost:2181
test
topic_1
topic_2
topic_3
[root@atman081 kafka_2.10-0.9.0.0]# bin/kafka-topics.sh --describe --zookeeper localhost:2181
Topic:test PartitionCount:1 ReplicationFactor:1 Configs:
Topic: test Partition: 0 Leader: 0 Replicas: 0 Isr: 0
Topic:topic_1 PartitionCount:1 ReplicationFactor:3 Configs:
Topic: topic_1 Partition: 0 Leader: 2 Replicas: 2,1,0 Isr: 2,1,0
Topic:topic_2 PartitionCount:1 ReplicationFactor:3 Configs:
Topic: topic_2 Partition: 0 Leader: 1 Replicas: 1,2,0 Isr: 1,2,0
Topic:topic_3 PartitionCount:1 ReplicationFactor:3 Configs:
Topic: topic_3 Partition: 0 Leader: 0 Replicas: 0,2,1 Isr: 0,2,1
上面的有些东西,也许还不太清楚,暂放,继续试验。须要注意的是topic_1的Leader=2
发送消息
bin/kafka-console-producer.sh --topic topic_1 --broker-list 192.168.1.181:9092,192.168.1.181:9093,192.168.1.181:9094
接收消息
bin/kafka-console-consumer.sh --topic topic_1 --zookeeper 192.168.1.181:2181 --from-beginning
须要注意,此时producer将topic发布到了3个broker中,如今就有点分布式的概念了。
kill broker(id=0)
首先,咱们根据前面的配置,获得broker(id=0)应该在9092监听,这样就能肯定它的PID了。
broker0没kill以前topic在kafka cluster中的状况
bin/kafka-topics.sh --describe --zookeeper localhost:2181
Topic:test PartitionCount:1 ReplicationFactor:1 Configs:
Topic: test Partition: 0 Leader: 0 Replicas: 0 Isr: 0
Topic:topic_1 PartitionCount:1 ReplicationFactor:3 Configs:
Topic: topic_1 Partition: 0 Leader: 2 Replicas: 2,1,0 Isr: 2,1,0
Topic:topic_2 PartitionCount:1 ReplicationFactor:3 Configs:
Topic: topic_2 Partition: 0 Leader: 1 Replicas: 1,2,0 Isr: 1,2,0
Topic:topic_3 PartitionCount:1 ReplicationFactor:3 Configs:
Topic: topic_3 Partition: 0 Leader: 2 Replicas: 0,2,1 Isr: 2,1,0
kill以后,再观察,作下对比。很明显,主要变化在于Isr,之后再分析
bin/kafka-topics.sh --describe --zookeeper localhost:2181
Topic:test PartitionCount:1 ReplicationFactor:1 Configs:
Topic: test Partition: 0 Leader: -1 Replicas: 0 Isr:
Topic:topic_1 PartitionCount:1 ReplicationFactor:3 Configs:
Topic: topic_1 Partition: 0 Leader: 2 Replicas: 2,1,0 Isr: 2,1
Topic:topic_2 PartitionCount:1 ReplicationFactor:3 Configs:
Topic: topic_2 Partition: 0 Leader: 1 Replicas: 1,2,0 Isr: 1,2
Topic:topic_3 PartitionCount:1 ReplicationFactor:3 Configs:
Topic: topic_3 Partition: 0 Leader: 2 Replicas: 0,2,1 Isr: 2,1
测试下,发送消息,接受消息,是否收到影响。
发送消息
bin/kafka-console-producer.sh --topic topic_1 --broker-list 192.168.1.181:9092,192.168.1.181:9093,192.168.1.181:9094
接收消息
bin/kafka-console-consumer.sh --topic topic_1 --zookeeper 192.168.1.181:2181 --from-beginning
可见,kafka的分布式机制,容错能力仍是挺好的~
producer 消息的生成者,即发布消息
consumer 消息的消费者,即订阅消息
broker Kafka以集群的方式运行,能够由一个或多个服务组成,服务即broker
zookeeper 协调转发
producers经过网络将消息发送到Kafka集群,集群向消费者提供消息
kafka对消息进行概括,即topic,也就是说producer发布topic,consumer订阅topic