go自从出生就身带“高并发”的标签,其并发编程就是由groutine实现的,因其消耗资源低,性能高效,开发成本低的特性而被普遍应用到各类场景,例如服务端开发中使用的HTTP服务,在golang net/http包中,每个被监听到的tcp连接都是由一个groutine去完成处理其上下文的,由此使得其拥有极其优秀的并发量吞吐量git
for { // 监听tcp rw, e := l.Accept() if e != nil { ....... } tempDelay = 0 c := srv.newConn(rw) c.setState(c.rwc, StateNew) // before Serve can return // 启动协程处理上下文 go c.serve(ctx) }
虽然建立一个groutine占用的内存极小(大约2KB左右,线程一般2M左右),可是在实际生产环境无限制的开启协程显然是不科学的,好比上图的逻辑,若是来几千万个请求就会开启几千万个groutine,当没有更多内存可用时,go的调度器就会阻塞groutine最终致使内存溢出乃至严重的崩溃,因此本文将经过实现一个简单的协程池,以及剖析几个开源的协程池源码来探讨一下对groutine的并发控制以及多路复用的设计和实现。github
过年前作过一波小需求,是将主播管理系统中信息不完整的主播找出来而后再到其相对应的直播平台爬取完整信息并补全,当时考虑到每个主播的数据都要访问一次直播平台因此就用应对每个主播开启一个groutine去抓取数据,虽然这个业务量还远远远远达不到能形成groutine性能瓶颈的地步,可是内心老是不舒服,因而放假回来后将其优化成从协程池中控制groutine数量再开启爬虫进行数据抓取。思路其实很是简单,用一个channel当作任务队列,初始化groutine池时肯定好并发量,而后以设置好的并发量开启groutine同时读取channel中的任务并执行, 模型以下图golang
type SimplePool struct { wg sync.WaitGroup work chan func() //任务队列 } func NewSimplePoll(workers int) *SimplePool { p := &SimplePool{ wg: sync.WaitGroup{}, work: make(chan func()), } p.wg.Add(workers) //根据指定的并发量去读取管道并执行 for i := 0; i < workers; i++ { go func() { defer func() { // 捕获异常 防止waitGroup阻塞 if err := recover(); err != nil { fmt.Println(err) p.wg.Done() } }() // 从workChannel中取出任务执行 for fn := range p.work { fn() } p.wg.Done() }() } return p } // 添加任务 func (p *SimplePool) Add(fn func()) { p.work <- fn } // 执行 func (p *SimplePool) Run() { close(p.work) p.wg.Wait() }
测试设定为在并发数量为20的协程池中并发抓取一百我的的信息, 由于代码包含较多业务逻辑因此sleep 1秒模拟爬虫过程,理论上执行时间为5秒数据库
func TestSimplePool(t *testing.T) { p := NewSimplePoll(20) for i := 0; i < 100; i++ { p.Add(parseTask(i)) } p.Run() } func parseTask(i int) func() { return func() { // 模拟抓取数据的过程 time.Sleep(time.Second * 1) fmt.Println("finish parse ", i) } }
这样一来最简单的一个groutine池就完成了编程
上面的groutine池虽然简单,可是对于每个并发任务的状态,pool的状态缺乏控制,因此又去看了一下go-playground/pool的源码实现,先从每个须要执行的任务入手,该库中对并发单元作了以下的结构体,能够看到除工做单元的值,错误,执行函数等,还用了三个分别表示,取消,取消中,写 的三个并发安全的原子操做值来标识其运行状态。安全
// 须要加入pool 中执行的任务 type WorkFunc func(wu WorkUnit) (interface{}, error) // 工做单元 type workUnit struct { value interface{} // 任务结果 err error // 任务的报错 done chan struct{} // 通知任务完成 fn WorkFunc cancelled atomic.Value // 任务是否被取消 cancelling atomic.Value // 是否正在取消任务 writing atomic.Value // 任务是否正在执行 }
接下来看Pool的结构并发
type limitedPool struct { workers uint // 并发量 work chan *workUnit // 任务channel cancel chan struct{} // 用于通知结束的channel closed bool // 是否关闭 m sync.RWMutex // 读写锁,主要用来保证 closed值的并发安全 }
初始化groutine池, 以及启动设定好数量的groutineapp
// 初始化pool,设定并发量 func NewLimited(workers uint) Pool { if workers == 0 { panic("invalid workers '0'") } p := &limitedPool{ workers: workers, } p.initialize() return p } func (p *limitedPool) initialize() { p.work = make(chan *workUnit, p.workers*2) p.cancel = make(chan struct{}) p.closed = false for i := 0; i < int(p.workers); i++ { // 初始化并发单元 p.newWorker(p.work, p.cancel) } } // passing work and cancel channels to newWorker() to avoid any potential race condition // betweeen p.work read & write func (p *limitedPool) newWorker(work chan *workUnit, cancel chan struct{}) { go func(p *limitedPool) { var wu *workUnit defer func(p *limitedPool) { // 捕获异常,结束掉异常的工做单元,并将其再次做为新的任务启动 if err := recover(); err != nil { trace := make([]byte, 1<<16) n := runtime.Stack(trace, true) s := fmt.Sprintf(errRecovery, err, string(trace[:int(math.Min(float64(n), float64(7000)))])) iwu := wu iwu.err = &ErrRecovery{s: s} close(iwu.done) // need to fire up new worker to replace this one as this one is exiting p.newWorker(p.work, p.cancel) } }(p) var value interface{} var err error for { select { // workChannel中读取任务 case wu = <-work: // 防止channel 被关闭后读取到零值 if wu == nil { continue } // 先判断任务是否被取消 if wu.cancelled.Load() == nil { // 执行任务 value, err = wu.fn(wu) wu.writing.Store(struct{}{}) // 任务执行完在写入结果时须要再次检查工做单元是否被取消,防止产生竞争条件 if wu.cancelled.Load() == nil && wu.cancelling.Load() == nil { wu.value, wu.err = value, err close(wu.done) } } // pool是否被中止 case <-cancel: return } } }(p) }
往POOL中添加任务,并检查pool是否关闭tcp
func (p *limitedPool) Queue(fn WorkFunc) WorkUnit { w := &workUnit{ done: make(chan struct{}), fn: fn, } go func() { p.m.RLock() if p.closed { w.err = &ErrPoolClosed{s: errClosed} if w.cancelled.Load() == nil { close(w.done) } p.m.RUnlock() return } // 将工做单元写入workChannel, pool启动后将由上面newWorker函数中读取执行 p.work <- w p.m.RUnlock() }() return w }
在go-playground/pool包中, limitedPool的批量并发执行还须要借助batch.go来完成函数
// batch contains all information for a batch run of WorkUnits type batch struct { pool Pool // 上面的limitedPool实现了Pool interface m sync.Mutex // 互斥锁,用来判断closed units []WorkUnit // 工做单元的slice, 这个主要用在不设并发限制的场景,这里忽略 results chan WorkUnit // 结果集,执行完后的workUnit会更新其value,error,能够从结果集channel中读取 done chan struct{} // 通知batch是否完成 closed bool wg *sync.WaitGroup }
// go-playground/pool 中有设置并发量和不设并发量的批量任务,都实现Pool interface,初始化batch批量任务时会将以前建立好的Pool传入newBatch func newBatch(p Pool) Batch { return &batch{ pool: p, units: make([]WorkUnit, 0, 4), // capacity it to 4 so it doesn't grow and allocate too many times. results: make(chan WorkUnit), done: make(chan struct{}), wg: new(sync.WaitGroup), } } // 往批量任务中添加workFunc任务 func (b *batch) Queue(fn WorkFunc) { b.m.Lock() if b.closed { b.m.Unlock() return } //往上述的limitPool中添加workFunc wu := b.pool.Queue(fn) b.units = append(b.units, wu) // keeping a reference for cancellation purposes b.wg.Add(1) b.m.Unlock() // 执行完后将workUnit写入结果集channel go func(b *batch, wu WorkUnit) { wu.Wait() b.results <- wu b.wg.Done() }(b, wu) } // 通知批量任务再也不接受新的workFunc, 若是添加完workFunc不执行改方法的话将致使取结果集时done channel一直阻塞 func (b *batch) QueueComplete() { b.m.Lock() b.closed = true close(b.done) b.m.Unlock() } // 获取批量任务结果集 func (b *batch) Results() <-chan WorkUnit { go func(b *batch) { <-b.done b.m.Lock() b.wg.Wait() b.m.Unlock() close(b.results) }(b) return b.results }
func SendMail(int int) pool.WorkFunc { fn := func(wu pool.WorkUnit) (interface{}, error) { // sleep 1s 模拟发邮件过程 time.Sleep(time.Second * 1) // 模拟异常任务须要取消 if int == 17 { wu.Cancel() } if wu.IsCancelled() { return false, nil } fmt.Println("send to", int) return true, nil } return fn } func TestBatchWork(t *testing.T) { // 初始化groutine数量为20的pool p := pool.NewLimited(20) defer p.Close() batch := p.Batch() // 设置一个批量任务的过时超时时间 t := time.After(10 * time.Second) go func() { for i := 0; i < 100; i++ { batch.Queue(SendMail(i)) } batch.QueueComplete() }() // 由于 batch.Results 中要close results channel 因此不能将其放在LOOP中执行 r := batch.Results() LOOP: for { select { case <-t: // 登台超时通知 fmt.Println("recived timeout") break LOOP case email, ok := <-r: // 读取结果集 if ok { if err := email.Error(); err != nil { fmt.Println("err", err.Error()) } fmt.Println(email.Value()) } else { fmt.Println("finish") break LOOP } } } }
接近理论值5s, 通知模拟被取消的work也正常取消
go-playground/pool在比起以前简单的协程池的基础上, 对pool, worker的状态有了很好的管理。可是,可是问题来了,在第一个实现的简单groutine池和go-playground/pool中,都是先启动预约好的groutine来完成任务执行,在并发量远小于任务量的状况下确实可以作到groutine的复用,若是任务量很少则会致使任务分配到每一个groutine不均匀,甚至可能出现启动的groutine根本不会执行任务从而致使浪费,并且对于协程池也没有动态的扩容和缩小。因此我又去看了一下ants的设计和实现。
ants是一个受fasthttp启发的高性能协程池, fasthttp号称是比go原生的net/http快10倍,其快速高性能的缘由之一就是采用了各类池化技术(这个往后再开新坑去读源码), ants相比以前两种协程池,其模型更像是以前接触到的数据库链接池,须要从空余的worker中取出一个来执行任务, 当无可用空余worker的时候再去建立,而当pool的容量达到上线以后,剩余的任务阻塞等待当前进行中的worker执行完毕将worker放回pool, 直至pool中有空闲worker。 ants在内存的管理上作得很好,除了按期清除过时worker(必定时间内没有分配到任务的worker),ants还实现了一种适用于大批量相同任务的pool, 这种pool与一个须要大批量重复执行的函数锁绑定,避免了调用方不停的建立,更加节省内存。
先看一下ants的pool 结构体 (pool.go)
type Pool struct { // 协程池的容量 (groutine数量的上限) capacity int32 // 正在执行中的groutine running int32 // 过时清理间隔时间 expiryDuration time.Duration // 当前可用空闲的groutine workers []*Worker // 表示pool是否关闭 release int32 // lock for synchronous operation. lock sync.Mutex // 用于控制pool等待获取可用的groutine cond *sync.Cond // 确保pool只被关闭一次 once sync.Once // worker临时对象池,在复用worker时减小新对象的建立并加速worker从pool中的获取速度 workerCache sync.Pool // pool引起panic时的执行函数 PanicHandler func(interface{}) }
接下来看pool的工做单元 worker (worker.go)
type Worker struct { // worker 所属的poo; pool *Pool // 任务队列 task chan func() // 回收时间,即该worker的最后一次结束运行的时间 recycleTime time.Time }
执行worker的代码 (worker.go)
func (w *Worker) run() { // pool中正在执行的worker数+1 w.pool.incRunning() go func() { defer func() { if p := recover(); p != nil { //若worker因各类问题引起panic, //pool中正在执行的worker数 -1, //若是设置了Pool中的PanicHandler,此时会被调用 w.pool.decRunning() if w.pool.PanicHandler != nil { w.pool.PanicHandler(p) } else { log.Printf("worker exits from a panic: %v", p) } } }() // worker 执行任务队列 for f := range w.task { //任务队列中的函数所有被执行完后, //pool中正在执行的worker数 -1, //将worker 放回对象池 if f == nil { w.pool.decRunning() w.pool.workerCache.Put(w) return } f() //worker 执行完任务后放回Pool //使得其他正在阻塞的任务能够获取worker w.pool.revertWorker(w) } }() }
了解了工做单元worker如何执行任务以及与pool交互后,回到pool中查看其实现, pool的核心就是取出可用worker提供给任务执行 (pool.go)
// 向pool提交任务 func (p *Pool) Submit(task func()) error { if 1 == atomic.LoadInt32(&p.release) { return ErrPoolClosed } // 获取pool中的可用worker并向其任务队列中写入任务 p.retrieveWorker().task <- task return nil } // **核心代码** 获取可用worker func (p *Pool) retrieveWorker() *Worker { var w *Worker p.lock.Lock() idleWorkers := p.workers n := len(idleWorkers) - 1 // 当前pool中有可用worker, 取出(队尾)worker并执行 if n >= 0 { w = idleWorkers[n] idleWorkers[n] = nil p.workers = idleWorkers[:n] p.lock.Unlock() } else if p.Running() < p.Cap() { p.lock.Unlock() // 当前pool中无空闲worker,且pool数量未达到上线 // pool会先从临时对象池中寻找是否有已完成任务的worker, // 若临时对象池中不存在,则从新建立一个worker并将其启动 if cacheWorker := p.workerCache.Get(); cacheWorker != nil { w = cacheWorker.(*Worker) } else { w = &Worker{ pool: p, task: make(chan func(), workerChanCap), } } w.run() } else { // pool中没有空余worker且达到并发上限 // 任务会阻塞等待当前运行的worker完成任务释放会pool for { p.cond.Wait() // 等待通知, 暂时阻塞 l := len(p.workers) - 1 if l < 0 { continue } // 当有可用worker释放回pool以后, 取出 w = p.workers[l] p.workers[l] = nil p.workers = p.workers[:l] break } p.lock.Unlock() } return w } // 释放worker回pool func (p *Pool) revertWorker(worker *Worker) { worker.recycleTime = time.Now() p.lock.Lock() p.workers = append(p.workers, worker) // 通知pool中已经获取锁的groutine, 有一个worker已完成任务 p.cond.Signal() p.lock.Unlock() }
在批量并发任务的执行过程当中, 若是有超过5纳秒(ants中默认worker过时时间为5ns)的worker未被分配新的任务,则将其做为过时worker清理掉,从而保证pool中可用的worker都能发挥出最大的做用以及将任务分配得更均匀
(pool.go)
// 该函数会在pool初始化后在协程中启动 func (p *Pool) periodicallyPurge() { // 建立一个5ns定时的心跳 heartbeat := time.NewTicker(p.expiryDuration) defer heartbeat.Stop() for range heartbeat.C { currentTime := time.Now() p.lock.Lock() idleWorkers := p.workers if len(idleWorkers) == 0 && p.Running() == 0 && atomic.LoadInt32(&p.release) == 1 { p.lock.Unlock() return } n := -1 for i, w := range idleWorkers { // 由于pool 的worker队列是先进后出的,因此正序遍历可用worker时前面的每每里当前时间越久 if currentTime.Sub(w.recycleTime) <= p.expiryDuration { break } // 若是worker最后一次运行时间距如今超过5纳秒,视为过时,worker收到nil, 执行上述worker.go中 if n == nil 的操做 n = i w.task <- nil idleWorkers[i] = nil } if n > -1 { // 所有过时 if n >= len(idleWorkers)-1 { p.workers = idleWorkers[:0] } else { // 部分过时 p.workers = idleWorkers[n+1:] } } p.lock.Unlock() } }
func TestAnts(t *testing.T) { wg := sync.WaitGroup{} pool, _ := ants.NewPool(20) defer pool.Release() for i := 0; i < 100; i++ { wg.Add(1) pool.Submit(sendMail(i, &wg)) } wg.Wait() } func sendMail(i int, wg *sync.WaitGroup) func() { return func() { time.Sleep(time.Second * 1) fmt.Println("send mail to ", i) wg.Done() } }
这里虽只简单的测试批量并发任务的场景, 若是你们有兴趣能够去看看ants的压力测试, ants的吞吐量可以比原生groutine高出N倍,内存节省10到20倍, 可谓是协程池中的神器。
借用ants做者的原话来讲: 然而又有多少场景是单台机器须要扛100w甚至1000w同步任务的?基本没有啊!结果就是造出了屠龙刀,但是世界上没有龙啊!也是无情…
一口气从简单到复杂总结了三个协程池的实现,受益不浅, 感谢各开源库的做者, 虽然世界上没有龙,可是屠龙技是必须练的,由于它就像存款,不必定要所有都用了,可是必定不能没有!