原文:Fundamentals of concurrent programminghtml
译者:youngsterxyfjava
本文是一篇并发编程方面的入门文章,以Go语言编写示例代码,内容涵盖:linux
在开始阅读本文以前,你应该知道如何编写简单的Go程序。若是你熟悉的是C/C++、Java或Python之类的语言,那么 Go语言之旅 能提供全部必要的背景知识。也许你还有兴趣读一读 为C++程序员准备的Go语言教程 或 为Java程序员准备的Go语言教程。git
Go容许使用go
语句开启一个新的运行期线程,即 goroutine,以一个不一样的、新建立的goroutine来执行一个函数。同一个程序中的全部goroutine共享同一个地址空间。程序员
Goroutine很是轻量,除了为之分配的栈空间,其所占用的内存空间微乎其微。而且其栈空间在开始时很是小,以后随着堆存储空间的按需分配或释放而变化。内部实现上,goroutine会在多个操做系统线程上多路复用。若是一个goroutine阻塞了一个操做系统线程,例如:等待输入,这个线程上的其余goroutine就会迁移到其余线程,这样能继续运行。开发者并不须要关心/担忧这些细节。github
下面所示程序会输出“Hello from main goroutine”。也可能会输出“Hello from another goroutine”,具体依赖于两个goroutine哪一个先结束。golang
func main() { go fmt.Println("Hello from another goroutine") fmt.Println("Hello from main goroutine") // 至此,程序运行结束, // 全部活跃的goroutine被杀死 }
接下来的这个程序,多数状况下,会输出“Hello from main goroutine”和“Hello from another goroutine”,输出的顺序不肯定。但还有另外一个可能性是:第二个goroutine运行得极其慢,在程序结束以前都没来得及输出相应的消息。c#
func main() { go fmt.Println("Hello from another goroutine") fmt.Println("Hello from main goroutine") time.Sleep(time.Second) // 等待1秒,等另外一个goroutine结束 }
下面则是一个相对更加实际的示例,其中定义了一个函数使用并发来推迟触发一个事件。
// 函数Publish在给定时间过时后打印text字符串到标准输出 // 该函数并不会阻塞而是当即返回 func Publish(text string, delay time.Duration) { go func() { time.Sleep(delay) fmt.Println("BREAKING NEWS:", text) }() // 注意这里的括号。必须调用匿名函数 }
你可能会这样使用Publish
函数:
func main() { Publish("A goroutine starts a new thread of execution.", 5*time.Second) fmt.Println("Let’s hope the news will published before I leave.") // 等待发布新闻 time.Sleep(10 * time.Second) fmt.Println("Ten seconds later: I’m leaving now.") }
这个程序,绝大多数状况下,会输出如下三行,顺序固定,每行输出之间相隔5秒。
$ go run publish1.go Let’s hope the news will published before I leave. BREAKING NEWS: A goroutine starts a new thread of execution. Ten seconds later: I’m leaving now.
通常来讲,经过睡眠的方式来编排线程之间相互等待是不太可能的。下一章节会介绍Go语言中的一种同步机制 - 管道,并演示如何使用管道让一个goroutine等待另外一个goroutine。
管道是Go语言的一个构件,提供一种机制用于两个goroutine之间经过传递一个指定类型的值来同步运行和通信。操做符<-
用于指定管道的方向,发送或接收。若是未指定方向,则为双向管道。
chan Sushi // 可用来发送和接收Sushi类型的值 chan<- float64 // 仅可用来发送float64类型的值 <-chan int // 仅可用来接收int类型的值
管道是引用类型,基于make函数来分配。
ic := make(chan int) // 不带缓冲的int类型管道 wc := make(chan *Work, 10) // 带缓冲的Work类型指针管道
若是经过管道发送一个值,则将<-
做为二元操做符使用。经过管道接收一个值,则将其做为一元操做符使用:
ic <- 3 // 往管道发送3 work := <-wc // 从管道接收一个指向Work类型值的指针
若是管道不带缓冲,发送方会阻塞直到接收方从管道中接收了值。若是管道带缓冲,发送方则会阻塞直到发送的值被拷贝到缓冲区内;若是缓冲区已满,则意味着须要等待直到某个接收方获取到一个值。接收方在有值能够接收以前会一直阻塞。
关闭管道(Close)
close 函数标志着不会再往某个管道发送值。在调用close
以后,而且在以前发送的值都被接收后,接收操做会返回一个零值,不会阻塞。一个多返回值的接收操做会额外返回一个布尔值用来指示返回的值是否发送操做传递的。
ch := make(chan string) go func() { ch <- "Hello!" close(ch) }() fmt.Println(<-ch) // 输出字符串"Hello!" fmt.Println(<-ch) // 输出零值 - 空字符串"",不会阻塞 fmt.Println(<-ch) // 再次打印输出空字符串"" v, ok := <-ch // 变量v的值为空字符串"",变量ok的值为false
一个带有range
子句的for
语句会依次读取发往管道的值,直到该管道关闭:
func main() { // 译注:要想运行该示例,须要先定义类型Sushi,如type Sushi string var ch <-chan Sushi = Producer() for s := range ch { fmt.Println("Consumed", s) } } func Producer() <-chan Sushi { ch := make(chan Sushi) go func(){ ch <- Sushi("海老握り") // Ebi nigiri ch <- Sushi("鮪とろ握り") // Toro nigiri close(ch) }() return ch }
下一个示例中,咱们让Publish
函数返回一个管道 - 用于在发布text变量值时广播一条消息:
// 在给定时间过时时,Publish函数会打印text变量值到标准输出 // 在text变量值发布后,该函数会关闭管道wait func Publish(text string, delay time.Duration) (wait <-chan struct{}) { ch := make(chan struct{}) go func() { time.Sleep(delay) fmt.Println("BREAKING NEWS:", text) close(ch) // 广播 - 一个关闭的管道都会发送一个零值 }() return ch }
注意:咱们使用了一个空结构体的管道:struct{}
。这明确地指明该管道仅用于发信号,而不是传递数据。
咱们可能会这样使用这个函数:
func main() { wait := Publish("Channels let goroutines communicate.", 5*time.Second) fmt.Println("Waiting for the news...") <-wait fmt.Println("The news is out, time to leave.") }
这个程序会按指定的顺序输出如下三行内容。最后一行在新闻(news)一出就会当即输出。
$ go run publish2.go Waiting for the news... BREAKING NEWS: Channels let goroutines communicate. The news is out, time to leave.
如今咱们在Publish
函数中引入一个bug:
func Publish(text string, delay time.Duration) (wait <-chan struct{}) { ch := make(chan struct{}) go func() { time.Sleep(delay) fmt.Println("BREAKING NEWS:", text) // 译注:注意这里将close函数调用注释掉了 //close(ch) }() return ch }
主程序仍是像以前同样开始运行:输出第一行,而后等待5秒,这时Publish
函数开启的goroutine会输出突发新闻(breaking news),而后退出,留下主goroutine独自等待。
func main() { wait := Publish("Channels let goroutines communicate.", 5*time.Second) fmt.Println("Waiting for the news...") // 译注:注意下面这一句 <-wait fmt.Println("The news is out, time to leave.") }
此刻以后,程序没法再继续往下执行。众所周知,这种情形即为死锁。
死锁是线程之间相互等待,其中任何一个都没法向前运行的情形。
Go语言对于运行时的死锁检测具有良好的支持。当没有任何goroutine可以往前执行的情形发生时,Go程序一般会提供详细的错误信息。如下就是咱们的问题程序的输出:
Waiting for the news... BREAKING NEWS: Channels let goroutines communicate. fatal error: all goroutines are asleep - deadlock! goroutine 1 [chan receive]: main.main() .../goroutineStop.go:11 +0xf6 goroutine 2 [syscall]: created by runtime.main .../go/src/pkg/runtime/proc.c:225 goroutine 4 [timer goroutine (idle)]: created by addtimer .../go/src/pkg/runtime/ztime_linux_amd64.c:73
大多数状况下找出Go程序中形成死锁的缘由都比较容易,那么剩下的就是如何解决这个bug了。
死锁也许听起来使人挺忧伤的,但伴随并发编程真正灾难性的错误实际上是数据竞争,至关常见,也可能很是难于调试。
当两个线程并发地访问同一个变量,而且其中至少一个访问是写操做时,数据竞争就发生了。
下面的这个函数就有数据竞争问题,其行为是未定义的。例如,可能输出数值1。代码以后是一个可能性解释,试图搞清楚这一切是如何发生得。
func race() { wait := make(chan struct{}) n := 0 go func() { // 译注:注意下面这一行 n++ // 一次访问: 读, 递增, 写 close(wait) }() // 译注:注意下面这一行 n++ // 另外一次冲突的访问 <-wait fmt.Println(n) // 输出:未指定 }
代码中的两个goroutine(假设命名为g1
和g2
)参与了一次竞争,咱们没法获知操做会以何种顺序发生。如下是诸多可能中的一种:
g1
从 n
中获取值0g2
从 n
中获取值0g1
将值从0增大到1g1
将1写到 n
g2
将值从0增大到1g2
将1写到 n
“数据竞争(data race)”这名字有点误导的嫌疑。不只操做的顺序是未定义的,其实根本没有任何保证(no guarantees whatsoever)。编译器和硬件为了获得更好的性能,常常都会对代码进行上下内外的顺序变换。若是你看到一个线程处于中间行为状态时,那么当时的场景可能就像下图所示的同样:
避免数据竞争的惟一方式是线程间同步访问全部的共享可变数据。有几种方式可以实现这一目标。Go语言中,一般是使用管道或者锁。(sync和sync/atomic包中还有更低层次的机制可供使用,但本文中不作讨论)。
Go语言中,处理并发数据访问的推荐方式是使用管道从一个goroutine中往下一个goroutine传递实际的数据。有格言说得好:“不要经过共享内存来通信,而是经过通信来共享内存”。
func sharingIsCaring() { ch := make(chan int) go func() { n := 0 // 仅为一个goroutine可见的局部变量. n++ ch <- n // 数据从一个goroutine离开... }() n := <-ch // ...而后安全到达另外一个goroutine. n++ fmt.Println(n) // 输出: 2 }
以上代码中的管道肩负双重责任 - 从一个goroutine将数据传递到另外一个goroutine,而且起到同步的做用:发送方goroutine会等待另外一个goroutine接收数据,接收方goroutine也会等待另外一个goroutine发送数据。
Go语言内存模型 - 要保证一个goroutine中对一个变量的读操做获得的值正好是另外一个goroutine中对同一个变量写操做产生的值,条件至关复杂,但goroutine之间只要经过管道来共享全部可变数据,那么就能远离数据竞争了。
有时,经过显式加锁,而不是使用管道,来同步数据访问,可能更加便捷。Go语言标准库为这一目的提供了一个互斥锁 - sync.Mutex。
要想这类加锁起效的话,关键之处在于:全部对共享数据的访问,无论读写,仅当goroutine持有锁才能操做。一个goroutine出错就足以破坏掉一个程序,引入数据竞争。
所以,应该设计一个自定义数据结构,具有明确的API,确保全部的同步都在数据结构内部完成。下例中,咱们构建了一个安全、易于使用的并发数据结构,AtomicInt
,用于存储一个整型值。任意数量的goroutine都能经过Add
和Value
方法安全地访问这个数值。
// AtomicInt是一个并发数据结构,持有一个整数值 // 该数据结构的零值为0 type AtomicInt struct { mu sync.Mutex // 锁,一次仅能被一个goroutine持有。 n int } // Add方法做为一个原子操做将n加到AtomicInt func (a *AtomicInt) Add(n int) { a.mu.Lock() // 等待锁释放,而后持有它 a.n += n a.mu.Unlock() // 释放锁 } // Value方法返回a的值 func (a *AtomicInt) Value() int { a.mu.Lock() n := a.n a.mu.Unlock() return n } func lockItUp() { wait := make(chan struct{}) var n AtomicInt go func() { n.Add(1) // 一个访问 close(wait) }() n.Add(1) // 另外一个并发访问 <-wait fmt.Println(n.Value()) // 输出: 2 }
竞争有时很是难于检测。下例中的这个函数有一个数据竞争问题,执行这个程序时会输出55555
。尝试一下,也许你会获得一个不一样的结果。(sync.WaitGroup是Go语言标准库的一部分;用于等待一组goroutine结束运行。)
func race() { var wg sync.WaitGroup wg.Add(5) // 译注:注意下面这行代码中的i++ for i := 0; i < 5; i++ { go func() { // 注意下一行代码会输出什么?为何? fmt.Print(i) // 6个goroutine共享变量i wg.Done() }() } wg.Wait() // 等待全部(5个)goroutine运行结束 fmt.Println() }
对于输出55555
,一个貌似合理的解释是:执行i++
的goroutine在其余goroutine执行打印语句以前就完成了5次i++
操做。实际上变量i
更新后的值为其余goroutine所见纯属巧合。
一个简单的解决方案是:使用一个局部变量,而后当开启新的goroutine时,将数值做为参数传递:
func correct() { var wg sync.WaitGroup wg.Add(5) for i := 0; i < 5; i++ { go func(n int) { // 使用局部变量 fmt.Print(n) wg.Done() }(i) } wg.Wait() fmt.Println() }
此次代码就对了,程序会输出指望的结果,如:24031
。注意:goroutine之间的运行顺序是不肯定的。
仍旧使用闭包,但可以避免数据竞争也是可能的,必须当心翼翼地让每一个goroutine使用一个独有的变量。
func alsoCorrect() { var wg sync.WaitGroup wg.Add(5) for i := 0; i < 5; i++ { n := i // 为每一个闭包建立一个独有的变量 go func() { fmt.Print(n) wg.Done() }() } wg.Wait() fmt.Println() }
数据竞争自动检测
通常来讲,不太可能可以自动检测发现全部可能的数据竞争状况,但Go(从版本1.1开始)有一个强大的数据竞争检测器。
这个工具用起来也很简单:只要在使用go
命令时加上-race
标记便可。开启检测器运行上面的程序会给出清晰且信息量大的输出:
$ go run -race raceClosure.go Race: ================== WARNING: DATA RACE Read by goroutine 2: main.func·001() ../raceClosure.go:22 +0x65 Previous write by goroutine 0: main.race() ../raceClosure.go:20 +0x19b main.main() ../raceClosure.go:10 +0x29 runtime.main() ../go/src/pkg/runtime/proc.c:248 +0x91 Goroutine 2 (running) created at: main.race() ../raceClosure.go:24 +0x18b main.main() ../raceClosure.go:10 +0x29 runtime.main() ../go/src/pkg/runtime/proc.c:248 +0x91 ================== 55555 Correct: 01234 Also correct: 01324 Found 1 data race(s) exit status 66
该工具发现一处数据竞争,包含:一个goroutine在第20行对一个变量进行写操做,跟着另外一个goroutine在第22行对同一个变量进行了未同步的读操做。
注意:竞争检测器只能发如今运行期确实发生的数据竞争(译注:我也不太理解这话,请指导)
select语句是Go语言并发工具集中的终极工具。select用于从一组可能的通信中选择一个进一步处理。若是任意一个通信均可以进一步处理,则从中随机选择一个,执行对应的语句。不然,若是又没有默认分支(default case),select语句则会阻塞,直到其中一个通信完成。
如下是一个玩具示例,演示select
语句如何用于实现一个随机数生成器:
// RandomBits函数 返回一个管道,用于产生一个比特随机序列 func RandomBits() <-chan int { ch := make(chan int) go func() { for { select { case ch <- 0: // 注意:分支没有对应的处理语句 case ch <- 1: } } }() return ch }
下面是相对更加实际一点的例子:如何使用select语句为一个操做设置一个时间限制。代码会输出变量news的值或者超时消息,具体依赖于两个接收语句哪一个先执行:
select { case news := <-NewsAgency: fmt.Println(news) case <-time.After(time.Minute): fmt.Println("Time out: no news in one minute.") }
函数 time.After 是Go语言标准库的一部分;它会在等待指定时间后将当前的时间发送到返回的管道中。
花点时间认真研究一下这个示例。若是你彻底理解,也就对Go语言中并发的应用方式有了全面的掌握。
这个程序演示了如何将管道用于被任意数量的goroutine发送和接收数据,也演示了如何将select语句用于从多个通信中选择一个。
func main() { people := []string{"Anna", "Bob", "Cody", "Dave", "Eva"} match := make(chan string, 1) // 为一个未匹配的发送操做提供空间 wg := new(sync.WaitGroup) wg.Add(len(people)) for _, name := range people { go Seek(name, match, wg) } wg.Wait() select { case name := <-match: fmt.Printf("No one received %s’s message.\n", name) default: // 没有待处理的发送操做 } } // 函数Seek 发送一个name到match管道或从match管道接收一个peer,结束时通知wait group func Seek(name string, match chan string, wg *sync.WaitGroup) { select { case peer := <-match: fmt.Printf("%s sent a message to %s.\n", peer, name) case match <- name: // 等待某个goroutine接收个人消息 } wg.Done() }
示例输出:
$ go run matching.go Cody sent a message to Bob. Anna sent a message to Eva. No one received Dave’s message.
并发的一个应用是将一个大的计算切分红一些工做单元,调度到不一样的CPU上同时地计算。
将计算分布到多个CPU上更可能是一门艺术,而不是一门科学。如下是一些经验法则:
下面的这个示例展现如何切分一个开销很大的计算并将其分布在全部可用的CPU上进行计算。先看一下有待优化的代码:
type Vector []float64 // 函数Convolve 计算 w = u * v,其中 w[k] = Σ u[i]*v[j], i + j = k // 先决条件:len(u) > 0, len(v) > 0 func Convolve(u, v Vector) (w Vector) { n := len(u) + len(v) - 1 w = make(Vector, n) for k := 0; k < n; k++ { w[k] = mul(u, v, k) } return } // 函数mul 返回 Σ u[i]*v[j], i + j = k. func mul(u, v Vector, k int) (res float64) { n := min(k+1, len(u)) j := min(k, len(v)-1) for i := k - j; i < n; i, j = i+1, j-1 { res += u[i] * v[j] } return }
思路很简单:肯定合适大小的工做单元,而后在不一样的goroutine中执行每一个工做单元。如下是并发版本的 Convolve
:
func Convolve(u, v Vector) (w Vector) { n := len(u) + len(v) - 1 w = make(Vector, n) // 将 w 切分红花费 ~100μs-1ms 用于计算的工做单元 size := max(1, 1<<20/n) wg := new(sync.WaitGroup) wg.Add(1 + (n-1)/size) for i := 0; i < n && i >= 0; i += size { // 整型溢出后 i < 0 j := i + size if j > n || j < 0 { // 整型溢出后 j < 0 j = n } // 这些goroutine共享内存,可是只读 go func(i, j int) { for k := i; k < j; k++ { w[k] = mul(u, v, k) } wg.Done() }(i, j) } wg.Wait() return }
工做单元定义以后,一般状况下最好将调度工做交给运行时和操做系统。然而,对于Go 1.* 你也许须要告诉运行时但愿多少个goroutine来同时地运行代码。
func init() { numcpu := runtime.NumCPU() runtime.GOMAXPROCS(numcpu) // 尝试使用全部可用的CPU }