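6. SOFAJRaft Source Code Analysis: Linearizable Reads Through RheaKV

Introduction

I originally wanted to cover linearizable reads right after the article on elections, but jumping straight in without context felt hard to follow, so I wrote the RheaKV articles first. With RheaKV covered, I can finally explain how SOFAJRaft implements linearizable reads. A simple example of linearizability: if a value is written at time T1, any read after T1 must return that value and can never return the value from before T1.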

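Parts of this article draw on the SOFAJRaft documentation:
Analysis of the SOFAJRaft linearizable read implementation | SOFAJRaft implementation principles
SOFAJRaft implementation principles - How SOFAJRaft-RheaKV uses Raft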

Reading data in RheaKV

The entry point for reading data in RheaKV is bGet in DefaultRheaKVStore.

DefaultRheaKVStore#bGet

public byte[] bGet(final String key) {
    return FutureHelper.get(get(key), this.futureTimeoutMillis);
}

bGet eventually calls down into the following get method of DefaultRheaKVStore:
DefaultRheaKVStore#get

private CompletableFuture<byte[]> get(final byte[] key, final boolean readOnlySafe,
                                      final CompletableFuture<byte[]> future, final boolean tryBatching) {
    // Verify that the store has been started
    checkState();
    Requires.requireNonNull(key, "key");
    if (tryBatching) {
        final GetBatching getBatching = readOnlySafe ? this.getBatchingOnlySafe : this.getBatching;
        if (getBatching != null && getBatching.apply(key, future)) {
            return future;
        }
    }
    internalGet(key, readOnlySafe, future, this.failoverRetries, null, this.onlyLeaderRead);
    return future;
}

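The get method uses its arguments to decide whether to read in batches. readOnlySafe indicates whether linearizable reads are enabled. Because we entered through get(key), both readOnlySafe and tryBatching are passed as true.
So getBatchingOnlySafe.apply is called here with the key and the future.
getBatchingOnlySafe is created when DefaultRheaKVStore is initialized:
DefaultRheaKVStore#init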

.....
this.getBatchingOnlySafe = new GetBatching(KeyEvent::new, "get_batching_only_safe",
        new GetBatchingHandler("get_only_safe", true));
.....

The handler passed in when getBatchingOnlySafe is initialized is GetBatchingHandler.

Back in getBatchingOnlySafe#apply, let's see what this method does:

public boolean apply(final byte[] message, final CompletableFuture<byte[]> future) {
    //GetBatchingHandler
    return this.ringBuffer.tryPublishEvent((event, sequence) -> {
        event.reset();
        event.key = message;
        event.future = future;
    });
}

apply publishes an event to the Disruptor for asynchronous processing and stores our key in the event's key field. The handler behind getBatchingOnlySafe is GetBatchingHandler.

Fetching data in batches

GetBatchingHandler#onEvent

public void onEvent(final KeyEvent event, final long sequence, final boolean endOfBatch) throws Exception {
    this.events.add(event);
    this.cachedBytes += event.key.length;
    final int size = this.events.size();
    // Keep buffering while this is not the last event, the batch size is not reached, and cachedBytes is below MaxReadBytes
    if (!endOfBatch && size < batchingOpts.getBatchSize() && this.cachedBytes < batchingOpts.getMaxReadBytes()) {
        return;
    }

    if (size == 1) {
        reset();
        try {
            // A single get request does not need batch processing
            get(event.key, this.readOnlySafe, event.future, false);
        } catch (final Throwable t) {
            exceptionally(t, event.future);
        }
    } else {
        // Allocate a list with exactly the required capacity
        final List<byte[]> keys = Lists.newArrayListWithCapacity(size);
        final CompletableFuture<byte[]>[] futures = new CompletableFuture[size];
        for (int i = 0; i < size; i++) {
            final KeyEvent e = this.events.get(i);
            keys.add(e.key);
            futures[i] = e.future;
        }
        // Reset after the events have been copied into keys and futures
        reset();
        try {
            multiGet(keys, this.readOnlySafe).whenComplete((result, throwable) -> {
                // Handle the result in the asynchronous callback
                if (throwable == null) {
                    for (int i = 0; i < futures.length; i++) {
                        final ByteArray realKey = ByteArray.wrap(keys.get(i));
                        futures[i].complete(result.get(realKey));
                    }
                    return;
                }
                exceptionally(throwable, futures);
            });
        } catch (final Throwable t) {
            exceptionally(t, futures);
        }
    }
}

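onEvent first checks whether the number of buffered events has reached the threshold and whether the current event is the last one in the Disruptor batch. It then takes one of two paths depending on how many events were collected. As an optimization, a single event skips batching and goes through the ordinary get; otherwise all keys are collected into the keys list and multiGet is called to process them as a batch.

multiGet calls internalMultiGet, which returns a Future so that the result is delivered asynchronously.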
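The flush rule described above can be sketched in plain Java without the Disruptor. This is only an illustration under stated assumptions: BatchCollector, its constructor parameters, and the flusher callback are hypothetical names invented here, not part of RheaKV; only the condition (flush on endOfBatch, on reaching the batch size, or on reaching the byte limit) follows the listing.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

/** Hypothetical stand-in for GetBatchingHandler: buffers keys and flushes them in batches. */
final class BatchCollector {
    private final int batchSize;                    // assumed counterpart of getBatchSize()
    private final int maxReadBytes;                 // assumed counterpart of getMaxReadBytes()
    private final Consumer<List<byte[]>> flusher;   // receives each completed batch
    private final List<byte[]> pending = new ArrayList<>();
    private int cachedBytes;

    BatchCollector(int batchSize, int maxReadBytes, Consumer<List<byte[]>> flusher) {
        this.batchSize = batchSize;
        this.maxReadBytes = maxReadBytes;
        this.flusher = flusher;
    }

    /** Called once per event; endOfBatch marks the last event currently available. */
    void onEvent(byte[] key, boolean endOfBatch) {
        pending.add(key);
        cachedBytes += key.length;
        // Keep buffering while none of the three flush conditions holds.
        if (!endOfBatch && pending.size() < batchSize && cachedBytes < maxReadBytes) {
            return;
        }
        // Copy, reset the buffer, then hand the batch over.
        List<byte[]> batch = new ArrayList<>(pending);
        pending.clear();
        cachedBytes = 0;
        flusher.accept(batch);
    }
}
```

In the real handler a batch of size one goes through get and larger batches through multiGet; the sketch leaves that choice to the flusher.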
DefaultRheaKVStore#internalMultiGet

private FutureGroup<Map<ByteArray, byte[]>> internalMultiGet(final List<byte[]> keys, final boolean readOnlySafe,
                                                             final int retriesLeft, final Throwable lastCause) {
    // Keys live in different regions and one region holds many keys, so group them into a map
    final Map<Region, List<byte[]>> regionMap = this.pdClient
            .findRegionsByKeys(keys, ApiExceptionHelper.isInvalidEpoch(lastCause));
    // Futures to return
    final List<CompletableFuture<Map<ByteArray, byte[]>>> futures =
            Lists.newArrayListWithCapacity(regionMap.size());
    // lastCause is null on the first attempt
    final Errors lastError = lastCause == null ? null : Errors.forException(lastCause);

    for (final Map.Entry<Region, List<byte[]>> entry : regionMap.entrySet()) {
        final Region region = entry.getKey();
        final List<byte[]> subKeys = entry.getValue();
        // Build a retry function that runs with one fewer retry left
        final RetryCallable<Map<ByteArray, byte[]>> retryCallable = retryCause -> internalMultiGet(subKeys,
                readOnlySafe, retriesLeft - 1, retryCause);
        final MapFailoverFuture<ByteArray, byte[]> future = new MapFailoverFuture<>(retriesLeft, retryCallable);
        // Send the MultiGetRequest and fetch the data
        internalRegionMultiGet(region, subKeys, readOnlySafe, future, retriesLeft, lastError, this.onlyLeaderRead);
        futures.add(future);
    }
    return new FutureGroup<>(futures);
}

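internalMultiGet groups the keys by region. Data is stored in regions and different keys may belong to different regions, so the data has to be fetched from each region separately. One region holds many keys, so the grouping is kept in a map. The method then iterates over regionMap and passes each region's keys as one batch to internalRegionMultiGet, which fetches the data according to the situation.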
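The grouping step can be illustrated with a small sketch. It assumes, purely for illustration, that each region covers a contiguous key range identified by its start key and that keys are Strings; RegionRouter, addRegion, and groupByRegion are hypothetical names, and the real pdClient.findRegionsByKeys works on byte[] keys and Region objects.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

/** Hypothetical route table: maps each key to the region whose range contains it. */
final class RegionRouter {
    // Region start key (inclusive) -> region id. A region ends where the next one starts.
    private final TreeMap<String, Long> startKeyToRegion = new TreeMap<>();

    void addRegion(String startKey, long regionId) {
        startKeyToRegion.put(startKey, regionId);
    }

    /** Groups keys by owning region so that each region can be queried with one request. */
    Map<Long, List<String>> groupByRegion(List<String> keys) {
        Map<Long, List<String>> grouped = new LinkedHashMap<>();
        for (String key : keys) {
            // The owning region is the one with the greatest start key <= key.
            Map.Entry<String, Long> owner = startKeyToRegion.floorEntry(key);
            if (owner == null) {
                throw new IllegalArgumentException("No region covers key: " + key);
            }
            grouped.computeIfAbsent(owner.getValue(), id -> new ArrayList<>()).add(key);
        }
        return grouped;
    }
}
```

Each entry of the returned map then corresponds to one per-region request, which is the role internalRegionMultiGet plays in the listing.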

DefaultRheaKVStore#internalRegionMultiGet

private void internalRegionMultiGet(final Region region, final List<byte[]> subKeys, final boolean readOnlySafe,
                                    final CompletableFuture<Map<ByteArray, byte[]>> future, final int retriesLeft,
                                    final Errors lastCause, final boolean requireLeader) {
    // On a pure client there is no local RegionEngine, so this is null
    final RegionEngine regionEngine = getRegionEngine(region.getId(), requireLeader);
    // require leader on retry
    // Set up the retry function
    final RetryRunner retryRunner = retryCause -> internalRegionMultiGet(region, subKeys, readOnlySafe, future,
            retriesLeft - 1, retryCause, true);
    final FailoverClosure<Map<ByteArray, byte[]>> closure = new FailoverClosureImpl<>(future,
            false, retriesLeft, retryRunner);
    if (regionEngine != null) {
        if (ensureOnValidEpoch(region, regionEngine, closure)) {
            // A local RegionEngine exists: get its rawKVStore and read from it
            final RawKVStore rawKVStore = getRawKVStore(regionEngine);
            if (this.kvDispatcher == null) {
                rawKVStore.multiGet(subKeys, readOnlySafe, closure);
            } else {
                // If kvDispatcher is not null, run the read asynchronously on it
                this.kvDispatcher.execute(() -> rawKVStore.multiGet(subKeys, readOnlySafe, closure));
            }
        }
    } else {
        final MultiGetRequest request = new MultiGetRequest();
        request.setKeys(subKeys);
        request.setReadOnlySafe(readOnlySafe);
        request.setRegionId(region.getId());
        request.setRegionEpoch(region.getRegionEpoch());
        // Send the request over RPC
        this.rheaKVRpcService.callAsyncWithRpc(request, closure, lastCause, requireLeader);
    }
}

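Because internalRegionMultiGet is called on the client side here, no regionEngine is set, so a MultiGetRequest is sent over RPC to the server for the region in question (to its leader when requireLeader is true).

These methods are basically the same as the ones for put, which were covered in "5. SOFAJRaft Source Code Analysis: How RheaKV Stores Data", so I will not go through them again here.

Handling MultiGetRequest on the server side

A MultiGetRequest is handled by KVCommandProcessor, which decides how to process a request based on the return value of the request's magic method. In our case it calls handleMultiGetRequest of DefaultRegionKVService.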

public void handleMultiGetRequest(final MultiGetRequest request,
                                  final RequestProcessClosure<BaseRequest, BaseResponse<?>> closure) {
    final MultiGetResponse response = new MultiGetResponse();
    response.setRegionId(getRegionId());
    response.setRegionEpoch(getRegionEpoch());
    try {
        KVParameterRequires.requireSameEpoch(request, getRegionEpoch());
        final List<byte[]> keys = KVParameterRequires.requireNonEmpty(request.getKeys(), "multiGet.keys");
        // Call multiGet on MetricsRawKVStore
        this.rawKVStore.multiGet(keys, request.isReadOnlySafe(), new BaseKVStoreClosure() {

            @SuppressWarnings("unchecked")
            @Override
            public void run(final Status status) {
                if (status.isOk()) {
                    response.setValue((Map<ByteArray, byte[]>) getData());
                } else {
                    setFailure(request, response, status, getError());
                }
                closure.sendResponse(response);
            }
        });
    } catch (final Throwable t) {
        LOG.error("Failed to handle: {}, {}.", request, StackTraceUtil.stackTrace(t));
        response.setError(Errors.forException(t));
        closure.sendResponse(response);
    }
}

handleMultiGetRequest calls multiGet on MetricsRawKVStore to fetch the data in a batch.

MetricsRawKVStore#multiGet

public void multiGet(final List<byte[]> keys, final boolean readOnlySafe, final KVStoreClosure closure) {
    // Create a MetricsKVClosureAdapter
    final KVStoreClosure c = metricsAdapter(closure, MULTI_GET, keys.size(), 0);
    // Call multiGet on RaftRawKVStore
    this.rawKVStore.multiGet(keys, readOnlySafe, c);
}

multiGet passes in a MetricsKVClosureAdapter instance, through which the response is delivered asynchronously by callback. It then calls multiGet on RaftRawKVStore.

RaftRawKVStore#multiGet

public void multiGet(final List<byte[]> keys, final boolean readOnlySafe, final KVStoreClosure closure) {
    if (!readOnlySafe) {
        this.kvStore.multiGet(keys, false, closure);
        return;
    }
    // Linearizable read for the KV store
    // Call readIndex and wait for the callback
    this.node.readIndex(BytesUtil.EMPTY_BYTES, new ReadIndexClosure() {

        @Override
        public void run(final Status status, final long index, final byte[] reqCtx) {
            // If the status is OK, read from the local store
            if (status.isOk()) {
                RaftRawKVStore.this.kvStore.multiGet(keys, true, closure);
                return;
            }
            // If ReadIndex fails, try to apply the read as an operation to the state machine KVStoreStateMachine on the leader
            RaftRawKVStore.this.readIndexExecutor.execute(() -> {
                if (isLeader()) {
                    LOG.warn("Fail to [multiGet] with 'ReadIndex': {}, try to applying to the state machine.",
                            status);
                    // If 'read index' read fails, try to applying to the state machine at the leader node
                    applyOperation(KVOperation.createMultiGet(keys), closure);
                } else {
                    LOG.warn("Fail to [multiGet] with 'ReadIndex': {}.", status);
                    // Client will retry to leader node
                    new KVClosureAdapter(closure, null).run(status);
                }
            });
        }
    });
}

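multiGet calls the node's readIndex method to perform a linearizable read and registers a callback. If the status is OK, it reads the data directly from RocksRawKVStore. If not, and this node is the leader, it falls back to applying the read as an operation through the state machine KVStoreStateMachine; on a non-leader the failure is returned so that the client retries against the leader.

Linearizable reads with readIndex

A simple example of a linearizable read: if we write a value at time t1, then any read after t1 must see that value and cannot see the older value from before t1. (Think of the volatile keyword in Java: loosely speaking, a linearizable read gives a distributed system the visibility semantics of Java volatile.) In other words, once a client's write request to the cluster has received a successful response, the result of that write must be visible to all subsequent reads. The difference is that volatile provides visibility between threads, while SOFAJRaft must provide visibility between servers.

SOFAJRaft's linearizable read is built on the ReadIndex mechanism of the Raft protocol. A linearizable read is started with Node#readIndex(byte[] requestContext, ReadIndexClosure done); the closure passed in is invoked once it is safe to read, and in the normal case the data is then read from the state machine and returned to the client.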
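The branches of this method can be condensed into a small decision table. The sketch below is not RheaKV code: ReadPathChooser, ReadPath, and choose are hypothetical names, and the three booleans stand in for the readOnlySafe flag, the ReadIndex status, and isLeader() from the listing.

```java
/** Hypothetical decision table mirroring the branches of RaftRawKVStore#multiGet. */
final class ReadPathChooser {
    enum ReadPath {
        LOCAL_READ,             // readOnlySafe is false: read the local store directly
        READ_AFTER_READ_INDEX,  // ReadIndex succeeded: the local read is now linearizable
        APPLY_TO_STATE_MACHINE, // ReadIndex failed on the leader: go through the Raft log
        FAIL_TO_CLIENT          // ReadIndex failed on a non-leader: the client retries on the leader
    }

    static ReadPath choose(boolean readOnlySafe, boolean readIndexOk, boolean isLeader) {
        if (!readOnlySafe) {
            return ReadPath.LOCAL_READ;
        }
        if (readIndexOk) {
            return ReadPath.READ_AFTER_READ_INDEX;
        }
        return isLeader ? ReadPath.APPLY_TO_STATE_MACHINE : ReadPath.FAIL_TO_CLIENT;
    }
}
```

Applying the read through the log is slower than ReadIndex because it costs a log write, but it remains correct, which is presumably why it is used only as the fallback.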

Node#readIndex

public void readIndex(final byte[] requestContext, final ReadIndexClosure done) {
    if (this.shutdownLatch != null) {
        // Run the callback asynchronously
        Utils.runClosureInThread(done, new Status(RaftError.ENODESHUTDOWN, "Node is shutting down."));
        throw new IllegalStateException("Node is shutting down");
    }
    Requires.requireNonNull(done, "Null closure");
    //EMPTY_BYTES
    this.readOnlyService.addRequest(requestContext, done);
}

readIndex calls ReadOnlyServiceImpl#addRequest with requestContext and the callback done; the requestContext passed in here is BytesUtil.EMPTY_BYTES.
Let's keep going.

ReadOnlyServiceImpl#addRequest

public void addRequest(final byte[] reqCtx, final ReadIndexClosure closure) {
    if (this.shutdownLatch != null) {
        Utils.runClosureInThread(closure, new Status(RaftError.EHOSTDOWN, "Was stopped"));
        throw new IllegalStateException("Service already shutdown.");
    }
    try {
        EventTranslator<ReadIndexEvent> translator = (event, sequence) -> {
            event.done = closure;
            //EMPTY_BYTES
            event.requestContext = new Bytes(reqCtx);
            event.startTime = Utils.monotonicMs();
        };
        int retryTimes = 0;
        while (true) {
            //ReadIndexEventHandler
            if (this.readIndexQueue.tryPublishEvent(translator)) {
                break;
            } else {
                retryTimes++;
                if (retryTimes > MAX_ADD_REQUEST_RETRY_TIMES) {
                    Utils.runClosureInThread(closure,
                        new Status(RaftError.EBUSY, "Node is busy, has too many read-only requests."));
                    this.nodeMetrics.recordTimes("read-index-overload-times", 1);
                    LOG.warn("Node {} ReadOnlyServiceImpl readIndexQueue is overload.", this.node.getNodeId());
                    return;
                }
                ThreadHelper.onSpinWait();
            }
        }
    } catch (final Exception e) {
        Utils.runClosureInThread(closure, new Status(RaftError.EPERM, "Node is down."));
    }
}

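addRequest wraps reqCtx and closure into an event and publishes it to the readIndexQueue. Once published, the event is handled by ReadIndexEventHandler. If publishing fails, it is retried up to MAX_ADD_REQUEST_RETRY_TIMES times (3), after which the closure is completed with an EBUSY status.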
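The bounded-retry publish pattern can be sketched with a JDK queue in place of the Disruptor ring buffer. This is an illustration only: BoundedPublisher and its methods are hypothetical names, ArrayBlockingQueue.offer stands in for tryPublishEvent, and Thread.yield() stands in for ThreadHelper.onSpinWait().

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

/** Hypothetical sketch: try to enqueue without blocking, giving up after a few retries. */
final class BoundedPublisher<T> {
    private static final int MAX_RETRIES = 3; // assumed to mirror MAX_ADD_REQUEST_RETRY_TIMES

    private final BlockingQueue<T> queue;

    BoundedPublisher(int capacity) {
        this.queue = new ArrayBlockingQueue<>(capacity);
    }

    /** Returns true if the event was enqueued, false if the queue stayed full. */
    boolean publish(T event) {
        int retries = 0;
        // offer() never blocks: it returns false immediately when the queue is full.
        while (!queue.offer(event)) {
            retries++;
            if (retries > MAX_RETRIES) {
                return false; // the caller reports "busy" instead of blocking its thread
            }
            Thread.yield(); // brief pause before the next attempt
        }
        return true;
    }

    int size() {
        return queue.size();
    }
}
```

Failing fast keeps the calling thread from stalling when the node is overloaded; in the listing the equivalent outcome is the EBUSY status passed to the closure.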

ReadIndexEventHandler

private class ReadIndexEventHandler implements EventHandler<ReadIndexEvent> {
    // task list for batch
    private final List<ReadIndexEvent> events = new ArrayList<>(
                                                  ReadOnlyServiceImpl.this.raftOptions.getApplyBatch());

    @Override
    public void onEvent(final ReadIndexEvent newEvent, final long sequence, final boolean endOfBatch)
                                                                                                     throws Exception {
        if (newEvent.shutdownLatch != null) {
            executeReadIndexEvents(this.events);
            this.events.clear();
            newEvent.shutdownLatch.countDown();
            return;
        }

        this.events.add(newEvent);
        // Execute as a batch
        if (this.events.size() >= ReadOnlyServiceImpl.this.raftOptions.getApplyBatch() || endOfBatch) {
            executeReadIndexEvents(this.events);
            this.events.clear();
        }
    }
}

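ReadIndexEventHandler is an inner class of ReadOnlyServiceImpl. It keeps an events list as an instance field for batching. Once the list reaches raftOptions.getApplyBatch() events (32) or the current event is the last one in the Disruptor batch, it calls ReadOnlyServiceImpl#executeReadIndexEvents to process the events as a batch.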

ReadOnlyServiceImpl#executeReadIndexEvents

private void executeReadIndexEvents(final List<ReadIndexEvent> events) {
    if (events.isEmpty()) {
        return;
    }
    // Build the ReadIndexRequest
    final ReadIndexRequest.Builder rb = ReadIndexRequest.newBuilder() //
        .setGroupId(this.node.getGroupId()) //
        .setServerId(this.node.getServerId().toString());

    final List<ReadIndexState> states = new ArrayList<>(events.size());

    for (final ReadIndexEvent event : events) {
        rb.addEntries(ZeroByteStringHelper.wrap(event.requestContext.get()));
        states.add(new ReadIndexState(event.requestContext, event.done, event.startTime));
    }
    final ReadIndexRequest request = rb.build();

    this.node.handleReadIndexRequest(request, new ReadIndexResponseClosure(states, request));
}

executeReadIndexEvents builds the ReadIndexRequest and wraps the ReadIndexState list in a ReadIndexResponseClosure, in preparation for the following steps.

NodeImpl#handleReadIndexRequest

public void handleReadIndexRequest(final ReadIndexRequest request, final RpcResponseClosure<ReadIndexResponse> done) {
    final long startMs = Utils.monotonicMs();
    this.readLock.lock();
    try {
        switch (this.state) {
            case STATE_LEADER:
                readLeader(request, ReadIndexResponse.newBuilder(), done);
                break;
            case STATE_FOLLOWER:
                readFollower(request, done);
                break;
            case STATE_TRANSFERRING:
                done.run(new Status(RaftError.EBUSY, "Is transferring leadership."));
                break;
            default:
                done.run(new Status(RaftError.EPERM, "Invalid state for readIndex: %s.", this.state));
                break;
        }
    } finally {
        this.readLock.unlock();
        this.metrics.recordLatency("handle-read-index", Utils.monotonicMs() - startMs);
        this.metrics.recordSize("handle-read-index-entries", request.getEntriesCount());
    }
}

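A linearizable read can be initiated on any node in the cluster. It does not have to go to the leader and may run on a follower, which greatly reduces the read load on the leader.
When a linearizable read runs on a follower, the follower actually calls RpcService#readIndex(leaderId.getEndpoint(), newRequest, -1, closure) to send a ReadIndex request to the leader, which then handles the ReadIndex step. So I will mainly cover the leader path here.

Next, NodeImpl's readLeader method is called: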
NodeImpl#readLeader

private void readLeader(final ReadIndexRequest request, final ReadIndexResponse.Builder respBuilder,
                        final RpcResponseClosure<ReadIndexResponse> closure) {
    //1. Get the quorum (majority size) of the cluster
    final int quorum = getQuorum();
    if (quorum <= 1) {
        // Only one peer, fast path.
        // With a single node in the cluster, invoke the callback directly and return success
        respBuilder.setSuccess(true) //
                .setIndex(this.ballotBox.getLastCommittedIndex());
        closure.setResponse(respBuilder.build());
        closure.run(Status.OK());
        return;
    }

    final long lastCommittedIndex = this.ballotBox.getLastCommittedIndex();
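    //2. The terms must match
    // LogManager looks up the term of the BallotBox's lastCommittedIndex and compares it with the current term.
    // If they differ, this leader has not committed any log entry in its term, so the read-only request is rejected.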
    if (this.logManager.getTerm(lastCommittedIndex) != this.currTerm) {
        // Reject read only request when this leader has not committed any log entry at its term
        closure
                .run(new Status(
                        RaftError.EAGAIN,
                        "ReadIndex request rejected because leader has not committed any log entry at its term, " +
                         "logIndex=%d, currTerm=%d.",
                        lastCommittedIndex, this.currTerm));
        return;
    }
    respBuilder.setIndex(lastCommittedIndex);

    if (request.getPeerId() != null) {
        // request from follower, check if the follower is in current conf.
        final PeerId peer = new PeerId();
        peer.parse(request.getServerId());
        //3. A request from a follower must check that the follower is in the current configuration
        if (!this.conf.contains(peer)) {
            closure
                    .run(new Status(RaftError.EPERM, "Peer %s is not in current configuration: %s.", peer,
                     this.conf));
            return;
        }
    }

    ReadOnlyOption readOnlyOpt = this.raftOptions.getReadOnlyOptions();
    //4. With ReadOnlyLeaseBased, check whether the leader is still within its lease
    if (readOnlyOpt == ReadOnlyOption.ReadOnlyLeaseBased && !isLeaderLeaseValid()) {
        // If leader lease timeout, we must change option to ReadOnlySafe
        readOnlyOpt = ReadOnlyOption.ReadOnlySafe;
    }

    switch (readOnlyOpt) {
        //5
        case ReadOnlySafe:
            final List<PeerId> peers = this.conf.getConf().getPeers();
            Requires.requireTrue(peers != null && !peers.isEmpty(), "Empty peers");
            // Set up the callback for heartbeat responses
            final ReadIndexHeartbeatResponseClosure heartbeatDone = new ReadIndexHeartbeatResponseClosure(closure,
                    respBuilder, quorum, peers.size());
            // Send heartbeat requests to followers
            // Send a round of heartbeats to the followers; if a majority (counting the leader)
            // acknowledges them, the leader knows it is still the leader
            for (final PeerId peer : peers) {
                if (peer.equals(this.serverId)) {
                    continue;
                }
                this.replicatorGroup.sendHeartbeat(peer, heartbeatDone);
            }
            break;
        //6. No election happens while the lease is valid, so the leader cannot have changed;
        // return the result through the callback directly
        case ReadOnlyLeaseBased:
            // Responses to followers and local node.
            respBuilder.setSuccess(true);
            closure.setResponse(respBuilder.build());
            closure.run(Status.OK());
            break;
    }
}
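  1. Get the quorum, i.e. the majority size n/2 + 1 of the cluster. If the cluster has only one node, mark the response as successful and invoke the callback directly.
  2. Check that the term of lastCommittedIndex equals the current term. If it does not, this leader has not yet committed any log entry in its own term and the read-only request is rejected; otherwise the response builder's index is set to the BallotBox's lastCommittedIndex.
  3. A request from a follower must pass a check that the follower is in the current configuration; if it is not, the callback is invoked with an error.
  4. Get the ReadOnlyOption configured for ReadIndex requests; its default value is ReadOnlySafe. If it is ReadOnlyLeaseBased, isLeaderLeaseValid checks whether the leader is still within its lease; if the lease has expired, the option falls back to ReadOnlySafe.
  5. With ReadOnlySafe, the leader calls Replicator#sendHeartbeat(rid, closure) to send heartbeat requests to the followers, and each heartbeat response runs ReadIndexHeartbeatResponseClosure. That callback checks whether more than half of the nodes, counting the leader itself, have acknowledged. Once a majority has responded successfully, the leader knows it is still the leader and returns a successful ReadIndex response. The read is then served as a linearizable read once applyIndex has caught up with the ReadIndex, meaning the log up to the ReadIndex has been applied.
  6. With ReadOnlyLeaseBased, the current leader is assumed to be the only valid leader of the Raft group while its lease is valid, so the heartbeat round that confirms leadership is skipped and a successful response is returned directly to the follower or the local node. The node still waits for the state machine until applyIndex has caught up with the ReadIndex before serving the linearizable read.

Whether the option is ReadOnlySafe or ReadOnlyLeaseBased, sending a successful response ends up calling the run method of ReadIndexResponseClosure.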
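The quorum arithmetic in steps 1 and 5 can be made concrete with a small sketch. It is written from the description above rather than copied from SOFAJRaft: HeartbeatQuorum, quorumOf, and onHeartbeatResponse are hypothetical names, and the failure threshold (the point at which a majority can no longer be reached) is an assumption of this sketch.

```java
/** Hypothetical sketch of counting heartbeat acknowledgements for a ReadIndex round. */
final class HeartbeatQuorum {
    enum Outcome { PENDING, CONFIRMED, REJECTED }

    private final int quorum;         // majority size, counting the leader
    private final int failThreshold;  // failures after which a majority is unreachable
    private int ackSuccess;           // successful responses from followers
    private int ackFailures;          // failed responses from followers
    private Outcome outcome = Outcome.PENDING;

    HeartbeatQuorum(int peerCount) {
        this.quorum = quorumOf(peerCount);
        this.failThreshold = peerCount - this.quorum + 1;
    }

    /** Majority of a cluster with peerCount voting members: n/2 + 1. */
    static int quorumOf(int peerCount) {
        return peerCount / 2 + 1;
    }

    /** Records one follower response and returns the current outcome of the round. */
    Outcome onHeartbeatResponse(boolean success) {
        if (outcome != Outcome.PENDING) {
            return outcome; // late responses do not change a decided round
        }
        if (success) {
            ackSuccess++;
        } else {
            ackFailures++;
        }
        if (ackSuccess + 1 >= quorum) {        // "+ 1" is the leader's own vote
            outcome = Outcome.CONFIRMED;
        } else if (ackFailures >= failThreshold) {
            outcome = Outcome.REJECTED;
        }
        return outcome;
    }
}
```

For a five-node group the quorum is 3, so two successful follower responses plus the leader itself are enough to confirm leadership.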

ReadIndexResponseClosure#run

public void run(final Status status) {
    //fail
    // The incoming status is not OK: report failure
    if (!status.isOk()) {
        notifyFail(status);
        return;
    }
    final ReadIndexResponse readIndexResponse = getResponse();
    //Fail
    // The response is not successful: report failure
    if (!readIndexResponse.getSuccess()) {
        notifyFail(new Status(-1, "Fail to run ReadIndex task, maybe the leader stepped down."));
        return;
    }
    // Success
    // ReadIndex succeeded
    final ReadIndexStatus readIndexStatus = new ReadIndexStatus(this.states, this.request,
        readIndexResponse.getIndex());
    for (final ReadIndexState state : this.states) {
        // Records current commit log index.
        // Set the current committed index
        state.setIndex(readIndexResponse.getIndex());
    }

    boolean doUnlock = true;
    ReadOnlyServiceImpl.this.lock.lock();
    try {
        // Check whether applyIndex has reached the ReadIndex
        if (readIndexStatus.isApplied(ReadOnlyServiceImpl.this.fsmCaller.getLastAppliedIndex())) {
            // Already applied, notify readIndex request.
            ReadOnlyServiceImpl.this.lock.unlock();
            doUnlock = false;
            // The log up to ReadIndex has been applied, so a linearizable read can be served
            notifySuccess(readIndexStatus);
        } else {
            // Not applied, add it to pending-notify cache.
            ReadOnlyServiceImpl.this.pendingNotifyStatus
                .computeIfAbsent(readIndexStatus.getIndex(), k -> new ArrayList<>(10)) //
                .add(readIndexStatus);
        }
    } finally {
        if (doUnlock) {
            ReadOnlyServiceImpl.this.lock.unlock();
        }
    }
}

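The run method first checks whether it needs to report a failure. On success it updates the index of every wrapped ReadIndexState and then checks whether applyIndex has reached the ReadIndex. If it has, every log entry up to the ReadIndex, all of which were already replicated to a majority (think of them as writes), has been applied to the state machine, so the data they carry is visible to the client and notifySuccess is called. Otherwise the ReadIndexStatus is added to pendingNotifyStatus so that it can be notified later.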
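The "already applied, or park it until later" logic can be sketched as follows. The listing only shows the parking side (pendingNotifyStatus); how parked reads are woken up is not shown in this article, so the onApplied method below is an assumption about how it could work. PendingReads, await, and onApplied are hypothetical names.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

/** Hypothetical sketch: reads wait until the applied index reaches their read index. */
final class PendingReads {
    // read index -> callbacks waiting for that index, sorted by index
    private final TreeMap<Long, List<Runnable>> pending = new TreeMap<>();
    private long lastApplied;

    /** Runs onReady now if readIndex is already applied, otherwise parks it. */
    void await(long readIndex, Runnable onReady) {
        synchronized (this) {
            if (lastApplied < readIndex) {
                pending.computeIfAbsent(readIndex, k -> new ArrayList<>()).add(onReady);
                return;
            }
        }
        onReady.run(); // run outside the lock, as the listing unlocks before notifying
    }

    /** Called when the state machine has applied entries up to appliedIndex. */
    void onApplied(long appliedIndex) {
        List<Runnable> ready = new ArrayList<>();
        synchronized (this) {
            lastApplied = Math.max(lastApplied, appliedIndex);
            // View of all parked reads whose index is <= lastApplied.
            Map<Long, List<Runnable>> done = pending.headMap(lastApplied, true);
            for (List<Runnable> callbacks : done.values()) {
                ready.addAll(callbacks);
            }
            done.clear(); // clearing the view removes the entries from the backing map
        }
        for (Runnable callback : ready) {
            callback.run();
        }
    }
}
```

A sorted map keeps the wake-up cheap: everything at or below the applied index is taken in one range operation instead of scanning all waiters.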

ReadOnlyServiceImpl#notifySuccess

private void notifySuccess(final ReadIndexStatus status) {
    final long nowMs = Utils.monotonicMs();
    final List<ReadIndexState> states = status.getStates();
    final int taskCount = states.size();
    for (int i = 0; i < taskCount; i++) {
        final ReadIndexState task = states.get(i);
        final ReadIndexClosure done = task.getDone(); // stack copy
        if (done != null) {
            this.nodeMetrics.recordLatency("read-index", nowMs - task.getStartTimeMs());
            done.setResult(task.getIndex(), task.getRequestContext().get());
            done.run(Status.OK());
        }
    }
}

On success, notifySuccess iterates over the ReadIndexState list held in status and calls the run method of each one's ReadIndexClosure.

This run call ends up in the run method we defined in multiGet:
RaftRawKVStore#multiGet

public void multiGet(final List<byte[]> keys, final boolean readOnlySafe, final KVStoreClosure closure) {
    if (!readOnlySafe) {
        this.kvStore.multiGet(keys, false, closure);
        return;
    }
    // Linearizable read for the KV store
    // Call readIndex and wait for the callback
    this.node.readIndex(BytesUtil.EMPTY_BYTES, new ReadIndexClosure() {

        @Override
        public void run(final Status status, final long index, final byte[] reqCtx) {
            // If the status is OK, read from the local store
            if (status.isOk()) {
                RaftRawKVStore.this.kvStore.multiGet(keys, true, closure);
                return;
            }
            // If ReadIndex fails, try to apply the read as an operation to the state machine KVStoreStateMachine on the leader
            RaftRawKVStore.this.readIndexExecutor.execute(() -> {
                if (isLeader()) {
                    LOG.warn("Fail to [multiGet] with 'ReadIndex': {}, try to applying to the state machine.",
                            status);
                    // If 'read index' read fails, try to applying to the state machine at the leader node
                    applyOperation(KVOperation.createMultiGet(keys), closure);
                } else {
                    LOG.warn("Fail to [multiGet] with 'ReadIndex': {}.", status);
                    // Client will retry to leader node
                    new KVClosureAdapter(closure, null).run(status);
                }
            });
        }
    });
}

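This run method calls multiGet on the underlying kvStore (RocksRawKVStore), which reads the data directly from RocksDB.

Summary

This article followed the read path from the client-side get method of RheaKVStore all the way to the server side, where RheaKVStore uses JRaft to provide linearizable reads, and explained how those linearizable reads are implemented. With this example, you should now have a reasonably good understanding of linearizable reads.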
