CountDownLatch
众所周知,它能解决一个任务必须在其余任务完成的状况下才能执行的问题,代码层面来讲就是只有计数countDown到0的时候,await处的代码才能继续向下运行,例如:java
import java.util.*; import java.util.concurrent.*; public class Main { public static void main(String[] args) throws Exception { CountDownLatch latch = new CountDownLatch(3); ThreadPoolExecutor executor = new ThreadPoolExecutor(10, 15, 60L, TimeUnit.SECONDS, new ArrayBlockingQueue<>(5)); Future<Integer>[] futures = new Future[3]; for (int i = 0; i < 3; i++){ futures[i] = executor.submit(() -> { Random rand = new Random(); int n = rand.nextInt(100); int result = 0; for (int j = 0; j < n; j++){ result += j; } System.out.println(result + "|" + Thread.currentThread().getName()); latch.countDown(); return result; }); } latch.await(); System.out.println("合计每一个任务的结果:" + (futures[0].get()+futures[1].get()+futures[2].get())); } }
运行结果:node
源码
实际上内部十分简单,里面只有一个AQS的子类dom
private static final class Sync extends AbstractQueuedSynchronizer { private static final long serialVersionUID = 4982264981922014374L; // 它把AQS的state(同步状态)做为计数器,在AQS里,state是个volatile标记的int变量 Sync(int count) { setState(count); } int getCount() { return getState(); } protected int tryAcquireShared(int acquires) { // 同步状态为0,则返回1,不然返回-1 return (getState() == 0) ? 1 : -1; } protected boolean tryReleaseShared(int releases) { // Decrement count; signal when transition to zero for (;;) { int c = getState(); // 若是状态为0则返回false if (c == 0) return false; // 计数器减1 int nextc = c-1; // CAS操做,若是内存中的同步状态值等于指望值c,那么将同步状态设置为给定的更新值nextc if (compareAndSetState(c, nextc)) return nextc == 0; // 当计数器减到0,返回true } } } public void countDown() { sync.releaseShared(1); } public void await() throws InterruptedException { sync.acquireSharedInterruptibly(1); }
下面看具体作了什么事情工具
先来看awaitoop
public final void acquireSharedInterruptibly(int arg) throws InterruptedException { if (Thread.interrupted()) throw new InterruptedException(); // 当计数器不等于0,返回-1,证实还有任务未执行完,进入下面方法等待 if (tryAcquireShared(arg) < 0) doAcquireSharedInterruptibly(arg); } private void doAcquireSharedInterruptibly(int arg) throws InterruptedException { // 把当前线程包装成Node放入等待队列 final Node node = addWaiter(Node.SHARED); boolean failed = true; try { for (;;) { // 获取当前线程的前驱节点,以检查等待状态 final Node p = node.predecessor(); if (p == head) { // 若是计数器等于0,返回1,证实此时阻塞能够解除了 int r = tryAcquireShared(arg); if (r >= 0) { setHeadAndPropagate(node, r); p.next = null; // help GC failed = false; return; } } if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt()) throw new InterruptedException(); } } finally { if (failed) cancelAcquire(node); } }
上面的过程能够总结为:当进入await方法后,若是此时计数器不为0,则进入死循环一直检查计数器的值,直到为0退出,此时中止等待。测试
再来看countDownui
public final boolean releaseShared(int arg) { // 尝试计数器减1,只有减到0才会返回true if (tryReleaseShared(arg)) { doReleaseShared(); return true; } return false; } private void doReleaseShared() { for (;;) { Node h = head; if (h != null && h != tail) { int ws = h.waitStatus; // 等待状态为SIGNAL if (ws == Node.SIGNAL) { // 把当前节点的等待状态从SIGNAL设置成0,若是设置失败则继续循环。 if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0)) continue; // loop to recheck cases // 成功的话则卸载当前节点的全部后继 unparkSuccessor(h); } // 若是等待状态为0,则尝试将状态设置为PROPAGATE,若是设置失败则继续循环。 else if (ws == 0 && !compareAndSetWaitStatus(h, 0, Node.PROPAGATE)) continue; // loop on failed CAS } if (h == head) // loop if head changed break; } }
countDown的过程能够总结为:尝试将计数器-1,直到为0,为0的时候通知等待线程。this
CycleBarrier
栏栅的做用就是让指定的一批任务可以同时开始执行,好比spa
import java.util.*; import java.util.concurrent.*; public class Main { public static void main(String[] args) throws Exception { CyclicBarrier cyclicBarrier = new CyclicBarrier(3); ThreadPoolExecutor executor = new ThreadPoolExecutor(10, 15, 60L, TimeUnit.SECONDS, new ArrayBlockingQueue<>(5)); Future<Integer>[] futures = new Future[3]; for (int i = 0; i < 3; i++){ futures[i] = executor.submit(() -> { System.out.println("await|" + Thread.currentThread().getName()); cyclicBarrier.await(); Random rand = new Random(); int n = rand.nextInt(100); int result = 0; for (int j = 0; j < n; j++){ result += j; } System.out.println(result + "|" + Thread.currentThread().getName()); return result; }); } } }
运行结果线程
源码
进来以后首先发现的是成员变量
/** 用来保护栅栏入口的锁 */ private final ReentrantLock lock = new ReentrantLock(); /** 等待条件,直到计数器为0 */ private final Condition trip = lock.newCondition(); /** 参与线程的个数 */ private final int parties; /* 计数器为0时要运行的命令,由用户定义 */ private final Runnable barrierCommand; /** 当前等待的一代 */ private Generation generation = new Generation(); /** * parties数量的等待线程。每一代等待的数量从parties到0。当调用nextGeneration或者breakBarrier方法时重置。 */ private int count;
从这里能够看出,除了内部实现用的ReentrantLock,其工做过程无非:计数器不为0的时候线程等待;当等待线程所有就绪,也就是计数器减为0的时候重置计数器并通知全部线程继续运行。
致使计数器重置缘由有两个:一个就是发生异常,将当前这一代标记为无效(broken=true);另外一个就是正常就绪,开启下一代(new Generation)
核心方法dowait
// 状况一:timed=false,nanos=0L,表明一直阻塞 // 状况二:timed=true,nanos!=0L,表明在超时时间内阻塞 private int dowait(boolean timed, long nanos) throws InterruptedException, BrokenBarrierException, TimeoutException { final ReentrantLock lock = this.lock; lock.lock(); try { // 获取当前这一代 final Generation g = generation; // 若是当前这一代已经销毁,抛异常 if (g.broken) throw new BrokenBarrierException(); // 测试当前线程是否被中断 if (Thread.interrupted()) { // 将broken设置为true,表明这一代已经销毁,重置count;而后通知全部等待线程 breakBarrier(); throw new InterruptedException(); } // count 减1 int index = --count; // 若是减1以后变成0,证实等待线程所有就绪。 if (index == 0) { // tripped boolean ranAction = false; try { // 若是用户定义了额外的命令,则执行 final Runnable command = barrierCommand; if (command != null) command.run(); ranAction = true; // 开启下一代(通知全部等待线程,重置count,new一个新的Generation) nextGeneration(); return 0; } finally { if (!ranAction) breakBarrier(); } } // loop until tripped, broken, interrupted, or timed out // 若是减1以后不等于0,也就是还有其它线程没有就绪,那么进入此循环,直到就绪或者被销毁,或者被中断和超时 for (;;) { try { if (!timed) // 未定义超时,则一直阻塞 trip.await(); else if (nanos > 0L) // 等待指定的超时时间 nanos = trip.awaitNanos(nanos); } catch (InterruptedException ie) { if (g == generation && ! g.broken) { breakBarrier(); throw ie; } else { // We're about to finish waiting even if we had not // been interrupted, so this interrupt is deemed to // "belong" to subsequent execution. Thread.currentThread().interrupt(); } } if (g.broken) throw new BrokenBarrierException(); if (g != generation) return index; // 超时,则销毁这一代,通知全部等待线程并重置count if (timed && nanos <= 0L) { breakBarrier(); throw new TimeoutException(); } } } finally { lock.unlock(); } }
总结
两个工具实现思路都很简单,惟一我思考的是,为何CountDownLatch只能用一次?
CycleBarrier很明显,它不管正常执行或者发生异常中断都有重置count的逻辑。
而CountDownLatch则没有重置的逻辑,那么,究竟是CountDownLatch不能重置仍是仅仅由于没有重置的逻辑。为此我把CountDownLatch的代码照搬,而后加上了简单的重置方法,以下:
import java.util.concurrent.TimeUnit; import java.util.concurrent.locks.AbstractQueuedSynchronizer; public class MyCountDown { private static final class Sync extends AbstractQueuedSynchronizer { private static final long serialVersionUID = 4982264981922014374L; Sync(int count) { setState(count); } /** * 新加 * @param count */ void reset(int count){ // 从新设置状态 setState(count); } int getCount() { return getState(); } protected int tryAcquireShared(int acquires) { return (getState() == 0) ? 1 : -1; } protected boolean tryReleaseShared(int releases) { // Decrement count; signal when transition to zero for (;;) { int c = getState(); if (c == 0) return false; int nextc = c-1; if (compareAndSetState(c, nextc)) return nextc == 0; } } } private final Sync sync; private final int count; public MyCountDown(int count) { if (count < 0) throw new IllegalArgumentException("count < 0"); this.sync = new Sync(count); this.count = count; } public void await() throws InterruptedException { sync.acquireSharedInterruptibly(1); } public boolean await(long timeout, TimeUnit unit) throws InterruptedException { return sync.tryAcquireSharedNanos(1, unit.toNanos(timeout)); } public void countDown() { sync.releaseShared(1); } public long getCount() { return sync.getCount(); } public String toString() { return super.toString() + "[Count = " + sync.getCount() + "]"; } /** * 新加 */ public void reset(){ // 调用重置的方法 this.sync.reset(count); } }
测试:
import java.util.*; import java.util.concurrent.*; public class Main { public static void main(String[] args) throws Exception { MyCountDown myCountDown = new MyCountDown(3); ThreadPoolExecutor executor = new ThreadPoolExecutor(10, 15, 60L, TimeUnit.SECONDS, new ArrayBlockingQueue<>(5)); Future<Integer>[] futures = new Future[3]; for (int i = 0; i < 3; i++){ futures[i] = executor.submit(() -> { Random rand = new Random(); int n = rand.nextInt(100); int result = 0; for (int j = 0; j < n; j++){ result += j; } System.out.println(result + "|" + Thread.currentThread().getName()); Thread.sleep(new Random().nextInt(2000)); // 模拟耗时 myCountDown.countDown(); return result; }); } myCountDown.await(); System.out.println("第一次:" + (futures[0].get() + futures[1].get() + futures[2].get())); myCountDown.reset(); // 重置 for (int i = 0; i < 3; i++){ futures[i] = executor.submit(() -> { Random rand = new Random(); int n = rand.nextInt(100); int result = 0; for (int j = 0; j < n; j++){ result += j; } System.out.println(result + "|" + Thread.currentThread().getName()); Thread.sleep(new Random().nextInt(2000)); // 模拟耗时 myCountDown.countDown(); return result; }); } myCountDown.await(); System.out.println("若是重置无效,则这个信息会先于任务信息输出"); System.out.println("第二次:" + (futures[0].get() + futures[1].get() + futures[2].get())); } }
输出
若是换成CountDownLatch
import java.util.*; import java.util.concurrent.*; public class Main { public static void main(String[] args) throws Exception { CountDownLatch latch = new CountDownLatch(3); ThreadPoolExecutor executor = new ThreadPoolExecutor(10, 15, 60L, TimeUnit.SECONDS, new ArrayBlockingQueue<>(5)); Future<Integer>[] futures = new Future[3]; for (int i = 0; i < 3; i++){ futures[i] = executor.submit(() -> { Random rand = new Random(); int n = rand.nextInt(100); int result = 0; for (int j = 0; j < n; j++){ result += j; } System.out.println(result + "|" + Thread.currentThread().getName()); Thread.sleep(new Random().nextInt(2000)); // 模拟耗时 latch.countDown(); return result; }); } latch.await(); System.out.println("第一次:" + (futures[0].get() + futures[1].get() + futures[2].get())); for (int i = 0; i < 3; i++){ futures[i] = executor.submit(() -> { Random rand = new Random(); int n = rand.nextInt(100); int result = 0; for (int j = 0; j < n; j++){ result += j; } System.out.println(result + "|" + Thread.currentThread().getName()); Thread.sleep(new Random().nextInt(2000)); // 模拟耗时 latch.countDown(); return result; }); } latch.await(); System.out.println("若是重置无效,则这个信息会先于任务信息输出"); System.out.println("第二次:" + (futures[0].get() + futures[1].get() + futures[2].get())); } }
输出
因此能够得出结论,CountDownLatch不是没有办法重置,只不过没有写相关逻辑。固然这个问题若是我说错了,望指正。