概览:mysql
为何须要链接池git
链接失效问题github
database/sql 中的链接池sql
使用链接池管理Thrift连接数据库
如下主要使用Golang做为编程语言编程
我以为使用链接池最大的一个好处就是减小链接的建立和关闭,增长系统负载能力,
以前就有遇到一个问题:TCP TIME_WAIT链接数过多致使服务不可用,由于未开启数据库链接池,再加上mysql并发较大,致使须要频繁的建立连接,最终产生了上万的TIME_WAIT的tcp连接,影响了系统性能。网络
连接池中的的功能主要是管理一堆的连接,包括建立和关闭,因此本身在fatih/pool基础上,改造了一下:https://github.com/silenceper/pool ,使得更加通用一些,增长的一些功能点以下:并发
链接对象不仅仅是net.Conn
,变为了interface{}
(池中存储本身想要的格式)app
增长了连接的最大空闲时间(保证了当链接空闲过久,连接失效的问题)less
主要是用到了channel
来管理链接,而且可以很好的利用管道的顺序性,当须要使用的时候Get
一个链接,使用完毕以后Put
放回channel
中。
使用链接池以后就再也不是短链接,而是长链接了,就引起了一些问题:
由于网络环境是复杂的,中间可能由于防火墙等缘由,致使长时间空闲的链接会断开,因此能够经过两个方法来解决:
客户端增长心跳,定时的给服务端发送请求
给链接池中的链接增长最大空闲时间,超时的链接再也不使用
在https://github.com/silenceper/pool就增长了一个这样最大空闲时间的参数,在链接建立或者链接被从新返回链接池中时重置,给每一个链接都增长了一个链接的建立时间,在取出的时候对时间进行比较:https://github.com/silenceper/pool/blob/master/channel.go#L85
远程服务端颇有可能重启,那么以前建立的连接就失效了。客户端在使用的时候就须要判断这些失效的链接并丢弃,在database/sql
中就判断了这些失效的链接,使用这种错误表示var ErrBadConn = errors.New("driver: bad connection")
另外值得一提的就是在database/sql
对这种ErrBadConn
错误进行了重试,默认重试次数是两次,因此可以保证即使是连接失效或者断开了,本次的请求可以正常响应(继续往下看就是分析了)。
链接失效的特征
对链接进行read读操做时,返回EOF
错误
对链接进行write操做时,返回write tcp 127.0.0.1:52089->127.0.0.1:8002: write: broken pipe
错误
在database/sql
中使用链接链接池很简单,主要涉及下面这些配置:
db.SetMaxIdleConns(10) //链接池中最大空闲链接数 db.SetMaxOpenConns(20) //打开的最大链接数 db.SetConnMaxLifetime(300*time.Second)//链接的最大空闲时间(可选)
注:若是
MaxIdleConns
大于0而且MaxOpenConns
小于MaxIdleConns
,那么会将MaxIdleConns
置为MaxIdleConns
来看下db这个结构,以及字段相关说明:
type DB struct { //具体的数据库实现的interface{}, //例如https://github.com/go-sql-driver/mysql 就注册并并实现了driver.Open方法,主要是在里面实现了一些鉴权的操做 driver driver.Driver //dsn链接 dsn string //在prepared statement中用到 numClosed uint64 mu sync.Mutex // protects following fields //可以使用的空闲的连接 freeConn []*driverConn //用来传递链接请求的管道 connRequests []chan connRequest //当前打开的链接数 numOpen int //当须要建立新的连接的时候,往这个管道中发送一个struct数据, //由于在Open数据库的就启用了一个goroutine执行connectionOpener方法读取管道中的数据 openerCh chan struct{} //数据库是否已经被关闭 closed bool //用来保证锁被正确的关闭 dep map[finalCloser]depSet //stacktrace of last conn's put; debug only lastPut map[*driverConn]string //最大空闲链接 maxIdle int //最大打开的链接 maxOpen int //链接的最大空闲时间 maxLifetime time.Duration //定时清理空闲链接的管道 cleanerCh chan struct{} }
看一个查询数据库的例子:
rows, err := db.Query("select * from table1")
在调用db.Query
方法以下:
func (db *DB) Query(query string, args ...interface{}) (*Rows, error) { var rows *Rows var err error //这里就作了对失效的连接的重试操做 for i := 0; i < maxBadConnRetries; i++ { rows, err = db.query(query, args, cachedOrNewConn) if err != driver.ErrBadConn { break } } if err == driver.ErrBadConn { return db.query(query, args, alwaysNewConn) } return rows, err }
在什么状况下会返回,能够从这里看到:
readPack,writePack
继续跟进去就到了
func (db *DB) conn(strategy connReuseStrategy) (*driverConn, error) {
方法主要是建立tcp链接,并判断了链接的生存时间lifetime,以及链接数的一些限制,若是超过的设定的最大打开连接数限制等待connRequest
管道中有链接产生(在putConn
释放连接的时候就会往这个管道中写入数据)
什么时候释放连接?
当咱们调用rows.Close()
的时候,就会把当前正在使用的连接从新放回freeConn
或者写入到db.connRequests
管道中
//putConnDBLocked 方法 //若是有db.connRequests有在等待链接的话,就把当前链接给它用 if c := len(db.connRequests); c > 0 { req := db.connRequests[0] // This copy is O(n) but in practice faster than a linked list. // TODO: consider compacting it down less often and // moving the base instead? copy(db.connRequests, db.connRequests[1:]) db.connRequests = db.connRequests[:c-1] if err == nil { dc.inUse = true } req <- connRequest{ conn: dc, err: err, } return true } else if err == nil && !db.closed && db.maxIdleConnsLocked() > len(db.freeConn) { //没人须要我这个连接,我就把他从新返回`freeConn`链接池中 db.freeConn = append(db.freeConn, dc) db.startCleanerLocked() return true }
这里是使用链接池https://github.com/silenceper/pool,如何构建一个thrift连接
客户端建立Thrift的代码:
type Client struct { *user.UserClient } //建立Thrift客户端连接的方法 factory := func() (interface{}, error) { protocolFactory := thrift.NewTBinaryProtocolFactoryDefault() transportFactory := thrift.NewTTransportFactory() var transport thrift.TTransport var err error transport, err = thrift.NewTSocket(rpcConfig.Listen) if err != nil { panic(err) } transport = transportFactory.GetTransport(transport) //defer transport.Close() if err := transport.Open(); err != nil { panic(err) } rpcClient := user.NewUserClientFactory(transport, protocolFactory) //在链接池中直接放置Client对象 return &Client{UserClient: rpcClient}, nil } //关闭链接的方法 close := func(v interface{}) error { v.(*Client).Transport.Close() return nil } //建立了一个 初始化链接是 poolConfig := &pool.PoolConfig{ InitialCap: 10, MaxCap: 20, Factory: factory, Close: close, IdleTimeout: 300 * time.Second, } p, err := pool.NewChannelPool(poolConfig) if err != nil { panic(err) } //取得连接 conn, err := p.Get() if err != nil { return nil, err } v, ok := conn.(*Client) ...使用链接调用远程方法 //将链接从新放回链接池中 p.Put(conn)
pool链接池代码地址:https://github.com/silenceper...
原文地址:http://silenceper.com/blog/201611/%E8%81%8A%E8%81%8Atcp%E8%BF%9E%E6%8E%A5%E6%B1%A0/