golang 数据库链接池database/sql 实现原理分析

golang对数据库的请求,抽象出来一套通用的链接池,用go的机制来讲,golang只须要提供一个驱动(driver)的interface,底层不一样数据库协议,由用户根据本身的数据库实现对应的驱动便可。css

本文从源码实现的角度,探索这里的细节以及须要避免的坑,基于1.14代码分析,部分bug在1.15中有修复或优化,这里也会说起。mysql

golang版本:1.14git

目录结构说明

└── sql
    ├── convert.go           # 结果行的读取与转换
    ├── convert_test.go
    ├── ctxutil.go           # 绑定上下文的一些通用方法
    ├── doc.txt
    ├── driver               # driver 定义来实现数据库驱动所须要的接口
    │   ├── driver.go
    │   ├── types.go         # 数据类型别名和转换
    │   └── types_test.go
    ├── example_cli_test.go
    ├── example_service_test.go
    ├── example_test.go
    ├── fakedb_test.go
    ├── sql.go               # 通用的接口和类型,包括事物,链接等
    └── sql_test.go

主要数据结构

1. sql.DB

type DB struct {
    // Atomic access only. At top of struct to prevent mis-alignment
    // on 32-bit platforms. Of type time.Duration.
    waitDuration int64          // 等待新的链接所须要的总时间
    connector driver.Connector  // 数据库驱动本身实现
    // numClosed is an atomic counter which represents a total number of
    // closed connections. Stmt.openStmt checks it before cleaning closed
    // connections in Stmt.css.
    numClosed uint64           // 关闭的链接数

    mu           sync.Mutex // protects following fields
    freeConn     []*driverConn
    connRequests map[uint64]chan connRequest
    nextRequest  uint64 // Next key to use in connRequests.
    numOpen      int    // number of opened and pending open connections
    // Used to signal the need for new connections
    // a goroutine running connectionOpener() reads on this chan and
    // maybeOpenNewConnections sends on the chan (one send per needed connection)
    // It is closed during db.Close(). The close tells the connectionOpener
    // goroutine to exit.
    openerCh          chan struct{}      // 用于通知须要建立新的链接
    // resetterCh        chan *driverConn  // 已废弃
    closed            bool
    dep               map[finalCloser]depSet // map[一级对象]map[二级对象]bool,一个外部以来,用于自动关闭
    lastPut           map[*driverConn]string // stacktrace of last conn's put; debug only
    maxIdle           int                    // zero means defaultMaxIdleConns(2); negative means 0
    maxOpen           int                    // <= 0 means unlimited
    maxLifetime       time.Duration          // maximum amount of time a connection may be reused
    cleanerCh         chan struct{}          // 用于通知清理过时的链接,maxlife时间改变或者链接被关闭时会经过该channel通知
    waitCount         int64 // Total number of connections waited for.   // 这些状态数据,能够经过db.Stat() 获取
    maxIdleClosed     int64 // Total number of connections closed due to idle.
    maxLifetimeClosed int64 // Total number of connections closed due to max free limit.

    stop func() // stop cancels the connection opener and the session resetter.
}

sql.DB不是一个链接,它是数据库的抽象接口,也是整个链接池的句柄,对多个goroutine是并发安全的。它能够根据driver打开关闭数据库链接,管理链接池。这对不一样的数据库来讲都是同样的。github

2. sql.driverConn

// driverConn wraps a driver.Conn with a mutex, to
// be held during all calls into the Conn. (including any calls onto
// interfaces returned via that Conn, such as calls on Tx, Stmt,
// Result, Rows)
type driverConn struct {
   db        *DB
   createdAt time.Time

   sync.Mutex  // guards following
   ci          driver.Conn  // 由不一样的驱动本身实现,对应一条具体的数据库链接
   needReset   bool         // The connection session should be reset before use if true.
   closed      bool         // 当前链接的状态,是否已经关闭
   finalClosed bool         // ci.Close has been called
   openStmt    map[*driverStmt]bool

   // guarded by db.mu
   inUse      bool
   onPut      []func() // code (with db.mu held) run when conn is next returned  // 归还链接的时候调用
   dbmuClosed bool     // same as closed, but guarded by db.mu, for removeClosedStmtLocked
}

对单个链接的封装,包含了实际的数据库链接以及相关的状态信息等golang

3. driver.Conn

// Conn is a connection to a database. It is not used concurrently
// by multiple goroutines.
//
// Conn is assumed to be stateful.
type Conn interface {
   // Prepare returns a prepared statement, bound to this connection.
   Prepare(query string) (Stmt, error)

   // Close invalidates and potentially stops any current
   // prepared statements and transactions, marking this
   // connection as no longer in use.
   //
   // Because the sql package maintains a free pool of
   // connections and only calls Close when there's a surplus of
   // idle connections, it shouldn't be necessary for drivers to
   // do their own connection caching.
   Close() error

   // Begin starts and returns a new transaction.
   //
   // Deprecated: Drivers should implement ConnBeginTx instead (or additionally).
   Begin() (Tx, error)
}

一条具体的数据库链接,须要由不一样驱动本身去实现接口sql

4. driver.Driver

type Driver interface {
    Open(name string) (Conn, error)
}

Driver 只包含一个函数,Open()用来返回一个可用链接,多是新创建的,也多是以前缓存的关闭的链接。数据库

5. driver.DriverContext

type DriverContext interface {
// OpenConnector must parse the name in the same format that Driver.Open
// parses the name parameter.
    OpenConnector(name string) (Connector, error)
}

DriverContext 的目的是维护drievr上下文信息,避免了每次新建链接的时候都须要解析一遍 dsn。须要有Driver对象本身去实现。缓存

6. driver.Connector

type Connector interface {
// Connect returns a connection to the database.
// Connect may return a cached connection (one previously
// closed), but doing so is unnecessary; the sql package
// maintains a pool of idle connections for efficient re-use.
//
// The provided context.Context is for dialing purposes only
// (see net.DialContext) and should not be stored or used for
// other purposes.
//
// The returned connection is only used by one goroutine at a
// time.
    Connect(context.Context) (Conn, error)
// Driver returns the underlying Driver of the Connector,
// mainly to maintain compatibility with the Driver method
// on sql.DB.
    Driver() Driver
}

driver.Connector 是driver的插口,是一个接口类型的对象,由不一样类型的数据库来实现。
driver.Connector 包含两个函数。安全

  • Connect 用来创建链接
  • Driver 用来返回一个 Driver 对象,Driver也是个接口类型对象,须要不一样的数据库本身去实现。

主要操做流程

1. 注册驱动

import (
    _ "github.com/go-sql-driver/mysql"
)

var (
    driversMu sync.RWMutex
    drivers   = make(map[string]driver.Driver)
)
func Register(name string, driver driver.Driver) {
    driversMu.Lock()
    defer driversMu.Unlock()
    if driver == nil {
        panic("sql: Register driver is nil")
    }
    if _, dup := drivers[name]; dup {
        panic("sql: Register called twice for driver " + name)
    }
    drivers[name] = driver
}

/database/sql 提供的是一个通用的数据库链接池,当咱们链接不一样的数据库时,只须要将对应的数据库驱动注册进去就可使用。session

这里的注册,实际上就是将数据库名称和对应的数据库驱动(数据库链接包装器)添加的一个map中,每一个import进来的库,须要在init函数中调用注册函数来实现。

2. 建立链接池句柄 sql.Open()

func Open(driverName, dataSourceName string) (*DB, error) {
    driversMu.RLock()
    driveri, ok := drivers[driverName]  // 1
    driversMu.RUnlock()
    if !ok {
        return nil, fmt.Errorf("sql: unknown driver %q (forgotten import?)", driverName)
    }
    if driverCtx, ok := driveri.(driver.DriverContext); ok {  // 2
        connector, err := driverCtx.OpenConnector(dataSourceName)
        if err != nil {
            return nil, err
        }
        return OpenDB(connector), nil  // 3
    }
    return OpenDB(dsnConnector{dsn: dataSourceName, driver: driveri}), nil  // 4
}

func OpenDB(c driver.Connector) *DB {
   ctx, cancel := context.WithCancel(context.Background())
   db := &DB{
      connector:    c,
      openerCh:     make(chan struct{}, connectionRequestQueueSize),
      lastPut:      make(map[*driverConn]string),
      connRequests: make(map[uint64]chan connRequest),
      stop:         cancel,
   }

   go db.connectionOpener(ctx)  // 经过channel通知来建立链接
   // go db.connectionResetter(ctx) // 用于重置链接,1.14废弃
   return db
}

Open函数一般解释为初始化db,这里只是经过驱动名称,获取到对应的驱动,并对驱动进行一系列的初始化操做,须要注意的是,Open并不会和db创建链接,只是在操做这些数据结构,启动后台协程之类的动做。

这里的dataSourceName简称dsn,包含了链接数据库所必须的参数,用户名密码ip端口等信息,由不一样的驱动本身实现解析,固然,有些驱动也支持在dsn中配置一些数据库参数,如autocommit等。因为解析字符串获得这些信息会有必定的资源消耗,所以,还提供了对解析后的结果缓存的功能,避免了每次创建新的链接都须要解析一次,要作到这一点,须要驱动实现 driver.DriverContext 接口。

这个时候你就有了这样一个结构,不过此时的链接池中并无链接,也就是说没有真正访问db

golang 数据库链接池database/sql 实现原理分析

3. 设置数据库链接参数

最大空闲链接数,空闲链接数超过该值就会被关闭,默认为defaultMaxIdleConns(2)

func (db *DB) SetMaxIdleConns(n int) {}

最大容许打开的链接数,超过该数量后,不容许创建新的链接,工做协程只能阻塞等待链接的释放

func (db *DB) SetMaxOpenConns(n int) {}

链接能够被重用的最大时间,换言之,一个链接多久后会被关闭,不过会等待当前的请求完成后才会关闭,be closed lazily,一个很鸡肋的参数

func (db *DB) SetConnMaxLifetime(d time.Duration) {
    // 经过启动一个单独的协程 connectionCleaner 来实现 
    startCleanerLocked {
        go db.connectionCleaner(db.shortestIdleTimeLocked())
    }
}

1.15 以后新增参数,链接最大空闲时间,idle时间超过该值会被关闭,不过会等待当前的请求完成后才会关闭,be closed lazily

func (db *DB) SetConnMaxIdleTime(d time.Duration) {
    // 1.15 实现了对空闲链接的超时回收,复用了SetConnMaxLifetime的部分逻辑,也是在connectionCleaner协程中实现的
}

SetConnMaxLifetime 和 SetConnMaxIdleTime 细节实现

  • 1.14 实现
func (db *DB) startCleanerLocked() {
   if db.maxLifetime > 0 && db.numOpen > 0 && db.cleanerCh == nil {
      db.cleanerCh = make(chan struct{}, 1)
      go db.connectionCleaner(db.maxLifetime)
   }
}

func (db *DB) connectionCleaner(d time.Duration) {
   const minInterval = time.Second

   if d < minInterval {
      d = minInterval
   }
   t := time.NewTimer(d)

   for {
      // 当maxlife时间到达
      // 或者maxlife发生改变及db被close
      select {
      case <-t.C:
      case <-db.cleanerCh: // maxLifetime was changed or db was closed.
      }

      db.mu.Lock()
      d = db.maxLifetime
      if db.closed || db.numOpen == 0 || d <= 0 {
         db.cleanerCh = nil
         db.mu.Unlock()
         return
      }

      // 循环处理free状态的链接
      expiredSince := nowFunc().Add(-d)
      var closing []*driverConn
      for i := 0; i < len(db.freeConn); i++ {
         c := db.freeConn[i]
         if c.createdAt.Before(expiredSince) {
            closing = append(closing, c)
            last := len(db.freeConn) - 1
            db.freeConn[i] = db.freeConn[last]
            db.freeConn[last] = nil
            db.freeConn = db.freeConn[:last]
            i--
         }
      }
      db.maxLifetimeClosed += int64(len(closing))
      db.mu.Unlock()

      for _, c := range closing {
         c.Close()
      }

      // 若是maxlife被重置,须要更新定时器时间
      if d < minInterval {
         d = minInterval
      }
      t.Reset(d)
   }
}
  • 1.15 实现
func (db *DB) startCleanerLocked() {
  if (db.maxLifetime > 0 || db.maxIdleTime > 0) && db.numOpen > 0 && db.cleanerCh == nil {
    db.cleanerCh = make(chan struct{}, 1)
    go db.connectionCleaner(db.shortestIdleTimeLocked())  // maxidle和maxlife取较小值
  }
}

func (db *DB) connectionCleaner(d time.Duration) {
  const minInterval = time.Second

  if d < minInterval {
    d = minInterval
  }
  t := time.NewTimer(d)

  for {
    select {
    case <-t.C:
    case <-db.cleanerCh: // maxLifetime was changed or db was closed.
    }

    db.mu.Lock()
    d = db.shortestIdleTimeLocked()
    if db.closed || db.numOpen == 0 || d <= 0 {
      db.cleanerCh = nil
      db.mu.Unlock()
      return
    }

    closing := db.connectionCleanerRunLocked()
    db.mu.Unlock()
    for _, c := range closing {
      c.Close()
    }

    if d < minInterval {
      d = minInterval
    }
    t.Reset(d)
  }
}

// 对idle超时和life超时的链接分别收集,统一返回
func (db *DB) connectionCleanerRunLocked() (closing []*driverConn) {
  if db.maxLifetime > 0 {
    expiredSince := nowFunc().Add(-db.maxLifetime)
    for i := 0; i < len(db.freeConn); i++ {
      c := db.freeConn[i]
      if c.createdAt.Before(expiredSince) {
        closing = append(closing, c)
        last := len(db.freeConn) - 1
        db.freeConn[i] = db.freeConn[last]
        db.freeConn[last] = nil
        db.freeConn = db.freeConn[:last]
        i--
      }
    }
    db.maxLifetimeClosed += int64(len(closing))
  }

  if db.maxIdleTime > 0 {
    expiredSince := nowFunc().Add(-db.maxIdleTime)
    var expiredCount int64
    for i := 0; i < len(db.freeConn); i++ {
      c := db.freeConn[i]
      if db.maxIdleTime > 0 && c.returnedAt.Before(expiredSince) {
        closing = append(closing, c)
        expiredCount++
        last := len(db.freeConn) - 1
        db.freeConn[i] = db.freeConn[last]
        db.freeConn[last] = nil
        db.freeConn = db.freeConn[:last]
        i--
      }
    }
    db.maxIdleTimeClosed += expiredCount
  }
  return
}

1.14 和 1.15的实现逻辑基本一致,只是增长了对idle超时的判断作了兼容

4. 访问数据库

当咱们作完上面这些初始化动做后,按照咱们的习惯,一般会尝试性链接下db,用来判断链接参数是否正常,如用户名密码是否正确,但并非发送用户请求,通常的作法是调用 db.Ping(),

func (db *DB) Ping() error {
   return db.PingContext(context.Background())
}

func (db *DB) PingContext(ctx context.Context) error {
   var dc *driverConn
   var err error

   // 获取一个可用链接,后面会看到同样的逻辑,这里先跳过细节
   for i := 0; i < maxBadConnRetries; i++ {
      dc, err = db.conn(ctx, cachedOrNewConn)
      if err != driver.ErrBadConn {
         break
      }
   }
   if err == driver.ErrBadConn {
      dc, err = db.conn(ctx, alwaysNewConn)  // db.conn 是来获取可用链接的,是数据库链接池较为核心的一部分
   }
   if err != nil {
      return err
   }

   // 发送ping命令
   return db.pingDC(ctx, dc, dc.releaseConn)
}

func (db *DB) pingDC(ctx context.Context, dc *driverConn, release func(error)) error {
   var err error
   if pinger, ok := dc.ci.(driver.Pinger); ok {
      withLock(dc, func() {
         err = pinger.Ping(ctx)  // 这里须要驱动本身去实现,对应mysql来讲,发送的是sql_type=14(COM_PING)的请求包
      })
   }
   release(err)   // 将该链接放回到free池
   return err
}

5. 发送sql请求

这里看几个最简单的发送sql的方法

// 没有结果集,值返回ok/error包
func (db *DB) Exec(query string, args ...interface{}) (Result, error) {}
func (db *DB) ExecContext(ctx context.Context, query string, args ...interface{}) (Result, error) {}

// 返回大于0条结果集
func (db *DB) Query(query string, args ...interface{}) (*Rows, error) {}
func (db *DB) QueryContext(ctx context.Context, query string, args ...interface{}) (*Rows, error) {}

// 预期结果集只有一行,没有结果集Scan时报ErrNoRows,Scan结果若是有多行,只取第一行,多余的数据行丢弃
func (db *DB) QueryRow(query string, args ...interface{}) *Row {}
func (db *DB) QueryRowContext(ctx context.Context, query string, args ...interface{}) *Row {}

这里有几个注意事项:

  • 咱们能够发现,每个方法都会同时有另一个带 Context 后缀的方法,查看调用关系的话,会发现,不带Context的函数(Exec/Query/QueryRow)其实里面就是调用的带Context的函数(ExecContext/QueryContext/QueryRowContext),这里的Context和大多数库函数同样,用来进行信号的同步,例如超时限制等,通常不须要单独设置
  • 咱们能够发现,每一个函数参数都是支持可变参数列表,用法和prepare用法同样,用 ? 作占位符,那咱们直接拼好sql和使用占位符哪一种更优呢?
    rows1, err := db.Query("select * from t1 where a = 1”)
    rows2, err := db.Query("select * from t1 where a = ?", 1)

这两条sql执行的结果是同样的,可是底层是不同的,与不一样驱动的具体实现略有差异。

以mysql为例,区别在于第一个Query,实际发送了一条sql(sql_type:3),第二条Query,实际发送了两条sql(sql_type:22 和 sql_tyep:23),先prepare,再execute,虽然说二进制协议要快些,可是每次都会发送两条sql,第一次发送的prepare,以后只会execute一次且不会主动回收这个prepare信息。

这个接口设计之初,应该就是按照prepare+execute的思想设计的,当占位符参数个数为0时,可否优化直接发送一条sql,要看底层的驱动接口是否支持,换言之,prepare+execute

接下来,以Query为例,看下具体的实现流程

func (db *DB) Query(query string, args ...interface{}) (*Rows, error) {
   return db.QueryContext(context.Background(), query, args...)
}

func (db *DB) QueryContext(ctx context.Context, query string, args ...interface{}) (*Rows, error) {
   var rows *Rows
   var err error

   // 执行query,优先从链接池获取链接,若是获取到badconn(以及关闭的链接),重试,最多重试maxBadConnRetries(2)次
   for i := 0; i < maxBadConnRetries; i++ {
      rows, err = db.query(ctx, query, args, cachedOrNewConn)
      if err != driver.ErrBadConn {
         break
      }
   }

   // 必定建立新的链接执行query
   if err == driver.ErrBadConn {
      return db.query(ctx, query, args, alwaysNewConn)
   }
   return rows, err
}

func (db *DB) query(ctx context.Context, query string, args []interface{}, strategy connReuseStrategy) (*Rows, error) {
   // 获取链接
   dc, err := db.conn(ctx, strategy)
   if err != nil {
      return nil, err
   }

   // 使用获取的链接执行查询
   return db.queryDC(ctx, nil, dc, dc.releaseConn, query, args)
}

能够发现,执行一条普通sql,须要两步,第一步,获取链接(db.conn),第二步,执行查询(db.queryDC)

6. 获取链接

// 提供了两种获取链接的策略,alwaysNewConn & cachedOrNewConn,字面意思,老是新建 & 优先复用free链接

func (db *DB) conn(ctx context.Context, strategy connReuseStrategy) (*driverConn, error) {
   // 全局加锁 这里有个链接池的大锁,须要注意
   db.mu.Lock()
   if db.closed {
      db.mu.Unlock()
      return nil, errDBClosed
   }

   // context 超时检测
   select {
   default:
   case <-ctx.Done():
      db.mu.Unlock()
      return nil, ctx.Err()
   }
   lifetime := db.maxLifetime

   // 优先从free池中获取链接
   numFree := len(db.freeConn)
   if strategy == cachedOrNewConn && numFree > 0 {
      // 取第一个free链接
      conn := db.freeConn[0]
      // 切片拷贝
      copy(db.freeConn, db.freeConn[1:])
      // 调整切片长度
      db.freeConn = db.freeConn[:numFree-1]
      conn.inUse = true
      db.mu.Unlock()
      // 检查链接是否超时,超时则返回错误
      if conn.expired(lifetime) {
         conn.Close()
         return nil, driver.ErrBadConn
      }

      // 对链接状态进行重置,一般是使用过的链接须要重置,避免链接已经处于不可用状态
      if err := conn.resetSession(ctx); err == driver.ErrBadConn {
         conn.Close()
         return nil, driver.ErrBadConn
      }
      return conn, nil
   }

   // 已经没有free链接,或者策略要求建立一个新链接

   // 当前打开的链接已经达到了容许打开链接数的上限,须要阻塞等待
   if db.maxOpen > 0 && db.numOpen >= db.maxOpen {
      // Make the connRequest channel. It's buffered so that the
      // connectionOpener doesn't block while waiting for the req to be read.

      // 创建一个惟一key和请求链接connRequest channel的映射
      req := make(chan connRequest, 1)
      reqKey := db.nextRequestKeyLocked()
      db.connRequests[reqKey] = req
      db.waitCount++
      db.mu.Unlock()

      waitStart := time.Now()
      // Timeout the connection request with the context.
      select {
      // 若是超时,从map中删除该key,记录统计信息,并检查链接是否已经就绪
      case <-ctx.Done():
         // Remove the connection request and ensure no value has been sent
         // on it after removing.
         db.mu.Lock()
         delete(db.connRequests, reqKey)
         db.mu.Unlock()
         atomic.AddInt64(&db.waitDuration, int64(time.Since(waitStart)))
         // 若是已经生成了可用链接,将新链接放回到free池中
         select {
         default:
         case ret, ok := <-req:
            if ok && ret.conn != nil {
               db.putConn(ret.conn, ret.err, false)
            }
         }
         return nil, ctx.Err()

      case ret, ok := <-req:
         atomic.AddInt64(&db.waitDuration, int64(time.Since(waitStart)))

         if !ok {
            return nil, errDBClosed
         }
         // Only check if the connection is expired if the strategy is cachedOrNewConns.
         // If we require a new connection, just re-use the connection without looking
         // at the expiry time. If it is expired, it will be checked when it is placed
         // back into the connection pool.
         // This prioritizes giving a valid connection to a client over the exact connection
         // lifetime, which could expire exactly after this point anyway.
         // 对cachedOrNewConn策略的链接请求,须要判断链接是否过时
         // 若是是请求新链接,则不作判断,等链接被放回free池中时再回收
         if strategy == cachedOrNewConn && ret.err == nil && ret.conn.expired(lifetime) {
            ret.conn.Close()
            return nil, driver.ErrBadConn
         }
         if ret.conn == nil {
            return nil, ret.err
         }

         // Reset the session if required.
         if err := ret.conn.resetSession(ctx); err == driver.ErrBadConn {
            ret.conn.Close()
            return nil, driver.ErrBadConn
         }
         return ret.conn, ret.err
      }
   }

   // 因为未达到链接数上限,直接建立新链接
   db.numOpen++ // optimistically
   db.mu.Unlock()
   ci, err := db.connector.Connect(ctx)
   if err != nil {
      db.mu.Lock()
      db.numOpen-- // correct for earlier optimism
      db.maybeOpenNewConnections()
      db.mu.Unlock()
      return nil, err
   }
   db.mu.Lock()
   dc := &driverConn{
      db:        db,
      createdAt: nowFunc(),
      ci:        ci,
      inUse:     true,
   }
   db.addDepLocked(dc, dc)
   db.mu.Unlock()
   return dc, nil
}

综上,当咱们向链接池申请链接时,

  • 若是策略是 cachedOrNewConn,free链接池中有,则直接取出;
  • 若是链接池没有空闲链接或者策略为alwaysNewConn,当前链接不超过上限,则直接建立;
  • 不然经过channel去异步建立创建,调用点阻塞等待链接。

7. 执行查询

Query

// ctx 是调用sql设置的上下文
// txctx 是事务的上下文,若是有
// releaseConn 上层传递的函数句柄,链接使用完后,将该链接放回到链接池

func (db *DB) queryDC(ctx, txctx context.Context, dc *driverConn, releaseConn func(error), query string, args []interface{}) (*Rows, error) {
   queryerCtx, ok := dc.ci.(driver.QueryerContext)
   var queryer driver.Queryer
   if !ok {
      queryer, ok = dc.ci.(driver.Queryer)
   }
   if ok {
      var nvdargs []driver.NamedValue
      var rowsi driver.Rows
      var err error
      withLock(dc, func() {
         nvdargs, err = driverArgsConnLocked(dc.ci, nil, args)
         if err != nil {
            return
         }
         rowsi, err = ctxDriverQuery(ctx, queryerCtx, queryer, query, nvdargs)
      })
      // err要么为nil,要么为ErrSkip之外的其余错误
      // ErrSkip 一般为某些可选接口不存在,能够尝试其余接口
      if err != driver.ErrSkip {
         if err != nil {
            releaseConn(err)
            return nil, err
         }
         // err != nil
         // 数据库链接的全部权转交给了rows,rows须要主动Close,以将该链接放回到free链接池中
         rows := &Rows{
            dc:          dc,
            releaseConn: releaseConn,
            rowsi:       rowsi,
         }

         // 经过context,当收到上层事件或者事务关闭的消息,rows可以自动调用Close释放链接
         rows.initContextClose(ctx, txctx)
         return rows, nil
      }
   }

   // prepare
   var si driver.Stmt
   var err error
   withLock(dc, func() {
      si, err = ctxDriverPrepare(ctx, dc.ci, query)
   })
   if err != nil {
      releaseConn(err)
      return nil, err
   }

   // execute
   ds := &driverStmt{Locker: dc, si: si}
   rowsi, err := rowsiFromStatement(ctx, dc.ci, ds, args...)
   if err != nil {
      ds.Close()
      releaseConn(err)
      return nil, err
   }

   // Note: ownership of ci passes to the *Rows, to be freed
   // with releaseConn.
   rows := &Rows{
      dc:          dc,
      releaseConn: releaseConn,
      rowsi:       rowsi,
      closeStmt:   ds,
   }

   // 同上
   rows.initContextClose(ctx, txctx)
   return rows, nil
}

能够发现,在sql包这一层,已经作好了全部的链接管理的动做,具体的收发包/包协议逻辑给了不一样的驱动本身实现,当执行完查询后,链接的全部权转交给了rows对象,意味着须要rows主动调用 Close() 函数,才会将当前使用的链接放回链接池中去。

QueryRow

一样的,QueryRow() 和 Query() 其实底层是用的一套方法,返回值也仅仅是多包了一层

func (db *DB) QueryRow(query string, args ...interface{}) *Row {
   return db.QueryRowContext(context.Background(), query, args...)
}

func (db *DB) QueryRowContext(ctx context.Context, query string, args ...interface{}) *Row {
   rows, err := db.QueryContext(ctx, query, args...)
   return &Row{rows: rows, err: err}
}

// Row 和 Rows 的关系
type Row struct {
   // One of these two will be non-nil:
   err  error // deferred error for easy chaining
   rows *Rows
}

细心的话,可以发现 Row 仅仅提供了 Scan 一个方法,甚至 Close() 都没有,相比 Rows,看着又些单薄,那如何释放链接呢?

在 Row 的 Scan() 方法里,会从rows读取第一条数据,在最后,调用了rows的Close() 方法

func (r *Row) Scan(dest ...interface{}) error {
   if r.err != nil {
      return r.err
   }

   defer r.rows.Close()
   for _, dp := range dest {
      if _, ok := dp.(*RawBytes); ok {
         return errors.New("sql: RawBytes isn't allowed on Row.Scan")
      }
   }

   if !r.rows.Next() {
      if err := r.rows.Err(); err != nil {
         return err
      }
      return ErrNoRows
   }
   err := r.rows.Scan(dest...)
   if err != nil {
      return err
   }
   // Make sure the query can be processed to completion with no errors.
   return r.rows.Close()
}

意味着,当咱们使用 QueryRow() 时,必须使用row.Scan( ) 来获取结果,不然该链接就不会放回链接池中去。

Exec

Exec 因为不须要结果集,所以,对链接的release就不像前两个那么麻烦,除此以外的处理流程基本同样。

func (db *DB) execDC(ctx context.Context, dc *driverConn, release func(error), query string, args []interface{}) (res Result, err error) {
   // 调用 Exec 函数就不须要额外关心链接的release,在函数结束以前就放回free池中
   defer func() {
      release(err)
   }()
   execerCtx, ok := dc.ci.(driver.ExecerContext)
   var execer driver.Execer
   if !ok {
      execer, ok = dc.ci.(driver.Execer)
   }

   // 和Query同样,若是驱动有实现这两个接口,就直接调用,不然由sql包主动触发调用prepare+execute
   if ok {
      var nvdargs []driver.NamedValue
      var resi driver.Result
      withLock(dc, func() {
         nvdargs, err = driverArgsConnLocked(dc.ci, nil, args)
         if err != nil {
            return
         }
         resi, err = ctxDriverExec(ctx, execerCtx, execer, query, nvdargs)
      })
      if err != driver.ErrSkip {
         if err != nil {
            return nil, err
         }
         return driverResult{dc, resi}, nil
      }
   }

   var si driver.Stmt
   withLock(dc, func() {
      si, err = ctxDriverPrepare(ctx, dc.ci, query)
   })
   if err != nil {
      return nil, err
   }
   ds := &driverStmt{Locker: dc, si: si}
   defer ds.Close()

   // 从 statement 中保存结果
   return resultFromStatement(ctx, dc.ci, ds, args...)
}

8. 优雅地使用stmt

上面提到,直接使用占位符的方式来执行二进制sql,实际每次会发送两条sql,并不能提升执行效率,那statement的正确执行方式是什么呢?

stmt, err := db.Prepare("select * from t1 where a = ?”)   // prepare,sql_type=22
if err != nil {
   return
}
_, err = stmt.Exec(1)  // 第一次执行,sql_type=23
if err != nil {
   return
}
rows, err := stmt.Query(1)  // 第二次执行,链接全部权转交给rows,sql_type=23
if err != nil {
   return
}
_ = rows.Close()  // 归还链接的全部权

_ = stmt.Close()  // sql_type=25

咱们知道,db是一个链接池对象,这里prepare只须要显示调用一次,以后stmt在执行时,若是获取到了新的链接或者没有执行过prepare的链接,那么它会首先调用prepare,以后再去执行execute,所以,咱们无需担忧是否会在一个没有prepare过的链接上execute。
一样,stmt在调用Close()时,会对全部链接上都执行close,关闭掉这个stmt,所以,关闭以前,要保证这个stmt不会再被执行。

9. 释放链接

前面提到,咱们链接执行完一次普通查询,就须要及时放回到freeConn链接池中,中间链接的拥有权虽然会转移,但最终都须要被回收,其实,开启事务的请求也相似,会在事务提交或回滚后释放链接。链接释放的方法从上层不断向下传递,全部可能拥有链接全部权的对象,均可能接受到该释放链接到方法。

// 用来将使用完的链接放回到free链接池中

func (dc *driverConn) releaseConn(err error) {
   dc.db.putConn(dc, err, true)
}

func (db *DB) putConn(dc *driverConn, err error, resetSession bool) {
   // 检查链接是否还能复用
   if err != driver.ErrBadConn {
      if !dc.validateConnection(resetSession) {
         err = driver.ErrBadConn
      }
   }

   // debugGetPut 是测试信息
   db.mu.Lock()
   if !dc.inUse {
      db.mu.Unlock()
      if debugGetPut {
         fmt.Printf("putConn(%v) DUPLICATE was: %s\n\nPREVIOUS was: %s", dc, stack(), db.lastPut[dc])
      }
      panic("sql: connection returned that was never out")
   }

   if err != driver.ErrBadConn && dc.expired(db.maxLifetime) {
      err = driver.ErrBadConn
   }
   if debugGetPut {
      db.lastPut[dc] = stack()
   }
   dc.inUse = false

   // 在这个链接上注册的一些statement的关闭函数
   for _, fn := range dc.onPut {
      fn()
   }
   dc.onPut = nil

   // 若是当前链接已经不可用,意味着可能会有新的链接请求,调用maybeOpenNewConnections进行检测
   if err == driver.ErrBadConn {
      // Don't reuse bad connections.
      // Since the conn is considered bad and is being discarded, treat it
      // as closed. Don't decrement the open count here, finalClose will
      // take care of that.
      db.maybeOpenNewConnections()
      db.mu.Unlock()
      dc.Close()
      return
   }

   // hook 的一个函数,用于测试,默认为nil
   if putConnHook != nil {
      putConnHook(db, dc)
   }
   added := db.putConnDBLocked(dc, nil)
   db.mu.Unlock()

   if !added {
      dc.Close()
      return
   }
}

10. 链接管理

对链接的管理,主要包括链接的申请,链接的回收及复用,异步释放超时的链接。

链接管理的整个流程以下

golang 数据库链接池database/sql 实现原理分析

11. 不开启事务,如何固定占用一条链接

经过前面这些内容,可以发现,在不开启事务的状况下,链接完成一笔请求,回被放回到free池里去,因此哪怕连续执行两条select,也有可能用的不是同一个实际的数据库链接,某些特殊场景,好比咱们执行完存储过程,想要select输出型结果时,这里就不知足要求。

简化下需求,实际上是咱们想要长时间占用一个链接,开启事务是一种解决方案,不过额外引入事务,可能会形成锁的延迟释放(以mysql两阶段锁为例), 这里能够用Context方法来实现,用法举例

{
   var a int
   ctx := context.Background()
   cn, err := db.Conn(ctx)  // 绑定一个链接
   if err != nil {
      return
   }

   // 执行第一次查询,将链接全部权转交给rows1
   rows1, err := cn.QueryContext(ctx, "select * from t1")
   if err != nil {
      return
   }
   _ = rows1.Scan(&a)
   _ = rows1.Close() // rows1 close,将链接全部权交给cn 

   // 执行第二次查询,将链接全部权转交给rows2
   rows2, err = cn.QueryContext(ctx, "select * from t1")
   if err != nil {
      return
   }
   _ = rows2.Scan(&a)
   _ = rows2.Close() // rows1 close,将链接全部权交给cn

   // cn close,链接回收,放回free队列
   _ = cn.Close()
}

关于db.Conn( ) 返回的sql.Conn对象,须要和driver.Conn 作区分,sql.Conn 是对driverConn的再一次封装,是为里提供连续的单个数据库链接,driver.Conn 是不一样驱动要实现的接口

// Conn represents a single database connection rather than a pool of database
// connections. Prefer running queries from DB unless there is a specific
// need for a continuous single database connection.
//
// A Conn must call Close to return the connection to the database pool
// and may do so concurrently with a running query.
//
// After a call to Close, all operations on the
// connection fail with ErrConnDone.

type Conn struct {
   db *DB

   // closemu prevents the connection from closing while there
   // is an active query. It is held for read during queries
   // and exclusively during close.
   closemu sync.RWMutex

   // dc is owned until close, at which point
   // it's returned to the connection pool.
   dc *driverConn

   // done transitions from 0 to 1 exactly once, on close.
   // Once done, all operations fail with ErrConnDone.
   // Use atomic operations on value when checking value.
   done int32
}

12. 监控链接池状态

因为mysql协议是同步的,所以,当客户端游大量的并发请求,可是链接数要小于并发数的状况下,是会有一部分请求被阻塞,等待其它请求释放链接,在某些场景或使用不当的状况下,这里也可能会成为瓶颈。不过库中并无详细记录每一笔请求的链接等待时间,只提供了累计的等待时间之和,以及其它的监控指标,在定位问题时能够用作参考。

库提供了 db.Stats( ) 方法,会从db对象中获取全部的监控指标,并生成对象 DBStats 对象

func (db *DB) Stats() DBStats {
   wait := atomic.LoadInt64(&db.waitDuration)

   db.mu.Lock()
   defer db.mu.Unlock()

   stats := DBStats{
      MaxOpenConnections: db.maxOpen,

      Idle:            len(db.freeConn),
      OpenConnections: db.numOpen,
      InUse:           db.numOpen - len(db.freeConn),

      WaitCount:         db.waitCount,
      WaitDuration:      time.Duration(wait),
      MaxIdleClosed:     db.maxIdleClosed,
      MaxLifetimeClosed: db.maxLifetimeClosed,
   }
   return stats
}

一个简单的使用例子

func monitorConn(db *sql.DB) {
   go func(db *sql.DB) {
      mt := time.NewTicker(monitorDbInterval * time.Second)
      for {
         select {
         case <-mt.C:
            stat := db.Stats()
            logutil.Errorf("monitor db conn(%p): maxopen(%d), open(%d), use(%d), idle(%d), "+
               "wait(%d), idleClose(%d), lifeClose(%d), totalWait(%v)",
               db,
               stat.MaxOpenConnections, stat.OpenConnections,
               stat.InUse, stat.Idle,
               stat.WaitCount, stat.MaxIdleClosed,
               stat.MaxLifetimeClosed, stat.WaitDuration)
         }
      }
   }(db)
}

须要注意的是,1.15 以前,对 stat.MaxLifetimeClosed 对象统计会有异常,1.15 以后作了修复。

Attention

  • 注意链接全部者的传递关系,使用完成后要及时回收,如rows.Close(),row.Scan()等,不回收会形成链接泄漏,新的请求会被一直阻塞
  • 尽可能避免使用占位符的方式执行sql,推荐本身完成sql的拼接或正常使用stmt
  • 1.15 后支持了对单个链接空闲时间的限制
  • db.Conn( ) 可以持续占用一条链接,可是在该链接中,就没有办法调用以前prepare生成的stmt,可是在事务中能够,tx.Stmt( )能够生成特定于该事务的stmt
  • go提供了数据库链接池回收策略,是针对freeConn的,换句话说,链接若是被一直占用,哪怕已经超过了生存时间,也不会被回收
  • 咱们注意到,每次对链接池操做时,都要先加一把全局大锁,所以,当链接数较多(>1000),且请求量较大时,会存在较为严重的锁竞争,这一点经过top(sys)指标,以及pprof也能发现,由于,一个简单的方式,是将一个大的链接池拆分为多个小的链接池,通常状况下,经过简单的轮询将请求打散在多个链接池上,能有效下降锁的粒度

【完】

相关文章
相关标签/搜索