咱们从零开始想象mutex是怎么上锁的, 假设咱们规定一种游戏规则: "你必须把这个数字从0变成1". 改为的人算赢, 没改为的人就等着. 等刚刚赢的人再把数字改回0, 这样你就有机会再抢一把了. 这就是mutex上锁的基本原理. 再进一步的, 有以下两个细节:html
回到问题1, 问题出现的关键就是这个操做并非原子的, 也就是说是两我的同时抢同时改, 若是咱们能让这两我的排一下队, 他抢完了你再抢问题是否是就不会有这种问题了? 说的没错, 但感受都是废话, 这种并行的东西如何保证让他排队按顺序来呢?java
说到底核心就是两个汇编指令: CMPXCHG
与 LOCK
, 设想咱们在单核心的状况下, 也就是说一次只有一个线程在运行, 这种状况下没有真正的并行, 只有多核心的状况下才会出现真正的并行去抢. 以上两个命令个中, 第一个抢到而且决定上锁的人执行LOCK
, 先冷冻住其余核心, 而后CMPXHG
负责作比对而后把修改过的数据存回那个内存里golang
以上, 就是实现锁最核心的一步, 如何让他们排队去修改一个内存, 对应Go里面处处都是的atomic.CompareAndSwap(*addr,old,new)
, 若是你想要修改*addr
里的东西, 从old修改为new, 你必须排队. 这些东西看起来像不像C++里的volatile关键字? 由于两者都使用了相似的冷冻+修改的把戏shell
咱们虽然解决了排队抢锁的问题, 可是这个锁离真正的实用还有必定距离, 好比咱们就没解决如何唤醒的问题, 咱们都知道有一种东西叫semaphore信号标, 这种东西就能作到沉睡/唤醒G, 比较神奇的是在Go语言里面semaphore的本质上只是一个uint32
函数
type semaRoot struct {
treap *sudog
nwait int32
}
func getSemaRoot(addr *uint32) *semaRoot {
return &semtable[uintptr(unsafe.Pointer(addr))%semaTableSize]
}
复制代码
一个数字固然不可能存的下又是沉睡线程又是唤醒这么多功能的, 所以这玩意儿只是一个"index", 只是一个索引, 咱们用这个索引召集一个semaphore结构体: 咱们有一个全局变量用于存全部的semaphore, 而后用这个数字的地址做为下标, 取出对应真正的结构体, 完成seamphore全部功能的正是这个结构体.源码分析
func semacquire1(addr) {
s := acquireSudog()
root := semroot(addr)
atomic.Xadd(&root.nwait, 1)
for {
if cansemacquire(addr) {
atomic.Xadd(&root.nwait, -1)
break
}
root.queue(addr, s)
goparkunlock(&root.lock)
}
releaseSudog(s)
}
复制代码
这就是semaphore上锁的过程, 首先咱们将当前的G打包成sudog, 而后利用这个uint32
获取对应的semaphore, 并将信号标的等待计数+1, 而后进入for沉睡/唤醒循环:post
func semarelease1(addr) {
root := semroot(addr)
if atomic.Load(&root.nwait) == 0 {
return
}
s, t0 := root.dequeue(addr)
if s != nil {
atomic.Xadd(&root.nwait, -1)
readyWithTime(s)
goyield()
}
}
复制代码
到了解锁的时候, 咱们一样拿着这个uint32
先获取semaphore, 若是等待着队列长度为0, 咱们不用唤醒任何人, 直接退出, 不然取出一个沉睡的G, 将等待队列长度减1, 经过goready唤醒这个G并放到P的run_next
, 最后经过yield()
完成一次调度, 直接切换到run_next
去执行ui
如今咱们已经搞定一个锁的两个核心组件了, 一个是排队, 一个是沉睡+唤醒功能. 有了semaphore做为做为唤醒的基础, 咱们惟一须要理解的就只有三种状态是怎么转换的this
type Mutex struct {
state int32 // 状态
sema uint32 // 用于计算休眠G数量的信号量
}
复制代码
能够看到锁的状态有S/W/L三项, 若是有两个G在争抢使用, 假设这两个G分别是X与Y, 他们的状态迁移是这样的:atom
type RWMutex struct {
w Mutex
writerSem uint32
readerSem uint32
readerCount int32
readerWait int32
}
复制代码
RWMutex是读/写互斥锁。锁能够由任意数量的读者或单个写者持有
mutex
的目的是防止两个写G同时写入readerSem
的场景: 一个写G任务还没结束, 读G已经想开始读了writerSem
的场景: 一些读G任务还没结束, 写G已经想开始写了readerCount
字段: 执行中+堵塞中的读GreaderWait
字段: 只是执行中的读Gconst rwmutexMaxReaders = 1 << 30
func (rw *RWMutex) RLock() {
if atomic.AddInt32(&rw.readerCount, 1) < 0 {
runtime_Semacquire(&rw.readerSem)
}
}
复制代码
一个新的读G来拉! 不管能不能进行下去, 先给count加一:
func (rw *RWMutex) Lock() {
rw.w.Lock()
r := atomic.AddInt32(&rw.readerCount, -rwmutexMaxReaders) + rwmutexMaxReaders
if r != 0 && atomic.AddInt32(&rw.readerWait, r) != 0 {
runtime_Semacquire(&rw.writerSem)
}
}
复制代码
首先咱们锁上Mutex,防止其余写G跟我抢. 而后咱们将count减小1<<30:
func (rw *RWMutex) RUnlock() {
if r := atomic.AddInt32(&rw.readerCount, -1); r < 0 {
if atomic.AddInt32(&rw.readerWait, -1) == 0 {
runtime_Semrelease(&rw.writerSem, false)
}
}
}
复制代码
func (rw *RWMutex) Unlock() {
r := atomic.AddInt32(&rw.readerCount, rwmutexMaxReaders)
for i := 0; i < int(r); i++ {
runtime_Semrelease(&rw.readerSem, false)
}
rw.w.Unlock()
}
复制代码
写G结束了, 这个时候咱们能够容许读G开始工做了, 刚刚由于写G工做, 一些读G堵塞着在:
func pop(top) {
for {
this := top
next := top.next
if CompareAndSwap(top,this,next) {
break
}
}
}
复制代码
ABA是CompareAndSwap
中潜在的隐患, 咱们以上面这张图为例, pop()
函数删除链表最顶端的节点, 同时指针移到下一个位置上去
根据CAS的原理, 先检查top是否是this: top指向节点C, 地址0x0014, 对上了, 因而top指针变成了0x0018, 直接指向了节点X, 中间的节点B被无视了.
这里面的问题出在哪儿呢? CAS只管表层, 也就是地址的值不变就好了, 至于里面存了什么, 存的东西有没有变是无论的. 这就好像你拎了一箱子钱, 人家把你箱子里钱都偷没了, 你却只知道箱子仍是那个箱子, 殊不知道钱已经没了, 一个道理. 这就是问题出现的根本缘由
doubleCAS被用于解决这种比较极端的例子, 仍是刚刚那个例子, 咱们要去CAS一个地址, 假设一个地址长32位, 那咱们就搞64位整数来存这个地址.
每次CAS的时候比较的不仅是地址, 而是地址+计数器这个64位整数, 只有两个都对上了才算经过, 结合上面的ABA问题, 若是这个时候A节点被删了, 同时BC被推动来, 0x0014对应的count确定是不同的, 等到调度回来CAS没法经过