Offset analysis

1. Committing offsets

Offset commits are implemented by ConsumerCoordinator, which sends an OffsetCommitRequest and receives an OffsetCommitResponse.

We first look at the message body formats of OffsetCommitRequest and OffsetCommitResponse.


OffsetCommitRequest:

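group_id: id of the consumer group

group_generation_id: generation id held by the consumer group

member_id: id assigned to the consumer by the GroupCoordinator

retention_time: maximum time the offset is retained

topic: topic name

partition: partition number

offset: the offset being committed

metadata: custom data to be stored together with the offset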



OffsetCommitResponse:

topic: topic name

partition: partition number

error_code: error code
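
To make the two message bodies above more concrete, they can be pictured as plain Java value types. This is only an illustrative sketch: the class names CommitRequestSketch, PartitionCommit and PartitionResult are hypothetical and are not the Kafka client classes. They mirror the listed fields, with the per-partition fields grouped under the group-level ones, and they assume that error code 0 means "no error".

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical value types mirroring the fields listed above (not Kafka's own classes).
final class PartitionCommit {
    final String topic;     // topic name
    final int partition;    // partition number
    final long offset;      // offset being committed
    final String metadata;  // custom data stored together with the offset

    PartitionCommit(String topic, int partition, long offset, String metadata) {
        this.topic = topic;
        this.partition = partition;
        this.offset = offset;
        this.metadata = metadata;
    }
}

final class CommitRequestSketch {
    final String groupId;         // group_id
    final int groupGenerationId;  // group_generation_id
    final String memberId;        // member_id
    final long retentionTime;     // retention_time
    final List<PartitionCommit> partitions = new ArrayList<>();

    CommitRequestSketch(String groupId, int groupGenerationId, String memberId, long retentionTime) {
        this.groupId = groupId;
        this.groupGenerationId = groupGenerationId;
        this.memberId = memberId;
        this.retentionTime = retentionTime;
    }
}

final class PartitionResult {
    final String topic;
    final int partition;
    final short errorCode;  // assumption in this sketch: 0 means success

    PartitionResult(String topic, int partition, short errorCode) {
        this.topic = topic;
        this.partition = partition;
        this.errorCode = errorCode;
    }

    boolean succeeded() {
        return errorCode == 0;
    }
}
```

The response carries one result per committed partition, so a caller can tell exactly which partitions failed.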

 

The offset commit flow:

public void commitOffsetsAsync(final Map<TopicPartition, OffsetAndMetadata> offsets, final OffsetCommitCallback callback) {
    invokeCompletedOffsetCommitCallbacks();
    // If the GroupCoordinator is known, commit right away
    if (!coordinatorUnknown()) {
        doCommitOffsetsAsync(offsets, callback);
    } else {
        // Otherwise look up the GroupCoordinator first, then commit
        lookupCoordinator().addListener(new RequestFutureListener<Void>() {
            @Override
            public void onSuccess(Void value) {
                doCommitOffsetsAsync(offsets, callback);
            }

            @Override
            public void onFailure(RuntimeException e) {
                completedOffsetCommits.add(new OffsetCommitCompletion(callback, offsets, new RetriableCommitFailedException(e)));
            }
        });
    }
    client.pollNoWakeup();
}
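
The branching in commitOffsetsAsync can be reduced to a small self-contained sketch. Everything here is a hypothetical stand-in (CommitFlowSketch, its log list, the pretend lookupCoordinator) and not Kafka code; CompletableFuture is used in place of Kafka's RequestFuture purely for illustration.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;

// Hypothetical stand-in for the coordinator-aware commit path (not Kafka code).
final class CommitFlowSketch {
    final List<String> log = new ArrayList<>();  // records what happened, for inspection
    private String coordinator;                  // null means "coordinator unknown"

    CommitFlowSketch(String coordinator) {
        this.coordinator = coordinator;
    }

    // Pretend lookup: already completed with a node name, or already failed.
    private CompletableFuture<String> lookupCoordinator(boolean succeed) {
        CompletableFuture<String> future = new CompletableFuture<>();
        if (succeed)
            future.complete("broker-1");
        else
            future.completeExceptionally(new IllegalStateException("coordinator not available"));
        return future;
    }

    void commitAsync(long offset, boolean lookupSucceeds) {
        if (coordinator != null) {
            doCommit(offset);  // coordinator known: commit right away
            return;
        }
        lookupCoordinator(lookupSucceeds).whenComplete((node, error) -> {
            if (error == null) {
                coordinator = node;
                doCommit(offset);  // commit once the lookup has succeeded
            } else {
                // the lookup failed: record a retriable failure for the callback
                log.add("retriable-failure:" + offset);
            }
        });
    }

    private void doCommit(long offset) {
        log.add("commit:" + offset + "@" + coordinator);
    }
}
```

In both branches the caller never blocks: the outcome is only recorded and handed to the callback later.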

 

private void doCommitOffsetsAsync(final Map<TopicPartition, OffsetAndMetadata> offsets, final OffsetCommitCallback callback) {
    // Sets needsFetchCommittedOffsets to true
    this.subscriptions.needRefreshCommits();
    // Send the OffsetCommitRequest
    RequestFuture<Void> future = sendOffsetCommitRequest(offsets);
    final OffsetCommitCallback cb = callback == null ? defaultOffsetCommitCallback : callback;
    // Add a listener to the RequestFuture: on success, add an OffsetCommitCompletion to completedOffsetCommits;
    // on failure, also add an OffsetCommitCompletion, but one that carries the exception
    future.addListener(new RequestFutureListener<Void>() {
        @Override
        public void onSuccess(Void value) {
            if (interceptors != null)
                interceptors.onCommit(offsets);

            completedOffsetCommits.add(new OffsetCommitCompletion(cb, offsets, null));
        }

        @Override
        public void onFailure(RuntimeException e) {
            Exception commitException = e;

            if (e instanceof RetriableException)
                commitException = new RetriableCommitFailedException(e);

            completedOffsetCommits.add(new OffsetCommitCompletion(cb, offsets, commitException));
        }
    });
}
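
Note that the listener above never runs the user callback itself. It only records the outcome in completedOffsetCommits, and the callbacks run later when invokeCompletedOffsetCommitCallbacks() is called. The pattern can be sketched as follows; CompletionQueueSketch and its members are hypothetical names, and the use of a ConcurrentLinkedQueue is an assumption of this sketch.

```java
import java.util.concurrent.ConcurrentLinkedQueue;

// Hypothetical sketch of the "record now, invoke later" callback pattern (not Kafka code).
final class CompletionQueueSketch {
    interface Callback {
        void onComplete(long offset, Exception error);
    }

    private static final class Completion {
        final Callback callback;
        final long offset;
        final Exception error;  // null on success

        Completion(Callback callback, long offset, Exception error) {
            this.callback = callback;
            this.offset = offset;
            this.error = error;
        }
    }

    private final ConcurrentLinkedQueue<Completion> completed = new ConcurrentLinkedQueue<>();

    // Called from the response listener: only record the outcome.
    void onResponse(Callback callback, long offset, Exception error) {
        completed.add(new Completion(callback, offset, error));
    }

    // Called later by the application thread: drain the queue and run the callbacks.
    // Returns the number of callbacks that were invoked.
    int invokeCompleted() {
        int invoked = 0;
        Completion c;
        while ((c = completed.poll()) != null) {
            c.callback.onComplete(c.offset, c.error);
            invoked++;
        }
        return invoked;
    }
}
```

Deferring the callbacks this way keeps user code on the thread that calls into the consumer, rather than on whichever path happens to process the response.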

 

private RequestFuture<Void> sendOffsetCommitRequest(final Map<TopicPartition, OffsetAndMetadata> offsets) {
    if (offsets.isEmpty())
        return RequestFuture.voidSuccess();
    // Get the GroupCoordinator
    Node coordinator = coordinator();
    if (coordinator == null)
        return RequestFuture.coordinatorNotAvailable();

    // Iterate over offsets and populate offsetData
    Map<TopicPartition, OffsetCommitRequest.PartitionData> offsetData = new HashMap<>(offsets.size());
    for (Map.Entry<TopicPartition, OffsetAndMetadata> entry : offsets.entrySet()) {
        OffsetAndMetadata offsetAndMetadata = entry.getValue();
        offsetData.put(entry.getKey(), new OffsetCommitRequest.PartitionData(
                offsetAndMetadata.offset(), offsetAndMetadata.metadata()));
    }
    // Get the generation information
    final Generation generation;
    if (subscriptions.partitionsAutoAssigned())
        generation = generation();
    else
        generation = Generation.NO_GENERATION;

    // if the generation is null, we are not part of an active group (and we expect to be).
    // the only thing we can do is fail the commit and let the user rejoin the group in poll()
    if (generation == null)
        return RequestFuture.failure(new CommitFailedException());
    // Build the OffsetCommitRequest
    OffsetCommitRequest req = new OffsetCommitRequest(
            this.groupId,
            generation.generationId,
            generation.memberId,
            OffsetCommitRequest.DEFAULT_RETENTION_TIME,
            offsetData);

    log.trace("Sending offset-commit request with {} to coordinator {} for group {}", offsets, coordinator, groupId);
    // Put the request into the unsent collection, where it waits to be sent
    return client.send(coordinator, ApiKeys.OFFSET_COMMIT, req)
            .compose(new OffsetCommitResponseHandler(offsets));
}

 

2. Fetching offsets

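After a rebalance completes, each consumer knows which partitions it is responsible for. Before it starts consuming, it has to determine the position from which to start fetching messages. At this point the consumer can send an OffsetFetchRequest to obtain the last committed offset and start consuming from there.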
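
The choice of starting position can be sketched as a simple lookup. This is a hypothetical illustration, not Kafka code: StartPositionSketch and its string partition keys are invented for the example, and the fallback value used when nothing has been committed is an assumption that the text above does not cover.

```java
import java.util.Map;

// Hypothetical sketch: pick where to start fetching for one partition.
final class StartPositionSketch {
    // committed maps a partition name to its last committed offset;
    // a missing entry means no offset was ever committed for that partition.
    static long startPosition(Map<String, Long> committed, String partition, long fallback) {
        Long offset = committed.get(partition);
        // Start from the committed offset if there is one, else use the fallback (assumption).
        return offset != null ? offset : fallback;
    }
}
```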

 

Message body formats of OffsetFetchRequest and OffsetFetchResponse:

 

Source analysis of fetching offsets:

public Map<TopicPartition, OffsetAndMetadata> fetchCommittedOffsets(Set<TopicPartition> partitions) {
    while (true) {
        // Block until the coordinator is ready to accept requests
        ensureCoordinatorReady();
        // Send the OffsetFetchRequest
        RequestFuture<Map<TopicPartition, OffsetAndMetadata>> future = sendOffsetFetchRequest(partitions);
        // Block until the OffsetFetchRequest has completed
        client.poll(future);
        if (future.succeeded())
            return future.value(); // return the offsets obtained from the server
        if (!future.isRetriable())
            throw future.exception();
        time.sleep(retryBackoffMs); // otherwise back off and retry
    }
}
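
The loop in fetchCommittedOffsets is a blocking retry with a fixed backoff: return on success, rethrow on a non-retriable error, sleep and try again otherwise. A generic sketch of that shape follows; RetrySketch and Attempt are hypothetical names, and the sketch uses Thread.sleep in place of the client's own time abstraction.

```java
import java.util.function.Supplier;

// Hypothetical sketch of a blocking retry loop with a fixed backoff (not Kafka code).
final class RetrySketch {
    // Outcome of one attempt: a value, or an error flagged as retriable or not.
    static final class Attempt<T> {
        final T value;
        final RuntimeException error;
        final boolean retriable;

        private Attempt(T value, RuntimeException error, boolean retriable) {
            this.value = value;
            this.error = error;
            this.retriable = retriable;
        }

        static <T> Attempt<T> success(T value) {
            return new Attempt<>(value, null, false);
        }

        static <T> Attempt<T> failure(RuntimeException error, boolean retriable) {
            return new Attempt<>(null, error, retriable);
        }
    }

    static <T> T callWithRetry(Supplier<Attempt<T>> send, long backoffMs) {
        while (true) {
            Attempt<T> attempt = send.get();
            if (attempt.error == null)
                return attempt.value;  // success: hand the value back
            if (!attempt.retriable)
                throw attempt.error;   // fatal: propagate to the caller
            try {
                Thread.sleep(backoffMs);  // retriable: wait, then try again
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new IllegalStateException("interrupted while backing off", e);
            }
        }
    }
}
```

Like the original loop, this sketch retries indefinitely as long as the error stays retriable; there is no attempt limit.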

 

private RequestFuture<Map<TopicPartition, OffsetAndMetadata>> sendOffsetFetchRequest(Set<TopicPartition> partitions) {
    // Get the GroupCoordinator
    Node coordinator = coordinator();
    if (coordinator == null)
        return RequestFuture.coordinatorNotAvailable();

    log.debug("Group {} fetching committed offsets for partitions: {}", groupId, partitions);
    // Build the OffsetFetchRequest
    OffsetFetchRequest request = new OffsetFetchRequest(this.groupId, new ArrayList<>(partitions));

    // Put the request into the unsent request queue, where it waits to be sent
    return client.send(coordinator, ApiKeys.OFFSET_FETCH, request)
            .compose(new OffsetFetchResponseHandler());
}

 

Handling OffsetFetchResponse: when the RequestFuture is returned, the OffsetFetchResponse is first processed by OffsetFetchResponseHandler:
    public void handle(OffsetFetchResponse response, RequestFuture<Map<TopicPartition, OffsetAndMetadata>> future) {
        // Build the <TopicPartition, OffsetAndMetadata> map named offsets
        Map<TopicPartition, OffsetAndMetadata> offsets = new HashMap<>(response.responseData().size());
        // Iterate over the returned response data
        for (Map.Entry<TopicPartition, OffsetFetchResponse.PartitionData> entry : response.responseData().entrySet()) {
            TopicPartition tp = entry.getKey();
            OffsetFetchResponse.PartitionData data = entry.getValue();
            if (data.hasError()) {
                Errors error = Errors.forCode(data.errorCode);
                log.debug("Group {} failed to fetch offset for partition {}: {}", groupId, tp, error.message());

                if (error == Errors.GROUP_LOAD_IN_PROGRESS) {
                    // just retry
                    future.raise(error);
                } else if (error == Errors.NOT_COORDINATOR_FOR_GROUP) {
                    // re-discover the coordinator and retry
                    coordinatorDead();
                    future.raise(error);
                } else if (error == Errors.UNKNOWN_TOPIC_OR_PARTITION) {
                    future.raise(new KafkaException("Partition " + tp + " may not exist or user may not have Describe access to topic"));
                } else {
                    future.raise(new KafkaException("Unexpected error in fetch offset response: " + error.message()));
                }
                return;
            } else if (data.offset >= 0) {
                // No error: put the data into the offsets map
                // record the position with the offset (-1 indicates no committed offset to fetch)
                offsets.put(tp, new OffsetAndMetadata(data.offset, data.metadata));
            } else {
                log.debug("Group {} has no committed offset for partition {}", groupId, tp);
            }
        }
        // Complete the future with the offsets map; fetchCommittedOffsets ultimately returns it
        future.complete(offsets);
    }
} // end of OffsetFetchResponseHandler
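
The per-partition decisions made in handle can be summarized as a small classification function. The enum and class names below are hypothetical and exist only for this sketch. The constants mirror the error cases handled above, with every remaining error, including UNKNOWN_TOPIC_OR_PARTITION, treated as a non-retriable failure.

```java
// Hypothetical sketch of how each partition entry in the response is treated (not Kafka code).
final class FetchErrorSketch {
    enum FetchError { NONE, GROUP_LOAD_IN_PROGRESS, NOT_COORDINATOR_FOR_GROUP, UNKNOWN_TOPIC_OR_PARTITION, OTHER }

    enum Action { USE_OFFSET, NO_COMMITTED_OFFSET, RETRY, REDISCOVER_AND_RETRY, FAIL }

    static Action classify(FetchError error, long offset) {
        switch (error) {
            case NONE:
                // a negative offset (-1) means nothing has been committed for the partition
                return offset >= 0 ? Action.USE_OFFSET : Action.NO_COMMITTED_OFFSET;
            case GROUP_LOAD_IN_PROGRESS:
                return Action.RETRY;                 // the coordinator is still loading: retry
            case NOT_COORDINATOR_FOR_GROUP:
                return Action.REDISCOVER_AND_RETRY;  // find the new coordinator, then retry
            default:
                return Action.FAIL;                  // raised as a non-retriable exception
        }
    }
}
```

In the original handler the first partition with an error ends processing of the whole response, so a single failing partition fails the entire fetch.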