在上一篇博客,简单的说下了AQS的基本概念,核心源码解析,可是还有一部份内容没有涉及到,就是AQS对条件变量的支持,这篇博客将着重介绍这方面的内容。node
咱们先经过模拟一个消费者/生产者模型来看下条件变量的基本应用:less
public class CommonResource { private boolean isHaveData = false; Lock lock = new ReentrantLock(); Condition producer_con = lock.newCondition(); Condition consumer_con = lock.newCondition(); public void product() { lock.lock(); try { while (isHaveData) { try { System.out.println("还有数据,等待消费数据"); producer_con.await(); } catch (InterruptedException e) { } } System.out.println("生产者生产数据了"); try { TimeUnit.SECONDS.sleep(3); } catch (InterruptedException e) { e.printStackTrace(); } isHaveData = true; consumer_con.signal(); } finally { lock.unlock(); } } public void consume() { lock.lock(); try { while (!isHaveData) { try { System.out.println("没有数据了,等待生产者消费数据"); consumer_con.await(); } catch (InterruptedException e) { } } System.out.println("消费者消费数据"); try { TimeUnit.SECONDS.sleep(3); } catch (InterruptedException e) { e.printStackTrace(); } isHaveData = false; producer_con.signal(); } finally { lock.unlock(); } } }
public class Main { public static void main(String[] args) { CommonResource resource = new CommonResource(); new Thread(() -> { while (true) { resource.product(); } }).start(); new Thread(() -> { while (true) { resource.consume(); } }).start(); } }
运行结果:
ui
这就是条件变量的应用,第一反应是否是和object中的wait/nofity很像,wait/nofity是配合synchronized工做的,而条件变量的await/signal是配合使用AQS实现的锁
来完成工做的,固然也要看用AQS实现的锁是否支持了条件变量。synchronized只能与一个共享变量进行工做,而AQS实现的锁支持多个条件变量。this
咱们试着分析下上面的代码:线程
首先建立了两个条件变量,一个条件变量用来阻塞/唤醒消费者线程,一个条件变量用来阻塞/唤醒生产者线程。code
生产者,首先获取了独占锁,判断是否有数据:对象
最终释放锁。blog
消费者,首先获取了独占锁,判断是否有数据:队列
最终释放锁。ssl
这里有两点须要特别注意:
为了加深对条件变量的理解,咱们再来看一个例子,两个线程交替打印奇偶数:
public class Test { private int num = 0; private Lock lock = new ReentrantLock(); Condition condition = lock.newCondition(); public void add() { while(num<100) { try { lock.lock(); System.out.println(Thread.currentThread().getName() + ":" + num++); condition.signal(); condition.await(); } catch (InterruptedException e) { e.printStackTrace(); } finally { lock.unlock(); } } } }
public class Main { public static void main(String[] args) { Test test=new Test(); new Thread(() -> { test.add(); }).start(); new Thread(() -> { test.add(); }).start(); } }
运行结果:
翻阅网上的大多数案例是分两个线程方法交替打印,同时开两个条件变量,其中一个条件变量负责阻塞/唤醒打印奇数的线程,一个变量负责阻塞/唤醒打印偶数的线程,可是我的以为没什么必要,两个线程共用一个线程方法,共用一个条件变量也能够。不知道各位看官是什么想的?
当咱们点开lock.newCondition,发现它有好几个实现类:
咱们选择ReentrantLock的实现类,实际上其余实现类也是相同的,只是为了和上面案例中的对应起来,因此先选择ReentrantLock的实现类:
public Condition newCondition() { return sync.newCondition(); }
继续往下点:
final ConditionObject newCondition() { return new ConditionObject(); }
能够看到,当咱们调用lock.newnewCondition,最终会new出一个ConditionObject对象,而ConditionObject类是AbstractQueuedSynchronizer的内部类,咱们先看下ConditionObject的UML图:
其中firstWaiter保存的是该条件变量下条件队列的首节点,lastWaiter保存的是该条件变量下条件队列的尾节点。这里只保存了条件队列的首节点和尾节点,中间的节点保存在哪里呢? 让咱们点开await方法:
public final void await() throws InterruptedException { if (Thread.interrupted()) throw new InterruptedException(); Node node = addConditionWaiter(); int savedState = fullyRelease(node); int interruptMode = 0; while (!isOnSyncQueue(node)) { LockSupport.park(this); if ((interruptMode = checkInterruptWhileWaiting(node)) != 0) break; } if (acquireQueued(node, savedState) && interruptMode != THROW_IE) interruptMode = REINTERRUPT; if (node.nextWaiter != null) // clean up if cancelled unlinkCancelledWaiters(); if (interruptMode != 0) reportInterruptAfterWait(interruptMode); }
在这里,咱们就搞清楚三个问题便可:
第一个问题在addConditionWaiter方法能够获得答案:
private Node addConditionWaiter() { Node t = lastWaiter; // If lastWaiter is cancelled, clean out. if (t != null && t.waitStatus != Node.CONDITION) { unlinkCancelledWaiters(); t = lastWaiter; } Node node = new Node(Thread.currentThread(), Node.CONDITION); if (t == null) firstWaiter = node; else t.nextWaiter = node; lastWaiter = node; return node; }
首先是判断条件队列中的尾节点是否被取消了,若是被取消了,执行unlinkCancelledWaiters方法。咱们这里确定没有被取消,事实上,若是是第一次调用await方法,lastWaiter是为空的,因此确定不会进入第一个if。随后,新建一个Node,这个Node类就是上一篇博客中大量介绍过的,也是AbstractQueuedSynchronizer的内部类,也就是新建了一个Node节点,其中保存了当前线程和Node的类型,这里Node的类型是CONDITION,若是t==null,则说明新建的Node是第一个节点,因此赋值给firstWaiter ,不然将尾节点的nextWaiter设置为新Node,造成一个单向链表,这个nextWaiter在哪里呢,它是经过node点出来的,也就是它也属于node类的一个字段:
这说明了一个比较重要的问题:
AQS的阻塞队列是以双向的链表的形式保存的,是经过prev和next创建起关系的,可是AQS中的条件队列是以单向链表的形式保存的,是经过nextWaiter创建起关系的,也就是AQS的阻塞队列和AQS中的条件队列并不是同一个队列。
其实,AQS中的条件队列也是一个阻塞队列,只是为了方便,因此在本篇博客中出现的AQS中的条件队列特指在被条件变量await的,而阻塞队列特指FIFO双向链表队列。
第一个问题解决了,咱们再来看第二个问题,第二个问题答案在await的第二个方法:
final int fullyRelease(Node node) { boolean failed = true; try { int savedState = getState(); if (release(savedState)) { failed = false; return savedState; } else { throw new IllegalMonitorStateException(); } } finally { if (failed) node.waitStatus = Node.CANCELLED; } }
首先调用getState方法,这个state是什么,不知你们是否还有印象,对于ReentrantLock来讲,state就是重入次数,随后调用release方法,传入state。也就是无论重入了多少次,这里是一次性把锁彻底释放掉。
public final boolean release(int arg) { if (tryRelease(arg)) { Node h = head; if (h != null && h.waitStatus != 0) unparkSuccessor(h); return true; } return false; }
能够看到释放锁仍是调用了tryRelease方法,这个方法正是须要被重写的。
当完成了前两个方法的调用后,就会进行一个判断isOnSyncQueue,通常来讲会进入这个if,park这个线程,等待唤醒,这就解决了第三个问题。
下面咱们再来看看signal方法,一样的,咱们须要解决几个问题:
public final void signal() { if (!isHeldExclusively()) throw new IllegalMonitorStateException(); Node first = firstWaiter; if (first != null) doSignal(first); }
重点在于doSignal中的transferForSignal方法:
final boolean transferForSignal(Node node) { /* * If cannot change waitStatus, the node has been cancelled. */ if (!compareAndSetWaitStatus(node, Node.CONDITION, 0)) return false; /* * Splice onto queue and try to set waitStatus of predecessor to * indicate that thread is (probably) waiting. If cancelled or * attempt to set waitStatus fails, wake up to resync (in which * case the waitStatus can be transiently and harmlessly wrong). */ Node p = enq(node); int ws = p.waitStatus; if (ws > 0 || !compareAndSetWaitStatus(p, ws, Node.SIGNAL)) LockSupport.unpark(node.thread); return true;
在这个方法中,咱们能够会调用enq方法,把条件队列的线程放入阻塞队列中,而后调用unpark方法,唤醒线程。
本篇博客到这里也结束了。
通过上下两篇博客,相信你们对AQS必定有了一个比较浅显的理解。聪明的你,能够看出来,其实这两篇博客有不少内容都没有讲透,甚至有点模棱两可,只是“走马观花”,因此这也符合了个人标题:难以理解的AQS。的确,AQS要深刻研究的话,不比线程池简单多少。看,我又再给本身找理由了。但愿通过从此的沉淀,我能够把这两篇博客重写下,而后换个标题“完全理解AQS”,嘿嘿。