Go基础系列:Go实现工做池的两种方式(一)

worker pool简介

worker pool其实就是线程池thread pool。对于go来讲,直接使用的是goroutine而非线程,不过这里仍然以线程来解释线程池。git

在线程池模型中,有2个队列一个池子:任务队列、已完成任务队列和线程池。其中已完成任务队列可能存在也可能不存在,依据实际需求而定。安全

只要有任务进来,就会放进任务队列中。只要线程执行完了一个任务,就将任务放进已完成任务队列,有时候还会将任务的处理结果也放进已完成队列中。函数

worker pool中包含了一堆的线程(worker,对go而言每一个worker就是一个goroutine),这些线程嗷嗷待哺,等待着为它们分配任务,或者本身去任务队列中取任务。取得任务后更新任务队列,而后执行任务,并将执行完成的任务放进已完成队列。spa

下图来自wiki:线程

在Go中有两种方式能够实现工做池:传统的互斥锁、channel。指针

传统互斥锁机制的工做池

假设Go中的任务的定义形式为:code

type Task struct {
    ...
}

每次有任务进来时,都将任务放在任务队列中。blog

使用传统的互斥锁方式实现,任务队列的定义结构大概以下:队列

type Queue struct{
    M     sync.Mutex
    Tasks []Task
}

而后在执行任务的函数中加上Lock()和Unlock()。例如:it

func Worker(queue *Queue) {
    for {
        // Lock()和Unlock()之间的是critical section
        queue.M.Lock()
        // 取出任务
        task := queue.Tasks[0]
        // 更新任务队列
        queue.Tasks = queue.Tasks[1:]
        queue.M.Unlock()
        // 在此goroutine中执行任务
        process(task)
    }
}

假如在线程池中激活了100个goroutine来执行Worker()。Lock()和Unlock()保证了在同一时间点只能有一个goroutine取得任务并随之更新任务列表,取任务和更新任务队列都是critical section中的代码,它们是具备原子性。而后这个goroutine能够执行本身取得的任务。于此同时,其它goroutine能够争夺互斥锁,只要争抢到互斥锁,就能够取得任务并更新任务列表。当某个goroutine执行完process(task),它将由于for循环再次参与互斥锁的争抢。

上面只是给出了一点主要的代码段,要实现完整的线程池,还有不少额外的代码。

经过互斥锁,上面的一切操做都是线程安全的。但问题在于加锁/解锁的机制比较重量级,当worker(即goroutine)的数量足够多,锁机制的实现将出现瓶颈。

经过buffered channel实现工做池

在Go中,也能用buffered channel实现工做池。

示例代码很长,因此这里先拆分解释每一部分,最后给出完整的代码段。

在下面的示例中,每一个worker的工做都是计算每一个数值的位数相加之和。例如给定一个数值234,worker则计算2+3+4=9。这里交给worker的数值是随机生成的[0,999)范围内的数值。

这个示例有几个核心功能须要先解释,也是经过channel实现线程池的通常功能:

  • 建立一个task buffered channel,并经过allocate()函数将生成的任务存放到task buffered channel中
  • 建立一个goroutine pool,每一个goroutine监听task buffered channel,并从中取出任务
  • goroutine执行任务后,将结果写入到result buffered channel中
  • 从result buffered channel中取出计算结果并输出

首先,建立Task和Result两个结构,并建立它们的通道:

type Task struct {
    ID      int
    randnum int
}

type Result struct {
    task    Task
    result  int
}

var tasks = make(chan Task, 10)
var results = make(chan Result, 10)

这里,每一个Task都有本身的ID,以及该任务将要被worker计算的随机数。每一个Result都包含了worker的计算结果result以及这个结果对应的task,这样从Result中就能够取出任务信息以及计算结果。

另外,两个通道都是buffered channel,容量都是10。每一个worker都会监听tasks通道,并取出其中的任务进行计算,而后将计算结果和任务自身放进results通道中。

而后是计算位数之和的函数process(),它将做为worker的工做任务之一。

func process(num int) int {
    sum := 0
    for num != 0 {
        digit := num % 10
        sum += digit
        num /= 10
    }
    time.Sleep(2 * time.Second)
    return sum
}

这个计算过程其实很简单,但随后还睡眠了2秒,用来伪装执行一个计算任务是须要一点时间的。

而后是worker(),它监听tasks通道并取出任务进行计算,并将结果放进results通道。

func worker(wg *WaitGroup){
    defer wg.Done()
    for task := range tasks {
        result := Result{task, process(task.randnum)}
        results <- result
    }
}

上面的代码很容易理解,只要tasks channel不关闭,就会一直监听该channel。须要注意的是,该函数使用指针类型的*WaitGroup做为参数,不能直接使用值类型的WaitGroup做为参数,这样会使得每一个worker都有一个本身的WaitGroup。

而后是建立工做池的函数createWorkerPool(),它有一个数值参数,表示要建立多少个worker。

func createWorkerPool(numOfWorkers int) {
    var wg sync.WaitGroup
    for i := 0; i < numOfWorkers; i++ {
        wg.Add(1)
        go worker(&wg)
    }
    wg.Wait()
    close(results)
}

建立工做池时,首先建立一个WaitGroup的值wg,这个wg被工做池中的全部goroutine共享,每建立一个goroutine都wg.Add(1)。建立完全部的goroutine后等待全部的groutine都执行完它们的任务,只要有一个任务尚未执行完,这个函数就会被Wait()阻塞。当全部任务都执行完成后,关闭results通道,由于没有结果再须要向该通道写了。

固然,这里是否须要关闭results通道,是由稍后的range迭代这个通道决定的,不关闭这个通道会一直阻塞range,最终致使死锁。

工做池部分已经完成了。如今须要使用allocate()函数分配任务:生成一大堆的随机数,而后将Task放进tasks通道。该函数有一个表明建立任务数量的数值参数:

func allocate(numOfTasks int
相关文章
相关标签/搜索