文章首发于公众号【大数据学徒】,搜索 dashujuxuetu 或文末扫码关注html
本文总结了 broker 是如何处理用户的各类请求,包括常见的 Produce 请求、Fetch 请求、Metadata 请求,清晰易懂。apache
文章思惟导图:bootstrap
Kafka 在 TCP 之上有一套本身的二进制协议,定义了如何处理各类请求的规范,详细的协议规范能够参见 协议文档。Kafka 用户请求有一些公共字段,好比请求类型、协议版本、客户端 ID、请求 ID 等等。缓存
broker 对于每一个监听的端口会启动一个 acceptor 线程用来接收用户请求,而后有若干个 processor 和 handler 线程处理这些请求,processor 负责将请求放入一个请求队列,以及从响应队列中取出响应返回给用户,handler 是实际处理请求(读写各个 topic 的消息等)的线程。网络
Produce 请求(来自生产者生产数据的请求)和 Fetch 请求(来自消费者消费数据和 follower 同步数据的请求)都是由每一个分区副本的 leader 来处理的。若是用户把对于一个分区的请求发到了 follower 上,那么会获得一个“我不是 Leader”的错误,将请求发送 leader 这一点由客户端本身保证,怎么保证呢?用 metadata 请求。性能
metadata 请求中包含了若干个 topic 的名称,客户端能够向任意 broker 发送 metadata 请求,broker 会告诉客户端这些 topic 有哪些分区,分区有哪些副本,以及谁是副本 leader,客户端会缓存这些元信息。缓存有过时时间,对应可配置参数 metadata.max.age.ms
,默认是 5 分钟,过时后,客户端会从新发送此请求更新元信息。此外,若是客户端收到了“我不是 Leader”这种错误,那么也会从新发送 metadata 请求,由于这个错误说明它的元信息已通过期。fetch
为何客户端访问 Kafka 时不须要指定所有的 broker,就是由于只要能链接上一个 broker,客户端就能够获取到全部须要访问的 broker 信息,因此叫 bootstrap-server。大数据
Produce 请求中会包含一个可配置参数 acks
,表示须要多少个 broker 确认接收到这些消息才能够认为写入成功。这个参数有三个合法值:0,1 和 all,0 表示不确认,1 表示 leader 确认收到就行,all 表示必须 ISR 中的全部 broker 都确认才行,显然数据越多,数据的可靠性更强,其中 all 等同于-1。线程
当一个 leader 收到 produce 请求时,它会作如下判断:日志
acks
参数的值是否合法,若是不是 0,1 或 all,则返回错误。acks
被设置为 all,则须要判断 ISR 中是否有足够的 broker,若是没有则返回一个 NotEnoughReplicas
的错误,怎么判断是否足够呢?有一个参数叫作 min.insync.replicas
,表示 ISR 中 broker 数目所容许的最小值。若是有足够的 broker,则 leader 会先将请求中的消息写入磁盘,而后这个请求会被放在一个缓冲区中,等待全部 ISR中的 follower 确认已经同步了这个请求中的消息,最后向客户端返回写入成功的响应。消费者和 follower 的 Fetch 请求内容大体相似于:请把这个 topic 的这些分区分别从这些 offset 开始的消息给我,很是直接,但指定的 offset 必须是存在的,不然会返回错误。
Fetch 请求分别能够指定两个参数: fetch.max.bytes
和 fetch.min.bytes
,用来限制 leader 返回消息的最大值和最小值,默认值分别是 50M 和 1B,限制最大值是防止撑爆客户端的缓存,限制最小值是避免消息很少浪费网络开销。对于这两个限制还有两个参数分别和它们有关系,对于最大值,有一个参数叫作 max.partition.fetch.bytes
用来限制单个分区所能返回的数据,默认值是 1M,对于最小值,有一个参数叫作 fetch.max.wait.ms
,表示等待最大的时间,即便没有知足最小值的要求也要返回响应,默认值是 500ms。
leader 处理 Fetch 请求使用了“零拷贝”技术,即消息是直接从日志文件写到网络的,中间没有通过任何缓冲区,免去了不少拷贝和管理缓存区的开销,性能极高。
此外,若是 Fetch 请求中指定的 offset 只在 leader 上存在尚未同步到其它全部 ISR 中的 follower,leader 也不会返回这些消息而是返回一个空响应。这样作的缘由是:若是这时容许客户端读取这些消息,若是这台 leader 忽然挂掉,而一台没有这些消息的 follower 成为 leader,若是客户端就没法再消费这些消息,这种数据不一致的状况是一个消息系统要避免的。
因为 Kafka 的协议一直在演进,不少协议格式有新老多个版本,这时候就涉及到版本兼容的问题。总的来讲,在 0.10.0 以后的版本,Kafka 的网络协议是先后兼容的,即高版本的客户端能够和低版本的 broker 通讯,低版本的客户端也能够和高版本的 broker 通讯(高版本的迁就低版本的),0.10.0 以前的 broker 收到高版本的客户端请求可能会报错。所以,若是老版本(0.10.0 以前)的系统须要升级,先升级 broker,再升级客户端,会比较稳妥。
欢迎交流讨论,吐槽建议,分享收藏。