Kafka万亿级消息实战

1、Kafka应用

本文主要总结当Kafka集群流量达到 万亿级记录/天或者十万亿级记录/天  甚至更高后,咱们须要具有哪些能力才能保障集群高可用、高可靠、高性能、高吞吐、安全的运行。html

这里总结内容主要针对Kafka2.1.1版本,包括集群版本升级、数据迁移、流量限制、监控告警、负载均衡、集群扩/缩容、资源隔离、集群容灾、集群安全、性能优化、平台化、开源版本缺陷、社区动态等方面。本文主要是介绍核心脉络,不作过多细节讲解。下面咱们先来看看Kafka做为数据中枢的一些核心应用场景。java

下图展现了一些主流的数据处理流程,Kafka起到一个数据中枢的做用。node

接下来看看咱们Kafka平台总体架构;git

1.1 版本升级

1.1.1  开源版本如何进行版本滚动升级与回退

官网地址:http://kafka.apache.orggithub

1.1.1.2 源码改造如何升级与回退apache

因为在升级过程当中,必然出现新旧代码逻辑交替的状况。集群内部部分节点是开源版本,另一部分节点是改造后的版本。因此,须要考虑在升级过程当中,新旧代码混合的状况,如何兼容以及出现故障时如何回退。bootstrap

1.2 数据迁移

因为Kafka集群的架构特色,这必然致使集群内流量负载不均衡的状况,因此咱们须要作一些数据迁移来实现集群不一样节点间的流量均衡。Kafka开源版本为数据迁移提供了一个脚本工具“bin/kafka-reassign-partitions.sh”,若是本身没有实现自动负载均衡,可使用此脚本。api

开源版本提供的这个脚本生成迁移计划彻底是人工干预的,当集群规模很是大时,迁移效率变得很是低下,通常以天为单位进行计算。固然,咱们能够实现一套自动化的均衡程序,当负载均衡实现自动化之后,基本使用调用内部提供的API,由程序去帮咱们生成迁移计划及执行迁移任务。须要注意的是,迁移计划有指定数据目录和不指定数据目录两种,指定数据目录的须要配置ACL安全认证。缓存

官网地址:http://kafka.apache.org安全

1.2.1 broker间数据迁移

不指定数据目录

//未指定迁移目录的迁移计划
{
    "version":1,
    "partitions":[
        {"topic":"yyj4","partition":0,"replicas":[1000003,1000004]},
        {"topic":"yyj4","partition":1,"replicas":[1000003,1000004]},
        {"topic":"yyj4","partition":2,"replicas":[1000003,1000004]}
    ]
}

指定数据目录

//指定迁移目录的迁移计划
{
    "version":1,
    "partitions":[
        {"topic":"yyj1","partition":0,"replicas":[1000006,1000005],"log_dirs":["/data1/bigdata/mydata1","/data1/bigdata/mydata3"]},
        {"topic":"yyj1","partition":1,"replicas":[1000006,1000005],"log_dirs":["/data1/bigdata/mydata1","/data1/bigdata/mydata3"]},
        {"topic":"yyj1","partition":2,"replicas":[1000006,1000005],"log_dirs":["/data1/bigdata/mydata1","/data1/bigdata/mydata3"]}
    ]
}

1.2.2 broker内部磁盘间数据迁移

生产环境的服务器通常都是挂载多块硬盘,好比4块/12块等;那么可能出如今Kafka集群内部,各broker间流量比较均衡,可是在broker内部,各磁盘间流量不均衡,致使部分磁盘过载,从而影响集群性能和稳定,也没有较好的利用硬件资源。在这种状况下,咱们就须要对broker内部多块磁盘的流量作负载均衡,让流量更均匀的分布到各磁盘上。

1.2.3 并发数据迁移

当前Kafka开源版本(2.1.1版本)提供的副本迁移工具“bin/kafka-reassign-partitions.sh”在同一个集群内只能实现迁移任务的串行。对于集群内已经实现多个资源组物理隔离的状况,因为各资源组不会相互影响,可是却不能友好的进行并行的提交迁移任务,迁移效率有点低下,这种不足直到2.6.0版本才得以解决。若是须要实现并发数据迁移,能够选择升级Kafka版本或者修改Kafka源码。

1.2.4 终止数据迁移

当前Kafka开源版本(2.1.1版本)提供的副本迁移工具“bin/kafka-reassign-partitions.sh”在启动迁移任务后,没法终止迁移。当迁移任务对集群的稳定性或者性能有影响时,将变得一筹莫展,只能等待迁移任务执行完毕(成功或者失败),这种不足直到2.6.0版本才得以解决。若是须要实现终止数据迁移,能够选择升级Kafka版本或者修改Kafka源码。

1.3 流量限制

1.3.1 生产消费流量限制

常常会出现一些突发的,不可预测的异常生产或者消费流量会对集群的IO等资源产生巨大压力,最终影响整个集群的稳定与性能。那么咱们能够对用户的生产、消费、副本间数据同步进行流量限制,这个限流机制并非为了限制用户,而是避免突发的流量影响集群的稳定和性能,给用户能够更好的服务。

以下图所示,节点入流量由140MB/s左右突增到250MB/s,而出流量则从400MB/s左右突增至800MB/s。若是没有限流机制,那么集群的多个节点将有被这些异常流量打挂的风险,甚至形成集群雪崩。

图片生产/消费流量限制官网地址:点击连接

对于生产者和消费者的流量限制,官网提供了如下几种维度组合进行限制(固然,下面限流机制存在必定缺陷,后面在“Kafka开源版本功能缺陷”咱们将提到):

/config/users/<user>/clients/<client-id> //根据用户和客户端ID组合限流
/config/users/<user>/clients/<default>
/config/users/<user>//根据用户限流 这种限流方式是咱们最经常使用的方式
/config/users/<default>/clients/<client-id>
/config/users/<default>/clients/<default>
/config/users/<default>
/config/clients/<client-id>
/config/clients/<default>

在启动Kafka的broker服务时须要开启JMX参数配置,方便经过其余应用程序采集Kafka的各项JMX指标进行服务监控。当用户须要调整限流阈值时,根据单个broker所能承受的流量进行智能评估,无需人工干预判断是否能够调整;对于用户流量限制,主要须要参考的指标包括如下两个:

(1)消费流量指标:ObjectName:kafka.server:type=Fetch,user=acl认证用户名称 属性:byte-rate(用户在当前broker的出流量)、throttle-time(用户在当前broker的出流量被限制时间)
(2)生产流量指标:ObjectName:kafka.server:type=Produce,user=acl认证用户名称 属性:byte-rate(用户在当前broker的入流量)、throttle-time(用户在当前broker的入流量被限制时间)

1.3.2 follower同步leader/数据迁移流量限制

副本迁移/数据同步流量限制官网地址:连接

涉及参数以下:

//副本同步限流配置共涉及如下4个参数
leader.replication.throttled.rate
follower.replication.throttled.rate
leader.replication.throttled.replicas
follower.replication.throttled.replicas

辅助指标以下:

(1)副本同步出流量指标:ObjectName:kafka.server:type=BrokerTopicMetrics,name=ReplicationBytesOutPerSec
(2)副本同步入流量指标:ObjectName:kafka.server:type=BrokerTopicMetrics,name=ReplicationBytesInPerSec

1.4 监控告警

关于Kafka的监控有一些开源的工具可用使用,好比下面这几种:

Kafka Manager

Kafka Eagle

Kafka Monitor

KafkaOffsetMonitor;

咱们已经把Kafka Manager做为咱们查看一些基本指标的工具嵌入平台,然而这些开源工具不能很好的融入到咱们本身的业务系统或者平台上。因此,咱们须要本身去实现一套粒度更细、监控更智能、告警更精准的系统。其监控覆盖范围应该包括基础硬件、操做系统(操做系统偶尔出现系统进程hang住状况,致使broker假死,没法正常提供服务)、Kafka的broker服务、Kafka客户端应用程序、zookeeper集群、上下游全链路监控。

1.4.1 硬件监控

网络监控:

核心指标包括网络入流量、网络出流量、网络丢包、网络重传、处于TIME.WAIT的TCP链接数、交换机、机房带宽、DNS服务器监控(若是DNS服务器异常,可能出现流量黑洞,引发大面积业务故障)等。

磁盘监控:

核心指标包括监控磁盘write、磁盘read(若是消费时没有延时,或者只有少许延时,通常都没有磁盘read操做)、磁盘ioutil、磁盘iowait(这个指标若是太高说明磁盘负载较大)、磁盘存储空间、磁盘坏盘、磁盘坏块/坏道(坏道或者坏块将致使broker处于半死不活状态,因为有crc校验,消费者将被卡住)等。

CPU监控:

监控CPU空闲率/负载,主板故障等,一般CPU使用率比较低不是Kafka的瓶颈。

内存/交换区监控:

内存使用率,内存故障。通常状况下,服务器上除了启动Kafka的broker时分配的堆内存之外,其余内存基本所有被用来作PageCache。

缓存命中率监控:

因为是否读磁盘对Kafka的性能影响很大,因此咱们须要监控Linux的PageCache缓存命中率,若是缓存命中率高,则说明消费者基本命中缓存。

详细内容请阅读文章:《Linux Page Cache调优在Kafka中的应用》。

系统日志:

咱们须要对操做系统的错误日志进行监控告警,及时发现一些硬件故障。

1.4.2 broker服务监控

broker服务的监控,主要是经过在broker服务启动时指定JMX端口,而后经过实现一套指标采集程序去采集JMX指标。(服务端指标官网地址

broker级监控:broker进程、broker入流量字节大小/记录数、broker出流量字节大小/记录数、副本同步入流量、副本同步出流量、broker间流量误差、broker链接数、broker请求队列数、broker网络空闲率、broker生产延时、broker消费延时、broker生产请求数、broker消费请求数、broker上分布leader个数、broker上分布副本个数、broker上各磁盘流量、broker GC等。

topic级监控:topic入流量字节大小/记录数、topic出流量字节大小/记录数、无流量topic、topic流量突变(突增/突降)、topic消费延时。

partition级监控:分区入流量字节大小/记录数、分区出流量字节大小/记录数、topic分区副本缺失、分区消费延迟记录、分区leader切换、分区数据倾斜(生产消息时,若是指定了消息的key容易形成数据倾斜,这严重影响Kafka的服务性能)、分区存储大小(能够治理单分区过大的topic)。

用户级监控:用户出/入流量字节大小、用户出/入流量被限制时间、用户流量突变(突增/突降)。

broker服务日志监控:对server端打印的错误日志进行监控告警,及时发现服务异常。

1.4.3.客户端监控

客户端监控主要是本身实现一套指标上报程序,这个程序须要实现

org.apache.kafka.common.metrics.MetricsReporter 接口。而后在生产者或者消费者的配置中加入配置项 metric.reporters,以下所示:

Properties props = new Properties();
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "");
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, IntegerSerializer.class.getName());
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
 
//ClientMetricsReporter类实现org.apache.kafka.common.metrics.MetricsReporter接口
props.put(ProducerConfig.METRIC_REPORTER_CLASSES_CONFIG, ClientMetricsReporter.class.getName());
...

客户端指标官网地址:

http://kafka.apache.org/21/documentation.html#selector_monitoring

http://kafka.apache.org/21/documentation.html#common\_node\_monitoring

http://kafka.apache.org/21/documentation.html#producer_monitoring

http://kafka.apache.org/21/documentation.html#producer\_sender\_monitoring

http://kafka.apache.org/21/documentation.html#consumer_monitoring

http://kafka.apache.org/21/documentation.html#consumer\_fetch\_monitoring

客户端监控流程架构以下图所示:

1.4.3.1 生产者客户端监控

维度:用户名称、客户端ID、客户端IP、topic名称、集群名称、brokerIP;

指标:链接数、IO等待时间、生产流量大小、生产记录数、请求次数、请求延时、发送错误/重试次数等。

1.4.3.2 消费者客户端监控

维度:用户名称、客户端ID、客户端IP、topic名称、集群名称、消费组、brokerIP、topic分区;

指标:链接数、io等待时间、消费流量大小、消费记录数、消费延时、topic分区消费延迟记录等。

1.4.4 Zookeeper监控

  1. Zookeeper进程监控;
  2. Zookeeper的leader切换监控;
  3. Zookeeper服务的错误日志监控;

1.4.5 全链路监控

当数据链路很是长的时候(好比:业务应用->埋点SDk->数据采集->Kafka->实时计算->业务应用),咱们定位问题一般须要通过多个团队反复沟通与排查才能发现问题到底出如今哪一个环节,这样排查问题效率比较低下。在这种状况下,咱们就须要与上下游一块儿梳理整个链路的监控。出现问题时,第一时间定位问题出如今哪一个环节,缩短问题定位与故障恢复时间。

1.5 资源隔离

1.5.1 相同集群不一样业务资源物理隔离

咱们对全部集群中不一样对业务进行资源组物理隔离,避免各业务之间相互影响。在这里,咱们假设集群有4个broker节点(Broker1/Broker2/Broker3/Broker4),2个业务(业务A/业务B),他们分别拥有topic分区分布以下图所示,两个业务topic都分散在集群的各个broker上,而且在磁盘层面也存在交叉。

试想一下,若是咱们其中一个业务异常,好比流量突增,致使broker节点异常或者被打挂。那么这时候另一个业务也将受到影响,这样将大大的影响了咱们服务的可用性,形成故障,扩大了故障影响范围。

针对这些痛点,咱们能够对集群中的业务进行物理资源隔离,各业务独享资源,进行资源组划分(这里把4各broker划分为Group1和Group2两个资源组)以下图所示,不一样业务的topic分布在本身的资源组内,当其中一个业务异常时,不会波及另一个业务,这样就能够有效的缩小咱们的故障范围,提升服务可用性。

1.6 集群归类

咱们把集群根据业务特色进行拆分为日志集群、监控集群、计费集群、搜索集群、离线集群、在线集群等,不一样场景业务放在不一样集群,避免不一样业务相互影响。

1.7 扩容/缩容

1.7.1 topic扩容分区

随着topic数据量增加,咱们最初建立的topic指定的分区个数可能已经没法知足数量流量要求,因此咱们须要对topic的分区进行扩展。扩容分区时须要考虑一下几点:

必须保证topic分区leader与follower轮询的分布在资源组内全部broker上,让流量分布更加均衡,同时须要考虑相同分区不一样副本跨机架分布以提升容灾能力;

当topic分区leader个数除以资源组节点个数有余数时,须要把余数分区leader优先考虑放入流量较低的broker。

1.7.2 broker上线

随着业务量增多,数据量不断增大,咱们的集群也须要进行broker节点扩容。关于扩容,咱们须要实现如下几点:

扩容智能评估:根据集群负载,把是否须要扩容评估程序化、智能化;

智能扩容:当评估须要扩容后,把扩容流程以及流量均衡平台化。

1.7.3 broker下线

某些场景下,咱们须要下线咱们的broker,主要包括如下几个场景:

一些老化的服务器须要下线,实现节点下线平台化;

服务器故障,broker故障没法恢复,咱们须要下线故障服务器,实现节点下线平台化;

有更优配置的服务器替换已有broker节点,实现下线节点平台化。

1.8 负载均衡

咱们为何须要负载均衡呢?首先,咱们来看第一张图,下图是咱们集群某个资源组刚扩容后的流量分布状况,流量没法自动的分摊到咱们新扩容后的节点上。那么这个时候须要咱们手动去触发数据迁移,把部分副本迁移至新节点上才能实现流量均衡。

下面,咱们来看一下第二张图。这张图咱们能够看出流量分布很是不均衡,最低和最高流量误差数倍以上。这和Kafka的架构特色有关,当集群规模与数据量达到必定量后,必然出现当问题。这种状况下,咱们也须要进行负载均衡。

咱们再来看看第三张图。这里咱们能够看出出流量只有部分节点突增,这就是topic分区在集群内部不够分散,集中分布到了某几个broker致使,这种状况咱们也须要进行扩容分区和均衡。

咱们比较理想的流量分布应该以下图所示,各节点间流量误差很是小,这种状况下,既能够加强集群扛住流量异常突增的能力又能够提高集群总体资源利用率和服务稳定性,下降成本。

负载均衡咱们须要实现如下效果:

1)生成副本迁移计划以及执行迁移任务平台化、自动化、智能化;

2)执行均衡后broker间流量比较均匀,且单个topic分区均匀分布在全部broker节点上;

3)执行均衡后broker内部多块磁盘间流量比较均衡;

要实现这个效果,咱们须要开发一套本身的负载均衡工具,如对开源的 cruise control进行二次开发;此工具的核心主要在生成迁移计划的策略,迁移计划的生成方案直接影响到最后集群负载均衡的效果。参考内容:

1. linkedIn/cruise-control

2. Introduction to Kafka Cruise Control

3. Cloudera Cruise Control REST API Reference

cruise control架构图以下:

在生成迁移计划时,咱们须要考虑如下几点:

1)选择核心指标做为生成迁移计划的依据,好比出流量、入流量、机架、单topic分区分散性等;

2)优化用来生成迁移计划的指标样本,好比过滤流量突增/突降/掉零等异常样本;

3)各资源组的迁移计划须要使用的样本所有为资源组内部样本,不涉及其余资源组,无交叉;

4)治理单分区过大topic,让topic分区分布更分散,流量不集中在部分broker,让topic单分区数据量更小,这样能够减小迁移的数据量,提高迁移速度;

5)已经均匀分散在资源组内的topic,加入迁移黑名单,不作迁移,这样能够减小迁移的数据量,提高迁移速度;

6)作topic治理,排除长期无流量topic对均衡的干扰;

7)新建topic或者topic分区扩容时,应让全部分区轮询分布在全部broker节点,轮询后余数分区优先分布流量较低的broker;

8)扩容broker节点后开启负载均衡时,优先把同一broker分配了同一大流量(流量大而不是存储空间大,这里能够认为是每秒的吞吐量)topic多个分区leader的,迁移一部分到新broker节点;

9)提交迁移任务时,同一批迁移计划中的分区数据大小误差应该尽量小,这样能够避免迁移任务中小分区迁移完成后长时间等待大分区的迁移,形成任务倾斜;

1.9 安全认证

是否是咱们的集群全部人均可以随意访问呢?固然不是,为了集群的安全,咱们须要进行权限认证,屏蔽非法操做。主要包括如下几个方面须要作安全认证:

(1)生产者权限认证;

(2)消费者权限认证;

(3)指定数据目录迁移安全认证;

官网地址:http://kafka.apache.org

1.10 集群容灾

跨机架容灾:

官网地址:http://kafka.apache.org

跨集群/机房容灾:若是有异地双活等业务场景时,能够参考Kafka2.7版本的MirrorMaker 2.0。

GitHub地址:https://github.com

精确KIP地址 :https://cwiki.apache.org

ZooKeeper集群上Kafka元数据恢复:咱们会按期对ZooKeeper上的权限信息数据作备份处理,当集群元数据异常时用于恢复。

1.11 参数/配置优化

broker服务参数优化:这里我只列举部分影响性能的核心参数。

num.network.threads
#建立Processor处理网络请求线程个数,建议设置为broker当CPU核心数*2,这个值过低常常出现网络空闲过低而缺失副本。
 
num.io.threads
#建立KafkaRequestHandler处理具体请求线程个数,建议设置为broker磁盘个数*2
 
num.replica.fetchers
#建议设置为CPU核心数/4,适当提升能够提高CPU利用率及follower同步leader数据当并行度。
 
compression.type
#建议采用lz4压缩类型,压缩能够提高CPU利用率同时能够减小网络传输数据量。
 
queued.max.requests
#若是是生产环境,建议配置最少500以上,默认为500。
 
log.flush.scheduler.interval.ms
log.flush.interval.ms
log.flush.interval.messages
#这几个参数表示日志数据刷新到磁盘的策略,应该保持默认配置,刷盘策略让操做系统去完成,由操做系统来决定何时把数据刷盘;
#若是设置来这个参数,可能对吞吐量影响很是大;
 
auto.leader.rebalance.enable
#表示是否开启leader自动负载均衡,默认true;咱们应该把这个参数设置为false,由于自动负载均衡不可控,可能影响集群性能和稳定;

生产优化:这里我只列举部分影响性能的核心参数。

linger.ms
#客户端生产消息等待多久时间才发送到服务端,单位:毫秒。和batch.size参数配合使用;适当调大能够提高吞吐量,可是若是客户端若是down机有丢失数据风险;
 
batch.size
#客户端发送到服务端消息批次大小,和linger.ms参数配合使用;适当调大能够提高吞吐量,可是若是客户端若是down机有丢失数据风险;
 
compression.type
#建议采用lz4压缩类型,具有较高的压缩比及吞吐量;因为Kafka对CPU的要求并不高,因此,能够经过压缩,充分利用CPU资源以提高网络吞吐量;
 
buffer.memory
#客户端缓冲区大小,若是topic比较大,且内存比较充足,能够适当调高这个参数,默认只为33554432(32MB)
 
retries
#生产失败后的重试次数,默认0,能够适当增长。当重试超过必定次数后,若是业务要求数据准确性较高,建议作容错处理。
 
retry.backoff.ms
#生产失败后,重试时间间隔,默认100ms,建议不要设置太大或者过小。

除了一些核心参数优化外,咱们还须要考虑好比topic的分区个数和topic保留时间;若是分区个数太少,保留时间太长,可是写入数据量很是大的话,可能形成如下问题:

1)topic分区集中落在某几个broker节点上,致使流量副本失衡;

2)致使broker节点内部某几块磁盘读写超负载,存储被写爆;

1.11.1 消费优化

消费最大的问题,而且常常出现的问题就是消费延时,拉历史数据。当大量拉取历史数据时将出现大量读盘操做,污染pagecache,这个将加剧磁盘的负载,影响集群性能和稳定;

能够怎样减小或者避免大量消费延时呢?

1)当topic数据量很是大时,建议一个分区开启一个线程去消费;

2)对topic消费延时添加监控告警,及时发现处理;

3)当topic数据能够丢弃时,遇到超大延时,好比单个分区延迟记录超过千万甚至数亿,那么能够重置topic的消费点位进行紧急处理;【此方案通常在极端场景才使用】

4)避免重置topic的分区offset到很早的位置,这可能形成拉取大量历史数据;

1.11.2 Linux服务器参数优化

咱们须要对Linux的文件句柄、pagecache等参数进行优化。可参考《Linux Page Cache调优在Kafka中的应用》。

1.12.硬件优化

磁盘优化

在条件容许的状况下,能够采用SSD固态硬盘替换HDD机械硬盘,解决机械盘IO性能较低的问题;若是没有SSD固态硬盘,则能够对服务器上的多块硬盘作硬RAID(通常采用RAID10),让broker节点的IO负载更加均衡。若是是HDD机械硬盘,一个broker能够挂载多块硬盘,好比 12块*4TB。

内存

因为Kafka属于高频读写型服务,而Linux的读写请求基本走的都是Page Cache,因此单节点内存大一些对性能会有比较明显的提高。通常选择256GB或者更高。

网络

提高网络带宽:在条件容许的状况下,网络带宽越大越好。由于这样网络带宽才不会成为性能瓶颈,最少也要达到万兆网络( 10Gb,网卡为全双工)才能具有相对较高的吞吐量。若是是单通道,网络出流量与入流量之和的上限理论值是1.25GB/s;若是是双工双通道,网络出入流量理论值均可以达到1.25GB/s。

网络隔离打标:因为一个机房可能既部署有离线集群(好比HBase、Spark、Hadoop等)又部署有实时集群(如Kafka)。那么实时集群和离线集群挂载到同一个交换机下的服务器将出现竞争网络带宽的问题,离线集群可能对实时集群形成影响。因此咱们须要进行交换机层面的隔离,让离线机器和实时集群不要挂载到相同的交换机下。即便有挂载到相同交换机下面的,咱们也将进行网络通行优先级(金、银、铜、铁)标记,当网络带宽紧张的时候,让实时业务优先通行。

CPU

Kafka的瓶颈不在CPU,单节点通常有32核的CPU都足够使用。

1.13.平台化

如今问题来了,前面咱们提到不少监控、优化等手段;难道咱们管理员或者业务用户对集群全部的操做都须要登陆集群服务器吗?答案固然是否认的,咱们须要丰富的平台化功能来支持。一方面是为了提高咱们操做的效率,另一方面也是为了提高集群的稳定和下降出错的可能。

配置管理

黑屏操做,每次修改broker的server.properties配置文件都没有变动记录可追溯,有时可能由于有人修改了集群配置致使一些故障,却找不到相关记录。若是咱们把配置管理作到平台上,每次变动都有迹可循,同时下降了变动出错的风险。

滚动重启

当咱们须要作线上变动时,有时候须要对集群对多个节点作滚动重启,若是到命令行去操做,那效率将变得很低,并且须要人工去干预,浪费人力。这个时候咱们就须要把这种重复性的工做进行平台化,提高咱们的操做效率。

集群管理

集群管理主要是把原来在命令行的一系列操做作到平台上,用户和管理员再也不须要黑屏操做Kafka集群;这样作主要有如下优势:

提高操做效率;

操做出错几率更小,集群更安全;

全部操做有迹可循,能够追溯;

集群管理主要包括:broker管理、topic管理、生产/消费权限管理、用户管理等

1.13.1 mock功能

在平台上为用户的topic提供生产样例数据与消费抽样的功能,用户能够不用本身写代码也能够测试topic是否可使用,权限是否正常;

在平台上为用户的topic提供生产/消费权限验证功能,让用户能够明确本身的帐号对某个topic有没有读写权限;

1.13.2 权限管理

把用户读/写权限管理等相关操做进行平台化。

1.13.3 扩容/缩容

把broker节点上下线作到平台上,全部的上线和下线节点再也不须要操做命令行。

1.13.4 集群治理

1)无流量topic的治理,对集群中无流量topic进行清理,减小过多无用元数据对集群形成的压力;

2)topic分区数据大小治理,把topic分区数据量过大的topic(如单分区数据量超过100GB/天)进行梳理,看看是否须要扩容,避免数据集中在集群部分节点上;

3)topic分区数据倾斜治理,避免客户端在生产消息的时候,指定消息的key,可是key过于集中,消息只集中分布在部分分区,致使数据倾斜;

4)topic分区分散性治理,让topic分区分布在集群尽量多的broker上,这样能够避免因topic流量突增,流量只集中到少数节点上的风险,也能够避免某个broker异常对topic影响很是大;

5)topic分区消费延时治理;通常有延时消费较多的时候有两种状况,一种是集群性能降低,另一种是业务方的消费并发度不够,若是是消费者并发不够的化应该与业务联系增长消费并发。

1.13.5 监控告警

1)把全部指标采集作成平台可配置,提供统一的指标采集和指标展现及告警平台,实现一体化监控;

2)把上下游业务进行关联,作成全链路监控;

3)用户能够配置topic或者分区流量延时、突变等监控告警;

1.13.6 业务大屏

业务大屏主要指标:集群个数、节点个数、日入流量大小、日入流量记录、日出流量大小、日出流量记录、每秒入流量大小、每秒入流量记录、每秒出流量大小、每秒出流量记录、用户个数、生产延时、消费延时、数据可靠性、服务可用性、数据存储大小、资源组个数、topic个数、分区个数、副本个数、消费组个数等指标。

1.13.7 流量限制

把用户流量如今作到平台,在平台进行智能限流处理。

1.13.8 负载均衡

把自动负载均衡功能作到平台,经过平台进行调度和管理。

1.13.9 资源预算

当集群达到必定规模,流量不断增加,那么集群扩容机器从哪里来呢?业务的资源预算,让集群里面的多个业务根据本身在集群中当流量去分摊整个集群的硬件成本;固然,独立集群与独立隔离的资源组,预算方式能够单独计算。

1.14.性能评估

1.14.1 单broker性能评估

咱们作单broker性能评估的目的包括如下几方面:

1)为咱们进行资源申请评估提供依据;

2)让咱们更了解集群的读写能力及瓶颈在哪里,针对瓶颈进行优化;

3)为咱们限流阈值设置提供依据;

4)为咱们评估何时应该扩容提供依据;

1.14.2 topic分区性能评估

1)为咱们建立topic时,评估应该指定多少分区合理提供依据;

2)为咱们topic的分区扩容评估提供依据;

1.14.3 单磁盘性能评估

1)为咱们了解磁盘的真正读写能力,为咱们选择更合适Kafka的磁盘类型提供依据;

2)为咱们作磁盘流量告警阈值设置提供依据;

1.14.4 集群规模限制摸底

1)咱们须要了解单个集群规模的上限或者是元数据规模的上限,探索相关信息对集群性能和稳定性的影响;

2)根据摸底状况,评估集群节点规模的合理范围,及时预测风险,进行超大集群的拆分等工做;

1.15 DNS+LVS的网络架构

当咱们的集群节点达到必定规模,好比单集群数百个broker节点,那么此时咱们生产消费客户端指定bootstrap.servers配置时,若是指定呢?是随便选择其中几个broker配置仍是所有都配上呢?

其实以上作法都不合适,若是只配置几个IP,当咱们配置当几个broker节点下线后,咱们当应用将没法链接到Kafka集群;若是配置全部IP,那更不现实啦,几百个IP,那么咱们应该怎么作呢?

方案:采用DNS+LVS网络架构,最终生产者和消费者客户端只须要配置域名就能够啦。须要注意的是,有新节点加入集群时,须要添加映射;有节点下线时,须要从映射中踢掉,不然这批机器若是拿到其余的地方去使用,若是端口和Kafka的同样的话,原来集群部分请求将发送到这个已经下线的服务器上来,形成生产环境重点故障。

2、开源版本功能缺陷

RTMP协议主要的特色有:多路复用,分包和应用层协议。如下将对这些特色进行详细的描述。

2.1 副本迁移

没法实现增量迁移;【咱们已经基于2.1.1版本源码改造,实现了增量迁移】

没法实现并发迁移;【开源版本直到2.6.0才实现了并发迁移】

没法实现终止迁移;【咱们已经基于2.1.1版本源码改造,实现了终止副本迁移】【开源版本直到2.6.0才实现了暂停迁移,和终止迁移有些不同,不会回滚元数据】

当指定迁移数据目录时,迁移过程当中,若是把topic保留时间改短,topic保留时间针对正在迁移topic分区不生效,topic分区过时数据没法删除;【开源版本bug,目前尚未修复】

当指定迁移数据目录时,当迁移计划为如下场景时,整个迁移任务没法完成迁移,一直处于卡死状态;【开源版本bug,目前尚未修复】

迁移过程当中,若是有重启broker节点,那个broker节点上的全部leader分区没法切换回来,致使节点流量所有转移到其余节点,直到全部副本被迁移完毕后leader才会切换回来;【开源版本bug,目前尚未修复】。

在原生的Kafka版本中存在如下指定数据目录场景没法迁移完毕的状况,此版本咱们也不决定修复次bug:
 
1.针对同一个topic分区,若是部分目标副本相比原副本是所属broker发生变化,部分目标副本相比原副本是broker内部所属数据目录发生变化;
那么副本所属broker发生变化的那个目标副本能够正常迁移完毕,目标副本是在broker内部数据目录发生变化的没法正常完成迁移;
可是旧副本依然能够正常提供生产、消费服务,而且不影响下一次迁移任务的提交,下一次迁移任务只须要把此topic分区的副本列表所属broker列表变动后提交依然能够正常完成迁移,而且能够清理掉以前未完成的目标副本;
 
这里假设topic yyj1的初始化副本分布状况以下:
 
{
"version":1,
"partitions":[
{"topic":"yyj","partition":0,"replicas":[1000003,1000001],"log_dirs":["/kfk211data/data31","/kfk211data/data13"]}
]
}
//迁移场景1:
{
"version":1,
"partitions":[
{"topic":"yyj","partition":0,"replicas":[1000003,1000002],"log_dirs":["/kfk211data/data32","/kfk211data/data23"]}
]
}
 
//迁移场景2:
{
"version":1,
"partitions":[
{"topic":"yyj","partition":0,"replicas":[1000002,1000001],"log_dirs":["/kfk211data/data22","/kfk211data/data13"]}
]
}
针对上述的topic yyj1的分布分布状况,此时若是咱们的迁移计划为“迁移场景1”或迁移场景2“,那么都将出现有副本没法迁移完毕的状况。
可是这并不影响旧副本处理生产、消费请求,而且咱们能够正常提交其余的迁移任务。
为了清理旧的未迁移完成的副本,咱们只须要修改一次迁移计划【新的目标副本列表和当前分区已分配副本列表彻底不一样便可】,再次提交迁移便可。
 
这里,咱们依然以上述的例子作迁移计划修改以下:
{
"version":1,
"partitions":[
{"topic":"yyj","partition":0,"replicas":[1000004,1000005],"log_dirs":["/kfk211data/data42","/kfk211data/data53"]}
]
}
这样咱们就能够正常完成迁移。

2.2 流量协议

限流粒度较粗,不够灵活精准,不够智能。

当前限流维度组合

/config/users/<user>/clients/<client-id>
/config/users/<user>/clients/<default>
/config/users/<user>
/config/users/<default>/clients/<client-id>
/config/users/<default>/clients/<default>
/config/users/<default>
/config/clients/<client-id>
/config/clients/<default>

存在问题

当同一个broker上有多个用户同时进行大量的生产和消费时,想要让broker能够正常运行,那必须在作限流时让全部的用户流量阈值之和不超过broker的吞吐上限;若是超过broker上限,那么broker就存在被打挂的风险;然而,即便用户流量没有达到broker的流量上限,可是,若是全部用户流量集中到了某几块盘上,超过了磁盘的读写负载,也会致使全部生产、消费请求将被阻塞,broker可能处于假死状态。

解决方案

(1)改造源码,实现单个broker流量上限限制,只要流量达到broker上限当即进行限流处理,全部往这个broker写的用户均可以被限制住;或者对用户进行优先级处理,放太高优先级的,限制低优先级的;

(2)改造源码,实现broker上单块磁盘流量上限限制(不少时候都是流量集中到某几块磁盘上,致使没有达到broker流量上限却超过了单磁盘读写能力上限),只要磁盘流量达到上限,当即进行限流处理,全部往这个磁盘写的用户均可以被限制住;或者对用户进行优先级处理,放太高优先级的,限制低优先级的;

(3)改造源码,实现topic维度限流以及对topic分区的禁写功能;

(4)改造源码,实现用户、broker、磁盘、topic等维度组合精准限流;

3、kafka发展趋势

3.1 Kafka社区迭代计划

3.2 逐步弃用ZooKeeper(KIP-500)

3.3 controller与broker分离,引入raft协议做为controller的仲裁机制(KIP-630)

3.4 分层存储(KIP-405)

3.5 能够减小topic分区(KIP-694)

3.6 MirrorMaker2精确一次(KIP-656)

3.7 下载与各版本特性说明

3.8 Kafka全部KIP地址

4、如何贡献社区

4.1 哪些点能够贡献

http://kafka.apache.org/contributing

4.2 wiki贡献地址

https://cwiki.apache.org/confluence/dashboard.action#all-updates

4.3 issues地址

1)https://issues.apache.org/jira/projects/KAFKA/issues/KAFKA-10444?filter=allopenissues

2)https://issues.apache.org/jira/secure/BrowseProjects.jspa?selectedCategory=all

4.4 主要committers

http://kafka.apache.org/committers

做者:vivo互联网服务器团队-Yang Yijun
相关文章
相关标签/搜索