Kafka 客户端是如何找到 leader 分区的

在正常状况下,Kafka中的每一个Topic都会有不少个分区,每一个分区又会存在多个副本。在这些副本中,存在一个leader分区,而剩下的分区叫作 follower,全部对分区的读写操做都是对leader分区进行的。因此当咱们向Kafka写消息或者从Kafka读取消息的时候,必须先找到对应分区的Leader及其所在的Broker地址,这样才能够进行后续的操做。本文将要介绍的就是 Kafka 是如何找到 leader 分区的。apache

咱们知道, Kafka 是使用 Scala 语言编写的,可是其支持不少语言的客户端,包括:C/C++、PHP、Go以及Ruby等等(参见https://cwiki.apache.org/confluence/display/KAFKA/Clients)。这是为何呢?这是由于 Kafka 内部实现了一套基于TCP层的协议,只要使用这种协议与Kafka进行通讯,就可使用不少语言来操做Kafka。缓存

目前 Kafka 内部支持多达30多种协议,本文介绍的 Kafka 客户端是如何找到 leader 分区就涉及到 Kafka 内部的 Metadata 协议。Metadata 协议主要解决如下四种问题:服务器

  • Kafka中存在哪些主题?并发

  • 每一个主题有几个分区?3d

  • Leader分区所在的broker地址及端口?code

  • 每一个broker的地址及端口是多少?blog

客户端只须要构造相应的请求,并发送到Broker端,便可获取到上面四个问题的答案。整个过程以下:内存

  • 客户端构造相应的请求io

  • 客户端将请求发送到Broker端class

  • Broker端接收到请求处理,并将结果发送到客户端。

Metadata 请求协议(v0-v3版本)以下:

目前 Metadata 请求协议存在五个版本,v0-v3版本格式一致。可是这些协议存在一个问题:当 Kafka 服务器端将 auto.create.topics.enable 参数设置为 ture 时,若是咱们查询的主题不存在,Kafka 将会自动建立这个主题,这极可能不是咱们想要的结果。因此,基于这个问题,到了 Metadata 请求协议第五版,格式已经变化了,以下:客户端只须要构造一个 TopicMetadataRequest ,里面包括咱们须要查询主题的名字(TopicNames);固然,咱们能够一次查询多个主题,只须要将这些主题放进List里面便可。同时,咱们还能够不传入任何主题的名字,这时候 Kafka 将会把内部全部的主题相关的信息发送给客户端。

Kafka 的 Broker 收到客户端的请求处理完以后,会构造一个 TopicMetadataResponse,并发送给客户端。TopicMetadataResponse 协议的格式以下:咱们能够指定 allow_auto_topic_creation 参数来告诉 Kafka 是否须要在主题不存在的时候建立,这时候控制权就在咱们了。

能够看到,相应协议里面包含了每一个分区的 Leader、Replicas 以及 Isr 信息,同时还包括了Kafka 集群全部Broker的信息。若是处理出现了问题,会出现相应的错误信息码,主要包括下面几个:

并且,Metadata 协议是目前惟一一个能够向任何 Broker 发送的协议。由于任何一个 Broker 在启动以后会存储这些Metadata信息的。并且,Kafka 提供的客户端在获取到 Metadata 信息以后也会将它存储到内存中的。而且在如下几种状况会更新已经缓存下来的 Metadata 信息:

  • 在往Kafka发送请求是收到 Not a Leader 异常;

  • 在 meta‐data.max.age.ms 参数配置的时间过时以后。

以上两种状况 Kafka提供的客户端会自动再发送一次 Metadata 请求,这样就能够获取到更新的信息。整个过程以下:

好了,说了半天的,咱们来看看程序里面如何构造 TopicMetadataRequest 以及处理 TopicMetadataResponse

TopicMetadataRequest 是经过 SimpleConsumer 的 send 方法发送的,其返回的是 TopicMetadataResponse ,其中就包含了咱们须要的信息。 运行上面的程序输出以下:

上面的输出就能够看到各个分区的leader所在机器、isr以及全部replicas等信息。有一点咱们须要注意,由于目前存在多个版本的 Metadata 请求协议,咱们可使用低版本的协议与高版本的Kafka集群进行通讯,由于高版本的 Kafka 可以支持低版本的 Metadata 请求协议;可是咱们不能使用高版本的 Metadata 请求协议与低版本的 Kafka 通讯。

相关文章
相关标签/搜索