go中sync.Cond源码解读

sync.Cond

前言

本次的代码是基于go version go1.13.15 darwin/amd64less

什么是sync.Cond

Go语言标准库中的条件变量sync.Cond,它可让一组的Goroutine都在知足特定条件时被唤醒。函数

每一个Cond都会关联一个Lock(*sync.Mutex or *sync.RWMutex)ui

var (
	locker = new(sync.Mutex)
	cond   = sync.NewCond(locker)
)

func listen(x int) {
	// 获取锁
	cond.L.Lock()
	// 等待通知  暂时阻塞
	cond.Wait()
	fmt.Println(x)
	// 释放锁
	cond.L.Unlock()
}

func main() {
	// 启动60个被cond阻塞的线程
	for i := 1; i <= 60; i++ {
		go listen(i)
	}

	fmt.Println("start all")

	// 3秒以后 下发一个通知给已经获取锁的goroutine	time.Sleep(time.Second * 3)
	fmt.Println("++++++++++++++++++++one Signal")
	cond.Signal()

	// 3秒以后 下发一个通知给已经获取锁的goroutine
	time.Sleep(time.Second * 3)
	fmt.Println("++++++++++++++++++++one Signal")
	cond.Signal()

	// 3秒以后 下发广播给全部等待的goroutine
	time.Sleep(time.Second * 3)
	fmt.Println("++++++++++++++++++++begin broadcast")
	cond.Broadcast()
	// 阻塞直到全部的所有输出
	time.Sleep(time.Second * 60)
}

上面是个简单的例子,咱们启动了60个线程,而后都被cond阻塞,主函数经过Signal()通知一个goroutine接触阻塞,经过Broadcast()通知全部被阻塞的所有解除阻塞。atom

sync_cond

看下源码

// Wait 原子式的 unlock c.L, 并暂停执行调用的 goroutine。
// 在稍后执行后,Wait 会在返回前 lock c.L. 与其余系统不一样,
// 除非被 Broadcast 或 Signal 唤醒,不然等待没法返回。
//
// 由于等待第一次 resume 时 c.L 没有被锁定,因此当 Wait 返回时,
// 调用者一般不能认为条件为真。相反,调用者应该在循环中使用 Wait():
//
//    c.L.Lock()
//    for !condition() {
//        c.Wait()
//    }
//    ... make use of condition ...
//    c.L.Unlock()
//
type Cond struct {
	// 用于保证结构体不会在编译期间拷贝
	noCopy noCopy
	// 锁
	L Locker
	// goroutine链表,维护等待唤醒的goroutine队列
	notify  notifyList
	// 保证运行期间不会发生copy
	checker copyChecker
}

重点分析下:notifyListcopyChecker线程

  • notify
type notifyList struct {
	// 总共须要等待的数量
	wait   uint32
	// 已经通知的数量
	notify uint32
	// 锁
	lock   uintptr
	// 指向链表头部
	head   *sudog
	// 指向链表尾部
	tail   *sudog
}

这个是核心,全部waitgoroutine都会被加入到这个链表中,而后在通知的时候再从这个链表中获取。code

  • copyChecker

保证运行期间不会发生copy对象

type copyChecker uintptr
// copyChecker holds back pointer to itself to detect object copying
func (c *copyChecker) check() {
	if uintptr(*c) != uintptr(unsafe.Pointer(c)) &&
		!atomic.CompareAndSwapUintptr((*uintptr)(c), 0, uintptr(unsafe.Pointer(c))) &&
		uintptr(*c) != uintptr(unsafe.Pointer(c)) {
		panic("sync.Cond is copied")
	}
}

Wait

func (c *Cond) Wait() {
	// 监测是否复制
	c.checker.check()
	// 更新 notifyList中须要等待的wait的数量
	// 返回当前须要插入链表节点ticket
	t := runtime_notifyListAdd(&c.notify)
	c.L.Unlock()
	// 为当前的加入的waiter构建一个链表的节点,插入链表的尾部
	runtime_notifyListWait(&c.notify, t)
	c.L.Lock()
}

// go/src/runtime/sema.go
// 更新 notifyList中须要等待的wait的数量
// 同时返回当前的加入的 waiter 的 ticket 编号,从0开始  
//go:linkname notifyListAdd sync.runtime_notifyListAdd
func notifyListAdd(l *notifyList) uint32 {
    // 使用atomic原子的对wait字段进行加一操做
	return atomic.Xadd(&l.wait, 1) - 1
}

// go/src/runtime/sema.go
// 为当前的加入的waiter构建一个链表的节点,插入链表的尾部
//go:linkname notifyListWait sync.runtime_notifyListWait
func notifyListWait(l *notifyList, t uint32) {
	lock(&l.lock)

	// 当t小于notifyList中的notify,说明当前节点已经被通知了  
	if less(t, l.notify) {
		unlock(&l.lock)
		return
	}

	// 构建当前节点
	s := acquireSudog()
	s.g = getg()
	s.ticket = t
	s.releasetime = 0
	t0 := int64(0)
	if blockprofilerate > 0 {
		t0 = cputicks()
		s.releasetime = -1
	}
	// 头结点没构建,插入头结点
	if l.tail == nil {
		l.head = s
	} else {
		// 插入到尾节点
		l.tail.next = s
	}
	l.tail = s
	// 将当前goroutine置于等待状态并解锁
	// 经过调用goready(gp),可使goroutine再次可运行。
	// 也就是将 M/P/G 解绑,并将 G 调整为等待状态,放入 sudog 等待队列中
	goparkunlock(&l.lock, waitReasonSyncCondWait, traceEvGoBlockCond, 3)
	if t0 != 0 {
		blockevent(s.releasetime-t0, 2)
	}
	releaseSudog(s)
}

梳理流程blog

一、首先检测对象的复制行为,若是有复制发生直接抛出panic;队列

二、而后调用runtime_notifyListAddnotifynotifyListList中的wait(须要等待的数量)进行加一操做,同时返回一个ticket,用来做为当前wait的编号,这个编号,会和notifyList中的notify对应起来;get

三、而后调用runtime_notifyListWait把当前的wait封装成链表的一个节点,插入到notifyList维护的链表的尾部。

sync_cond

Signal

// 唤醒一个被wait的goroutine
func (c *Cond) Signal() {
	// 监测是否复制
	c.checker.check()
	runtime_notifyListNotifyOne(&c.notify)
}

// go/src/runtime/sema.go
// 通知链表中的第一个
//go:linkname notifyListNotifyOne sync.runtime_notifyListNotifyOne
func notifyListNotifyOne(l *notifyList) {
	// wait和notify,说明已经所有通知到了
	if atomic.Load(&l.wait) == atomic.Load(&l.notify) {
		return
	}

	lock(&l.lock)

	// 这里作了二次的确认
	// wait和notify,说明已经所有通知到了
	t := l.notify
	if t == atomic.Load(&l.wait) {
		unlock(&l.lock)
		return
	}

	// 原子的对notify执行+1操做
	atomic.Store(&l.notify, t+1)

	// 尝试找到须要被通知的 g
	// 若是目前还没来得及入队,是没法找到的
	// 可是,当它看到通知编号已经发生改变是不会被 park 的
	//
	// 这个查找过程看起来是线性复杂度,但实际上很快就停了
	// 由于 g 的队列与获取编号不一样,于是队列中会出现少许重排,但咱们但愿找到靠前的 g
	// 而 g 只有在再也不 race 后才会排在靠前的位置,所以这个迭代也不会过久,
	// 同时,即使找不到 g,这个状况也成立:
	// 它尚未休眠,而且已经失去了咱们在队列上找到的(少数)其余 g 的 race。
	for p, s := (*sudog)(nil), l.head; s != nil; p, s = s, s.next {
		// 顺序拿到一个节点的ticket,会和上面会和notifyList中的notify作比较,相同才进行后续的操做
		// 这个咱们分析了,notifyList中的notify和链表节点中的ticket是一一对应的  
		if s.ticket == t {
			n := s.next
			if p != nil {
				p.next = n
			} else {
				l.head = n
			}
			if n == nil {
				l.tail = p
			}
			unlock(&l.lock)
			s.next = nil
			// 经过goready掉起在上面经过goparkunlock挂起的goroutine
			readyWithTime(s, 4)
			return
		}
	}
	unlock(&l.lock)
}

梳理下流程:

一、首先检测对象的复制行为,若是有复制发生直接抛出panic

二、判断waitnotify,若是二者相同说明已经已经所有通知到了;

三、调用notifyListNotifyOne,经过for循环,依次遍历这个链表,直到找到和notifyList中的notify,相匹配的ticket的节点;

四、掉起goroutine,完成通知。

sync_cond

Broadcast

// 唤醒全部被wait的goroutine
func (c *Cond) Broadcast() {
	c.checker.check()
	runtime_notifyListNotifyAll(&c.notify)
}

// go/src/runtime/sema.go
// notifyListNotifyAll notifies all entries in the list.
//go:linkname notifyListNotifyAll sync.runtime_notifyListNotifyAll
func notifyListNotifyAll(l *notifyList) {
	// wait和notify,说明已经所有通知到了
	if atomic.Load(&l.wait) == atomic.Load(&l.notify) {
		return
	}

	// 加锁
	lock(&l.lock)
	s := l.head
	l.head = nil
	l.tail = nil

	// 这个很粗暴,直接将notify的值置换成wait
	atomic.Store(&l.notify, atomic.Load(&l.wait))
	unlock(&l.lock)

	// 循环链表,一个个唤醒goroutine
	for s != nil {
		next := s.next
		s.next = nil
		readyWithTime(s, 4)
		s = next
	}
}

梳理下流程:

一、首先检测对象的复制行为,若是有复制发生直接抛出panic;

二、判断waitnotify,若是二者相同说明已经已经所有通知到了;

三、notifyListNotifyAll,就相对简单了,直接将notify的值置为wait,标注这个已经所有通知了;

四、循环链表,一个个唤醒goroutine

sync_cond

总结

sync.Cond不是一个经常使用的同步机制,可是在条件长时间没法知足时,与使用for {}进行忙碌等待相比,sync.Cond可以让出处理器的使用权,提供CPU的利用率。使用时咱们也须要注意如下问题:

一、sync.Cond.Wait在调用以前必定要使用获取互斥锁,不然会触发程序崩溃;

二、sync.Cond.Signal 唤醒的 Goroutine都是队列最前面、等待最久的Goroutine

三、sync.Cond.Broadcast会按照必定顺序广播通知等待的所有 Goroutine

相关文章
相关标签/搜索