GitHub address: https://github.com/Jeffail/tunny
tunny's project structure is very simple: the core files are just tunny.go and worker.go.
tunny connects the pool and its workers mainly through the reqChan channel. The number of workers equals the pool size and is fixed when the pool is initialized; the workers compete to receive from reqChan, process what they get, and hand the result back to the pool.
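To ground the walkthrough, here is a minimal usage sketch of the library's public API (NewFunc, Process and Close are from tunny itself; the doubling job is just an illustration):

```go
package main

import (
	"fmt"
	"runtime"

	"github.com/Jeffail/tunny"
)

func main() {
	// One worker per CPU core; each worker runs the closure below.
	pool := tunny.NewFunc(runtime.NumCPU(), func(payload interface{}) interface{} {
		n := payload.(int)
		return n * 2 // the actual "job"
	})
	defer pool.Close()

	// Process blocks until a worker picks the job up and returns a result.
	fmt.Println(pool.Process(21)) // 42
}
```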
The Pool struct:

```go
type Pool struct {
	queuedJobs int64

	ctor    func() Worker
	workers []*workerWrapper
	reqChan chan workRequest

	workerMut sync.Mutex
}
```
```go
type Worker interface {
	// Process will synchronously perform a job and return the result.
	Process(interface{}) interface{}

	// BlockUntilReady is called before each job is processed and must block the
	// calling goroutine until the Worker is ready to process the next job.
	BlockUntilReady()

	// Interrupt is called when a job is cancelled. The worker is responsible
	// for unblocking the Process implementation.
	Interrupt()

	// Terminate is called when a Worker is removed from the processing pool
	// and is responsible for cleaning up any held resources.
	Terminate()
}
```
In tunny the worker is designed as an interface, because, as the code below shows, workers can have many different implementations. This echoes a point from an earlier post of mine, golang编码技巧总结 (a summary of Go coding tips): we should program against interfaces to keep components decoupled.
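As a hedged illustration of that point, here is a sketch of a custom Worker handed to tunny.New; the upperWorker type and its behaviour are invented purely for this example:

```go
package main

import (
	"fmt"
	"strings"

	"github.com/Jeffail/tunny"
)

// upperWorker is a hypothetical Worker implementation.
type upperWorker struct{}

func (w *upperWorker) Process(payload interface{}) interface{} {
	if s, ok := payload.(string); ok {
		return strings.ToUpper(s)
	}
	return nil
}

func (w *upperWorker) BlockUntilReady() {} // always ready
func (w *upperWorker) Interrupt()       {} // nothing to unblock
func (w *upperWorker) Terminate()       {} // no resources to release

func main() {
	// New takes a constructor, so every worker gets its own instance.
	pool := tunny.New(3, func() tunny.Worker { return &upperWorker{} })
	defer pool.Close()

	fmt.Println(pool.Process("tunny")) // TUNNY
}
```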
Two kinds of workers
```go
// closureWorker is a minimal Worker implementation that simply wraps a
// func(interface{}) interface{}
type closureWorker struct {
	processor func(interface{}) interface{}
}
```
The closure worker is the most commonly used kind; it does its work by running the processor function it was given at initialization.
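The rest of closureWorker is equally small; quoting the library source from memory (so treat the exact bodies as a sketch), Process just invokes the stored closure and the other interface methods are no-ops:

```go
func (w *closureWorker) Process(payload interface{}) interface{} {
	return w.processor(payload)
}

func (w *closureWorker) BlockUntilReady() {}
func (w *closureWorker) Interrupt()       {}
func (w *closureWorker) Terminate()       {}
```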
```go
type callbackWorker struct{}

func (w *callbackWorker) Process(payload interface{}) interface{} {
	f, ok := payload.(func())
	if !ok {
		return ErrJobNotFunc
	}
	f()
	return nil
}
```
The callback worker requires its payload to be a function, which it then simply invokes.
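If I remember the API correctly, the library exposes this variant through tunny.NewCallback; a minimal, hedged usage sketch:

```go
package main

import (
	"fmt"

	"github.com/Jeffail/tunny"
)

func main() {
	// Assumption: NewCallback builds a pool of callbackWorkers.
	pool := tunny.NewCallback(4)
	defer pool.Close()

	// With a callback pool, the payload itself is the job to run.
	pool.Process(func() {
		fmt.Println("ran inside the pool")
	})
}
```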
```go
// NewFunc creates a new Pool of workers where each worker will process using
// the provided func.
func NewFunc(n int, f func(interface{}) interface{}) *Pool {
	return New(n, func() Worker {
		return &closureWorker{
			processor: f,
		}
	})
}
```
Initializing the pool takes two arguments: the pool size n and the function you want the pool to run. That function ends up inside a closure worker, which is what actually processes the data at run time.
```go
func New(n int, ctor func() Worker) *Pool {
	p := &Pool{
		ctor:    ctor,
		reqChan: make(chan workRequest),
	}
	p.SetSize(n)
	return p
}
```
Note that reqChan makes its appearance here; in the code that follows it is the core link between the pool and the workers.
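For context, what flows through reqChan is a workRequest; its definition, quoted from the library source as best I recall (so treat the comments as approximate), is:

```go
type workRequest struct {
	// jobChan is used to send the payload to this worker.
	jobChan chan<- interface{}

	// retChan is used to read the result from this worker.
	retChan <-chan interface{}

	// interruptFunc can be called to cancel a running job.
	interruptFunc func()
}
```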
What does SetSize do?
```go
func (p *Pool) SetSize(n int) {
	p.workerMut.Lock()
	defer p.workerMut.Unlock()

	lWorkers := len(p.workers)
	if lWorkers == n {
		return
	}

	// Add extra workers if N > len(workers)
	for i := lWorkers; i < n; i++ {
		p.workers = append(p.workers, newWorkerWrapper(p.reqChan, p.ctor()))
	}

	// Asynchronously stop all workers > N
	for i := n; i < lWorkers; i++ {
		p.workers[i].stop()
	}

	// Synchronously wait for all workers > N to stop
	for i := n; i < lWorkers; i++ {
		p.workers[i].join()
	}

	// Remove stopped workers from slice
	p.workers = p.workers[:n]
}
```
First, the function takes a lock, to prevent several goroutines from running SetSize concurrently.
Next, if the current number of workers is below the requested size, new workers are added;
if it is above the requested size, surplus workers are stopped and removed. A small resize sketch follows.
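Because SetSize is public, the pool can be resized at run time; a hedged sketch (GetSize is quoted from memory and assumed to report the current size):

```go
package main

import (
	"fmt"

	"github.com/Jeffail/tunny"
)

func main() {
	pool := tunny.NewFunc(4, func(p interface{}) interface{} { return p })
	defer pool.Close()

	pool.SetSize(10) // grow: six more workerWrappers are created
	pool.SetSize(2)  // shrink: eight workers are stopped and joined

	fmt.Println(pool.GetSize()) // prints 2
}
```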
How exactly are workers added? The newWorkerWrapper function has several points worth noting; in particular, the pool passes its reqChan into this function, and therefore to the worker:
```go
func newWorkerWrapper(
	reqChan chan<- workRequest,
	worker Worker,
) *workerWrapper {
	w := workerWrapper{
		worker:        worker,
		interruptChan: make(chan struct{}),
		reqChan:       reqChan,
		closeChan:     make(chan struct{}),
		closedChan:    make(chan struct{}),
	}
	go w.run()
	return &w
}
```
As you can see, after constructing the wrapper, newWorkerWrapper launches a goroutine running w.run(). The worker here is whatever the constructor passed in earlier produces, so in the NewFunc case it is a closure worker.
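For reference, the workerWrapper struct being filled in above lives in worker.go; quoted from memory, so treat the field comments as a sketch:

```go
type workerWrapper struct {
	worker        Worker
	interruptChan chan struct{}

	// reqChan is NOT owned by this type, it is used to send requests for work.
	reqChan chan<- workRequest

	// closeChan can be closed in order to cleanly shut down this worker.
	closeChan chan struct{}

	// closedChan is closed by the run() goroutine when it exits.
	closedChan chan struct{}
}
```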
```go
func (w *workerWrapper) run() {
	jobChan, retChan := make(chan interface{}), make(chan interface{})
	defer func() {
		w.worker.Terminate()
		close(retChan)
		close(w.closedChan)
	}()

	for {
		// NOTE: Blocking here will prevent the worker from closing down.
		w.worker.BlockUntilReady()
		select {
		case w.reqChan <- workRequest{
			jobChan:       jobChan,
			retChan:       retChan,
			interruptFunc: w.interrupt,
		}:
			select {
			case payload := <-jobChan:
				result := w.worker.Process(payload)
				select {
				case retChan <- result:
				case <-w.interruptChan:
					w.interruptChan = make(chan struct{})
				}
			case _, _ = <-w.interruptChan:
				w.interruptChan = make(chan struct{})
			}
		case <-w.closeChan:
			return
		}
	}
}
```
Now let's read through run, the heart of the whole worker.
First there is a big for loop with a select nested inside.
On entering the select, the worker immediately offers a workRequest on reqChan; this needs to be read together with the pool's receiving side:
```go
func (p *Pool) Process(payload interface{}) interface{} {
	atomic.AddInt64(&p.queuedJobs, 1)

	request, open := <-p.reqChan
	if !open {
		panic(ErrPoolNotRunning)
	}

	request.jobChan <- payload

	payload, open = <-request.retChan
	if !open {
		panic(ErrWorkerClosed)
	}

	atomic.AddInt64(&p.queuedJobs, -1)
	return payload
}
```
Notice that because idle workers are always offering a workRequest on reqChan, the pool's receive is guaranteed to obtain one and assign it to request. payload is the actual data to be processed; the pool sends it into the request's jobChan and then blocks on retChan waiting for the result. Since this jobChan is the very same channel the worker created, the payload is picked up by the worker in the branch below:
```go
select {
case payload := <-jobChan:
	result := w.worker.Process(payload)
	select {
	case retChan <- result:
	case <-w.interruptChan:
		w.interruptChan = make(chan struct{})
	}
...
```
There it is received in the case payload := <-jobChan branch and processed; the next select then sends the result into retChan (unless the job is interrupted first). Because the worker's retChan and the channel the pool reads from are one and the same, the pool receives the result and returns it.
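The interruptChan branches above exist so a job can be abandoned; if I remember the API correctly, the pool exposes this through ProcessTimed, which calls the request's interruptFunc when the deadline fires. A hedged sketch:

```go
package main

import (
	"fmt"
	"time"

	"github.com/Jeffail/tunny"
)

func main() {
	pool := tunny.NewFunc(1, func(p interface{}) interface{} {
		time.Sleep(time.Second) // a deliberately slow job
		return p
	})
	defer pool.Close()

	// Assumption: ProcessTimed interrupts the worker and returns
	// tunny.ErrJobTimedOut once the timeout passes.
	_, err := pool.ProcessTimed("job", 10*time.Millisecond)
	fmt.Println(err) // expected: the timeout error
}
```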
With multiple workers, they compete to receive from reqChan, but at most size workers are ever working at once, which is exactly how the pool limits the number of goroutines.
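To close, a hedged sketch showing that concurrent callers are throttled to the pool size (the timings are illustrative):

```go
package main

import (
	"fmt"
	"sync"
	"time"

	"github.com/Jeffail/tunny"
)

func main() {
	// Only 2 workers, so at most 2 jobs run at any moment.
	pool := tunny.NewFunc(2, func(p interface{}) interface{} {
		time.Sleep(100 * time.Millisecond)
		return p
	})
	defer pool.Close()

	var wg sync.WaitGroup
	start := time.Now()
	for i := 0; i < 6; i++ {
		wg.Add(1)
		go func(i int) {
			defer wg.Done()
			pool.Process(i) // callers beyond the pool size block here
		}(i)
	}
	wg.Wait()

	// 6 jobs / 2 workers * 100ms is roughly 300ms, showing the throttling.
	fmt.Println("elapsed:", time.Since(start))
}
```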