目前GO已经更新到了1.14的版本
我们通常人若是直接去看mutex的源码的话,实际上是比较难理解为何写成了如今这个样子,尤为是加锁里面的各类逻辑判断太多了,各类位运算一脸懵逼,其实咱们只要掌握它最初的设计思想,那么后面新增的逻辑,理解起来都很简单了。git
Mutex初版代码加上注释不过才109行。很是精简,下面介绍一下我对初版Mutex源码的理解github
// Mutex有state和sema两个成员变量,这一点是在1.14没有变化的 // 其中 state 字段表明当前锁的状态,sema是控制锁状态的信号量,主要关注state就行 // // state 比较复杂,state一共32位 // 最低位表明 locked状态, 0表示未上锁,1表示上锁 // 倒数第二位 woken状态,0 表示未唤醒,1表示已唤醒 // 剩余30位用于表示当前有多少个goroutine等待互斥锁的释放,表明最多支持2^30个goroutine type Mutex struct { state int32 sema uint32 }
咱们接下来看它的Lock方法golang
func (m *Mutex) Lock() { // 首先直接CAS尝试获取锁 if atomic.CompareAndSwapInt32(&m.state, 0, mutexLocked) { if raceenabled { raceAcquire(unsafe.Pointer(m)) } // 上锁成功后,直接返回 return } // CAS获取锁失败 // awoke 默认是未唤醒状态 awoke := false for { // 当前state赋值给old old := m.state // 给old上锁 new := old | mutexLocked // 若是old自己就已经上了锁的话 if old&mutexLocked != 0 { // goroutine等待数 + 1 new = old + 1<<mutexWaiterShift } // 若是当前g被唤醒了 if awoke { // 把woken标记清除掉 new &^= mutexWoken } // 更新一下当前锁的状态 if atomic.CompareAndSwapInt32(&m.state, old, new) { // 若是old自己就是解锁状态 if old&mutexLocked == 0 { // 那么表明抢锁成功,直接退出for循环 break } // 不是解锁状态 // 尝试获取信号量,进入等待队列,等待被唤醒 runtime_Semacquire(&m.sema) // 被唤醒,awoke设置true,继续for循环 awoke = true } } if raceenabled { raceAcquire(unsafe.Pointer(m)) } }
简单总结一下Lock的逻辑,分几种状况说明一下多线程
第一种状况:第一次上锁的时候,直接走第一步CAS上锁,成功返回ide
第二种状况:Mutex已经被另外一个g上锁,那么state的g等待数+1,更新当前的锁状态,而后就进入队列,等待被唤醒,等到另个一g调用了Unlock方法以后,当前g被唤醒,而后设置awoken=true,再执行一遍for循环,此时locked位就是未上锁状态(0),new就是表明上锁,而后清除woken位,而后再CAS更新new到state上,由于以前的锁是未上锁状态,那么就表明抢锁成功,break,返回oop
第三种状况:和第二种同样,只不过,在CAS更新new到state上时,有其余g先改掉了state的值,那么就继续for循环,而后重复到第二种状况。性能
接下来看下Unlock方法优化
func (m *Mutex) Unlock() { if raceenabled { _ = m.state raceRelease(unsafe.Pointer(m)) } // 一开始也是直接去掉加锁状态 new := atomic.AddInt32(&m.state, -mutexLocked) // 判断一下是否解锁了一个未加锁的Mutex if (new+mutexLocked)&mutexLocked == 0 { // 直接panic panic("sync: unlock of unlocked mutex") } // 把解锁后的值赋值给old old := new for { // 若是此时没有须要等待获取锁的G // 或者当前Mutex已经被抢锁成功或者已经有被唤醒的G,那么就能够直接返回 if old>>mutexWaiterShift == 0 || old&(mutexLocked|mutexWoken) != 0 { return } // g等待数-1,而后设置唤醒标记位 new = (old - 1<<mutexWaiterShift) | mutexWoken // 更新Mutex的state的值 if atomic.CompareAndSwapInt32(&m.state, old, new) { // 手动唤醒一个被runtime_Semacquire阻塞的G runtime_Semrelease(&m.sema) // 返回 return } // 更新state失败,说明有其余G修改了state的值,那么,从新赋值一下,再进行下一次循环 old = m.state } }
Unlock要比Lock简单不少,因此这里不总结了,看注释就能明白ui
到这里,最第一版本的Mutex源码已经分析完了,关键仍是在上锁的方法里面。上锁逻辑很是简单粗暴,直接CAS获取锁,失败就G等待数+1,而后进入队列,等待被唤醒。this
那么,若是仔细想一想,就会发现性能上仍是有能够改进的地方。
咱们应用Mutex的时候,确定把锁粒度控制的越小越好,那么这样的话就极可能会出现这么一个问题,当第一次上锁CAS失败的时候,mutex已经被其余G解锁了,可是当前G就仍是直接进入队列,等待被唤醒,这样的话其实就会带来额外的调度开销。
因此,Mutex后面引进了自旋锁的概念自旋锁提交代码
Currently sync.Mutex is fully cooperative. That is, once contention is discovered,
the goroutine calls into scheduler. This is suboptimal as the resource can become
free soon after (especially if critical sections are short). Server software
usually runs at ~~50% CPU utilization, that is, switching to other goroutines
is not necessary profitable.This change adds limited active spinning to sync.Mutex if:
- running on a multicore machine and
- GOMAXPROCS>1 and
- there is at least one other running P and
- local runq is empty. As opposed to runtime mutex we don't do passive spinning, because there can be work on global runq on on other
Ps.
简单归纳一下,就是为了解决锁粒度很是小的时候,给系统带来的没必要要的调度开销
不过自旋要先知足几个条件
首先程序要跑在多核的机器上,而后GOMAXPROCS要大于1,而且此时有至少一个P的local runq是空的,才能进入到自旋的状态自旋是一种多线程同步机制,当前的进程在进入自旋的过程当中会一直保持 CPU 的占用,持续检查某个条件是否为真。在多核的 CPU 上,自旋能够避免 Goroutine 的切换,使用恰当会对性能带来很大的增益,可是使用的不恰当就会拖慢整个程序,因此 Goroutine 进入自旋的条件很是苛刻
看一下更新以后的Lock方法
func (m *Mutex) Lock() { // Fast path: grab unlocked mutex. if atomic.CompareAndSwapInt32(&m.state, 0, mutexLocked) { if raceenabled { raceAcquire(unsafe.Pointer(m)) } return } awoke := false iter := 0 // 自旋的次数( <= 4) for { old := m.state new := old | mutexLocked // 没有解锁 if old&mutexLocked != 0 { // 判断是否知足自旋的状态 if runtime_canSpin(iter) { // 当woken标记位没有被设置,并且等待G数量不等于0,并设置woken标记位成功 // 这里设置woken标记位的缘由是,通知Unlock不用去唤醒等待队列里面的G了 if !awoke && old&mutexWoken == 0 && old>>mutexWaiterShift != 0 && atomic.CompareAndSwapInt32(&m.state, old, old|mutexWoken) { // 标记awoke=true awoke = true } // runtime_doSpin -> sync_runtime_doSpin // 每次自旋30个时钟周期,最多120个周期 runtime_doSpin() iter++ // 再次执行for循环 continue } // 自旋结束以后,G等待数量+1 new = old + 1<<mutexWaiterShift } if awoke { // 这里多了个判断woken状态不一致的逻辑 if new&mutexWoken == 0 { panic("sync: inconsistent mutex state") } new &^= mutexWoken } if atomic.CompareAndSwapInt32(&m.state, old, new) { if old&mutexLocked == 0 { break } runtime_Semacquire(&m.sema) awoke = true iter = 0 // 重置iter } } if raceenabled { raceAcquire(unsafe.Pointer(m)) } }
相比于初版的Mutex,这里只在加锁的方法里面增长了自旋锁的逻辑
当Mutex已经上锁的时候,当前G在知足自旋条件下,进入自旋状态,在自旋中,其余G解锁了Mutex,那么当前G就设置了woken标记位,这样其余G在Unlock的时候就不会去等待队列里面唤醒G了,而后当前G就瓜熟蒂落的抢到了锁
这样自旋锁在锁粒度很是小的场景下的能对其性能带来必定的优化。
引入自旋锁以后,又带来了一个问题。就是G等待队列的长尾问题。由于从等待队列里面被唤醒,而后再去抢锁,对自己就在执行的G来讲,被唤醒的G实际上是很难抢过当前执行的G的,这样的话,等待队列里面的G,就会被饿死(长时间获取不到锁),这样对等待队列的G来讲实际上是不公平的。
因此Mutex后面引入了饥饿模式饥饿模式代码
本次代码变更仍是挺大的
先看下提交者的介绍
Add new starvation mode for Mutex.
In starvation mode ownership is directly handed off from
unlocking goroutine to the next waiter. New arriving goroutines
don't compete for ownership.
Unfair wait time is now limited to 1ms.
Also fix a long standing bug that goroutines were requeued
at the tail of the wait queue. That lead to even more unfair
acquisition times with multiple waiters.
Performance of normal mode is not considerably affected.简单归纳一下,就是解决了等待G队列的长尾问题
饥饿模式下,直接由unlock把锁交给等待队列中排在第一位的G,同时,饥饿模式下,新进来的G不会参与抢锁也不会进入自旋状态,会直接进入等待队列的尾部。
饥饿模式的触发条件,当一个G等待锁时间超过1毫秒时,Mutex切换到饥饿模式
饥饿模式的取消条件,当一个G获取到锁且在等待队列的末尾,或者这个G获取锁的等待时间在1ms内,那么Mutex切换回正常模式
带来的改变
Mutex.state的倒数第三位,变成了mutexStarving标记位,0表示正常模式,1表示饥饿模式,与此同时,支持的最大等待G数量从2^30^个 变成了2^29^个
接下来仍是主要关注Lock方法,我只在新增的逻辑上添加注释了,我直接贴1.14的Lock代码,较1.9的版本没什么改变
func (m *Mutex) Lock() { // Fast path: grab unlocked mutex. if atomic.CompareAndSwapInt32(&m.state, 0, mutexLocked) { if race.Enabled { race.Acquire(unsafe.Pointer(m)) } return } // Slow path (outlined so that the fast path can be inlined) // 这里封装了一下 m.lockSlow() } func (m *Mutex) lockSlow() { var waitStartTime int64 starving := false // 默认是正常模式 awoke := false iter := 0 old := m.state for { // 当前Mutex在饥饿模式下已经被锁了的话,当前G不进入自旋 // 只有Mutex在正常模式且被锁了的状况下,而且知足自旋的条件,才会进入到自旋逻辑里面 if old&(mutexLocked|mutexStarving) == mutexLocked && runtime_canSpin(iter) { // Active spinning makes sense. // Try to set mutexWoken flag to inform Unlock // to not wake other blocked goroutines. if !awoke && old&mutexWoken == 0 && old>>mutexWaiterShift != 0 && atomic.CompareAndSwapInt32(&m.state, old, old|mutexWoken) { awoke = true } runtime_doSpin() iter++ old = m.state continue } new := old // 若是当前不是饥饿模式 if old&mutexStarving == 0 { // 加锁 new |= mutexLocked } // 若是Mutex已经被锁,或者是在饥饿模式 if old&(mutexLocked|mutexStarving) != 0 { // 等待的G数量+1 new += 1 << mutexWaiterShift } // The current goroutine switches mutex to starvation mode. // But if the mutex is currently unlocked, don't do the switch. // Unlock expects that starving mutex has waiters, which will not // be true in this case. // 若是已是饥饿模式,而且Mutex是被锁的状态 if starving && old&mutexLocked != 0 { // 切换成饥饿模式 new |= mutexStarving } if awoke { // The goroutine has been woken from sleep, // so we need to reset the flag in either case. if new&mutexWoken == 0 { throw("sync: inconsistent mutex state") } new &^= mutexWoken } // 更新state值 if atomic.CompareAndSwapInt32(&m.state, old, new) { // 非饥饿模式下抢锁成功 if old&(mutexLocked|mutexStarving) == 0 { // 退出 break // locked the mutex with CAS } // If we were already waiting before, queue at the front of the queue. // 若是以前已经设置过waitStartTime的话,queueLifo就是true了 queueLifo := waitStartTime != 0 // 没有设置过,获取下运行时间 if waitStartTime == 0 { waitStartTime = runtime_nanotime() } // 阻塞,等待被唤醒 runtime_SemacquireMutex(&m.sema, queueLifo, 1) // 若是等待时间超过1ms,设置starving = true,不然就是false starving = starving || runtime_nanotime()-waitStartTime > starvationThresholdNs old = m.state // 若是Mutex已是饥饿模式 if old&mutexStarving != 0 { // If this goroutine was woken and mutex is in starvation mode, // ownership was handed off to us but mutex is in somewhat // inconsistent state: mutexLocked is not set and we are still // accounted as waiter. Fix that. // 若是当前G是在饥饿模式下被唤醒的 // 加个判断state是否正确设置的逻辑 if old&(mutexLocked|mutexWoken) != 0 || old>>mutexWaiterShift == 0 { throw("sync: inconsistent mutex state") } // delta = -7 (1..... 0111) delta := int32(mutexLocked - 1<<mutexWaiterShift) if !starving || old>>mutexWaiterShift == 1 { // 退出饥饿模式 delta -= mutexStarving } // 更新state atomic.AddInt32(&m.state, delta) break } awoke = true iter = 0 } else { old = m.state } } if race.Enabled { race.Acquire(unsafe.Pointer(m)) } }
Unlock方法改动就很是小了
func (m *Mutex) Unlock() { if race.Enabled { _ = m.state race.Release(unsafe.Pointer(m)) } // Fast path: drop lock bit. new := atomic.AddInt32(&m.state, -mutexLocked) if new != 0 { // Outlined slow path to allow inlining the fast path. // To hide unlockSlow during tracing we skip one extra frame when tracing GoUnblock. m.unlockSlow(new) } } func (m *Mutex) unlockSlow(new int32) { if (new+mutexLocked)&mutexLocked == 0 { throw("sync: unlock of unlocked mutex") } // 不是饥饿模式 if new&mutexStarving == 0 { old := new for { // G等待队列==0,直接返回 // (或者,处于woken模式,直接返回 // 或者,处于locked模式,直接返回 // 或者处于饥饿模式,直接返回) if old>>mutexWaiterShift == 0 || old&(mutexLocked|mutexWoken|mutexStarving) != 0 { return } new = (old - 1<<mutexWaiterShift) | mutexWoken if atomic.CompareAndSwapInt32(&m.state, old, new) { runtime_Semrelease(&m.sema, false, 1) return } old = m.state } } else { // 唤醒G等待队列的首个G runtime_Semrelease(&m.sema, true, 1) } }
Mutex通过两次演进,都解决了不一样的问题。Mutex用法很是简单,里面的原理不感兴趣的话其实不必深究,知道个大概的逻辑就好了。
补充: mutex的等待G队列的顺序是FIFO 饥饿模式下,性能其实很低,主要就是为了解决长尾问题的