Dissecting the Source Code of Condition Variables

1. Introduction to condition variables

Condition variables allow threads to wait until some event or condition has occurred.

Condition variables are a classic tool in concurrent programming. There are two common kinds of implementation:

  1. Nonblocking condition variables: signal and continue.
  2. Blocking condition variables: signal and wait.

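The difference is whether the thread that sends the signal immediately gives up ownership of the lock (the monitor). With a blocking condition variable it does; with a nonblocking condition variable it does not.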

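Every language or base library ships its own implementation. Linux's pthread library and C++'s std::condition_variable are both nonblocking condition variables, and Go's sync.Cond is likewise a typical nonblocking implementation.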

// Cond implements a condition variable, a rendezvous point
// for goroutines waiting for or announcing the occurrence
// of an event.

Now let's look at Go's official definition of Cond. The comment above is taken from the Go source; translated, it says:

sync.Cond is Go's implementation of a condition variable. It has two key parts:

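  1. The goroutines (possibly many) that wait for the condition to become true and enter the waiting state (the Wait operation).
  2. The goroutine that announces the event and makes the condition true (the Signal/Broadcast operation).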

2. Usage

2.1 Basic usage

package main

import (
	"fmt"
	"sync"
	"time"
)

// Wake-up check flag
var flag = false

func CondTest(info string, c *sync.Cond) {
	// Lock before using the condition variable (Wait releases the lock internally)
	c.L.Lock()
	for !flag {
		fmt.Println(info, "wait")
		// Suspend until woken
		c.Wait()
	}
	fmt.Println(info, flag)
	c.L.Unlock()
}

func main() {
	m := sync.Mutex{}
	c := sync.NewCond(&m)
	// add 2 waiters
	go CondTest("go one", c)
	go CondTest("go two", c)

	// main
	time.Sleep(time.Second)

	c.L.Lock()
	flag = true
	c.L.Unlock()
	c.Broadcast() // wake all waiters
	// c.Signal() // wake one waiter
	fmt.Println("main broadcast")

	time.Sleep(time.Second)
}

2.2 Usage in open-source libraries

I couldn't really find any 😂. Almost every sync.Cond use case can be replaced with a chan.

2.3 When to use it

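Imagine this scenario: one worker receives data asynchronously, and the other n waiters must wait until the worker has finished receiving before they can continue. Two solutions come to mind easily: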

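  1. Spin lock + global variable.

The drawback is that the variable has to be polled continuously to check whether the condition holds, and notifying just a single waiter is hard to support.

  2. select + chan.

Don't communicate by sharing memory, share memory by communicating.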

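Following Go's philosophy, chan is the recommended tool for concurrent scenarios like this. With a chan, however, the worker must know exactly how many waiters there are: with n waiters it has to notify n times. Closing the channel also works, but a channel can be closed only once. sync.Cond greatly simplifies this, because a single Broadcast call notifies every waiter. Cond also offers a single-waiter notification (Signal), similar to one send on a chan.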

In one sentence: sync.Cond fits scenarios with many waiters and a single worker.

3. Source code analysis

The code below is based on Go 1.16 on a 64-bit machine.

3.1 Memory layout


// Cond implements a condition variable, a rendezvous point
// for goroutines waiting for or announcing the occurrence
// of an event.
//
// Each Cond has an associated Locker L (often a *Mutex or *RWMutex),
// which must be held when changing the condition and
// when calling the Wait method.
//
// A Cond must not be copied after first use.
type Cond struct {
	noCopy noCopy

	// L is held while observing or changing the condition
	L Locker

	notify  notifyList
	checker copyChecker
}

// A Locker represents an object that can be locked and unlocked.
type Locker interface {
	Lock()
	Unlock()
}

// Approximation of notifyList in runtime/sema.go. Size and alignment must
// agree.
// Every waiter gets a ticket, which can be thought of as the waiter's unique ID; tickets increase monotonically.
type notifyList struct {
	wait   uint32  // ticket handed to the next waiter; increases monotonically
	notify uint32  // ticket of the next waiter to wake (waiters with smaller tickets are being or have been woken)
	lock   uintptr // key field of the mutex
	head   unsafe.Pointer
	tail   unsafe.Pointer
}

type copyChecker uintptr

// noCopy may be embedded into structs which must not be copied
// after the first use.
//
// See https://golang.org/issues/8005#issuecomment-190753527
// for details.
type noCopy struct{}

3.2 Core interfaces

copy

Function definitions:

func (c *copyChecker) check()

func (*noCopy) Lock()

func (*noCopy) Unlock()

Implementation:

// copyChecker holds back pointer to itself to detect object copying.
type copyChecker uintptr

// copyChecker stores its own address; comparing the stored address with its current address reveals a copy
func (c *copyChecker) check() {
	// Does the address stored in the copyChecker differ from the copyChecker's own address?
	if uintptr(*c) != uintptr(unsafe.Pointer(c)) &&
		// On first use the stored value is 0, so the CAS records the copyChecker's own address
		!atomic.CompareAndSwapUintptr((*uintptr)(c), 0, uintptr(unsafe.Pointer(c))) &&
		// Re-check: if the CAS failed because another goroutine recorded this same address concurrently on first use, it is not a copy
		uintptr(*c) != uintptr(unsafe.Pointer(c)) {
		panic("sync.Cond is copied")
	}
}

// noCopy may be embedded into structs which must not be copied
// after the first use.
//
// See https://golang.org/issues/8005#issuecomment-190753527
// for details.
type noCopy struct{}

// Lock is a no-op used by -copylocks checker from `go vet`.
func (*noCopy) Lock()   {}
func (*noCopy) Unlock() {}
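The three-step condition in check() is easier to follow outside the sync package. The sketch below copies its logic into a hypothetical checker type that returns a bool instead of panicking. The names guarded and keep are also illustrative. keep only forces the values onto the heap, because a stack-allocated value can move when the goroutine stack grows, which would make the recorded address stale.

```go
package main

import (
	"fmt"
	"sync/atomic"
	"unsafe"
)

// checker mirrors the logic of sync.copyChecker but reports instead of panicking.
type checker uintptr

// copied reports whether c is no longer at the address recorded on its first use.
func (c *checker) copied() bool {
	self := uintptr(unsafe.Pointer(c))
	return uintptr(*c) != self && // not (yet) holding our own address
		!atomic.CompareAndSwapUintptr((*uintptr)(c), 0, self) && // first use: record our address
		uintptr(*c) != self // CAS lost to a concurrent first use that stored the same address
}

type guarded struct {
	checker checker
}

var keep []*guarded // illustrative: keeps the values on the heap

func main() {
	a := &guarded{}
	keep = append(keep, a)
	fmt.Println(a.checker.copied()) // false: first use records &a.checker
	fmt.Println(a.checker.copied()) // false: still at the recorded address

	b := new(guarded)
	*b = *a // copies the address recorded by a.checker
	keep = append(keep, b)
	fmt.Println(b.checker.copied()) // true: recorded address != &b.checker
}
```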

cond

Function definitions:

// Create a new Cond
func NewCond(l Locker) *Cond

// Wait
func (c *Cond) Wait()

// Wake a single waiter
func (c *Cond) Signal()

// Wake all waiters
func (c *Cond) Broadcast()

Implementation:

// NewCond returns a new Cond with Locker l.
func NewCond(l Locker) *Cond {
	return &Cond{L: l}
}

// Wait atomically unlocks c.L and suspends execution
// of the calling goroutine. After later resuming execution,
// Wait locks c.L before returning. Unlike in other systems,
// Wait cannot return unless awoken by Broadcast or Signal.
//
// Because c.L is not locked when Wait first resumes, the caller
// typically cannot assume that the condition is true when
// Wait returns. Instead, the caller should Wait in a loop:
//
//    c.L.Lock()
//    for !condition() {
//        c.Wait()
//    }
//    ... make use of condition ...
//    c.L.Unlock()
//
func (c *Cond) Wait() {
	// Check whether the cond has been copied (panics if it has)
	c.checker.check()
	// Get this waiter's unique ticket
	t := runtime_notifyListAdd(&c.notify)
	c.L.Unlock()
	// Queue this waiter under its ticket and block until it is notified
	runtime_notifyListWait(&c.notify, t)
	c.L.Lock()
}

// Signal wakes one goroutine waiting on c, if there is any.
//
// It is allowed but not required for the caller to hold c.L
// during the call.
func (c *Cond) Signal() {
	// Check whether the cond has been copied (panics if it has)
	c.checker.check()
	// Wake one waiter in the wait queue (the runtime takes the notifyList's own lock; the caller need not hold c.L)
	runtime_notifyListNotifyOne(&c.notify)
}

// Broadcast wakes all goroutines waiting on c.
//
// It is allowed but not required for the caller to hold c.L
// during the call.
func (c *Cond) Broadcast() {
	// Check whether the cond has been copied (panics if it has)
	c.checker.check()
	// Wake every waiter in the wait queue (the runtime takes the notifyList's own lock; the caller need not hold c.L)
	runtime_notifyListNotifyAll(&c.notify)
}

3.3 Runtime implementation

sync/runtime.go

Some unimportant logic has been omitted from the code below; the omissions are marked with //...

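As with sync.Mutex, the sync package contains only the declarations of these functions. Their implementations live in runtime/sema.go and are linked to the declarations at link time via //go:linkname.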

// Think of it as a concurrency-safe ID generator that gives each waiter a unique ticket
func runtime_notifyListAdd(l *notifyList) uint32

// Wait for an event (Signal/Broadcast); t is the current waiter's ticket
func runtime_notifyListWait(l *notifyList, t uint32)

// Wake all waiters that are currently waiting
func runtime_notifyListNotifyAll(l *notifyList)

// Wake one waiter
func runtime_notifyListNotifyOne(l *notifyList)

// Memory-safety guard: init checks that the sync package's notifyList has the same size as the runtime package's notifyList
func runtime_notifyListCheck(size uintptr)
func init() {
	var n notifyList
	runtime_notifyListCheck(unsafe.Sizeof(n))
}

runtime/sema.go

// notifyListAdd adds the caller to a notify list such that it can receive
// notifications. The caller must eventually call notifyListWait to wait for
// such a notification, passing the returned ticket number.
//go:linkname notifyListAdd sync.runtime_notifyListAdd
// Get the waiter's unique ticket; tickets increase monotonically, which is also the basis of the FIFO order
func notifyListAdd(l *notifyList) uint32 {
	// This may be called concurrently, for example, when called from
	// sync.Cond.Wait while holding a RWMutex in read mode.
	return atomic.Xadd(&l.wait, 1) - 1
}

// notifyListWait waits for a notification. If one has been sent since
// notifyListAdd was called, it returns immediately. Otherwise, it blocks.
//go:linkname notifyListWait sync.runtime_notifyListWait
// Start waiting until woken
func notifyListWait(l *notifyList, t uint32) {
	lockWithRank(&l.lock, lockRankNotifyList)

	// Return right away if this ticket has already been notified.
	// If this waiter's ticket is less than notify, it has already been notified: return without waiting
	if less(t, l.notify) {
		unlock(&l.lock)
		return
	}

	// Append the current goroutine to the notifyList (a singly linked list, inserted at the tail)
	// sudog represents a g in a wait list
	s := acquireSudog()
	s.g = getg()
	s.ticket = t
	s.releasetime = 0
	//...
	if l.tail == nil {
		l.head = s
	} else {
		l.tail.next = s
	}
	l.tail = s
	// Call gopark to switch the goroutine from _Grunning to _Gwaiting (this also releases l.lock)
	goparkunlock(&l.lock, waitReasonSyncCondWait, traceEvGoBlockCond, 3)
	//...
	releaseSudog(s)
}

// notifyListNotifyAll notifies all entries in the list.
//go:linkname notifyListNotifyAll sync.runtime_notifyListNotifyAll
func notifyListNotifyAll(l *notifyList) {
	// Fast-path: if there are no new waiters since the last notification
	// we don't need to acquire the lock.
	// Fast path: if notify equals wait there are no new waiters, so return immediately
	if atomic.Load(&l.wait) == atomic.Load(&l.notify) {
		return
	}

	// Pull the list out into a local variable, waiters will be readied
	// outside the lock.
	lockWithRank(&l.lock, lockRankNotifyList)
	// Detach the whole waiter list, leaving the notifyList empty
	s := l.head
	l.head = nil
	l.tail = nil

	// Update the next ticket to be notified. We can set it to the current
	// value of wait because any previous waiters are already in the list
	// or will notice that they have already been notified when trying to
	// add themselves to the list.
	// Set notify to wait: every waiter that has taken a ticket so far may now be woken
	atomic.Store(&l.notify, atomic.Load(&l.wait))
	unlock(&l.lock)

	// Go through the local list and ready all waiters.
	// Walk the detached list and wake every waiter
	for s != nil {
		next := s.next
		s.next = nil
		readyWithTime(s, 4)
		s = next
	}
}

// notifyListNotifyOne notifies one entry in the list.
//go:linkname notifyListNotifyOne sync.runtime_notifyListNotifyOne
func notifyListNotifyOne(l *notifyList) {
	// Fast-path: if there are no new waiters since the last notification
	// we don't need to acquire the lock at all.
	// Fast path: if notify equals wait there are no new waiters, so return immediately
	if atomic.Load(&l.wait) == atomic.Load(&l.notify) {
		return
	}

	lockWithRank(&l.lock, lockRankNotifyList)

	// Re-check under the lock if we need to do anything.
	// The classic pattern: check again after taking the lock
	t := l.notify
	if t == atomic.Load(&l.wait) {
		unlock(&l.lock)
		return
	}

	// Update the next notify ticket number.
	// Ticket t is the one being woken now; t+1 becomes the next ticket to wake
	atomic.Store(&l.notify, t+1)

	// Try to find the g that needs to be notified.
	// If it hasn't made it to the list yet we won't find it,
	// but it won't park itself once it sees the new notify number.
	//
	// This scan looks linear but essentially always stops quickly.
	// Because g's queue separately from taking numbers,
	// there may be minor reorderings in the list, but we
	// expect the g we're looking for to be near the front.
	// The g has others in front of it on the list only to the
	// extent that it lost the race, so the iteration will not
	// be too long. This applies even when the g is missing:
	// it hasn't yet gotten to sleep and has lost the race to
	// the (few) other g's that we find on the list.
	// Find the waiter holding ticket t, unlink it from the list and wake it
	for p, s := (*sudog)(nil), l.head; s != nil; p, s = s, s.next {
		if s.ticket == t {
			n := s.next
			if p != nil {
				p.next = n
			} else {
				l.head = n
			}
			if n == nil {
				l.tail = p
			}
			unlock(&l.lock)
			s.next = nil
			// Switch the g from _Gwaiting to _Grunnable
			readyWithTime(s, 4)
			return
		}
	}
	unlock(&l.lock)
}

//go:linkname notifyListCheck sync.runtime_notifyListCheck
// Check that sync.notifyList and runtime.notifyList have the same size
func notifyListCheck(sz uintptr) {
	if sz != unsafe.Sizeof(notifyList{}) {
		print("runtime: bad notifyList size - sync=", sz, " runtime=", unsafe.Sizeof(notifyList{}), "\n")
		throw("bad notifyList size")
	}
}

4. Pitfalls

To be honest, I have never actually used sync.Cond myself; the pitfalls below were all collected from the web 😓

  1. Do not copy a Cond that has already been used; doing so panics at run time.
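package main

import (
	"fmt"
	"sync"
	"time"
)

func main() {
	// Create cond1 and use it
	cond1 := sync.NewCond(&sync.Mutex{})
	go func() {
		cond1.L.Lock()
		cond1.Wait()
		cond1.L.Unlock() // release the lock again, or the goroutines below could block forever on Lock
	}()
	cond1.Signal() // calls check(), which records the address of cond1's copyChecker

	// Copy cond1 by value (a shallow copy: cond2.L is the same *sync.Mutex)
	var cond2 = new(sync.Cond)
	*cond2 = *cond1
	f := func(cond *sync.Cond, v int) {
		cond.L.Lock()
		for {
			fmt.Println(v)
			cond.Wait()
		}
	}
	go f(cond1, 1)

	// Using cond2 panics (panic: sync.Cond is copied)
	go f(cond2, 2)
	time.Sleep(time.Second)
}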

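5. References

sync - The Go Programming Language (studygolang.com)

Monitor (管程) - Wikipedia, the free encyclopedia (wikipedia.org)

Golang sync.Cond source code analysis (Golang sync.Cond 条件变量源码分析) | 编程沉思录 (cyhone.com)

Details of Linux condition variables (pthread_condition): why lock first, why pthread_cond_wait unlocks first and locks again on return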
