做者:黄梦龙git
TiKV 使用 Raft 算法来提供高可用且具备强一致性的存储服务。在 Raft 中,Snapshot 指的是整个 State Machine 数据的一份快照,大致上有如下这几种状况须要用到 Snapshot:github
TiKV 涉及到的是 1 和 2 这两种状况。在咱们的实现中,Snapshot 老是由 Region leader 所在的 TiKV 生成,经过网络发送给 Region follower/learner 所在的 TiKV。算法
理论上讲,咱们彻底能够把 Snapshot 看成普通的 RaftMessage
来发送,但这样作实践上会产生一些问题,主要是由于 Snapshot 消息的尺寸远大于其余 RaftMessage
:服务器
基于上面的缘由,TiKV 对 Snapshot 的发送和接收进行了特殊处理,为每一个 Snapshot 建立单独的网络链接,并将 Snapshot 拆分红 1M 大小的多个 Chunk 进行传输。网络
下面咱们分别从 RPC 协议、发送 Snapshot、收取 Snapshot 三个方面来解读相关源代码。本文的全部内容都基于 v3.0.0-rc.2 版本。app
与普通的 raft message 相似,Snapshot 消息也是使用 gRPC 远程调用的方式来传输的。在 pingcap/kvproto 项目中能够找到相关 RPC Call 的定义,具体在 tikvpb.proto 和 raft_serverpb.proto 文件中。异步
rpc Snapshot(stream raft_serverpb.SnapshotChunk) returns (raft_serverpb.Done) {} ... message SnapshotChunk { RaftMessage message = 1; bytes data = 2; } message Done {}
能够看出,Snapshot 被定义成 client streaming 调用,即对于每一个 Call,客户端依次向服务器发送多个相同类型的请求,服务器接收并处理完全部请求后,向客户端返回处理结果。具体在这里,每一个请求的类型是 SnapshotChunk
,其中包含了 Snapshot 对应的 RaftMessage
,或者携带一段 Snapshot 数据;回复消息是一个简单的空消息 Done
,由于咱们在这里实际不须要返回任何信息给客户端,只须要关闭对应的 stream。函数
Snapshot 的发送过程的处理比较简单粗暴,直接在将要发送 RaftMessage
的地方截获 Snapshot 类型的消息,转而经过特殊的方式进行发送。相关代码能够在 server/transport.rs 中找到:学习
fn write_data(&self, store_id: u64, addr: &str, msg: RaftMessage) { if msg.get_message().has_snapshot() { return self.send_snapshot_sock(addr, msg); } if let Err(e) = self.raft_client.wl().send(store_id, addr, msg) { error!("send raft msg err"; "err" => ?e); } } fn send_snapshot_sock(&self, addr: &str, msg: RaftMessage) { ... if let Err(e) = self.snap_scheduler.schedule(SnapTask::Send { addr: addr.to_owned(), msg, cb, }) { ... } }
从代码中能够看出,这里简单地把对应的 RaftMessage
包装成一个 SnapTask::Send
任务,并将其交给独立的 snap-worker
去处理。值得注意的是,这里的 RaftMessage
只包含 Snapshot 的元信息,而不包括真正的快照数据。TiKV 中有一个单独的模块叫作 SnapManager
,用来专门处理数据快照的生成与转存,稍后咱们将会看到从 SnapManager
模块读取 Snapshot 数据块并进行发送的相关代码。ui
咱们不妨顺藤摸瓜来看看 snap-worker
是如何处理这个任务的,相关代码在 server/snap.rs,精简掉非核心逻辑后的代码引用以下:
fn run(&mut self, task: Task) { match task { Task::Recv { stream, sink } => { ... let f = recv_snap(stream, sink, ...).then(move |result| { ... }); self.pool.spawn(f).forget(); } Task::Send { addr, msg, cb } => { ... let f = future::result(send_snap(..., &addr, msg)) .flatten() .then(move |res| { ... }); self.pool.spawn(f).forget(); } } }
snap-worker
使用了 future
来完成收发 Snapshot 任务:经过调用 send_snap()
或 recv_snap()
生成一个 future 对象,并将其交给 FuturePool
异步执行。
如今咱们暂且只关注 send_snap()
的 实现:
fn send_snap( ... addr: &str, msg: RaftMessage, ) -> Result<impl Future<Item = SendStat, Error = Error>> { ... let key = { let snap = msg.get_message().get_snapshot(); SnapKey::from_snap(snap)? }; ... let s = box_try!(mgr.get_snapshot_for_sending(&key)); if !s.exists() { return Err(box_err!("missing snap file: {:?}", s.path())); } let total_size = s.total_size()?; let chunks = { let mut first_chunk = SnapshotChunk::new(); first_chunk.set_message(msg); SnapChunk { first: Some(first_chunk), snap: s, remain_bytes: total_size as usize, } }; let cb = ChannelBuilder::new(env); let channel = security_mgr.connect(cb, addr); let client = TikvClient::new(channel); let (sink, receiver) = client.snapshot()?; let send = chunks.forward(sink).map_err(Error::from); let send = send .and_then(|(s, _)| receiver.map_err(Error::from).map(|_| s)) .then(move |result| { ... }); Ok(send) }
这一段流程仍是比较清晰的:先是用 Snapshot 元信息从 SnapManager
取到待发送的快照数据,而后将 RaftMessage
和 Snap
一块儿封装进 SnapChunk
结构,最后建立全新的 gRPC 链接及一个 Snapshot stream 并将 SnapChunk
写入。这里引入 SnapChunk
是为了不将整块 Snapshot 快照一次性加载进内存,它 impl 了 futures::Stream
这个 trait 来达成按需加载流式发送的效果。若是感兴趣能够参考它的 具体实现,本文就暂不展开了。
最后咱们来简单看一下 Snapshot 的收取流程,其实也就是 gRPC Call 的 server 端对应的处理,整个流程的入口咱们能够在 server/service/kv.rs 中找到:
fn snapshot( &mut self, ctx: RpcContext<'_>, stream: RequestStream<SnapshotChunk>, sink: ClientStreamingSink<Done>, ) { let task = SnapTask::Recv { stream, sink }; if let Err(e) = self.snap_scheduler.schedule(task) { ... } }
与发送过程相似,也是直接构建 SnapTask::Recv
任务并转发给 snap-worker
了,这里会调用上面出现过的 recv_snap()
函数,具体实现 以下:
fn recv_snap<R: RaftStoreRouter + 'static>( stream: RequestStream<SnapshotChunk>, sink: ClientStreamingSink<Done>, ... ) -> impl Future<Item = (), Error = Error> { ... let f = stream.into_future().map_err(|(e, _)| e).and_then( move |(head, chunks)| -> Box<dyn Future<Item = (), Error = Error> + Send> { let context = match RecvSnapContext::new(head, &snap_mgr) { Ok(context) => context, Err(e) => return Box::new(future::err(e)), }; ... let recv_chunks = chunks.fold(context, |mut context, mut chunk| -> Result<_> { let data = chunk.take_data(); ... if let Err(e) = context.file.as_mut().unwrap().write_all(&data) { ... } Ok(context) }); Box::new( recv_chunks .and_then(move |context| context.finish(raft_router)) .then(move |r| { snap_mgr.deregister(&context_key, &SnapEntry::Receiving); r }), ) }, ); f.then(move |res| match res { ... }) .map_err(Error::from) }
值得留意的是 stream 中的第一个消息(其中包含有 RaftMessage
)被用来建立 RecvSnapContext
对象,其后的每一个 chunk 收取后都依次写入文件,最后调用 context.finish()
把以前保存的 RaftMessage
发送给 raftstore
完成整个接收过程。
以上就是 TiKV 发送和接收 Snapshot 相关的代码解析了。这是 TiKV 代码库中较小的一个模块,它很好地解决了因为 Snapshot 消息特殊性所带来的一系列问题,充分应用了 grpc-rs
组件及 futures
/FuturePool
模型,你们能够结合本系列文章的 第七篇 和 第八篇 进一步拓展学习。
原文阅读:https://www.pingcap.com/blog-cn/tikv-source-code-reading-10/