上一篇咱们主要经过ExecutorCompletionService与FutureTask类的源码,对Future模型体系的原理作了了解,本篇开始解读concurrent包中的工具类的源码。首先来看两个很是实用的工具类CyclicBarrier与CountDownLatch是如何实现的。安全
CyclicBarrier直译过来是“循环屏障”,做用是可使固定数量的线程都达到某个屏障点(调用await方发处)后,才继续向下执行。关于用法和实例本文就不作过多说明,如今直接进入CyclicBarrier的源码。app
首先,来看下CyclicBarrier的几个标志性的成员变量:框架
1 private static class Generation { 2 boolean broken = false; 3 } 4 /** The number of parties */ 5 private final int parties; 6 /* The command to run when tripped */ 7 private final Runnable barrierCommand; 8 /** The current generation */ 9 private Generation generation = new Generation(); 10 11 /** 12 * Number of parties still waiting. Counts down from parties to 0 13 * on each generation. It is reset to parties on each new 14 * generation or when broken. 15 */ 16 private int count;
这几个成员变量有如下说明:工具
了解上述成员变量的说明后,基本上就能够知道了CyclicBarrier的实现原理,下面咱们来看看代码是如何写的。其实实现很简单,咱们只需经过await()方法就能够说明:oop
1 public int await() throws InterruptedException, BrokenBarrierException { 2 try { 3 return dowait(false, 0L); 4 } catch (TimeoutException toe) { 5 throw new Error(toe); // cannot happen; 6 } 7 }
await()方法调用了真是的执行方法dowait(),这个方法里涵盖了全部乾坤:ui
1 /** 2 * Main barrier code, covering the various policies. 3 */ 4 private int dowait(boolean timed, long nanos) 5 throws InterruptedException, BrokenBarrierException, 6 TimeoutException { 7 final ReentrantLock lock = this.lock; 8 lock.lock(); 9 try { 10 final Generation g = generation; 11 12 if (g.broken) 13 throw new BrokenBarrierException(); 14 15 if (Thread.interrupted()) { 16 breakBarrier(); 17 throw new InterruptedException(); 18 } 19 20 int index = --count; 21 if (index == 0) { // tripped 22 boolean ranAction = false; 23 try { 24 final Runnable command = barrierCommand; 25 if (command != null) 26 command.run(); 27 ranAction = true; 28 nextGeneration(); 29 return 0; 30 } finally { 31 if (!ranAction) 32 breakBarrier(); 33 } 34 } 35 36 // loop until tripped, broken, interrupted, or timed out 37 for (;;) { 38 try { 39 if (!timed) 40 trip.await(); 41 else if (nanos > 0L) 42 nanos = trip.awaitNanos(nanos); 43 } catch (InterruptedException ie) { 44 if (g == generation && ! g.broken) { 45 breakBarrier(); 46 throw ie; 47 } else { 48 // We're about to finish waiting even if we had not 49 // been interrupted, so this interrupt is deemed to 50 // "belong" to subsequent execution. 51 Thread.currentThread().interrupt(); 52 } 53 } 54 55 if (g.broken) 56 throw new BrokenBarrierException(); 57 58 if (g != generation) 59 return index; 60 61 if (timed && nanos <= 0L) { 62 breakBarrier(); 63 throw new TimeoutException(); 64 } 65 } 66 } finally { 67 lock.unlock(); 68 } 69 }
代码第20行对应“说明2”。this
代码第21行对应“说明3”。spa
代码第26行对应“说明4”。线程
代码第28行对应“说明5”,nextGeneration()方法中使用generation = new Generation();表示屏障已经换代,并唤醒全部线程。nextGeneration()请自行查看源码。code
代码第16行、第45行等全部调用breakBarrier()方法处,对应“说明6”,表示屏障被破坏,breakBarrier()方法中将generation.broken = true,唤醒全部线程,抛出异常。
最后,代码第40行处trip.await(),表示持有trip的线程进入等待被唤醒状态。
另外,实现中还有一个很重要的点,就是第8行的lock和第67行的unlock,保证同步状态下执行这段逻辑,也就保证了count与generation.broken的线程安全。
以上就是CyclicBarrier(循环使用的屏障)的源码实现,是否是比较简单。
CountDownLatch直译过来是“倒计数锁”。在线程的countDown()动做将计数减至0时,全部的await()处的线程将能够继续向下执行。CountDownLatch的功能与CyclicBarrier有一点点像,但实现方式却很不一样,下面直接来观察CountDownLatch的两个最重要的方法:
1 public void await() throws InterruptedException { 2 sync.acquireSharedInterruptibly(1); 3 } 4 5 public void countDown() { 6 sync.releaseShared(1); 7 }
能够看到,这两个方法实际是由静态内部类Sync来实现的。这个Sync咱们在上一篇FutureTask的实现中也见过,那咱们就先简单介绍下Sync到底是用来作什么的:
Sync extends AbstractQueuedSynchronizer
这个抽象类AbstractQueuedSynchronizer是一个框架,这个框架使用了“共享”与“独占”两张方式经过一个int值来表示状态的同步器。类中含有一个先进先出的队列用来存储等待的线程。
这个类定义了对int值的原子操做的方法,并强制子类定义int的那种状态是获取,哪一种状态是释放。子类能够选择“共享”和“独占”的一种或两种来实现。
共享方式的实现方式是死循环尝试获取对象状态,相似自旋锁。
独占方式的实现方式是经过实现Condition功能的内部的类,保证独占锁。
而咱们正在解读的CountDownLatch中的内部类Sync是使用的共享方式,对于AbstractQueuedSynchronizer的解读本篇不打算详细说明,由于笔者对“独占”方式还没完全弄通,若是之后有机会再来补充。
接下来就直接观察CountDownLatch.Sync的源码:
1 /** 2 * Synchronization control For CountDownLatch. 3 * Uses AQS state to represent count. 4 */ 5 private static final class Sync extends AbstractQueuedSynchronizer { 6 private static final long serialVersionUID = 4982264981922014374L; 7 8 Sync(int count) { 9 setState(count); 10 } 11 12 int getCount() { 13 return getState(); 14 } 15 16 public int tryAcquireShared(int acquires) { 17 return getState() == 0? 1 : -1; 18 } 19 20 public boolean tryReleaseShared(int releases) { 21 // Decrement count; signal when transition to zero 22 for (;;) { 23 int c = getState(); 24 if (c == 0) 25 return false; 26 int nextc = c-1; 27 if (compareAndSetState(c, nextc)) 28 return nextc == 0; 29 } 30 } 31 }
结合最初列出的await()和countDown()方法,
经过上述代码第9行能够看到,CountDownLatch将构造时传入的用来倒计数的count做为状态值。
经过上述代码第17行能够看到,CountDownLatch定义了当count=0时表示能够共享获取状态(在await()方法中调用的sync.acquireSharedInterruptibly(1)会死循环尝试获取状态)。
经过上述代码第26行能够看到,CountDownLatch定义了当count-1表示一次共享释放状态(在countDown()方法中调用的sync.releaseShared(1)会涉及)。
以上就是CountDownLatch的源码实现。
CyclicBarrier与CountDownLatch有一点类似之处,可是有很大区别。它们的异同我我的总结以下:
相似功能
不一样之处