消费者的消费状态是保存在SubscriptionState类中的,而SubscriptionState有个重要的属性那就是assignment保存了消费者消费的partition及其partition的状态java
public class SubscriptionState { /* the pattern user has requested */ private Pattern subscribedPattern; /* the list of topics the user has requested */ private final Set<String> subscription; /* the list of topics the group has subscribed to (set only for the leader on join group completion) */ private final Set<String> groupSubscription; /* the list of partitions the user has requested */ private final Set<TopicPartition> userAssignment; /* the list of partitions currently assigned */ private final Map<TopicPartition, TopicPartitionState> assignment; // 关键, 保存了消费者消费的partition及其partition的状态 // ...
看下TopicPartitionState。TopicPartitionState用于表示消费者消费到该partition哪一个位置了,须要注意的是position表示下一条须要消费的位置而不是已经消费的位置,拉取消息的时候就是根据position来肯定须要拉取的第一条消息的offset网络
private static class TopicPartitionState { private Long position; // 下一条消费哪一个offset private OffsetAndMetadata committed; // 已经提交的position private boolean paused; // whether this partition has been paused by the user private OffsetResetStrategy resetStrategy; // 重置position的时候的策略 // ... } public class OffsetAndMetadata implements Serializable { private final long offset; private final String metadata; }
以KafkaConsumer#commitSync为例来看下客户端是如何提交offset的数据结构
KafkaConsumer#commitSyncide
public void commitSync() { acquire(); try { commitSync(subscriptions.allConsumed()); // 调用SubscriptionState#allConsumed来获取已经消费的消息的位置,而后将其提交 } finally { release(); } } public void commitSync(final Map<TopicPartition, OffsetAndMetadata> offsets) { acquire(); try { coordinator.commitOffsetsSync(offsets); } finally { release(); } }
来看下SubscriptionState#allConsumed,从哪获取到消费到的位置。从下面的代码能够看出提交的offset就是TopicPartitionState#positionui
public Map<TopicPartition, OffsetAndMetadata> allConsumed() { Map<TopicPartition, OffsetAndMetadata> allConsumed = new HashMap<>(); for (Map.Entry<TopicPartition, TopicPartitionState> entry : assignment.entrySet()) { TopicPartitionState state = entry.getValue(); if (state.hasValidPosition()) allConsumed.put(entry.getKey(), new OffsetAndMetadata(state.position));// 关键,原来是将TopicPartitionState中的position封装成OffsetAndMetadata,即提交的是TopicPartitionState#position } return allConsumed; }
获取到消费到的offset位置后,最终是经过ConsumerCoordinator#sendOffsetCommitRequest将offset发送到coordinator的this
private RequestFuture<Void> sendOffsetCommitRequest(final Map<TopicPartition, OffsetAndMetadata> offsets) { if (coordinatorUnknown()) // 必须获取coordinator return RequestFuture.coordinatorNotAvailable(); if (offsets.isEmpty()) return RequestFuture.voidSuccess(); // create the offset commit request Map<TopicPartition, OffsetCommitRequest.PartitionData> offsetData = new HashMap<>(offsets.size()); for (Map.Entry<TopicPartition, OffsetAndMetadata> entry : offsets.entrySet()) { OffsetAndMetadata offsetAndMetadata = entry.getValue(); offsetData.put(entry.getKey(), new OffsetCommitRequest.PartitionData( offsetAndMetadata.offset(), offsetAndMetadata.metadata())); // 以TopicPartition为key, offsetAndMetadat组成request中的数据 } OffsetCommitRequest req = new OffsetCommitRequest(this.groupId, this.generation, this.memberId, OffsetCommitRequest.DEFAULT_RETENTION_TIME, offsetData); log.trace("Sending offset-commit request with {} to coordinator {} for group {}", offsets, coordinator, groupId); return client.send(coordinator, ApiKeys.OFFSET_COMMIT, req) .compose(new OffsetCommitResponseHandler(offsets));// 发送到coordinator }
从上面代码最后一行能够看出处理response的逻辑在OffsetCommitResponseHandler中。若是提交成功,那么会将TopicPartitionState#position更新到TopicPartitionState#commitcode
private class OffsetCommitResponseHandler extends CoordinatorResponseHandler<OffsetCommitResponse, Void> { private final Map<TopicPartition, OffsetAndMetadata> offsets; public OffsetCommitResponseHandler(Map<TopicPartition, OffsetAndMetadata> offsets) { this.offsets = offsets; } @Override public OffsetCommitResponse parse(ClientResponse response) { return new OffsetCommitResponse(response.responseBody()); } @Override public void handle(OffsetCommitResponse commitResponse, RequestFuture<Void> future) { sensors.commitLatency.record(response.requestLatencyMs()); Set<String> unauthorizedTopics = new HashSet<>(); for (Map.Entry<TopicPartition, Short> entry : commitResponse.responseData().entrySet()) { TopicPartition tp = entry.getKey(); OffsetAndMetadata offsetAndMetadata = this.offsets.get(tp); // this.offsets即sendOffsetCommitRequest中的入参,这点很关键 long offset = offsetAndMetadata.offset(); Errors error = Errors.forCode(entry.getValue()); if (error == Errors.NONE) { if (subscriptions.isAssigned(tp)) subscriptions.committed(tp, offsetAndMetadata); // 更新TopicPartitionState#committed为发送的时候的TopicPartitionState#position } // ... } } }