真的,搞定kafka看这一篇就够了,linux系统入门视频

启动服务(每台都须要执行)java

cd /usr/local/zookeeper/zookeeper-3.4.10/bin
./zkServer.sh start

检查服务状态linux

使用 ./zkServer.sh status 命令检查服务状态程序员

192.168.1.7 --- follower面试

真的,搞定kafka看这一篇就够了,linux系统入门视频

192.168.1.8 --- leaderapache

真的,搞定kafka看这一篇就够了,linux系统入门视频

192.168.1.9 --- followerbootstrap

真的,搞定kafka看这一篇就够了,linux系统入门视频

zk集群通常只有一个leader,多个follower,主通常是相应客户端的读写请求,而从主同步数据,当主挂掉以后就会从follower里投票选举一个leader出来。服务器

Kafka 集群搭建

准备条件

  • 搭建好的 Zookeeper 集群
  • Kafka 压缩包

/usr/local 下新建 kafka 文件夹,而后把下载完成的 tar.gz 包移到 /usr/local/kafka 目录下,使用 tar -zxvf 压缩包 进行解压,解压完成后,进入到 kafka_2.12-2.3.0 目录下,新建 log 文件夹,进入到 config 目录下markdown

咱们能够看到有不少 properties 配置文件,这里主要关注 server.properties 这个文件便可。
真的,搞定kafka看这一篇就够了,linux系统入门视频网络

kafka 启动方式有两种,一种是使用 kafka 自带的 zookeeper 配置文件来启动(能够按照官网来进行启动,并使用单个服务多个节点来模拟集群http://kafka.apache.org/quickstart#quickstart_multibroker),一种是经过使用独立的zk集群来启动,这里推荐使用第二种方式,使用 zk 集群来启动socket

修改配置项

须要为每一个服务都修改一下配置项,也就是server.properties, 须要更新和添加的内容有

broker.id=0 //初始是0,每一个 server 的broker.id 都应该设置为不同的,就和 myid 同样 个人三个服务分别设置的是 1,2,3
log.dirs=/usr/local/kafka/kafka_2.12-2.3.0/log

#在log.retention.hours=168 下面新增下面三项
message.max.byte=5242880
default.replication.factor=2
replica.fetch.max.bytes=5242880

#设置zookeeper的链接端口
zookeeper.connect=192.168.1.7:2181,192.168.1.8:2181,192.168.1.9:2181

配置项的含义

broker.id=0  #当前机器在集群中的惟一标识,和zookeeper的myid性质同样
port=9092 #当前kafka对外提供服务的端口默认是9092
host.name=192.168.1.7 #这个参数默认是关闭的,在0.8.1有个bug,DNS解析问题,失败率的问题。
num.network.threads=3 #这个是borker进行网络处理的线程数
num.io.threads=8 #这个是borker进行I/O处理的线程数
log.dirs=/usr/local/kafka/kafka_2.12-2.3.0/log #消息存放的目录,这个目录能够配置为“,”逗号分割的表达式,上面的num.io.threads要大于这个目录的个数这个目录,若是配置多个目录,新建立的topic他把消息持久化的地方是,当前以逗号分割的目录中,那个分区数最少就放那一个
socket.send.buffer.bytes=102400 #发送缓冲区buffer大小,数据不是一会儿就发送的,先回存储到缓冲区了到达必定的大小后在发送,能提升性能
socket.receive.buffer.bytes=102400 #kafka接收缓冲区大小,当数据到达必定大小后在序列化到磁盘
socket.request.max.bytes=104857600 #这个参数是向kafka请求消息或者向kafka发送消息的请请求的最大数,这个值不能超过java的堆栈大小
num.partitions=1 #默认的分区数,一个topic默认1个分区数
log.retention.hours=168 #默认消息的最大持久化时间,168小时,7天
message.max.byte=5242880  #消息保存的最大值5M
default.replication.factor=2  #kafka保存消息的副本数,若是一个副本失效了,另外一个还能够继续提供服务
replica.fetch.max.bytes=5242880  #取消息的最大直接数
log.segment.bytes=1073741824 #这个参数是:由于kafka的消息是以追加的形式落地到文件,当超过这个值的时候,kafka会新起一个文件
log.retention.check.interval.ms=300000 #每隔300000毫秒去检查上面配置的log失效时间(log.retention.hours=168 ),到目录查看是否有过时的消息若是有,删除
log.cleaner.enable=false #是否启用log压缩,通常不用启用,启用的话能够提升性能
zookeeper.connect=192.168.1.7:2181,192.168.1.8:2181,192.168.1.9:2181 #设置zookeeper的链接端口

启动 Kafka 集群并测试

  • 启动服务,进入到 /usr/local/kafka/kafka_2.12-2.3.0/bin 目录下
# 启动后台进程
./kafka-server-start.sh -daemon ../config/server.properties
  • 检查服务是否启动
# 执行命令 jps
6201 QuorumPeerMain
7035 Jps
6972 Kafka
  • kafka 已经启动
  • 建立 Topic 来验证是否建立成功
# cd .. 往回退一层 到 /usr/local/kafka/kafka_2.12-2.3.0 目录下
bin/kafka-topics.sh --create --zookeeper 192.168.1.7:2181 --replication-factor 2 --partitions 1 --topic cxuan

对上面的解释

--replication-factor 2 复制两份

--partitions 1 建立1个分区

--topic 建立主题

查看咱们的主题是否出建立成功

bin/kafka-topics.sh --list --zookeeper 192.168.1.7:2181

真的,搞定kafka看这一篇就够了,linux系统入门视频

启动一个服务就能把集群启动起来

在一台机器上建立一个发布者

# 建立一个broker,发布者
./kafka-console-producer.sh --broker-list 192.168.1.7:9092 --topic cxuantopic

在一台服务器上建立一个订阅者

# 建立一个consumer, 消费者
bin/kafka-console-consumer.sh --bootstrap-server 192.168.1.7:9092 --topic cxuantopic --from-beginning

注意:这里使用 --zookeeper 的话可能出现 zookeeper is not a recognized option 的错误,这是由于 kafka 版本过高,须要使用 --bootstrap-server 指令

测试结果

发布

真的,搞定kafka看这一篇就够了,linux系统入门视频

消费

真的,搞定kafka看这一篇就够了,linux系统入门视频

其余命令

显示 topic

bin/kafka-topics.sh --list --zookeeper 192.168.1.7:2181

# 显示
cxuantopic

查看 topic 状态

bin/kafka-topics.sh --describe --zookeeper 192.168.1.7:2181 --topic cxuantopic

# 下面是显示的详细信息
Topic:cxuantopic PartitionCount:1 ReplicationFactor:2 Configs:
Topic: cxuantopic Partition: 0 Leader: 1 Replicas: 1,2 Isr: 1,2

# 分区为为1  复制因子为2   主题 cxuantopic 的分区为0 
# Replicas: 0,1   复制的为1,2

Leader 负责给定分区的全部读取和写入的节点,每一个节点都会经过随机选择成为 leader。

Replicas 是为该分区复制日志的节点列表,不管它们是 Leader 仍是当前处于活动状态。

Isr 是同步副本的集合。它是副本列表的子集,当前仍处于活动状态并追随Leader。

至此,kafka 集群搭建完毕。

验证多节点接收数据

刚刚咱们都使用的是 相同的ip 服务,下面使用其余集群中的节点,验证是否可以接受到服务

在另外两个节点上使用

bin/kafka-console-consumer.sh --bootstrap-server 192.168.1.7:9092 --topic cxuantopic --from-beginning

而后再使用 broker 进行消息发送,经测试三个节点均可以接受到消息。

配置详解

在搭建 Kafka 的时候咱们简单介绍了一下 server.properties 中配置的含义,如今咱们来详细介绍一下参数的配置和概念

常规配置

这些参数是 kafka 中最基本的配置

  • broker.id

每一个 broker 都须要有一个标识符,使用 broker.id 来表示。它的默认值是 0,它能够被设置成其余任意整数,在集群中须要保证每一个节点的 broker.id 都是惟一的。

  • port

若是使用配置样原本启动 kafka ,它会监听 9092 端口,修改 port 配置参数能够把它设置成其余任意可用的端口。

  • zookeeper.connect

用于保存 broker 元数据的地址是经过 zookeeper.connect 来指定。 localhost:2181 表示运行在本地 2181 端口。该配置参数是用逗号分隔的一组 hostname:port/path 列表,每一部分含义以下:

hostname 是 zookeeper 服务器的服务名或 IP 地址

port 是 zookeeper 链接的端口

/path 是可选的 zookeeper 路径,做为 Kafka 集群的 chroot 环境。若是不指定,默认使用跟路径

  • log.dirs

Kafka 把消息都保存在磁盘上,存放这些日志片断的目录都是经过 log.dirs 来指定的。它是一组用逗号分隔的本地文件系统路径。若是指定了多个路径,那么 broker 会根据 "最少使用" 原则,把同一分区的日志片断保存到同一路径下。要注意,broker 会向拥有最少数目分区的路径新增分区,而不是向拥有最小磁盘空间的路径新增分区。

  • num.recovery.threads.per.data.dir

对于以下 3 种状况,Kafka 会使用可配置的线程池来处理日志片断

服务器正常启动,用于打开每一个分区的日志片断;

服务器崩溃后启动,用于检查和截断每一个分区的日志片断;

服务器正常关闭,用于关闭日志片断

默认状况下,每一个日志目录只使用一个线程。由于这些线程只是在服务器启动和关闭时会用到,因此彻底能够设置大量的线程来达到井行操做的目的。特别是对于包含大量分区的服务器来讲,一旦发生崩愤,在进行恢复时使用井行操做可能会省下数小时的时间。设置此参数时须要注意,所配置的数字对应的是 log.dirs 指定的单个日志目录。也就是说,若是 num.recovery.threads.per.data.dir 被设为 8,而且 log.dir 指定了 3 个路径,那么总共须要 24 个线程。

  • auto.create.topics.enable

默认状况下,Kafka 会在以下 3 种状况下建立主题

当一个生产者开始往主题写入消息时

当一个消费者开始从主题读取消息时

当任意一个客户向主题发送元数据请求时

  • delete.topic.enable

若是你想要删除一个主题,你可使用主题管理工具。默认状况下,是不容许删除主题的,delete.topic.enable 的默认值是 false 所以你不能随意删除主题。这是对生产环境的合理性保护,可是在开发环境和测试环境,是能够容许你删除主题的,因此,若是你想要删除主题,须要把 delete.topic.enable 设为 true。

主题默认配置

Kafka 为新建立的主题提供了不少默认配置参数,下面就来一块儿认识一下这些参数

  • num.partitions

num.partitions 参数指定了新建立的主题须要包含多少个分区。若是启用了主题自动建立功能(该功能是默认启用的),主题分区的个数就是该参数指定的值。该参数的默认值是 1。要注意,咱们能够增长主题分区的个数,但不能减小分区的个数。

  • default.replication.factor

这个参数比较简单,它表示 kafka保存消息的副本数,若是一个副本失效了,另外一个还能够继续提供服务default.replication.factor 的默认值为1,这个参数在你启用了主题自动建立功能后有效。

  • log.retention.ms

Kafka 一般根据时间来决定数据能够保留多久。默认使用 log.retention.hours 参数来配置时间,默认是 168 个小时,也就是一周。除此以外,还有两个参数 log.retention.minutes 和 log.retentiion.ms 。这三个参数做用是同样的,都是决定消息多久之后被删除,推荐使用 log.retention.ms。

  • log.retention.bytes

另外一种保留消息的方式是判断消息是否过时。它的值经过参数 log.retention.bytes 来指定,做用在每个分区上。也就是说,若是有一个包含 8 个分区的主题,而且 log.retention.bytes 被设置为 1GB,那么这个主题最多能够保留 8GB 数据。因此,当主题的分区个数增长时,整个主题能够保留的数据也随之增长。

  • log.segment.bytes

上述的日志都是做用在日志片断上,而不是做用在单个消息上。当消息到达 broker 时,它们被追加到分区的当前日志片断上,当日志片断大小到达 log.segment.bytes 指定上限(默认为 1GB)时,当前日志片断就会被关闭,一个新的日志片断被打开。若是一个日志片断被关闭,就开始等待过时。这个参数的值越小,就越会频繁的关闭和分配新文件,从而下降磁盘写入的总体效率。

  • log.segment.ms

上面提到日志片断经关闭后需等待过时,那么 log.segment.ms 这个参数就是指定日志多长时间被关闭的参数和,log.segment.ms 和 log.retention.bytes 也不存在互斥问题。日志片断会在大小或时间到达上限时被关闭,就看哪一个条件先获得知足。

  • message.max.bytes

broker 经过设置 message.max.bytes 参数来限制单个消息的大小,默认是 1000 000, 也就是 1MB,若是生产者尝试发送的消息超过这个大小,不只消息不会被接收,还会收到 broker 返回的错误消息。跟其余与字节相关的配置参数同样,该参数指的是压缩后的消息大小,也就是说,只要压缩后的消息小于 mesage.max.bytes,那么消息的实际大小能够大于这个值

写在最后

以上就是个人面试过程,为了此次面试,也收集了不少的面试题,反正我已经面过了,那就免费分享出来吧!

须要的朋友:关注一下,而后点击这里便可免费领取

如下是部分面试题截图

Java程序员秋招三面蚂蚁金服,我总结了全部面试题,也不过如此真的,搞定kafka看这一篇就够了,linux系统入门视频

相关文章
相关标签/搜索