Kafka needs no Keeper(关于KIP-500的讨论)

写在前面的

最近看了Kafka Summit上的这个分享,以为名字很霸气,标题直接沿用了。这个分享源于社区的KIP-500,大致的意思从此Apache Kafka再也不须要ZooKeeper。整个分享大约40几分钟。完整看下来感受干货不少,这里特地总结出来。若是你把这个分享看作是《三国志》的话,那么姑且就把个人这篇看作是裴松之注吧:)node

客户端演进

首先,社区committer给出了Kafka Java客户端移除ZooKeeper依赖的演进过程。下面两张图总结了0.8.x版本和0.11.x版本(是否真的是从0.11版本开始的变化并不重要)及之后的功能变迁:在Kafka 0.8时代,Kafka有3个客户端,分别是Producer、Consumer和Admin Tool。其中Producer负责向Kafka写消息,Consumer负责从Kafka读消息,而Admin Tool执行各类运维任务,好比建立或删除主题等。其中Consumer的位移数据保存在ZooKeeper上,所以Consumer端的位移提交和位移获取操做都须要访问ZooKeeper。另外Admin Tool执行运维操做也要访问ZooKeeper,好比在对应的ZooKeeper znode上建立一个临时节点,而后由预约义的Watch触发相应的处理逻辑。算法

后面随着Kafka的演进,社区引入了__consumer_offsets位移主题,同时定义了OffsetFetch和OffsetCommit等新的RPC协议,这样Consumer的位移提交和位移获取操做所有转移到与位移主题进行交互,避免了对ZooKeeper的访问。同时社区引入了新的运维工具AdminClient以及相应的CreateTopics、DeleteTopics、AlterConfigs等RPC协议,替换了原先的Admin Tool,这样建立和删除主题这样的运维操做也彻底移动Kafka这一端来作,就像下面右边这张图展现的:apache

至此, Kafka的3个客户端基本上都不须要和ZooKeeper交互了。应该说移除ZooKeeper的工做完成了大部分,但依然还有一部分工做要在ZooKeeper的帮助下完成,即Consumer的Rebalance操做。在0.8时代,Consumer Group的管理是交由ZooKeeper完成的,包括组成员的管理和订阅分区的分配。这个设计在新版Consumer中也获得了修正。所有的Group管理操做交由Kafka Broker端新引入的Coordinator组件来完成。要完成这些工做,Broker端新增了不少RPC协议,好比JoinGroup、SyncGroup、Heartbeat、LeaveGroup等。缓存

  

此时,Kafka的Java客户端除了AdminClient还有一点要依赖ZooKeeper以外,全部其余的组件所有摆脱了对ZooKeeper的依赖。安全

以后,社区引入了Kafka安全层,实现了对用户的认证和受权。这个额外的安全层也是不须要访问ZooKeeper的,所以以前依赖ZooKeeper的客户端是没法“享用”这个安全层。一旦启用,新版Clients都须要首先接入这一层并经过审核以后才能访问到Broker,以下图所示:网络

这么作的好处在于统一了Clients访问Broker的模式,即定义RPC协议,好比咱们熟知的PRODUCE协议、FETCH协议、METADATA协议、CreateTopics协议等。若是后面须要实现更多的功能,社区只须要定义新的RPC协议便可。同时新引入的安全层负责对这套RPC协议进行安全校验,统一了访问模式。另外这些协议都是版本化的(versioned),所以可以独立地进行演进,同时也兼顾了兼容性方面的考量。数据结构

Broker间交互

说完了Clients端,咱们说下Broker端的现状。目前,应该说Kafka Broker端对ZooKeeper是重度依赖的,主要表如今如下几个方面:架构

  • Broker注册管理
  • ACL安全层配置管理
  • 动态参数管理
  • 副本ISR管理
  • Controller选举

咱们拿一张图来讲明,图中有4个Broker节点和一个ZooKeeper,左上角的Broker充当Controller的角色。当前,全部的Broker启动后都必须维持与ZooKeeper的会话。Kafka依赖于这个会话实现Broker端的注册,并且Kafka集群中的全部配置信息、副本信息、主题信息也都保存在ZooKeeper上。最后Controller与集群中每一个Broker都维持了一个TCP长链接用于向这些Broker发送RPC请求。当前的Controller RPC类型主要有3大类:运维

  • LeaderAndIsr:主要用于向集群广播主题分区Leader和ISR的变动状况,好比对应的Broker应该是特定分区的Leader仍是Follower
  • StopReplica:向集群广播执行中止副本的命令
  • UpdateMetadata:向集群广播执行变动元数据信息的命令

图中还新增了一个AlterISR RPC,这是KIP-497要实现的新RPC协议。现阶段Kafka各个主题的ISR信息所有保存在ZooKeeper中。若是后续要舍弃ZooKeeper,必需要将这些信息从ZooKeeper中移出来,放在了Controller一端来作。同时还要在程序层面支持对ISR的管理。所以社区计划在KIP-497上增长AlterISR协议。对了,还要提一句,当前Controller的选举也是依靠ZooKeeper完成的。异步

因此后面Broker端的演进可能和Clients端的路线差很少:首先是把Broker与ZooKeeper的交互所有干掉,只让Controller与ZooKeeper进行交互,而其余全部Broker都只与Controller交互,以下图所示:

 

看上去这种演进路线社区已经走得轻车熟路了,但实际上还有遗留了一些问题须要解决。

Broker Liveness

首先就是Broker的liveness问题,即Kafka如何判断一个Broker究竟是否存活?在目前的设计中,Broker的生存性监测彻底依赖于与ZooKeeper之间的会话。一旦会话超时或断开Controller自动触发ZooKeeper端的Watch来移除该Broker,并对其上的分区作善后处理。若是移除了ZooKeeper,Kafka应该采用什么机制来判断Broker的生存性是一个问题。

Network Partition

如何防范网络分区也是一个须要讨论的话题。当前可能出现的Network Partition有4种:一、单个Broker彻底与集群隔离;二、Broker间没法通信;三、Broker与ZooKeeper没法通信;四、Broker与Controller没法通信。下面4张图分别展现了这4种状况:

 

咱们分别讨论下。首先是第一种状况,单Broker与集群其余Broker隔离,这其实并不算太严重的问题。当前的设计已然可以保证很好地应对此种状况。一旦Broker被隔离,Controller会将其从集群中摘除,虽然可用性下降了,可是整个集群的一致性依然可以获得保证。第二种状况是Broker间没法通信,可能的后果是消息的备份机制没法执行,Kafka要收缩ISR,依然是可用性上的下降,可是一致性状态并无被破坏。状况三是Broker没法与ZooKeeper通信。Broker能正常运转,它只是没法与ZooKeeper进行通信。此时咱们说该Broker处于僵尸状态,即所谓的Zoobie状态。因Zoobie状态引入的一致性bug社区jira中一直没有断过,社区这几年也一直在修正这方面的问题,主要对抗的机制就是fencing。好比leader epoch等。最后一类状况是Broker没法与Controller通信,那么全部的元数据更新通道被堵死,即便这个Broker依然是healthy的,可是它保存的元数据信息多是很是过时的。这样链接该Broker的客户端可能会看到各类很是古怪的问题。以前在知乎上回答过相似的问题:https://www.zhihu.com/question/313683699/answer/609887054。目前,社区对这种状况并无太好的解决办法,主要的缘由是Broker的liveness彻底交由ZooKeeper来作的。一旦Broker与ZooKeeper之间的交互没有问题,其余缘由致使的liveness问题就没法完全规避。

第四类Network Partition引入了一个经典的场景:元数据不一致。目前每一个Broker都缓存了一份集群的元数据信息,这份数据是异步更新的。当第四类Partition发生时,Broker端缓存的元数据信息必然与Controller的不一样步,从而形成各类各样的问题。

下面简要介绍一下元数据更新的过程。主要的流程就是Controller启动时会同步地从ZooKeeper上拉取集群全量的元数据信息,以后再以异步的方式同步给其余Broker。其余Broker与Controller之间的同步每每有一个时间差,也就是说可能Clients访问的元数据并非最新的。我我的认为如今社区不少flaky test failure都是由于这个缘由致使的。 事实上,实际使用过程当中有不少场景是Broker端的元数据与Controller端永远不一样步。一般状况下若是咱们不重启Broker的话,那么这个Broker上的元数据将永远“错误”下去。好在社区还给出了一个最后的“大招”: 登陆到ZooKeeper SHELL,手动执行rmr /controller,强迫Controller重选举,而后从新加载元数据,并给全部Broker重刷一份。不过在实际生产环境,我怀疑是否有人真的要这么干,毕竟代价不小,并且最关键的是这么作依然可能存在两个问题:1. 咱们如何确保Controller和Broker的数据是一致的?2. 加载元数据的过程一般很慢。

这里详细说说第二点,即加载元数据的性能问题。整体来讲,加载元数据是一个O(N)时间复杂度的过程,这里的N就是你集群中总的分区数。考虑到Controller从ZooKeeper加载以后还要推给其余的Broker,那么作这件事的总的时间复杂度就是O(N * M),其中M是集群中Broker的数量。能够想见,当M和N都很大时,在集群中广播元数据不是一个很快的过程。

Metadata as an Event Log

Okay,鉴于以上所提到的全部问题,当Kafka抛弃了ZooKeeper以后,社区应该如何解决它们呢?整体的思路就是Metadata as an Event Log + Controller quorum。咱们先说metadata as an event log。若是你读过Jay Kreps的《I ❤️Logs》,你应该有感触,整个Kafka的架构其实都是构建在Log上的。每一个topic的分区本质上就是一个Commit Log,但元数据信息的保存却不是Log形式。在现有的架构设计中你基本上能够认为元数据的数据结构是KV形式的。这一次,社区采用了与消息相同的数据保存方式,即将元数据做为Log的方式保存起来,以下表所示:

 

这样作的好处在于每次元数据的变动都被当作是一条消息保存在Log中,而这个Log能够被视做是一个普通的Kafka主题被备份到多台Broker上。Log的一个好处在于它有清晰的先后顺序关系,即每一个事件发生的时间是能够排序的,配合以恰当的处理逻辑,咱们就能保证对元数据变动的处理是按照变动发生时间顺序处理,不出现乱序的情形。另外Log机制还有一个好处是,在Broker间同步元数据时,咱们能够选择同步增量数据(delta),而非全量状态。如今Kafka Broker间同步元数据都是全量状态同步的。前面说过了,当集群分区数很大时,这个开销是很可观的。若是咱们可以只同步增量状态,势必能极大地下降同步成本。最后一个好处是,咱们能够很容易地量化元数据同步的进度,由于对Log的消费有位移数据,所以经过监控Log Lag就能算出当前同步的进度或是落后的进度。

采用Log机制后,其余Broker像是一个普通的Consumer,从Controller拉取元数据变动消息或事件。因为每一个Broker都是一个Consumer,因此它们会维护本身的消费位移,就像下面这张图同样:

 这种设计下,Controller所在的Broker必需要承担起全部元数据topic的管理工做,包括建立topic、管理topic分区的leader以及为每一个元数据变动建立相应的事件等。既然社区选择和__consumer_offsets相似的处理方式,一个很天然的问题在于这个元数据topic的管理是否可以复用Kafka现有的副本机制?答案是:不可行。理由是现有的副本机制依赖于Controller,所以Kafka无法依靠现有的副本机制来实现Controller——按照咱们的俗语来讲,这有点鸡生蛋、蛋生鸡的问题,属于典型的循环依赖。为了实现这个,Kafka须要一套leader选举协议,而这套协议或算法是不依赖于Controller的,即它是一个自管理的集群quorum(抱歉,在分布式领域内,特别是分布式共识算法领域中,针对quorum的恰当翻译我目前还未找到,所以直接使用quorum原词了)。最终社区决定采用Raft来实现这组quorum。这就是上面咱们提到的第二个解决思路:Controller quorum。

Controller Quorum

与借助Controller帮忙选择Leader不一样,Raft是让本身的节点自行选择Leader并最终令全部节点达成共识——对选择Controller而言,这是一个很好的特性。其实Kafka现有的备份机制与Raft已经很接近了,下表罗列了一下它们的异同:

 一眼扫过去,其实Kafka的备份机制和Raft很相似,好比Kafka中的offset其实就是Raft中的index,epoch对应于term。固然Raft中采用的半数机制来确保消息被提交以及Leader选举,而Kafka设计了ISR机制来实现这两点。整体来讲,社区认为只须要对备份机制作一些小改动就应该能够很容易地切换到Raft-based算法。

下面这张图展现Controller quorum可能更加直观:

整个controller quorum相似于一个小的集群。和ZooKeeper相似,这个quorum一般是3台或5台机器,不须要让Kafka中的每一个Broker都自动称为这个quorum中的一个节点。该quorum里面有一个Leader负责处理客户端发来的读写请求,这个Leader就是Kafka中的active controller。根据ZooKeeper的Zab协议,leader处理全部的写请求,而follower是能够处理读请求的。当写请求发送给follower后,follower会将该请求转发给leader处理。不过我猜Kafka应该不会这样实现,它应该只会让leader(即active controller)处理全部的读写请求,而客户端(也就是其余Broker)压根就不会发送读写请求给follower。在这一点上,这种设计和现有的Kafka请求处理机制是一致的。

如今还须要解决一个问题,即Leader是怎么被选出来的?既然是Raft-based,那么采用的也是Raft算法中的Leader选举策略。让Raft选出的Leader称为active controller。网上有不少关于Raft选主的文章,这里就不在赘述了,有兴趣的能够读一读Raft的论文:《In Search of an Understandable Consensus Algorithm(Extended Version)》。

这套Raft quorum的一个好处在于它自然提供了低延时的failover,所以leader的切换会很是的迅速和及时,由于理论上再也不有元数据加载的过程了,全部的元数据如今都同步保存follower节点的内存中,它已经有其余Broker须要拉取的全部元数据信息了!更酷的是,它避免了如今机制中一旦Controller切换要全量拉取元数据的低效行为,Broker无需从新拉取以前已经“消费”的元数据变动消息,它只须要重新Leader继续“消费”便可。

另外一个好处在于:采用了这套机制后,Kafka能够作元数据的缓存了(metadata caching):即Broker可以把元数据保存在磁盘上,同时就像刚才说的,Broker只需读取它关心的那部分数据便可。还有,和如今snapshot机制相似,若是一个Broker保存的元数据落后Controller太多或者是一个全新的Broker,Kafka甚至能够像Raft那样直接发送一个snapshot文件,快速令其追上进度。固然大多数状况下,Broker只须要拉取delta增量数据便可。

Post KIP-500 Broker注册

当前Broker启动以后会向ZooKeeper注册本身的信息,好比本身的主机名、端口、监听协议等数据。移除ZooKeeper以后,Broker的注册机制也要发生变化:Broker须要向active controller发送心跳来进行注册。Controller收集心跳中包含的Broker数据构建整个Kafka集群信息,以下图所示:

 同时Controller也会对心跳进行响应,显式地告知Broker它们是否被容许加入集群——若是不容许,则可能须要被隔离(fenced)。固然controller本身也能够对本身进行隔离。咱们针对前面提到的隔离场景讨论下KIP-500是怎么应对的。

Fencing

首先是普通Broker与集群彻底隔离的场景,好比该Broker没法与controller和其余Broker进行通讯,但它依然能够和客户端程序交互。此时,fencing机制就很简单了,直接让controller令其下线便可。这和如今依靠ZooKeeper会话机制维持Broker判活的机制是如出一辙的,没有太大改进。

第二种状况是Broker间的通信中断。此时消息没法在leader、follower间进行备份。可是对于元数据而言,咱们不会看到数据不一致的情形,由于Broker依然能够和controller通信,所以也不会有什么问题。

第三种状况是Broker与Controller的隔离。现有机制下这是个问题,但KIP-500以后,Controller仅仅将该Broker“踢出场”便可,不会形成元数据的不一致。

最后一种状况是Broker与ZooKeeper的隔离, 既然ZooKeeper要被移除了,天然这也不是问题了。

部署

终于聊到KIP-500以后的Kafka运维了。下表总结了KIP-500先后的部署状况对比:

很简单,如今任什么时候候部署和运维Kafka都要考虑对ZooKeeper的运维管理。在KIP-500以后咱们只须要关心Kafka便可。

Controller quorum共享模式

如前所述,controller改为Raft quorum机制后,可能使用3或5台机器构成一个小的quorum。那么一个很天然的问题是,这些Broker机器还可否用做他用,是惟一用做controller quorum仍是和其余Broker同样正常处理。社区对此也作了解释:两种都支持!

若是你的Kafka集群资源很紧张,你可使用共享controller模式(Shared Controller Mode),即充当controller quorum的Broker机器也能处理普通的客户端请求;相反地,若是你的Kafka资源很充足,专属controller模式(Separate Controller Mode)多是更适合的,即在controller quorum中的Broker机器排它地用做Controller的选举之用,再也不对客户端提供读写服务。这样能够实现更好的资源隔离,适用于大集群。

Roadmap

最后说一下KIP-500的计划。社区计划分三步走:

第一步是移除客户端对ZooKeeper的依赖——这一步基本上已经完成了,除了目前AdminClient还有少许的API依赖ZooKeeper以外,其余客户端应该说都不须要访问ZooKeeper了;第二步是移除Broker端的ZooKeeper依赖:这主要包括移除Broker端须要访问ZooKeeper的代码,以及增长新的Broker端API,如前面所说的AlterISR等,最后是将对ZooKeeper的访问所有集中在controller端;最后一步就是实现controller quorum,实现Raft-based的quorum负责controller的选举。

至于Kafka升级,若是从现有的Kafka直接升级到KIP-500以后的Kafka会比较困难,所以社区打算引入一个名为Bridge Release的中间过渡版本,以下图所示:

这个Bridge版本的特色在于全部对ZooKeeper的访问都集中到了controller端,Broker访问ZooKeeper的其余代码都被移除了。 

总结

KIP-500应该说是最近几年社区提出的最重磅的KIP改进了。它几乎是颠覆了Kafka已有的使用模式,摒弃了以前重度依赖的Apache ZooKeeper。就我我的而言,我是很期待这个KIP,后续有最新消息我也会在一并同步出来。让咱们静观其变吧~~~

相关文章
相关标签/搜索