golang 的metux 的实现有几个点作法是很是有意思的,一个是底层数据结构上,用了平时不多用的位运算,第二个,用到了自旋,并作了自旋策略控制,最后是用了信号量控制协程。golang
首先是golang mutex 中用了不少位运算。位运算不作细介绍,对内存利用比较高的算法都有涉及,好比redies 的压缩列表,好比golang 的Protobuffer。算法
有几个关键点,iota 在定义的时候,用的多,作自增运算: 数据结构
const ( mutexLocked = 1 << iota // mutex is locked mutexWoken mutexWaiterShift = iota ) // 这里 第一个变量为1 ,第二个变量为10, 第三个为10
而后,位运算的求或和求与用的不少,一个是与1 求或将某位置1,一个是与0 求与将某位置0,这些都是用于改变某些标志位的方式,不要看懵逼了:函数
new := old | mutexLocked // 将old 的最后一位置1,表示new 锁必定是被持有状态 if old&mutexLocked != 0 { // 将最后一位保留后,其余位所有置0, 判断最后一位的状态是否是0,判断是否是被持有 if runtime_canSpin(iter) { // Active spinning makes sense. // Try to set mutexWoken flag to inform Unlock // to not wake other blocked goroutines. if !awoke && old&mutexWoken == 0 && old>>mutexWaiterShift != 0 && atomic.CompareAndSwapInt32(&m.state, old, old|mutexWoken) { awoke = true } runtime_doSpin() iter++ continue } new = old + 1<<mutexWaiterShift }
第二个是golang 的加锁会自旋,4次自旋没拿到锁后再将协程休眠,这样能够减小切换成本。这里关判断条件是自旋次数,cpu核数,p 的数量:ui
func sync_runtime_canSpin(i int) bool { if i >= active_spin || ncpu <= 1 || gomaxprocs <= int32(sched.npidle+sched.nmspinning)+1 { return false } if p := getg().m.p.ptr(); !runqempty(p) { return false } return true }
最后是利用信号量挂起和唤醒协程,核心函数是atom
runtime_SemacquireMutex(&m.sema) runtime_Semrelease(&m.sema)
获取信号时,当s > 0 ,将s--,若是s 为负数,会将当前g 放入阻塞队列,挂起直到s>0。code
func (m *Mutex) Lock() { if atomic.CompareAndSwapInt32(&m.state, 0, mutexLocked) { if race.Enabled { race.Acquire(unsafe.Pointer(m)) } return // cas 获取到锁,直接返回 } awoke := false //循环标记 iter := 0 //循环计数器 for { old := m.state //保存当前锁状态 new := old | mutexLocked //将状态位最后一位指定1 if old&mutexLocked != 0 { //锁被占用 if runtime_canSpin(iter) { //检查是否能够进入自旋锁,4次 if !awoke && old&mutexWoken == 0 && old>>mutexWaiterShift != 0 && atomic.CompareAndSwapInt32(&m.state, old, old|mutexWoken) { //awoke标记为true awoke = true } runtime_doSpin()//进入自旋 iter++ continue } new = old + 1<<mutexWaiterShift //锁被占用,且自旋次数超过4次,挂起协程数+1,下面步骤将g 挂起并等待 } if awoke { if new&mutexWoken == 0 { throw("sync: inconsistent mutex state") } new &^= mutexWoken //清除标志 } if atomic.CompareAndSwapInt32(&m.state, old, new) { //更新协程计数 if old&mutexLocked == 0 { break } // 锁请求失败,进入休眠状态,等待信号唤醒后从新开始循环,一直阻塞在这里 runtime_SemacquireMutex(&m.sema) awoke = true iter = 0 } } if race.Enabled { race.Acquire(unsafe.Pointer(m)) } }
解锁的过程就和加锁反过来便可:orm
func (m *Mutex) Unlock() { if race.Enabled { _ = m.state race.Release(unsafe.Pointer(m)) } new := atomic.AddInt32(&m.state, -mutexLocked)// 移除加锁位 if (new+mutexLocked)&mutexLocked == 0 { throw("sync: unlock of unlocked mutex") } old := new for { //当休眠队列内的等待计数为0或者自旋状态计数器为0,退出 if old>>mutexWaiterShift == 0 || old&(mutexLocked|mutexWoken) != 0 { return } // 等待协程数-1,更改清除标记位 new = (old - 1<<mutexWaiterShift) | mutexWoken if atomic.CompareAndSwapInt32(&m.state, old, new) { runtime_Semrelease(&m.sema)// 释放锁,发送释放信号,对应以前的acquire return } old = m.state } }