SOFAJRaft 源码分析四(SPI机制、快照)

1.概述

这篇文章咱们来学习一下JRaft的SPI机制,以及快照的实现。对于SPI,在框架开发,尤为是模块设计中是很是必要的,他能够实现可插拔的业务逻辑,实现解藕。对于快照,其实就是raft对日志的优化,避免新节点加入后大量的网络和磁盘IO,而且能够节省磁盘空间,是很是必要的。接下来咱们慢慢看。java

2.JRaft的SPI实现

主要实现类就是JRaftServiceLoader。
咱们来看看使用方式node

public static final JRaftServiceFactory defaultServiceFactory = JRaftServiceLoader.load(JRaftServiceFactory.class).first();

JRaftServiceLoader#load

public static <S> JRaftServiceLoader<S> load(final Class<S> service) {
    return JRaftServiceLoader.load(service, Thread.currentThread().getContextClassLoader());
}

public static <S> JRaftServiceLoader<S> load(final Class<S> service, final ClassLoader loader) {
    return new JRaftServiceLoader<>(service, loader);
}

经过当前线程上线文类加载器(其实就是AppClassLoader)去进行加载工做。固然这里直接用JRaftServiceLoader的类加载器也是同样的效果。缓存

JRaftServiceLoader#first

public S first() {
    final List<S> sortList = sort();
    if (sortList.isEmpty()) {
        throw fail(this.service, "could not find any implementation for class");
    }
    return sortList.get(0);
}

这个sort方法其实就是获取到一个排序实现类的集合。而后从中获取第一个(最优先的)。网络

JRaftServiceLoader#sort

public List<S> sort() {
    final Iterator<S> it = iterator();
    final List<S> sortList = new ArrayList<>();
    while (it.hasNext()) {
        sortList.add(it.next());
    }
    if (sortList.size() <= 1) {
        return sortList;
    }
    sortList.sort((o1, o2) -> {
        final SPI o1_spi = o1.getClass().getAnnotation(SPI.class);
        final SPI o2_spi = o2.getClass().getAnnotation(SPI.class);
        final int o1_priority = o1_spi == null ? 0 : o1_spi.priority();
        final int o2_priority = o2_spi == null ? 0 : o2_spi.priority();
        return -(o1_priority - o2_priority);
    });
    return sortList;
}

这个方法其实实现很简单,就是经过迭代器,去迭代获取全部的实现,而后根据实现类的SPI注解中的priority,也就是优先级进行排序,最后返回一个有序的集合。
咱们重点仍是关注这个迭代器的实现。session

JRaftServiceLoader#iterator

final Iterator<Map.Entry<String, S>> knownProviders = JRaftServiceLoader.this.providers.entrySet()
    .iterator();
@Override
public boolean hasNext() {
    return this.knownProviders.hasNext() || JRaftServiceLoader.this.lookupIterator.hasNext();
}
@Override
public S next() {
    if (this.knownProviders.hasNext()) {
        return this.knownProviders.next().getValue();
    }
    final Class<S> cls = JRaftServiceLoader.this.lookupIterator.next();
    try {
        final S provider = JRaftServiceLoader.this.service.cast(cls.newInstance());
        JRaftServiceLoader.this.providers.put(cls.getName(), provider);
        return provider;
    } catch (final Throwable x) {
        throw fail(JRaftServiceLoader.this.service, "provider " + cls.getName()
            + " could not be instantiated", x);
    }
}

knownProviders其实就是一个缓存,加载成功的会放在这个Map里面。也就是咱们第二次调用iterator方法的时候,就不须要从新执行类加载过程了(也不会从新执行),直接从缓存中获取。
若是是第一次执行该方法,那么依赖lookupIterator去执行加载过程。咱们先来看一下lookupIterator(LazyIterator)的实现。框架

LazyIterator#hasNext

@Override
public boolean hasNext() {
    if (this.nextName != null) {
        return true;
    }
    if (this.configs == null) {
        try {
            final String fullName = PREFIX + this.service.getName();
            if (this.loader == null) {
                this.configs = ClassLoader.getSystemResources(fullName);
            } else {
                this.configs = this.loader.getResources(fullName);
            }
        } catch (final IOException x) {
            throw fail(this.service, "error locating configuration files", x);
        }
    }
    while ((this.pending == null) || !this.pending.hasNext()) {
        if (!this.configs.hasMoreElements()) {
            return false;
        }
        this.pending = parse(this.service, this.configs.nextElement());
    }
    this.nextName = this.pending.next();
    return true;
}

这里有个细节,就是加了一个this.nextName,若是不为空的话,直接返回true,由于后面在判断成功后会给this.nextName赋值。若是调用了hasNext方法,却没有调用next,那么下次调用hasNext就不用在走一遍判断逻辑。ide

其实方法逻辑很简单,就是获取对应路径下的文件,并加载出来。而后逐行解析。无论每行多个实现或者只有一个。都是能够适配的。到这里基本上就说完了。学习

实现仍是比较简单。经过迭代器,懒加载,缓存来实现的。并加了优先级。优化

2.JRaft的快照

快照初始化

在节点初始化的时候,会启动快照的定时器snapshotTimer。
默认时间为一天执行一次。ui

SnapshotExecutorImpl#doSnapshot

这个方法流程以下

  1. 判断节点状态,若是stop、savingSnapshot或downloadingSnapshot的时候,直接返回
  2. 若是没有新的数据则返回。
  3. 建立SnapshotWriter
  4. 封装成SaveSnapshotDone丢给fsmCaller
final SnapshotWriter writer = this.snapshotStorage.create();
if (writer == null) {
    Utils.runClosureInThread(done, new Status(RaftError.EIO, "Fail to create writer."));
    reportError(RaftError.EIO.getNumber(), "Fail to create snapshot writer.");
    return;
}
this.savingSnapshot = true;
final SaveSnapshotDone saveSnapshotDone = new SaveSnapshotDone(writer, done, null);
if (!this.fsmCaller.onSnapshotSave(saveSnapshotDone)) {
    Utils.runClosureInThread(done, new Status(RaftError.EHOSTDOWN, "The raft node is down."));
    return;
}

在fsmCaller中的runApplyTask 方法会处理snapshot save事件,最终调用doSnapshotSave方法。

case SNAPSHOT_SAVE:
    this.currTask = TaskType.SNAPSHOT_SAVE;
    if (passByStatus(task.done)) {
        doSnapshotSave((SaveSnapshotClosure) task.done);
    }
    break;

FSMCallerImpl#doSnapshotSave

这个方法主要是为SnapshotWriter构建SnapshotMeta 对象,填充对应元数据,最后会调用onSnapshotSave 方法。

final SnapshotWriter writer = done.start(metaBuilder.build());
if (writer == null) {
    done.run(new Status(RaftError.EINVAL, "snapshot_storage create SnapshotWriter failed"));
    return;
}
this.fsm.onSnapshotSave(writer, done);

this.fsm.onSnapshotSave方法须要业务方去实现,主要就是快照的保存和压缩。业务方需将快照文件数据加入到SnapshotWriter中。

安装快照

安装快照实在Replicator中执行。

fillCommonFields返回false就调用installSnapshot方法。

或者当前要同步的日志数量为0(两种状况,一种是确实没有日志同步,另外一种是日志被删除,存储到快照文件了,若是是被删除,就须要同步快照),也会调用installSnapshot方法。

if (!fillCommonFields(rb, this.nextIndex - 1, isHeartbeat)) {
    // id is unlock in installSnapshot
    installSnapshot();
    if (isHeartbeat && heartBeatClosure != null) {
        Utils.runClosureInThread(heartBeatClosure, new Status(RaftError.EAGAIN,
            "Fail to send heartbeat to peer %s", this.options.getPeerId()));
    }
    return;
}

if (rb.getEntriesCount() == 0) {
    if (nextSendingIndex < this.options.getLogManager().getFirstLogIndex()) {
        installSnapshot();
        return false;
    }
    // _id is unlock in _wait_more
    waitMoreEntries(nextSendingIndex);
    return false;
}

installSnapshot方法会构建一个InstallSnapshotRequest发送给follower。InstallSnapshotRequest主要存储了快照的元数据以及一个uri信息。

InstallSnapshotRequestProcessor

该处理器主要用来接受leader发送的InstallSnapshotRequest请求。最后会调用handleInstallSnapshot 方法。

该方法和处理其余rpc请求的逻辑基本一致。若是当前节点ok,leader合法,会调用snapshotExecutor.installSnapshot方法。

this.snapshotExecutor.installSnapshot(request, InstallSnapshotResponse.newBuilder(), done);

SnapshotExecutorImpl#installSnapshot

该方法首先会构建一个下载快照的对象。而后调用registerDownloadingSnapshot去进行下载。

下载逻辑其实很简单:先加载元数据,创建文件和数据映射。而后去逐个去写文件。

public void installSnapshot(final InstallSnapshotRequest request, final InstallSnapshotResponse.Builder response,
                            final RpcRequestClosure done) {
    final SnapshotMeta meta = request.getMeta();
    final DownloadingSnapshot ds = new DownloadingSnapshot(request, response, done);
    // DON'T access request, response, and done after this point
    // as the retry snapshot will replace this one.
    if (!registerDownloadingSnapshot(ds)) {
        LOG.warn("Fail to register downloading snapshot.");
        // This RPC will be responded by the previous session
        return;
    }
    Requires.requireNonNull(this.curCopier, "curCopier");
    try {
    //阻塞 等待快照数据下载
        this.curCopier.join();
    } catch (final InterruptedException e) {
        Thread.currentThread().interrupt();
        LOG.warn("Install snapshot copy job was canceled.");
        return;
    }

    loadDownloadingSnapshot(ds, meta);
}

最后调用loadDownloadingSnapshot去安装文件数据。

具体逻辑在FSMCallerImpl的doSnapshotLoad 方法中。这个方法最后会调用业务状态机的onSnapshotLoad 方法。

public boolean onSnapshotLoad(final LoadSnapshotClosure done) {
    return enqueueTask((task, sequence) -> {
        task.type = TaskType.SNAPSHOT_LOAD;
        task.done = done;
    });
}

参考CounterStateMachine中的实现。首先判断数据是否有效(是否存在该文件),而后去从文件读取。最后set到value

public boolean onSnapshotLoad(final SnapshotReader reader) {
    if (isLeader()) {
        LOG.warn("Leader is not supposed to load snapshot");
        return false;
    }
    if (reader.getFileMeta("data") == null) {
        LOG.error("Fail to find data file in {}", reader.getPath());
        return false;
    }
    final CounterSnapshotFile snapshot = new CounterSnapshotFile(reader.getPath() + File.separator + "data");
    try {
        this.value.set(snapshot.load());
        return true;
    } catch (final IOException e) {
        LOG.error("Fail to load snapshot from {}", snapshot.getPath());
        return false;
    }
}

jraft的快照实现总结

存储快照

经过定时任务。定时执行存储逻辑。逻辑须要业务方实现。

传递快照

由leader决定是否须要传递。通常写入快照的日志会被删除。leader在发送日志时候,发现日志已经被删除的话,就须要传递快照。

传递快照方式

由leader发送一个条RPC,告诉follower须要来下载快照。这条RPC包括快照元数据和uri信息。
follower接收到RPC,会去主动下载leader的快照信息。下载成功后写入本地文件。

安装快照

初始化的时候会加载。或者下载完后会去主动加载。加载逻辑很简单,就是根据下载的快照元数据信息去从磁盘加载数据,而后应用到状态机。

相关文章
相关标签/搜索