worker pool其实就是线程池thread pool。对于go来讲,直接使用的是goroutine而非线程,不过这里仍然以线程来解释线程池。git
在线程池模型中,有2个队列一个池子:任务队列、已完成任务队列和线程池。其中已完成任务队列可能存在也可能不存在,依据实际需求而定。安全
只要有任务进来,就会放进任务队列中。只要线程执行完了一个任务,就将任务放进已完成任务队列,有时候还会将任务的处理结果也放进已完成队列中。函数
worker pool中包含了一堆的线程(worker,对go而言每一个worker就是一个goroutine),这些线程嗷嗷待哺,等待着为它们分配任务,或者本身去任务队列中取任务。取得任务后更新任务队列,而后执行任务,并将执行完成的任务放进已完成队列。spa
下图来自wiki:线程
在Go中有两种方式能够实现工做池:传统的互斥锁、channel。指针
假设Go中的任务的定义形式为:code
type Task struct { ... }
每次有任务进来时,都将任务放在任务队列中。blog
使用传统的互斥锁方式实现,任务队列的定义结构大概以下:队列
type Queue struct{ M sync.Mutex Tasks []Task }
而后在执行任务的函数中加上Lock()和Unlock()。例如:it
func Worker(queue *Queue) { for { // Lock()和Unlock()之间的是critical section queue.M.Lock() // 取出任务 task := queue.Tasks[0] // 更新任务队列 queue.Tasks = queue.Tasks[1:] queue.M.Unlock() // 在此goroutine中执行任务 process(task) } }
假如在线程池中激活了100个goroutine来执行Worker()。Lock()和Unlock()保证了在同一时间点只能有一个goroutine取得任务并随之更新任务列表,取任务和更新任务队列都是critical section中的代码,它们是具备原子性。而后这个goroutine能够执行本身取得的任务。于此同时,其它goroutine能够争夺互斥锁,只要争抢到互斥锁,就能够取得任务并更新任务列表。当某个goroutine执行完process(task),它将由于for循环再次参与互斥锁的争抢。
上面只是给出了一点主要的代码段,要实现完整的线程池,还有不少额外的代码。
经过互斥锁,上面的一切操做都是线程安全的。但问题在于加锁/解锁的机制比较重量级,当worker(即goroutine)的数量足够多,锁机制的实现将出现瓶颈。
在Go中,也能用buffered channel实现工做池。
示例代码很长,因此这里先拆分解释每一部分,最后给出完整的代码段。
在下面的示例中,每一个worker的工做都是计算每一个数值的位数相加之和。例如给定一个数值234,worker则计算2+3+4=9
。这里交给worker的数值是随机生成的[0,999)范围内的数值。
这个示例有几个核心功能须要先解释,也是经过channel实现线程池的通常功能:
首先,建立Task和Result两个结构,并建立它们的通道:
type Task struct { ID int randnum int } type Result struct { task Task result int } var tasks = make(chan Task, 10) var results = make(chan Result, 10)
这里,每一个Task都有本身的ID,以及该任务将要被worker计算的随机数。每一个Result都包含了worker的计算结果result以及这个结果对应的task,这样从Result中就能够取出任务信息以及计算结果。
另外,两个通道都是buffered channel,容量都是10。每一个worker都会监听tasks通道,并取出其中的任务进行计算,而后将计算结果和任务自身放进results通道中。
而后是计算位数之和的函数process(),它将做为worker的工做任务之一。
func process(num int) int { sum := 0 for num != 0 { digit := num % 10 sum += digit num /= 10 } time.Sleep(2 * time.Second) return sum }
这个计算过程其实很简单,但随后还睡眠了2秒,用来伪装执行一个计算任务是须要一点时间的。
而后是worker(),它监听tasks通道并取出任务进行计算,并将结果放进results通道。
func worker(wg *WaitGroup){ defer wg.Done() for task := range tasks { result := Result{task, process(task.randnum)} results <- result } }
上面的代码很容易理解,只要tasks channel不关闭,就会一直监听该channel。须要注意的是,该函数使用指针类型的*WaitGroup
做为参数,不能直接使用值类型的WaitGroup
做为参数,这样会使得每一个worker都有一个本身的WaitGroup。
而后是建立工做池的函数createWorkerPool(),它有一个数值参数,表示要建立多少个worker。
func createWorkerPool(numOfWorkers int) { var wg sync.WaitGroup for i := 0; i < numOfWorkers; i++ { wg.Add(1) go worker(&wg) } wg.Wait() close(results) }
建立工做池时,首先建立一个WaitGroup的值wg,这个wg被工做池中的全部goroutine共享,每建立一个goroutine都wg.Add(1)。建立完全部的goroutine后等待全部的groutine都执行完它们的任务,只要有一个任务尚未执行完,这个函数就会被Wait()阻塞。当全部任务都执行完成后,关闭results通道,由于没有结果再须要向该通道写了。
固然,这里是否须要关闭results通道,是由稍后的range迭代这个通道决定的,不关闭这个通道会一直阻塞range,最终致使死锁。
工做池部分已经完成了。如今须要使用allocate()函数分配任务:生成一大堆的随机数,而后将Task放进tasks通道。该函数有一个表明建立任务数量的数值参数:
func allocate(numOfTasks int