上篇文章介绍了用于将 binlog 同步到 MySQL / TiDB 的 Loader package,本文往回退一步,介绍 Drainer 同步到不一样下游的机制。
TiDB Binlog(github.com/pingcap/tidb-binlog)用于收集 TiDB 的 binlog,并准实时同步给下游。 同步数据这一步重要操做由 Drainer 模块支持,它能够将 binlog 同步到 TiDB / MySQL / Kafka / File (增量备份)等下游组件。mysql
本文将按如下几个小节介绍 Drainer 如何将收到的 binlog 同步到下游:git
Sync
模块调度整个同步过程,全部的下游相关的同步逻辑统一封装成了 Syncer
接口。同步机制的核心是 Syncer
接口,定义以下:github
// Syncer sync binlog item to downstream type Syncer interface { // Sync the binlog item to downstream Sync(item *Item) error // will be close if Close normally or meet error, call Error() to check it Successes() <-chan *Item // Return not nil if fail to sync data to downstream or nil if closed normally Error() <-chan error // Close the Syncer, no more item can be added by `Sync` Close() error }
其中 Sync
方法表示异步地向下游同步一个 binlog,对应的参数类型是 *Item,这是一个封装了 binlog 的结构体;Successes
方法返回一个 channel,从中能够读取已经成功同步到下游的 Item;Error
方法返回一个 channel,当 Syncer
同步过程出错中断时,会往这个 channel 发送遇到的错误;Close
用于关掉 Syncer
,释放资源。golang
支持的每一个下游类型在 drainer/sync 目录下都有一个对应的 Syncer 实现,例如 MySQL 对应的是 mysql.go
里的 MySQLSyncer,Kafka 对应的是 kafka.go
里的 KafkaSyncer。Drainer 启动时,会根据配置文件中指定的下游,找到对应的 Syncer 实现,而后就能够用统一的接口管理整个同步过程了。sql
同步进程可能由于各类缘由退出,重启后要恢复同步就须要知道上次同步的进度。在 Drainer 里记录同步进度的功能抽象成 Checkpoint
接口,其定义以下:数据库
type CheckPoint interface { // Load loads checkpoint information. Load() error // Save saves checkpoint information. Save(int64) error // Pos gets position information. TS() int64 // Close closes the CheckPoint and release resources, after closed other methods should not be called again. Close() error }
从以上定义中能够看到,Save
的参数和 TS 的返回结果都是 int64 类型,由于同步的进度是以 TiDB 中单调递增的 commit timestamp 来记录的,它的类型就是 int64。缓存
Drainer 支持不一样类型的 Checkpoint 实现,例如 mysql.go
里的 MySQLCheckpoint
,默认将 commit timestamp 写到 tidb_binlog 库下的 checkpoint 表。Drainer 会根据下游类型自动选择不一样的 Checkpoint 实现,例如 TiDB / MySQL 的下游就会使用 MySQLCheckPoint,File(增量备份) 则使用 PbCheckpoint。异步
在 Syncer 小节,咱们看到 Syncer 的 Successes
方法提供了一个 channel 用来接收已经处理完毕的 binlog,收到 binlog 后,咱们用 Checkpoint 的 Save
方法保存 binlog 的 commit timestamp 就能够记下同步进度,细节可查看源码中的 handleSuccess 方法。函数
Syncer 在收到 binlog 后须要将里面记录的变动转换成适合下游 Syncer 类型的格式,这部分实如今 drainer/translator 包。工具
如下游是 MySQL / TiDB 的状况为例。MySQLSyncer.Sync
会先调用 TiBinlogToTxn
将 binlog 转换成 loader.Txn 以便接入下层的 loader
模块 (loader 接收一个个 loader.Txn 结构并还原成对应的 SQL 批量写入 MySQL / TiDB)。
loader.Txn
定义以下:
// Txn holds transaction info, an DDL or DML sequences type Txn struct { DMLs []*DML DDL *DDL // This field is used to hold arbitrary data you wish to include so it // will be available when receiving on the Successes channel Metadata interface{} }
Txn 主要有两类:DDL 和 DML。Metadata
目前放的就是传给 Sync
的 *Item 对象。DDL 的状况比较简单,由于 binlog 中已经直接包含了咱们要用到的 DDL Query。DML 则须要遍历 binlog 中的一个个行变动,根据它的类型 insert / update / delete 还原成相应的 loader.DML
。
上个小节中,咱们提到了对行变动数据的解析,在 binlog 中编码的行变动是没有列信息的,咱们须要查到对应版本的列信息才能还原出 SQL 语义。Schema 就是解决这个问题的模块。
在 Drainer 启动时,会调用 loadHistoryDDLJobs 从 TiKV 处查询截至当前时间全部已完成的 DDL Job 记录,按 SchemaVersion
升序排序(能够粗略认为这是一个单调递增地赋给每一个 DDL 任务的版本号)。这些记录在 Syncer 中会用于建立一个 Schema 对象。在运行过程当中,Drainer 每遇到一条 DDL 也会添加到 Schema 中。
binlog 中带有一个 SchemaVersion
信息,记录这条 binlog 生成的时刻 Schema 版本。在同步 Binlog 前,咱们会先用这个 SchemaVersion
信息调用 Schema 的一个方法 handlePreviousDDLJobIfNeed。上一段中咱们看到 Schema 从何处收集到有序的 DDL Job 记录,这个方法则是按顺序应用 SchemaVersion
小于等于指定版本的 DDL Job,在 Schema 中维护每一个表对应版本的最新结构信息,去掉一些错误代码后实现大体以下:
func (s *Schema) handlePreviousDDLJobIfNeed(version int64) error { var i int for i = 0; i < len(s.jobs); i++ { if s.jobs[i].BinlogInfo.SchemaVersion <= version { _, _, _, err := s.handleDDL(s.jobs[i]) if err != nil { return errors.Annotatef(err, "handle ddl job %v failed, the schema info: %s", s.jobs[i], s) } } else { break } } s.jobs = s.jobs[i:] return nil }
对于每一个符合条件的 Job,由 handleDDL
方法将其表结构 TableInfo 等信息更新到 Schema
中,其余模块就能够查询到表格当前最新的信息。
咱们知道 Drainer 除了能够将 binlog 直接还原到下游数据库之外,还支持同步到其余外部存储系统块,因此咱们也提供了相应的工具来处理存储下来的文件,Reparo
是其中之一,用于读取存储在文件系统中的 binlog 文件,写入 TiDB 中。本节简单介绍下 Reparo 的用途与实现,读者能够做为示例了解如何处理同步到文件系统的 binlog 增量备份。
Reparo 能够读取同步到文件系统上的 binlog 增量备份并同步到 TiDB。
当下游设置成 File(增量备份) 时,Drainer 会将 Protobuf 编码的 binlog 保存到指定目录,每写满 512 MB 新建一个文件。每一个文件有个编号,从 0 开始依次类推。文件名格式定义以下:
// BinlogName creates a binlog file name. The file name format is like binlog-0000000000000001-20181010101010 func BinlogName(index uint64) string { currentTime := time.Now() return binlogNameWithDateTime(index, currentTime) } // binlogNameWithDateTime creates a binlog file name. func binlogNameWithDateTime(index uint64, datetime time.Time) string { return fmt.Sprintf("binlog-%016d-%s", index, datetime.Format(datetimeFormat)) }
文件的前缀都是 “binlog-”,后面跟一个 16 位右对齐的编号和一个时间戳。将目录里的文件按字母顺序排序就能够获得按编号排序的 binlog 文件名。从指定目录获取文件列表的实现以下:
// ReadDir reads and returns all file and dir names from directory func ReadDir(dirpath string) ([]string, error) { dir, err := os.Open(dirpath) if err != nil { return nil, errors.Trace(err) } defer dir.Close() names, err := dir.Readdirnames(-1) if err != nil { return nil, errors.Annotatef(err, "dir %s", dirpath) } sort.Strings(names) return names, nil }
这个函数简单地获取目录里所有文件名,排序后返回。在上层还作了一些过滤来去掉临时文件等。获得文件列表后,Reparo
会用标准库的 bufio.NewReader 逐个打开文件,而后用 Decode
函数读出其中的一条条 binlog:
func Decode(r io.Reader) (*pb.Binlog, int64, error) { payload, length, err := binlogfile.Decode(r) if err != nil { return nil, 0, errors.Trace(err) } binlog := &pb.Binlog{} err = binlog.Unmarshal(payload) if err != nil { return nil, 0, errors.Trace(err) } return binlog, length, nil }
这里先调用了 binlogfile.Decode
从文件中解析出对应 Protobuf 编码的一段二进制数据而后解码出 binlog。
获得 binlog 后就能够准备写入 TiDB。Reparo 这部分实现像一个简化版的 Drainer 的 Sync
模块,一样有一个 Syncer 接口以及几个具体实现(除了 mysqlSyncer
还有用于调试的 printSyncer
和 memSyncer
),因此就再也不介绍。值得一提的是,这里也跟前面不少 MySQL / TiDB 同步相关的模块同样使用了 loader 模块。
本文介绍了 Drainer 是如何实现数据同步的以及 Reparo 如何从文件系统中恢复增量备份数据到 MySQL / TiDB。在 Drainer 中,Syncer 封装了同步到各个下游模块的具体细节,Checkpoint 记录同步进度,Translator 从 binlog 中还原出具体的变动,Schema 在内存中维护每一个表对应的表结构定义。
TiDB Binlog 源码阅读系列在此就所有完结了,相信你们经过本系列文章更全面地理解了 TiDB Binlog 的原理和实现细节。咱们将继续打磨优化,欢迎你们给咱们反馈使用过程当中遇到的问题或建议;若是社区小伙伴们想参与 TiDB Binlog 的设计、开发和测试,也欢迎与咱们联系 info@pingcap.com,或者在 Repo 中提 issue 讨论。