Golang使用Groutine和channels实现了CSP(Communicating Sequential Processes)模型,channles在goroutine的通讯和同步中承担着重要的角色。在GopherCon 2017中,Golang专家Kavya深刻介绍了 Go Channels 的内部机制,以及运行时调度器和内存管理系统是如何支持Channel的,本文根据Kavya的ppt学习和分析一下go channels的原理,但愿可以对之后正确高效使用golang的并发带来一些启发。c++
以一个简单的channel应用开始,使用goroutine和channel实现一个任务队列,并行处理多个任务。git
func main(){ //带缓冲的channel ch := make(chan Task, 3) //启动固定数量的worker for i := 0; i< numWorkers; i++ { go worker(ch) } //发送任务给worker hellaTasks := getTaks() for _, task := range hellaTasks { ch <- task } ... } func worker(ch chan Task){ for { //接受任务 task := <- ch process(task) } }
从上面的代码能够看出,使用golang的goroutine和channel能够很容易的实现一个生产者-消费者模式的任务队列,相比java, c++简洁了不少。channel能够自然的实现了下面四个特性: github
- goroutine安全
- 在不一样的goroutine之间存储和传输值
- 提供FIFO语义(buffered channel提供)
- 可让goroutine block/unblock
那么channel是怎么实现这些特性的呢?下面咱们看看当咱们调用make来生成一个channel的时候都作了些什么。golang
make chan
上述任务队列的例子第三行,使用make建立了一个长度为3的带缓冲的channel,channel在底层是一个hchan结构体,位于src/runtime/chan.go
里。其定义以下:缓存
type hchan struct { qcount uint // total data in the queue dataqsiz uint // size of the circular queue buf unsafe.Pointer // points to an array of dataqsiz elements elemsize uint16 closed uint32 elemtype *_type // element type sendx uint // send index recvx uint // receive index recvq waitq // list of recv waiters sendq waitq // list of send waiters // lock protects all fields in hchan, as well as several // fields in sudogs blocked on this channel. // // Do not change another G's status while holding this lock // (in particular, do not ready a G), as this can deadlock // with stack shrinking. lock mutex }
make函数在建立channel的时候会在该进程的heap区申请一块内存,建立一个hchan结构体,返回执行该内存的指针,因此获取的的ch变量自己就是一个指针,在函数之间传递的时候是同一个channel。 安全
hchan结构体使用一个环形队列来保存groutine之间传递的数据(若是是缓存channel的话),使用两个list保存像该chan发送和从改chan接收数据的goroutine,还有一个mutex来保证操做这些结构的安全。数据结构
发送和接收
向channel发送和从channel接收数据主要涉及hchan里的四个成员变量,借用Kavya ppt里的图示,来分析发送和接收的过程。
仍是之前面的任务队列为例:并发
//G1 func main(){ ... for _, task := range hellaTasks { ch <- task //sender } ... } //G2 func worker(ch chan Task){ for { //接受任务 task := <- ch //recevier process(task) } }
其中G1是发送者,G2是接收,由于ch是长度为3的带缓冲channel,初始的时候hchan结构体的buf为空,sendx和recvx都为0,当G1向ch里发送数据的时候,会首先对buf加锁,而后将要发送的数据copy到buf里,并增长sendx的值,最后释放buf的锁。而后G2消费的时候首先对buf加锁,而后将buf里的数据copy到task变量对应的内存里,增长recvx,最后释放锁。整个过程,G1和G2没有共享的内存,底层经过hchan结构体的buf,使用copy内存的方式进行通讯,最后达到了共享内存的目的,这彻底符合CSP的设计理念 函数
Do not comminute by sharing memory;instead, share memory by communicating
通常状况下,G2的消费速度应该是慢于G1的,因此buf的数据会愈来愈多,这个时候G1再向ch里发送数据,这个时候G1就会阻塞,那么阻塞究竟是发生了什么呢?
Goroutine Pause/Resume
goroutine是Golang实现的用户空间的轻量级的线程,有runtime调度器调度,与操做系统的thread有多对一的关系,相关的数据结构以下图:
其中M是操做系统的线程,G是用户启动的goroutine,P是与调度相关的context,每一个M都拥有一个P,P维护了一个可以运行的goutine队列,用于该线程执行。
当G1向buf已经满了的ch发送数据的时候,当runtine检测到对应的hchan的buf已经满了,会通知调度器,调度器会将G1的状态设置为waiting, 移除与线程M的联系,而后从P的runqueue中选择一个goroutine在线程M中执行,此时G1就是阻塞状态,可是不是操做系统的线程阻塞,因此这个时候只用消耗少许的资源。
那么G1设置为waiting状态后去哪了?怎们去resume呢?咱们再回到hchan结构体,注意到hchan有个sendq的成员,其类型是waitq,查看源码以下:
type hchan struct { ... recvq waitq // list of recv waiters sendq waitq // list of send waiters ... } // type waitq struct { first *sudog last *sudog }
实际上,当G1变为waiting状态后,会建立一个表明本身的sudog的结构,而后放到sendq这个list中,sudog结构中保存了channel相关的变量的指针(若是该Goroutine是sender,那么保存的是待发送数据的变量的地址,若是是receiver则为接收数据的变量的地址,之因此是地址,前面咱们提到在传输数据的时候使用的是copy的方式)
当G2从ch中接收一个数据时,会通知调度器,设置G1的状态为runnable,而后将加入P的runqueue里,等待线程执行.
wait empty channel
前面咱们是假设G1先运行,若是G2先运行会怎么样呢?若是G2先运行,那么G2会从一个empty的channel里取数据,这个时候G2就会阻塞,和前面介绍的G1阻塞同样,G2也会建立一个sudog结构体,保存接收数据的变量的地址,可是该sudog结构体是放到了recvq列表里,当G1向ch发送数据的时候,runtime并无对hchan结构体题的buf进行加锁,而是直接将G1里的发送到ch的数据copy到了G2 sudog里对应的elem指向的内存地址!
总结
Golang的一大特点就是其简单搞笑的自然并发机制,使用goroutine和channel实现了CSP模型。理解channel的底层运行机制对灵活运用golang开发并发程序有很大的帮助,看了Kavya的分享,而后结合golang runtime相关的源码(源码开源而且也是golang实现简直良心!),对channel的认识更加的深入,固然还有一些地方存在一些疑问,好比goroutine的调度实现相关的,仍是要潜心膜拜大神们的源码!