在Kafka中,TCP链接的管理交由底层的Selector类(org.apache.kafka.common.network)来维护。Selector类定义了不少数据结构,其中最核心的当属java.nio.channels.Selector实例,故全部的IO事件其实是使用Java的Selector来完成的。本文咱们探讨一下producer与Kafka集群进行交互时TCP链接的管理与维护。java
1、什么时候建立TCP链接node
Producer端在建立KafkaProducer实例时就会建立与broker的TCP链接——这个表述严格来讲不是很准确,应当这么说:在建立KafkaProducer实例时会建立并启动Sender线程实例。Sender线程开始运行时首先就会建立与broker的TCP链接,以下面这段日志所示:apache
[2018-12-09 09:35:45,620] DEBUG [Producer clientId=producer-1] Initialize connection to node localhost:9093 (id: -2 rack: null) for sending metadata request (org.apache.kafka.clients.NetworkClient:1084)
[2018-12-09 09:35:45,622] DEBUG [Producer clientId=producer-1] Initiating connection to node localhost:9093 (id: -2 rack: null) using address localhost/127.0.0.1 (org.apache.kafka.clients.NetworkClient:914)bootstrap
[2018-12-09 09:35:45,814] DEBUG [Producer clientId=producer-1] Initialize connection to node localhost:9092 (id: -1 rack: null) for sending metadata request (org.apache.kafka.clients.NetworkClient:1084)
[2018-12-09 09:35:45,815] DEBUG [Producer clientId=producer-1] Initiating connection to node localhost:9092 (id: -1 rack: null) using address localhost/127.0.0.1 (org.apache.kafka.clients.NetworkClient:914)
[2018-12-09 09:35:45,828] DEBUG [Producer clientId=producer-1] Sending metadata request (type=MetadataRequest, topics=) to node localhost:9093 (id: -2 rack: null) (org.apache.kafka.clients.NetworkClient:1068)安全
在个人样例代码中,bootstrap.servers指定了"localhost:9092, localhost:9093"。由上面的日志能够看到KafkaProducer实例建立后(此时还没有开始发送消息)producer会建立与这两台broker的TCP链接。特别注意我标红的broker id——这里的id都是负值,我会在后文详细说说这里面的事情。另外,上述日志中最后一行代表producer选择了向localhost:9093的broker发送METADATA请求去获取集群的元数据信息——实际上producer选择的是当前负载最少的broker。这里的负载指的是未处理完的网络请求数。网络
总的来讲,TCP链接是在Sender线程运行过程当中建立的,因此即便producer不发送任何消息(即显式调用producer.send),底层的TCP链接也是会被建立出来的。数据结构
在转到下一个话题以前,我想聊聊针对这种设计的一些本身的理解:如社区文档所说,KafkaProducer类是线程安全的。我虽然没有详尽地去验证过是否真的thread-safe,但根据浏览源码大体能够得出这样的结论:producer主线程和Sender线程共享的可变数据结构大概就只有RecordAccumulator类,所以维护RecordAccumulator类的线程安全也就实现了KafkaProducer的线程安全,而RecordAccumulator类中主要的数据结构是ConcurrentMap<TopicPartition, Deque<ProducerBatch>>,并且凡是用到Deque的地方基本上都由Java monitor lock来保护,因此基本上能够认定RecordAccumulator的线程安全性。this
我这里真正想说的是,即便KafkaProducer类是线程安全的,我其实也不太赞同建立KafkaProducer实例时当即启动Sender线程的作法。Brian Goetz大神著做《Java Concurrency in Practice》中明确给出了这样作的风险:在对象构造器中启动线程会形成this指针的逃逸——理论上Sender线程彻底可以看到一个未构造完整的KafkaProducer实例。固然在构造KafkaProducer实例时建立Sender线程实例自己没有任何问题,但最好不要启动它。spa
2、建立多少个TCP链接.net
咱们仍是结合日志来看。此次producer开始发送消息,日志以下:
[2018-12-09 10:06:46,761] DEBUG [Producer clientId=producer-1] 开始发送消息...
[2018-12-09 10:06:46,762] DEBUG [Producer clientId=producer-1] Initialize connection to node localhost:9092 (id: 0 rack: null) for sending metadata request (org.apache.kafka.clients.NetworkClient:1084)
[2018-12-09 10:06:46,762] DEBUG [Producer clientId=producer-1] Initiating connection to node localhost:9092 (id: 0 rack: null) using address localhost/127.0.0.1 (org.apache.kafka.clients.NetworkClient:914)
[2018-12-09 10:06:46,765] DEBUG [Producer clientId=producer-1] Initialize connection to node localhost:9093 (id: 1 rack: null) for sending metadata request (org.apache.kafka.clients.NetworkClient:1084)
[2018-12-09 10:06:46,766] DEBUG [Producer clientId=producer-1] Initiating connection to node localhost:9093 (id: 1 rack: null) using address localhost/127.0.0.1 (org.apache.kafka.clients.NetworkClient:914)
[2018-12-09 10:06:46,770] DEBUG [Producer clientId=producer-1] Sending metadata request (type=MetadataRequest, topics=test) to node localhost:9092 (id: 0 rack: null) (org.apache.kafka.clients.NetworkClient:1068)
日志告诉咱们,producer又建立了与localhost:909二、localhost:9093的TCP链接。加上最开始建立的两个TCP链接,目前producer总共建立了4个TCP链接,连向localhost:9092和localhost:9093各有两个。再次注意标红的broker id——此时id再也不是负值了,或者说此时它们是真正的broker id了(即在server.properties中broker.id指定的值)。这个结论告诉了咱们一个有意思的事实:当前版本下(2.1.0),Kafka producer会为bootstrap.servers中指定的每一个broker都建立两个TCP链接:第一个TCP链接用于首次获取元数据信息;第二个TCP链接用于消息发送以及以后元数据信息的获取。注意,第一个TCP链接中broker id是假的;第二个TCP链接中broker id才是真实的broker id。
另外,注意上面日志的最后一行。当producer再次发送METADATA请求时它使用的是新建立的TCP链接,而非最开始的那个TCP链接。这点很是关键!这揭示了一个事实:最开始建立的TCP链接将再也不被使用,或者说彻底被废弃掉了。
3、什么时候关闭TCP链接
Producer端关闭TCP链接的方式有两种:一种是用户主动关闭;一种是Kafka自动关闭。咱们先说第一种,这里的主动关闭其实是广义的主动关闭,甚至包括用户调用kill -9主动“杀掉”producer应用。固然最推荐的方式仍是调用producer.close方法来关闭。第二种则是Kafka帮你关闭,这与producer端参数connections.max.idle.ms的值有关。默认状况下该参数值是9分钟,即若是在9分钟内没有任何请求“流过”该某个TCP链接,那么Kafka会主动帮你把该TCP链接关闭。用户能够在producer端设置connections.max.idle.ms=-1禁掉这种机制。一旦被设置成-1,TCP链接将成为永久长链接。固然这只是软件层面的“长链接”机制,因为Kafka建立的这些Socket链接都开启了keepalive,所以keepalive探活机制仍是会遵照的。
4、可能的问题?
显然,这种机制存在一个问题:假设你的producer指定了connections.max.idle.ms = -1(由于TCP链接的关闭与建立也是有开销的,故不少时候咱们确实想要禁掉自动关闭机制)并且bootstrap.servers指定了集群全部的broker链接信息。咱们假设你的broker数量是N,那么producer启动后它会建立2 * N个TCP链接,而其中的N个TCP链接在producer正常工做以后不再会被使用且不会被关闭。实际上,producer只须要N个TCP链接便可与N个broker进行通信。为了请求元数据而建立的N个TCP链接彻底是浪费——我我的倾向于认为Kafka producer应该重用最开始建立的那N个链接,所以我以为这是一个bug。
形成重复建立TCP链接的根本缘由在于broker id的记录。就像以前说到的,最开始producer请求元数据信息时它确定不知道broker的id信息,故它作了一个假的id(从-1开始,而后是-2, -3。。。。),同时它将这个id保存起来以判断是否存在与这个broker的TCP链接。Broker端返回元数据信息后producer获知了真正的broker id,因而它拿着这个broker id去判断是否存在与该broker的TCP链接——天然是不存在,所以它从新建立了一个新的Socket链接。这里的问题就在于咱们不能仅仅依靠broker id来判断是否存在链接。实际上使用host:port对来判断多是更好的方法。也许社区能够考虑在后续修正这个问题。
5、总结
简单总结一下当前的结论,针对最新版本Kafka(2.1.0)而言,Java producer端管理TCP链接的方式是:
1. KafkaProducer实例建立时启动Sender线程,从而建立与bootstrap.servers中全部broker的TCP链接
2. KafkaProducer实例拿到元数据信息以后还会再次建立与bootstrap.servers中全部broker的TCP链接
3. 步骤1中建立的TCP链接只用于首次获取元数据信息(实际上也只是会用到其中的一个链接,其余的N - 1个甚至彻底不会被用到)
4. 若是设置producer端connections.max.idle.ms参数大于0,则步骤1中建立的TCP链接会被自动关闭;若是设置该参数=-1,那么步骤1中建立的TCP链接将成为“僵尸”链接
5. 当前producer判断是否存在与某broker的TCP链接依靠的是broker id,这是有问题的,依靠<host, port>对多是更好的方式