Kafka(二)Kafka集群搭建


环境描述apache

服务器名称 系统 配置
Srv01.contoso.com CentOS 七、Kafka_2.11-1.1.0 IP:172.16.48.163
Srv02.contoso.com CentOS 七、Kafka_2.11-1.1.0 IP:172.16.48.149
Srv03.contoso.com CentOS 七、Kafka_2.11-1.1.0 IP:172.16.48.154

Zookeeper集群安装不作赘述,请看Zookeeper详解(二):Zookeeper安装和运行
bootstrap


安装kafka集群缓存

下载Kafka安装包:http://kafka.apache.org/downloads bash

Snip20180701_35.png

下载以后解压便可。(因为Kafka是用Scala语言开发,运行在JVM上,因此要先安装JDK)服务器

Snip20180701_36.png

编辑Kafka服务的配置文件server.properties。这里还有一个zookeeper.properties配置文件,该文件是若是你使用kafka自带zookeeper那么你就须要配置它。若是你有另外的zookeeper集群那就忽略就行了。数据结构

Snip20180701_38.png

Snip20180701_39.png

Snip20180701_42.png

配置要链接的zookeeper,我这里就使用一台。另外这个 /kafka的意思是给它指定一个节点。这个节点你须要提早创建。让kafka把他须要的数据结构都创建在这个节点下,不然它会使用 “/”节点。
socket

Snip20180701_43.png

最少运行起来至少要配置四项,也就是上面的内容。ide

listeners   # kafka服务器监听IP和端口,这里也能够写域名,只要能解析就行
broker.id   # 三台服务器的ID不能相同,第一台是0,第二台是1,第三台是2
log.dirs    # 日志路径
zookeeper.connect  # Zookeeper链接参数

# 另外我还配置了
message.max.bytes =5000000
default.replication.factor =2

配置好后把配置文件复制到其余2台服务器上,并修改broker.id和listeners字段。下面启动Kafka,进入bin目录运行下面的命令,有两种方式性能

# 第一种方式(推荐)
kafka-server-start.sh -daemon ../config/server.properties
# 第二种方式
nohup kafka-server-start.sh config/server.properties&

# 中止
kafka-server-stop.sh

Snip20180701_44.png

查看进程和端口测试

Snip20180701_46.png

按照一样的方法启动剩余2台。查看日志

日志名称 说明
server.log 是kafka系统日志,很经常使用
state-change.log leader切换日志,就是broker宕机副本切换
controller.log kafka集群中有一台机器是控制器,那么控制器角色的日志就记录在这里

Snip20180701_47.png

咱们看看zookeeper里面的三个节点注册信息

Snip20180701_49.png

测试生产者与消费者之间的消息发送,这是利用它自带的脚本程序来建立生产者和消费者

kafka-console-producer.sh --broker-list 172.16.48.163:9092 --topic test1

Snip20180701_50.png

kafka-console-consumer.sh --bootstrap-server 172.16.48.163:9092 --topic test1 --from-beginning

Snip20180701_51.png

而后在生产者的>提示符后面输入内容,那么消费者就会收到

Snip20180701_52.png


Kafka命令使用

说明:全部命令都要在bin目录下执行,除非你配置了环境变量。  --zookeeper 172.16.48.163:2181/kafka 之因此后面加一个/kafka这个是由于我在kafka配置文件中指明使用这个名称空间。

# 建立主题
./kafka-topics.sh --create --zookeeper 172.16.48.163:2181/kafka --replication-factor 2 --partitions 1 --topic test9999
--replication-factor 该主题每一个分区的副本数,不能大于broker数量。这个副本用于备份,假设每一个分区有2个副本,那么只有一个是leader负责读写,follower只是负责同步内容,对外提供读写的也是由leader提供。Leader宕机follower接管成为新的leader。这里的数量是包括Leader+Follwer
--partitions 分区数量,控制将主题切分红多少个LOG。消费者数量应该和分区数量相等,若是超过则毫无心义。
--topic
主题名称

Snip20180701_54.png

# 删除主题
kafka-topics.sh --delete --zookeeper 172.16.48.163:2181/kafka --topic test9999

Snip20180701_56.png

# 查看全部主题
./kafka-topics.sh --list --zookeeper 172.16.48.163:2181/kafka

Snip20180701_55.png

# 查看指定topic信息,从下图能够看出当时创建这个topic的时候咱们设置3个分区,每一个分区的副本是3.
./kafka-topics.sh --describe --zookeeper 172.16.48.163:2181/kafka --topic BBB
PartiticonCount 显示分区数量一共有多少
ReplicationFactor 副本因子是多少
Partition 分区编号
Leader

显示Leader副本在哪一个Broker上,这里是不一样分区会有不一样,表示Leader在broker.id=0的服务器上。三个分区每一个分区有三个副本,分区编号从0开始,因此这个Leader是说后面Replicas副本里面哪一个是Leader。Leader副本提供读写,非Leader副本只作数据备份

从下图能够看出分区0和1对外提供读写的副本都在broker 2上。固然这不是一个好现象,意味着这个服务器将处理一个主题的2个分区读写,咱们要平均分开。

Replicas 显示该partitions全部副本存储在哪些节点上 broker.id 这个是配置文件中设置的,它包括leaderfollower节点
Isr 显示副本都已经同步的节点集合,这个集合的全部节点都是存活的,而且跟LEADER节点同步

Snip20180701_58.png

# 平衡读写
kafka-preferred-replica-election.sh --zookeeper 172.16.48.163:2181/kafka

Snip20180701_59.png

再次查看BBB主题发现Leader均衡了,3个分区的Leader副本由3个broker各自承担一个。不像以前0和1分区的Leader副本都在broker 2上

Snip20180701_60.png

# 线上集群全部主题状况
./kafka-topics.sh --describe --zookeeper 172.16.48.163:2181/kafka

Snip20180701_57.png



kafka服务器配置文件说明

# ----------------------系统相关----------------------
# broker的全局惟一编号,不能重复,和zookeeper的myid是一个意思
broker.id=0

# broker监听IP和端口也能够是域名
listeners=PLAINTEXT://172.16.48.163:9092

# 用于接收请求的线程数量
num.network.threads=3

# 用于处理请求的线程数量,包括磁盘IO请求,这个数量和log.dirs配置的目录数量有关,这里的数量不能小于log.dirs的数量,
# 虽然log.dirs是配置日志存放路径,可是它能够配置多个目录后面用逗号分隔
num.io.threads=8

# 发送缓冲区大小,也就是说发送消息先发送到缓冲区,当缓冲区满了以后一块儿发送出去
socket.send.buffer.bytes=102400

# 接收缓冲区大小,同理接收到缓冲区,当到达这个数量时就同步到磁盘
socket.receive.buffer.bytes=102400

# 向kafka套接字请求最大字节数量,防止服务器OOM,也就是OutOfMemery,这个数量不要超过JAVA的堆栈大小,
socket.request.max.bytes=104857600

# 日志路径也就是分区日志存放的地方,你所创建的topic的分区就在这里面,可是它能够配置多个目录后面用逗号分隔
log.dirs=/tmp/kafka-logs

# 消息体(也就是往Kafka发送的单条消息)最大大小,单位是字节,必须小于socket.request.max.bytes值
message.max.bytes =5000000

# 自动平衡因为某个broker故障会致使Leader副本迁移到别的broker,当以前的broker恢复后也不会迁移回来,有时候咱们须要
# 手动进行平衡避免同一个主题不一样分区的Leader副本在同一台broker上,下面这个参数就是开启自动平衡功能
auto.leader.rebalance.enable=true

# 设置了上面的自动平衡,当故障转移后,隔300秒(默认)触发一个定时任务进行平衡操做,而只有代理的不均衡率为10%以上才会执行
leader.imbalance.check.interval.seconds=300

# 设置代理的不均衡率,默认是10%
leader.imbalance.per.broker.percentage=10

# ---------------分区相关-------------------------

# 默认分区数量,当创建Topic时不指定分区数量,默认就1
num.partitions=1

# 是否容许自动建立topic ,如果false,就须要经过命令建立topic
auto.create.topics.enable =true
 
# 一个topic ,默认分区的replication个数 ,不得大于集群中broker的个数
default.replication.factor =2

# ---------------日志相关-------------------------

# segment文件默认会被保留7天的时间,超时的话就会被清理,那么清理这件事情就须要有一些线程来作。
# 这里就是用来设置恢复和清理data下数据的线程数量
num.recovery.threads.per.data.dir=1

# 日志文件中每一个segment的大小,默认为1G。topic的分区是以一堆segment文件存储的,这个控制每一个segment的大小,当超过这个大小会创建一个新日志文件
# 这个参数会被topic建立时的指定参数覆盖,若是你建立Topic的时候指定了这个参数,那么你以你指定的为准。
log.segment.bytes=1073741824

# 数据存储的最大时间 超过这个时间 会根据log.cleanup.policy设置的策略处理数据,也就是消费端可以多久去消费数据
# log.retention.bytes和log.retention.minutes|hours任意一个达到要求,都会执行删除
# 若是你建立Topic的时候指定了这个参数,那么你以你指定的为准
log.retention.hours|minutes=168

# 这个参数会在日志segment没有达到log.segment.bytes设置的大小默认1G的时候,也会强制新建一个segment会被
# topic建立时的指定参数覆盖
log.roll.hours=168 

# 上面的参数设置了每个segment文件的大小是1G,那么就须要有一个东西去按期检查segment文件有没有达到1G,多长时间去检查一次,
# 就须要设置一个周期性检查文件大小的时间(单位是毫秒)。
log.retention.check.interval.ms=300000

# 日志清理策略 选择有:delete和compact 主要针对过时数据的处理,或是日志文件达到限制的额度,
# 若是你建立Topic的时候指定了这个参数,那么你以你指定的为准
log.cleanup.policy = delete

# 是否启用日志清理功能,默认是启用的且清理策略为compact,也就是压缩。
log.cleaner.enable=false

# 日志清理时所使用的缓存空间大小
log.cleaner.dedupe.buffer.size=134217728

# log文件"sync"到磁盘以前累积的消息条数,由于磁盘IO操做是一个慢操做,但又是一个"数据可靠性"的必要手段
# 因此此参数的设置,须要在"数据可靠性"与"性能"之间作必要的权衡.
# 若是此值过大,将会致使每次"fsync"的时间较长(IO阻塞)
# 若是此值太小,将会致使"fsync"的次数较多,这也意味着总体的client请求有必定的延迟.
# 物理server故障,将会致使没有fsync的消息丢失.
log.flush.interval.messages=9223372036854775807

# 检查是否须要固化到硬盘的时间间隔
log.flush.scheduler.interval.ms =3000
 
# 仅仅经过interval来控制消息的磁盘写入时机,是不足的.
# 此参数用于控制"fsync"的时间间隔,若是消息量始终没有达到阀值,可是离上一次磁盘同步的时间间隔
# 达到阀值,也将触发.
log.flush.interval.ms = None

# --------------------------复制(Leader、replicas) 相关-------------------
# partition leader与replicas之间通信时,socket的超时时间
controller.socket.timeout.ms =30000

# replicas响应partition leader的最长等待时间,如果超过这个时间,就将replicas列入ISR(in-sync replicas),
# 并认为它是死的,不会再加入管理中
replica.lag.time.max.ms =10000

# follower与leader之间的socket超时时间
replica.socket.timeout.ms=300000

# leader复制时候的socket缓存大小
replica.socket.receive.buffer.bytes=65536

# replicas每次获取数据的最大大小
replica.fetch.max.bytes =1048576

# replicas同leader之间通讯的最大等待时间,失败了会重试
replica.fetch.wait.max.ms =500

# fetch的最小数据尺寸,若是leader中还没有同步的数据不足此值,将会阻塞,直到知足条件
replica.fetch.min.bytes =1

# leader 进行复制的线程数,增大这个数值会增长follower的IO
num.replica.fetchers=1

# 最小副本数量
min.insync.replicas = 2
相关文章
相关标签/搜索