go通道基于go的并发调度实现,自己并不复杂,go并发调度请看个人这篇文章:go并发调度原理学习html
type hchan struct { qcount uint // 缓冲区中已有元素个数 dataqsiz uint //循环队列容量大小 buf unsafe.Pointer // 缓冲区指针 elemsize uint16 //元素大小 closed uint32 //关闭标记,0没关闭,1关闭 elemtype *_type //数据项类型 sendx uint //发送索引 recvx uint //接收索引 recvq waitq //等待接收排队链表 sendq waitq //等待发送排队链表 lock mutex //锁 } type waitq struct { first *sudog last *sudog }
func chansend(c *hchan, ep unsafe.Pointer, block bool, callerpc uintptr) bool { lock(&c.lock) if c.closed != 0 { unlock(&c.lock) panic(plainError("send on closed channel")) } if sg := c.recvq.dequeue(); sg != nil { //缓冲区就是一个固定长度的循环列表 //发送队列是一个双向链表,接在缓冲区的后面,总体是一个队列,保证先进先出 //有接收者,并非将当前要发送的数据直接发出,而是将缓冲区的第一个元素发送给接收者,同时将发送队列的第一个元素加入缓冲区刚出队列的位置 send(c, sg, ep, func() { unlock(&c.lock) }, 3) return true } if c.qcount < c.dataqsiz { //缓冲区没有满,直接将要发送的数据复制到缓冲区,直接返回, qp := chanbuf(c, c.sendx) typedmemmove(c.elemtype, qp, ep) c.sendx++ if c.sendx == c.dataqsiz { c.sendx = 0 } c.qcount++ unlock(&c.lock) return true } if !block { unlock(&c.lock) return false } //以上都是同步非阻塞的,ch <- 100直接返回 //如下是同步阻塞 //缓冲区满了,也没有接收者,通道将被阻塞,其实就是不执行当前G了,将状态改为等待状态 gp := getg() mysg := acquireSudog() c.sendq.enqueue(mysg) goparkunlock(&c.lock, "chan send", traceEvGoBlockSend, 3) //当G被唤醒,状态改为可执行状态,从这里开始继续执行 releaseSudog(mysg) return true }
func chanrecv(c *hchan, ep unsafe.Pointer, block bool) (selected, received bool) func chanrecv(c *hchan, ep unsafe.Pointer, block bool) (selected, received bool) { lock(&c.lock) if sg := c.sendq.dequeue(); sg != nil { // Found a waiting sender. If buffer is size 0, receive value // directly from sender. Otherwise, receive from head of queue // and add sender's value to the tail of the queue (both map to // the same buffer slot because the queue is full). recv(c, sg, ep, func() { unlock(&c.lock) }, 3) return true, true } if c.qcount > 0 { // Receive directly from queue qp := chanbuf(c, c.recvx) if ep != nil { typedmemmove(c.elemtype, ep, qp) } typedmemclr(c.elemtype, qp) c.recvx++ if c.recvx == c.dataqsiz { c.recvx = 0 } c.qcount-- unlock(&c.lock) return true, true } if !block { unlock(&c.lock) return false, false } //以上同步非阻塞 //如下同步阻塞 gp := getg() mysg := acquireSudog() c.recvq.enqueue(mysg) //将当前G状态改为等待状态,中止调度 goparkunlock(&c.lock, "chan receive", traceEvGoBlockRecv, 3) //当前G被唤醒从这里继续执行 mysg.c = nil releaseSudog(mysg) return true, !closed }