Golang协程并发的流水线模型

背景

最近因为性能问题,后端服务一直在作python到golang的迁移和重构。go语言精简优雅,既有编译型语言的严谨和高性能,又有解释型语言的开发效率,出色的并发性能也是go区别于其余语言的一大特点。go的并发编程代码虽然简单,但重在其并发模型和流程的设计。因此这里总结下golang协程并发经常使用的流水线模型。python

简单的流水线思惟

流水线模式并非什么新奇的概念,可是它能极大地提升生产效率。好比实际生活中的汽车生产流水线,流水线上的每个流程负责不一样的工做,好比第一个流程是拼装车身,第二个流程是安装发动机,第三个流程是装轮胎...,这些步骤咱们能够类比成go并发流程中的协程,每个协程就是一个任务。流水线上面传递的车身、发动机、轮胎,这些咱们能够类比成协程间须要传递的数据,而在这些流程(协程)间传递这些配件(数据),天然就要经过传送带(channel)。在流水线上,咱们装四个轮胎确定不是一个一个来装的,确定是有四个机械臂同时来装。所以装轮胎这个步骤咱们有4个协程在并发工做来提升效率。这么一来,流水线模型的基本要素就构成了。
Golang的并发模型灵感其实都来自咱们生活,对程序而言,高的生产效率就是高的性能。在Golang中,流水线由多个流程节点组成,流程之间经过channel链接,每一个流程节点能够由多个同时运行的goroutine组成。
image.pnggolang

如何构造流水线

有了流水线模式的思惟,接下来就是如何构造流水线了。简单来讲,其实就是经过channel将任务流程链接起来,两个相邻的流程互为生产者和消费者,经过channel进行通讯。耗时的流程能够将任务分散到多个协程来执行。
咱们先来看一个最简单的流水线,以下图,A是生产者流程,B是它的消费流程,同时又是C的生产者流程。A,B,C三个协程直接,经过读写channel进行通讯。
image.png编程

那若是此时B流程能够将a channel中的任务并发执行呢,很简单,咱们只须要起多个B协程就能够了。以下图。
image.png后端

总之,咱们构造流水线并发的思路是关注数据的流动,数据流动的过程交给channel,channel两端数据处理的每一个环节都交给goroutine,这个流程连起来,就构成了流水线模型。安全

关于channel

为何咱们能够选择channel来进行协程间的通讯呢,协程之间又是怎么保持同步顺序呢,固然这都要归功于channel。channel是go提供的进程内协程间的通讯方式,它是协程/线程安全的,channe的读写阻塞会致使协程的切换。
channel的操做和状态组合能够有如下几种状况:
image.png数据结构

**有1个特殊场景**:当`nil`的通道在`select`的某个`case`中时,这个case会阻塞,但不会形成死锁。

channel不只能够保证协程安全的数据流动,还能够保证协程的同步。当有并发问题时,channel也是咱们首先应该想到的数据结构。不过显而易见,当使用有缓冲区的channel时,才能达到协程并发的效果,而且生产者和消费者的协程间是相对同步的。使用无缓冲区的channel时,是没有并发效果的,协程间是绝对同步的,生产者和消费者必须同时写和读协程才能运行。
channel关注的是数据的流动,这种场景下均可以考虑使用channel。好比:消息传递、信号广播、任务分发、结果汇总、同步与异步、并发控制... 更多的不在这里赘述了,总之,Share memory by communicating, don't communicate by sharing memory.并发

流水线模型实例

举个简单栗子,计算80000之内的质数并输出。
这个例子若是咱们采用非并发的方式,就是for循环80000,挨个判断是否是素数再输出。不过若是咱们采用流水线的并发模型会更高效。异步

从数据流动的角度来分析,须要遍历生成1-80000的数字到一个channel中,数字判断是否为素数,输出结果到一个channel中。所以咱们须要两个channel,channel的两端就设计成协程便可。
一、遍历生成原始80000个数据(生产者)
二、计算这80000个数据中的素数(生产者+消费者)
三、取结果输出(消费者)函数

代码以下:性能

package gen_channel
import "fmt"
import "time"
func generate_source(data_source_chan chan int) {
   for i := 1; i <= 80000; i++ {
      data_source_chan <- i
   }
   fmt.Println("写入协程结束")
   close(data_source_chan)
}
func generate_sushu(data_source_chan chan int, data_result_chan chan int, gen_chan chan bool) {
   for num:= range data_source_chan {
      falg := true
 for i := 2; i < num; i++ {
         if num%i == 0 {
            falg = false
 break }
      }
      if falg == true {
         data_result_chan <- num
      }
   }
   fmt.Println("该协程结束")
   gen_chan <- true
}
func workpool(data_source_chan chan int, data_result_chan chan int, gen_chan chan bool, gen_num int){
   // 开启8个协程
 for i := 0; i < gen_num; i++ {
      go generate_sushu(data_source_chan, data_result_chan, gen_chan)
   }
}
func Channel_main() {
   // 任务数据
   data_source_chan := make(chan int, 2000)
   // 结果数据
   data_result_chan := make(chan int, 2000)
   // 全部任务协程是否结束
   gen_chan := make(chan bool, 8)
   time1 := time.Now().Unix()
   go generate_source(data_source_chan)
   // 协程池,任务分发
   workpool(data_source_chan, data_result_chan, gen_chan, 8)
   // 全部协程结束后关闭结果数据channel
   go func() {
      for i := 0; i < 8; i++ {
         <-gen_chan
      }
      close(data_result_chan)
      fmt.Println("spend timeis ", time.Now().Unix()-time1)
   }()
   for date_result := range data_result_chan {
      fmt.Println(date_result)
   }
}

上面这段代码中。data_source_chandata_result_chan这两个channel分别用来放原始数据和结果数据,buffer分别为2000。

generate_source协程: 生产数据,它会把数据写入data_source_chan通道,所有写入完成后关闭通道。
generate_sushu协程: 负责计算并判断data_source_chan中的数据是否为质数,是的话就写入data_result_chan通道。
主协程for date_result := range data_result_chan: 最后负责读取data_result_chan中的结果,直到data_result_chan关闭后结束程序。

能够看到咱们经过workpool方法起了8个generate_sushu协程来并发处理data_source_chan的任务。那么就有一个问题,如何知道全部数据都已处理完毕呢,等到生产者generate_source协程结束data_source_chan关闭吗? 恐怕不是,由于可能data_source_chan关闭后8个任务协程仍然在继续计算。那么只能等8个协程所有处理完毕后,才能说明全部数据已处理完,从而才能关闭data_result_chan,而后主协程读取data_result_chan结束。

所以咱们这里引入了另外一个channel:gen_chan,来记录计算结束的任务。每一个generate_sushu协程处理完,就写入一个记录到channel中。所以咱们有一个匿名协程,当能够从gen_chan中取8个结果出来的话,就说明全部协程已计算完成,那么能够关上阻塞程序的最后阀门data_result_chan

固然这种设计方式并不惟一,咱们也能够不用统一的data_result_chan来接收结果,而是每一个协程分配一个channel来存放结果,最后再merge到一块儿。

可能你们以为这种方式很复杂,确实比较高效但写起来并不友好,那有没有更友好的方式呢?

sync包

在处理并发任务时咱们首先想到的应该是channel,但有时候channel不是万能或者最方便的,因此go也为咱们提供了sync包。

sync包提供了各类异步及锁类型及其内置方法。用起来也很方便,好比Mutex就是给协程加锁,某个时段内不能有多个协程访问同一段代码。WaitGroup就是等待一些工做完成后,再进行下一步工做。Once能够用来确保协程中某个函数只执行1次...当咱们面对一个并发问题的时候,应该去分析采用哪一种协程同步方式,是channel仍是Mutex呢。这须要看咱们关注的是数据的流动仍是数据的安全性。篇幅缘由这里再也不展开讲了。

  1. Mutex:互斥锁
  2. RWMutex:读写锁
  3. WaitGroup:等待组
  4. Once:单次执行
  5. Cond:信号量
  6. Pool:临时对象池
  7. Map:自带锁的map

咱们接着上面质数的问题,使用sync中的WaitGroup,会让咱们的代码更加友好,由于咱们不须要引入一个channel来记录是否4个车轮都换完了,让WaitGroup来作就行了。

package gen_channel
import (
   "fmt"
 "time")
import "sync"
func generate_source3(data_source_chan chan int) {
   for i := 1; i <= 80000; i++ {
      data_source_chan <- i
   }
   fmt.Println("写入协程结束")
   close(data_source_chan)
}
func generate_sushu3(data_source_chan, data_result_chan chan int, wg *sync.WaitGroup) {
   defer wg.Done()
   for num := range data_source_chan {
      falg := true
 for i := 2; i < num; i++ {
         if num%i == 0 {
            falg = false
 break }
      }
      if falg == true {
         data_result_chan <- num
      }
   }
   fmt.Println("该协程结束")
}
func workpool3(data_source_chan chan int, data_result_chan chan int, wg *sync.WaitGroup, gen_num int) {
   // 开启8个协程
 for i := 0; i < gen_num; i++ {
      wg.Add(1)
      go generate_sushu3(data_source_chan, data_result_chan, wg)
   }
}
func Channel_main3() {
   data_source_chan := make(chan int, 500)
   data_result_chan := make(chan int, 2000)
   time1 := time.Now().Unix()
   var wg sync.WaitGroup
 go generate_source3(data_source_chan)
   // 开启8个协程
 for i := 0; i < 8; i++ {
      wg.Add(1)
      go generate_sushu3(data_source_chan, data_result_chan, &wg)
   }
   wg.Wait()
   close(data_result_chan)
   fmt.Println("spend timeis ", time.Now().Unix()-time1)
   for date_result := range data_result_chan {
      fmt.Println(date_result)
   }
}

总结

流水线模式的设计要关注数据的流动,而后在数据流动的路径中将数据放到channel中,将channel的两端设计成协程。
并发设计中channel和sync能够从开发效率和性能的角度自由组合,channel不必定是最优解
写入channel的协程来控制该协程的关闭,消费者协程不关闭读协程,防止报错。养成在协程入口限制channel读写类型的习惯。

以上是咱们在go并发的流水线模型中的一些总结。能够看出go的协程并发更考验咱们的设计能力,由于协程间的同步和数据传递都交给了开发者来设计。同时也留给咱们一些引伸思考,协程在IO密集和CPU密集的状况下是否都能大幅提升性能呢?是否和channel的缓冲区或者并发设计有关呢?协程异常该怎么处理呢?go的协程和python的协程又有什么区别呢?...咱们后面慢慢探讨~