Goroutine是Go中最基本的执行单元。事实上每个Go程序至少有一个goroutine:主goroutine。当程序启动时,它会自动建立。html
事实上goroutine采用了一种fork-join的模型。golang
sayHello := func() { fmt.Println("hello") } go sayHello()
那咱们如何来join goroutine呢?须要引入wait操做:安全
var wg sync.WaitGroup() sayHello := func() { defer wg.Done() fmt.Println("hello") } wg.Add(1) go sayHello() wa.Wait()
goroutine是Go语言的基本调度单位,而channels则是它们之间的通讯机制。操做符<-用来指定管道的方向,发送或接收。若是未指定方向,则为双向管道。闭包
// 建立一个双向channel ch := make(chan interface{})
interface{}表示chan能够为任何类型并发
channel有发送和接受两个主要操做。发送和接收两个操做都使用<-运算符。在发送语句中,channel放<-运算符左边。在接收语句中,channel放<-运算符右边。一个不使用接收结果的接收操做也是合法的。函数
// 发送操做 ch <- x // 接收操做 x = <-ch // 忽略接收到的值,合法 <-ch
咱们不能弄错channel的方向:spa
writeStream := make(chan<- interface{}) readStream := make(<-chan interface{}) <-writeStream readStream <- struct{}{}
Channel支持close操做,用于关闭channel,后面对该channel的任何发送操做都将致使panic异常。对一个已经被close过的channel进行接收操做依然能够接受到以前已经成功发送的数据;若是channel中已经没有数据的话将产生一个零值的数据。设计
从已经关闭的channel中读:3d
intStream := make(chan int) close(intStream) integer, ok := <- intStream fmt.Pritf("(%v): %v", ok, integer) // (false): 0
上面例子中经过返回值ok来判断channel是否关闭,咱们还能够经过range这种更优雅的方式来处理已经关闭的channel:code
intStream := make(chan int) go func() { defer close(intStream) for i:=1; i<=5; i++{ intStream <- i } }() for integer := range intStream { fmt.Printf("%v ", integer) } // 1 2 3 4 5
建立了一个能够持有三个字符串元素的带缓冲Channel:
ch = make(chan string, 3)
咱们能够在无阻塞的状况下连续向新建立的channel发送三个值:
ch <- "A" ch <- "B" ch <- "C"
此刻,channel的内部缓冲队列将是满的,若是有第四个发送操做将发生阻塞。
若是咱们接收一个值:
fmt.Println(<-ch) // "A"
那么channel的缓冲队列将不是满的也不是空的,所以对该channel执行的发送或接收操做都不会发生阻塞。经过这种方式,channel的缓冲队列缓冲解耦了接收和发送的goroutine。
带缓冲的信道可被用做信号量,例如限制吞吐量。在此例中,进入的请求会被传递给 handle,它从信道中接收值,处理请求后将值发回该信道中,以便让该 “信号量” 准备迎接下一次请求。信道缓冲区的容量决定了同时调用 process 的数量上限。
var sem = make(chan int, MaxOutstanding) func handle(r *Request) { sem <- 1 // 等待活动队列清空。 process(r) // 可能须要很长时间。 <-sem // 完成;使下一个请求能够运行。 } func Serve(queue chan *Request) { for { req := <-queue go handle(req) // 无需等待 handle 结束。 } }
然而,它却有个设计问题:尽管只有 MaxOutstanding 个 goroutine 能同时运行,但 Serve 仍是为每一个进入的请求都建立了新的 goroutine。其结果就是,若请求来得很快, 该程序就会无限地消耗资源。为了弥补这种不足,咱们能够经过修改 Serve 来限制建立 Go 程,这是个明显的解决方案,但要小心咱们修复后出现的 Bug。
func Serve(queue chan *Request) { for req := range queue { sem <- 1 go func() { process(req) // 这儿有 Bug,解释见下。 <-sem }() } }
Bug 出如今 Go 的 for 循环中,该循环变量在每次迭代时会被重用,所以 req 变量会在全部的 goroutine 间共享,这不是咱们想要的。咱们须要确保 req 对于每一个 goroutine 来讲都是惟一的。有一种方法可以作到,就是将 req 的值做为实参传入到该 goroutine 的闭包中:
func Serve(queue chan *Request) { for req := range queue { sem <- 1 go func(req *Request) { process(req) <-sem }(req) } }
另外一种解决方案就是以相同的名字建立新的变量,如例中所示:
func Serve(queue chan *Request) { for req := range queue { req := req // 为该 Go 程建立 req 的新实例。 sem <- 1 go func() { process(req) <-sem }() } }
下面再看一个Go语言圣经的例子。它并发地向三个镜像站点发出请求,三个镜像站点分散在不一样的地理位置。它们分别将收到的响应发送到带缓冲channel,最后接收者只接收第一个收到的响应,也就是最快的那个响应。所以mirroredQuery函数可能在另外两个响应慢的镜像站点响应以前就返回告终果。
func mirroredQuery() string { responses := make(chan string, 3) go func() { responses <- request("asia.gopl.io") }() go func() { responses <- request("europe.gopl.io") }() go func() { responses <- request("americas.gopl.io") }() // 仅仅返回最快的那个response return <-responses } func request(hostname string) (response string) { /* ... */ }
若是咱们使用了无缓冲的channel,那么两个慢的goroutines将会由于没有人接收而被永远卡住。这种状况,称为goroutines泄漏,这将是一个BUG。和垃圾变量不一样,泄漏的goroutines并不会被自动回收,所以确保每一个再也不须要的goroutine能正常退出是重要的。
Go 最重要的特性就是信道是first-class value,它能够被分配并像其它值处处传递。 这种特性一般被用来实现安全、并行的多路分解。
咱们能够利用这个特性来实现一个简单的RPC。
如下为 Request 类型的大概定义。
type Request struct { args []int f func([]int) int resultChan chan int }
客户端提供了一个函数及其实参,此外在请求对象中还有个接收应答的信道。
func sum(a []int) (s int) { for _, v := range a { s += v } return } request := &Request{[]int{3, 4, 5}, sum, make(chan int)} // 发送请求 clientRequests <- request // 等待回应 fmt.Printf("answer: %d\n", <-request.resultChan)
服务端的handler函数:
func handle(queue chan *Request) { for req := range queue { req.resultChan <- req.f(req.args) } }
Channels也能够用于将多个goroutine链接在一块儿,一个Channel的输出做为下一个Channel的输入。这种串联的Channels就是所谓的管道(pipeline)。下面的程序用两个channels将三个goroutine串联起来:
第一个goroutine是一个计数器,用于生成0、一、二、……形式的整数序列,而后经过channel将该整数序列发送给第二个goroutine;第二个goroutine是一个求平方的程序,对收到的每一个整数求平方,而后将平方后的结果经过第二个channel发送给第三个goroutine;第三个goroutine是一个打印程序,打印收到的每一个整数。
func counter(out chan<- int) { for x := 0; x < 100; x++ { out <- x } close(out) } func squarer(out chan<- int, in <-chan int) { for v := range in { out <- v * v } close(out) } func printer(in <-chan int) { for v := range in { fmt.Println(v) } } func main() { naturals := make(chan int) squares := make(chan int) go counter(naturals) go squarer(squares, naturals) printer(squares) }
select用于从一组可能的通信中选择一个进一步处理。若是任意一个通信均可以进一步处理,则从中随机选择一个,执行对应的语句。不然,若是又没有默认分支(default case),select语句则会阻塞,直到其中一个通信完成。
select { case <-ch1: // ... case x := <-ch2: // ...use x... case ch3 <- y: // ... default: // ... }
如何使用select语句为一个操做设置一个时间限制。代码会输出变量news的值或者超时消息,具体依赖于两个接收语句哪一个先执行:
select { case news := <-NewsAgency: fmt.Println(news) case <-time.After(time.Minute): fmt.Println("Time out: no news in one minute.") }
下面的select语句会在abort channel中有值时,从其中接收值;无值时什么都不作。这是一个非阻塞的接收操做;反复地作这样的操做叫作“轮询channel”。
select { case <-abort: fmt.Printf("Launch aborted!\n") return default: // do nothing }
参考资料。