启动服务(每台都须要执行)java
cd /usr/local/zookeeper/zookeeper-3.4.10/bin ./zkServer.sh start
检查服务状态linux
使用 ./zkServer.sh status
命令检查服务状态程序员
192.168.1.7 --- follower面试
192.168.1.8 --- leaderapache
192.168.1.9 --- followerbootstrap
zk集群通常只有一个leader,多个follower,主通常是相应客户端的读写请求,而从主同步数据,当主挂掉以后就会从follower里投票选举一个leader出来。服务器
在 /usr/local
下新建 kafka
文件夹,而后把下载完成的 tar.gz 包移到 /usr/local/kafka 目录下,使用 tar -zxvf 压缩包
进行解压,解压完成后,进入到 kafka_2.12-2.3.0 目录下,新建 log 文件夹,进入到 config 目录下markdown
咱们能够看到有不少 properties 配置文件,这里主要关注 server.properties
这个文件便可。网络
kafka 启动方式有两种,一种是使用 kafka 自带的 zookeeper 配置文件来启动(能够按照官网来进行启动,并使用单个服务多个节点来模拟集群http://kafka.apache.org/quickstart#quickstart_multibroker),一种是经过使用独立的zk集群来启动,这里推荐使用第二种方式,使用 zk 集群来启动socket
须要为每一个服务
都修改一下配置项,也就是server.properties
, 须要更新和添加的内容有
broker.id=0 //初始是0,每一个 server 的broker.id 都应该设置为不同的,就和 myid 同样 个人三个服务分别设置的是 1,2,3 log.dirs=/usr/local/kafka/kafka_2.12-2.3.0/log #在log.retention.hours=168 下面新增下面三项 message.max.byte=5242880 default.replication.factor=2 replica.fetch.max.bytes=5242880 #设置zookeeper的链接端口 zookeeper.connect=192.168.1.7:2181,192.168.1.8:2181,192.168.1.9:2181
配置项的含义
broker.id=0 #当前机器在集群中的惟一标识,和zookeeper的myid性质同样 port=9092 #当前kafka对外提供服务的端口默认是9092 host.name=192.168.1.7 #这个参数默认是关闭的,在0.8.1有个bug,DNS解析问题,失败率的问题。 num.network.threads=3 #这个是borker进行网络处理的线程数 num.io.threads=8 #这个是borker进行I/O处理的线程数 log.dirs=/usr/local/kafka/kafka_2.12-2.3.0/log #消息存放的目录,这个目录能够配置为“,”逗号分割的表达式,上面的num.io.threads要大于这个目录的个数这个目录,若是配置多个目录,新建立的topic他把消息持久化的地方是,当前以逗号分割的目录中,那个分区数最少就放那一个 socket.send.buffer.bytes=102400 #发送缓冲区buffer大小,数据不是一会儿就发送的,先回存储到缓冲区了到达必定的大小后在发送,能提升性能 socket.receive.buffer.bytes=102400 #kafka接收缓冲区大小,当数据到达必定大小后在序列化到磁盘 socket.request.max.bytes=104857600 #这个参数是向kafka请求消息或者向kafka发送消息的请请求的最大数,这个值不能超过java的堆栈大小 num.partitions=1 #默认的分区数,一个topic默认1个分区数 log.retention.hours=168 #默认消息的最大持久化时间,168小时,7天 message.max.byte=5242880 #消息保存的最大值5M default.replication.factor=2 #kafka保存消息的副本数,若是一个副本失效了,另外一个还能够继续提供服务 replica.fetch.max.bytes=5242880 #取消息的最大直接数 log.segment.bytes=1073741824 #这个参数是:由于kafka的消息是以追加的形式落地到文件,当超过这个值的时候,kafka会新起一个文件 log.retention.check.interval.ms=300000 #每隔300000毫秒去检查上面配置的log失效时间(log.retention.hours=168 ),到目录查看是否有过时的消息若是有,删除 log.cleaner.enable=false #是否启用log压缩,通常不用启用,启用的话能够提升性能 zookeeper.connect=192.168.1.7:2181,192.168.1.8:2181,192.168.1.9:2181 #设置zookeeper的链接端口
/usr/local/kafka/kafka_2.12-2.3.0/bin
目录下# 启动后台进程 ./kafka-server-start.sh -daemon ../config/server.properties
# 执行命令 jps 6201 QuorumPeerMain 7035 Jps 6972 Kafka
# cd .. 往回退一层 到 /usr/local/kafka/kafka_2.12-2.3.0 目录下 bin/kafka-topics.sh --create --zookeeper 192.168.1.7:2181 --replication-factor 2 --partitions 1 --topic cxuan
对上面的解释
--replication-factor 2 复制两份
--partitions 1 建立1个分区
--topic 建立主题
查看咱们的主题是否出建立成功
bin/kafka-topics.sh --list --zookeeper 192.168.1.7:2181
启动一个服务就能把集群启动起来
在一台机器上建立一个发布者
# 建立一个broker,发布者 ./kafka-console-producer.sh --broker-list 192.168.1.7:9092 --topic cxuantopic
在一台服务器上建立一个订阅者
# 建立一个consumer, 消费者 bin/kafka-console-consumer.sh --bootstrap-server 192.168.1.7:9092 --topic cxuantopic --from-beginning
注意:这里使用 --zookeeper 的话可能出现
zookeeper is not a recognized option
的错误,这是由于 kafka 版本过高,须要使用--bootstrap-server
指令
测试结果
发布
消费
显示 topic
bin/kafka-topics.sh --list --zookeeper 192.168.1.7:2181 # 显示 cxuantopic
查看 topic 状态
bin/kafka-topics.sh --describe --zookeeper 192.168.1.7:2181 --topic cxuantopic # 下面是显示的详细信息 Topic:cxuantopic PartitionCount:1 ReplicationFactor:2 Configs: Topic: cxuantopic Partition: 0 Leader: 1 Replicas: 1,2 Isr: 1,2 # 分区为为1 复制因子为2 主题 cxuantopic 的分区为0 # Replicas: 0,1 复制的为1,2
Leader
负责给定分区的全部读取和写入的节点,每一个节点都会经过随机选择成为 leader。
Replicas
是为该分区复制日志的节点列表,不管它们是 Leader 仍是当前处于活动状态。
Isr
是同步副本的集合。它是副本列表的子集,当前仍处于活动状态并追随Leader。
至此,kafka 集群搭建完毕。
刚刚咱们都使用的是 相同的ip 服务,下面使用其余集群中的节点,验证是否可以接受到服务
在另外两个节点上使用
bin/kafka-console-consumer.sh --bootstrap-server 192.168.1.7:9092 --topic cxuantopic --from-beginning
而后再使用 broker 进行消息发送,经测试三个节点均可以接受到消息。
在搭建 Kafka 的时候咱们简单介绍了一下 server.properties
中配置的含义,如今咱们来详细介绍一下参数的配置和概念
这些参数是 kafka 中最基本的配置
每一个 broker 都须要有一个标识符,使用 broker.id 来表示。它的默认值是 0,它能够被设置成其余任意整数,在集群中须要保证每一个节点的 broker.id 都是惟一的。
若是使用配置样原本启动 kafka ,它会监听 9092 端口,修改 port 配置参数能够把它设置成其余任意可用的端口。
用于保存 broker 元数据的地址是经过 zookeeper.connect 来指定。 localhost:2181 表示运行在本地 2181 端口。该配置参数是用逗号分隔的一组 hostname:port/path 列表,每一部分含义以下:
hostname 是 zookeeper 服务器的服务名或 IP 地址
port 是 zookeeper 链接的端口
/path 是可选的 zookeeper 路径,做为 Kafka 集群的 chroot 环境。若是不指定,默认使用跟路径
Kafka 把消息都保存在磁盘上,存放这些日志片断的目录都是经过 log.dirs
来指定的。它是一组用逗号分隔的本地文件系统路径。若是指定了多个路径,那么 broker 会根据 "最少使用" 原则,把同一分区的日志片断保存到同一路径下。要注意,broker 会向拥有最少数目分区的路径新增分区,而不是向拥有最小磁盘空间的路径新增分区。
对于以下 3 种状况,Kafka 会使用可配置的线程池来处理日志片断
服务器正常启动,用于打开每一个分区的日志片断;
服务器崩溃后启动,用于检查和截断每一个分区的日志片断;
服务器正常关闭,用于关闭日志片断
默认状况下,每一个日志目录只使用一个线程。由于这些线程只是在服务器启动和关闭时会用到,因此彻底能够设置大量的线程来达到井行操做的目的。特别是对于包含大量分区的服务器来讲,一旦发生崩愤,在进行恢复时使用井行操做可能会省下数小时的时间。设置此参数时须要注意,所配置的数字对应的是 log.dirs 指定的单个日志目录。也就是说,若是 num.recovery.threads.per.data.dir 被设为 8,而且 log.dir 指定了 3 个路径,那么总共须要 24 个线程。
默认状况下,Kafka 会在以下 3 种状况下建立主题
当一个生产者开始往主题写入消息时
当一个消费者开始从主题读取消息时
当任意一个客户向主题发送元数据请求时
若是你想要删除一个主题,你可使用主题管理工具。默认状况下,是不容许删除主题的,delete.topic.enable 的默认值是 false 所以你不能随意删除主题。这是对生产环境的合理性保护,可是在开发环境和测试环境,是能够容许你删除主题的,因此,若是你想要删除主题,须要把 delete.topic.enable 设为 true。
Kafka 为新建立的主题提供了不少默认配置参数,下面就来一块儿认识一下这些参数
num.partitions 参数指定了新建立的主题须要包含多少个分区。若是启用了主题自动建立功能(该功能是默认启用的),主题分区的个数就是该参数指定的值。该参数的默认值是 1。要注意,咱们能够增长主题分区的个数,但不能减小分区的个数。
这个参数比较简单,它表示 kafka保存消息的副本数,若是一个副本失效了,另外一个还能够继续提供服务default.replication.factor 的默认值为1,这个参数在你启用了主题自动建立功能后有效。
Kafka 一般根据时间来决定数据能够保留多久。默认使用 log.retention.hours 参数来配置时间,默认是 168 个小时,也就是一周。除此以外,还有两个参数 log.retention.minutes 和 log.retentiion.ms 。这三个参数做用是同样的,都是决定消息多久之后被删除,推荐使用 log.retention.ms。
另外一种保留消息的方式是判断消息是否过时。它的值经过参数 log.retention.bytes
来指定,做用在每个分区上。也就是说,若是有一个包含 8 个分区的主题,而且 log.retention.bytes 被设置为 1GB,那么这个主题最多能够保留 8GB 数据。因此,当主题的分区个数增长时,整个主题能够保留的数据也随之增长。
上述的日志都是做用在日志片断上,而不是做用在单个消息上。当消息到达 broker 时,它们被追加到分区的当前日志片断上,当日志片断大小到达 log.segment.bytes 指定上限(默认为 1GB)时,当前日志片断就会被关闭,一个新的日志片断被打开。若是一个日志片断被关闭,就开始等待过时。这个参数的值越小,就越会频繁的关闭和分配新文件,从而下降磁盘写入的总体效率。
上面提到日志片断经关闭后需等待过时,那么 log.segment.ms
这个参数就是指定日志多长时间被关闭的参数和,log.segment.ms 和 log.retention.bytes 也不存在互斥问题。日志片断会在大小或时间到达上限时被关闭,就看哪一个条件先获得知足。
broker 经过设置 message.max.bytes
参数来限制单个消息的大小,默认是 1000 000, 也就是 1MB,若是生产者尝试发送的消息超过这个大小,不只消息不会被接收,还会收到 broker 返回的错误消息。跟其余与字节相关的配置参数同样,该参数指的是压缩后的消息大小,也就是说,只要压缩后的消息小于 mesage.max.bytes,那么消息的实际大小能够大于这个值
以上就是个人面试过程,为了此次面试,也收集了不少的面试题,反正我已经面过了,那就免费分享出来吧!
须要的朋友:关注一下,而后点击这里便可免费领取
如下是部分面试题截图