在多线程编程中咱们会遇到不少须要使用线程同步机制去解决的并发问题,而这些同步机制就是多线程编程中影响正确性和运行效率的重中之重。这不由让我感到好奇,这些同步机制是如何实现的呢?好奇心是进步的源泉,就让咱们一块儿来揭开同步机制源码的神秘面纱吧。java
在本文中,咱们会从JDK中大多数同步机制的共同基础AbstractQueuedSynchronizer
类开始提及,而后经过源码了解咱们最经常使用的两个同步类可重入锁ReentrantLock
和闭锁CountDownLatch
的具体实现。经过这篇文章咱们将能够了解到ReentrantLock
和CountDownLatch
两个经常使用同步类的源代码实现,而且掌握阅读其余基于AQS实现的同步工具类源码的能力,甚至能够利用AQS写出本身的同步工具类。编程
阅读这篇文章须要了解基本的线程同步机制,有兴趣的读者能够参考一下这篇文章《多线程中那些看不到的陷阱》。安全
ReentrantLock
是咱们经常使用的一种可重入互斥锁,是synchronized关键字的一个很好的替代品。互斥指的就是同一时间只能有一个线程获取到这个锁,而可重入是指若是一个线程再次获取一个它已经持有的互斥锁,那么仍然会成功。数据结构
这个类的源码在JDK的java.util.concurrent
包下,咱们能够在IDE中点击类名跳转到具体的类定义,好比下面就是在个人电脑上跳转以后看到的ReentrantLock类的源代码。在这里咱们能够看到在ReentrantLock类中还包含了一个继承自AbstractQueuedSynchronizer类的内部类,并且有一个该内部类Sync
类型的字段sync
。实际上ReentrantLock类就是经过这个内部类对象来实现线程同步的。多线程
若是打开CountDownLatch
的源代码,咱们会发现这个类里也一样有一个继承自AbstractQueuedSynchronizer类的子类Sync,而且也有一个Sync
类型的字段sync
。在java.util.concurrent
包下的大多数同步工具类的底层都是经过在内部定义一个AbstractQueuedSynchronizer
类的子类来实现的,包括咱们在本文中没提到的许多其余经常使用类也是如此,好比:读写锁ReentrantReadWriteLock、信号量Semaphore等。并发
那么这个AbstractQueuedSynchronizer
类也就是咱们所说的AQS,究竟是何方神圣呢?这个类首先像咱们上面提到的,是大多数多线程同步工具类的基础。它内部包含了一个对同步器的等待队列,其中包含了全部在等待获取同步器的线程,在这个等待队列中的线程将会在同步器释放时被唤醒。好比一个线程在获取互斥锁失败时就会被放入到等待队列中等待被唤醒,这也就是AQS中的Q——“Queued”的由来。工具
而类名中的第一个单词Abstract
是由于AQS是一个抽象类,它的使用方法就是实现继承它的子类,而后使用这个子类类型的对象。在这个子类中咱们会经过重写下列的五个方法中的一部分或者所有来指定这个同步器的行为策略:ui
boolean tryAcquire(int arg)
,独占式获取同步器,独占式指同一时间只能有一个线程获取到同步器;boolean tryRelease(int arg)
,独占式释放同步器;boolean isHeldExclusively()
,同步器是否被当前线程独占式地持有;int tryAcquireShared(int arg)
,共享式获取同步器,共享式指的是同一时间可能有多个线程同时获取到同步器,可是可能会有数量的限制;boolean tryReleaseShared(int arg)
,共享式释放同步器。这五个方法之因此能指定同步器的行为,则是由于AQS中的其余方法就是经过对这五个方法的调用来实现的。好比在下面的acquire方法中就调用了tryAcquire来获取同步器,而且在被调用的acquireQueued
方法内部也是经过tryAcquire
方法来循环尝试获取同步器的。this
public final void acquire(int arg) { // 1. 调用tryAcquire方法尝试获取锁 // 2. 若是获取失败(tryAcquire返回false),则调用addWaiter方法将当前线程保存到等待队列中 // 3. 以后调用acquireQueued方法来循环执行“获取同步器 -> 获取失败休眠 -> 被唤醒从新获取”过程 // 直到成功获取到同步器返回false;或者被中断返回true if (!tryAcquire(arg) && acquireQueued(addWaiter(Node.EXCLUSIVE), arg)) // 若是acquireQueued方法返回true说明线程被中断了 // 因此调用selfInterrupt方法中断当前线程 selfInterrupt(); }
下面,咱们就来看看在ReentrantLock
和CountDownLatch
两个类中定义的AQS子类究竟是如何重写这五个方法的。spa
CountDownLatch
是一种典型的闭锁,好比我须要使用四个线程完成四种不一样的计算,而后把四个线程的计算结果相加后返回,这种状况下主线程就须要等待四个完成不一样任务的工做线程完成以后才能继续执行。那么咱们就能够建立一个初始的count值为4的CountDownLatch
,而后在每一个工做线程完成任务时都对这个CountDownLatch
执行一个countDown
操做,这样CountDownLatch中的count值就会减1。当count值减到0时,主线程就会从阻塞中恢复,而后将四个任务的结果相加后返回。
下面是CountDownLath
的几个经常使用方法:
void await()
,等待操做,若是count值目前已是0了,那么就直接返回;不然就进入阻塞状态,等待count值变为0;void countDown()
,减小计数操做,会让count减1。调用屡次countDown()
方法让count值变为0以后,被await()
方法阻塞的线程就能够继续执行了。了解了CountDownLatch
的基本用法以后咱们就来看看这个闭锁究竟是怎么实现的,首先,咱们来看一下CountDownLatch
中AQS的子类,内部类Sync
的定义。
下面的代码是CountDownLatch
中AQS的子类Sync
的定义,Sync
是CountDownLatch
类中的一个内部类。在这个类中重写了AQS的tryAcquireShared
和tryReleaseShared
两个方法,这两个都是共享模式须要重写的方法,由于CountDownLatch
在count值为0时能够被任意多个线程同时获取成功,因此应该实现共享模式的方法。
在CountDownLatch
的Sync
中使用了AQS的state值用来存放count值,在初始化时会把state值初始化为n。而后在调用tryReleaseShared
时会将count值减1,可是由于这个方法可能会被多个线程同时调用,因此要用CAS操做保证更新操做的原子性,就像咱们用AtomicInteger
同样。在CAS失败时咱们须要经过重试来保证把state减1,若是CAS成功时,即便有许多线程同时执行这个操做最后的结果也必定是正确的。在这里,tryReleaseShared
方法的返回值表示这个释放操做是否可让等待中的线程成功获取同步器,因此只有在count为0时才能返回true。
tryAcquireShared
方法就比较简单了,直接返回state是否等于0便可,由于只有在CountDownLatch
中的count值为0时全部但愿获取同步器的线程才能获取成功并继续执行。若是count不为0,那么线程就须要进入阻塞状态,等到count值变为0才能继续执行。
private static final class Sync extends AbstractQueuedSynchronizer { private static final long serialVersionUID = 4982264981922014374L; // 构造器,初始化count值 // 在这个子类中把count值保存到了AQS的state中 Sync(int count) { setState(count); } // 获取当前的count值 int getCount() { return getState(); } // 获取操做在state为0时会成功,不然失败 // tryAcquireShared失败时,线程会进入阻塞状态等待获取成功 protected int tryAcquireShared(int acquires) { return (getState() == 0) ? 1 : -1; } // 对闭锁执行释放操做减少计数值 protected boolean tryReleaseShared(int releases) { // 减少coun值,在count值归零时唤醒等待的线程 for (;;) { int c = getState(); // 若是计数已经归零,则直接释放失败 if (c == 0) return false; // 将计数值减1 int nextc = c-1; // 为了线程安全,以CAS循环尝试更新 if (compareAndSetState(c, nextc)) return nextc == 0; } } }
看了CountDownLatch
中的Sync
内部类定义以后,咱们再来看看CountDownLatch
是如何使用这个内部类的。
在CountDownLatch
的构造器中,初始化CountDownLatch
对象时会同时在其内部初始化保存一个Sync
类型的对象到sync
字段用于以后的同步操做。而且传入Sync
类构造器的count必定会大于等于0。
public CountDownLatch(int count) { if (count < 0) throw new IllegalArgumentException("count < 0"); this.sync = new Sync(count); }
有了Sync类型的对象以后,咱们在await()
方法里就能够直接调用sync
的acquireSharedInterruptibly
方法来获取同步器并陷入阻塞,等待count值变为0了。在AQS的acquireSharedInterruptibly
方法中会在调用咱们重写的tryAcquireShared
方法获取失败时进入阻塞状态,直到CountDownLatch
的count值变为0时才能成功获取到同步器。
public void await() throws InterruptedException { // 调用sync对象的获取方法来进入锁等待 sync.acquireSharedInterruptibly(1); }
而在CountDownLatch
的另外一个减小count值的重要方法countDown()
中,咱们一样是经过调用sync
上的方法来实现具体的同步功能。在这里,AQS的releaseShared(1)
方法中一样会调用咱们在Sync
类中重写的tryReleaseShared
方法来执行释放操做,并在tryReleaseShared
方法返回true时去唤醒等待队列中的阻塞等待线程,让它们在count值为0时可以继续执行。
public void countDown() { sync.releaseShared(1); }
从上文中能够看出,CoundDownLatch
中的各类功能都是经过内部类Sync
来实现的,而这个Sync
类就是一个继承自AQS的子类。经过在内部类Sync
中重写了AQS的tryAcquireShared
和tryReleaseShared
两个方法,咱们就指定了AQS的行为策略,使其可以符合咱们对CountDownLatch
功能的指望。这就是AQS的使用方法,下面咱们来看一个你们可能会更熟悉的例子,来进一步了解AQS在独占模式下的用法。
可重入锁ReentrantLock
能够说是咱们的老朋友了,从最先的synchronized
关键字开始,咱们就开始使用相似的功能了。可重入锁的特色主要有两点:
同一时间只能有一个线程持有
同一线程能够对一个锁重复获取成功屡次
ReentrantLock
类的经常使用操做主要有三种:
获取锁,一个线程一旦获取锁成功后就会阻塞其余线程获取同一个锁的操做,因此一旦获取失败,那么当前线程就会被阻塞
public void lock()
方法释放锁,获取锁以后就要在使用完以后释放它,不然别的线程都将会因没法获取锁而被阻塞,因此咱们通常会在finally中进行锁的释放操做
ReentrantLock
对象的unlock
方法来释放锁获取条件变量,条件变量是和互斥锁搭配使用的一种很是有用的数据结构,有兴趣的读者能够经过《从0到1实现本身的阻塞队列(上)》这篇文章来了解条件变量具体的使用方法
Condition newCondition()
方法来获取条件变量对象,而后调用条件变量对象上的await()
、signal()
、signalAll()
方法来进行使用在ReentrantLock
类中存在两种AQS的子类,一个实现了非公平锁,一个实现了公平锁。所谓的“公平”指的就是获取互斥锁成功返回的时间会和获取锁操做发起的时间顺序一致,例若有线程A已经持有了互斥锁,当线程B、C、D按字母顺序获取锁并进入等待,线程A释放锁后必定是线程B被唤醒,线程B释放锁后必定是C先被唤醒。也就是说锁被释放后对等待线程的唤醒顺序和获取锁操做的顺序一致。并且若是在这个过程当中,有其余线程发起了获取锁操做,由于等待队列中已经有线程在等待了,那么这个线程必定要排到等待队列最后去,而不能直接抢占刚刚被释放还未被刚刚被唤醒的线程锁持有的锁。
下面咱们一样先看一下ReentrantLock
类中定义的AQS子类Sync
的具体源代码。下面是上一段说到的非公平Sync类和公平Sync类两个类的共同父类Sync
的带注释源代码,里面包含了大部分核心功能的实现。虽然下面包含了该类完整的源代码,可是咱们如今只须要关心三个核心操做,也是咱们在独占模式下须要重写的三个AQS方法:tryAcquire
、tryRelease
和isHeldExclusively
。建议在看完文章以后再回来回顾该类中其余的方法实现,直接跳过其余的方法固然也是彻底没有问题的。
abstract static class Sync extends AbstractQueuedSynchronizer { private static final long serialVersionUID = -5179523762034025860L; /** * 实现Lock接口的lock方法,子类化的主要缘由是为了非公平版本的快速实现 */ abstract void lock(); /** * 执行非公平的tryLock。tryAcquire方法在子类中被实现,可是二者都须要非公平版本的trylock方法实现。 */ final boolean nonfairTryAcquire(int acquires) { final Thread current = Thread.currentThread(); int c = getState(); // 若是锁还未被持有 if (c == 0) { // 经过CAS尝试获取锁 if (compareAndSetState(0, acquires)) { // 若是锁获取成功则将锁持有者改成当前线程,并返回true setExclusiveOwnerThread(current); return true; } } // 锁已经被持有,则判断锁的持有者是不是当前线程 else if (current == getExclusiveOwnerThread()) { // 可重入锁,若是锁的持有者是当前线程,那就在state上加上新的获取数 int nextc = c + acquires; // 判断新的state值有没有溢出 if (nextc < 0) // overflow throw new Error("Maximum lock count exceeded"); // 将新的state更新为新的值,由于能够进入这段代码的只有一个线程 // 因此不须要线程安全措施 setState(nextc); return true; } return false; } // 重写了AQS的独占式释放锁方法 protected final boolean tryRelease(int releases) { // 计算剩余的锁持有量 // 由于只有当前线程持有该锁的状况下才能执行这个方法,因此不须要作多线程保护 int c = getState() - releases; // 若是当前线程未持有锁,则直接抛出错误 if (Thread.currentThread() != getExclusiveOwnerThread()) throw new IllegalMonitorStateException(); boolean free = false; // 若是锁持有数已经减小到0,则释放该锁,并清空锁持有者 if (c == 0) { free = true; setExclusiveOwnerThread(null); } // 更新state值,只有state值被设置为0才是真正地释放了锁 // 因此setState和setExclusiveOwnerThread之间不须要额外的同步措施 setState(c); return free; } // 当前线程是否持有该锁 protected final boolean isHeldExclusively() { return getExclusiveOwnerThread() == Thread.currentThread(); } // 建立对应的条件变量 final ConditionObject newCondition() { return new ConditionObject(); } // 从外层传递进来的方法 // 获取当前的锁持有者 final Thread getOwner() { return getState() == 0 ? null : getExclusiveOwnerThread(); } // 获取锁的持有计数 // 若是当前线程持有了该锁则返回state值,不然返回0 final int getHoldCount() { return isHeldExclusively() ? getState() : 0; } // 判断锁是否已经被持有 final boolean isLocked() { return getState() != 0; } }
实际的tryAcquire
方法将在公平Sync类与非公平Sync类两个子类中实现,可是这两个子类都须要调用父类Sync
中的非公平版本的tryAcquire——nonfairTryAcquire
方法。在这个方法中,咱们主要作两件事:
当前锁还未被人持有。在ReentrantLock
中使用AQS的state来保存锁的状态,state等于0时表明锁没有被任何线程持有,若是state大于0,那么就表明持有者对该锁的重复获取次数
compareAndSetState
来原子性地修改state值,修改为功则须要设置当前线程为锁的持有线程并返回true表明获取成功;不然就返回锁已被当前线程持有
而对于独占式释放同步器的tryRelease
方法,则在父类Sync
中直接实现了,两个公平/非公平子类调用的都是同一段代码。首先,只有锁的持有者才能释放锁,因此若是当前线程不是全部者线程在释放操做中就会抛出异常。若是释放操做会将持有计数清零,那么当前线程就再也不是该锁的持有者了,锁会被彻底释放,而锁的全部者会被设置为null。最后,Sync
会将减掉入参中的释放数以后的新持有计数更新到AQS的state中,并返回锁是否已经被彻底释放了。
isHeldExclusively
方法比较简单,它只是检查锁的持有者是不是当前线程。
Sync
的两个公平/非公平子类的实现比较简单,下面是非公平版本子类的源代码。在非公平版本的实现中,调用lock
方法首先会尝试经过CAS修改AQS的state值来直接抢占锁,若是抢占成功就直接将持有者设置为当前线程;若是抢占失败就调用acquire
方法走正常流程来获取锁。而在acquire
方法中就会调用子类中的tryAcquire
方法并进一步调用到上文提到的父类中的nonfairTryAcquire
方法来完成锁获取操做。
static final class NonfairSync extends Sync { private static final long serialVersionUID = 7316153563782823691L; /** * 执行锁操做。尝试直接抢占,若是失败的话就回到正常的获取流程进行 */ final void lock() { // 尝试直接抢占 if (compareAndSetState(0, 1)) // 抢占成功设置锁全部者 setExclusiveOwnerThread(Thread.currentThread()); else // 抢占失败走正常获取流程 acquire(1); } // 实现AQS方法,使用nonfairTryAcquire实现 protected final boolean tryAcquire(int acquires) { return nonfairTryAcquire(acquires); } }
而在公平版本的Sync子类FairSync
中,为了保证成功获取到锁的顺序必定要和发起获取锁操做的顺序一致,因此天然不能在lock
方法中进行CAS方式的抢占,只能老老实实调用acquire
方法走正式流程。而acquire
方法最终就会调用子类中定义的tryAcquire
来真正获取锁。
在tryAcquire
方法中,代码主要处理了两种状况:
当前锁尚未被线程锁持有
acquire
方法中的代码会将当前线程放入到等待队列中阻塞等待锁的释放。这就保证了在获取锁时已经有线程等待的状况下,任何线程都要进入等待队列去等待获取锁,而不能直接对锁进行获取。当前线程已经持有了该锁
static final class FairSync extends Sync { private static final long serialVersionUID = -3000897897090466540L; // 直接使用acquire进行获取锁操做 final void lock() { acquire(1); } /** * 公平版本的tryAcquire方法。不要授予访问权限,除非是递归调用或者没有等待线程或者这是第一个调用 */ protected final boolean tryAcquire(int acquires) { final Thread current = Thread.currentThread(); int c = getState(); // 若是锁没有被持有 if (c == 0) { // 为了实现公平特性,因此只有在等待队列为空的状况下才能直接抢占 // 不然只能进入队列等待 if (!hasQueuedPredecessors() && compareAndSetState(0, acquires)) { setExclusiveOwnerThread(current); return true; } } // 若是锁已被持有,且当前线程就是持有线程 else if (current == getExclusiveOwnerThread()) { // 计算新的state值 int nextc = c + acquires; // 若是锁计数溢出,则抛出异常 if (nextc < 0) throw new Error("Maximum lock count exceeded"); // 设置state状态值 setState(nextc); return true; } return false; } }
最后,咱们来看看ReentrantLock类中的lock()
、unlock()
、newCondition
方法对Sync类对象的使用方式。
首先是在构造器中,根据入参指定的公平/非公平模式建立不一样的内部Sync
类对象,若是是公平模式就是用FairSync
类,若是是非公平模式就是用NonfairSync
类。
public ReentrantLock(boolean fair) { sync = fair ? new FairSync() : new NonfairSync(); }
而后在互斥锁的锁定方法lock()
中,ReentrantLock
直接使用Sync类中的lock
方法来实现了锁的获取功能。
public void lock() { // 调用sync对象的lock方法实现 sync.lock(); }
在unlock()
方法中也是同样的状况,ReentrantLock
直接依赖Sync类对象来实现这个功能。
public void unlock() { // 调用了sync对象的release方法实现 sync.release(1); }
最后一个建立条件变量的方法则直接依赖于AQS中定义的方法,咱们在ReentranctLock
的Sync
类中并不须要作任务额外的工做,AQS就能为咱们作好全部的事情。
public Condition newCondition() { // 调用了sync对象继承自AQS的`newCondition`方法实现 return sync.newCondition(); }
经过ReentrantLock
的例子咱们可以更明显地感觉到,这些基于AQS实现同步功能的类中并不须要作太多额外的工做,大多数操做都是经过直接调用Sync
类对象上的方法来实现的。只要定义好了继承自AQS的子类Sync
,并经过Sync
类重写几个AQS的关键方法来指定AQS的行为策略,就能够实现风格迥异的各类同步工具类了。
在这篇文章中,咱们从AQS的基本概念提及,简单介绍了AQS的具体用法,而后经过CountDownLatch
和ReentrantLock
两个经常使用的多线程同步工具类的源码来具体了解了AQS的使用方式。咱们不只能够彻底弄明白这两个线程同步类的实现原理与细节,并且最重要的是找到了AQS这个幕后大BOSS。经过AQS,咱们不只能够更容易地阅读并理解其余同步工具类的使用与实现,并且甚至能够动手开发出咱们本身的自定义同步工具类。
到了这里,这一系列多线程编程相关的技术文章就接近尾声了。后续我还会发布一篇囊括这个系列全部内容的总结性文章,里面会对多线程编程相关的知识脉络作一次全面的梳理,而后将每一个知识点连接到具体阐释这个主题的文章中去。让读者能够在宏观和微观两个层面理解多线程编程的原理与技巧,帮助你们创建完整的Java多线程理论与实践知识体系。有兴趣的读者能够关注一下后续的文章,感谢你们的支持。