关于Kafka区分请求处理优先级的讨论

全部的讨论都是基于KIP-291展开的。抱歉,这又是一篇没有图的文字。apache


目前Kafka broker对全部发过来的请求都是一视同仁的,不会区别对待。不论是用于生产消费的PRODUCE和FETCH请求,仍是controller端发送的LeaderAndIsr/StopReplica/UpdateMetadata请求,亦或是其余类型的请求也是同样。一般咱们这里把PRODUCE/FETCH请求称为数据类请求;把controller发送的那3种请求称为控制类请求或controller类请求——在源码中前者被称为data plane request,后者称为controller plane request。缓存

这种公平处理原则在不少场合下都是不合理的。为何?简单来讲控制类请求具备直接令数据类请求失效的能力。举个例子,若是我有个topic,单分区双副本,其中broker0上保存leader副本,broker1上保存follower副本。当broker0上积压了大量的PRODUCE请求时,此时用户执行了重分区或preferred分区选举将broker1变动成了leader,那么controller会向broker0发送LeaderAndIsr请求告诉它如今是一个follower了,而broker1上的follower已经中止向leader拉取数据(由于它要成为leader了)——此时一个比较尴尬的情形出现了:若是producer的acks设置的是all,那么这些在LeaderAndIsr请求以前积压的PRODUCE请求就没法正常完成——要么一直缓存在purtagory中要么请求超时返回给client。设想一下,若是Kafka可以及时地处理LeaderAndIsr请求,那么这些积压的PRODUCE请求就能当即失败(NOT_LEADER_FOR_PARTITION),立刻返回给client。Client不用等到 purgatory中的请求超时,下降了请求的处理时间。即便acks不是all,纵然积压的PRODUCE请求写入本地日志后成功返回,但处理过LeaderAndIsr请求后broker0上副本变为follower,还要执行截断(truncation),所以在client看来这些消息就丢失了。安全

再举一个例子,一样是在积压大量数据类请求的broker上,若是用户删除了topic,那么StopReplica请求没法及时处理,致使topic没法真正删除,增长了删除topic的延时。网络

最后还能够举个例子说明对UpdateMetadata的影响。若是UpdateMetadata不能及时处理,broker上保存的就是过时的元数据,当client获取到这些数据时,不论是producer仍是consumer均可能没法正常工做,直到获取到最新的元数据信息。fetch

经过上面3个例子能够看出一般状况下咱们但愿controller类请求的处理优先级要高于数据类请求,这也是社区作KIP-291的初衷 。可喜的是Kafka 2.2正式实现了这个功能,下面咱们来看看社区是怎么作的:线程

其实在KIP-291以前,我也思考过这个问题。当时我提出的想法是这样的:在broker的KafkaRequestHandlerPool中实现一个优先级队列,当controller类请求到达时,它可以”抢占式“地排在处理队列的最前部——这是很天然的想法,因此我本觉得KIP-291也是这么实现的,但通篇看下来我尴尬地发现我这个解决思路记录在“Rejected Alternatives"中。这个方案最大的问题在于它没法处理队列已满的情形,即当处理队列已经没法容纳任何新的请求时该如何支持优先处理controller类请求?纵然有优先级队列也没法解决这个问题。scala

KIP-291是怎么解决的呢?很简单,Kafka从新为controller类请求作了专属的监听器+请求队列+acceptor+processor线程。监听器经过Kafka的listeners和advertised.listeners设置,新的请求队列则专门保存controller类请求,而acceptor和processor线程负责接收网络发送过来的以及处理队列中的controller类请求。咱们一个一个说吧。设计

当前,用户能够在listeners中指定多套监听器,好比PLAINTEXT://kafka1:9092, SSL://kafka1:9093。你其实也能够自定义你的监听器,好比INTERNAL://kafka1:9094。用户能够指定broker端参数inter.broker.listener.name或security.inter.broker.protocol(两个不能同时指定)来设定,同时你还须要在listener.security.protocol.map中指定这个自定义listener使用的安全协议,好比: listener.security.protocol.map=INTERNAL:PLAINTEXT。KIP-291复用了这个设计,若是你设置了inter.broker.listener.name或security.inter.broker.protocol,Kafka会默认使用这个listener专属服务controller类请求。同时社区还引入了一个新的参数:control.plane.listener.name,用来专门让你设置服务controller类请求的监听器名称。这个参数的优先级要高于前面那两个参数,所以仍是推荐用户直接设置此参数,好比设置control.plane.listener.name=CONTROLLER,同时更新listener.security.protocol.map,增长CONTROLLER:PLAINTEXT匹配对(假设你用的是PLAINTEXT)。这就是为controller类请求建立监听器的方法。日志

下面说请求队列和acceptor、processor线程。 其实也不用细说,和现有的设计如出一辙,只是默认的队列大小再也不是500,而是20,默认的线程数再也不是8而是2,由于咱们假设controller类请求一般不该该有积压。具体的实现原理有兴趣的话直接读KafkaRequestHandlerPool.scala、RequestChannel.scala和SocketServer.scala源码吧。还须要修改的地方是controller代码,特别是在ControllerChannelManager.scala中增长新的broker时必定要使用controller类请求专属的监听器。队列

除了以上这些,该KIP也引入了不少监控controller类请求处理的JMX指标,如队列请求数、线程空闲程度等,这些和以前的指标都是同样的,只是仅监控controller plane监听器之用。再说一点,当前Kafka支持动态地调整请求处理线程数。在对请求进行区分处理后,我估计后续也要支持对controller类请求线程数的动态调整吧。

整体来讲,将请求作区分处理后对于繁忙Kafka集群将可以更迅速地处理控制类请求,表现为状态的更新更加及时,集群不一致状态窗口将会缩小,同时还提高了总体可用性。目前该KIP还只是对请求作两类处理,也许往后会作一些更加细粒度的区分——好比Metadata请求是否也应该享有更高的优先级处理。

最后还想提一句,KIP-291是我认为近期社区改动影响比较大的两个KIP之一。另外一个则是KIP-392——还记得Kafka不能从follower副本读数据的限制吧?这个KIP要打破这个限制!只是目前该KIP还在讨论中,咱们后面拭目以待吧。

相关文章
相关标签/搜索