最近在看源码,发现好多地方用到了这个semaphore
。git
本文是在go version go1.13.15 darwin/amd64
上进行的github
下面是官方的描述golang
// Semaphore implementation exposed to Go. // Intended use is provide a sleep and wakeup // primitive that can be used in the contended case // of other synchronization primitives. // Thus it targets the same goal as Linux's futex, // but it has much simpler semantics. // // That is, don't think of these as semaphores. // Think of them as a way to implement sleep and wakeup // such that every sleep is paired with a single wakeup, // even if, due to races, the wakeup happens before the sleep. // 具体的用法是提供 sleep 和 wakeup 原语 // 以使其可以在其它同步原语中的竞争状况下使用 // 所以这里的 semaphore 和 Linux 中的 futex 目标是一致的 // 只不过语义上更简单一些 // // 也就是说,不要认为这些是信号量 // 把这里的东西看做 sleep 和 wakeup 实现的一种方式 // 每个 sleep 都会和一个 wakeup 配对 // 即便在发生 race 时,wakeup 在 sleep 以前时也是如此
上面提到了和futex
做用同样,关于futex
编程
futex(快速用户区互斥的简称)是一个在Linux上实现锁定和构建高级抽象锁如信号量和POSIX互斥的基本工具缓存
Futex 由一块可以被多个进程共享的内存空间(一个对齐后的整型变量)组成;这个整型变量的值可以经过汇编语言调用CPU提供的原子操做指令来增长或减小,而且一个进程能够等待直到那个值变成正数。Futex 的操做几乎所有在用户空间完成;只有当操做结果不一致从而须要仲裁时,才须要进入操做系统内核空间执行。这种机制容许使用 futex 的锁定原语有很是高的执行效率:因为绝大多数的操做并不须要在多个进程之间进行仲裁,因此绝大多数操做均可以在应用程序空间执行,而不须要使用(相对高代价的)内核系统调用。并发
go中的semaphore
做用和futex
目标同样,提供sleep
和wakeup
原语,使其可以在其它同步原语中的竞争状况下使用。当一个goroutine
须要休眠时,将其进行集中存放,当须要wakeup
时,再将其取出,从新放入调度器中。app
例如在读写锁的实现中,读锁和写锁以前的相互阻塞唤醒,就是经过sleep
和wakeup
实现,当有读锁存在的时候,新加入的写锁经过semaphore
阻塞本身,当前面的读锁完成,在经过semaphore
唤醒被阻塞的写锁。异步
写锁ide
// 获取互斥锁 // 阻塞等待全部读操做结束(若是有的话) func (rw *RWMutex) Lock() { ... // 原子的修改readerCount的值,直接将readerCount减去rwmutexMaxReaders // 说明,有写锁进来了,这在上面的读锁中也有体现 r := atomic.AddInt32(&rw.readerCount, -rwmutexMaxReaders) + rwmutexMaxReaders // 当r不为0说明,当前写锁以前有读锁的存在 // 修改下readerWait,也就是当前写锁须要等待的读锁的个数 if r != 0 && atomic.AddInt32(&rw.readerWait, r) != 0 { // 阻塞当前写锁 runtime_SemacquireMutex(&rw.writerSem, false, 0) } ... }
经过runtime_SemacquireMutex
对当前写锁进行sleep
函数
读锁释放
// 减小读操做计数,即readerCount-- // 唤醒等待写操做的协程(若是有的话) func (rw *RWMutex) RUnlock() { ... // 首先经过atomic的原子性使readerCount-1 // 1.若readerCount大于0, 证实当前还有读锁, 直接结束本次操做 // 2.若readerCount小于0, 证实已经没有读锁, 可是还有由于读锁被阻塞的写锁存在 if r := atomic.AddInt32(&rw.readerCount, -1); r < 0 { // 尝试唤醒被阻塞的写锁 rw.rUnlockSlow(r) } ... } func (rw *RWMutex) rUnlockSlow(r int32) { ... // readerWait--操做,若是readerWait--操做以后的值为0,说明,写锁以前,已经没有读锁了 // 经过writerSem信号量,唤醒队列中第一个阻塞的写锁 if atomic.AddInt32(&rw.readerWait, -1) == 0 { // 唤醒一个写锁 runtime_Semrelease(&rw.writerSem, false, 1) } }
写锁处理完以后,调用runtime_Semrelease
来唤醒sleep
的写锁
在go/src/sync/runtime.go
中,定义了这几个方法
// Semacquire等待*s > 0,而后原子递减它。 // 它是一个简单的睡眠原语,用于同步 // library and不该该直接使用。 func runtime_Semacquire(s *uint32) // SemacquireMutex相似于Semacquire,用来阻塞互斥的对象 // 若是lifo为true,waiter将会被插入到队列的头部 // skipframes是跟踪过程当中要省略的帧数,从这里开始计算 // runtime_SemacquireMutex's caller. func runtime_SemacquireMutex(s *uint32, lifo bool, skipframes int) // Semrelease会自动增长*s并通知一个被Semacquire阻塞的等待的goroutine // 它是一个简单的唤醒原语,用于同步 // library and不该该直接使用。 // 若是handoff为true, 传递信号到队列头部的waiter // skipframes是跟踪过程当中要省略的帧数,从这里开始计算 // runtime_Semrelease's caller. func runtime_Semrelease(s *uint32, handoff bool, skipframes int)
具体的实现是在go/src/runtime/sema.go
中
//go:linkname sync_runtime_Semacquire sync.runtime_Semacquire func sync_runtime_Semacquire(addr *uint32) { semacquire1(addr, false, semaBlockProfile, 0) } //go:linkname sync_runtime_Semrelease sync.runtime_Semrelease func sync_runtime_Semrelease(addr *uint32, handoff bool, skipframes int) { semrelease1(addr, handoff, skipframes) } //go:linkname sync_runtime_SemacquireMutex sync.runtime_SemacquireMutex func sync_runtime_SemacquireMutex(addr *uint32, lifo bool, skipframes int) { semacquire1(addr, lifo, semaBlockProfile|semaMutexProfile, skipframes) }
semaphore
的实现使用到了sudog
,咱们先来看下
sudog 是运行时用来存放处于阻塞状态的goroutine
的一个上层抽象,是用来实现用户态信号量的主要机制之一。 例如当一个goroutine
由于等待channel
的数据须要进行阻塞时,sudog
会将goroutine
及其用于等待数据的位置进行记录, 并进而串联成一个等待队列,或二叉平衡树。
// sudogs are allocated from a special pool. Use acquireSudog and // releaseSudog to allocate and free them. type sudog struct { // 如下字段受hchan保护 g *g // isSelect 表示 g 正在参与一个 select, so // 所以 g.selectDone 必须以 CAS 的方式来获取wake-up race. isSelect bool next *sudog prev *sudog elem unsafe.Pointer // 数据元素(可能指向栈) // 如下字段不会并发访问。 // 对于通道,waitlink只被g访问。 // 对于信号量,全部字段(包括上面的字段) // 只有当持有一个semroot锁时才被访问。 acquiretime int64 releasetime int64 ticket uint32 parent *sudog //semaRoot 二叉树 waitlink *sudog // g.waiting 列表或 semaRoot waittail *sudog // semaRoot c *hchan // channel }
sudog
的获取和归还,遵循如下策略:
一、获取,首先从per-P
缓存获取,对于per-P
缓存,若是per-P
缓存为空,则从全局池抓取一半,而后取出per-P
缓存中的最后一个;
二、归还,归还到per-P
缓存,若是per-P
缓存满了,就把per-P
缓存的一半归还到全局缓存中,而后归还sudog
到per-P
缓存中。
一、若是per-P
缓存的内容没达到长度的通常,则会从全局额缓存中抓取一半;
二、而后返回把per-P
缓存中最后一个sudog
返回,而且置空;
// go/src/runtime/proc.go //go:nosplit func acquireSudog() *sudog { // Delicate dance: 信号量的实现调用acquireSudog,而后acquireSudog调用new(sudog) // new调用malloc, malloc调用垃圾收集器,垃圾收集器在stopTheWorld调用信号量 // 经过在new(sudog)周围执行acquirem/releasem来打破循环 // acquirem/releasem在new(sudog)期间增长m.locks,防止垃圾收集器被调用。 // 获取当前 g 所在的 m mp := acquirem() // 获取p的指针 pp := mp.p.ptr() if len(pp.sudogcache) == 0 { lock(&sched.sudoglock) // 首先,尝试从中央缓存获取一批数据。 for len(pp.sudogcache) < cap(pp.sudogcache)/2 && sched.sudogcache != nil { s := sched.sudogcache sched.sudogcache = s.next s.next = nil pp.sudogcache = append(pp.sudogcache, s) } unlock(&sched.sudoglock) // 若是中央缓存中没有,新分配 if len(pp.sudogcache) == 0 { pp.sudogcache = append(pp.sudogcache, new(sudog)) } } // 取缓存中最后一个 n := len(pp.sudogcache) s := pp.sudogcache[n-1] pp.sudogcache[n-1] = nil // 将刚取出的在缓存中移除 pp.sudogcache = pp.sudogcache[:n-1] if s.elem != nil { throw("acquireSudog: found s.elem != nil in cache") } releasem(mp) return s }
一、若是per-P
缓存满了,就归还per-P
缓存通常的内容到全局缓存;
二、而后将回收的sudog
放到per-P
缓存中。
// go/src/runtime/proc.go //go:nosplit func releaseSudog(s *sudog) { if s.elem != nil { throw("runtime: sudog with non-nil elem") } if s.isSelect { throw("runtime: sudog with non-false isSelect") } if s.next != nil { throw("runtime: sudog with non-nil next") } if s.prev != nil { throw("runtime: sudog with non-nil prev") } if s.waitlink != nil { throw("runtime: sudog with non-nil waitlink") } if s.c != nil { throw("runtime: sudog with non-nil c") } gp := getg() if gp.param != nil { throw("runtime: releaseSudog with non-nil gp.param") } // 避免从新安排到另外一个P mp := acquirem() // avoid rescheduling to another P pp := mp.p.ptr() // 若是缓存满了 if len(pp.sudogcache) == cap(pp.sudogcache) { // 将本地高速缓存的一半传输到中央高速缓存 var first, last *sudog for len(pp.sudogcache) > cap(pp.sudogcache)/2 { n := len(pp.sudogcache) p := pp.sudogcache[n-1] pp.sudogcache[n-1] = nil pp.sudogcache = pp.sudogcache[:n-1] if first == nil { first = p } else { last.next = p } last = p } lock(&sched.sudoglock) last.next = sched.sudogcache sched.sudogcache = first unlock(&sched.sudoglock) } // 归还sudog到`per-P`缓存中 pp.sudogcache = append(pp.sudogcache, s) releasem(mp) }
// go/src/runtime/sema.go // 用于sync.Mutex的异步信号量。 // semaRoot拥有一个具备不一样地址(s.elem)的sudog平衡树。 // 每一个sudog均可以依次(经过s.waitlink)指向一个列表,在相同地址上等待的其余sudog。 // 对具备相同地址的sudog内部列表进行的操做所有为O(1)。顶层semaRoot列表的扫描为O(log n), // 其中,n是阻止goroutines的不一样地址的数量,经过他们散列到给定的semaRoot。 type semaRoot struct { lock mutex // waiters的平衡树的根节点 treap *sudog // waiters的数量,读取的时候无所 nwait uint32 } // Prime to not correlate with any user patterns. const semTabSize = 251 var semtable [semTabSize]struct { root semaRoot pad [cpu.CacheLinePadSize - unsafe.Sizeof(semaRoot{})]byte }
// go/src/runtime/sema.go //go:linkname poll_runtime_Semacquire internal/poll.runtime_Semacquire func poll_runtime_Semacquire(addr *uint32) { semacquire1(addr, false, semaBlockProfile, 0) } //go:linkname sync_runtime_SemacquireMutex sync.runtime_SemacquireMutex func sync_runtime_SemacquireMutex(addr *uint32, lifo bool, skipframes int) { semacquire1(addr, lifo, semaBlockProfile|semaMutexProfile, skipframes) } func semacquire1(addr *uint32, lifo bool, profile semaProfileFlags, skipframes int) { // 判断这个goroutine,是不是m上正在运行的那个 gp := getg() if gp != gp.m.curg { throw("semacquire not on the G stack") } // *addr -= 1 if cansemacquire(addr) { return } // 增长等待计数 // 再试一次 cansemacquire 若是成功则直接返回 // 将本身做为等待者入队 // 休眠 // (等待器描述符由出队信号产生出队行为) // 获取一个sudog s := acquireSudog() root := semroot(addr) t0 := int64(0) s.releasetime = 0 s.acquiretime = 0 s.ticket = 0 if profile&semaBlockProfile != 0 && blockprofilerate > 0 { t0 = cputicks() s.releasetime = -1 } if profile&semaMutexProfile != 0 && mutexprofilerate > 0 { if t0 == 0 { t0 = cputicks() } s.acquiretime = t0 } for { lock(&root.lock) // 添加咱们本身到nwait来禁用semrelease中的"easy case" atomic.Xadd(&root.nwait, 1) // 检查cansemacquire避免错过唤醒 if cansemacquire(addr) { atomic.Xadd(&root.nwait, -1) unlock(&root.lock) break } // 任何在 cansemacquire 以后的 semrelease 都知道咱们在等待(由于设置了 nwait),所以休眠 // 队列将s添加到semaRoot中被阻止的goroutine中 root.queue(addr, s, lifo) // 将当前goroutine置于等待状态并解锁锁。 // 经过调用goready(gp),可使goroutine再次可运行。 goparkunlock(&root.lock, waitReasonSemacquire, traceEvGoBlockSync, 4+skipframes) if s.ticket != 0 || cansemacquire(addr) { break } } if s.releasetime > 0 { blockevent(s.releasetime-t0, 3+skipframes) } // 归还sudog releaseSudog(s) } func cansemacquire(addr *uint32) bool { for { v := atomic.Load(addr) if v == 0 { return false } if atomic.Cas(addr, v, v-1) { return true } } }
// go/src/runtime/sema.go //go:linkname sync_runtime_Semrelease sync.runtime_Semrelease func sync_runtime_Semrelease(addr *uint32, handoff bool, skipframes int) { semrelease1(addr, handoff, skipframes) } func semrelease1(addr *uint32, handoff bool, skipframes int) { root := semroot(addr) atomic.Xadd(addr, 1) // Easy case:没有等待者 // 这个检查必须发生在xadd以后,以免错过唤醒 if atomic.Load(&root.nwait) == 0 { return } // Harder case: 找到等待者,而且唤醒 lock(&root.lock) if atomic.Load(&root.nwait) == 0 { // 该计数已被另外一个goroutine占用, // 所以无需唤醒其余goroutine。 unlock(&root.lock) return } // 搜索一个等待着而后将其唤醒 s, t0 := root.dequeue(addr) if s != nil { atomic.Xadd(&root.nwait, -1) } unlock(&root.lock) if s != nil { // 可能会很慢,所以先解锁 acquiretime := s.acquiretime if acquiretime != 0 { mutexevent(t0-acquiretime, 3+skipframes) } if s.ticket != 0 { throw("corrupted semaphore ticket") } if handoff && cansemacquire(addr) { s.ticket = 1 } // goready(s.g, 5) // 标记 runnable,等待被从新调度 readyWithTime(s, 5+skipframes) } }
摘自"同步原语"的一段总结
这一对 semacquire 和 semrelease 理解上可能不太直观。 首先,咱们必须意识到这两个函数必定是在两个不一样的 M(线程)上获得执行,不然不会出现并发,咱们不妨设为 M1 和 M2。 当 M1 上的 G1 执行到 semacquire1 时,若是快速路径成功,则说明 G1 抢到锁,可以继续执行。但一旦失败且在慢速路径下 依然抢不到锁,则会进入 goparkunlock,将当前的 G1 放到等待队列中,进而让 M1 切换并执行其余 G。 当 M2 上的 G2 开始调用 semrelease1 时,只是单纯的将等待队列的 G1 从新放到调度队列中,而当 G1 从新被调度时(假设运气好又在 M1 上被调度),代码仍然会从 goparkunlock 以后开始执行,并再次尝试竞争信号量,若是成功,则会归还 sudog。
【同步原语】https://golang.design/under-the-hood/zh-cn/part2runtime/ch06sched/sync/
【Go并发编程实战--信号量的使用方法和其实现原理】https://juejin.cn/post/6906677772479889422
【Semaphore】https://github.com/cch123/golang-notes/blob/master/semaphore.md
【进程同步之信号量机制(pv操做)及三个经典同步问题】https://blog.csdn.net/SpeedMe/article/details/17597373
本文做者:liz
本文连接:https://boilingfrog.github.io/2021/04/02/semaphore/
版权声明:本文为博主原创文章,遵循 CC 4.0 BY-SA 版权协议,转载请附上原文出处连接和本声明。