本次的代码是基于go version go1.13.15 darwin/amd64
less
Go语言标准库中的条件变量sync.Cond
,它可让一组的Goroutine
都在知足特定条件时被唤醒。函数
每一个Cond
都会关联一个Lock(*sync.Mutex or *sync.RWMutex)
ui
var ( locker = new(sync.Mutex) cond = sync.NewCond(locker) ) func listen(x int) { // 获取锁 cond.L.Lock() // 等待通知 暂时阻塞 cond.Wait() fmt.Println(x) // 释放锁 cond.L.Unlock() } func main() { // 启动60个被cond阻塞的线程 for i := 1; i <= 60; i++ { go listen(i) } fmt.Println("start all") // 3秒以后 下发一个通知给已经获取锁的goroutine time.Sleep(time.Second * 3) fmt.Println("++++++++++++++++++++one Signal") cond.Signal() // 3秒以后 下发一个通知给已经获取锁的goroutine time.Sleep(time.Second * 3) fmt.Println("++++++++++++++++++++one Signal") cond.Signal() // 3秒以后 下发广播给全部等待的goroutine time.Sleep(time.Second * 3) fmt.Println("++++++++++++++++++++begin broadcast") cond.Broadcast() // 阻塞直到全部的所有输出 time.Sleep(time.Second * 60) }
上面是个简单的例子,咱们启动了60个线程,而后都被cond
阻塞,主函数经过Signal()
通知一个goroutine
接触阻塞,经过Broadcast()
通知全部被阻塞的所有解除阻塞。atom
// Wait 原子式的 unlock c.L, 并暂停执行调用的 goroutine。 // 在稍后执行后,Wait 会在返回前 lock c.L. 与其余系统不一样, // 除非被 Broadcast 或 Signal 唤醒,不然等待没法返回。 // // 由于等待第一次 resume 时 c.L 没有被锁定,因此当 Wait 返回时, // 调用者一般不能认为条件为真。相反,调用者应该在循环中使用 Wait(): // // c.L.Lock() // for !condition() { // c.Wait() // } // ... make use of condition ... // c.L.Unlock() // type Cond struct { // 用于保证结构体不会在编译期间拷贝 noCopy noCopy // 锁 L Locker // goroutine链表,维护等待唤醒的goroutine队列 notify notifyList // 保证运行期间不会发生copy checker copyChecker }
重点分析下:notifyList
和copyChecker
线程
type notifyList struct { // 总共须要等待的数量 wait uint32 // 已经通知的数量 notify uint32 // 锁 lock uintptr // 指向链表头部 head *sudog // 指向链表尾部 tail *sudog }
这个是核心,全部wait
的goroutine
都会被加入到这个链表中,而后在通知的时候再从这个链表中获取。code
保证运行期间不会发生copy对象
type copyChecker uintptr // copyChecker holds back pointer to itself to detect object copying func (c *copyChecker) check() { if uintptr(*c) != uintptr(unsafe.Pointer(c)) && !atomic.CompareAndSwapUintptr((*uintptr)(c), 0, uintptr(unsafe.Pointer(c))) && uintptr(*c) != uintptr(unsafe.Pointer(c)) { panic("sync.Cond is copied") } }
func (c *Cond) Wait() { // 监测是否复制 c.checker.check() // 更新 notifyList中须要等待的wait的数量 // 返回当前须要插入链表节点ticket t := runtime_notifyListAdd(&c.notify) c.L.Unlock() // 为当前的加入的waiter构建一个链表的节点,插入链表的尾部 runtime_notifyListWait(&c.notify, t) c.L.Lock() } // go/src/runtime/sema.go // 更新 notifyList中须要等待的wait的数量 // 同时返回当前的加入的 waiter 的 ticket 编号,从0开始 //go:linkname notifyListAdd sync.runtime_notifyListAdd func notifyListAdd(l *notifyList) uint32 { // 使用atomic原子的对wait字段进行加一操做 return atomic.Xadd(&l.wait, 1) - 1 } // go/src/runtime/sema.go // 为当前的加入的waiter构建一个链表的节点,插入链表的尾部 //go:linkname notifyListWait sync.runtime_notifyListWait func notifyListWait(l *notifyList, t uint32) { lock(&l.lock) // 当t小于notifyList中的notify,说明当前节点已经被通知了 if less(t, l.notify) { unlock(&l.lock) return } // 构建当前节点 s := acquireSudog() s.g = getg() s.ticket = t s.releasetime = 0 t0 := int64(0) if blockprofilerate > 0 { t0 = cputicks() s.releasetime = -1 } // 头结点没构建,插入头结点 if l.tail == nil { l.head = s } else { // 插入到尾节点 l.tail.next = s } l.tail = s // 将当前goroutine置于等待状态并解锁 // 经过调用goready(gp),可使goroutine再次可运行。 // 也就是将 M/P/G 解绑,并将 G 调整为等待状态,放入 sudog 等待队列中 goparkunlock(&l.lock, waitReasonSyncCondWait, traceEvGoBlockCond, 3) if t0 != 0 { blockevent(s.releasetime-t0, 2) } releaseSudog(s) }
梳理流程blog
一、首先检测对象的复制行为,若是有复制发生直接抛出panic;队列
二、而后调用runtime_notifyListAdd
对notifynotifyListList
中的wait
(须要等待的数量)进行加一操做,同时返回一个ticket
,用来做为当前wait
的编号,这个编号,会和notifyList
中的notify
对应起来;get
三、而后调用runtime_notifyListWait
把当前的wait
封装成链表的一个节点,插入到notifyList
维护的链表的尾部。
// 唤醒一个被wait的goroutine func (c *Cond) Signal() { // 监测是否复制 c.checker.check() runtime_notifyListNotifyOne(&c.notify) } // go/src/runtime/sema.go // 通知链表中的第一个 //go:linkname notifyListNotifyOne sync.runtime_notifyListNotifyOne func notifyListNotifyOne(l *notifyList) { // wait和notify,说明已经所有通知到了 if atomic.Load(&l.wait) == atomic.Load(&l.notify) { return } lock(&l.lock) // 这里作了二次的确认 // wait和notify,说明已经所有通知到了 t := l.notify if t == atomic.Load(&l.wait) { unlock(&l.lock) return } // 原子的对notify执行+1操做 atomic.Store(&l.notify, t+1) // 尝试找到须要被通知的 g // 若是目前还没来得及入队,是没法找到的 // 可是,当它看到通知编号已经发生改变是不会被 park 的 // // 这个查找过程看起来是线性复杂度,但实际上很快就停了 // 由于 g 的队列与获取编号不一样,于是队列中会出现少许重排,但咱们但愿找到靠前的 g // 而 g 只有在再也不 race 后才会排在靠前的位置,所以这个迭代也不会过久, // 同时,即使找不到 g,这个状况也成立: // 它尚未休眠,而且已经失去了咱们在队列上找到的(少数)其余 g 的 race。 for p, s := (*sudog)(nil), l.head; s != nil; p, s = s, s.next { // 顺序拿到一个节点的ticket,会和上面会和notifyList中的notify作比较,相同才进行后续的操做 // 这个咱们分析了,notifyList中的notify和链表节点中的ticket是一一对应的 if s.ticket == t { n := s.next if p != nil { p.next = n } else { l.head = n } if n == nil { l.tail = p } unlock(&l.lock) s.next = nil // 经过goready掉起在上面经过goparkunlock挂起的goroutine readyWithTime(s, 4) return } } unlock(&l.lock) }
梳理下流程:
一、首先检测对象的复制行为,若是有复制发生直接抛出panic
;
二、判断wait
和notify
,若是二者相同说明已经已经所有通知到了;
三、调用notifyListNotifyOne
,经过for循环,依次遍历这个链表,直到找到和notifyList
中的notify
,相匹配的ticket
的节点;
四、掉起goroutine
,完成通知。
// 唤醒全部被wait的goroutine func (c *Cond) Broadcast() { c.checker.check() runtime_notifyListNotifyAll(&c.notify) } // go/src/runtime/sema.go // notifyListNotifyAll notifies all entries in the list. //go:linkname notifyListNotifyAll sync.runtime_notifyListNotifyAll func notifyListNotifyAll(l *notifyList) { // wait和notify,说明已经所有通知到了 if atomic.Load(&l.wait) == atomic.Load(&l.notify) { return } // 加锁 lock(&l.lock) s := l.head l.head = nil l.tail = nil // 这个很粗暴,直接将notify的值置换成wait atomic.Store(&l.notify, atomic.Load(&l.wait)) unlock(&l.lock) // 循环链表,一个个唤醒goroutine for s != nil { next := s.next s.next = nil readyWithTime(s, 4) s = next } }
梳理下流程:
一、首先检测对象的复制行为,若是有复制发生直接抛出panic;
二、判断wait
和notify
,若是二者相同说明已经已经所有通知到了;
三、notifyListNotifyAll
,就相对简单了,直接将notify
的值置为wait
,标注这个已经所有通知了;
四、循环链表,一个个唤醒goroutine
。
sync.Cond
不是一个经常使用的同步机制,可是在条件长时间没法知足时,与使用for {}
进行忙碌等待相比,sync.Cond
可以让出处理器的使用权,提供CPU
的利用率。使用时咱们也须要注意如下问题:
一、sync.Cond.Wait
在调用以前必定要使用获取互斥锁,不然会触发程序崩溃;
二、sync.Cond.Signal
唤醒的 Goroutine
都是队列最前面、等待最久的Goroutine
;
三、sync.Cond.Broadcast
会按照必定顺序广播通知等待的所有 Goroutine
。