worker pool其实就是线程池thread pool。对于go来讲,直接使用的是goroutine而非线程,不过这里仍然以线程来解释线程池。git
在线程池模型中,有2个队列一个池子:任务队列、已完成任务队列和线程池。其中已完成任务队列可能存在也可能不存在,依据实际需求而定。安全
只要有任务进来,就会放进任务队列中。只要线程执行完了一个任务,就将任务放进已完成任务队列,有时候还会将任务的处理结果也放进已完成队列中。函数
worker pool中包含了一堆的线程(worker,对go而言每一个worker就是一个goroutine),这些线程嗷嗷待哺,等待着为它们分配任务,或者本身去任务队列中取任务。取得任务后更新任务队列,而后执行任务,并将执行完成的任务放进已完成队列。性能
下图来自wiki:测试
在Go中有两种方式能够实现工做池:传统的互斥锁、channel。线程
假设Go中的任务的定义形式为:指针
type Task struct { ... }
每次有任务进来时,都将任务放在任务队列中。code
使用传统的互斥锁方式实现,任务队列的定义结构大概以下:blog
type Queue struct{ M sync.Mutex Tasks []Task }
而后在执行任务的函数中加上Lock()和Unlock()。例如:队列
func Worker(queue *Queue) { for { // Lock()和Unlock()之间的是critical section queue.M.Lock() // 取出任务 task := queue.Tasks[0] // 更新任务队列 queue.Tasks = queue.Tasks[1:] queue.M.Unlock() // 在此goroutine中执行任务 process(task) } }
假如在线程池中激活了100个goroutine来执行Worker()。Lock()和Unlock()保证了在同一时间点只能有一个goroutine取得任务并随之更新任务列表,取任务和更新任务队列都是critical section中的代码,它们是具备原子性。而后这个goroutine能够执行本身取得的任务。于此同时,其它goroutine能够争夺互斥锁,只要争抢到互斥锁,就能够取得任务并更新任务列表。当某个goroutine执行完process(task),它将由于for循环再次参与互斥锁的争抢。
上面只是给出了一点主要的代码段,要实现完整的线程池,还有不少额外的代码。
经过互斥锁,上面的一切操做都是线程安全的。但问题在于加锁/解锁的机制比较重量级,当worker(即goroutine)的数量足够多,锁机制的实现将出现瓶颈。
在Go中,也能用buffered channel实现工做池。
示例代码很长,因此这里先拆分解释每一部分,最后给出完整的代码段。
在下面的示例中,每一个worker的工做都是计算每一个数值的位数相加之和。例如给定一个数值234,worker则计算2+3+4=9
。这里交给worker的数值是随机生成的[0,999)范围内的数值。
这个示例有几个核心功能须要先解释,也是经过channel实现线程池的通常功能:
首先,建立Task和Result两个结构,并建立它们的通道:
type Task struct { ID int randnum int } type Result struct { task Task result int } var tasks = make(chan Task, 10) var results = make(chan Result, 10)
这里,每一个Task都有本身的ID,以及该任务将要被worker计算的随机数。每一个Result都包含了worker的计算结果result以及这个结果对应的task,这样从Result中就能够取出任务信息以及计算结果。
另外,两个通道都是buffered channel,容量都是10。每一个worker都会监听tasks通道,并取出其中的任务进行计算,而后将计算结果和任务自身放进results通道中。
而后是计算位数之和的函数process(),它将做为worker的工做任务之一。
func process(num int) int { sum := 0 for num != 0 { digit := num % 10 sum += digit num /= 10 } time.Sleep(2 * time.Second) return sum }
这个计算过程其实很简单,但随后还睡眠了2秒,用来伪装执行一个计算任务是须要一点时间的。
而后是worker(),它监听tasks通道并取出任务进行计算,并将结果放进results通道。
func worker(wg *WaitGroup){ defer wg.Done() for task := range tasks { result := Result{task, process(task.randnum)} results <- result } }
上面的代码很容易理解,只要tasks channel不关闭,就会一直监听该channel。须要注意的是,该函数使用指针类型的*WaitGroup
做为参数,不能直接使用值类型的WaitGroup
做为参数,这样会使得每一个worker都有一个本身的WaitGroup。
而后是建立工做池的函数createWorkerPool(),它有一个数值参数,表示要建立多少个worker。
func createWorkerPool(numOfWorkers int) { var wg sync.WaitGroup for i := 0; i < numOfWorkers; i++ { wg.Add(1) go worker(&wg) } wg.Wait() close(results) }
建立工做池时,首先建立一个WaitGroup的值wg,这个wg被工做池中的全部goroutine共享,每建立一个goroutine都wg.Add(1)。建立完全部的goroutine后等待全部的groutine都执行完它们的任务,只要有一个任务尚未执行完,这个函数就会被Wait()阻塞。当全部任务都执行完成后,关闭results通道,由于没有结果再须要向该通道写了。
固然,这里是否须要关闭results通道,是由稍后的range迭代这个通道决定的,不关闭这个通道会一直阻塞range,最终致使死锁。
工做池部分已经完成了。如今须要使用allocate()函数分配任务:生成一大堆的随机数,而后将Task放进tasks通道。该函数有一个表明建立任务数量的数值参数:
func allocate(numOfTasks int) { for i := 0; i < numOfTasks; i++ { randnum := rand.Intn(999) task := Task{i, randnum} tasks <- task } close(tasks) }
注意,最后须要关闭tasks通道,由于全部任务都分配完以后,没有任务再须要分配。固然,这里之因此须要关闭tasks通道,是由于worker()中使用了range迭代tasks通道,若是不关闭这个通道,worker将在取完全部任务后一直阻塞,最终致使死锁。
再接着的是取出results通道中的结果进行输出,函数名为getResult():
func getResult(done chan bool) { for result := range results { fmt.Printf("Task id %d, randnum %d , sum %d\n", result.task.id, result.task.randnum, result.result) } done <- true }
getResult()中使用了一个done参数,这个参数是一个信号通道,用来表示results中的全部结果都取出来并处理完成了,这个通道不必定要用bool类型,任何类型皆可,它不用来传数据,仅用来返回可读,因此上面直接close(done)的效果也同样。经过下面的main()函数,就能理解done信号通道的做用。
最后还差main()函数:
func main() { // 记录起始终止时间,用来测试完成全部任务耗费时长 startTime := time.Now() numOfWorkers := 20 numOfTasks := 100 // 建立任务到任务队列中 go allocate(numOfTasks) // 建立工做池 go createWorkerPool(numOfWorkers) // 取得结果 var done = make(chan bool) go getResult(done) // 若是results中还有数据,将阻塞在此 // 直到发送了信号给done通道 <- done endTime := time.Now() diff := endTime.Sub(startTime) fmt.Println("total time taken ", diff.Seconds(), "seconds") }
上面分配了20个worker,这20个worker总共须要处理的任务数量为100。但注意,不管是tasks仍是results通道,容量都是10,意味着任务队列最长只能是10个任务。
下面是完整的代码段:
package main import ( "fmt" "math/rand" "sync" "time" ) type Task struct { id int randnum int } type Result struct { task Task result int } var tasks = make(chan Task, 10) var results = make(chan Result, 10) func process(num int) int { sum := 0 for num != 0 { digit := num % 10 sum += digit num /= 10 } time.Sleep(2 * time.Second) return sum } func worker(wg *sync.WaitGroup) { defer wg.Done() for task := range tasks { result := Result{task, process(task.randnum)} results <- result } } func createWorkerPool(numOfWorkers int) { var wg sync.WaitGroup for i := 0; i < numOfWorkers; i++ { wg.Add(1) go worker(&wg) } wg.Wait() close(results) } func allocate(numOfTasks int) { for i := 0; i < numOfTasks; i++ { randnum := rand.Intn(999) task := Task{i, randnum} tasks <- task } close(tasks) } func getResult(done chan bool) { for result := range results { fmt.Printf("Task id %d, randnum %d , sum %d\n", result.task.id, result.task.randnum, result.result) } done <- true } func main() { startTime := time.Now() numOfWorkers := 20 numOfTasks := 100 var done = make(chan bool) go getResult(done) go allocate(numOfTasks) go createWorkerPool(numOfWorkers) // 必须在allocate()和getResult()以后建立工做池 <-done endTime := time.Now() diff := endTime.Sub(startTime) fmt.Println("total time taken ", diff.Seconds(), "seconds") }
执行结果:
Task id 19, randnum 914 , sum 14 Task id 9, randnum 150 , sum 6 Task id 15, randnum 215 , sum 8 ............ Task id 97, randnum 315 , sum 9 Task id 99, randnum 641 , sum 11 total time taken 10.0174705 seconds
总共花费10秒。
能够试着将任务数量、worker数量修改修改,看看它们的性能比例状况。例如,将worker数量设置为99,将须要4秒,将worker数量设置为10,将须要20秒。