DataPipeline |ApacheKafka实战做者胡夕:Apache Kafka监控与调优

https://baijiahao.baidu.com/s?id=1610644333184173190&wfr=spider&for=pc前端

 

DataPipeline |ApacheKafka实战做者胡夕:Apache Kafka监控与调优

DataPipelinejava

18-09-0412:13
胡夕

《Apache Kafka实战》做者,北航计算机硕士毕业,现任某互金公司计算平台总监,曾就任于IBM、搜狗、微博等公司。国内活跃的Kafka代码贡献者。后端

 

前言缓存

虽然目前Apache Kafka已经全面进化成一个流处理平台,但大多数的用户依然使用的是其核心功能:消息队列。对于如何有效地监控和调优Kafka是一个大话题,不少用户都有这样的困扰,今天咱们就来讨论一下。服务器

 

1、Kafka综述网络

在讨论具体的监控与调优以前,我想用一张PPT图来简单说明一下当前Kafka生态系统的各个组件。就像我前面所说,Kafka目前已经进化成了一个流处理平台,除了核心的消息队列组件Kafka core以外,社区还新引入Kafka Connect和Kafka Streams两个新的组件:其中前者负责Kafka与外部系统的数据传输;后者则负责对数据进行实时流处理计算。下图罗列了一些关键的Kafka概念。数据结构

 

2、Kafka监控负载均衡

 

我打算从五个维度来讨论Kafka的监控。首先是要监控Kafka集群所在的主机;第二是监控Kafka broker JVM的表现;第三点,咱们要监控Kafka Broker的性能;第四,咱们要监控Kafka客户端的性能。这里的所指的是广义的客户端——多是指咱们本身编写的生产者、消费者,也有多是社区帮咱们提供的生产者、消费者,好比说Connect的Sink/Source或Streams等;最后咱们须要监控服务器之间的交互行为。框架

1.主机监控运维

我的认为对于主机的监控是最重要的。由于不少线上环境问题首先表现出来的症状就是主机的某些性能出现了明显的问题。此时一般是运维人员首先发现了它们而后告诉咱们这台机器有什么问题,对于Kafka主机监控一般是发现问题的第一步。这一页列出了常见的指标,包括CPU、内存、带宽等数据。须要注意的是CPU使用率的统计。可能你们听过这样的提法:个人Kafka Broker CPU使用率是400%,怎么回事?对于这样的问题,咱们首先要搞清楚这个使用率是怎么观测出来的? 不少人拿top命令中的vss或rss字段来表征CPU使用率,但实际上它们并非真正的CPU使用率——那只是全部CPU共同做用于Kafka进程所花的时间片的比例。举个例子,若是机器上有16个CPU,那么只要这些值没有超过或接近1600, 那么你的CPU使用率其实是很低的。所以要正确理解这些命令中各个字段的含义。

这页PPT右边给出了一本书,若是你们想监控主机性能的话,我我的建议这本《SystemsPerformance》就足够了。很是权威的一本书,推荐你们读一下。

2.监控JVM

Kafka自己是一个普通的Java进程,因此任何适用于JVM监控的方法对于监控Kafka都是相通的。第一步就是要先了解Kafka应用。比方说了解Kafka broker JVM的GC频率和延时都是多少,每次GC后存活对象的大小是怎样的等。了解了这些信息咱们才能明确后面调优的方向。固然,咱们毕竟不是特别资深的JVM专家,所以也没必要过多追求繁复的JVM监控与调优。只须要关注大的方面便可。另外,若是你们时间颇有限但又想快速掌握JVM监控与调优,推荐阅读《Java Performance》。

3.Per-Broker监控

首先要确保Broker进程是启动状态?这听起来好像有点搞笑,但个人确遇到过这样的状况。好比当把Kafka部署在Docker上时就容易出现进程启动但服务没有成功启动的情形。正常启动下,一个Kafka服务器起来的时候,应该有两个端口,一个端口是9092常规端口,会建一个TCP连接。还有一个端口是给JMX监控用的。固然有多台broker的话,那么controller机器会为每台broker都维护一个TCP链接。在实际监控时能够有意识地验证这一点。

对于Broker的监控,咱们主要是经过JMS指标来作的。用过Kafka的人知道,Kafka社区提供了特别多的JMS指标,其中不少指标用处不大。我这里列了一些比较重要的:首先是broker机器每秒出入的字节数,就是相似于我能够监控网卡的流量,必定要把这个指标监控起来,并实时与你的网卡带宽进行比较——若是发现该值很是接近于带宽的话,就证实broker负载太高,要么增长新的broker机器,要么把该broker上的负载均衡到其余机器上。

另外还有两个线程池空闲使用率小关注,最好确保它们的值都不要低于30%,不然说明Broker已经很是的繁忙。 此时须要调整线程池线程数。

接下来是监控broker服务器的日志。日志中包含了很是丰富的信息。这里所说的日志不只是broker服务器的日志,还包括Kafka controller的日志。咱们须要常常性地查看日志中是否出现了OOM错误抑或是时刻关注日志中抛出的ERROR信息。

咱们还须要监控一些关键后台线程的运行状态。我的认为有两个比较重要的线程须要监控:一个Log Cleaner线程——该线程是执行数据压实操做的,若是该线程出问题了,用户一般没法感知到,而后会发现全部compact策略的topic会愈来愈大直到占满全部磁盘空间;另外一个线程就是副本拉取线程,即follower broker使用该线程实时从leader处拉取数据。若是该线程“挂掉”了,用户一般也是不知道的,但会发现follower再也不拉取数据了。所以咱们必定要按期地查看这两个线程的状态,若是发现它们意味终止,则去找日志中寻找对应的报错信息。

4.Clients监控

客户端监控这块,我这边会分为两个,分别讨论对生产者和消费者的监控。生产者往Kafka发消息,在监控以前咱们至少要了解一下客户端机器与Broker端机器之间的RTT是多少。对于那种跨数据中心或者是异地的状况来讲,RTT原本就很大,若是不作特殊的调优,是不可能有过高的TPS的。目前Kafka producer是双线程的设计机制,分为用户主线程和Sender线程,当这个Sender线程挂了的时候,前端用户是不感知的,但表现为producer发送消息失败,因此用户最好监控一下这个Sender线程的状态。

还有就是监控PRODUCE请求的处理延时。一条消息从生产者端发送到Kafka broker进行处理,以后返回给producer的总时间。整个链路中各个环节的耗时最好要作到心中有数。由于不少状况下,若是你要提高生产者的TPS,了解整个链路中的瓶颈后才能作到有的放矢。后面PPT中我会讨论如何拆解这条链路。

如今说说消费者。这里的消费者说的是新版本的消费者,也就是java consumer。

社区已经很是不推荐再继续使用老版本的消费者了。新版本的消费者也是双线程设计,后面有一个心跳线程,若是这个线程挂掉的话,前台线程是不知情的。因此,用户最好按期监控该心跳线程的存活状况。心跳线程按期发心跳请求给Kafka服务器,告诉Kafka,这个消费者实例还活着,以免coordinator错误地认为此实例已“死掉”从而开启rebalance。Kafka提供了不少的JMX指标能够用于监控消费者,最重要的消费进度滞后监控,也就是所谓的consumerlag。

假设producer生产了100条消息,消费者读取了80条,那么lag就是20。显然落后的越少越好,这代表消费者很是及时,用户也能够用工具行命令来查lag,甚至写Java的API来查。与lag对应的还有一个lead指标,它表征的是消费者领先第一条消息的进度。好比最先的消费位移是1,若是消费者当前消费的消息是10,那么lead就是9。对于lead而言越大越好,不然代表此消费者可能处于停顿状态或者消费的很是慢,本质上lead和lag是一回事,之因此列出来是由于lead指标是我开发的,也算打个广告吧。

除了以上这些,咱们还须要监控消费者组的分区分配状况,避免出现某个实例被分配了过多的分区,致使负载严重不平衡的状况出现。通常来讲,若是组内全部消费者订阅的是相同的主题,那么一般不会出现明显的分配倾斜。一旦各个实例订阅的主题不相同且每一个主题分区数良莠不齐时就极易发生这种不平衡的状况。Kafka目前提供了3种策略来帮助用户完成分区分配,最新的策略是黏性分配策略,它能保证绝对的公平,你们能够去试一下。

最后就是要监控rebalance的时间——目前来看,组内超多实例的rebalance性能不好,可能都是小时级别的。并且比较悲剧的是当前无较好的解决方案。因此,若是你的Consumer特别特别多的话,必定会有这个问题,你监控一下两个步骤所用的时间,看看是否知足需求,若是不能知足的话,看看能不能把消费者去除,尽可能减小消费者数量。

5.Inter-Broker监控

最后一个维度就是监控Broker之间的表现,主要是指副本拉取。Follower副本实时拉取leader处的数据,咱们天然但愿这个拉取过程越快越好。Kafka提供了一个特别重要的JMX指标,叫作备份不足的分区数,好比说我规定了这条消息,应该在三个Broker上面保存,假设只有一个或者两个Broker上保存该消息,那么这条消息所在的分区就被称为“备份不足”的分区。这种状况是特别关注的,由于有可能形成数据的丢失。《Kafka权威指南》一书中是这样说的:若是你只能监控一个Kafka JMX指标,那么就监控这个好了,确保在你的Kafka集群中该值是永远是0。一旦出现大于0的情形赶忙处理。

还有一个比较重要的指标是表征controller个数的。整个集群中应该确保只能有一台机器的指标是1,其余全应该是0,若是你发现有一台机器是2或者是3,必定是出现脑裂了,此时应该去检查下是否出现了网络分区。Kafka自己是不能对抗脑裂的,彻底依靠Zookeeper来作,可是若是真正出现网络分区的话,也是没有办法处理的,不如赶快fail fast掉。

 

3、监控工具

当前没有一款Kafka监控工具是公认比较优秀的,每一个都有本身的特色但也有些致命的缺陷。咱们针对一些常见的监控工具逐个讨论下。

1.Kafka Manager

应该说在全部免费的监控框架中,Kafka Manager是最受欢迎的。它最先由雅虎开源,功能很是齐全,展现的数据很是丰富。另外,用户可以在界面上执行一些简单的集群管理操做。更加使人欣慰的是,该框架目前还在不断维护中,所以使用Kafka manager来监控Kafka是一个不错的选择。

2.Burrow

Burrow是去年下半年开源,专门监控消费者信息的框架。这个框架刚开始开源的时候,我还对它仍是寄予厚望的,毕竟是Kafka社区committer亲自编写的。不过Burrow的问题在于没有UI界面,不方便运维操做。另外因为是Go语言写的,你要用的话,必须搭建Go语言环境,而后编译部署,总之用起来不是很方便。还有就是它的更新不是很频繁,已经有点半荒废的状态,你们不妨一试。

3.Kafka Monitor

严格来讲,它不是监控工具,它是专门作Kafka集群系统性测试用的。待监控的指标能够由用户本身设定,主要是作一些端到端的测试。好比说你搭了一套Kafka集群,想测试端到端的性能怎样:从发消息到消费者读取消息这一总体流程的性能。该框架的优点也是由Kafka社区团队写的,质量有保障,但更新不是很频繁,目前好像几个月没有更新了。

4.Kafka Offset Monitor

KafkaOffsetMonitor是我用的最先的一个Kafka监控工具,也是监控消费者位移,只不过那时候Kafka把位移保持在Zookeepr上。这个框架的界面很是漂亮,国内用的人不少。可是如今有一个问题,由于咱们如今用了新版本的消费者,这个框架目前支持得的并非特别好。并且还有一个问题就是它已经再也不维护了,可能有1-2年没有任何更新了。

5.Kafka Eagle

这是国人本身开发的,我不知道具体是哪一个大牛开发的,可是在Kafka QQ群里面不少人推崇,由于界面很干净漂亮,上面有很好的数据展示。

6.Confluent Control Center

Control Center是目前我能收集到的功能最齐全的Kafka监控框架了,只不过只有购买了Confluent企业版也有的,也就是说是付费的。

综合来说,若是你是Kafka集群运维操做人员,推荐先用Kafka Manager来作监控,后面再根据实际监控需求定制化开发特有的工具或框架。

 

4、系统调优

Kafka监控的一个主要的目的就是调优Kafka集群。这里罗列了一些常见的操做系统级的调优。

首先是保证页缓存的大小——至少要设置页缓存为一个日志段的大小。咱们知道Kafka大量使用页缓存,只要保证页缓存足够大,那么消费者读取消息时就有大几率保证它可以直接命中页缓存中的数据而无需从底层磁盘中读取。故只要保证页缓存要知足一个日志段的大小。

第二是调优文件打开数。不少人对这个资源有点畏手畏脚。实际上这是一个很廉价的资源,设置一个比较大的初始值一般都是没有什么问题的。

第三是调优vm.max_map_count参数。主要适用于Kafka broker上的主题数超多的状况。Kafka日志段的索引文件是用映射文件的机制来作的,故若是有超多日志段的话,这种索引文件数必然是不少的,极易打爆这个资源限制,因此对于这种状况通常要适当调大这个参数。

第四是swap的设置。不少文章说把这个值设为0,就是彻底禁止swap,我我的不建议这样,由于当你设置成为0的时候,一旦你的内存耗尽了,Linux会自动开启OOM killer而后随机找一个进程杀掉。这并非咱们但愿的处理结果。相反,我建议设置该值为一个比较接近零的较小值,这样当个人内存快要耗尽的时候会尝试开启一小部分swap,虽然会致使broker变得很是慢,但至少给了用户发现问题并处理之的机会。

第五JVM堆大小。首先鉴于目前Kafka新版本已经不支持Java7了,而Java 8自己不更新了,甚至Java9其实都不作了,直接作Java10了,因此我建议Kafka至少搭配Java8来搭建。至于堆的大小,我的认为6-10G足矣。若是出现了堆溢出,就提jira给社区,让他们看究竟是怎样的问题。由于这种状况下即便用户调大heap size,也只是延缓OOM而已,不太可能从根本上解决问题。

最后,建议使用专属的多块磁盘来搭建Kafka集群。自1.1版本起Kafka正式支持JBOD,所以不必在底层再使用一套RAID了。

 

5、Kafka调优的四个层面

Kafka调优一般能够从4个维度展开,分别是吞吐量、延迟、持久性和可用性。在具体展开这些方面以前,我想先建议用户保证客户端与服务器端版本一致。若是版本不一致,就会出现向下转化的问题。举个例子,服务器端保存高版本的消息,当低版本消费者请求数据时,服务器端就要作转化,先把高版本消息转成低版本再发送给消费者。这件事情自己就很是很是低效。不少文章都讨论过Kafka速度快的缘由,其中就谈到了零拷贝技术——即数据不须要在页缓存和堆缓存中来回拷贝。

简单来讲producer把生产的消息放到页缓存上,若是两边版本一致,能够直接把此消息推给Consumer,或者Consumer直接拉取,这个过程是不须要把消息再放到堆缓存。可是你要作向下转化或者版本不一致的话,就要额外把数据再堆上,而后再放回到Consumer上,速度特别慢。

1.Kafka调优 – 吞吐量

调优吞吐量就是咱们想用更短的时间作更多的事情。这里列出了客户端须要调整的参数。前面说过了producer是把消息放在缓存区,后端Sender线程从缓存区拿出来发到broker。这里面涉及到一个打包的过程,它是批处理的操做,不是一条一条发送的。所以这个包的大小就和TPS息息相关。一般状况下调大这个值都会让TPS提高,可是也不会无限制的增长。不过调高此值的劣处在于消息延迟的增长。除了调整batch.size,设置压缩也能够提高TPS,它可以减小网络传输IO。当前Lz4的压缩效果是最好的,若是客户端机器CPU资源很充足那么建议开启压缩。

对于消费者端而言,调优TPS并无太好的办法,可以想到的就是调整fetch.min.bytes。适当地增长该参数的值可以提高consumer端的TPS。对于Broker端而言,一般的瓶颈在于副本拉取消息时间过长,所以能够适当地增长num.replica.fetcher值,利用多个线程同时拉取数据,能够加快这一进程。

 

2.Kafka调优 – 延时

所谓的延时就是指消息被处理的时间。某些状况下咱们天然是但愿越快越好。针对这方面的调优,consumer端能作的很少,简单保持fetch.min.bytes默认值便可,这样能够保证consumer可以当即返回读取到的数据。讲到这里,可能有人会有这样的疑问:TPS和延时不是一回事吗?假设发一条消息延时是2ms,TPS天然就是500了,由于一秒只能发500消息,其实这二者关系并非简单的。由于我发一条消息2毫秒,可是若是把消息缓存起来统一发,TPS会提高不少。假设发一条消息依然是2ms,可是我先等8毫秒,在这8毫秒以内可能能收集到一万条消息,而后我再发。至关于你在10毫秒内发了一万条消息,你们能够算一下TPS是多少。事实上,Kafka producer在设计上就是这样的实现原理。

3.Kafka调优 –消息持久性

消息持久化本质上就是消息不丢失。Kafka对消息不丢失的承诺是有条件的。之前碰到不少人说我给Kafka发消息,发送失败,消息丢失了,怎么办?严格来讲Kafka不认为这种状况属于消息丢失,由于此时消息没有放到Kafka里面。Kafka只对已经提交的消息作有条件的不丢失保障。

若是要调优持久性,对于producer而言,首先要设置重试以防止由于网络出现瞬时抖动形成消息发送失败。一旦开启了重试,还须要防止乱序的问题。好比说我发送消息1与2,消息2发送成功,消息1发送失败重试,这样消息1就在消息2以后进入Kafka,也就是形成乱序了。若是用户不容许出现这样的状况,那么还须要显式地设置max.in.flight.requests.per.connection为1。

本页PPT列出的其余参数都是很常规的参数,好比unclean.leader.election.enable参数,最好仍是将其设置成false,即不容许“脏”副本被选举为leader。

4.Kafka调优 –可用性

最后是可用性,与刚才的持久性是相反的,我容许消息丢失,只要保证系统高可用性便可。所以我须要把consumer心跳超时设置为一个比较小的值,若是给定时间内消费者没有处理完消息,该实例可能就被踢出消费者组。我想要其余消费者更快地知道这个决定,所以调小这个参数的值。

 

6、定位性能瓶颈

下面就是性能瓶颈,严格来讲这不是调优,这是解决性能问题。对于生产者来讲,若是要定位发送消息的瓶颈很慢,咱们须要拆解发送过程当中的各个步骤。就像这张图表示的那样,消息的发送共有6步。第一步就是生产者把消息放到Broker,第2、三步就是Broker把消息拿到以后,写到本地磁盘上,第四步是follower broker从Leader拉取消息,第五步是建立response;第六步是发送回去,告诉我已经处理完了。

这六步当中你须要肯定瓶颈在哪?怎么肯定?——经过不一样的JMX指标。好比说步骤1是慢的,可能你常常碰到超时,你若是在日志里面常常碰到request timeout,就表示1是很慢的,此时要适当增长超时的时间。若是二、3慢的状况下,则可能体如今磁盘IO很是高,致使往磁盘上写数据很是慢。假若是步骤4慢的话,查看名为remote-time的JMX指标,此时能够增长fetcher线程的数量。若是5慢的话,表现为response在队列致使待的时间过长,这时能够增长网络线程池的大小。6与1是同样的,若是你发现一、6常常出问题的话,查一下你的网络。因此,就这样来分解整个的耗时。这是到底哪一步的瓶颈在哪,须要看看什么样的指标,作怎样的调优。

 

7、Java Consumer的调优

最后说一下Consumer的调优。目前消费者有两种使用方式,一种是同一个线程里面就直接处理,另外一种是我采用单独的线程,consumer线程只是作获取消息,消息真正的处理逻辑放到单独的线程池中作。这两种方式有不一样的使用场景:第一种方法实现较简单,由于你的消息处理逻辑直接写在一个线程里面就能够了,可是它的缺陷在于TPS可能不会很高,特别是当你的客户端的机器很是强的时候,你用单线程处理的时候是很慢的,由于你没有充分利用线程上的CPU资源。第二种方法的优点是可以充分利用底层服务器的硬件资源,TPS能够作的很高,可是处理提交位移将会很难。

最后说一下参数,也是网上问的最多的,这几个参数究竟是作什么的。第一个参数,就是控制consumer单次处理消息的最大时间。好比说设定的是600s,那么consumer给你10分钟来处理。若是10分钟内consumer没法处理完成,那么coordinator就会认为此consumer已死,从而开启rebalance。

Coordinator是用来管理消费者组的协调者,协调者如何在有效的时间内,把消费者实例挂掉的消息传递给其余消费者,就靠心跳请求,所以能够设置heartbeat.interval.ms为一个较小的值,好比5s。

 

8、Q & A

Q1:胡老师在前面提到低版本与高版本有一个端口的问题,我想问一下高版本的、低版本的会有这个问题吗?

A1:会有。

 

Q2:两种模式,一个是Consumer怎么作到全部的partition,在里面作管理的。会有一个问题,某个Consumer的消费比较慢,由于全部的Partition的消费都是绑定在一个线程。一个消费比较慢,一个消费比较快,要等另外一个。有没有一种方案,消费者比较慢的能够暂定,若是涉及到暂停的话,频繁的暂定耗费的时间,是否是会比较慢?

A2:一个线程处理全部的分区。若是从开销来说并不大,可是的确会出现像你说的,若是一个消费者定了100个分区,目前我这边看到的效果,某段时间内有可能会形成某些分区的饿死,好比说某些分区长期得不到数据,可能有一些分区不停的有数据,这种状况下的确有可能状况。可是你说的两种方法自己开销不是很大,由于它就是内存当中的结构变动,就是定位信息,若是segment,就把定位信息先暂时关掉,不涉及到很复杂的数据结构的变动。

 

Q3:怎么决定顺序呢?

A3:这个事情如今在Broker端作的,简单会作轮询,好比说有100个分区,第一批随机给你一批分区,以后这些分区会排到整个队列的末尾,从其余的分区开始给你,作到尽可能的公平。

 

Q4:消费的时候会出现数据倾斜的状况,这块如何理解?

A4:数据倾斜。这种状况下发生在每一个消费者订阅信息不同的状况下,特别容易出现数据倾斜。好比说我订阅主题123,我订阅主题456,咱们又在同一个组里面这些主题分区数极不相同,颇有可能出现我订阅了10个分区,你可能订阅2个分区。若是你用的是有粘性的分配策略,那种保证不会出现超过两个以上相差的状况。这个策略推出的时间也不算短了,是0.11版本推出来的。