这篇文章咱们来学习一下JRaft的SPI机制,以及快照的实现。对于SPI,在框架开发,尤为是模块设计中是很是必要的,他能够实现可插拔的业务逻辑,实现解藕。对于快照,其实就是raft对日志的优化,避免新节点加入后大量的网络和磁盘IO,而且能够节省磁盘空间,是很是必要的。接下来咱们慢慢看。java
主要实现类就是JRaftServiceLoader。
咱们来看看使用方式node
public static final JRaftServiceFactory defaultServiceFactory = JRaftServiceLoader.load(JRaftServiceFactory.class).first();
public static <S> JRaftServiceLoader<S> load(final Class<S> service) { return JRaftServiceLoader.load(service, Thread.currentThread().getContextClassLoader()); } public static <S> JRaftServiceLoader<S> load(final Class<S> service, final ClassLoader loader) { return new JRaftServiceLoader<>(service, loader); }
经过当前线程上线文类加载器(其实就是AppClassLoader)去进行加载工做。固然这里直接用JRaftServiceLoader的类加载器也是同样的效果。缓存
public S first() { final List<S> sortList = sort(); if (sortList.isEmpty()) { throw fail(this.service, "could not find any implementation for class"); } return sortList.get(0); }
这个sort方法其实就是获取到一个排序实现类的集合。而后从中获取第一个(最优先的)。网络
public List<S> sort() { final Iterator<S> it = iterator(); final List<S> sortList = new ArrayList<>(); while (it.hasNext()) { sortList.add(it.next()); } if (sortList.size() <= 1) { return sortList; } sortList.sort((o1, o2) -> { final SPI o1_spi = o1.getClass().getAnnotation(SPI.class); final SPI o2_spi = o2.getClass().getAnnotation(SPI.class); final int o1_priority = o1_spi == null ? 0 : o1_spi.priority(); final int o2_priority = o2_spi == null ? 0 : o2_spi.priority(); return -(o1_priority - o2_priority); }); return sortList; }
这个方法其实实现很简单,就是经过迭代器,去迭代获取全部的实现,而后根据实现类的SPI注解中的priority,也就是优先级进行排序,最后返回一个有序的集合。
咱们重点仍是关注这个迭代器的实现。session
final Iterator<Map.Entry<String, S>> knownProviders = JRaftServiceLoader.this.providers.entrySet() .iterator(); @Override public boolean hasNext() { return this.knownProviders.hasNext() || JRaftServiceLoader.this.lookupIterator.hasNext(); } @Override public S next() { if (this.knownProviders.hasNext()) { return this.knownProviders.next().getValue(); } final Class<S> cls = JRaftServiceLoader.this.lookupIterator.next(); try { final S provider = JRaftServiceLoader.this.service.cast(cls.newInstance()); JRaftServiceLoader.this.providers.put(cls.getName(), provider); return provider; } catch (final Throwable x) { throw fail(JRaftServiceLoader.this.service, "provider " + cls.getName() + " could not be instantiated", x); } }
knownProviders其实就是一个缓存,加载成功的会放在这个Map里面。也就是咱们第二次调用iterator方法的时候,就不须要从新执行类加载过程了(也不会从新执行),直接从缓存中获取。
若是是第一次执行该方法,那么依赖lookupIterator去执行加载过程。咱们先来看一下lookupIterator(LazyIterator)的实现。框架
@Override public boolean hasNext() { if (this.nextName != null) { return true; } if (this.configs == null) { try { final String fullName = PREFIX + this.service.getName(); if (this.loader == null) { this.configs = ClassLoader.getSystemResources(fullName); } else { this.configs = this.loader.getResources(fullName); } } catch (final IOException x) { throw fail(this.service, "error locating configuration files", x); } } while ((this.pending == null) || !this.pending.hasNext()) { if (!this.configs.hasMoreElements()) { return false; } this.pending = parse(this.service, this.configs.nextElement()); } this.nextName = this.pending.next(); return true; }
这里有个细节,就是加了一个this.nextName,若是不为空的话,直接返回true,由于后面在判断成功后会给this.nextName赋值。若是调用了hasNext方法,却没有调用next,那么下次调用hasNext就不用在走一遍判断逻辑。ide
其实方法逻辑很简单,就是获取对应路径下的文件,并加载出来。而后逐行解析。无论每行多个实现或者只有一个。都是能够适配的。到这里基本上就说完了。学习
实现仍是比较简单。经过迭代器,懒加载,缓存来实现的。并加了优先级。优化
在节点初始化的时候,会启动快照的定时器snapshotTimer。
默认时间为一天执行一次。ui
这个方法流程以下:
final SnapshotWriter writer = this.snapshotStorage.create(); if (writer == null) { Utils.runClosureInThread(done, new Status(RaftError.EIO, "Fail to create writer.")); reportError(RaftError.EIO.getNumber(), "Fail to create snapshot writer."); return; } this.savingSnapshot = true; final SaveSnapshotDone saveSnapshotDone = new SaveSnapshotDone(writer, done, null); if (!this.fsmCaller.onSnapshotSave(saveSnapshotDone)) { Utils.runClosureInThread(done, new Status(RaftError.EHOSTDOWN, "The raft node is down.")); return; }
在fsmCaller中的runApplyTask 方法会处理snapshot save事件,最终调用doSnapshotSave方法。
case SNAPSHOT_SAVE: this.currTask = TaskType.SNAPSHOT_SAVE; if (passByStatus(task.done)) { doSnapshotSave((SaveSnapshotClosure) task.done); } break;
这个方法主要是为SnapshotWriter构建SnapshotMeta 对象,填充对应元数据,最后会调用onSnapshotSave 方法。
final SnapshotWriter writer = done.start(metaBuilder.build()); if (writer == null) { done.run(new Status(RaftError.EINVAL, "snapshot_storage create SnapshotWriter failed")); return; } this.fsm.onSnapshotSave(writer, done);
this.fsm.onSnapshotSave方法须要业务方去实现,主要就是快照的保存和压缩。业务方需将快照文件数据加入到SnapshotWriter中。
安装快照实在Replicator中执行。
fillCommonFields返回false就调用installSnapshot方法。
或者当前要同步的日志数量为0(两种状况,一种是确实没有日志同步,另外一种是日志被删除,存储到快照文件了,若是是被删除,就须要同步快照),也会调用installSnapshot方法。
if (!fillCommonFields(rb, this.nextIndex - 1, isHeartbeat)) { // id is unlock in installSnapshot installSnapshot(); if (isHeartbeat && heartBeatClosure != null) { Utils.runClosureInThread(heartBeatClosure, new Status(RaftError.EAGAIN, "Fail to send heartbeat to peer %s", this.options.getPeerId())); } return; } if (rb.getEntriesCount() == 0) { if (nextSendingIndex < this.options.getLogManager().getFirstLogIndex()) { installSnapshot(); return false; } // _id is unlock in _wait_more waitMoreEntries(nextSendingIndex); return false; }
installSnapshot方法会构建一个InstallSnapshotRequest发送给follower。InstallSnapshotRequest主要存储了快照的元数据以及一个uri信息。
该处理器主要用来接受leader发送的InstallSnapshotRequest请求。最后会调用handleInstallSnapshot 方法。
该方法和处理其余rpc请求的逻辑基本一致。若是当前节点ok,leader合法,会调用snapshotExecutor.installSnapshot方法。
this.snapshotExecutor.installSnapshot(request, InstallSnapshotResponse.newBuilder(), done);
该方法首先会构建一个下载快照的对象。而后调用registerDownloadingSnapshot去进行下载。
下载逻辑其实很简单:先加载元数据,创建文件和数据映射。而后去逐个去写文件。
public void installSnapshot(final InstallSnapshotRequest request, final InstallSnapshotResponse.Builder response, final RpcRequestClosure done) { final SnapshotMeta meta = request.getMeta(); final DownloadingSnapshot ds = new DownloadingSnapshot(request, response, done); // DON'T access request, response, and done after this point // as the retry snapshot will replace this one. if (!registerDownloadingSnapshot(ds)) { LOG.warn("Fail to register downloading snapshot."); // This RPC will be responded by the previous session return; } Requires.requireNonNull(this.curCopier, "curCopier"); try { //阻塞 等待快照数据下载 this.curCopier.join(); } catch (final InterruptedException e) { Thread.currentThread().interrupt(); LOG.warn("Install snapshot copy job was canceled."); return; } loadDownloadingSnapshot(ds, meta); }
最后调用loadDownloadingSnapshot去安装文件数据。
具体逻辑在FSMCallerImpl的doSnapshotLoad 方法中。这个方法最后会调用业务状态机的onSnapshotLoad 方法。
public boolean onSnapshotLoad(final LoadSnapshotClosure done) { return enqueueTask((task, sequence) -> { task.type = TaskType.SNAPSHOT_LOAD; task.done = done; }); }
参考CounterStateMachine中的实现。首先判断数据是否有效(是否存在该文件),而后去从文件读取。最后set到value
public boolean onSnapshotLoad(final SnapshotReader reader) { if (isLeader()) { LOG.warn("Leader is not supposed to load snapshot"); return false; } if (reader.getFileMeta("data") == null) { LOG.error("Fail to find data file in {}", reader.getPath()); return false; } final CounterSnapshotFile snapshot = new CounterSnapshotFile(reader.getPath() + File.separator + "data"); try { this.value.set(snapshot.load()); return true; } catch (final IOException e) { LOG.error("Fail to load snapshot from {}", snapshot.getPath()); return false; } }
存储快照
经过定时任务。定时执行存储逻辑。逻辑须要业务方实现。
传递快照
由leader决定是否须要传递。通常写入快照的日志会被删除。leader在发送日志时候,发现日志已经被删除的话,就须要传递快照。
传递快照方式
由leader发送一个条RPC,告诉follower须要来下载快照。这条RPC包括快照元数据和uri信息。
follower接收到RPC,会去主动下载leader的快照信息。下载成功后写入本地文件。
安装快照
初始化的时候会加载。或者下载完后会去主动加载。加载逻辑很简单,就是根据下载的快照元数据信息去从磁盘加载数据,而后应用到状态机。