labix.org/mgo 链接池泄漏问题

问题复现

labix.org/mgo是golang经常使用的mongo driver,笔者的项目中重度依赖,不过项目年久失修,已经不维护。因此结论是用官方包git

最近在使用中,我发现了一个问题,服务对mongo的长链接一直缓慢增加,形似mongo链接泄漏。github

查看了下mgo源码,发现mgo内部维护了链接池,而默认链接池大小socketsPerServer是4096,链接池的链接新增是惰性的,不会在初始化时创建全部链接,而是在有新请求且当前无剩余可用链接时,创建新链接,使用结束后就放入链接池中供之后使用,期待链接池所维护的最多链接数是4096个。链接池大小限制代码以下,请注意mongoServer.unusedSockets(空闲的链接)和mongoServer.liveSockets(生效过的链接)的含义:golang

// 链接池大小:
var socketsPerServer = 4096
// 链接池大小设置:
func SetPoolLimitPerServer(limit int) {
	socketsPerServer = limit
}
// 链接池管理结构
type mongoServer struct {
  ...
	unusedSockets []*mongoSocket // 空闲的链接
	liveSockets   []*mongoSocket // 生效过的链接,这里记录全部创建的链接,当链接使用结束会复制一份进unusedSockets,可是不会从liveSockets删除
	... 
}
复制代码

如上代码,socketsPerServer是能够经过方法SetPoolLimitPerServer改写的,因而我在下面的代码中,OpenDB里调用SetPoolLimitPerServer将链接池上限改为了20,可是经过并发测试发现,链接池的限制并不许确,实际运行下来最后创建的链接依然会超过20。mongodb

主要代码以下,我插入200条数据,为了防止数据库压力,经过笔者写的goworker以稳定的并发90写入,其余会进行排队。数据库

最后经过mgo提供的GetStats()接口(或者在服务端经过db.serverStatus()也能看到链接数量connections.current),可以看到池里的链接为SocketsAlive 59个,远远超过了20,这是为何呢?并发

// 测试mgo链接泄露问题
func main() {
	db := OpenDB(xl, fmgo.Config{Config: mgo3.Config{Host: "127.0.0.1:27017", DB: "test_black", Coll: "connectiontest", Mode: "strong", SyncTimeoutInS: 5}})
	defer db.Close()
	mgo.SetStats(true)
	// 初始化 goworker,并发不超过90
	worker := goworker.New(goworker.WorkerConfig{
		ConcurrencyNum: 90,
	})
	// 总共插入200条数据
	for i := 0; i < 200; i++ {
		var j = i
		worker.Add(func() {
			db.Insert(mgoDBInfo{Name: strconv.Itoa(j) + "_name"})
		})
	}
	worker.IsDone()
	stats := mgo.GetStats()
	fmt.Printf("%+v", stats)
 // 输出:{Clusters:0 MasterConns:59 SlaveConns:0 SentOps:459 ReceivedOps:259 ReceivedDocs:259 SocketsAlive:59 SocketsInUse:0 SocketRefs:0}
	time.Sleep(100 * time.Second)
}
复制代码

问题定位

问题出如今当有新的请求时,mgo 的链接池管理逻辑,当须要新建一个链接时,并发状况下会有问题,精简代码以下app

func (server *mongoServer) AcquireSocket(limit int, timeout time.Duration) (socket *mongoSocket, abended bool, err error) {
 	for {
  	server.Lock()
    n := len(server.unusedSockets)
    // 判断当前正在使用的链接,是否到链接池上限,若是到了上限就退出等待
		if len(server.liveSockets)-n >= limit {
			server.Unlock()
			return nil, false, errSocketLimit
		}
   
    if n > 0{
      // 拿一个可用的 unusedSockets
    } else {
    	server.Unlock()
      // bug here
      // 这里 unlock->connect->lock,是由于若是不unlock,新建链接时间太长,致使阻塞全部并发AcquireSocket的请求
      // 这样作在并发时就会有一个bug,若是链接池大小是20,而同时并发的进入30个AcquireSocket,全部的请求都会走到下面的Connect(),并正常的拿到链接,加入liveSockets,返回成功,致使链接池里有30个链接
      socket, err = server.Connect()
      if err == nil {
				server.Lock()
				if server.closed {
          server.UnLock()
          return nil, abended, errServerClosed
        }
				server.liveSockets = append(server.liveSockets, socket)
				server.Unlock()
      }
    }
    return
  }
  panic("unreachable")
}


复制代码

问题解决

从代码逻辑看,mgo对链接池的理解和通常的理解不一样:socket

  • 通常而言,链接池大小,就表明有多少个长链接维持在池里,也就是 正在使用的链接+未使用的链接 <= 链接池大小
  • mgo的链接池管理里,从上面的方法里用if len(server.liveSockets)-n >= limit判断链接池是否已经满能够看出, mgo认为当前正在用的链接数 <= 链接池大小 就能够了,没有使用的链接不该该受链接池大小控制。(注意,第一部分已经强调,liveSockets是历史创建的全部链接数量,不是正在使用socket的数量)

很明显,mgo的定义是不对的,这样会致使真实创建的链接(正在使用的+池里未使用的)>链接池限制。测试

针对这一点,咱们在拿到新链接并加锁后,判断一下当前创建的链接是否已经超限,超限就关闭当前链接并等待,解决这个问题。ui

这个解决方案,会放弃新建的链接,对资源是有必定的浪费的,由于毕竟新建链接是耗时的。可是一旦创建后,就致使了链接泄漏,因此是不得已而为之。

func (server *mongoServer) AcquireSocket(limit int, timeout time.Duration) (socket *mongoSocket, abended bool, err error) {
     socket, err = server.Connect()
      if err == nil {
				server.Lock()
				if server.closed {
          server.UnLock()
          return nil, abended, errServerClosed
        }
        // fix bug start
        // +1 是要算上当前新建的这个链接
        if limit > 0 && len(server.liveSockets)-n+1 > limit {
					server.Unlock()
					socket.Release()
					socket.Close()
					return nil, false, errSocketLimit
				}
        // fix bug end
				server.liveSockets = append(server.liveSockets, socket)
				server.Unlock()
      }
}
复制代码
相关文章
相关标签/搜索