从阿里云申请三台云服务器,这里我使用了两个不一样的阿里云帐号去申请云服务器。咱们配置三台主机名分别为zy1,zy2,zy3。html
咱们经过阿里云能够获取主机的公网ip地址,以下:java
经过secureRCT链接主机106.15.74.155,运行ifconfig,能够查看其内网ip地址:node
主机zy1的公网ip为:106.15.74.155,内网ip为172.19.182.67。apache
主机zy2的公网ip为:47.103.134.70,内网ip为172.19.14.178。json
主机zy3的公网ip为:47.97.10.51,内网ip为172.16.229.255。bootstrap
因为主机位于不一样的局域网下,所以须要进行一个公网端口到内网端口的映射。在搭建zookeeper和kafka须要使用到2181,2888 ,3888,9092端口。须要在阿里云中配置入规则,具体能够参考阿里云官方收藏:同一个地域、不一样帐号下的实例实现内网互通 。vim
注意:若是47.103.134.70配置一个入端口3888,那么对该47.103.134.70:3888的访问会实际映射到172.19.14.178:3888下。若是是同一局域网下的两个主机,是不须要配置这个的,能够直接互通。服务器
若是想了解更多,能够参考如下博客:网络
经过SSH访问阿里云服务器的原理能够参考-用SSH访问内网主机的方法app
以主机zy1为例:配置以下:
注意zy1对应的ip须要配置为内网ip,也就是本机ip:172.19.182.67。而zy二、zy3配置的都是公网ip。
在每一个主机下执行如下操做:
rpm -qa |grep java rpm -qa |grep jdk rpm -qa |grep gcj
若是有就使用批量卸载命令
rpm -qa | grep java | xargs rpm -e --nodeps
yum install java-1.8.0-openjdk* -y
默认jre jdk 安装路径是/usr/lib/jvm 下面:
vim /etc/profile #set java environment , append export JAVA_HOME=/usr/lib/jvm/java export CLASSPATH=.:$JAVA_HOME/lib/dt.jar:$JAVA_HOME/lib/tools.jar:$JAVA_HOME/jre/lib/rt.jar export PATH=$PATH:$JAVA_HOME/bin
使得配置生效
. /etc/profile
echo $JAVA_HOME echo $CLASSPATH java -version
在主机zy1下面执行如下操做:
wget http://mirrors.hust.edu.cn/apache/zookeeper/zookeeper-3.4.13/zookeeper-3.4.13.tar.gz
建立目录/opt/bigdata:
mkdir /opt/bigdata
解压文件到/opt/bigdata:
tar -zxvf zookeeper-3.4.13.tar.gz -C /opt/bigdata
跳转目录:
cd /opt/bigdata/zookeeper-3.4.13/
cp conf/zoo_sample.cfg conf/zoo.cfg
修改配置文件以下:
vim conf/zoo.cfg
其中部分参数意义以下:
建立/opt/bigdata/data/zookeeper/zkdata:
mkdir -vp /opt/bigdata/data/zookeeper/zkdata
建立myid文件:
echo 1 > /opt/bigdata/data/zookeeper/zkdata/myid
scp -r /opt/bigdata/zookeeper-3.4.13/ zy2:/opt/bigdata/ scp -r /opt/bigdata/zookeeper-3.4.13/ zy3:/opt/bigdata/
zyx主机:
建立/opt/bigdata/data/zookeeper/zkdata:
mkdir -vp /opt/bigdata/data/zookeeper/zkdata
建立myid文件:
echo x > /opt/bigdata/data/zookeeper/zkdata/myid
注意:x表示主机的编号。
vim /etc/profile #set java environment , append export ZOOKEEPER_HOME=/opt/bigdata/zookeeper-3.4.13 export PATH=$ZOOKEEPER_HOME/bin:$PATH
使得配置生效
. /etc/profile
进入到zookeeper目录下,在每一个主机下分别执行
cd /opt/bigdata/zookeeper-3.4.13 bin/zkServer.sh start
检查服务状态
bin/zkServer.sh status
能够用“jps”查看zk的进程,这个是zk的整个工程的main
jps
注意:zk集群通常只有一个leader,多个follower,主通常是相应客户端的读写请求,而从主同步数据,当主挂掉以后就会从follower里投票选举一个leader出来。
zookeeper服务开启后,进入客户端的命令:
zkCli.sh
更多经常使用命令参考博客:Kafka在zookeeper中存储结构和查看方式。
一、防火墙
防火墙没有关闭问题。解决方式参考:https://blog.csdn.net/weiyongle1996/article/details/73733228
二、端口没有开启
若是/etc/hosts所有配置为公网:在zy1运行zkServer.sh start,查看端口开启状态:
netstat -an | grep 3888
则会发现没法开启公网3888端口,咱们应该打开的是内网机器对应的端口。
若是端口已经开启,能够经过telnet ip port判断该端口是否能够从外部访问。
在主机zy1下执行:
wget https://mirrors.tuna.tsinghua.edu.cn/apache/kafka/2.1.1/kafka_2.12-2.1.1.tgz tar -zxvf kafka_2.12-2.1.1.tgz -C /opt/bigdata
重命名:
cd /opt/bigdata/ mv kafka_2.12-2.1.1 kafka
在/opt/bigdata/kafka下:
vim config/server.properties
各个参数意义:
注意,这里若是但愿在java中建立topic也是多个备份,须要添加一下属性
#default replication factors for automatically created topics,默认值1;
default.replication.factor=3
#When a producer sets acks to "all" (or "-1"), this configuration specifies the minimum number of replicas that must acknowledge a write for the write to be considered successful.
#min.insync.replicas and acks allow you to enforce greater durability guarantees,默认值1;
min.insync.replicas=3
上面是参数的解释,实际的修改项为:
broker.id=1
listeners=PLAINTEXT://zy1:9092 #内网地址
advertised.listeners=PLAINTEXT://106.15.74.155:9092 #公网地址(否则远程客户端没法访问)
log.dirs=/opt/bigdata/kafka/kafka-logs
#此外,能够在log.retention.hours=168 下面新增下面三项:
message.max.byte=5242880
default.replication.factor=2
replica.fetch.max.bytes=5242880
#设置zookeeper的链接端口
zookeeper.connect=zy1:2181,zy2:2181,zy3:2181
若是咱们须要删除topic,还须要配置一下内容:
delete.topic.enable=true
具体参考博客:kafka安装及删除Topic,Kafka0.8.2.1删除topic逻辑。
scp -r /opt/bigdata/kafka zy2:/opt/bigdata/
scp -r /opt/bigdata/kafka zy3:/opt/bigdata/
拷贝文件过去的其余两个节点须要更改broker.id和listeners,以zy2为例:
咱们能够根据Kafka内带的zk集群来启动,可是建议使用独立的zk集群:
zkServer.sh start
在/opt/bigdata/kafka下 ,三个节点分别执行以下命令,启动kafka集群:
bin/kafka-server-start.sh config/server.properties &
运行命令后服务确实后台启动了,但日志会打印在控制台,并且关掉命令行窗口,服务就会随之中止,这个让我挺困惑的。后来,参考了其余的启动脚本,经过测试和调试最终找到了彻底知足要求的命令。
bin/kafka-server-start.sh config/server.properties 1>/dev/null 2>&1 &
其中1>/dev/null 2>&1 是将命令产生的输入和错误都输入到空设备,也就是不输出的意思。/dev/null表明空设备。
注意:若是内存不足:打开kafka安装位置,在bin目录下找到kafka-server-start.sh文件,将export KAFKA_HEAP_OPTS="-Xmx1G -Xms1G"修改成export KAFKA_HEAP_OPTS="-Xmx256M -Xms128M"。
思路:如下给出几条kafka指令。建立一个topic,一个节点做为生产者,两个节点做为消费者分别看看可否接收数据,进行验证:
建立及查看topic:
cd /opt/big/data/kafka bin/kafka-topics.sh -list -zookeeper zy1:2181 bin/kafka-topics.sh --create --zookeeper zy1:2181 --replication-factor 3 --partitions 3 --topic zy-test
开启生产者:
bin/kafka-console-producer.sh --broker-list zy1:9092 --topic zy-test
开启消费者:
bin/kafka-console-consumer.sh --bootstrap-server zy2:9092 --topic zy-test --from-beginning
节点zy1产生消息,若是消息没有清理,在节点zy二、zy3均可以接收到消息。
如下是kafka经常使用命令行总结:
查看topic的详细信息
bin/kafka-topics.sh -zookeeper zy1:2181 --describe --topic zy-test
能够看到topic包含3个复本,每一个副本又分为三个partition。以zy-test:partition0为例,其leader保存在broker.id=1的主机上,副本保存在二、3节点上。其消息保存在配置参数log.dirs所指定的路径下:
为topic增长副本
bin/kafka-reassign-partitions.sh --zookeeper zy1:2181 --reassignment-json-file json/partitions-to-move.json -execute
建立topic
bin/kafka-topics.sh --create --zookeeper zy1:2181 --replication-factor 3 --partitions 3 --topic zy-test
为topic增长partition
bin/kafka-topics.sh –-zookeeper zy1:2181 –-alter –-partitions 3 –-topic zy-test
kafka生产者客户端命令
bin/kafka-console-producer.sh --broker-list zy1:9092 --topic zy-test
kafka消费者客户端命令
bin/kafka-console-consumer.sh --bootstrap-server zy2:9092 --topic zy-test --from-beginning
kafka服务启动
bin/kafka-server-start.sh config/server.properties 1>/dev/null 2>&1 &
删除topic
bin/kafka-topics.sh --zookeeper zy1:2181 --delete --topic zy-test
bin/kafka-server-stop.sh
因为Zookeeper并不适合大批量的频繁写入操做,新版Kafka已推荐将consumer的位移信息保存在kafka内部的topic中,即__consumer_offsets topic,而且默认提供了kafka_consumer_groups.sh脚本供用户查看consumer信息。
bin/kafka-run-class.sh kafka.tools.GetOffsetShell --broker-list zy1:9092 --topic zy-test
输出结果每一个字段分别表示topic、partition、untilOffset(当前partition的最大偏移);
上面的输出结果代表kafka队列总有产生过4条消息(这并不表明kafka队列如今必定有4条消息,由于kafka有两种策略能够删除旧数据:基于时间、基于大小)。
因为我使用kafka-console-producer.sh生成了四条消息:zy、1994110八、 liuyan、1。所以kafka消息队列中存在4条消息。
bin/kafka-console-consumer.sh --bootstrap-server zy1:9092 --consumer.config config/consumer.properties --topic zy-test --from-beginning
再次创建消费者:
会发现获取不到数据。这是由于咱们指定了消费组,第一次消费时从offset为0开始消费,把4条消息所有读出,此时offset移动到最后,当再次使用同一消费组读取数据,则会从上次的offset开始获取数据。
而使用:
bin/kafka-console-consumer.sh --bootstrap-server zy1:9092 --topic zy-test --from-beginning
每次都会获取四条数据,这是由于每次都会建立一个新的消费者,这些消费者会被随机分配到一个不一样的组,所以每次都是从offset为0开始消费。
参数解释:
bin/kafka-consumer-groups.sh --bootstrap-server zy1:9092 --list
能够看到有三个消费组,前两个消费者没有指定消费组,随机产生一个console-consumer-***的group.ig。
第三个是咱们刚刚在config/consumer.properties 中指定的消费组。
bin/kafka-consumer-groups.sh --bootstrap-server zy1:9092 --describe --group test-consumer-group
若是此时再使用生产者客户端生成两条消息:
再次查看消费组test-consumer-group的消费状况:
因为咱们尚未介绍KAFKA的API,这块内容就不先介绍,具体参考博客:Kafka消费者 之 指定位移消费。
所以上面介绍的kafka命令,与topic相关的使用--zookeeper zy1:2181,与生产者、消费者相关的使用 --bootstrap-server zy1:9092。
参考博客: