做者:杨非html
本文为 DM 源码阅读系列文章的第四篇,上篇文章 介绍了数据同步处理单元实现的功能,数据同步流程的运行逻辑以及数据同步处理单元的 interface 设计。本篇文章在此基础上展开,详细介绍 dump 和 load 两个数据同步处理单元的设计实现,重点关注数据同步处理单元 interface 的实现,数据导入并发模型的设计,以及导入任务在暂停或出现异常后如何恢复。mysql
dump 处理单元的代码位于 github.com/pingcap/dm/mydumper 包内,做用是从上游 MySQL 将表结构和数据导出到逻辑 SQL 文件,因为该处理单元老是运行在任务的第一个阶段(full 模式和 all 模式),该处理单元每次运行不依赖于其余处理单元的处理结果。另外一方面,若是在 dump 运行过程当中被强制终止(例如在 dmctl 中执行 pause-task 或者 stop-task),也不会记录已经 dump 数据的 checkpoint 等信息。不记录 checkpoint 是由于每次运行 mydumper 从上游导出数据,上游的数据均可能发生变动,为了能获得一致的数据和 metadata 信息,每次恢复任务或从新运行任务时该处理单元会 清理旧的数据目录,从新开始一次完整的数据 dump。git
导出表结构和数据的逻辑并非在 DM 内部直接实现,而是 经过 os/exec
包调用外部 mydumper 二进制文件 来完成。在 mydumper 内部,咱们须要关注如下几个问题:github
数据导出时的并发模型是如何实现的。sql
no-locks, lock-all-tables, less-locking 等参数有怎样的功能。数据库
库表黑白名单的实现方式。c#
mydumper 的一次完整的运行流程从主线程开始,主线程按照如下步骤执行:安全
解析参数。并发
建立到数据库的链接。less
会根据 no-locks
选项进行一系列的备份安全策略,包括 long query guard
和 lock all tables or FLUSH TABLES WITH READ LOCK
。
若是配置了 trx-consistency-only
选项,执行 UNLOCK TABLES /* trx-only */
释放以前获取的表锁。注意,若是开启该选项,是没法保证非 InnoDB 表导出数据的一致性。更多关于一致性读的细节能够参考 MySQL 官方文档 Consistent Nonlocking Reads 部分。
若是没有配置 no-locks
和 trx-consistency-only
选项,执行 UNLOCK TABLES /* FTWRL */ 释放锁。
工做线程的并发控制包括了两个层面,一层是在不一样表级别的并发,另外一层是同一张表级别的并发。mydumper 的主线程会将一次同步任务拆分为多个同步子任务,并将每一个子任务分发给同一个异步队列 conf.queue_less_locking/conf.queue
,工做子线程从队列中获取任务并执行。具体的子任务划分包括如下策略:
开启 less-locking
选项的非 InnoDB 表的处理。
non_innodb_table
分为 num_threads
组,分组方式是遍历这些表,依此将遍历到的表加入到当前数据量最小的分组,尽可能保证每一个分组内的数据量相近。rows-per-file
选项,会对每张表进行 chunks
估算,对于每一张表,若是估算结果包含多个 chunks,会将子任务进一步按照 chunks
进行拆分,分发 chunks
数量个子任务,若是没有 chunks
划分,分发为一个独立的子任务。queue_less_locking
,并在编号为 num_threads
~ 2 * num_threads
的子线程中处理任务。
less_locking_threads
任务执行完成以后,主线程就会 UNLOCK TABLES /* FTWRL */ 释放锁,这样有助于减小锁持有的时间。主线程根据 conf.unlock_tables
来判断非 InnoDB 表是否所有导出,普通工做线程 或者 queue_less_locking 工做线程每次处理完一个非 InnoDB 表任务都会根据 non_innodb_table_counter
和 non_innodb_done
两个变量判断是否还有没有导出结束的非 InnoDB 表,若是都已经导出结束,就会向异步队列 conf.unlock_tables
中发送一条数据,表示能够解锁全局锁。less_locking_threads
处理非 InnoDB 表任务时,会先 加表锁,导出数据,最后 解锁表锁。未开启 less-locking
选项的非 InnoDB 表的处理。
InnoDB 表的处理。
less-locking
选项的非 InnoDB 表的处理相同,一样是 按照表分发子任务,若是有 chunks
子任务会进一步细分。从上述的并发模型能够看出 mydumper 首先按照表进行同步任务拆分,对于同一张表,若是配置 rows-per-file
参数,会根据该参数和表行数将表划分为合适的 chunks
数,这便是同一张表内部的并发。具体表行数的估算和 chunks
划分的实现见 get_chunks_for_table
函数。
须要注意目前 DM 在任务配置中指定的库表黑白名单功能只应用于 load 和 binlog replication 处理单元。若是在 dump 处理单元内使用库表黑白名单功能,须要在同步任务配置文件的 dump 处理单元配置提供 extra-args 参数,并指定 mydumper 相关参数,包括 --database, --tables-list 和 --regex。mydumper 使用 regex 过滤库表的实现参考 check_regex
函数。
load 处理单元的代码位于 github.com/pingcap/dm/loader 包内,该处理单元在 dump 处理单元运行结束后运行,读取 dump 处理单元导出的 SQL 文件解析并在下游数据库执行逻辑 SQL。咱们重点分析 Init
和 Process
两个 interface 的实现。
该阶段进行一些初始化和清理操做,并不会开始同步任务,若是在该阶段运行中出现错误,会经过 rollback 机制 清理资源,不须要调用 Close 函数。该阶段包含的初始化操做包括如下几点:
建立 checkpoint
,checkpoint
用于记录全量数据的导入进度和 load 处理单元暂停或异常终止后,恢复或从新开始任务时能够从断点处继续导入数据。
应用任务配置的数据同步规则,包括如下规则:
该阶段的工做流程也很直观,经过 一个收发数据类型为 *pb.ProcessError
的 channel
接收运行过程当中出现的错误,出错后经过 context 的 CancelFunc
强制结束处理单元运行。在核心的 数据导入函数 中,工做模型与 mydumper 相似,即在 主线程中分发任务,有多个工做线程执行具体的数据导入任务。具体的工做细节以下:
主线程会按照库,表的顺序读取建立库语句文件 <db-name>-schema-create.sql
和建表语句文件 <db-name>.<table-name>-schema-create.sql
,并在下游执行 SQL 建立相对应的库和表。
主线程读取 checkpoint
信息,结合数据文件信息建立 fileJob 随机分发任务给一个工做子线程,fileJob 任务的结构以下所示 :
type fileJob struct { schema string table string dataFile string offset int64 // 表示读取文件的起始 offset,若是没有 checkpoint 断点信息该值为 0 info *tableInfo // 保存原库表,目标库表,列名,insert 语句 column 名字列表等信息 }
在每一个工做线程内部,有一个循环不断从本身 fileJobQueue
获取任务,每次获取任务后会对文件进行解析,并将解析后的结果分批次打包为 SQL 语句分发给线程内部的另一个工做协程,该工做协程负责处理 SQL 语句的执行。工做流程的伪代码以下所示,完整的代码参考 func (w *Worker) run()
:
// worker 工做线程内分发给内部工做协程的任务结构 type dataJob struct { sql string // insert 语句, insert into <table> values (x, y, z), (x2, y2, z2), … (xn, yn, zn); schema string // 目标数据库 file string // SQL 文件名 offset int64 // 本次导入数据在 SQL 文件的偏移量 lastOffset int64 // 上一次已导入数据对应 SQL 文件偏移量 } // SQL 语句执行协程 doJob := func() { for { select { case <-ctx.Done(): return case job := <-jobQueue: sqls := []string{ fmt.Sprintf("USE `%s`;", job.schema), // 指定插入数据的 schema job.sql, checkpoint.GenSQL(job.file, job.offset), // 更新 checkpoint 的 SQL 语句 } executeSQLInOneTransaction(sqls) // 在一个事务中执行上述 3 条 SQL 语句 } } } // worker 主线程 for { select { case <-ctx.Done(): return case job := <-fileJobQueue: go doJob() readDataFileAndDispatchSQLJobs(ctx, dir, job.dataFile, job.offset, job.info) } }
dispatchSQL
函数负责在工做线程内部读取 SQL 文件和重写 SQL,该函数会在运行初始阶段 建立所操做表的 checkpoint
信息,须要注意在任务中断恢复以后,若是这个文件的导入尚未完成,checkpoint.Init
仍然会执行,可是此次运行不会更新该文件的 checkpoint
信息。列值转换和库表路由也是在这个阶段内完成。
列值转换:须要对输入 SQL 进行解析拆分为每个 field,对须要转换的 field 进行转换操做,而后从新拼接起 SQL 语句。详细重写流程见 reassemble 函数。
库表路由:这种场景下只须要 替换源表到目标表 便可。
在工做线程执行一个批次的 SQL 语句以前,会首先根据文件 offset
信息生成一条更新 checkpoint 的语句,加入到打包的 SQL 语句中,具体执行时这些语句会 在一个事务中提交,这样就保证了断点信息的准确性,若是导入过程暂停或中断,恢复任务后从断点从新同步能够保证数据一致。
本篇详细介绍 dump 和 load 两个数据同步处理单元的设计实现,对核心 interface 实现、数据导入并发模型、数据导入暂停或中断的恢复进行了分析。接下来的文章会继续介绍 binlog replication
,relay log
两个数据同步处理单元的实现。