Implementation of the mutex lock in Go

The lock in Go is implemented on top of CAS atomic operations. The Mutex struct is as follows:
type Mutex struct {
    state int32                
    sema  uint32
}
 
//state is the current state of the lock; every bit has a meaning, and the zero value means unlocked
//sema is used as a semaphore: P/V operations on it block/wake goroutines on a wait queue. A goroutine waiting for the lock is hung on the wait queue and sleeps without being scheduled until Unlock wakes it. The details are in the Lock implementation in sync/mutex.go.
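
Before digging into the internals, here is a minimal usage sketch of my own (not part of the original analysis) showing what these two fields ultimately support: Lock blocks the caller until the mutex is free, and Unlock releases it and wakes a waiter if one is queued.

package main

import (
    "fmt"
    "sync"
)

func main() {
    var mu sync.Mutex
    var wg sync.WaitGroup
    counter := 0
    for i := 0; i < 10; i++ {
        wg.Add(1)
        go func() {
            defer wg.Done()
            mu.Lock()   // parks on sema if another goroutine holds the lock
            counter++   // critical section
            mu.Unlock() // wakes a queued waiter, if any
        }()
    }
    wg.Wait()
    fmt.Println(counter) // always 10
}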
 
A digression on sema
Although it is just an integer field inside Mutex, it is a crucial piece: this field is the semaphore used to manage putting goroutines to sleep and waking them up.
I have not read the sema implementation in detail yet; the rough functional analysis below may well be inaccurate!
First, sema provides one way of "scheduling" goroutines: it can block a goroutine and wake it up again.
Acquiring the semaphore happens in semacquire1 in runtime/sema.go,
and releasing it happens in semrelease1.
Inside sema there are a semaRoot struct and a global semtable variable. One semaRoot handles the P/V operations for one semaphore (my guess is that this relates to the goroutine scheduling model (M/P/G): a Processor carries multiple goroutines, and the goroutines under one processor need a semaphore to manage them; naturally a lightweight lock is needed while goroutine state changes, which is the lock field below — that lock is not the same as Mutex, it is sema's own implementation). Allocation and lookup of the many semaRoots is managed through the global semtable variable.
type semaRoot struct {
    lock  mutex
    treap *sudog // root of balanced tree of unique waiters.
    nwait uint32 // Number of waiters. Read w/o the lock.
}
var semtable [semTabSize]struct {
    root semaRoot
    pad  [cpu.CacheLinePadSize - unsafe.Sizeof(semaRoot{})]byte
}
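
To build some intuition for the P/V block/wake behavior, here is a rough user-level analogy of a counting semaphore built on a buffered channel. This is only my illustration — the names sem/newSem are made up, and the real runtime sema parks goroutines with gopark and keeps waiters on a treap — but the observable block/wake behavior is similar.

package main

import (
    "fmt"
    "time"
)

// sem is a toy counting semaphore: acquire blocks while the count is zero,
// release returns a token and wakes one blocked acquirer.
type sem chan struct{}

func newSem(n int) sem {
    s := make(sem, n)
    for i := 0; i < n; i++ {
        s <- struct{}{} // pre-load n tokens
    }
    return s
}

func (s sem) acquire() { <-s }             // P: take a token, block if none left
func (s sem) release() { s <- struct{}{} } // V: return a token, waking a waiter

func main() {
    s := newSem(1) // a binary semaphore, like the one backing Mutex
    s.acquire()
    go func() {
        time.Sleep(10 * time.Millisecond)
        s.release() // wakes the blocked acquire below
    }()
    s.acquire() // blocks until the goroutine releases
    fmt.Println("woken and acquired")
}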
1. Putting the current goroutine to sleep is done through goparkunlock, which semacquire1 calls like this:
          1) root := semroot(addr)
                semroot finds the semaRoot struct from the semaphore's address
          2) skipping a stretch of code... straight to where the current goroutine is put to sleep
                first lock(&root.lock) takes the lock
                then root.queue() puts the current goroutine on the wait queue (note that one semaphore manages multiple goroutines; before a goroutine sleeps, its details have to be saved and queued, i.e. hung on the treap of the semaRoot struct — judging by the comment, the queue is implemented as a balanced tree?)
          3) goparkunlock(&root.lock, waitReasonSemacquire, traceEvGoBlockSync, 4) is called,
                which ends up in gopark. gopark makes the runtime run another round of scheduling: before rescheduling, the current goroutine (the G object) is marked as sleeping, so it is no longer scheduled until it is woken, and then the lock is released. This function hands execution back to the runtime scheduler, which goes off and runs other goroutines.
 
2. Since goroutines can block, there must be a mechanism to wake them up.
   The wake-up mechanism also goes through the semtable structure.
   sema.go is not designed solely for the mutex. In the Mutex case, when another goroutine releases the Mutex it calls semrelease1, which wakes a goroutine from the queue so it can run. I have not read the details.
   Still, based on this analysis: Mutex is a mutual-exclusion lock, so the semaphore inside it should be a binary semaphore that only takes the values 0 and 1. When Lock is called on a Mutex and execution reaches semacquire1, the semaphore is checked and, if it is 0, the current goroutine is put to sleep:
func cansemacquire(addr *uint32) bool {
    for {
        v := atomic.Load(addr)
        if v == 0 {
            return false
        }
        if atomic.Cas(addr, v, v-1) {
            return true
        }
    }
}
      If goroutines keep trying to acquire the Mutex, they all see the semaphore as 0 and keep falling asleep. Only on unlock is the semaphore incremented by 1, and since Unlock must not be called repeatedly, this semaphore should only ever be 0 or 1.
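
The same load/CAS/retry pattern can be reproduced in user code with sync/atomic; a small sketch of my own (tryAcquire is a made-up name, not a runtime function):

package main

import (
    "fmt"
    "sync/atomic"
)

// tryAcquire mirrors the cansemacquire loop: load the counter, fail if it is
// zero, otherwise try to CAS it down by one; retry if another goroutine won
// the race in between.
func tryAcquire(addr *uint32) bool {
    for {
        v := atomic.LoadUint32(addr)
        if v == 0 {
            return false // nothing to take; the caller would go to sleep
        }
        if atomic.CompareAndSwapUint32(addr, v, v-1) {
            return true // we took one unit of the semaphore
        }
    }
}

func main() {
    var sema uint32 = 1            // a binary semaphore, as in Mutex
    fmt.Println(tryAcquire(&sema)) // true
    fmt.Println(tryAcquire(&sema)) // false: the value is now 0
    atomic.AddUint32(&sema, 1)     // a "release"
    fmt.Println(tryAcquire(&sema)) // true again
}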
 
That is a rough pass over sema; back to Mutex.
The role of the sema field was covered above; state is the more central field in Mutex, and it encodes the current state of the lock.
state     |31 ... 3|     2     |     1     |     0     |
    bit 0: whether the mutex is locked — 0 unlocked, 1 locked
    bit 1: whether a goroutine has been woken — 1 woken, 0 not woken
    bit 2: which mode the Mutex is in — 0 Normal, 1 Starving
    bits 3..31: the number of goroutines waiting to Lock this mutex
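
For reference, these bits correspond to the constants at the top of sync/mutex.go (Go 1.9 and later); the waiter count is read as state >> mutexWaiterShift:

const (
    mutexLocked = 1 << iota // bit 0: mutex is locked
    mutexWoken              // bit 1: a goroutine has been woken
    mutexStarving           // bit 2: mutex is in starvation mode
    mutexWaiterShift = iota // waiter count is stored in state >> 3

    starvationThresholdNs = 1e6 // waiting longer than 1ms switches the mutex to starvation mode
)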
 
First, the Mutex's normal and starving modes. The two modes exist for the sake of fairness; the source comments on Mutex describe them, and the following is a translation taken from the web: http://blog.51cto.com/qiangmzsx/2134786
A mutex has two modes of operation: normal and starvation.
In normal mode, waiting goroutines queue up in FIFO order, but a woken goroutine does not get the mutex immediately; it has to compete for it with newly arriving goroutines.
Because the new arrivals are already running on a CPU, the woken goroutine is very likely to lose that competition. When this happens, the woken goroutine is queued at the front of the queue.
If a woken goroutine fails to acquire the mutex for more than 1ms, it switches the mutex to starvation mode.
In starvation mode, ownership of the mutex is handed directly from the unlocking goroutine to the goroutine at the front of the queue. Newly arriving goroutines do not try to grab the mutex (even if it looks unlocked, they do not spin); instead they queue at the tail of the wait queue.
In starvation mode, once a goroutine acquires the mutex, the mutex switches back to normal mode if either of the following holds:
1. it is the last goroutine in the wait queue, or
2. it waited for less than 1ms.
Normal mode has much better performance, because a goroutine can acquire the mutex several times in a row;
starvation mode prevents the goroutines at the tail of the queue from never getting the mutex. A rough demo follows.
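
Here is the rough demo just mentioned — entirely my own, hypothetical code: one goroutine locks and unlocks in a tight loop while another tries to acquire the lock once; thanks to front-of-queue placement and the 1ms starvation handoff, the waiter does not stay blocked forever.

package main

import (
    "fmt"
    "sync"
    "time"
)

func main() {
    var mu sync.Mutex
    acquired := make(chan time.Duration, 1)

    go func() {
        start := time.Now()
        mu.Lock() // competes with the hot loop below
        mu.Unlock()
        acquired <- time.Since(start)
    }()

    deadline := time.Now().Add(200 * time.Millisecond)
    for time.Now().Before(deadline) {
        mu.Lock()   // the on-CPU goroutine keeps re-grabbing the lock...
        mu.Unlock() // ...but starvation mode guarantees the waiter gets its turn
        select {
        case d := <-acquired:
            fmt.Println("waiter got the lock after", d)
            return
        default:
        }
    }
    fmt.Println("waiter never got the lock (should not happen)")
}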
 
The concrete implementation is as follows.
In the Lock function:
    // Fast path: grab unlocked mutex.
    // 1. use an atomic operation to flip the lock state to locked
    if atomic.CompareAndSwapInt32(&m.state, 0, mutexLocked) {
        if race.Enabled {
            race.Acquire(unsafe.Pointer(m))
        }   
        return
    }   
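
The fast path above is just a single CAS from 0 to mutexLocked. As an illustration (my own toy type, not sync.Mutex and not its real API), the same idea can be written as a try-lock on a plain int32 flag:

package main

import (
    "fmt"
    "sync/atomic"
)

// flagLock is a toy lock: a single CAS flips state from 0 (unlocked) to 1 (locked),
// exactly like the CompareAndSwapInt32(&m.state, 0, mutexLocked) fast path above.
type flagLock struct {
    state int32
}

func (l *flagLock) tryLock() bool {
    return atomic.CompareAndSwapInt32(&l.state, 0, 1)
}

func (l *flagLock) unlock() {
    atomic.StoreInt32(&l.state, 0)
}

func main() {
    var l flagLock
    fmt.Println(l.tryLock()) // true: grabbed the "lock" via CAS
    fmt.Println(l.tryLock()) // false: already locked, the CAS fails
    l.unlock()
    fmt.Println(l.tryLock()) // true again
}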
Many goroutines may try to grab the Mutex at any moment and its state changes all the time, so there are quite a few scenarios; let me pick the typical ones.
1) Suppose the mutex is in its initial state, i.e. m.state=0. Then the current goroutine acquires the lock right here and m.state becomes locked,
so m.state = 00...001 — locked, not woken, normal mode.
      Lucky case: it gets the lock the moment it arrives. As described above, it is already on a CPU when it shows up and nobody holds the lock — born with a halo, heh.
      Lock returns.

      If that goroutine does not release the lock, the next goroutine to arrive cannot lock it and we move on to step two.
 
2) Next comes a for loop. Roughly: try to get the lock; if you cannot, sleep for a while and wait to be woken; on waking, check whether you have waited so long that you are starving; if starving, switch to starvation mode and get priority; otherwise, just keep waiting.
    var waitStartTime int64
    starving := false
    awoke := false
    iter := 0
    old := m.state    // m.state was just set to 001, so old is also 001
    for {
        // Don't spin in starvation mode, ownership is handed off to waiters
        // so we won't be able to acquire the mutex anyway.
        // old=001, the lock is held
        // runtime_canSpin then decides whether spinning is allowed, based on the iter passed in, which is incremented every iteration
        // spin conditions: multi-core, GOMAXPROCS>1, at least one other running P, and the local run queue is empty — presumably to avoid spinning on a single core and stalling the program. Also at most 4 spins: once iter reaches 4 we no longer enter the if
        // here we consider the multi-core case, so we do enter the if
        // old is re-read inside the if on every iteration; the point of spinning is to wait for the lock to be released so the goroutine currently on the CPU can grab it — Go always tries to let the goroutine already on a CPU take the lock
        if old&(mutexLocked|mutexStarving) == mutexLocked && runtime_canSpin(iter) {
            // Active spinning makes sense.
            // Try to set mutexWoken flag to inform Unlock
            // to not wake other blocked goroutines.
            // awoke is false, the woken bit is not set, and there are goroutines waiting: try to set the mutexWoken flag so that Unlock will not wake another blocked goroutine (this spinning goroutine intends to take the lock itself)
            if !awoke && old&mutexWoken == 0 && old>>mutexWaiterShift != 0 &&
                atomic.CompareAndSwapInt32(&m.state, old, old|mutexWoken) {
                awoke = true
            }   
            runtime_doSpin()     // spin: executes ~30 PAUSE-type instructions that do no useful work
            iter++
            old = m.state           // re-read state; if another goroutine has released the lock, the next iteration will not enter this if
            continue                   // go around the loop again after spinning
        }   
        // after falling out of the if there are two cases
        2.1) Another goroutine has unlocked: the if above sees the lock is no longer held and we fall through. Now m.state=000, old=000, awoke=false, and no goroutine is waiting — the simplest case.
        new := old                  // new=000, old=000, m.state=000, awoke=false; new is initialized here and the lock state m.state will later be set to new
        // Don't try to acquire starving mutex, new arriving goroutines must queue.
        if old&mutexStarving == 0 {      // the lock is not in starving mode, so the goroutine that is currently running gets to take it; if it were in starving mode, the current goroutine would have to queue up and leave the lock to the starving brothers already waiting
            new |= mutexLocked              // new=001, we want to lock
        }   
        if old&(mutexLocked|mutexStarving) != 0 {       // old=000, so this branch is skipped: the running goroutine takes the lock and does not join the queue, new stays 001
            new += 1 << mutexWaiterShift
        }   
        // The current goroutine switches mutex to starvation mode.
        // But if the mutex is currently unlocked, don't do the switch.
        // Unlock expects that starving mutex has waiters, which will not
        // be true in this case.
        if starving && old&mutexLocked != 0 {             // starving=false; starving only becomes true after a goroutine woken by Unlock finds it has waited too long — the other queued goroutines have also waited a while, so when the lock frees up it goes to the queue first. That logic comes later; for now we do not enter this if, new=001
            new |= mutexStarving
        }   
        if awoke {                       // awoke is false, so this branch is skipped; new stays 001
            // The goroutine has been woken from sleep,
            // so we need to reset the flag in either case.
            if new&mutexWoken == 0 {
                throw("sync: inconsistent mutex state")
            }   
            new &^= mutexWoken
        } 
           At this point new is fully prepared, new=001; now change the Mutex state and truly take exclusive ownership of the lock.
          // to be safe, in case some other goroutine changed the lock state while new was being computed, set the lock state to new=001 atomically — this is where the lock is actually taken
          if atomic.CompareAndSwapInt32(&m.state, old, new) {        
            if old&(mutexLocked|mutexStarving) == 0 {                           // old=000: break right away, because the CAS above already set m.state to locked, so none of the logic below runs
                break // locked the mutex with CAS                                    // back to 2.1 — but what if we got here because we ran out of spins? that is case 2.2
            }   
            // If we were already waiting before, queue at the front of the queue.
            queueLifo := waitStartTime != 0
            if waitStartTime == 0 {
                waitStartTime = runtime_nanotime()
            }   
            runtime_SemacquireMutex(&m.sema, queueLifo)
            starving = starving || runtime_nanotime()-waitStartTime > starvationThresholdNs
            old = m.state
            if old&mutexStarving != 0 {
                // If this goroutine was woken and mutex is in starvation mode,
                // ownership was handed off to us but mutex is in somewhat
                // inconsistent state: mutexLocked is not set and we are still
                // accounted as waiter. Fix that.
                if old&(mutexLocked|mutexWoken) != 0 || old>>mutexWaiterShift == 0 {
                    throw("sync: inconsistent mutex state")
                }   
                delta := int32(mutexLocked - 1<<mutexWaiterShift)
                if !starving || old>>mutexWaiterShift == 1 {
                    // Exit starvation mode.
                    // Critical to do it here and consider wait time.
                    // Starvation mode is so inefficient, that two goroutines
                    // can go lock-step infinitely once they switch mutex
                    // to starvation mode.
                    delta -= mutexStarving
                }
                atomic.AddInt32(&m.state, delta)
                break
            }
            awoke = true
            iter = 0
        } else {
            old = m.state
        }
 
       2.2) new := old — now new=001, old=001, m.state=001, awoke=false (I will not go through the cases where awoke was set to true inside the if; there are too many of them...)
        // Don't try to acquire starving mutex, new arriving goroutines must queue.
        if old&mutexStarving == 0 {
            new |= mutexLocked                    //new=001
        }
        if old&(mutexLocked|mutexStarving) != 0 {    // old=001: the running goroutine will have to join the queue; bits 3..31 of new hold the number of queued goroutines, so add 1 here
            new += 1 << mutexWaiterShift                  //new=1001
        }
        // The current goroutine switches mutex to starvation mode.
        // But if the mutex is currently unlocked, don't do the switch.
        // Unlock expects that starving mutex has waiters, which will not
        // be true in this case.
        if starving && old&mutexLocked != 0 {        // starving=false, no need to switch to starving mode
            new |= mutexStarving
        }
        if awoke {                                                      //awoke=false
            // The goroutine has been woken from sleep,
            // so we need to reset the flag in either case.
            if new&mutexWoken == 0 {
                throw("sync: inconsistent mutex state")
            }
            new &^= mutexWoken
        }
              new has been initialized to 1001, old=001
        if atomic.CompareAndSwapInt32(&m.state, old, new) {
            if old&(mutexLocked|mutexStarving) == 0 {                 // old=001, so no break here: the current goroutine cannot get the lock and has to block and sleep
                break // locked the mutex with CAS
            }
            // If we were already waiting before, queue at the front of the queue.
            queueLifo := waitStartTime != 0                                      // check whether this is the first time the goroutine reaches this point of the for loop; if so, waitStartTime is 0
            if waitStartTime == 0 {                                                    // whether queueLifo is true or false decides whether the goroutine queues at the back or is inserted at the head of the queue
                waitStartTime = runtime_nanotime()
            }
            runtime_SemacquireMutex(&m.sema, queueLifo)          // the current goroutine joins the wait queue — see "note 1" below for more. It blocks here, and when the lock is released it is only woken if it is at the head of the queue.
            starving = starving || runtime_nanotime()-waitStartTime > starvationThresholdNs    // on wake-up, check whether the wait was too long; if it exceeded 1ms, set starving to true — see "note 2"
            old = m.state
            if old&mutexStarving != 0 {
                // If this goroutine was woken and mutex is in starvation mode,
                // ownership was handed off to us but mutex is in somewhat
                // inconsistent state: mutexLocked is not set and we are still
                // accounted as waiter. Fix that.
                if old&(mutexLocked|mutexWoken) != 0 || old>>mutexWaiterShift == 0 {
                    throw("sync: inconsistent mutex state")
                }
                delta := int32(mutexLocked - 1<<mutexWaiterShift)
                if !starving || old>>mutexWaiterShift == 1 {
                    // Exit starvation mode.
                    // Critical to do it here and consider wait time.
                    // Starvation mode is so inefficient, that two goroutines
                    // can go lock-step infinitely once they switch mutex
                    // to starvation mode.
                    delta -= mutexStarving
                }
                atomic.AddInt32(&m.state, delta)
                break
            }
            awoke = true
            iter = 0
        } else {
            old = m.state
        }
 
Note 1: runtime_SemacquireMutex here is a thin wrapper over semacquire1 in sema.go described above. It eventually calls gopark, which hands execution over to the runtime and marks the current goroutine as sleeping so it is not scheduled (as far as the program can tell, it simply blocks there).
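
A tiny demo of that observable behavior (my own code, not from the post): the second goroutine parks inside Lock and burns no CPU until main calls Unlock and it is woken.

package main

import (
    "fmt"
    "sync"
    "time"
)

func main() {
    var mu sync.Mutex
    mu.Lock()

    done := make(chan struct{})
    go func() {
        mu.Lock() // parks here (runtime_SemacquireMutex -> gopark): main holds the lock
        fmt.Println("woken after Unlock")
        mu.Unlock()
        close(done)
    }()

    time.Sleep(50 * time.Millisecond) // the other goroutine is parked during this time
    mu.Unlock()                       // wakes the parked goroutine
    <-done
}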
 
Note 2: 1) Two cases again. If the wait did not exceed 1ms, starving=false.
                     old = m.state              // the lock must have been released for this goroutine to be woken, so old is at least 000; assume 000
                     if old&mutexStarving != 0    // old is not in starving mode, so we do not enter the if

                   awoke = true    // reset awoke and iter and go around the loop again
                    iter = 0
                     ///////////////////////////
                     On the next iteration new ends up as 001: the current goroutine has been woken, the lock bit gets set, and the mutex is not in starving mode.
                     It finally breaks out below and leaves the Lock function:
        if atomic.CompareAndSwapInt32(&m.state, old, new) {
            if old&(mutexLocked|mutexStarving) == 0 {
                break // locked the mutex with CAS
            }
 
 
            2) If the wait exceeded 1ms, starving = true.
                 old = m.state              // the lock must have been released for this goroutine to be woken, so old is at least 000; assume 000
                   if old&mutexStarving != 0    // old is not in starving mode, so we do not enter the if

                   awoke = true    // reset awoke and iter and go around the loop again
                   iter = 0
                   ///////////////////////////
                 On the next iteration new=101: the lock is switched to starving mode, the current goroutine has been woken, and the lock bit is set.
 
    II. What effect does starving mode have? It mainly shows up in the Unlock function.
    // Fast path: drop lock bit.
    //first clear the lock bit; assuming the simplest case where all other bits are 0, m.state=000 and new=000
    new := atomic.AddInt32(&m.state, -mutexLocked)
    if (new+mutexLocked)&mutexLocked == 0 {
        throw("sync: unlock of unlocked mutex")
    }
 
    //this is where starving mode matters: if the mutex is in starving mode, go straight to the else branch and wake the goroutine at the head of the queue
    if new&mutexStarving == 0 {
        old := new                   //old = 000
        for {
            // If there are no waiters or a goroutine has already
            // been woken or grabbed the lock, no need to wake anyone.
            // In starvation mode ownership is directly handed off from unlocking
            // goroutine to the next waiter. We are not part of this chain,
            // since we did not observe mutexStarving when we unlocked the mutex above.
            // So get off the way.
            //if no goroutine is waiting in the queue, or some goroutine has already been woken or has grabbed the lock (as in Lock, when a goroutine spinning on a CPU grabs the lock right after the unlock), there is no need to wake anyone in the wait queue
            if old>>mutexWaiterShift == 0 || old&(mutexLocked|mutexWoken|mutexStarving) != 0 {
                return
            }
            
            //if there are waiters in the queue and no on-CPU goroutine is spinning for the lock, seize the chance to wake one goroutine from the wait queue
            // Grab the right to wake someone.
            new = (old - 1<<mutexWaiterShift) | mutexWoken
            if atomic.CompareAndSwapInt32(&m.state, old, new) {
                runtime_Semrelease(&m.sema, false)
                return
            }
            old = m.state
        }
    } else {
        // Starving mode: handoff mutex ownership to the next waiter.
        // Note: mutexLocked is not set, the waiter will set it after wakeup.
        // But mutex is still considered locked if mutexStarving is set,
        // so new coming goroutines won't acquire it.
 
        //starving mode: wake the goroutine at the head of the queue directly. The Lock walkthrough above did not cover what happens when runtime_SemacquireMutex(&m.sema, queueLifo) is woken while the mutex is in starving mode, so that is analyzed in note 3
        runtime_Semrelease(&m.sema, true)
    }
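
As an aside, the check at the top of Unlock is easy to trip from user code: calling Unlock on a mutex that is not locked crashes the program.

package main

import "sync"

func main() {
    var mu sync.Mutex
    mu.Unlock() // fatal error: sync: unlock of unlocked mutex
}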
 
Note 3: Even though the lock bit is cleared right at the start of Unlock, a goroutine running on a CPU still cannot take the lock (it sees the starving bit set in m.state and knows only queued goroutines may acquire it, so it joins the wait queue). Then, when Unlock calls runtime_Semrelease(&m.sema, true), one sleeping goroutine is woken from the queue.
Back in the Lock function, m.state should now be 100:
 
            runtime_SemacquireMutex(&m.sema, queueLifo)    // woken up here
            starving = starving || runtime_nanotime()-waitStartTime > starvationThresholdNs
            old = m.state            //old = 100
            if old&mutexStarving != 0 {          // the lock is in starving mode
                // If this goroutine was woken and mutex is in starvation mode,
                // ownership was handed off to us but mutex is in somewhat
                // inconsistent state: mutexLocked is not set and we are still
                // accounted as waiter. Fix that.
                if old&(mutexLocked|mutexWoken) != 0 || old>>mutexWaiterShift == 0 {
                    throw("sync: inconsistent mutex state")
                }
                delta := int32(mutexLocked - 1<<mutexWaiterShift)                // set the lock bit and take this goroutine off the waiter count
                if !starving || old>>mutexWaiterShift == 1 {                              // if this goroutine did not actually wait too long, or it is the last waiter in the queue, clear the starving bit
                    // Exit starvation mode.
                    // Critical to do it here and consider wait time.
                    // Starvation mode is so inefficient, that two goroutines
                    // can go lock-step infinitely once they switch mutex
                    // to starvation mode.
                    delta -= mutexStarving
                }
                atomic.AddInt32(&m.state, delta)      // apply the delta (lock taken) and break out
                break
            }
Summary: this only covered the mutex briefly; there is also the read-write lock, which I will not go into. The mutex is built on top of atomic operations, and I will write about atomics in more detail later.
Here are a few interesting questions to finish with; the answers are not necessarily correct, and corrections from the experts are welcome.
1. A global int variable on a multi-core machine, with one goroutine reading and one writing and nothing else — is an atomic operation needed?
   Probably not: since the Intel P6, the hardware guarantees that aligned 32-bit loads and stores are atomic, and the compiler does not compile a plain read or write of such a variable into multiple instructions. (That said, the race detector will still flag this pattern, and the Go memory model only promises defined behavior when you use sync/atomic or some other synchronization.)
 
2. A global int variable i, multi-core, two goroutines both executing i++ at the same time — are atomics needed?
    Yes. i++ is a classic read-modify-write, and such an operation needs an atomic (a CAS or an atomic add) to be performed atomically. A sketch follows.
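
A quick sketch of my own to illustrate: the plain i++ loses updates under contention, while atomic.AddInt64 does the read-modify-write atomically.

package main

import (
    "fmt"
    "sync"
    "sync/atomic"
)

func main() {
    var plain, atomicCnt int64
    var wg sync.WaitGroup
    for g := 0; g < 2; g++ {
        wg.Add(1)
        go func() {
            defer wg.Done()
            for i := 0; i < 100000; i++ {
                plain++                        // racy read-modify-write: increments get lost
                atomic.AddInt64(&atomicCnt, 1) // atomic read-modify-write
            }
        }()
    }
    wg.Wait()
    fmt.Println("plain:", plain, "atomic:", atomicCnt) // plain is usually < 200000, atomic is exactly 200000
}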
 
3. For a map, if writes use atomic operations, do reads need them too?
    If it is purely a read or purely a write, and the value type is an integer, atomic operations should not be needed in the sense that an integer value is never half-written or half-read. What you cannot avoid, though, is a situation like this: goroutine1 writes 1 into the map, goroutine2 reads 1, and while goroutine2 is still processing it goroutine1 assigns a new value. (Note also that the Go runtime detects concurrent map reads and writes and throws a fatal error, so in practice a map shared across goroutines needs a Mutex/RWMutex or sync.Map.)
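
In practice the usual answer is to guard the map with a Mutex (or RWMutex); a minimal sketch of my own, with made-up names:

package main

import (
    "fmt"
    "sync"
)

// counterMap guards a shared map with a Mutex so that concurrent readers and
// writers never touch the map at the same time.
type counterMap struct {
    mu sync.Mutex
    m  map[string]int
}

func (c *counterMap) set(k string, v int) {
    c.mu.Lock()
    defer c.mu.Unlock()
    c.m[k] = v
}

func (c *counterMap) get(k string) int {
    c.mu.Lock()
    defer c.mu.Unlock()
    return c.m[k]
}

func main() {
    c := &counterMap{m: make(map[string]int)}
    var wg sync.WaitGroup
    wg.Add(2)
    go func() { defer wg.Done(); c.set("x", 1) }()
    go func() { defer wg.Done(); _ = c.get("x") }()
    wg.Wait()
    fmt.Println(c.get("x"))
}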