cond是go语言sync提供的条件变量,经过cond能够让一系列的goroutine在触发某个条件时才被唤醒。每个cond结构体都包含一个锁L。cond提供了三个方法:less
建立40个goroutine都wait阻塞住。调用Signal则唤醒第一个goroutine。调用Broadcast则唤醒全部等待的goroutine。ide
package main import ( "fmt" "sync" "time" ) var locker = new(sync.Mutex) var cond = sync.NewCond(locker) func test(x int) { cond.L.Lock() //获取锁 cond.Wait() //等待通知 暂时阻塞 fmt.Println(x) time.Sleep(time.Second * 1) cond.L.Unlock() //释放锁 } func main() { for i := 0; i < 40; i++ { go test(i) } fmt.Println("start all") time.Sleep(time.Second * 3) fmt.Println("broadcast") cond.Signal() // 下发一个通知给已经获取锁的goroutine time.Sleep(time.Second * 3) cond.Signal() // 3秒以后 下发一个通知给已经获取锁的goroutine time.Sleep(time.Second * 3) cond.Broadcast() //3秒以后 下发广播给全部等待的goroutine time.Sleep(time.Second * 60) }
type Cond struct { noCopy noCopy // 锁的具体实现,一般为 mutex 或者rwmutex L Locker // notifyList对象,维护等待唤醒的goroutine队列,使用链表实现 notify notifyList checker copyChecker } // 新建cond初始化cond对象 func NewCond(l Locker) *Cond { return &Cond{L: l} } type notifyList struct { // 等待数量 wait uint32 // 通知数量 notify uint32 // 锁对象 lock mutex // 链表头 head *sudog // 链表尾 tail *sudog }
// 等待函数 func (c *Cond) Wait() { c.checker.check() // 等待计数器加1 看下面具体实现 t := runtime_notifyListAdd(&c.notify) c.L.Unlock() // runtime_notifyListWait(&c.notify, t) c.L.Lock() } // 此函数在sema.go中控制计数器加1 func notifyListAdd(l *notifyList) uint32 { // This may be called concurrently, for example, when called from // sync.Cond.Wait while holding a RWMutex in read mode. return atomic.Xadd(&l.wait, 1) - 1 } // 此函数在sema.go中 // 获取当前goroutine 添加到链表末端,而后goparkunlock函数休眠阻塞当前goroutine // goparkunlock函数会让出当前处理器的使用权并等待调度器的唤醒 func notifyListWait(l *notifyList, t uint32) { lock(&l.lock) // Return right away if this ticket has already been notified. if less(t, l.notify) { unlock(&l.lock) return } // Enqueue itself. s := acquireSudog() s.g = getg() s.ticket = t s.releasetime = 0 t0 := int64(0) if blockprofilerate > 0 { t0 = cputicks() s.releasetime = -1 } if l.tail == nil { l.head = s } else { l.tail.next = s } l.tail = s goparkunlock(&l.lock, "semacquire", traceEvGoBlockCond, 3) if t0 != 0 { blockevent(s.releasetime-t0, 2) } releaseSudog(s) }
唤醒链表中全部的阻塞中的goroutine,仍是使用readyWithTime来实现这个功能函数
func (c *Cond) Broadcast() { c.checker.check() runtime_notifyListNotifyAll(&c.notify) } // 源代码在sema.go中 func notifyListNotifyAll(l *notifyList) { // Fast-path: if there are no new waiters since the last notification // we don't need to acquire the lock. if atomic.Load(&l.wait) == atomic.Load(&l.notify) { return } // Pull the list out into a local variable, waiters will be readied // outside the lock. lock(&l.lock) s := l.head l.head = nil l.tail = nil // Update the next ticket to be notified. We can set it to the current // value of wait because any previous waiters are already in the list // or will notice that they have already been notified when trying to // add themselves to the list. atomic.Store(&l.notify, atomic.Load(&l.wait)) unlock(&l.lock) // Go through the local list and ready all waiters. for s != nil { next := s.next s.next = nil readyWithTime(s, 4) s = next } }
// 调用runtime_notifyListNotifyOne方法唤醒链表头的goroutine func (c *Cond) Signal() { c.checker.check() runtime_notifyListNotifyOne(&c.notify) } // runtime_notifyListNotifyOne具体实现 获取链表头部的G,而后调用readyWithTime唤醒goroutine // 源代码在sema.go中 func notifyListNotifyOne(l *notifyList) { if atomic.Load(&l.wait) == atomic.Load(&l.notify) { return } lock(&l.lock) t := l.notify if t == atomic.Load(&l.wait) { unlock(&l.lock) return } atomic.Store(&l.notify, t+1) for p, s := (*sudog)(nil), l.head; s != nil; p, s = s, s.next { if s.ticket == t { n := s.next if p != nil { p.next = n } else { l.head = n } if n == nil { l.tail = p } unlock(&l.lock) s.next = nil readyWithTime(s, 4) return } } unlock(&l.lock) }