浅谈 Go 语言实现原理原文连接:https://draveness.me/golang/c...html
当提到并发编程、多线程编程时,咱们每每都离不开『锁』这一律念,Go 语言做为一个原生支持用户态进程 Goroutine 的语言,也必定会为开发者提供这一功能,锁的主要做用就是保证多个线程或者 Goroutine 在访问同一片内存时不会出现混乱的问题,锁实际上是一种并发编程中的同步原语(Synchronization Primitives)。git
在这一节中咱们就会介绍 Go 语言中常见的同步原语 Mutex
、RWMutex
、WaitGroup
、Once
和 Cond
以及扩展原语 ErrGroup
、Semaphore
和 SingleFlight
的实现原理,同时也会涉及互斥锁、信号量等并发编程中的常见概念。github
Go 语言在 sync 包中提供了用于同步的一些基本原语,包括常见的互斥锁 Mutex
与读写互斥锁 RWMutex
以及 Once
、WaitGroup
。golang
这些基本原语的主要做用是提供较为基础的同步功能,咱们应该使用 Channel 和通讯来实现更加高级的同步机制,咱们在这一节中并不会介绍标准库中所有的原语,而是会介绍其中比较常见的 Mutex
、RWMutex
、Once
、WaitGroup
和 Cond
,咱们并不会涉及剩下两个用于存取数据的结构体 Map
和 Pool
。数据库
Go 语言中的互斥锁在 sync
包中,它由两个字段 state
和 sema
组成,state
表示当前互斥锁的状态,而 sema
真正用于控制锁状态的信号量,这两个加起来只占 8 个字节空间的结构体就表示了 Go 语言中的互斥锁。编程
type Mutex struct { state int32 sema uint32 }
互斥锁的状态是用 int32
来表示的,可是锁的状态并非互斥的,它的最低三位分别表示 mutexLocked
、mutexWoken
和 mutexStarving
,剩下的位置都用来表示当前有多少个 Goroutine 等待互斥锁被释放:数组
互斥锁在被建立出来时,全部的状态位的默认值都是 0
,当互斥锁被锁定时 mutexLocked
就会被置成 1
、当互斥锁被在正常模式下被唤醒时 mutexWoken
就会被被置成 1
、mutexStarving
用于表示当前的互斥锁进入了状态,最后的几位是在当前互斥锁上等待的 Goroutine 个数。缓存
在了解具体的加锁和解锁过程以前,咱们须要先简单了解一下 Mutex
在使用过程当中可能会进入的饥饿模式,饥饿模式是在 Go 语言 1.9 版本引入的特性,它的主要功能就是保证互斥锁的获取的『公平性』(Fairness)。数据结构
互斥锁能够同时处于两种不一样的模式,也就是正常模式和饥饿模式,在正常模式下,全部锁的等待者都会按照先进先出的顺序获取锁,可是若是一个刚刚被唤起的 Goroutine 遇到了新的 Goroutine 进程也调用了 Lock
方法时,大几率会获取不到锁,为了减小这种状况的出现,防止 Goroutine 被『饿死』,一旦 Goroutine 超过 1ms 没有获取到锁,它就会将当前互斥锁切换饥饿模式。多线程
在饥饿模式中,互斥锁会被直接交给等待队列最前面的 Goroutine,新的 Goroutine 在这时不能获取锁、也不会进入自旋的状态,它们只会在队列的末尾等待,若是一个 Goroutine 得到了互斥锁而且它是队列中最末尾的协程或者它等待的时间少于 1ms,那么当前的互斥锁就会被切换回正常模式。
相比于饥饿模式,正常模式下的互斥锁可以提供更好地性能,饥饿模式的主要做用就是避免一些 Goroutine 因为陷入等待没法获取锁而形成较高的尾延时,这也是对 Mutex
的一个优化。
互斥锁 Mutex
的加锁是靠 Lock
方法完成的,最新的 Go 语言源代码中已经将 Lock
方法进行了简化,方法的主干只保留了最多见、简单而且快速的状况;当锁的状态是 0
时直接将 mutexLocked
位置成 1
:
func (m *Mutex) Lock() { if atomic.CompareAndSwapInt32(&m.state, 0, mutexLocked) { return } m.lockSlow() }
可是当 Lock
方法被调用时 Mutex
的状态不是 0
时就会进入 lockSlow
方法尝试经过自旋或者其余的方法等待锁的释放并获取互斥锁,该方法的主体是一个很是大 for
循环,咱们会将该方法分红几个部分介绍获取锁的过程:
func (m *Mutex) lockSlow() { var waitStartTime int64 starving := false awoke := false iter := 0 old := m.state for { if old&(mutexLocked|mutexStarving) == mutexLocked && runtime_canSpin(iter) { if !awoke && old&mutexWoken == 0 && old>>mutexWaiterShift != 0 && atomic.CompareAndSwapInt32(&m.state, old, old|mutexWoken) { awoke = true } runtime_doSpin() iter++ old = m.state continue }
在这段方法的第一部分会判断当前方法可否进入自旋来等待锁的释放,自旋(Spinnig)实际上是在多线程同步的过程当中使用的一种机制,当前的进程在进入自旋的过程当中会一直保持 CPU 的占用,持续检查某个条件是否为真,在多核的 CPU 上,自旋的优势是避免了 Goroutine 的切换,因此若是使用恰当会对性能带来很是大的增益。
在 Go 语言的 Mutex
互斥锁中,只有在普通模式下才可能进入自旋,除了模式的限制以外,runtime_canSpin
方法中会判断当前方法是否能够进入自旋,进入自旋的条件很是苛刻:
P
而且处理的运行队列是空的;一旦当前 Goroutine 可以进入自旋就会调用 runtime_doSpin
,它最终调用汇编语言编写的方法 procyield
并执行指定次数的 PAUSE
指令,PAUSE
指令什么都不会作,可是会消耗 CPU 时间,每次自旋都会调用 30
次 PAUSE
,下面是该方法在 386 架构的机器上的实现:
TEXT runtime·procyield(SB),NOSPLIT,$0-0 MOVL cycles+0(FP), AX again: PAUSE SUBL $1, AX JNZ again RET
处理了自旋相关的特殊逻辑以后,互斥锁接下来就根据上下文计算当前互斥锁最新的状态了,几个不一样的条件分别会更新 state
中存储的不一样信息 mutexLocked
、mutexStarving
、mutexWoken
和 mutexWaiterShift
:
new := old if old&mutexStarving == 0 { new |= mutexLocked } if old&(mutexLocked|mutexStarving) != 0 { new += 1 << mutexWaiterShift } if starving && old&mutexLocked != 0 { new |= mutexStarving } if awoke { new &^= mutexWoken }
计算了新的互斥锁状态以后,咱们就会使用 atomic
包提供的 CAS 函数修改互斥锁的状态,若是当前的互斥锁已经处于饥饿和锁定的状态,就会跳过当前步骤,调用 runtime_SemacquireMutex
方法:
if atomic.CompareAndSwapInt32(&m.state, old, new) { if old&(mutexLocked|mutexStarving) == 0 { break // locked the mutex with CAS } queueLifo := waitStartTime != 0 if waitStartTime == 0 { waitStartTime = runtime_nanotime() } runtime_SemacquireMutex(&m.sema, queueLifo, 1) starving = starving || runtime_nanotime()-waitStartTime > starvationThresholdNs old = m.state if old&mutexStarving != 0 { delta := int32(mutexLocked - 1<<mutexWaiterShift) if !starving || old>>mutexWaiterShift == 1 { delta -= mutexStarving } atomic.AddInt32(&m.state, delta) break } awoke = true iter = 0 } else { old = m.state } } }
runtime_SemacquireMutex
方法的主要做用就是经过 Mutex
的使用互斥锁中的信号量保证资源不会被两个 Goroutine 获取,从这里咱们就能看出 Mutex
其实就是对更底层的信号量进行封装,对外提供更加易用的 API,runtime_SemacquireMutex
会在方法中不断调用 goparkunlock
将当前 Goroutine 陷入休眠等待信号量能够被获取。
一旦当前 Goroutine 能够获取信号量,就证实互斥锁已经被解锁,该方法就会马上返回,Lock
方法的剩余代码也会继续执行下去了,当前互斥锁处于饥饿模式时,若是该 Goroutine 是队列中最后的一个 Goroutine 或者等待锁的时间小于 starvationThresholdNs(1ms)
,当前 Goroutine 就会直接得到互斥锁而且从饥饿模式中退出并得到锁。
互斥锁的解锁过程相比之下就很是简单,Unlock
方法会直接使用 atomic
包提供的 AddInt32
,若是返回的新状态不等于 0
就会进入 unlockSlow
方法:
func (m *Mutex) Unlock() { new := atomic.AddInt32(&m.state, -mutexLocked) if new != 0 { m.unlockSlow(new) } }
unlockSlow
方法首先会对锁的状态进行校验,若是当前互斥锁已经被解锁过了就会直接抛出异常 sync: unlock of unlocked mutex
停止当前程序,在正常状况下会根据当前互斥锁的状态是正常模式仍是饥饿模式进入不一样的分支:
func (m *Mutex) unlockSlow(new int32) { if (new+mutexLocked)&mutexLocked == 0 { throw("sync: unlock of unlocked mutex") } if new&mutexStarving == 0 { old := new for { if old>>mutexWaiterShift == 0 || old&(mutexLocked|mutexWoken|mutexStarving) != 0 { return } new = (old - 1<<mutexWaiterShift) | mutexWoken if atomic.CompareAndSwapInt32(&m.state, old, new) { runtime_Semrelease(&m.sema, false, 1) return } old = m.state } } else { runtime_Semrelease(&m.sema, true, 1) } }
若是当前互斥锁的状态是饥饿模式就会直接调用 runtime_Semrelease
方法直接将当前锁交给下一个正在尝试获取锁的等待者,等待者会在被唤醒以后设置 mutexLocked
状态,因为此时仍然处于 mutexStarving
,因此新的 Goroutine 也没法得到锁。
在正常模式下,若是当前互斥锁不存在等待者或者最低三位表示的状态都为 0
,那么当前方法就不须要唤醒其余 Goroutine 能够直接返回,当有 Goroutine 正在处于等待状态时,仍是会经过 runtime_Semrelease
唤醒对应的 Goroutine 并移交锁的全部权。
经过对互斥锁 Mutex
加锁和解锁过程的分析,咱们可以得出如下的一些结论,它们可以帮助咱们更好地理解互斥锁的工做原理,互斥锁的加锁的过程比较复杂,涉及自旋、信号量以及 Goroutine 调度等概念:
mutexLocked
加锁;mutexLocked
而且在普通模式下工做,就会进入自旋,执行 30 次 PAUSE
指令消耗 CPU 时间等待锁的释放;1ms
,互斥锁就会被切换到饥饿模式;runtime_SemacquireMutex
方法将调用 Lock
的 Goroutine 切换至休眠状态,等待持有信号量的 Goroutine 唤醒当前协程;1ms
,当前 Goroutine 会将互斥锁切换回正常模式;互斥锁的解锁过程相对来讲就比较简单,虽然对于普通模式和饥饿模式的处理有一些不一样,可是因为代码行数很少,因此逻辑清晰,也很是容易理解:
Unlock
会直接抛出异常;mutexLocked
标志位;runtime_Semrelease
唤醒对应的 Goroutine;读写互斥锁也是 Go 语言 sync
包为咱们提供的接口之一,一个常见的服务对资源的读写比例会很是高,若是大多数的请求都是读请求,它们之间不会相互影响,那么咱们为何不能将对资源读和写操做分离呢?这也就是 RWMutex
读写互斥锁解决的问题,不限制对资源的并发读,可是读写、写写操做没法并行执行。
读 | 写 | |
---|---|---|
读 | Y | N |
写 | N | N |
读写互斥锁在 Go 语言中的实现是 RWMutex
,其中不只包含一个互斥锁,还持有两个信号量,分别用于写等待读和读等待写:
type RWMutex struct { w Mutex writerSem uint32 readerSem uint32 readerCount int32 readerWait int32 }
readerCount
存储了当前正在执行的读操做的数量,最后的 readerWait
表示当写操做被阻塞时等待的读操做个数。
读锁的加锁很是简单,咱们经过 atomic.AddInt32
方法为 readerCount
加一,若是该方法返回了负数说明当前有 Goroutine 得到了写锁,当前 Goroutine 就会调用 runtime_SemacquireMutex
陷入休眠等待唤醒:
func (rw *RWMutex) RLock() { if atomic.AddInt32(&rw.readerCount, 1) < 0 { runtime_SemacquireMutex(&rw.readerSem, false, 0) } }
若是没有写操做获取当前互斥锁,当前方法就会在 readerCount
加一后返回;当 Goroutine 想要释放读锁时会调用 RUnlock
方法:
func (rw *RWMutex) RUnlock() { if r := atomic.AddInt32(&rw.readerCount, -1); r < 0 { rw.rUnlockSlow(r) } }
该方法会在减小正在读资源的 readerCount
,当前方法若是遇到了返回值小于零的状况,说明有一个正在进行的写操做,在这时就应该经过 rUnlockSlow
方法减小当前写操做等待的读操做数 readerWait
并在全部读操做都被释放以后触发写操做的信号量 writerSem
:
func (rw *RWMutex) rUnlockSlow(r int32) { if r+1 == 0 || r+1 == -rwmutexMaxReaders { throw("sync: RUnlock of unlocked RWMutex") } if atomic.AddInt32(&rw.readerWait, -1) == 0 { runtime_Semrelease(&rw.writerSem, false, 1) } }
writerSem
在被触发以后,尝试获取读写锁的进程就会被唤醒并得到锁。
当资源的使用者想要获取读写锁时,就须要经过 Lock
方法了,在 Lock
方法中首先调用了读写互斥锁持有的 Mutex
的 Lock
方法保证其余获取读写锁的 Goroutine 进入等待状态,随后的 atomic.AddInt32(&rw.readerCount, -rwmutexMaxReaders)
实际上是为了阻塞后续的读操做:
func (rw *RWMutex) Lock() { rw.w.Lock() r := atomic.AddInt32(&rw.readerCount, -rwmutexMaxReaders) + rwmutexMaxReaders if r != 0 && atomic.AddInt32(&rw.readerWait, r) != 0 { runtime_SemacquireMutex(&rw.writerSem, false, 0) } }
若是当前仍然有其余 Goroutine 持有互斥锁的读锁,该 Goroutine 就会调用 runtime_SemacquireMutex
进入休眠状态,等待读锁释放时触发 writerSem
信号量将当前协程唤醒。
对资源的读写操做完成以后就会将经过 atomic.AddInt32(&rw.readerCount, rwmutexMaxReaders)
变回正数并经过 for 循环触发全部因为获取读锁而陷入等待的 Goroutine:
func (rw *RWMutex) Unlock() { r := atomic.AddInt32(&rw.readerCount, rwmutexMaxReaders) if r >= rwmutexMaxReaders { throw("sync: Unlock of unlocked RWMutex") } for i := 0; i < int(r); i++ { runtime_Semrelease(&rw.readerSem, false, 0) } rw.w.Unlock() }
在方法的最后,RWMutex
会释放持有的互斥锁让其余的协程可以从新获取读写锁。
相比状态复杂的互斥锁 Mutex
来讲,读写互斥锁 RWMutex
虽然提供的功能很是复杂,可是因为站在了 Mutex
的『肩膀』上,因此总体的实现上会简单不少。
readerSem
— 读写锁释放时通知因为获取读锁等待的 Goroutine;writerSem
— 读锁释放时通知因为获取读写锁等待的 Goroutine;w
互斥锁 — 保证写操做之间的互斥;readerCount
— 统计当前进行读操做的协程数,触发写锁时会将其减小 rwmutexMaxReaders
阻塞后续的读操做;readerWait
— 当前读写锁等待的进行读操做的协程数,在触发 Lock
以后的每次 RUnlock
都会将其减一,当它归零时该 Goroutine 就会得到读写锁;Unlock
时首先会通知全部的读操做,而后才会释放持有的互斥锁,这样可以保证读操做不会被连续的写操做『饿死』;RWMutex
在 Mutex
之上提供了额外的读写分离功能,可以在读请求远远多于写请求时提供性能上的提高,咱们也能够在场景合适时选择读写互斥锁。
WaitGroup
是 Go 语言 sync
包中比较常见的同步机制,它能够用于等待一系列的 Goroutine 的返回,一个比较常见的使用场景是批量执行 RPC 或者调用外部服务:
requests := []*Request{...} wg := &sync.WaitGroup{} wg.Add(len(requests)) for _, request := range requests { go func(r *Request) { defer wg.Done() // res, err := service.call(r) }(request) } wg.Wait()
经过 WaitGroup
咱们能够在多个 Goroutine 之间很是轻松地同步信息,本来顺序执行的代码也能够在多个 Goroutine 中并发执行,加快了程序处理的速度,在上述代码中只有在全部的 Goroutine 都执行完毕以后 Wait
方法才会返回,程序能够继续执行其余的逻辑。
总而言之,它的做用就像它的名字同样,,经过 Done
来传递任务完成的信号,比较经常使用于等待一组 Goroutine 中并发执行的任务所有结束。
WaitGroup
结构体中的成员变量很是简单,其中的 noCopy
的主要做用就是保证 WaitGroup
不会被开发者经过再赋值的方式进行拷贝,进而致使一些诡异的行为:
type WaitGroup struct { noCopy noCopy state1 [3]uint32 }
copylock 包就是一个用于检查相似错误的分析器,它的原理就是在 编译期间 检查被拷贝的变量中是否包含 noCopy
或者 sync
关键字,若是包含当前关键字就会报出如下的错误:
package main import ( "fmt" "sync" ) func main() { wg := sync.Mutex{} yawg := wg fmt.Println(wg, yawg) } $ go run proc.go ./prog.go:10:10: assignment copies lock value to yawg: sync.Mutex ./prog.go:11:14: call of fmt.Println copies lock value: sync.Mutex ./prog.go:11:18: call of fmt.Println copies lock value: sync.Mutex
这段代码会在赋值和调用 fmt.Println
时发生值拷贝致使分析器报错,你能够经过访问 连接 尝试运行这段代码。
除了 noCopy
以外,WaitGroup
结构体中还包含一个总共占用 12 字节大小的数组,这个数组中会存储当前结构体持有的状态和信号量,在 64 位与 32 位的机器上表现也很是不一样。
WaitGroup
提供了私有方法 state
可以帮助咱们从 state1
字段中取出它的状态和信号量。
WaitGroup
对外暴露的接口只有三个 Add
、Wait
和 Done
,其中 Done
方法只是调用了 wg.Add(-1)
自己并无什么特殊的逻辑,咱们来了解一下剩余的两个方法:
func (wg *WaitGroup) Add(delta int) { statep, semap := wg.state() state := atomic.AddUint64(statep, uint64(delta)<<32) v := int32(state >> 32) w := uint32(state) if v < 0 { panic("sync: negative WaitGroup counter") } if v > 0 || w == 0 { return } *statep = 0 for ; w != 0; w-- { runtime_Semrelease(semap, false, 0) } }
Add
方法的主要做用就是更新 WaitGroup
中持有的计数器 counter
,64 位状态的高 32 位,虽然 Add
方法传入的参数能够为负数,可是一个 WaitGroup
的计数器只能是非负数,当调用 Add
方法致使计数器归零而且还有等待的 Goroutine 时,就会经过 runtime_Semrelease
唤醒处于等待状态的全部 Goroutine。
另外一个 WaitGroup
的方法 Wait
就会在当前计数器中保存的数据大于 0
时修改等待 Goroutine 的个数 waiter
并调用 runtime_Semacquire
陷入睡眠状态。
func (wg *WaitGroup) Wait() { statep, semap := wg.state() for { state := atomic.LoadUint64(statep) v := int32(state >> 32) if v == 0 { return } if atomic.CompareAndSwapUint64(statep, state, state+1) { runtime_Semacquire(semap) if +statep != 0 { panic("sync: WaitGroup is reused before previous Wait has returned") } return } } }
陷入睡眠的 Goroutine 就会等待 Add
方法在计数器为 0
时唤醒。
经过对 WaitGroup
的分析和研究,咱们可以得出如下的一些结论:
Add
不能在和 Wait
方法在 Goroutine 中并发调用,一旦出现就会形成程序崩溃;WaitGroup
必须在 Wait
方法返回以后才能被从新使用;Done
只是对 Add
方法的简单封装,咱们能够向 Add
方法传入任意负数(须要保证计数器非负)快速将计数器归零以唤醒其余等待的 Goroutine;WaitGroup
计数器的归零,这些 Goroutine 也会被『同时』唤醒;Go 语言在标准库的 sync
同步包中还提供了 Once
语义,它的主要功能其实也很好理解,保证在 Go 程序运行期间 Once
对应的某段代码只会执行一次。
在以下所示的代码中,Do
方法中传入的函数只会被执行一次,也就是咱们在运行以下所示的代码时只会看见一次 only once
的输出结果:
func main() { o := &sync.Once{} for i := 0; i < 10; i++ { o.Do(func() { fmt.Println("only once") }) } } $ go run main.go only once
做为 sync
包中的结构体,Once
有着很是简单的数据结构,每个 Once
结构体中都只包含一个用于标识代码块是否被执行过的 done
以及一个互斥锁 Mutex
:
type Once struct { done uint32 m Mutex }
Once
结构体对外惟一暴露的方法就是 Do
,该方法会接受一个入参为空的函数,若是使用 atomic.LoadUint32
检查到已经执行过函数了,就会直接返回,不然就会进入 doSlow
运行传入的函数:
func (o *Once) Do(f func()) { if atomic.LoadUint32(&o.done) == 0 { o.doSlow(f) } } func (o *Once) doSlow(f func()) { o.m.Lock() defer o.m.Unlock() if o.done == 0 { defer atomic.StoreUint32(&o.done, 1) f() } }
doSlow
的实现也很是简单,咱们先为当前的 Goroutine 获取互斥锁,而后经过 defer
关键字将 done
成员变量设置成 1
并运行传入的函数,不管当前函数是正常运行仍是抛出 panic
,当前方法都会将 done
设置成 1
保证函数不会执行第二次。
做为用于保证函数执行次数的 Once
结构体,它使用互斥锁和 atomic
提供的方法实现了某个函数在程序运行期间只能执行一次的语义,在使用的过程当中咱们也须要注意如下的内容:
Do
方法中传入的函数只会被执行一次,哪怕函数中发生了 panic
;Do
方法传入不一样的函数时只会执行第一次调用的函数;Go 语言在标准库中提供的 Cond
实际上是一个条件变量,经过 Cond
咱们可让一系列的 Goroutine 都在触发某个事件或者条件时才被唤醒,每个 Cond
结构体都包含一个互斥锁 L
,咱们先来看一下 Cond
是如何使用的:
func main() { c := sync.NewCond(&sync.Mutex{}) for i := 0; i < 10; i++ { go listen(c) } go broadcast(c) ch := make(chan os.Signal, 1) signal.Notify(ch, os.Interrupt) <-ch } func broadcast(c *sync.Cond) { c.L.Lock() c.Broadcast() c.L.Unlock() } func listen(c *sync.Cond) { c.L.Lock() c.Wait() fmt.Println("listen") c.L.Unlock() } $ go run main.go listen listen ... listen
在上述代码中咱们同时运行了 11 个 Goroutine,其中的 10 个 Goroutine 会经过 Wait
等待指望的信号或者事件,而剩下的一个 Goroutine 会调用 Broadcast
方法通知全部陷入等待的 Goroutine,当调用 Boardcast
方法以后,就会打印出 10 次 "listen"
并结束调用。
Cond
的结构体中包含 noCopy
和 copyChecker
两个字段,前者用于保证 Cond
不会再编译期间拷贝,后者保证在运行期间发生拷贝会直接 panic
,持有的另外一个锁 L
实际上是一个接口 Locker
,任意实现 Lock
和 Unlock
方法的结构体均可以做为 NewCond
方法的参数:
type Cond struct { noCopy noCopy L Locker notify notifyList checker copyChecker }
结构体中最后的变量 notifyList
其实也就是为了实现 Cond
同步机制,该结构体其实就是一个 Goroutine
的链表:
type notifyList struct { wait uint32 notify uint32 lock mutex head *sudog tail *sudog }
在这个结构体中,head
和 tail
分别指向的就是整个链表的头和尾,而 wait
和 notify
分别表示当前正在等待的 Goroutine 和已经通知到的 Goroutine,咱们经过这两个变量就能确认当前待通知和已通知的 Goroutine。
Cond
对外暴露的 Wait
方法会将当前 Goroutine 陷入休眠状态,它会先调用 runtime_notifyListAdd
将等待计数器 +1
,而后解锁并调用 runtime_notifyListWait
等待其余 Goroutine 的唤醒:
func (c *Cond) Wait() { c.checker.check() t := runtime_notifyListAdd(&c.notify) c.L.Unlock() runtime_notifyListWait(&c.notify, t) c.L.Lock() } func notifyListAdd(l *notifyList) uint32 { return atomic.Xadd(&l.wait, 1) - 1 }
notifyListWait
方法的主要做用就是获取当前的 Goroutine 并将它追加到 notifyList
链表的最末端:
func notifyListWait(l *notifyList, t uint32) { lock(&l.lock) if less(t, l.notify) { unlock(&l.lock) return } s := acquireSudog() s.g = getg() s.ticket = t if l.tail == nil { l.head = s } else { l.tail.next = s } l.tail = s goparkunlock(&l.lock, waitReasonSyncCondWait, traceEvGoBlockCond, 3) releaseSudog(s) }
除了将当前 Goroutine 追加到链表的末端以外,咱们还会调用 goparkunlock
陷入休眠状态,该函数也是在 Go 语言切换 Goroutine 时常常会使用的方法,它会直接让出当前处理器的使用权并等待调度器的唤醒。
Cond
对外提供的 Signal
和 Broadcast
方法就是用来唤醒调用 Wait
陷入休眠的 Goroutine,从两个方法的名字来看,前者会唤醒队列最前面的 Goroutine,后者会唤醒队列中所有的 Goroutine:
func (c *Cond) Signal() { c.checker.check() runtime_notifyListNotifyOne(&c.notify) } func (c *Cond) Broadcast() { c.checker.check() runtime_notifyListNotifyAll(&c.notify) }
notifyListNotifyAll
方法会从链表中取出所有的 Goroutine 并为它们依次调用 readyWithTime
,该方法会经过 goready
将目标的 Goroutine 唤醒:
func notifyListNotifyAll(l *notifyList) { s := l.head l.head = nil l.tail = nil atomic.Store(&l.notify, atomic.Load(&l.wait)) for s != nil { next := s.next s.next = nil readyWithTime(s, 4) s = next } }
虽然它会依次唤醒所有的 Goroutine,可是这里唤醒的顺序其实也是按照加入队列的前后顺序,先加入的会先被 goready
唤醒,后加入的 Goroutine 可能就须要等待调度器的调度。
而 notifyListNotifyOne
函数就只会从 sudog
构成的链表中知足 sudog.ticket == l.notify
的 Goroutine 并经过 readyWithTime
唤醒:
func notifyListNotifyOne(l *notifyList) { t := l.notify atomic.Store(&l.notify, t+1) for p, s := (*sudog)(nil), l.head; s != nil; p, s = s, s.next { if s.ticket == t { n := s.next if p != nil { p.next = n } else { l.head = n } if n == nil { l.tail = p } s.next = nil readyWithTime(s, 4) return } } }
在通常状况下咱们都会选择在不知足特定条件时调用 Wait
陷入休眠,当某些 Goroutine 检测到当前知足了唤醒的条件,就能够选择使用 Signal
通知一个或者 Broadcast
通知所有的 Goroutine 当前条件已经知足,能够继续完成工做了。
与 Mutex
相比,Cond
仍是一个不被全部人都清楚和理解的同步机制,它提供了相似队列的 FIFO 的等待机制,同时也提供了 Signal
和 Broadcast
两种不一样的唤醒方法,相比于使用 for {}
忙碌等待,使用 Cond
可以在遇到长时间条件没法知足时将当前处理器让出的功能,若是咱们合理使用仍是可以在一些状况下提高性能,在使用的过程当中咱们须要注意:
Wait
方法在调用以前必定要使用 L.Lock
持有该资源,不然会发生 panic
致使程序崩溃;Signal
方法唤醒的 Goroutine 都是队列最前面、等待最久的 Goroutine;Broadcast
虽然是广播通知所有等待的 Goroutine,可是真正被唤醒时也是按照必定顺序的;除了这些标准库中提供的同步原语以外,Go 语言还在子仓库 x/sync
中提供了额外的四种同步原语,ErrGroup
、Semaphore
、SingleFlight
和 SyncMap
,其中的 SyncMap
其实就是 sync
包中的 sync.Map
,它在 1.9 版本的 Go 语言中被引入了 x/sync
包,随着 API 的成熟和稳定最后被移到了标准库 sync
包中。
咱们在这一节中就会介绍 Go 语言目前在扩展包中提供的三种原语,也就是 ErrGroup
、Semaphore
和 SingleFlight
。
子仓库 x/sync
中的包 errgroup 其实就为咱们在一组 Goroutine 中提供了同步、错误传播以及上下文取消的功能,咱们可使用以下所示的方式并行获取网页的数据:
var g errgroup.Group var urls = []string{ "http://www.golang.org/", "http://www.google.com/", "http://www.somestupidname.com/", } for i := range urls { url := urls[i] g.Go(func() error { resp, err := http.Get(url) if err == nil { resp.Body.Close() } return err }) } if err := g.Wait(); err == nil { fmt.Println("Successfully fetched all URLs.") }
Go
方法可以建立一个 Goroutine 并在其中执行传入的函数,而 Wait
方法会等待 Go
方法建立的 Goroutine 所有返回后返回第一个非空的错误,若是全部的 Goroutine 都没有返回错误,该函数就会返回 nil
。
errgroup
包中的 Group
结构体同时由三个比较重要的部分组成:
Context
时返回的 cancel
函数,主要用于通知使用 context
的 Goroutine 因为某些子任务出错,能够中止工做让出资源了;WaitGroup
同步原语;err
和保证 err
只会被赋值一次的 errOnce
;type Group struct { cancel func() wg sync.WaitGroup errOnce sync.Once err error }
这些字段共同组成了 Group
结构体并为咱们提供同步、错误传播以及上下文取消等功能。
errgroup
对外惟一暴露的构造器就是 WithContext
方法,咱们只能从一个 Context
中建立一个新的 Group
变量,WithCancel
返回的取消函数也仅会在 Group
结构体内部使用:
func WithContext(ctx context.Context) (*Group, context.Context) { ctx, cancel := context.WithCancel(ctx) return &Group{cancel: cancel}, ctx }
建立新的并行子任务须要使用 Go
方法,这个方法内部会对 WaitGroup
加一并建立一个新的 Goroutine,在 Goroutine 内部运行子任务并在返回错误时及时调用 cancel
并对 err
赋值,只有最先返回的错误才会被上游感知到,后续的错误都会被舍弃:
func (g *Group) Go(f func() error) { g.wg.Add(1) go func() { defer g.wg.Done() if err := f(); err != nil { g.errOnce.Do(func() { g.err = err if g.cancel != nil { g.cancel() } }) } }() } func (g *Group) Wait() error { g.wg.Wait() if g.cancel != nil { g.cancel() } return g.err }
Wait
方法其实就只是调用了 WaitGroup
的同步方法,在子任务所有完成时取消 Context
并返回可能出现的错误。
errgroup
包中的 Group
同步原语的实现原理仍是很是简单的,它没有涉及很是底层和运行时包中的 API,只是对基本同步语义进行了简单的封装提供了更加复杂的功能,在使用时咱们也须要注意如下的几个问题:
Context
的 cancel
方法取消上下文;信号量是在并发编程中比较常见的一种同步机制,它会保证持有的计数器在 0
到初始化的权重之间,每次获取资源时都会将信号量中的计数器减去对应的数值,在释放时从新加回来,当遇到计数器大于信号量大小时就会进入休眠等待其余进程释放信号,咱们经常会在控制访问资源的进程数量时用到。
Golang 的扩展包中就提供了带权重的信号量,咱们能够按照不一样的权重对资源的访问进行管理,这个包对外也只提供了四个方法:
NewWeighted
用于建立新的信号量;Acquire
获取了指定权重的资源,若是当前没有『空闲资源』,就会陷入休眠等待;TryAcquire
也用于获取指定权重的资源,可是若是当前没有『空闲资源』,就会直接返回 false
;Release
用于释放指定权重的资源;NewWeighted
方法的主要做用建立一个新的权重信号量,传入信号量最大的权重就会返回一个新的 Weighted
结构体指针:
func NewWeighted(n int64) *Weighted { w := &Weighted{size: n} return w } type Weighted struct { size int64 cur int64 mu sync.Mutex waiters list.List }
Weighted
结构体中包含一个 waiters
列表其中存储着等待获取资源的『用户』,除此以外它还包含当前信号量的上限以及一个计数器 cur
,这个计数器的范围就是 [0, size]
:
信号量中的计数器会随着用户对资源的访问和释放进行改变,引入的权重概念可以帮助咱们更好地对资源的访问粒度进行控制,尽量知足全部常见的用例。
在上面咱们已经提到过 Acquire
方法就是用于获取指定权重资源的方法,这个方法总共由三个不一样的状况组成:
Weighted
的大小时,因为不可能知足条件就会直接返回;select
等待当前 Goroutine 被唤醒,被唤醒后就会获取信号量;func (s *Weighted) Acquire(ctx context.Context, n int64) error { s.mu.Lock() if s.size-s.cur >= n && s.waiters.Len() == 0 { s.cur += n s.mu.Unlock() return nil } if n > s.size { s.mu.Unlock() <-ctx.Done() return ctx.Err() } ready := make(chan struct{}) w := waiter{n: n, ready: ready} elem := s.waiters.PushBack(w) s.mu.Unlock() select { case <-ctx.Done(): err := ctx.Err() s.mu.Lock() select { case <-ready: err = nil default: s.waiters.Remove(elem) } s.mu.Unlock() return err case <-ready: return nil } }
另外一个用于获取信号量的方法 TryAcquire
相比之下就很是简单,它只会判断当前信号量是否有充足的资源获取,若是有充足的资源就会直接马上返回 true
不然就会返回 false
:
func (s *Weighted) TryAcquire(n int64) bool { s.mu.Lock() success := s.size-s.cur >= n && s.waiters.Len() == 0 if success { s.cur += n } s.mu.Unlock() return success }
与 Acquire
相比,TryAcquire
因为不会等待资源的释放因此可能更适用于一些延时敏感、用户须要马上感知结果的场景。
最后要介绍的 Release
方法其实也很是简单,当咱们对信号量进行释放时,Release
方法会从头至尾遍历 waiters
列表中所有的等待者,若是释放资源后的信号量有充足的剩余资源就会经过 Channel 唤起指定的 Goroutine:
func (s *Weighted) Release(n int64) { s.mu.Lock() s.cur -= n for { next := s.waiters.Front() if next == nil { break } w := next.Value.(waiter) if s.size-s.cur < w.n { break } s.cur += w.n s.waiters.Remove(next) close(w.ready) } s.mu.Unlock() }
固然也可能会出现剩余资源没法唤起 Goroutine 的状况,在这时当前方法就会释放锁后直接返回,经过对这段代码的分析咱们也能发现,若是一个信号量须要的占用的资源很是多,他可能会长时间没法获取锁,这可能也是 Acquire
方法引入另外一个参数 Context
的缘由,为信号量的获取设置一个超时时间。
带权重的信号量确实有着更多的应用场景,这也是 Go 语言对外提供的惟一一种信号量实现,在使用的过程当中咱们须要注意如下的几个问题:
Acquire
和 TryAcquire
方法均可以用于获取资源,前者用于同步获取会等待锁的释放,后者会在没法获取锁时直接返回;Release
方法会按照 FIFO 的顺序唤醒能够被唤醒的 Goroutine;Release
的释放策略可能会等待比较长的时间;singleflight 是 Go 语言扩展包中提供了另外一种同步原语,这其实也是做者最喜欢的一种同步扩展机制,它可以在一个服务中抑制对下游的屡次重复请求,一个比较常见的使用场景是 — 咱们在使用 Redis 对数据库中的一些热门数据进行了缓存并设置了超时时间,缓存超时的一瞬间可能有很是多的并行请求发现了 Redis 中已经不包含任何缓存因此大量的流量会打到数据库上影响服务的延时和质量。
可是 singleflight
就能有效地解决这个问题,它的主要做用就是对于同一个 Key
最终只会进行一次函数调用,在这个上下文中就是只会进行一次数据库查询,查询的结果会写回 Redis 并同步给全部请求对应 Key
的用户:
这其实就减小了对下游的瞬时流量,在获取下游资源很是耗时,例如:访问缓存、数据库等场景下就很是适合使用 singleflight
对服务进行优化,在上述的这个例子中咱们就能够在想 Redis 和数据库中获取数据时都使用 singleflight
提供的这一功能减小下游的压力;它的使用其实也很是简单,咱们能够直接使用 singleflight.Group{}
建立一个新的 Group
结构体,而后经过调用 Do
方法就能对相同的请求进行抑制:
type service struct { requestGroup singleflight.Group } func (s *service) handleRequest(ctx context.Context, request Request) (Response, error) { v, err, _ := requestGroup.Do(request.Hash(), func() (interface{}, error) { rows, err := // select * from tables if err != nil { return nil, err } return rows, nil }) if err != nil { return nil, err } return Response{ rows: rows, }, nil }
上述代码使用请求的哈希做为抑制相同请求的键,咱们也能够选择一些比较关键或者重要的字段做为 Do
方法的第一个参数避免对下游的瞬时大量请求。
Group
结构体自己由一个互斥锁 Mutex
和一个从 Key
到 call
结构体指针的映射表组成,每个 call
结构体都保存了当前此次调用对应的信息:
type Group struct { mu sync.Mutex m map[string]*call } type call struct { wg sync.WaitGroup val interface{} err error dups int chans []chan<- Result }
call
结构体中的 val
和 err
字段都是在执行传入的函数时只会被赋值一次,它们也只会在 WaitGroup
等待结束都被读取,而 dups
和 chans
字段分别用于存储当前 singleflight
抑制的请求数量以及在结果返回时将信息传递给调用方。
singleflight
包提供了两个用于抑制相同请求的方法,其中一个是同步等待的方法 Do
,另外一个是返回 Channel 的 DoChan
,这两个方法在功能上没有太多的区别,只是在接口的表现上稍有不一样。
每次 Do
方法的调用时都会获取互斥锁并尝试对 Group
持有的映射表进行懒加载,随后判断是否已经存在 key
对应的函数调用:
当不存在对应的 call
结构体时:
call
结构体指针;WaitGroup
持有的计数器;call
结构体指针添加到映射表;Mutex
;doCall
方法等待结果的返回;当已经存在对应的 call
结构体时;
dups
计数器,它表示当前重复的调用次数;Mutex
;WaitGroup.Wait
等待请求的返回;func (g *Group) Do(key string, fn func() (interface{}, error)) (v interface{}, err error, shared bool) { g.mu.Lock() if g.m == nil { g.m = make(map[string]*call) } if c, ok := g.m[key]; ok { c.dups++ g.mu.Unlock() c.wg.Wait() return c.val, c.err, true } c := new(call) c.wg.Add(1) g.m[key] = c g.mu.Unlock() g.doCall(c, key, fn) return c.val, c.err, c.dups > 0 }
由于 val
和 err
两个字段都只会在 doCall
方法中被赋值,因此当 doCall
方法和 WaitGroup.Wait
方法返回时,这两个值就会返回给 Do
函数的调用者。
func (g *Group) doCall(c *call, key string, fn func() (interface{}, error)) { c.val, c.err = fn() c.wg.Done() g.mu.Lock() delete(g.m, key) for _, ch := range c.chans { ch <- Result{c.val, c.err, c.dups > 0} } g.mu.Unlock() }
doCall
中会运行传入的函数 fn
,该函数的返回值就会赋值给 c.val
和 c.err
,函数执行结束后就会调用 WaitGroup.Done
方法通知全部被抑制的请求,当前函数已经执行完成,能够从 call
结构体中取出返回值并返回了;在这以后,doCall
方法会获取持有的互斥锁并经过管道将信息同步给使用 DoChan
方法的调用方。
func (g *Group) DoChan(key string, fn func() (interface{}, error)) <-chan Result { ch := make(chan Result, 1) g.mu.Lock() if g.m == nil { g.m = make(map[string]*call) } if c, ok := g.m[key]; ok { c.dups++ c.chans = append(c.chans, ch) g.mu.Unlock() return ch } c := &call{chans: []chan<- Result{ch}} c.wg.Add(1) g.m[key] = c g.mu.Unlock() go g.doCall(c, key, fn) return ch }
DoChan
方法和 Do
的区别就是,它使用 Goroutine 异步执行 doCall
并向 call
持有的 chans
切片中追加 chan Result
变量,这也是它可以提供异步传值的缘由。
singleflight
包提供的 Group
接口确实很是好用,当咱们须要这种抑制对下游的相同请求时就能够经过这个方法来增长吞吐量和服务质量,在使用的过程当中咱们也须要注意如下的几个问题:
Do
和 DoChan
一个用于同步阻塞调用传入的函数,一个用于异步调用传入的参数并经过 Channel 接受函数的返回值;Forget
方法能够通知 singleflight
在持有的映射表中删除某个键,接下来对该键的调用就会直接执行方法而不是等待前面的函数返回;咱们在这一节中介绍了 Go 语言标准库中提供的基本原语以及扩展包中的扩展原语,这些并发编程的原语可以帮助咱们更好地利用 Go 语言的特性构建高吞吐量、低延时的服务,并解决因为并发带来的错误,到这里咱们再从新回顾一下这一节介绍的内容:
Mutex
互斥锁
mutexLocked
加锁;mutexLocked
而且在普通模式下工做,就会进入自旋,执行 30 次 PAUSE
指令消耗 CPU 时间等待锁的释放;1ms
,互斥锁就会被切换到饥饿模式;runtime_SemacquireMutex
方法将调用 Lock
的 Goroutine 切换至休眠状态,等待持有信号量的 Goroutine 唤醒当前协程;1ms
,当前 Goroutine 会将互斥锁切换回正常模式;Unlock
会直接抛出异常;mutexLocked
标志位;runtime_Semrelease
唤醒对应的 Goroutine;RWMutex
读写互斥锁
readerSem
— 读写锁释放时通知因为获取读锁等待的 Goroutine;writerSem
— 读锁释放时通知因为获取读写锁等待的 Goroutine;w
互斥锁 — 保证写操做之间的互斥;readerCount
— 统计当前进行读操做的协程数,触发写锁时会将其减小 rwmutexMaxReaders
阻塞后续的读操做;readerWait
— 当前读写锁等待的进行读操做的协程数,在触发 Lock
以后的每次 RUnlock
都会将其减一,当它归零时该 Goroutine 就会得到读写锁;Unlock
时首先会通知全部的读操做,而后才会释放持有的互斥锁,这样可以保证读操做不会被连续的写操做『饿死』;WaitGroup
等待一组 Goroutine 结束
Add
不能在和 Wait
方法在 Goroutine 中并发调用,一旦出现就会形成程序崩溃;WaitGroup
必须在 Wait
方法返回以后才能被从新使用;Done
只是对 Add
方法的简单封装,咱们能够向 Add
方法传入任意负数(须要保证计数器非负)快速将计数器归零以唤醒其余等待的 Goroutine;WaitGroup
计数器的归零,这些 Goroutine 也会被『同时』唤醒;Once
程序运行期间仅执行一次
Do
方法中传入的函数只会被执行一次,哪怕函数中发生了 panic
;Do
方法传入不一样的函数时只会执行第一次调用的函数;Cond
发生指定事件时唤醒
Wait
方法在调用以前必定要使用 L.Lock
持有该资源,不然会发生 panic
致使程序崩溃;Signal
方法唤醒的 Goroutine 都是队列最前面、等待最久的 Goroutine;Broadcast
虽然是广播通知所有等待的 Goroutine,可是真正被唤醒时也是按照必定顺序的;ErrGroup
为一组 Goroutine 提供同步、错误传播以及上下文取消的功能
Context
的 cancel
方法取消上下文;Semaphore
带权重的信号量
Acquire
和 TryAcquire
方法均可以用于获取资源,前者用于同步获取会等待锁的释放,后者会在没法获取锁时直接返回;Release
方法会按照 FIFO 的顺序唤醒能够被唤醒的 Goroutine;Release
的释放策略可能会等待比较长的时间;SingleFlight
用于抑制对下游的重复请求
Do
和 DoChan
一个用于同步阻塞调用传入的函数,一个用于异步调用传入的参数并经过 Channel 接受函数的返回值;Forget
方法能够通知 singleflight
在持有的映射表中删除某个键,接下来对该键的调用就会直接执行方法而不是等待前面的函数返回;这些同步原语的实现不只要考虑 API 接口的易用、解决并发编程中可能遇到的线程竞争问题,还须要对尾延时进行优化避免某些 Goroutine 没法获取锁或者资源而被饿死,对同步原语的学习也可以加强咱们队并发编程的理解和认识,也是了解并发编程没法跨越的一个步骤。