zookeeper1:192.168.1.11 zookeeper2:192.168.1.12 zookeeper3:192.168.1.13 kafka1:192.168.1.14 kafka2:192.168.1.15 kafka3:192.168.1.16 kafka3:192.168.1.17 kafka-manager:192.168.1.18
下载地址:html
http://kafka.apache.org/downloads.htmljava
http://mirrors.hust.edu.cn/apache/git
http://zookeeper.apache.org/releases.htmlgithub
已经安装好java-1.8apache
[root@zookeeper1 ~]# yum install java-1.8.0
此处使用版本zookeeper-3.4.10.tar.gzvim
# 下载 ZooKeeper [root@zookeeper1 ~]# wget https://mirrors.aliyun.com/apache/zookeeper/zookeeper-3.4.10/zookeeper-3.4.10.tar.gz # 解压文件 [root@zookeeper1 ~]# tar -zxf zookeeper-3.4.10.tar.gz -C /opt/ [root@zookeeper1 ~]# cd /opt/ [root@zookeeper1 opt]# ln -s zookeeper-3.4.10 zookeeper [root@zookeeper1 opt]# cd zookeeper [root@zookeeper1 zookeeper]# cp ./conf/zoo_sample.cfg ./conf/zoo.cfg
zookeeper1,zookeeper2,zookeeper3都是一样的配置bash
[root@zookeeper1 zookeeper]# vim ./conf/zoo.cfg tickTime=2000 initLimit=5 syncLimit=2 dataDir=/opt/push/kafka-test/zookeeper/data dataLogDir=/opt/push/kafka-test/zookeeper/logs clientPort=2181 server.1=192.168.1.11:2888:3888 server.2=192.168.1.12:2888:3888 server.3=192.168.1.13:2888:3888 [root@zookeeper1 zookeeper]# mkdir data logs [root@zookeeper1 zookeeper]# echo '1' > /opt/zookeeper/data/myid # 其它两台此处须要修改 [root@zookeeper2 zookeeper]# echo '2' > /opt/zookeeper/data/myid [root@zookeeper3 zookeeper]# echo '3' > /opt/zookeeper/data/myid
此处使用版本为kafka_2.11-1.1.0.tgz服务器
# 下载 Kafka [root@kafka1 ~]# wget https://archive.apache.org/dist/kafka/1.1.0/kafka_2.11-1.1.0.tgz # 解压tar文件 [root@kafka1 ~]# tar -zxf kafka_2.11-1.1.0.tgz.gz -C /opt/ [root@kafka1 ~]# cd /opt/ root@kafka1 opt]# ln -s kafka_2.11-1.1.0 kafka root@kafka1 opt]# cd kafka
进入kafka的安装配置目录网络
[root@kafka1 kafka]# cp config/server.properties config/server.properties.bak # 备份kafka默认配置文件
主要关注:server.properties
这个文件便可,咱们能够发如今目录下:app
有不少文件,这里能够发现有Zookeeper文件,咱们能够根据Kafka内带的zk集群来启动,可是建议使用独立的zk集群
server.properties(broker.id和host.name每一个节点都不相同)
//当前机器在集群中的惟一标识,和zookeeper的myid性质同样 broker.id=0 //Socket服务器侦听的地址,当前kafka对外提供服务的端口默认是9092 listeners = PLAINTEXT://your.host.name:9092 //代理将向生产者和消费者作广告的主机名和端口 advertised.listeners=PLAINTEXT://your.host.name:9092 //这个是borker进行网络处理的线程数 num.network.threads=3 //这个是borker进行I/O处理的线程数 num.io.threads=8 //发送缓冲区buffer大小,数据不是一会儿就发送的,先回存储到缓冲区了到达必定的大小后在发送,能提升性能 socket.send.buffer.bytes=102400 //kafka接收缓冲区大小,当数据到达必定大小后在序列化到磁盘 socket.receive.buffer.bytes=102400 //这个参数是向kafka请求消息或者向kafka发送消息的请请求的最大数,这个值不能超过java的堆栈大小 socket.request.max.bytes=104857600 //消息存放的目录,这个目录能够配置为“,”逗号分割的表达式,上面的num.io.threads要大于这个目录的个数这个目录, //若是配置多个目录,新建立的topic他把消息持久化的地方是,当前以逗号分割的目录中,那个分区数最少就放那一个 log.dirs=/home/hadoop/log/kafka-logs //默认的分区数,一个topic默认1个分区数 num.partitions=1 //每一个数据目录用来日志恢复的线程数目 num.recovery.threads.per.data.dir=1 //默认消息的最大持久化时间,168小时,7天 log.retention.hours=168 //这个参数是:由于kafka的消息是以追加的形式落地到文件,当超过这个值的时候,kafka会新起一个文件 log.segment.bytes=1073741824 //每隔300000毫秒去检查上面配置的log失效时间 log.retention.check.interval.ms=300000 //设置zookeeper的链接端口 zookeeper.connect=localhost:2181 //设置zookeeper的链接超时时间 zookeeper.connection.timeout.ms=6000
kafka1:
[root@kafka1 kafka]# vim config/server.properties broker.id=1 # 当前机器在集群中的惟一标识 listeners=PLAINTEXT://192.168.1.14:9092 # 监听端口 advertised.listeners=PLAINTEXT://192.168.1.14:9092 # 提供给生产者,消费者的端口号。能够不设置则使用listeners的值 zookeeper.connect=192.168.1.11:2181,192.168.1.12:2181,192.168.1.13:2181/kafka # 链接zookeeper
kafka2:
[root@kafka2 kafka]# vim config/server.properties broker.id=2 # 当前机器在集群中的惟一标识 listeners=PLAINTEXT://192.168.1.15:9092 # 监听端口 advertised.listeners=PLAINTEXT://192.168.1.15:9092 # 提供给生产者,消费者的端口号。能够不设置则使用listeners的值 zookeeper.connect=192.168.1.11:2181,192.168.1.12:2181,192.168.1.13:2181/kafka # 链接zookeeper
kafka3:
[root@kafka3 kafka]# vim config/server.properties broker.id=3 # 当前机器在集群中的惟一标识 listeners=PLAINTEXT://192.168.1.16:9092 # 监听端口 advertised.listeners=PLAINTEXT://192.168.1.16:9092 # 提供给生产者,消费者的端口号。能够不设置则使用listeners的值 zookeeper.connect=192.168.1.11:2181,192.168.1.12:2181,192.168.1.13:2181/kafka # 链接zookeeper
kafka4:
[root@kafka4 kafka]# vim config/server.properties broker.id=4 # 当前机器在集群中的惟一标识 listeners=PLAINTEXT://192.168.1.17:9092 # 监听端口 advertised.listeners=PLAINTEXT://192.168.1.17:9092 # 提供给生产者,消费者的端口号。能够不设置则使用listeners的值 zookeeper.connect=192.168.1.11:2181,192.168.1.12:2181,192.168.1.13:2181/kafka # 链接zookeeper
全部zookeeper节点都须要执行
[root@zookeeper1 kafka]# ./bin/zkServer.sh start
全部kafka节点都须要执行
[root@kafka1 kafka]# ./bin/kafka-server-start.sh config/server.properties 或者: [root@kafka1 kafka]# ./bin/kafka-server-start.sh -daemon config/server.properties # 后台启动
[root@kafka1 kafka]# bin/kafka-topics.sh --create --zookeeper kafka1:2181 --replication-factor 3 --partitions 3 --topic topic2
[root@kafka1 kafka]# ./bin/kafka-topics.sh --describe --zookeeper kafka1:2181 --topic topic2
[root@kafka1 kafka]# ./bin/kafka-topics.sh --list --zookeeper kafka1:2181
kafka1显示接收到消息
[root@kafka1 kafka]# ./bin/kafka-console-producer.sh --broker-list kafka1:9092 --topic2
在kafka2上消费消息
[root@kafka2 kafka]# ./bin/kafka-console-consumer.sh --zookeeper kafka1:2181 --from-beginning --topic topic2
官网:https://github.com/yahoo/kafka-manager
[root@kafka-manager ~]# git clone https://github.com/yahoo/kafka-manager.git [root@kafka-manager ~]# cd kafka-manager # 部署 [root@kafka-manager kafka-manager]# ./sbt clean dist 解压缩生成的zip文件 # 启动 [root@kafka-manager kafka-manager]# bin/kafka-manager [root@kafka-manager kafka-manager]# bin/kafka-manager -Dconfig.file=/path/to/application.conf -Dhttp.port=8080