ASQ:AbstractQueuedSynchronizerhtml
它维护了一个volatile int state(表明共享资源)和一个FIFO线程等待队列(多线程争用资源被阻塞时会进入此队列,有个内部类Node定义了节点。队列由AQS的volatile成员变量head和tail组成一个双向链表)node
AQS定义两种资源共享方式:Exclusive(独占,只有一个线程能执行,如ReentrantLock)和Share(共享,多个线程可同时执行,如Semaphore/CountDownLatch)。设计模式
AQS是抽象类,使用了模板方法设计模式,已经将流程定义好,且实现了对等待队列的维护,所以实现者只须要按需实现AQS预留的四个方法便可。多线程
通常来讲,自定义同步器要么是独占方法,要么是共享方式,他们也只需实现tryAcquire-tryRelease、tryAcquireShared-tryReleaseShared中的一种便可。但AQS也支持自定义同步器同时实现独占和共享两种方式,如ReentrantReadWriteLock。并发
1.1 acquire(int)app
此方法是独占模式下线程获取共享资源的顶层入口。若是获取到资源,线程直接返回,不然进入等待队列,直到获取到资源为止,且整个过程忽略中断的影响。函数
/** * Acquires in exclusive mode, ignoring interrupts. Implemented * by invoking at least once {@link #tryAcquire}, * returning on success. Otherwise the thread is queued, possibly * repeatedly blocking and unblocking, invoking {@link * #tryAcquire} until success. This method can be used * to implement method {@link Lock#lock}. * * @param arg the acquire argument. This value is conveyed to * {@link #tryAcquire} but is otherwise uninterpreted and * can represent anything you like. */ public final void acquire(int arg) { if (!tryAcquire(arg) && acquireQueued(addWaiter(Node.EXCLUSIVE), arg)) selfInterrupt(); }
1. tryAcquire():尝试获取资源。 2. addWaiter(Node.EXCLUSIVE):获取资源失败,将该线程加入等待队列尾部,标记为独占模式。 3. acquireQueued(Node,int):获取该node指定数量的资源数,会一直等待成功获取才返回,返回值是在获取期间是否中断过
1. tryAcquire()高并发
/** * Attempts to acquire in exclusive mode. This method should query * if the state of the object permits it to be acquired in the * exclusive mode, and if so to acquire it. * * <p>This method is always invoked by the thread performing * acquire. If this method reports failure, the acquire method * may queue the thread, if it is not already queued, until it is * signalled by a release from some other thread. This can be used * to implement method {@link Lock#tryLock()}. * * <p>The default * implementation throws {@link UnsupportedOperationException}. * * @param arg the acquire argument. This value is always the one * passed to an acquire method, or is the value saved on entry * to a condition wait. The value is otherwise uninterpreted * and can represent anything you like. * @return {@code true} if successful. Upon success, this object has * been acquired. * @throws IllegalMonitorStateException if acquiring would place this * synchronizer in an illegal state. This exception must be * thrown in a consistent fashion for synchronization to work * correctly. * @throws UnsupportedOperationException if exclusive mode is not supported */ protected boolean tryAcquire(int arg) { throw new UnsupportedOperationException(); }
这是个抽象方法,用于给实现者自定义实现,此方法尝试去获取独占资源。若是获取成功,则直接返回true,不然直接返回false。这也正是tryLock()的语义。
2. addWaiter(Node)oop
/** * Creates and enqueues node for current thread and given mode. * * @param mode Node.EXCLUSIVE for exclusive, Node.SHARED for shared * @return the new node */ private Node addWaiter(Node mode) { //新建Node Node node = new Node(Thread.currentThread(), mode); // Try the fast path of enq; backup to full enq on failure //快速尝试一次,使用CAS将node放到队尾,失败调用enq Node pred = tail; if (pred != null) { node.prev = pred; if (compareAndSetTail(pred, node)) { pred.next = node; return node; } } //保证将Node放入队尾 enq(node); return node; }
enq源码源码分析
/** * Inserts node into queue, initializing if necessary. See picture above. * @param node the node to insert * @return node's predecessor */ private Node enq(final Node node) { for (;;) { Node t = tail; //若是尾节点为空,说明队列还未进行初始化 if (t == null) { // Must initialize //CAS设置头结点 if (compareAndSetHead(new Node())) //初始头尾相同,从下一次循环开始尝试加入新Node tail = head; } else { node.prev = t; //CAS将当前节点设置为尾节点 if (compareAndSetTail(t, node)) { //设置成功返回当前节点 t.next = node; return t; } } } }
3. acquireQueued(Node, int)
/** * Acquires in exclusive uninterruptible mode for thread already in * queue. Used by condition wait methods as well as acquire. * * @param node the node * @param arg the acquire argument * @return {@code true} if interrupted while waiting */ final boolean acquireQueued(final Node node, int arg) { //标志是否成功获取资源 boolean failed = true; try { //是否被中断 boolean interrupted = false; for (;;) { //获取前驱Node final Node p = node.predecessor(); //若是本身是队列中第二个节点,那会进行尝试获取,进入这里判断要么是一次,要么是被前驱节点给unPark唤醒了。 if (p == head && tryAcquire(arg)) { //成功获取资源,设置自身为头节点,将原来的头结点剥离队列 setHead(node); p.next = null; // help GC failed = false; return interrupted; } //判断是否须要被park,若是须要进行park并检测是否被中断 if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt()) interrupted = true; } } finally { //若是获取资源失败了将当前node取消, if (failed) cancelAcquire(node); } }
shouldParkAfterFailedAcquire方法
/** * Checks and updates status for a node that failed to acquire. * Returns true if thread should block. This is the main signal * control in all acquire loops. Requires that pred == node.prev. * * @param pred node's predecessor holding status * @param node the node * @return {@code true} if thread should block */ private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) { int ws = pred.waitStatus; //若是前驱的状态已是signal,表明前驱释放是会通知唤醒你,那么此node能够安心被park if (ws == Node.SIGNAL) /* * This node has already set status asking a release * to signal it, so it can safely park. */ return true; if (ws > 0) { /* * Predecessor was cancelled. Skip over predecessors and * indicate retry. */ //若是前驱已经被取消,那么从当前node一直往前找,直到有非取消的node,直接排在它的后面,此时不须要park,会出去再尝试一次获取资源。 do { node.prev = pred = pred.prev; } while (pred.waitStatus > 0); pred.next = node; } else { /* * waitStatus must be 0 or PROPAGATE. Indicate that we * need a signal, but don't park yet. Caller will need to * retry to make sure it cannot acquire before parking. */ //前驱节点没有被取消,那么告诉前驱节点释放的时候通知本身 compareAndSetWaitStatus(pred, ws, Node.SIGNAL); } return false; }
parkAndCheckInterrupt()
/** * Convenience method to park and then check if interrupted * * @return {@code true} if interrupted */ private final boolean parkAndCheckInterrupt() { //让该线程进入wait状态 LockSupport.park(this); //返回期间是被中断过 return Thread.interrupted(); }
acquireQueued流程总结
2.找到“有效”(not canceled)的前驱,并通知前驱释放了要“通知”(watiStatus=signal)我,安心被park。
3。被前驱unpark,或interrrupt(),继续流程1。
至此acquire流程完结,
1.2 release(int)
此方法是独占模式下线程释放共享资源的顶层入口。它会释放指定量的资源,若是完全释放了(即state=0),它会唤醒等待队列里的其余线程来获取资源。这也正是unlock()的语义。
/** * Releases in exclusive mode. Implemented by unblocking one or * more threads if {@link #tryRelease} returns true. * This method can be used to implement method {@link Lock#unlock}. * * @param arg the release argument. This value is conveyed to * {@link #tryRelease} but is otherwise uninterpreted and * can represent anything you like. * @return the value returned from {@link #tryRelease} */ public final boolean release(int arg) { //调用实现者的尝试解锁方法,由于已经得到锁,因此基本不会失败 if (tryRelease(arg)) { Node h = head; if (h != null && h.waitStatus != 0) //唤醒下一个节点 unparkSuccessor(h); return true; } return false; }
unparkSuccessor()
/** * Wakes up node's successor, if one exists. * * @param node the node */ private void unparkSuccessor(Node node) { /* * If status is negative (i.e., possibly needing signal) try * to clear in anticipation of signalling. It is OK if this * fails or if status is changed by waiting thread. */ int ws = node.waitStatus; if (ws < 0)//设置当前节点的状态容许失败,失败了也不要紧。 compareAndSetWaitStatus(node, ws, 0); /* * Thread to unpark is held in successor, which is normally * just the next node. But if cancelled or apparently null, * traverse backwards from tail to find the actual * non-cancelled successor. */ //找到下一个须要被唤醒的节点 Node s = node.next; if (s == null || s.waitStatus > 0) {//若是是空或被取消 s = null; //从尾节点开始寻找,直到找到最前面的有效的节点。由于锁已经释放,因此从尾节点开始找能够避免由于高并发下复杂的队列动态变化带来的逻辑判断, for (Node t = tail; t != null && t != node; t = t.prev) if (t.waitStatus <= 0) s = t; } if (s != null) LockSupport.unpark(s.thread); }
注意实现者实现tryRelease应该是当state为0时才返回
1.3 acquireShared(int)
此方法是共享模式下线程获取共享资源的顶层入口。若是获取到资源,线程直接返回。若是有剩余资源则会唤醒下一个线程,不然进入wait,且整个过程忽略中断的影响。
/** * Acquires in shared mode, ignoring interrupts. Implemented by * first invoking at least once {@link #tryAcquireShared}, * returning on success. Otherwise the thread is queued, possibly * repeatedly blocking and unblocking, invoking {@link * #tryAcquireShared} until success. * * @param arg the acquire argument. This value is conveyed to * {@link #tryAcquireShared} but is otherwise uninterpreted * and can represent anything you like. */ public final void acquireShared(int arg) { //尝试获取指定数量资源 if (tryAcquireShared(arg) < 0) //获取资源直到成功 doAcquireShared(arg); }
共享模式下的流程与独占模式极为类似,首先根据tryAcquireShared(arg)尝试是否能获取到资源,能则直接返回,不能则会进入队列按入队顺序依次唤醒尝试获取。
tryAcquireShared(int)
/** * Attempts to acquire in shared mode. This method should query if * the state of the object permits it to be acquired in the shared * mode, and if so to acquire it. * * <p>This method is always invoked by the thread performing * acquire. If this method reports failure, the acquire method * may queue the thread, if it is not already queued, until it is * signalled by a release from some other thread. * * <p>The default implementation throws {@link * UnsupportedOperationException}. * * @param arg the acquire argument. This value is always the one * passed to an acquire method, or is the value saved on entry * to a condition wait. The value is otherwise uninterpreted * and can represent anything you like. * @return a negative value on failure; zero if acquisition in shared * mode succeeded but no subsequent shared-mode acquire can * succeed; and a positive value if acquisition in shared * mode succeeded and subsequent shared-mode acquires might * also succeed, in which case a subsequent waiting thread * must check availability. (Support for three different * return values enables this method to be used in contexts * where acquires only sometimes act exclusively.) Upon * success, this object has been acquired. * @throws IllegalMonitorStateException if acquiring would place this * synchronizer in an illegal state. This exception must be * thrown in a consistent fashion for synchronization to work * correctly. * @throws UnsupportedOperationException if shared mode is not supported */ protected int tryAcquireShared(int arg) { throw new UnsupportedOperationException(); }
这是AQS预留给实现者的方法,用于共享模式下尝试获取指定数量的资源,返回值<0表明获取失败,=0表明获取成功且无剩余资源,>0表明还有剩余资源
doAcquireShared(int)方法用于共享模式获取资源会直到获取成功才返回
/** * Acquires in shared uninterruptible mode. * @param arg the acquire argument */ private void doAcquireShared(int arg) { //添加当前线程的Node模式为共享模式至队尾, final Node node = addWaiter(Node.SHARED); boolean failed = true; try { boolean interrupted = false; for (;;) { //获取前驱节点 final Node p = node.predecessor(); //若是本身是老二才有尝试的资格 if (p == head) { //尝试获取指定数量资源 int r = tryAcquireShared(arg); if (r >= 0) { //若是成功获取,将当前节点设置为头节点,若是有剩余资源唤醒下一有效节点 setHeadAndPropagate(node, r); p.next = null; // help GC //若是有中断,本身补偿中断 if (interrupted) selfInterrupt(); failed = false; return; } } //判断是否须要被park,和park后检查是否被中弄断 if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt()) interrupted = true; } } finally { //若是获取失败,取消当前节点 if (failed) cancelAcquire(node); } }
流程和独占模式几乎如出一辙,可是代码的书写缺有不一样,不知原做者是咋想的。区别于独占不一样的有两点
setHeadAndPropagate()看有剩余资源的时候如何唤醒下一节点
/** * Sets head of queue, and checks if successor may be waiting * in shared mode, if so propagating if either propagate > 0 or * PROPAGATE status was set. * * @param node the node * @param propagate the return value from a tryAcquireShared */ private void setHeadAndPropagate(Node node, int propagate) { Node h = head; // Record old head for check below //将当前节点设置为head节点 setHead(node); /* * Try to signal next queued node if: * Propagation was indicated by caller, * or was recorded (as h.waitStatus either before * or after setHead) by a previous operation * (note: this uses sign-check of waitStatus because * PROPAGATE status may transition to SIGNAL.) * and * The next node is waiting in shared mode, * or we don't know, because it appears null * * The conservatism in both of these checks may cause * unnecessary wake-ups, but only when there are multiple * racing acquires/releases, so most need signals now or soon * anyway. */ //若是有剩余资源 if (propagate > 0 || h == null || h.waitStatus < 0 || (h = head) == null || h.waitStatus < 0) { Node s = node.next; //当下一个有效节点存在且是共享模式时,会唤醒它 if (s == null || s.isShared()) doReleaseShared(); } }
doReleaseShared()唤醒下一共享模式节点
/** * Release action for shared mode -- signals successor and ensures * propagation. (Note: For exclusive mode, release just amounts * to calling unparkSuccessor of head if it needs signal.) */ private void doReleaseShared() { /* * Ensure that a release propagates, even if there are other * in-progress acquires/releases. This proceeds in the usual * way of trying to unparkSuccessor of head if it needs * signal. But if it does not, status is set to PROPAGATE to * ensure that upon release, propagation continues. * Additionally, we must loop in case a new node is added * while we are doing this. Also, unlike other uses of * unparkSuccessor, we need to know if CAS to reset status * fails, if so rechecking. */ for (;;) { Node h = head; if (h != null && h != tail) { int ws = h.waitStatus; //若是头结点状态是“通知后继” if (ws == Node.SIGNAL) { //将其状态改成0,表示已通知 if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0)) continue; // loop to recheck cases //唤醒后继 unparkSuccessor(h); } //若是已通知后继,则改成可传播,在下次acquire中的shouldParkAfterFailedAcquire会将改成SIGNAL else if (ws == 0 && !compareAndSetWaitStatus(h, 0, Node.PROPAGATE)) continue; // loop on failed CAS } //若是头结点变了,再次循环 if (h == head) // loop if head changed break; } }
共享模式acquire与独占模式技术相同,惟一的不一样就是在于若是当前节点获取资源成功且有剩余则会唤醒下一节点,资源能够为多个线程功能分配,而独占模式则就是一个线程独占。
1.4 releaseShared(int)
此方法是共享模式下线程释放共享资源的顶层入口。若是释放资源成功,直接返回。若是有剩余资源则会唤醒下一个线程,且整个过程忽略中断的影响。
/** * Releases in shared mode. Implemented by unblocking one or more * threads if {@link #tryReleaseShared} returns true. * * @param arg the release argument. This value is conveyed to * {@link #tryReleaseShared} but is otherwise uninterpreted * and can represent anything you like. * @return the value returned from {@link #tryReleaseShared} */ public final boolean releaseShared(int arg) { //尝试共享模式获取资源 if (tryReleaseShared(arg)) { //唤醒下一节点 doReleaseShared(); return true; } return false; }
参考文章
Java并发之AQS详解