在线 DDL 始终是数据库使用上的痛点。以前的工做中,有不少数据中心同事作 DDL 变动都很头疼,也吐槽过,谨慎选择时间点进行。即使如此,面对复杂庞杂的应用系统和各种定时运维脚本,DDL 操做依然可能干扰到业务正常运行。git
TiDB 的在线 DDL 是根据 Google F1 的在线异步 schema 变动算法实现github
F1 中 schema 以特殊的 kv 对存储于 Spanner 中,同时每一个 F1 服务器在运行过程当中自身也维护一份拷贝。为了保证同一时刻最多只有 2 份 schema 生效,F1 约定了长度为数分钟的 schema 租约,全部 F1 服务器在租约到期后都要从新加载 schema 。若是节点没法从新完成续租,它将会自动终止服务并等待被集群管理设施重启。
简单来讲,TiDB 的在线DDL和 MySQL 相比,主要有这些区别算法
MySQL 的数据和表结构是紧耦合的,想动表结构,势必会牵扯到数据。TiDB 的数据和表结构是分割的,操做数据时会比对表结构,经过两个 version 来应对不一样的 DML 语句。
详细的,能够参考这篇文章 👍数据库
https://github.com/zimulala/builddatabase/blob/master/f1/schema-change.mdgithub.comapi
先介绍几个比较重要的概念服务器
Job: 每一个单独的 DDL 操做可看作一个 job。在一个 DDL 操做开始时,会将此操做封装成一个 job 并存放到 job queue,等此操做完成时,会将此 job 从 job queue 删除,并在存入 history job queue,便于查看历史 job。
Worker:每一个节点都有一个 worker 用来处理 job。
Owner:整个系统只有一个节点的 worker 能当选 owner 角色,每一个节点均可能当选这个角色,当选 owner 后 worker 才有处理 job 的权利。owner 这个角色是有任期的,owner 的信息会存储在 KV 层中。worker按期获取 KV 层中的 owner 信息,若是其中 ownerID 为空,或者当前的 owner 超过了任期,则 worker 能够尝试更新 KV 层中的 owner 信息(设置 ownerID 为自身的 workerID),若是更新成功,则该 worker 成为 owner。在租期内这个用来确保整个系统同一时间只有一个节点在处理 schema 变动。
总结一下,每一个 TiDB 上有一个 Worker 线程,DDL 语句会封装为一个 Job ,由 Worker 进行处理。Worker 分为 Owner 和 非 Owner,每一个集群同时只能有一个 Owner,只有它能够处理队列中的 Job 。咱们先去源码中看看 Worker 的样子session
在 TiDB源码阅读(一) TiDB的入口 中,咱们提到了 main 函数中的 createStoreAndDomain 方法,这个方法初始化了一些重要的后台进程,其中就包括 Worker,启动流程基本是以下的路数app
func createStoreAndDomain() { dom, err = session.BootstrapSession(storage) } // 来到 BootstrapSession ,方法比较长,咱们只看关联到的地方,建立 session func BootstrapSession(store kv.Storage) (*domain.Domain, error) { se, err := createSession(store) } // 查看 createSession(store),再进一层就是 createSessionWithOpt func createSessionWithOpt(store kv.Storage, opt *Opt) (*session, error) { dom, err := domap.Get(store) } // 进Get func (dm *domainMap) Get(store kv.Storage) (d *domain.Domain, err error) { d = domain.NewDomain(store, ddlLease, statisticLease, factory) }
来到 domain.go ,看 init 函数less
do.ddl = ddl.NewDDL( ctx, ddl.WithEtcdClient(do.etcdClient), ddl.WithStore(do.store), ddl.WithInfoHandle(do.infoHandle), ddl.WithHook(callback), ddl.WithLease(ddlLease), ddl.WithResourcePool(sysCtxPool), )
经过 ddl.NewDDL 进入 ddl.go,开始 start Worker运维
d.start(ctx, opt.ResourcePool)
这里有两种 Worker ,一种专门负责 index 类型的 DDL ,一种负责其余的
if RunWorker { d.workers[generalWorker] = newWorker(generalWorker, d.store, d.sessPool, d.delRangeMgr) d.workers[addIdxWorker] = newWorker(addIdxWorker, d.store, d.sessPool, d.delRangeMgr) for _, worker := range d.workers { w := worker go tidbutil.WithRecovery( func() { w.start(d.ddlCtx) },// 启动 worker }
这样 Worker 就启动了,启动 TiDB 也能看到这个日志打印:
[ddl_worker.go:130] ["[ddl] start DDL worker"] [worker="worker 1, tp general"] [ddl_worker.go:130] ["[ddl] start DDL worker"] [worker="worker 2, tp add index"]
Worker 启动后,就开始轮询处理队列的 Job ⬇️
logutil.Logger(w.logCtx).Info("[ddl] start DDL worker") // We use 4 * lease time to check owner's timeout, so here, we will update owner's status // every 2 * lease time. If lease is 0, we will use default 1s. // But we use etcd to speed up, normally it takes less than 1s now, so we use 1s as the max value. checkTime := chooseLeaseTime(2*d.lease, 1*time.Second) ticker := time.NewTicker(checkTime) defer ticker.Stop() for { err := w.handleDDLJobQueue(d) // 处理Job的流程,文章后半部分介绍 }
从 parser.y 开始,以 alter table add columns 语句为例展开,先找AlterTableStmt
AlterTableStmt: "ALTER" IgnoreOptional "TABLE" TableName AlterTableSpecListOpt AlterTablePartitionOpt { specs := $5.([]*ast.AlterTableSpec) if $6 != nil { specs = append(specs, $6.(*ast.AlterTableSpec)) } $$ = &ast.AlterTableStmt{ Table: $4.(*ast.TableName), Specs: specs, } } //看AlterTableSpecListOpt下 | "ADD" ColumnKeywordOpt IfNotExists ColumnDef ColumnPosition { $$ = &ast.AlterTableSpec{ IfNotExists: $3.(bool), Tp: ast.AlterTableAddColumns, NewColumns: []*ast.ColumnDef{$4.(*ast.ColumnDef)}, Position: $5.(*ast.ColumnPosition), } }
Token
IgnoreOptional TableName AlterTableSpecListOpt ColumnKeywordOpt IfNotExists ColumnDef ColumnPosition
主要看下 ast.AlterTableAddColumns ,来到 ddl_api.go
func (d *ddl) AlterTable(ctx sessionctx.Context, ident ast.Ident, specs []*ast.AlterTableSpec) (err error) { case ast.AlterTableAddColumns: err = d.AddColumn(ctx, ident, spec) }
先进行了一系列的校验:
太长不看,来到这里
job := &model.Job{ SchemaID: schema.ID, TableID: t.Meta().ID, SchemaName: schema.Name.L, Type: model.ActionAddColumn, BinlogInfo: &model.HistoryInfo{}, Args: []interface{}{col, spec.Position, 0}, } err = d.doDDLJob(ctx, job)
进入 d.doDDLJob(ctx, job)
func (d *ddl) doDDLJob(ctx sessionctx.Context, job *model.Job) error { // 获取 DDL SQL job.Query, _ = ctx.Value(sessionctx.QueryString).(string) // 赋给 task task := &limitJobTask{job, make(chan error)} // 传入 limitJobCh ,我理解就是队列,毕竟下一句都 true 了 d.limitJobCh <- task ctx.GetSessionVars().StmtCtx.IsDDLJobInQueue = true } // 通知 worker ,worker 分为 addIdxWorker 和 generalWorker d.asyncNotifyWorker(job.Type)
后面的那段代码我理解就是 for true 来查 Job 的状态,若失败就报错,若成功就 log 。这就是 TiDB 接收 DDL 语句后的大体流程。
如今主要的任务就回到了 ddl_worker.go ,也就是上文提到的 handleDDLJobQueue 函数中,这里开始循环处理队列中的 Job ,这个方法有点绕,须要循环好几回,由于每一个 Job 整个流程有好几种状态,根据不一样状态作不一样处理,这里我简单点说,有兴趣的能够 debug
func (w *worker) handleDDLJobQueue(d *ddlCtx) error { once := true for { waitTime := 2 * d.lease // 2个租约时间 1min 30s //开启事务,每次循环都会 commit err := kv.RunInNewTxn(d.store, false, func(txn kv.Transaction) error { if !d.isOwner() {// 不是 owner 角色,就什么都不作 return nil } job, err = w.getFirstDDLJob(t) // 获取队列中头部 Job if job == nil || err != nil { // 没有则返回 return errors.Trace(err) } if once {// Job的第一次循环都会走这个,但只是处理异常状况, // 在正常流程中,w.waitSchemaSynced 直接 return 到第二轮循环 w.waitSchemaSynced(d, job, waitTime) once = false return nil } // 第二轮由于状态问题,不会走到这个分支 if job.IsDone() || job.IsRollbackDone() { err = w.finishDDLJob(t, job) } // 我怀疑这个地方是操做 KV 层的,由于没找到实现,若是说错了请各位指正 d.mu.RLock() d.mu.hook.OnJobRunBefore(job) d.mu.RUnlock() tidbutil.WithRecovery(func() { // runDDLJob 一看就是重要函数,下面会说,主要是更新5种状态 schemaVer, runJobErr = w.runDDLJob(d, t, job) } // 若是cancel了 就 finish if job.IsCancelled() { err = w.finishDDLJob(t, job) } // 更新 Job err = w.updateDDLJob(t, job, runJobErr != nil) d.mu.RLock() d.mu.hook.OnJobUpdated(job) d.mu.RUnlock() } }
w.runDDLJob
func (w *worker) runDDLJob(d *ddlCtx, t *meta.Meta, job *model.Job) (ver int64, err error) { timeStart := time.Now() if job.IsFinished() { return } if job.IsCancelling() { return convertJob2RollbackJob(w, d, t, job) } if !job.IsRollingback() && !job.IsCancelling() { job.State = model.JobStateRunning } // 上面是不一样状态的处理 // 这里根据不一样的 type 走不一样的函数,我们就看 onAddColumn switch job.Type { case model.ActionAddColumn: ver, err = onAddColumn(d, t, job) default: job.State = model.JobStateCancelled err = errInvalidDDLJob.GenWithStack("invalid ddl job type: %v", job.Type) } if err != nil { //这里主要异常状况 } return }
onAddColumn
func onAddColumn(d *ddlCtx, t *meta.Meta, job *model.Job) (ver int64, err error) { // 若是 job 是回滚状态, 就走 drop colunm if job.IsRollingback() { ver, err = onDropColumn(t, job) } // checkAddColumn 这里处理了几种状况,好比字段信息已经存在且是 public,就cancel tblInfo, columnInfo, col, pos, offset, err := checkAddColumn(t, job) if columnInfo == nil { // 根据 after first 语法 建立 ColumnInfo columnInfo, offset, err = createColumnInfo(tblInfo, col, pos) } // 五种状态 none -> delete only -> write only -> reorganization -> public originalState := columnInfo.State switch columnInfo.State { case model.StateNone: // none -> delete only ver, err = updateVersionAndTableInfoWithCheck(t, job, tblInfo, originalState != columnInfo.State) case model.StateDeleteOnly: // delete only -> write only ver, err = updateVersionAndTableInfo(t, job, tblInfo, originalState != columnInfo.State) case model.StateWriteOnly: // write only -> reorganization ver, err = updateVersionAndTableInfo(t, job, tblInfo, originalState != columnInfo.State) case model.StateWriteReorganization: // reorganization -> public adjustColumnInfoInAddColumn(tblInfo, offset) columnInfo.State = model.StatePublic ver, err = updateVersionAndTableInfo(t, job, tblInfo, originalState != columnInfo.State) // Finish this job. job.FinishTableJob(model.JobStateDone, model.StatePublic, ver, tblInfo) asyncNotifyEvent(d, &util.Event{Tp: model.ActionAddColumn, TableInfo: tblInfo, ColumnInfo: columnInfo}) default: err = ErrInvalidDDLState.GenWithStackByArgs("column", columnInfo.State) } }
FinishTableJob
startTime := time.Now() err = t.AddHistoryDDLJob(job, updateRawArgs)
最后加入 historyDDLJob ,供查询历史 DDL 操做。再作些清理工做,整个 DDL 流程就差很少走完了。
还记得 domai.go 吧,仍是 init 函数,在这里 TiDB 的非 owner 会隔一个 lease 时间去同步 ver ,大概看一下
init@domain.go
if ddlLease > 0 { do.wg.Add(1) // Local store needs to get the change information for every DDL state in each session. go do.loadSchemaInLoop(ddlLease) }
do.loadSchemaInLoop
这里是具体过程了,循环 reload,
func (do *Domain) loadSchemaInLoop(lease time.Duration) { ticker := time.NewTicker(lease / 2) for { select { case <-ticker.C: err := do.Reload() case _, ok := <-syncer.GlobalVersionCh(): err := do.Reload() case <-syncer.Done(): do.SchemaValidator.Stop() err := do.mustRestartSyncer() exitLoop := do.mustReload() case <-do.exit: return } } }
这里大体逻辑有这些:
感受整个流程涉及的东西也很多,有些地方仍是要多看几遍领悟,以后少不了订正这篇了😓