不少文章介绍channel的时候都和并发揉在一块儿,这里我想把它当作一种数据结构来单独介绍它的实现原理。golang
channel,通道。golang中用于数据传递的一种数据结构。是golang中一种传递数据的方式,也可用做事件通知。数组
使用chan关键字声明一个通道,在使用前必须先建立,操做符 <-
用于指定通道的方向,发送或接收。若是未指定方向,则为双向通道。缓存
1 //声明和建立 2 var ch chan int // 声明一个传递int类型的channel 3 ch := make(chan int) // 使用内置函数make()定义一个channel 4 ch2 := make(chan interface{}) // 建立一个空接口类型的通道, 能够存听任意格式 5 6 type Equip struct{ /* 一些字段 */ } 7 ch2 := make(chan *Equip) // 建立Equip指针类型的通道, 能够存放*Equip 8 9 //传值 10 ch <- value // 将一个数据value写入至channel,这会致使阻塞,直到有其余goroutine从这个channel中读取数据 11 value := <-ch // 从channel中读取数据,若是channel以前没有写入数据,也会致使阻塞,直到channel中被写入数据为止 12 13 ch := make(chan interface{}) // 建立一个空接口通道 14 ch <- 0 // 将0放入通道中 15 ch <- "hello" // 将hello字符串放入通道中 16 17 //关闭 18 close(ch) // 关闭channel
把数据往通道中发送时,若是接收方一直都没有接收,那么发送操做将持续阻塞。Go 程序运行时能智能地发现一些永远没法发送成功的语句并报错:数据结构
fatal error: all goroutines are asleep - deadlock! //运行时发现全部的 goroutine(包括main)都处于等待 goroutine。
通道默认是无缓冲的,无缓冲通道上的发送操做将会被阻塞,直到有其余goroutine从对应的通道上执行接收操做,数据传送完成,通道继续工做。并发
package main import ( "fmt" "time" ) var done chan bool func HelloWorld() { fmt.Println("Hello world goroutine") time.Sleep(1*time.Second) done <- true } func main() { done = make(chan bool) // 建立一个channel go HelloWorld() <-done }
1 //输出 2 //Hello world goroutine
因为main不会等goroutine执行结束才返回,前文专门加了sleep输出为了能够看到goroutine的输出内容,那么在这里因为是阻塞的,因此无需sleep。异步
将代码中”done <- true”和”<-done”,去掉再执行,没有上面的输出内容。socket
通道能够用来链接goroutine,这样一个的输出是另外一个输入。这就叫作管道:函数
1 package main 2 3 import ( 4 "fmt" 5 "time" 6 ) 7 var echo chan string 8 var receive chan string 9 10 // 定义goroutine 1 11 func Echo() { 12 time.Sleep(1*time.Second) 13 echo <- "这是一次测试" 14 } 15 16 // 定义goroutine 2 17 func Receive() { 18 temp := <- echo // 阻塞等待echo的通道的返回 19 receive <- temp 20 } 21 22 23 func main() { 24 echo = make(chan string) 25 receive = make(chan string) 26 27 go Echo() 28 go Receive() 29 30 getStr := <-receive // 接收goroutine 2的返回 31 32 fmt.Println(getStr) 33 }
输出字符串:"这是一次测试"。测试
在这里不必定要去关闭channel,由于底层的垃圾回收机制会根据它是否能够访问来决定是否自动回收它。(这里不是根据channel是否关闭来决定的)ui
1 package main 2 3 import ( 4 "fmt" 5 "time" 6 ) 7 8 // 定义goroutine 1 9 func Echo(out chan<- string) { // 定义输出通道类型 10 time.Sleep(1*time.Second) 11 out <- "这又是一次测试" 12 close(out) 13 } 14 15 // 定义goroutine 2 16 func Receive(out chan<- string, in <-chan string) { // 定义输出通道类型和输入类型 17 temp := <-in // 阻塞等待echo的通道的返回 18 out <- temp 19 close(out) 20 } 21 22 23 func main() { 24 echo := make(chan string) 25 receive := make(chan string) 26 27 go Echo(echo) 28 go Receive(receive, echo) 29 30 getStr := <-receive // 接收goroutine 2的返回 31 32 fmt.Println(getStr) 33 }
输出:这又是一次测试。
goroutine的通道默认是是阻塞的,那么有什么办法能够缓解阻塞? 答案是:加一个缓冲区。
建立一个缓冲通道:
1 ch := make(chan string, 3) // 建立了缓冲区为3的通道 2 3 //== 4 len(ch) // 长度计算 5 cap(ch) // 容量计算
缓冲通道传递数据示意图:
Go语言channel是first-class的,意味着它能够被存储到变量中,能够做为参数传递给函数,也能够做为函数的返回值返回。做为Go语言的核心特征之一,虽然channel看上去很高端,可是其实channel仅仅就是一个数据结构而已,具体定义在 $GOROOT/src/runtime/chan.go
里。以下:
1 type hchan struct { 2 qcount uint // 队列中的总数据 3 dataqsiz uint // 循环队列的大小 4 buf unsafe.Pointer // 指向dataqsiz元素数组 5 elemsize uint16 // 6 closed uint32 7 elemtype *_type // 元素类型 8 sendx uint // 发送索引 9 recvx uint // 接收索引 10 recvq waitq // 接待员名单, 因recv而阻塞的等待队列。 11 sendq waitq // 发送服务员列表, 因send而阻塞的等待队列。 12 //锁定保护hchan中的全部字段,以及几个在此通道上阻止的sudogs中的字段。 13 //按住此锁定时不要更改另外一个G的状态(尤为是不要准备G),由于这可能会致使死锁堆栈缩小。 14 lock mutex 15 }
其中一个核心的部分是存放channel数据的环形队列,由qcount和elemsize分别指定了队列的容量和当前使用量。dataqsize是队列的大小。elemalg是元素操做的一个Alg结构体,记录下元素的操做,如copy函数,equal函数,hash函数等。
若是是带缓冲区的chan,则缓冲区数据其实是紧接着Hchan结构体中分配的。不带缓冲的 channel ,环形队列 size 则为 0。
1 c = (Hchan*)runtime.mal(n + hint*elem->size);
另外一重要部分就是recvq和sendq两个链表,一个是因读这个通道而致使阻塞的goroutine,另外一个是由于写这个通道而阻塞的goroutine。若是一个goroutine阻塞于channel了,那么它就被挂在recvq或sendq中。WaitQ是链表的定义,包含一个头结点和一个尾结点:
1 struct WaitQ 2 { 3 SudoG* first; 4 SudoG* last; 5 };
队列中的每一个成员是一个SudoG结构体变量:
1 struct SudoG 2 { 3 G* g; // g和selgen构成 4 uint32 selgen; // 指向g的弱指针 5 SudoG* link; 6 int64 releasetime; 7 byte* elem; // 数据元素 8 };
该结构中主要的就是一个g和一个elem。elem用于存储goroutine的数据。读通道时,数据会从Hchan的队列中拷贝到SudoG的elem域。写通道时,数据则是由SudoG的elem域拷贝到Hchan的队列中。
基本的写channel操做,在底层运行时库中对应的是一个runtime.chansend函数。
1 c <- v
在运行时库中会执行:
1 void runtime·chansend(ChanType *t, Hchan *c, byte *ep, bool *pres, void *pc)
其中c就是channel,ep是取变量v的地址。这里的传值约定是调用者负责分配好ep的空间,仅须要简单的取变量地址就够了。pres参数是在select中的通道操做使用的。
这个函数首先会区分是同步仍是异步。同步是指chan是不带缓冲区的,所以可能写阻塞,而异步是指chan带缓冲区,只有缓冲区满才阻塞。在同步的状况下,因为channel自己是不带数据缓存的,这时首先会查看Hchan结构体中的recvq链表时否为空,便是否有由于读该管道而阻塞的goroutine。若是有则能够正常写channel,不然操做会阻塞。
recvq不为空的状况下,将一个SudoG结构体出队列,将传给通道的数据(函数参数ep)拷贝到SudoG结构体中的elem域,并将SudoG中的g放到就绪队列中,状态置为ready,而后函数返回。若是recvq为空,不然要将当前goroutine阻塞。此时将一个SudoG结构体,挂到通道的sendq链表中,这个SudoG中的elem域是参数eq,SudoG中的g是当前的goroutine。当前goroutine会被设置为waiting状态并挂到等待队列中。
在异步的状况,若是缓冲区满了,也是要将当前goroutine和数据一块儿做为SudoG结构体挂在sendq队列中,表示因写channel而阻塞。不然也是先看有没有recvq链表是否为空,有就唤醒。跟同步不一样的是在channel缓冲区不满的状况,这里不会阻塞写者,而是将数据放到channel的缓冲区中,调用者返回。
读channel的操做也是相似的,对应的函数是runtime.chansend。一个是收一个是发,基本的过程都是差很少的。
当协程尝试从未关闭的 channel 中读取数据时,内部的操做以下:
<- ch
未阻塞;<- ch
未阻塞;<- ch
阻塞。相似的,当协程尝试往未关闭的 channel 中写入数据时,内部的操做以下:
ch <-
未阻塞;ch <-
未阻塞;ch <-
阻塞。当关闭 non-nil channel 时,内部的操做以下:
空通道是指将一个channel赋值为nil,或者定义后不调用make进行初始化。按照Go语言的语言规范,读写空通道是永远阻塞的。其实在函数runtime.chansend和runtime.chanrecv开头就有判断这类状况,若是发现参数c是空的,则直接将当前的goroutine放到等待队列,状态设置为waiting。
读一个关闭的通道,永远不会阻塞,会返回一个通道数据类型的零值。这个实现也很简单,将零值复制到调用函数的参数ep中。写一个关闭的通道,则会panic。关闭一个空通道,也会致使panic。
类型于 POSIX 接口中线程通知其余线程某个事件发生的条件变量,channel 的特性也能够用来当成协程之间同步的条件变量。由于 channel 只是用来通知,因此 channel 中具体的数据类型和值并不重要,这种场景通常用 strct {}
做为 channel 的类型。
相似 pthread_cond_signal()
的功能,用来在一个协程中通知另个某一个协程事件发生:
1 package main 2 3 import ( 4 "fmt" 5 "time" 6 ) 7 8 func main() { 9 ch := make(chan struct{}) 10 nums := make([]int, 100) 11 12 go func() { 13 time.Sleep(time.Second) 14 for i := 0; i < len(nums); i++ { 15 nums[i] = i 16 } 17 // send a finish signal 18 ch <- struct{}{} 19 }() 20 21 // wait for finish signal 22 <-ch 23 fmt.Println(nums) 24 }
相似 pthread_cond_broadcast()
的功能。利用从已关闭的 channel 读取数据时老是非阻塞的特性,能够实如今一个协程中向其余多个协程广播某个事件发生的通知:
1 package main 2 3 import ( 4 "fmt" 5 "time" 6 ) 7 8 func main() { 9 N := 10 10 exit := make(chan struct{}) 11 done := make(chan struct{}, N) 12 13 // start N worker goroutines 14 for i := 0; i < N; i++ { 15 go func(n int) { 16 for { 17 select { 18 // wait for exit signal 19 case <-exit: 20 fmt.Printf("worker goroutine #%d exit\n", n) 21 done <- struct{}{} 22 return 23 case <-time.After(time.Second): 24 fmt.Printf("worker goroutine #%d is working...\n", n) 25 } 26 } 27 }(i) 28 } 29 30 time.Sleep(3 * time.Second) 31 // broadcast exit signal 32 close(exit) 33 // wait for all worker goroutines exit 34 for i := 0; i < N; i++ { 35 <-done 36 } 37 fmt.Println("main goroutine exit") 38 }
channel 的读/写至关于信号量的 P / V 操做,下面的示例程序中 channel 至关于信号量:
1 package main 2 3 import ( 4 "log" 5 "math/rand" 6 "time" 7 ) 8 9 type Seat int 10 type Bar chan Seat 11 12 func (bar Bar) ServeConsumer(customerId int) { 13 log.Print("-> consumer#", customerId, " enters the bar") 14 seat := <-bar // need a seat to drink 15 log.Print("consumer#", customerId, " drinks at seat#", seat) 16 time.Sleep(time.Second * time.Duration(2+rand.Intn(6))) 17 log.Print("<- consumer#", customerId, " frees seat#", seat) 18 bar <- seat // free the seat and leave the bar 19 } 20 21 func main() { 22 rand.Seed(time.Now().UnixNano()) 23 24 bar24x7 := make(Bar, 10) // the bar has 10 seats 25 // Place seats in an bar. 26 for seatId := 0; seatId < cap(bar24x7); seatId++ { 27 bar24x7 <- Seat(seatId) // none of the sends will block 28 } 29 30 // a new consumer try to enter the bar for each second 31 for customerId := 0; ; customerId++ { 32 time.Sleep(time.Second) 33 go bar24x7.ServeConsumer(customerId) 34 } 35 }
互斥量至关于二元信号里,因此 cap 为 1 的 channel 能够当成互斥量使用:
1 package main 2 3 import "fmt" 4 5 func main() { 6 mutex := make(chan struct{}, 1) // the capacity must be one 7 8 counter := 0 9 increase := func() { 10 mutex <- struct{}{} // lock 11 counter++ 12 <-mutex // unlock 13 } 14 15 increase1000 := func(done chan<- struct{}) { 16 for i := 0; i < 1000; i++ { 17 increase() 18 } 19 done <- struct{}{} 20 } 21 22 done := make(chan struct{}) 23 go increase1000(done) 24 go increase1000(done) 25 <-done; <-done 26 fmt.Println(counter) // 2000 27 }
关闭再也不须要使用的 channel 并非必须的。跟其余资源好比打开的文件、socket 链接不同,这类资源使用完后不关闭后会形成句柄泄露,channel 使用完后不关闭也没有关系,channel 没有被任何协程用到后最终会被 GC 回收。关闭 channel 通常是用来通知其余协程某个任务已经完成了。golang 也没有直接提供判断 channel 是否已经关闭的接口,虽然能够用其余不太优雅的方式本身实现一个:
1 func isClosed(ch chan int) bool { 2 select { 3 case <-ch: 4 return true 5 default: 6 } 7 return false 8 }
不过实现一个这样的接口也没什么必要。由于就算经过 isClosed()
获得当前 channel 当前还未关闭,若是试图往 channel 里写数据,仍然可能会发生 panic ,由于在调用 isClosed()
后,其余协程可能已经把 channel 关闭了。
关闭 channel 时应该注意如下准则:
关闭 channel 粗暴一点的作法是随意关闭,若是产生了 panic 就用 recover 避免进程挂掉。稍好一点的方案是使用标准库的 sync
包来作关闭 channel 时的协程同步,不过使用起来也稍微复杂些。下面介绍一种优雅些的作法。
这种场景下这个惟一的写入端能够关闭 channel 用来通知读取端全部数据都已经写入完成了。读取端只须要用 for range
把 channel 中数据遍历完就能够了,当 channel 关闭时,for range
仍然会将 channel 缓冲中的数据所有遍历完而后再退出循环:
1 package main 2 3 import ( 4 "fmt" 5 "sync" 6 ) 7 8 func main() { 9 wg := &sync.WaitGroup{} 10 ch := make(chan int, 100) 11 12 send := func() { 13 for i := 0; i < 100; i++ { 14 ch <- i 15 } 16 // signal sending finish 17 close(ch) 18 } 19 20 recv := func(id int) { 21 defer wg.Done() 22 for i := range ch { 23 fmt.Printf("receiver #%d get %d\n", id, i) 24 } 25 fmt.Printf("receiver #%d exit\n", id) 26 } 27 28 wg.Add(3) 29 go recv(0) 30 go recv(1) 31 go recv(2) 32 send() 33 34 wg.Wait() 35 }
这种场景下虽然能够用 sync.Once
来解决多个写入端重复关闭 channel 的问题,但更优雅的办法设置一个额外的 channel ,由读取端经过关闭来通知写入端任务完成不要再继续再写入数据了:
1 package main 2 3 import ( 4 "fmt" 5 "sync" 6 ) 7 8 func main() { 9 wg := &sync.WaitGroup{} 10 ch := make(chan int, 100) 11 done := make(chan struct{}) 12 13 send := func(id int) { 14 defer wg.Done() 15 for i := 0; ; i++ { 16 select { 17 case <-done: 18 // get exit signal 19 fmt.Printf("sender #%d exit\n", id) 20 return 21 case ch <- id*1000 + i: 22 } 23 } 24 } 25 26 recv := func() { 27 count := 0 28 for i := range ch { 29 fmt.Printf("receiver get %d\n", i) 30 count++ 31 if count >= 1000 { 32 // signal recving finish 33 close(done) 34 return 35 } 36 } 37 } 38 39 wg.Add(3) 40 go send(0) 41 go send(1) 42 go send(2) 43 recv() 44 45 wg.Wait() 46 }
这种场景稍微复杂,和上面的例子同样,也须要设置一个额外 channel 用来通知多个写入端和读取端。另外须要起一个额外的协程来经过关闭这个 channel 来广播通知:
1 package main 2 3 import ( 4 "fmt" 5 "sync" 6 "time" 7 ) 8 9 func main() { 10 wg := &sync.WaitGroup{} 11 ch := make(chan int, 100) 12 done := make(chan struct{}) 13 14 send := func(id int) { 15 defer wg.Done() 16 for i := 0; ; i++ { 17 select { 18 case <-done: 19 // get exit signal 20 fmt.Printf("sender #%d exit\n", id) 21 return 22 case ch <- id*1000 + i: 23 } 24 } 25 } 26 27 recv := func(id int) { 28 defer wg.Done() 29 for { 30 select { 31 case <-done: 32 // get exit signal 33 fmt.Printf("receiver #%d exit\n", id) 34 return 35 case i := <-ch: 36 fmt.Printf("receiver #%d get %d\n", id, i) 37 time.Sleep(time.Millisecond) 38 } 39 } 40 } 41 42 wg.Add(6) 43 go send(0) 44 go send(1) 45 go send(2) 46 go recv(0) 47 go recv(1) 48 go recv(2) 49 50 time.Sleep(time.Second) 51 // signal finish 52 close(done) 53 // wait all sender and receiver exit 54 wg.Wait() 55 }
channle 做为 golang 最重要的特性,用起来仍是比较方便的。传统的 C 里要实现相似的功能的话,通常须要用到 socket 或者 FIFO 来实现,另外还要考虑数据包的完整性与并发冲突的问题,channel 则屏蔽了这些底层细节,使用者只须要考虑读写就能够了。 channel 是引用类型,了解一下 channel 底层的机制对更好的使用 channel 仍是很用必要的。虽然操做原语简单,但涉及到阻塞的问题,使用不当可能会形成死锁或者无限制的协程建立最终致使进程挂掉。