一 重要的字段 String clientId:Consumer惟一标识 ConsumerCoordinator coordinator: 控制Consumer与服务器端GroupCoordinator之间的通讯逻辑 Fetcher<K, V> fetcher: 负责从服务器端获取消息的组件,而且更新partition的offset ConsumerNetworkClient client: 负责和服务器端通讯 SubscriptionState subscriptions: 便于快速获取topic partition等状态,维护了消费者消费状态 Metadata metadata: 集群元数据信息 AtomicLong currentThread: 当前使用KafkaConsumer的线程id AtomicInteger refcount: 重入次数 二 核心的方法 2.1 subscribe 订阅主题 订阅给定的主题列表,以得到动态分配的分区 主题的订阅不是增量的,这个列表将会代替当前的分配。注意,不可能将主题订阅与组管理与手动分区分配相结合 做为组管理的一部分,消费者将会跟踪属于某一个特殊组的消费者列表,若是知足在下列条件,将会触发再平衡操做: 1 订阅的主题列表的那些分区数量的改变 2 主题建立或者删除 3 消费者组的成员挂了 4 经过join api将一个新的消费者添加到一个存在的消费者组 public void subscribe(Collection<String> topics, ConsumerRebalanceListener listener) { // 取得一把锁 acquire(); try { if (topics == null) { // 主题列表为null,抛出异常 throw new IllegalArgumentException("Topiccollection to subscribe to cannot be null"); } else if (topics.isEmpty()) {// 主题列表为空,取消订阅 this.unsubscribe(); } else { for (String topic : topics) { if (topic == null || topic.trim().isEmpty()) throw new IllegalArgumentException("Topic collection to subscribe to cannot contain null or emptytopic"); } log.debug("Subscribed to topic(s):{}", Utils.join(topics, ", ")); this.subscriptions.subscribe(new HashSet<>(topics), listener); // 用新提供的topic集合替换当前的topic集合,若是启用了主题过时,主题的过时时间将在下一次更新中从新设置。 metadata.setTopics(subscriptions.groupSubscription()); } } finally { // 释放锁 release(); } } 2.2 assign 手动分配分区 对于用户手动指定topic的订阅模式,经过此方法能够分配分区列表给一个消费者: public void assign(Collection<TopicPartition> partitions) { acquire(); try { if (partitions == null) { throw new IllegalArgumentException("Topic partition collection to assign to cannot be null"); } else if (partitions.isEmpty()) {// partition为空取消订阅 this.unsubscribe(); } else { Set<String> topics = new HashSet<>(); // 遍历TopicPartition,把topic添加到一个集合里 for (TopicPartition tp : partitions) { String topic = (tp != null) ? tp.topic() : null; if (topic == null || topic.trim().isEmpty()) throw new IllegalArgumentException("Topic partitions to assign to cannot have null or empty topic"); topics.add(topic); } // 进行一次自动提交 this.coordinator.maybeAutoCommitOffsetsNow(); log.debug("Subscribed to partition(s): {}", Utils.join(partitions, ", ")); // 根据用户提供的指定的partitions 改变assignment this.subscriptions.assignFromUser(new HashSet<>(partitions)); metadata.setTopics(topics);// 更新metatdata topic } } finally { release(); } } 2.3 commitSync & commitAsync 提交消费者已经消费完的消息的offset,为指定已订阅的主题和分区列表返回最后一次poll返回的offset public void commitSync(final Map<TopicPartition, OffsetAndMetadata> offsets) { acquire(); try { coordinator.commitOffsetsSync(offsets); } finally { release(); } } public void commitAsync(final Map<TopicPartition, OffsetAndMetadata> offsets, OffsetCommitCallback callback) { acquire(); try { log.debug("Committing offsets: {} ", offsets); coordinator.commitOffsetsAsync(new HashMap<>(offsets), callback); } finally { release(); } } 2.4 seek 指定消费者消费的起始位置 public void seek(TopicPartition partition, long offset) { if (offset < 0) { throw new IllegalArgumentException("seek offset must not be a negative number"); } acquire(); try { log.debug("Seeking to offset {} for partition {}", offset, partition); this.subscriptions.seek(partition, offset); } finally { release(); } } // 为指定的分区查找第一个offset public void seekToBeginning(Collection<TopicPartition> partitions) { acquire(); try { Collection<TopicPartition> parts = partitions.size() == 0 ? this.subscriptions.assignedPartitions() : partitions; for (TopicPartition tp : parts) { log.debug("Seeking to beginning of partition {}", tp); subscriptions.needOffsetReset(tp, OffsetResetStrategy.EARLIEST); } } finally { release(); } } // 为指定的分区查找最后的offset public void seekToEnd(Collection<TopicPartition> partitions) { acquire(); try { Collection<TopicPartition> parts = partitions.size() == 0 ? this.subscriptions.assignedPartitions() : partitions; for (TopicPartition tp : parts) { log.debug("Seeking to end of partition {}", tp); subscriptions.needOffsetReset(tp, OffsetResetStrategy.LATEST); } } finally { release(); } } 2.5 poll方法 获取消息 从指定的主题或者分区获取数据,在poll以前,你没有订阅任何主题或分区是不行的,每一次poll,消费者都会尝试使用最后一次消费的offset做为接下来获取数据的start offset,最后一次消费的offset也能够经过seek(TopicPartition, long)设置或者自动设置 public ConsumerRecords<K, V> poll(long timeout) { acquire(); try { if (timeout < 0) throw new IllegalArgumentException("Timeout must not be negative"); // 若是没有任何订阅,抛出异常 if (this.subscriptions.hasNoSubscriptionOrUserAssignment()) throw new IllegalStateException("Consumer is not subscribed to any topics or assigned any partitions"); // 一直poll新数据直到超时 long start = time.milliseconds(); // 距离超时还剩余多少时间 long remaining = timeout; do { // 获取数据,若是自动提交,则进行偏移量自动提交,若是设置offset重置,则进行offset重置 Map<TopicPartition, List<ConsumerRecord<K, V>>> records = pollOnce(remaining); if (!records.isEmpty()) { // 再返回结果以前,咱们能够进行下一轮的fetch请求,避免阻塞等待 fetcher.sendFetches(); client.pollNoWakeup(); // 若是有拦截器进行拦截,没有直接返回 if (this.interceptors == null) return new ConsumerRecords<>(records); else return this.interceptors.onConsume(new ConsumerRecords<>(records)); } long elapsed = time.milliseconds() - start; remaining = timeout - elapsed; } while (remaining > 0); return ConsumerRecords.empty(); } finally { release(); } } private Map<TopicPartition, List<ConsumerRecord<K, V>>> pollOnce(long timeout) { // 轮询coordinator事件,处理周期性的offset提交 coordinator.poll(time.milliseconds()); // fetch positions if we have partitions we're subscribed to that we // don't know the offset for // 判断上一次消费的位置是否为空,若是不为空,则 if (!subscriptions.hasAllFetchPositions()) // 更新fetch position updateFetchPositions(this.subscriptions.missingFetchPositions()); // 数据你准备好了就当即返回,也就是还有可能没有准备好 Map<TopicPartition, List<ConsumerRecord<K, V>>> records = fetcher.fetchedRecords(); if (!records.isEmpty()) return records; // 咱们须要发送新fetch请求 fetcher.sendFetches(); long now = time.milliseconds(); long pollTimeout = Math.min(coordinator.timeToNextPoll(now), timeout); client.poll(pollTimeout, now, new PollCondition() { @Override public boolean shouldBlock() { // since a fetch might be completed by the background thread, we need this poll condition // to ensure that we do not block unnecessarily in poll() return !fetcher.hasCompletedFetches(); } }); // 早长时间的poll以后,咱们应该在返回数据以前检查是否这个组须要从新平衡,以致于这个组可以迅速的稳定 if (coordinator.needRejoin()) return Collections.emptyMap(); // 获取返回的消息 return fetcher.fetchedRecords(); } 2.6 pause 暂停消费者,暂停后poll返回空 public void pause(Collection<TopicPartition> partitions) { acquire(); try { for (TopicPartition partition: partitions) { log.debug("Pausing partition {}", partition); subscriptions.pause(partition); } } finally { release(); } } // 返回暂停的分区 public Set<TopicPartition> paused() { acquire(); try { return Collections.unmodifiableSet(subscriptions.pausedPartitions()); } finally { release(); } } 2.7 resume 恢复消费者 public void resume(Collection<TopicPartition> partitions) { acquire(); try { for (TopicPartition partition: partitions) { log.debug("Resuming partition {}", partition); subscriptions.resume(partition); } } finally { release(); } } 2.8 position方法 获取下一个消息的offset // 获取下一个record的offset public long position(TopicPartition partition) { acquire(); try { if (!this.subscriptions.isAssigned(partition)) throw new IllegalArgumentException("You can only check the position for partitions assigned to this consumer."); Long offset = this.subscriptions.position(partition); if (offset == null) { updateFetchPositions(Collections.singleton(partition)); offset = this.subscriptions.position(partition); } return offset; } finally { release(); } }