本文主要参考社区0.11版本Controller的重设计方案,试图给你们梳理一下Kafka controller这个组件在设计上的一些重要思考。众所周知,Kafka中有个关键组件叫controller,负责管理和协调Kafka集群。网上关于controller的源码分析也有不少,本文就再也不大段地列出代码重复作这件事情了。实际上,对于controller的代码我一直以为写的很是混乱,各类调用关系十分复杂,想要完整地理解它的工做原理确实不易。好在咱们就是普通的使用者,大体了解controller的工做原理便可。下面我就带各位简要了解一下当前Kafka controller的原理架构以及社区为何要在大改controller的设计。node
“负责管理和协调Kafka集群”的说法实在没有什么养分,上点干货吧——具体来讲Controller目前主要提供多达10种的Kafka服务功能的实现,它们分别是:缓存
当前controller启动时会为集群中全部broker建立一个各自的链接。这么说吧,假设你的集群中有100台broker,那么controller启动时会建立100个Socket链接(也包括与它本身的链接!)。当前新版本的Kafka统一使用了NetworkClient类来建模底层的网络链接(有兴趣研究源码的能够去看下这个类,它主要依赖于Java NIO的Selector)。Controller会为每一个链接都建立一个对应的请求发送线程,专门负责给对应的broker发送请求。也就是说,若是仍是那100台broker,那么controller启动时还会建立100个RequestSendThread线程。当前的设计中Controller只能给broker发送三类请求,它们是:网络
Controller一般都是发送请求给broker的,只有上面谈到的controller 10大功能中的ControlledShutdownRequest请求是例外:这个请求是待关闭的broker经过RPC发送给controller的,即它的方向是反的。另外这个请求还有一个特别之处就是其余全部功能或是请求都是经过Zookeeper间接与controller交互的,只有它是直接与controller进行交互的。多线程
构成controller的组件太多了,多到我已经不想用文字表达了,直接上图吧:架构
其中比较重要的组件包括:异步
缓存内容十分丰富,这也是controller能够协调管理整个cluster的基础。async
不谦虚地说,我混迹社区也有些日子了。在里面碰到过不少关于controller的bug。社区对于这些bug有个很共性的特色,那就是没有什么人愿意(敢去)改这部分代码,由于它实在是太复杂了。具体的问题包括:工具
编写正确的多线程程序一直是Java开发者的痛点。在Controller的实现类KafkaController中建立了不少线程,好比以前提到的RequestSendThread线程,另外ZkClient也会建立单独的线程来处理zookeeper回调,这还不算TopicDeletionManager建立的线程和其余IO线程等。几乎全部这些线程都须要访问ControllerContext(RequestSendThread只操做它们专属的请求队列,不会访问ControllerContext),所以必要的多线程同步机制是必定须要的。当前是使用controllerLock锁来实现的,所以能够说没有并行度可言。源码分析
看过源代码的人相信对这一点深有体会。KafkaController、PartitionStateMachine和ReplicaStateMachine每一个都是500+行的大类且彼此混调的现象明显,好比KafkaController的stopOldReplicasOfReassignedPartition方法调用ReplicaStateMachine的handleStateChanges方法,然后者又会调用KafkaController的remoteReplicaFromIsr方法。相似的状况还发生在KafkaController和ControllerChannelManager之间。性能
当前broker对入站请求类型不作任何优先级处理,不管是PRODUCE请求、FETCH请求仍是Controller类的请求。这就可能形成一个问题:即clients发送的数据类请求积压致使controller推迟了管理类请求的处理。设想这样的场景,假设controller向broker广播了leader发生变动。因而新leader开始接收clients端请求,而同时老leader所在的broker因为出现了数据类请求的积压使得它一直忙于处理这些请求而没法处理controller发来的LeaderAndIsrRequest请求,所以这是就会出现“双主”的状况——也就是所谓的脑裂。此时假若client发送的一个PRODUCE请求未指定acks=-1,那么由于日志水位截断的缘故这个请求包含的消息就可能“丢失”了。如今社区中关于controller丢失数据的bug大可能是由于这个缘由形成的。
当前controller操做Zookeeper是经过ZkClient来完成的。ZkClient目前是同步写入Zookeeper,而同步一般意味着性能不高。更为严重的是,controller是一个分区一个分区进行写入的,对于分区数不少的集群来讲,这无疑是个巨大的性能瓶颈。若是用户仔细查看源代码,能够发现PartitionStateMachine的electLeaderForPartition就是一个分区一个分区地选举的。
Controller当前发送请求都是按照分区级别发送的,即一个分区一个分区地发送。没有任何batch或并行可言,效率很低。
这里的版本号相似于new consumer的generation,总之是要有一种机制告诉controller broker的版本信息。由于有些状况下broker会处理本已过时或失效的请求致使broker状态不一致。举个例子,若是一个broker正常关闭过程当中“宕机”了,那么重启以后这个broker就有可能处理以前controller发送过来的StopReplicaRequest,致使某些副本被置成offline从而没法使用。而这确定不是咱们但愿看到的结果,对吧?
Contoller目前是使用了ZkClient这个开源工具,它能够自动重建会话并使用特有的线程顺序处理全部的Zookeeper监听消息。由于是顺序处理,它就有可能没法及时响应最新的状态变动致使Kafka集群状态的不一致。
和new consumer相似,controller摒弃多线程的模型,采用单线程的事件队列模型。这样简化了设计同时也避免了复杂的同步机制。各位在最新的trunk分支上已然能够看到这种变化:增长了ControllerEventManager类以及对应的ControllerEventThread线程类专门负责处理ControllerEvent。目前总共有9种controller event,它们分别是:
咱们基本上能够从名字就能判断出它们分别表明了什么事件。
将全部同步操做Zookeeper的地方都改为异步调用+回调的方式。实际上Apache Zookeeper客户端执行请求的方式有三种:同步、异步和batch。一般以batch性能最好,但Kafka社区目前仍是倾向于用async替换sync。毕竟实现起来相对简单同时性能上也能获得很多提高。
可能摒弃以前状态机的方式,采用和GroupCoordinator相似的方式,让controller保存全部的状态而且负责状态的流转以及状态流转过程当中的逻辑。固然,具体的实现还要再结合0.11最终代码才能肯定。
对管理类请求和数据类请求区分优先级。好比使用优先级队列替换现有的BlockingQueue——社区应该已经实现了这个功能,开发了一个叫PrioritizationAwareBlockingQueue的类来作这件事情,后续你们能够看下这个类的源代码
为broker设定版本号(generation id)。若是controller发送过来的请求中包含的generation与broker本身的generation不匹配, 那么broker会拒绝该请求。
ZkClient是同步顺序处理ZK事件的,而原生Zookeeper client支持async方式。另外使用原生API还可以在接收到状态变动通知时便立刻开始处理,而ZkClient的特定线程则必需要在队列中顺序处理到这条变动消息时才能处理。
以上就是关于Kafka controller的一些讨论,包括了它当前的组件构成、设计问题以及对应的改进方案。有不少地方可能理解的还不是透彻,期待着在Kafka 0.11正式版本中能够看到全新的controller组件。