(依据于0.10.0.0版本)java
这个接口的惟一实现类就是NetworkClient,它被用于实现Kafka的consumer和producer. 这个接口实际上抽象出来了Kafka client与网络交互的方式。node
为了对它的API有清楚的认识,先要了解下Kafka protocol所要求的client和broker对于网络请求的处理规则。react
https://cwiki.apache.org/confluence/display/KAFKA/A+Guide+To+The+Kafka+Protocolapache
The server guarantees that on a single TCP connection, requests will be processed in the order they are sent and responses will return in that order as well. The broker's request processing allows only a single in-flight request per connection in order to guarantee this ordering. Note that clients can (and ideally should) use non-blocking IO to implement request pipelining and achieve higher throughput. i.e., clients can send requests even while awaiting responses for preceding requests since the outstanding requests will be buffered in the underlying OS socket buffer. All requests are initiated by the client, and result in a corresponding response message from the server except where noted.缓存
这一段的信息量挺大的。网络
首先,broker按照请求被发送的顺序处理请求,而且按照一样的顺序发送响应。由于Kafka对消息的顺序性有以下的保证:session
- Messages sent by a producer to a particular topic partition will be appended in the order they are sent. That is, if a message M1 is sent by the same producer as a message M2, and M1 is sent first, then M1 will have a lower offset than M2 and appear earlier in the log.
- A consumer instance sees messages in the order they are stored in the log.
为了实现这种顺序性保证,最简单可靠的行为就是"The broker's request processing allows only a single in-flight request per connection in order to guarantee this ordering. ", 也就是说对于一个TCP链接,broker的请求处理链条中只会有一个正在处理的(in-flight)消息.app
那么,Kafka在broker端需不须要缓存待处理的消息呢?异步
首先,若是缓存请求的话,可能会占用大量内存.其次,若是缓存请求的话,在请求处理出错时,会使得Kafka client难以控制消息的顺序,由于本质上,这种缓存使得client的请求是异步处理的.而若是不进行缓存,那么broker的行为对于client而言更容易理解.socket
因此,broker是不会在本地缓存请求的.当它从某个链接读取一个请求以后,就会中止从这个链接继续读取请求.也就是说对于每一个TCP链接,broker的处理流程是:接收一个请求 -> 处理请求 -> 发送响应 -> 接收下一个请求 -> ...
具体的作法,能够在kafka.network.Processor(也就是reactive模型里的subRactor) 找到,在其run方法中,对于已经完整读取的request和发送完毕的response, 有如下的处理
selector.completedReceives.asScala.foreach { receive => try { val channel = selector.channel(receive.source) val session = RequestChannel.Session(new KafkaPrincipal(KafkaPrincipal.USER_TYPE, channel.principal.getName), channel.socketAddress) val req = RequestChannel.Request(processor = id, connectionId = receive.source, session = session, buffer = receive.payload, startTimeMs = time.milliseconds, securityProtocol = protocol) requestChannel.sendRequest(req) //把请求送入requestChannel,之后request handler会从中取出request来处理 selector.mute(receive.source) //中止从这个request的来源(并不仅用host来区分)读取消息 } catch { case e @ (_: InvalidRequestException | _: SchemaException) => // note that even though we got an exception, we can assume that receive.source is valid. Issues with constructing a valid receive object were handled earlier error("Closing socket for " + receive.source + " because of error", e) close(selector, receive.source) } } selector.completedSends.asScala.foreach { send => val resp = inflightResponses.remove(send.destination).getOrElse { throw new IllegalStateException(s"Send for ${send.destination} completed, but not in `inflightResponses`") } resp.request.updateRequestMetrics() selector.unmute(send.destination) //将已发送完毕的response的源设为可读的 }
可见,对于正在处理的请求,broker不会从它的来源再读取新的消息,直至请求被处理完毕,而且其响应被发送完毕。
另外一方面,对于client,若是它接收到上一个请求的响应以后,才开始生成新的请求,而后再发送新请求,那么在等待响应的过程当中,client就处理等待状态,这样挺没效率.所以,"clients can send requests even while awaiting responses for preceding requests since the outstanding requests will be buffered in the underlying OS socket buffer",也就是说client能够在等待响应的过程当中继续发送请求,由于即便broker不去经过网络读这些请求,这些请求也会被缓存在OS的socket buffer中,所以,当broker处理完以前的请求,就能够当即读出来新的请求.不过,若是client这么作的话,会使得它的行为更复杂(由于涉及到出错时的顺序性).
对于consumer,在接收到响应以前难以肯定下一次fetch开始的offset,所以在收到前一个fetch respones以后才发送下一次fetch request是比较稳妥的作法.不过若是能够比较准确判断fetch响应包含消息的数目,比而提早发出fetch request,的确有可能会提交consumer的性能.
并且,"收到fetch respone"和"用户处理完fetch到的消息"这两个时间点仍是有所不一样的,在收到fetch response以后,把抓取到的消息交给用户处理以前,发出下一个fetch request,这样能够提升consumer抓取的效率.新的consumer-KafkaConsumer的确是这么作的.这是KafkaConsumer的poll方法里的一段代码(用户经过执行这个poll方法来获取消息)
do { Map<TopicPartition, List<ConsumerRecord<K, V>>> records = pollOnce(remaining); if (!records.isEmpty()) { // before returning the fetched records, we can send off the next round of fetches // and avoid block waiting for their responses to enable pipelining while the user // is handling the fetched records. // // NOTE that we use quickPoll() in this case which disables wakeups and delayed // task execution since the consumed positions has already been updated and we // must return these records to users to process before being interrupted or // auto-committing offsets fetcher.sendFetches(metadata.fetch()); client.quickPoll(); return this.interceptors == null ? new ConsumerRecords<>(records) : this.interceptors.onConsume(new ConsumerRecords<>(records)); } long elapsed = time.milliseconds() - start; remaining = timeout - elapsed; } while (remaining > 0);
中间的那一大段就是在说这个事情,可是它考虑的状况比刚才提到的要复杂一些.
首先,若是pollOnce获得的records不为空,就要把这些records返回给用户,因此在此以前要先发送一批fetch rquest(利用Fetcher#sendFetches).若是为空的话,在do-while循环里的pollOnce会发送新的fetch request.
其次,因为Fetcher的sendFetches并不会执行网络IO操做,而只是生成而且缓存fetch request,因此还须要利用ConsumerNetworkClient的quickPoll方法来执行一次IO操做把这些fetch request发出去.可是因为此时用户尚未获得此次pollOnce返回的records, 所以不能进行auto-commit操做,不然就会把还没返回给用户的records给commit了,而且也不能使得处理的过程被别的线程中断,由于这样用户也拿不到这些records了.因此,这里调用quickPoll,quickPoll会禁止wakeUp,而且不执行DelayedTasks(由于AutoCommitTask就是经过DelayedTask机制执行的).
Kafka的broker是一个典型的Reactor模型的socket server。其中Processor相关于sub reactor,而HandlerPool至关于worker pool. Processor和Handler 都有各自的线程,它们之间经过一些队列来传递请求和响应。Kafka把这些队列封装成了RequestChannel。
class RequestChannel(val numProcessors: Int, val queueSize: Int) extends KafkaMetricsGroup { private var responseListeners: List[(Int) => Unit] = Nil private val requestQueue = new ArrayBlockingQueue[RequestChannel.Request](queueSize) private val responseQueues = new Array[BlockingQueue[RequestChannel.Response]](numProcessors)
for(i <- 0 until numProcessors)
responseQueues(i) = new LinkedBlockingQueue[RequestChannel.Response]()
... }
Kafka对于一个链接一次只处理一个请求的特性,决定了这里的两种队列的类型。其中,存放请求的队列用的是ArrayBlockingQueue,队列大小为queuSize,而存放响应的队列用的是LinkedBlockingQueue,它的capcity是Integer.MAX_VALUE。
存放请求的队列必须用有界的阻塞队列,不然可能会有太多的请求撑爆内存。而使用有界队列,事实上能够阻塞Processor线程,使得在请求队列满的状况下,Broker拒绝新的请求。
可是响应队列选用无界的队列,其缘由倒是很隐晦的。
总的待发送响应的个数因为请求队列的限制,一般不会太大。但这也不意味着这种选择不会出问题,由于在最差状况下,可能会有至关于总的链接数的待发送响应。想象一种状况,假设有很是多的consumer(好比1W个)发送fetch请求,每一个请求抓取1M的数据,但这些consumer都不从socket中读取响应,那么会有什么状况发生呢?不是会把内存爆掉吗?事实上,因为Kafka在发送响应时的zero copy特性,使得FetchRepsonse自己不会占用太大内存,因此即便有很是多的待发送响应,但响应对象所占的大小跟要传送的数据比,仍是一般要小不少(取决于fetch请求的fetch size)。其它的响应,实际上也不会特别大,对于一个大集群,占用内存最大的也就是Metadata相关的响应了。
可是另外一方面,若是这个队列用有界的,那么当全部Handler都阻塞于往这些队列put元素,而全部Processor都阻塞于往RequestQueue里put元素,那么整个server就死锁了。因此Kafka仍是用了无界的队列。
另外一个有趣的队列就是Processor和Acceptor之间存放新创建的链接的队列了。
private val newConnections = new ConcurrentLinkedQueue[SocketChannel]()
这里用了ConcurrentLinkedQueue,由于新链接的处理和消息的发送/接收是在同一个循环中的,因此存放消息的队列是非阻塞的更合适一些。
KafkaClient,是producer和consumer与broker通讯的接口,它的设计就创建在上边的协议的基础上。这个类包括了与链接状态和请求-响应状态有关的方法。producer和consumer实际使用的它的实现类是NetworkClient。如下方法的做用结合了KafkaClient和NetworkClient的注释,但以NetworkClient的实现为标准。
public boolean isReady(Node node, long now) 查看某个结点是否准备好发送新请求了。因为是给client用的,所以这里的“node"就是broker
public boolean ready(Node node, long now)是到指定node的链接已经被建立好而且能够发送请求。若是链接没有建立,就建立到这个node的链接。
public long connectionDelay(Node, long now) 基于链接状态,返回须要等待的时间。链接的状态有三种:disconnected, connecting, connected. 若是是disconnected状态,就返回reconnect的backoff time。当connecting或者connected,就返回Long.MAX_VALUE,由于此时须要等待别的事件发生(好比链接成功,或者收到响应)
public long connectionFailed(Node node) 查看到这个node的链接是否失败。
public void send(ClientRequest request, long now) 把这个request放入发送队列。若是request是要发给尚未链接好的node的,那么就会抛出IllegalStateException异常, 这是一个运行时异常。
public List<ClientResponse> poll(long timeout, long now) 对于socket进行读写操做。
public void close(String nodeId) 关闭到指定node的链接
public Node leastLoadedNode(long now) 选择有最少的未发送请求的node,要求这些node至少是能够链接的。这个方法会优先选择有可用的链接的节点,可是若是全部的已链接的节点都在使用,它就会选择尚未创建链接的节点。这个方法绝对不会选择忆经断开链接的节点或者正在reconnect backoff阶段的链接。
public int inFlightRequestCount() 全部已发送但还没收到响应的请求的总数
public int inFlightRequestCount(String nodeId) 对于某个特定node的in-flight request总数
public RequestHandler nextRequestHanlder(ApiKeys key) 为某种请求构造它的请求头。按照Kafka Protoocl, request包括如下部分:
RequestMessage => ApiKey ApiVersion CorrelationId ClientId RequestMessage
ApiKey => int16
ApiVersion => int16
CorrelationId => int32
ClientId => string
RequestMessage => MetadataRequest | ProduceRequest | FetchRequest | OffsetRequest | OffsetCommitRequest | OffsetFetchRequest
|
而这个方法构造了ApiKey, ApiVersion, CoorelationId和ClientId,做为请求的头部,request handler在源码里有对应类org.apache.kafka.common.requests.RequestHandler。
ApiKey表示请求的种类, 如produce request, fetch request, metadata request等。
puclic RequestHandler nextRequestHandler(ApiKey key, short version) 构造请求的头部,使用特定版本号。
public void wakeup() 若是这个client正在IO阻塞状态,就唤醒它。
Kafka protocol的一些细节,在Kafka client的接口设计中获得了体现.而且,有一些小细节是挺有意思的.
下面会看一下NetworkClient,它是KafkaClient接口的实现.