Go 译文之如何构建并发 Pipeline

本文首发于个人博客,若是以为有用,欢迎点赞收藏,让更多的朋友看到。golang

做者:Sameer Ajmani | 原文:blog.golang.org/pipelines算法

译者前言

这篇文章来自 Go 官网,不愧是官方的博客,写的很是详细。在开始翻译这篇文章前,先简单说明两点。编程

首先,这篇文章我以前已经翻译过一遍,但最近再读,发现以前的翻译真是有点烂。因而,决定在彻底不参考以前译文的状况下,把这篇文章从新翻译一遍。bash

其二,文章中有一些专有名字,计划仍是用英文来表达,以保证原汁原味,好比 pipeline(管道)、stage (阶段)、goroutine (协程)、channel (通道)。并发

关于它们之间的关系,按本身的理解简单画了张草图,但愿能帮助更好地理解它们之间的关系。以下:app

强调一点,若是你们在阅读这篇文章时,感到了迷糊,建议能够回头再看一下这张图。分布式

翻译的正文部分以下。函数


Go 的并发原语使咱们很是轻松地就构建出能够高效利用 IO 和多核 CPU 的流式数据 pipeline。这篇文章将会此为基础进行介绍。在这个过程当中,咱们将会遇到一些异常状况,关于它们的处理方法,文中也会详细介绍。工具

什么是管道(pipleline)

关于什么是 pipeline, Go 中并无给出明确的定义,它只是众多并发编程方式中的一种。非正式的解释,咱们理解为,它是由一系列经过 chanel 链接起来的 stage 组成,而每一个 stage 都是由一组运行着相同函数的 goroutine 组成。每一个 stage 的 goroutine 一般会执行以下的一些工做:oop

  • 从上游的输入 channel 中接收数据;
  • 对接收到的数据进行一些处理,(一般)并产生新的数据;
  • 将数据经过输出 channel 发送给下游;

除了第一个 stage 和最后一个 stage ,每一个 stage 都包含必定数量的输入和输出 channel。第一个 stage 只有输出,一般会把它称为 "生产者",最后一个 stage 只有输入,一般咱们会把它称为 "消费者"。

咱们先来看一个很简单例子,经过它来解释上面提到那些与 pipeline 相关的概念和技术。了解了这些后,咱们再看其它的更实际的例子。

计算平方数

一个涉及三个 stage 的 pipeline。

第一个 stage,gen 函数。它负责将把从参数中拿到的一系列整数发送给指定 channel。它启动了一个 goroutine 来发送数据,当数据所有发送结束,channel 会被关闭。

func gen(nums ...int) <-chan int {
    out := make(chan int)
    go func() {
        for _, n := range nums {
            out <- n
        }
        close(out)
    }()
    return out
}
复制代码

第二个 stage,sq 函数。它负责从输入 channel 中接收数据,并会返回一个新的 channel,即输出 channel,它负责将通过平方处理过的数据传输给下游。当输入 channel 关闭,而且全部数据都已发送到下游,就能够关闭这个输出 channel 了。

func sq(in <-chan int) <-chan int {
    out := make(chan int)
    go func() {
        for n := range in {
            out <- n * n
        }
        close(out)
    }()
    return out
}
复制代码

main 函数负责建立 pipeline 并执行最后一个 stage 的任务。它将从第二个 stage 接收数据,并将它们打印出来,直到 channel 关闭。

func main() {
    // Set up the pipeline.
    c := gen(2, 3)
    out := sq(c)

    // Consume the output.
    fmt.Println(<-out) // 4
    fmt.Println(<-out) // 9
}
复制代码

既然,sq 的输入和输出的 channel 类型相同,那么咱们就能够把它进行组合,从而造成多个 stage。好比,咱们能够把 main 函数重写为以下的形式:

func main() {
    // Set up the pipeline and consume the output.
    for n := range sq(sq(gen(2, 3))) {
        fmt.Println(n) // 16 then 81
    }
}
复制代码

扇出和扇入(Fan-out and Fan-in)

当多个函数从一个 channel 中读取数据,直到 channel 关闭,这称为扇出 fan-out。利用它,咱们能够实现了一种分布式的工做方式,经过一组 workers 实现并行的 CPU 和 IO。

当一个函数从多个 channel 中读取数据,直到全部 channel 关闭,这称为扇入 fan-in。扇入是经过将多个输入 channel 的数据合并到同一个输出 channel 实现的,当全部的输入 channel 关闭,输出的 channel 也将关闭。

咱们来改变一下上面例子中的 pipeline,在它上面运行两个 sq 函数试试。它们将都从同一个输入 channel 中读取数据。咱们引入了一个新的函数,merge,负责 fan-in 处理结果,即 merge 两个 sq 的处理结果。

func main() {
    in := gen(2, 3)

    // Distribute the sq work across two goroutines that both read from in.
    // 分布式处理来自 in channel 的数据
    c1 := sq(in)
    c2 := sq(in)

    // Consume the merged output from c1 and c2.
    // 从 channel c1 和 c2 的合并后的 channel 中接收数据
    for n := range merge(c1, c2) {
        fmt.Println(n) // 4 then 9, or 9 then 4
    }
}
复制代码

merge 函数负责将从一系列输入 channel 中接收的数据合并到一个 channel 中。它为每一个输入 channel 都启动了一个 goroutine,并将它们中接收到的值发送到唯一的输出 channel 中。在全部的 goroutines 启动后,还会再另外启动一个 goroutine,它的做用是,当全部的输入 channel 关闭后,负责关闭惟一的输出 channel 。

在已关闭的 channel 发送数据将致使 panic,所以要保证在关闭 channel 前,全部数据都发送完成,是很是重要的。sync.WaitGroup 提供了一种很是简单的方式来完成这样的同步。

func merge(cs ...<-chan int) <-chan int {
    var wg sync.WaitGroup
    out := make(chan int)

    // Start an output goroutine for each input channel in cs. output
    // copies values from c to out until c is closed, then calls wg.Done.
    // 为每一个输入 channel 启动一个 goroutine
    output := func(c <-chan int) {
        for n := range c {
            out <- n
        }
        wg.Done()
    }
    wg.Add(len(cs))
    for _, c := range cs {
        go output(c)
    }

    // Start a goroutine to close out once all the output goroutines are
    // done. This must start after the wg.Add call.
    // 启动一个 goroutine 负责在全部的输入 channel 关闭后,关闭这个惟一的输出 channel
    go func() {
        wg.Wait()
        close(out)
    }()
    return out
}
复制代码

中途中止

pipeline 中的函数包含一个模式:

  • 当数据发送完成,每一个 stage 都应该关闭它们的输入 channel;
  • 只要输入 channel 没有关闭,每一个 stage 就要持续从中接收数据;

咱们能够经过编写 range loop 来保证全部 goroutine 是在全部数据都已经发送到下游的时候退出。

但在一个真实的场景下,每一个 stage 都接收完 channel 中的全部数据,是不可能的。有时,咱们的设计是:接收方只须要接收数据的部分子集便可。更常见的,若是 channel 在上游的 stage 出现了错误,那么,当前 stage 就应该提前退出。不管如何,接收方都不应再继续等待接收 channel 中的剩余数据,并且,此时上游应该中止生产数据,毕竟下游已经不须要了。

咱们的例子中,即便 stage 没有成功消费完全部的数据,上游 stage 依然会尝试给下游发送数据,这将会致使程序永久阻塞。

// Consume the first value from the output.
    // 从 output 中接收了第一个数据
    out := merge(c1, c2)
    fmt.Println(<-out) // 4 or 9
    return
    // Since we didn't receive the second value from out,
    // one of the output goroutines is hung attempting to send it.
    // 咱们并无从 out channel 中接收第二个数据,
    // 因此上游的其中一个 goroutine 在尝试向下游发送数据时将会被挂起。
}
复制代码

这是一种资源泄露,goroutine 是须要消耗内存和运行时资源的,goroutine 栈中的堆引用信息也是不会被 gc。

咱们须要提供一种措施,即便当下游从上游接收数据时发生异常,上游也能成功退出。一种方式是,把 channel 改成带缓冲的 channel,这样,它就能够承载指定数量的数据,若是 buffer channel 还有空间,数据的发送将会马上完成。

// 缓冲大小 2 buffer size 2 
c := make(chan int, 2)
// 发送马上成功 succeeds immediately 
c <- 1
// 发送马上成功 succeeds immediately
c <- 2 
//blocks until another goroutine does <-c and receives 1
// 阻塞,直到另外一个 goroutine 从 c 中接收数据
c <- 3
复制代码

若是咱们在建立 channel 时已经知道将发送的数据量,就能够把前面的代码简化一下。好比,重写 gen 函数,将数据都发送至一个 buffer channel,这还能避免建立新的 goroutine。

func gen(nums ...int) <-chan int {
    out := make(chan int, len(nums))
    for _, n := range nums {
        out <- n
    }
    close(out)
    return out
}
复制代码

译者按:channel 关闭后,不可再写入数据,不然会 panic,可是仍可读取已发送数据,并且能够一直读取 0 值。

继续往下游 stage,将又会返回到阻塞的 goroutine 中,咱们也能够考虑给 merge 的输出 channel 加点缓冲。

func merge(cs ...<-chan int) <-chan int {
    var wg sync.WaitGroup

    // enough space for the unread inputs
    // 给未读的输入 channel 预留足够的空间
    out := make(chan int, 1)    
    // ... the rest is unchanged ...
复制代码

虽然经过这个方法,咱们能解决了 goroutine 阻塞的问题,可是这并不是一个优秀的设计。好比 merge 中的 buffer 的大小 1 是基于咱们已经知道了接下来接收数据的大小,以及下游将能消费的数量。很明显,这种设计很是脆弱,若是上游多发送了一些数据,或下游并没接收那么多的数据,goroutine 将又会被阻塞。

于是,当下游再也不准备接收上游的数据时,须要有一种方式,能够通知到上游。

明确的取消

若是 main 函数在没把 out 中全部数据接收完就退出,它必需要通知上游中止继续发送数据。如何作到?咱们能够在上下游之间引入一个新的 channel,一般称为 done。

示例中有两个可能阻塞的 goroutine,因此, done 须要发送两个值来通知它们。

func main() {
    in := gen(2, 3)

    // Distribute the sq work across two goroutines that both read from in.
    c1 := sq(in)
    c2 := sq(in)

    // Consume the first value from output.
    done := make(chan struct{}, 2)
    out := merge(done, c1, c2)
    fmt.Println(<-out) // 4 or 9

    // Tell the remaining senders we're leaving.
    // 通知发送方,咱们已经中止接收数据了
    done <- struct{}{}
    done <- struct{}{}
}
复制代码

发送方 merge 用 select 语句替换了以前的发送操做,它负责经过 out channel 发送数据或者从 done 接收数据。done 接收的值是没有实际意义的,只是表示 out 应该中止继续发送数据了,用空 struct 便可。output 函数将会不停循环,由于上游,即 sq ,并无阻塞。咱们过会再讨论如何退出这个循环。

func merge(done <-chan struct{}, cs ...<-chan int) <-chan int {
    var wg sync.WaitGroup
    out := make(chan int)

    // Start an output goroutine for each input channel in cs. output
    // copies values from c to out until c is closed or it receives a value
    // from done, then output calls wg.Done.
    output := func(c <-chan int) {
        for n := range c {
            select {
            case out <- n:
            case <-done:
            }
        }
        wg.Done()
    }
    // ... the rest is unchanged ...
复制代码

这种方法有个问题,下游只有知道了上游可能阻塞的 goroutine 数量,才能向每一个 goroutine 都发送了一个 done 信号,从而确保它们都能成功退出。但多维护一个 count 是很使人讨厌的,并且很容易出错。

咱们须要一种方式,能够告诉上游的全部 goroutine 中止向下游继续发送信息。在 Go 中,其实可经过关闭 channel 实现,由于在一个已关闭的 channel 接收数据会马上返回,而且会获得一个零值。

这也就意味着,main 仅需经过关闭 done channel,就可让全部的发送方解除阻塞。关闭操做至关于一个广播信号。为确保任意返回路径下都成功调用,咱们能够经过 defer 语句关闭 done。

func merge(done <-chan struct{}, cs ...<-chan int) <-chan int {
    var wg sync.WaitGroup
    out := make(chan int)

    // Start an output goroutine for each input channel in cs. output
    // copies values from c to out until c or done is closed, then calls
    // wg.Done.
    // 为每一个输入 channel 启动一个 goroutine,将输入 channel 中的数据拷贝到
    // out channel 中,直到输入 channel,即 c,或 done 关闭。
    // 接着,退出循环并执行 wg.Done()
    output := func(c <-chan int) {
        defer wg.Done()
        for n := range c {
            select {
            case out <- n:
            case <-done:
                return
            }
        }
    }
    // ... the rest is unchanged ...
复制代码

一样地,一旦 done 关闭,sq 也将退出。sq 也是经过 defer 语句来确保本身的输出 channel,即 out,必定被成功关闭释放。

func sq(done <-chan struct{}, in <-chan int) <-chan int {
    out := make(chan int)
    go func() {
        defer close(out)
        for n := range in {
            select {
            case out <- n * n:
            case <-done:
                return
            }
        }
    }()
    return out
}
复制代码

都这里,Go 中如何构建一个 pipeline,已经介绍的差很少了。

简单总结下如何正确构建一个 pipeline。

  • 当全部的发送已经完成,stage 应该关闭输出 channel;
  • stage 应该持续从输入 channel 中接收数据,除非 channel 关闭或主动通知到发送方中止发送。

Pipeline 中有量方式能够解除发送方的阻塞,一是发送方建立充足空间的 channel 来发送数据,二是当接收方中止接收数据时,明确通知发送方。

摘要树

一个真实的案例。

MD5,消息摘要算法,可用于文件校验和的计算。下面的输出是命令行工具 md5sum 输出的文件摘要信息。

$ md5sum *.go
d47c2bbc28298ca9befdfbc5d3aa4e65  bounded.go
ee869afd31f83cbb2d10ee81b2b831dc  parallel.go
b88175e65fdcbc01ac08aaf1fd9b5e96  serial.go
复制代码

咱们的例子和 md5sum 相似,不一样的是,传递给这个程序的参数是一个目录。程序的输出是目录下每一个文件的摘要值,输出的顺序按文件名排序。

$ go run serial.go .
d47c2bbc28298ca9befdfbc5d3aa4e65  bounded.go
ee869afd31f83cbb2d10ee81b2b831dc  parallel.go
b88175e65fdcbc01ac08aaf1fd9b5e96  serial.go
复制代码

主函数,第一步调用 MD5All,它返回的是一个以文件名为 key,摘要值为 value 的 map,而后对返回结果进行排序和打印。

func main() {
    // Calculate the MD5 sum of all files under the specified directory,
    // then print the results sorted by path name.
    m, err := MD5All(os.Args[1])
    if err != nil {
        fmt.Println(err)
        return
    }
    var paths []string
    for path := range m {
        paths = append(paths, path)
    }
    sort.Strings(paths)
    for _, path := range paths {
        fmt.Printf("%x %s\n", m[path], path)
    }
}
复制代码

MD5All 函数将是咱们接下来讨论的重点。串行版的实现没有并发,仅仅是从文件中读取数据再计算。

// MD5All reads all the files in the file tree rooted at root and returns a map
// from file path to the MD5 sum of the file's contents. If the directory walk
// fails or any read operation fails, MD5All returns an error.
func MD5All(root string) (map[string][md5.Size]byte, error) {
    m := make(map[string][md5.Size]byte)
    err := filepath.Walk(root, func(path string, info os.FileInfo, err error) error {
        if err != nil {
            return err
        }
        if !info.Mode().IsRegular() {
            return nil
        }
        data, err := ioutil.ReadFile(path)
        if err != nil {
            return err
        }
        m[path] = md5.Sum(data)
        return nil
    })
    if err != nil {
        return nil, err
    }
    return m, nil
}
复制代码

并行计算

并行版 中,咱们会把 MD5All 的计算拆分开含有两个 stage 的 pipeline。第一个 stage,sumFiles,负责遍历目录和计算文件摘要值,摘要的计算会启动一个 goroutine 来执行,计算结果将经过一个类型 result 的 channel 发出。

type result struct {
    path string
    sum  [md5.Size]byte
    err  error
}
复制代码

sumFiles 返回了 2 个 channel,一个用于接收计算的结果,一个用于接收 filepath.Walk 的 err 返回。walk 会为每一个文件启动一个 goroutine 执行摘要计算和检查 done。若是 done 关闭,walk 将马上中止。

func sumFiles(done <-chan struct{}, root string) (<-chan result, <-chan error) {
    // For each regular file, start a goroutine that sums the file and sends
    // the result on c. Send the result of the walk on errc.
    c := make(chan result)
    errc := make(chan error, 1)
    go func() {
        var wg sync.WaitGroup
        err := filepath.Walk(root, func(path string, info os.FileInfo, err error) error {
            if err != nil {
                return err
            }
            if !info.Mode().IsRegular() {
                return nil
            }
            wg.Add(1)
            go func() {
                data, err := ioutil.ReadFile(path)
                select {
                case c <- result{path, md5.Sum(data), err}:
                case <-done:
                }
                wg.Done()
            }()
            // Abort the walk if done is closed.
            select {
            case <-done:
                return errors.New("walk canceled")
            default:
                return nil
            }
        })
        // Walk has returned, so all calls to wg.Add are done. Start a
        // goroutine to close c once all the sends are done.
        go func() {
            wg.Wait()
            close(c)
        }()
        // No select needed here, since errc is buffered.
        // 不须要使用 select,由于 errc 是带有 buffer 的 channel
        errc <- err
    }()
    return c, errc
}
复制代码

MD5All 将从 c channel 中接收计算的结果,若是发生错误,将经过 defer 关闭 done。

func MD5All(root string) (map[string][md5.Size]byte, error) {
    // MD5All closes the done channel when it returns; it may do so before
    // receiving all the values from c and errc.
    done := make(chan struct{})
    defer close(done)

    c, errc := sumFiles(done, root)

    m := make(map[string][md5.Size]byte)
    for r := range c {
        if r.err != nil {
            return nil, r.err
        }
        m[r.path] = r.sum
    }
    if err := <-errc; err != nil {
        return nil, err
    }
    return m, nil
}
复制代码

并行限制

并行版本 中,MD5All 为每一个文件启动了一个 goroutine。但若是一个目录中文件太多,这可能会致使分配的内存过大以致于超过了当前机器的限制。

咱们能够经过限制并行读取的文件数,限制内存分配。在 并发限制版本中,咱们建立了固定数量的 goroutine 读取文件。如今,咱们的 pipeline 涉及 3 个 stage:遍历目录、文件读取与摘要计算、结果收集。

第一个 stage,遍历目录并经过 paths channel 发出文件。

func walkFiles(done <-chan struct{}, root string) (<-chan string, <-chan error) {
    paths := make(chan string)
    errc := make(chan error, 1)
    go func() {
        // Close the paths channel after Walk returns.
        defer close(paths)
        // No select needed for this send, since errc is buffered.
        errc <- filepath.Walk(root, func(path string, info os.FileInfo, err error) error {
            if err != nil {
                return err
            }
            if !info.Mode().IsRegular() {
                return nil
            }
            select {
            case paths <- path:
            case <-done:
                return errors.New("walk canceled")
            }
            return nil
        })
    }()
    return paths, errc
}
复制代码

第二个 stage,启动固定数量的 goroutine,从 paths channel 中读取文件名称,处理结果发送到 c channel。

func digester(done <-chan struct{}, paths <-chan string, c chan<- result) {
    for path := range paths {
        data, err := ioutil.ReadFile(path)
        select {
        case c <- result{path, md5.Sum(data), err}:
        case <-done:
            return
        }
    }
}
复制代码

和以前的例子不一样,digester 将不会关闭 c channel,由于多个 goroutine 共享这个 channel,计算结果都将发给这个 channel 上。

相应地,MD5All 会负责在全部摘要完成后关闭这个 c channel。

// Start a fixed number of goroutines to read and digest files.
    c := make(chan result)
    var wg sync.WaitGroup
    const numDigesters = 20
    wg.Add(numDigesters)
    for i := 0; i < numDigesters; i++ {
        go func() {
            digester(done, paths, c)
            wg.Done()
        }()
    }
    go func() {
        wg.Wait()
        close(c)
    }()
复制代码

咱们也能够为每一个 digester 建立一个单独的 channel,经过本身的 channel 传输结果。但这种方式,咱们还要再启动一个新的 goroutine 合并结果。

最后一个 stage,负责从 c 中接收处理结果,经过 errc 检查是否有错误发生。该检查没法提早进行,由于提早执行将会阻塞 walkFile 往下游发送数据。

m := make(map[string][md5.Size]byte)
    for r := range c {
        if r.err != nil {
            return nil, r.err
        }
        m[r.path] = r.sum
    }
    // Check whether the Walk failed.
    if err := <-errc; err != nil {
        return nil, err
    }
    return m, nil
}
复制代码

总结

这篇文章介绍了,在 Go 中如何正确地构建流式数据 pipeline。它的异常处理很是复杂,pipeline 中的每一个 stage 均可能致使上游阻塞,而下游可能再也不关心接下来的数据。关闭 channel 能够给全部运行中的 goroutine 发送 done 信号,这能帮助咱们成功解除阻塞。如何正确地构建一条流式数据 pipeline,文中也总结了一些指导建议。

相关文章
相关标签/搜索