在并发编程时总会遇到一种这样的场景:等待一系列任务作完后,才能开始作某个任务。当遇到这种场景时,两个类cross our mind:CountDownLatch和CyclicBarrier。下面从使用方法和内部实现原理分别对这两个类作出介绍。java
class MyThread extends Thread{ private CountDownLatch latch; public MyThread(CountDownLatch latch) { this.latch = latch; } @Override public void run() { try { System.out.println(Thread.currentThread().getName()); Thread.sleep(100); // 任务完成 state - 1 latch.countDown(); } catch (InterruptedException e) { e.printStackTrace(); }finally { latch.countDown(); } } }
在完成每个任务后,latch中的int数字作减一操做。编程
public class CountDownLatchTest { @Test public void main() { // 初始化值为3 CountDownLatch latch = new CountDownLatch(3); // 启动3个任务 for (int i = 3; i > 0; i --) { new MyThread(latch).start(); } try { // 等待三个任务完成 latch.await(); } catch (InterruptedException e) { e.printStackTrace(); } System.out.println("count = " + latch.getCount()); System.out.println("finished"); } }
主线程中启动了三个子线程,而后调用了latch.await()方法。并发
Thread-1 Thread-0 Thread-2 count = 0 finished
从输出结果能够看出,主线程在等待三个子线程完成任务以后才结束的。app
public class NormalTask implements Runnable { CyclicBarrier barrier; NormalTask(CyclicBarrier barrier) { this.barrier = barrier; } @Override public void run() { try { Thread.sleep(100); barrier.await(); System.out.println(System.currentTimeMillis() + " first step finished"); } catch (InterruptedException | BrokenBarrierException e) { e.printStackTrace(); } } }
每一个任务完成须要100ms。ide
public class FinalTask implements Runnable { @Override public void run() { try { Thread.sleep(10); } catch (InterruptedException e) { e.printStackTrace(); } System.out.println(System.currentTimeMillis() + " second step finished"); } }
后完成的任务须要10ms。函数
主线程启动了两个先执行的线程,将后完成的线程做为参数传入CyclicBarrier。oop
@Test public void testInterruptException() throws InterruptedException { // 主线程做为参数传入,主线程须要等待子线程完成 CyclicBarrier barrier = new CyclicBarrier(2,new FinalTask()); new Thread(new NormalTask(barrier)).start(); new Thread(new NormalTask(barrier)).start(); Thread.sleep(300); }
运行结果测试
1543326017854 first step finished 1543326017854 first step finished 1543326017870 second step finished
从运行结构能够看出,先启动的任务几乎同时完成,然后完成的任务结束时间比前两个线程完成时间晚16ms,其中6ms是启动线程所花费的。主线程中sleep 300ms 是为了等待全部的线程都执行完成。也可使用join实现相同的效果。在这里解释一下为何不能像CountDownLatch同样用主线程做为等待线程。我刚开始也是这样作的,发现主线程一下就跑完了,根本不停。查看了源码才发现,CyclicBarrier没有park主线程。具体逻辑相见下文的原理分析。this
两个类均可以实现一个任务等待其余几个任务完成后再执行。线程
两个类都是在初始化时,传入一个整形数字,表示须要等待几个任务完成后才能开始执行等待的任务。可是其底层实现的原理彻底不一样。下面对两个类的实现原理作具体介绍。
CountDownLatch park 的是主线程,是主线程和全部的子线程在竞争同一把锁。可是初始化时,他把锁默认给了子线程(将AQS中的state 置为须要等待的子线程的个数)。
Sync(int count) { setState(count); } public CountDownLatch(int count) { if (count < 0) throw new IllegalArgumentException("count < 0"); this.sync = new Sync(count); }
而主线程在调用await方法时,先检测state是否为0,若是=0 就不用park了,这时说明子线程都已完成了。若是!= 0。则park。
每一个子线程在执行完任务后,将state使用cas的方式减1,并尝试取唤醒(unpark)主线程。
public void countDown() { sync.releaseShared(1); } public final boolean releaseShared(int arg) { if (tryReleaseShared(arg)) { doReleaseShared(); return true; } return false; } // 经过cas将state减1 若是state = 0 则调用doReleaseShared唤醒AQS队列中的主线程 protected boolean tryReleaseShared(int releases) { // Decrement count; signal when transition to zero for (;;) { int c = getState(); if (c == 0) return false; int nextc = c-1; if (compareAndSetState(c, nextc)) return nextc == 0; } }
上面完整的介绍了CountDownLatch的工做原理。
为了与CountDwonLatch 对比,也为了方便描述问题,咱们将先执行的任务叫作子线程,将后执行的任务叫作主线程。
CyclicBarrier 在初始化时将int值不但赋值给了state,其内部也留了一个备份,这就是CyclicBarrier能够调用reset从新使用的一个缘由。并且其内部是在可重入锁ReentrantLock和Condition的基础上实现的,在其代码内部几乎看不到CAS代码,看到更多的是重入锁的lock和unlock以及Condition的await和singal。
public CyclicBarrier(int parties, Runnable barrierAction) { if (parties <= 0) throw new IllegalArgumentException(); this.parties = parties; this.count = parties; this.barrierCommand = barrierAction; }
其中的parties就是子线程个数的备份,而barrierAction无关紧要。
在子任务完成后就会调用await方法:
public int await() throws InterruptedException, BrokenBarrierException { try { return dowait(false, 0L); } catch (TimeoutException toe) { throw new Error(toe); // cannot happen } }
其核心逻辑在dowait方法中。dowait的核心逻辑是,先上锁,然后检查异常,若是有线程抛出过异常则当前线程也抛出异常。
private int dowait(boolean timed, long nanos) throws InterruptedException, BrokenBarrierException, TimeoutException { final ReentrantLock lock = this.lock; lock.lock(); try { final Generation g = generation; if (g.broken) throw new BrokenBarrierException(); if (Thread.interrupted()) { breakBarrier(); throw new InterruptedException(); } ....
若是没有线程抛出异常,将count减一,并检查count是否为0 若是不为0 将当前的线程放入Condition的等待队列。若是等于0 则唤醒以前的全部线程。
int index = --count; if (index == 0) { // 若是等于0说明全部的任务都已完成,唤醒全部Condition中的线程。 boolean ranAction = false; try { final Runnable command = barrierCommand; if (command != null) command.run(); ranAction = true; nextGeneration(); return 0; } finally { if (!ranAction) breakBarrier(); } } // loop until tripped, broken, interrupted, or timed out for (;;) { try { if (!timed) trip.await(); // 放入Condition队列中 else if (nanos > 0L) nanos = trip.awaitNanos(nanos); } catch (InterruptedException ie) {
至此,CyclicBarrier的原理页介绍完成了。
经过以上分析能够得出,CountDownLatch 更适合一个任务等待一些任务执行完成后再执行,而CyclicBarrier更适合保证一批任务同时结束。