ELK 之分布式发布订阅消息系统 Kafka + zookeeper 集群搭建

本文与前文是有关联的,以前的两篇文章客官能够抬腿出门右转,elk 导读, 实战一
kafka的配置安装:
#kafka 和zookeeper 都依赖java ,机器上必须安装java,具体安装方法和效验方法,请各位客官抬腿向上看!
#下载安装包:一样放到/opt/elk/ 的目录里边:
wget http://mirrors.tuna.tsinghua.edu.cn/apache/kafka/1.0.1/kafka_2.12-1.0.1.tgz
#解压安装包:
tar -zxvf kafka_2.12-1.0.1.tgz
#kafka 采用集群方式部署,测试环境部署,因此我用两台机器模拟出3个节点的cluster 。kafka 依赖zookeeper,zookeeper 简称ZK,已经包含在kafka的tar包里,不须要而外下载安装,为啥要用三个节点呢?这个是zk的一个选举leader的特性,官方推荐是奇数个server,奇数个最少3个,这样坏一个还有两个,能够正常选举,如下是zookeeper给出的官方说法:
For replicated mode, a minimum of three servers are required, and it is strongly recommended that you have an odd number of servers. If you only have two servers, then you are in a situation where if one of them fails, there are not enough machines to form a majority quorum. Two servers is inherently less stable than a single server, because there are two single points of failure.
#因为是集群配置,配置之间几乎差很少,因此配置好一个,直接scp到其他节点就ok了,到了其他节点把关键参数修改一下就好了。
具体配置方法以下:
#给zookeeper 建立data 目录和logs 目录
mkdir /opt/elk/kafka_2.12-1.0.1/zookeeper/{data,logs} -p
#建立myid 文件
echo 1 > /opt/elk/kafka/zookeeper/data/myid
cd /opt/elk/kafka_2.12-1.0.1
#kafka &ZK 的全部配置文件都放到了config 目录下
cd /config
#因为kafka 依赖zk 因此启动的时候须要先配置zk,启动的时候先把zk启动了,而后再启动kafka
vi zookeeper.properties
具体配置内容以下:
#zk 存放数据的目录,同时由于是集群,zk 须要有一个叫作myid的文件也是放到这个目录下
dataDir=/opt/elk/kafka/zookeeper/datahtml

指定事务日志的存储路径,能够和dataDir在不一样设备,这意味着可使用一个日志的专用磁盘,避免日志IO和快照竞争。

#This option will direct the machine to write the transaction log to the dataLogDir rather than the dataDir. This allows a dedicated log device to be used, and helps avoid competition between logging and snaphots.
dataLogDir=/opt/elk/kafka/zookeeper/logs
#客户端链接端口
clientPort=2181
#最大客户端链接数
maxClientCnxns=20
#这个时间是做为Zookeeper服务器之间或客户端与服务器之间维持心跳的时间间隔
tickTime=2000
#此配置表示,容许follower(相对于Leaderer言的“客户端”)链接并同步到Leader的初始化链接时间,以tickTime为单位。当初始化链接时间超过该值,则表示链接失败。
initLimit=10java

此配置项表示Leader与Follower之间发送消息时,请求和应答时间长度。若是follower在设置时间内不能与leader通讯,那么此follower将会被丢弃。

syncLimit=5
#集群模式关键配置参数
server.x=[hostname]:nnnnn[:nnnnn]
There are two port numbers nnnnn. The first followers use to connect to the leader, and the second is for leader election. The leader election port is only necessary if electionAlg is 1, 2, or 3 (default). If electionAlg is 0, then the second port is not necessary. If you want to test multiple servers on a single machine, then different ports can be used for each server.
#server.myid=ip:followers_connect to the leader:leader_election # server 是固定的,myid 是须要手动分配,第一个端口是follower是连接到leader的端口,第二个是用来选举leader 用的port apache

server.1=192.168.1.1:2888:3888
server.2=192.168.1.2:2888:3888
server.3=192.168.1.2:2889:3889
#个人第二个server和第三个server 是用的同一台机器跑了两个实例,因此端口须要使用不一样的端口来配置,切记若是有火墙的话,必定要放行你配置的口服务器

#kafka 的配置一样在config 目录中的server.properties 是kafka的配置文件
vi server.properties
#The broker id for this server. If unset, a unique broker id will be generated.To avoid conflicts between zookeeper generated broker id's and user configured broker id's, generated broker ids start from reserved.broker.max.id + 1.
#每一个server须要单独配置broker id,若是不配置系统会自动配置。
broker.id=0
#消费者的访问端口,logstash或者elasticsearch
listeners=PLAINTEXT://192.168.115.65:9092
#The number of threads that the server uses for receiving requests from the network and sending responses to the network ,接收和发送网络信息的线程数
num.network.threads=3
#The number of threads that the server uses for processing requests, which may include disk I/O
num.io.threads=8
#The SO_SNDBUF buffer of the socket sever sockets. If the value is -1, the OS default will be used.
socket.send.buffer.bytes=102400
#The SO_RCVBUF buffer of the socket sever sockets. If the value is -1, the OS default will be used.
socket.receive.buffer.bytes=102400
#The maximum number of bytes in a socket request
socket.request.max.bytes=104857600
#这个是设置log的目录
log.dirs=/usr/local/kafka/logs网络

The default number of log partitions per topic. More partitions allow greater, parallelism for consumption, but this will also result in more files across the brokers.

num.partitions=1
#The number of threads per data directory to be used for log recovery at startup and flushing at shutdown
num.recovery.threads.per.data.dir=1
#The replication factor for the offsets topic (set higher to ensure availability). Internal topic creation will fail until the cluster size meets this replication factor requirement.
offsets.topic.replication.factor=1
#The replication factor for the transaction topic (set higher to ensure availability). Internal topic creation will fail until the cluster size meets this replication factor requirement.
transaction.state.log.replication.factor=1
#Overridden min.insync.replicas config for the transaction topic.
transaction.state.log.min.isr=1
#The number of hours to keep a log file before deleting it (in hours), tertiary to log.retention.ms property。 配置多少小时以后会删除以前的数据。
log.retention.hours=168
#The maximum size of a single log file。 单个日志文件的大小
log.segment.bytes=1073741824
#The frequency in milliseconds that the log cleaner checks whether any log is eligible for deletion。多少毫秒检查一次是否有须要删除的log 文件
log.retention.check.interval.ms=300000
#这块是重点,配置kafka连接的ZK server
zookeeper.connect=192.168.165.65:2181,192.168.101.242:2181,192.168.101.242:2182
#zookeeper 连接超时设置
zookeeper.connection.timeout.ms=6000
#The amount of time the group coordinator will wait for more consumers to join a new group before performing the first rebalance. A longer delay means potentially fewer rebalances, but increases the time until processing begins.
group.initial.rebalance.delay.ms=0
#官方参考文档地址:http://kafka.apache.org/10/documentation.html#brokerconfigssession

192.168.1.1 节点上的全部配置都搞定了

#打包配置好的文件包
tar -zcvf /opt/elk/kafka_2.12-1.0.1-ready.tar.gz kafka_2.12-1.0.1
#scp 源文件到另一台机器上
scp /opt/elk/kafka_2.12-1.0.1-ready.tar.gz elk@192.168.1.2:/opt/elk/ less

#登陆机器1.2 上配置,这个机器上配置两套kafka,两套ZK
FYI:若是你是三台机器配置的话,就不须要这样麻烦了,只须要把配置好的安装包直接分发到不一样的机器上,而后修改zookeeper的myid,kafka的broker.id 就能够了。
#解压拷贝过来的包
tar -zxvf kafka_2.12-1.0.1-ready.tar.gz
#因为这台机器须要配置两套kafka和ZK 须要创建对用的数据目录和log目录给你不一样的实例用
#建立kafka 数据目录
mkdir /opt/elk/kafka_2.12-1.0.1/data/{k2,k3}
#建立 ZK的数据和log 目录,官方推荐这两个目录最好不在一个磁盘下,可能会影响磁盘写入读取性能,因此若是数据量大的话,最好分开
mkdir /opt/elk/kafka_2.12-1.0.1/zookeeper/{z2/{data,logs},z3/{data,logs}} -p
#建立myid 文件并写入ID number
echo 2 > /opt/elk/kafka_2.12-1.0.1/zookeeper/z2/data/myid
echo 3 > /opt/elk/kafka_2.12-1.0.1/zookeeper/z3/data/myid
#把ZK 和kafka 都是制定配置文件运行的,因此咱们须要分别把zookeeper.properties & server.properties 复制为两个不一样的文件名字
cd /opt/elk/kafka_2.12-1.0.1/
cp zookeeper.properties zookeeper-2.properties
mv zookeeper.properties zookeeper-3.properties
cp server.properties server-2.properties
mv server.properties server-3.properties
#全部配置文件搞定了以后,zk-2 &zk-3 须要修改地方以下:
dataDir=刚刚建立好的目录
dataLogDir=刚刚建立好的目录
#zk-3 须要多修改一个地方
clientPort=2182 socket

#Kafka 须要修改以下几个地方:
broker.id=指定的id
#kafka 3 修改port
listeners=PLAINTEXT://192.168.1.2:9093elasticsearch

#到这里全部kafka集群的全部配置都搞定了,开始启动集群了,顺序是先启动zk,而后再启动kafka
#1.1机器上执行以下命令
nohup /opt/elk/kafka_2.12-1.0.1/bin/zookeeper-server-start.sh config/zookeeper.properties >>/dev/null 2>&1 &
#1.2 机器上执行以下命令
nohup bin/zookeeper-server-start.sh config/zookeeper-2.properties >>/dev/null 2>&1 &
nohup bin/zookeeper-server-start.sh config/zookeeper-3.properties >>/dev/null 2>&1 &分布式

#能够经过lsof 命令查看服务是否正常启动
lsof -i:2181

#介绍几个简单的确认zk 服务是否正常的命令
#须要本机安装nc 命令
yum -y install nc
#使用echo ruok|nc 127.0.0.1 2181 测试是否启动了该Server,若回复imok表示已经启动。
ELK 之分布式发布订阅消息系统 Kafka + zookeeper 集群搭建
#查看zk的配置,配置正常返回证实zk service 正常
echo conf | nc 192.168.1.1 2181
ELK 之分布式发布订阅消息系统 Kafka + zookeeper 集群搭建

#stat 能够查看集群状态
echo stat | nc 127.0.0.1 2182

ELK 之分布式发布订阅消息系统 Kafka + zookeeper 集群搭建
还有以下经常使用命令:
ZooKeeper 支持某些特定的四字命令字母与其的交互。它们大可能是查询命令,用来获取 ZooKeeper 服务的当前状态及相关信息。用户在客户端能够经过 telnet 或 nc 向 ZooKeeper 提交相应的命令

  1. 能够经过命令:echo stat|nc 127.0.0.1 2181 来查看哪一个节点被选择做为follower或者leader
  2. 使用echo ruok|nc 127.0.0.1 2181 测试是否启动了该Server,若回复imok表示已经启动。
  3. echo dump| nc 127.0.0.1 2181 ,列出未经处理的会话和临时节点。
  4. echo kill | nc 127.0.0.1 2181 ,关掉server
  5. echo conf | nc 127.0.0.1 2181 ,输出相关服务配置的详细信息。
  6. echo cons | nc 127.0.0.1 2181 ,列出全部链接到服务器的客户端的彻底的链接 / 会话的详细信息。
  7. echo envi |nc 127.0.0.1 2181 ,输出关于服务环境的详细信息(区别于 conf 命令)。
  8. echo reqs | nc 127.0.0.1 2181 ,列出未经处理的请求。
  9. echo wchs | nc 127.0.0.1 2181 ,列出服务器 watch 的详细信息。
  10. echo wchc | nc 127.0.0.1 2181 ,经过 session 列出服务器 watch 的详细信息,它的输出是一个与 watch 相关的会话的列表。
  11. echo wchp | nc 127.0.0.1 2181 ,经过路径列出服务器 watch 的详细信息。它输出一个与 session 相

#zk 搞定了以后开始搞kafka 了
1.1 上执行以下命令
nohup /opt/elk/kafka_2.12-1.0.1/bin/kafka-server-start.sh /usr/local/kafka/config/server.properties >>/dev/null 2>&1 &
1.2上执行以下两条命:
nohup /opt/elk/kafka_2.12-1.0.1/bin/kafka-server-start.sh /usr/local/kafka/config/server-2.properties >>/dev/null 2>&1 &
nohup /opt/elk/kafka_2.12-1.0.1/bin/kafka-server-start.sh /usr/local/kafka/config/server-3.properties >>/dev/null 2>&1 &

#经过lsof 命令查看端口是否正常启动

#测试kafka 工做是否正常,新建一个topic
/opt/elk/kafka_2.12-1.0.1/bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic test
#提示如下内容证实没有问题
Created topic "test".
ELK 之分布式发布订阅消息系统 Kafka + zookeeper 集群搭建
#经过list 来查看
/usr/local/kafka/bin/kafka-topics.sh --zookeeper 192.168.1.1:2181 --list
test
ELK 之分布式发布订阅消息系统 Kafka + zookeeper 集群搭建#到这里kafka 集群就搞定了

相关文章
相关标签/搜索