AQS(AbstractQueuedSynchronizer)抽象队列同步器,属于多线程编程的基本工具;JDK对其定义得很详细,并提供了多种经常使用的工具类(重入锁,读写锁,信号量,CyclicBarrier,CountDownLatch),在阅读源码的时候,我是从具体工具类往上读的,这样会比较便于理解AQS的设计。node
下面,我将从五种经常使用类去分析源码,进而学习AQS。编程
论文地址安全
咱们要阅读的重入锁,它首先遵循Lock的规范,而且实现了序列化接口;而Lock的规范,必然定义了如何锁的,如何解锁的,而且规定了newCondition这个方法。而重入锁中,真正使用AQS的是他里面内涵的一个实现类Sync,它继承自AQS,并具备AQS的全部规范。多线程
这个内涵的Sync,在重入锁中实现了两种类型的队列,一个是公平队列,另外一个是非公平队列,这取决于你构造重入锁的时候传入的是哪个,默认是非公平锁;less
咱们在进入lock这个方法的时候,看到它真正调用的是acquire方法,而acquire方法,是AQS的一个标准定义;咱们先进入公平锁的阅读;工具
/** * Acquires in exclusive mode, ignoring interrupts. Implemented * by invoking at least once {@link #tryAcquire}, * returning on success. Otherwise the thread is queued, possibly * repeatedly blocking and unblocking, invoking {@link * #tryAcquire} until success. This method can be used * to implement method {@link Lock#lock}. * * @param arg the acquire argument. This value is conveyed to * {@link #tryAcquire} but is otherwise uninterpreted and * can represent anything you like. */ public final void acquire(int arg) { if (!tryAcquire(arg) && acquireQueued(addWaiter(Node.EXCLUSIVE), arg)) selfInterrupt(); }
咱们往上找,就找到了AQS里的acquire方法;这个方法写的很是干净,首先申请一个arg数量的权限,若是申请不成功,则进入等待队列;这个tryAcquire方法,是在子类实现的;这里插入一下,AQS里存在一个state字段,它表示可一个许可,而重入锁中它初始化为0;而后咱们找到公平锁的实现方法。oop
/** * Fair version of tryAcquire. Don't grant access unless * recursive call or no waiters or is first. */ protected final boolean tryAcquire(int acquires) { final Thread current = Thread.currentThread(); int c = getState(); if (c == 0) { if (!hasQueuedPredecessors() && compareAndSetState(0, acquires)) { setExclusiveOwnerThread(current); return true; } } else if (current == getExclusiveOwnerThread()) { int nextc = c + acquires; if (nextc < 0) throw new Error("Maximum lock count exceeded"); setState(nextc); return true; } return false; }
在重入锁中,咱们要能获取锁,实际上是state是否等于0;性能
public final boolean hasQueuedPredecessors() { // The correctness of this depends on head being initialized // before tail and on head.next being accurate if the current // thread is first in queue. Node t = tail; // Read fields in reverse initialization order Node h = head; Node s; return h != t && ((s = h.next) == null || s.thread != Thread.currentThread()); }
这个方法里面有几种状况会返回false,(h != t)表示等待队列非空,为空就会返回false;另外一种状况就是,队列非空,当前队列不等于后继结点的队列,会返回true;由于是公平队列,你要申请权限必然是没人排队,即便有人排队,也得是你最前面才能申请;ok,下一个条件就是CAS这个state值,成功就将独占状态设为当前线程;这个else if,就是重入锁重入的关键了,若是当前线程和独占线程是一个,那就将权限再加acquires,固然这个state会超过上限并抛出overflow相似的异常。学习
若是申请不成功,固然要排队了,排队都是双端队列的CRUD;ui
/** * Creates and enqueues node for current thread and given mode. * * @param mode Node.EXCLUSIVE for exclusive, Node.SHARED for shared * @return the new node */ private Node addWaiter(Node mode) { Node node = new Node(Thread.currentThread(), mode); // Try the fast path of enq; backup to full enq on failure Node pred = tail; if (pred != null) { node.prev = pred; if (compareAndSetTail(pred, node)) { pred.next = node; return node; } } enq(node); return node; }
CAS初始化head,并头尾指向一个地方;而后注意到外面是否是有一个for,又是自旋CAS的操做;在第二次循环的时候,会将node,整在后面。
/** * Acquires in exclusive uninterruptible mode for thread already in * queue. Used by condition wait methods as well as acquire. * * @param node the node * @param arg the acquire argument * @return {@code true} if interrupted while waiting */ final boolean acquireQueued(final Node node, int arg) { boolean failed = true; try { boolean interrupted = false; for (;;) { final Node p = node.predecessor(); if (p == head && tryAcquire(arg)) { setHead(node); p.next = null; // help GC failed = false; return interrupted; } if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt()) interrupted = true; } } finally { if (failed) cancelAcquire(node); } }
这个方法,就是获得的刚才入队的Node,而且又来一个自旋;而后看本身是否是头结点,若是不是,则进入等待队列,使用LockSupport来使当期线程休眠;这样就构成了申请锁并排队的过程;
接着咱们去看unlock方法,它指向的是AQS的release接口,与acquire相反,它是将state作减法;
而这个方法,只有是独占线程调用才能够,由于全部lock的非独占线程,全都会被park;
/** * Releases in exclusive mode. Implemented by unblocking one or * more threads if {@link #tryRelease} returns true. * This method can be used to implement method {@link Lock#unlock}. * * @param arg the release argument. This value is conveyed to * {@link #tryRelease} but is otherwise uninterpreted and * can represent anything you like. * @return the value returned from {@link #tryRelease} */ public final boolean release(int arg) { if (tryRelease(arg)) { Node h = head; if (h != null && h.waitStatus != 0) unparkSuccessor(h); return true; } return false; }
protected final boolean tryRelease(int releases) { int c = getState() - releases; if (Thread.currentThread() != getExclusiveOwnerThread()) throw new IllegalMonitorStateException(); boolean free = false; if (c == 0) { free = true; setExclusiveOwnerThread(null); } setState(c); return free; }
因此在这里并不须要CAS,它的安全性在于不是独占线程就会抛出非法异常;若是它释放成功,就会唤醒后继结点;后继结点是head的后一个,这时候后继结点被阻塞在lock代码行能够往下走了。这样就造成了一个线程同步的重入锁(公平)。咱们能够看到AQS的设计很精湛,不少方法,都是重写定制的,它值作了一些规范的定义。
抽象队列同步器,它是基于一个队列作线程排队的设计,那么这个队列的基本元素Node,咱们看一下。
定义了一系列的状态,携带每一次申请的线程thread,等等,很是直观,注释里还给你画出来了。
看完公平锁,咱们了解到它每次申请都要日后排队,可想而知费公平锁,就是不排队?仍是要排队的。
/** * Performs lock. Try immediate barge, backing up to normal * acquire on failure. */ final void lock() { if (compareAndSetState(0, 1)) setExclusiveOwnerThread(Thread.currentThread()); else acquire(1); }
咱们垂手可得找到上面的方法,它走的流程多了一个,就是若是许但是能够申请的,则不须要排队,直接申请,若是申请失败,则走入公平锁的流程。这就是说全部来申请锁的线程,都有一次竞争的机会,若是没有竞争上,仍是排队。而release接口并未区别实现,因此每次unpark线程,仍是按照队列顺序。
重入锁就这么简单的读完了。接着,公平锁和非公平锁的性能区别在哪呢?在于重复的park和unpark线程,对于非公平锁,线程被park的概率会小一点,由于它不是必然排队;而公平锁必然是排队的,它们的排队机制是同样的,而非公平锁park线程的概率更小,则性能优于公平锁。
看个图,就不须要读了。一把读锁,一把写锁,有兴趣能够本身研读。
信号量是与重入锁彻底不一样类型的锁,由于他是共享的(搞这么复杂,不就是state初始化大于0)。它的做用用过的都有印象,就是多个线程能够共享这一把锁,而线程大于初始化凭证以后,就会被阻塞。
/** * Fair version */ static final class FairSync extends Sync { private static final long serialVersionUID = 2014338818796000944L; FairSync(int permits) { super(permits); } protected int tryAcquireShared(int acquires) { for (;;) { if (hasQueuedPredecessors()) return -1; int available = getState(); int remaining = available - acquires; if (remaining < 0 || compareAndSetState(available, remaining)) return remaining; } } }
我相信能够很简单地找到这个FairSync,在semaphore中。它先判断是否只有你来申请,若是不是就回去操做state,若是申请小于0,直接返回,排队。
/** * Acquires in shared interruptible mode. * @param arg the acquire argument */ private void doAcquireSharedInterruptibly(int arg) throws InterruptedException { final Node node = addWaiter(Node.SHARED); boolean failed = true; try { for (;;) { final Node p = node.predecessor(); if (p == head) { int r = tryAcquireShared(arg); if (r >= 0) { setHeadAndPropagate(node, r); p.next = null; // help GC failed = false; return; } } if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt()) throw new InterruptedException(); } } finally { if (failed) cancelAcquire(node); } }
与重入锁不一样的是,入队的时候Node指定为SHARE模式,而且再次尝试获取锁,若是获取的锁是大于等于0的,将会调用setHeadAndPropagate方法传播释放。若是是大于0的,则会调用release接口,下一个唤醒的线程又会重复上述过程,一直唤醒到==0。和重复锁不同的是,它具备传染性。
/** * Release action for shared mode -- signals successor and ensures * propagation. (Note: For exclusive mode, release just amounts * to calling unparkSuccessor of head if it needs signal.) */ private void doReleaseShared() { /* * Ensure that a release propagates, even if there are other * in-progress acquires/releases. This proceeds in the usual * way of trying to unparkSuccessor of head if it needs * signal. But if it does not, status is set to PROPAGATE to * ensure that upon release, propagation continues. * Additionally, we must loop in case a new node is added * while we are doing this. Also, unlike other uses of * unparkSuccessor, we need to know if CAS to reset status * fails, if so rechecking. */ for (;;) { Node h = head; if (h != null && h != tail) { int ws = h.waitStatus; if (ws == Node.SIGNAL) { if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0)) continue; // loop to recheck cases unparkSuccessor(h); } else if (ws == 0 && !compareAndSetWaitStatus(h, 0, Node.PROPAGATE)) continue; // loop on failed CAS } if (h == head) // loop if head changed break; } }
只要调用了这个方法,就会进行上述所说的传染;正经的,这个方法就是激活线程并设置PROPAGATE,表示一直日后传播激活。
共享锁和独占锁的区别在于,解锁线程会不会传染...
这样,咱们已经读了aqs里面的两种锁了。
在看完上面的基本元素以后,搞一个倒计时器是什么鬼,不就是搞个变量而后一直减,而后等于0的时候一古脑儿释放???
public final boolean releaseShared(int arg) { if (tryReleaseShared(arg)) { doReleaseShared(); return true; } return false; } protected boolean tryReleaseShared(int releases) { // Decrement count; signal when transition to zero for (;;) { int c = getState(); if (c == 0) return false; int nextc = c-1; if (compareAndSetState(c, nextc)) return nextc == 0; } }
很容易找到上述代码,咱们调用倒计时器时候,就会调用countDown方法,每次都会给初始化减一。这时候主线程或者等待线程,调用await在等待。直到它减到0,就会作doReleaseShared,这个时候等待队列只有一个,就是父级线程,它就能够往下走了。由于这货减到0以后不会reset,因此不能复用。。。。
我曹?这货可贵一笔,停更。