Today that you are wasting is the unattainable tomorrow to someone who expired yesterday. This very moment that you detest is the unreturnable experience to your future self.
sync.Cond
实现了一个条件变量,用于等待一个或一组goroutines知足条件后唤醒的场景。每一个
Cond关联一个
Locker一般是一个
*Mutex或
RWMutex\`根据需求初始化不一样的锁。数据结构
老规矩正式剖析源码前,先来看看sync.Cond
如何使用。好比咱们实现一个FIFO
的队列app
`package main` `import (` `"fmt"` `"math/rand"` `"os"` `"os/signal"` `"sync"` `"time"` `)` `type FIFO struct {` `lock sync.Mutex` `cond *sync.Cond` `queue []int` `}` `type Queue interface {` `Pop() int` `Offer(num int) error` `}` `func (f *FIFO) Offer(num int) error {` `f.lock.Lock()` `defer f.lock.Unlock()` `f.queue = append(f.queue, num)` `f.cond.Broadcast()` `return nil` `}` `func (f *FIFO) Pop() int {` `f.lock.Lock()` `defer f.lock.Unlock()` `for {` `for len(f.queue) == 0 {` `f.cond.Wait()` `}` `item := f.queue[0]` `f.queue = f.queue[1:]` `return item` `}` `}` `func main() {` `l := sync.Mutex{}` `fifo := &FIFO{` `lock: l,` `cond: sync.NewCond(&l),` `queue: []int{},` `}` `go func() {` `for {` `fifo.Offer(rand.Int())` `}` `}()` `time.Sleep(time.Second)` `go func() {` `for {` `fmt.Println(fmt.Sprintf("goroutine1 pop-->%d", fifo.Pop()))` `}` `}()` `go func() {` `for {` `fmt.Println(fmt.Sprintf("goroutine2 pop-->%d", fifo.Pop()))` `}` `}()` `ch := make(chan os.Signal, 1)` `signal.Notify(ch, os.Interrupt)` `<-ch` `}`
咱们定一个FIFO
队列有Offer
和Pop
两个操做,咱们起一个gorountine
不断向队列投放数据,另外两个gorountine
不断取拿数据。less
Pop
操做会判断若是队列里没有数据len(f.queue) == 0
则调用f.cond.Wait()
将goroutine
挂起。Offer
操做投放数据成功,里面调用f.cond.Broadcast()
来唤醒全部挂起在这个mutex
上的goroutine
。固然sync.Cond
也提供了一个Signal()
,有点儿相似Java中的notify()
和notifyAll()
的意思 主要是唤醒一个和唤醒所有的区别。总结一下sync.Mutex
的大体用法ide
mutex
,这里sync.Mutex
/sync.RWMutex
可根据实际状况选用sync.NewCond(l Locker) *Cond
使用1中的mutex
做为入参 注意 这里传入的是指针 为了不c.L.Lock()
、c.L.Unlock()
调用频繁复制锁 致使死锁cond.Wait()
挂起goroutine
cond.Broadcast()
唤起全部挂起的gorotune
另外一个方法cond.Signal()
唤醒一个最早挂起的goroutine
须要注意的是cond.wait()
的使用须要参照以下模版 具体为啥咱们后续分析源码分析
`c.L.Lock()` `for !condition() {` `c.Wait()` `}` `... make use of condition ...` `c.L.Unlock()`
分析具体方法前,咱们先来了解下sync.Cond
的数据结构。具体源码以下:ui
`type Cond struct {` `noCopy noCopy // Cond使用后不容许拷贝` `// L is held while observing or changing the condition` `L Locker` `//通知列表调用wait()方法的goroutine会被放到notifyList中` `notify notifyList` `checker copyChecker //检查Cond实例是否被复制` `}`
noCopy
以前讲过 不清楚的能够看下《你真的了解mutex吗》,除此以外,Locker
是咱们刚刚谈到的mutex
,copyChecker
是用来检查Cond实例是否被复制的,就有一个方法 :this
`func (c *copyChecker) check() {` `if uintptr(*c) != uintptr(unsafe.Pointer(c)) &&` `!atomic.CompareAndSwapUintptr((*uintptr)(c), 0, uintptr(unsafe.Pointer(c))) &&` `uintptr(*c) != uintptr(unsafe.Pointer(c)) {` `panic("sync.Cond is copied")` `}` `}`
大体意思是说,初始type copyChecker uintptr
默认为0,当第一次调用check()
会将copyChecker
自身的地址复制给本身,至于为何uintptr(*c) != uintptr(unsafe.Pointer(c))
会被调用2次,由于期间goroutine
可能已经改变copyChecker
。二次调用若是不相等,则说明sync.Cond
被复制,从新分配了内存地址。atom
sync.Cond
比较有意思的是notifyList
spa
`type notifyList struct {` `// wait is the ticket number of the next waiter. It is atomically` `// incremented outside the lock.` `wait uint32 // 等待goroutine操做的数量` `// notify is the ticket number of the next waiter to be notified. It can` `// be read outside the lock, but is only written to with lock held.` `//` `// Both wait & notify can wrap around, and such cases will be correctly` `// handled as long as their "unwrapped" difference is bounded by 2^31.` `// For this not to be the case, we'd need to have 2^31+ goroutines` `// blocked on the same condvar, which is currently not possible.` `notify uint32 // 唤醒goroutine操做的数量` `// List of parked waiters.` `lock mutex` `head *sudog` `tail *sudog` `}`
包含了3类字段:指针
wait
和notify
两个无符号整型,分别表示了Wait()
操做的次数和goroutine
被唤醒的次数,wait
应该是恒大于等于notify
lock mutex
这个跟sync.Mutex
咱们分析信号量阻塞队列时semaRoot
里的mutex
同样,并非Go
提供开发者使用的sync.Mutex
,而是系统内部运行时实现的一个简单版本的互斥锁。head
和tail
看名字,咱们就能脑补出跟链表很像 没错这里就是维护了阻塞在当前sync.Cond
上的goroutine
构成的链表总体来说sync.Cond
大致结构为:
cond architecture
`func (c *Cond) Wait() {` `//1. 检查cond是否被拷贝` `c.checker.check()` `//2. notifyList.wait+1` `t := runtime_notifyListAdd(&c.notify)` `//3. 释放锁 让出资源给其余goroutine` `c.L.Unlock()` `//4. 挂起goroutine` `runtime_notifyListWait(&c.notify, t)` `//5. 尝试得到锁` `c.L.Lock()` `}`
从Wait()
方法源码很容易看出它的操做大概分了5步:
copyChecker.check()
保证sync.Cond
不会被拷贝Wait()
会将sync.Cond.notifyList.wait
属性进行加一操做,这也是它完成FIFO
的基石,根据wait
来判断\`goroutine1等待的顺序`//go:linkname notifyListAdd sync.runtime_notifyListAdd` `func notifyListAdd(l *notifyList) uint32 {` `// This may be called concurrently, for example, when called from` `// sync.Cond.Wait while holding a RWMutex in read mode.` `return atomic.Xadd(&l.wait, 1) - 1` `}`
c.L.Unlock()
释放锁,由于当前goroutine
即将被gopark
,让出锁给其余goroutine
避免死锁runtime_notifyListWait(&c.notify, t)
可能稍微复杂一点儿`// notifyListWait waits for a notification. If one has been sent since` `// notifyListAdd was called, it returns immediately. Otherwise, it blocks.` `//go:linkname notifyListWait sync.runtime_notifyListWait` `func notifyListWait(l *notifyList, t uint32) {` `lockWithRank(&l.lock, lockRankNotifyList)` `// 若是已经被唤醒 则当即返回` `if less(t, l.notify) {` `unlock(&l.lock)` `return` `}` `// Enqueue itself.` `s := acquireSudog()` `s.g = getg()` `// 把等待递增序号赋值给s.ticket 为FIFO打基础` `s.ticket = t` `s.releasetime = 0` `t0 := int64(0)` `if blockprofilerate > 0 {` `t0 = cputicks()` `s.releasetime = -1` `}` `// 将当前goroutine插入到notifyList链表中` `if l.tail == nil {` `l.head = s` `} else {` `l.tail.next = s` `}` `l.tail = s` `// 最终调用gopark挂起当前goroutine` `goparkunlock(&l.lock, waitReasonSyncCondWait, traceEvGoBlockCond, 3)` `if t0 != 0 {` `blockevent(s.releasetime-t0, 2)` `}` `// goroutine被唤醒后释放sudog` `releaseSudog(s)` `}`
主要完成两个任务:
Signal
或Broadcast
方法,当前goroutine
被唤醒后 再次尝试得到锁Signal
唤醒一个等待时间最长的goroutine
,调用时不要求持有锁。
`func (c *Cond) Signal() {` `c.checker.check()` `runtime_notifyListNotifyOne(&c.notify)` `}`
具体实现也不复杂,先判断sync.Cond
是否被复制,而后调用runtime_notifyListNotifyOne
`//go:linkname notifyListNotifyOne sync.runtime_notifyListNotifyOne` `func notifyListNotifyOne(l *notifyList) {` `// wait==notify 说明没有等待的goroutine了` `if atomic.Load(&l.wait) == atomic.Load(&l.notify) {` `return` `}` `lockWithRank(&l.lock, lockRankNotifyList)` `// 锁下二次检查` `t := l.notify` `if t == atomic.Load(&l.wait) {` `unlock(&l.lock)` `return` `}` `// 更新下一个须要被唤醒的ticket number` `atomic.Store(&l.notify, t+1)` `// Try to find the g that needs to be notified.` `// If it hasn't made it to the list yet we won't find it,` `// but it won't park itself once it sees the new notify number.` `//` `// This scan looks linear but essentially always stops quickly.` `// Because g's queue separately from taking numbers,` `// there may be minor reorderings in the list, but we` `// expect the g we're looking for to be near the front.` `// The g has others in front of it on the list only to the` `// extent that it lost the race, so the iteration will not` `// be too long. This applies even when the g is missing:` `// it hasn't yet gotten to sleep and has lost the race to` `// the (few) other g's that we find on the list.` `//这里是FIFO实现的核心 其实就是遍历链表 sudog.ticket查找指定须要唤醒的节点` `for p, s := (*sudog)(nil), l.head; s != nil; p, s = s, s.next {` `if s.ticket == t {` `n := s.next` `if p != nil {` `p.next = n` `} else {` `l.head = n` `}` `if n == nil {` `l.tail = p` `}` `unlock(&l.lock)` `s.next = nil` `readyWithTime(s, 4)` `return` `}` `}` `unlock(&l.lock)` `}`
主要逻辑:
notify
属性,由于是根据notify
和sudog.ticket
匹配来查找须要唤醒的goroutine
,由于其是递增生成的,故而有了FIFO
语义。head
开始依据next
指针依次遍历。这个过程是线性的,故而时间复杂度为O(n),不过官方说法这个过程实际比较快This scan looks linear but essentially always stops quickly.
有个小细节:还记得咱们Wait()
操做中,wait
属性原子更新和goroutine插入等待链表是两个单独的步骤,因此存在竞争的状况下,链表中的节点可能会轻微的乱序产生。可是不要担忧,由于ticket是原子递增的 因此唤醒顺序不会乱。
Broadcast()
与Singal()
区别主要是它能够唤醒所有等待的goroutine
,并直接将wait
属性的值赋值给notify
。
`func (c *Cond) Broadcast() {` `c.checker.check()` `runtime_notifyListNotifyAll(&c.notify)` `}` `// notifyListNotifyAll notifies all entries in the list.` `//go:linkname notifyListNotifyAll sync.runtime_notifyListNotifyAll` `func notifyListNotifyAll(l *notifyList) {` `// Fast-path 无等待goroutine直接返回` `if atomic.Load(&l.wait) == atomic.Load(&l.notify) {` `return` `}` `lockWithRank(&l.lock, lockRankNotifyList)` `s := l.head` `l.head = nil` `l.tail = nil` `// 直接更新notify=wait` `atomic.Store(&l.notify, atomic.Load(&l.wait))` `unlock(&l.lock)` `// 依次调用goready唤醒goroutine` `for s != nil {` `next := s.next` `s.next = nil` `readyWithTime(s, 4)` `s = next` `}` `}`
逻辑比较简单再也不赘述
sync.Cond
一旦建立使用 不容许被拷贝,由noCopy
和copyChecker
来限制保护。Wait()
操做先是递增notifyList.wait
属性 而后将goroutine
封装进sudog
,将notifyList.wait
赋值给sudog.ticket
,而后将sudog
插入notifyList
链表中Singal()
实际是按照notifyList.notify
跟notifyList
链表中节点的ticket
匹配 来肯定唤醒的goroutine,由于notifyList.notify
和notifyList.wait
都是原子递增的,故而有了FIFO
的语义Broadcast()
相对简单 就是唤醒所有等待的goroutine
若是阅读过程当中发现本文存疑或错误的地方,能够关注公众号留言。若是以为还能够 帮忙点个在看😁