Go语言的并发原语容许开发者以相似于 Unix Pipe 的方式构建数据流水线 (data pipelines),数据流水线可以高效地利用 I/O和多核 CPU 的优点。golang
本文要讲的就是一些使用流水线的一些例子,流水线的错误处理也是本文的重点。c#
数据流水线充分利用了多核特性,代码层面是基于 channel 类型 和 go 关键字。缓存
channel 和 go 贯穿本文的始终。若是你对这两个概念不太了解,建议先阅读以前公众号发布的两篇文章:Go 语言内存模型(上/下)。微信
若是你对操做系统中"生产者"和"消费者"模型比较了解的话,也将有助于对本文中流水线的理解。并发
本文中绝大多数讲解都是基于代码进行的。换句话说,若是你看不太懂某些代码片断,建议补全之后,在机器或play.golang.org 上运行一下。对于某些不明白的细节,能够手动添加一些语句以助于理解。ide
因为 Go语言并发模型 的英文原文 Go Concurrency Patterns: Pipelines and cancellation 篇幅比较长,本文只包含 理论推导和简单的例子。
下一篇文章咱们会对 "并行MD5" 这个现实生活的例子进行详细地讲解。函数
对于"流水线"这个概念,Go语言中并无正式的定义,它只是不少种并发方式的一种。这里我给出一个非官方的定义:一条流水线是 是由多个阶段组成的,相邻的两个阶段由 channel 进行链接;
每一个阶段是由一组在同一个函数中启动的 goroutine 组成。在每一个阶段,这些 goroutine 会执行下面三个操做:spa
经过 inbound channels 从上游接收数据操作系统
对接收到的数据执行一些操做,一般会生成新的数据翻译
将新生成的数据经过 outbound channels 发送给下游
除了第一个和最后一个阶段,每一个阶段均可以有任意个 inbound 和 outbound channel。
显然,第一个阶段只有 outbound channel,而最后一个阶段只有 inbound channel。
咱们一般称第一个阶段为"生产者"
或"源头"
,称最后一个阶段为"消费者"
或"接收者"
。
首先,咱们经过一个简单的例子来演示这个概念和其中的技巧。后面咱们会更出一个真实世界的例子。
假设咱们有一个流水线,它由三个阶段组成。
第一阶段是 gen 函数,它可以将一组整数转换为channel,channel 能够将数字发送出去。
gen 函数首先启动一个 goroutine,该goroutine 发送数字到 channel,当数字发送完时关闭channel。
代码以下:
func gen(nums ...int) <-chan int { out := make(chan int) go func() { for _, n := range nums { out <- n } close(out) }() return out }
第二阶段是 sq 函数,它从 channel 接收一个整数,而后返回 一个channel,返回的channel能够发送 接收到整数的平方。
当它的 inbound channel 关闭,而且把全部数字均发送到下游时,会关闭 outbound channel。代码以下:
func sq(in <-chan int) <-chan int { out := make(chan int) go func() { for n := range in { out <- n * n } close(out) }() return out }
main 函数 用于设置流水线并运行最后一个阶段。最后一个阶段会从第二阶段接收数字,并逐个打印出来,直到来自于上游的 inbound channel关闭。代码以下:
func main() { // 设置流水线 c := gen(2, 3) out := sq(c) // 消费输出结果 fmt.Println(<-out) // 4 fmt.Println(<-out) // 9 }
因为 sq 函数的 inbound channel 和 outbound channel 类型同样,因此组合任意个 sq 函数。好比像下面这样使用:
func main() { // 设置流水线并消费输出结果 for n := range sq(sq(gen(2, 3))) { fmt.Println(n) // 16 then 81 } }
若是咱们稍微修改一下 gen 函数,即可以模拟 haskell的惰性求值。有兴趣的读者能够本身折腾一下。
扇出:同一个 channel 能够被多个函数读取数据,直到channel关闭。
这种机制容许将工做负载分发到一组worker,以便更好地并行使用 CPU 和 I/O。
扇入:多个 channel 的数据能够被同一个函数读取和处理,而后合并到一个 channel,直到全部 channel都关闭。
下面这张图对 扇入 有一个直观的描述:
咱们修改一下上个例子中的流水线,这里咱们运行两个 sq 实例,它们从同一个 channel 读取数据。
这里咱们引入一个新函数 merge 对结果进行"扇入"操做:
func main() { in := gen(2, 3) // 启动两个 sq 实例,即两个goroutines处理 channel "in" 的数据 c1 := sq(in) c2 := sq(in) // merge 函数将 channel c1 和 c2 合并到一块儿,这段代码会消费 merge 的结果 for n := range merge(c1, c2) { fmt.Println(n) // 打印 4 9, 或 9 4 } }
merge 函数 将多个 channel 转换为一个 channel,它为每个 inbound channel 启动一个 goroutine,用于将数据
拷贝到 outbound channel。
merge 函数的实现见下面代码 (注意 wg 变量):
func merge(cs ...<-chan int) <-chan int { var wg sync.WaitGroup out := make(chan int) // 为每个输入channel cs 建立一个 goroutine output // output 将数据从 c 拷贝到 out,直到 c 关闭,而后 调用 wg.Done output := func(c <-chan int) { for n := range c { out <- n } wg.Done() } wg.Add(len(cs)) for _, c := range cs { go output(c) } // 启动一个 goroutine,用于全部 output goroutine结束时,关闭 out // 该goroutine 必须在 wg.Add 以后启动 go func() { wg.Wait() close(out) }() return out }
在上面的代码中,每一个 inbound channel 对应一个 output
函数。全部 output
goroutine 被建立之后,merge 启动一个额外的 goroutine,
这个goroutine会等待全部 inbound channel 上的发送操做结束之后,关闭 outbound channel。
对已经关闭的channel 执行发送操做(ch<-)会致使异常,因此咱们必须保证全部的发送操做都在关闭channel以前结束。
sync.WaitGroup 提供了一种组织同步的方式。
它保证 merge 中全部 inbound channel (cs ...<-chan int) 均被正常关闭, output goroutine 正常结束后,关闭 out channel。
在使用流水线函数时,有一个固定的模式:
在一个阶段,当全部发送操做 (ch<-) 结束之后,关闭 outbound channel
在一个阶段,goroutine 会持续从 inbount channel 接收数据,直到全部 inbound channel 所有关闭
在这种模式下,每个接收阶段均可以写成 range
循环的方式,
从而保证全部数据都被成功发送到下游后,goroutine可以当即退出。
在现实中,阶段并不老是接收全部的 inbound 数据。有时候是设计如此:接收者可能只须要数据的一个子集就能够继续执行。
更常见的状况是:因为前一个阶段返回一个错误,致使该阶段提早退出。
这两种状况下,接收者都不该该继续等待后面的值被传送过来。
咱们指望的结果是:当后一个阶段不须要数据时,上游阶段可以中止生产。
在咱们的例子中,若是一个阶段不能消费全部的 inbound 数据,试图发送这些数据的 goroutine 会永久阻塞。看下面这段代码片断:
// 只消费 out 的第一个数据 out := merge(c1, c2) fmt.Println(<-out) // 4 or 9 return // 因为咱们再也不接收 out 的第二个数据 // 其中一个 goroutine output 将会在发送时被阻塞 }
显然这里存在资源泄漏。一方面goroutine 消耗内存和运行时资源,另外一方面goroutine 栈中的堆引用会阻止 gc 执行回收操做。
既然goroutine 不能被回收,那么他们必须本身退出。
咱们从新整理一下流水线中的不一样阶段,保证在下游阶段接收数据失败时,上游阶段也可以正常退出。
一个方式是使用带有缓冲的管道做为 outbound channel。缓存能够存储固定个数的数据。
若是缓存没有用完,那么发送操做会当即返回。看下面这段代码示例:
c := make(chan int, 2) // 缓冲大小为 2 c <- 1 // 当即返回 c <- 2 // 当即返回 c <- 3 // 该操做会被阻塞,直到有一个 goroutine 执行 <-c,并接收到数字 1
若是在建立 channel 时就知道要发送的值的个数,使用buffer就可以简化代码。
仍然使用求平方数的例子,咱们对 gen 函数进行重写。咱们将这组整型数拷贝到一个
缓冲 channel中,从而避免建立一个新的 goroutine:
func gen(nums ...int) <-chan int { out := make(chan int, len(nums)) for _, n := range nums { out <- n } close(out) return out }
回到 流水线中被阻塞的 goroutine,咱们考虑让 merge 函数返回一个缓冲管道:
func merge(cs ...<-chan int) <-chan int { var wg sync.WaitGroup out := make(chan int, 1) // 在本例中存储未读的数据足够了 // ... 其余部分代码不变 ...
尽管这种方法解决了这个程序中阻塞 goroutine的问题,可是从长远来看,它并非好办法。
缓存大小选择为1 是创建在两个前提之上:
咱们已经知道 merge 函数有两个 inbound channel
咱们已经知道下游阶段会消耗多少个值
这段代码很脆弱。若是咱们在传入一个值给 gen 函数,或者下游阶段读取的值变少,goroutine
会再次被阻塞。
为了从根本上解决这个问题,咱们须要提供一种机制,让下游阶段可以告知上游发送者中止接收的消息。
下面咱们看下这种机制。
当 main 函数决定退出,并中止接收 out 发送的任何数据时,它必须告诉上游阶段的 goroutine 让它们放弃
正在发送的数据。 main 函数经过发送数据到一个名为 done 的channel实现这样的机制。 因为有两个潜在的
发送者被阻塞,它发送两个值。以下代码所示:
func main() { in := gen(2, 3) // 启动两个运行 sq 的goroutine // 两个goroutine的数据均来自于 in c1 := sq(in) c2 := sq(in) // 消耗 output 生产的第一个值 done := make(chan struct{}, 2) out := merge(done, c1, c2) fmt.Println(<-out) // 4 or 9 // 告诉其余发送者,咱们将要离开 // 再也不接收它们的数据 done <- struct{}{} done <- struct{}{} }
发送数据的 goroutine 使用一个 select 表达式代替原来的操做,select 表达式只有在接收到 out 或 done
发送的数据后,才会继续进行下去。 done 的值类型为 struct{} ,由于它发送什么值不重要,重要的是它发送没发送:
接收事件发生意味着 channel out 的发送操做被丢弃。 goroutine output 基于 inbound channel c 继续执行
循环,因此上游阶段不会被阻塞。(后面咱们会讨论如何让循环提早退出)。 使用 done channel 方式实现的merge 函数以下:
func merge(done <-chan struct{}, cs ...<-chan int) <-chan int { var wg sync.WaitGroup out := make(chan int) // 为 cs 的的每个 输入channel // 建立一个goroutine。output函数将 // 数据从 c 拷贝到 out,直到c关闭, // 或者接收到 done 信号; // 而后调用 wg.Done() output := func(c <-chan int) { for n := range c { select { case out <- n: case <-done: } } wg.Done() } // ... the rest is unchanged ...
这种方法有一个问题:每个下游的接收者须要知道潜在被阻上游发送者的个数,而后向这些发送者发送信号让它们提早退出。
时刻追踪这些数目是一项繁琐且易出错的工做。
咱们须要一种方式可以让未知数目、且个数不受限制的goroutine 中止向下游发送数据。在Go语言中,咱们能够经过关闭一个
channel 实现,由于在一个已关闭 channel 上执行接收操做(<-ch)老是可以当即返回,返回值是对应类型的零值
。关于这点的细节,点击这里查看。
换句话说,咱们只要关闭 done channel,就可以让解开对全部发送者的阻塞。对一个管道的关闭操做事实上是对全部接收者的广播信号。
咱们把 done channel 做为一个参数传递给每个 流水线上的函数,经过 defer 表达式声明对 done channel的关闭操做。
所以,全部从 main 函数做为源头被调用的函数均可以收到 done 的信号,每一个阶段都可以正常退出。 使用 done 对main函数重构之后,代码以下:
func main() { // 设置一个 全局共享的 done channel, // 当流水线退出时,关闭 done channel // 全部 goroutine接收到 done 的信号后, // 都会正常退出。 done := make(chan struct{}) defer close(done) in := gen(done, 2, 3) // 将 sq 的工做分发给两个goroutine // 这两个 goroutine 均从 in 读取数据 c1 := sq(done, in) c2 := sq(done, in) // 消费 outtput 生产的第一个值 out := merge(done, c1, c2) fmt.Println(<-out) // 4 or 9 // defer 调用时,done channel 会被关闭。 }
如今,流水线中的每一个阶段都可以在 done channel 被关闭时返回。merge 函数中的 output 代码也可以顺利返回,由于它知道 done channel关闭时,上游发送者 sq 会中止发送数据。 在 defer 表达式执行结束时,全部调用链上的 output 都能保证 wg.Done() 被调用:
func merge(done <-chan struct{}, cs ...<-chan int) <-chan int { var wg sync.WaitGroup out := make(chan int) // 为 cs 的每个 channel 建立一个 goroutine // 这个 goroutine 运行 output,它将数据从 c // 拷贝到 out,直到 c 关闭,或者 接收到 done // 的关闭信号。人啊后调用 wg.Done() output := func(c <-chan int) { defer wg.Done() for n := range c { select { case out <- n: case <-done: return } } } // ... the rest is unchanged ...
一样的原理, done channel 被关闭时,sq 也可以当即返回。在defer表达式执行结束时,全部调用链上的 sq 都能保证 out channel 被关闭。代码以下:
func sq(done <-chan struct{}, in <-chan int) <-chan int { out := make(chan int) go func() { defer close(out) for n := range in { select { case out <- n * n: case <-done: return } } }() return out }
这里,咱们给出几条构建流水线的指导:
当全部发送操做结束时,每一个阶段都关闭本身的 outbound channels
每一个阶段都会一直从 inbound channels 接收数据,直到这些 channels 被关闭,或发送者解除阻塞状态。
流水线经过两种方式解除发送者的阻塞:
提供足够大的缓冲保存发送者发送的数据
接收者放弃 channel 时,显式地通知发送者。
本文介绍了Go 语言中构建数据流水线的一些技巧。流水线的错误处理比较复杂,流水线的每一个阶段均可能阻塞向下游发送数据,
下游阶段也可能再也不关注上游发送的数据。上面咱们介绍了经过关闭一个channel,向流水线中的全部 goroutine 发送一个 "done" 信号;也定义了
构建流水线的正确方法。
下一篇文章,咱们将经过一个 并行 md5 的例子来讲明本文所讲的一些理念和技巧。
原做者 Sameer Ajmani,翻译 Oscar
下期预告:Go语言并发模型:以并行md5计算为例。英文原文连接
Go高级并发模型:http://blog.golang.org/advanc...
扫码关注微信公众号“深刻Go语言”