由于kafka已经下降了zookeeper在kafka中的重要性,全部不推荐单独搭建zookeeper了,使用集成的zookeeper已经很是好了bootstrap
如今kafka版本是1.1.0,可是根据scala编译分为2.11和2.12版本,其实就是2.11对应JDK1.7, 2.12对应JDK1.8vim
例如3台集群主机IP分别为10.128.90.111;10.128.90.112;10.128.90.113服务器
kafka解压部署在主机路径 /app/kafka_2.11-1.1.0app
broker.id=0 //kafka broker的id,三个节点不相同测试
listeners=PLAINTEXT://10.128.90.111:29092 //kafka端口,三个节点分别填写节点本机IPspa
log.dir=/app/kafkadata //kafka日志路径scala
zookeeper.connect=10.128.90.111:22181,10.128.90.112:22181,10.128.90.113:22181 //zookeeper集群地址,使用,分隔日志
dataDir=/app/zookeeperdata //zookeeper快照存储地址server
clientPort=22181 //端口号blog
maxClientCnxns=0 //客户端与服务端链接限制
initLimit=10
syncLimit=5
server.0=10.128.90.111:22888:23888
server.1=10.128.90.112:22888:23888
server.2=10.128.90.113:22888:23888
22888用于follower与leader之间的数据同步与通讯;23888用于leader选举时的通讯
0,1,2 是zookeeper三个节点id,需在zookeeper快照存储文件夹中建立myid文件,并写入对应的id号
例如 10.128.90.111 需写入0
切换路径到/app/zookeeperdata下
vim myid
写入0保存
启动zookeeper:
bin/zookeeper-server-start.sh -daemon config/zookeeper.properties &
启动kafka:
bin/kafka-server-start.sh -daemon config/server.properties &
建立Topic
bin/kafka-topics.sh --create --topic mytest --zookeeper 10.128.90.111:22181,10.128.90.112:22181,10.128.90.113:22181 --replication-factor 1 --partitions 1
replication-factor 副本数,至关于把数据复制了几份保存在服务器上
partitions 分区,至关于把数据打散保存,能够提升数据的吞吐量,不易过大,过大会形成稳定性,须要更大的内存,需打开更多的句柄
查看Topic列表
bin/kafka-topics.sh --list --zookeeper 10.128.90.111:22181
建立生产者
bin/kafka-console-producer.sh --broker-list 10.128.90.111:29092,10.128.90.112:29092,10.128.90.113:29092 --topic mytest
建立消费者
bin/kafka-console-consumer.sh --bootstrap-server 10.128.90.111:29092,10.128.90.112:29092,10.128.90.113:29092 --topic mytest
这时在生产者发生消息,消费者窗口就能接收到消息了
例如
修改bin/kafka-server-start.sh
第一行加 export JAVA_HOME=/app/jdk1.8.0_131