SOFAStack Scalable Open Financial Architecture Stack 是蚂蚁金服自主研发的金融级分布式架构,包含了构建金融级云原生架构所需的各个组件,是在金融场景里锤炼出来的最佳实践。java
SOFAJRaft 是一个基于 Raft 一致性算法的生产级高性能 Java 实现,支持 MULTI-RAFT-GROUP,适用于高负载低延迟的场景。git
本文为《剖析 | SOFAJRaft 实现原理》第五篇,本篇做者袖扣,来自蚂蚁金服。算法
《剖析 | SOFAJRaft 实现原理》系列由 SOFA 团队和源码爱好者们出品,项目代号:<SOFA:JRaftLab/>
,文章尾部有参与方式,欢迎一样对源码热情的你加入。数据库
SOFAJRaft :https://gitee.com/sofastack/sofa-jraft网络
RheaKV 是首个以 JRaft 为基础实现的一个原生支持分布式的嵌入式键值(key、value)数据库,如今本文将从 RheaKV 是如何利用 MULTI-RAFT-GROUP 的方式实现 RheaKV 的高性能及容量的可扩展性的,从而进行全面的源码、实例剖析。架构
经过对 Raft 协议的描述咱们知道:用户在对一组 Raft 系统进行更新操做时必须先通过 Leader,再由 Leader 同步给大多数 Follower。而在实际运用中,一组 Raft 的 Leader 每每存在单点的流量瓶颈,流量高便没法承载,同时每一个节点都是全量数据,因此会受到节点的存储限制而致使容量瓶颈,没法扩展。并发
MULTI-RAFT-GROUP 正是经过把整个数据从横向作切分,分为多个 Region 来解决磁盘瓶颈,而后每一个 Region 都对应有独立的 Leader 和一个或多个 Follower 的 Raft 组进行横向扩展,此时系统便有多个写入的节点,从而分担写入压力,图以下:运维
此时磁盘及 I/O 瓶颈解决了,那多个 Raft Group 是如何协做的呢,咱们接着往下看。异步
RheaKV 主要由 3 个角色组成:PlacementDriver(如下成为 PD) 、Store、Region。因为 RheaKV 支持多组 Raft,因此比单组场景多出一个 PD 角色,用来调度以及收集每一个 Store 及 Region 的基础信息。分布式
PD 负责整个集群的管理调度、Region ID 生成等。此组件非必须的,若是不使用 PD,设置 PlacementDriverOptions 的 fake 属性为 true 便可。PD 通常经过 Region 的心跳返回信息进行对 Region 调度,Region 处理完后,PD 则会在下一个心跳返回中收到 Region 的变动信息来更新路由及状态表。
一般一个 Node 负责一个 Store,Store 能够被看做是 Region 的容器,里面存储着多个分片数据。Store 会向 PD 主动上报 StoreHeartbeatRequest 心跳,心跳交由 PD 的 handleStoreHeartbeat 处理,里面包含该 Store 的基本信息,好比,包含多少 Region,有哪些 Region 的 Leader 在该 Store 等。
Region 是数据存储、搬迁的最小单元,对应的是 Store 里某个实际的数据区间。每一个 Region 会有多个副本,每一个副本存储在不一样的 Store,一块儿组成一个Raft Group。Region 中的 Leader 会向 PD 主动上报 RegionHeartbeatRequest 心跳,交由 PD 的 handleRegionHeartbeat 处理,而 PD 是经过 Region 的 Epoch 感知 Region 是否有变化。
Muti-Raft-Group 的多 Region 是经过 RegionRouteTable 路由表组件进行管理的,可经过 addOrUpdateRegion、removeRegion 进行添加、更新、移除 Region,也包括 Region 的拆分。目前暂时还未实现 Region 的聚合,后面会考虑实现。
“让每组 Raft 负责一部分数据。”
数据分区或者分片算法一般就是 Range 和 Hash,RheaKV 是经过 Range 进行数据分片的,分红一个个 Raft Group,也称为 Region。这里为什么要设计成 Range 呢?缘由是 Range 切分是按照对 Key 进行字节排序后再作每段每段切分,像相似 scan 等操做对相近 key 的查询会尽量集中在某个 Region,这个是 Hash 没法支持的,就算遇到单个 Region 的拆分也会更好处理一些,只用修改部分元数据,不会涉及到大范围的数据挪动。
固然 Range 也会有一个问题那就是,可能会存在某个 Region 被频繁操做成为热点 Region。不过也有一些优化方案,好比 PD 调度热点 Region 到更空闲的机器上,或者提供 Follower 分担读的压力等。
Region 和 RegionEpoch 结构以下:
class Region { long id; // region id // Region key range [startKey, endKey) byte[] startKey; // inclusive byte[] endKey; // exclusive RegionEpoch regionEpoch; // region term List<Peer> peers; // all peers in the region } class RegionEpoch { // Conf change version, auto increment when add or remove peer long confVer; // Region version, auto increment when split or merge long version; } class Peer { long id; long storeId; Endpoint endpoint; }
Region.id:为 Region 的惟一标识,经过 PD 全局惟一分配。
Region.startKey、Region.endKey:这个表示的是 Region 的 key 的区间范围 [startKey, endKey),特别值得注意的是针对最开始 Region 的 startKey,和最后 Region 的 endKey 都为空。
Region.regionEpoch:当 Region 添加和删除 Peer,或者 split 等,此时 regionEpoch 就会发生变化,其中 confVer 会在配置修改后递增,version 则是每次有 split 、merge(还未实现)等操做时递增。
Region.peers:peers 则指的是当前 Region 所包含的节点信息,Peer.id 也是由 PD 全局分配的,Peer.storeId 表明的是 Peer 当前所处的 Store。
因为数据被拆分到不一样 Region 上,因此在进行多 key 的读、写、更新操做时须要操做多个 Region,这时操做前咱们须要获得具体的 Region,而后再单独对不一样 Region 进行操做。咱们以在多 Region上 scan 操做为例, 目标是返回某个 key 区间的全部数据:
例如:com.alipay.sofa.jraft.rhea.client.DefaultRheaKVStore#scan(byte[], byte[], boolean, boolean)
咱们很容易看到,在调用 scan 首先让 PD Client 经过 RegionRouteTable.findRegionsByKeyRange 检索 startKey、endKey 所覆盖的 Region,最后返回的可能为多个 Region,具体 Region 覆盖检索方法以下:
检索相关变量定义以下:
咱们能够看到整个 RheaKV 的 range 路由表是经过 TreeMap 的进行存储的,正呼应咱们前面讲过全部的 key 是经过对应字节进行排序存储。对应的 Value 为该 Region 的 RegionId,随后咱们经过 Region 路由 regionTable 查出便可。
如今咱们获得 scan 覆盖到的全部 Region:List<Region>
在循环查询中咱们看到有一个“retryCause -> {}”的 Lambda 表达式很容易看出这里是加持异常重试处理,后面咱们会讲到,接下来会经过 internalRegionScan 查询每一个 Region 的结果。具体源码以下:
这里也一样有一个重试处理,能够看到代码中根据当前是否为 Region 节点来决定是本机查询仍是经过RPC进行查询,若是是本机则调用 rawKVStore.scan() 进行本地直接查询,反之经过 rheaKVRpcService 进行 RPC 远程节点查询。最后每一个 Region 查询都返回为一个 future,经过 FutureHelper.joinList 工具类 CompletableFuture.allOf 异步并发返回结果 List<KVEntry>
。
例如:com.alipay.sofa.jraft.rhea.client.DefaultRheaKVStore#put(java.lang.String, byte[])
咱们能够发现 put 基础方法是支持 batch 的,便可成批提交。如未使用 batch 即直接提交,具体逻辑以下:
经过 pdClinet 查询对应存储的 Region,而且经过 regionId 拿到 RegionEngine,再经过对应存储引擎 KVStore 进行 put,整个过程一样支持重试机制。咱们再回过去看看 batch 的实现,很容易发现利用到了 Disruptor 的 RingBuffer 环形缓冲区,无锁队列为性能提供了保障,代码现场以下:
前面咱们有讲过,PD 会在 Region 的 heartBeat 里面对 Region 进行调度,当某个 Region 里的 keys 数量超过预设阀值,咱们便可对该 Region 进行拆分,Store 的状态机 KVStoreStateMachine 即收到拆分消息进行拆分处理。具体拆分源码以下:
KVStoreStateMachine.doSplit 源码以下:
StoreEngine.doSplit 源码以下:
咱们能够轻易的看到从原始 parentRegion 切分红 region 和 pRegion,并重设了 startKey、endKey 和版本号,并添加到 RegionEngineTable 注册到 RegionKVService,同时调用 pdClient.getRegionRouteTable().splitRegion() 方法进行更新存储在 PD 的 Region 路由表。
既然数据过多须要进行拆分,那 Region 进行合并那就确定是 2 个或者多个连续的 Region 数据量明显小于绝大多数 Region 容量则咱们能够对其进行合并。这一块后面会考虑实现。
经过上面咱们知道,一个 Store 即为一个节点,里面包含着一个或者多个 RegionEngine,一个 StoreEngine 一般经过 PlacementDriverClient 对 PD 进行调用,同时拥有 StoreEngineOptions 配置项,里面配置着存储引擎和节点相关配置。
在这个过程当中里面的 StoreEngine 会记录着 regionKVServiceTable、regionEngineTable,它们分别掌握着具体每一个不一样的 Region 存储的操做功能,对应的 key 即为 RegionId。
每一个在 Store 里的 Region 副本中,RegionEngine 则是一个执行单元。它里面记录着关联着的 StoreEngine 信息以及对应的 Region 信息。因为它也是一个选举节点,因此也包含着对应状态机 KVStoreStateMachine,以及对应的 RaftGroupService,并启动里面的 RpcServer 进行选举同步。
这个里面有个transferLeadershipTo方法,这个可被调用用于平衡当前节点分区的Leader,避免压力重叠。
DefaultRegionKVService 是 RegionKVService 的默认实现类,主要处理对 Region 的具体操做。
须要特别讲到的是,在具体的 RheaKV 操做时,FailoverClosure 担任着比较重要的角色,也给整个系统增长了必定的容错性。假如在一次 scan 操做中,若是跨 Store 须要多节点 scan 数据的时候,任何网络抖动都会形成数据不完整或者失败状况,因此容许必定次数的重试有利于提升系统的可用性,可是重试次数不宜太高,若是出现网络堵塞,屡次 timeout 级别失败会给系统带来额外的压力。这里只须要在 DefaultRheaKVStore 中,进行配置 failoverRetries 设置次数便可。
PlacementDriverClient 接口主要由 AbstractPlacementDriverClient 实现,而后 FakePlacementDriverClient、RemotePlacementDriverClient 为主要功能。FakePlacementDriverClient 是当系统不须要 PD 的时候进行 PD 对象的模拟,这里主要讲到 RemotePlacementDriverClient。
因为不少传统存储中间件并不原生支持分布式,因此一直少有体感,Raft 协议是一套比较比较好理解的共识协议,SOFAJRaft 通俗易懂是一个很是好的代码和工程范例,同时 RheaKV 也是一套很是轻量化支持多存储结构可分片的嵌入式数据库。写一篇代码分析文章也是一个学习和进步的过程,由此咱们也能够窥探到了一些数据库的基础实现,祝愿社区能在 SOFAJRaft / RheaKV 基础上构建更加灵活和自治理的系统和应用。