问题导读:
1. 什么是多线程异步发送模型?
2. Metadata的线程安全性如何实现?
3. Metadata的数据结构是什么?
4. producer如何读取Metadata?
5. Sender的如何建立?
6. Sender poll()如何更新Metadata?
7. Metadata有哪2种更新机制?
8. 什么是Metadata失效检测?
9. Metadata有哪些其余的更新策略?
解决方案:
多线程异步发送模型
下图是通过源码分析以后,整理出来的Producer端的架构图:
在上一篇咱们讲过,Producer有同步发送和异步发送2种策略。在之前的Kafka client api实现中,同步和异步是分开实现的。而在0.9中,同步发送实际上是经过异步发送间接实现,其接口以下:html
1node 2bootstrap 3api 4安全 5数据结构 6多线程 7架构 |
|
要实现同步发送,只要在拿到返回的Future对象以后,直接调用get()就能够了。
基本思路
从上图咱们能够看出,异步发送的基本思路就是:send的时候,KafkaProducer把消息放到本地的消息队列RecordAccumulator,而后一个后台线程Sender不断循环,把消息发给Kafka集群。
要实现这个,还得有一个前提条件:就是KafkaProducer/Sender都须要获取集群的配置信息Metadata。所谓Metadata,也就是在上一篇所讲的,Topic/Partion与broker的映射关系:每个Topic的每个Partion,得知道其对应的broker列表是什么,其中leader是谁,follower是谁。
2个数据流
因此在上图中,有2个数据流:
Metadata流(A1,A2,A3):Sender从集群获取信息,而后更新Metadata; KafkaProducer先读取Metadata,而后把消息放入队列。
消息流(B1, B2, B3):这个很好理解,再也不详述。
本篇着重讲述Metadata流,消息流,将在后续详细讲述。
Metadata的线程安全性
从上图能够看出,Metadata是多个producer线程读,一个sender线程更新,所以它必须是线程安全的。
Kafka的官方文档上也有说明,KafkaProducer是线程安全的,能够在多线程中调用:
The producer is thread safe and sharing a single producer instance across threads will generally be faster than having multiple instances.
从下面代码也能够看出,它的全部public方法都是synchronized:
01 02 03 04 05 06 07 08 09 10 11 12 13 |
|
Metadata的数据结构
下面代码列举了Metadata的主要数据结构:一个Cluster对象 + 1堆状态变量。前者记录了集群的配置信息,后者用于控制Metadata的更新策略。
01 02 03 04 05 06 07 08 09 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 |
|
producer读取Metadata
下面是send函数的源码,能够看到,在send以前,会先读取metadata。若是metadata读不到,会一直阻塞在那,直到超时,抛出TimeoutException
01 02 03 04 05 06 07 08 09 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 |
|
总结:从上面代码能够看出,producer wait metadata的时候,有2个条件:
(1) while (metadata.fetch().partitionsForTopic(topic) == null)
(2)while (this.version <= lastVersion)
有wait就会有notify,notify在Sender更新Metadata的时候发出。
Sender的建立
下面是KafkaProducer的构造函数,从代码能够看出,Sender就是KafkaProducer中建立的一个Thread.
01 02 03 04 05 06 07 08 09 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 |
|
Sender poll()更新Metadata
001 002 003 004 005 006 007 008 009 010 011 012 013 014 015 016 017 018 019 020 021 022 023 024 025 026 027 028 029 030 031 032 033 034 035 036 037 038 039 040 041 042 043 044 045 046 047 048 049 050 051 052 053 054 055 056 057 058 059 060 061 062 063 064 065 066 067 068 069 070 071 072 073 074 075 076 077 078 079 080 081 082 083 084 085 086 087 088 089 090 091 092 093 094 095 096 097 098 099 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 |
|
从上面能够看出,Metadata的更新,是在while循环,每次调用client.poll()的时候更新的。
更新机制又有如下2种:
Metadata的2种更新机制
(1)周期性的更新: 每隔一段时间更新一次,这个经过 Metadata的lastRefreshMs, lastSuccessfulRefreshMs 这2个字段来实现
对应的ProducerConfig配置项为:
metadata.max.age.ms //缺省300000,即10分钟1次
(2) 失效检测,强制更新:检查到metadata失效之后,调用metadata.requestUpdate()强制更新。 requestUpdate()函数里面其实什么都没作,就是把needUpdate置成了false
每次poll的时候,都检查这2种更新机制,达到了,就触发更新。
那如何断定Metadata失效了呢?这个在代码中很分散,有不少地方,会断定Metadata失效。
Metadata失效检测
条件1:initConnect的时候
01 02 03 04 05 06 07 08 09 10 11 12 13 14 15 |
|
条件2:poll里面IO的时候,链接断掉了
1 2 3 4 5 6 7 8 |
|
条件3:有请求超时
01 02 03 04 05 06 07 08 09 10 11 |
|
条件4:发消息的时候,有partition的leader没找到
1 2 3 4 5 6 |
|
条件5:返回的response和请求对不上的时候
1 2 3 4 5 6 7 8 |
|
总之1句话:发生各式各样的异常,数据不一样步,都认为metadata可能出问题了,要求更新。
Metadata其余的更新策略
除了上面所述,Metadata的更新,还有如下几个特色:
1.更新请求MetadataRequest是nio异步发送的,在poll的返回中,处理MetadataResponse的时候,才真正更新Metadata。
这里有个关键点:Metadata的cluster对象,每次是整个覆盖的,而不是局部更新。因此cluster内部不用加锁。
2.更新的时候,是从metadata保存的全部Node,或者说Broker中,选负载最小的那个,也就是当前接收请求最少的那个。向其发送MetadataRequest请求,获取新的Cluster对象。
文章转自About云(http://www.aboutyun.com/thread-19917-1-1.html),原文位于csdn,做者:travi