Kafka 依赖 ZooKeeper,因此须要在 ZooKeeper 的基础上部署,具体能够参考 zookeeper集群部署。node
设置主机名,并设置hostslinux
192.168.4.100 master 192.168.4.21 node 192.168.4.57 node1
关闭Selinux、firewalld并安装JDKapache
以上就不一一赘述,直接进入正题。bootstrap
下载安装包( master 操做) wget https://mirrors.tuna.tsinghua.edu.cn/apache/kafka/2.1.1/kafka_2.11-2.1.1.tgz 解压并挪到 master 节点相应目录下进行操做 tar xf kafka_2.11-2.1.1.tgz
进入kafka目录,创建日志存放路径vim
[root@master kafka_2.11-2.1.1]# mkdir logs
vim config/server.propertieswindows
在默认配置下,须要添加 port 、host.name 和 message.max.byte(消息保存的最大值),修改 broker.id 、log.dirs 和 zookeeper.connect。bash
[root@master kafka_2.11-2.1.1]# cat config/server.properties |grep -v ^$ |grep -v ^# broker.id=1 port=9092 host.name=192.168.4.100 num.network.threads=3 num.io.threads=8 socket.send.buffer.bytes=102400 socket.receive.buffer.bytes=102400 socket.request.max.bytes=104857600 log.dirs=/bigdata/kafka_2.11-2.1.1/logs num.partitions=1 num.recovery.threads.per.data.dir=1 offsets.topic.replication.factor=1 transaction.state.log.replication.factor=1 transaction.state.log.min.isr=1 log.retention.hours=168 message.max.byte=5242880 log.segment.bytes=1073741824 log.retention.check.interval.ms=300000 zookeeper.connect=192.168.4.100:2181,192.168.4.21:2181,192.168.4.57:2181 zookeeper.connection.timeout.ms=6000 group.initial.rebalance.delay.ms=0
分发到另外两台机器服务器
[root@master bigdata]# scp -r kafka_2.11-2.1.1 192.168.4.21:/bigdata/ [root@master bigdata]# scp -r kafka_2.11-2.1.1 192.168.4.57:/bigdata/
修改配置文件( node 和 node1 上操做)socket
vi /bigdata/kafka_2.11-2.1.1/config/server.properties 在 node 上, 将 broker.id 设置为 2,host.name 设置为 192.168.4.21 在 node1 上, 将 broker.id 设置为 3,host.name 设置为 192.168.4.57
分别在三台服务器上启动 kafka 应用测试
[root@master bigdata]# /bigdata/kafka_2.11-2.1.1/bin/kafka-server-start.sh -daemon /bigdata/kafka_2.11-2.1.1/config/server.properties
将 master 做为生产者,node 和 node1 做为消费者 一、在 master 上执行: 建立一个主题 test:一个分区,两个副本 [root@master bigdata]# /bigdata/kafka_2.11-2.1.1/bin/kafka-topics.sh --create --zookeeper master:2181 --replication-factor 2 --partitions 1 --topic test Created topic "test". 建立一个生产者(消息发布者) /bigdata/kafka_2.11-2.1.1/bin/kafka-console-producer.sh --broker-list master:9092 --topic test > # 此时会进入到新的console(以>开头) 二、分别在 node 和 node1 上执行 建立一个消费者(消息订阅者) /bigdata/kafka_2.11-2.1.1/bin/kafka-console-consumer.sh --bootstrap-server master:9092 --topic test --from-beginning #此时也会进入到另一个console下 再到 master 上的> 下输入一些字符,而后会同时出如今 node 和 node1 上。
查看主题 [root@master ~]# /bigdata/kafka_2.11-2.1.1/bin/kafka-topics.sh --list --zookeeper master:2181 __consumer_offsets test timewindows 查看主题详情 [root@master ~]# /bigdata/kafka_2.11-2.1.1/bin/kafka-topics.sh --describe --zookeeper master:2181 --topic test Topic:test PartitionCount:1 ReplicationFactor:2 Configs: Topic: test Partition: 0 Leader: 1 Replicas: 1,3 Isr: 1,3 删除主题(需设置参数delete.topic.enable=true) [root@master ~]# /bigdata/kafka_2.11-2.1.1/bin/kafka-topics.sh --zookeeper master:2181 --delete --topic test Topic test is marked for deletion. Note: This will have no impact if delete.topic.enable is not set to true. 生产者参数查看 [root@master ~]# /bigdata/kafka_2.11-2.1.1/bin/kafka-console-producer.sh 消费者参数查看 [root@master ~]# /bigdata/kafka_2.11-2.1.1/bin/kafka-console-consumer.sh