手摸手Go 深刻理解sync.Cond

Today that you are wasting is the unattainable tomorrow to someone who expired yesterday. This very moment that you detest is the unreturnable experience to your future self.

sync.Cond实现了一个条件变量,用于等待一个或一组goroutines知足条件后唤醒的场景。每一个Cond关联一个Locker一般是一个*MutexRWMutex\`根据需求初始化不一样的锁。数据结构

基本用法

老规矩正式剖析源码前,先来看看sync.Cond如何使用。好比咱们实现一个FIFO的队列app

`package main`
`import (`
 `"fmt"`
 `"math/rand"`
 `"os"`
 `"os/signal"`
 `"sync"`
 `"time"`
`)`
`type FIFO struct {`
 `lock  sync.Mutex`
 `cond  *sync.Cond`
 `queue []int`
`}`
`type Queue interface {`
 `Pop() int`
 `Offer(num int) error`
`}`
`func (f *FIFO) Offer(num int) error {`
 `f.lock.Lock()`
 `defer f.lock.Unlock()`
 `f.queue = append(f.queue, num)`
 `f.cond.Broadcast()`
 `return nil`
`}`
`func (f *FIFO) Pop() int {`
 `f.lock.Lock()`
 `defer f.lock.Unlock()`
 `for {`
 `for len(f.queue) == 0 {`
 `f.cond.Wait()`
 `}`
 `item := f.queue[0]`
 `f.queue = f.queue[1:]`
 `return item`
 `}`
`}`
`func main() {`
 `l := sync.Mutex{}`
 `fifo := &FIFO{`
 `lock:  l,`
 `cond:  sync.NewCond(&l),`
 `queue: []int{},`
 `}`
 `go func() {`
 `for {`
 `fifo.Offer(rand.Int())`
 `}`
 `}()`
 `time.Sleep(time.Second)`
 `go func() {`
 `for {`
 `fmt.Println(fmt.Sprintf("goroutine1 pop-->%d", fifo.Pop()))`
 `}`
 `}()`
 `go func() {`
 `for {`
 `fmt.Println(fmt.Sprintf("goroutine2 pop-->%d", fifo.Pop()))`
 `}`
 `}()`
 `ch := make(chan os.Signal, 1)`
 `signal.Notify(ch, os.Interrupt)`
 `<-ch`
`}`

咱们定一个FIFO 队列有OfferPop两个操做,咱们起一个gorountine不断向队列投放数据,另外两个gorountine不断取拿数据。less

  1. Pop操做会判断若是队列里没有数据len(f.queue) == 0则调用f.cond.Wait()goroutine挂起。
  2. 等到Offer操做投放数据成功,里面调用f.cond.Broadcast()来唤醒全部挂起在这个mutex上的goroutine。固然sync.Cond也提供了一个Signal(),有点儿相似Java中的notify()notifyAll()的意思 主要是唤醒一个和唤醒所有的区别。

总结一下sync.Mutex的大体用法ide

  1. 首先声明一个mutex,这里sync.Mutex/sync.RWMutex可根据实际状况选用
  2. 调用sync.NewCond(l Locker) *Cond 使用1中的mutex做为入参 注意 这里传入的是指针 为了不c.L.Lock()c.L.Unlock()调用频繁复制锁 致使死锁
  3. 根据业务条件 知足则调用cond.Wait()挂起goroutine
  4. cond.Broadcast()唤起全部挂起的gorotune 另外一个方法cond.Signal()唤醒一个最早挂起的goroutine

须要注意的是cond.wait()的使用须要参照以下模版 具体为啥咱们后续分析源码分析

`c.L.Lock()`
 `for !condition() {`
 `c.Wait()`
 `}`
 `... make use of condition ...`
 `c.L.Unlock()`

源码分析

数据结构

分析具体方法前,咱们先来了解下sync.Cond的数据结构。具体源码以下:ui

`type Cond struct {`
 `noCopy noCopy // Cond使用后不容许拷贝`
 `// L is held while observing or changing the condition`
 `L Locker`
 `//通知列表调用wait()方法的goroutine会被放到notifyList中`
 `notify  notifyList`
 `checker copyChecker //检查Cond实例是否被复制`
`}`

noCopy以前讲过 不清楚的能够看下《你真的了解mutex吗》,除此以外,Locker是咱们刚刚谈到的mutexcopyChecker是用来检查Cond实例是否被复制的,就有一个方法 :this

`func (c *copyChecker) check() {`
 `if uintptr(*c) != uintptr(unsafe.Pointer(c)) &&`
 `!atomic.CompareAndSwapUintptr((*uintptr)(c), 0, uintptr(unsafe.Pointer(c))) &&`
 `uintptr(*c) != uintptr(unsafe.Pointer(c)) {`
 `panic("sync.Cond is copied")`
 `}`
`}`

大体意思是说,初始type copyChecker uintptr默认为0,当第一次调用check()会将copyChecker自身的地址复制给本身,至于为何uintptr(*c) != uintptr(unsafe.Pointer(c))会被调用2次,由于期间goroutine可能已经改变copyChecker。二次调用若是不相等,则说明sync.Cond被复制,从新分配了内存地址。atom

sync.Cond比较有意思的是notifyListspa

`type notifyList struct {`
 `// wait is the ticket number of the next waiter. It is atomically`
 `// incremented outside the lock.`
 `wait uint32 // 等待goroutine操做的数量`
 `// notify is the ticket number of the next waiter to be notified. It can`
 `// be read outside the lock, but is only written to with lock held.`
 `//`
 `// Both wait & notify can wrap around, and such cases will be correctly`
 `// handled as long as their "unwrapped" difference is bounded by 2^31.`
 `// For this not to be the case, we'd need to have 2^31+ goroutines`
 `// blocked on the same condvar, which is currently not possible.`
 `notify uint32 // 唤醒goroutine操做的数量`
 `// List of parked waiters.`
 `lock mutex`
 `head *sudog`
 `tail *sudog`
`}`

包含了3类字段:指针

  • waitnotify两个无符号整型,分别表示了Wait()操做的次数和goroutine被唤醒的次数,wait应该是恒大于等于notify
  • lock mutex 这个跟sync.Mutex咱们分析信号量阻塞队列时semaRoot里的mutex同样,并非Go提供开发者使用的sync.Mutex,而是系统内部运行时实现的一个简单版本的互斥锁。
  • headtail看名字,咱们就能脑补出跟链表很像 没错这里就是维护了阻塞在当前sync.Cond上的goroutine构成的链表

总体来说sync.Cond大致结构为:

图片

cond architecture

操做方法

Wait()操做

`func (c *Cond) Wait() {`
 `//1. 检查cond是否被拷贝`
 `c.checker.check()`
 `//2. notifyList.wait+1`
 `t := runtime_notifyListAdd(&c.notify)`
 `//3. 释放锁 让出资源给其余goroutine`
 `c.L.Unlock()`
 `//4. 挂起goroutine`
 `runtime_notifyListWait(&c.notify, t)`
 `//5. 尝试得到锁`
 `c.L.Lock()`
`}`

Wait()方法源码很容易看出它的操做大概分了5步:

  1. 调用copyChecker.check()保证sync.Cond不会被拷贝
  2. 每次调用Wait()会将sync.Cond.notifyList.wait属性进行加一操做,这也是它完成FIFO的基石,根据wait来判断\`goroutine1等待的顺序
`//go:linkname notifyListAdd sync.runtime_notifyListAdd`
`func notifyListAdd(l *notifyList) uint32 {`
 `// This may be called concurrently, for example, when called from`
 `// sync.Cond.Wait while holding a RWMutex in read mode.`
 `return atomic.Xadd(&l.wait, 1) - 1`
`}`
  1. 调用c.L.Unlock()释放锁,由于当前goroutine即将被gopark,让出锁给其余goroutine避免死锁
  2. 调用runtime_notifyListWait(&c.notify, t)可能稍微复杂一点儿
`// notifyListWait waits for a notification. If one has been sent since`
`// notifyListAdd was called, it returns immediately. Otherwise, it blocks.`
`//go:linkname notifyListWait sync.runtime_notifyListWait`
`func notifyListWait(l *notifyList, t uint32) {`
 `lockWithRank(&l.lock, lockRankNotifyList)`
 `// 若是已经被唤醒 则当即返回`
 `if less(t, l.notify) {`
 `unlock(&l.lock)`
 `return`
 `}`
 `// Enqueue itself.`
 `s := acquireSudog()`
 `s.g = getg()`
 `// 把等待递增序号赋值给s.ticket 为FIFO打基础`
 `s.ticket = t`
 `s.releasetime = 0`
 `t0 := int64(0)`
 `if blockprofilerate > 0 {`
 `t0 = cputicks()`
 `s.releasetime = -1`
 `}`
 `// 将当前goroutine插入到notifyList链表中`
 `if l.tail == nil {`
 `l.head = s`
 `} else {`
 `l.tail.next = s`
 `}`
 `l.tail = s`
 `// 最终调用gopark挂起当前goroutine`
 `goparkunlock(&l.lock, waitReasonSyncCondWait, traceEvGoBlockCond, 3)`
 `if t0 != 0 {`
 `blockevent(s.releasetime-t0, 2)`
 `}`
 `// goroutine被唤醒后释放sudog`
 `releaseSudog(s)`
`}`

主要完成两个任务:

  • 将当前goroutine插入到notifyList链表中
  • 调用gopark将当前goroutine挂起
  1. 当其余goroutine调用了SignalBroadcast方法,当前goroutine被唤醒后 再次尝试得到锁

Signal操做

Signal唤醒一个等待时间最长的goroutine,调用时不要求持有锁。

`func (c *Cond) Signal() {`
 `c.checker.check()`
 `runtime_notifyListNotifyOne(&c.notify)`
`}`

具体实现也不复杂,先判断sync.Cond是否被复制,而后调用runtime_notifyListNotifyOne

`//go:linkname notifyListNotifyOne sync.runtime_notifyListNotifyOne`
`func notifyListNotifyOne(l *notifyList) {`
 `// wait==notify 说明没有等待的goroutine了`
 `if atomic.Load(&l.wait) == atomic.Load(&l.notify) {`
 `return`
 `}`
 `lockWithRank(&l.lock, lockRankNotifyList)`
 `// 锁下二次检查`
 `t := l.notify`
 `if t == atomic.Load(&l.wait) {`
 `unlock(&l.lock)`
 `return`
 `}`
 `// 更新下一个须要被唤醒的ticket number`
 `atomic.Store(&l.notify, t+1)`
 `// Try to find the g that needs to be notified.`
 `// If it hasn't made it to the list yet we won't find it,`
 `// but it won't park itself once it sees the new notify number.`
 `//`
 `// This scan looks linear but essentially always stops quickly.`
 `// Because g's queue separately from taking numbers,`
 `// there may be minor reorderings in the list, but we`
 `// expect the g we're looking for to be near the front.`
 `// The g has others in front of it on the list only to the`
 `// extent that it lost the race, so the iteration will not`
 `// be too long. This applies even when the g is missing:`
 `// it hasn't yet gotten to sleep and has lost the race to`
 `// the (few) other g's that we find on the list.`
 `//这里是FIFO实现的核心 其实就是遍历链表 sudog.ticket查找指定须要唤醒的节点`
 `for p, s := (*sudog)(nil), l.head; s != nil; p, s = s, s.next {`
 `if s.ticket == t {`
 `n := s.next`
 `if p != nil {`
 `p.next = n`
 `} else {`
 `l.head = n`
 `}`
 `if n == nil {`
 `l.tail = p`
 `}`
 `unlock(&l.lock)`
 `s.next = nil`
 `readyWithTime(s, 4)`
 `return`
 `}`
 `}`
 `unlock(&l.lock)`
`}`

主要逻辑:

  1. 判断是否存在等待须要被唤醒的goroutine 没有直接返回
  2. 递增notify属性,由于是根据notifysudog.ticket匹配来查找须要唤醒的goroutine,由于其是递增生成的,故而有了FIFO语义。
  3. 遍历notifyList持有的链表,从head开始依据next指针依次遍历。这个过程是线性的,故而时间复杂度为O(n),不过官方说法这个过程实际比较快This scan looks linear but essentially always stops quickly.

有个小细节:还记得咱们Wait()操做中,wait属性原子更新和goroutine插入等待链表是两个单独的步骤,因此存在竞争的状况下,链表中的节点可能会轻微的乱序产生。可是不要担忧,由于ticket是原子递增的 因此唤醒顺序不会乱。

Broadcast操做

Broadcast()Singal()区别主要是它能够唤醒所有等待的goroutine,并直接将wait属性的值赋值给notify

`func (c *Cond) Broadcast() {`
 `c.checker.check()`
 `runtime_notifyListNotifyAll(&c.notify)`
`}`
`// notifyListNotifyAll notifies all entries in the list.`
`//go:linkname notifyListNotifyAll sync.runtime_notifyListNotifyAll`
`func notifyListNotifyAll(l *notifyList) {`
 `// Fast-path 无等待goroutine直接返回`
 `if atomic.Load(&l.wait) == atomic.Load(&l.notify) {`
 `return`
 `}`
 `lockWithRank(&l.lock, lockRankNotifyList)`
 `s := l.head`
 `l.head = nil`
 `l.tail = nil`
 `// 直接更新notify=wait`
 `atomic.Store(&l.notify, atomic.Load(&l.wait))`
 `unlock(&l.lock)`
 `// 依次调用goready唤醒goroutine`
 `for s != nil {`
 `next := s.next`
 `s.next = nil`
 `readyWithTime(s, 4)`
 `s = next`
 `}`
`}`

逻辑比较简单再也不赘述

总结

  1. sync.Cond一旦建立使用 不容许被拷贝,由noCopycopyChecker来限制保护。
  2. Wait()操做先是递增notifyList.wait属性 而后将goroutine封装进sudog,将notifyList.wait赋值给sudog.ticket,而后将sudog插入notifyList链表中
  3. Singal()实际是按照notifyList.notifynotifyList链表中节点的ticket匹配 来肯定唤醒的goroutine,由于notifyList.notifynotifyList.wait都是原子递增的,故而有了FIFO的语义
  4. Broadcast()相对简单 就是唤醒所有等待的goroutine

若是阅读过程当中发现本文存疑或错误的地方,能够关注公众号留言。若是以为还能够 帮忙点个在看😁

图片

相关文章
相关标签/搜索