由于丰巢自去年年末开始在推送平台上尝试了TiDB,最近又要将承接丰巢全部交易的支付平台切到TiDB上。我本人一直没有抽出时间对TiDB的源码进行学习,最近准备开始一系列的学习和分享。因为我本人没有数据库相关的经验,本着学习的心态和你们一块儿探讨,欢迎高手随时指正。总结一下本次学习分享的目的:java
言归正传,说一下本文的产生缘由:去年咱们在推送平台上使用TiDB的过程当中,就发现老版本的TiDB是没法经过外部手段kill调用慢查询的,而慢查询的危害对于数据库来讲会有致命的风险,后来pingcap公司在2.1版本(具体的版本参见TiDB的说明)中增长了show processlist和kill tidb命令,可是由于TiDB自己是无状态的,这两个命令属于单机命令,在使用的过程当中,你们仍是要提早作好准备,要直连到具体的TiDB的server上才可以使用,不要经过nginx等服务进行转发请求,到时不但不能解决问题,还有可能带来意外的风险。今天第一章,咱们先来看一下show processlist这个比较简单的命令的源码,下一章,咱们再分析kill tidb这个命令。mysql
上面的列表中展现了当前TiDB正在处理每一个链接的sql语句详情。linux
在我分析源码以前,我问了本身本次分析源码要搞清楚的两个问题,在这里和你们分享一下:nginx
首先,启动TiDB server.代码在tidb-server/main.go里面,主要方法是:runServer方法golang
func runServer() { err := svr.Run() }
再来看一下:server/server.go源码:sql
func (s *Server) Run() error { for { conn, err := s.listener.Accept() go s.onConn(conn) } }
重点代码是监听端口,并建立链接,启动另外一协程去服务新来的链接,接下来再看看server.go中的onConn方法:数据库
func (s *Server) onConn(c net.Conn) { conn := s.newConn(c) conn.Run() }
其中,s.newConn方法会将net. Conn链接包装成clientConn链接,并分配在这个TiDB server下惟一的connectionID,此connectionID为原子变量,每次新链接自增长1,咱们先记住这个id,后面分析的时候会用到它。咱们来看看server/conn.go下的Run方法:session
func (cc *clientConn) Run() { for { data, err := cc.readPacket() cc.dispatch(data) } }
Run方法主要就是不断的轮训读取clientConn中的内容,并将它交给dispatch方法进行下面的分析及返回结果操做,至此关于接收show processlist命令部分已经分析完毕,固然其它的sql语句也是通过这个过程进入到dispatch方法中的。架构
接着分析dispatch方法在处理show processlist命令的流程:框架
func (cc *clientConn) dispatch(data []byte) error { switch cmd { case mysql.ComQuery: return cc.handleQuery(ctx1, hack.String(data)) } }
show processlist命令属于mysql.ComQuery,所以流程会走到handleQuery方法里面,咱们来看一下:
func (cc *clientConn) handleQuery(ctx context.Context, sql string) (err error) { rs, err := cc.ctx.Execute(ctx, sql) err = cc.writeResultset(ctx, rs[0], false, 0, 0) }
handleQuery中处理show processlist命令的重点代码就是上面的两行,咱们先来看一下server/driver_tidb.go中的Execute方法:
rsList, err := tc.session.Execute(ctx, sql)
Execute中的重点就是调用session/session.go中的Execute方法:
func (s *session) execute(ctx context.Context, sql string) (recordSets []sqlexec.RecordSet, err error) { s.PrepareTxnCtx(ctx) stmtNodes, warns, err := s.ParseSQL(ctx, sql, charsetInfo, collation) compiler := executor.Compiler{Ctx: s} for _, stmtNode := range stmtNodes { recordSets, err = s.executeStatement(ctx, connID, stmtNode, stmt, recordSets); } }
上面的execute方法中会对sql语句进行处理及制定执行计划,处理完成后调用executeStatement方法,executeStatement中的重点方法是runStmt:
recordSet, err := runStmt(ctx, s, stmt)
咱们再来看看session/tidb.go中的runStmt方法:
func runStmt(ctx context.Context, sctx sessionctx.Context, s sqlexec.Statement) (sqlexec.RecordSet, error) { rs, err = s.Exec(ctx) err = finishStmt(ctx, sctx, se, sessVars, err) }
继续来分析executor/adapter中的(a *ExecStmt) Exec方法,同样采起划重点的方式:
func (a *ExecStmt) Exec(ctx context.Context) (sqlexec.RecordSet, error) { e, err := a.buildExecutor(sctx) e.Open(ctx) var pi processinfoSetter if raw, ok := sctx.(processinfoSetter); ok { pi = raw sql := a.OriginText() if simple, ok := a.Plan.(*plannercore.Simple); ok && simple.Statement != nil { if ss, ok := simple.Statement.(ast.SensitiveStmtNode); ok { // Use SecureText to avoid leak password information. sql = ss.SecureText() } } // Update processinfo, ShowProcess() will use it. pi.SetProcessInfo(sql) //fmt.Println(sql) a.Ctx.GetSessionVars().StmtCtx.StmtType = GetStmtLabel(a.StmtNode) } return &recordSet{ executor: e, stmt: a, processinfo: pi, txnStartTS: txnStartTS, }, nil }
(a *ExecStmt) Exec方法中raw, ok := sctx.(processinfoSetter)这段逻辑就是把当前链接正在执行的语句存储到processinfo里面取,关于这部分细节比较简单,在这里不展开来分析。咱们先来看看buildExecutor中作了什么事情?
b := newExecutorBuilder(ctx, a.InfoSchema) e := b.build(a.Plan)
重点要来了,在executor/builder.go中的build方法作了啥事?
case *plannercore.Show: return b.buildShow(v)
build方法会根据不一样的语句类型来构建不一样的Executor并返回,show processlist命令会匹配到plannercore.Show类型,咱们看看buildShow方法的实现:
e := &ShowExec{
baseExecutor: newBaseExecutor(b.ctx, v.Schema(), v.ExplainID()), Tp: v.Tp, DBName: model.NewCIStr(v.DBName), Table: v.Table, Column: v.Column, User: v.User, Flag: v.Flag, Full: v.Full, GlobalScope: v.GlobalScope, is: b.is, } if len(v.Conditions) == 0 { return e } sel := &SelectionExec{ baseExecutor: newBaseExecutor(b.ctx, v.Schema(), v.ExplainID(), e), filters: v.Conditions, } return sel
由于v.Conditions为0,因此返回类型为ShowExec的Executor,咱们接下来再刚才的Exec方法中的e.Open方法,其实就是ShowExec的Open方法,ShowExec位于executor/show.go文件中,咱们查找后发现ShowExec中没有Open方法,我当时是被搞蒙了,后来发现这是go的一个语言特性,它使用的是baseExecutor的Open方法:
func (e *baseExecutor) Open(ctx context.Context) error { for _, child := range e.children { err := child.Open(ctx) if err != nil { return errors.Trace(err) } } return nil }
上面的方法会遍历baseExecutor中的children的Executor,而后调用它们的Open方法,可是由于ShowExec在建立它的baseExecutor的时候,没有任何的children,因此在show processlist这个操做过程当中,Open方法至关于啥也没干,可是你们在分析其它语句时,这个Open方法是一个很重要的方法。咱们再来看刚才Exec中的最后的return块里面,返回了包装executor、processinfo等信息的recordSet类型。至此关于show processlist命令如何包装成Executor并和processinfo等信息做为recordSet类型的返回值返回给上层函数分析完毕。
接下来咱们再来看handleQuery中的writeResultset方法:
err = cc.writeResultset(ctx, rs[0], false, 0, 0)
在server/conn.go中的writeResultset主要的逻辑就是下面的逻辑:
err = cc.writeChunks(ctx, rs, binary, serverStatus)
咱们继续来分析writeChunks中的重要部分:
func (cc *clientConn) writeChunks(ctx context.Context, rs ResultSet, binary bool, serverStatus uint16) error { for { err := rs.Next(ctx, chk) } }
writeChunks里面主要就是循环调用rs.Next的方法,直到知足条件为止,rs的类型其实是server/driver_tidb.go下的tidbResultSet类型,咱们来看一下它的Next方法:
func (trs *tidbResultSet) Next(ctx context.Context, chk *chunk.Chunk) error { return trs.recordSet.Next(ctx, chk) }
tidbResultSet的Next方法主要是调用了executor/adapter.go中的recordSet类型的Next方法,咱们来看看这个Next方法:
func (a *recordSet) Next(ctx context.Context, chk *chunk.Chunk) error { err := a.executor.Next(ctx, chk) }
recordSet方法的重点就是调用它的executor的Next方法,咱们在上一个小节 结尾处分析出recordSet的executor就是以前生成的ShowExec(可算是找到它了,我已经累晕)。那么,咱们接着分析它的Next方法:
e.fetchAll()
ShowExec中的Next方法的主要逻辑就是调用它的fetchAll方法,接着往下看:
case ast.ShowProcessList: return e.fetchShowProcessList()
由于匹配到了这个case,因此会调用它的fetchShowProcessList方法:
func (e *ShowExec) fetchShowProcessList() error { sm := e.ctx.GetSessionManager() pl := sm.ShowProcessList() }
上面的sm类型的server/server.go中的Server类型,咱们来看看它的ShowProcessList方法:
func (s *Server) ShowProcessList() map[uint64]util.ProcessInfo { s.rwlock.RLock() rs := make(map[uint64]util.ProcessInfo, len(s.clients)) for _, client := range s.clients { if atomic.LoadInt32(&client.status) == connStatusWaitShutdown { continue } pi := client.ctx.ShowProcess() rs[pi.ID] = pi } s.rwlock.RUnlock() return rs }
它主要是遍历当前全部的客户端,并获取到全部客户端的ShowProcess,其中的client.ctx类型为server.TiDBContext,咱们来看看它的ShowProcess:
func (tc *TiDBContext) ShowProcess() util.ProcessInfo { return tc.session.ShowProcess() }
逻辑比较简单,就是调用类型为session.session的ShowProcess方法,接着往下看:
func (s *session) ShowProcess() util.ProcessInfo { var pi util.ProcessInfo tmp := s.processInfo.Load() if tmp != nil { pi = tmp.(util.ProcessInfo) pi.Mem = s.GetSessionVars().StmtCtx.MemTracker.BytesConsumed() } return pi }
session的ShowProcess方法会从内存中加载当前session的processInfo信息。至此咱们分析show processlist命令的源码分析完毕,关于每一个链接如何设置自身的processinfo信息,逻辑也比较简单,你们有兴趣能够本身去研究一下。
咱们能够回答一下开头提出的两个问题:
经过上面的分析,咱们还能够总结如下的特色:
最后,我想和你们分享一下,我本身在源码阅读里面用到的一些方法和技巧,大的方面会有两种方法:
上面的两种方法,会伴随你们在源码阅读的各个阶段,可是有了这两种方法仍是远远不够的,我再分享一下个人相关技巧: