GORM之for(rows.Next)提早退出别忘了Close

近期一同事负责的线上模块,老是时不时的返回一下 504,检查发现,这个服务的内存使用异常的大,pprof分析后,发现有上万个goroutine,排查分析以后,是没有规范使用gorm包致使的,那么具体是什么缘由呢,会不会也像 《Go Http包解析:为何须要response.Body.Close()》 文中同样,由于没有释放链接致使的呢?mysql

问题现象

demo

首先咱们先来看一个示例,而后,猜想一下打印的结果git

package main

import (
	"log"
	"net/http"
	_ "net/http/pprof"
	"time"

	"github.com/jinzhu/gorm"
	_ "github.com/jinzhu/gorm/dialects/mysql"
)

var (
	db *gorm.DB
)

type User struct {
	ID    int64  `gorm:"column:id;primary_key" json:"id"`
	Name  string `gorm:"column:name" json:"name"`
}

func (user *User) TableName() string {
	return "ranger_user"
}

func main() {
	go func() {
		log.Println(http.ListenAndServe(":6060", nil))
	}()
	for true {
		GetUserList()
		time.Sleep(time.Second)
	}
}

func GetUserList() ([]*User, error) {
	users := make([]*User, 0)
	db := open()
	rows, err := db.Model(&User{}).Where("id > ?", 1).Rows()
	if err != nil {
		panic(err)
	}
  // 为了试验而写的特殊逻辑
	for rows.Next() {
		user := &User{}
		err = db.ScanRows(rows, user)
		return nil, err
	}
	return users, nil
}

func open() *gorm.DB {
  if db != nil {
		return db
	}
	var err error
	db, err = gorm.Open("mysql",
     "user:pass@(ip:port)/db?charset=utf8&parseTime=True&loc=Local")
	if err != nil {
		panic(err)
	}
	return db
}
复制代码

分析

咱们先看一下上面的demo,貌似没有什么问题,咱们就运行一段时间看看github

有点尴尬,我就一简单的查询返回,怎么会有那么多goroutine?sql

继续看一下都是哪些函数产生了goroutine数据库

startWatcher.func1 是个什么鬼json

func (mc *mysqlConn) startWatcher() {
	watcher := make(chan mysqlContext, 1)
	mc.watcher = watcher
	finished := make(chan struct{})
	mc.finished = finished
	go func() {
		for {
			var ctx mysqlContext
			select {
			case ctx = <-watcher:
			case <-mc.closech:
				return
			}

			select {
			case <-ctx.Done():
				mc.cancel(ctx.Err())
			case <-finished:
			case <-mc.closech:
				return
			}
		}
	}()
}
复制代码

猜想验证

startWatcher 这个函数的调用者,只有 MySQLDriver.Open 会调用,也就是建立新的链接的时候,才会去建立一个监控者的goroutinesegmentfault

根据 《Go Http包解析:为何须要response.Body.Close()》 中的分析结果,能够大胆猜想,有多是mysql每次去查询的时候,获取一个链接,没有空闲的链接,则建立一个新的,查询完成后释放链接到链接池,以便下一个请求使用,而因为没有调用rows.Close(), 致使拿了链接以后,没有再放回链接池复用,致使每一个请求过来都建立一个新的请求,从而致使产生了大量的goroutine去运行startWatcher.func1 监控新建立的链接 。因此咱们相似于 response.Close 同样,进行一下 rows.Close() 是否是就ok了,接下来验证一下缓存

对上面的测试代码增长一行rows.Close()markdown

defer rows.Close()
	for rows.Next() {
		user := &User{}
		err = db.ScanRows(rows, user)
		return nil, err
	}
复制代码

继续观察goroutine的变化网络

goroutine 再也不上升,貌似问题就解决了

疑问

  1. 咱们通常写代码的时候,都不会调用 rows.Close()的,不少状况下并无出现goroutine的暴增,这是为何

结构

照例,仍是先把可能用到的结构体提早放出来,混个眼熟

rows

// Rows is the result of a query. Its cursor starts before the first row
// of the result set. Use Next to advance from row to row.
type Rows struct {
	dc          *driverConn // owned; must call releaseConn when closed to release
	releaseConn func(error) // driverConn.releaseConn, 在query的时候,会传递过来 rowsi driver.Rows cancel func() // called when Rows is closed, may be nil. closeStmt *driverStmt // if non-nil, statement to Close on close // closemu prevents Rows from closing while there // is an active streaming result. It is held for read during non-close operations // and exclusively during close. // // closemu guards lasterr and closed. closemu sync.RWMutex closed bool lasterr error // non-nil only if closed is true // lastcols is only used in Scan, Next, and NextResultSet which are expected // not to be called concurrently. lastcols []driver.Value }s 复制代码

查询

创建链接、scope结构体、Model、Where 方法的逻辑就再也不赘述了,上一篇文章《GORM之ErrRecordNotFound采坑记录》已经粗略讲过了,直接进入Rows函数的解析

Rows

// Rows return `*sql.Rows` with given conditions
func (s *DB) Rows() (*sql.Rows, error) {
	return s.NewScope(s.Value).rows()
}

func (scope *Scope) rows() (*sql.Rows, error) {
	defer scope.trace(scope.db.nowFunc())

	result := &RowsQueryResult{}
  // 设置 row_query_result,供 callback 函数使用
	scope.InstanceSet("row_query_result", result)
	scope.callCallbacks(scope.db.parent.callbacks.rowQueries)

	return result.Rows, result.Error
}
复制代码

感受这里很快就进入了callback 的回调

根据上一篇文章的经验,rowQueries 所注册的回调函数,能够在 callback_row_query.go 中的 init() 函数中找到

func init() {
	DefaultCallback.RowQuery().Register("gorm:row_query", rowQueryCallback)
}

// queryCallback used to query data from database
func rowQueryCallback(scope *Scope) {
  // 对应 上面函数里面的 scope.InstanceSet("row_query_result", result)
	if result, ok := scope.InstanceGet("row_query_result"); ok {
    // 组装出来对应的sql语句,eg: SELECT * FROM `ranger_user` WHERE (id > ?)
		scope.prepareQuerySQL()
		if str, ok := scope.Get("gorm:query_option"); ok {
			scope.SQL += addExtraSpaceIfExist(fmt.Sprint(str))
		}

		if rowResult, ok := result.(*RowQueryResult); ok {
			rowResult.Row = scope.SQLDB().QueryRow(scope.SQL, scope.SQLVars...)
		} else if rowsResult, ok := result.(*RowsQueryResult); ok {
      // result 对应的结构体是 RowsQueryResult,因此执行到这里,继续跟进这个函数
			rowsResult.Rows, rowsResult.Error = scope.SQLDB().Query(scope.SQL, scope.SQLVars...)
		}
	}
}
复制代码

上面能够看到,rowQueryCallback 仅仅是组装了一下sql,而后又去调用go 提供的sql包,来进行查询

sql.Query

// Query executes a query that returns rows, typically a SELECT.
// The args are for any placeholder parameters in the query.
// query是sql语句,args则是sql中? 所表明的值
func (db *DB) Query(query string, args ...interface{}) (*Rows, error) {
	return db.QueryContext(context.Background(), query, args...)
}

// QueryContext executes a query that returns rows, typically a SELECT.
// The args are for any placeholder parameters in the query.
func (db *DB) QueryContext(ctx context.Context, query string, args ...interface{}) (*Rows, error) {
	var rows *Rows
	var err error
  // maxBadConnRetries = 2
	for i := 0; i < maxBadConnRetries; i++ {
    // cachedOrNewConn 则是告诉query 去使用缓存的链接或者建立一个新的链接
		rows, err = db.query(ctx, query, args, cachedOrNewConn)
		if err != driver.ErrBadConn {
			break
		}
	}
  // 若是尝试了maxBadConnRetries次后,链接仍是有问题的,则建立一个新的链接去执行sql
	if err == driver.ErrBadConn {
		return db.query(ctx, query, args, alwaysNewConn)
	}
	return rows, err
}

func (db *DB) query(ctx context.Context, query string, args []interface{}, strategy connReuseStrategy) (*Rows, error) {
  // 根据上面定的获取链接的策略,来获取一个有效的链接
	dc, err := db.conn(ctx, strategy)
	if err != nil {
		return nil, err
	}
  // 使用获取的链接,进行查询
	return db.queryDC(ctx, nil, dc, dc.releaseConn, query, args)
}


复制代码

上面的逻辑理解不难,这里有两个变量,解释一下

cachedOrNewConn: connReuseStrategy 类型,本质是uint8类型,值是1,这个标志会传递给下面的db.conn 函数,告诉这个函数,返回链接的策略

1. 若是链接池中有空闲链接,返回一个空闲的
2. 若是链接池中没有空的链接,且没有超过最大建立的链接数,则建立一个新的返回
3. 若是链接池中没有空的链接,且超过最大建立的链接数,则等待链接释放后,返回这个空闲链接
复制代码

alwaysNewConn:

  1. 每次都返回一个新的链接

获取链接

// conn returns a newly-opened or cached *driverConn.
func (db *DB) conn(ctx context.Context, strategy connReuseStrategy) (*driverConn, error) {
	db.mu.Lock()
	if db.closed {
		db.mu.Unlock()
		return nil, errDBClosed
	}
	// Check if the context is expired.
  // 校验一下ctx是否过时了
	select {
	default:
	case <-ctx.Done():
		db.mu.Unlock()
		return nil, ctx.Err()
	}
	lifetime := db.maxLifetime

	// Prefer a free connection, if possible.
	numFree := len(db.freeConn)
	if strategy == cachedOrNewConn && numFree > 0 {
    // 若是选择链接的策略是 cachedOrNewConn,而且有空闲的链接,则尝试获取链接池中的第一个链接
		conn := db.freeConn[0]
		copy(db.freeConn, db.freeConn[1:])
		db.freeConn = db.freeConn[:numFree-1]
		conn.inUse = true
		db.mu.Unlock()
    // 判断当前链接的空闲时间是否超过了设定的最大空闲时间
		if conn.expired(lifetime) {
			conn.Close()
			return nil, driver.ErrBadConn
		}
		// Lock around reading lastErr to ensure the session resetter finished.
    // 判断链接的lastErr,确保链接是被重置过的
		conn.Lock()
		err := conn.lastErr
		conn.Unlock()
		if err == driver.ErrBadConn {
			conn.Close()
			return nil, driver.ErrBadConn
		}
		return conn, nil
	}

	// Out of free connections or we were asked not to use one. If we're not
	// allowed to open any more connections, make a request and wait.
  // 走到这里说明没有获取到空闲链接,判断建立的链接数量是否超过最大容许的链接数量
	if db.maxOpen > 0 && db.numOpen >= db.maxOpen {
		// Make the connRequest channel. It's buffered so that the
		// connectionOpener doesn't block while waiting for the req to be read.
    // 建立一个chan,用于接收释放的空闲链接
		req := make(chan connRequest, 1)
    // 建立一个key
		reqKey := db.nextRequestKeyLocked()
    // 将key 和chan绑定,便于根据key 定位所对应的chan
		db.connRequests[reqKey] = req
		db.waitCount++
		db.mu.Unlock()

		waitStart := time.Now()

		// Timeout the connection request with the context.
		select {
		case <-ctx.Done():
			// Remove the connection request and ensure no value has been sent
			// on it after removing.
      // 若是ctx失效了,则这个空闲链接也不须要了,删除刚刚建立的key,防止这个链接被移除后再次为这个key获取链接
			db.mu.Lock()
			delete(db.connRequests, reqKey)
			db.mu.Unlock()

			atomic.AddInt64(&db.waitDuration, int64(time.Since(waitStart)))

			select {
			default:
			case ret, ok := <-req:
        // 若是获取到了空闲链接,则放回链接池里面
				if ok && ret.conn != nil {
					db.putConn(ret.conn, ret.err, false)
				}
			}
			return nil, ctx.Err()
		case ret, ok := <-req:
      // 此时拿到了空闲链接,且ctx没有过时,则判断链接是否有效
			atomic.AddInt64(&db.waitDuration, int64(time.Since(waitStart)))

			if !ok {
				return nil, errDBClosed
			}
      // 判断链接是否过时
			if ret.err == nil && ret.conn.expired(lifetime) {
				ret.conn.Close()
				return nil, driver.ErrBadConn
			}
			if ret.conn == nil {
				return nil, ret.err
			}
			// Lock around reading lastErr to ensure the session resetter finished.
      // 判断链接的lastErr,确保链接是被重置过的
			ret.conn.Lock()
			err := ret.conn.lastErr
			ret.conn.Unlock()
			if err == driver.ErrBadConn {
				ret.conn.Close()
				return nil, driver.ErrBadConn
			}
			return ret.conn, ret.err
		}
	}
	// 上面两个都不知足,则建立一个新的链接,也就是 获取链接的策略是 alwaysNewConn 的时候
	db.numOpen++ // optimistically
	db.mu.Unlock()
	ci, err := db.connector.Connect(ctx)
	if err != nil {
		db.mu.Lock()
		db.numOpen-- // correct for earlier optimism
    // 若是链接建立失败,则再尝试建立一次
		db.maybeOpenNewConnections()
		db.mu.Unlock()
		return nil, err
	}
	db.mu.Lock()
	dc := &driverConn{
		db:        db,
		createdAt: nowFunc(),
		ci:        ci,
		inUse:     true,
	}
  // 关闭链接时会用到
	db.addDepLocked(dc, dc)
	db.mu.Unlock()
	return dc, nil
}
复制代码

在上面的逻辑中,能够看到,获取链接的策略跟咱们上面解释 cachedOrNewConn 和 alwaysNewConn 时是同样的,可是,这里面有两个问题

  1. 建立的链接数量超过最大容许的链接数量,则等待一个空闲的链接,这时候为 db.connRequests 这个map新增长了一个key,这个key对应一个chan,而后直接等待这个 chan 吐出来链接,既然是等待释放空闲链接,那么这个chan 里面插入的 链接,应该是在freeconn 函数里面,freeconn的逻辑又是怎么样的呢
  2. 建立新链接失败后,会调用 db.maybeOpenNewConnections, 这个函数又不返回链接,那么它作了什么

释放链接

释放链接主要依靠 putconn 来完成的,在 conn 函数的下面代码中

case ret, ok := <-req:
        // 若是获取到了空闲链接,则放回链接池里面
				if ok && ret.conn != nil {
					db.putConn(ret.conn, ret.err, false)
				}
			}
复制代码

也调用了,把获取到但再也不须要的链接放回池子里,下面看一下释放链接的过程

putConn

// putConn adds a connection to the db's free pool.
// err is optionally the last error that occurred on this connection.
func (db *DB) putConn(dc *driverConn, err error, resetSession bool) {
	db.mu.Lock()
  // 释放一个正在用的链接,panic
	if !dc.inUse {
		panic("sql: connection returned that was never out")
	}
	dc.inUse = false
  
  // 省略部分无关代码...

	if err == driver.ErrBadConn {
		// Don't reuse bad connections.
		// Since the conn is considered bad and is being discarded, treat it
		// as closed. Don't decrement the open count here, finalClose will
		// take care of that.
    // maybeOpenNewConnections 这个函数又见到了,它到底干了什么
		db.maybeOpenNewConnections()
		db.mu.Unlock()
		dc.Close()
		return
	}
  
  ...
	
  if db.closed {
		// Connections do not need to be reset if they will be closed.
		// Prevents writing to resetterCh after the DB has closed.
		resetSession = false
	}
	if resetSession {
		if _, resetSession = dc.ci.(driver.SessionResetter); resetSession {
			// Lock the driverConn here so it isn't released until
			// the connection is reset.
			// The lock must be taken before the connection is put into
			// the pool to prevent it from being taken out before it is reset.
			dc.Lock()
		}
	}
  // 把链接放回链接池中,也是这个函数的核心逻辑
	added := db.putConnDBLocked(dc, nil)
	db.mu.Unlock()
  // 若是释放链接失败,则关闭链接
	if !added {
		if resetSession {
			dc.Unlock()
		}
		dc.Close()
		return
	}
	if !resetSession {
		return
	}
  // 尝试将链接放回resetterCh chan里面,若是失败,则标识链接异常
	select {
	default:
		// If the resetterCh is blocking then mark the connection
		// as bad and continue on.
		dc.lastErr = driver.ErrBadConn
		dc.Unlock()
	case db.resetterCh <- dc:
	}
}
复制代码

putConnDBLocked

func (db *DB) putConnDBLocked(dc *driverConn, err error) bool {
	if db.closed {
		return false
	}
  // 已经超出最大的链接数量了,不须要再放回了
	if db.maxOpen > 0 && db.numOpen > db.maxOpen {
		return false
	}
  // 若是有其余等待获取空闲链接的协程,则
	if c := len(db.connRequests); c > 0 {
		var req chan connRequest
		var reqKey uint64
    // connRequests 获取一个 chan,并把这个链接返回到这个 chan里面
		for reqKey, req = range db.connRequests {
			break
		}
		delete(db.connRequests, reqKey) // Remove from pending requests.
		if err == nil {
			dc.inUse = true
		}
		req <- connRequest{
			conn: dc,
			err:  err,
		}
		return true
	} else if err == nil && !db.closed {
    // 若是没有超出最大数量限制,则把这个链接放到 freeConn 这个slice里面
		if db.maxIdleConnsLocked() > len(db.freeConn) {
			db.freeConn = append(db.freeConn, dc)
			db.startCleanerLocked()
			return true
		}
		db.maxIdleClosed++
	}
	return false
}
复制代码

梳理完释放链接的逻辑,咱们能够看出链接复用的大体流程

  1. 一个新的请求过来,须要获取一个新的链接
  2. 首先判断是否有空闲链接,若是没有且没有超过容许建立的最大链接数,则建立一个
  3. 多个请求以后,链接数量已经超过了设定的最大链接数,则等待释放空闲链接
  4. 此时,第一个请求完成了,准备释放链接,去看一下有没有等待空闲链接的请求,若是有的话,则把这个链接经过chan直接传过去,不然,把这个链接放到空闲的链接池里面
  5. 此时,后面等待空闲链接的请求,拿到了第一个请求传递过来的链接,继续处理请求
  6. 以上,循环往复

###maybeOpenNewConnections

这个函数,在上面的分析中已经出现了两次了,先分析一下 这个函数到底作了什么

func (db *DB) maybeOpenNewConnections() {
  // 计算须要建立的链接数,总共建立的有效链接数不能超过设置的最大链接数
	numRequests := len(db.connRequests)
	if db.maxOpen > 0 {
		numCanOpen := db.maxOpen - db.numOpen
		if numRequests > numCanOpen {
			numRequests = numCanOpen
		}
	}
	for numRequests > 0 {
		db.numOpen++ // optimistically
		numRequests--
		if db.closed {
			return
		}
    // 往 openerCh 这个chan里面插入一条数据
		db.openerCh <- struct{}{}
	}
}
复制代码

在前面的分析中,若是在获取链接时,发现产生的链接数>= 最大容许的链接数,则在 db.connRequests 这个map中建立一个惟一的 key value,用于接收释放的空闲链接,可是若是在释放链接的过程当中,发现这个链接失效了,这个链接就没法复用,这时候就会走到这个函数,尝试建立一个新的链接,给其余等待的请求使用

这里就会发现一个问题: 为何 db.openerCh <- struct{}{} 这样一条简单的命令就能建立一个链接,接下来就须要分析 db.openerCh 的接收方了

###connectionOpener

这个函数在db结构体建立的时候,就会开始执行了,一个常驻的goroutine

// Runs in a separate goroutine, opens new connections when requested.
func (db *DB) connectionOpener(ctx context.Context) {
	for {
		select {
		case <-ctx.Done():
			return
		case <-db.openerCh:
      // 这边接收到数据后,就开始建立一个新的链接
			db.openNewConnection(ctx)
		}
	}
}
复制代码

openNewConnection

// Open one new connection
func (db *DB) openNewConnection(ctx context.Context) {
	// maybeOpenNewConnctions has already executed db.numOpen++ before it sent
	// on db.openerCh. This function must execute db.numOpen-- if the
	// connection fails or is closed before returning.
  // 调用 sql driver 库来建立一个链接
	ci, err := db.connector.Connect(ctx)
	db.mu.Lock()
	defer db.mu.Unlock()
  // 若是db已经关闭,则关闭链接并返回
	if db.closed {
		if err == nil {
			ci.Close()
		}
		db.numOpen--
		return
	}
	if err != nil {
    // 建立链接失败了,从新调用 maybeOpenNewConnections 再建立一次
		db.numOpen--
		db.putConnDBLocked(nil, err)
		db.maybeOpenNewConnections()
		return
	}
	dc := &driverConn{
		db:        db,
		createdAt: nowFunc(),
		ci:        ci,
	}
  // 走到 putConnDBLocked,把链接交给等待的请求方或者链接池中
	if db.putConnDBLocked(dc, err) {
		db.addDepLocked(dc, dc)
	} else {
		db.numOpen--
		ci.Close()
	}
}
复制代码

Connect

这里是链接数据库的主要逻辑

func (t dsnConnector) Connect(_ context.Context) (driver.Conn, error) {
	return t.driver.Open(t.dsn)
}

func (d MySQLDriver) Open(dsn string) (driver.Conn, error) {
	var err error

	// New mysqlConn
	mc := &mysqlConn{
		maxAllowedPacket: maxPacketSize,
		maxWriteSize:     maxPacketSize - 1,
		closech:          make(chan struct{}),
	}
  // 解析dsn
	mc.cfg, err = ParseDSN(dsn)
	if err != nil {
		return nil, err
	}
	mc.parseTime = mc.cfg.ParseTime

	// Connect to Server
  // 找到对应网络链接类型(tcp...) 的链接函数,并建立链接
	dialsLock.RLock()
	dial, ok := dials[mc.cfg.Net]
	dialsLock.RUnlock()
	if ok {
		mc.netConn, err = dial(mc.cfg.Addr)
	} else {
		nd := net.Dialer{Timeout: mc.cfg.Timeout}
		mc.netConn, err = nd.Dial(mc.cfg.Net, mc.cfg.Addr)
	}
	if err != nil {
		return nil, err
	}

	// Enable TCP Keepalives on TCP connections
  // 开启Keepalives
	if tc, ok := mc.netConn.(*net.TCPConn); ok {
		if err := tc.SetKeepAlive(true); err != nil {
			// Don't send COM_QUIT before handshake.
			mc.netConn.Close()
			mc.netConn = nil
			return nil, err
		}
	}

	// Call startWatcher for context support (From Go 1.8)
  // 这里调用startWatcher,开始对链接进行监控,及时释放链接
	if s, ok := interface{}(mc).(watcher); ok {
		s.startWatcher()
	}

	// 下面一些设置与分析无关,忽略...

	return mc, nil
}
复制代码

startWatcher

这个函数主要是对链接进行监控

func (mc *mysqlConn) startWatcher() {
	watcher := make(chan mysqlContext, 1)
	mc.watcher = watcher
	finished := make(chan struct{})
	mc.finished = finished
	go func() {
		for {
			var ctx mysqlContext
			select {
			case ctx = <-watcher:
			case <-mc.closech:
				return
			}

			select {
      // ctx 过时的时候,关闭链接,这时候会关闭mc.closech
			case <-ctx.Done():
				mc.cancel(ctx.Err())
			case <-finished:
      // 关闭链接
			case <-mc.closech:
				return
			}
		}
	}()
}
复制代码

建立链接的逻辑

  1. 首先尝试建立一个链接,若是失败,则再次调用maybeOpenNewConnections函数,再度尝试建立一个新的链接,直到建立成功或者没有请求方须要等待链接位置
  2. 新链接建立时,会调用startWatcher函数,一个常驻的goroutine,来对链接进行监控,及时的关闭
  3. 链接建立成功后,经过 putConnDBLocked,把链接交给等待链接的请求方或者放到链接池中

至此,基本上链接建立及复用的流程大概清晰了,至此,对于咱们最开始遇到的问题也有了一个明确的解释:

  • 调用 Rows() 函数进行查询的时候,须要获取一个链接
  • 此时没有新的或空闲的链接,因此,须要建立一个新的链接
  • 建立链接是,建立一个 startWatcher的goroutine来进行监控
  • 因为 查询完成后,没有调用 rows.Close() 及时释放链接,致使此链接一直没有放回链接池或被复用,因此每次请求,都会建立一个新的链接
  • 屡次请求下来,就会建立不少的startWatcher的goroutine,最终产生了遇到的现象

Rows.Close

func (rs *Rows) Close() error {
	return rs.close(nil)
}

func (rs *Rows) close(err error) error {
	rs.closemu.Lock()
	defer rs.closemu.Unlock()
  // ...
  rs.closed = true

  // 相关字段的一些设置, 忽略 ....
	rs.releaseConn(err)
	return err
}

// 经过putConn 把链接释放
func (dc *driverConn) releaseConn(err error) {
	dc.db.putConn(dc, err, true)
}
复制代码

rs.releaseConn 所对应的函数,能够在 queryDC 这个方法里面找到,这里就直接列出来了

能够看到,rows.Close() 最后就是经过 putConn 把当前的链接释放以便复用

Rows.Next

Next 为scan方法准备下一条记录,以便scan方法读取,若是没有下一行的话,或者准备下一条记录的时候出错了,就会返回false

func (rs *Rows) Next() bool {
	var doClose, ok bool
	withLock(rs.closemu.RLocker(), func() {
    // 准备下一条记录
		doClose, ok = rs.nextLocked()
	})
	if doClose {
    // 若是 doClose 为true,说明没有记录了,或者准备下一条记录的时候,出错了,此时关闭链接
		rs.Close()
	}
	return ok
}

func (rs *Rows) nextLocked() (doClose, ok bool) {
  // 若是 已经关闭了,就不要读取下一条了
	if rs.closed {
		return false, false
	}

	// Lock the driver connection before calling the driver interface
	// rowsi to prevent a Tx from rolling back the connection at the same time.
	rs.dc.Lock()
	defer rs.dc.Unlock()

	if rs.lastcols == nil {
		rs.lastcols = make([]driver.Value, len(rs.rowsi.Columns()))
	}
	// 获取下一条记录,并放到lastcols里面
	rs.lasterr = rs.rowsi.Next(rs.lastcols)
	if rs.lasterr != nil {
		// Close the connection if there is a driver error.
    // 读取出错,返回true,以便后面关闭链接
		if rs.lasterr != io.EOF {
			return true, false
		}
		nextResultSet, ok := rs.rowsi.(driver.RowsNextResultSet)
		if !ok {
      // 没有获取到记录了,返回true,以便后面关闭链接
			return true, false
		}
		// The driver is at the end of the current result set.
		// Test to see if there is another result set after the current one.
		// Only close Rows if there is no further result sets to read.
		if !nextResultSet.HasNextResultSet() {
			doClose = true
		}
		return doClose, false
	}
	return false, true
}
复制代码

Next() 的逻辑:

  1. 在调用Next() 的时候,准备下一条记录,以便scan读取
  2. 若是在准备数据的时候出错或者没有下一条记录的时候,返回false
  3. 若是Next() 在准备数据的时候,拿到了false,则调用 rows.Close() 把链接放回池子或者交给其余请求等待着,以便复用链接

因此,也就是为何一下的demo并不会出现问题同样

for rows.Next() {
		user := &User{}
		err = db.ScanRows(rows, user)
		if err != nil {
			continue
		}
	}
复制代码

总结

走到这里,开头提出的问题应该已经有了明确的答案了: rows.Next() 在获取到最后一条记录以后,会调用 rows.Close() 将链接放回链接池或交给其余等待的请求方,因此不须要手动调用 rows.Close(),

而出问题的demo中,因为rows.Next() 没有执行到最后一条记录处,也没有调用 rows.Close(), 因此在获取到链接后一直没有被放回进行复用,致使了每来一个请求建立一个新的链接,产生一个新的监控者 startWatcher.func1, 最终致使了内存爆炸💥

相关文章
相关标签/搜索