在本文中将从演示如何搭建一个Kafka集群开始,而后简要介绍一下关于Kafka集群的一些基础知识点。但本文仅针对集群作介绍,对于Kafka的基本概念不作过多说明,这里假设读者拥有必定的Kafka基础知识。html
首先,咱们须要了解Kafka集群的一些机制:java
Kafka的集群拓扑图以下:算法
Kafka集群中的几个角色:apache
在本例中,为了更贴近实际的部署状况,使用了四台虚拟机做演示:vim
机器IP | 做用 | 角色 | brokerId |
---|---|---|---|
192.168.99.1 | 部署Kafka节点 | broker server | 0 |
192.168.99.2 | 部署Kafka节点 | broker server | 1 |
192.168.99.3 | 部署Kafka节点 | broker server | 2 |
192.168.99.4 | 部署Zookeeper节点 | 集群协调者 |
Kafka是基于Zookeeper来实现分布式协调的,因此在搭建Kafka节点以前须要先搭建好Zookeeper节点。而Zookeeper和Kafka都依赖于JDK,我这里已经事先安装好了JDK:tcp
[root@192.168.99.4 ~]# java --version java 11.0.5 2019-10-15 LTS Java(TM) SE Runtime Environment 18.9 (build 11.0.5+10-LTS) Java HotSpot(TM) 64-Bit Server VM 18.9 (build 11.0.5+10-LTS, mixed mode) [root@txy-server2 ~]#
准备好JDK环境后,到Zookeeper的官网下载地址,复制下载连接:分布式
而后到Linux中使用wget命令进行下载,以下:ide
[root@192.168.99.4 ~]# cd /usr/local/src [root@192.168.99.4 /usr/local/src]# wget https://mirror.bit.edu.cn/apache/zookeeper/zookeeper-3.6.1/apache-zookeeper-3.6.1-bin.tar.gz
解压下载好的压缩包,并将解压后的目录移动和重命名:学习
[root@192.168.99.4 /usr/local/src]# tar -zxvf apache-zookeeper-3.6.1-bin.tar.gz [root@192.168.99.4 /usr/local/src]# mv apache-zookeeper-3.6.1-bin ../zookeeper
进入到Zookeeper的配置文件目录,将zoo_sample.cfg这个示例配置文件拷贝一份并命名为zoo.cfg,这是Zookeeper默认的配置文件名称:ui
[root@192.168.99.4 /usr/local/src]# cd ../zookeeper/conf/ [root@192.168.99.4 /usr/local/zookeeper/conf]# ls configuration.xsl log4j.properties zoo_sample.cfg [root@192.168.99.4 /usr/local/zookeeper/conf]# cp zoo_sample.cfg zoo.cfg
修改一下配置文件中的dataDir配置项,指定一个磁盘空间较大的目录:
[root@192.168.99.4 /usr/local/zookeeper/conf]# vim zoo.cfg # 指定Zookeeper的数据存储目录,类比于MySQL的dataDir dataDir=/data/zookeeper [root@192.168.99.4 /usr/local/zookeeper/conf]# mkdir -p /data/zookeeper
若是只是学习使用的话,这一步其实能够忽略,采用默认配置便可
接下来就能够进入bin目录,使用启动脚原本启动Zookeeper了,以下示例:
[root@192.168.99.4 /usr/local/zookeeper/conf]# cd ../bin/ [root@192.168.99.4 /usr/local/zookeeper/bin]# ./zkServer.sh start ZooKeeper JMX enabled by default Using config: /usr/local/zookeeper/bin/../conf/zoo.cfg Starting zookeeper ... STARTED [root@192.168.99.4 /usr/local/zookeeper/bin]#
启动完成后,能够经过查看是否正常监听端口号来判断是否启动成功,以下则是启动成功了:
[root@192.168.99.4 ~]# netstat -lntp |grep 2181 tcp6 0 0 :::2181 :::* LISTEN 7825/java [root@192.168.99.4 ~]#
若是你的机器开启了防火墙的话,则须要开放Zookeeper的端口,不然其余节点没法注册上来:
[root@192.168.99.4 ~]# firewall-cmd --zone=public --add-port=2181/tcp --permanent [root@192.168.99.4 ~]# firwall-cmd --reload
安装完Zookeeper后,接下来就能够安装Kafka了,一样的套路首先去Kafka的官网下载地址,复制下载连接:
而后到Linux中使用wget命令进行下载,以下:
[root@192.168.99.1 ~]# cd /usr/local/src [root@192.168.99.1 /usr/local/src]# wget https://mirror.bit.edu.cn/apache/kafka/2.5.0/kafka_2.13-2.5.0.tgz
解压下载好的压缩包,并将解压后的目录移动和重命名:
[root@192.168.99.1 /usr/local/src]# tar -xvf kafka_2.13-2.5.0.tgz [root@192.168.99.1 /usr/local/src]# mv kafka_2.13-2.5.0 ../kafka
进入Kafka的配置文件目录,修改配置文件:
[root@192.168.99.1 /usr/local/src]# cd ../kafka/config/ [root@192.168.99.1 /usr/local/kafka/config]# vim server.properties # 指定该节点的brokerId,同一集群中的brokerId须要惟一 broker.id=0 # 指定监听的地址及端口号,该配置项是指定内网ip listeners=PLAINTEXT://192.168.99.1:9092 # 若是须要开放外网访问,则在该配置项指定外网ip advertised.listeners=PLAINTEXT://192.168.99.1:9092 # 指定kafka日志文件的存储目录 log.dirs=/usr/local/kafka/kafka-logs # 指定zookeeper的链接地址,如有多个地址则用逗号分隔 zookeeper.connect=192.168.99.4:2181 [root@192.168.99.1 /usr/local/kafka/config]# mkdir /usr/local/kafka/kafka-logs
在完成配置文件的修改后,为了方便使用Kafka的命令脚本,咱们能够将Kafka的bin目录配置到环境变量中:
[root@192.168.99.1 ~]# vim /etc/profile export KAFKA_HOME=/usr/local/kafka export PATH=$PATH:$KAFKA_HOME/bin [root@192.168.99.1 ~]# source /etc/profile # 让配置生效
这样就可使用以下命令启动Kafka了:
[root@192.168.99.1 ~]# kafka-server-start.sh /usr/local/kafka/config/server.properties &
执行以上命令后,启动日志会输出到控制台,能够经过日志判断是否启动成功,也能够经过查看是否监听了9092端口来判断是否启动成功:
[root@192.168.99.1 ~]# netstat -lntp |grep 9092 tcp6 0 0 192.168.99.1:9092 :::* LISTEN 31943/java [root@192.168.99.1 ~]#
一样的,开启了防火墙的话,还须要开放相应的端口号:
[root@192.168.99.1 ~]# firewall-cmd --zone=public --add-port=9092/tcp --permanent [root@192.168.99.1 ~]# firwall-cmd --reload
到此为止,咱们就完成了第一个Kafka节点的安装,另外两个节点的安装步骤也是同样的,只须要修改一下配置文件中的brokerId和监听的ip就行了。因此我这里直接将该节点中的Kafka目录拷贝到另外两台机器上:
[root@192.168.99.1 ~]# rsync -av /usr/local/kafka 192.168.99.2:/usr/local/kafka [root@192.168.99.1 ~]# rsync -av /usr/local/kafka 192.168.99.3:/usr/local/kafka
而后修改一下这两个节点的brokerId和监听的ip:
[root@192.168.99.2 /usr/local/kafka/config]# vim server.properties # 修改brokerId broker.id=1 # 指定监听的地址及端口号,该配置项是指定内网ip listeners=PLAINTEXT://192.168.99.2:9092 # 若是须要开放外网访问,则在该配置项指定外网ip advertised.listeners=PLAINTEXT://192.168.99.2:9092 [root@192.168.99.2 /usr/local/kafka/config]# [root@192.168.99.3 /usr/local/kafka/config]# vim server.properties # 修改brokerId broker.id=2 # 指定监听的地址及端口号,该配置项是指定内网ip listeners=PLAINTEXT://192.168.99.3:9092 # 若是须要开放外网访问,则在该配置项指定外网ip advertised.listeners=PLAINTEXT://192.168.99.3:9092 [root@192.168.99.3 /usr/local/kafka/config]#
配置修改完成后,按以前所介绍的步骤启动这两个节点。启动成功后进入Zookeeper中,在/brokers/ids下有相应的brokerId数据表明集群搭建成功:
[root@192.168.99.4 ~]# /usr/local/zookeeper/bin/zkCli.sh [zk: localhost:2181(CONNECTED) 4] ls /brokers/ids [0, 1, 2] [zk: localhost:2181(CONNECTED) 5]
咱们都知道在Kafka中的Topic只是个逻辑概念,实际存储数据的是Partition,因此真正被复制的也是Partition。以下图:
关于副本因子:
副本分配算法以下:
Kafka节点(Broker)故障的两种状况:
Kafka对节点故障的处理方式:
Kafka集群之Leader选举:
“巧妇难为无米之炊”:Kafka有一种无奈的状况,就是ISR中副本所有宕机。对于这种状况,Kafka默认会进行unclean leader选举。Kafka提供了两种不一样的方式进行处理:
等待ISR中任一Replica恢复,并选它为Leader
Leader选举配置建议:
关于ISR更详细的内容能够参考:
原文连接:https://www.jianshu.com/p/cc0b90636715 做者:端碗吹水