做者:Sameer Ajmani | 原文:blog.golang.org/pipelines算法
这篇文章来自 Go 官网,不愧是官方的博客,写的很是详细。在开始翻译这篇文章前,先简单说明两点。编程
首先,这篇文章我以前已经翻译过一遍,但最近再读,发现以前的翻译真是有点烂。因而,决定在彻底不参考以前译文的状况下,把这篇文章从新翻译一遍。bash
其二,文章中有一些专有名字,计划仍是用英文来表达,以保证原汁原味,好比 pipeline(管道)、stage (阶段)、goroutine (协程)、channel (通道)。并发
关于它们之间的关系,按本身的理解简单画了张草图,但愿能帮助更好地理解它们之间的关系。以下:app
强调一点,若是你们在阅读这篇文章时,感到了迷糊,建议能够回头再看一下这张图。分布式
翻译的正文部分以下。函数
Go 的并发原语使咱们很是轻松地就构建出能够高效利用 IO 和多核 CPU 的流式数据 pipeline。这篇文章将会此为基础进行介绍。在这个过程当中,咱们将会遇到一些异常状况,关于它们的处理方法,文中也会详细介绍。工具
关于什么是 pipeline, Go 中并无给出明确的定义,它只是众多并发编程方式中的一种。非正式的解释,咱们理解为,它是由一系列经过 chanel 链接起来的 stage 组成,而每一个 stage 都是由一组运行着相同函数的 goroutine 组成。每一个 stage 的 goroutine 一般会执行以下的一些工做:oop
除了第一个 stage 和最后一个 stage ,每一个 stage 都包含必定数量的输入和输出 channel。第一个 stage 只有输出,一般会把它称为 "生产者",最后一个 stage 只有输入,一般咱们会把它称为 "消费者"。
咱们先来看一个很简单例子,经过它来解释上面提到那些与 pipeline 相关的概念和技术。了解了这些后,咱们再看其它的更实际的例子。
一个涉及三个 stage 的 pipeline。
第一个 stage,gen 函数。它负责将把从参数中拿到的一系列整数发送给指定 channel。它启动了一个 goroutine 来发送数据,当数据所有发送结束,channel 会被关闭。
func gen(nums ...int) <-chan int {
out := make(chan int)
go func() {
for _, n := range nums {
out <- n
}
close(out)
}()
return out
}
复制代码
第二个 stage,sq 函数。它负责从输入 channel 中接收数据,并会返回一个新的 channel,即输出 channel,它负责将通过平方处理过的数据传输给下游。当输入 channel 关闭,而且全部数据都已发送到下游,就能够关闭这个输出 channel 了。
func sq(in <-chan int) <-chan int {
out := make(chan int)
go func() {
for n := range in {
out <- n * n
}
close(out)
}()
return out
}
复制代码
main 函数负责建立 pipeline 并执行最后一个 stage 的任务。它将从第二个 stage 接收数据,并将它们打印出来,直到 channel 关闭。
func main() {
// Set up the pipeline.
c := gen(2, 3)
out := sq(c)
// Consume the output.
fmt.Println(<-out) // 4
fmt.Println(<-out) // 9
}
复制代码
既然,sq 的输入和输出的 channel 类型相同,那么咱们就能够把它进行组合,从而造成多个 stage。好比,咱们能够把 main 函数重写为以下的形式:
func main() {
// Set up the pipeline and consume the output.
for n := range sq(sq(gen(2, 3))) {
fmt.Println(n) // 16 then 81
}
}
复制代码
当多个函数从一个 channel 中读取数据,直到 channel 关闭,这称为扇出 fan-out。利用它,咱们能够实现了一种分布式的工做方式,经过一组 workers 实现并行的 CPU 和 IO。
当一个函数从多个 channel 中读取数据,直到全部 channel 关闭,这称为扇入 fan-in。扇入是经过将多个输入 channel 的数据合并到同一个输出 channel 实现的,当全部的输入 channel 关闭,输出的 channel 也将关闭。
咱们来改变一下上面例子中的 pipeline,在它上面运行两个 sq 函数试试。它们将都从同一个输入 channel 中读取数据。咱们引入了一个新的函数,merge,负责 fan-in 处理结果,即 merge 两个 sq 的处理结果。
func main() {
in := gen(2, 3)
// Distribute the sq work across two goroutines that both read from in.
// 分布式处理来自 in channel 的数据
c1 := sq(in)
c2 := sq(in)
// Consume the merged output from c1 and c2.
// 从 channel c1 和 c2 的合并后的 channel 中接收数据
for n := range merge(c1, c2) {
fmt.Println(n) // 4 then 9, or 9 then 4
}
}
复制代码
merge 函数负责将从一系列输入 channel 中接收的数据合并到一个 channel 中。它为每一个输入 channel 都启动了一个 goroutine,并将它们中接收到的值发送到唯一的输出 channel 中。在全部的 goroutines 启动后,还会再另外启动一个 goroutine,它的做用是,当全部的输入 channel 关闭后,负责关闭惟一的输出 channel 。
在已关闭的 channel 发送数据将致使 panic,所以要保证在关闭 channel 前,全部数据都发送完成,是很是重要的。sync.WaitGroup 提供了一种很是简单的方式来完成这样的同步。
func merge(cs ...<-chan int) <-chan int {
var wg sync.WaitGroup
out := make(chan int)
// Start an output goroutine for each input channel in cs. output
// copies values from c to out until c is closed, then calls wg.Done.
// 为每一个输入 channel 启动一个 goroutine
output := func(c <-chan int) {
for n := range c {
out <- n
}
wg.Done()
}
wg.Add(len(cs))
for _, c := range cs {
go output(c)
}
// Start a goroutine to close out once all the output goroutines are
// done. This must start after the wg.Add call.
// 启动一个 goroutine 负责在全部的输入 channel 关闭后,关闭这个惟一的输出 channel
go func() {
wg.Wait()
close(out)
}()
return out
}
复制代码
pipeline 中的函数包含一个模式:
咱们能够经过编写 range loop 来保证全部 goroutine 是在全部数据都已经发送到下游的时候退出。
但在一个真实的场景下,每一个 stage 都接收完 channel 中的全部数据,是不可能的。有时,咱们的设计是:接收方只须要接收数据的部分子集便可。更常见的,若是 channel 在上游的 stage 出现了错误,那么,当前 stage 就应该提前退出。不管如何,接收方都不应再继续等待接收 channel 中的剩余数据,并且,此时上游应该中止生产数据,毕竟下游已经不须要了。
咱们的例子中,即便 stage 没有成功消费完全部的数据,上游 stage 依然会尝试给下游发送数据,这将会致使程序永久阻塞。
// Consume the first value from the output.
// 从 output 中接收了第一个数据
out := merge(c1, c2)
fmt.Println(<-out) // 4 or 9
return
// Since we didn't receive the second value from out,
// one of the output goroutines is hung attempting to send it.
// 咱们并无从 out channel 中接收第二个数据,
// 因此上游的其中一个 goroutine 在尝试向下游发送数据时将会被挂起。
}
复制代码
这是一种资源泄露,goroutine 是须要消耗内存和运行时资源的,goroutine 栈中的堆引用信息也是不会被 gc。
咱们须要提供一种措施,即便当下游从上游接收数据时发生异常,上游也能成功退出。一种方式是,把 channel 改成带缓冲的 channel,这样,它就能够承载指定数量的数据,若是 buffer channel 还有空间,数据的发送将会马上完成。
// 缓冲大小 2 buffer size 2
c := make(chan int, 2)
// 发送马上成功 succeeds immediately
c <- 1
// 发送马上成功 succeeds immediately
c <- 2
//blocks until another goroutine does <-c and receives 1
// 阻塞,直到另外一个 goroutine 从 c 中接收数据
c <- 3
复制代码
若是咱们在建立 channel 时已经知道将发送的数据量,就能够把前面的代码简化一下。好比,重写 gen 函数,将数据都发送至一个 buffer channel,这还能避免建立新的 goroutine。
func gen(nums ...int) <-chan int {
out := make(chan int, len(nums))
for _, n := range nums {
out <- n
}
close(out)
return out
}
复制代码
译者按:channel 关闭后,不可再写入数据,不然会 panic,可是仍可读取已发送数据,并且能够一直读取 0 值。
继续往下游 stage,将又会返回到阻塞的 goroutine 中,咱们也能够考虑给 merge 的输出 channel 加点缓冲。
func merge(cs ...<-chan int) <-chan int {
var wg sync.WaitGroup
// enough space for the unread inputs
// 给未读的输入 channel 预留足够的空间
out := make(chan int, 1)
// ... the rest is unchanged ...
复制代码
虽然经过这个方法,咱们能解决了 goroutine 阻塞的问题,可是这并不是一个优秀的设计。好比 merge 中的 buffer 的大小 1 是基于咱们已经知道了接下来接收数据的大小,以及下游将能消费的数量。很明显,这种设计很是脆弱,若是上游多发送了一些数据,或下游并没接收那么多的数据,goroutine 将又会被阻塞。
于是,当下游再也不准备接收上游的数据时,须要有一种方式,能够通知到上游。
若是 main 函数在没把 out 中全部数据接收完就退出,它必需要通知上游中止继续发送数据。如何作到?咱们能够在上下游之间引入一个新的 channel,一般称为 done。
示例中有两个可能阻塞的 goroutine,因此, done 须要发送两个值来通知它们。
func main() {
in := gen(2, 3)
// Distribute the sq work across two goroutines that both read from in.
c1 := sq(in)
c2 := sq(in)
// Consume the first value from output.
done := make(chan struct{}, 2)
out := merge(done, c1, c2)
fmt.Println(<-out) // 4 or 9
// Tell the remaining senders we're leaving.
// 通知发送方,咱们已经中止接收数据了
done <- struct{}{}
done <- struct{}{}
}
复制代码
发送方 merge 用 select 语句替换了以前的发送操做,它负责经过 out channel 发送数据或者从 done 接收数据。done 接收的值是没有实际意义的,只是表示 out 应该中止继续发送数据了,用空 struct 便可。output 函数将会不停循环,由于上游,即 sq ,并无阻塞。咱们过会再讨论如何退出这个循环。
func merge(done <-chan struct{}, cs ...<-chan int) <-chan int {
var wg sync.WaitGroup
out := make(chan int)
// Start an output goroutine for each input channel in cs. output
// copies values from c to out until c is closed or it receives a value
// from done, then output calls wg.Done.
output := func(c <-chan int) {
for n := range c {
select {
case out <- n:
case <-done:
}
}
wg.Done()
}
// ... the rest is unchanged ...
复制代码
这种方法有个问题,下游只有知道了上游可能阻塞的 goroutine 数量,才能向每一个 goroutine 都发送了一个 done 信号,从而确保它们都能成功退出。但多维护一个 count 是很使人讨厌的,并且很容易出错。
咱们须要一种方式,能够告诉上游的全部 goroutine 中止向下游继续发送信息。在 Go 中,其实可经过关闭 channel 实现,由于在一个已关闭的 channel 接收数据会马上返回,而且会获得一个零值。
这也就意味着,main 仅需经过关闭 done channel,就可让全部的发送方解除阻塞。关闭操做至关于一个广播信号。为确保任意返回路径下都成功调用,咱们能够经过 defer 语句关闭 done。
func merge(done <-chan struct{}, cs ...<-chan int) <-chan int {
var wg sync.WaitGroup
out := make(chan int)
// Start an output goroutine for each input channel in cs. output
// copies values from c to out until c or done is closed, then calls
// wg.Done.
// 为每一个输入 channel 启动一个 goroutine,将输入 channel 中的数据拷贝到
// out channel 中,直到输入 channel,即 c,或 done 关闭。
// 接着,退出循环并执行 wg.Done()
output := func(c <-chan int) {
defer wg.Done()
for n := range c {
select {
case out <- n:
case <-done:
return
}
}
}
// ... the rest is unchanged ...
复制代码
一样地,一旦 done 关闭,sq 也将退出。sq 也是经过 defer 语句来确保本身的输出 channel,即 out,必定被成功关闭释放。
func sq(done <-chan struct{}, in <-chan int) <-chan int {
out := make(chan int)
go func() {
defer close(out)
for n := range in {
select {
case out <- n * n:
case <-done:
return
}
}
}()
return out
}
复制代码
都这里,Go 中如何构建一个 pipeline,已经介绍的差很少了。
简单总结下如何正确构建一个 pipeline。
Pipeline 中有量方式能够解除发送方的阻塞,一是发送方建立充足空间的 channel 来发送数据,二是当接收方中止接收数据时,明确通知发送方。
一个真实的案例。
MD5,消息摘要算法,可用于文件校验和的计算。下面的输出是命令行工具 md5sum 输出的文件摘要信息。
$ md5sum *.go
d47c2bbc28298ca9befdfbc5d3aa4e65 bounded.go
ee869afd31f83cbb2d10ee81b2b831dc parallel.go
b88175e65fdcbc01ac08aaf1fd9b5e96 serial.go
复制代码
咱们的例子和 md5sum 相似,不一样的是,传递给这个程序的参数是一个目录。程序的输出是目录下每一个文件的摘要值,输出的顺序按文件名排序。
$ go run serial.go .
d47c2bbc28298ca9befdfbc5d3aa4e65 bounded.go
ee869afd31f83cbb2d10ee81b2b831dc parallel.go
b88175e65fdcbc01ac08aaf1fd9b5e96 serial.go
复制代码
主函数,第一步调用 MD5All,它返回的是一个以文件名为 key,摘要值为 value 的 map,而后对返回结果进行排序和打印。
func main() {
// Calculate the MD5 sum of all files under the specified directory,
// then print the results sorted by path name.
m, err := MD5All(os.Args[1])
if err != nil {
fmt.Println(err)
return
}
var paths []string
for path := range m {
paths = append(paths, path)
}
sort.Strings(paths)
for _, path := range paths {
fmt.Printf("%x %s\n", m[path], path)
}
}
复制代码
MD5All 函数将是咱们接下来讨论的重点。串行版的实现没有并发,仅仅是从文件中读取数据再计算。
// MD5All reads all the files in the file tree rooted at root and returns a map
// from file path to the MD5 sum of the file's contents. If the directory walk
// fails or any read operation fails, MD5All returns an error.
func MD5All(root string) (map[string][md5.Size]byte, error) {
m := make(map[string][md5.Size]byte)
err := filepath.Walk(root, func(path string, info os.FileInfo, err error) error {
if err != nil {
return err
}
if !info.Mode().IsRegular() {
return nil
}
data, err := ioutil.ReadFile(path)
if err != nil {
return err
}
m[path] = md5.Sum(data)
return nil
})
if err != nil {
return nil, err
}
return m, nil
}
复制代码
在 并行版 中,咱们会把 MD5All 的计算拆分开含有两个 stage 的 pipeline。第一个 stage,sumFiles,负责遍历目录和计算文件摘要值,摘要的计算会启动一个 goroutine 来执行,计算结果将经过一个类型 result 的 channel 发出。
type result struct {
path string
sum [md5.Size]byte
err error
}
复制代码
sumFiles 返回了 2 个 channel,一个用于接收计算的结果,一个用于接收 filepath.Walk 的 err 返回。walk 会为每一个文件启动一个 goroutine 执行摘要计算和检查 done。若是 done 关闭,walk 将马上中止。
func sumFiles(done <-chan struct{}, root string) (<-chan result, <-chan error) {
// For each regular file, start a goroutine that sums the file and sends
// the result on c. Send the result of the walk on errc.
c := make(chan result)
errc := make(chan error, 1)
go func() {
var wg sync.WaitGroup
err := filepath.Walk(root, func(path string, info os.FileInfo, err error) error {
if err != nil {
return err
}
if !info.Mode().IsRegular() {
return nil
}
wg.Add(1)
go func() {
data, err := ioutil.ReadFile(path)
select {
case c <- result{path, md5.Sum(data), err}:
case <-done:
}
wg.Done()
}()
// Abort the walk if done is closed.
select {
case <-done:
return errors.New("walk canceled")
default:
return nil
}
})
// Walk has returned, so all calls to wg.Add are done. Start a
// goroutine to close c once all the sends are done.
go func() {
wg.Wait()
close(c)
}()
// No select needed here, since errc is buffered.
// 不须要使用 select,由于 errc 是带有 buffer 的 channel
errc <- err
}()
return c, errc
}
复制代码
MD5All 将从 c channel 中接收计算的结果,若是发生错误,将经过 defer 关闭 done。
func MD5All(root string) (map[string][md5.Size]byte, error) {
// MD5All closes the done channel when it returns; it may do so before
// receiving all the values from c and errc.
done := make(chan struct{})
defer close(done)
c, errc := sumFiles(done, root)
m := make(map[string][md5.Size]byte)
for r := range c {
if r.err != nil {
return nil, r.err
}
m[r.path] = r.sum
}
if err := <-errc; err != nil {
return nil, err
}
return m, nil
}
复制代码
在 并行版本 中,MD5All 为每一个文件启动了一个 goroutine。但若是一个目录中文件太多,这可能会致使分配的内存过大以致于超过了当前机器的限制。
咱们能够经过限制并行读取的文件数,限制内存分配。在 并发限制版本中,咱们建立了固定数量的 goroutine 读取文件。如今,咱们的 pipeline 涉及 3 个 stage:遍历目录、文件读取与摘要计算、结果收集。
第一个 stage,遍历目录并经过 paths channel 发出文件。
func walkFiles(done <-chan struct{}, root string) (<-chan string, <-chan error) {
paths := make(chan string)
errc := make(chan error, 1)
go func() {
// Close the paths channel after Walk returns.
defer close(paths)
// No select needed for this send, since errc is buffered.
errc <- filepath.Walk(root, func(path string, info os.FileInfo, err error) error {
if err != nil {
return err
}
if !info.Mode().IsRegular() {
return nil
}
select {
case paths <- path:
case <-done:
return errors.New("walk canceled")
}
return nil
})
}()
return paths, errc
}
复制代码
第二个 stage,启动固定数量的 goroutine,从 paths channel 中读取文件名称,处理结果发送到 c channel。
func digester(done <-chan struct{}, paths <-chan string, c chan<- result) {
for path := range paths {
data, err := ioutil.ReadFile(path)
select {
case c <- result{path, md5.Sum(data), err}:
case <-done:
return
}
}
}
复制代码
和以前的例子不一样,digester 将不会关闭 c channel,由于多个 goroutine 共享这个 channel,计算结果都将发给这个 channel 上。
相应地,MD5All 会负责在全部摘要完成后关闭这个 c channel。
// Start a fixed number of goroutines to read and digest files.
c := make(chan result)
var wg sync.WaitGroup
const numDigesters = 20
wg.Add(numDigesters)
for i := 0; i < numDigesters; i++ {
go func() {
digester(done, paths, c)
wg.Done()
}()
}
go func() {
wg.Wait()
close(c)
}()
复制代码
咱们也能够为每一个 digester 建立一个单独的 channel,经过本身的 channel 传输结果。但这种方式,咱们还要再启动一个新的 goroutine 合并结果。
最后一个 stage,负责从 c 中接收处理结果,经过 errc 检查是否有错误发生。该检查没法提早进行,由于提早执行将会阻塞 walkFile 往下游发送数据。
m := make(map[string][md5.Size]byte)
for r := range c {
if r.err != nil {
return nil, r.err
}
m[r.path] = r.sum
}
// Check whether the Walk failed.
if err := <-errc; err != nil {
return nil, err
}
return m, nil
}
复制代码
这篇文章介绍了,在 Go 中如何正确地构建流式数据 pipeline。它的异常处理很是复杂,pipeline 中的每一个 stage 均可能致使上游阻塞,而下游可能再也不关心接下来的数据。关闭 channel 能够给全部运行中的 goroutine 发送 done 信号,这能帮助咱们成功解除阻塞。如何正确地构建一条流式数据 pipeline,文中也总结了一些指导建议。