[译] part23: golang 缓冲 channel 和协程池

什么是缓冲channel

咱们在上一个教程中讨论的全部channel基本上都是无缓冲的。正如咱们在channel教程中详细讨论的那样,发送和接收到无缓冲的channel都是阻塞的。git

可使用缓冲区建立channel。仅当缓冲区已满时才会阻塞对缓冲channel的发送。相似地,仅当缓冲区为空时才阻塞从缓冲channel接收。golang

能够经过添加一个capacity参数传递给make函数来建立缓冲channel,该函数指定缓冲区的大小。并发

ch := make(chan type, capacity)
复制代码

对于具备缓冲区的channel,上述语法中的容量应大于 0。默认状况下,无缓冲通道的容量为 0,所以在上一个教程中建立通道时省略了容量参数。dom

咱们来建立一个缓冲channel函数

package main

import (
    "fmt"
)


func main() {
    ch := make(chan string, 2)
    ch <- "naveen"
    ch <- "paul"
    fmt.Println(<- ch)
    fmt.Println(<- ch)
}
复制代码

Run in playgroud测试

在上面的程序中,第 9 行咱们建立一个容量为 2 的缓冲channel。因为channel的容量为 2,所以能够将 2 个字符串写入而不会被阻塞。咱们在第 10 和 11 行写入 2 个字符串,随后读取了写入的字符串并打印,ui

naveen
paul
复制代码

另外一个例子

让咱们再看一个缓冲channel的例子,其中channel的值写入Goroutine并从main Goroutine读取。这个例子将帮助咱们更好地理解什么时候写入缓冲的channelspa

package main

import (
    "fmt"
    "time"
)

func write(ch chan int) {
    for i := 0; i < 5; i++ {
        ch <- i
        fmt.Println("successfully wrote", i, "to ch")
    }
    close(ch)
}
func main() {
    ch := make(chan int, 2)
    go write(ch)
    time.Sleep(2 * time.Second)
    for v := range ch {
        fmt.Println("read value", v,"from ch")
        time.Sleep(2 * time.Second)

    }
}
复制代码

Run in playgroudcode

上面的程序中,在第 16 行建立了容量为 2 的缓冲channel chmain Goroutinech传给write Goroutine,而后main Goroutine休眠 2 秒钟。在此期间,write Goroutine在运行。write Goroutine有一个 for 循环,它将 0 到 4 的数字循环写入channel ch。因为容量为 2,所以可以将值 0 和 1 写入,而后阻塞直到从channel ch读取至少一个值。因此这个程序会当即打印如下 2 行,协程

successfully wrote 0 to ch
successfully wrote 1 to ch
复制代码

在打印上述两行以后,write Goroutine中的写入被阻塞,直到channel ch的数据被读取。因为main Goroutine会休眠 2 秒,所以程序在接下来的 2 秒内不会打印任何内容。当main Goroutine在被唤醒后,使用for range循环开始从channel ch读取并打印读取值,而后再次休眠 2 秒,此循环继续,直到 ch 关闭。所以程序将在 2 秒后打印如下行,

read value 0 from ch
successfully wrote 2 to ch
复制代码

而后继续直到全部值都写入并在关闭 channel。最终的输出是,

successfully wrote 0 to ch
successfully wrote 1 to ch
read value 0 from ch
successfully wrote 2 to ch
read value 1 from ch
successfully wrote 3 to ch
read value 2 from ch
successfully wrote 4 to ch
read value 3 from ch
read value 4 from ch
复制代码

死锁

package main

import (
    "fmt"
)

func main() {
    ch := make(chan string, 2)
    ch <- "naveen"
    ch <- "paul"
    ch <- "steve"
    fmt.Println(<-ch)
    fmt.Println(<-ch)
}
复制代码

Run in playgroud 在上面的程序中,咱们将 3 个字符串写入容量为 2 的缓冲channel。当第三个字符串写入的时候已超过其容量,所以写入操做被阻塞。如今必须等待其余Goroutinechannel读取数据才能继续写入,但在上述代码中并无从该channel读取数据的Goroutine。所以会出现死锁,程序将在运行时打印如下内容,

fatal error: all goroutines are asleep - deadlock!

goroutine 1 [chan send]:
main.main()
    /tmp/sandbox274756028/main.go:11 +0x100
复制代码

长度 VS 容量

容量是channel能够容纳的值的数量。这是咱们使用make函数建立时指定的值。

长度是当前在channel中的元素数量。

一个程序会让理解变得简单😀

package main

import (
    "fmt"
)

func main() {
    ch := make(chan string, 3)
    ch <- "naveen"
    ch <- "paul"
    fmt.Println("capacity is", cap(ch))
    fmt.Println("length is", len(ch))
    fmt.Println("read value", <-ch)
    fmt.Println("new length is", len(ch))
}
复制代码

Run in playgroud

在上面的程序中,建立的channel容量为 3,即它能够容纳 3 个字符串。而后咱们分别写入 2 个字符串,如今该channel有 2 个字符串,所以其长度为 2。 咱们从channel中读取一个字符串。如今channel只有一个字符串了,所以它的长度变为 1。这个程序将打印,

capacity is 3
length is 2
read value naveen
new length is 1
复制代码

WaitGroup

本教程的下一部分是关于Worker Pools。要了解工做池,咱们首先须要了解WaitGroup,由于它将用于工做池的实现。

WaitGroup用于阻塞main Goroutines直到全部Goroutines完成执行。好比说咱们有 3 个从main Goroutine生成的Goroutines须要并发执行。main Goroutines须要等待其余 3 个Goroutines完成才能终止,不然可能在main Goroutines终止时,其他的Goroutines还没能得当执行,这种场景下可使用WaitGroup来完成。

中止理论上代码😀

package main

import (
    "fmt"
    "sync"
    "time"
)

func process(i int, wg *sync.WaitGroup) {
    fmt.Println("started Goroutine ", i)
    time.Sleep(2 * time.Second)
    fmt.Printf("Goroutine %d ended\n", i)
    wg.Done()
}

func main() {
    no := 3
    var wg sync.WaitGroup
    for i := 0; i < no; i++ {
        wg.Add(1)
        go process(i, &wg)
    }
    wg.Wait()
    fmt.Println("All go routines finished executing")
}
复制代码

Run in playgroud

WaitGroup是一种结构类型,咱们在第 18 行建立一个WaitGroup类型的空值变量。 WaitGroup的工做方式是使用计数器。当咱们在WaitGroup上调用int型参数调用Add 方法,计数器会增长传递给Add的值。递减计数器的方法是在WaitGroup上调用Done方法。 Wait方法阻塞调用它的Goroutine,直到计数器变为零。

在上面的程序中,咱们在第 20 行调用wg.Add(1)循环迭代 3 次。因此计数器的值如今变成了 3。 for 循环也产生 3 个Goroutinesmain Goroutines在第 23 行调用了wg.Wait()以阻塞直到计数器变为零。在Goroutine中,经过调用wg.Done来减小计数器的值。 一旦全部 3 个生成的Goroutines完成执行,也就是wg.Done()被调用三次,计数器被清零,main Goroutine被解除阻塞,程序执行完成,输出,

started Goroutine  2
started Goroutine  0
started Goroutine  1
Goroutine 0 ended
Goroutine 2 ended
Goroutine 1 ended
All go routines finished executing
复制代码

你们的输出可能与个人不一样,由于Goroutines的执行顺序会有所不一样:)。

协程池的实现

缓冲channel的一个重要用途是协程池的实现。

一般,协程池是一组协程,它们等待分配给它们任务。一旦完成分配的任务,他们就会再次等待下一个任务。

咱们将使用缓冲channel实现协程池。咱们的协程池将执行查找输入数字的数字之和的任务。例如,若是传递 234,则输出将为 9(9 = 2 + 3 + 4)。协程池的输入将是伪随机整数列表。

如下是咱们协程池的核心功能

  • 建立一个Goroutines池,用于监听缓冲jobs channel,等待任务分配
  • jobs channel添加任务
  • 任务完成后,将结果写入缓冲results channel
  • results channel读取和打印结果

咱们将逐步编写此程序,以便更容易理解。

第一步是建立表示任务和结果的结构。

type Job struct {
    id       int
    randomno int
}

type Result struct {
    job         Job
    sumofdigits int
}
复制代码

每一个Job结构都有一个idrandomno,用来计算各个数字的总和。

Result结构有一个job字段和sumofdigits字段,sumofdigits字段用来保存job各个数字之和的结果。

下一步是建立用于接收任务和存储结果的缓冲channel

var jobs = make(chan Job, 10)
var results = make(chan Result, 10)
复制代码

worker Goroutines在任务缓冲channel上侦听新任务。一旦任务完成,把结果写入结果缓冲channel

digits函数执行查找整数的各个数字之和并返回它的。咱们为此函数添加 2 秒的休眠,以模拟此函数计算结果须要一些时间的场景。

func digits(number int) int {
    sum := 0
    no := number
    for no != 0 {
        digit := no % 10
        sum += digit
        no /= 10
    }
    time.Sleep(2 * time.Second)
    return sum
}
复制代码

接下来将编写一个建立worker Goroutine的函数。

func worker(wg *sync.WaitGroup) {
    for job := range jobs {
        output := Result{job, digits(job.randomno)}
        results <- output
    }
    wg.Done()
}
复制代码

上面的函数建立了一个worker,它从jobs channel读取任务,使用当前任务和digits函数的返回值建立Result结构,而后将结果写入结果缓冲channel。此函数将WaitGroup wg做为参数,在全部任务完成后,它将调用Done方法结束当前Goroutine的阻塞。

createWorkerPool函数将建立一个Goroutines池。

func createWorkerPool(noOfWorkers int) {
    var wg sync.WaitGroup
    for i := 0; i < noOfWorkers; i++ {
        wg.Add(1)
        go worker(&wg)
    }
    wg.Wait()
    close(results)
}
复制代码

上面的函数将要建立的worker数量做为参数。它在建立Goroutine以前调用了wg.Add(1)来增长WaitGroup计数器。而后它经过将WaitGroup wg的地址传递给worker函数来建立worker Goroutines。在建立了所需的worker Goroutines以后,它经过调用wg.Wait()来阻塞当前协程直到全部Goroutines完成执行后,关闭results channel,由于全部的Goroutines都已完成执行,没有结果被写入该results channel

如今咱们已经写好了协程池,让咱们继续编写将任务分配给协程的功能。

func allocate(noOfJobs int) {
    for i := 0; i < noOfJobs; i++ {
        randomno := rand.Intn(999)
        job := Job{i, randomno}
        jobs <- job
    }
    close(jobs)
}
复制代码

上面的allocate函数将要建立的任务数做为输入参数,生成最大值为 998 的伪随机数,使用随机数建立Job结构,并将 for 循环计数器的i做为id,而后将它们写入jobs channel。它在写完全部任务后关闭了jobs channel

下一步是建立一个函数读取results channel并打印输出。

func result(done chan bool) {
    for result := range results {
        fmt.Printf("Job id %d, input random no %d , sum of digits %d\n", result.job.id, result.job.randomno, result.sumofdigits)
    }
    done <- true
}
复制代码

result函数读取results channel并打印任务 ID,输入随机数和随机数的总和。result函数在打印全部结果后,将true写入done channel

万事俱备,让咱们把上面全部的功能用main函数串联起来。

func main() {
    startTime := time.Now()
    noOfJobs := 100
    go allocate(noOfJobs)
    done := make(chan bool)
    go result(done)
    noOfWorkers := 10
    createWorkerPool(noOfWorkers)
    <-done
    endTime := time.Now()
    diff := endTime.Sub(startTime)
    fmt.Println("total time taken ", diff.Seconds(), "seconds")
}
复制代码

第 2 行咱们首先将程序的执行开始时间存储起来,在最后一行(第 12 行),咱们计算endTimestartTime之间的时间差,并显示程序的总运行时间。这是必要的,由于咱们将经过改变Goroutines的数量来作一些基准测试。

noOfJobs设置为 100,而后调用allocate以将任务添加到jobs channel

而后建立done channel并将其传递给results channel,以便它能够开始打印输出并在打印完全部内容后通知。

最后,经过调用createWorkerPool函数建立了一个 10 个work Goroutines的池,而后main阻塞直到done channel写入true值,最后打印全部结果。

下面是完整的代码。

package main

import (
    "fmt"
    "math/rand"
    "sync"
    "time"
)

type Job struct {
    id       int
    randomno int
}
type Result struct {
    job         Job
    sumofdigits int
}

var jobs = make(chan Job, 10)
var results = make(chan Result, 10)

func digits(number int) int {
    sum := 0
    no := number
    for no != 0 {
        digit := no % 10
        sum += digit
        no /= 10
    }
    time.Sleep(2 * time.Second)
    return sum
}
func worker(wg *sync.WaitGroup) {
    for job := range jobs {
        output := Result{job, digits(job.randomno)}
        results <- output
    }
    wg.Done()
}
func createWorkerPool(noOfWorkers int) {
    var wg sync.WaitGroup
    for i := 0; i < noOfWorkers; i++ {
        wg.Add(1)
        go worker(&wg)
    }
    wg.Wait()
    close(results)
}
func allocate(noOfJobs int) {
    for i := 0; i < noOfJobs; i++ {
        randomno := rand.Intn(999)
        job := Job{i, randomno}
        jobs <- job
    }
    close(jobs)
}
func result(done chan bool) {
    for result := range results {
        fmt.Printf("Job id %d, input random no %d , sum of digits %d\n", result.job.id, result.job.randomno, result.sumofdigits)
    }
    done <- true
}
func main() {
    startTime := time.Now()
    noOfJobs := 100
    go allocate(noOfJobs)
    done := make(chan bool)
    go result(done)
    noOfWorkers := 10
    createWorkerPool(noOfWorkers)
    <-done
    endTime := time.Now()
    diff := endTime.Sub(startTime)
    fmt.Println("total time taken ", diff.Seconds(), "seconds")
}
复制代码

Run in playgroud

请在本地计算机上运行此程序,以便计算的总时间更准确。

程序将打印,

Job id 1, input random no 636, sum of digits 15
Job id 0, input random no 878, sum of digits 23
Job id 9, input random no 150, sum of digits 6
...
total time taken  20.01081009 seconds
复制代码

对应于 100 个任务,将打印总共 100 行,将在最后一行打印该程序运行所花费的总时间。您的输出将与个人不一样,由于Goroutines能够按任何顺序运行,总时间也会因硬件而异。在个人状况下,程序完成大约须要 20 秒。

如今让咱们将main函数中的noOfWorkers增长到 20。咱们将worker的数量增长了一倍。因为work Goroutines已经增长,程序完成所需的总时间应该减小。在个人状况下,它变成 10.004364685 秒,程序打印,

...
total time taken  10.004364685 seconds
复制代码

如今咱们了解到了随着work Goroutines数量的增长,完成任务所需的总时间减小了。我把它留做练习,让你在主函数中使用不一样的noOfJobsnoOfWorkers的值执行并分析结果。

相关文章
相关标签/搜索