- 原文地址:Part 23: Buffered Channels and Worker Pools
- 原文做者:Naveen R
- 译者:咔叽咔叽 转载请注明出处。
channel
咱们在上一个教程中讨论的全部channel
基本上都是无缓冲的。正如咱们在channel
教程中详细讨论的那样,发送和接收到无缓冲的channel
都是阻塞的。git
可使用缓冲区建立channel
。仅当缓冲区已满时才会阻塞对缓冲channel
的发送。相似地,仅当缓冲区为空时才阻塞从缓冲channel
接收。golang
能够经过添加一个capacity
参数传递给make
函数来建立缓冲channel
,该函数指定缓冲区的大小。并发
ch := make(chan type, capacity)
复制代码
对于具备缓冲区的channel
,上述语法中的容量应大于 0。默认状况下,无缓冲通道的容量为 0,所以在上一个教程中建立通道时省略了容量参数。dom
咱们来建立一个缓冲channel
,函数
package main
import (
"fmt"
)
func main() {
ch := make(chan string, 2)
ch <- "naveen"
ch <- "paul"
fmt.Println(<- ch)
fmt.Println(<- ch)
}
复制代码
在上面的程序中,第 9 行咱们建立一个容量为 2 的缓冲channel
。因为channel
的容量为 2,所以能够将 2 个字符串写入而不会被阻塞。咱们在第 10 和 11 行写入 2 个字符串,随后读取了写入的字符串并打印,ui
naveen
paul
复制代码
让咱们再看一个缓冲channel
的例子,其中channel
的值写入Goroutine
并从main Goroutine
读取。这个例子将帮助咱们更好地理解什么时候写入缓冲的channel
。spa
package main
import (
"fmt"
"time"
)
func write(ch chan int) {
for i := 0; i < 5; i++ {
ch <- i
fmt.Println("successfully wrote", i, "to ch")
}
close(ch)
}
func main() {
ch := make(chan int, 2)
go write(ch)
time.Sleep(2 * time.Second)
for v := range ch {
fmt.Println("read value", v,"from ch")
time.Sleep(2 * time.Second)
}
}
复制代码
Run in playgroudcode
上面的程序中,在第 16 行建立了容量为 2 的缓冲channel ch
。main Goroutine
将ch
传给write Goroutine
,而后main Goroutine
休眠 2 秒钟。在此期间,write Goroutine
在运行。write Goroutine
有一个 for 循环,它将 0 到 4 的数字循环写入channel ch
。因为容量为 2,所以可以将值 0 和 1 写入,而后阻塞直到从channel ch
读取至少一个值。因此这个程序会当即打印如下 2 行,协程
successfully wrote 0 to ch
successfully wrote 1 to ch
复制代码
在打印上述两行以后,write Goroutine
中的写入被阻塞,直到channel ch
的数据被读取。因为main Goroutine
会休眠 2 秒,所以程序在接下来的 2 秒内不会打印任何内容。当main Goroutine
在被唤醒后,使用for range
循环开始从channel ch
读取并打印读取值,而后再次休眠 2 秒,此循环继续,直到 ch 关闭。所以程序将在 2 秒后打印如下行,
read value 0 from ch
successfully wrote 2 to ch
复制代码
而后继续直到全部值都写入并在关闭 channel
。最终的输出是,
successfully wrote 0 to ch
successfully wrote 1 to ch
read value 0 from ch
successfully wrote 2 to ch
read value 1 from ch
successfully wrote 3 to ch
read value 2 from ch
successfully wrote 4 to ch
read value 3 from ch
read value 4 from ch
复制代码
package main
import (
"fmt"
)
func main() {
ch := make(chan string, 2)
ch <- "naveen"
ch <- "paul"
ch <- "steve"
fmt.Println(<-ch)
fmt.Println(<-ch)
}
复制代码
Run in playgroud 在上面的程序中,咱们将 3 个字符串写入容量为 2 的缓冲channel
。当第三个字符串写入的时候已超过其容量,所以写入操做被阻塞。如今必须等待其余Goroutine
从channel
读取数据才能继续写入,但在上述代码中并无从该channel
读取数据的Goroutine
。所以会出现死锁,程序将在运行时打印如下内容,
fatal error: all goroutines are asleep - deadlock!
goroutine 1 [chan send]:
main.main()
/tmp/sandbox274756028/main.go:11 +0x100
复制代码
容量是channel
能够容纳的值的数量。这是咱们使用make
函数建立时指定的值。
长度是当前在channel
中的元素数量。
一个程序会让理解变得简单😀
package main
import (
"fmt"
)
func main() {
ch := make(chan string, 3)
ch <- "naveen"
ch <- "paul"
fmt.Println("capacity is", cap(ch))
fmt.Println("length is", len(ch))
fmt.Println("read value", <-ch)
fmt.Println("new length is", len(ch))
}
复制代码
在上面的程序中,建立的channel
容量为 3,即它能够容纳 3 个字符串。而后咱们分别写入 2 个字符串,如今该channel
有 2 个字符串,所以其长度为 2。 咱们从channel
中读取一个字符串。如今channel
只有一个字符串了,所以它的长度变为 1。这个程序将打印,
capacity is 3
length is 2
read value naveen
new length is 1
复制代码
本教程的下一部分是关于Worker Pools
。要了解工做池,咱们首先须要了解WaitGroup
,由于它将用于工做池的实现。
WaitGroup
用于阻塞main Goroutines
直到全部Goroutines
完成执行。好比说咱们有 3 个从main Goroutine
生成的Goroutines
须要并发执行。main Goroutines
须要等待其余 3 个Goroutines
完成才能终止,不然可能在main Goroutines
终止时,其他的Goroutines
还没能得当执行,这种场景下可使用WaitGroup
来完成。
中止理论上代码😀
package main
import (
"fmt"
"sync"
"time"
)
func process(i int, wg *sync.WaitGroup) {
fmt.Println("started Goroutine ", i)
time.Sleep(2 * time.Second)
fmt.Printf("Goroutine %d ended\n", i)
wg.Done()
}
func main() {
no := 3
var wg sync.WaitGroup
for i := 0; i < no; i++ {
wg.Add(1)
go process(i, &wg)
}
wg.Wait()
fmt.Println("All go routines finished executing")
}
复制代码
WaitGroup
是一种结构类型,咱们在第 18 行建立一个WaitGroup
类型的空值变量。 WaitGroup
的工做方式是使用计数器。当咱们在WaitGroup
上调用int
型参数调用Add
方法,计数器会增长传递给Add
的值。递减计数器的方法是在WaitGroup上
调用Done
方法。 Wait
方法阻塞调用它的Goroutine
,直到计数器变为零。
在上面的程序中,咱们在第 20 行调用wg.Add(1)
循环迭代 3 次。因此计数器的值如今变成了 3。 for 循环也产生 3 个Goroutines
,main Goroutines
在第 23 行调用了wg.Wait()
以阻塞直到计数器变为零。在Goroutine
中,经过调用wg.Done
来减小计数器的值。 一旦全部 3 个生成的Goroutines
完成执行,也就是wg.Done()
被调用三次,计数器被清零,main Goroutine
被解除阻塞,程序执行完成,输出,
started Goroutine 2
started Goroutine 0
started Goroutine 1
Goroutine 0 ended
Goroutine 2 ended
Goroutine 1 ended
All go routines finished executing
复制代码
你们的输出可能与个人不一样,由于Goroutines
的执行顺序会有所不一样:)。
缓冲channel
的一个重要用途是协程池的实现。
一般,协程池是一组协程,它们等待分配给它们任务。一旦完成分配的任务,他们就会再次等待下一个任务。
咱们将使用缓冲channel
实现协程池。咱们的协程池将执行查找输入数字的数字之和的任务。例如,若是传递 234,则输出将为 9(9 = 2 + 3 + 4)。协程池的输入将是伪随机整数列表。
如下是咱们协程池的核心功能
Goroutines
池,用于监听缓冲jobs channel
,等待任务分配jobs channel
添加任务results channel
results channel
读取和打印结果咱们将逐步编写此程序,以便更容易理解。
第一步是建立表示任务和结果的结构。
type Job struct {
id int
randomno int
}
type Result struct {
job Job
sumofdigits int
}
复制代码
每一个Job
结构都有一个id
和randomno
,用来计算各个数字的总和。
Result
结构有一个job
字段和sumofdigits
字段,sumofdigits
字段用来保存job
各个数字之和的结果。
下一步是建立用于接收任务和存储结果的缓冲channel
。
var jobs = make(chan Job, 10)
var results = make(chan Result, 10)
复制代码
worker Goroutines
在任务缓冲channel
上侦听新任务。一旦任务完成,把结果写入结果缓冲channel
。
digits
函数执行查找整数的各个数字之和并返回它的。咱们为此函数添加 2 秒的休眠,以模拟此函数计算结果须要一些时间的场景。
func digits(number int) int {
sum := 0
no := number
for no != 0 {
digit := no % 10
sum += digit
no /= 10
}
time.Sleep(2 * time.Second)
return sum
}
复制代码
接下来将编写一个建立worker Goroutine
的函数。
func worker(wg *sync.WaitGroup) {
for job := range jobs {
output := Result{job, digits(job.randomno)}
results <- output
}
wg.Done()
}
复制代码
上面的函数建立了一个worker
,它从jobs channel
读取任务,使用当前任务和digits
函数的返回值建立Result
结构,而后将结果写入结果缓冲channel
。此函数将WaitGroup wg
做为参数,在全部任务完成后,它将调用Done
方法结束当前Goroutine
的阻塞。
createWorkerPool
函数将建立一个Goroutines
池。
func createWorkerPool(noOfWorkers int) {
var wg sync.WaitGroup
for i := 0; i < noOfWorkers; i++ {
wg.Add(1)
go worker(&wg)
}
wg.Wait()
close(results)
}
复制代码
上面的函数将要建立的worker
数量做为参数。它在建立Goroutine
以前调用了wg.Add(1)
来增长WaitGroup
计数器。而后它经过将WaitGroup wg
的地址传递给worker
函数来建立worker Goroutines
。在建立了所需的worker Goroutines
以后,它经过调用wg.Wait()
来阻塞当前协程直到全部Goroutines
完成执行后,关闭results channel
,由于全部的Goroutines
都已完成执行,没有结果被写入该results channel
。
如今咱们已经写好了协程池,让咱们继续编写将任务分配给协程的功能。
func allocate(noOfJobs int) {
for i := 0; i < noOfJobs; i++ {
randomno := rand.Intn(999)
job := Job{i, randomno}
jobs <- job
}
close(jobs)
}
复制代码
上面的allocate
函数将要建立的任务数做为输入参数,生成最大值为 998 的伪随机数,使用随机数建立Job
结构,并将 for 循环计数器的i
做为id
,而后将它们写入jobs channel
。它在写完全部任务后关闭了jobs channel
。
下一步是建立一个函数读取results channel
并打印输出。
func result(done chan bool) {
for result := range results {
fmt.Printf("Job id %d, input random no %d , sum of digits %d\n", result.job.id, result.job.randomno, result.sumofdigits)
}
done <- true
}
复制代码
result
函数读取results channel
并打印任务 ID,输入随机数和随机数的总和。result
函数在打印全部结果后,将true
写入done channel
。
万事俱备,让咱们把上面全部的功能用main
函数串联起来。
func main() {
startTime := time.Now()
noOfJobs := 100
go allocate(noOfJobs)
done := make(chan bool)
go result(done)
noOfWorkers := 10
createWorkerPool(noOfWorkers)
<-done
endTime := time.Now()
diff := endTime.Sub(startTime)
fmt.Println("total time taken ", diff.Seconds(), "seconds")
}
复制代码
第 2 行咱们首先将程序的执行开始时间存储起来,在最后一行(第 12 行),咱们计算endTime
和startTime
之间的时间差,并显示程序的总运行时间。这是必要的,由于咱们将经过改变Goroutines
的数量来作一些基准测试。
noOfJobs
设置为 100,而后调用allocate
以将任务添加到jobs channel
。
而后建立done channel
并将其传递给results channel
,以便它能够开始打印输出并在打印完全部内容后通知。
最后,经过调用createWorkerPool
函数建立了一个 10 个work Goroutines
的池,而后main
阻塞直到done channel
写入true
值,最后打印全部结果。
下面是完整的代码。
package main
import (
"fmt"
"math/rand"
"sync"
"time"
)
type Job struct {
id int
randomno int
}
type Result struct {
job Job
sumofdigits int
}
var jobs = make(chan Job, 10)
var results = make(chan Result, 10)
func digits(number int) int {
sum := 0
no := number
for no != 0 {
digit := no % 10
sum += digit
no /= 10
}
time.Sleep(2 * time.Second)
return sum
}
func worker(wg *sync.WaitGroup) {
for job := range jobs {
output := Result{job, digits(job.randomno)}
results <- output
}
wg.Done()
}
func createWorkerPool(noOfWorkers int) {
var wg sync.WaitGroup
for i := 0; i < noOfWorkers; i++ {
wg.Add(1)
go worker(&wg)
}
wg.Wait()
close(results)
}
func allocate(noOfJobs int) {
for i := 0; i < noOfJobs; i++ {
randomno := rand.Intn(999)
job := Job{i, randomno}
jobs <- job
}
close(jobs)
}
func result(done chan bool) {
for result := range results {
fmt.Printf("Job id %d, input random no %d , sum of digits %d\n", result.job.id, result.job.randomno, result.sumofdigits)
}
done <- true
}
func main() {
startTime := time.Now()
noOfJobs := 100
go allocate(noOfJobs)
done := make(chan bool)
go result(done)
noOfWorkers := 10
createWorkerPool(noOfWorkers)
<-done
endTime := time.Now()
diff := endTime.Sub(startTime)
fmt.Println("total time taken ", diff.Seconds(), "seconds")
}
复制代码
请在本地计算机上运行此程序,以便计算的总时间更准确。
程序将打印,
Job id 1, input random no 636, sum of digits 15
Job id 0, input random no 878, sum of digits 23
Job id 9, input random no 150, sum of digits 6
...
total time taken 20.01081009 seconds
复制代码
对应于 100 个任务,将打印总共 100 行,将在最后一行打印该程序运行所花费的总时间。您的输出将与个人不一样,由于Goroutines
能够按任何顺序运行,总时间也会因硬件而异。在个人状况下,程序完成大约须要 20 秒。
如今让咱们将main
函数中的noOfWorkers
增长到 20。咱们将worker
的数量增长了一倍。因为work Goroutines
已经增长,程序完成所需的总时间应该减小。在个人状况下,它变成 10.004364685 秒,程序打印,
...
total time taken 10.004364685 seconds
复制代码
如今咱们了解到了随着work Goroutines
数量的增长,完成任务所需的总时间减小了。我把它留做练习,让你在主函数中使用不一样的noOfJobs
和noOfWorkers
的值执行并分析结果。