As discussed previously, Kafka implements its own Selector to manage the lowest-level I/O: the selector runs in a polling loop, dispatching events that tell the network channels to read or write.
One level up, the NetworkClient manages client requests to multiple nodes and drives the selector. So what exactly does it do?
```java
/* the state of each node's connection */
private final ClusterConnectionStates connectionStates;
```
NetworkClient holds a connectionStates object that tracks the connection state of every node. Its implementation is simple: essentially a backoff interval plus a Map of per-node connection states.
The backoff interval prevents the client from repeatedly attempting to connect to the same node.
```java
final class ClusterConnectionStates {
    private final long reconnectBackoffMs;
    private final Map<String, NodeConnectionState> nodeState;

    /**
     * The state of our connection to a node
     */
    private static class NodeConnectionState {

        ConnectionState state;
        long lastConnectAttemptMs;

        public NodeConnectionState(ConnectionState state, long lastConnectAttempt) {
            this.state = state;
            this.lastConnectAttemptMs = lastConnectAttempt;
        }

        public String toString() {
            return "NodeState(" + state + ", " + lastConnectAttemptMs + ")";
        }
    }
```
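To make the backoff idea concrete, here is a minimal, self-contained sketch (the class and method names are illustrative, not Kafka's real API): a node may only be (re)connected once the backoff window since its last attempt has passed.

```java
import java.util.HashMap;
import java.util.Map;

// Illustrative sketch of a per-node reconnect backoff check.
// Not Kafka's actual ClusterConnectionStates, just the core idea.
class BackoffStates {
    private final long reconnectBackoffMs;
    private final Map<String, Long> lastAttemptMs = new HashMap<>();

    BackoffStates(long reconnectBackoffMs) {
        this.reconnectBackoffMs = reconnectBackoffMs;
    }

    // A connect attempt is allowed if we never tried this node before,
    // or if the backoff window since the last attempt has elapsed.
    boolean canConnect(String nodeId, long now) {
        Long last = lastAttemptMs.get(nodeId);
        return last == null || now - last >= reconnectBackoffMs;
    }

    // Record that a connect attempt was just made.
    void connecting(String nodeId, long now) {
        lastAttemptMs.put(nodeId, now);
    }
}
```

With reconnectBackoffMs = 100, for example, a node attempted at t=0 is refused at t=50 and allowed again at t=100.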
Opening a connection is also straightforward: it simply asks the selector to connect. On failure the node is marked disconnected; otherwise it is marked as connecting.
```java
private void initiateConnect(Node node, long now) {
    String nodeConnectionId = node.idString();
    try {
        log.debug("Initiating connection to node {} at {}:{}.", node.id(), node.host(), node.port());
        this.connectionStates.connecting(nodeConnectionId, now);
        selector.connect(nodeConnectionId,
                new InetSocketAddress(node.host(), node.port()),
                this.socketSendBuffer,
                this.socketReceiveBuffer);
    } catch (IOException e) {
        /* attempt failed, we'll try again after the backoff */
        connectionStates.disconnected(nodeConnectionId, now);
        /* maybe the problem is our metadata, update it */
        metadataUpdater.requestUpdate();
        log.debug("Error connecting to node {} at {}:{}:", node.id(), node.host(), node.port(), e);
    }
}
```
Closing a connection is even more blunt; the code speaks for itself:
```java
@Override
public void close(String nodeId) {
    selector.close(nodeId);
    for (ClientRequest request : inFlightRequests.clearAll(nodeId))
        metadataUpdater.maybeHandleDisconnection(request);
    connectionStates.remove(nodeId);
}
```
inFlightRequests: the ClientRequests currently in flight.
As the previous article on the producer's message-sending pipeline explained, Kafka's low-level unit of transmission is the sendable ClientRequest. Everything Kafka sends is ultimately wrapped in a ClientRequest.
```java
/**
 * A request being sent to the server. This holds both the network send as well as the client-level metadata.
 */
public final class ClientRequest {

    private final long createdTimeMs;
    private final boolean expectResponse;
    private final RequestSend request;
    private final RequestCompletionHandler callback;
    private final boolean isInitiatedByNetworkClient;
    private long sendTimeMs;
}
```
ClientRequest is simple:

- expectResponse indicates whether an ack is required.
- RequestSend is Kafka's own data structure, a bit like JSON; its implementation and usage are somewhat involved, but the idea is easy to grasp, so we won't dwell on it here.
- RequestCompletionHandler is the callback, one of the more important pieces in Kafka.

When a request completes, KafkaClient invokes the RequestCompletionHandler's onComplete method.
```java
public static class RequestFutureCompletionHandler extends RequestFuture<ClientResponse>
        implements RequestCompletionHandler {

    @Override
    public void onComplete(ClientResponse response) {
        if (response.wasDisconnected()) {
            ClientRequest request = response.request();
            RequestSend send = request.request();
            ApiKeys api = ApiKeys.forId(send.header().apiKey());
            int correlation = send.header().correlationId();
            log.debug("Cancelled {} request {} with correlation id {} due to node {} being disconnected",
                    api, request, correlation, send.destination());
            raise(DisconnectException.INSTANCE);
        } else {
            complete(response);
        }
    }
}
```
Pseudocode: in this ClientRequest we simply print the response in the callback. response.responseBody() yields a Struct object (again, think of it as something JSON-like).
```java
ClientRequest clientRequest = new ClientRequest(
        System.currentTimeMillis(),
        true,
        new RequestSend(null, null, null),
        new RequestCompletionHandler() {
            @Override
            public void onComplete(ClientResponse response) {
                System.out.println(response.responseBody());
            }
        });
networkClient.doSend(clientRequest, System.currentTimeMillis());
```
ClientRequest is actually a rather clever design. Let's implement a simple listener + adapter combination, modeled on Kafka!
Let's trace the whole flow by following a JoinGroup request sent from the consumer:
```java
RequestFuture<ByteBuffer> future = client.send(coordinator, ApiKeys.JOIN_GROUP, request)
        .compose(new JoinGroupResponseHandler());
```
Once we have the future:
```java
future.addListener(new RequestFutureListener<ByteBuffer>() {
    @Override
    public void onSuccess(ByteBuffer value) {
        // handle join completion in the callback so that the callback will be invoked
        // even if the consumer is woken up before finishing the rebalance
        onJoinComplete(generation, memberId, protocol, value);
        needsJoinPrepare = true;
        heartbeatTask.reset();
    }

    @Override
    public void onFailure(RuntimeException e) {
        // we handle failures below after the request finishes. if the join completes
        // after having been woken up, the exception is ignored and we will rejoin
    }
});
```
This looks trivial, but here is what actually happens:
1. First we send the request with client.send(coordinator, ApiKeys.JOIN_GROUP, request). Every request goes through the default RequestFutureCompletionHandler shown earlier, so on success the complete method of the underlying Future&lt;ClientResponse&gt; is called.
2. When the default RequestFutureCompletionHandler calls complete, all the listeners registered on that future fire, which in turn triggers the onSuccess of our compose(new JoinGroupResponseHandler()) adapter.
```java
private void fireSuccess() {
    for (RequestFutureListener<T> listener : listeners)
        listener.onSuccess(value);
}
```
Our compose adapted the Future&lt;ClientResponse&gt; into a new Future&lt;ByteBuffer&gt;, which we'll call JoinGroupFuture. What happens when JoinGroupFuture's onSuccess is triggered?
```java
public void onSuccess(ClientResponse clientResponse, RequestFuture<ByteBuffer> future) {
    try {
        this.response = clientResponse;
        JoinGroupResponse responseObj = parse(clientResponse);
        handle(responseObj, future);
    } catch (RuntimeException e) {
        if (!future.isDone()) {
            future.raise(e);
        }
    }
}
```
```java
public void handle(JoinGroupResponse joinResponse,
                   RequestFuture<ByteBuffer> future /* sendJoinGroupRequest#joinGroupFuture */) {
    Errors error = Errors.forCode(joinResponse.errorCode());
    if (error == Errors.NONE) {
        log.debug("Received successful join group response for group {}: {}",
                groupId, joinResponse.toStruct());
        AbstractCoordinator.this.memberId = joinResponse.memberId();
        AbstractCoordinator.this.generation = joinResponse.generationId();
        // a JoinGroupResponse arrived normally, so clear the rejoin flag
        AbstractCoordinator.this.rejoinNeeded = false;
        AbstractCoordinator.this.protocol = joinResponse.groupProtocol();
        sensors.joinLatency.record(response.requestLatencyMs());
        if (joinResponse.isLeader()) {
            onJoinLeader(joinResponse).chain(future);
        } else {
            onJoinFollower().chain(future);
        }
```
3. Take onJoinFollower as the example: what does onJoinFollower().chain(future) do?

It sends another request and composes it into a new Future&lt;ByteBuffer&gt;, call it SyncGroupFuture. Crucially, the JoinGroupFuture is chained onto the SyncGroupFuture:
chain means that when SyncGroupFuture succeeds, it calls complete on JoinGroupFuture.
```java
private RequestFuture<ByteBuffer> sendSyncGroupRequest(SyncGroupRequest request) {
    if (coordinatorUnknown()) {
        return RequestFuture.coordinatorNotAvailable();
    }
    return client.send(coordinator, ApiKeys.SYNC_GROUP, request)
            .compose(new SyncGroupResponseHandler());
}
```
```java
public void chain(final RequestFuture<T> future) {
    addListener(new RequestFutureListener<T>() {
        @Override
        public void onSuccess(T value) {
            future.complete(value);
        }

        @Override
        public void onFailure(RuntimeException e) {
            future.raise(e);
        }
    });
}
```
In other words, our JoinGroupFuture can only succeed after the SyncGroupFuture succeeds.
```java
private class SyncGroupResponseHandler extends CoordinatorResponseHandler<SyncGroupResponse, ByteBuffer> {

    @Override
    public SyncGroupResponse parse(ClientResponse response) {
        return new SyncGroupResponse(response.responseBody());
    }

    @Override
    public void handle(SyncGroupResponse syncResponse, RequestFuture<ByteBuffer> future) {
        Errors error = Errors.forCode(syncResponse.errorCode());
        if (error == Errors.NONE) {
            log.info("Successfully joined group {} with generation {}", groupId, generation);
            sensors.syncLatency.record(response.requestLatencyMs());
            future.complete(syncResponse.memberAssignment());
```
4. Finally, when SyncGroupFuture succeeds, it completes the JoinGroupFuture, bringing us right back to the future.addListener listener we registered at the very beginning.
In other words, one JoinGroup operation spans two network requests, and only when both have succeeded does the JoinGroupFuture's listener fire.
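The two-step ordering just described can be mimicked with the JDK's CompletableFuture, purely as an illustration (Kafka does not use CompletableFuture internally; the stage names and payloads here are made up):

```java
import java.util.concurrent.CompletableFuture;

// Illustration of the "join completes only after sync completes" ordering,
// using the JDK's CompletableFuture. Names and payloads are made up.
public class ChainOrderDemo {

    // The outer "join" future only completes after the inner "sync" stage does:
    // thenCompose plays the role of Kafka's compose + chain combination.
    public static CompletableFuture<String> joinGroup() {
        CompletableFuture<String> joinResponse = CompletableFuture.completedFuture("joined");
        return joinResponse.thenCompose(ChainOrderDemo::syncGroup);
    }

    static CompletableFuture<String> syncGroup(String memberInfo) {
        return CompletableFuture.completedFuture(memberInfo + "+assignment");
    }
}
```

Calling ChainOrderDemo.joinGroup().join() yields "joined+assignment": the result of the second stage, observed through the first stage's future.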
This whole orchestration is owed to Kafka's flexible use and composition of listeners + adapters. Let's look at the two magic methods:
```java
/**
 * Convert from a request future of one type to another type
 *
 * @param adapter The adapter which does the conversion
 * @param <S> The type of the future adapted to
 *
 * @return The new future
 */
public <S> RequestFuture<S> compose(final RequestFutureAdapter<T, S> adapter) {
    final RequestFuture<S> adapted = new RequestFuture<S>();
    addListener(new RequestFutureListener<T>() {
        // when the original future succeeds, it invokes the adapter's
        // onSuccess, handing it the new future
        @Override
        public void onSuccess(T value) {
            adapter.onSuccess(value, adapted);
        }

        @Override
        public void onFailure(RuntimeException e) {
            adapter.onFailure(e, adapted);
        }
    });
    // return the newly created future
    return adapted;
}

public void chain(final RequestFuture<T> future) {
    addListener(new RequestFutureListener<T>() {
        @Override
        public void onSuccess(T value) {
            future.complete(value);
        }

        @Override
        public void onFailure(RuntimeException e) {
            future.raise(e);
        }
    });
}
```
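Taking up the suggestion to imitate Kafka, here is a compact, self-contained re-implementation of the pattern (MiniFuture is a made-up name; it mirrors compose and chain but is not Kafka's RequestFuture, and only the success path is modeled to keep the sketch short):

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.BiConsumer;

// A stripped-down imitation of the RequestFuture listener + adapter pattern.
class MiniFuture<T> {
    interface Listener<T> { void onSuccess(T value); }

    private final List<Listener<T>> listeners = new ArrayList<>();
    private T value;
    private boolean done;

    boolean isDone() { return done; }
    T get() { return value; }

    // If already completed, fire immediately; otherwise remember the listener.
    void addListener(Listener<T> listener) {
        if (done) listener.onSuccess(value);
        else listeners.add(listener);
    }

    void complete(T v) {
        value = v;
        done = true;
        for (Listener<T> l : listeners) l.onSuccess(v);
    }

    // compose: adapt this MiniFuture<T> into a MiniFuture<S>. The adapter
    // decides when (and with what value) the adapted future completes.
    <S> MiniFuture<S> compose(BiConsumer<T, MiniFuture<S>> adapter) {
        MiniFuture<S> adapted = new MiniFuture<>();
        addListener(v -> adapter.accept(v, adapted));
        return adapted;
    }

    // chain: when this future succeeds, complete the target future too.
    void chain(MiniFuture<T> target) {
        addListener(target::complete);
    }
}
```

For example, raw.compose((s, f) -> f.complete(s.length())) turns a MiniFuture&lt;String&gt; into a MiniFuture&lt;Integer&gt; that completes with the string's length once raw completes.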
inFlightRequests can be understood as a Map whose key is the node and whose value is a Deque&lt;ClientRequest&gt;. Every time we send a request, the ClientRequest is pushed into it:

```java
this.inFlightRequests.add(request);
```
When a send completes and the request does not need an ack, it is finished right there:
```java
private void handleCompletedSends(List<ClientResponse> responses, long now) {
    // if no response is expected then when the send is completed, return it
    // completedSends only contains sends that are guaranteed to have gone out
    for (Send send : this.selector.completedSends()) {
        ClientRequest request = this.inFlightRequests.lastSent(send.destination());
        // if the request does not expect an ack, add it to the response list now
        if (!request.expectResponse()) {
            this.inFlightRequests.completeLastSent(send.destination());
            responses.add(new ClientResponse(request, now, false, null));
        }
    }
}
```
A request that does need an ack has its result wrapped into a ClientResponse:
```java
private void handleCompletedReceives(List<ClientResponse> responses, long now) {
    for (NetworkReceive receive : this.selector.completedReceives()) {
        String source = receive.source();
        ClientRequest req = inFlightRequests.completeNext(source);
        // payload is a reference to the received buffer
        Struct body = parseResponse(receive.payload(), req.request().header());
        if (!metadataUpdater.maybeHandleCompletedReceive(req, now, body)) {
            responses.add(new ClientResponse(req, now, false, body));
        }
    }
}
```
inFlightRequests is also used to gauge a node's load, cap the total number of outstanding requests, and so on.
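A minimal sketch of that bookkeeping (illustrative names, not Kafka's real InFlightRequests class): a per-node deque of send timestamps, a load cap, and a timeout scan that only needs to look at each node's oldest entry.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Illustrative sketch of in-flight request bookkeeping, not Kafka's real class.
// Each node maps to a deque; new sends go to the front, completions pop the back.
class InFlight {
    private final int maxInFlightPerNode;
    private final Map<String, Deque<Long>> sendTimes = new HashMap<>();

    InFlight(int maxInFlightPerNode) {
        this.maxInFlightPerNode = maxInFlightPerNode;
    }

    void add(String node, long sendTimeMs) {
        sendTimes.computeIfAbsent(node, k -> new ArrayDeque<>()).addFirst(sendTimeMs);
    }

    // Load control: refuse new sends once a node has too many outstanding requests.
    boolean canSendMore(String node) {
        Deque<Long> q = sendTimes.get(node);
        return q == null || q.size() < maxInFlightPerNode;
    }

    // Timeout scan: only the oldest (last) entry per node needs checking,
    // because anything sent later cannot have timed out earlier.
    List<String> nodesWithTimedOutRequests(long now, long requestTimeoutMs) {
        List<String> timedOut = new ArrayList<>();
        for (Map.Entry<String, Deque<Long>> e : sendTimes.entrySet()) {
            Long oldest = e.getValue().peekLast();
            if (oldest != null && now - oldest > requestTimeoutMs)
                timedOut.add(e.getKey());
        }
        return timedOut;
    }
}
```

The design choice worth noting is the deque orientation: because sends are added at the front, the back of the deque is always the oldest request, so both completion and timeout detection are O(1) per node.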
When we send a message, it goes into inFlightRequests. After each poll, the KafkaSelector sorts its results into lists: completed sends into completedSends, completed receives into completedReceives, dropped connections into disconnected, and so on.
After a poll, NetworkClient processes each of these lists in its own way. In addition, it handles requests in inFlightRequests that have timed out.
Finding timed-out requests is very simple: take each node's oldest request and check whether it has exceeded the timeout; if it has, NetworkClient closes that node's connection.
```java
public List<String> getNodesWithTimedOutRequests(long now, int requestTimeout) {
    List<String> nodeIds = new LinkedList<String>();
    for (String nodeId : requests.keySet()) {
        if (inFlightRequestCount(nodeId) > 0) {
            ClientRequest request = requests.get(nodeId).peekLast();
            long timeSinceSend = now - request.sendTimeMs();
            if (timeSinceSend > requestTimeout) {
                nodeIds.add(nodeId);
            }
        }
    }
    return nodeIds;
}
```
Finally, NetworkClient invokes each of these ClientRequests' callbacks; this is precisely what triggers the listener, adapter, and listener-chain machinery described above.
References:
《Kafka技术内幕》 (Kafka Internals), by 郑奇煌
《Apache Kafka源码剖析》 (Apache Kafka Source Code Analysis), by 徐郡明