图解并发包中锁的通用实现

导读

这篇文章咱们来聊聊Java并发包中锁的实现。 由于这其中涉及到了一点数据结构和线程挂起、唤醒等处理流程,我将源码中的关键逻辑绘制成图片的格式,方便你们有一个更加直观的理解。 html

阅读完本篇文章,你将了解到: java

  1. 抽象同步器AQS的实现原理 node

  2. ReentrantLock实现原理 算法

  3. 非公平锁和公平锁实现的区别 数据库

  4. 基于这些内容,您也能够本身进一步探索可中断锁的实现原理 编程

  5. AQS的核心是state字段以及双端等待队列 segmentfault

  6. 如何优雅的中断一个线程 后端

一、包结构介绍

如下内容是基于JDK 1.8进行分析的。 bash

咱们查看下java.util.concurrent.locks包下面,发现主要包含以下类: 网络

image-20200301111726969
image-20200301111726969

咱们来构建他们的UML图:

image-20200301113556313
image-20200301113556313

如上图,抛开内部类,抽象类,接口,主要实现了三把锁: ReentrantLockStampedLockReentrantReadWriteLock最经常使用的就是ReentrantLock了,关于ReentrantLock的详细说明以及使用案例: ReentrantLock介绍与使用

咱们能够发现ReentrantLock和ReentrantReadWriteLock顶层都是AbstractQueueSynchronizer类。 咱们先来介绍下AbstractQueuedSynchronizer类。

二、AbstractQueuedSynchronizer

AbstractQueuedSynchronizer,简写为AQS,抽象队列同步器。 它是一个用于构建锁和同步器的框架,许多同步器均可以经过AQS很容易而且高效的构造出来,如下都是经过ASQ构造出来的: ReentrantLockSemaphoreCountDownLatchReentrantReadWriteLockSynchronousQueueFutureTask

接下来,咱们先来看看这个抽象同步队列的原理。

2.一、AQS原理

说到AQS,咱们必需要先知道它是干吗的,而后再去研究它。 那我直接先讲重点了: AQS是经过队列来辅助实现线程同步的。 线程并发争夺state资源,争夺失败的则进入等待队列(同步队列)并进入阻塞状态,在state资源被释放以后,从队列头唤醒被阻塞的线程节点,进行state资源的竞争。

这样势必会涉及很频繁的队列入队出队操做,以及线程的阻塞唤醒操做。 这些操做偏偏是最难编写,最容易出错的,为此AQS把这些操做作了封装,以模板的方式提供出来,咱们能够经过实现模板的相关方法,实现不同的锁或者同步器。

AQS使用了模板方法,把同步队列都封装起来了,同时提供了如下五个未实现的方法,用于子类的重写:

AQS数据结构

AQS同步器数据结构

image-20200307215156707
image-20200307215156707

如上图,AQS中:

  • state全部线程经过经过CAS尝试给state设值,当state>0时表示被线程占用; 同一个线程屡次获取state,会叠加state的值,从而实现了可重入;

  • exclusiveOwnerThread在独占模式下该属性会用到,当线程尝试以独占模式成功给state设值,该线程会把本身设置到exclusiveOwnerThread变量中,代表当前的state被当前线程独占了;

  • 等待队列(同步队列)等待队列中存放了全部争夺state失败的线程,是一个双向链表结构。 state被某一个线程占用以后,其余线程会进入等待队列; 一旦state被释放(state=0),则释放state的线程会唤醒等待队列中的线程继续尝试cas设值state;

  • head指向等待队列的头节点,延迟初始化,除了初始化以外,只能经过setHead方法进行修改;

  • tail指向等待队列的队尾,延迟初始化,只能经过enq方法修改tail,该方法主要是往队列后面添加等待节点。

等待队列中的节点结构是怎样子的呢? 下面咱们来看看。

AQS队列节点数据结构

image-20200307220200532
image-20200307220200532
  • pre指向队列中的上一个节点;

  • waitStatus节点的等待状态,初始化为0,表示正常同步等待:

  • CANCELLED1 节点因超时或者被中断而取消时设置为取消状态;

  • SIGNAL-1 指示当前节点被释放后,须要调用unpark通知后面节点,若是后面节点发生竞争致使获取锁失败,也会将当前节点设置为SIGNAL;

  • CONDITION-2 指示该线程正在进行条件等待,条件队列中会用到;

  • PROPAGATE-3 共享模式下释放节点时设置的状态,表示无限传播下去。

  • thread当前节点操做的线程;

  • nextWaiter该字段在Condition条件等待中会用到,指向条件队列的下一个节点。 或者连接到SHARED常量,表示节点正在以共享模式等待;

  • next指向队列中的下一个节点。

若是想要了解AQS的实现,您须要先知道如下这些内容,由于源码中会大量使用:

LockSupport.park(Object blocker)和LockSupport.unpark(Thread thread)

AQS中线程的阻塞和唤醒基本上都使用这两个方法实现的。 其底层都是依赖Unsafe实现的。

LockSupport是用来建立锁和其余同步类的基本线程阻塞的原语。

此类与使用它的每一个线程关联一个许可(permit: 0表示无许可,1 表示有许可),若是有许可,将马上返回对park()的调用,而且在此过程化消耗掉它。 不然,park()会致使线程进入阻塞; 调用 unpark() 可以使许可证可用,若是尚不可用。 不过与信号量不一样的是,许可证不会累加,最多只有一个。

该类中常见的两个方法两个方法:

  • park(Object blocker)实现线程的阻塞。 除非有许可,不然出于线程调度目的将阻塞线程; 若是有许可,则将许可消耗,而后线程往下继续执行;

  • unpark(Thread thread)实现解除线程的阻塞。 若是线程在park方法上被阻塞,则调用该方法将取消阻塞。 不然,许可变为1,保证下一次调用park方法不会阻塞。

这两个方法底层是调用了Unsafe中的park和unpark的native方法。

具体底层实现,能够参考这里[1]

cas

咱们知道,计算机中提供了cas相关指令,这是一种乐观的并发策略,须要硬件指令集的发展才能支持,实现了: 操做+冲突检测的原子性。

IA64 和 X86 使用cmpxchg指令完成CAS功能。

cas 内存位置 旧预期值 新值

CAS存在ABA问题,可使用版本号进行控制,保证其正确性。

JDK中的CAS,相关类: Unsafe里面的compareAndSwapInt()以及compareAndSwapLong()等几个方法包装提供。 只有启动类加载器加载的class才能访问他,或者经过反射获取。

详细说明: 一文带你完全理解同步和锁的本质(干货)#2.1.五、基于硬件指令

interrupt

相关阅读: 如何优雅的中断线程

为了分析AQS的实现原理,咱们先挑一个方法来分析。

2.二、AQS中的通常处理流程

为了弄清楚AQS中是如何进行队列同步的,咱们先从一个简单的独占加锁方法提及。

2.2.一、public final void acquire(int arg)

这个方法是使用独占模式获取锁,忽略中断。 经过至少调用一次tryAcquire成功返回来实现。 不然,线程将排队,并可能反复阻塞和解除阻塞,并调用tryAcquire直到成功。

咱们先看一下这个方法的入口代码:

1public final void acquire(int arg) {2  if (!tryAcquire(arg) &&  // 尝试获取锁,这里是一个在AQS中未实现的方法,具体由子类实现3      acquireQueued(addWaiter(Node.EXCLUSIVE), arg))  // 获取不到锁,则 1.添加到等待队列 2.不断循环等待重试4    selfInterrupt();5}复制代码

tryAcquire

一开始,会尝试调用AQS中未实现的方法tryAcquire()尝试获取锁,获取成功则表示获取锁了,该方法的实现通常经过CAS进行设置state尝试获取锁:

image-20200311103257880
image-20200311103257880

不一样的锁能够有不一样的tryAcquire()实现,因此,你能够看到ReentrantLock锁里面会有非公平锁和公平锁的实现方式。

ReentrantLock公平锁的实现代码在获取锁以前多了一个判断: !hasQueuedPredecessors(),这个是判断若是当前线程节点以前没有其余节点了,那么咱们才能够尝试获取锁,这就是公平锁的体现。

addWaiter

获取锁失败以后,则会进入这一步,这里会尝试把线程节点追加到等待队列后面,是经过CAS进行追加的,追加失败的状况下,会循环重试,直至追加成功为止。 若是追加的时候,发现head节点还不存在,则先初始化一个head节点,而后追加上去:

1private Node addWaiter(Node mode) { 2  // 将当期线程构形成Node节点 3  Node node = new Node(Thread.currentThread(), mode); 4  // Try the fast path of enq; backup to full enq on failure 5  Node pred = tail; 6  if (pred != null) { 7    // 将原来尾节点设置为新节点的上一个节点 8    node.prev = pred; 9    // 尝试用新节点取代原来的尾节点10    if (compareAndSetTail(pred, node)) {11      // 取代成功,则将原来尾指针的下一个节点指向新节点12      pred.next = node;13      return node;14    }15  }16  // 若是当前尾指针为空,则调用enq方法17  enq(node);18  return node;19}复制代码
image-20200311103646009
image-20200311103646009

acquireQueued

加入等待队列以后,会执行该方法,不断循环地判断当前线程节点是否在head后面一位,若是是则调用tryAcquire()获取锁,若是获取成功,则把线程节点做为Node head,并把原Node head的next设置为空,断开原来的Node head。 注意这个Node head只是占位做用,每次处理的都是Node head的下一个节点:

1final boolean acquireQueued(final Node node, int arg) { 2  boolean failed = true; 3  try { 4    boolean interrupted = false; 5    for (;;) { 6      // 获取该节点的上一个节点,判断是否头节点,若是是则尝试获取锁 7      final Node p = node.predecessor(); 8      if (p == head && tryAcquire(arg)) { 9        // 获取锁成功,把当前节点变为头节点10        setHead(node);11        p.next = null; // help GC12        failed = false;13        return interrupted;14      }15      // 判断是否须要阻塞线程,该方法中会把取消状态的节点移除掉,而且把当前节点的前一个节点设置为SIGNAL16      if (shouldParkAfterFailedAcquire(p, node) &&17          parkAndCheckInterrupt())18        interrupted = true;19    }20  } finally {21    if (failed)22      cancelAcquire(node);23  }24}复制代码

若是当前节点的pre不是head,或者争抢失败,则会将前面节点的状态设置为SIGNAL。

若是前面的节点状态大于0,表示节点被取消,这个时候会把该节点从队列中移除掉。

下图为尝试CAS争抢锁,但失败了,而后把head节点状态设置为SIGNAL:

image-20200311103510923
image-20200311103510923

而后再会循环一次尝试获取锁,若是获取失败了,就调用LockSupport.park(this)挂起线程。

那么时候才会触发唤起线程呢? 这个时候咱们得先看看释放锁是怎么作的了。

你们看AQS的源码的时候,能够发现这里的线程阻塞与唤醒基本上是用一个循环+LockSupport.park+LockSupport.unpark实现的,你们知道为何要这样作?

相关阅读: 如何优雅的挂起线程

2.2.二、public final boolean release(int arg)

入口代码以下:

1public final boolean release(int arg) {2  if (tryRelease(arg)) { // 尝试释放锁3    Node h = head; 4    if (h != null && h.waitStatus != 0)  // 若是头节点waitStatus不为0,则唤醒后续线程节点继续处理5      unparkSuccessor(h);6    return true;7  }8  return false;9}复制代码

tryRelease()具体由子类实现。 通常处理流程是让state减1。

若是释放锁成功,而且头节点waitStatus!=0,那么会调用unparkSuccessor()通知唤醒后续的线程节点进行处理。

注意: 在遍历队列查找唤醒下一个节点的过程当中,若是发现下一个节点状态是CANCELLED那么就会忽略这个节点,而后从队列尾部向前遍历,找到与头结点最近的没有被取消的节点进行唤醒操做。

image-20200311103813051
image-20200311103813051

唤醒以后,节点对应的线程2又从acquireQueued()方法的阻塞处醒来继续参与争抢锁。 而且争抢成功了,那么会把head节点的下一个节点设置为null,让本身所处的节点变为head节点:

image-20200311104443722
image-20200311104443722

这样一个AQS独占式、非中断的抢占锁的流程就结束了。

2.2.三、完整流程

最后咱们再以另外一个维度的流程来演示下这个过程。

首先有4个线程争抢锁,线程1,成功了,其余三个失败了,分别依次入等待队列:

image-20200311105159906
image-20200311105159906

线程二、线程3依次入队列:

image-20200311110811069
image-20200311110811069

如今忽然发生了点事情,假设线程3用的是带有超时时间的tryLock,超过了等待时间,线程3状态变为取消状态了,这个时候,线程4追加到等待队列中后,发现前一个节点的状态是1取消状态,那么会执行操做把线程3节点从队列中移除掉:

image-20200311110905140
image-20200311110905140

最后,线程1释放了锁,而后把head节点ws设置为0,而且找到了离head最靠近的一个waitStatus<=0的线程并唤醒,而后参与竞争获取锁:

image-20200311110947681
image-20200311110947681

最终,线程2获取到了锁,而后把本身变为了Head节点,并取代了原来的Head节点:

image-20200311111049725
image-20200311111049725

接着就一直这样循环,我就再也不画图了,聪明的你应该对此了如指掌了。

三、使用AQS实现的锁

好了,有了这个AQS,咱们就能够很快速的构造属于本身的锁了。

咱们来分别构造一个独占不可中断公平锁和非公平锁吧? 没必要了,其实ReentrantLock正是这样一个锁:

发现里面分别有一个公平锁和非公平锁的实现。 相信通过本文介绍,你能够很快看懂他们的源码,并知道实现原理了。

除此以外,ReentrantLock同时提供了如下几个经常使用的API:

  • lock(): 调用该方法会使锁计数器加1,若是共享资源最初是空闲的,则将锁定并授予线程;

  • unlock(): 调用该方法使锁计数器减1,当计数达到0的时候,将释放资源;

  • tryLock(): 若是资源没有被任何其余线程占用,那么该方法返回true,而且锁计数器加1。 若是资源不是空闲的,则该方法返回false。 这个时候线程不会阻塞,而是直接退出返回结果;

  • lockInterruptible(): 该方法使得资源空闲时容许该线程在获取资源时被其余线程中断。 也就是说: 若是当前线程正在等待锁,但其余线程请求该锁,则当前线程将被中断并当即返回,不会继续等待获取锁;

感兴趣能够看个人这篇文章进一步了解: https://www.itzhai.com/cpj/introduction-and-use-of-reentrantlock.html

3.一、组合大于继承原则

咱们能够看到,ReentrantLock是经过委托AQS的子类FairSyncNonfairSync来调用AQS的方法,而不是直接扩展AQS,这样作能够避免ASQ中的方法污染了锁的API,破坏锁接口的简洁性。 

结语

好了,咱们今天就讲到这里了,最后,我留下两个课堂做业给你们思考下:

一、ReentrantLock的公平锁是怎么实现的? 如何作到公平的?

二、ReentrantLock的非公平锁是怎么实现的? 为何说它是非公平的?

三、ReentrantLock的可中断锁是如何实现的? interrupt()函数执行原理是什么?

四、ReentrantLock的可超时的锁是如何实现的?

相信聪明的你很快能够找到答案。

本文为arthinking基于相关技术资料和官方文档撰写而成,确保内容的准确性,若是你发现了有何错漏之处,烦请高抬贵手帮忙指正,万分感激。

你们能够关注个人博客: itzhai.com 获取更多文章,我将持续更新后端相关技术,涉及JVM、Java基础、架构设计、网络编程、数据结构、数据库、算法、并发编程、分布式系统等相关内容。

若是您以为读完本文有所收获的话,能够关注个人帐号,或者点个赞吧,码字不易,您的支持就是我写做的最大动力,再次感谢!

关注个人公众号,及时获取最新的文章。



更多文章

JVM系列专题: 公众号发送 JVM


References

[1]: 浅谈Java并发编程系列(八)—— LockSupport原理剖析

[2]: Java Thread Primitive Deprecation


本文做者: arthinking
博客连接: https://www.itzhai.com/cpj/aqs-and-lock-implementation-in-concurrent-packages.html
AQS与并发包中锁的实现
版权声明: BY-NC-SA许可协议: 创做不易,如需转载,请务必附加上博客连接,谢谢!



·END·

 访问IT宅(itzhai.com)查看个人博客更多文章

扫码关注及时获取新内容↓↓↓


Java架构杂谈

Java后端技术架构 · 技术专题 · 经验分享

blog: itzhai.com


码字不易,若有收获,点个
「赞」哦~

相关文章
相关标签/搜索