深度解密Go语言之channel

你们好!“深度解密 Go 语言”系列很久未见,咱们今天讲 channel,预祝阅读愉快!在开始正文以前,咱们先说些题外话。git

上一篇关于 Go 语言的文章讲 Go 程序的整个编码、编译、运行、退出的全过程。文章发出后,反响强烈,在各大平台的阅读量都不错。例如博客园登上了 48 小时阅读排行榜,而且受到了编辑推荐,占据首页头条位置整整一天;在开发者头条首页精选的位置霸榜一周时间……程序员

博客园头条推荐

开发者头条精选

熟悉码农桃花源的朋友们都知道,这里每篇文章都很长,要花很长时间才能读完。但长并非目的,把每一个问题都讲深、讲透才是最重要的。首先我本身得彻底理解才行,因此写每篇文章时我都会看不少参考资料,看源码,请教大牛,本身还要去写样例代码跑结果……从建立文稿到真正完成写做须要很长时间。github

写做时间

作这些事情,无非是想力求我写出来的文字,都是我目前所能理解的最深层次。若是我暂时理解不了,我会说出来,或者不写进文章里面去,留到之后有能力的时候再来写。golang

我本身平时有这种体会:看微信公众号的文章都是想快速地看完,快速地拉到最后,目的快点开始看下一篇,新鲜感才能不断刺激大脑。有时候碰到长文很花时间,可能就没耐心看下去了,里面说的东西也以为很难理解,可能直接就放弃了。可是,若是我知道一篇文章价值很高,就会选一个精力比较充沛的时间段,花整块时间看完,这时候反倒很容易看进去。这种状况下,潜意识里就会知道我今天是必定要读完这篇文章的,而且要把里面有价值的东西都吸取进来。shell

因此,对于码农桃花源的文章,我建议你收藏以后,找个空闲时间再好好看。编程

上周,我把 GitHub 项目 Go-Question 的内容整合成了开源电子书,阅读体验提高 N 倍,建议关注项目,如今已经 400 star 了,年末目标是 1k star。项目地址列在了参考资料里。segmentfault

GitBook

另外,公众号的文章也能够使用微信读书看,体验也很是赞,而且能够放到书架上,每一个公众号就是一本书,简直酷炫。数组

微信读书

闲话最后,一直“吹”了好久的曹大,新书《Go 语言高级编程》出版了!书的另外一位做者是柴树杉老师,这是给 Go 语言提交 pull 的人,他在 Go 语言上面的研究不用我多说了吧。我第一时间下了单,而且到曹大工位要了签名。安全

Go 语言高级编程

这本书的推荐人有不少大佬,像许世伟,郝林,雨痕等,评价很是高。重点给你们看下雨痕老师对这本书的评价(上图第二排左侧图):

本书阐明了官方文档某些语焉不详的部分,有助于 Gopher 了解更多内在实现,以及平常工做中须要用到的 RPC、Web、分布式应用等内容。我认识本书做者之一曹春晖,对他的学习态度和能力颇为钦佩,所以推荐你们阅读本书。

你们可能不知道,出书一点都不赚钱,但投入的精力却很大。可是像曹大在给读者的书签名时所说的:书籍是时代的生命。多少知识都是经过书本一代代传承!

搬过几回家就知道,纸质书太多,过程会比较痛苦。因此,我如今买纸书都会考虑再三。可是,此次我仍是在第一时间下单了《Go 语言高级编程》。我也强烈推荐你买一本,支持原创者。

柴老师在武汉,我接触很少。但和曹大倒是常常能见面(在同一个公司工做)。他本人常常活跃在各类微信群,社区,也很是乐于解答各类疑难杂症。上周还和曹大一块儿吃了个饭,请教了不少问题,我总结了一些对家都有用的东西,放在个人朋友圈:

曹大交流总结

若是你想围观个人朋友圈,想和我交流,能够长按下面的二维码加我好友,备注下来自公众号。

wechat-QR

好了,下面开始咱们的正文。

并发模型

并发与并行

你们都知道著名的摩尔定律。1965 年,时任仙童公司的 Gordon Moore 发表文章,预测在将来十年,半导体芯片上的晶体管和电阻数量将每一年增长一倍;1975 年,Moore 再次发表论文,将“每一年”修改成“每两年”。这个预测在 2012 年左右基本是正确的。

但随着晶体管电路逐渐接近性能极限,摩尔定律终将走到尽头。靠增长晶体管数量来提升计算机的性能不灵了。因而,人们开始转换思路,用其余方法来提高计算机的性能,这就是多核计算机产生的缘由。

这一招看起来还不错,可是人们又遇到了一个另外一个定律的限制,那就是 Amdahl's Law,它提出了一个模型用来衡量在并行模式下程序运行效率的提高。这个定律是说,一个程序能从并行上得到性能提高的上限取决于有多少代码必须写成串行的。

举个例子,对于一个和用户打交道的界面程序,它必须和用户打交道。用户点一个按钮,而后才能继续运行下一步,这必须是串行执行的。这种程序的运行效率就取决于和用户交互的速度,你有多少核都白瞎。用户就是不按下一步,你怎么办?

2000 年左右云计算兴起,人们能够方便地获取计算云上的资源,方便地水平扩展本身的服务,能够垂手可得地就调动多台机器资源甚至将计算任务分发到分布在全球范围的机器。可是也所以带来了不少问题和挑战。例如怎样在机器间进行通讯、聚合结果等。最难的一个挑战是如何找到一个模型能用来描述 concurrent。

咱们都知道,要想一段并发的代码没有任何 bug,是很是困难的。有些并发 bug 是在系统上线数年后才发现的,缘由经常是很诡异的,好比用户数增长到了某个界限。

并发问题通常有下面这几种:

数据竞争。简单来讲就是两个或多个线程同时读写某个变量,形成了预料以外的结果。

原子性。在一个定义好的上下文里,原子性操做不可分割。上下文的定义很是重要。有些代码,你在程序里看起来是原子的,如最简单的 i++,但在机器层面看来,这条语句一般须要几条指令来完成(Load,Incr,Store),不是不可分割的,也就不是原子性的。原子性可让咱们放心地构造并发安全的程序。

内存访问同步。代码中须要控制同时只有一个线程访问的区域称为临界区。Go 语言中通常使用 sync 包里的 Mutex 来完成同步访问控制。锁通常会带来比较大的性能开销,所以通常要考虑加锁的区域是否会频繁进入、锁的粒度如何控制等问题。

死锁。在一个死锁的程序里,每一个线程都在等待其余线程,造成了一个首尾相连的尴尬局面,程序没法继续运行下去。

活锁。想象一下,你走在一条小路上,一我的迎面走来。你往左边走,想避开他;他作了相反的事情,他往右边走,结果两个都过不了。以后,两我的又都想从原来本身相反的方向走,仍是一样的结果。这就是活锁,看起来都像在工做,但工做进度就是没法前进。

饥饿。并发的线程不能获取它所须要的资源以进行下一步的工做。一般是有一个很是贪婪的线程,长时间占据资源不释放,致使其余线程没法得到资源。

关于并发和并行的区别,引用一个经典的描述:

并发是同一时间应对(dealing with)多件事情的能力。
并行是同一时间动手(doing)作多件事情的能力。

雨痕老师《Go 语言学习笔记》上的解释:

并发是指逻辑上具有同时处理多个任务的能力;并行则是物理上同时执行多个任务。

而根据《Concurrency in Go》这本书,计算机的概念都是抽象的结果,并发和并行也不例外。它这样描述并发和并行的区别:

Concurrency is a property of the code; parallelism is a property of the running program.

并发是代码的特性,并行是正在运行的程序的特性。先忽略我拙劣的翻译。很新奇,不是吗?我也是第一次见到这样的说法,细想一下,仍是颇有道理的。

咱们一直说写的代码是并发的或者是并行的,可是咱们能提供什么保证吗?若是在只有一个核的机器上跑并行的代码,它还能并行吗?你就是再天才,也没法写出并行的程序。充其量也就是代码上看起来“并发”的,如此而已。

固然,表面上看起来仍是并行的,但那不过 CPU 的障眼法,多个线程在分时共享 CPU 的资源,在一个粗糙的时间隔里看起来就是“并行”。

因此,咱们实际上只能编写“并发”的代码,而不能编写“并行”的代码,并且只是但愿并发的代码可以并行地执行。并发的代码可否并行,取决于抽象的层级:代码里的并发原语、runtime,操做系统(虚拟机、容器)。层级愈来愈底层,要求也愈来愈高。所以,咱们谈并发或并行实际上要指定上下文,也就是抽象的层级。

《Concurrency in Go》书里举了一个例子:假如两我的同时打开电脑上的计算器程序,这两个程序确定不会影响彼此,这就是并行。在这个例子中,上下文就是两我的的机器,而两个计算器进程就是并行的元素。

随着抽象层次的下降,并发模型实际上变得更难也更重要,而越低层次的并发模型对咱们也越重要。要想并发程序正确地执行,就要深刻研究并发模型。

在 Go 语言发布前,咱们写并发代码时,考虑到的最底层抽象是:系统线程。Go 发布以后,在这条抽象链上,又加一个 goroutine。并且 Go 从著名的计算机科学家 Tony Hoare 那借来一个概念:channel。Tony Hoare 就是那篇著名文章《Communicating Sequential Processes》的做者。

看起来事情变得更加复杂,由于 Go 又引入了一个更底层的抽象,但事实并非这样。由于 goroutine 并非看起来的那样又抽象了一层,它实际上是替代了系统线程。Gopher 在写代码的时候,并不会去关心系统线程,大部分时候只须要考虑到 goroutine 和 channel。固然有时候会用到一些共享内存的概念,通常就是指 sync 包里的东西,好比 sync.Mutex。

什么是 CSP

CSP 常常被认为是 Go 在并发编程上成功的关键因素。CSP 全称是 “Communicating Sequential Processes”,这也是 Tony Hoare 在 1978 年发表在 ACM 的一篇论文。论文里指出一门编程语言应该重视 input 和 output 的原语,尤为是并发编程的代码。

在那篇文章发表的时代,人们正在研究模块化编程的思想,该不应用 goto 语句在当时是最激烈的议题。彼时,面向对象编程的思想正在崛起,几乎没什么人关心并发编程。

在文章中,CSP 也是一门自定义的编程语言,做者定义了输入输出语句,用于 processes 间的通讯(communicatiton)。processes 被认为是须要输入驱动,而且产生输出,供其余 processes 消费,processes 能够是进程、线程、甚至是代码块。输入命令是:!,用来向 processes 写入;输出是:?,用来从 processes 读出。这篇文章要讲的 channel 正是借鉴了这一设计。

Hoare 还提出了一个 -> 命令,若是 -> 左边的语句返回 false,那它右边的语句就不会执行。

经过这些输入输出命令,Hoare 证实了若是一门编程语言中把 processes 间的通讯看得第一等重要,那么并发编程的问题就会变得简单。

Go 是第一个将 CSP 的这些思想引入,而且发扬光大的语言。仅管内存同步访问控制(原文是 memory access synchronization)在某些状况下大有用处,Go 里也有相应的 sync 包支持,可是这在大型程序很容易出错。

Go 一开始就把 CSP 的思想融入到语言的核内心,因此并发编程成为 Go 的一个独特的优点,并且很容易理解。

大多数的编程语言的并发编程模型是基于线程和内存同步访问控制,Go 的并发编程的模型则用 goroutine 和 channel 来替代。Goroutine 和线程相似,channel 和 mutex (用于内存同步访问控制)相似。

Goroutine 解放了程序员,让咱们更能贴近业务去思考问题。而不用考虑各类像线程库、线程开销、线程调度等等这些繁琐的底层问题,goroutine 天生替你解决好了。

Channel 则天生就能够和其余 channel 组合。咱们能够把收集各类子系统结果的 channel 输入到同一个 channel。Channel 还能够和 select, cancel, timeout 结合起来。而 mutex 就没有这些功能。

Go 的并发原则很是优秀,目标就是简单:尽可能使用 channel;把 goroutine 看成免费的资源,随便用。

说明一下,前面这两部分的内容来自英文开源书《Concurrency In Go》,强烈推荐阅读。

引入结束,咱们正式开始今天的主角:channel。

什么是 channel

Goroutine 和 channel 是 Go 语言并发编程的 两大基石。Goroutine 用于执行并发任务,channel 用于 goroutine 之间的同步、通讯。

Channel 在 gouroutine 间架起了一条管道,在管道里传输数据,实现 gouroutine 间的通讯;因为它是线程安全的,因此用起来很是方便;channel 还提供“先进先出”的特性;它还能影响 goroutine 的阻塞和唤醒。

相信你们必定见过一句话:

Do not communicate by sharing memory; instead, share memory by communicating.

不要经过共享内存来通讯,而要经过通讯来实现内存共享。

这就是 Go 的并发哲学,它依赖 CSP 模型,基于 channel 实现。

简直是一头雾水,这两句话难道不是同一个意思?

经过前面两节的内容,我我的这样理解这句话:前面半句说的是经过 sync 包里的一些组件进行并发编程;然后面半句则是说 Go 推荐使用 channel 进行并发编程。二者其实都是必要且有效的。实际上看完本文后面对 channel 的源码分析,你会发现,channel 的底层就是经过 mutex 来控制并发的。只是 channel 是更高一层次的并发编程原语,封装了更多的功能。

关因而选择 sync 包里的底层并发编程原语仍是 channel,《Concurrency In Go》这本书的第 2 章 “Go's Philosophy on Concurrency” 里有一张决策树和详细的论述,再次推荐你去阅读。我把图贴出来:

concurrency code decision tree

channel 实现 CSP

Channel 是 Go 语言中一个很是重要的类型,是 Go 里的第一对象。经过 channel,Go 实现了经过通讯来实现内存共享。Channel 是在多个 goroutine 之间传递数据和同步的重要手段。

使用原子函数、读写锁能够保证资源的共享访问安全,但使用 channel 更优雅。

channel 字面意义是“通道”,相似于 Linux 中的管道。声明 channel 的语法以下:

chan T // 声明一个双向通道
chan<- T // 声明一个只能用于发送的通道
<-chan T // 声明一个只能用于接收的通道

单向通道的声明,用 <- 来表示,它指明通道的方向。你只要明白,代码的书写顺序是从左到右就立刻能掌握通道的方向是怎样的。

由于 channel 是一个引用类型,因此在它被初始化以前,它的值是 nil,channel 使用 make 函数进行初始化。能够向它传递一个 int 值,表明 channel 缓冲区的大小(容量),构造出来的是一个缓冲型的 channel;不传或传 0 的,构造的就是一个非缓冲型的 channel。

二者有一些差异:非缓冲型 channel 没法缓冲元素,对它的操做必定顺序是“发送-> 接收 -> 发送 -> 接收 -> ……”,若是想连续向一个非缓冲 chan 发送 2 个元素,而且没有接收的话,第一次必定会被阻塞;对于缓冲型 channel 的操做,则要“宽松”一些,毕竟是带了“缓冲”光环。

为何要 channel

Go 经过 channel 实现 CSP 通讯模型,主要用于 goroutine 之间的消息传递和事件通知。

有了 channel 和 goroutine 以后,Go 的并发编程变得异常容易和安全,得以让程序员把注意力留到业务上去,实现开发效率的提高。

要知道,技术并非最重要的,它只是实现业务的工具。一门高效的开发语言让你把节省下来的时间,留着去作更有意义的事情,好比写写文章。

channel 实现原理

对 chan 的发送和接收操做都会在编译期间转换成为底层的发送接收函数。

Channel 分为两种:带缓冲、不带缓冲。对不带缓冲的 channel 进行的操做实际上能够看做“同步模式”,带缓冲的则称为“异步模式”。

同步模式下,发送方和接收方要同步就绪,只有在二者都 ready 的状况下,数据才能在二者间传输(后面会看到,实际上就是内存拷贝)。不然,任意一方先行进行发送或接收操做,都会被挂起,等待另外一方的出现才能被唤醒。

异步模式下,在缓冲槽可用的状况下(有剩余容量),发送和接收操做均可以顺利进行。不然,操做的一方(如写入)一样会被挂起,直到出现相反操做(如接收)才会被唤醒。

小结一下:同步模式下,必需要使发送方和接收方配对,操做才会成功,不然会被阻塞;异步模式下,缓冲槽要有剩余容量,操做才会成功,不然也会被阻塞。

数据结构

直接上源码(版本是 1.9.2):

type hchan struct {
    // chan 里元素数量
    qcount   uint
    // chan 底层循环数组的长度
    dataqsiz uint
    // 指向底层循环数组的指针
    // 只针对有缓冲的 channel
    buf      unsafe.Pointer
    // chan 中元素大小
    elemsize uint16
    // chan 是否被关闭的标志
    closed   uint32
    // chan 中元素类型
    elemtype *_type // element type
    // 已发送元素在循环数组中的索引
    sendx    uint   // send index
    // 已接收元素在循环数组中的索引
    recvx    uint   // receive index
    // 等待接收的 goroutine 队列
    recvq    waitq  // list of recv waiters
    // 等待发送的 goroutine 队列
    sendq    waitq  // list of send waiters

    // 保护 hchan 中全部字段
    lock mutex
}

关于字段的含义都写在注释里了,再来重点说几个字段:

buf 指向底层循环数组,只有缓冲型的 channel 才有。

sendxrecvx 均指向底层循环数组,表示当前能够发送和接收的元素位置索引值(相对于底层数组)。

sendqrecvq 分别表示被阻塞的 goroutine,这些 goroutine 因为尝试读取 channel 或向 channel 发送数据而被阻塞。

waitqsudog 的一个双向链表,而 sudog 其实是对 goroutine 的一个封装:

type waitq struct {
    first *sudog
    last  *sudog
}

lock 用来保证每一个读 channel 或写 channel 的操做都是原子的。

例如,建立一个容量为 6 的,元素为 int 型的 channel 数据结构以下 :

chan data structure

建立

咱们知道,通道有两个方向,发送和接收。理论上来讲,咱们能够建立一个只发送或只接收的通道,可是这种通道建立出来后,怎么使用呢?一个只能发的通道,怎么接收呢?一样,一个只能收的通道,如何向其发送数据呢?

通常而言,使用 make 建立一个能收能发的通道:

// 无缓冲通道
ch1 := make(chan int)
// 有缓冲通道
ch2 := make(chan int, 10)

经过汇编分析,咱们知道,最终建立 chan 的函数是 makechan

func makechan(t *chantype, size int64) *hchan

从函数原型来看,建立的 chan 是一个指针。因此咱们能在函数间直接传递 channel,而不用传递 channel 的指针。

具体来看下代码:

const hchanSize = unsafe.Sizeof(hchan{}) + uintptr(-int(unsafe.Sizeof(hchan{}))&(maxAlign-1))

func makechan(t *chantype, size int64) *hchan {
    elem := t.elem

    // 省略了检查 channel size,align 的代码
    // ……

    var c *hchan
    // 若是元素类型不含指针 或者 size 大小为 0(无缓冲类型)
    // 只进行一次内存分配
    if elem.kind&kindNoPointers != 0 || size == 0 {
        // 若是 hchan 结构体中不含指针,GC 就不会扫描 chan 中的元素
        // 只分配 "hchan 结构体大小 + 元素大小*个数" 的内存
        c = (*hchan)(mallocgc(hchanSize+uintptr(size)*elem.size, nil, true))
        // 若是是缓冲型 channel 且元素大小不等于 0(大小等于 0的元素类型:struct{})
        if size > 0 && elem.size != 0 {
            c.buf = add(unsafe.Pointer(c), hchanSize)
        } else {
            // race detector uses this location for synchronization
            // Also prevents us from pointing beyond the allocation (see issue 9401).
            // 1. 非缓冲型的,buf 没用,直接指向 chan 起始地址处
            // 2. 缓冲型的,能进入到这里,说明元素无指针且元素类型为 struct{},也无影响
            // 由于只会用到接收和发送游标,不会真正拷贝东西到 c.buf 处(这会覆盖 chan的内容)
            c.buf = unsafe.Pointer(c)
        }
    } else {
        // 进行两次内存分配操做
        c = new(hchan)
        c.buf = newarray(elem, int(size))
    }
    c.elemsize = uint16(elem.size)
    c.elemtype = elem
    // 循环数组长度
    c.dataqsiz = uint(size)

    // 返回 hchan 指针
    return c
}

新建一个 chan 后,内存在堆上分配,大概长这样:

make chan

说明一下,这张图来源于 Gopher Con 上的一份 PPT,地址见参考资料。这份材料很是清晰易懂,推荐你去读。

接下来,咱们用一个来自参考资料【深刻 channel 底层】的例子来理解建立、发送、接收的整个过程。

func goroutineA(a <-chan int) {
    val := <- a
    fmt.Println("G1 received data: ", val)
    return
}

func goroutineB(b <-chan int) {
    val := <- b
    fmt.Println("G2 received data: ", val)
    return
}

func main() {
    ch := make(chan int)
    go goroutineA(ch)
    go goroutineB(ch)
    ch <- 3
    time.Sleep(time.Second)
}

首先建立了一个无缓冲的 channel,接着启动两个 goroutine,并将前面建立的 channel 传递进去。而后,向这个 channel 中发送数据 3,最后 sleep 1 秒后程序退出。

程序第 14 行建立了一个非缓冲型的 channel,咱们只看 chan 结构体中的一些重要字段,来从总体层面看一下 chan 的状态,一开始什么都没有:

unbuffered chan

接收

在继续分析前面小节的例子前,咱们先来看一下接收相关的源码。在清楚了接收的具体过程以后,也就能轻松理解具体的例子了。

接收操做有两种写法,一种带 "ok",反应 channel 是否关闭;一种不带 "ok",这种写法,当接收到相应类型的零值时没法知道是真实的发送者发送过来的值,仍是 channel 被关闭后,返回给接收者的默认类型的零值。两种写法,都有各自的应用场景。

通过编译器的处理后,这两种写法最后对应源码里的这两个函数:

// entry points for <- c from compiled code
func chanrecv1(c *hchan, elem unsafe.Pointer) {
    chanrecv(c, elem, true)
}

func chanrecv2(c *hchan, elem unsafe.Pointer) (received bool) {
    _, received = chanrecv(c, elem, true)
    return
}

chanrecv1 函数处理不带 "ok" 的情形,chanrecv2 则经过返回 "received" 这个字段来反应 channel 是否被关闭。接收值则比较特殊,会“放到”参数 elem 所指向的地址了,这很像 C/C++ 里的写法。若是代码里忽略了接收值,这里的 elem 为 nil。

不管如何,最终转向了 chanrecv 函数:

// 位于 src/runtime/chan.go

// chanrecv 函数接收 channel c 的元素并将其写入 ep 所指向的内存地址。
// 若是 ep 是 nil,说明忽略了接收值。
// 若是 block == false,即非阻塞型接收,在没有数据可接收的状况下,返回 (false, false)
// 不然,若是 c 处于关闭状态,将 ep 指向的地址清零,返回 (true, false)
// 不然,用返回值填充 ep 指向的内存地址。返回 (true, true)
// 若是 ep 非空,则应该指向堆或者函数调用者的栈

func chanrecv(c *hchan, ep unsafe.Pointer, block bool) (selected, received bool) {
    // 省略 debug 内容 …………

    // 若是是一个 nil 的 channel
    if c == nil {
        // 若是不阻塞,直接返回 (false, false)
        if !block {
            return
        }
        // 不然,接收一个 nil 的 channel,goroutine 挂起
        gopark(nil, nil, "chan receive (nil chan)", traceEvGoStop, 2)
        // 不会执行到这里
        throw("unreachable")
    }

    // 在非阻塞模式下,快速检测到失败,不用获取锁,快速返回
    // 当咱们观察到 channel 没准备好接收:
    // 1. 非缓冲型,等待发送列队 sendq 里没有 goroutine 在等待
    // 2. 缓冲型,但 buf 里没有元素
    // 以后,又观察到 closed == 0,即 channel 未关闭。
    // 由于 channel 不可能被重复打开,因此前一个观测的时候 channel 也是未关闭的,
    // 所以在这种状况下能够直接宣布接收失败,返回 (false, false)
    if !block && (c.dataqsiz == 0 && c.sendq.first == nil ||
        c.dataqsiz > 0 && atomic.Loaduint(&c.qcount) == 0) &&
        atomic.Load(&c.closed) == 0 {
        return
    }

    var t0 int64
    if blockprofilerate > 0 {
        t0 = cputicks()
    }

    // 加锁
    lock(&c.lock)

    // channel 已关闭,而且循环数组 buf 里没有元素
    // 这里能够处理非缓冲型关闭 和 缓冲型关闭但 buf 无元素的状况
    // 也就是说即便是关闭状态,但在缓冲型的 channel,
    // buf 里有元素的状况下还能接收到元素
    if c.closed != 0 && c.qcount == 0 {
        if raceenabled {
            raceacquire(unsafe.Pointer(c))
        }
        // 解锁
        unlock(&c.lock)
        if ep != nil {
            // 从一个已关闭的 channel 执行接收操做,且未忽略返回值
            // 那么接收的值将是一个该类型的零值
            // typedmemclr 根据类型清理相应地址的内存
            typedmemclr(c.elemtype, ep)
        }
        // 从一个已关闭的 channel 接收,selected 会返回true
        return true, false
    }

    // 等待发送队列里有 goroutine 存在,说明 buf 是满的
    // 这有多是:
    // 1. 非缓冲型的 channel
    // 2. 缓冲型的 channel,但 buf 满了
    // 针对 1,直接进行内存拷贝(从 sender goroutine -> receiver goroutine)
    // 针对 2,接收到循环数组头部的元素,并将发送者的元素放到循环数组尾部
    if sg := c.sendq.dequeue(); sg != nil {
        // Found a waiting sender. If buffer is size 0, receive value
        // directly from sender. Otherwise, receive from head of queue
        // and add sender's value to the tail of the queue (both map to
        // the same buffer slot because the queue is full).
        recv(c, sg, ep, func() { unlock(&c.lock) }, 3)
        return true, true
    }

    // 缓冲型,buf 里有元素,能够正常接收
    if c.qcount > 0 {
        // 直接从循环数组里找到要接收的元素
        qp := chanbuf(c, c.recvx)

        // …………

        // 代码里,没有忽略要接收的值,不是 "<- ch",而是 "val <- ch",ep 指向 val
        if ep != nil {
            typedmemmove(c.elemtype, ep, qp)
        }
        // 清理掉循环数组里相应位置的值
        typedmemclr(c.elemtype, qp)
        // 接收游标向前移动
        c.recvx++
        // 接收游标归零
        if c.recvx == c.dataqsiz {
            c.recvx = 0
        }
        // buf 数组里的元素个数减 1
        c.qcount--
        // 解锁
        unlock(&c.lock)
        return true, true
    }

    if !block {
        // 非阻塞接收,解锁。selected 返回 false,由于没有接收到值
        unlock(&c.lock)
        return false, false
    }

    // 接下来就是要被阻塞的状况了
    // 构造一个 sudog
    gp := getg()
    mysg := acquireSudog()
    mysg.releasetime = 0
    if t0 != 0 {
        mysg.releasetime = -1
    }

    // 待接收数据的地址保存下来
    mysg.elem = ep
    mysg.waitlink = nil
    gp.waiting = mysg
    mysg.g = gp
    mysg.selectdone = nil
    mysg.c = c
    gp.param = nil
    // 进入channel 的等待接收队列
    c.recvq.enqueue(mysg)
    // 将当前 goroutine 挂起
    goparkunlock(&c.lock, "chan receive", traceEvGoBlockRecv, 3)

    // 被唤醒了,接着从这里继续执行一些扫尾工做
    if mysg != gp.waiting {
        throw("G waiting list is corrupted")
    }
    gp.waiting = nil
    if mysg.releasetime > 0 {
        blockevent(mysg.releasetime-t0, 2)
    }
    closed := gp.param == nil
    gp.param = nil
    mysg.c = nil
    releaseSudog(mysg)
    return true, !closed
}

上面的代码注释地比较详细了,你能够对着源码一行行地去看,咱们再来详细看一下。

  • 若是 channel 是一个空值(nil),在非阻塞模式下,会直接返回。在阻塞模式下,会调用 gopark 函数挂起 goroutine,这个会一直阻塞下去。由于在 channel 是 nil 的状况下,要想不阻塞,只有关闭它,但关闭一个 nil 的 channel 又会发生 panic,因此没有机会被唤醒了。更详细地能够在 closechan 函数的时候再看。

  • 和发送函数同样,接下来搞了一个在非阻塞模式下,不用获取锁,快速检测到失败而且返回的操做。顺带插一句,咱们平时在写代码的时候,找到一些边界条件,快速返回,能让代码逻辑更清晰,由于接下来的正常状况就比较少,更聚焦了,看代码的人也更能专一地看核心代码逻辑了。

// 在非阻塞模式下,快速检测到失败,不用获取锁,快速返回 (false, false)
    if !block && (c.dataqsiz == 0 && c.sendq.first == nil ||
        c.dataqsiz > 0 && atomic.Loaduint(&c.qcount) == 0) &&
        atomic.Load(&c.closed) == 0 {
        return
    }

当咱们观察到 channel 没准备好接收:

  1. 非缓冲型,等待发送列队里没有 goroutine 在等待
  2. 缓冲型,但 buf 里没有元素

以后,又观察到 closed == 0,即 channel 未关闭。

由于 channel 不可能被重复打开,因此前一个观测的时候, channel 也是未关闭的,所以在这种状况下能够直接宣布接收失败,快速返回。由于没被选中,也没接收到数据,因此返回值为 (false, false)。

  • 接下来的操做,首先会上一把锁,粒度比较大。若是 channel 已关闭,而且循环数组 buf 里没有元素。对应非缓冲型关闭和缓冲型关闭但 buf 无元素的状况,返回对应类型的零值,但 received 标识是 false,告诉调用者此 channel 已关闭,你取出来的值并非正常由发送者发送过来的数据。可是若是处于 select 语境下,这种状况是被选中了的。不少将 channel 用做通知信号的场景就是命中了这里。

  • 接下来,若是有等待发送的队列,说明 channel 已经满了,要么是非缓冲型的 channel,要么是缓冲型的 channel,但 buf 满了。这两种状况下均可以正常接收数据。

因而,调用 recv 函数:

func recv(c *hchan, sg *sudog, ep unsafe.Pointer, unlockf func(), skip int) {
    // 若是是非缓冲型的 channel
    if c.dataqsiz == 0 {
        if raceenabled {
            racesync(c, sg)
        }
        // 未忽略接收的数据
        if ep != nil {
            // 直接拷贝数据,从 sender goroutine -> receiver goroutine
            recvDirect(c.elemtype, sg, ep)
        }
    } else {
        // 缓冲型的 channel,但 buf 已满。
        // 将循环数组 buf 队首的元素拷贝到接收数据的地址
        // 将发送者的数据入队。实际上这时 revx 和 sendx 值相等
        // 找到接收游标
        qp := chanbuf(c, c.recvx)
        // …………
        // 将接收游标处的数据拷贝给接收者
        if ep != nil {
            typedmemmove(c.elemtype, ep, qp)
        }

        // 将发送者数据拷贝到 buf
        typedmemmove(c.elemtype, qp, sg.elem)
        // 更新游标值
        c.recvx++
        if c.recvx == c.dataqsiz {
            c.recvx = 0
        }
        c.sendx = c.recvx
    }
    sg.elem = nil
    gp := sg.g

    // 解锁
    unlockf()
    gp.param = unsafe.Pointer(sg)
    if sg.releasetime != 0 {
        sg.releasetime = cputicks()
    }

    // 唤醒发送的 goroutine。须要等到调度器的光临
    goready(gp, skip+1)
}

若是是非缓冲型的,就直接从发送者的栈拷贝到接收者的栈。

func recvDirect(t *_type, sg *sudog, dst unsafe.Pointer) {
    // dst is on our stack or the heap, src is on another stack.
    src := sg.elem
    typeBitsBulkBarrier(t, uintptr(dst), uintptr(src), t.size)
    memmove(dst, src, t.size)
}

不然,就是缓冲型 channel,而 buf 又满了的情形。说明发送游标和接收游标重合了,所以须要先找到接收游标:

// chanbuf(c, i) is pointer to the i'th slot in the buffer.
func chanbuf(c *hchan, i uint) unsafe.Pointer {
    return add(c.buf, uintptr(i)*uintptr(c.elemsize))
}

将该处的元素拷贝到接收地址。而后将发送者待发送的数据拷贝到接收游标处。这样就完成了接收数据和发送数据的操做。接着,分别将发送游标和接收游标向前进一,若是发生“环绕”,再从 0 开始。

最后,取出 sudog 里的 goroutine,调用 goready 将其状态改为 “runnable”,待发送者被唤醒,等待调度器的调度。

  • 而后,若是 channel 的 buf 里还有数据,说明能够比较正常地接收。注意,这里,即便是在 channel 已经关闭的状况下,也是能够走到这里的。这一步比较简单,正常地将 buf 里接收游标处的数据拷贝到接收数据的地址。

  • 到了最后一步,走到这里来的情形是要阻塞的。固然,若是 block 传进来的值是 false,那就不阻塞,直接返回就行了。

先构造一个 sudog,接着就是保存各类值了。注意,这里会将接收数据的地址存储到了 elem 字段,当被唤醒时,接收到的数据就会保存到这个字段指向的地址。而后将 sudog 添加到 channel 的 recvq 队列里。调用 goparkunlock 函数将 goroutine 挂起。

接下来的代码就是 goroutine 被唤醒后的各类收尾工做了。

咱们继续以前的例子。前面说到第 14 行,建立了一个非缓冲型的 channel,接着,第 1五、16 行分别建立了一个 goroutine,各自执行了一个接收操做。经过前面的源码分析,咱们知道,这两个 goroutine (后面称为 G1 和 G2 好了)都会被阻塞在接收操做。G1 和 G2 会挂在 channel 的 recq 队列中,造成一个双向循环链表。

在程序的 17 行以前,chan 的总体数据结构以下:

chan struct at the runtime

buf 指向一个长度为 0 的数组,qcount 为 0,表示 channel 中没有元素。重点关注 recvqsendq,它们是 waitq 结构体,而 waitq 实际上就是一个双向链表,链表的元素是 sudog,里面包含 g 字段,g 表示一个 goroutine,因此 sudog 能够当作一个 goroutine。recvq 存储那些尝试读取 channel 但被阻塞的 goroutine,sendq 则存储那些尝试写入 channel,但被阻塞的 goroutine。

此时,咱们能够看到,recvq 里挂了两个 goroutine,也就是前面启动的 G1 和 G2。由于没有 goroutine 接收,而 channel 又是无缓冲类型,因此 G1 和 G2 被阻塞。sendq 没有被阻塞的 goroutine。

recvq 的数据结构以下。这里直接引用文章中的一幅图,用了三维元素,画得很好:

recvq structure

再从总体上来看一下 chan 此时的状态:

chan state

G1 和 G2 被挂起了,状态是 WAITING。关于 goroutine 调度器这块不是今天的重点,固然后面确定会写相关的文章。这里先简单说下,goroutine 是用户态的协程,由 Go runtime 进行管理,做为对比,内核线程由 OS 进行管理。Goroutine 更轻量,所以咱们能够轻松建立数万 goroutine。

一个内核线程能够管理多个 goroutine,当其中一个 goroutine 阻塞时,内核线程能够调度其余的 goroutine 来运行,内核线程自己不会阻塞。这就是一般咱们说的 M:N 模型:

M:N scheduling

M:N 模型一般由三部分构成:M、P、G。M 是内核线程,负责运行 goroutine;P 是 context,保存 goroutine 运行所须要的上下文,它还维护了可运行(runnable)的 goroutine 列表;G 则是待运行的 goroutine。M 和 P 是 G 运行的基础。

MGP

继续回到例子。假设咱们只有一个 M,当 G1(go goroutineA(ch)) 运行到 val := <- a 时,它由原本的 running 状态变成了 waiting 状态(调用了 gopark 以后的结果):

G1 running

G1 脱离与 M 的关系,但调度器可不会让 M 闲着,因此会接着调度另外一个 goroutine 来运行:

G1 waiting

G2 也是一样的遭遇。如今 G1 和 G2 都被挂起了,等待着一个 sender 往 channel 里发送数据,才能获得解救。

发送

接着上面的例子,G1 和 G2 如今都在 recvq 队列里了。

ch <- 3

第 17 行向 channel 发送了一个元素 3。

发送操做最终转化为 chansend 函数,直接上源码,一样大部分都注释了,能够看懂主流程:

// 位于 src/runtime/chan.go

func chansend(c *hchan, ep unsafe.Pointer, block bool, callerpc uintptr) bool {
    // 若是 channel 是 nil
    if c == nil {
        // 不能阻塞,直接返回 false,表示未发送成功
        if !block {
            return false
        }
        // 当前 goroutine 被挂起
        gopark(nil, nil, "chan send (nil chan)", traceEvGoStop, 2)
        throw("unreachable")
    }

    // 省略 debug 相关……

    // 对于不阻塞的 send,快速检测失败场景
    //
    // 若是 channel 未关闭且 channel 没有多余的缓冲空间。这多是:
    // 1. channel 是非缓冲型的,且等待接收队列里没有 goroutine
    // 2. channel 是缓冲型的,但循环数组已经装满了元素
    if !block && c.closed == 0 && ((c.dataqsiz == 0 && c.recvq.first == nil) ||
        (c.dataqsiz > 0 && c.qcount == c.dataqsiz)) {
        return false
    }

    var t0 int64
    if blockprofilerate > 0 {
        t0 = cputicks()
    }

    // 锁住 channel,并发安全
    lock(&c.lock)

    // 若是 channel 关闭了
    if c.closed != 0 {
        // 解锁
        unlock(&c.lock)
        // 直接 panic
        panic(plainError("send on closed channel"))
    }

    // 若是接收队列里有 goroutine,直接将要发送的数据拷贝到接收 goroutine
    if sg := c.recvq.dequeue(); sg != nil {
        send(c, sg, ep, func() { unlock(&c.lock) }, 3)
        return true
    }

    // 对于缓冲型的 channel,若是还有缓冲空间
    if c.qcount < c.dataqsiz {
        // qp 指向 buf 的 sendx 位置
        qp := chanbuf(c, c.sendx)

        // ……

        // 将数据从 ep 处拷贝到 qp
        typedmemmove(c.elemtype, qp, ep)
        // 发送游标值加 1
        c.sendx++
        // 若是发送游标值等于容量值,游标值归 0
        if c.sendx == c.dataqsiz {
            c.sendx = 0
        }
        // 缓冲区的元素数量加一
        c.qcount++

        // 解锁
        unlock(&c.lock)
        return true
    }

    // 若是不须要阻塞,则直接返回错误
    if !block {
        unlock(&c.lock)
        return false
    }

    // channel 满了,发送方会被阻塞。接下来会构造一个 sudog

    // 获取当前 goroutine 的指针
    gp := getg()
    mysg := acquireSudog()
    mysg.releasetime = 0
    if t0 != 0 {
        mysg.releasetime = -1
    }

    mysg.elem = ep
    mysg.waitlink = nil
    mysg.g = gp
    mysg.selectdone = nil
    mysg.c = c
    gp.waiting = mysg
    gp.param = nil

    // 当前 goroutine 进入发送等待队列
    c.sendq.enqueue(mysg)

    // 当前 goroutine 被挂起
    goparkunlock(&c.lock, "chan send", traceEvGoBlockSend, 3)

    // 从这里开始被唤醒了(channel 有机会能够发送了)
    if mysg != gp.waiting {
        throw("G waiting list is corrupted")
    }
    gp.waiting = nil
    if gp.param == nil {
        if c.closed == 0 {
            throw("chansend: spurious wakeup")
        }
        // 被唤醒后,channel 关闭了。坑爹啊,panic
        panic(plainError("send on closed channel"))
    }
    gp.param = nil
    if mysg.releasetime > 0 {
        blockevent(mysg.releasetime-t0, 2)
    }
    // 去掉 mysg 上绑定的 channel
    mysg.c = nil
    releaseSudog(mysg)
    return true
}

上面的代码注释地比较详细了,咱们来详细看看。

  • 若是检测到 channel 是空的,当前 goroutine 会被挂起。

  • 对于不阻塞的发送操做,若是 channel 未关闭而且没有多余的缓冲空间(说明:a. channel 是非缓冲型的,且等待接收队列里没有 goroutine;b. channel 是缓冲型的,但循环数组已经装满了元素)

对于这一点,runtime 源码里注释了不少。这一条判断语句是为了在不阻塞发送的场景下快速检测到发送失败,好快速返回。

if !block && c.closed == 0 && ((c.dataqsiz == 0 && c.recvq.first == nil) || (c.dataqsiz > 0 && c.qcount == c.dataqsiz)) {
    return false
}

注释里主要讲为何这一块能够不加锁,我详细解释一下。if 条件里先读了两个变量:block 和 c.closed。block 是函数的参数,不会变;c.closed 可能被其余 goroutine 改变,由于没加锁嘛,这是“与”条件前面两个表达式。

最后一项,涉及到三个变量:c.dataqsiz,c.recvq.first,c.qcount。c.dataqsiz == 0 && c.recvq.first == nil 指的是非缓冲型的 channel,而且 recvq 里没有等待接收的 goroutine;c.dataqsiz > 0 && c.qcount == c.dataqsiz 指的是缓冲型的 channel,但循环数组已经满了。这里 c.dataqsiz 实际上也是不会被修改的,在建立的时候就已经肯定了。不加锁真正影响地是 c.qcountc.recvq.first

这一部分的条件就是两个 word-sized read,就是读两个 word 操做:c.closedc.recvq.first(非缓冲型) 或者 c.qcount(缓冲型)。

当咱们发现 c.closed == 0 为真,也就是 channel 未被关闭,再去检测第三部分的条件时,观测到 c.recvq.first == nil 或者 c.qcount == c.dataqsiz 时(这里忽略 c.dataqsiz),就判定要将此次发送操做做失败处理,快速返回 false。

这里涉及到两个观测项:channel 未关闭、channel not ready for sending。这两项都会由于没加锁而出现观测先后不一致的状况。例如我先观测到 channel 未被关闭,再观察到 channel not ready for sending,这时我觉得能知足这个 if 条件了,可是若是这时 c.closed 变成 1,这时其实就不知足条件了,谁让你不加锁呢!

可是,由于一个 closed channel 不能将 channel 状态从 'ready for sending' 变成 'not ready for sending',因此当我观测到 'not ready for sending' 时,channel 不是 closed。即便 c.closed == 1,即 channel 是在这两个观测中间被关闭的,那也说明在这两个观测中间,channel 知足两个条件:not closednot ready for sending,这时,我直接返回 false 也是没有问题的。

这部分解释地比较绕,其实这样作的目的就是少获取一次锁,提高性能。

  • 若是检测到 channel 已经关闭,直接 panic。

  • 若是能从等待接收队列 recvq 里出队一个 sudog(表明一个 goroutine),说明此时 channel 是空的,没有元素,因此才会有等待接收者。这时会调用 send 函数将元素直接从发送者的栈拷贝到接收者的栈,关键操做由 sendDirect 函数完成。

// send 函数处理向一个空的 channel 发送操做

// ep 指向被发送的元素,会被直接拷贝到接收的 goroutine
// 以后,接收的 goroutine 会被唤醒
// c 必须是空的(由于等待队列里有 goroutine,确定是空的)
// c 必须被上锁,发送操做执行完后,会使用 unlockf 函数解锁
// sg 必须已经从等待队列里取出来了
// ep 必须是非空,而且它指向堆或调用者的栈

func send(c *hchan, sg *sudog, ep unsafe.Pointer, unlockf func(), skip int) {
    // 省略一些用不到的
    // ……

    // sg.elem 指向接收到的值存放的位置,如 val <- ch,指的就是 &val
    if sg.elem != nil {
        // 直接拷贝内存(从发送者到接收者)
        sendDirect(c.elemtype, sg, ep)
        sg.elem = nil
    }
    // sudog 上绑定的 goroutine
    gp := sg.g
    // 解锁
    unlockf()
    gp.param = unsafe.Pointer(sg)
    if sg.releasetime != 0 {
        sg.releasetime = cputicks()
    }
    // 唤醒接收的 goroutine. skip 和打印栈相关,暂时不理会
    goready(gp, skip+1)
}

继续看 sendDirect 函数:

// 向一个非缓冲型的 channel 发送数据、从一个无元素的(非缓冲型或缓冲型但空)的 channel
// 接收数据,都会致使一个 goroutine 直接操做另外一个 goroutine 的栈
// 因为 GC 假设对栈的写操做只能发生在 goroutine 正在运行中而且由当前 goroutine 来写
// 因此这里实际上违反了这个假设。可能会形成一些问题,因此须要用到写屏障来规避
func sendDirect(t *_type, sg *sudog, src unsafe.Pointer) {
    // src 在当前 goroutine 的栈上,dst 是另外一个 goroutine 的栈

    // 直接进行内存"搬迁"
    // 若是目标地址的栈发生了栈收缩,当咱们读出了 sg.elem 后
    // 就不能修改真正的 dst 位置的值了
    // 所以须要在读和写以前加上一个屏障
    dst := sg.elem
    typeBitsBulkBarrier(t, uintptr(dst), uintptr(src), t.size)
    memmove(dst, src, t.size)
}

这里涉及到一个 goroutine 直接写另外一个 goroutine 栈的操做,通常而言,不一样 goroutine 的栈是各自独有的。而这也违反了 GC 的一些假设。为了避免出问题,写的过程当中增长了写屏障,保证正确地完成写操做。这样作的好处是减小了一次内存 copy:不用先拷贝到 channel 的 buf,直接由发送者到接收者,没有中间商赚差价,效率得以提升,完美。

而后,解锁、唤醒接收者,等待调度器的光临,接收者也得以重见天日,能够继续执行接收操做以后的代码了。

  • 若是 c.qcount < c.dataqsiz,说明缓冲区可用(确定是缓冲型的 channel)。先经过函数取出待发送元素应该去到的位置:
qp := chanbuf(c, c.sendx)

// 返回循环队列里第 i 个元素的地址处
func chanbuf(c *hchan, i uint) unsafe.Pointer {
    return add(c.buf, uintptr(i)*uintptr(c.elemsize))
}

c.sendx 指向下一个待发送元素在循环数组中的位置,而后调用 typedmemmove 函数将其拷贝到循环数组中。以后 c.sendx 加 1,元素总量加 1 :c.qcount++,最后,解锁并返回。

  • 若是没有命中以上条件的,说明 channel 已经满了。无论这个 channel 是缓冲型的仍是非缓冲型的,都要将这个 sender “关起来”(goroutine 被阻塞)。若是 block 为 false,直接解锁,返回 false。

  • 最后就是真的须要被阻塞的状况。先构造一个 sudog,将其入队(channel 的 sendq 字段)。而后调用 goparkunlock 将当前 goroutine 挂起,并解锁,等待合适的时机再唤醒。

唤醒以后,从 goparkunlock 下一行代码开始继续往下执行。

这里有一些绑定操做,sudog 经过 g 字段绑定 goroutine,而 goroutine 经过 waiting 绑定 sudog,sudog 还经过 elem 字段绑定待发送元素的地址,以及 c 字段绑定被“坑”在此处的 channel。

因此,待发送的元素地址实际上是存储在 sudog 结构体里,也就是当前 goroutine 里。

好了,看完源码。咱们接着来分析例子,相信你们已经把例子忘得差很少了,我再贴一下代码:

func goroutineA(a <-chan int) {
    val := <- a
    fmt.Println("goroutine A received data: ", val)
    return
}

func goroutineB(b <-chan int) {
    val := <- b
    fmt.Println("goroutine B received data: ", val)
    return
}

func main() {
    ch := make(chan int)
    go goroutineA(ch)
    go goroutineB(ch)
    ch <- 3
    time.Sleep(time.Second)

    ch1 := make(chan struct{})
}

在发送小节里咱们说到 G1 和 G2 如今被挂起来了,等待 sender 的解救。在第 17 行,主协程向 ch 发送了一个元素 3,来看下接下来会发生什么。

根据前面源码分析的结果,咱们知道,sender 发现 ch 的 recvq 里有 receiver 在等待着接收,就会出队一个 sudog,把 recvq 里 first 指针的 sudo “推举”出来了,并将其加入到 P 的可运行 goroutine 队列中。

而后,sender 把发送元素拷贝到 sudog 的 elem 地址处,最后会调用 goready 将 G1 唤醒,状态变为 runnable。

G1 runnable

当调度器光顾 G1 时,将 G1 变成 running 状态,执行 goroutineA 接下来的代码。G 表示其余可能有的 goroutine。

这里其实涉及到一个协程写另外一个协程栈的操做。有两个 receiver 在 channel 的一边虎视眈眈地等着,这时 channel 另外一边来了一个 sender 准备向 channel 发送数据,为了高效,用不着经过 channel 的 buf “中转”一次,直接从源地址把数据 copy 到目的地址就能够了,效率高啊!

send direct

上图是一个示意图,3 会被拷贝到 G1 栈上的某个位置,也就是 val 的地址处,保存在 elem 字段。

关闭

关闭某个 channel,会执行函数 closechan

func closechan(c *hchan) {
    // 关闭一个 nil channel,panic
    if c == nil {
        panic(plainError("close of nil channel"))
    }

    // 上锁
    lock(&c.lock)
    // 若是 channel 已经关闭
    if c.closed != 0 {
        unlock(&c.lock)
        // panic
        panic(plainError("close of closed channel"))
    }

    // …………

    // 修改关闭状态
    c.closed = 1

    var glist *g

    // 将 channel 全部等待接收队列的里 sudog 释放
    for {
        // 从接收队列里出队一个 sudog
        sg := c.recvq.dequeue()
        // 出队完毕,跳出循环
        if sg == nil {
            break
        }

        // 若是 elem 不为空,说明此 receiver 未忽略接收数据
        // 给它赋一个相应类型的零值
        if sg.elem != nil {
            typedmemclr(c.elemtype, sg.elem)
            sg.elem = nil
        }
        if sg.releasetime != 0 {
            sg.releasetime = cputicks()
        }
        // 取出 goroutine
        gp := sg.g
        gp.param = nil
        if raceenabled {
            raceacquireg(gp, unsafe.Pointer(c))
        }
        // 相连,造成链表
        gp.schedlink.set(glist)
        glist = gp
    }

    // 将 channel 等待发送队列里的 sudog 释放
    // 若是存在,这些 goroutine 将会 panic
    for {
        // 从发送队列里出队一个 sudog
        sg := c.sendq.dequeue()
        if sg == nil {
            break
        }

        // 发送者会 panic
        sg.elem = nil
        if sg.releasetime != 0 {
            sg.releasetime = cputicks()
        }
        gp := sg.g
        gp.param = nil
        if raceenabled {
            raceacquireg(gp, unsafe.Pointer(c))
        }
        // 造成链表
        gp.schedlink.set(glist)
        glist = gp
    }
    // 解锁
    unlock(&c.lock)

    // Ready all Gs now that we've dropped the channel lock.
    // 遍历链表
    for glist != nil {
        // 取最后一个
        gp := glist
        // 向前走一步,下一个唤醒的 g
        glist = glist.schedlink.ptr()
        gp.schedlink = 0
        // 唤醒相应 goroutine
        goready(gp, 3)
    }
}

close 逻辑比较简单,对于一个 channel,recvq 和 sendq 中分别保存了阻塞的发送者和接收者。关闭 channel 后,对于等待接收者而言,会收到一个相应类型的零值。对于等待发送者,会直接 panic。因此,在不了解 channel 还有没有接收者的状况下,不能贸然关闭 channel。

close 函数先上一把大锁,接着把全部挂在这个 channel 上的 sender 和 receiver 全都连成一个 sudog 链表,再解锁。最后,再将全部的 sudog 全都唤醒。

唤醒以后,该干吗干吗。sender 会继续执行 chansend 函数里 goparkunlock 函数以后的代码,很不幸,检测到 channel 已经关闭了,panic。receiver 则比较幸运,进行一些扫尾工做后,返回。这里,selected 返回 true,而返回值 received 则要根据 channel 是否关闭,返回不一样的值。若是 channel 关闭,received 为 false,不然为 true。这咱们分析的这种状况下,received 返回 false。

channel 进阶

总结一下操做 channel 的结果:

操做 nil channel closed channel not nil, not closed channel
close panic panic 正常关闭
读 <- ch 阻塞 读到对应类型的零值 阻塞或正常读取数据。缓冲型 channel 为空或非缓冲型 channel 没有等待发送者时会阻塞
写 ch <- 阻塞 panic 阻塞或正常写入数据。非缓冲型 channel 没有等待接收者或缓冲型 channel buf 满时会被阻塞

总结一下,发生 panic 的状况有三种:向一个关闭的 channel 进行写操做;关闭一个 nil 的 channel;重复关闭一个 channel。

读、写一个 nil channel 都会被阻塞。

发送和接收元素的本质

Channel 发送和接收元素的本质是什么?参考资料【深刻 channel 底层】里是这样回答的:

Remember all transfer of value on the go channels happens with the copy of value.

就是说 channel 的发送和接收操做本质上都是 “值的拷贝”,不管是从 sender goroutine 的栈到 chan buf,仍是从 chan buf 到 receiver goroutine,或者是直接从 sender goroutine 到 receiver goroutine。

这里再引用文中的一个例子,我会加上更加详细地解释。顺带说一下,这是一篇英文的博客,写得很好,没有像咱们这篇文章那样大段的源码分析,它是将代码里状况拆开来各自描述的,各有利弊吧。推荐去读下原文,阅读体验比较好。

type user struct {
    name string
    age int8
}

var u = user{name: "Ankur", age: 25}
var g = &u

func modifyUser(pu *user) {
    fmt.Println("modifyUser Received Vaule", pu)
    pu.name = "Anand"
}

func printUser(u <-chan *user) {
    time.Sleep(2 * time.Second)
    fmt.Println("printUser goRoutine called", <-u)
}

func main() {
    c := make(chan *user, 5)
    c <- g
    fmt.Println(g)
    // modify g
    g = &user{name: "Ankur Anand", age: 100}
    go printUser(c)
    go modifyUser(g)
    time.Sleep(5 * time.Second)
    fmt.Println(g)
}

运行结果:

&{Ankur 25}
modifyUser Received Value &{Ankur Anand 100}
printUser goRoutine called &{Ankur 25}
&{Anand 100}

这里就是一个很好的 share memory by communicating 的例子。

output

一开始构造一个结构体 u,地址是 0x56420,图中地址上方就是它的内容。接着把 &u 赋值给指针 g,g 的地址是 0x565bb0,它的内容就是一个地址,指向 u。

main 程序里,先把 g 发送到 c,根据 copy value 的本质,进入到 chan buf 里的就是 0x56420,它是指针 g 的值(不是它指向的内容),因此打印从 channel 接收到的元素时,它就是 &{Ankur 25}。所以,这里并非将指针 g “发送” 到了 channel 里,只是拷贝它的值而已。

再强调一次:

Remember all transfer of value on the go channels happens with the copy of value.

资源泄漏

Channel 可能会引起 goroutine 泄漏。

泄漏的缘由是 goroutine 操做 channel 后,处于发送或接收阻塞状态,而 channel 处于满或空的状态,一直得不到改变。同时,垃圾回收器也不会回收此类资源,进而致使 gouroutine 会一直处于等待队列中,不见天日。

雨痕老师的《Go 语言学习笔记》第 8 章通道的“资源泄露”一节举了个例子,你们能够本身去看。

happened before

维基百科上给的定义:

In computer science, the happened-before relation (denoted: ->) is a relation between the result of two events, such that if one event should happen before another event, the result must reflect that, even if those events are in reality executed out of order (usually to optimize program flow).

简单来讲就是若是事件 a 和事件 b 存在 happened-before 关系,即 a -> b,那么 a,b 完成后的结果必定要体现这种关系。因为现代编译器、CPU 会作各类优化,包括编译器重排、内存重排等等,在并发代码里,happened-before 限制就很是重要了。

根据晃岳攀老师在 Gopher China 2019 上的并发编程分享,关于 channel 的发送(send)、发送完成(send finished)、接收(receive)、接收完成(receive finished)的 happened-before 关系以下:

  1. 第 n 个 send 必定 happened before 第 n 个 receive finished,不管是缓冲型仍是非缓冲型的 channel。
  2. 对于容量为 m 的缓冲型 channel,第 n 个 receive 必定 happened before 第 n+m 个 send finished
  3. 对于非缓冲型的 channel,第 n 个 receive 必定 happened before 第 n 个 send finished
  4. channel close 必定 happened before receiver 获得通知。

咱们来逐条解释一下。

第一条,咱们从源码的角度看也是对的,send 不必定是 happened before receive,由于有时候是先 receive,而后 goroutine 被挂起,以后被 sender 唤醒,send happened after receive。但无论怎样,要想完成接收,必定是要先有发送。

第二条,缓冲型的 channel,当第 n+m 个 send 发生后,有下面两种状况:

若第 n 个 receive 没发生。这时,channel 被填满了,send 就会被阻塞。那当第 n 个 receive 发生时,sender goroutine 会被唤醒,以后再继续发送过程。这样,第 n 个 receive 必定 happened before 第 n+m 个 send finished

若第 n 个 receive 已经发生过了,这直接就符合了要求。

第三条,也是比较好理解的。第 n 个 send 若是被阻塞,sender goroutine 挂起,第 n 个 receive 这时到来,先于第 n 个 send finished。若是第 n 个 send 未被阻塞,说明第 n 个 receive 早就在那等着了,它不只 happened before send finished,它还 happened before send。

第四条,回忆一下源码,先设置完 closed = 1,再唤醒等待的 receiver,并将零值拷贝给 receiver。

参考资料【鸟窝 并发编程分享】这篇博文的评论区有 PPT 的下载连接,这是晁老师在 Gopher 2019 大会上的演讲。

关于 happened before,这里再介绍一个柴大和曹大的新书《Go 语言高级编程》里面提到的一个例子。

书中 1.5 节先讲了顺序一致性的内存模型,这是并发编程的基础。

咱们直接来看例子:

var done = make(chan bool)
var msg string

func aGoroutine() {
    msg = "hello, world"
    done <- true
}

func main() {
    go aGoroutine()
    <-done
    println(msg)
}

先定义了一个 done channel 和一个待打印的字符串。在 main 函数里,启动一个 goroutine,等待从 done 里接收到一个值后,执行打印 msg 的操做。若是 main 函数中没有 <-done 这行代码,打印出来的 msg 为空,由于 aGoroutine 来不及被调度,还来不及给 msg 赋值,主程序就会退出。而在 Go 语言里,主协程退出时不会等待其余协程。

加了 <-done 这行代码后,就会阻塞在此。等 aGoroutine 里向 done 发送了一个值以后,才会被唤醒,继续执行打印 msg 的操做。而这在以前,msg 已经被赋值过了,因此会打印出 hello, world

这里依赖的 happened before 就是前面讲的第一条。第一个 send 必定 happened before 第一个 receive finished,即 done <- true 先于 <-done 发生,这意味着 main 函数里执行完 <-done 后接着执行 println(msg) 这一行代码时,msg 已经被赋过值了,因此会打印出想要的结果。

书中,又进一步利用前面提到的第 3 条 happened before 规则,修改了一下代码:

var done = make(chan bool)
var msg string

func aGoroutine() {
    msg = "hello, world"
    <-done
}

func main() {
    go aGoroutine()
    done <- true
    println(msg)
}

一样能够获得相同的结果,为何?根据第三条规则,对于非缓冲型的 channel,第一个 receive 必定 happened before 第一个 send finished。也就是说,
done <- true 完成以前,<-done 就已经发生了,也就意味着 msg 已经被赋上值了,最终也会打印出 hello, world

如何优雅地关闭 channel

这部份内容主要来自 Go 101 上的一篇英文文章,参考资料【如何优雅地关闭 channel】能够直达原文。

文章先“吐槽”了下 Go channel 在设计上的一些问题,接着给出了几种不一样状况下如何优雅地关闭 channel 的例子。按照惯例,我会在原做者内容的基础上给出本身的解读,看完这一节你能够再回头看一下英文原文,会以为颇有意思。

关于 channel 的使用,有几点不方便的地方:

  1. 在不改变 channel 自身状态的状况下,没法获知一个 channel 是否关闭。
  2. 关闭一个 closed channel 会致使 panic。因此,若是关闭 channel 的一方在不知道 channel 是否处于关闭状态时就去贸然关闭 channel 是很危险的事情。
  3. 向一个 closed channel 发送数据会致使 panic。因此,若是向 channel 发送数据的一方不知道 channel 是否处于关闭状态时就去贸然向 channel 发送数据是很危险的事情。

文中还真的就给出了一个检查 channel 是否关闭的函数:

func IsClosed(ch <-chan T) bool {
    select {
    case <-ch:
        return true
    default:
    }

    return false
}

func main() {
    c := make(chan T)
    fmt.Println(IsClosed(c)) // false
    close(c)
    fmt.Println(IsClosed(c)) // true
}

看一下代码,其实存在不少问题。首先,IsClosed 函数是一个有反作用的函数。每调用一次,都会读出 channel 里的一个元素,改变了 channel 的状态。这不是一个好的函数,干活就干活,还顺手牵羊!

其次,IsClosed 函数返回的结果仅表明调用那个瞬间,并不能保证调用以后会不会有其余 goroutine 对它进行了一些操做,改变了它的这种状态。例如,IsClosed 函数返回 true,但这时有另外一个 goroutine 关闭了 channel,而你还拿着这个过期的 “channel 未关闭”的信息,向其发送数据,就会致使 panic 的发生。固然,一个 channel 不会被重复关闭两次,若是 IsClosed 函数返回的结果是 true,说明 channel 是真的关闭了。

有一条普遍流传的关闭 channel 的原则:

don't close a channel from the receiver side and don't close a channel if the channel has multiple concurrent senders.

不要从一个 receiver 侧关闭 channel,也不要在有多个 sender 时,关闭 channel。

比较好理解,向 channel 发送元素的就是 sender,所以 sender 能够决定什么时候不发送数据,而且关闭 channel。可是若是有多个 sender,某个 sender 一样无法肯定其余 sender 的状况,这时也不能贸然关闭 channel。

可是上面所说的并非最本质的,最本质的原则就只有一条:

don't close (or send values to) closed channels.

有两个不那么优雅地关闭 channel 的方法:

  1. 使用 defer-recover 机制,放心大胆地关闭 channel 或者向 channel 发送数据。即便发生了 panic,有 defer-recover 在兜底。

  2. 使用 sync.Once 来保证只关闭一次。

代码我就不贴上来了,直接去看原文。

这一节的重头戏来了,那应该如何优雅地关闭 channel?

根据 sender 和 receiver 的个数,分下面几种状况:

  1. 一个 sender,一个 receiver
  2. 一个 sender, M 个 receiver
  3. N 个 sender,一个 reciver
  4. N 个 sender, M 个 receiver

对于 1,2,只有一个 sender 的状况就不用说了,直接从 sender 端关闭就行了,没有问题。重点关注第 3,4 种状况。

第 3 种情形下,优雅关闭 channel 的方法是:the only receiver says "please stop sending more" by closing an additional signal channel。

解决方案就是增长一个传递关闭信号的 channel,receiver 经过信号 channel 下达关闭数据 channel 指令。senders 监听到关闭信号后,中止发送数据。我把代码修改地更简洁了:

func main() {
    rand.Seed(time.Now().UnixNano())

    const Max = 100000
    const NumSenders = 1000

    dataCh := make(chan int, 100)
    stopCh := make(chan struct{})

    // senders
    for i := 0; i < NumSenders; i++ {
        go func() {
            for {
                select {
                case <- stopCh:
                    return
                case dataCh <- rand.Intn(Max):
                }
            }
        }()
    }

    // the receiver
    go func() {
        for value := range dataCh {
            if value == Max-1 {
                fmt.Println("send stop signal to senders.")
                close(stopCh)
                return
            }

            fmt.Println(value)
        }
    }()

    select {
    case <- time.After(time.Hour):
    }
}

这里的 stopCh 就是信号 channel,它自己只有一个 sender,所以能够直接关闭它。senders 收到了关闭信号后,select 分支 “case <- stopCh” 被选中,退出函数,再也不发送数据。

须要说明的是,上面的代码并无明确关闭 dataCh。在 Go 语言中,对于一个 channel,若是最终没有任何 goroutine 引用它,无论 channel 有没有被关闭,最终都会被 gc 回收。因此,在这种情形下,所谓的优雅地关闭 channel 就是不关闭 channel,让 gc 代劳。

最后一种状况,优雅关闭 channel 的方法是:any one of them says "let's end the game" by notifying a moderator to close an additional signal channel。

和第 3 种状况不一样,这里有 M 个 receiver,若是直接仍是采起第 3 种解决方案,由 receiver 直接关闭 stopCh 的话,就会重复关闭一个 channel,致使 panic。所以须要增长一个中间人,M 个 receiver 都向它发送关闭 dataCh 的“请求”,中间人收到第一个请求后,就会直接下达关闭 dataCh 的指令(经过关闭 stopCh,这时就不会发生重复关闭的状况,由于 stopCh 的发送方只有中间人一个)。另外,这里的 N 个 sender 也能够向中间人发送关闭 dataCh 的请求。

func main() {
    rand.Seed(time.Now().UnixNano())

    const Max = 100000
    const NumReceivers = 10
    const NumSenders = 1000

    dataCh := make(chan int, 100)
    stopCh := make(chan struct{})

    // It must be a buffered channel.
    toStop := make(chan string, 1)

    var stoppedBy string

    // moderator
    go func() {
        stoppedBy = <-toStop
        close(stopCh)
    }()

    // senders
    for i := 0; i < NumSenders; i++ {
        go func(id string) {
            for {
                value := rand.Intn(Max)
                if value == 0 {
                    select {
                    case toStop <- "sender#" + id:
                    default:
                    }
                    return
                }

                select {
                case <- stopCh:
                    return
                case dataCh <- value:
                }
            }
        }(strconv.Itoa(i))
    }

    // receivers
    for i := 0; i < NumReceivers; i++ {
        go func(id string) {
            for {
                select {
                case <- stopCh:
                    return
                case value := <-dataCh:
                    if value == Max-1 {
                        select {
                        case toStop <- "receiver#" + id:
                        default:
                        }
                        return
                    }

                    fmt.Println(value)
                }
            }
        }(strconv.Itoa(i))
    }

    select {
    case <- time.After(time.Hour):
    }

}

代码里 toStop 就是中间人的角色,使用它来接收 senders 和 receivers 发送过来的关闭 dataCh 请求。

这里将 toStop 声明成了一个 缓冲型的 channel。假设 toStop 声明的是一个非缓冲型的 channel,那么第一个发送的关闭 dataCh 请求可能会丢失。由于不管是 sender 仍是 receiver 都是经过 select 语句来发送请求,若是中间人所在的 goroutine 没有准备好,那 select 语句就不会选中,直接走 default 选项,什么也不作。这样,第一个关闭 dataCh 的请求就会丢失。

若是,咱们把 toStop 的容量声明成 Num(senders) + Num(receivers),那发送 dataCh 请求的部分能够改为更简洁的形式:

...
toStop := make(chan string, NumReceivers + NumSenders)
...
            value := rand.Intn(Max)
            if value == 0 {
                toStop <- "sender#" + id
                return
            }
...
                if value == Max-1 {
                    toStop <- "receiver#" + id
                    return
                }
...

直接向 toStop 发送请求,由于 toStop 容量足够大,因此不用担忧阻塞,天然也就不用 select 语句再加一个 default case 来避免阻塞。

能够看到,这里一样没有真正关闭 dataCh,原样同第 3 种状况。

以上,就是最基本的一些情形,但已经能覆盖几乎全部的状况及其变种了。只要记住:

don't close a channel from the receiver side and don't close a channel if the channel has multiple concurrent senders.

以及更本质的原则:

don't close (or send values to) closed channels.

关闭的 channel 仍能读出数据

从一个有缓冲的 channel 里读数据,当 channel 被关闭,依然能读出有效值。只有当返回的 ok 为 false 时,读出的数据才是无效的。

func main() {
    ch := make(chan int, 5)
    ch <- 18
    close(ch)
    x, ok := <-ch
    if ok {
        fmt.Println("received: ", x)
    }

    x, ok = <-ch
    if !ok {
        fmt.Println("channel closed, data invalid.")
    }
}

运行结果:

received:  18
channel closed, data invalid.

先建立了一个有缓冲的 channel,向其发送一个元素,而后关闭此 channel。以后两次尝试从 channel 中读取数据,第一次仍然能正常读出值。第二次返回的 ok 为 false,说明 channel 已关闭,且通道里没有数据。

channel 应用

Channel 和 goroutine 的结合是 Go 并发编程的大杀器。而 Channel 的实际应用也常常让人眼前一亮,经过与 select,cancel,timer 等结合,它能实现各类各样的功能。接下来,咱们就要梳理一下 channel 的应用。

中止信号

前面一节如何优雅关闭 channel 那一节已经讲得不少了,这块就略过了。

channel 用于中止信号的场景仍是挺多的,常常是关闭某个 channel 或者向 channel 发送一个元素,使得接收 channel 的那一方获知道此信息,进而作一些其余的操做。

任务定时

与 timer 结合,通常有两种玩法:实现超时控制,实现按期执行某个任务。

有时候,须要执行某项操做,但又不想它耗费太长时间,上一个定时器就能够搞定:

select {
    case <-time.After(100 * time.Millisecond):
    case <-s.stopc:
        return false
}

等待 100 ms 后,若是 s.stopc 尚未读出数据或者被关闭,就直接结束。这是来自 etcd 源码里的一个例子,这样的写法随处可见。

定时执行某个任务,也比较简单:

func worker() {
    ticker := time.Tick(1 * time.Second)
    for {
        select {
        case <- ticker:
            // 执行定时任务
            fmt.Println("执行 1s 定时任务")
        }
    }
}

每隔 1 秒种,执行一次定时任务。

解耦生产方和消费方

服务启动时,启动 n 个 worker,做为工做协程池,这些协程工做在一个 for {} 无限循环里,从某个 channel 消费工做任务并执行:

func main() {
    taskCh := make(chan int, 100)
    go worker(taskCh)

    // 塞任务
    for i := 0; i < 10; i++ {
        taskCh <- i
    }

    // 等待 1 小时 
    select {
    case <-time.After(time.Hour):
    }
}

func worker(taskCh <-chan int) {
    const N = 5
    // 启动 5 个工做协程
    for i := 0; i < N; i++ {
        go func(id int) {
            for {
                task := <- taskCh
                fmt.Printf("finish task: %d by worker %d\n", task, id)
                time.Sleep(time.Second)
            }
        }(i)
    }
}

5 个工做协程在不断地从工做队列里取任务,生产方只管往 channel 发送任务便可,解耦生产方和消费方。

程序输出:

finish task: 1 by worker 4
finish task: 2 by worker 2
finish task: 4 by worker 3
finish task: 3 by worker 1
finish task: 0 by worker 0
finish task: 6 by worker 0
finish task: 8 by worker 3
finish task: 9 by worker 1
finish task: 7 by worker 4
finish task: 5 by worker 2

控制并发数

有时须要定时执行几百个任务,例如天天定时按城市来执行一些离线计算的任务。可是并发数又不能过高,由于任务执行过程依赖第三方的一些资源,对请求的速率有限制。这时就能够经过 channel 来控制并发数。

下面的例子来自《Go 语言高级编程》:

var limit = make(chan int, 3)

func main() {
    // …………
    for _, w := range work {
        go func() {
            limit <- 1
            w()
            <-limit
        }()
    }
    // …………
}

构建一个缓冲型的 channel,容量为 3。接着遍历任务列表,每一个任务启动一个 goroutine 去完成。真正执行任务,访问第三方的动做在 w() 中完成,在执行 w() 以前,先要从 limit 中拿“许可证”,拿到许可证以后,才能执行 w(),而且在执行完任务,要将“许可证”归还。这样就能够控制同时运行的 goroutine 数。

这里,limit <- 1 放在 func 内部而不是外部,书籍做者柴大在读者群里的解释是:

若是在外层,就是控制系统 goroutine 的数量,可能会阻塞 for 循环,影响业务逻辑。

limit 其实和逻辑无关,只是性能调优,放在内层和外层的语义不太同样。

还有一点要注意的是,若是 w() 发生 panic,那“许可证”可能就还不回去了,所以须要使用 defer 来保证。

总结

终于写完了,你也终于看完了,恭喜!

回顾一下,这篇文章先从并发和并行讲起,又讲到了 CSP,Go 语言用 channel 实现 CSP。接着讲了什么是 channel,为何须要 channel,而后详细分析了 channel 的实现原理,这也是全文最重要的部分。以后,又讲了几个进阶的例子,最后,列举了几个 channel 应用的场景。

但愿你们能借助本文去读一下 Go 源码,这部分源码也不长,和 context 包同样,短小精悍,值得一读。

我在参考资料里列举了不少文章、书籍,不少都值得去细看,我在文中也有说起。

当你理解这 channel 的底层原理后,再去看这些英文文章,会以为颇有意思。之前对他有一种“畏难”心理,理解了以后再读,就会以为颇有意思,由于你确实都能看懂。

最后,阅读愉快!

参考资料

【Concurrency In Go】https://github.com/arpitjindal97/technology_books/blob/master/Golang/Concurrency-in-Go:Tools-and-Techniques-for-Developers.pdf

【Go 语言高级编程开源书】https://chai2010.cn/advanced-go-programming-book/ch1-basic/ch1-05-mem.html

【简洁清晰明了】http://litang.me/post/golang-channel/

【柴大 && 曹大 《Go语言高级编程》】https://chai2010.cn/advanced-go-programming-book/ch1-basic/ch1-05-mem.html

【Go 并发编程实战】https://book.douban.com/subject/26244729/

【曹大 golang notes】https://github.com/cch123/golang-notes/blob/master/channel.md

【互联网技术窝 图解 channel 实现 动画】https://mp.weixin.qq.com/s/40uxAPdubIk0lU321LmfRg

【一块儿学 Golang,推荐的资料很是有用】http://www.javashuo.com/article/p-rxijdptq-ex.html

【如何优雅地关闭 channel】https://go101.org/article/channel-closing.html

【深刻 channel 底层】https://codeburst.io/diving-deep-into-the-golang-channels-549fd4ed21a8

【Kavya在Gopher Con 上关于 channel 的设计,很是好】https://speakerd.s3.amazonaws.com/presentations/10ac0b1d76a6463aa98ad6a9dec917a7/GopherCon_v10.0.pdf

【channel 应用】https://www.s0nnet.com/archives/go-channels-practice

【应用举例】https://zhuyasen.com/post/go_queue.html

【应用】https://tonybai.com/2014/09/29/a-channel-compendium-for-golang/

【鸟窝 并发编程分享】https://colobu.com/2019/04/28/gopher-2019-concurrent-in-action/

【Go-Questions,码农桃花源项目】https://github.com/qcrao/Go-Questions

【GitBook 码农桃花源开源书】https://qcrao91.gitbook.io/go/

QR

相关文章
相关标签/搜索