Go语言学习 - Understanding Lock

Introduction

咱们从零开始想象mutex是怎么上锁的, 假设咱们规定一种游戏规则: "你必须把这个数字从0变成1". 改为的人算赢, 没改为的人就等着. 等刚刚赢的人再把数字改回0, 这样你就有机会再抢一把了. 这就是mutex上锁的基本原理. 再进一步的, 有以下两个细节:html

  • 如今有两个线程并行, 他们出手的时候都看到这个是0, 过会儿他们都把这个数字改为1, 这个锁被上了两回, 并且他们都认为本身是对的: "我看到的时候它的确是0呀?我错在哪儿了?"
  • 第一我的用完锁了, 其余人如何得知这个锁如今已经能够继续抢了?

并行哄抢的问题

回到问题1, 问题出现的关键就是这个操做并非原子的, 也就是说是两我的同时抢同时改, 若是咱们能让这两我的排一下队, 他抢完了你再抢问题是否是就不会有这种问题了? 说的没错, 但感受都是废话, 这种并行的东西如何保证让他排队按顺序来呢?java

说到底核心就是两个汇编指令: CMPXCHGLOCK, 设想咱们在单核心的状况下, 也就是说一次只有一个线程在运行, 这种状况下没有真正的并行, 只有多核心的状况下才会出现真正的并行去抢. 以上两个命令个中, 第一个抢到而且决定上锁的人执行LOCK, 先冷冻住其余核心, 而后CMPXHG负责作比对而后把修改过的数据存回那个内存里golang

以上, 就是实现锁最核心的一步, 如何让他们排队去修改一个内存, 对应Go里面处处都是的atomic.CompareAndSwap(*addr,old,new), 若是你想要修改*addr里的东西, 从old修改为new, 你必须排队. 这些东西看起来像不像C++里的volatile关键字? 由于两者都使用了相似的冷冻+修改的把戏shell

沉睡唤醒的问题

咱们虽然解决了排队抢锁的问题, 可是这个锁离真正的实用还有必定距离, 好比咱们就没解决如何唤醒的问题, 咱们都知道有一种东西叫semaphore信号标, 这种东西就能作到沉睡/唤醒G, 比较神奇的是在Go语言里面semaphore的本质上只是一个uint32函数

type semaRoot struct {
  treap  *sudog
  nwait  int32
}
func getSemaRoot(addr *uint32) *semaRoot {
  return &semtable[uintptr(unsafe.Pointer(addr))%semaTableSize]
}
复制代码

一个数字固然不可能存的下又是沉睡线程又是唤醒这么多功能的, 所以这玩意儿只是一个"index", 只是一个索引, 咱们用这个索引召集一个semaphore结构体: 咱们有一个全局变量用于存全部的semaphore, 而后用这个数字的地址做为下标, 取出对应真正的结构体, 完成seamphore全部功能的正是这个结构体.源码分析

func semacquire1(addr) {
    s := acquireSudog()
    root := semroot(addr)
    atomic.Xadd(&root.nwait, 1)
    for {
    		if cansemacquire(addr) {
			      atomic.Xadd(&root.nwait, -1)
			      break
		    }
        root.queue(addr, s)
        goparkunlock(&root.lock)
    }
    releaseSudog(s)
}
复制代码

这就是semaphore上锁的过程, 首先咱们将当前的G打包成sudog, 而后利用这个uint32获取对应的semaphore, 并将信号标的等待计数+1, 而后进入for沉睡/唤醒循环:post

  • 若是能够获取信号标, 退出循环, 并将信号标等待计数减1
  • 若是不能得到信号标, 将当前g加入信号标等待队列中
  • 经过gopark陷入沉睡, 等待唤醒
func semarelease1(addr) {
    root := semroot(addr)
  	if atomic.Load(&root.nwait) == 0 {
		    return
	  }
  	s, t0 := root.dequeue(addr)
	  if s != nil {
		    atomic.Xadd(&root.nwait, -1)
		    readyWithTime(s)
		    goyield()
	  }
}
复制代码

到了解锁的时候, 咱们一样拿着这个uint32先获取semaphore, 若是等待着队列长度为0, 咱们不用唤醒任何人, 直接退出, 不然取出一个沉睡的G, 将等待队列长度减1, 经过goready唤醒这个G并放到P的run_next, 最后经过yield()完成一次调度, 直接切换到run_next去执行ui

sync.Mutex

如今咱们已经搞定一个锁的两个核心组件了, 一个是排队, 一个是沉睡+唤醒功能. 有了semaphore做为做为唤醒的基础, 咱们惟一须要理解的就只有三种状态是怎么转换的this

type Mutex struct {
  state int32    // 状态
  sema  uint32   // 用于计算休眠G数量的信号量
}   
复制代码

能够看到锁的状态有S/W/L三项, 若是有两个G在争抢使用, 假设这两个G分别是X与Y, 他们的状态迁移是这样的:atom

  • 若是X已经锁定, 则状态为--L(已上锁,无人争夺), 这个时候Y再尝试锁定, 进入沉睡,状态为S-L(已上锁,有人沉睡)
  • X此时解除锁定, 状态从S-L变成S--(未上锁,有人沉睡), 进一步的, 发现有人沉睡, 则开始唤醒步骤, 状态变动成-W-(未上锁,有人沉睡且开始唤醒)
    • Y被唤醒且成功抢到锁, 状态变成--L(已上锁无人争夺)
    • Y还没被唤醒, X就又再次抢锁, X抢成了, 状态变成-WL(唤醒阶段,且被上锁), Y被唤醒后发现锁不可用继续沉睡, 状态变成S-L

sync.RWMutex

type RWMutex struct {
	w           Mutex  
	writerSem   uint32 
	readerSem   uint32 
	readerCount int32  
	readerWait  int32 
}
复制代码

RWMutex是读/写互斥锁。锁能够由任意数量的读者或单个写者持有

  • mutex的目的是防止两个写G同时写入
  • readerSem的场景: 一个写G任务还没结束, 读G已经想开始读了
  • writerSem的场景: 一些读G任务还没结束, 写G已经想开始写了
  • readerCount字段: 执行中+堵塞中的读G
  • readerWait字段: 只是执行中的读G
const rwmutexMaxReaders = 1 << 30

func (rw *RWMutex) RLock() {
	if atomic.AddInt32(&rw.readerCount, 1) < 0 {
		runtime_Semacquire(&rw.readerSem)
	}
}
复制代码

一个新的读G来拉! 不管能不能进行下去, 先给count加一:

  1. 若是readerCount小于零说明写G正在写, 这个时候读G是不能读的, 经过拿下读锁进入堵塞
func (rw *RWMutex) Lock() {
    rw.w.Lock()
    r := atomic.AddInt32(&rw.readerCount, -rwmutexMaxReaders) + rwmutexMaxReaders
    if r != 0 && atomic.AddInt32(&rw.readerWait, r) != 0 {
        runtime_Semacquire(&rw.writerSem)
    }
}
复制代码

首先咱们锁上Mutex,防止其余写G跟我抢. 而后咱们将count减小1<<30:

  1. 减小这么大的数字会致使count必定小于零,看看上面的RLock函数, 若是count小于零会致使锁上读锁,从而致使新来的读G堵塞住.
  2. 检查readerWait, 这个数字表明正在读的(不包含等的)G的数量, 若是这个数字不为零, 表明有读G正在工做, 经过拿下写锁进入堵塞
func (rw *RWMutex) RUnlock() {
    if r := atomic.AddInt32(&rw.readerCount, -1); r < 0 {
        if atomic.AddInt32(&rw.readerWait, -1) == 0 {
            runtime_Semrelease(&rw.writerSem, false)
        }
    }
}
复制代码
  1. 一个读G结束了, 将readerCount减1, 若是小于零的话说明写G正在工做, 可能也正在堵塞, 咱们想知道这个写G究竟是在工做仍是在堵塞因而咱们将readerWait也减1
  2. 若是获得的不是零, 说明写G真的是在堵塞, 并且还有别的读G还没完工, 写G在等大家读G都结束了才能开始工做
  3. 若是获得的是零, 说明写G在堵塞, 并且最后一个读G也已经结束了, 这时候释放写锁, 刚刚卡在上锁环节的写G此时被唤醒开始工做
func (rw *RWMutex) Unlock() {
    r := atomic.AddInt32(&rw.readerCount, rwmutexMaxReaders)
    for i := 0; i < int(r); i++ {
        runtime_Semrelease(&rw.readerSem, false)
    }
    rw.w.Unlock()
}
复制代码

写G结束了, 这个时候咱们能够容许读G开始工做了, 刚刚由于写G工做, 一些读G堵塞着在:

  1. 将readerCount加回来, 这个时候获得的数字表明正在堵塞中的读G的数量. 经过sema_release放那些读G开始工做
  2. 咱们须要将Mutex释放, 容许其余写G开始工做

ABA问题

func pop(top) {
  for {
      this := top
      next := top.next
      if CompareAndSwap(top,this,next) {
          break
    }
  }
}
复制代码

ABA是CompareAndSwap中潜在的隐患, 咱们以上面这张图为例, pop()函数删除链表最顶端的节点, 同时指针移到下一个位置上去

  1. (图一) 开始删了, 预期是能删走节点A, 同时top指针会指在节点X上
  2. (图二) 就在执行CAS的瞬间, 调度了, 另外一个G进来, 先是删了节点A, 而后将节点BC推动来了, 并且节点C的地址恰巧就是以前节点A的地址
  3. 结束调度, 回到以前的G, 你认为这个时候会发生什么?

根据CAS的原理, 先检查top是否是this: top指向节点C, 地址0x0014, 对上了, 因而top指针变成了0x0018, 直接指向了节点X, 中间的节点B被无视了.

这里面的问题出在哪儿呢? CAS只管表层, 也就是地址的值不变就好了, 至于里面存了什么, 存的东西有没有变是无论的. 这就好像你拎了一箱子钱, 人家把你箱子里钱都偷没了, 你却只知道箱子仍是那个箱子, 殊不知道钱已经没了, 一个道理. 这就是问题出现的根本缘由

ABA问题的解决

doubleCAS被用于解决这种比较极端的例子, 仍是刚刚那个例子, 咱们要去CAS一个地址, 假设一个地址长32位, 那咱们就搞64位整数来存这个地址.

每次CAS的时候比较的不仅是地址, 而是地址+计数器这个64位整数, 只有两个都对上了才算经过, 结合上面的ABA问题, 若是这个时候A节点被删了, 同时BC被推动来, 0x0014对应的count确定是不同的, 等到调度回来CAS没法经过

Reference

相关文章
相关标签/搜索