在锁的实现中如今愈来愈多的采用CAS来进行,经过利用处理器的CAS指令来实现对给定变量的值交换来进行锁的获取编程
在多线程并发的状况下颇有可能会有线程CAS失败,一般就会配合for循环采用轮询的方式去尝试从新获取锁多线程
锁从公平性上一般会分为公平锁和非公平锁,主要取决于在锁获取的过程当中,先进行锁获取的线程是否比后续的线程更先得到锁,若是是则就是公平锁:多个线程按照获取锁的顺序依次得到锁,不然就是非公平性并发
锁饥饿是指由于大量线程都同时进行获取锁,某些线程可能在锁的CAS过程当中一直失败,从而长时间获取不到锁编程语言
上面提到了CAS和轮询锁进行锁获取的方式,能够发现若是已经有线程获取了锁,可是在当前线程在屡次轮询获取锁失败的时候,就没有必要再继续进行反复尝试浪费系统资源,一般就会采用一种排队机制,来进行排队等待ide
在大多数编程语言中针对实现基于CAS的锁的时候,一般都会采用一个32位的整数来进行锁状态的存储源码分析
在go的mutex中核心成员变量只有两个state和sema,其经过state来进行锁的计数,而经过sema来实现排队性能
type Mutex struct { state int32 sema uint32 }
锁模式主要分为两种ui
描述 | 公平性 | |
---|---|---|
正常模式 | 正常模式下全部的goroutine按照FIFO的顺序进行锁获取,被唤醒的goroutine和新请求锁的goroutine同时进行锁获取,一般新请求锁的goroutine更容易获取锁 | 否 |
饥饿模式 | 饥饿模式全部尝试获取锁的goroutine进行等待排队,新请求锁的goroutine不会进行锁获取,而是加入队列尾部等待获取锁 | 是 |
上面能够看到其实在正常模式下,其实锁的性能是最高的若是多个goroutine进行锁获取后立马进行释放则能够避免多个线程的排队消耗 同理在切换到饥饿模式后,在进行锁获取的时候,若是知足必定的条件也会切换回正常模式,从而保证锁的高性能atom
在mutex中锁有三个标志位,其中其二进制位分别位001(mutexLocked)、010(mutexWoken)、100(mutexStarving), 注意这三者并非互斥的关系,好比一个锁的状态多是锁定的饥饿模式而且已经被唤醒线程
mutexLocked = 1 << iota // mutex is locked mutexWoken mutexStarving
mutex中经过低3位存储了当前mutex的三种状态,剩下的29位所有用来存储尝试正在等待获取锁的goroutine的数量
mutexWaiterShift = iota // 3
唤醒标志其实就是上面说的第二位,唤醒标志主要用于标识当前尝试获取goroutine是否有正在处于唤醒状态的,记得上面公平模式下,当前正在cpu上运行的goroutine可能会先获取到锁
当释放锁的时候,若是当前有goroutine正在唤醒状态,则只须要修改锁状态为释放锁,则处于woken状态的goroutine就能够直接获取锁,不然则须要唤醒一个goroutine, 而且等待这个goroutine修改state状态为mutexWoken,才退出
若是当前没有goroutine加锁,则而且直接进行CAS成功,则直接获取锁成功
// Fast path: grab unlocked mutex. if atomic.CompareAndSwapInt32(&m.state, 0, mutexLocked) { if race.Enabled { race.Acquire(unsafe.Pointer(m)) } return }
// 注意这里其实包含两个信息一个是若是当前已是锁定状态,而后容许自旋iter主要是计数次数实际上只容许自旋4次 // 其实就是在自旋而后等待别人释放锁,若是有人释放锁,则会马上进行下面的尝试获取锁的逻辑 if old&(mutexLocked|mutexStarving) == mutexLocked && runtime_canSpin(iter) { // !awoke 若是当前线程不处于唤醒状态 // old&mutexWoken == 0若是当前没有其余正在唤醒的节点,就将当前节点处于唤醒的状态 // old>>mutexWaiterShift != 0 :右移3位,若是不位0,则代表当前有正在等待的goroutine // atomic.CompareAndSwapInt32(&m.state, old, old|mutexWoken)设置当前状态为唤醒状态 if !awoke && old&mutexWoken == 0 && old>>mutexWaiterShift != 0 && atomic.CompareAndSwapInt32(&m.state, old, old|mutexWoken) { awoke = true } // 尝试自旋, runtime_doSpin() // 自旋计数 iter++ // 重新获取状态 old = m.state continue }
流程走到这里会有两种可能: 1.锁状态当前已经不是锁定状态 2.自旋超过指定的次数,再也不容许自旋了
new := old if old&mutexStarving == 0 { // 若是当前不是饥饿模式,则这里其实就能够尝试进行锁的获取了|=其实就是将锁的那个bit位设为1表示锁定状态 new |= mutexLocked } if old&(mutexLocked|mutexStarving) != 0 { // 若是当前被锁定或者处于饥饿模式,则增等待一个等待计数 new += 1 << mutexWaiterShift } if starving && old&mutexLocked != 0 { // 若是当前已经处于饥饿状态,而且当前锁仍是被占用,则尝试进行饥饿模式的切换 new |= mutexStarving } if awoke { if new&mutexWoken == 0 { throw("sync: inconsistent mutex state") } // awoke为true则代表当前线程在上面自旋的时候,修改mutexWoken状态成功 // 清除唤醒标志位 // 为何要清除标志位呢? // 其实是由于后续流程颇有可能当前线程会被挂起,就须要等待其余释放锁的goroutine来唤醒 // 但若是unlock的时候发现mutexWoken的位置不是0,则就不会去唤醒,则该线程就没法再醒来加锁 new &^= mutexWoken }
再加锁的时候实际上只会有一个goroutine加锁CAS成功,而其余线程则须要从新获取状态,进行上面的自旋与唤醒状态的从新计算,从而再次CAS
if atomic.CompareAndSwapInt32(&m.state, old, new) { if old&(mutexLocked|mutexStarving) == 0 { // 若是原来的状态等于0则代表当前已经释放了锁而且也不处于饥饿模式下 // 实际的二进制位多是这样的 1111000, 后面三位全是0,只有记录等待goroutine的计数器可能会不为0 // 那就代表其实 break // locked the mutex with CAS } // 排队逻辑,若是发现waitStatrTime不为0,则代表当前线程以前已经再排队来,后面可能由于 // unlock被唤醒,可是本次依旧没获取到锁,因此就将它移动到等待队列的头部 queueLifo := waitStartTime != 0 if waitStartTime == 0 { waitStartTime = runtime_nanotime() } // 这里就会进行排队等待其余节点进行唤醒 runtime_SemacquireMutex(&m.sema, queueLifo) // 若是等待超过指定时间,则切换为饥饿模式 starving=true // 若是一个线程以前不是饥饿状态,而且也没超过starvationThresholdNs,则starving为false // 就会触发下面的状态切换 starving = starving || runtime_nanotime()-waitStartTime > starvationThresholdNs // 从新获取状态 old = m.state if old&mutexStarving != 0 { // 若是发现当前已是饥饿模式,注意饥饿模式唤醒的是第一个goroutine // 当前全部的goroutine都在排队等待 // 一致性检查, if old&(mutexLocked|mutexWoken) != 0 || old>>mutexWaiterShift == 0 { throw("sync: inconsistent mutex state") } // 获取当前的模式 delta := int32(mutexLocked - 1<<mutexWaiterShift) if !starving || old>>mutexWaiterShift == 1 { // 若是当前goroutine不是饥饿状态,就从饥饿模式切换会正常模式 // 就从mutexStarving状态切换出去 delta -= mutexStarving } // 最后进行cas操做 atomic.AddInt32(&m.state, delta) break } // 重置计数 awoke = true iter = 0 } else { old = m.state }
func (m *Mutex) Unlock() { if race.Enabled { _ = m.state race.Release(unsafe.Pointer(m)) } // 直接进行cas操做 new := atomic.AddInt32(&m.state, -mutexLocked) if (new+mutexLocked)&mutexLocked == 0 { throw("sync: unlock of unlocked mutex") } if new&mutexStarving == 0 { // 若是释放锁而且不是饥饿模式 old := new for { if old>>mutexWaiterShift == 0 || old&(mutexLocked|mutexWoken|mutexStarving) != 0 { // 若是已经有等待者而且已经被唤醒,就直接返回 return } // 减去一个等待计数,而后将当前模式切换成mutexWoken new = (old - 1<<mutexWaiterShift) | mutexWoken if atomic.CompareAndSwapInt32(&m.state, old, new) { // 唤醒一个goroutine runtime_Semrelease(&m.sema, false) return } old = m.state } } else { // 唤醒等待的线程 runtime_Semrelease(&m.sema, true) } }
关注公告号阅读更多源码分析文章
更多文章关注 www.sreguide.com