TiDB 源码阅读系列文章(二十四)TiDB Binlog 源码解析

做者:姚维html

TiDB Binlog Overview

这篇文章不是讲 TiDB Binlog 组件的源码,而是讲 TiDB 在执行 DML/DDL 语句过程当中,如何将 Binlog 数据 发送给 TiDB Binlog 集群的 Pump 组件。目前 TiDB 在 DML 上的 Binlog 用的相似 Row-based 的格式。具体 Binlog 具体的架构细节能够参考这篇 文章mysql

这里只描述 TiDB 中的代码实现。git

DML Binlog

TiDB 采用 protobuf 来编码 binlog,具体的格式能够见 binlog.proto。这里讨论 TiDB 写 Binlog 的机制,以及 Binlog 对 TiDB 写入的影响。github

TiDB 会在 DML 语句提交,以及 DDL 语句完成的时候,向 pump 输出 Binlog。算法

Statement 执行阶段

DML 语句包括 Insert/Replace、Update、Delete,这里挑 Insert 语句来阐述,其余的语句行为都相似。首先在 Insert 语句执行完插入(未提交)以前,会把本身新增的数据记录在 binlog.TableMutation 结构体中。sql

// TableMutation 存储表中数据的变化
message TableMutation {
	    // 表的 id,惟一标识一个表
	    optional int64 table_id      = 1 [(gogoproto.nullable) = false]; 
	    
	    // 保存插入的每行数据
	    repeated bytes inserted_rows = 2;
	    
	    // 保存修改前和修改后的每行的数据
	    repeated bytes updated_rows  = 3;
	    
	    // 已废弃
	    repeated int64 deleted_ids   = 4;
	    
	    // 已废弃
	    repeated bytes deleted_pks   = 5;
	     
	    // 删除行的数据
	    repeated bytes deleted_rows  = 6;
	    
	    // 记录数据变动的顺序
	    repeated MutationType sequence = 7;
}

这个结构体保存于跟每一个 Session 连接相关的事务上下文结构体中 TxnState.mutations。 一张表对应一个 TableMutation 对象,TableMutation 里面保存了这个事务对这张表的全部变动数据。Insert 会把当前语句插入的行,根据 RowID + Row-value 的格式编码以后,追加到 TableMutation.InsertedRows 中:session

func (t *Table) addInsertBinlog(ctx context.Context, h int64, row []types.Datum, colIDs []int64) error {
	mutation := t.getMutation(ctx)
	pk, err := codec.EncodeValue(ctx.GetSessionVars().StmtCtx, nil, types.NewIntDatum(h))
	if err != nil {
		return errors.Trace(err)
	}
	value, err := tablecodec.EncodeRow(ctx.GetSessionVars().StmtCtx, row, colIDs, nil, nil)
	if err != nil {
		return errors.Trace(err)
	}
	bin := append(pk, value...)
	mutation.InsertedRows = append(mutation.InsertedRows, bin)
	mutation.Sequence = append(mutation.Sequence, binlog.MutationType_Insert)
	return nil
}

等到全部的语句都执行完以后,在 TxnState.mutations 中就保存了当前事务对全部表的变动数据。架构

Commit 阶段

对于 DML 而言,TiDB 的事务采用 2-phase-commit 算法,一次事务提交会分为 Prewrite 阶段,以及 Commit 阶段。这里分两个阶段来看看 TiDB 具体的行为。app

Prewrite Binlog

session.doCommit 函数中,TiDB 会构造 binlog.PrewriteValue异步

message PrewriteValue {
    optional int64         schema_version = 1 [(gogoproto.nullable) = false];
    repeated TableMutation mutations      = 2 [(gogoproto.nullable) = false];
}

这个 PrewriteValue 中包含了跟此次变更相关的全部行数据,TiDB 会填充一个类型为 binlog.BinlogType_Prewrite 的 Binlog:

info := &binloginfo.BinlogInfo{
	Data: &binlog.Binlog{
		Tp:            binlog.BinlogType_Prewrite,
		PrewriteValue: prewriteData,
	},
	Client: s.sessionVars.BinlogClient.(binlog.PumpClient),
}

TiDB 这里用一个事务的 Option kv.BinlogInfo 来把 BinlogInfo 绑定到当前要提交的 transaction 对象中:

s.txn.SetOption(kv.BinlogInfo, info)

twoPhaseCommitter.execute 中,在把数据 prewrite 到 TiKV 的同时,会调用 twoPhaseCommitter.prewriteBinlog,这里会把关联的 binloginfo.BinlogInfo 取出来,把 Binlog 的 binlog.PrewriteValue 输出到 Pump。

binlogChan := c.prewriteBinlog()
err := c.prewriteKeys(NewBackoffer(prewriteMaxBackoff, ctx), c.keys)
if binlogChan != nil {
	binlogErr := <-binlogChan // 等待 write prewrite binlog 完成
	if binlogErr != nil {
		return errors.Trace(binlogErr)
	}
}

这里值得注意的是,在 prewrite 阶段,是须要等待 write prewrite binlog 完成以后,才能继续作接下去的提交的,这里是为了保证 TiDB 成功提交的事务,Pump 至少必定能收到 Prewrite Binlog。

Commit Binlog

twoPhaseCommitter.execute 事务提交结束以后,事务可能提交成功,也可能提交失败。TiDB 须要把这个状态告知 Pump:

err = committer.execute(ctx)
if err != nil {
	committer.writeFinishBinlog(binlog.BinlogType_Rollback, 0)
	return errors.Trace(err)
}
committer.writeFinishBinlog(binlog.BinlogType_Commit, int64(committer.commitTS))

若是发生了 error,那么输出的 Binlog 类型就为 binlog.BinlogType_Rollback,若是成功提交,那么输出的 Binlog 类型就为 binlog.BinlogType_Commit

func (c *twoPhaseCommitter) writeFinishBinlog(tp binlog.BinlogType, commitTS int64) {
	if !c.shouldWriteBinlog() {
		return
	}
	binInfo := c.txn.us.GetOption(kv.BinlogInfo).(*binloginfo.BinlogInfo)
	binInfo.Data.Tp = tp
	binInfo.Data.CommitTs = commitTS
	go func() {
		err := binInfo.WriteBinlog(c.store.clusterID)
		if err != nil {
			log.Errorf("failed to write binlog: %v", err)
		}
	}()
}

值得注意的是,这里 WriteBinlog 是单独启动 goroutine 异步完成的,也就是 Commit 阶段,是再也不须要等待写 binlog 完成的。这里能够节省一点 commit 的等待时间,这里不须要等待是由于 Pump 即便接收不到这个 Commit Binlog,在超过 timeout 时间后,Pump 会自行根据 Prewrite Binlog 到 TiKV 中确认当条事务的提交状态。

DDL Binlog

一个 DDL 有以下几个状态:

const (
	JobStateNone    		JobState = 0
	JobStateRunning 		JobState = 1
	JobStateRollingback  	JobState = 2
	JobStateRollbackDone 	JobState = 3
	JobStateDone         	JobState = 4
	JobStateSynced 			JobState = 6
	JobStateCancelling 		JobState = 7
)

这些状态表明了一个 DDL 任务所处的状态:

  1. JobStateNone,表明 DDL 任务还在处理队列,TiDB 尚未开始作这个 DDL。

  2. JobStateRunning,当 DDL Owner 开始处理这个任务的时候,会把状态设置为 JobStateRunning,以后 DDL 会开始变动,TiDB 的 Schema 可能会涉及多个状态的变动,这中间不会改变 DDL job 的状态,只会变动 Schema 的状态。

  3. JobStateDone, 当 TiDB 完成本身全部的 Schema 状态变动以后,会把 Job 的状态改成 Done。

  4. JobStateSynced,当 TiDB 每作一次 schema 状态变动,就会须要跟集群中的其余 TiDB 作一次同步,可是当 Job 状态为 JobStateDone 以后,在 TiDB 等到全部的 TiDB 节点同步以后,会将状态修改成 JobStateSynced

  5. JobStateCancelling,TiDB 提供语法 ADMIN CANCEL DDL JOBS job_ids 用于取消某个正在执行或者还未执行的 DDL 任务,当成功执行这个命令以后,DDL 任务的状态会变为 JobStateCancelling

  6. JobStateRollingback,当 DDL Owner 发现 Job 的状态变为 JobStateCancelling 以后,它会将 job 的状态改变为 JobStateRollingback,以示已经开始处理 cancel 请求。

  7. JobStateRollbackDone,在作 cancel 的过程,也会涉及 Schema 状态的变动,也须要经历 Schema 的同步,等到状态回滚已经作完了,TiDB 会将 Job 的状态设置为 JobStateRollbackDone

对于 Binlog 而言,DDL 的 Binlog 输出机制,跟 DML 语句也是相似的,只有开始处理事务提交阶段,才会开始写 Binlog 出去。那么对于 DDL 来讲,跟 DML 不同,DML 有事务的概念,对于 DDL 来讲,SQL 的事务是不影响 DDL 语句的。可是 DDL 里面,上面提到的 Job 的状态变动,是做为一个事务来提交的(保证状态一致性)。因此在每一个状态变动,都会有一个事务与之对应,可是上面提到的中间状态,DDL 并不会往外写 Binlog,只有 JobStateRollbackDone 以及 JobStateDone 这两种状态,TiDB 会认为 DDL 语句已经完成,会对外发送 Binlog,发送以前,会把 Job 的状态从 JobStateDone 修改成 JobStateSynced,此次修改,也涉及一次事务提交。这块逻辑的代码以下:

worker.handleDDLJobQueue():

if job.IsDone() || job.IsRollbackDone() {
		binloginfo.SetDDLBinlog(d.binlogCli, txn, job.ID, job.Query)
		if !job.IsRollbackDone() {
			job.State = model.JobStateSynced
		}
		err = w.finishDDLJob(t, job)
		return errors.Trace(err)
}

type Binlog struct {
	DdlQuery []byte
	DdlJobId         int64
}

DdlQuery 会设置为原始的 DDL 语句,DdlJobId 会设置为 DDL 的任务 ID。

对于最后一次 Job 状态的提交,会有两条 Binlog 与之对应,这里有几种状况:

  1. 若是事务提交成功,类型分别为 binlog.BinlogType_Prewritebinlog.BinlogType_Commit

  2. 若是事务提交失败,类型分别为 binlog.BinlogType_Prewritebinlog.BinlogType_Rollback

因此,Pumps 收到的 DDL Binlog,若是类型为 binlog.BinlogType_Rollback 应该只认为以下状态是合法的:

  1. JobStateDone (由于修改成 JobStateSynced 还未成功)

  2. JobStateRollbackDone

若是类型为 binlog.BinlogType_Commit,应该只认为以下状态是合法的:

  1. JobStateSynced

  2. JobStateRollbackDone

当 TiDB 在提交最后一个 Job 状态的时候,若是事务提交失败了,那么 TiDB Owner 会尝试继续修改这个 Job,直到成功。也就是对于同一个 DdlJobId,后续还可能会有屡次 Binlog,直到出现 binlog.BinlogType_Commit

相关文章
相关标签/搜索