#前言java
很久没写blog了,最近学车,时间真的少了不少,花了一些时间写了一篇AQS,请你们指正。node
翻阅AbstractQueuedSynchronizer的源码,会发现以下注释:并发
Pprovides a framework for implementing blocking locks and related synchronizers (semaphores, events, etc) that rely on first-in-first-out (FIFO) wait queues.
AbstractQueuedSynchronizer提供一个基于FIFO队列的框架,该框架用于实现阻塞锁和相关同步器(例如:semaphores)。框架
如此可知,AbstractQueuedSynchronizer能够视为JDK同步器的框架,理解它,有助于理解JDK的同步器。ide
本人依据JDK源码中的注释结合并发经验,总结了以下AQS框架说明:工具
AQS是依赖状态进行同步操做的,其内部使用一个整形变量state,来表示同步状态,此状态值依据具体的同步器语义实现。例如:在CountDownLatch中state即为须要等待的线程数。ui
AQS的子类必须定义在获取和释放上对应的状态值。对于AQS状态变量的操做必须使用getState,setState,compareAndSetState 三个原子方法。线程
AQS 提供了互斥与共享两种模式,AbstractQueuedSynchronizer类中的final方法已完善队列和阻塞机制,子类仅须要实现protected方法,设计
AQS的子类应该被定义为非公共的内部助手类,用于实现它们的封闭类的同步属性code
AQS在序列化时仅序列化状态,在默认状况下会获得一个空的线程队列。子类一般须要实现readObject方法,用来设置初始状态。
hasQueuedPredecessors在设计公平的同步器时使用,若是该方法返回true,公平的同步器tryAcquire方法应该返回false
ConditionObject AQS的内部类,子类能够用ConditionObject实现条件谓词,若不须要实现条件谓词能够不实现。
//JDK中的源码 public final void acquire(int state) { if (!tryAcquire(state) && acquireQueued(addWaiter(Node.EXCLUSIVE), state)) selfInterrupt(); }
其对应代码的语义为:
while (!获取不成功) { 若是当前线程不在队列中, 加入队列 阻塞当前线程 } 即阻塞直到获取成功。
//JDK中的源码 public final boolean release(int arg) { if (tryRelease(arg)) { Node h = head; if (h != null && h.waitStatus != 0) unparkSuccessor(h); return true; } return false; }
对应代码的语义为:
if (尝试释放成功) 解锁队列中的第一个线程
若是当前节点为队列中的第一个节点,尝试获取,获取成功进行head后续节点的设置。如获取失败维护先后节点关系,若须要阻塞进行阻塞,以后继续重试。 若出现异常获取失败,取消当前节点获取操做。
public final void acquireShared(int arg) { if (tryAcquireShared(arg) < 0)//尝试获取失败 doAcquireShared(arg);//进行共享式获取 } /** * Acquires in shared uninterruptible mode. * @param arg the acquire argument */ private void doAcquireShared(int arg) { final Node node = addWaiter(Node.SHARED); boolean failed = true; try { boolean interrupted = false; for (;;) { final Node p = node.predecessor(); if (p == head) {//当前节点的先驱节点为head,即当前节点为第一个 int r = tryAcquireShared(arg); if (r >= 0) {//尝试获取成功 //向上冒泡,保证head节点的后驱节点为未获取到的节点 setHeadAndPropagate(node, r); p.next = null; // help GC if (interrupted) selfInterrupt(); failed = false; return; } } //包装为 获取失败的节点 若须要中断进行中断 if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt()) interrupted = true; } } finally { if (failed) cancelAcquire(node); } }
确保级联释放,即便有其余的线程正在进行的获取/释放。 这个过程一般尝试释放head的后续节点,若是他须要被释放。 若是该节点不须要,会向下传递释放动做,直到释放成功。 此外,咱们必须在添加新节点时进行循环处理。不一样于其余操做 中释放后续节点,咱们须要知道CAS是否重置了状态,因此咱们须要重复检查。
public final boolean releaseShared(int arg) { if (tryReleaseShared(arg)) { doReleaseShared(); return true; } return false; } private void doReleaseShared() { /* */ for (;;) { Node h = head; if (h != null && h != tail) { int ws = h.waitStatus; if (ws == Node.SIGNAL) { if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0)) continue; // 循环检查状态 unparkSuccessor(h); } else if (ws == 0 && !compareAndSetWaitStatus(h, 0, Node.PROPAGATE)) continue; // 循环检查CAS } if (h == head) // 循环检查是否有新节点 break; } }
在不可重入锁Mutex中 ,咱们使用state=0表示释放,state=1表示获取
class Mutex implements Lock, java.io.Serializable { // 内部助手同步类Sync private static class Sync extends AbstractQueuedSynchronizer { // 当state=1表示获取了独占锁 protected boolean isHeldExclusively() { return getState() == 1; } // 若是state=0,锁是释放状态,尝试获取 public boolean tryAcquire(int acquires) { assert acquires == 1; // acquires为1表示进行获取操做,其余值无效 if (compareAndSetState(0, 1)) {//CAS操做 setExclusiveOwnerThread(Thread.currentThread());//设置锁的持有者为当前线程 return true; } return false; } //尝试释放 protected boolean tryRelease(int releases) { assert releases == 1; // 传入的值为1表示进行释放,其余值无效 if (getState() == 0) throw new IllegalMonitorStateException(); setExclusiveOwnerThread(null); setState(0);//设置状态为0,表示锁已释放 return true; } // 提供一个条件谓词 Condition newCondition() { return new ConditionObject(); } // 反序列化属性 private void readObject(ObjectInputStream s) throws IOException, ClassNotFoundException { s.defaultReadObject(); setState(0); //设置初始状态为释放 } } // 全部同步操做 委托给Sync,下面咱们实现必要的锁须要的操做 private final Sync sync = new Sync(); public void lock() { sync.acquire(1); } public boolean tryLock() { return sync.tryAcquire(1); } public void unlock() { sync.release(1); } public Condition newCondition() { return sync.newCondition(); } public boolean isLocked() { return sync.isHeldExclusively(); } public boolean hasQueuedThreads() { return sync.hasQueuedThreads(); } public void lockInterruptibly() throws InterruptedException { sync.acquireInterruptibly(1); } public boolean tryLock(long timeout, TimeUnit unit) throws InterruptedException { return sync.tryAcquireNanos(1, unit.toNanos(timeout)); } }
state=0表示未被通知(等待中,不可共享获取),state!=0表示被通知(可共享获取)
class BooleanLatch { //内部同步器,state=0表示未被通知(等待中,不可共享获取),state!=0表示被通知(可共享获取) private static class Sync extends AbstractQueuedSynchronizer { boolean isSignalled() { return getState() != 0; } /** *tryAcquireShared 返回负值 获取失败 *0 获取成功其余线程不能获取 *正值获取成功,其余线程也可获取成功 / protected int tryAcquireShared(int ignore) { return isSignalled() ? 1 : -1; } protected boolean tryReleaseShared(int ignore) { setState(1); return true; } } private final Sync sync = new Sync(); public boolean isSignalled() { return sync.isSignalled(); } public void signal() { sync.releaseShared(1); } public void await() throws InterruptedException { sync.acquireSharedInterruptibly(1); } }
分析JDK中的同步类,除了了解AQS外,还要知道每一个同步器中的state的语义是什么,AQS上边已经分析了,下面介绍下几个同步器的state的语义。
ReentrantLock 只支持独占方式的获取操做,它实现了tryAcquire,tryRelease和isHeldExclusively.
ReentrantLock的状态用于存储锁获取的操做次数,同一线程每获取一次加1,每释放一次减小1.
tryAcquire代码简要分析
protected final boolean tryAcquire(int acquires) { Thread current = Thread.currentThread(); int c = getState(); int w = exclusiveCount(c); if (c != 0) { // 当前状态值(即锁获取的操做)>0,锁的全部者非当前线程,获取失败 if (w == 0 || current != getExclusiveOwnerThread()) return false; if (w + exclusiveCount(acquires) > MAX_COUNT)//若是状态值饱和,获取失败,即超过最大可获取线程数 throw new Error("Maximum lock count exceeded"); //符合获取锁的条件,更新状态值, setState(c + acquires); return true; } if (writerShouldBlock() || !compareAndSetState(c, c + acquires)) return false; //设置锁的持有者为当前线程 setExclusiveOwnerThread(current); return true; }
CountDownLatch同步状态保存当前的计数值。相似BooleanLatch,不作分析。
Semaphore的同步状态用于存储当前能够许可的数量。
Semaphore中的tryAcquireShared,tryReleaseShared
tryAcquireShared,获取当前可用许可数量,若可用许可数量大于申请数量,经过compareAndSetState设置新的剩余许可数量,不然获取失败。
tryReleaseShared获取当前可用许可数量,若是当前剩余许可数量+释放数量>0,过compareAndSetState设置新的剩余许可数量,不然获取失败。
/** *tryAcquireShared,获取当前可用许可数量,若可用许可数量大于申请数量,经过compareAndSetState设置新的剩余许可数量,不然获取失败。 */ final int nonfairTryAcquireShared(int acquires) { for (;;) { int available = getState(); int remaining = available - acquires; if (remaining < 0 || compareAndSetState(available, remaining)) return remaining; } } /** * *tryReleaseShared获取当前可用许可数量,若是当前剩余许可数量+释放数量>0,过compareAndSetState设置新的剩余许可数量,不然获取失败。 */ protected final boolean tryReleaseShared(int releases) { for (;;) { int current = getState(); int next = current + releases; if (next < current) // overflow throw new Error("Maximum permit count exceeded"); if (compareAndSetState(current, next)) return true; } }
FutueTask的同步器状态值以下:
NEW = 0; //初始状态 COMPLETING = 1; //运行中 NORMAL = 2; //完成 EXCEPTIONAL = 3; //异常 CANCELLED = 4; //已取消 INTERRUPTING = 5; //中断中 INTERRUPTED = 6; //已中断
可能的状态转换
NEW(初始状态) -> COMPLETING(运行中) -> NORMAL (已完成)
NEW(初始状态) -> COMPLETING(运行中) -> EXCEPTIONAL (异常)
NEW (初始状态)-> CANCELLED (已取消)
NEW (初始状态)-> INTERRUPTING (中断中)-> INTERRUPTED (已中断)
Future.get的语义很是相似闭锁,若是发生了某件事件(由FutureTask表示的任务执行完成 或者取消),那么线程能够恢复执行,不然一致阻塞。
AQS是JDK并发的框架,仔细理解有助于理解JDK的同步工具。 对于JDK的部分同步类,进行了简要说明,详细自行查阅源码。 对于JDK同步类的源码建议进行以下步骤: 1.理解同步器的状态值的语义 2.该同步器使用是AQS的什么模式, 是共享,互斥,仍是共享与互斥都有。 3.优先理解同步器的tryAcquire,tryAcquireShared,tryRelease,tryReleaseShared方法,以后查看其它方法。