老听 clojure 社区的人提起 core.async ,说它如何好用,如何简化了并发编程的模型,不禁得勾起了个人好奇心,想了解一番其思想的源头:CSP 模型及受其启发的 goroutine 和 channel 。html
CSP 描述这样一种并发模型:多个Process 使用一个 Channel 进行通讯, 这个 Channel 连结的 Process 一般是匿名的,消息传递一般是同步的(有别于 Actor Model)。git
CSP 最先是由 Tony Hoare 在 1977 年提出,听说老爷子至今仍在更新这个理论模型,有兴趣的朋友能够自行查阅电子版本:http://www.usingcsp.com/cspbook.pdf。github
严格来讲,CSP 是一门形式语言(相似于 ℷ calculus),用于描述并发系统中的互动模式,也所以成为一众面向并发的编程语言的理论源头,并衍生出了 Occam/Limbo/Golang…golang
而具体到编程语言,如 Golang,其实只用到了 CSP 的很小一部分,即理论中的 Process/Channel(对应到语言中的 goroutine/channel):这两个并发原语之间没有从属关系, Process 能够订阅任意个 Channel,Channel 也并不关心是哪一个 Process 在利用它进行通讯;Process 围绕 Channel 进行读写,造成一套有序阻塞和可预测的并发模型。编程
What is a goroutine? It’s an independently executing function, launched by a go statement.
It has its own call stack, which grows and shrinks as required.
It’s very cheap. It’s practical to have thousands, even hundreds of thousands of goroutines.
It’s not a thread.
There might be only one thread in a program with thousands of goroutines.
Instead, goroutines are multiplexed dynamically onto threads as needed to keep all the goroutines running.
But if you think of it as a very cheap thread, you won’t be far off.并发― Rob Pikeasync
以上是 Rob Pike 在 Google I/O 2012 上给出的描述,归纳下来其实就一句话:编程语言
goroutine 能够视为开销很小的线程(既不是物理线程也不是协程,但它拥有本身的调用栈,而且这个栈的大小是可伸缩的 不是协程,它有本身的栈),很好用,须要并发的地方就用 go 起一个 func,goroutine走起
ide
在 Golang 中,任何代码都是运行在 goroutine里,即使没有显式的 go func()
,默认的 main 函数也是一个 goroutine。函数
但 goroutine 不等于操做系统的线程,它与系统线程的对应关系,牵涉到 Golang 运行时的调度器:
调度器由三方面实体构成:
三者对应关系:
上图有2个 物理线程 M,每个 M 都拥有一个上下文(P),每个也都有一个正在运行的goroutine(G)。
P 的数量可由 runtime.GOMAXPROCS()
进行设置,它表明了真正的并发能力,便可有多少个 goroutine 同时运行。
调度器为何要维护多个上下文P 呢?由于当一个物理线程 M 被阻塞时,P 能够转而投奔另外一个OS线程 M(即 P 带着 G 连茎拔起,去另外一个 M 节点下运行)。这是 Golang调度器厉害的地方,也是高并发能力的保障。
channel 是 goroutine 之间通讯(读写)的通道。由于它的存在,显得 Golang(或者说CSP)与传统的共享内存型的并发模型大相径庭,用 Effective Go 里的话来讲就是:
Do not communicate by sharing memory; instead, share memory by communicating.
在 Golang 的并发模型中,咱们并不关心是哪一个 goroutine(匿名性)在用 channel,只关心 channel 的性质:
好比我但愿在程序里并发的计算并传递一个整型值,我就会定义一个 int 型的 channel:
value := make(chan int)
无缓冲的 channel因为 make 这个 channel 并未提供第二个参数capacity,所以这个 channel 是不带缓冲区的,即同步阻塞的channel:
它有以下特色:
1. 不能够在同一个 goroutine 中既读又写,不然将会死锁,抛出如
fatal error: all goroutines are asleep - deadlock!
这样的错误,如下代码片段是这种典型:
func deadlock() { ch := make(chan int) ch <- 2 x := <-ch log.Println(x) }
2. 两个goroutine中使用无缓冲的channel,则读写互为阻塞,即双方代码的执行都会阻塞在 <-ch 和 ch <- 处,只到双方读写完成在 ch 中的传递,各自继续向下执行,此处借用CSP 图例说明:
goroutine 在无缓冲 channel 上交互的代码:
func nolock() { ch := make(chan int) go func() { ch <- 2 log.Println("after write") }() x := <-ch log.Println("after read:", x) }
有缓冲的 channel
在 make 时传递第二参 capacity,即为有缓冲的 channel:
ch := make(chan int, 1)
这样的 channel 不管是否在同一 goroutine 中,都可读写而不致死锁,看看以下片段,你猜它会输出什么:
ch := make(chan int, 1) for i := 0; i < 10; i++ { select { case x := <-ch: fmt.Println(x) case ch <- i: } }
举个粟子
网上看来的求素数的例子:使用若干个 goroutine (根据求解范围 N 而定)作素数的筛法,即
从2开始每找到一个素数就标记全部能被该素数整除的全部数。直到没有可标记的数,剩下的就都是素数。下面以找出10之内全部素数为例,借用 CSP 方式解决这个问题。
代码以下:
package main import "fmt" func Processor(seq <-chan int, wait chan struct{}, level int) { go func() { prime, ok := <-seq if !ok { close(wait) return } fmt.Printf("[%d]: %d\n", level, prime) out := make(chan int) Processor(out, wait, level+1) for num := range seq { if num%prime != 0 { out <- num } } close(out) }() } func main() { origin, wait := make(chan int), make(chan struct{}) Processor(origin, wait, 1) for num := 2; num < 10; num++ { origin <- num } close(origin) <-wait }
FAQ
v, ok :=
,要么使用 v := range ch
形式接收