这段时间在研究Java
并发相关的内容,一段时间下来算是小有收获了。ReentrantLock
是Java
并发中的重要部分,因此也是个人首要研究对象,在学习它的过程当中,我发现它是基于抽象队列同步器AQS实现的,因此我花了点时间学习了一下AQS
的实现原理。这篇博客就来讲一说AQS
的做用,以及它是如何实现的。html
AQS
全称抽象队列同步器(AbstractQuenedSynchronizer),它是一个能够用来实现线程同步的基础框架。固然,它不是咱们理解的Spring
这种框架,它是一个类,类名就是AbstractQuenedSynchronizer
,若是咱们想要实现一个可以完成线程同步的锁或者相似的同步组件,就能够在使用AQS
来实现,由于它封装了线程同步的方式,咱们在本身的类中使用它,就能够很方便的实现一个咱们本身的锁。java
AQS
封装了不少方法,如获取独占锁,释放独占锁,获取共享锁,释放共享锁......咱们能够经过在本身的实现的同步组件中调用AQS
的这些方法来实现一个线程同步的功能。可是,根据AQS
的名称也可以想到,咱们不能直接建立AQS
的对象,调用这些方法,由于AQS
是一个抽象类,咱们须要继承AQS
,建立它的子类对象来使用它。在实际使用中,通常是在咱们本身的类中,之内部类的方式继承AQS
,而后在内部建立一个对象,在这个类内部使用,好比ReentrantLock
中就是定义了一个抽象内部类Sync
,继承AQS
,而后定义了一个NonfairSync
类,继承Sync
,NonfairSync
是一个非公平锁;同时又定义了一个FairSync
类继承Sync
,FairSync
是一个公平锁。node
公平锁:多个线程按照申请锁的顺序去得到锁,后申请锁的线程须要排队,等它以前的线程得到锁并释放后,它才能得到锁;算法
非公平锁:线程得到锁的顺序于申请锁的顺序无关,申请锁的线程能够直接尝试得到锁,谁抢到就是谁的;编程
咱们继承了AQS
,就能够直接调用它的方法了吗?固然不是。Java
中提供的抽象组件,都是帮咱们写好了通用的部分,可是一些具体的部分,还须要咱们本身实现。举个比较简单的例子,Java
中对自定义类型数组的排序,能够直接调用工具类的sort
方法,sort
方法已经实现了排序的算法,可是其中的比较过程是抽象的,须要咱们本身实现,因此咱们通常须要提供一个比较器(Comparator),或者让自定义类实现Comparable
接口。这就是模板方法设计模式。设计模式
模板方法:在一个方法中实现了一个算法的流程,可是其中的一些步骤是抽象的,须要在子类中实现,或者具体使用时实现。模板方法能够提升算法的复用性,提供了算法的弹性,对于不一样的需求,能够通用同一份代码。数组
而AQS
的实现就是封装了一系列的模板方法,包括获取锁、释放锁等,这些都是模板方法。这些方法中调用的一些方法并无具体实现,须要使用者根据本身的需求,在子类中进行实现。下面咱们就来看看AQS
中的这些方法。安全
AQS底层维护一个int类型的变量state来表示当前的同步状态,根据当前state的值,来判断当前释放处于锁定状态,或者是其余状态。而state
的每个值具体是什么含义,是由咱们本身实现的。咱们继承AQS
时,根据本身的需求,实现一些方法,其中就是经过修改state
的值来维持同步状态。而关于state
,主要有如下三个方法:并发
state
的值;state
的值;CAS
设置当前同步状态的值,方法可以保证设置同步状态时的原子性;参数expect
为state
的预期旧值,而update
是须要修改的新值,若设置成功,方法返回true
,不然false
;CAS是一种乐观锁,若不了解,能够看看这篇博客:并发——详细介绍CAS机制框架
接下来咱们再看一看在继承AQS
时,咱们能够重写的方法:
以上这些方法将会在AQS
的模板方法中被调用,咱们根据本身的需求,重写上述方法,控制同步状态state
的值,便可控制线程同步的方式。下面再来看看AQS
提供的模板方法:
AQS
提供的模板方法主要分为三类:
AQS
的同步队列中正在等待的线程状况; 下面咱们就来具体说一说AQS
是如何实现线程同步的。
前面提过,AQS
经过一个int
类型的变量state
来记录当前的同步状态,也能够理解为锁的状态,根据state
的值的不一样,能够判断当前锁是否已经被获取。就拿独占锁来讲,若咱们要实现的是一个独占锁,则锁被获取后,其余线程将没法获取锁,须要进入阻塞状态,等待锁被释放。而线程获取锁就是经过修改state
的值来实现的,一个线程修改state
成功,则表示它成功得到了锁;若失败,则表示已经有其余线程得到了锁,则它须要进入阻塞状态。下面咱们就来聊一聊AQS
如何实现维持多个线程等待的。
首先说明结论:AQS经过一个同步队列来维护当前获取锁失败,进入阻塞状态的线程。这个同步队列是一个双向链表,获取锁失败的线程会被封装成一个链表节点,加入链表的尾部排队,而AQS
保存了链表的头节点的引用head
以及链表的尾节点引用tail
。这个同步队列以下所示:
在这个同步队列中,每一个节点对应一个线程,每一个节点都有一个next
指针指向它的下一个节点,以及一个prev
指针指向它的上一个节点。队列中的头节点head
就是当前已经获取了锁,正在执行的线程对应的节点;而以后的这些节点,则对应着获取锁失败,正在排队的线程。
当一个线程获取锁失败,它会被封装成一个Node
,加入同步队列的尾部排队,同时线程会进入阻塞状态。也就是说,在同步队列中,除了头节点对应的线程是运行状态,其他的线程都是等待睡眠状态。而当头节点对应的线程释放锁时,它会唤醒它的下一个节点(也就是上图中的第二个节点),被唤醒的节点对应的线程开始尝试获取锁,若获取成功,它就会将本身置为head
,而后将原来的head
移出队列。接下来咱们就经过源码,具体分析一下AQS
的实现过程。
(1)获取锁的实现
AQS
的锁功能齐全,它既能够用来实现独占锁,也能够用来实现共享锁。
独占锁:也叫排他锁,即锁只能由一个线程获取,若一个线程获取了锁,则其余想要获取锁的线程只能等待,直到锁被释放。好比说写锁,对于写操做,每次只能由一个线程进行,若多个线程同时进行写操做,将极可能出现线程安全问题;
共享锁:锁能够由多个线程同时获取,锁被获取一次,则锁的计数器+1。比较典型的就是读锁,读操做并不会产生反作用,因此能够容许多个线程同时对数据进行读操做,而不会有线程安全问题,固然,前提是这个过程当中没有线程在进行写操做;
咱们首先分析一下独占锁。在AQS
中,经过方法acquire
来获取独占锁,acquire
方法的代码以下:
public final void acquire(int arg) { if (!tryAcquire(arg) && acquireQueued(addWaiter(Node.EXCLUSIVE), arg)) selfInterrupt(); }
上面的方法执行流程以下:
首先调用tryAcquire
尝试获取一次锁,若返回true
,表示获取成功,则acquire
方法将直接返回;若返回false
,则会继续向后执行acquireQueued
方法;
tryAcquire
返回false
后,将执行acquireQueued
,可是这个方法传入的参数调用了addWaiter
方法;
addWaiter
方法的做用是将当前线封装成同步队列的节点,而后加入到同步队列的尾部进行排队,并返回此节点;
addWaiter
方法执行完成后,将它的返回值做为参数,调用acquireQueued
方法。acquireQueued
方法的做用是让当前线程在同步队列中阻塞,而后在被其余线程唤醒时去获取锁;
若线程被唤醒并成功获取锁后,将从acquireQueued
方法中退出,同时返回一个boolean
值表示当前线程是否被中断,若被中断,则会执行下面的selfInterrupt
方法,响应中断;
下面咱们就来具体分析这个方法中调用的几个方法的执行流程。首先第一个tryAcquire
方法:
protected boolean tryAcquire(int arg) { throw new UnsupportedOperationException(); }
能够看到,这个方法的实现仅仅只是抛出了一个异常。咱们以前提过,AQS
是基于模板方法设计模式实现的,在其中定义了许多模板方法,在模板方法中会调用一些没有实现的方法,这些方法须要使用者根据本身的需求实现。而acquire
方法就是一个模板方法,其中调用的tryAcquire
方法就是须要咱们本身实现的方法。tryAcquire
的做用就是尝试修改state
值,也就是获取锁,若修改为功,则返回true
,不然返回false
。它的实现须要根据AQS
的子类具体分析,好比ReentrantLock
中的Sync
,这里我就不详细叙述了,后面写一篇专门讲ReentrantLock
的博客。下面来看看addWaiter
的源码:
// 将线程封装成一个节点,放入同步队列的尾部 private Node addWaiter(Node mode) { // 当前线程封装成同步队列的一个节点Node Node node = new Node(Thread.currentThread(), mode); // 这个节点须要插入到原尾节点的后面,因此咱们在这里先记下原来的尾节点 Node pred = tail; // 判断尾节点是否为空,若为空表示队列中尚未节点,则不执行如下步骤 if (pred != null) { // 记录新节点的前一个节点为原尾节点 node.prev = pred; // 将新节点设置为新尾节点,使用CAS操做保证了原子性 if (compareAndSetTail(pred, node)) { // 若设置成功,则让原来的尾节点的next指向新尾节点 pred.next = node; return node; } } // 若以上操做失败,则调用enq方法继续尝试(enq方法见下面) enq(node); return node; } private Node enq(final Node node) { // 使用死循环不断尝试 for (;;) { // 记录原尾节点 Node t = tail; // 若原尾节点为空,则必须先初始化同步队列,初始化以后,下一次循环会将新节点加入队列 if (t == null) { // 使用CAS设置建立一个默认的节点做为首届点 if (compareAndSetHead(new Node())) // 首尾指向同一个节点 tail = head; } else { // 如下操做与addWaiter方法中的if语句块内一致 node.prev = t; if (compareAndSetTail(t, node)) { t.next = node; return t; } } } }
以上就是addWaiter
方法的实现过程,我在代码中使用注释对每一步进行了详细的解析,它的执行过程大体能够总结为:将新线程封装成一个节点,加入到同步队列的尾部,若同步队列为空,则先在其中加入一个默认的节点,再进行加入;若加入失败,则使用死循环(也叫自旋)不断尝试,直到成功为止。这个过程当中使用CAS
保证了添加节点的原子性。下面看看acquireQueued
方法的源码:
/** * 让线程不间断地获取锁,若线程对应的节点不是头节点的下一个节点,则会进入等待状态 * @param node the node */ final boolean acquireQueued(final Node node, int arg) { // 记录失败标志 boolean failed = true; try { // 记录中断标志,初始为true boolean interrupted = false; // 循环执行,由于线程在被唤醒后,可能再次获取锁失败,须要重写进入等待 for (;;) { // 获取当前线程节点的前一个节点 final Node p = node.predecessor(); // 若前一个节点是头节点,则tryAcquire尝试获取锁,若获取成功,则执行if中的代码 if (p == head && tryAcquire(arg)) { // 将当前节点设置为头节点 setHead(node); // 将原来的头节点移出同步队列 p.next = null; // help GC // 失败标志置为false failed = false; // 返回中断标志,acquire方法能够根据返回的中断标志,判断当前线程是否被中断 return interrupted; } // shouldParkAfterFailedAcquire方法判断当前线程是否可以进入等待状态, // 若当前线程的节点不是头节点的下一个节点,则须要进入等待状态, // 在此方法内部,当前线程会找到它的前驱节点中,第一个还在正常等待或执行的节点, // 让其做为本身的直接前驱,而后在须要时将本身唤醒(由于其中有些线程可能被中断), // 若找到,则返回true,表示本身能够进入等待状态了; // 则继续调用parkAndCheckInterrupt方法,当前线程在这个方法中等待, // 直到被其余线程唤醒,或者被中断后返回,返回时将返回一个boolean值, // 表示这个线程是否被中断,若为true,则将执行下面一行代码,将中断标志置为true if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt()) interrupted = true; } } finally { // 上面代码中只有一个return语句,且return的前一句就是failed = false; // 因此只有当异常发生时,failed才会保持true的状态运行到此处; // 异常多是线程被中断,也多是其余方法中的异常, // 好比咱们本身实现的tryAcquire方法 // 此时将取消线程获取锁的动做,将它从同步队列中移除 if (failed) cancelAcquire(node); } }
以上就是acquireQueued
方法的源码分析。这个方法的做用能够归纳为:让线程在同步队列中阻塞,直到它成为头节点的下一个节点,被头节点对应的线程唤醒,而后开始获取锁,若获取成功才会从方法中返回。这个方法会返回一个boolean
值,表示这个正在同步队列中的线程是否被中断。
到此,获取独占锁的实现就分析完毕了。须要注意的是,这些过程当中使用的compareAndSetXXX
这种形式的方法,都是基于CAS
机制实现的,保证了这些操做的原子性。
(2)释放锁的实现
分析完获取独占锁的代码后,咱们再来看看释放锁的实现。释放独占锁是经过release
方法实现的:
public final boolean release(int arg) { // 调用tryRelease尝试修改state释放锁,若成功,将返回true,不然false if (tryRelease(arg)) { // 若修改state成功,则表示释放锁成功,须要将当前线程移出同步队列 // 当前线程在同步队列中的节点就是head,因此此处记录head Node h = head; // 若head不是null,且waitStatus不为0,表示它是一个装有线程的正常节点, // 在以前提到的addWaiter方法中,若同步队列为空,则会建立一个默认的节点放入head // 这个默认的节点不包含线程,它的waitStatus就是0,因此不能释放锁 if (h != null && h.waitStatus != 0) // 若head是一个正常的节点,则调用unparkSuccessor唤醒它的下一个节点所对应的线程 unparkSuccessor(h); // 释放成功 return true; } // 释放锁失败 return false; }
以上就是同步队列中头节点对应的线程释放锁的过程。release
也是一个模板方法,其中经过调用tryRelease
尝试释放锁,而tryRelease
也须要使用者本身实现。在以前也说过,头节点释放锁时,须要唤醒它的下一个节点对应的线程,让这个线程再也不等待,去获取锁,而这个过程就是经过unparkSuccessor
方法实现的。
前面提到过,AQS
不只仅能够用来实现独占锁,还能够用来实现共享锁,下面咱们就来看看AQS
中,有关共享锁的模板方法的实现。首先是获取共享锁的实现,在AQS
中,定义了acquireShared
方法用来获取共享锁:
public final void acquireShared(int arg) { if (tryAcquireShared(arg) < 0) doAcquireShared(arg); }
能够看到,这个方法比较简短。首先调用tryAcquireShared
方法尝试获取一次共享锁,即修改state
的值,若返回值>=0
,则表示获取成功,线程不受影响,继续向下执行;若返回值小于0
,表示获取共享锁失败,则线程须要进入到同步队列中等待,调用doAcquireShared
方法。acquireShared
方法也是AQS
的一个模板方法,而其中的tryAcquireShared
方法就是须要使用者本身实现的方法。下面咱们来看看doAcquireShared
方法的实现:
/** * 不间断地获取共享锁,若线程对应的节点不是头节点的下一个节点,将进入等待状态 * 实现与acquireQueued很是相似 * @param arg the acquire argument */ private void doAcquireShared(int arg) { // 往同步队列的尾部添加一个默认节点,Node.SHARED是一个Node常量, // 它的值就是一个不带任何参数的Node对象,也就是new Node(); final Node node = addWaiter(Node.SHARED); // 失败标志,默认为true boolean failed = true; try { // 中断标志,用来判断线程在等待的过程当中释放被中断 boolean interrupted = false; // 死循环不断尝试获取共享锁 for (;;) { // 获取默认节点的前一个节点 final Node p = node.predecessor(); // 判断当前节点的前一个节点是否为head节点 if (p == head) { // 尝试获取共享锁 int r = tryAcquireShared(arg); // 若r>0,表示获取成功 if (r >= 0) { // 当前线程获取锁成功后,调用setHeadAndPropagate方法将当前线程设置为head // 同时,若共享锁还能被其余线程获取,则在这个方法中也会向后传递,唤醒后面的线程 setHeadAndPropagate(node, r); // 将原来的head的next置为null p.next = null; // help GC // 判断当前线程是否中断,若被中断,则调用selfInterrupt方法响应中断 if (interrupted) selfInterrupt(); // 失败标志置为false failed = false; return; } } // 如下代码和获取独占锁的acquireQueued方法相同,即让当前线程进入等待状态 // 具体解析能够看上面acquireQueued方法的解析 if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt()) interrupted = true; } } finally { if (failed) cancelAcquire(node); } }
doAcquireShared
方法的实现和获取独占锁中的acquireQueued
方法很相似,可是主要有一点不一样,那就是线程在被唤醒后,若成功获取到了共享锁,还须要判断共享锁是否还能被其余线程获取,若能够,则继续向后唤醒它的下一个节点对应的线程。下面再看看释放共享锁的代码,释放共享锁时经过方法releaseShared
:
public final boolean releaseShared(int arg) { // 尝试修改state的值释放锁 if (tryReleaseShared(arg)) { // 若成功,则调用如下方法唤醒后继节点中的线程 doReleaseShared(); return true; } return false; }
releaseShared
也是一个模板方法,它经过调用使用者本身实现的tryReleaseShared
方法尝试释放锁,修改state
的值,若返回true
,表示修改为功,则继续向下调用doReleaseShared
唤醒head
的下一个节点对应的线程,让它开始尝试获取锁;若修改state
失败,则返回false
。
介绍完上面的内容,下面咱们就来基于AQS
实现一个本身的同步器,或者说锁。咱们须要实现的锁要求以下:
实现一个锁,它是一个共享锁,可是每次至多支持两个线程同时获取锁,若当前已经有两个线程获取了锁,则其余获取锁的线程须要等待。
实现代码以下:
/** * 抽象队列同步器(AQS)使用: * 实现一个同一时刻至多只支持两个线程同时执行的同步器 */ // 让当前类继承Lock接口 public class TwinLock implements Lock { // 定义锁容许的最大线程数 private final static int DEFAULT_SYNC_COUNT = 2; // 建立一个锁对象,用以进行线程同步,Sync继承自AQS private final Sync sync = new Sync(DEFAULT_SYNC_COUNT); // 之内部类的形式实现一个同步器类,也就是锁,这个锁继承自AQS private static final class Sync extends AbstractQueuedSynchronizer { // 构造方法中指定锁支持的线程数量 Sync(int count) { // 若count小于0,则默认为2 if (count <= 0) { count = DEFAULT_SYNC_COUNT; } // 设置初始同步状态 setState(count); } /** * 重写tryAcquireShared方法,这个方法用来修改同步状态state,也就是获取锁 */ @Override protected int tryAcquireShared(int arg) { // 循环尝试 for (; ; ) { // 获取当前的同步状态 int nowState = getState(); // 计算当前线程获取锁后,新的同步状态 // 注意这里使用了减法,由于此时的state表示的是还能支持多少个线程 // 而当前线程若是得到了锁,则state就要减少 int newState = nowState - arg; // 若是newState小于0,表示当前已经没有剩余的资源了 // 则当前线程不能获取锁,此时将直接返回小于0的newState; // 或者newState>0,就会执行compareAndSetState方法修改state的值, // 若修改为功将,将返回大于0的newState; // 若修改失败,则表示有其余线程也在尝试修改state,此时循环一次后,再次尝试 if (newState < 0 || compareAndSetState(nowState, newState)) { return newState; } } } /** * 尝试释放同步状态 */ @Override protected boolean tryReleaseShared(int arg) { for (; ; ) { // 获取当前同步状态 int nowState = getState(); // 计算释放后的新同步状态,这里使用加法, // 表示有线程释放锁后,当前锁能够支持的线程数量增长了 int newState = nowState + arg; // 使用CAS修改同步状态,若成功则返回true,不然自旋 if (compareAndSetState(nowState, newState)) { return true; } } } } /** * 获取锁的方法 */ @Override public void lock() { // 这里调用的是AQS的模板方法acquireShared, // 在acquireShared中将调用咱们重写的tryAcquireShared方法 // 传入参数为1表示当前线程,当前线程获取锁后,state将-1 sync.acquireShared(1); } /** * 解锁 */ @Override public void unlock() { // 这里调用的是AQS的模板方法releaseShared, // 在acquireShared中将调用咱们重写的tryReleaseShared方法 // 传入参数为1表示当前线程,当前线程释放锁后,state将+1 sync.releaseShared(1); } /*******************其余须要实现的方法省略***************************/ }
以上就实现了一个支持两个线程同时容许的共享锁,下面咱们经过一个测试代码来测试效果:
public static void main(String[] args) throws InterruptedException { // 建立一个咱们自定义的锁对象 Lock lock = new TwinLock(); // 启动10个线程去尝试获取锁 for (int i = 0; i < 10; i++) { Thread t = new Thread(()->{ // 循环执行 while (true) { // 获取锁 lock.lock(); try { // 休眠1秒 Thread.sleep(1000); // 输出线程名称 System.out.println(Thread.currentThread().getName()); // 再次休眠一秒 Thread.sleep(1000); } catch (InterruptedException e) { e.printStackTrace(); } finally { // 释放锁 lock.unlock(); } } }); // 将线程设置为守护线程,主线程结束后,收获线程自动结束 t.setDaemon(true); t.start(); } // 主线程每隔1秒输出一个分割行 for (int i = 0; i < 10; i++) { Thread.sleep(1000); System.out.println("********************************"); } }
以上测试代码运行后,在每两个分割行之间,最多不会输出超过两个线程的名称,线程名称的输出将会以两个一队出现。个人输出结果以下:
******************************** Thread-1 Thread-0 ******************************** ******************************** Thread-2 Thread-1 ******************************** ******************************** Thread-2 Thread-1 ******************************** ******************************** Thread-2 Thread-3 ******************************** ******************************** Thread-3 Thread-4 ********************************
在研究AQS
的过程当中,我一直有这个疑惑——AQS
如何让线程阻塞,直到最后才知道有一个叫LockSupport
的工具类。这个工具类定义了不少静态方法,当须要让一个阻塞,或者唤醒一个线程时,就能够调用这个类中的方法,它的底层实现是经过一个sun.misc.Unsafe
类的对象,unsafe
类的方法都是本地方法,由其余语言实现,这个类是给不支持地址操做的Java
,提供的一个操做内存地址的后门。
AQS
中经过如下两个方法来阻塞和唤醒线程:
前面讲解AQS
的代码中,用到了方法unparkSuccessor
,它的主要做用就是唤醒当前节点的下一个节点对应的线程,咱们能够看看它的部分实现:
private void unparkSuccessor(Node node) { // ...........省略其余代码............ // 如下代码即为唤醒当前节点的下一个节点对应的线程 Node s = node.next; if (s != null) LockSupport.unpark(s.thread); // 使用LockSupport }
其实AQS
还支持一些其余的方法,好比说在获取锁时设置超时时间等,这些方法的实现与上面介绍的几种大同小异,限于篇幅,这里就不进行叙述了。以上内容对AQS
的实现原理以及主要方法的实现作了一个比较细致的介绍,相信看完以后会对AQS
有一个比较深刻的理解,可是想要理解以上内容,须要具有并发的一些基础知识,好比说线程的状态,CAS
机制等。最后但愿这篇博客对须要的人有所帮助吧。