AQS: AbstractQueuedSynchronizer,即队列同步器。是构建锁或者其余同步组件的基础框架。它维护了一个volatile int state(表明共享资源)和一个FIFO线程等待队列(多线程争用资源被阻塞时会进入此队列)。html
state的访问方式有:java
自定义同步器须要根据须要重写如下方法node
而后能够调用 acquire、release、releaseShared等方法来实现功能。c#
AQS定义两种资源共享方式:Exclusive和Sharesegmentfault
经过ReentrantLock来分析独占锁。多线程
ReentrantLock的基本使用形式是:框架
ReentrantLock lock = new ReentrantLock(); try { lock.lock(); // 加锁 } catch (Exception e) { } finally { lock.unlock(); // 解锁 }
ReentrantLock内部类Sync继承了AQS,ReentrantLock#lock即调用了Sync#lock。Sync又有两个子类分别是NonfairSync和FairSync,分别实现了非公平锁和公平锁。oop
看下NonfairSync的lockui
static final class NonfairSync extends Sync { // ... final void lock() { if (compareAndSetState(0, 1)) // lock的时候直接使用cas去抢占state,成功就返回了,表示抢锁成功 setExclusiveOwnerThread(Thread.currentThread()); else acquire(1); // 抢占state状态失败才调用AQS的acquire方法 } // ... }
再看下FairSync的lock线程
static final class FairSync extends Sync { // ... final void lock() { acquire(1); // 直接调用AQS的acquire } // ... }
acquire是lock调用的关键。自定义锁须要经过acquire来设置state和将节点加入FIFO等待队列操做。
public final void acquire(int arg) { if (!tryAcquire(arg) && // 用户自定义内容,返回true表示获取锁成功,不然加入等待队列 acquireQueued(addWaiter(Node.EXCLUSIVE), arg)) // 建立节点,加入等待队列 selfInterrupt(); // if上述true,则中断线程 }
上面代码能够分开来看就会简单点。
首先看下NonfairSync的tryAcquire
static final class NonfairSync extends Sync { // ... protected final boolean tryAcquire(int acquires) { return nonfairTryAcquire(acquires); } } final boolean nonfairTryAcquire(int acquires) { final Thread current = Thread.currentThread(); int c = getState(); if (c == 0) { // state==0表示能够获取锁 if (compareAndSetState(0, acquires)) { // cas设置锁,成功则加锁成功 setExclusiveOwnerThread(current); return true; } } else if (current == getExclusiveOwnerThread()) { // 若是不是0,可是仍是当前线程,则可重入 int nextc = c + acquires; if (nextc < 0) // overflow throw new Error("Maximum lock count exceeded"); setState(nextc); return true; } return false; // 不然加锁失败返回fasle,就要进行加入等待队列的处理 }
再看下FairSync的tryAcquire
static final class FairSync extends Sync { // .. protected final boolean tryAcquire(int acquires) { final Thread current = Thread.currentThread(); int c = getState(); if (c == 0) { if (!hasQueuedPredecessors() && // state==0而且等待队列没有其余线程才会加锁 compareAndSetState(0, acquires)) { setExclusiveOwnerThread(current); return true; } } else if (current == getExclusiveOwnerThread()) { int nextc = c + acquires; if (nextc < 0) throw new Error("Maximum lock count exceeded"); setState(nextc); return true; } return false; } }
总结来说,tryAcquire是用户自定义的,根据state设置锁状态,根据返回值来决定是否加入等待队列。公平锁和非公平锁的差别主要在:新来的锁会不会插队。
addWaiter用于初始化队列并增长新的node节点到等待队列中
private Node addWaiter(Node mode) { Node node = new Node(Thread.currentThread(), mode); Node pred = tail; // 若是tail不是null则在tail后加入该节点;设想添加第一个节点的时候,tail为null,则走不到这里,则会调用下面的enq(node)初始化后再加入节点 if (pred != null) { node.prev = pred; if (compareAndSetTail(pred, node)) { // cas添加失败则调用enq(node)死循环添加 pred.next = node; return node; } } enq(node); // 初始化和死循环添加 return node; } private Node enq(final Node node) { for (;;) { // 直到添加成功为止 Node t = tail; if (t == null) { // Must initialize // 若是tail是null,则先初始化 if (compareAndSetHead(new Node())) // 用一个空的node做为head tail = head; } else { node.prev = t; if (compareAndSetTail(t, node)) { t.next = node; return t; } } } }
注意:等待队列中的头结点是初始化的空节点或者已经获取到锁的节点,不是正在等待获取锁的节点,即第一个节点是dummy node。
final boolean acquireQueued(final Node node, int arg) { boolean failed = true; try { boolean interrupted = false; for (;;) { final Node p = node.predecessor(); if (p == head && tryAcquire(arg)) { // 先尝试获取锁,若是前节点是head而且获取到锁,则当前节点成为head,这也和2.2.2的"注意"相呼应。 setHead(node); p.next = null; // help GC failed = false; return interrupted; } // 没有获取到锁,判断是否能够park线程,符合条件则当前线程被park if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt()) interrupted = true; } } finally { if (failed) cancelAcquire(node); } }
看下线程被park的条件,即shouldParkAfterFailedAcquire。在这以前咱们须要简单了解下Node的waitStatus字段
waitStatus一共四个状态
这里咱们关心两个状态:
private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) { int ws = pred.waitStatus; if (ws == Node.SIGNAL) // 前节点状态为SIGNAL才能够阻塞, 由于初始化值为0,因此第一次是不会直接返回true return true; if (ws > 0) { // 前节点取消了,则一直往前遍历,直到找到waitStatus不大于0的 do { node.prev = pred = pred.prev; } while (pred.waitStatus > 0); pred.next = node; } else { compareAndSetWaitStatus(pred, ws, Node.SIGNAL); // 设置节点为SIGNAL } return false; }
这里注意到因为waitStatus初始值为0,shouldParkAfterFailedAcquire第一次判断的时候是返回fasle的,即线程不是立刻被park,在第二次的时候才会被park。从中也能够看到线程先自旋2次,最后再park:第一次是先尝试获取锁的地方,即if (p == head && tryAcquire(arg))的位置,第二次是由于shouldParkAfterFailedAcquire返回false,因此须要再运行一次。
到如今为止,咱们能够看到,没有获取到锁的线程是以节点的形式加入到了等待队列,而且park了,不占用cpu时间。
unlock调用了AQS的release方法
public void unlock() { sync.release(1); } public final boolean release(int arg) { if (tryRelease(arg)) { // 用户自定义 Node h = head; if (h != null && h.waitStatus != 0) unparkSuccessor(h); // 唤醒head节点的后续节点 return true; } return false; }
tryRelease由用户自定义,被AQS中release调用,来看下ReentrantLock中tryRelease的调用
protected final boolean tryRelease(int releases) { int c = getState() - releases; if (Thread.currentThread() != getExclusiveOwnerThread()) throw new IllegalMonitorStateException(); boolean free = false; if (c == 0) { free = true; setExclusiveOwnerThread(null); } setState(c); // 这里不须要使用cas,由于是独占锁,释放锁的时候,确定只有一个线程访问 return free; }
tryRelease简单来讲就是设置state,可是注意由于是独占锁,因此并不须要使用cas来设置state。
private void unparkSuccessor(Node node) { int ws = node.waitStatus; if (ws < 0) compareAndSetWaitStatus(node, ws, 0); Node s = node.next; // 唤醒后续节点 if (s == null || s.waitStatus > 0) { s = null; for (Node t = tail; t != null && t != node; t = t.prev) if (t.waitStatus <= 0) s = t; } if (s != null) LockSupport.unpark(s.thread); }
到这里能够看到,AQS维护了一个正在获取锁的线程的等待队列,获取到锁的线程是head节点,当释放锁的时候会唤醒其后续节点,经过这样的过程达到了独占锁的效果。而且使用了cas和自旋减小了资源的损耗。
其实从代码上来总结,最大的区别就是,共享锁在被唤醒后不但会像独占锁那样将本身的节点设置为head,并且会继续唤醒它的后续节点,后续节点又会唤醒后续节点的节点。这样当一个共享锁获取到锁后,全部等待的线程都将获取到锁。
能够经过分析CountDownLatch来分析下共享锁。CountDownLatch的使用形式能够当作是获取锁和释放锁的过程,这样就更容易理解共享锁了。
CountDownLatch countDownLatch = new CountDownLatch(1); countDownLatch.await(); // 获取锁,能够是多个线程都在调用 countDownLatch.countDown(); // 释放锁, 当释放后全部获取共享锁的线程都会获取到锁
countDownLatch.await()能够看作是获取锁。
public void await() throws InterruptedException { sync.acquireSharedInterruptibly(1); // 调用AQS的方法 } public final void acquireSharedInterruptibly(int arg) throws InterruptedException { if (Thread.interrupted()) throw new InterruptedException(); if (tryAcquireShared(arg) < 0) // 自定义tryAcquireShared doAcquireSharedInterruptibly(arg); // 和独占锁的tryAcquire基本思想是一致的,若是获取锁失败就加入到等待队列中 }
看下countDownLatch自定义的tryAcquireShared,仍是比较简单的
protected int tryAcquireShared(int acquires) { return (getState() == 0) ? 1 : -1; // state为0则获取锁成功,不然则是获取锁失败 }
而后看下doAcquireSharedInterruptibly
private void doAcquireSharedInterruptibly(int arg) throws InterruptedException { final Node node = addWaiter(Node.SHARED); // addWaiter和独占锁调用的同一个方法,只是节点类型为SHARED boolean failed = true; try { for (;;) { final Node p = node.predecessor(); if (p == head) { int r = tryAcquireShared(arg); if (r >= 0) { setHeadAndPropagate(node, r); // 和独占锁的主要区别所在,独占锁只是setHead,而共享锁会setHeadAndPropagate,即设置head而且会传播,将后续的共享锁也唤醒 p.next = null; // help GC failed = false; return; } } // 主要思想和独占锁是一致的 if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt()) throw new InterruptedException(); } } finally { if (failed) cancelAcquire(node); } }
看下setHeadAndPropagate
private void setHeadAndPropagate(Node node, int propagate) { Node h = head; // Record old head for check below setHead(node); // 设置当前节点为head // 若是propagate>0才会唤醒后续shared节点,这里propagate为用户自定义的tryAcquireShared的返回值 if (propagate > 0 || h == null || h.waitStatus < 0) { Node s = node.next; if (s == null || s.isShared()) doReleaseShared(); // 唤醒head节点的后续节点, releaseShared一样调用了该方法 } }
这里主要注意propagate值,即tryAcquireShared的返回值。若是tryAcquireShare<则表示没有获取到锁;若是tryAcquireShare==0则表示获取锁成功,可是不会唤醒后续shared节点,这点从上述代码中能够看到;若是tryAcquireShare>0,则表示获取锁成功且唤醒后续share节点。
countDownLatch.countDown能够当作是释放锁的过程,只不过若是count值不为1的话,须要释放屡次才算释放成功。
public void countDown() { sync.releaseShared(1); // 调用了AQS的releaseShared } public final boolean releaseShared(int arg) { if (tryReleaseShared(arg)) { doReleaseShared(); // 和setHeadAndPropagate调用的是同一个方法 return true; } return false; }
private void doReleaseShared() { for (;;) { Node h = head; if (h != null && h != tail) { int ws = h.waitStatus; if (ws == Node.SIGNAL) { if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0)) continue; // loop to recheck cases unparkSuccessor(h); // 和独占锁同样,唤醒head后续节点 } else if (ws == 0 && !compareAndSetWaitStatus(h, 0, Node.PROPAGATE)) continue; // loop on failed CAS } if (h == head) // loop if head changed break; } }
AQS的整体思路是将等待的线程封装成Node节点放在等待队列上。获取锁的节点为head节点,释放锁的时候,head节点会唤醒后续节点关联的线程
须要区别的是:独占锁只会唤醒后续节点的线程;而共享锁后续节点被唤醒后会接着继续唤醒他本身的后续节点,一直到把全部连续的共享节点都唤醒。