上一篇已经解释了ReentrantLock,AQS基本框架html
AQS是JUC重要的同步器,全部的锁基于这个同步器实现,先温习下 主要由如下几个重要部分组成java
那么今天就是重点介绍AQS中的ConditionObject 功能和实现。node
在AQS中,Node节点经过nextWaiter指针串起来的就是条件队列,中有ConditionObject对象,实现接口Condition,由具体的lock,如ReentrantLock.newCondition建立和调用。api
Condition主要接口有:缓存
public interface Condition { void await() throws InterruptedException; void awaitUninterruptibly(); long awaitNanos(long nanosTimeout) throws InterruptedException; boolean await(long time, TimeUnit unit) throws InterruptedException; boolean awaitUntil(Date deadline) throws InterruptedException; void signal(); void signalAll(); }
在Condition中,用await()替换wait(),用signal()替换notify(),用signalAll()替换notifyAll(),传统线程的通讯方式,Condition均可以实现。多线程
**好比一个对象里面能够有多个Condition,能够注册在不一样的condition,能够有选择性的调度线程,很灵活。而Synchronized只有一个condition(就是对象自己),全部的线程都注册在这个conditon身上,线程调度不灵活。 ** Condition和传统的线程通讯没什么区别,Condition的强大之处在于它能够为多个线程间创建不一样的Condition,下面引入API中的一段代码(原文地址,http://docs.oracle.com/javase/7/docs/api/java/util/concurrent/locks/Condition.html),ArrayBlockingQueue 提供了相似的功能,实习原理相似。oracle
class BoundedBuffer { final Lock lock = new ReentrantLock();//锁对象 final Condition notFull = lock.newCondition();//写线程条件 final Condition notEmpty = lock.newCondition();//读线程条件 final Object[] items = new Object[100];//缓存队列 int putptr/*写索引*/, takeptr/*读索引*/, count/*队列中存在的数据个数*/; public void put(Object x) throws InterruptedException { lock.lock(); try { while (count == items.length)//若是队列满了 notFull.await();//阻塞写线程 items[putptr] = x; if (++putptr == items.length) putptr = 0;//若是写索引写到队列的最后一个位置了,那么置为0 ++count;//个数++ notEmpty.signal();//唤醒读线程 } finally { lock.unlock(); } } public Object take() throws InterruptedException { lock.lock(); try { while (count == 0)//若是队列为空 notEmpty.await();//阻塞读线程 Object x = items[takeptr];//取值 if (++takeptr == items.length) takeptr = 0;//若是读索引读到队列的最后一个位置了,那么置为0 --count;//个数-- notFull.signal();//唤醒写线程 return x; } finally { lock.unlock(); } } }
在多线程环境下,缓存区提供了两个方法,put and take,put是存数据的,take是取数据的,items充当缓存队列,运行场景是有多个线程同时往调用put和take;这里的读和写共用的实际上是一把锁lock,实际读写不能同时并行;当写入一个数据后,便通知读线程,写满后调用notFull.await()阻塞写;当读入一个数据后,便通知写线程,读空后调用notEmpty.await()阻塞读,等待写条件通知信号;框架
接下来咱们讲解AQS.ConditionObject 内部源码。 主要分析如下几个方法:less
这里我先定义下两个概念:ui
public final void await() throws InterruptedException { if (Thread.interrupted()) throw new InterruptedException(); Node node = addConditionWaiter(); //添加到Condition本身维护的一个链表中,经过nextWaiter链接起来,即条件等待队列 int savedState = fullyRelease(node); //释当前节点的锁,并返回释放以前的锁状态,执行await必定是先拿到了当前锁的 int interruptMode = 0; while (!isOnSyncQueue(node)) { //判断该节点是否在锁等待队列中,当释放锁后就不在锁队列中了,等待条件队列的线程应当被继续阻塞,若是在锁等待队列中,则说明有资格获取锁,执行下一步,聪明的你发现是在signal时被再次加入锁等待队列 LockSupport.park(this); //在条件队列中阻塞该线程 if ((interruptMode = checkInterruptWhileWaiting(node)) != 0) break; } // 被唤醒后,从新开始正式竞争锁,一样,若是竞争不到仍是会将本身沉睡,等待唤醒从新开始竞争。 if (acquireQueued(node, savedState) && interruptMode != THROW_IE) interruptMode = REINTERRUPT; if (node.nextWaiter != null) // 存在下一个节点,从条件队列中取消不是在CONDITION的线程节点 unlinkCancelledWaiters(); if (interruptMode != 0) reportInterruptAfterWait(interruptMode); }
当前waiter线程加入条件等待队列里,从队尾入队,并将节点线程状态设置为CONDITION
/** *当前waiter线程加入条件等待队列里,从队尾入队,并将节点线程状态设置为CONDITION * [@return](https://my.oschina.net/u/556800) 返回新的节点 */ private Node addConditionWaiter() { Node t = lastWaiter; // If lastWaiter is cancelled, clean out. if (t != null && t.waitStatus != Node.CONDITION) { unlinkCancelledWaiters(); t = lastWaiter; } Node node = new Node(Thread.currentThread(), Node.CONDITION); if (t == null) firstWaiter = node; else t.nextWaiter = node; lastWaiter = node; return node; }
释当前节点的锁,并返回释放以前的锁状态,执行await必定是先拿到了当前锁的
/** * Invokes release with current state value; returns saved state. * Cancels node and throws exception on failure. * @param node the condition node for this wait * @return previous sync state */
//释当前节点的锁,并返回释放以前的锁状态,执行await必定是先拿到了当前锁的
final int fullyRelease(Node node) { boolean failed = true; try { int savedState = getState(); if (release(savedState)) { failed = false; return savedState; } else { throw new IllegalMonitorStateException(); } } finally { if (failed) node.waitStatus = Node.CANCELLED; } }
判断这个节点是否在锁等待队列中,显然waitStatus为CONDITION这种结点,只属于条件队列,不在锁等待队列中。
判断这个节点是否在锁等待队列中,显然waitStatus为CONDITION这种结点,只属于条件队列,不在锁等待队列中。 final boolean isOnSyncQueue(Node node) { if (node.waitStatus == Node.CONDITION || node.prev == null) return false; if (node.next != null) // If has successor, it must be on queue return true; return findNodeFromTail(node); }
从条件队列中的头节点开始,释放第一个节点线程,加入到锁等待队列
//从条件队列中的头节点开始,释放第一个节点线程,加入到锁等待队列 public final void signal() { //当前线程不是拥有锁线程这抛出异常 if (!isHeldExclusively()) throw new IllegalMonitorStateException(); Node first = firstWaiter;//条件队列中第一个节点 if (first != null) doSignal(first); }
doSignal()方法是一个while循环,直到transferForSignal成功为止, 不断地完成这样一个transfer(条件队列--->锁同步等待队列)操做,直到有一个成功为止。
/** * Removes and transfers nodes until hit non-cancelled one or * null. Split out from signal in part to encourage compilers * to inline the case of no waiters. * @param first (non-null) the first node on condition queue */ private void doSignal(Node first) { do { if ( (firstWaiter = first.nextWaiter) == null) lastWaiter = null; //将第一个节点的继任变量设置空,意义是first与next节点断掉,便于gc first.nextWaiter = null; // 将节点从条件队列转移到锁等待队列,若是成功则返回true,while循环终止,若是转移失败,则从first日后找,聪明的你确定就知道,若是是doSignalAll,这从first一直日后执行transferForSignal } while (!transferForSignal(first) && (first = firstWaiter) != null); }
将节点从条件队列转移到锁等待队列,若是成功则返回true transferForSignal作了两件事
/** * Transfers a node from a condition queue onto sync queue. * Returns true if successful. * @param node the node * @return true if successfully transferred (else the node was * cancelled before signal). */ final boolean transferForSignal(Node node) { /* * If cannot change waitStatus, the node has been cancelled. */ //假设在条件队列中,将node状态设置0,若是waitStatus不能被更改,则该节点已经被取消 if (!compareAndSetWaitStatus(node, Node.CONDITION, 0)) return false; /* * Splice onto queue and try to set waitStatus of predecessor to * indicate that thread is (probably) waiting. If cancelled or * attempt to set waitStatus fails, wake up to resync (in which * case the waitStatus can be transiently and harmlessly wrong). */ //经过enq将节点加入到锁等待队列,从队尾插入,并返回前驱节点。 Node p = enq(node); int ws = p.waitStatus; //若是该前驱节点被取消或者更改waitStatus状态为SIGNAL失败,说明前驱失效,则唤醒在锁同步等待队列当前节点 if (ws > 0 || !compareAndSetWaitStatus(p, ws, Node.SIGNAL)) LockSupport.unpark(node.thread); return true; }
。。。待续