DM 源码阅读系列文章(四)dump/load 全量同步的实现

做者:杨非html

本文为 DM 源码阅读系列文章的第四篇,上篇文章 介绍了数据同步处理单元实现的功能,数据同步流程的运行逻辑以及数据同步处理单元的 interface 设计。本篇文章在此基础上展开,详细介绍 dump 和 load 两个数据同步处理单元的设计实现,重点关注数据同步处理单元 interface 的实现,数据导入并发模型的设计,以及导入任务在暂停或出现异常后如何恢复。mysql

dump 处理单元

dump 处理单元的代码位于 github.com/pingcap/dm/mydumper 包内,做用是从上游 MySQL 将表结构和数据导出到逻辑 SQL 文件,因为该处理单元老是运行在任务的第一个阶段(full 模式和 all 模式),该处理单元每次运行不依赖于其余处理单元的处理结果。另外一方面,若是在 dump 运行过程当中被强制终止(例如在 dmctl 中执行 pause-task 或者 stop-task),也不会记录已经 dump 数据的 checkpoint 等信息。不记录 checkpoint 是由于每次运行 mydumper 从上游导出数据,上游的数据均可能发生变动,为了能获得一致的数据和 metadata 信息,每次恢复任务或从新运行任务时该处理单元会 清理旧的数据目录,从新开始一次完整的数据 dump。git

导出表结构和数据的逻辑并非在 DM 内部直接实现,而是 经过 os/exec 包调用外部 mydumper 二进制文件 来完成。在 mydumper 内部,咱们须要关注如下几个问题:github

  • 数据导出时的并发模型是如何实现的。sql

  • no-locks, lock-all-tables, less-locking 等参数有怎样的功能。数据库

  • 库表黑白名单的实现方式。c#

mydumper 的实现细节

mydumper 的一次完整的运行流程从主线程开始,主线程按照如下步骤执行:安全

  1. 解析参数。并发

  2. 建立到数据库的链接less

  3. 会根据 no-locks 选项进行一系列的备份安全策略,包括 long query guardlock all tables or FLUSH TABLES WITH READ LOCK

  4. START TRANSACTION WITH CONSISTENT SNAPSHOT

  5. 记录 binlog 位点信息

  6. less locking 处理线程的初始化

  7. 普通导出线程初始化

  8. 若是配置了 trx-consistency-only 选项,执行 UNLOCK TABLES /* trx-only */ 释放以前获取的表锁。注意,若是开启该选项,是没法保证非 InnoDB 表导出数据的一致性。更多关于一致性读的细节能够参考 MySQL 官方文档 Consistent Nonlocking Reads 部分

  9. 根据配置规则(包括 --database, --tables-list 和 --regex 配置)读取须要导出的 schema 和表信息,并在这个过程当中有区分的记录 innodb_tables 和 non_innodb_table

  10. 为工做子线程建立任务,并将任务 push 到相关的工做队列

  11. 若是没有配置 no-lockstrx-consistency-only 选项,执行 UNLOCK TABLES /* FTWRL */ 释放锁

  12. 若是开启 less-locking,等待全部 less locking 子线程退出

  13. 等待全部工做子线程退出

工做线程的并发控制包括了两个层面,一层是在不一样表级别的并发,另外一层是同一张表级别的并发。mydumper 的主线程会将一次同步任务拆分为多个同步子任务,并将每一个子任务分发给同一个异步队列 conf.queue_less_locking/conf.queue,工做子线程从队列中获取任务并执行。具体的子任务划分包括如下策略:

从上述的并发模型能够看出 mydumper 首先按照表进行同步任务拆分,对于同一张表,若是配置 rows-per-file 参数,会根据该参数和表行数将表划分为合适的 chunks 数,这便是同一张表内部的并发。具体表行数的估算和 chunks 划分的实现见 get_chunks_for_table 函数。

须要注意目前 DM 在任务配置中指定的库表黑白名单功能只应用于 load 和 binlog replication 处理单元。若是在 dump 处理单元内使用库表黑白名单功能,须要在同步任务配置文件的 dump 处理单元配置提供 extra-args 参数,并指定 mydumper 相关参数,包括 --database, --tables-list 和 --regex。mydumper 使用 regex 过滤库表的实现参考 check_regex 函数。

load 处理单元

load 处理单元的代码位于 github.com/pingcap/dm/loader 包内,该处理单元在 dump 处理单元运行结束后运行,读取 dump 处理单元导出的 SQL 文件解析并在下游数据库执行逻辑 SQL。咱们重点分析 InitProcess 两个 interface 的实现。

Init 实现细节

该阶段进行一些初始化和清理操做,并不会开始同步任务,若是在该阶段运行中出现错误,会经过 rollback 机制 清理资源,不须要调用 Close 函数。该阶段包含的初始化操做包括如下几点:

Process 实现细节

该阶段的工做流程也很直观,经过 一个收发数据类型为 *pb.ProcessErrorchannel 接收运行过程当中出现的错误,出错后经过 context 的 CancelFunc 强制结束处理单元运行。在核心的 数据导入函数 中,工做模型与 mydumper 相似,即在 主线程中分发任务有多个工做线程执行具体的数据导入任务。具体的工做细节以下:

  • 主线程会按照库,表的顺序读取建立库语句文件 <db-name>-schema-create.sql 和建表语句文件 <db-name>.<table-name>-schema-create.sql,并在下游执行 SQL 建立相对应的库和表。

  • 主线程读取 checkpoint 信息,结合数据文件信息建立 fileJob 随机分发任务给一个工做子线程,fileJob 任务的结构以下所示 :

    type fileJob struct {
    	   schema    string
    	   table     string
    	   dataFile  string
    	   offset    int64 // 表示读取文件的起始 offset,若是没有 checkpoint 断点信息该值为 0
    	   info      *tableInfo // 保存原库表,目标库表,列名,insert 语句 column 名字列表等信息
    	}
  • 在每一个工做线程内部,有一个循环不断从本身 fileJobQueue 获取任务,每次获取任务后会对文件进行解析,并将解析后的结果分批次打包为 SQL 语句分发给线程内部的另一个工做协程,该工做协程负责处理 SQL 语句的执行。工做流程的伪代码以下所示,完整的代码参考 func (w *Worker) run()

    // worker 工做线程内分发给内部工做协程的任务结构
    	type dataJob struct {
    	   sql         string // insert 语句, insert into <table> values (x, y, z), (x2, y2, z2), … (xn, yn, zn);
    	   schema      string // 目标数据库
    	   file        string // SQL 文件名
    	   offset      int64 // 本次导入数据在 SQL 文件的偏移量
    	   lastOffset  int64 // 上一次已导入数据对应 SQL 文件偏移量
    	}
    
    	// SQL 语句执行协程
    	doJob := func() {
    	   for {
    	       select {
    	       case <-ctx.Done():
    	           return
    	       case job := <-jobQueue:
    	           sqls := []string{
    	               fmt.Sprintf("USE `%s`;", job.schema), // 指定插入数据的 schema
    	               job.sql,
    	               checkpoint.GenSQL(job.file, job.offset), // 更新 checkpoint 的 SQL 语句
    	           }
    	           executeSQLInOneTransaction(sqls) // 在一个事务中执行上述 3 条 SQL 语句
    	       }
    	   }
    	}
    	​
    	// worker 主线程
    	for {
    	   select {
    	   case <-ctx.Done():
    	       return
    	   case job := <-fileJobQueue:
    	       go doJob()
    	       readDataFileAndDispatchSQLJobs(ctx, dir, job.dataFile, job.offset, job.info)
    	   }
    	}
  • dispatchSQL 函数负责在工做线程内部读取 SQL 文件和重写 SQL,该函数会在运行初始阶段 建立所操做表的 checkpoint 信息,须要注意在任务中断恢复以后,若是这个文件的导入尚未完成,checkpoint.Init 仍然会执行,可是此次运行不会更新该文件的 checkpoint 信息列值转换和库表路由也是在这个阶段内完成

    • 列值转换:须要对输入 SQL 进行解析拆分为每个 field,对须要转换的 field 进行转换操做,而后从新拼接起 SQL 语句。详细重写流程见 reassemble 函数。

    • 库表路由:这种场景下只须要 替换源表到目标表 便可。

  • 在工做线程执行一个批次的 SQL 语句以前,会首先根据文件 offset 信息生成一条更新 checkpoint 的语句,加入到打包的 SQL 语句中,具体执行时这些语句会 在一个事务中提交,这样就保证了断点信息的准确性,若是导入过程暂停或中断,恢复任务后从断点从新同步能够保证数据一致。

小结

本篇详细介绍 dump 和 load 两个数据同步处理单元的设计实现,对核心 interface 实现、数据导入并发模型、数据导入暂停或中断的恢复进行了分析。接下来的文章会继续介绍 binlog replicationrelay log 两个数据同步处理单元的实现。

相关文章
相关标签/搜索