Condition variables allow threads to wait until some event
or condition has occurred.
复制代码
条件变量是并发编程中很经典的一个手段,经常使用的条件变量有两种实现。linux
两种方式的区别是发出通知的线程是否会马上失去全部权,阻塞式条件变量的实现是会马上失去,非阻塞式条件变量的实现式不会马上失去。c++
条件变量在各语言/基础库中都有本身的实现,linux的pthread
库和c++的std::condition_variable
都是非阻塞式条件变量的实现,而golang sync.Cond
也是典型的非阻塞式条件变量的实现。golang
// Cond implements a condition variable, a rendezvous point
// for goroutines waiting for or announcing the occurrence
// of an event.
复制代码
咱们再来看一下go官方对cond
的定义,上述内容是从go源码中摘出来的。翻译一下,意思以下:编程
sync.Cond
是golang对条件变量的实现,有两个关键点:安全
wait
操做)。signal/broadcast
操做)。package main
import (
"fmt"
"sync"
"time"
)
// 唤醒检测标志
var flag = false
func CondTest(info string, c *sync.Cond) {
// 使用条件变量前须要先加锁(wait()内部有释放锁操做)
c.L.Lock()
for flag == false {
fmt.Println(info, "wait")
// 挂起等待唤醒
c.Wait()
}
fmt.Println(info, flag)
c.L.Unlock()
}
func main() {
m := sync.Mutex{}
c := sync.NewCond(&m)
// add 2 waiter
go CondTest("go one", c)
go CondTest("go two", c)
// main
time.Sleep(time.Second)
c.L.Lock()
flag = true
c.L.Unlock()
c.Broadcast() // 所有唤醒
// c.Signal() // 唤醒1个
fmt.Println("main broadcast")
time.Sleep(time.Second)
}
复制代码
没咋找到😂,sync.Cond的场景基本均可以被chan替换掉markdown
想象这么一个场景,有1个worker在异步的接收数据,剩下的n个waiter必须等待这个worker接收完数据才能继续下面的处理流程,这时咱们很容易想到两种方案。并发
缺点是须要不断轮询对应的变量来判断是否知足条件,且较难支持单waiter通知的操做。app
Don't communicate by sharing memory, share memory by communicating.
less
按照go的哲学,在这种并发的场景下,更推荐chan,可是若是使用chan
来操做,须要worker明确感知到等待的waiter数来进行处理,好比有n个waiter就须要notify n次(用close也能够),而使用sync.Cond
就能够极大的简化这个操做,只须要调用Broadcast
便可完成多waiter的通知,除此以外cond也提供了相似于chan send
单信号的通知(Singal
)。异步
一句话总结:多waiter单worker的场景均可以使用sync.Cond
代码部分基于golang 1.16 64位机
// Cond implements a condition variable, a rendezvous point
// for goroutines waiting for or announcing the occurrence
// of an event.
//
// Each Cond has an associated Locker L (often a *Mutex or *RWMutex),
// which must be held when changing the condition and
// when calling the Wait method.
//
// A Cond must not be copied after first use.
type Cond struct {
noCopy noCopy
// L is held while observing or changing the condition
L Locker
notify notifyList
checker copyChecker
}
// A Locker represents an object that can be locked and unlocked.
type Locker interface {
Lock()
Unlock()
}
// Approximation of notifyList in runtime/sema.go. Size and alignment must
// agree.
// 每个waiter都会有1个ticket,能够理解为waiter的惟一标识,单调递增
type notifyList struct {
wait uint32 //下一个waiter的最大ticket,单调递增
notify uint32 //下一个待唤醒的waiter的ticket值(ticket 小于该值的waiter都 即将/已经 处于唤醒态)
lock uintptr // key field of the mutex
head unsafe.Pointer
tail unsafe.Pointer
}
type copyChecker uintptr
// noCopy may be embedded into structs which must not be copied
// after the first use.
//
// See https://golang.org/issues/8005#issuecomment-190753527
// for details.
type noCopy struct{}
复制代码
函数定义:
func (c *copyChecker) check()
func (*noCopy) Lock()
func (*noCopy) Unlock()
复制代码
实现:
// copyChecker holds back pointer to itself to detect object copying.
type copyChecker uintptr
// copyChecker会存放copyChecker的地址,经过这个地址判断是否被拷贝
func (c *copyChecker) check() {
// 检测copyChecker的地址是否与copyChecker中存储的相同
if uintptr(*c) != uintptr(unsafe.Pointer(c)) &&
// 第一次调用会将copyChecker的地址存储在copyChecker中
!atomic.CompareAndSwapUintptr((*uintptr)(c), 0, uintptr(unsafe.Pointer(c))) &&
// 保证第一次调用check()不报错
uintptr(*c) != uintptr(unsafe.Pointer(c)) {
panic("sync.Cond is copied")
}
}
// noCopy may be embedded into structs which must not be copied
// after the first use.
//
// See https://golang.org/issues/8005#issuecomment-190753527
// for details.
type noCopy struct{}
// Lock is a no-op used by -copylocks checker from `go vet`.
func (*noCopy) Lock() {}
func (*noCopy) Unlock() {}
复制代码
函数定义:
//新建cond
func NewCond(l Locker) *Cond
//等待
func (c *Cond) Wait()
//单waiter唤醒
func (c *Cond) Signal()
//全waiter唤醒
func (c *Cond) Broadcast()
复制代码
实现:
// NewCond returns a new Cond with Locker l.
func NewCond(l Locker) *Cond {
return &Cond{L: l}
}
// Wait atomically unlocks c.L and suspends execution
// of the calling goroutine. After later resuming execution,
// Wait locks c.L before returning. Unlike in other systems,
// Wait cannot return unless awoken by Broadcast or Signal.
//
// Because c.L is not locked when Wait first resumes, the caller
// typically cannot assume that the condition is true when
// Wait returns. Instead, the caller should Wait in a loop:
//
// c.L.Lock()
// for !condition() {
// c.Wait()
// }
// ... make use of condition ...
// c.L.Unlock()
//
func (c *Cond) Wait() {
// 检测cond是否被拷贝(若是拷贝则会panic)
c.checker.check()
// 获得waiter的惟一标识
t := runtime_notifyListAdd(&c.notify)
c.L.Unlock()
// 将waiter惟一标识添加到等待通知的队列中
runtime_notifyListWait(&c.notify, t)
c.L.Lock()
}
// Signal wakes one goroutine waiting on c, if there is any.
//
// It is allowed but not required for the caller to hold c.L
// during the call.
func (c *Cond) Signal() {
// 检测cond是否被拷贝(若是拷贝则会panic)
c.checker.check()
// 唤醒等待队列中的一个waiter(唤醒前须要加锁)
runtime_notifyListNotifyOne(&c.notify)
}
// Broadcast wakes all goroutines waiting on c.
//
// It is allowed but not required for the caller to hold c.L
// during the call.
func (c *Cond) Broadcast() {
// 检测cond是否被拷贝(若是拷贝则会panic)
c.checker.check()
// 唤醒等待队列中全部的waiter(唤醒前须要加锁)
runtime_notifyListNotifyAll(&c.notify)
}
复制代码
如下代码中省略了一些不重要的逻辑,已使用//...标识出来
和sync.Mutex同样,sync.Cond在sync包内只有函数声明,具体的函数实现会在连接时link到runtime/sema.go。
//能够理解为并发安全的id生成器,为每个waiter生成1个惟一标识
func runtime_notifyListAdd(l *notifyList) uint32
//等待事件(Signal/Broadcast)的发生,t为当前waiter的惟一标识
func runtime_notifyListWait(l *notifyList, t uint32)
//唤醒当前等待的所有waiter
func runtime_notifyListNotifyAll(l *notifyList)
//唤醒1个waiter
func runtime_notifyListNotifyOne(l *notifyList)
// 内存安全保证,在init时会执行检查,保证sync包的notifyList结构大小等于runtime包的notifyList
func runtime_notifyListCheck(size uintptr)
func init() {
var n notifyList
runtime_notifyListCheck(unsafe.Sizeof(n))
}
复制代码
// notifyListAdd adds the caller to a notify list such that it can receive
// notifications. The caller must eventually call notifyListWait to wait for
// such a notification, passing the returned ticket number.
//go:linkname notifyListAdd sync.runtime_notifyListAdd
//获取waiter的惟一标识,单调递增,也是实现fifo的基础
func notifyListAdd(l *notifyList) uint32 {
// This may be called concurrently, for example, when called from
// sync.Cond.Wait while holding a RWMutex in read mode.
return atomic.Xadd(&l.wait, 1) - 1
}
// notifyListWait waits for a notification. If one has been sent since
// notifyListAdd was called, it returns immediately. Otherwise, it blocks.
//go:linkname notifyListWait sync.runtime_notifyListWait
//开始wait,等待被唤醒
func notifyListWait(l *notifyList, t uint32) {
lockWithRank(&l.lock, lockRankNotifyList)
// Return right away if this ticket has already been notified.
// 若是当前waiter的编号小于notify,则无须等待,直接返回便可
if less(t, l.notify) {
unlock(&l.lock)
return
}
//将当前goroutine加入到notifyList链表中,单链表,尾插法
//sudog represents a g in a wait list
s := acquireSudog()
s.g = getg()
s.ticket = t
s.releasetime = 0
//...
if l.tail == nil {
l.head = s
} else {
l.tail.next = s
}
l.tail = s
//调用gopark,将goroutine状态由 _Grunning切换为 _Gwaiting
goparkunlock(&l.lock, waitReasonSyncCondWait, traceEvGoBlockCond, 3)
//...
releaseSudog(s)
}
// notifyListNotifyAll notifies all entries in the list.
//go:linkname notifyListNotifyAll sync.runtime_notifyListNotifyAll
func notifyListNotifyAll(l *notifyList) {
// Fast-path: if there are no new waiters since the last notification
// we don't need to acquire the lock.
// fastpath,若是当前的notify和wait一致,则表明无新的waiter,直接返回便可
if atomic.Load(&l.wait) == atomic.Load(&l.notify) {
return
}
// Pull the list out into a local variable, waiters will be readied
// outside the lock.
lockWithRank(&l.lock, lockRankNotifyList)
// 将notifyList链表清空
s := l.head
l.head = nil
l.tail = nil
// Update the next ticket to be notified. We can set it to the current
// value of wait because any previous waiters are already in the list
// or will notice that they have already been notified when trying to
// add themselves to the list.
//将notify的值置为wait的值,意思将当前全部waiter都已经能够被唤醒了
atomic.Store(&l.notify, atomic.Load(&l.wait))
unlock(&l.lock)
// Go through the local list and ready all waiters.
// 遍历链表,循环唤醒全部waiter
for s != nil {
next := s.next
s.next = nil
readyWithTime(s, 4)
s = next
}
}
// notifyListNotifyOne notifies one entry in the list.
//go:linkname notifyListNotifyOne sync.runtime_notifyListNotifyOne
func notifyListNotifyOne(l *notifyList) {
// Fast-path: if there are no new waiters since the last notification
// we don't need to acquire the lock at all.
// fastpath,若是当前的notify和wait一致,则表明无新的waiter,直接返回便可
if atomic.Load(&l.wait) == atomic.Load(&l.notify) {
return
}
lockWithRank(&l.lock, lockRankNotifyList)
// Re-check under the lock if we need to do anything.
// 很经典的操做,加锁后再二次确认
t := l.notify
if t == atomic.Load(&l.wait) {
unlock(&l.lock)
return
}
// Update the next notify ticket number.
// 标识下一个可唤醒的waiter
atomic.Store(&l.notify, t+1)
// Try to find the g that needs to be notified.
// If it hasn't made it to the list yet we won't find it,
// but it won't park itself once it sees the new notify number.
//
// This scan looks linear but essentially always stops quickly.
// Because g's queue separately from taking numbers,
// there may be minor reorderings in the list, but we
// expect the g we're looking for to be near the front.
// The g has others in front of it on the list only to the
// extent that it lost the race, so the iteration will not
// be too long. This applies even when the g is missing:
// it hasn't yet gotten to sleep and has lost the race to
// the (few) other g's that we find on the list.
// 从waiter链表中找到须要唤醒的waiter,将对应waiter唤醒,并从链表中移除
for p, s := (*sudog)(nil), l.head; s != nil; p, s = s, s.next {
if s.ticket == t {
n := s.next
if p != nil {
p.next = n
} else {
l.head = n
}
if n == nil {
l.tail = p
}
unlock(&l.lock)
s.next = nil
// 将g的状态由 _Gwaiting切换到_Grunnable
readyWithTime(s, 4)
return
}
}
unlock(&l.lock)
}
//go:linkname notifyListCheck sync.runtime_notifyListCheck
// 检查sync.notifyList和runtime.notifyList大小是否一致
func notifyListCheck(sz uintptr) {
if sz != unsafe.Sizeof(notifyList{}) {
print("runtime: bad notifyList size - sync=", sz, " runtime=", unsafe.Sizeof(notifyList{}), "\n")
throw("bad notifyList size")
}
}
复制代码
u1s1 sync.cond确实一次都没用过,踩坑都是从网上扒的😓
func main() {
// 建立一个cond1并使用
cond1 := sync.NewCond(&sync.Mutex{})
go func() {
cond1.L.Lock()
cond1.Wait()
}()
cond1.Signal()
// 对cond1进行深拷贝
var cond2 = new(sync.Cond)
*cond2 = *cond1
f := func(cond *sync.Cond, v int) {
cond.L.Lock()
for {
fmt.Println(v)
cond.Wait()
}
}
go f(cond1, 1)
// 当使用cond2的时候会报错(panic: sync.Cond is copied)
go f(cond2, 2)
time.Sleep(time.Second)
}
复制代码
sync - The Go Programming Language (studygolang.com)
管程 - 维基百科,自由的百科全书 (wikipedia.org)
Golang sync.Cond 条件变量源码分析 | 编程沉思录 (cyhone.com)
Linux条件变量pthread_condition细节(为什么先加锁,pthread_cond_wait为什么先解锁,返回时又加锁)