「本文已参与好文召集令活动,点击查看:后端、大前端双赛道投稿,2万元奖池等你挑战!」前端
从一个nil的chan接收数据会deadlockweb
func main() {
var a chan int
fmt.Println(<-a)
}
fatal error: all goroutines are asleep - deadlock!
复制代码
向一个nil的chan发送数据会deadlock后端
func main() {
var a chan int
a <- 1
}
fatal error: all goroutines are asleep - deadlock!
复制代码
从一个已经关闭的chan获取数据,获得对应的零值数组
func main() {
var a chan int
a = make(chan int, 1)
close(a)
v, ok := <-a
fmt.Println(v, ok) //0,false
}
复制代码
向一个已经关闭的chan发送数据会panic安全
func main() {
var a chan int
a = make(chan int, 1)
close(a)
a <- 1
}
panic: send on closed channel
复制代码
把一个已经关闭的chan再次关闭会panicmarkdown
func main() {
var a chan int
a = make(chan int, 1)
close(a)
close(a)
}
panic: close of closed channel
复制代码
没有buffer的chan,要提早作好接收的准备,不然会deadlocksvg
func main() {
var a chan int
a = make(chan int)
a <- 1
}
fatal error: all goroutines are asleep - deadlock!
复制代码
有buffer的chan,在buffer满了以后,再发送会deadlock源码分析
func main() {
var a chan int
a = make(chan int, 1)
a <- 1 //不会报错
a <- 2 //报错
}
fatal error: all goroutines are asleep - deadlock!
复制代码
nil的chan,在select和default组合下,不会报错post
func main() {
var a chan int
select {
case <-a: //nil的chan并不会报错
default: //会走到default
}
}
复制代码
range一个chan,记得close。不然会deadlockui
func main() {
var a chan int
a = make(chan int)
go func() {
a <- 1
close(a) //若是没close,那么就会deadlock
}()
for v := range a {
fmt.Println(v)
}
time.Sleep(time.Second)
}
复制代码
从make chan开始
func makechan(t *chantype, size int) *hchan {
elem := t.elem
省略...
switch {
case mem == 0: //无缓冲的chan
c = (*hchan)(mallocgc(hchanSize, nil, true))
c.buf = c.raceaddr()
case elem.ptrdata == 0: //元素不含指针
c = (*hchan)(mallocgc(hchanSize+mem, nil, true))
c.buf = add(unsafe.Pointer(c), hchanSize)
default: //默认
c = new(hchan)
c.buf = mallocgc(mem, elem, true)
}
c.elemsize = uint16(elem.size)
c.elemtype = elem
c.dataqsiz = uint(size)
lockInit(&c.lock, lockRankHchan)
if debugChan {
print("makechan: chan=", c, "; elemsize=", elem.size, "; dataqsiz=", size, "\n")
}
return c
}
复制代码
hchan的结构
type hchan struct {
qcount uint // 环形队列的长度
dataqsiz uint // 环形队列的长度、缓冲区的大小
buf unsafe.Pointer // 环形队列指针
elemsize uint16 // 每一个元素的大小
closed uint32 // 通道是否关闭,1关闭 0打开
elemtype *_type // 元素类型
sendx uint // 已发送元素在循环数组中的索引
recvx uint // 已接收元素在循环数组中的索引
recvq waitq // 等待接收的goroutine队列
sendq waitq // 等待发送的goroutine队列
lock mutex //互斥锁,数据的出入都是要锁保护的
}
复制代码
发一个数据
// entry point for c <- x from compiled code
//go:nosplit
func chansend1(c *hchan, elem unsafe.Pointer) {
chansend(c, elem, true, getcallerpc()) // block=true
}
复制代码
像c<-x这样的语句会被编译成chansend1
,chansend1
调用chansend
。
func chansend(c *hchan, ep unsafe.Pointer, block bool, callerpc uintptr) bool {
if c == nil {
if !block { //block=false的状况 通常在select的时候
return false
}
gopark(nil, nil, waitReasonChanSendNilChan, traceEvGoStop, 2) //向nil的chan发送会发生阻塞
throw("unreachable")
}
if !block && c.closed == 0 && full(c) { //select的时候,chan没关闭,且(buffer也满了或接收方未准备好接收)
return false
}
...
lock(&c.lock) //上锁安全保护
if c.closed != 0 { //已经关闭的chan
unlock(&c.lock)
panic(plainError("send on closed channel"))
}
if sg := c.recvq.dequeue(); sg != nil { //正好有等待取的goroutine
send(c, sg, ep, func() { unlock(&c.lock) }, 3)
return true
}
if c.qcount < c.dataqsiz { //缓冲区没满
qp := chanbuf(c, c.sendx) //获取当前sendx的索引
...
typedmemmove(c.elemtype, qp, ep) //新元素的copy进去
c.sendx++ // 索引+1
if c.sendx == c.dataqsiz {
c.sendx = 0
}
c.qcount++ //数量加+1
unlock(&c.lock)
return true
}
// 接下来都是缓冲区满的状况
if !block {
unlock(&c.lock)
return false
}
gp := getg()
mysg := acquireSudog() //打包suog
c.sendq.enqueue(mysg) //推入发送队列
...
gopark(chanparkcommit, unsafe.Pointer(&c.lock), waitReasonChanSend, traceEvGoBlockSend, 2) //让发送的goroutine进入睡眠,等待被唤醒
//如下是恢复时干的事
if mysg != gp.waiting {
throw("G waiting list is corrupted")
}
gp.waiting = nil
gp.activeStackChans = false
if gp.param == nil {
if c.closed == 0 {
throw("chansend: spurious wakeup")
}
panic(plainError("send on closed channel"))
}
gp.param = nil
if mysg.releasetime > 0 {
blockevent(mysg.releasetime-t0, 2)
}
mysg.c = nil
releaseSudog(mysg) //释放sudog
return true
}
复制代码
sudog
结构接收一个数据
// entry points for <- c from compiled code
//go:nosplit
func chanrecv1(c *hchan, elem unsafe.Pointer) {
chanrecv(c, elem, true)
}
复制代码
像<-c这样的语句会被编译成chanrecv1
,chanrecv1
调用chanrecv
。
func chanrecv(c *hchan, ep unsafe.Pointer, block bool) (selected, received bool) {
...
if c == nil {
if !block { //不阻塞的话,直接返回
return
}
//从一个nil的chan接收数据会阻塞
gopark(nil, nil, waitReasonChanReceiveNilChan, traceEvGoStop, 2)
throw("unreachable")
}
...
lock(&c.lock) //上锁
if c.closed != 0 && c.qcount == 0 {
if raceenabled {
raceacquire(c.raceaddr())
}
unlock(&c.lock)
if ep != nil {
typedmemclr(c.elemtype, ep)
}
return true, false
}
if sg := c.sendq.dequeue(); sg != nil { //正好有发送者
recv(c, sg, ep, func() { unlock(&c.lock) }, 3) //(带缓冲和不带缓冲的接收)
return true, true
}
if c.qcount > 0 { // buffer有数据
// Receive directly from queue
qp := chanbuf(c, c.recvx)
if ep != nil {
typedmemmove(c.elemtype, ep, qp) //直接取数据 copy方式
}
typedmemclr(c.elemtype, qp) //清理取掉的位置
c.recvx++ //索引加1
if c.recvx == c.dataqsiz {
c.recvx = 0
}
c.qcount--
unlock(&c.lock)
return true, true
}
//如下缓冲区无数据
if !block { //不须要阻塞的话,直接返回
unlock(&c.lock)
return false, false
}
gp := getg()
mysg := acquireSudog() //打包成sudog
...
c.recvq.enqueue(mysg) //推入recvq队列
gopark(chanparkcommit, unsafe.Pointer(&c.lock), waitReasonChanReceive, traceEvGoBlockRecv, 2) //让出cpu,等待下次调度
//被唤醒后
if mysg != gp.waiting {
throw("G waiting list is corrupted")
}
gp.waiting = nil
gp.activeStackChans = false
if mysg.releasetime > 0 {
blockevent(mysg.releasetime-t0, 2)
}
closed := gp.param == nil
gp.param = nil
mysg.c = nil
releaseSudog(mysg) //解包sudog
return true, !closed
}
复制代码
func recv(c *hchan, sg *sudog, ep unsafe.Pointer, unlockf func(), skip int) {
if c.dataqsiz == 0 { //无缓冲区
...
if ep != nil {
// 从发送者那里copy数据
recvDirect(c.elemtype, sg, ep)
}
} else {//缓冲区必定满了
...
// copy data
typedmemmove(c.elemtype, qp, sg.elem)
c.recvx++
if c.recvx == c.dataqsiz {
c.recvx = 0
}
c.sendx = c.recvx // c.sendx = (c.sendx+1) % c.dataqsiz
}
...
goready(gp, skip+1) //唤醒准备发送的goroutine
}
复制代码
关闭一个通道
func closechan(c *hchan) {
if c == nil {
panic(plainError("close of nil channel")) // close 一个nil的chan panic
}
lock(&c.lock) //上锁
if c.closed != 0 {
unlock(&c.lock)
panic(plainError("close of closed channel")) //已经关闭的chan 再次close会panic
}
...
c.closed = 1 //关闭chan
var glist gList
// release all readers
for {
sg := c.recvq.dequeue()
if sg == nil {
break
}
if sg.elem != nil {
typedmemclr(c.elemtype, sg.elem)
sg.elem = nil
}
if sg.releasetime != 0 {
sg.releasetime = cputicks()
}
gp := sg.g
gp.param = nil
if raceenabled {
raceacquireg(gp, c.raceaddr())
}
glist.push(gp)
}
// release all writers (they will panic)
for {
sg := c.sendq.dequeue()
if sg == nil {
break
}
sg.elem = nil
if sg.releasetime != 0 {
sg.releasetime = cputicks()
}
gp := sg.g
gp.param = nil
if raceenabled {
raceacquireg(gp, c.raceaddr())
}
glist.push(gp)
}
unlock(&c.lock)
// Ready all Gs now that we've dropped the channel lock.
for !glist.empty() {
gp := glist.pop()
gp.schedlink = 0
goready(gp, 3)
}
}
复制代码