TiKV 源码解析系列文章(九)Service 层处理流程解析

做者:周振靖bootstrap

以前的 TiKV 源码解析系列文章介绍了 TiKV 依赖的周边库,从本篇文章开始,咱们将开始介绍 TiKV 自身的代码。本文重点介绍 TiKV 最外面的一层——Service 层。网络

TiKV 的 Service 层的代码位于 src/server 文件夹下,其职责包括提供 RPC 服务、将 store id 解析成地址、TiKV 之间的相互通讯等。这一部分的代码并非特别复杂。本篇将会简要地介绍 Service 层的总体结构和组成 Service 层的各个组件。闭包

总体结构

位于 src/server/server.rs 文件中的 Server 是咱们本次介绍的 Service 层的主体。它封装了 TiKV 在网络上提供服务和 Raft group 成员之间相互通讯的逻辑。Server 自己的代码比较简短,大部分代码都被分离到 RaftClientTransportSnapRunner 和几个 gRPC service 中。上述组件的层次关系以下图所示:框架

接下来,咱们将详细介绍这些组件。异步

Resolver

在一个集群中,每一个 TiKV 实例都由一个惟一的 store id 进行标识。Resolver 的功能是将 store id 解析成 TiKV 的地址和端口,用于创建网络通讯。async

Resolver 是一个很简单的组件,其接口仅包含一个函数:函数

pub trait StoreAddrResolver: Send + Clone {
   fn resolve(&self, store_id: u64, cb: Callback) -> Result<()>;
}

其中 Callback 用于异步地返回结果。PdStoreAddrResolver 实现了该 trait,它的 resolve 方法的实现则是简单地将查询任务经过其 sched 成员发送给 Runner。而 Runner 则实现了 Runnable<Task>,其意义是 Runner 能够在本身的一个线程里运行,外界将会向 Runner 发送 Task 类型的消息,Runner 将对收到的 Task 进行处理。 这里使用了由 TiKV 的 util 提供的一个单线程 worker 框架,在 TiKV 的不少处代码中都有应用。Runnerstore_addrs 字段是个 cache,它在执行任务时首先尝试在这个 cache 中找,找不到则向 PD 发送 RPC 请求来进行查询,并将查询结果添加进 cache 里。性能

RaftClient

TiKV 是一个 Multi Raft 的结构,Region 的副本之间,即 Raft group 的成员之间须要相互通讯,RaftClient 的做用即是管理 TiKV 之间的链接,并用于向其它 TiKV 节点发送 Raft 消息。RaftClient 能够和另外一个节点创建多个链接,并把不一样 Region 的请求均摊到这些链接上。这部分代码的主要的复杂性就在于链接的创建,也就是 Conn::new 这个函数。创建链接的代码的关键部分以下:测试

let client1 = TikvClient::new(channel);

let (tx, rx) = batch::unbounded::<RaftMessage>(RAFT_MSG_NOTIFY_SIZE);
let rx = batch::BatchReceiver::new(rx, RAFT_MSG_MAX_BATCH_SIZE, Vec::new, |v, e| v.push(e));
let rx1 = Arc::new(Mutex::new(rx));

let (batch_sink, batch_receiver) = client1.batch_raft().unwrap();
let batch_send_or_fallback = batch_sink
   .send_all(Reusable(rx1).map(move |v| {
       let mut batch_msgs = BatchRaftMessage::new();
       batch_msgs.set_msgs(RepeatedField::from(v));
       (batch_msgs, WriteFlags::default().buffer_hint(false))
   })).then(/*...*/);

client1.spawn(batch_send_or_fallback.map_err(/*...*/));

上述代码向指定地址调用了 batch_raft 这个 gRPC 接口。batch_raftraft 都是 stream 接口。对 RaftClient 调用 send 方法会将消息发送到对应的 Connstream 成员,即上述代码的 tx 中,而在 gRPC 的线程中则会从 rx 中取出这些消息(这些消息被 BatchReceiver 这一层 batch 起来以提高性能),并经过网络发送出去。优化

若是对方不支持 batch,则会 fallback 到 raft 接口。这种状况一般仅在从旧版本升级的过程当中发生。

RaftStoreRouter 与 Transport

RaftStoreRouter 负责将收到的 Raft 消息转发给 raftstore 中对应的 Region,而 Transport 负责将 Raft 消息发送到指定的 store。

ServerRaftStoreRouter 是在 TiKV 实际运行时将会使用的 RaftStoreRouter 的实现,它包含一个内层的、由 raftstore 提供的 RaftRouter 对象和一个 LocalReader 对象。收到的请求若是是一个只读的请求,则会由 LocalReader 处理;其它状况则是交给内层的 router 来处理。

ServerTransport 则是 TiKV 实际运行时使用的 Transport 的实现(Transport trait 的定义在 raftstore 中),其内部包含一个 RaftClient 用于进行 RPC 通讯。发送消息时,ServerTransport 经过上面说到的 Resolver 将消息中的 store id 解析为地址,并将解析的结果存入 raft_client.addrs 中;下次向同一个 store 发送消息时便再也不须要再次解析。接下来,再经过 RaftClient 进行 RPC 请求,将消息发送出去。

Node

Node 能够认为是将 raftstore 的复杂的建立、启动和中止逻辑进行封装的一层,其内部的 RaftBatchSystem 即是 raftstore 的核心。在启动过程当中(即 Nodestart 函数中),若是该节点是一个新建的节点,那么会进行 bootstrap 的过程,包括分配 store id、分配第一个 Region 等操做。

Node 并无直接包含在 Server 以内,可是 raftstore 的运行须要有用于向其它 TiKV 发送消息的 Transport,而 Transport 做为提供网络通讯功能的一部分,则是包含在 Server 内。因此咱们能够看到,在 src/binutil/server.rs 文件的 run_raft_server 中(被 tikv-server 的 main 函数调用),启动过程当中须要先建立 Server,而后建立并启动 Node 并把 Server 所建立的 Transport 传给 Node,最后再启动 Node

Service

TiKV 包含多个 gRPC service。其中,最重要的一个是 KvService,位于 src/server/service/kv.rs 文件中。

KvService 定义了 TiKV 的 kv_getkv_scankv_prewritekv_commit 等事务操做的 API,用于执行 TiDB 下推下来的复杂查询和计算的 coprocessor API,以及 raw_getraw_put 等 Raw KV API。batch_commands 接口则是用于将上述的接口 batch 起来,以优化高吞吐量的场景。当咱们要为 TiKV 添加一个新的 API 时,首先就要在 kvproto 项目中添加相关消息体的定义,并在这里添加相关代码。另外,TiKV 的 Raft group 各成员之间通讯用到的 raftbatch_raft 接口也是在这里提供的。

下面以 kv_prewrite 为例,介绍 TiKV 处理一个请求的流程。首先,不管是直接调用仍是经过 batch_commands 接口调用,都会调用 future_prewrite 函数,并在该函数返回的 future 附加上根据结果发送响应的操做,再将获得的 future spawn 到 RpcContext,也就是一个线程池里。future_prewrite 的逻辑以下:

// 从请求体中取出调用 prewrite 所需的参数

let (cb, f) = paired_future_callback();
let res = storage.async_prewrite(/*其它参数*/, cb);

AndThenWith::new(res, f.map_err(Error::from)).map(|v| {
   let mut resp = PrewriteResponse::new();
   if let Some(err) = extract_region_error(&v) {
       resp.set_region_error(err);
   } else {
       resp.set_errors(RepeatedField::from_vec(extract_key_errors(v)));
   }
   resp
})

这里的 paired_future_callback 是一个 util 函数,它返回一个闭包 cb 和一个 future f,当 cb 被调用时 f 就会返回被传入 cb 的值。上述代码会马上返回,但 future 中的逻辑在 async_prewrite 中的异步操做完成以后才会执行。一旦 prewrite 操做完成,cb 便会被调用,将结果传给 f,接下来,咱们写在 future 中的建立和发送 Response 的逻辑便会继续执行。

总结

以上就是 TiKV 的 Service 层的代码解析。你们能够看到这些代码大量使用 trait 和泛型,这是为了方便将其中一些组件替换成另一些实现,方便编写测试代码。另外,在 src/server/snap.rs 中,咱们还有一个专门用于处理 Snapshot 的模块,因为 Snapshot 消息的特殊性,在其它模块中也有一些针对 snapshot 的代码。关于 Snapshot,咱们将在另外一篇文章里进行详细讲解,敬请期待。

原文阅读https://www.pingcap.com/blog-cn/tikv-source-code-reading-9/

相关文章
相关标签/搜索