理解 Go Channels[精品长文]

愿我所遇之人,所历之事,哪怕由于我有一点点变好,我就心满意足了。html

本文转载自: blog.lab99.org/post/golang…git

1、视频信息

一、视频观看地址

www.youtube.com/watch?v=KBZ…github

二、PPT下载地址

download.csdn.net/download/xu…golang

三、博文

about.sourcegraph.com/go/understa…安全

2、Go 的并发特性

  • goroutines: 独立执行每一个任务,并可能并行执行
  • channels: 用于 goroutines 之间的通信、同步

一、一个简单的事务处理的例子

对于下面这样的非并发的程序:bash

func main() {
  tasks := getTasks()
  // 处理每一个任务
  for _, task := range tasks {
    process(task)
  }
}
复制代码

将其转换为 Go 的并发模式很容易,使用典型的 Task Queue 的模式:微信

func main() {
  //  建立带缓冲的 channel
  ch := make(chan Task, 3)
  //  运行固定数量的 workers
  for i := 0; i < numWorkers; i++ {
    go worker(ch)
  }
  //  发送任务到 workers
  hellaTasks := getTasks()
  for _, task := range hellaTasks {
    ch <- task
  }
  ...
}
func worker(ch chan Task) {
  for {
    //  接收任务
    task := <-ch
    process(task)
  }
}
复制代码

二、channels 的特性

  • goroutine-safe,多个 goroutine 能够同时访问一个 channel。
  • 能够用于在 goroutine 之间存储和传递值
  • 其语义是先入先出(FIFO)
  • 能够致使 goroutine 的 block 和 unblock

3、解析

一、构造 channel

//  带缓冲的 channel
ch := make(chan Task, 3)
//  无缓冲的 channel
ch := make(chan Task)
复制代码

回顾前面提到的 channel 的特性,特别是前两个。若是忽略内置的 channel,让你设计一个具备 goroutines-safe 而且能够用来存储、传递值的东西,你会怎么作?不少人可能以为或许能够用一个带锁的队列来作。没错,事实上,channel 内部就是一个带锁的队列。 golang.org/src/runtime…并发

type hchan struct {
  ...
  buf      unsafe.Pointer // 指向一个环形队列
  ...
  sendx    uint   // 发送 index
  recvx    uint   // 接收 index
  ...
  lock     mutex  //  互斥量
}
复制代码

buf 的具体实现很简单,就是一个环形队列的实现。sendx 和 recvx 分别用来记录发送、接收的位置。而后用一个 lock 互斥锁来确保无竞争冒险。ide

对于每个 ch := make(chan Task, 3) 这类操做,都会在中,分配一个空间,创建并初始化一个 hchan 结构变量,而 ch 则是指向这个 hchan 结构的指针函数

由于 ch 自己就是个指针,因此咱们才能够在 goroutine 函数调用的时候直接将 ch 传递过去,而不用再 &ch 取指针了,因此全部使用同一个 ch 的 goroutine 都指向了同一个实际的内存空间。

二、发送、接收

为了方便描述,咱们用 G1 表示 main() 函数的 goroutine,而 G2 表示 worker 的 goroutine。

// G1
func main() {
  ...
  for _, task := range tasks {
    ch <- task
  }
  ...
}
// G2
func worker(ch chan Task) {
  for {
    task :=<-ch
    process(task)
  }
}
复制代码

2.1 简单的发送、接收

那么 G1 中的 ch <- task0 具体是怎么作的呢?

  • 获取锁
  • enqueue(task0)(这里是内存复制 task0)
  • 释放锁

这一步很简单,接下来看 G2 的 t := <- ch 是如何读取数据的。

  • 获取锁
  • t = dequeue()(一样,这里也是内存复制)
  • 释放锁

这一步也很是简单。可是咱们从这个操做中能够看到,全部 goroutine 中共享的部分只有这个 hchan 的结构体,而全部通信的数据都是内存复制。这遵循了 Go 并发设计中很核心的一个理念:

Do not communicate by sharing memory;instead, share memory by communicating

内存复制指的是:

// typedmemmove copies a value of type t to dst from src.
// Must be nosplit, see #16026.
//go:nosplit
func typedmemmove(typ *_type, dst, src unsafe.Pointer) {
    if typ.kind&kindNoPointers == 0 {
        bulkBarrierPreWrite(uintptr(dst), uintptr(src), typ.size)
    }
    // There's a race here: if some other goroutine can write to // src, it may change some pointer in src after we've
    // performed the write barrier but before we perform the
    // memory copy. This safe because the write performed by that
    // other goroutine must also be accompanied by a write
    // barrier, so at worst we've unnecessarily greyed the old // pointer that was in src. memmove(dst, src, typ.size) if writeBarrier.cgo { cgoCheckMemmove(typ, dst, src, 0, typ.size) } } 复制代码

三、阻塞和恢复

3.1 发送方被阻塞

假设 G2 须要很长时间的处理,在此期间,G1 不断的发送任务:

ch <- task1
ch <- task2
ch <- task3
复制代码

可是当再一次 ch <- task4 的时候,因为 ch 的缓冲只有 3 个,因此没有地方放了,因而 G1 被 block 了,当有人从队列中取走一个 Task 的时候,G1 才会被恢复。这是咱们都知道的,不过咱们今天关心的不是发生了什么,而是如何作到的?

3.2 goroutine 的运行时调度

首先,goroutine 不是操做系统线程,而是 用户空间线程。所以 goroutine 是由 Go runtime 来建立并管理的,而不是 OS,因此要比操做系统线程轻量级。

固然,goroutine 最终仍是要运行于某个线程中的,控制 goroutine 如何运行于线程中的是 Go runtime 中的 scheduler (调度器)。

Go 的运行时调度器是 M:N 调度模型,既 N 个 goroutine,会运行于 M 个 OS 线程中。换句话说,一个 OS 线程中,可能会运行多个 goroutine。

Go 的 M:N 调度中使用了3个结构:

  • M: OS 线程
  • G: goroutine
  • P: 调度上下文
    • P 拥有一个运行队列,里面是全部能够运行的 goroutine 及其上下文

3.3 goroutine 被阻塞的具体过程

那么当 ch <- task4 执行的时候,channel 中已经满了,须要 pause G1。这个时候:

  1. G1 会调用运行时的 gopark
  2. 而后 Go 的运行时调度器就会接管
  3. 将 G1 的状态设置为 waiting
  4. 断开 G1 和 M 之间的关系(switch out),所以 G1 脱离 M,换句话说,M 空闲了,能够安排别的任务了。
  5. 从 P 的运行队列中,取得一个可运行的 goroutine G
  6. 创建新的 G 和 M 的关系(Switch in),所以 G 就准备好运行了。
  7. 当调度器返回的时候,新的 G 就开始运行了,而 G1 则不会运行,也就是 block 了。

从上面的流程中能够看到,对于 goroutine 来讲,G1 被阻塞了,新的 G 开始运行了;而对于操做系统线程 M 来讲,则根本没有被阻塞。

咱们知道 OS 线程要比 goroutine 要沉重的多,所以这里尽可能避免 OS 线程阻塞,能够提升性能。

3.4 goroutine 恢复执行的具体过程

前面理解了阻塞,那么接下来理解一下如何恢复运行。不过,在继续了解如何恢复以前,咱们须要先进一步理解 hchan 这个结构。由于,当 channel 不在满的时候,调度器是如何知道该让哪一个 goroutine 继续运行呢?并且 goroutine 又是如何知道该从哪取数据呢?

在 hchan 中,除了以前提到的内容外,还定义有 sendq 和 recvq 两个队列,分别表示等待发送、接收的 goroutine,及其相关信息。

type hchan struct {
  ...
  buf      unsafe.Pointer // 指向一个环形队列
  ...
  sendq    waitq  // 等待发送的队列
  recvq    waitq  // 等待接收的队列
  ...
  lock     mutex  //  互斥量
}
复制代码

其中 waitq 是一个链表结构的队列,每一个元素是一个 sudog 的结构,其定义大体为:

type sudog struct {
  g          *g //  正在等候的 goroutine
  elem       unsafe.Pointer // 指向须要接收、发送的元素
  ...
}
复制代码

golang.org/src/runtime…

因此在以前的阻塞 G1 的过程当中,实际上:

  1. G1 会给本身建立一个 sudog 的变量
  2. 而后追加到 sendq 的等候队列中,方便未来的receiver 来使用这些信息恢复 G1。

这些都是发生在调用调度器以前

那么如今开始看一下如何恢复。

当 G2 调用 t := <- ch 的时候,channel 的状态是,缓冲是满的,并且还有一个 G1 在等候发送队列里,而后 G2 执行下面的操做:

  1. G2 先执行 dequeue() 从缓冲队列中取得 task1 给 t
  2. G2 从 sendq 中弹出一个等候发送的 sudog
  3. 将弹出的 sudog 中的 elem 的值 enqueue() 到 buf 中
  4. 将弹出的 sudog 中的 goroutine,也就是 G1,状态从 waiting 改成 runnable
    1. 而后,G2 须要通知调度器 G1 已经能够进行调度了,所以调用 goready(G1)。
    2. 调度器将 G1 的状态改成 runnable
    3. 调度器将 G1 压入 P 的运行队列,所以在未来的某个时刻调度的时候,G1 就会开始恢复运行。
    4. 返回到 G2

注意,这里是由 G2 来负责将 G1 的 elem 压入 buf 的,这是一个优化。这样未来 G1 恢复运行后,就没必要再次获取锁、enqueue()、释放锁了。这样就避免了屡次锁的开销。

3.5 若是接收方先阻塞呢?

更酷的地方是接收方先阻塞的流程。

若是 G2 先执行了 t := <- ch,此时 buf 是空的,所以 G2 会被阻塞,他的流程是这样:

  1. G2 给本身建立一个 sudog 结构变量。其中 g 是本身,也就是 G2,而 elem 则指向 t
  2. 将这个 sudog 变量压入 recvq 等候接收队列
  3. G2 须要告诉 goroutine,本身须要 pause 了,因而调用 gopark(G2)
    1. 和以前同样,调度器将其 G2 的状态改成 waiting
    2. 断开 G2 和 M 的关系
    3. 从 P 的运行队列中取出一个 goroutine
    4. 创建新的 goroutine 和 M 的关系
    5. 返回,开始继续运行新的 goroutine

这些应该已经不陌生了,那么当 G1 开始发送数据的时候,流程是什么样子的呢?

G1 能够将 enqueue(task),而后调用 goready(G2)。不过,咱们能够更聪明一些。

咱们根据 hchan 结构的状态,已经知道 task 进入 buf 后,G2 恢复运行后,会读取其值,复制到 t 中。那么 G1 能够根本不走 buf,G1 能够直接把数据给 G2

Goroutine 一般都有本身的栈,互相之间不会访问对方的栈内数据,除了 channel。这里,因为咱们已经知道了 t 的地址(经过 elem指针),并且因为 G2 不在运行,因此咱们能够很安全的直接赋值。当 G2 恢复运行的时候,既不须要再次获取锁,也不须要对 buf 进行操做。从而节约了内存复制、以及锁操做的开销。

四、总结

  • goroutine-safe
    • hchan 中的 lock mutex
  • 存储、传递值,FIFO
    • 经过 hchan 中的环形缓冲区来实现
  • 致使 goroutine 的阻塞和恢复
    • hchan 中的 sendq和recvq,也就是 sudog 结构的链表队列
    • 调用运行时调度器 (gopark(), goready())

4、其它 channel 的操做

一、无缓冲 channel

无缓冲的 channel 行为就和前面说的直接发送的例子同样:

  • 接收方阻塞 → 发送方直接写入接收方的栈
  • 发送方阻塞 → 接受法直接从发送方的 sudog 中读取

二、select

golang.org/src/runtime…

  1. 先把全部须要操做的 channel 上锁
  2. 给本身建立一个 sudog,而后添加到全部 channel 的 sendq或recvq(取决因而发送仍是接收)
  3. 把全部的 channel 解锁,而后 pause 当前调用 select 的 goroutine(gopark())
  4. 而后当有任意一个 channel 可用时,select 的这个 goroutine 就会被调度执行。
  5. resuming mirrors the pause sequence

5、为何 Go 会这样设计?

一、Simplicity 更倾向于带锁的队列,而不是无锁的实现。

性能提高不是凭空而来的,是随着复杂度增长而增长的。

dvyokov 后者虽然性能可能会更好,可是这个优点,并不必定可以打败随之而来的实现代码的复杂度所带来的劣势。

二、Performance

  • 调用 Go 运行时调度器,这样能够保持 OS 线程不被阻塞跨 goroutine 的栈读、写。
  • 可让 goroutine 醒来后没必要获取锁。
  • 能够避免一些内存复制。

固然,任何优点都会有其代价。这里的代价是实现的复杂度,因此这里有更复杂的内存管理机制、垃圾回收以及栈收缩机制。

在这里性能的提升优点,要比复杂度的提升带来的劣势要大。

因此在 channel 实现的各类代码中,咱们均可以见到这种simplicity vs performance 的权衡后的结果。

6、博主信息

我的微信公众号:

我的博客

我的github

我的掘金博客

我的CSDN博客

相关文章
相关标签/搜索