上篇已经记录到发送数据到 chanel 的三种状况的代码逻辑,接下来是从 chanel 接收数据的逻辑。golang
和 chansend 方法十分相似ui
if c == nil { if !block { return } gopark(nil, nil, waitReasonChanReceiveNilChan, traceEvGoStop, 2) throw("unreachable") }
lock(&c.lock) if c.closed != 0 && c.qcount == 0 { if raceenabled { raceacquire(c.raceaddr()) } unlock(&c.lock) if ep != nil { typedmemclr(c.elemtype, ep) } return true, false }
下面分别截取三种状况的代码段。this
if sg := c.sendq.dequeue(); sg != nil { // Found a waiting sender. If buffer is size 0, receive value // directly from sender. Otherwise, receive from head of queue // and add sender's value to the tail of the queue (both map to // the same buffer slot because the queue is full). recv(c, sg, ep, func() { unlock(&c.lock) }, 3) return true, true }
if c.qcount > 0 { // Receive directly from queue qp := chanbuf(c, c.recvx) if raceenabled { raceacquire(qp) racerelease(qp) } if ep != nil { typedmemmove(c.elemtype, ep, qp) } typedmemclr(c.elemtype, qp) c.recvx++ if c.recvx == c.dataqsiz { c.recvx = 0 } c.qcount-- unlock(&c.lock) return true, true }
··· // 上面条件都不知足,则只剩一种状况:hchan.buf 为空 // no sender available: block on this channel. gp := getg() mysg := acquireSudog() mysg.releasetime = 0 if t0 != 0 { mysg.releasetime = -1 } // No stack splits between assigning elem and enqueuing mysg // on gp.waiting where copystack can find it. mysg.elem = ep mysg.waitlink = nil gp.waiting = mysg mysg.g = gp mysg.isSelect = false mysg.c = c gp.param = nil c.recvq.enqueue(mysg) goparkunlock(&c.lock, waitReasonChanReceive, traceEvGoBlockRecv, 3)
假设有一个 goroutine sender,和一个 goroutine reciever,若是 reciever 执行 chanrecv 方法的时候,buf 已经为空了,从上面的代码最后一行知道,goparkunlock 方法使 reciever 阻塞,那么 sender 写数据进 chanel,reciever 又如何被 sender 唤醒呢?code
reciever 休眠后, sender 来了,sender 执行到如下代码处。sender 从 recvq 队列 弹出 reciever,而后执行 send 方法。对象
if sg := c.recvq.dequeue(); sg != nil { // Found a waiting receiver. We pass the value we want to send // directly to the receiver, bypassing the channel buffer (if any). send(c, sg, ep, func() { unlock(&c.lock) }, 3) return true }
send 方法最后一行执行 goready 方法将 reciever 唤醒。队列
// send processes a send operation on an empty channel c. // The value ep sent by the sender is copied to the receiver sg. // The receiver is then woken up to go on its merry way. // Channel c must be empty and locked. send unlocks c with unlockf. // sg must already be dequeued from c. // ep must be non-nil and point to the heap or the caller's stack. func send(c *hchan, sg *sudog, ep unsafe.Pointer, unlockf func(), skip int) { if raceenabled { if c.dataqsiz == 0 { racesync(c, sg) } else { // Pretend we go through the buffer, even though // we copy directly. Note that we need to increment // the head/tail locations only when raceenabled. qp := chanbuf(c, c.recvx) raceacquire(qp) racerelease(qp) raceacquireg(sg.g, qp) racereleaseg(sg.g, qp) c.recvx++ if c.recvx == c.dataqsiz { c.recvx = 0 } c.sendx = c.recvx // c.sendx = (c.sendx+1) % c.dataqsiz } } if sg.elem != nil { sendDirect(c.elemtype, sg, ep) sg.elem = nil } gp := sg.g unlockf() gp.param = unsafe.Pointer(sg) if sg.releasetime != 0 { sg.releasetime = cputicks() } goready(gp, skip+1) }
reciever 唤醒后,继续执行 chanrecv 方法剩下的语句:ip
// someone woke us up if mysg != gp.waiting { throw("G waiting list is corrupted") } gp.waiting = nil if mysg.releasetime > 0 { blockevent(mysg.releasetime-t0, 2) } closed := gp.param == nil gp.param = nil mysg.c = nil releaseSudog(mysg) return true, !closed
close(ch) 对应的执行方法即为 closechan 方法。主要有如下步骤:ci
func closechan(c *hchan) { if c == nil { panic(plainError("close of nil channel")) } lock(&c.lock) if c.closed != 0 { unlock(&c.lock) panic(plainError("close of closed channel")) } if raceenabled { callerpc := getcallerpc() racewritepc(c.raceaddr(), callerpc, funcPC(closechan)) racerelease(c.raceaddr()) } c.closed = 1 var glist *g // release all readers for { sg := c.recvq.dequeue() if sg == nil { break } if sg.elem != nil { typedmemclr(c.elemtype, sg.elem) sg.elem = nil } if sg.releasetime != 0 { sg.releasetime = cputicks() } gp := sg.g gp.param = nil if raceenabled { raceacquireg(gp, c.raceaddr()) } gp.schedlink.set(glist) // <------ a 语句 glist = gp // <------ b 语句 } // release all writers (they will panic) for { sg := c.sendq.dequeue() if sg == nil { break } sg.elem = nil if sg.releasetime != 0 { sg.releasetime = cputicks() } gp := sg.g gp.param = nil if raceenabled { raceacquireg(gp, c.raceaddr()) } gp.schedlink.set(glist) // <------ c 语句 glist = gp // <------ d 语句 } unlock(&c.lock) // Ready all Gs now that we've dropped the channel lock. for glist != nil { gp := glist glist = glist.schedlink.ptr() // <------ e 语句 gp.schedlink = 0 goready(gp, 3) } } //go:nosplit func (gp guintptr) ptr() *g { return (*g)(unsafe.Pointer(gp)) } // 得到 gp 的地址 //go:nosplit func (gp *guintptr) set(g *g) { *gp = guintptr(unsafe.Pointer(g)) } // 设置 gp 的地址
上面代码有个注意的点: schedlink 是 G 的一个属性,用于指向下一个 G 。rem
参考文章
http://legendtkl.com/2017/08/...