在这一篇文章中,我将向你介绍消费者的一些参数。算法
这些参数影响了每次poll()
请求的数据量,以及等待时间。session
在这以后,我将向你介绍Kafka用来保证消费者扩展性以及可用性的设计——消费者组。并发
在消费者组的介绍中,我将重点放在了Rebalance
的过程上,由于这是一个很重要又常常发生,还会致使消费者组不可用的操做。fetch
对于一个消费者来讲,他要作的事情只有一件,那就是使用poll()
来拉取消息。设计
至于他是从哪一个分区拉取,则是靠消费者组来动态的调整这个消费者所消费的分区,又或者是由开发者来自定义。code
但不管如何,这个消费者都须要经过poll()
来拉取消息。开发
这也是这一节的内容:经过参数配置可以影响poll操做的哪些内容。源码
首先须要肯定一点,当消费者使用poll()
拉取消息的时候,他只能拉到HW水位线及如下的消息。hash
咱们可让消费者针对于某一个分区进行消费。it
为了实现这个目标,咱们能够用assign()
方法。
可是注意,当这个消费者不是单独的一个消费者,而是属于某个消费者组的时候,将不容许使用自定义的分区分配。
对应的配置分别是:
fetch.min.bytes
对于每次拉取的最小字节数,默认是1。当拉取的消息大小小于设定的这个限度时,将会等待,直到此次被拉取的消息大小大于这个值。
因而咱们能够得知,当咱们即将要消费的消息比较小时,能够适当的调大这个参数的值,以提升吞吐量。
可是注意,这也可能形成消息的额外延迟。
fetch.max.bytes
这个参数跟上面的同样,只不过他表明的意义是最大的字节数。
可是这存在一个问题,若是咱们的消息大小全都大于这个参数的值,会发生什么状况呢?
答案是会返回即将拉取分区的第一条消息。
也就是说在这个参数中,不存在“不符合条件就不返回数据”的状况。
还有一个参数,叫作max.partition.fetch.bytes
这个参数跟上面提到的每次拉取的最大字节数工做原理是同样的,也是会保证当消息大于设定的值的时候,必定会返回数据。
而不一样的地方在于,这个参数表明的是分区。也就是说,一个参数表明的是一次拉取请求,而另一个参数表明的是针对于每个分区的拉取请求。
fetch.max.wait.ms
这个参数的意义在于:若是拉取消息的时间达到了这个参数设定的值,那么不管符不符合其余条件,都会返回数据。
那么你很容易能够猜到,这个参数跟fetch.min.bytes
是有关系的,这是为了防止当fetch.min.bytes
参数设置的过大,致使没法返回消息的状况。
固然了,这个参数还有一个意义,若是你的业务须要更小的延迟,那么应该调小这个参数。
若是咱们的最大拉取字节数设置成了很是大,那么是否是表明咱们每一次的poll()
,都能直接拉到HW水位呢?
答案是否认的。
还存在一个参数:
max.poll.records
这个参数的意义在于,每次拉取消息的最大数量。
一样的,若是消息的大小都比较小,那么能够调大这个参数,以提升消费速度。
另外,还存在一些消费者组相关的参数,我在这里先提一下,具体更详细的解释,将在后文给出。
heartbeat.interval.ms
这个参数是设置消费者与消费者组对应的Coordinator发送心跳响应的间隔时间。
session.timeout.ms
这个参数是用于Coordinator判断多长时间没收到消费者的心跳响应而认为这个消费者已经下线的时间。
max.poll.interval.ms
这个参数用于Coordinator判断多长时间内消费者都没有拉取消息,而认为这个消费者已经下线的时间。
auto.offset.reset
这个参数其实跟消费者组的联系不是很大,可是我认为能够写在这里。
由于有这么一个场景,当消费者Rebalance以后,若是位移主题以前保存的位移已经被删除了,那么这个参数就决定了消费者该从哪里开始消费。
固然了,关于消费者还有许多的参数,不只仅是上文提到的这些。
而上文提到的这些参数,是我认为可让初学者更好的理解消费者的工做原理。
在解释Rebalance的原理以前,我想先跟你说一下个人思路,省得你看的一头雾水。
固然了,这个思路是我认为更适合我本身去理解的。你也能够先看第三大节,再有了一个大概的认识后,再来看这一节的内容。
我但愿先告诉你Rebalance的过程是怎么样的,这里说的过程指的是Rebalance已经发生了,那么在Rebalance的过程当中,会发生哪些事情。
在这以后,我再跟你说说Rebalance的五种状态。
那么,咱们开始。
首先,应该有一个认识。Rebalance的全部操做都是经过Coordinator的协调下完成的,组内的消费者之间并不会进行相关的通讯与交流。
Coordinator你能够理解为是一个服务,位于某个broker节点上。
假设当前的消费者已经保存了这个这个节点的信息,那么将会直接进入第二步。
若是当前的消费者没有保存这个信息(好比这是一个新加入这个消费者组的消费者),那么他须要先找到这个Coordinator所在的broker节点。
这里的broker节点,是这个消费者对应的消费者组对应的位移主题的分区的leader节点。
听起来有点绕,让我来再解释一下。
消费者 -> 消费者组 -> __consumer_offsets -> partition -> leader
关于位移主题,我已经在第二篇文章中提到过了,在这里再也不赘述。
可是在这里,让咱们来再来回忆一遍消费者组对应的partition是怎么找到的。
Group ID
的hash值__consumer_offsets
的分区数取模在找到了对应的broker节点后,第二步是发送加入Group的请求。
在这一步中,不管是以前已经在Group内的成员,仍是准备加入Group的成员,都须要发送Join Group的申请。
在发起的JoinGroupRequest中,须要包含以下的数据:
Group id
Session_timeout
Rebalance_timeout
Menber_id
Partition assignor
须要事先说明的是,这里的名称并不严格,是为了更好的理解而这样写的。若是你想要知道更加严谨的请求内容,能够去看厮大的《深刻理解Kafka》。
下面咱们挨个解释:
Group ID
,消费者组ID,表明了即将加入的消费者组。
Session_timeout
,上文中提到过这个参数,用于Coordinator判断多长时间内没收到客户端的心跳包而认为这个客户端已经下线。
Rebalance_timeout
,值等同于max.poll.interval.ms
,意义在于告知Coordinator用多长的时间来等待其余消费者加入这个消费者组。
咱们在上文中提到,不管以前是否是这个消费者组的成员,只要开启了Rebalance,就须要从新加入这个消费者组。所以,Coordinator须要一段时间来接受JoinGroupRequest的请求。
至于为何须要一段时间来接受请求,以及这段时间发生了什么,我将在后面给你解释。
menber_id
,做为组内消费者的识别编号,若是是新加入组的消费者,这个字段留空。
Partition assignor
,指的是分区分配方式。由于Rebalance这个过程,就是分区分配的一个过程。每一个消费者将其接受的分配方式放在这个字段中,随后由Coordinator选出每一个消费者都承认的分区分配方式。
而后咱们来聊聊在这个阶段,Coordinator须要作什么。
Coordinator须要一段时间来接收来自客户端的JoinGroupRequest请求,是由于Coordinator须要收集每个成员的信息,选出leader和分区分配方式,所以,Coordinator须要足够的时间来“收集信息”。这就回答了上文说到的为何“Coordinator须要一段时间来接受JoinGroupRequest的请求”。
选举leader的算法很简单,第一个发送请求的consumer,就是leader。
选出分区分配策略的算法也很简单,首先Coordinator会收集全部消费者都支持的分区分配方式,而后每一个消费者为它支持的分配方式投上一票。注意,这里的投票行为没有通过多一次的交互,而是Coordinator选取每一个消费者的JoinGroupRequest中的第一个分区分配方式,做为这个消费者所投的票。
当Coordinator选取好Leader和分区分配方式后,将返回JoinGroupResponse给各个消费者。
在返回给各个消费者的JoinGroupResponse中,包含了menber_id,分区分配方式等。而对于leader消费者来讲,还将得到组内其余消费者的元数据,包含了各个消费者的menber_id,分区分配方式。
至此,JoinGroup阶段完成。
注意,每一个消费者从发送JoinGroupRequest到接收到JoinGroupResponse请求这段时间,是阻塞的。
在第二步结束以后,每一个消费者已经知道了本身的menber_id
,以及Coordinator所选择的分区分配方式。
可是此时每一个消费者还不知道本身应该消费哪一个分区。
这个分区分配的过程,是交给Leader消费者来完成的。
可是注意,虽说这个过程是Leader消费者完成的,可是Leader消费者并不会跟其余消费者直接通讯,而是将分配方式告知Coordinator,由Coordinator来告知各个消费者。
这个过程,称为Sync_Group
。
在这个过程当中,每个消费者都会发送SyncGroupRequest给Coordinator。要注意的是,Leader消费者在这个Request中还附带了其余消费者的分区分配信息。
在Coordinator收到了这些请求后,会将这个分区分配方案等元数据保存在__consumer_offsets
主题中。
随后,Coordinator将发送响应给各个消费者。
在这个响应中,包含了各个消费者应该负责消费的分区编号。
至此,每一个消费者都了解了本身应该消费的分区是哪些了。
在上一个阶段中,组内各个消费者已经知道了本身负责的是哪些分区。
可是还存在一个问题,消费者应该从分区的哪一个位置开始消费呢?
这就用到了__consumer_offsets
主题了,这个主题保存了某个消费者组的各个分区的消费位移。
此外,每一个消费者还须要不断地发送心跳包给Coordinator,以告知Coordinator本身没有下线。
这个发送心跳包的时间,就是咱们设置的heartbeat.interval.ms
参数。
在每一个心跳包的响应中,Coordinator就会告知这个消费者,需不须要Rebalance。
那么也就说明了,这个参数设置的越小,消费者就越早可以得知是否须要Rebalance。
而对应的session.timeout.ms
,指的就是Coordinator在这么长的时间内没收到消费者的心跳包,而认为这个消费者过时的参数。
在上面说完了Rebalance的核心原理后,咱们再来聊聊消费者组的各个状态。
先来介绍一下消费者组有哪几种状态:
__consumer_offsets
中也没有保存这个消费者组的元数据。一般发生在这个消费者组被删除了,或者__consumer_offsets
分区leader发生了改变。(至于这个状态我了解的也不是不少,若是能够的话,麻烦你评论区告诉我。)Rebalance_timeout
这么长的时间。消费者组的状态介绍大概就是这样的。
简单的来说,当一个消费者组须要Rebalance的时候,他就会进入PreparingRebalance阶段,而后一直流转到Stable阶段。
在这个期间,若是有任何的成员变更,就会回到PreparingRebalance阶段。
在这个期间,若是Coordinator改变,或者消费者组被删除等,就会进入Dead阶段。
首先,谢谢你能看到这里!
在这一篇文章中,我没有像介绍生产者那样介绍一遍源码。
由于对于生产者来讲,他只须要将消息发送到broker中,而对于消费者来讲,这个过程复杂得多,我但愿可以用比较浅显易懂的方式,让你可以了解消费者组的工做方式。
在有了这样的一个认识以后,不管使用什么客户端,我认为都不会有太大的问题。
此外,在这一篇中我花了较大的笔墨去介绍Rebalance的过程,是由于Rebalance是一个很常见的现象,并且在这期间会致使Kafka消费者的不可用,因此我但愿了解了Rebalance的工做原理,可以让你更容易的避免没必要要的Rebalance。
固然了,由于做者才疏学浅能力有限,可能在这个过程当中忽略了一些很重要的细节,又或者有一些错误的理解。若是你发现了,还请不吝指教,谢谢你!
再次谢谢你能看到这里,感恩~
PS:若是有任何的问题,能够在公众号找到我,欢迎来找我玩!