多图详解Go中的Channel源码

转载请声明出处哦~,本篇文章发布于luozhiyun的博客:https://www.luozhiyun.comgit

本文使用的go的源码时14.4github

chan介绍

package main
import "fmt"

func main() {
    c := make(chan int)

    go func() {
        c <- 1 // send to channel
    }()

    x := <-c // recv from channel

    fmt.Println(x)
}

咱们能够这样查看汇编结果:golang

go tool compile -N -l -S hello.go
-N表示禁用优化
-l禁用内联
-S打印结果

经过上面这样的方式,咱们能够直到chan是调用的哪些函数:数组

ch

源码分析

结构体与建立

type hchan struct {
	qcount   uint           // 循环列表元素个数
	dataqsiz uint           // 循环队列的大小
	buf      unsafe.Pointer // 循环队列的指针
	elemsize uint16			// chan中元素的大小
	closed   uint32			// 是否已close
	elemtype *_type 		// chan中元素类型
	sendx    uint   		// send在buffer中的索引
	recvx    uint   		// recv在buffer中的索引
	recvq    waitq 	 		// receiver的等待队列
	sendq    waitq  		// sender的等待队列 
	// 互拆锁
	lock mutex
}

qcount表明chan 中已经接收但还没被取走的元素的个数,函数 len 能够返回这个字段的值;缓存

dataqsiz和buf分别表明队列buffer的大小,cap函数能够返回这个字段的值以及队列buffer的指针,是一个定长的环形数组;数据结构

elemtype 和 elemsiz表示chan 中元素的类型和 元素的大小;app

sendx:发送数据的指针在 buffer中的位置;函数

recvx:接收请求时的指针在 buffer 中的位置;工具

recvq和sendq分别表示等待接收数据的 goroutine 与等待发送数据的 goroutine;源码分析

sendq和recvq的类型是waitq的结构体:

type waitq struct {
	first *sudog
	last  *sudog
}

waitq里面链接的是一个sudog双向链表,保存的是等待的goroutine 。整个chan的图例大概是这样:

Group 40

下面看一下建立chan,咱们经过汇编结果也能够查看到make(chan int)这句代码会调用到runtime的makechan函数中:

const (
	maxAlign  = 8
	hchanSize = unsafe.Sizeof(hchan{}) + uintptr(-int(unsafe.Sizeof(hchan{}))&(maxAlign-1)) 
)

func makechan(t *chantype, size int) *hchan {
	elem := t.elem

	// 略去检查代码
	... 
    //计算须要分配的buf空间
	mem, overflow := math.MulUintptr(elem.size, uintptr(size))
	if overflow || mem > maxAlloc-hchanSize || size < 0 {
		panic(plainError("makechan: size out of range"))
	}
 
	var c *hchan
	switch {
	case mem == 0:
		// chan的size或者元素的size是0,没必要建立buf
		c = (*hchan)(mallocgc(hchanSize, nil, true))
		// Race detector 
		c.buf = c.raceaddr()
	case elem.ptrdata == 0:
		// 元素不是指针,分配一块连续的内存给hchan数据结构和buf
		c = (*hchan)(mallocgc(hchanSize+mem, nil, true))
		// 表示hchan后面在内存里紧跟着就是buf
		c.buf = add(unsafe.Pointer(c), hchanSize)
	default:
		// 元素包含指针,那么单独分配buf
		c = new(hchan)
		c.buf = mallocgc(mem, elem, true)
	}

	c.elemsize = uint16(elem.size)
	c.elemtype = elem
	c.dataqsiz = uint(size)
 
	return c
}

首先咱们能够看到计算hchanSize:

maxAlign  = 8
	hchanSize = unsafe.Sizeof(hchan{}) + uintptr(-int(unsafe.Sizeof(hchan{}))&(maxAlign-1))

maxAlign是8,那么maxAlign-1的二进制就是111,而后和int(unsafe.Sizeof(hchan{}))取与就是取它的低三位,hchanSize就获得的是8的整数倍,作对齐使用。

这里switch有三种状况,第一种状况是缓冲区所需大小为 0,那么在为 hchan 分配内存时,只须要分配 sizeof(hchan) 大小的内存;

第二种状况是缓冲区所需大小不为 0,并且数据类型不包含指针,那么就分配连续的内存。注意的是,咱们在建立channel的时候能够指定类型为指针类型:

//chan里存入的是int的指针
c := make(chan *int)
//chan里存入的是int的值
c := make(chan int)

第三种状况是缓冲区所需大小不为 0,并且数据类型包含指针,那么就不使用add的方式让hchan和buf放在一块儿了,而是单独的为buf申请一块内存。

发送数据

channel的阻塞非阻塞

在看发送数据的代码以前,咱们先看一下什么是channel的阻塞和非阻塞。

通常状况下,传入的参数都是 block=true,即阻塞调用,一个往 channel 中插入数据的 goroutine 会阻塞到插入成功为止。

非阻塞是只这种状况:

select {
case c <- v:
	... foo
default:
	... bar
}

编译器会将其改成:

if selectnbsend(c, v) {
	... foo
} else {
	... bar
}

selectnbsend方法传入的block就是false:

func selectnbsend(c *hchan, elem unsafe.Pointer) (selected bool) {
	return chansend(c, elem, false, getcallerpc())
}

chansend方法

向通道发送数据咱们经过汇编结果能够发现是在runtime 中经过 chansend 实现的,方法比较长下面咱们分段来进行理解:

func chansend(c *hchan, ep unsafe.Pointer, block bool, callerpc uintptr) bool {
	if c == nil {
		// 对于非阻塞的发送,直接返回
		if !block {
			return false
		}
		// 对于阻塞的通道,将 goroutine 挂起
		gopark(nil, nil, waitReasonChanSendNilChan, traceEvGoStop, 2)
		throw("unreachable")
	}
	...
}

这里会对chan作一个判断,若是它是空的,那么对于非阻塞的发送,直接返回 false;对于阻塞的通道,将 goroutine 挂起,而且永远不会返回。

func chansend(c *hchan, ep unsafe.Pointer, block bool, callerpc uintptr) bool {
	...
	// 非阻塞的状况下,若是通道没有关闭,知足如下一条:
	// 1.没有缓冲区而且当前没有接收者   
	// 2.缓冲区不为0,而且已满
	if !block && c.closed == 0 && ((c.dataqsiz == 0 && c.recvq.first == nil) ||
		(c.dataqsiz > 0 && c.qcount == c.dataqsiz)) {
		return false
	}
	...
}

须要注意的是这里是没有加锁的,go虽然在使用指针读取单个值的时候原子性的,可是读取多个值并不能保证,因此在判断完closed虽然是没有关闭的,那么在读取完以后依然可能在这一瞬间从未关闭状态转变成关闭状态。那么就有两种可能:

  • 通道没有关闭,并且已经满了,那么须要返回false,没有问题;
  • 通道关闭,并且已经满了,可是在非阻塞的发送中返回false,也没有问题;

有关go的一致性原语,能够看这篇:The Go Memory Model

上面的这些判断被称为 fast path,由于加锁的操做是一个很重的操做,因此可以在加锁以前返回的判断就在加锁以前作好是最好的。

下面接着看看加锁部分的代码:

func chansend(c *hchan, ep unsafe.Pointer, block bool, callerpc uintptr) bool {
	...
	//加锁
	lock(&c.lock)
	// 是否关闭的判断
	if c.closed != 0 {
		unlock(&c.lock)
		panic(plainError("send on closed channel"))
	}
	// 从 recvq 中取出一个接收者
	if sg := c.recvq.dequeue(); sg != nil { 
		// 若是接收者存在,直接向该接收者发送数据,绕过buffer
		send(c, sg, ep, func() { unlock(&c.lock) }, 3)
		return true
	}
	...
}

进入了lock区域以后还须要再判断如下close的状态,而后从recvq 中取出一个接收者,若是已经有接收者,那么就向第一个接收者发送当前enqueue的消息。这里须要注意的是若是有接收者在队列中等待,则说明此时的缓冲区是空的。

既然是一行行分析代码,那么咱们再进入到send看一下实现:

func send(c *hchan, sg *sudog, ep unsafe.Pointer, unlockf func(), skip int) {
	...
	if sg.elem != nil {
		// 直接把要发送的数据copy到reciever的栈空间
		sendDirect(c.elemtype, sg, ep)
		sg.elem = nil
	}
	gp := sg.g
	unlockf()
	gp.param = unsafe.Pointer(sg)
	if sg.releasetime != 0 {
		sg.releasetime = cputicks()
	}
	// 唤醒对应的 goroutine
	goready(gp, skip+1)
}

在send方法里,sg就是goroutine打包好的对象,ep是对应要发送数据的指针,sendDirect方法会调用memmove进行数据的内存拷贝。而后goready函数会唤醒对应的 goroutine进行调度。

回到chansend方法,继续往下看:

func chansend(c *hchan, ep unsafe.Pointer, block bool, callerpc uintptr) bool {
	...
	// 若是缓冲区没有满,直接将要发送的数据复制到缓冲区
	if c.qcount < c.dataqsiz {
		// 找到buf要填充数据的索引位置
		qp := chanbuf(c, c.sendx)
		...
		// 将数据拷贝到 buffer 中
		typedmemmove(c.elemtype, qp, ep)
		// 数据索引前移,若是到了末尾,又从0开始
		c.sendx++
		if c.sendx == c.dataqsiz {
			c.sendx = 0
		}
		// 元素个数加1,释放锁并返回
		c.qcount++
		unlock(&c.lock)
		return true
	}
	...
}

这里会判断buf缓冲区有没有满,若是没有满,那么就找到buf要填充数据的索引位置,调用typedmemmove方法将数据拷贝到buf中,而后从新设值sendx偏移量。

func chansend(c *hchan, ep unsafe.Pointer, block bool, callerpc uintptr) bool {
	...
	// 缓冲区没有空间了,因此对于非阻塞调用直接返回
	if !block {
		unlock(&c.lock)
		return false
	}
	// 建立 sudog 对象
	gp := getg()
	mysg := acquireSudog()
	mysg.releasetime = 0
	if t0 != 0 {
		mysg.releasetime = -1
	}
	mysg.elem = ep
	mysg.waitlink = nil
	mysg.g = gp
	mysg.isSelect = false
	mysg.c = c
	gp.waiting = mysg
	gp.param = nil
	// 将sudog 对象入队
	c.sendq.enqueue(mysg)
	// 进入等待状态
	gopark(chanparkcommit, unsafe.Pointer(&c.lock), waitReasonChanSend, traceEvGoBlockSend, 2)
	...
}

这里会作两部分的操做,对于非阻塞的调用会直接返回;对于阻塞的调用会建立sudog 对象,而后将sudog对象入队以后gopark将 goroutine 转入 waiting 状态,并解锁。调用gopark以后,在使用者看来该向 channel 发送数据的代码语句会进行阻塞。

这里也须要注意一下,若是缓冲区为0,那么也会进入到这里,会调用到gopark立马阻塞,因此在使用的时候须要记得接收数据,防止向chan发送数据的那一端永远阻塞,如:

func process(timeout time.Duration) bool {
    ch := make(chan bool)

    go func() {
        // 模拟处理耗时的业务
        time.Sleep((timeout + time.Second))
        ch <- true // block
        fmt.Println("exit goroutine")
    }()
    select {
    case result := <-ch:
        return result
    case <-time.After(timeout):
        return false
    }
}

若是这里在select的时候直接timeout返回了,而没有调用 result := <-ch,那么goroutine 就会永远阻塞。

到这里发送的代码就讲解完了,整个流程大体以下:

好比我要执行:ch<-10

Group100
  1. 检查 recvq 是否为空,若是不为空,则从 recvq 头部取一个 goroutine,将数据发送过去;
  2. 若是 recvq 为空,,而且buf没有满,则将数据放入到 buf中;
  3. 若是 buf已满,则将要发送的数据和当前 goroutine 打包成sudog,而后入队到sendq队列中,并将当前 goroutine 置为 waiting 状态进行阻塞。

接收数据

从chan获取数据实现函数为 chanrecv。下面咱们看一下代码实现:

func chanrecv(c *hchan, ep unsafe.Pointer, block bool) (selected, received bool) {
	...
	if c == nil {
		// 若是 c 为空且是非阻塞调用,那么直接返回 (false,false)
		if !block {
			return
		}
		// 阻塞调用直接等待
		gopark(nil, nil, waitReasonChanReceiveNilChan, traceEvGoStop, 2)
		throw("unreachable")
	}
	// 对于非阻塞的状况,而且没有关闭的状况
	// 若是是无缓冲chan或者是chan中没有数据,那么直接返回 (false,false)
	if !block && (c.dataqsiz == 0 && c.sendq.first == nil ||
		c.dataqsiz > 0 && atomic.Loaduint(&c.qcount) == 0) &&
		atomic.Load(&c.closed) == 0 {
		return
	}
	// 上锁
	lock(&c.lock)
	// 若是已经关闭,而且chan中没有数据,返回 (true,false)
	if c.closed != 0 && c.qcount == 0 {
		if raceenabled {
			raceacquire(c.raceaddr())
		}
		unlock(&c.lock)
		if ep != nil {
			typedmemclr(c.elemtype, ep)
		}
		return true, false
	}
	...
}

chanrecv方法和chansend方法是同样的,首先也是作非空判断,若是chan没有初始化,那么若是是非阻塞调用,那么直接返回 (false,false),阻塞调用会直接等待;

下面的两个if判断我放在一块儿来进行讲解,由于这里和chansend是不同的,chanrecv要根据不一样条件须要返回不一样的结果。

在上锁以前的判断是边界条件的判断:若是是非阻塞调用会判断chan没有发送方(dataqsiz为空且发送队列为空),或chan的缓冲为空(dataqsiz>0 而且qcount==0)而且chan是没有close,那么须要返回 (false,false);而chan已经关闭了,而且buf中没有数据,须要返回 (true,false);

为了实现这个需求,因此在chanrecv方法里面边界条件的判断都使用atomic方法进行了获取。

由于须要正确的获得chan已关闭,而且 buf 空会返回 (true, false),而不是 (false,false),因此在lock上锁以前须要使用atomic来获取参数防止重排序(Happens Before),所以必须使此处的 qcount 和 closed 的读取操做的顺序经过原子操做获得顺序保障

func chanrecv(c *hchan, ep unsafe.Pointer, block bool) (selected, received bool) {
	...
	// 从发送者队列获取数据
	if sg := c.sendq.dequeue(); sg != nil { 
		// 发送者队列不为空,直接从发送者那里提取数据
		recv(c, sg, ep, func() { unlock(&c.lock) }, 3)
		return true, true
	} 
	...
}

func recv(c *hchan, sg *sudog, ep unsafe.Pointer, unlockf func(), skip int) {
	// 若是是无缓冲区chan
	if c.dataqsiz == 0 {
		...
		if ep != nil {
			// 直接从发送者拷贝数据
			recvDirect(c.elemtype, sg, ep)
		}
	// 有缓冲区chan
	} else { 
		// 获取buf的存放数据指针
		qp := chanbuf(c, c.recvx) 
		...
		// 直接从缓冲区拷贝数据给接收者
		if ep != nil {
			typedmemmove(c.elemtype, ep, qp)
		} 
		// 从发送者拷贝数据到缓冲区
		typedmemmove(c.elemtype, qp, sg.elem)
		c.recvx++
		if c.recvx == c.dataqsiz {
			c.recvx = 0
		}
		c.sendx = c.recvx // c.sendx = (c.sendx+1) % c.dataqsiz
	}
	sg.elem = nil
	gp := sg.g
	unlockf()
	gp.param = unsafe.Pointer(sg)
	if sg.releasetime != 0 {
		sg.releasetime = cputicks()
	}
	// 将发送者唤醒
	goready(gp, skip+1)
}

在这里若是有发送者在队列等待,那么直接从发送者那里提取数据,而且唤醒这个发送者。须要注意的是因为有发送者在等待,因此若是有缓冲区,那么缓冲区必定是满的

在唤醒发送者以前须要对缓冲区作判断,若是是无缓冲区,那么直接从发送者那里提取数据;若是有缓冲区首先会获取recvx的指针,而后将从缓冲区拷贝数据给接收者,再将发送者数据拷贝到缓冲区。

而后将recvx加1,至关于将新的数据移到了队尾,再将recvx的值赋值给sendx,最后调用goready将发送者唤醒,这里有些绕,咱们经过图片来展现:

Group 66

这里展现的是在chansend中将数据拷贝到缓冲区中,当数据满的时候会将sendx的指针置为0,因此当buf环形队列是满的时候sendx等于recvx。

而后再来看看chanrecv中发送者队列有数据的时候移交缓冲区的数据是怎么作的:

Group 85

这里会将recvx为0处的数据直接从缓存区拷贝数据给接收者,而后将发送者拷贝数据到缓冲区recvx指针处,而后将recvx指针加1并将recvx赋值给sendx,因为是满的因此用recvx加1的效果实现了将新加入的数据入库到队尾的操做。

接着往下看:

func chanrecv(c *hchan, ep unsafe.Pointer, block bool) (selected, received bool) {
	...
	// 若是缓冲区中有数据
	if c.qcount > 0 { 
		qp := chanbuf(c, c.recvx)
		...
		// 从缓冲区复制数据到 ep
		if ep != nil {
			typedmemmove(c.elemtype, ep, qp)
		}
		typedmemclr(c.elemtype, qp)
		// 接收数据的指针前移
		c.recvx++
		// 环形队列,若是到了末尾,再从0开始
		if c.recvx == c.dataqsiz {
			c.recvx = 0
		}
		// 缓冲区中现存数据减一
		c.qcount--
		unlock(&c.lock)
		return true, true
	}
	...
}

到了这里,说明缓冲区中有数据,可是发送者队列没有数据,那么将数据拷贝到接收数据的协程,而后将接收数据的指针前移,若是已经到了队尾,那么就从0开始,最后将缓冲区中现存数据减一并解锁。

下面就是缓冲区中没有数据的状况:

func chanrecv(c *hchan, ep unsafe.Pointer, block bool) (selected, received bool) {
	...
	// 非阻塞,直接返回
	if !block {
		unlock(&c.lock)
		return false, false
	} 
	// 建立sudog
	gp := getg()
	mysg := acquireSudog()
	mysg.releasetime = 0
	if t0 != 0 {
		mysg.releasetime = -1
	} 
	mysg.elem = ep
	mysg.waitlink = nil
	gp.waiting = mysg
	mysg.g = gp
	mysg.isSelect = false
	mysg.c = c
	gp.param = nil
	// 将sudog添加到接收队列中
	c.recvq.enqueue(mysg)
	// 阻塞住goroutine,等待被唤醒
	gopark(chanparkcommit, unsafe.Pointer(&c.lock), waitReasonChanReceive, traceEvGoBlockRecv, 2)
	...
}

若是是非阻塞调用,直接返回;阻塞调用会将当前goroutine 封装成sudog,而后将sudog添加到接收队列中,调用gopark阻塞住goroutine,等待被唤醒。

关闭通道

关闭通道会调用到closechan方法:

func closechan(c *hchan) {
	// 1. 校验chan是否已初始化
	if c == nil {
		panic(plainError("close of nil channel"))
	}
	// 加锁
	lock(&c.lock)
	// 若是已关闭了,那么不能被再次关闭
	if c.closed != 0 {
		unlock(&c.lock)
		panic(plainError("close of closed channel"))
	}
	...
	// 设置chan已关闭
	c.closed = 1
	// 申明一个存放g的list,用于存放在等待队列中的groutine
	var glist gList

	// 2. 获取全部接收者
	for {
		sg := c.recvq.dequeue()
		if sg == nil {
			break
		}
		if sg.elem != nil {
			typedmemclr(c.elemtype, sg.elem)
			sg.elem = nil
		}
		if sg.releasetime != 0 {
			sg.releasetime = cputicks()
		}
		gp := sg.g
		gp.param = nil
		if raceenabled {
			raceacquireg(gp, c.raceaddr())
		}
		// 加入队列中
		glist.push(gp)
	}

	// 获取全部发送者
	for {
		sg := c.sendq.dequeue()
		if sg == nil {
			break
		}
		sg.elem = nil
		if sg.releasetime != 0 {
			sg.releasetime = cputicks()
		}
		gp := sg.g
		gp.param = nil
		if raceenabled {
			raceacquireg(gp, c.raceaddr())
		}
		// 加入队列中
		glist.push(gp)
	}
	unlock(&c.lock)

	// 3.唤醒全部的glist中的goroutine 
	for !glist.empty() {
		gp := glist.pop()
		gp.schedlink = 0
		goready(gp, 3)
	}
}
  1. 这个方法首先会校验chan是否已被初始化,而后加锁以后再校验是否已被关闭过,若是校验都经过了,那么将closed字段设值为1;
  2. 遍历全部的接收者和发送者,并将其goroutine 加入到glist中;
  3. 将全部glist中的goroutine加入调度队列,等待被唤醒,这里须要注意的是发送者在被唤醒以后会panic;

总结

chan在go中是一个很是强大的工具,使用它能够实现不少功能,可是为了可以高效的使用它咱们也应该去了解里面是如何实现的。这篇文章经过一步步分析从零开始了解go的chan是如何实现的,以及在使用过程当中有什么须要注意的事项,chan的buf环形队列是怎样维护的,但愿能对你有所帮助~

Reference

https://speakerdeck.com/kavya719/understanding-channels

https://golang.org/ref/mem

https://github.com/talkgo/night/issues/450

https://codeburst.io/diving-deep-into-the-golang-channels-549fd4ed21a8

相关文章
相关标签/搜索