结构体分析
type Pool struct {
// 用来建立redis链接的方法
Dial func() (Conn, error)
// 若是设置了给func,那么每次p.Get()的时候都会调用改方法来验证链接的可用性
TestOnBorrow func(c Conn, t time.Time) error
// 定义链接池中最大链接数(超过这个数会关闭老的连接,总会保持这个数)
MaxIdle int
// 当前链接池中可用的连接数.
MaxActive int
// 定义连接的超时时间,每次p.Get()的时候会检测这个链接是否超时(超时会关闭,并释放可用链接数).
IdleTimeout time.Duration
// 当可用链接数为0是,那么当wait=true,那么当调用p.Get()时,会阻塞等待,不然,返回nil.
Wait bool
// 读写锁控制.
mu sync.Mutex
// 用来条件控制,这里主要是当连接被关闭时,提醒在等待的进程可使用了,或者能够自行建立了
cond *sync.Cond
// 当前链接池是否已经关闭
closed bool
// 当前可用的连接数
active int
// 连接存储在一个栈中.
idle list.List
}
链接池关闭方法
func (p *Pool) Close() error {
p.mu.Lock()
// 获取链接池全部连接栈
idle := p.idle
// 从新初始化
p.idle.Init()
// 标示已经关闭
p.closed = true
// 控制可用链接数
p.active -= idle.Len()
// 若是当前有进程正在等待获取的话,则通知能够获取或者自行建立
if p.cond != nil {
p.cond.Broadcast()
}
p.mu.Unlock()
// 遍历栈,逐个关闭连接
for e := idle.Front(); e != nil; e = e.Next() {
e.Value.(idleConn).c.Close()
}
return nil
}
释放一个连接
func (p *Pool) release() {
// 当连接超时,或者ping不通,或者建立失败,则当即使用连接表示
p.active -= 1
// 若是已经有进程在以前等待了,则通知其使用或者自行建立
if p.cond != nil {
p.cond.Signal()
}
}
关闭链接
func (p *Pool) put(c Conn, forceClose bool) error {
err := c.Err()
p.mu.Lock()
// 若是链接池没有关闭,而且不是强制关闭的
if !p.closed && err == nil && !forceClose {
// 把指定的连接放在空闲栈首位
p.idle.PushFront(idleConn{t: nowFunc(), c: c})
// 若是栈的长度大于指定长度,则吧最后一个(可能超时)剔除
if p.idle.Len() > p.MaxIdle {
c = p.idle.Remove(p.idle.Back()).(idleConn).c
} else {
c = nil
}
}
if c == nil {
//成功放回空闲链接通知其余阻塞的进程
if p.cond != nil {
p.cond.Signal()
}
p.mu.Unlock()
return nil
}
// 减小active计数(感受这里能够不用处理,上面若是是替换的话)
p.release()
p.mu.Unlock()
// 关闭链接
return c.Close()
}
获取连接
func (p *Pool) get() (Conn, error) {
p.mu.Lock()
// 处理旧的连接(检测是否超时)
if timeout := p.IdleTimeout; timeout > 0 {
for i, n := 0, p.idle.Len(); i < n; i++ {
e := p.idle.Back()
if e == nil {
break
}
ic := e.Value.(idleConn)
// 连接建立的时间加上超时时间是否小于当前时间
if ic.t.Add(timeout).After(nowFunc()) {
break
}
// 超时连接,从栈中删除
p.idle.Remove(e)
// 可用链接数减小,并通知其余等待的进程处理
p.release()
p.mu.Unlock()
// 关闭当前链接
ic.c.Close()
p.mu.Lock()
}
}
for {
// 从链接栈列表中获取一个可用连接
for i, n := 0, p.idle.Len(); i < n; i++ {
// 从idle列表前面取链接,那么必然是刚刚使用过的链接
e := p.idle.Front()
if e == nil {
break
}
// 类型断言
ic := e.Value.(idleConn)
// 从栈中删除
p.idle.Remove(e)
test := p.TestOnBorrow
p.mu.Unlock()
// 经过使用校验连接函数检测连接
if test == nil || test(ic.c, ic.t) == nil {
return ic.c, nil
}
// 检验出问题,关闭链接
ic.c.Close()
p.mu.Lock()
// 下降可用链接数,并通知其余进程处理
p.release()
}
// 检测链接池是否已经关闭.
if p.closed {
p.mu.Unlock()
return nil, errors.New("redigo: get on closed pool")
}
// 若是可用链接数为0 或者小于最大可用可用链接数范围,那么建立
if p.MaxActive == 0 || p.active < p.MaxActive {
dial := p.Dial
p.active += 1
p.mu.Unlock()
// 链接redis server
c, err := dial()
// 链接失败关闭之
if err != nil {
p.mu.Lock()
p.release()
p.mu.Unlock()
c = nil
}
return c, err
}
// 若是没有配置等待,那么就返回nil
if !p.Wait {
p.mu.Unlock()
return nil, ErrPoolExhausted
}
// 若是配置了等待,那么初始化,而且开始等待,直到有进程通知连接数够了或者能够建立了
if p.cond == nil {
p.cond = sync.NewCond(&p.mu)
}
p.cond.Wait()
}
}