通向Golang的捷径【14. 并发协程和并发通道】

作为一种 21 世纪的编程语言,Go 语言可实现应用程序之间的通讯 (比如网络通讯, cs 模式, 分布式计算等, 参见第 15 章), 同时也实现了并发应用, 也就是在不同的进程或计算机中, 可同步执行相同代码的不同部分, 实现并发程序的基本单元为并发协程 (goroutine) 和并发通道 (channel), 它们的实现需要语言, 编译器和运行时管理的多方支持, 同时 Go 语言的垃圾收集也是一种最基本的并发编程, 通过内存共享无法实现通讯, 在 Go 语言中, 将基于通讯实现内存的共享, 同时通讯是一种对等关系.

14.1 并发协程

14.1.1 介绍

一个运行的应用程序其实是计算机中的一个进程 (process), 而进程又是一个独立的执行单元, 并运行在自身的地址空间 (需进行内存分配) 中, 一个进程可包含一个或多个 OS 线程 (thread), 这些线程可共享同一个地址空间, 并能实现同步执行, 几乎所有的程序都使用了多线程技术, 以便在用户或设备响应中, 不会引入等待时间,还可实现多个请求的同步服务 (比如 web 服务器), 以及增强性能和数据吞吐量 (比如在不同的数据集中, 实现并发执行), 这类并发应用将执行在一个处理器 (核心) 中, 该处理器可提供一组线程, 如果在某一个时刻下, 一组处理器 (核心) 将执行同一个应用程序进程时, 这被称为并行处理 (parallelism).

并行处理可使用多个进程, 以实现快速执行, 所以并发程序不一定是并行处理. 众所周知多线程的应用存在难点, 主要的问题是内存的数据共享, 不同的线程将以一种无法预知的次序, 完成内存数据的维护, 因此可能会导致不可重复的随机结果, 这被称为竞争条件 (racing condition). 不要使用全局变量或共享内存, 因为在并发执行中, 它们会产生不安全的代码.

不同线程的同步需要依赖一个规则, 即数据锁定, 任何时刻下, 只有一个线程可修改数据, 同时 Go 语言已在标准库 (sync 包) 中提供了同步锁定功能, 以方便底层代码的需要, 参见 9.3 节, 基于软件工程的思想, 可知锁定操作将导致更复杂和易出错的编程风格, 同时也会造成性能的下降, 因此在主流的多核 (多处理器单元) 编程中, 通常不会使用锁定操作, 因为它无法提供足够的效率.

Go 语言在诸多的解决方案中, 选择了一种较好的处理方式, 即进程的通讯序列 ( Communicating Sequential Processes,CSP, 来自于 C.Hoare 的贡献), 也被称为消息传递模型 (message passing-model,Erlang 等其他语言也使用了这类并发机制).

在 Go 语言中, 并发协程可同步执行一个应用程序的不同部分, 其中将实现并发的运算操作, 同时一个并发协程和一个操作系统线程之间, 并不存在一一对应的关系, 一个并发协程可基于线程的有效性, 映射到一个或多个线程中 (可实现多路选择), 而 Go 运行时管理的并发协程调度器, 将实现上述的同步机制.

多个并发协程可运行在相同的地址空间, 并能访问已同步的共享内存, 这些操作可由 sync 包完成 (参见 9.3节), 但在 Go 语言中, 需要使用并发通道, 来实现多个协程之间的同步 (参见 14.2 节).

并发协程是一类轻量级的模块 (开销很小), 甚至比线程的开销都小, 因此并发协程只需少量的内存和资源, 在内存堆上, 每个并发协程只需要 4k 的堆栈空间, 由于廉价的特性, 如果需要, 可创建大量的并发协程 (在同一个地址空间中, 可实现 100,000 个并发协程的创建), 另外还可使用一个堆栈段, 以适应内存用量的动态变化,同时也实现了堆栈的自动化管理, 但垃圾收集器不会管理堆栈, 因为并发协程退出后, 堆栈可直接释放.

并发协程可在不同操作系统的线程中运行, 并能基于少量的内存, 处理大量的任务, 由于并发协程可选用 OS的多个线程, 因此少量的 OS 线程可为大量的并发协程提供服务, 同时 Go 语言的运行时管理也足够智能, 可实现并发协程的阻塞, 关闭和任务切换.

目前的并发运行存在两种风格, 可预测 (实现序列化操作) 和不可预测 (利用锁定/ 互斥, 实现无序化操作),Go语言的并发协程和并发通道, 能提供可预测的并发运行 ( 比如并发通道中只有一个发送器和一个接收器), 因此它们更容易理解, 在 14.7 节的通用算法问题中, 我们将比较上述两种并发风格.

并发协程可视为一个函数或方法 (也可视为一个匿名函数或 lambda 函数), 但需要通过关键字 go 实现调用,同时并发协程可并发执行当前的运算, 并且在相同的地址空间中 (但不同的并发协程有独立的堆栈), 比如:
在这里插入图片描述
由于并发协程的堆栈尺寸可动态变化, 因此不会产生堆栈溢出, 所以编程者也无须考虑堆栈的尺寸, 当并发协程退出时, 不会给出返回值. 每个 Go 程序都有 main() 函数, 而 main 函数必须视为一个并发协程, 虽然它未使用关键字 go, 但在程序的初始化 (init) 函数中, 并发协程就可运行.

如果在单核平台下, 当首个并发协程 (通常是 main 函数) 运行时, 可在执行循环中, 周期调用 runtime.Gosched(), 以使处理器能允许其他并发协程的运行, 这也不会挂起当前的并发协程, 而当前的并发协程的执行将会自动恢复 ( 它调用了 runtime.Gosched), 而使用 runtime.Gosched() 可使处理器的资源分配和通讯更加优化, 以避免出现饥饿问题.

14.1.2 并发与并行的区别

Go 语言的并发机制提供了一种更好的并发编程的思路, 可使用独立的执行单元, 来描述整个程序, 所以 Go 语言没有强调某一点的并行处理, 而是考虑并发程序能否并行的问题, 在多核处理器平台下, 并行处理可实现快速运行, 但在大多数情况下, 一个设计良好的并发程序, 也可提供优秀的并行性能.

在 Go 运行时管理的当前实现 (截止 2012 年 1 月) 中,Go 并未默认使用并行代码, 而是基于单核 (单处理器)平台来考虑 Go 程序, 并且不理会并发协程的数量, 因此这些并发协程处于并发处理, 而非并行处理, 任意一个时刻下, 只能运行一个并发协程.

上述的 Go 语言机制可能会发生变化, 为了在多核平台下, 程序能实现同步执行, 则必须保证并发协程能进行并行处理, 所以需要使用环境变量 GOMAXPROCS, 该变量可告知 Go 运行时管理, 可实现同步执行的并发协程的数量. 另外, 只有 gc 编译器可创建真正的并发协程, 也就是将并发协程映射到 OS 线程中, 而 gccgo 编译器只能为每个并发协程, 创建一个 OS 线程.

14.1.3 GOMAXPROCS 的用法

在 gc 编译器 (6g 或 8g) 下, 必须设定 GOMAXPROCS, 它的值必须大于 1(默认值), 以允许 Go 运行时管理可使用多个线程, 否则所有的并发协程将共享一个线程, 如果 GOMAXPROCS > 1, 可为所有并发协程, 提供一个包含多个线程的线程池, 而 gccgo 编译器的 GOMAXPROCS 变量, 应当与并发协程的个数相等, 假定计算机中包含了 n 个核心 (处理器单元), 如果环境变量 GOMAXPROCS >= n, 或是调用了 run-time.GOMAXPROCS(n), 所有并发协程将被分配到 n 个核心中, 当然多核心并不意味着, 设备性能必定能出现一个线性增长, 因为核心之间需要更多的通讯, 即消息传递的开销也将增加, 基于经验可知, 如果系统中包含了 n 个核心, 将 GOMAXPROCS 设为 n-1, 可获得最佳的性能, 同时应满足以下条件:

并发协程的个数 > GOMAXPROCS + 1 > 1

如果只存在一个并发协程, 则无须设定 GOMAXPROCS, 同时还有实用经验可以分享, 如果将 GOMAXPROCS设为 9, 可提升单核系统的性能, 在一个 32 核系统中, 如果 GOMAXPROCS=8 可获得最佳的性能, 根据评估结果, 即使增大 GOMAXPROCS, 性能也不会出现提高, 如果 GOMAXPROCS 过大, 反而会造成性能的小幅下降, 另外 GOMAXPROCS 应等于线程数, 如果系统中包含了多核处理器, 那么多个线程可在多个核心中并行.

14.1.4 多核信息的传递

使用 flags 包, 可实现以下操作:
在这里插入图片描述
当并发协程调用 runtime.Goexit(), 可停止自身的执行, 但这类操作不太常见.

例 14.1 goroutine1.go

在这里插入图片描述
函数 main(),longWait(),shortWait() 将依次启动, 三者都可视为独立的处理单元, 并实现并行操作, 每个函数都可在启动和结束时刻, 输出一条消息, 为了模拟这些函数的处理, 这里使用了 time 包的 Sleep数,Sleep 可将函数或并发协程的处理, 暂停一个预定的时间, 这里的时间单位是纳秒 (1e9 表示 10 的 9 次方).

我们希望上述函数的消息可依次打印, 但实际上是同时完成的, 因为这些函数是同步执行, 让 main 暂停 10s,是为了保证另两个并发协程可完成执行, 如果 main 过早地终止执行 (比如只暂停 4s),longWait() 将无法完成,如果 main 不给出等待语句, 而立即结束, 两个并发协程也将随之结束.

当 main 函数返回时, 程序将立即退出, 不会等待其他并发协程的执行. 因此并发协程通常会用于服务器程序,为了响应一个连接请求, 将会启动一个并发协程, 服务于连接请求, 而 server() 主函数一直处于运行态, 上述操作通常会在一个死循环中进行. 另外, 并发协程是一个独立的执行单元, 而一组并发协程之间并不存在相互联系, 同时代码中包含的逻辑关系, 不能与并发协程的调用次序有关.

为了与单线程的连续执行进行比较, 可将关键字 go 移除, 重新执行上述示例, 可得以下输出结果:
在这里插入图片描述
并发协程的另一个经典应用实例, 是在超大数组中, 搜索所需的数组元素, 首先将整个数组分隔成一组不重叠的 slice, 再为每个 slice 配置一个并发协程, 用于元素的搜索, 同时在这类搜索操作中, 将会使用一组并行线程,因此整个搜索时间可大幅降低 (基于并发协程的个数).

14.1.5 并发协程与协程的区别

在 C#,Lua,Python 中, 也提供了协程 (coroutine) 的概念, 从名称可知, 它与并发协程很相似, 但也存在两点不同:
• 并发协程隐含了并行机制, 而协程无法实现并行.
• 通过并发通道, 可实现并发协程间的通讯, 而协程则需要通过获取和释放操作, 才可实现协程间的通讯.

并发协程的功能比协程更强大, 且更容易在并发协程中, 移植所需的协同逻辑.

14.2 并发通道

14.2.1 概念

在之前的示例中, 独立执行了多个的并发协程, 并发协程之间并没有通讯, 为了实现更强大的功能, 并发协程间应实现通讯, 也就是在并发协程间发送和接收信息, 实现所需的协同操作, 而使用共享变量, 也可实现并发协程间的通讯, 但是不推荐采用这类操作, 因为在多线程工作条件下, 它所需要的内存共享, 很难完美实现.

基于上述原因,Go 语言提供了一种特殊类型 channel(并发通道), 它可视为一个管道, 可传输所需的数值, 以实现并发协程间的通讯, 同时避免了内存共享所带来的陷阱, 通过并发通道的通讯, 可实现同步功能, 并能用于数据传输, 任意时刻下, 一个数据元素只能被一个并发协程所访问, 从而避免了数据访问的竞争, 同时数据元素的访问权限 (读写权限) 也可传递.

并发通道可比喻成工厂的传送带, 一台设备 (并发协程给出的生产者) 可放置物品到传送带中, 另一个设备 (并发协程给出的消费者) 可从传送带上拾取物品. 同时并发通道还可实现双向通讯 (数据互换), 在任意时刻下, 两个并发协程可实现下图的同步操作:
在这里插入图片描述
声明一个并发通道的通用格式如下:
在这里插入图片描述
如果并发通道未被初始化, 那么它的数值为 nil, 因此一个并发通道只能传输一种类型的数据, 比如 chan int 或chan string, 但并发通道可支持所有的类型, 也包括空接口, 而空接口能在并发通道中, 给出另一个并发通道.

并发通道实质上是一个消息队列 (包含类型), 并给出了先进先出 (FIFO) 的规则, 这可保证数据的传输次序(类似于 Unix shell 的双向管道), 同时并发通道也是一个引用类型, 因此可使用 make() 函数, 为其分配所需的内存, 以下将声明一个字符串类型的并发通道 ch1, 之后会进行初始化:
在这里插入图片描述
还可创建一个 int 类型的两重并发通道 (并发通道的并发通道):
在这里插入图片描述
这时并发通道包含了一个函数类型, 它将保存传入该函数的实参, 该函数的返回值, 并且基于并发通道, 可传递这些数值, 由于这些数值都有类型, 因此可进行类型检查, 以捕获编程错误, 比如在整型并发通道中, 传递一个指针类型.

14.2.2 通讯操作符 (<-)

通讯操作符使用了一个箭头, 来表示数据传输的方向, 这使得该操作符的应用更加直观.

发送数据给并发通道

ch <- int1 表示将变量 int1 发送给并发通道 ch.

从并发通道接收数据

int2 = <- ch 表示变量 int2 将接收来自于并发通道 ch 的数据, 假定 int2 已经声明, 如果 int2 未声明, 上述语句还可写为 int2 := <- ch.

<- ch 可用于获取并发通道的下一个数值, 当前数据将被丢弃, 同时还可对并发通道包含的数值进行检查, 因此以下是合法代码:
在这里插入图片描述
通讯操作符 <-既可用于发送, 也可用于接收,Go 语言将基于操作目标, 来判断通讯操作符的意图, 虽然不是强制要求, 为了提高可读性, 并发通道的名称通常会加入 ch 前缀或是包含 chan 字符, 值得注意的是, 并发通道的发送和接收操作都是不可中断的, 参见以下示例:

例 14.2 goroutine2.go

在这里插入图片描述
在这里插入图片描述
在 main() 函数中, 将运行两个并发协程,sendData() 将发送 5 个字符串给并发通道 ch, getData() 可从并发通道中, 逐个接收字符串, 并进行打印. 如果两个并发协程需要进行通讯, 则必须将同一个并发通道作为参数, 传递给两个并发协程.

从上述示例可知, 并发协程之间的同步已变得十分重要:
• 如果不允许 sendData() 使用并发通道, 生成所需的输出结果, main() 将等待 1s, 以使两个并发协程能执行完毕.
• getData() 将工作在一个死循环中, 当 getData() 接收完毕 sendData() 发送的字符串, 并发通道 ch 将为空.
• 如果移除上述示例的关键字 go, 该程序将无法工作,Go 运行时管理将抛出一个故障:
在这里插入图片描述
为什么会出现上述故障? 因为 Go 运行时管理可检测到, 所有并发协程所等待的目标 (读取或写入并发通道),这时会发现程序无法完成所需的处理, 也就是死锁 (deadlock), 而 Go 运行时管理可识别出死锁状态.
注意: 不要使用打印语句, 来跟踪并发通道的发送和接收次序, 因为打印语句的速度太慢 (远低于并发通道的发送和接收操作的实际速度), 因此当打印信息出现时, 那一刻的发送和接收次序早已不同,

14.2.3 并发通道的阻塞

默认情况下, 上述通讯可实现同步 (且无缓冲), 如果接收器未完成接收, 发送也未完成, 因此一个无缓冲的并发通道中, 并无保存传输数据的缓冲, 因此并发通道的接收器必须就绪后, 发送器才可直接传输数据给接收器,所以在此类并发通道中, 除非接收器和发送器都就绪, 否则发送/接收操作都将被阻塞:

• 除非接收器就绪, 否则同一并发通道的发送操作都将被阻塞 (并发协程或函数提供的发送操作), 如果未
出现接收器, 可接收并发通道的传输数据, 其他数据也无法放入并发通道, 当并发通道不为空, 将无法发
送新数据, 因此除非并发通道变为有效 (为空), 否则发送操作都将继续等待, 当并发通道的传输数据被接
收后, 并发通道将变为有效.

• 除非出现一个发送器, 否则同一并发通道的接收操作都将被阻塞 (并发协程或函数提供的接收操作), 如
果并发通道未提供传输数据, 接收器将被阻塞.

虽然上述规则存在一些限制, 但是在实际情况中, 传输操作能够良好运行, 在以下示例中, 并发协程 pump 将在一个死循环中, 基于并发通道, 传输整型值, 由于未给出接收器, 所以输出一直为 0.

例 14.3 channel_block.go

在这里插入图片描述
在这里插入图片描述
pump() 函数将为并发通道, 提供传输数据, 因此有时它被称为数据发生器, 定义一个并发通道的接收器 suck,可解锁并发通道的传输:

例 14.4 channel_block2.go

在这里插入图片描述
并在 main() 中添加上述并发协程, 如下:
在这里插入图片描述
当上述程序运行 1s 之后, 将实现上万个整型数据的传输.

14.2.4 并发通道的数据互换

在同步通讯的条件下, 两个并发协程可基于通讯协议, 实现数据互换, 而无缓冲并发通道可实现多个并发协程的完美同步, 当并发通道的两端出现阻塞时, 这被称为死锁状态 (deadlock situation),Go 运行时管理可检查到该状态, 并会产生一个故障, 以使程序停止运行, 而死锁状态几乎都来自于不良的程序设计.

如上所述, 无缓冲并发通道的操作可被阻塞, 为了避免类似阻塞的出现, 除了优化程序设计之外, 还可选择带缓冲的并发通道.

14.2.5 异步并发通道 (带缓冲的并发通道)

无缓冲并发通道只能包含一个传输数据, 因此在应用过程中, 存在诸多限制, 所以可提供一个缓冲给并发通道,在 make 函数中, 可加入对应的缓冲, 如下:
在这里插入图片描述
buf 可指定并发通道能保存的数据个数 (这里是字符串类型), 除非缓冲已满 (缓冲已被传输数据所占满), 否则缓冲并发通道的发送操作都不会出现阻塞, 同样除非缓冲为空, 否则缓冲并发通道的接收操作都不会出现阻塞.

缓冲的容量本质上与类型无关, 如果与类型无关, 可能存在一些危险性, 如果并发通道两端出现了不同的缓冲容量, 应当保证类型一致的数据元素. 内建函数 cap 可返回并发通道的缓冲容量.

如果缓冲容量大于 0, 就可实现异步并发通道, 如果容量为零或是缺省, 即为同步并发通道, 比如ch :=
make(chan type, value):
• value == 0 将得到一个同步的无缓冲的并发通道 (需要频繁阻塞)
• value > 0 将得到一个异步的带缓冲的并发通道 (缓冲的容量将决定是否存在阻塞)

如果在并发通道中使用了缓冲, 程序可处理更多的请求操作, 因此可实现更大的灵活性, 所以在设计初期可使用无缓冲并发通道, 如果出现问题, 可以引入带缓冲的并发通道.

14.2.6 并发通道的输出

为了了解运算的进展, 可通过并发通道, 报告当前的运算结果, 比如 go sum(bigArray) 示例中:
在这里插入图片描述
此时可选择一个同步并发通道, 该并发通道将等同于其他语言的信号量 (semaphore), 但操作方式有所不同, 当一个进程 (即一个并发协程中) 正在执行时, 可通过一个并发通道, 传递各种信号值.

一个基本的用法, 是让 main 永久阻塞, 并利用 select 语句 (可视为 main 函数的最后一条语句), 运行其他的并发协程, 并使用一个并发通道, 使得 main 等待其他并发协程完成执行, 这被称为信号量模式, 参见下一节.

14.2.7 信号量模式

在以下示例中, 并发协程 compute 传递了一个数值给并发通道 ch, 用于告知 main 函数, 它已经执行完毕, 而main 函数将使用 <- ch 语句, 等待上述数值的出现, 在并发通道中, 将获取到一个结果值, 如下:
在这里插入图片描述
同时信号值也可包含其他信息, 并非只能给出结果值, 比如 lambda(匿名) 函数的并发协程:
在这里插入图片描述
以下示例中, 将等待两个并发协程的结束, 它们将用于排序, 而每个并发协程将实现 s(slice 类型) 中某一部分的排序:
在这里插入图片描述
在以下的示例中, 将给出一个完整的信号量模式, 基于一个 slice(float64 类型), 将实现N个 doSomething() 运算的并行处理, 并发通道 sem 给出了对应的长度 (空接口类型可适应任何类型), 并在每次运算结束时, 给出一个通知 (传递一个数值), 为了等待所有并发协程能执行完毕, 为并发通道 sem, 提供了一个接收循环:
在这里插入图片描述
在这里插入图片描述
另外,i 和 xi 可视为形参, 可由 for 循环提供, 并允许每个并发协程可包含 i 和 xi 的一个副本, 否则在 for 循环的下一次迭代中, 所有并发协程的 i 和 xi 都将被更新, 同时 res 并未进行传递, 因为并发协程无须给 res 进行处理, 所以 res 并不是一个参数.

14.2.8 并行 for 循环

在上一节中, 给出了以下的代码片段,for 循环的每次迭代可实现并行, 如下:
在这里插入图片描述
并行处理 for 循环的每次迭代, 可极大提升处理性能, 但必须保证每次迭代之间不存在关联性, 在有些语言中(比如 Fortress), 给出了类似的并行框架, 但是使用 Go 语言的并发协程, 更容易实现并行处理.

14.2.9 缓冲并发通道实现的信号量

信号量是一个较为通用的同步机制, 并能用于实现互斥, 以限制多个资源的访问权限, 解决资源的读写冲突, 但在 sync 包中, 并未实现信号量, 但可使用缓冲并发通道, 模拟一个信号量,
• 缓冲并发通道的缓冲容量, 应等于需同步的资源数
• 并发通道的长度 (当前已保存的数据个数) 应等于当前已使用的资源数
• 缓冲容量减去并发通道的长度, 应等于空闲资源的个数

如果无须在并发通道中存储数据 (即同步并发通道), 则可创建一个长度为 0 的并发通道, 如下:
在这里插入图片描述
之后可使用一个整型值, 初始化一个信号量, 整型值可表示有效资源的个数 (N),
在这里插入图片描述
对应的信号量操作简单明了:
在这里插入图片描述
在这里插入图片描述
使用上述操作, 可实现一个互斥:
在这里插入图片描述

示例: 并发通道的工厂模式

在编程风格中, 将使用一种更为通用的模式, 以替换将并发通道作为实参, 传递给并发协程的编程方式, 即提供一个函数, 它可创建并发通道且能将其返回 (此时该函数可视为一个工厂), 同时在该函数中, 还能提供一个可作为并发协程而调用的 lambda(匿名) 函数, 如下:

例 14.5 channel_idiom.go

在这里插入图片描述

14.2.10 在并发通道中使用 for-range 语句

在 for 循环的 range 语句中, 可接收一个并发通道 ch, 这时在 for 循环中, 可基于来自并发通道 ch 的接收数据进行迭代, 如下:
在这里插入图片描述
for 语句可从并发通道中读取数据, 直到并发通道关闭后, 将退出 for 语句, 很显然需要另一个并发协程, 实现对 ch 的写入 (否则 for 循环将出现阻塞), 当完成写入后, 还必须关闭 ch(以便退出 for 循环), 以下的 suck 函数能运行一个并发协程, 并实现上述操作,

例 14.6 channel_idiom2.go

在这里插入图片描述

并发通道的迭代器模式

在例 14.6 中使用的方法, 可演变为一种更通用的模式, 也就是将 container 类型 (其中将包含一个可寻址的索引域 items) 的元素, 放入一个并发通道中, 之后可基于 container 类型, 定义一个 Iter() 方法, 它可返回一个只读并发通道 (已包含了 container 元素), 如下:
在这里插入图片描述
在并发协程中, 可使用 for 循环, 遍历容器 c 的所有元素, 在树形或图形算法中, 简单的 for 循环可替换为一个深度优先的搜索. 之后可调用该方法, 实现基于容器的迭代:
在这里插入图片描述
上述语句将运行在自身的并发协程中, 因此上述的迭代器将使用一个并发通道和两个并发协程 (会运行在不同的线程中), 这类方式即典型的生产者-消费者模式, 如果在并发协程向并发通道写入数据之前, 程序已经终止,该并发协程不会视为垃圾而清理, 这时并发协程的设计理念, 上述处理更像是一个错误操作, 因为并发通道可实现线程安全的通讯, 在上下文中, 如果并发协程挂起了并发通道的写入操作, 那么并发通道则无法实现读取,因此这将被视为一个错误, 而无须给出所谓的垃圾收集.

生产者与消费者模式

假定 Produce() 函数可传递 Consume() 函数所需的数值, 同时这两个函数将运行在不同的并发协程中, Produce可向并发通道传递数据, 而 Consume 可从并发通道读取数据, 这些操作都将放置在死循环中:
在这里插入图片描述

14.2.11 并发通道的方向

channel 类型可被定义为发送端并发通道 (只能发送) 或接收端并发通道 (只能接收),
在这里插入图片描述
接收端并发通道 (<-chan T) 不能自行关闭, 因为关闭一个并发通道意味着, 发送端已无数据需要传输, 所以接收端无法获知这类信息, 所有的并发通道都可实现双向传输, 以下将声明一个双向的并发通道变量:
在这里插入图片描述

管道与过滤器模式

以下示例将在一个并发协程 processChannel 中, 从接收端并发通道 (输入) 接收数据, 同时基于发送端并发通道 (输出) 发送该数据.
在这里插入图片描述
在使用双向并发通道时, 必须确定并发协程中, 不会出现非法的并发通道操作. 在以下示例中, 将基于不同的过滤器, 输出不同组的数据集, 这也就是一个过滤算法.
在这里插入图片描述

例 14.7 sieve1.go(第一版)

在这里插入图片描述在这里插入图片描述
并发协程 filter(in, out chan int, prime int) 可将整型值, 复制到输出 (发送端) 并发通道中, 并移除能被 prime整除的整型值, 每个 prime 都将给出一个新的并发协程, 而这些并发协程将工作在同一个进程中, 因此数值发生器和过滤器将并发执行.

在第二个版本中,sieve,generate,filter 函数都实现了工厂模式, 它们都能创建一个可返回的并发通道, 以及使用匿名函数生成并发协程, 这使得 main 函数能够更加简洁, 当其调用 sieve() 时, 可返回一个包含 primes 值的并发通道, 之后可通过 fmt.Println(<-primes) 完成打印.

例 14.8 sieve2.go(第二版)

在这里插入图片描述
在这里插入图片描述

14.3 并发协程的同步

并发通道能够显式关闭, 但是它又与文件不同, 一般情况下, 并不需要关闭并发通道, 只有当接收器被告知, 已无数据传输时, 才可关闭并发通道, 因此只有发送器可关闭并发通道, 而接收器不会关闭并发通道.

比如在例 14.2 中, 如何通告 sendData() 正在使用并发通道? 以及 getData() 如何确认, 并发通道处于关闭状态还是阻塞状态?

close(ch) 函数可使并发通道, 不再接受发送操作符 <-给出的数据发送, 因此在已关闭的并发通道中, 进行发送或是重复关闭时, 将引发一个运行时故障. 一种推荐的编程方式是, 在创建并发通道之后, 即使用 defer 语句,给出并发通道的关闭 (当然也需根据实际的情况而定), 如下:
在这里插入图片描述
_ = ch <- v 可触发一个未阻塞的发送操作, 下划线可标记并发通道 ch 的发送, 以下示例将对例 14.2 进行改进, 并生成相同的输出, 为了实现未阻塞并发通道的读取, 还需要使用 select 语句, 参见 14.4 节.

例 14.9 goroutine3.go

在这里插入图片描述
在这里插入图片描述
上述示例给出了以下改进:
• 只有 sendData() 是一个并发协程, 而 getData() 将与 main() 运行在同一线程中.
在这里插入图片描述
• 在 sendData() 函数末尾, 关闭了并发通道
在这里插入图片描述
• 在 getData() 的 for 循环中, 每次接收前, 都需要使用 if !open 测试并发通道.

推荐在 for-range 语句中, 进行并发通道的读取, 因为它可自动检测并发通道的关闭:
在这里插入图片描述

阻塞和生产者与消费者模式

在 14.2.10 节的迭代器模式中, 两个并发协程的关系为, 一个并发协程可阻塞另一个并发协程, 如果程序运行在一个多核 PC 中, 那么只有一个核心可工作 (其他核心都被阻塞), 使用一个缓冲尺寸大于 0 的并发通道, 可缓解阻塞的发生, 比如缓冲尺寸为 100, 迭代器可在阻塞前, 产生 100 个数据元素, 如果消费者并发协程工作在一个独立的核心中, 那么两个并发协程 (生产者与消费者) 都可能不会产生阻塞.

由于容器的数据元素通常是已知的, 因此可使用一个容量足够的并发通道, 来保存这些数据元素, 那么上述的迭代器将不会发生阻塞, 虽然在某些容器的迭代中, 需要双倍的内存空间, 但是并发通道的容量将受到一些最大值的限制, 因此代码的时序设计和评估测试, 有利于在最小内存用量和最佳性能之间, 找到适合的缓冲容量.

14.4 select 语句

使用 select 关键字, 可从多个运行的并发协程中获取数据, 这与 switch 语句很相似, 因此也被称为通讯 switch语句, 它更像是一种轮询机制,select 可监听并发通道的发送数据, 并确认这些数据是否满足于 case 条件, 如下:
在这里插入图片描述
default 条件是一个可选项, 但 fall through 语句不允许使用, 当 case 语句中, 出现 break 或 return 时,select将终止, 因此 select 可在多条通讯通道中进行选择:
• 如果所有通讯都被阻塞,select 将处于等待状态, 直到一条通讯通道开启
• 如果存在多条通讯通道,select 将随机选择一条
• 当所有并发通道都未给出发送数据 (所有通讯都暂停), 且存在 default 条件时, 将执行该默认条件

在包含 default 条件的 select 语句中, 使用发送操作, 可保证 select 不会被阻塞, 如果 select 的所有 case 条件都无法匹配 (且不存在 default) 时, 将会一直阻塞. select 实际上是一种监听器, 通常会在一个循环中使用, 当某个条件满足时, 会使用 break 退出循环.

在以下示例中, 给出了两个并发通道 ch1 和 ch2, 以及三个并发协程 pump1(),pump2(),suck(), 它们构成了一个典型的生产者-消费者模式.

在一个死循环中,pump1() 和 pump2() 可将整型值, 发送给 ch1 和 ch2, 同样 suck() 也会在一个死循环中进行轮询, 通过 select 语句, 从 ch1 和 ch2 中接收数据, 并会输出这些数据, 以及传输对应数据的并发通道的编号,整个程序只运行 1s.

例 14.10 goroutine_select.go

在这里插入图片描述
在这里插入图片描述
1s 产生的输出信息相当惊人, 如果给出计数结果, 可达 90,000 次以上.

后台服务器模式

服务器通常可视为一个运行在死循环中的后台并发协程, 并能通过 select 语句, 从多个并发通道中获取数据, 以及对数据进行处理.
在这里插入图片描述
同时还有其他应用程序, 可通过 ch1 和 ch2 等并发通道发送数据, 并且还提供了一条终止并发通道, 它可使服务器终止运行. 另一种可能存在的模式 (不太灵活), 即所有请求 (客户端) 都传递给一个 chRequest(用于请求的并发通道), 后台协程将基于该并发通道, 使用 switch, 处理不同的请求.
在这里插入图片描述

14.5 并发通道的超时和计时器

time 包还提供了一些函数, 能与并发通道一同使用, 并且还包含了一个计时器结构 time.Ticker, 它可基于特定的时间间隔, 向 (该结构包含的) 并发通道 C 重复发送一个时间值,
在这里插入图片描述
在这里插入图片描述
时间间隔的单位为 ns(ns 值可视为一个 int64 类型), 在工厂函数 time.NewTicker 中, 还定义了一个 Duration类型的变量 dur,
在这里插入图片描述
在并发协程的执行中, 以下函数可提供一些帮助, 比如登录操作的时间信息, 当前打印输出的时间信息, 运算执行的时间信息等, 也就是这类可基于周期时间进行累积的时间值.

一个计时器可被 Stop() 函数停止, 并能用于 defer 语句, 同时也满足 select 语句的要求:
在这里插入图片描述
time.Tick() 函数的原型为func Tick(d Duration) <-chan Time, 如果只需访问无须关闭的返回的并发通道, 可使用该函数, 它可基于周期值 d(ns 数), 向返回的并发通道发送时间值, 如果需要限制对应处理的间隔时间, 也很简单, 可使用以下示例, client.Call() 函数是一个 RPC(远端程序调用) 调用, 参见 15.9 节.
在这里插入图片描述
这将导致只能在指定的间隔时间内, 进行新请求的处理,chRate 并发通道将阻塞速率更高的请求, 同时可根据设备当前的负载和资源, 增加或降低处理操作的速率.

Timer 类型与 Ticker 类型很相似, 并可使用NewTimer(d Duration) 创建, 但在间隔时间 d 后,Timer 类型只能提供一次时间值, 即 time.After(d) 函数, 它的原型如下:
在这里插入图片描述
在间隔时间 d 之后, 当前时间值可发送给返回的并发通道, 这等同于NewTimer(d).C, 也与 Tick() 很相似, 但After() 只能发送一次时间值, 在以下示例的 select 中, 展示了 default 语句的重要性.

例 14.11 timer_goroutine.go

在这里插入图片描述
在这里插入图片描述

简单的超时模式

如果需要等待 1s(预计需要 1s 的时间, 数据才会出现), 再从并发通道 ch 中读取数据, 首先需创建一个并发通道 (用于通告), 当匿名并发协程向并发通道 (用于数据传输) 发送数据之前, 会先进入睡眠状态,
在这里插入图片描述
之后使用 select 语句, 从 ch 和 timeout 并发通道中接收数据, 如果 1s 之后, 未从并发通道 ch 中读到数
据,timeout 可放弃对 ch 的读取:
在这里插入图片描述
也可使用 time.After() 函数, 替代 timeout(超时) 并发通道, 它也可用于 select 语言中, 以通告一个超时或是终止一个并发协程的运行, 在以下示例中, 当 timeoutNs 时间值过去后,client.Call 依然未发送数据到并发通道时, 将产生一次接收超时:
在这里插入图片描述
如果并发通道的缓冲容量为 1, 可避免并发协程的死锁, 并保证超时并发通道的垃圾收集.

假定需要从多个备份数据库中实现同步读取, 同时只需处理最快出现的读取结果,Query 函数包含一个数据库连接的 slice, 以及一个查询字符串, 它可对多个备份数据库进行并发查询, 并返回最快出现的查询结果.
在这里插入图片描述
在结果并发通道 ch 中, 提供了缓冲, 这可保证最先出现的查询结果能被保存, 以确定查询操作是否成功, 同时最先出现的查询结果, 并不依赖于执行次序, 同时调用 runtime.Goexit(), 可终止一个正在执行的并发协程.

在需要与数据库传递数据 (或是数据存储) 的应用中, 通常会将数据缓存在内存, 因为从数据库中获取数据的开销太大, 当然在数据库不会发生变换的情况下, 读取开销是可以接受的, 如果数据库会发生变换, 则需要使用一种机制, 即实现数据库的重复读取 (周期性), 因此缓存数据会失效 (超时或到期), 同时不会将原有值显示给用户, 可参考网页http://www.tideland.biz/CachingValues, 它给出了一个并发协程和一个 Ticker 对象之间的用法.

14.6 并发协程的 recover

在 13.3 节中, 使用了 recover, 终止了服务器的一个出错并发协程, 同时不会影响到服务器的其他并发协程.
在这里插入图片描述
在上述示例中, 如果 do(work) 出现故障, 将被记录到日志, 同时对应的并发协程将被终止, 同时不会影响到其他的并发协程.

由于 recover 只能在延期 (defer) 函数中调用 (才有意义), 否则 recover 将一直返回 nil, 而延期函数也可使用包含 panic 和 recover 的库函数, 比如 safelyDo(), 可调用 recover 之前, 使用一个日志函数, 而日志程序的执行, 并不会影响到当前的故障状态, 由于给出了 recover(恢复) 模式, 当 do 函数调用 panic 后, 可使自身从一些错误状态中恢复, 但恢复操作必须放置在 (出现故障的) 并发协程中, 并且无法恢复外部的其他并发协程, 更进一步的讨论, 可参考页面http://www.tideland.biz/SupervisingGoroutines(ref.43).

14.7 工作模型的比较

假定我们需完成一组任务, 而每个任务需配置一个工作单元 (worker), 因此任务可定义成一个结构 (此时结构中的细节并不重要):
在这里插入图片描述

使用共享内存, 实现同步

共享内存即为保存所有任务的内存池, 为了实现同步执行, 并避免竞争条件, 需为内存池配置一个互斥锁:
在这里插入图片描述
sync.Mutex(参见 9.3 节) 就是一个互斥锁, 它可保护临界区的入口, 以保证任意时刻下, 只有一个并发协程 (或线程) 可进入临界区, 如果允许多个并发协程同时进入临界区, 将出现竞争条件, 这时 Pool 结构也无更新的必要, 在传统模式中 (比如 C++,Java,C# 等 OO 语言), 工作单元可使用以下代码:
在这里插入图片描述
大多数工作单元都可并发运行, 它们也可使用并发协程, 首先工作单元将锁定内存池, 并从内存池中获取到首个任务, 解锁内存池, 再进行任务处理, 锁定操作可保证任意时刻下, 只有一个工作单元可访问内存池, 一个任务只能被分配给一个进程, 如果不存在锁定, 工作单元的处理有可能在 task := pool.Tasks[0] 和 pool.Tasks= pool.Tasks[1:] 之间中断, 从而得到一个不可预知的结果, 而有些工作单元可能无法获取到一个任务, 同时另一些工作单元可能获取到多个任务, 因此在一组工作单元中必须使用锁定操作, 以维持同步运行, 如果内存池(Pool) 相当大, 则需要实现进程和任务之间的大量分配, 这时的处理效率将被锁定操作 (加锁和解锁) 拉低, 这也是一个瓶颈, 当工作单元的个数增加到一定数量时, 性能将显著降低.

使用并发通道

以下将使用任务的并发通道, 来实现同步处理, 一个并发通道可用于接收任务请求, 另一个并发通道可用于任务执行, 工作单元将在并发协程中实现, 而并发协程的个数将与任务数相匹配.

main 函数就是一个主函数, 如下:
在这里插入图片描述
在这里插入图片描述
任务处理相当简单, 也就是从 pending 并发通道中获取一个任务, 并进行处理, 再将处理完毕的任务, 放入done 并发通道:
在这里插入图片描述
这里不存在锁定, 任务的获取也不存在竞争, 如果任务数增加, 工作单元也需要相应增加, 但性能不会降低 (如之前的示例), 因为 pending 并发通道只是给出了任务的一个副本, 因此首个工作单元可简单获取首个任务 (参见 14.2.2, 基于并发通道的读取和写入, 都是一个原子 (不可中断的) 操作), 之后可进行任务处理, 同时这也无法预测任务与工作单元之间的关联结果, 工作单元的个数增加, 也将导致通讯开销的增加, 但不会对性能造成影响.

在上述讨论中, 比较了两种同步模型, 在应用程序中, 复杂的锁定应用, 需要大量的经验才能掌握, 如果无法承受巨大的复杂度, 可选择第二种模型. 这不仅仅是带来性能的提升, 同时还可实现简洁优雅的代码, 这也将带来极大的优势, 它也是 Go 语言所推荐的编程模式.

使用并发通道, 替代锁定操作

在这里插入图片描述
在上述的操作模型中, 通过并发通道将实现工作单元 (并发协程) 的通讯, 同时还给出一个作为调度器的主程序, 如果系统分布在多个设备中, 有些设备只需运行工作单元, 而主程序与工作单元之间, 可通过网络通道(netchan) 或 rpc 进行通讯, 参见第 15 章.

在并发通道中使用互斥

尽管在这一章, 强调了在并发协程中, 使用并发通道的重要性, 虽然 Go 语言是一种全新的系统语言, 但并不意味着, 它无法使用互斥锁定, 因此 Go 语言可基于不同的问题类型, 而选择不同的语言特性, 从而构建出更优雅, 简单和可读的解决方案, 以获得更佳的性能, 为了得到更好的解决方案, 无须害怕使用互斥, 如果互斥能提供更好的解决方式, 则无须强制使用一种语言特性, 可基于以下规则进行判断:
• 使用锁定操作 (互斥) 的场合:
▶ 在一个共享的数据结构中进行数据缓冲
▶ 需保存状态信息, 也就是应用程序的上下文或状态信息

• 使用并发通道的场合:
▶ 异步通讯
▶ 分布式单元的操作
▶ 需传输数据的所有权

如果锁定操作过于复杂, 则可自行判断, 改用并发通道是否能降低复杂度.

14.8 生成器 (generator)

生成器函数可生成一个数值序列, 并在每次调用时, 逐个返回该数值序列所包含的数值.
在这里插入图片描述
因此该函数可视为一个生产者, 但只能逐个返回一个数值, 而不是一次性返回整个数值序列, 这被称为延迟求值 (lazy evaluation), 因此该函数的运算速度很快, 并需要提供存储单元 (会占用 CPU 和内存资源), 该函数可基于外部调用, 实现表达式的逐次计算, 比如生成一个偶数的无穷序列, 如果一次性生成上述序列几乎不可能,同时也无法提供保存该序列的内存, 而使用一个并发通道和一个并发协程, 即可实现无穷序列的逐次计算.

在以下示例中, 实现了 int 数值发生器的并发通道, 同时给出了两个并发通道 yield 和 resume,

例 14.12 lazy_evaluation.go

在这里插入图片描述
上述示例与之前的示例稍有不同, 从并发通道中读到的数值, 实际上刚刚生成, 因此在读取操作启动时, 该数值并未生成, 如果需要这类传输流程, 可使用一个请求者-响应者机制, 由于生成器的开销巨大, 同时生成的结果与生成次序无关, 因此生成器可在内部, 使用并发协程实现并行计算, 同时应当避免产生过多的并发协程, 从而造成性能的下降.

以下将给出一些通用原则, 为了实现一个灵活的处理方式, 可使用空接口, 对于上层 (高阶) 函数, 可实现一个通用构造器 BuildLazyEvaluator, 以实现逐次运算函数 (该函数可放置在一个 utility(工具) 包中), 该构造器的形参中, 可包含一个用于运算的函数, 以及一个初始状态, 同时构造器可返回一个无形参的函数 (该函数可返回所需的数值), 传入构造器的运算函数, 可基于状态值实参, 计算返回值, 而在构造器的死循环中, 可创建一个并发通道和一个并发协程, 运算函数的返回值可传递给并发通道, 并被 (构造器的) 返回函数获取, 当运算函数的返回值被获取后, 将准备下一个数值的运算.

在以下示例中, 将定义一个 evenFunc 函数, 它可逐次生成一个偶数序列, 在 main() 函数中, 将生成前 10 个偶数, 这时调用 even() 逐次生成的偶数序列, 同时需要将通用的运算函数, 传递给BuildLazyIntEvaluator, 之后可将其返回值传递给顶层函数 even.

例 14.13 general_lazy_evaluation1.go

在这里插入图片描述
在这里插入图片描述

14.9 预约 (Future)

预约是指有些情况下, 在使用某个数值之前, 需要先计算出该数值, 那么在计算中, 有可能需要使用其他核心所包含的数值, 当需要这类数值时, 它们可处于就绪状态.

通过锁定或并发协程, 很容易实现预约, 这与生成器很相似, 但预约只需返回一个数值. ref.18(参考书目 18) 给出了一个很好的示例, 先假定了一个 Matrix 类型, 之后需要计算两个矩阵 a 和 b 倒数的乘积, 因此首先给出Inverse(m) 函数 (求倒数), 之后是 Product 函数 (乘积), 上述两函数都将放入 InverseProduct(),
在这里插入图片描述
在上述示例中,a 和 b 的倒数必须首先计算, 这时出现一个问题, 即 b_inv 的计算无须等待 a_inv 完成计算, 这两个求倒数运算可并行, 也就是说 Product 需要等待 a_inv 和 b_inv 结果, 才可实现运算, 如下:
在这里插入图片描述
InverseFuture() 可在并发协程中运行, 之后可将逆矩阵 (矩阵的倒数), 传递给并发通道 future,
在这里插入图片描述
在开发一个运算包时, 可基于预约功能, 设计所有的 API, 同时包中使用的预约, 可维护 API 之间的关联性,另外基于异步 API, 预约功能也可导出, 同时包实现的并行代码, 可使用户代码更加简单, 可参考 ref.18, 网页为http://www.golangpatterns.info/concurrency/futures.

14.10 多路复用

14.10.1 典型的 cs 模式

客户端-服务器应用程序是并发协程与并发通道的最佳应用示例, 客户端程序可运行在已 (物理) 连接到服务器的任意设备中, 同时客户端程序可发送一个请求消息, 服务器可接收该请求, 并进行相应的处理, 之后可发送一个响应消息给客户端, 在通常情况下, 会有多个客户端 (因此会存在多个请求消息) 和一个 (或是少量) 服务器,因此可将客户端视为一个可请求 web 页面的浏览器, 同时 web 服务器可将 web 页面, 发送给浏览器.

在 Go 语言中, 服务器通常会在一个并发协程中, 实现对客户端的响应, 所以该并发协程可处理所有的客户端请求, 通常情况下, 客户端请求还可包含一个并发通道, 而服务器可利用这个并发通道, 进行响应消息的发送, 在以下示例的 Request 结构中, 嵌入了一个 reply 并发通道:
在这里插入图片描述
为了保持简单的描述风格, 服务器将在一个并发协程的 run() 函数中, 处理所有的客户端请求, 其中会将 binOp(op类型) 转换成 int 类型, 之后可将转换结果, 发送给 reply 并发通道:
在这里插入图片描述
服务器应用程序其实是一个死循环, 并能从 chan *Request(并发通道) 中接收客户端请求, 为了避免长时间的处理操作, 所导致的并发通道的阻塞, 在实际应用中, 通常会为每个客户端请求, 生成一个独立的并发协程:
在这里插入图片描述
在 startServer 函数中, 服务器程序也将运行在一个并发协程中:
在这里插入图片描述
main 函数将调用 startServer 函数, 在以下的测试示例中, 有 100 个客户端请求, 将传递给服务器, 当这些请求发送完毕后, 我们将倒序检查这些请求,
在这里插入图片描述
在这里插入图片描述
上述程序只启动了 100 个并发协程, 即使启动了 100,000 个并发协程, 整个程序也可在几秒内完成, 这也可证明并发协程的轻量级, 如果启动相同数量的线程, 整个程序很快会崩溃.

例 14.14 multiplex_server.go

在这里插入图片描述
在这里插入图片描述

14.10.2 关闭服务器

在上一节的示例中, 当 main 返回时, 并未给出服务器的退出代码, 因此将出现服务器的强制退出, 所以需要为服务器, 提供一个用于退出的并发通道 quit.
在这里插入图片描述
之后在 server 函数中, 可使用 select 语句, 选择 service(服务) 并发通道或是 quit(退出) 并发通道,
在这里插入图片描述
如果 true 送入 quit 并发通道, 服务器将终止运行, 在 main 函数中, 还需要进行以下改动:
在这里插入图片描述
因此在以下示例中, 给出了完整的代码:

例 14.15 multiplex_server2.go

在这里插入图片描述
在这里插入图片描述

14.11 限制请求的并发个数

使用一个带缓冲的并发通道 (参见 14.2.5 节), 可满足客户端请求的最大并发数, 在以下示例中, 并发处理的请求数不能超过 MAXREQS, 因为并发通道 sem 的缓冲已被填完, 请求的接收函数将被阻塞, 即不再接收其他的请求, 直到 sem 中的请求被移除, 所以 sem 可视为一个信号量,

例 14.16 max_tasks.go

在这里插入图片描述
在这里插入图片描述
使用上述方式, 应用程序可在资源受限 (比如内存) 的情况下进行优化, 所以使用一个带缓冲的并发通道, 可使受限资源满足并发协程的需要, 而并发通道还能包含信号量功能.

14.12 并发协程链

在以下示例中, 给出了如何启动巨量并发协程的办法, 这类操作通常发生在 main 函数的 for 循环中, 而 0 次(首次) 循环将发送给最右侧的并发通道, 之后将启动 100,000 个并发协程, 而 100,000 次打印的时间将少于1.5s, 同时在示例中, 也给出了命令行中设定并发协程个数的办法, 而并发协程的个数将被解析到 flag.Int, 如果命令行给出选项-n=7000, 则表示将启动 7000 个并发协程.

例 14.17 chaining.go

在这里插入图片描述
在这里插入图片描述

14.13 多核的并行计算

假定存在多个核心, 比如const NCPU = 4, 即四核处理器, 因此需将运算分隔成四份, 使其同步执行. 以下代码只是一个框架 (省略了实际参数):
在这里插入图片描述
• DoAll() 函数可创建一个并发通道 sem, 以使并行运算在完成后, 可发出通告, 在 for 循环中, 将启动四个
并发协程, 每个并发协程将运行四分之一的局部运算, 当 DoPart() 完成局部运算后, 将在并发通道 sem
中, 发出一个通告.
• DoAll() 将在一个 for 循环中, 等待所有并发协程的完成, 这时并发通道 sem 可视为一个信号量, 从代码
也可知, 这是信号量的一个典型用法.

也可将运行时管理的 GOMAXPROCS 参数, 设为 NCPU.

14.14 基于海量数据的并行计算

假设需要处理海量的数据元素, 并且这些数据彼此独立, 可将这些数据放入 in 并发通道, 经过数据处理后, 又可将处理结果放入 out 并发通道, 这类操作方式可视为一条工厂流水线, 每个数据元素的处理都将包含若干个步骤: 预处理-步骤 A-步骤 B-…-后处理.

为了实现分步骤的顺序处理, 可给出一个典型的流水线算法,
在这里插入图片描述
在这里插入图片描述
任意时刻下, 只会执行一个步骤, 同时每个数据元素都将接受顺序处理, 在第一个数据元素的预处理之前, 不能开始第二个数据元素的处理步骤, 如果不遵循这一原则, 将浪费大量的时间, 另一个更高效的运算方式, 则是在并发协程中, 独立处理每个数据元素的运算步骤, 每一步骤的输入数据都来自于之前步骤的输出并发通道, 这种方式可丢失更少的处理时间, 大多数的处理时间都可被利用:
在这里插入图片描述
利用并发通道的缓冲大小, 还可对整个处理进行更大的优化.

14.15 漏桶算法 (leaky bucket)

在客户端-服务器模式中, 客户端并发协程可执行一个死循环, 并在死循环中, 从一些数据源 (比如网络) 中接收传输数据, 而这些传输数据可放入缓冲 (Buffer 类型) 中, 为了避免缓冲的连续分配和连续释放, 可使用一个空链表, 并能在一个缓冲并发通道中使用它,
在这里插入图片描述
这个队列 (可重用缓冲) 可与服务器共享, 当客户端从 freeList 获取缓冲数据时, 如果并发通道为空, 将分配一个新的缓冲内存, 一旦接收数据完成, 该数据将被发送给服务器的 serverChan 并发通道,
在这里插入图片描述
以下是客户端的算法代码:
在这里插入图片描述
在这里插入图片描述
在服务器的循环中, 将接收来自于客户端的所有消息, 并进行处理, 之后将返回给到共享链表的存储中,
在这里插入图片描述
当 freeList 已满, 将不会进行任何操作, 这时垃圾收集器将从该缓冲中, 一点一点整理出可用的空闲单元, 这类似于一个漏桶.

14.16 并发协程的评估

在 13.7 节中提到了, 在自定义函数中进行评估检测的意义, 这里可对并发协程进行评估, 也就是向并发协程传递一些 int 值, 之后再进行读取, 因此这类函数将调用 N(比如 N=1000,000) 次 testing.Benchmark, 而 Bench-MarkResult 包含了一个 String() 方法, 它可输出评估结果, 而 N 值也可由 gotest 工具来确定, 以得到一个更合理的评估结果. 在一般函数中, 进行评估的方式是一致的.

如果需要将代码的某些部分, 排除在评估之外, 或是需要确定特殊部分的执行时间, 可调用 testing.B.StopTimer()和 testing.B.StartTimer() 函数, 以给出定时器的起始时间和结束时间, 只有所有测试都通过后, 才可运行评估功能.

例 14.18 benchmark_channels.go

在这里插入图片描述
在这里插入图片描述

14.17 对象的并发访问

为了完成对象的并发修改, 并替换 sync.Mutex 的锁定机制, 将使用一个后台并发协程, 以实现匿名函数的顺序执行.

在以下示例中, 将定义了一个 Person 类型, 它将包含一个数据域 chF, 它是匿名函数可用的一个并发通道, 使用构造器方法 NewPerson 可初始化 Person 类型, 同时 NewPerson 将调用一个 backend() 方法 (它将作为一个并发协程),backend() 将在一个死循环中, 启动存在在 chF 中的所有函数, 这可实现高效的串行机制, 以提供安全的并发访问, 同时它还可获取基于 chF 生成的匿名函数入口, 并对薪资 (salary) 进行修改和获取, 而Salary 方法可创建一个并发通道 fChan.

以下示例比较简单, 无法直接用于上述的场景中, 但它可指引读者, 处理一些更复杂的环境.

例 14.19 conc_access.go

在这里插入图片描述
在这里插入图片描述

在这里插入图片描述