Go组件学习——手写链接池并无那么简单

一、背景

前段时间在看gorm,发现gorm是复用database/sql的链接池。html

因而翻了下database/sql的数据库链接池的代码实现,看完代码,好像也不是很复杂,可是总以为理解不够深入,因而萌生了本身想写个链接池的想法。(最后也验证了,看源码的理解确实不够深入,一看就会,一作就跪)git

二、链接池的实现原理

什么是链接池github

  • 顾名思义是一个池子
  • 池子里面存放有限数量即时可用的链接,减小建立链接和关闭链接的时间
  • 链接是有存活时间的

具体到数据库链接池,我根据本身的理解画了一张获取链接的流程图sql

从上图咱们能够看出,除了链接池的容量大小,咱们还有一个最大链接数的限制。池子里的链接让咱们不用频繁的建立和关闭链接,同时应该也要有最大链接的限制,避免无限制的建立链接致使服务器资源耗尽,拖垮服务不可用。数据库

池子中的链接也有存活时间,若是超过存活时间则会销毁链接。缓存

三、实现链接池咱们须要考虑哪些问题

3.1 功能点

  • 获取链接安全

  • 释放链接服务器

  • Ping微信

  • 关闭链接池并发

  • 设置最大链接数和链接池容量(链接存活时间等等)

3.2 实现细节

  • 链接应该有哪些属性,好比最大链接数、链接池容量、链接建立时间和存活时间
  • 如何模拟使用链接池以及超过最大链接数后等待其余链接释放
  • 如何保证在多协程操做下数据的一致性
  • 若是实现链接的超时监听和通知

四、具体实现

这里的链接池实现包括

  • 设置最大链接数和链接池容量
  • 获取链接
  • 释放链接

4.1 结构定义

定义Conn结构体,这里包含了几乎全部的有关链接须要的信息属性

type Conn struct {
	maxConn       int                     // 最大链接数
	maxIdle       int                     // 最大可用链接数
	freeConn      int                     // 线程池空闲链接数
	connPool      []int                   // 链接池
	openCount     int                     // 已经打开的链接数
	waitConn      map[int]chan Permission // 排队等待的链接队列
	waitCount     int                     // 等待个数
	lock          sync.Mutex              // 锁
	nextConnIndex NextConnIndex						// 下一个链接的ID标识(用于区分每一个ID)
	freeConns     map[int]Permission 			// 链接池的链接	
}

  

这里并不会建立一个真正的数据库链接,而是使用一个非空的Permission表示拿到了链接。拿到一个非空的Permission才有资格执行后面相似增删改查的操做。

Permission对应的结构体以下

type Permission struct {
	NextConnIndex								 // 对应Conn中的NextConnIndex
	Content     string					 // 通行证的具体内容,好比"PASSED"表示成功获取
	CreatedAt   time.Time				 // 建立时间,即链接的建立时间
	MaxLifeTime time.Duration    // 链接的存活时间,本次没有用到这个属性,保留
}

  

NextConnIndex对应的结构体以下

type NextConnIndex struct {
	Index int
}

  

还有一个用来设置最大链接数以及链接池最大链接数的Config

type Config struct {
	MaxConn int
	MaxIdle int
}

  

4.2 初始化链接池参数

func Prepare(ctx context.Context, config *Config) (conn *Conn) {
	// go func() {
		//for {
		//conn.expiredCh = make(chan string, len(conn.freeConns))
		//for _, value := range conn.freeConns {
		//	if value.CreatedAt.Add(value.MaxLifeTime).Before(nowFunc()) {
		//		conn.expiredCh <- "CLOSE"
		//	}
		//}
	// }()
	return &Conn{
		maxConn:   config.MaxConn,
		maxIdle:   config.MaxIdle,
		openCount: 0,
		connPool:  []int{},
		waitConn:  make(map[int]chan Permission),
		waitCount: 0,
		freeConns: make(map[int]Permission),
	}
}

  

这里主要是初始化上面的Conn结构体参数。

注释的部分,主要想经过启动一个监听协程,用于监听已通过期的链接,并经过channel发送。(这块还有一些细节没有想清楚,先搁置)

4.3 设置MaxConn和MaxIdle

在main.go中添加代码

ctx := context.Background()
	config := &custom_pool.Config{
		MaxConn: 2,
		MaxIdle: 1,
	}

  

这里意味链接池只能缓存一个链接,最大新建链接数为2,超过则要加入等待队列。

4.4 获取链接

// 建立链接
func (conn *Conn) New(ctx context.Context) (permission Permission, err error) {
	/**
	一、若是当前链接池已满,即len(freeConns)=0
	二、断定openConn是否大于maxConn,若是大于,则丢弃获取加入队列进行等待
	三、若是小于,则考虑建立新链接
	*/
	conn.lock.Lock()

	select {
	default:
	case <-ctx.Done():	// context取消或超时,则退出
		conn.lock.Unlock()

		return Permission{}, errors.New("new conn failed, context cancelled!")
	}

  // 链接池不为空,从链接池获取链接
	if len(conn.freeConns) > 0 {
		var (
			popPermission Permission
			popReqKey     int
		)
    
    // 获取其中一个链接
		for popReqKey, popPermission = range conn.freeConns {
			break
		}
    // 从链接池删除
		delete(conn.freeConns, popReqKey)
		fmt.Println("log", "use free conn!!!!!", "openCount: ", conn.openCount, " freeConns: ", conn.freeConns)
			conn.lock.Unlock()
		return popPermission, nil
	}

	if conn.openCount >= conn.maxConn { // 当前链接数大于上限,则加入等待队列
		nextConnIndex := getNextConnIndex(conn)

		req := make(chan Permission, 1)
		conn.waitConn[nextConnIndex] = req
		conn.waitCount++
		conn.lock.Unlock()

		select {
      // 若是在等待指定超时时间后,仍然没法获取释放链接,则放弃获取链接,这里若是不在超时时间后退出会一直阻塞
		case <-time.After(time.Second * time.Duration(3)):
			fmt.Println("超时,通知主线程退出")
			return
		case ret, ok := <-req: // 有放回的链接, 直接拿来用
			if !ok {
				return Permission{}, errors.New("new conn failed, no available conn release")
			}
			fmt.Println("log", "received released conn!!!!!", "openCount: ", conn.openCount, " freeConns: ", conn.freeConns)
			return ret, nil
		}
		return Permission{}, errors.New("new conn failed")
	}

	// 新建链接
	conn.openCount++
	conn.lock.Unlock()
	permission = Permission{NextConnIndex: NextConnIndex{nextConnIndex},
		Content: "PASSED", CreatedAt: nowFunc(), MaxLifeTime: time.Second * 5}
	fmt.Println("log", "create conn!!!!!", "openCount: ", conn.openCount, " freeConns: ", conn.freeConns)
	return permission, nil
}

  

这里主要分为三个部分

  • 若是链接池不为空,则直接从池子里面获取链接使用便可

  • 若是链接池为空,且当前的链接数已经超过最大链接数maxConn,则会将当前任务加入等待队列,同时监听是否有释放的可用链接,若是有则拿来直接用,若是超过指定等待时间后仍然取不到链接则退出阻塞返回。

  • 若是链接池为空,且还没有达到最大链接数maxConn,则新建一个新链接。

getNextConnIndex函数

func getNextConnIndex(conn *Conn) int {
	currentIndex := conn.nextConnIndex.Index
	conn.nextConnIndex.Index = currentIndex + 1
	return conn.nextConnIndex.Index
}

  

4.5 释放链接

// 释放链接
func (conn *Conn) Release(ctx context.Context) (result bool, err error) {
	conn.lock.Lock()
  // 若是等待队列有等待任务,则通知正在阻塞等待获取链接的进程(即New方法中"<-req"逻辑)
  // 这里没有作指定链接的释放,只是保证释放的链接会被利用起来
	if len(conn.waitConn) > 0 {
		var req chan Permission
		var reqKey int
		for reqKey, req = range conn.waitConn {
			break
		}
		// 假定释放的链接就是下面新建的链接
		permission := Permission{NextConnIndex: NextConnIndex{reqKey},
			Content: "PASSED", CreatedAt: nowFunc(), MaxLifeTime: time.Second * 5}
		req <- permission
		conn.waitCount--
		delete(conn.waitConn, reqKey)
		conn.lock.Unlock()
	} else {
		if conn.openCount > 0 {
			conn.openCount--
      
			if len(conn.freeConns) < conn.maxIdle {	// 确保链接池大小不会超过maxIdle
				nextConnIndex := getNextConnIndex(conn)
				permission := Permission{NextConnIndex: NextConnIndex{nextConnIndex},
					Content: "PASSED", CreatedAt: nowFunc(), MaxLifeTime: time.Second * 5}
				conn.freeConns[nextConnIndex] = permission
			}
		}
		conn.lock.Unlock()
	}
	return
}

  

这里主要分为两部分

  • 若是释放链接的时候发现等待队列有任务在等待,则将释放的链接经过channel发送,给正在等待链接释放的阻塞任务使用,同时从等待队列中删除该任务。
  • 若是当前无等待任务,则将链接放入链接池

这里的nowFunc

var nowFunc = time.Now

  

五、Case模拟

5.1 无释放建立链接

即只有建立链接,拿到链接也不会释放链接

package main

import (
	"context"
	custom_pool "go-demo/main/src/custom-pool"
)

func main() {

	ctx := context.Background()
	config := &custom_pool.Config{
		MaxConn: 2,
		MaxIdle: 1,
	}
	conn := custom_pool.Prepare(ctx, config)
	if _, err := conn.New(ctx); err != nil {
		return
	}
	if _, err := conn.New(ctx); err != nil {
		return
	}
	if _, err := conn.New(ctx); err != nil {
		return
	}
	if _, err := conn.New(ctx); err != nil {
		return
	}
	if _, err := conn.New(ctx); err != nil {
		return
	}
}

  

执行结果以下

注意上面代码都是一直在获取链接,在获取链接后没有释放链接。

第一次获取,链接池为空,则新建链接

第二次获取,链接池为空,继续新建链接

第三次获取,链接池为空,同时已有链接数>=maxConn,因此会阻塞等待释放链接,可是由于没有链接释放,因此一直等待,直到3秒超时后退出。

因此第三次、第四次和第五次都是超时退出

5.2 释放链接

若是咱们释放链接会怎么样,咱们能够经过新启一个协程用于释放一个链接以下

package main

import (
	"context"
	custom_pool "go-demo/main/src/custom-pool"
)

func main() {

	ctx := context.Background()
	config := &custom_pool.Config{
		MaxConn: 2,
		MaxIdle: 1,
	}
	conn := custom_pool.Prepare(ctx, config)
	if _, err := conn.New(ctx); err != nil {
		return
	}
	if _, err := conn.New(ctx); err != nil {
		return
	}
	go conn.Release(ctx)
	if _, err := conn.New(ctx); err != nil {
		return
	}
	if _, err := conn.New(ctx); err != nil {
		return
	}
	if _, err := conn.New(ctx); err != nil {
		return
	}
}

  

执行结果以下

log create conn!!!!! openCount:  1  freeConns:  map[]
log create conn!!!!! openCount:  2  freeConns:  map[]
log received released conn!!!!! openCount:  2  freeConns:  map[]
超时,通知主线程退出
超时,通知主线程退出

  

前两次和上面同样,可是第三次获取的时候,会收到一个释放的链接,因此能够直接复用释放的链接返回。

可是第四次和第五次建立,由于没有释放的链接,因此都会由于等待超时后退出。

5.3 使用链接池

上面的两个case是在MaxConn=2,MaxIdle=1的状况下执行的。

下面咱们看看若是基于以上两个参数设定,模拟出正好使用链接池的状况。

package main

import (
	"context"
	custom_pool "go-demo/main/src/custom-pool"
)

func main() {

	ctx := context.Background()
	config := &custom_pool.Config{
		MaxConn: 2,
		MaxIdle: 1,
	}
	conn := custom_pool.Prepare(ctx, config)
	if _, err := conn.New(ctx); err != nil {
		return
	}
	go conn.Release(ctx)
	if _, err := conn.New(ctx); err != nil {
		return
	}
	go conn.Release(ctx)
	if _, err := conn.New(ctx); err != nil {
		return
	}
	go conn.Release(ctx)
	if _, err := conn.New(ctx); err != nil {
		return
	}
	go conn.Release(ctx)
	if _, err := conn.New(ctx); err != nil {
		return
	}
}

  

即除了第一次,后面都会有链接释放。

执行结果可能状况以下

log create conn!!!!! openCount:  1  freeConns:  map[]
log create conn!!!!! openCount:  2  freeConns:  map[]
log use free conn!!!!! openCount:  1  freeConns:  map[]
log use free conn!!!!! openCount:  0  freeConns:  map[]
log create conn!!!!! openCount:  1  freeConns:  map[]

  

从执行结果能够看出,这里有两次使用了链接池中的链接。

注意:由于释放是新启协程执行,因此没法保证执行顺序,不一样的执行顺序,会有不一样的执行结果。上面只是执行结果的一种。

以上完整代码参见https://github.com/DMinerJackie/go-demo/tree/master/main/src/custom-pool

六、总结和展望

6.1 总结

  • 经过手写链接池加深对于链接池实现的理解
  • 学会使用channel和协程
  • 学会如何在channel阻塞指定时间后退出(设立超时时间)
  • 学会对于共享资源加锁,好比nextConnIndex的获取和更新须要加锁

6.2 展望

  • Close和Ping没有写(实现不难)
  • 链接池链接须要有存活时间,并在链接过时的时候从链接池删除
  • 实现使用的是普通的map集合,能够考虑并发安全的syncMap
  • 代码实现比较简陋不够优雅,能够继续完善保证职责单一

若是您以为阅读本文对您有帮助,请点一下“推荐”按钮,您的“推荐”将是我最大的写做动力!若是您想持续关注个人文章,请扫描二维码,关注JackieZheng的微信公众号,我会将个人文章推送给您,并和您一块儿分享我平常阅读过的优质文章。

原文出处:https://www.cnblogs.com/bigdataZJ/p/go-custom-pool.html