这里主要是 Kafka 集群基本配置的相关内容。java
Kafka 集群基本硬件的保证缓存
OS page cache:应当能够缓存全部活跃的 Segment(Kafka 中最基本的数据存储单位);性能优化
fd 限制:100k+;网络
禁用 swapping:简单来讲,swap 做用是当内存的使用达到一个临界值时就会将内存中的数据移动到 swap 交换空间,可是此时,内存可能还有不少空余资源,swap 走的是磁盘 IO,对于内存读写很在乎的系统,最好禁止使用 swap 分区;session
TCP 调优;架构
JVM 配置并发
JDK 8 而且使用 G1 垃圾收集器;app
至少要分配 6-8 GB 的堆内存。负载均衡
使用多块磁盘,并配置为 Kafka 专用的磁盘;异步
JBOD vs RAID10;
JBOD(Just a Bunch of Disks,简单来讲它表示一个没有控制软件提供协调控制的磁盘集合,它将多个物理磁盘串联起来,提供一个巨大的逻辑磁盘,数据是按序存储,它的性能与单块磁盘相似)
JBOD 的一些缺陷:
任何磁盘的损坏都会致使异常关闭,而且须要较长的时间恢复;
数据不保证一致性;
多级目录;
社区也正在解决这么问题,能够关注 KIP 1十二、113:
必要的工具用于管理 JBOD;
自动化的分区管理;
磁盘损坏时,Broker 能够将 replicas 迁移到好的磁盘上;
在同一个 Broker 的磁盘间 reassign replicas;
RAID 10 的特色:
能够容许单磁盘的损坏;
性能和保护;
不一样磁盘间的负载均衡;
高命中来减小 space;
单一的 mount point;
文件系统:
使用 EXT 或 XFS;
SSD;
Kafka 集群须要监控的一些指标,这些指标反应了集群的健康度。
CPU 负载;
Network Metrics;
File Handle 使用;
磁盘空间;
磁盘 IO 性能;
GC 信息;
ZooKeeper 监控。
Partition 有两种副本:Leader,Follower;
Leader 负责维护 in-sync-replicas(ISR)
replica.lag.time.max.ms
:默认为10000,若是 follower 落后于 leader 的消息数超过这个数值时,leader 就将 follower 从 isr 列表中移除;
num.replica.fetchers
,默认为1,用于从 leader 同步数据的 fetcher 线程数;
min.insync.replica
:Producer 端使用来用于保证 Durability(持久性);
当发现 replica 的配置与集群的不一样时,通常状况都是集群上的 replica 少于配置数时,能够从如下几个角度来排查问题:
JMX 监控项:kafka.server:type=ReplicaManager,name=UnderReplicatedPartitions;
可能的缘由:
Broker 挂了?
Controller 的问题?
ZooKeeper 的问题?
Network 的问题?
解决办法:
调整 ISR 的设置;
Broker 扩容。
负责管理 partition 生命周期;
避免 Controller’s ZK 会话超时:
ISR 抖动;
ZK Server 性能问题;
Broker 长时间的 GC;
网络 IO 问题;
监控:
kafka.controller:type=KafkaController,name=ActiveControllerCount,应该为1;
LeaderElectionRate。
容许不在 isr 中 replica 被选举为 leader。
这是 Availability 和 Correctness 之间选择,Kafka 默认选择了可用性;
unclean.leader.election.enable
:默认为 true,即容许不在 isr 中 replica 选为 leader,这个配置能够全局配置,也能够在 topic 级别配置;
监控:kafka.controller:type=ControllerStats,name=UncleanLeaderElectionsPerSec。
Broker 级别有几个比较重要的配置,通常须要根据实际状况进行相应配置的:
log.retention.{ms, minutes, hours}
, log.retention.bytes
:数据保存时间;
message.max.bytes
, replica.fetch.max.bytes
;
delete.topic.enable
:默认为 false,是否容许经过 admin tool 来删除 topic;
unclean.leader.election.enable
= false,参见上面;
min.insync.replicas
= 2:当 Producer 的 acks 设置为 all 或 -1 时, min.insync.replicas
表明了必须进行确认的最小 replica 数,若是不够的话 Producer 将会报 NotEnoughReplicas
或 NotEnoughReplicasAfterAppend
异常;
replica.lag.time.max.ms
(超过这个时间没有发送请求的话,follower 将从 isr 中移除), num.replica.fetchers;
replica.fetch.response.max.bytes
;
zookeeper.session.timeout.ms
= 30s;
num.io.threads
:默认为8,KafkaRequestHandlerPool 的大小。
欢迎学Java和大数据的朋友们加入java架构交流: 855835163
群内提供免费的架构资料还有:Java工程化、高性能及分布式、高性能、深刻浅出。高架构。性能调优、Spring,MyBatis,Netty源码分析和大数据等多个知识点高级进阶干货的免费直播讲解 能够进来一块儿学习交流哦
Broker 评估
每一个 Broker 的 Partition 数不该该超过2k;
控制 partition 大小(不要超过25GB);
集群评估(Broker 的数量根据如下条件配置)
数据保留时间;
集群的流量大小;
集群扩容:
磁盘使用率应该在 60% 如下;
网络使用率应该在 75% 如下;
集群监控
保持负载均衡;
确保 topic 的 partition 均匀分布在全部 Broker 上;
确保集群的阶段没有耗尽磁盘或带宽。
Partition 数:kafka.server:type=ReplicaManager,name=PartitionCount;
Leader 副本数:kafka.server:type=ReplicaManager,name=LeaderCount;
ISR 扩容/缩容率:kafka.server:type=ReplicaManager,name=IsrExpandsPerSec;
读写速率:Message in rate/Byte in rate/Byte out rate;
网络请求的平均空闲率:NetworkProcessorAvgIdlePercent;
请求处理平均空闲率:RequestHandlerAvgIdlePercent。
partition 数
Partition 数应该至少与最大 consumer group 中 consumer 线程数一致;
对于使用频繁的 topic,应该设置更多的 partition;
控制 partition 的大小(25GB 左右);
考虑应用将来的增加(可使用一种机制进行自动扩容);
使用带 key 的 topic;
partition 扩容:当 partition 的数据量超过一个阈值时应该自动扩容(实际上还应该考虑网络流量)。
根据吞吐量的要求设置 partition 数:
假设 Producer 单 partition 的吞吐量为 P;
consumer 消费一个 partition 的吞吐量为 C;
而要求的吞吐量为 T;
那么 partition 数至少应该大于 T/P、T/c 的最大值;
更多的 partition,意味着:
更多的 fd;
可能增长 Unavailability(可能会增长不可用的时间);
可能增长端到端的延迟;
client 端将会使用更多的内存。
这里简单讲述一下,Partition 的增长将会带来如下几个优势和缺点:
增长吞吐量:对于 consumer 来讲,一个 partition 只能被一个 consumer 线程所消费,适当增长 partition 数,能够增长 consumer 的并发,进而增长系统的吞吐量;
须要更多的 fd:对于每个 segment,在 broker 都会有一个对应的 index 和实际数据文件,而对于 Kafka Broker,它将会对于每一个 segment 每一个 index 和数据文件都会打开相应的 file handle(能够理解为 fd),所以,partition 越多,将会带来更多的 fd;
可能会增长数据不可用性(主要是指增长不可用时间):主要是指 broker 宕机的状况,越多的 partition 将会意味着越多的 partition 须要 leader 选举(leader 在宕机这台 broker 的 partition 须要从新选举),特别是若是恰好 controller 宕机,从新选举的 controller 将会首先读取全部 partition 的 metadata,而后才进行相应的 leader 选举,这将会带来更大不可用时间;
可能增长 End-to-end 延迟:一条消息只有其被同步到 isr 的全部 broker 上后,才能被消费,partition 越多,不一样节点之间同步就越多,这可能会带来毫秒级甚至数十毫秒级的延迟;
Client 将会须要更多的内存:Producer 和 Consumer 都会按照 partition 去缓存数据,每一个 partition 都会带来数十 KB 的消耗,partition 越多, Client 将会占用更多的内存。
避免被恶意 Client 攻击,保证 SLA;
设置 produce 和 fetch 请求的字节速率阈值;
能够应用在 user、client-id、或者 user 和 client-id groups;
Broker 端的 metrics 监控:throttle-rate、byte-rate;
replica.fetch.response.max.bytes
:用于限制 replica 拉取请求的内存使用;
进行数据迁移时限制贷款的使用, kafka-reassign-partitions.sh -- -throttle option
。
使用 Java 版的 Client;
使用 kafka-producer-perf-test.sh
测试你的环境;
设置内存、CPU、batch 压缩;
batch.size:该值设置越大,吞吐越大,但延迟也会越大;
linger.ms:表示 batch 的超时时间,该值越大,吞吐越大、但延迟也会越大;
max.in.flight.requests.per.connection
:默认为5,表示 client 在 blocking 以前向单个链接(broker)发送的未确认请求的最大数,超过1时,将会影响数据的顺序性;
compression.type
:压缩设置,会提升吞吐量;
acks
:数据 durability 的设置;
避免大消息
会使用更多的内存;
下降 Broker 的处理速度;
若是吞吐量小于网络带宽
增长线程;
提升 batch.size;
增长更多 producer 实例;
增长 partition 数;
设置 acks=-1 时,若是延迟增大:能够增大 num.replica.fetchers
(follower 同步数据的线程数)来调解;
跨数据中心的传输:增长 socket 缓冲区设置以及 OS tcp 缓冲区设置。
batch-size-avg
compression-rate-avg
waiting-threads
buffer-available-bytes
record-queue-time-max
record-send-rate
records-per-request-avg
使用 kafka-consumer-perf-test.sh
测试环境;
吞吐量问题:
partition 数太少;
OS page cache:分配足够的内存来缓存数据;
应用的处理逻辑;
offset topic( __consumer_offsets
)
offsets.topic.replication.factor
:默认为3;
offsets.retention.minutes
:默认为1440,即 1day;
– MonitorISR,topicsize;
offset commit较慢:异步 commit 或 手动 commit。
fetch.min.bytes
、 fetch.max.wait.ms
;
max.poll.interval.ms
:调用 poll()
以后延迟的最大时间,超过这个时间没有调用 poll()
的话,就会认为这个 consumer 挂掉了,将会进行 rebalance;
max.poll.records
:当调用 poll()
以后返回最大的 record 数,默认为500;
session.timeout.ms
;
Consumer Rebalance
– check timeouts
– check processing times/logic
– GC Issues
网络配置;
consumer 是否跟得上数据的发送速度。
Consumer Lag:consumer offset 与 the end of log(partition 能够消费的最大 offset) 的差值;
监控
metric 监控:records-lag-max;
经过 bin/kafka-consumer-groups.sh
查看;
用于 consumer 监控的 LinkedIn’s Burrow;
减小 Lag
分析 consumer:是 GC 问题仍是 Consumer hang 住了;
增长 Consumer 的线程;
增长分区数和 consumer 线程;
这个是经常使用的配置,
block.on.buffer.full
:默认设置为 false,当达到内存设置时,可能经过 block 中止接受新的 record 或者抛出一些错误,默认状况下,Producer 将不会抛出 BufferExhaustException,而是当达到 max.block.ms
这个时间后直接抛出 TimeoutException。设置为 true 的意义就是将 max.block.ms
设置为 Long.MAX_VALUE,将来版本中这个设置将被遗弃,推荐设置 max.block.ms
。
欢迎学Java和大数据的朋友们加入java架构交流: 855835163 群内提供免费的架构资料还有:Java工程化、高性能及分布式、高性能、深刻浅出。高架构。性能调优、Spring,MyBatis,Netty源码分析和大数据等多个知识点高级进阶干货的免费直播讲解 能够进来一块儿学习交流哦