Zookeeper集群:172.16.218.20一、172.16.218.20二、172.16.218.203
172.16.218.201 kafka1
172.16.218.202 kafka2
172.16.218.203 kafka3
172.16.200.48 kafka-managerjava
wget http://mirrors.cnnic.cn/apache/zookeeper/zookeeper-3.4.10/zookeeper-3.4.10.tar.gz
# vi /etc/hosts 172.16.218.201 zookeeper1-201 172.16.218.202 zookeeper2-202 172.16.218.203 zookeeper3-203
# tar zxvf zookeeper-3.4.10.tar.gz # cd zookeeper-3.4.10 # mv zookeeper-3.4.10 zookeeper # chown -R root. /usr/local/zookeeper-node1 # cd /usr/local/zookepper-node1/conf/ # cp zoo_sample.cfg zoo.cfg
# tar zxvf jdk-8u74-linux-x64.tar.gz # mv jdk1.8.0_131 /usr/local/java/
#add java export JAVA_HOME=/usr/local/java/jdk1.8.0_131 export JAVA_BIN=/usr/local/java/jdk1.8.0_131/bin export PATH=$PATH:/usr/local/java/jdk1.8.0_131/bin export CLASSPATH=./:/usr/local/java/jdk1.8.0_131/lib:/usr/local/java/jdk1.8.0_131/jre/lib #而后执行 source /etc/profile
tickTime=2000 initLimit=10 syncLimit=5 dataDir=/home/zookeeper/data clientPort=2181 maxClientCnxns=60 autopurge.snapRetainCount=3 autopurge.purgeInterval=24 dataLogDir=/home/zookeeper/logs server.1=172.16.218.201:2888:3888 server.2=172.16.218.202:2888:3888 server.3=172.16.218.203:2888:3888
# mkdir -p /data/zookeeper/data/ # mkdir -p /data/zookeeper/logs
在201机器上建立myid,并设置为1与配置文件zoo.cfg里面server.1对应。 # cd /data/zookeeper/data # echo 1 > myid 在202机器上建立myid,并设置为2与配置文件zoo.cfg里面server.2对应。 echo "2" > /data/zookeeper/data/myid 在203机器上建立myid,并设置为3与配置文件zoo.cfg里面server.3对应。 echo "3" > /data/zookeeper/data/myid
# cd /usr/local/zookeeper/bin/ # ./zkServer.sh start # netstat -lutnp |grep java tcp 0 0 0.0.0.0:2181
vim /etc/rc.local 添加: /usr/local/zookeeper-node1/bin/zkServer.sh start
# ./zkServer.sh status ZooKeeper JMX enabled by default Using config: /usr/local/zookeeper-node2/bin/../conf/zoo.cfg Mode: leader
网址:https://kafka.apache.org/downloads
mkdir kafka mkdir -p /data/kafka/kafkalogs cd /data/kafka tar -zxvf kafka_2.12-0.11.0.0.tgz
cd /data/kafka/kafka_2.12-0.11.0.0/config vim server.properties #broker.id=0 每台服务器的broker.id都不能相同(1/2/3) #hostname host.name=172.16.218.201 #在log.retention.hours=168 下面新增下面三项 message.max.byte=5242880 default.replication.factor=2 replica.fetch.max.bytes=5242880 #设置zookeeper的链接端口 zookeeper.connect=172.16.218.201:2181,172.16.218.202:2181,172.16.218.203:2181
cd /data/kafka/kafka_2.12-0.11.0.0/bin #进入到kafka的bin目录 ./kafka-server-start.sh -daemon ../config/server.properties &
执行命令: [root@mariadb-node1 config] # jps 1618 Jps 27987 QuorumPeerMain 1260 Kafka
#建立Topic ./kafka-topics.sh --create --zookeeper 172.16.218.201:2181,172.16.218.202:2181,172.16.218.203:2181 --replication-factor 2 --partitions 1 --topic shequ #解释 --replication-factor 2 #复制两份 --partitions 1 #建立1个分区 --topic #主题为shequ #查看已建立的topic ./kafka-topics.sh --list --zookeeper 172.16.218.201:2181,172.16.218.202:2181,172.16.218.203:2181 '''在一台服务器上建立一个发布者'''#建立一个broker,发布者 ./kafka-console-producer.sh --broker-list 172.16.218.201:19092 --topic shequ
# ./kafka-console-producer.sh --broker-list 172.16.218.202:19092 --topic test > Hello world > Hello kafka
./kafka-console-consumer.sh --zookeeper 172.16.218.201:2181,172.16.218.202:2181,172.16.218.203:2181 --topic test --from-beginning Hello world Hello kafka
./kafka-topics.sh --describe --zookeeper 172.16.218.201:2181,172.16.218.202:2181,172.16.218.203:2181 --topic test Topic:test PartitionCount:1 ReplicationFactor:2 Configs: Topic: test Partition: 0 Leader: 3 Replicas: 3,2 Isr: 3,2 1) Leader 是给定分区的节点编号,每一个分区的部分数据会随机指定不一样的节点 2) Replicas 是该日志会保存的复制 3) Isr 表示正在同步的复制
./kafka-topics.sh --delete --zookeeper 172.16.218.201:2181,172.16.218.202:2181,172.16.218.203:2181 --topic test Topic test is marked for deletion. Note: This will have no impact if delete.topic.enable is not set to true.
功能:
为了简化开发者和服务工程师维护Kafka集群的工做,yahoo构建了一个叫作Kafka管理器的基于Web工具,叫作 Kafka Manager。这个管理工具能够很容易地发现分布在集群中的哪些topic分布不均匀,或者是分区在整个集群分布不均匀的的状况。它支持管理多个集群、选择副本、副本从新分配以及建立Topic。同时,这个管理工具也是一个很是好的能够快速浏览这个集群的工具,有以下功能:node
1.管理多个kafka集群 2.便捷的检查kafka集群状态(topics,brokers,备份分布状况,分区分布状况) 3.选择你要运行的副本 4.基于当前分区情况进行 5.能够选择topic配置并建立topic(0.8.1.1和0.8.2的配置不一样) 6.删除topic(只支持0.8.2以上的版本而且要在broker配置中设置delete.topic.enable=true) 7.Topic list会指明哪些topic被删除(在0.8.2以上版本适用) 8.为已存在的topic增长分区 9.为已存在的topic更新配置 10.在多个topic上批量重分区 11.在多个topic上批量重分区(可选partition broker位置)
安装步骤linux
# cd /usr/local # git clone https://github.com/yahoo/kafka-manager # cd kafka-manager # ./sbt clean dist
注: 执行sbt编译打包可能花费很长时间,若是你hang在以下状况
将project/plugins.sbt 中的logLevel参数修改成logLevel := Level.Debug(默认为Warn)git
编译成功后,会在target/universal下生成一个zip包github
# cd /usr/local/kafka-manager/target/universal # unzip kafka-manager-1.3.3.7.zip 将application.conf中的kafka-manager.zkhosts的值设置为你的zk地址 如:kafka-manager.zkhosts="172.16.218.201:2181,172.16.218.202:2181,172.16.218.203:2181"
直接启动:web
# cd kafka-manager-1.3.3.7/bin # ./kafka-manager -Dconfig.file=../conf/application.conf
后台运行:apache
# ./kafka-manager -h # nohup ./kafka-manager -Dconfig.file=../conf/application.conf &
指定端口,例如:vim
# nohup bin/kafka-manager -Dconfig.file=conf/application.conf -Dhttp.port=9001 &
第一次进入web UI要进行kafka cluster的相关配置,根据本身的信息进行配置。服务器
网址:http://172.16.200.48:9000/ app
添加kafka集群成功后,点击进去,界面以下: