做者:王相node
TiDB-Binlog 组件用于收集 TiDB 的 binlog,并提供实时备份和同步功能。该组件在功能上相似于 MySQL 的主从复制,MySQL 的主从复制依赖于记录的 binlog 文件,TiDB-Binlog 组件也是如此,主要的不一样点是 TiDB 是分布式的,所以须要收集各个 TiDB 实例产生的 binlog,并按照事务提交的时间排序后才能同步到下游。若是你须要部署 TiDB 集群的从库,或者想订阅 TiDB 数据的变动输出到其余的系统中,TiDB-Binlog 则是必不可少的工具。git
TiDB-Binlog 这个组件已经发布了 2 年多时间,经历过几回架构演进,去年十月到如今大规模使用的是 Kafka 版本,架构图以下:github
Kafka 版本的 TiDB-Binlog 主要包括两个组件:算法
Pump:一个守护进程,在每一个 TiDB 主机的后台运行。其主要功能是实时记录 TiDB 产生的 binlog 并顺序写入 Kafka 中。数据库
Drainer: 从 Kafka 中收集 binlog,并按照 TiDB 中事务的提交顺序转化为指定数据库兼容的 SQL 语句或者指定格式的数据,最后同步到目的数据库或者写到顺序文件。json
这个架构的工做原理为:缓存
TiDB 须要与 Pump 绑定,即 TiDB 实例只能将它生成的 binlog 发送到一个指定的 Pump 中;bash
Pump 将 binlog 先写到本地文件,再异步地写入到 Kafka;数据结构
Drainer 从 Kafka 中读出 binlog,对 binlog 进行排序,对 binlog 解析后生成 SQL 或指定格式的数据再同步到下游。架构
根据用户的反馈,以及咱们本身作的一些测试,发现该版本主要存在一些问题。
首先,TiDB 的负载可能不均衡,部分 TiDB 业务较多,产生的 binlog 也比较多,对应的 Pump 的负载高,致使数据同步延迟高。
其次,依赖 Kafka 集群,增长了运维成本;并且 TiDB 产生的单条 binlog 的大小可达 2G(例如批量删除数据、批量写入数据),须要配置 Kafka 的消息大小相关设置,而 Kafka 并不太适合单条数据较大的场景。
最后,Drainer 须要读取 Kafka 中的 binlog、对 binlog 进行排序、解析 binlog,同步数据到下游等工做,能够看出 Drainer 的工做较多,并且 Drainer 是一个单点,因此每每同步数据的瓶颈都在 Drainer。
以上这些问题咱们很难在已有的框架下进行优化,所以咱们对 TiDB-Binlog 进行了重构,最新版本的 TiDB-Binlog 的整体架构以下图所示:
新版本 TiDB-Binlog 再也不使用 Kafka 存储 binlog,仍然保留了 Pump 和 Drainer 两个组件,可是对功能进行了调整:
Pump 用于实时记录 TiDB 产生的 binlog,并将 binlog 按照事务的提交时间进行排序,再提供给 Drainer 进行消费。
Drainer 从各个 Pump 中收集 binlog 进行归并,再将 binlog 转化成 SQL 或者指定格式的数据,最终同步到下游。
该版本的主要优势为:
多个 Pump 造成一个集群,能够水平扩容,各个 Pump 能够均匀地承担业务的压力。
TiDB 经过内置的 Pump Client 将 binlog 分发到各个 Pump,即便有部分 Pump 出现故障也不影响 TiDB 的业务。
Pump 内部实现了简单的 kv 来存储 binlog,方便对 binlog 数据的管理。
原来 Drainer 的 binlog 排序逻辑移到了 Pump 来作,而 Pump 是可扩展的,这样就能提升总体的同步性能。
Drainer 再也不须要像原来同样读取一批 binlog 到内存里进行堆排序,只须要依次读取各个 Pump 的 binlog 进行归并排序,这样能够大大节省内存的使用,同时也更容易作内存控制。
因为该版本最大的特色是多个 Pump 组成了一个集群(cluster),所以该版本命名为 cluster 版本。下面咱们以最新的 cluster 版本的架构来介绍 TiDB-Binlog 的实现原理。
首先咱们先介绍一下 TiDB 中的 binlog,TiDB 的事务采用 2pc 算法,一个成功的事务会写两条 binlog,包括一条 Prewrite binlog 和 一条 Commit binlog;若是事务失败,会发一条 Rollback binlog。
binlog 的结构定义为:
// Binlog 记录事务中全部的变动,能够用 Binlog 构建 SQL
message Binlog {
// Binlog 的类型,包括 Prewrite、Commit、Rollback 等
optional BinlogType tp = 1 [(gogoproto.nullable) = false];
// Prewrite, Commit 和 Rollback 类型的 binlog 的 start_ts,记录事务开始的 ts
optional int64 start_ts = 2 [(gogoproto.nullable) = false];
// commit_ts 记录事务结束的 ts,只记录在 commit 类型的 binlog 中
optional int64 commit_ts = 3 [(gogoproto.nullable) = false];
// prewrite key 只记录在 Prewrite 类型的 binlog 中,
// 是一个事务的主键,用于查询该事务是否提交
optional bytes prewrite_key = 4;
// prewrite_value 记录在 Prewrite 类型的 binlog 中,用于记录每一行数据的改变
optional bytes prewrite_value = 5;
// ddl_query 记录 ddl 语句
optional bytes ddl_query = 6;
// ddl_job_id 记录 ddl 的 job id
optional int64 ddl_job_id = 7 [(gogoproto.nullable) = false];
}
复制代码
binlog 及相关的数据结构定义见: binlog.proto
其中 start_ts
为事务开始时的 ts,commit_ts
为事务提交的 ts。ts 是由物理时间和逻辑时间转化而成的,在 TiDB 中是惟一的,由 PD 来统一提供。在开始一个事务时,TiDB 会请求 PD,获取一个 ts 做为事务的 start_ts
,在事务提交时则再次请求 PD 获取一个 ts 做为 commit_ts
。 咱们在 Pump 和 Drainer 中就是根据 binlog 的 commit_ts
来对 binlog 进行排序的。
TiDB 的 binlog 记录为 row 模式,即保存每一行数据的改变。数据的变化记录在 prewrite_value
字段中,该字段的数据主要由序列化后的 TableMutation 结构的数据组成。TableMutation 的结构以下所示:
// TableMutation 存储表中数据的变化
message TableMutation {
// 表的 id,惟一标识一个表
optional int64 table_id = 1 [(gogoproto.nullable) = false];
// 保存插入的每行数据
repeated bytes inserted_rows = 2;
// 保存修改前和修改后的每行的数据
repeated bytes updated_rows = 3;
// 已废弃
repeated int64 deleted_ids = 4;
// 已废弃
repeated bytes deleted_pks = 5;
// 删除行的数据
repeated bytes deleted_rows = 6;
// 记录数据变动的顺序
repeated MutationType sequence = 7;
}
复制代码
下面以一个例子来讲明 binlog 中是怎么存储数据的变化的。
例如 table 的结构为:
create table test
(id
int, name
varchar(24), primary key id
)
按照顺序执行以下 SQL:
begin;
insert into test(id, name) values(1, "a");
insert into test(id, name) values(2, "b");
update test set name = "c" where id = 1;
update test set name = "d" where id = 2;
delete from test where id = 2;
insert into test(id, name) values(2, "c");
commit;
复制代码
则生成的 TableMutation 的数据以下所示:
inserted_rows:
1, "a"
2, "b"
2, "c"
updated_rows:
1, "a", 1, "c"
2, "b", 2, "d"
deleted_rows:
2, "d"
sequence:
Insert, Insert, Update, Update, DeleteRow, Insert
复制代码
能够从例子中看出,sequence 中保存的数据变动类型的顺序为执行 SQL 的顺序,具体变动的数据内容则保存到了相应的变量中。
Drainer 在把 binlog 数据同步到下游前,就须要把上面的这些数据还原成 SQL,再同步到下游。
另外须要说明的是,TiDB 在写 binlog 时,会同时向 TiKV 发起写数据请求和向 Pump 发送 Prewrite binlog,若是 TiKV 和 Pump 其中一个请求失败,则该事务失败。当 Prewrite 成功后,TiDB 向 TiKV 发起 Commit 消息,并异步地向 Pump 发送一条 Commit binlog。因为 TiDB 是同时向 TiKV 和 Pump 发送请求的,因此只要保证 Pump 处理 Prewrite binlog 请求的时间小于等于 TiKV 执行 Prewrite 的时间,开启 binlog 就不会对事务的延迟形成影响。
从上面的介绍中咱们知道由多个 Pump 组成一个集群,共同承担写 binlog 的请求,那么就须要保证 TiDB 可以将写 binlog 的请求尽量均匀地分发到各个 Pump,而且须要识别不可用的 Pump,及时获取到新加入集群中 Pump 信息。这部分的工做是在 Pump Client 中实现的。
Pump Client 以包的形式集成在 TiDB 中,代码连接:pump_client。
Pump Client 维护 Pump 集群的信息,Pump 的信息主要来自于 PD 中保存的 Pump 的状态信息,状态信息的定义以下(代码连接:Status):
type Status struct {
// Pump/Drainer 实例的惟一标识
NodeID string `json:"nodeId"`
// Pump/Drainer 的服务地址
Addr string `json:"host"`
// Pump/Drainer 的状态,值能够为 online、pausing、paused、closing、offline
State string `json:"state"`
// Pump/Drainer 是否 alive(目前没有使用该字段)
IsAlive bool `json:"isAlive"`
// Pump的分数,该分数是由节点的负载、磁盘使用率、存储的数据量大小等因素计算得来的,
// 这样 Pump Client 能够根据分数来选取合适的 Pump 发送 binlog(待实现)
Score int64 `json:"score"`
// Pump 的标签,能够经过 label 对 TiDB 和 Pump 进行分组,
// TiDB 只能将 binlog 发送到相同 label 的 Pump(待实现)
Label *Label `json:"label"`
// Pump: 保存的 binlog 的最大的 commit_ts
// Drainer:已消费的 binlog 的最大的 commit_ts
MaxCommitTS int64 `json:"maxCommitTS"`
// 该状态信息的更新时间对应的 ts.
UpdateTS int64 `json:"updateTS"`
}
复制代码
Pump Client 根据 Pump 上报到 PD 的信息以及写 binlog 请求的实际状况将 Pump 划分为可用 Pump 与不可用 Pump 两个部分。
划分的方法包括:
初始化时从 PD 中获取全部 Pump 的信息,将状态为 online 的 Pump 加入到可用 Pump 列表中,其余 Pump 加入到非可用列表中。
Pump 每隔固定的时间会发送心跳到 PD,并更新本身的状态。Pump Client 监控 PD 中 Pump 上传的状态信息,及时更新内存中维护的 Pump 信息,若是状态由非 online 转换为 online 则将该 Pump 加入到可用 Pump 列表;反之加入到非可用列表中。
在写 binlog 到 Pump 时,若是该 Pump 在重试屡次后仍然写 binlog 失败,则把该 Pump 加入到非可用 Pump 列表中。
定时发送探活请求(数据为空的 binlog 写请求)到非可用 Pump 列表中的状态为 online 的 Pump,若是返回成功,则把该 Pump 从新加入到可用 Pump 列表中。
经过上面的这些措施,Pump Client 就能够及时地更新所维护的 Pump 集群信息,保证将 binlog 发送到可用的 Pump 中。
另一个问题是,怎么保证 Pump Client 能够将 binlog 写请求均匀地分发到各个 Pump?咱们目前提供了几种路由策略:
range: 按照顺序依次选取 Pump 发送 binlog,即第一次选取第一个 Pump,第二次选取第二个 Pump...
hash:对 binlog 的 start_ts
进行 hash,而后选取 hash 值对应的 Pump。
score:根据 Pump 上报的分数按照加权平均算法选取 Pump 发送 binlog(待实现)。
须要注意的地方是,以上的策略只是针对 Prewrite binlog,对于 Commit binlog,Pump Client 会将它发送到对应的 Prewrite binlog 所选择的 Pump,这样作是由于在 Pump 中须要将包含 Prewrite binlog 和 Commit binlog 的完整 binlog(即执行成功的事务的 binlog)提供给 Drainer,将 Commit binlog 发送到其余 Pump 没有意义。
Pump Client 向 Pump 提交写 binlog 的请求接口为 pump.proto 中的 WriteBinlog,使用 grpc 发送 binlog 请求。
Pump 主要用来承担 binlog 的写请求,维护 binlog 数据,并将有序的 binlog 提供给 Drainer。咱们将 Pump 抽象成了一个简单的 kv 数据库,key 为 binlog 的 start _ts
(Priwrite binlog) 或者 commit_ts
(Commit binlog),value 为 binlog 的元数据,binlog 的数据则存在数据文件中。Drainer 像查数据库同样的来获取所须要的 binlog。
Pump 内置了 leveldb 用于存储 binlog 的元信息。在 Pump 收到 binlog 的写请求时,会首先将 binlog 数据以 append 的形式写到文件中,而后将 binlog 的 ts、类型、数据长度、所保存的文件以及在文件中的位置信息保存在 leveldb 中,若是为 Prewrite binlog,则以 start_ts
做为 key;若是是 Commit binlog,则以 commit_ts
做为 key。
当 Drainer 向 Pump 请求获取指定 ts 以后的 binlog 时,Pump 则查询 leveldb 中大于该 ts 的 binlog 的元数据,若是当前数据为 Prewrite binlog,则必须找到对应的 Commit binlog;若是为 Commit binlog 则继续向前推动。这里有个问题,在 binlog 一节中提到,若是 TiKV 成功写入了数据,而且 Pump 成功接收到了 Prewrite binlog,则该事务就提交成功了,那么若是在 TiDB 发送 Commit binlog 到 Pump 前发生了一些异常(例如 TiDB 异常退出,或者强制终止了 TiDB 进程),致使 Pump 没有接收到 Commit binlog,那么 Pump 中就会一直找不到某些 Prewrite binlog 对应的 Commit binlog。这里咱们在 Pump 中作了处理,若是某个 Prewrite binlog 超过了十分钟都没有找到对应的 Commit binlog,则经过 binlog 数据中的 prewrite_key
去查询 TiKV 该事务是否提交,若是已经提交成功,则 TiKV 会返回该事务的 commit_ts
;不然 Pump 就丢弃该条 Prewrite binlog。
binlog 元数据中提供了数据存储的文件和位置,能够经过这些信息读取 binlog 文件的指定位置获取到数据。由于 binlog 数据基本上是按顺序写入到文件中的,所以咱们只须要顺序地读 binlog 文件便可,这样就保证了不会由于频繁地读取文件而影响 Pump 的性能。最终,Pump 以 commit_ts
为排序标准将 binlog 数据传输给 Drainer。Drainer 向 Pump 请求 binlog 数据的接口为 pump.proto 中的 PullBinlogs,以 grpc streaming 的形式传输 binlog 数据。
值得一提的是,Pump 中有一个 fake binlog 机制。Pump 会定时(默认三秒)向本地存储中写入一条数据为空的 binlog,在生成该 binlog 前,会向 PD 中获取一个 ts,做为该 binlog 的 start_ts
与 commit_ts
,这种 binlog 咱们叫做 fake binlog。这样作的缘由在 Drainer 中介绍。
Drainer 从各个 Pump 中获取 binlog,归并后按照顺序解析 binlog、生成 SQL 或者指定格式的数据,而后再同步到下游。
既然要从各个 Pump 获取数据,Drainer 就须要维护 Pump 集群的信息,及时获取到新增长的 Pump,并识别出不可用的 Pump,这部分功能与 Pump Client 相似,Drainer 也是经过 PD 中存储的 Pump 的状态信息来维护 Pump 信息。另外须要注意的是,若是新增长了一个 Pump,必须让该 Pump 通知 Drainer 本身上线了,这么作是为了保证不会丢数据。例如:
集群中已经存在 Pump1 和 Pump2,Drainer 读取 Pump1 和 Pump2 的数据并进行归并:
Pump1 存储的 binlog 为{ 1,3,5,7,9 },Pump2 存储的 binlog 为{2,4,6,10}。Drainer 从两个 Pump 获取 binlog,假设当前已经读取到了{1,2,3,4,5,6,7}这些 binlog,已处理的 binlog 的位置为 7。此时 Pump3 加入集群,从 Pump3 上报本身的上线信息到 PD,到 Drainer 从 PD 中获取到 Pump3 信息须要必定的时间,若是 Pump3 没有通知 Drainer 就直接提供写 binlog 服务,写入了 binlog{8,12},Drainer 在此期间继续读取 Pump1 和 Pump2 的 binlog,假设读取到了 9,以后才识别到了 Pump3 并将 Pump3 加入到归并排序中,此时 Pump3 的 binlog 8 就丢失了。为了不这种状况,须要让 Pump3 通知 Drainer 本身已经上线,Drainer 收到通知后将 Pump3 加入到归并排序,并返回成功给 Pump3,而后 Pump3 才能提供写 binlog 的服务。
Drainer 经过如上所示的方式对 binlog 进行归并排序,并推动同步的位置。那么可能会存在这种状况:某个 Pump 因为一些特殊的缘由一直没有收到 binlog 数据,那么 Drainer 中的归并排序就没法继续下去,正如咱们用两条腿走路,其中一只腿不动就不能继续前进。咱们使用 Pump 一节中提到的 fake binlog 的机制来避免这种问题,Pump 每隔指定的时间就生成一条 fake binlog,即便某些 Pump 一直没有数据写入,也能够保证归并排序正常向前推动。
Drainer 将全部 Pump 的数据按照 commit_ts
进行归并排序后,将 binlog 数据传递给 Drainer 中的数据解析及同步模块。经过上面的 binlog 格式的介绍,咱们能够看出 binlog 文件中并无存储表结构的信息,所以须要在 Drainer 中维护全部库和表的结构信息。在启动 Drainer 时,Drainer 会请求 TiKV,获取到全部历史的 DDL job 的信息,对这些 DDL job 进行过滤,使用 Drainer 启动时指定的 initial-commit-ts(或者 checkpoint 中保存的 commit_ts
)以前的 DDL 在内存中构建库和表结构信息。这样 Drainer 就有了一份 ts 对应时间点的库和表的快照,在读取到 DDL 类型的 binlog 时,则更新库和表的信息;读取到 DML 类型的 binlog 时,则根据库和表的信息来生成 SQL。
在生成 SQL 以后,就能够同步到下游了。为了提升 Drainer 同步的速度,Drainer 中使用多个协程来执行 SQL。在生成 SQL 时,咱们会使用主键/惟一键的值做为该条 SQL 的 key,经过对 key 进行 hash 来将 SQL 发送到对应的协程中。当每一个协程收集到了足够多的 SQL,或者超过了必定的时间,则将这一批的 SQL 在一个事务中提交到下游。
可是有些 SQL 是相关的,若是被分到了不一样的协程,那 SQL 的执行顺序就不能获得保证,形成数据的不一致。例如:
SQL1: delete from test.test where id = 1;
SQL2: replace into test.test (id, name ) values(1, "a");
复制代码
按照顺序执行后表中存在 id = 1 该行数据,若是这两条 SQL 分别分配到了协程 1 和协程 2 中,而且协程 2 先执行了 SQL,则表中再也不存在 id = 1 的数据。为了不这种状况的发生,Drainer 中加入了冲突检测的机制,若是检测出来两条 SQL 存在冲突(修改了同一行数据),则暂时不将后面的 SQL 发送到协程,而是生成一个 Flush 类型的 job 发送到全部的协程, 每一个协程在遇到 Flush job 时就会立刻执行所缓存的 SQL。接着才会把该条有冲突的 SQL 发送到对应的协程中。下面给出一个例子说明一下冲突检测的机制:
有如下这些 SQL,其中 id 为表的主键:
SQL1: update itest set id = 4, name = "c", age = 15 where id = 3; key: 3, 4
SQL2: update itest set id = 5, name = "b", age = 14 where id = 2; key:5, 2
SQL3:delete from itest where id = 3; key: 3
复制代码
首先将 SQL1 发送到指定的协程,这时全部的 keys 为[3,4];
SQL2 的 key[5,2]与 keys 中的[3,4]都没有冲突,将 SQL2 发送到指定的协程,这时 keys 为[3,4,5,2];
SQL3 的 key[3]与 keys 中的[3]存在冲突,发送 Flush job 到全部协程,SQL1 和 SQL2 被执行,清空 keys;
将 SQL3 发送到指定的协程,同时更新 keys 为[3]。
Drainer 经过以上这些机制来高效地同步数据,而且保证数据的一致。