ConsumerNetworkClient
。ConsumerCoordinator
初始化拉取器Fetcher
java
subscribe()、assign()
会将订阅信息记录到SubscriptionState
,屡次订阅会覆盖旧数据。若是元数据缓存Metadata
不包含订阅的主题,则设置needUpdate=true
,标识须要更新元数据。缓存
poll()
方法指定超时时间timeoutMs
,在这个时间范围内不断轮询。拉取超时后,返回空记录。网络
private ConsumerRecords<K, V> poll(final long timeoutMs, final boolean includeMetadataInTimeout) { long elapsedTime = 0L; do { final long metadataEnd; // 更新分配元数据,协调器、心跳、消费位置 if (!updateAssignmentMetadataIfNeeded(remainingTimeAtLeastZero(timeoutMs, elapsedTime))) { return ConsumerRecords.empty(); } // 拉取消息 final Map<TopicPartition, List<ConsumerRecord<K, V>>> records = pollForFetches(remainingTimeAtLeastZero(timeoutMs, elapsedTime)); if (!records.isEmpty()) { // 消息不为空时,当即发起下一轮的拉取消息,避免阻塞等待响应处理。 // 注意,在消息返回以前,不能触发唤醒或其余错误。 if (fetcher.sendFetches() > 0 || client.hasPendingRequests()) { client.pollNoWakeup(); } // 回调执行消费者拦截器后返回给消费者 return this.interceptors.onConsume(new ConsumerRecords<>(records)); } final long fetchEnd = time.milliseconds(); elapsedTime += fetchEnd - metadataEnd; } while (elapsedTime < timeoutMs); // 轮询拉取,知道超过输入的超时时间 return ConsumerRecords.empty(); }
PartitionRecords
存在缓存记录,则优先会从分区记录缓存队列completedFetches
中拉取一部分记录,直接返回。LinkedHashMap
。ConcurrentHashMap<Node, ConcurrentLinkedQueue<ClientRequest>> unsent
中,同时添加处理响应的监听器。unsent
,使用NetworkClient
发送请求,这里和生产者使用相同的方法,处理流程类似。发送完后即清空unsent
。ConcurrentLinkedQueue<CompletedFetch> completedFetches
。completedFetches
中拉取一部分记录返回给消费者。private Map<TopicPartition, List<ConsumerRecord<K, V>>> pollForFetches(final long timeoutMs) { final long startMs = time.milliseconds(); long pollTimeout = Math.min(coordinator.timeToNextPoll(startMs), timeoutMs); final Map<TopicPartition, List<ConsumerRecord<K, V>>> records = fetcher.fetchedRecords(); // 从缓存队列拉取 if (!records.isEmpty()) { // 缓存中有数据则直接返回 return records; } // 1.将拉取请求构形成节点和请求的映射关系,并缓存在 unsent // 2.添加响应处理监听器,处理发送拉取请求后,从服务端返回的消息,并缓存在队列中 fetcher.sendFetches(); // 用 NetworkClient 向服务端发送拉取请求 client.poll(pollTimeout, startMs, () -> return !fetcher.hasCompletedFetches()); return fetcher.fetchedRecords(); // 再次从缓存队列拉取 } // 从缓存拉取队列拉取消息 public Map<TopicPartition, List<ConsumerRecord<K, V>>> fetchedRecords() { Map<TopicPartition, List<ConsumerRecord<K, V>>> fetched = new HashMap<>(); int recordsRemaining = maxPollRecords; while (recordsRemaining > 0) { // 在超时时间内不断轮询 if (nextInLineRecords == null || nextInLineRecords.isFetched) { // 分区记录为空,或者已拉取 CompletedFetch completedFetch = completedFetches.peek(); // 从缓存队列拉取消息 nextInLineRecords = parseCompletedFetch(completedFetch); // 将消息解析成分区消息记录 PartitionRecords completedFetches.poll(); // 对缓存队列移除 } else { List<ConsumerRecord<K, V>> records = fetchRecords(nextInLineRecords, recordsRemaining); // 从分区记录拉取消息 TopicPartition partition = nextInLineRecords.partition; if (!records.isEmpty()) { // 拉取到消息,方法 Map,以返回给消费者 fetched.put(partition, records); } } return fetched; }