Java高级面试必问:AQS 究竟是什么?

前言

JDK1.5之前只有synchronized同步锁,而且效率很是低,所以大神Doug Lea本身写了一套并发框架,这套框架的核心就在于AbstractQueuedSynchronizer类(即AQS),性能很是高,因此被引入JDK包中,即JUC。那么AQS是怎么实现的呢?本篇就是对AQS及其相关组件进行分析,了解其原理,并领略大神的优美而又精简的代码。node

AbstractQueuedSynchronizer

AQS是JUC下最核心的类,没有之一,因此咱们先来分析一下这个类的数据结构。
1.png缓存

AQS内部是使用了双向链表将等待线程连接起来,当发生并发竞争的时候,就会初始化该队列并让线程进入睡眠等待唤醒,同时每一个节点会根据是否为共享锁标记状态为共享模式或独占模式。这个数据结构须要好好理解并紧紧记住,下面分析的组件都将基于此实现。安全

Lock

Lock是一个接口,提供了加/解锁的通用API,JUC主要提供了两种锁,ReentrantLock和ReentrantReadWriteLock,前者是重入锁,实现Lock接口,后者是读写锁,自己并无实现Lock接口,而是其内部类ReadLock或WriteLock实现了Lock接口。先来看看Lock都提供了哪些接口:性能优化

// 普通加锁,不可打断;未获取到锁进入AQS阻塞
void lock();

// 可打断锁
void lockInterruptibly() throws InterruptedException;

// 尝试加锁,未获取到锁不阻塞,返回标识
boolean tryLock();

// 带超时时间的尝试加锁
boolean tryLock(long time, TimeUnit unit) throws InterruptedException;

// 解锁
void unlock();

// 建立一个条件队列
Condition newCondition();

看到这里读者们能够先思考下,本身如何来实现上面这些接口。数据结构

ReentrantLock

加锁多线程

synchronized和ReentrantLock都是可重入的,后者使用更加灵活,也提供了更多的高级特性,但其本质的实现原理是差很少的(听说synchronized是借鉴了ReentrantLock的实现原理)。ReentrantLock提供了两个构造方法:并发

public ReentrantLock() {
       sync = new NonfairSync();
   }

   public ReentrantLock(boolean fair) {
       sync = fair ? new FairSync() : new NonfairSync();
   }

有参构造是根据参数建立公平锁或非公平锁,而无参构造默认则是非公平锁,由于非公平锁性能很是高,而且大部分业务并不须要使用公平锁。至于为何非公平锁性能很高,我们接着往下看。app

非公平锁/公平锁框架

lockide

非公平锁和公平锁在实现上基本一致,只有个别的地方不一样,所以下面会采用对比分析方法进行分析。
从lock方法开始:

public void lock() {
       sync.lock();
   }

其实是委托给了内部类Sync,该类实现了AQS(其它组件实现方法也基本上都是这个套路);因为有公平和非公平两种模式,所以该类又实现了两个子类:FairSync和NonfairSync:

// 非公平锁
   final void lock() {
       if (compareAndSetState(0, 1))
           setExclusiveOwnerThread(Thread.currentThread());
       else
           acquire(1);
   }

// 公平锁
   final void lock() {
      acquire(1);
   }

这里就是公平锁和非公平锁的第一个不一样,非公平锁首先会调用CAS将state从0改成1,若是能改为功则表示获取到锁,直接将exclusiveOwnerThread设置为当前线程,不用再进行后续操做;不然则同公平锁同样调用acquire方法获取锁,这个是在AQS中实现的模板方法:

public final void acquire(int arg) {
       if (!tryAcquire(arg) &&
           acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
           selfInterrupt();
   }

tryAcquire

这里两种锁惟一不一样的实现就是tryAcquire方法,先来看非公平锁的实现:

protected final boolean tryAcquire(int acquires) {
       return nonfairTryAcquire(acquires);
   }

   final boolean nonfairTryAcquire(int acquires) {
       final Thread current = Thread.currentThread();
       int c = getState();
       if (c == 0) {
           if (compareAndSetState(0, acquires)) {
               setExclusiveOwnerThread(current);
               return true;
           }
       }
       else if (current == getExclusiveOwnerThread()) {
           int nextc = c + acquires;
           if (nextc < 0) // overflow
               throw new Error("Maximum lock count exceeded");
           setState(nextc);
           return true;
       }
       return false;
   }

state=0表示尚未被线程持有锁,直接经过CAS修改,能修改为功的就获取到锁,修改失败的线程先判断exclusiveOwnerThread是否是当前线程,是则state+1,表示重入次数+1并返回true,加锁成功,不然则返回false表示尝试加锁失败并调用acquireQueued入队。

protected final boolean tryAcquire(int acquires) {
       final Thread current = Thread.currentThread();
       int c = getState();
       if (c == 0) {
           if (!hasQueuedPredecessors() &&
               compareAndSetState(0, acquires)) {
               setExclusiveOwnerThread(current);
               return true;
           }
       }
       else if (current == getExclusiveOwnerThread()) {
           int nextc = c + acquires;
           if (nextc < 0)
               throw new Error("Maximum lock count exceeded");
           setState(nextc);
           return true;
       }
       return false;
   }

   public final boolean hasQueuedPredecessors() {
       Node t = tail; // Read fields in reverse initialization order
       Node h = head;
       Node s;
       // 首尾不相等且头结点线程不是当前线程则表示须要进入队列
       return h != t &&
           ((s = h.next) == null || s.thread != Thread.currentThread());
   }

上面就是公平锁的尝试获取锁的代码,能够看到基本和非公平锁的代码是同样的,区别在于首次加锁须要判断是否已经有队列存在,没有才去加锁,有则直接返回false。

addWaiter

接着来看addWaiter方法,当尝试加锁失败时,首先就会调用该方法建立一个Node节点并添加到队列中去。

private Node addWaiter(Node mode) {
       Node node = new Node(Thread.currentThread(), mode);
       Node pred = tail;
       // 尾节点不为null表示已经存在队列,直接将当前线程做为尾节点
       if (pred != null) {
           node.prev = pred;
           if (compareAndSetTail(pred, node)) {
               pred.next = node;
               return node;
           }
       }
       // 尾结点不存在则表示尚未初始化队列,须要初始化队列
       enq(node);
       return node;
   }

   private Node enq(final Node node) {
// 自旋
       for (;;) {
           Node t = tail;
           if (t == null) { // 只会有一个线程设置头节点成功
               if (compareAndSetHead(new Node()))
                   tail = head;
           } else { // 其它设置头节点失败的都会自旋设置尾节点
               node.prev = t;
               if (compareAndSetTail(t, node)) {
                   t.next = node;
                   return t;
               }
           }
       }
   }

这里首先传入了一个独占模式的空节点,并根据该节点和当前线程建立了一个Node,而后判断是否已经存在队列,若存在则直接入队,不然调用enq方法初始化队列,提升效率。
此处还有一个很是细节的地方,为何设置尾节点时都要先将以前的尾节点设置为node.pre的值呢,而不是在CAS以后再设置?好比像下面这样:

if (compareAndSetTail(pred, node)) {
node.prev = pred;
   pred.next = node;
   return node;
}

由于若是这样作的话,在CAS设置完tail后会存在一瞬间的tail.pre=null的状况,而Doug Lea正是考虑到这种状况,不论什么时候获取tail.pre都不会为null。

acquireQueued

接着看acquireQueued方法:

final boolean acquireQueued(final Node node, int arg) {
    // 为true表示存在须要取消加锁的节点,仅从这段代码能够看出,
    // 除非发生异常,不然不会存在须要取消加锁的节点。
       boolean failed = true;
       try {
        // 打断标记,由于调用的是lock方法,因此是不可打断的
        // (但其实是打断了的,只不过这里采用了一种**静默**处理方式,稍后分析)
           boolean interrupted = false;
           for (;;) {
               final Node p = node.predecessor();
               if (p == head && tryAcquire(arg)) {
                   setHead(node);
                   p.next = null; // help GC
                   failed = false;
                   return interrupted;
               }
               if (shouldParkAfterFailedAcquire(p, node) &&
                   parkAndCheckInterrupt())
                   interrupted = true;
           }
       } finally {
           if (failed)
               cancelAcquire(node);
       }
   }

   private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
       int ws = pred.waitStatus;
       if (ws == Node.SIGNAL)
           return true;
           
       if (ws > 0) {
           do {
               node.prev = pred = pred.prev;
           } while (pred.waitStatus > 0);
           pred.next = node;
       } else {
           compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
       }
       return false;
   }

   private final boolean parkAndCheckInterrupt() {
       LockSupport.park(this);
       return Thread.interrupted();
   }

这里就是队列中线程加锁/睡眠的核心逻辑,首先判断刚刚调用addWaiter方法添加到队列的节点是不是头节点,若是是则再次尝试加锁,这个刚刚分析过了,非公平锁在这里就会再次抢一次锁,抢锁成功则设置为head节点并返回打断标记;不然则和公平锁同样调用shouldParkAfterFailedAcquire判断是否应该调用park方法进入睡眠。

park细节

为何在park前须要这么一个判断呢?由于当前节点的线程进入park后只能被前一个节点唤醒,那前一个节点怎么知道有没有后继节点须要唤醒呢?所以当前节点在park前须要给前一个节点设置一个标识,即将waitStatus设置为Node.SIGNAL(-1),而后自旋一次再走一遍刚刚的流程,若仍是没有获取到锁,则调用parkAndCheckInterrupt进入睡眠状态。

打断

读者可能会比较好奇Thread.interrupted这个方法是作什么用的。

public static boolean interrupted() {
       return currentThread().isInterrupted(true);
   }

这个是用来判断当前线程是否被打断过,并清除打断标记(如果被打断过则会返回true,并将打断标记设置为false),因此调用lock方法时,经过interrupt也是会打断睡眠的线程的,只是Doug Lea作了一个假象,让用户无感知;但有些场景又须要知道该线程是否被打断过,因此acquireQueued最终会返回interrupted打断标记,若是是被打断过,则返回的true,并在acquire方法中调用selfInterrupt再次打断当前线程(将打断标记设置为true)。
这里咱们对比看看lockInterruptibly的实现:

public void lockInterruptibly() throws InterruptedException {
       sync.acquireInterruptibly(1);
   }

   public final void acquireInterruptibly(int arg)
           throws InterruptedException {
       if (Thread.interrupted())
           throw new InterruptedException();
       if (!tryAcquire(arg))
           doAcquireInterruptibly(arg);
   }

   private void doAcquireInterruptibly(int arg)
       throws InterruptedException {
       final Node node = addWaiter(Node.EXCLUSIVE);
       boolean failed = true;
       try {
           for (;;) {
               final Node p = node.predecessor();
               if (p == head && tryAcquire(arg)) {
                   setHead(node);
                   p.next = null; // help GC
                   failed = false;
                   return;
               }
               if (shouldParkAfterFailedAcquire(p, node) &&
                   parkAndCheckInterrupt())
                   throw new InterruptedException();
           }
       } finally {
           if (failed)
               cancelAcquire(node);
       }
   }

能够看到区别就在于使用lockInterruptibly加锁被打断后,是直接抛出InterruptedException异常,咱们能够捕获这个异常进行相应的处理。

取消

最后来看看cancelAcquire是如何取消加锁的,该状况比较特殊,简单了解下便可:

private void cancelAcquire(Node node) {
       if (node == null)
           return;

// 首先将线程置空
       node.thread = null;

// waitStatus > 0表示节点处于取消状态,则直接将当前节点的pre指向在此以前的最后一个有效节点
       Node pred = node.prev;
       while (pred.waitStatus > 0)
           node.prev = pred = pred.prev;

// 保存前一个节点的下一个节点,若是在此以前存在取消节点,这里就是以前取消被取消节点的头节点
       Node predNext = pred.next;
       
       node.waitStatus = Node.CANCELLED;

// 当前节点是tail节点,则替换尾节点,替换成功则将新的尾结点的下一个节点设置为null;
// 不然须要判断是将当前节点的下一个节点赋值给最后一个有效节点,仍是唤醒下一个节点。
       if (node == tail && compareAndSetTail(node, pred)) {
           compareAndSetNext(pred, predNext, null);
       } else {
           int ws;
           if (pred != head &&
               ((ws = pred.waitStatus) == Node.SIGNAL ||
                (ws <= 0 && compareAndSetWaitStatus(pred, ws, Node.SIGNAL))) &&
               pred.thread != null) {
               Node next = node.next;
               if (next != null && next.waitStatus <= 0)
                   compareAndSetNext(pred, predNext, next);
           } else {
               unparkSuccessor(node);
           }

           node.next = node; // help GC
       }
   }

解锁

public void unlock() {
       sync.release(1);
   }

   public final boolean release(int arg) {
       if (tryRelease(arg)) {
           Node h = head;
           if (h != null && h.waitStatus != 0)
               unparkSuccessor(h);
           return true;
       }
       return false;
   }

   protected final boolean tryRelease(int releases) {
       int c = getState() - releases;
       if (Thread.currentThread() != getExclusiveOwnerThread())
           throw new IllegalMonitorStateException();
       boolean free = false;
       if (c == 0) {
           free = true;
           setExclusiveOwnerThread(null);
       }
       setState(c);
       return free;
   }

   private void unparkSuccessor(Node node) {
       int ws = node.waitStatus;
       if (ws < 0)
           compareAndSetWaitStatus(node, ws, 0);

       Node s = node.next;
       // 并发状况下,可能已经被其它线程唤醒或已经取消,则从后向前找到最后一个有效节点并唤醒
       if (s == null || s.waitStatus > 0) {
           s = null;
           for (Node t = tail; t != null && t != node; t = t.prev)
               if (t.waitStatus <= 0)
                   s = t;
       }
       if (s != null)
           LockSupport.unpark(s.thread);
   }

解锁就比较简单了,先调用tryRelease对state执行减一操做,若是state==0,则表示彻底释放锁;若果存在后继节点,则调用unparkSuccessor唤醒后继节点,唤醒后的节点的waitStatus会从新被设置为0.
只是这里有一个小细节,为何是从后向前找呢?由于咱们在开始说过,设置尾节点保证了node.pre不会为null,但pre.next仍有多是null,因此这里只能从后向前找到最后一个有效节点。

小结

2.png

上面是ReentrantLock的加锁流程,能够看到整个流程不算复杂,只是判断和跳转比较多,主要是Doug Lea将代码和性能都优化到了极致,代码很是精简,但细节却很是多。另外经过上面的分析,咱们也能够发现,公平锁和非公平锁的区别就在于非公平锁无论是否有线程在排队,先抢三次锁,而公平锁则会判断是否存在队列,有线程在排队则直接进入队列排队;另外线程在park被唤醒后非公平锁还会抢锁,公平锁仍然须要排队,因此非公平锁的性能比公平锁高不少,大部分状况下咱们使用非公平锁便可。

ReentrantReadWriteLock

ReentrantLock是一把独占锁,只支持重入,不支持共享,因此JUC包下还提供了读写锁,这把锁支持读读并发,但读写、写写都是互斥的。
读写锁也是基于AQS实现的,也包含了一个继承自AQS的内部类Sync,一样也有公平和非公平两种模式,下面主要讨论非公平模式下的读写锁实现。
读写锁实现相对比较复杂,在ReentrantLock中就是使用的int型的state属性来表示锁被某个线程占有和重入次数,而ReentrantReadWriteLock分为了读和写两种锁,要怎么用一个字段表示两种锁的状态呢?Doug Lea大师将state字段分为了高二字节和低二字节,即高16位用来表示读锁状态,低16位则用来表示写锁,以下图:
在这里插入图片描述

由于读写锁状态都只用了两个字节,因此可重入的次数最可能是65535,固然正常状况下重入是不可能达到这么多的。
那它是怎么实现的呢?仍是先从构造方法开始:

public ReentrantReadWriteLock() {
       this(false);
   }

   public ReentrantReadWriteLock(boolean fair) {
       sync = fair ? new FairSync() : new NonfairSync();
       readerLock = new ReadLock(this);
       writerLock = new WriteLock(this);
   }

一样默认就是非公平锁,同时还建立了readerLock和writerLock两个对象,咱们只须要像下面这样就能获取到读写锁:

private static ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
   private static Lock r = lock.readLock();
   private static Lock w = lock.writeLock();

写锁

因为写锁的加锁过程相对更简单,下面先从写锁加锁开始分析,入口在ReentrantReadWriteLock#WriteLock.lock()方法,点进去看,发现仍是使用的AQS中的acquire方法:

public final void acquire(int arg) {
       if (!tryAcquire(arg) &&
           acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
           selfInterrupt();
   }

因此不一样的地方也只有tryAcquire方法,咱们重点分析这个方法就行:

static final int SHARED_SHIFT   = 16;
// 65535
static final int MAX_COUNT      = (1 << SHARED_SHIFT) - 1;
// 低16位是1111....1111
static final int EXCLUSIVE_MASK = (1 << SHARED_SHIFT) - 1;
// 获得c低16位的值
static int exclusiveCount(int c) { return c & EXCLUSIVE_MASK; }

   protected final boolean tryAcquire(int acquires) {
       Thread current = Thread.currentThread();
       int c = getState();
       // 获取写锁加锁和重入的次数
       int w = exclusiveCount(c);
       if (c != 0) { // 已经有线程持有锁
        // 这里有两种状况:1. c!=0 && w==0表示有线程获取了读锁,不管是否是当前线程,直接返回false,
        // 也就是说读-写锁是不支持升级重入的(但支持写-读降级),缘由后文会详细分析;
        // 2. c!=0 && w!=0 && current != getExclusiveOwnerThread()表示有其它线程持有了写锁,写写互斥
           if (w == 0 || current != getExclusiveOwnerThread())
               return false;

// 超出65535,抛异常
           if (w + exclusiveCount(acquires) > MAX_COUNT)
               throw new Error("Maximum lock count exceeded");
           // 不然写锁的次数直接加1
           setState(c + acquires);
           return true;
       }

// c==0才会走到这,但这时存在两种状况,有队列和无队列,因此公平锁和非公平锁处理不一样,
// 前者须要判断是否存在队列,有则尝试加锁失败,无则加锁成功,而非公平锁直接使用CAS加锁便可
       if (writerShouldBlock() ||
           !compareAndSetState(c, c + acquires))
           return false;
       setExclusiveOwnerThread(current);
       return true;
   }

写锁尝试加锁的过程就分析完了,其他的部分上文已经讲过,这里再也不赘述。

读锁

public void lock() {
       sync.acquireShared(1);
   }

   public final void acquireShared(int arg) {
       if (tryAcquireShared(arg) < 0)
           doAcquireShared(arg);
   }

读锁在加锁开始就和其它锁不一样,调用的是acquireShared方法,意为获取共享锁。

static final int SHARED_UNIT    = (1 << SHARED_SHIFT);
// 右移16位获得读锁状态的值
static int sharedCount(int c)    { return c >>> SHARED_SHIFT; }

   protected final int tryAcquireShared(int unused) {
        Thread current = Thread.currentThread();
        int c = getState();
        // 为何读写互斥?由于读锁一上来就判断了是否有其它线程持有了写锁(当前线程持有写锁再获取读锁是能够的)
        if (exclusiveCount(c) != 0 &&
            getExclusiveOwnerThread() != current)
            return -1;
        int r = sharedCount(c);
        // 公平锁判断是否存在队列,非公平锁判断第一个节点是否是EXCLUSIVE模式,是的话会返回true
        // 返回false则须要判断读锁加锁次数是否超过65535,没有则使用CAS给读锁+1
        if (!readerShouldBlock() &&
            r < MAX_COUNT &&
            compareAndSetState(c, c + SHARED_UNIT)) {
            if (r == 0) {
            // 第一个读锁线程就是当前线程
                firstReader = current;
                firstReaderHoldCount = 1;
            } else if (firstReader == current) {
            // 记录读锁的重入
                firstReaderHoldCount++;
            } else {
            // 获取最后一次加读锁的重入次数记录器HoldCounter
                HoldCounter rh = cachedHoldCounter;
                if (rh == null || rh.tid != getThreadId(current))
                // 当前线程第一次重入须要初始化,以及当前线程和缓存的最后一次记录器的线程id不一样,须要从ThreadLocalHoldCounter拿到对应的记录器
                    cachedHoldCounter = rh = readHolds.get();
                else if (rh.count == 0)
                // 缓存到ThreadLocal
                    readHolds.set(rh);
                rh.count++;
            }
            return 1;
        }
        return fullTryAcquireShared(current);
    }

这段代码有点复杂,首先须要保证读写互斥,而后进行初次加锁,若加锁失败就会调用fullTryAcquireShared方法进行兜底处理。在初次加锁中与写锁不一样的是,写锁的state能够直接用来记录写锁的重入次数,由于写写互斥,但读锁是共享的,state用来记录读锁的加锁次数了,重入次数该怎么记录呢?重入是指同一线程,那么是否是可使用ThreadLocl来保存呢?没错,Doug Lea就是这么处理的,新增了一个HoldCounter类,这个类只有线程id和重入次数两个字段,当线程重入的时候就会初始化这个类并保存在ThreadLocalHoldCounter类中,这个类就是继承ThreadLocl的,用来初始化HoldCounter对象并保存。
这里还有个小细节,为何要使用cachedHoldCounter缓存最后一次加读锁的HoldCounter?由于大部分状况下,重入和释放锁的线程颇有可能就是最后一次加锁的线程,因此这样作可以提升加解锁的效率,Doug Lea真是把性能优化到了极致。
上面只是初次加锁,有可能会加锁失败,就会进入到fullTryAcquireShared方法:

final int fullTryAcquireShared(Thread current) {
       HoldCounter rh = null;
       for (;;) {
           int c = getState();
           if (exclusiveCount(c) != 0) {
               if (getExclusiveOwnerThread() != current)
                   return -1;
           } else if (readerShouldBlock()) {
               if (firstReader == current) {
                   // assert firstReaderHoldCount > 0;
               } else {
                   if (rh == null) {
                       rh = cachedHoldCounter;
                       if (rh == null || rh.tid != getThreadId(current)) {
                           rh = readHolds.get();
                           if (rh.count == 0)
                               readHolds.remove();
                       }
                   }
                   if (rh.count == 0)
                       return -1;
               }
           }
           if (sharedCount(c) == MAX_COUNT)
               throw new Error("Maximum lock count exceeded");
           if (compareAndSetState(c, c + SHARED_UNIT)) {
               if (sharedCount(c) == 0) {
                   firstReader = current;
                   firstReaderHoldCount = 1;
               } else if (firstReader == current) {
                   firstReaderHoldCount++;
               } else {
                   if (rh == null)
                       rh = cachedHoldCounter;
                   if (rh == null || rh.tid != getThreadId(current))
                       rh = readHolds.get();
                   else if (rh.count == 0)
                       readHolds.set(rh);
                   rh.count++;
                   cachedHoldCounter = rh; // cache for release
               }
               return 1;
           }
       }
   }

这个方法中代码和tryAcquireShared基本上一致,只是采用了自旋的方式,处理初次加锁中的漏网之鱼,读者们可自行阅读分析。
上面两个方法若返回大于0则表示加锁成功,小于0则会调用doAcquireShared方法,这个就和以前分析的acquireQueued差很少了:

private void doAcquireShared(int arg) {
    // 先添加一个SHARED类型的节点到队列
       final Node node = addWaiter(Node.SHARED);
       boolean failed = true;
       try {
           boolean interrupted = false;
           for (;;) {
               final Node p = node.predecessor();
               if (p == head) {
                // 再次尝试加读锁
                   int r = tryAcquireShared(arg);
                   if (r >= 0) {
                    // 设置head节点以及传播唤醒后面的读线程
                       setHeadAndPropagate(node, r);
                       p.next = null; // help GC
                       if (interrupted)
                           selfInterrupt();
                       failed = false;
                       return;
                   }
               }
               // 只有前一个节点的waitStatus=-1时才会park,=0或者-3(先不考虑-2和1的状况)都会设置为-1后再次自旋尝试加锁,若仍是加锁失败就会park
               if (shouldParkAfterFailedAcquire(p, node) &&
                   parkAndCheckInterrupt())
                   interrupted = true;
           }
       } finally {
           if (failed)
               cancelAcquire(node);
       }
   }

   private void setHeadAndPropagate(Node node, int propagate) {
    // 设置头节点
       Node h = head; // Record old head for check below
       setHead(node);
       
       // propagate是tryAcquireShared的返回值,当前线程加锁成功还要去唤醒后继的共享节点
       // (其他的判断比较复杂,笔者也还未想明白,知道的读者能够指点一下)
       if (propagate > 0 || h == null || h.waitStatus < 0 ||
           (h = head) == null || h.waitStatus < 0) {
           Node s = node.next;
           // 判断后继节点是不是共享节点
           if (s == null || s.isShared())
               doReleaseShared();
       }
   }

   private void doReleaseShared() {
       for (;;) {
           Node h = head;
           // 存在后继节点
           if (h != null && h != tail) {
               int ws = h.waitStatus;
               if (ws == Node.SIGNAL) {
                // 当前一个节点加锁成功后天然须要将-1改回0,并唤醒后继线程,同时自旋将0改成-2让唤醒传播下去
                   if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))
                       continue;        
                   unparkSuccessor(h);
               }
               // 设置头节点的waitStatus=-2,使得唤醒能够传播下去
               else if (ws == 0 &&
                        !compareAndSetWaitStatus(h, 0, Node.PROPAGATE))
                   continue;            
           }
           if (h == head)          
               break;
       }
   }

   private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
       int ws = pred.waitStatus;
       if (ws == Node.SIGNAL)
           return true;
       if (ws > 0) {
           do {
               node.prev = pred = pred.prev;
           } while (pred.waitStatus > 0);
           pred.next = node;
       } else {
           compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
       }
       return false;
   }

这里的逻辑也很是的绕,当多个线程同时调用addWaiter添加到队列中后,而且假设这些节点的第一个节点的前一个节点就是head节点,那么第一个节点就能加锁成功(假设都是SHARED节点),其他的节点在第一个节点设置头节点以前都会进入shouldParkAfterFailedAcquire方法,这时候waitStatus都等于0,因此继续自旋不会park,若再次加锁还失败就会park(由于这时候waitStatus=-1),但都是读线程的状况下通常都不会出现,由于setHeadAndPropagate第一步就是修改head,因此其他SHARED节点最终都能加锁成功并一直将唤醒传播下去。
以上就是读写锁加锁过程,解锁比较简单,这里就不详细分析了。

小结

读写锁将state分为了高二字节和低二字节,分别存储读锁和写锁的状态,实现更为的复杂,在使用上还有几点须要注意:

  • 读读共享,可是在读中间穿插了写的话,后面的读都会被阻塞,直到前面的写释放锁后,后面的读才会共享,相关原理看完前文不难理解。

  • 读写锁只支持降级重入,不支持升级重入。由于若是支持升级重入的话,是会出现死锁的。以下面这段代码:

private static void rw() {
       r.lock();
       try {
           log.info("获取到读锁");
           w.lock();
           try {
               log.info("获取到写锁");
           } finally {
               w.unlock();
           }
       } finally {
           r.unlock();
       }
   }

多个线程访问都能获取到读锁,但读写互斥,彼此都要等待对方的读锁释放才能获取到写锁,这就形成了死锁。
ReentrantReadWriteLock在某些场景下性能上不算高,所以Doug Lea在JDK1.8的时候又提供了一把高性能的读写锁StampedLock,前者读写锁都是悲观锁,然后者提供了新的模式——乐观锁,但它不是基于AQS实现的,本文不进行分析。

Condition

Lock接口中还有一个方法newCondition,这个方法就是建立一个条件队列:

public Condition newCondition() {
       return sync.newCondition();
   }

   final ConditionObject newCondition() {
       return new ConditionObject();
   }

所谓条件队列就是建立一个新的ConditionObject对象,这个对象的数据结构在开篇就看过了,包含首、尾两个节点字段,每当调用Condition#await方法时就会在对应的Condition对象中排队等待:

public final void await() throws InterruptedException {
       if (Thread.interrupted())
           throw new InterruptedException();
       // 加入条件队列
       Node node = addConditionWaiter();
       // 由于Condition.await必须配合Lock.lock使用,因此await时就是将已得到锁的线程所有释放掉
       int savedState = fullyRelease(node);
       int interruptMode = 0;
       // 判断是在同步队列仍是条件队列,后者则直接park
       while (!isOnSyncQueue(node)) {
           LockSupport.park(this);
           // 获取打断处理方式(抛出异常或重设标记)
           if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
               break;
       }
       // 调用aqs的方法
       if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
           interruptMode = REINTERRUPT;
       if (node.nextWaiter != null) // clean up if cancelled
        // 清除掉已经进入同步队列的节点
           unlinkCancelledWaiters();
       if (interruptMode != 0)
           reportInterruptAfterWait(interruptMode);
   }

   private Node addConditionWaiter() {
       Node t = lastWaiter;
       // 清除状态为取消的节点
       if (t != null && t.waitStatus != Node.CONDITION) {
           unlinkCancelledWaiters();
           t = lastWaiter;
       }

// 建立一个CONDITION状态的节点并添加到队列末尾
       Node node = new Node(Thread.currentThread(), Node.CONDITION);
       if (t == null)
           firstWaiter = node;
       else
           t.nextWaiter = node;
       lastWaiter = node;
       return node;
   }

await方法实现比较简单,大部分代码都是上文分析过的,这里再也不重复。接着来看signal方法:

public final void signal() {
       if (!isHeldExclusively())
           throw new IllegalMonitorStateException();
       // 从条件队列第一个节点开始唤醒
       Node first = firstWaiter;
       if (first != null)
           doSignal(first);
   }

   private void doSignal(Node first) {
       do {
           if ( (firstWaiter = first.nextWaiter) == null)
               lastWaiter = null;
           first.nextWaiter = null;
       } while (!transferForSignal(first) &&
                (first = firstWaiter) != null);
   }

   final boolean transferForSignal(Node node) {
    // 修改waitStatus状态,若是修改失败,则说明该节点已经从条件队列转移到了同步队列
       if (!compareAndSetWaitStatus(node, Node.CONDITION, 0))
           return false;

// 上面修改为功,则将该节点添加到同步队列末尾,并返回以前的尾结点
       Node p = enq(node);
       int ws = p.waitStatus;
       if (ws > 0 || !compareAndSetWaitStatus(p, ws, Node.SIGNAL))
        // unpark当前线程,结合await方法看
           LockSupport.unpark(node.thread);
       return true;
   }

signal的逻辑也比较简单,就是唤醒条件队列中的第一个节点,主要是要结合await的代码一块儿理解。

其它组件

上文分析的锁都是用来实现并发安全控制的,而对于多线程协做JUC又基于AQS提供了CountDownLatch、CyclicBarrier、Semaphore等组件,下面一一分析。

CountDownLatch

CountDownLatch在建立的时候就须要指定一个计数:

CountDownLatch countDownLatch = new CountDownLatch(5);

而后在须要等待的地方调用countDownLatch.await()方法,而后在其它线程完成任务后调用countDownLatch.countDown()方法,每调用一次该计数就会减一,直到计数为0时,await的地方就会自动唤醒,继续后面的工做,因此CountDownLatch适用于一个线程等待多个线程的场景,那它是怎么实现的呢?读者们能够结合上文本身先思考下。

public CountDownLatch(int count) {
       if (count < 0) throw new IllegalArgumentException("count < 0");
       this.sync = new Sync(count);
   }

   Sync(int count) {
       setState(count);
   }

与前面讲的锁同样,也有一个内部类Sync继承自AQS,而且在构造时就将传入的计数设置到了state属性,看到这里不难猜到CountDownLatch的实现原理了。

public void await() throws InterruptedException {
       sync.acquireSharedInterruptibly(1);
   }

   public final void acquireSharedInterruptibly(int arg)
           throws InterruptedException {
       if (Thread.interrupted())
           throw new InterruptedException();
       if (tryAcquireShared(arg) < 0)
           doAcquireSharedInterruptibly(arg);
   }

   protected int tryAcquireShared(int acquires) {
       return (getState() == 0) ? 1 : -1;
   }

在await方法中使用的是可打断的方式获取的共享锁,一样除了tryAcquireShared方法,其他的都是复用的以前分析过的代码,而tryAcquireShared就是判断state是否等于0,不等于就阻塞。

public void countDown() {
       sync.releaseShared(1);
   }

   public final boolean releaseShared(int arg) {
       if (tryReleaseShared(arg)) {
           doReleaseShared();
           return true;
       }
       return false;
   }
   
   protected boolean tryReleaseShared(int releases) {
       for (;;) {
           int c = getState();
           if (c == 0)
               return false;
           int nextc = c-1;
           if (compareAndSetState(c, nextc))
               return nextc == 0;
       }
   }

而调用countDown就更简单了,每次对state递减,直到为0时才会调用doReleaseShared释放阻塞的线程。
最后须要注意的是CountDownLatch的计数是不支持重置的,每次使用都要新建一个。

CyclicBarrier

CyclicBarrier和CountDownLatch使用差很少,不过它只有await方法。CyclicBarrier在建立时一样须要指定一个计数,当调用await的次数达到计数时,全部线程就会同时唤醒,至关于设置了一个“起跑线”,须要等全部运动员都到达这个“起跑线”后才能一块儿开跑。另外它还支持重置计数,提供了reset方法。

public CyclicBarrier(int parties) {
       this(parties, null);
   }

   public CyclicBarrier(int parties, Runnable barrierAction) {
       if (parties <= 0) throw new IllegalArgumentException();
       this.parties = parties;
       this.count = parties;
       this.barrierCommand = barrierAction;
   }

CyclicBarrier提供了两个构造方法,咱们能够传入一个Runnable类型的回调函数,当达到计数时,由最后一个调用await的线程触发执行。

public int await() throws InterruptedException, BrokenBarrierException {
       try {
           return dowait(false, 0L);
       } catch (TimeoutException toe) {
           throw new Error(toe); // cannot happen
       }
   }

   private int dowait(boolean timed, long nanos)
       throws InterruptedException, BrokenBarrierException,
              TimeoutException {
       final ReentrantLock lock = this.lock;
       lock.lock();
       try {
           final Generation g = generation;

           if (g.broken)
               throw new BrokenBarrierException();

// 是否打断,打断会唤醒全部条件队列中的线程
           if (Thread.interrupted()) {
               breakBarrier();
               throw new InterruptedException();
           }

// 计数为0时,唤醒条件队列中的全部线程
           int index = --count;
           if (index == 0) {  // tripped
               boolean ranAction = false;
               try {
                   final Runnable command = barrierCommand;
                   if (command != null)
                       command.run();
                   ranAction = true;
                   nextGeneration();
                   return 0;
               } finally {
                   if (!ranAction)
                       breakBarrier();
               }
           }

           for (;;) {
               try {
                // 不带超时时间直接进入条件队列等待
                   if (!timed)
                       trip.await();
                   else if (nanos > 0L)
                       nanos = trip.awaitNanos(nanos);
               } catch (InterruptedException ie) {
                   if (g == generation && ! g.broken) {
                       breakBarrier();
                       throw ie;
                   } else {
                       Thread.currentThread().interrupt();
                   }
               }

               if (g.broken)
                   throw new BrokenBarrierException();

               if (g != generation)
                   return index;

               if (timed && nanos <= 0L) {
                   breakBarrier();
                   throw new TimeoutException();
               }
           }
       } finally {
           lock.unlock();
       }
   }

   private void nextGeneration() {
       // signal completion of last generation
       trip.signalAll();
       // set up next generation
       count = parties;
       generation = new Generation();
   }

这里逻辑比较清晰,就是使用了ReentrantLock以及Condition来实现。在构造方法中咱们能够看到保存了两个变量count和parties,每次调用await都会对count变量递减,count不为0时都会进入到trip条件队列中等待,不然就会经过signalAll方法唤醒全部的线程,并将parties从新赋值给count。
reset方法很简单,这里不详细分析了。

Semaphore

Semaphore是信号的意思,或者说许可,能够用来控制最大并发量。初始定义好有几个信号,而后在须要获取信号的地方调用acquire方法,执行完成后,须要调用release方法回收信号。

public Semaphore(int permits) {
       sync = new NonfairSync(permits);
   }
 
   public Semaphore(int permits, boolean fair) {
       sync = fair ? new FairSync(permits) : new NonfairSync(permits);
   }

它也有两个构造方法,能够指定公平或是非公平,而permits就是state的值。

public void acquire() throws InterruptedException {
       sync.acquireSharedInterruptibly(1);
   }

// 非公平方式
   final int nonfairTryAcquireShared(int acquires) {
       for (;;) {
           int available = getState();
           int remaining = available - acquires;
           if (remaining < 0 ||
               compareAndSetState(available, remaining))
               return remaining;
       }
   }

// 公平方式
   protected int tryAcquireShared(int acquires) {
       for (;;) {
           if (hasQueuedPredecessors())
               return -1;
           int available = getState();
           int remaining = available - acquires;
           if (remaining < 0 ||
               compareAndSetState(available, remaining))
               return remaining;
       }
   }

acquire方法和CountDownLatch是同样的,只是tryAcquireShared区分了公平和非公平方式。获取到信号至关于加共享锁成功,不然则进入队列阻塞等待;而release方法和读锁解锁方式也是同样的,只是每次release都会将state+1。

最后

感谢你们看到这里,文章有不足,欢迎你们指出;若是你以为写得不错,那就给我一个赞吧。

相关文章
相关标签/搜索