互斥锁是并发程序中对共享资源进行访问控制的主要手段,Mutex是go语言提供的简单易用的互斥锁。Mutex的结构很简单,暴露的方法也只有2个,一个加锁 一个解锁。那么咱们天天用的Mutex互斥锁是如何实现的呢?
其实使用的是go语言automic包中的院子操做,具体如何使用能够参考以前写的文章。
在Mutex中的state是状态码,在mutex中把state分红4段。以下图:segmentfault
互斥锁的实现其实就是争夺Locked,当goroutineA 抢到了锁以后,第二个GoroutineB获取锁则会被阻塞等到GoroutineA释放锁以后GoroutineB将会被唤醒。固然具体实现则不会有这么简单,其中还有饥饿模式,自旋函数等一些概念。并发
加锁时,若是当前Locked位为1,说明该锁当前由其余协程持有,尝试加锁的协程并非立刻转入阻塞,而是会持续的探测Locked位是否变为0,这个过程即为自旋过程。自旋时间很短,但若是在自旋过程当中发现锁已被释放,那么协程能够当即获取锁。此时即使有协程被唤醒也没法获取锁,只能再次阻塞。
自旋的好处是,当加锁失败时没必要当即转入阻塞,有必定机会获取到锁,这样能够避免协程的切换。函数
若是自旋过程当中得到锁,则立刻执行该goroutine。若是永远在自旋模式中那么以前阻塞的goroutine则很难得到锁,这样一来一些goroutine则会被阻塞时间过长。如何解决这个问题,go mutex中引入了两种模式,具体请看下文。源码分析
在普通模式下等待者以 FIFO 的顺序排队来获取锁,但被唤醒的等待者发现并无获取到 mutex,而且还要与新到达的 goroutine 们竞争 mutex 的全部权。ui
在饥饿模式下,mutex 的全部权直接从对 mutex 执行解锁的 goroutine 传递给等待队列前面的等待者。新到达的 goroutine 们不要尝试去获取 mutex,即便它看起来是在解锁状态,也不要试图自旋。this
type Mutex struct { // 状态码 state int32 // 信号量,用于向处于 Gwaitting 的 G 发送信号 sema uint32 } const( // 值=1 表示是否锁住 1=锁 0=未锁 mutexLocked = 1 << iota // mutex is locked // 值=2 表示是否被唤醒 1=唤醒 0=未唤醒 mutexWoken // 是否为饥渴模式(等待超过1秒则为饥渴模式) mutexStarving // 右移3位,为等待的数量 mutexWaiterShift = iota // 饥饿模式的时间 starvationThresholdNs = 1e6 )
func (m *Mutex) Lock() { // 利用atomic包中的cas操做判断是否上锁 if atomic.CompareAndSwapInt32(&m.state, 0, mutexLocked) { // 判断是否启用了race检测 if race.Enabled { race.Acquire(unsafe.Pointer(m)) } // m.state = 1 直接返回 其余goroutine调用lock会发现已经被上锁 return } // 等待时间 var waitStartTime int64 // 饥饿模式标志位 starving := false // 唤醒标志位 awoke := false // 自旋迭代的次数 iter := 0 // 保存 mutex 当前状态 old := m.state // 循环 for { // 判断 若是不是饥饿模式而且是否可以执行自旋函数(判断自旋次数) // old&(0001|0100) == 0001 ==> old&0101 // 当old为0001为非饥饿模式 0001 == 0001 true 当old为0101饥饿模式 0101 == 0001 false // runtime_canSpin 判断自旋少于4次,而且是多核机器上而且GOMAXPROCS>1 if old&(mutexLocked|mutexStarving) == mutexLocked && runtime_canSpin(iter) { // 判断条件: // 未被唤醒 && 等待数量不为0 && 使用CAS设置状态为已唤醒 if !awoke && old&mutexWoken == 0 && old>>mutexWaiterShift != 0 && atomic.CompareAndSwapInt32(&m.state, old, old|mutexWoken) { // 设置激活为true awoke = true } // 自旋函数 自旋次数+1 runtime_doSpin() iter++ old = m.state continue } // 若是不能执行自旋函数 记录一个new状态 而后判断改变new 最终使用CAS替换尝试设置state属性 new := old // 当前的mutex.state处于正常模式,则将new的锁位设置为1 if old&mutexStarving == 0 { new |= mutexLocked } // 若是当前锁锁状态为锁定状态或者处于饥饿模式,则将等待的线程数量+1 if old&(mutexLocked|mutexStarving) != 0 { new += 1 << mutexWaiterShift } // 若是starving变量为true而且处于锁定状态,则new的饥饿状态位打开 if starving && old&mutexLocked != 0 { new |= mutexStarving } // 对于状态的验证 if awoke { // The goroutine has been woken from sleep, // so we need to reset the flag in either case. if new&mutexWoken == 0 { throw("sync: inconsistent mutex state") } new &^= mutexWoken } // new已经判断设置完,若是mutex的state没有变更过的话 则替换成new if atomic.CompareAndSwapInt32(&m.state, old, new) { // 若是未被锁定而且并非出于饥饿状态 退出循环 goroutine获取到锁 if old&(mutexLocked|mutexStarving) == 0 { break // locked the mutex with CAS } // 若是当前的 goroutine 以前已经在排队了,就排到队列的前面。 queueLifo := waitStartTime != 0 if waitStartTime == 0 { waitStartTime = runtime_nanotime() } // 进入休眠状态,等待信号唤醒后从新开始循环 若是queueLifo为true,则将等待goroutine插入到队列的前面 runtime_SemacquireMutex(&m.sema, queueLifo) // 计算等待时间 肯定 mutex 当前所处模式 // 此时这个goroutine已经被唤醒 starving = starving || runtime_nanotime()-waitStartTime > starvationThresholdNs old = m.state // 判断被唤醒的goroutine是否为饥饿状态 if old&mutexStarving != 0 { // If this goroutine was woken and mutex is in starvation mode, // ownership was handed off to us but mutex is in somewhat // inconsistent state: mutexLocked is not set and we are still // accounted as waiter. Fix that. if old&(mutexLocked|mutexWoken) != 0 || old>>mutexWaiterShift == 0 { throw("sync: inconsistent mutex state") } delta := int32(mutexLocked - 1<<mutexWaiterShift) // 当不是饥饿状态或者等待数只有一个,则退出饥饿模式 if !starving || old>>mutexWaiterShift == 1 { delta -= mutexStarving } atomic.AddInt32(&m.state, delta) break } // 若是不是饥饿模式 让新到来的 goroutine 先获取锁,继续循环 awoke = true iter = 0 } else { // 若是CAS替换未能成功 则继续循环 old = m.state } } if race.Enabled { race.Acquire(unsafe.Pointer(m)) } }
func (m *Mutex) Unlock() { if race.Enabled { _ = m.state race.Release(unsafe.Pointer(m)) } // 利用原子操做 设置state锁位置为0 new := atomic.AddInt32(&m.state, -mutexLocked) // 判断状态,给未加锁的mutex解锁,抛出错误 if (new+mutexLocked)&mutexLocked == 0 { throw("sync: unlock of unlocked mutex") } // 判断是否为饥饿模式 if new&mutexStarving == 0 { // 正常状态 old := new for { // 若是等待的goroutine为零 || 已经被锁定、唤醒、或者已经变成饥饿状态 if old>>mutexWaiterShift == 0 || old&(mutexLocked|mutexWoken|mutexStarving) != 0 { return } // 更新new的值,减去等待数量 new = (old - 1<<mutexWaiterShift) | mutexWoken // 使用CAS 替换旧值 if atomic.CompareAndSwapInt32(&m.state, old, new) { // 若是替换成功 则恢复挂起的goroutine.r若是为 true代表将唤醒第一个阻塞的goroutine runtime_Semrelease(&m.sema, false) return } old = m.state } } else { // 恢复挂起的goroutine.r若是为 true代表将唤醒第一个阻塞的goroutine runtime_Semrelease(&m.sema, true) } }