日拱一兵 | 原创html
- 你有一个思想,我有一个思想,咱们交换后,一我的就有两个思想
- If you can NOT explain it simply, you do NOT understand it well enough
并发编程的三大核心是分工
,同步
和互斥
。在平常开发中,常常会碰到须要在主线程中开启多个子线程去并行的执行任务,而且主线程须要等待全部子线程执行完毕再进行汇总的场景,这就涉及到分工与同步的内容了java
在讲 有序性可见性,Happens-before来搞定 时,提到过 join() 规则,使用 join() 就能够简单的实现上述场景:node
@Slf4j public class JoinExample { public static void main(String[] args) throws InterruptedException { Thread thread1 = new Thread(() -> { try { Thread.sleep(1000); } catch (InterruptedException e) { e.printStackTrace(); } finally { log.info("Thread-1 执行完毕"); } }, "Thread-1"); Thread thread2 = new Thread(() -> { try { Thread.sleep(1000); } catch (InterruptedException e) { e.printStackTrace(); } finally { log.info("Thread-2 执行完毕"); } }, "Thread-2"); thread1.start(); thread2.start(); thread1.join(); thread2.join(); log.info("主线程执行完毕"); } }
运行结果:编程
整个过程能够这么理解api
咱们来查看 join() 的实现源码:多线程
其实现原理是不停的检查 join 线程是否存活,若是 join 线程存活,则 wait(0) 永远的等下去,直至 join 线程终止后,线程的 this.notifyAll() 方法会被调用(该方法是在 JVM 中实现的,JDK 中并不会看到源码),退出循环恢复主线程执行。很显然这种循环检查的方式比较低效并发
除此以外,使用 join() 缺乏不少灵活性,好比实际项目中不多让本身单首创建线程(缘由在 我会手动建立线程,为何要使用线程池? 中说过)而是使用 Executor, 这进一步减小了 join() 的使用场景,因此 join() 的使用在多数是停留在 demo 演示上oracle
那如何实现文中开头提到的场景呢?
CountDownLatch, 直译过来【数量向下门闩】,那确定里面有计数器的存在了。咱们将上述程序用 CountDownLatch 实现一下,先让你们有个直观印象app
@Slf4j public class CountDownLatchExample { private static CountDownLatch countDownLatch = new CountDownLatch(2); public static void main(String[] args) throws InterruptedException { // 这里不推荐这样建立线程池,最好经过 ThreadPoolExecutor 手动建立线程池 ExecutorService executorService = Executors.newFixedThreadPool(2); executorService.submit(() -> { try { Thread.sleep(1000); } catch (InterruptedException e) { e.printStackTrace(); } finally { log.info("Thread-1 执行完毕"); //计数器减1 countDownLatch.countDown(); } }); executorService.submit(() -> { try { Thread.sleep(1000); } catch (InterruptedException e) { e.printStackTrace(); } finally { log.info("Thread-2 执行完毕"); //计数器减1 countDownLatch.countDown(); } }); log.info("主线程等待子线程执行完毕"); log.info("计数器值为:" + countDownLatch.getCount()); countDownLatch.await(); log.info("计数器值为:" + countDownLatch.getCount()); log.info("主线程执行完毕"); executorService.shutdown(); } }
运行结果以下:less
结合上述示例的运行结果,相信你也能猜出 CountDownLatch 的实现原理了:
countDownLatch.countDown()
方法将计数器数值减1
不知道你是否注意,
countDownLatch.countDown();
这行代码能够写在子线程执行的任意位置,不像 join() 要彻底等待子线程执行完,这也是 CountDownLatch 灵活性的一种体现
上述的例子仍是过于简单,Oracle 官网 CountDownLatch 说明 有两个很是经典的使用场景,示例很简单,强烈建议查看相关示例代码,打开使用思路。我将两个示例代码以图片的形式展现在此处:
startSignal
,阻止任何工人 Worker
继续工做,直到司机 Driver
准备好让他们继续工做doneSignal
,容许司机 Driver
等待,直到全部的工人 Worker
完成。另外一种典型的用法是将一个问题分红 N 个部分 (好比将一个大的 list 拆分红多分,每一个 Worker 干一部分),Worker 执行完本身所处理的部分后,计数器减1,当全部子部分完成后,Driver 才继续向下执行
结合官网示例,相信你已经能够结合你本身的业务场景解,经过 CountDownLatch 解决一些串行瓶颈来提升运行效率了,会用还远远不够,咱得知道 CountDownLatch 的实现原理
CountDownLatch 是 AQS 实现中的最后一个内容,有了前序文章的知识铺垫:
当你看到 CountDownLatch 的源码内容,你会高兴的笑起来,内容真是太少了
展开类结构所有内容就这点东西
既然 CountDownLatch 是基于 AQS 实现的,那确定也离不开对同步状态变量 state 的操做,咱们在初始化的时候就将计数器的值赋值给了state
另外,它能够多个线程同时获取,那必定是基于共享式获取同步变量的用法了,因此它须要经过重写下面两个方法控制同步状态变量 state :
CountDownLatch 暴露给使用者的只有 await()
和 countDown()
两个方法,前者是阻塞本身,由于只有获取同步状态才会才会出现阻塞的状况,那天然是在 await()
的方法内部会用到 tryAcquireShared()
;有获取就要有释放,那后者 countDown()
方法内部天然是要用到 tryReleaseShared()
方法了
PS:若是你对上面这个很天然的推断理解有困难,强烈建议你看一下前序文章的铺垫,以防止知识断层带来的困扰
先来看 await() 方法, 从方法签名上看,该方法会抛出 InterruptedException, 因此它是能够响应中断的,这个咱们在 Java多线程中断机制 中明确说明过
public void await() throws InterruptedException { sync.acquireSharedInterruptibly(1); }
其内部调用了同步器提供的模版方法 acquireSharedInterruptibly
public final void acquireSharedInterruptibly(int arg) throws InterruptedException { // 若是监测到中断标识为true,会重置标识,而后抛出 InterruptedException if (Thread.interrupted()) throw new InterruptedException(); // 调用重写的 tryAcquireShared 方法,该方法结果若是大于零则直接返回,程序继续向下执行,若是小于零,则会阻塞本身 if (tryAcquireShared(arg) < 0) // state不等于0,则尝试阻塞本身 doAcquireSharedInterruptibly(arg); }
重写的 tryAcquireShared
方法很是简单, 就是判断同步状态变量 state 的值是否为 0, 若是为零 (子线程已经所有执行完毕)则返回1, 不然返回 -1
protected int tryAcquireShared(int acquires) { return (getState() == 0) ? 1 : -1; }
若是子线程没有所有执行完毕,则会经过 doAcquireSharedInterruptibly
方法阻塞本身,这个方法在 Java AQS共享式获取同步状态及Semaphore的应用分析 中已经仔细分析过了,这里就再也不赘述了
private void doAcquireSharedInterruptibly(int arg) throws InterruptedException { final Node node = addWaiter(Node.SHARED); boolean failed = true; try { for (;;) { final Node p = node.predecessor(); if (p == head) { // 再次尝试获取同步装阿嚏,若是大于0,说明子线程所有执行完毕,直接返回 int r = tryAcquireShared(arg); if (r >= 0) { setHeadAndPropagate(node, r); p.next = null; // help GC failed = false; return; } } // 阻塞本身 if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt()) throw new InterruptedException(); } } finally { if (failed) cancelAcquire(node); } }
await()
方法的实现就是这么简单,接下来看看 countDown()
的实现原理
public void countDown() { sync.releaseShared(1); }
一样是调用同步器提供的模版方法 releaseShared
public final boolean releaseShared(int arg) { // 调用本身重写的同步器方法 if (tryReleaseShared(arg)) { // 唤醒调用 await() 被阻塞的线程 doReleaseShared(); return true; } return false; }
重写的 tryReleaseShared
一样很简单
protected boolean tryReleaseShared(int releases) { // Decrement count; signal when transition to zero for (;;) { int c = getState(); // 若是当前状态值为0,则直接返回 (1) if (c == 0) return false; // 使用 CAS 让计数器的值减1 (2) int nextc = c-1; if (compareAndSetState(c, nextc)) return nextc == 0; } }
代码 (1) 判断当前同步状态值,若是为0 则直接返回 false;不然执行代码 (2),使用 CAS 将计数器减1,若是 CAS 失败,则循环重试,最终返回 nextc == 0
的结果值,若是该值返回 true,说明最后一个线程已调用 countDown() 方法,而后就要唤醒调用 await() 方法被阻塞的线程,一样因为分析过 AQS 的模版方法 doReleaseShared 整个释放同步状态以及唤醒的过程,因此这里一样再也不赘述了
仔细看CountDownLatch重写的 tryReleaseShared
方法,有一点须要和你们说明:
代码 (1)
if (c == 0)
看似没什么用处,其实用处大大滴,若是没有这个判断,当计数器值已经为零了,其余线程再调用 countDown 方法会将计数器值变为负值
如今就差 await(long timeout, TimeUnit unit)
方法没介绍了
public boolean await(long timeout, TimeUnit unit) throws InterruptedException { return sync.tryAcquireSharedNanos(1, unit.toNanos(timeout)); }
该方法签名一样抛出 InterruptedException,意思可响应中断。它其实就是 await() 更完善的一个版本,简单来讲就是
主线程设定等待超时时间,若是该时间内子线程没有执行完毕,主线程也会 直接返回
咱们将上面的例子稍稍修改一下你就会明白(主线程超时时间设置为 2 秒,而子线程要 sleep 5 秒)
@Slf4j public class CountDownLatchTimeoutExample { private static CountDownLatch countDownLatch = new CountDownLatch(2); public static void main(String[] args) throws InterruptedException { // 这里不推荐这样建立线程池,最好经过 ThreadPoolExecutor 手动建立线程池 ExecutorService executorService = Executors.newFixedThreadPool(2); executorService.submit(() -> { try { Thread.sleep(5000); } catch (InterruptedException e) { e.printStackTrace(); } finally { log.info("Thread-1 执行完毕"); //计数器减1 countDownLatch.countDown(); } }); executorService.submit(() -> { try { Thread.sleep(5000); } catch (InterruptedException e) { e.printStackTrace(); } finally { log.info("Thread-2 执行完毕"); //计数器减1 countDownLatch.countDown(); } }); log.info("主线程等待子线程执行完毕"); log.info("计数器值为:" + countDownLatch.getCount()); countDownLatch.await(2, TimeUnit.SECONDS); log.info("计数器值为:" + countDownLatch.getCount()); log.info("主线程执行完毕"); executorService.shutdown(); } }
运行结果以下:
形象化的展现上述示例的运行过程
CountDownLatch 的实现原理就是这么简单,了解了整个实现过程后,你也许发现了使用 CountDownLatch 的一个问题:
计数器减 1 操做是 一次性的,也就是说当计数器减到 0, 再有线程调用 await() 方法,该线程会直接经过, 不会再起到等待其余线程执行结果起到同步的做用了
为了解决这个问题,贴心的 Doug Lea 大师早已给咱们准备好相应策略 CyclicBarrier
原本想将CyclicBarrier
的内容放到下一个章节,可是 CountDownLatch 的内容着实有些少,不够解渴,另外有对比才有伤害,因此内容没结束,咱得继续看CyclicBarrier
)
上面简单说了一下 CyclicBarrier 被创造出来的理由,这里先看一下它的字面解释:
概念老是有些抽象,咱们将上面的例子用 CyclicBarrier 再作个改动,先让你们有个直观的使用概念
@Slf4j public class CyclicBarrierExample { // 建立 CyclicBarrier 实例,计数器的值设置为2 private static CyclicBarrier cyclicBarrier = new CyclicBarrier(2); public static void main(String[] args) { ExecutorService executorService = Executors.newFixedThreadPool(2); int breakCount = 0; // 将线程提交到线程池 executorService.submit(() -> { try { log.info(Thread.currentThread() + "第一回合"); Thread.sleep(1000); cyclicBarrier.await(); log.info(Thread.currentThread() + "第二回合"); Thread.sleep(2000); cyclicBarrier.await(); log.info(Thread.currentThread() + "第三回合"); } catch (InterruptedException | BrokenBarrierException e) { e.printStackTrace(); } }); executorService.submit(() -> { try { log.info(Thread.currentThread() + "第一回合"); Thread.sleep(2000); cyclicBarrier.await(); log.info(Thread.currentThread() + "第二回合"); Thread.sleep(1000); cyclicBarrier.await(); log.info(Thread.currentThread() + "第三回合"); } catch (InterruptedException | BrokenBarrierException e) { e.printStackTrace(); } }); executorService.shutdown(); } }
运行结果:
结合程序代码与运行结果,咱们能够看出,子线程执行完第一回合后(执行回合所需时间不一样),都会调用 await() 方法,等全部线程都到达屏障点后,会突破屏障继而执行第二回合,一样的道理最终到达第三回合
形象化的展现上述示例的运行过程
看到这里,你应该明白 CyclicBarrier 的基本用法,但随之你心里也应该有了一些疑问:
带着这些问题咱们来看一看源码
一样先打开 CyclicBarrier 的类结构,展开类所有内容,其实也没多少内容
从类结构中看到有:
咱们继续带着这些猜想,结合上面的实例代码一点点来验证
// 建立 CyclicBarrier 实例,计数器的值设置为2 private static CyclicBarrier cyclicBarrier = new CyclicBarrier(2);
查看构造函数 (这里的英文注释舍不得删掉,由于说的太清楚了,我来结合注释来讲明一下):
private final int parties; private int count; public CyclicBarrier(int parties) { this(parties, null); } /** * Creates a new {@code CyclicBarrier} that will trip when the * given number of parties (threads) are waiting upon it, and which * will execute the given barrier action when the barrier is tripped, * performed by the last thread entering the barrier. * * @param parties the number of threads that must invoke {@link #await} * before the barrier is tripped * @param barrierAction the command to execute when the barrier is * tripped, or {@code null} if there is no action * @throws IllegalArgumentException if {@code parties} is less than 1 */ public CyclicBarrier(int parties, Runnable barrierAction) { if (parties <= 0) throw new IllegalArgumentException(); this.parties = parties; this.count = parties; this.barrierCommand = barrierAction; }
根据注释说明,parties 表明冲破屏障以前要触发的线程总数,count 自己又是计数器,那问题来了
直接就用 count 不就能够了嘛?为啥一样用于初始化计数器,要维护两个变量呢?
从 parties 和 count 的变量声明中,你也能看出一些门道,前者有 final 修饰,初始化后就不能够改变了,由于 CyclicBarrier 的设计目的是能够循环利用的,因此始终用 parties 来记录线程总数,当 count 计数器变为 0 后,若是没有 parties 的值赋给它,怎么进行从新复用再次计数呢,因此这里维护两个变量颇有必要
接下来就看看 await() 究竟是怎么实现的
// 从方法签名上能够看出,该方法一样能够被中断,另外还有一个 BrokenBarrierException 异常,咱们一会看 public int await() throws InterruptedException, BrokenBarrierException { try { // 调用内部 dowait 方法, 第一个参数为 false,表示不设置超时时间,第二个参数也就没了意义 return dowait(false, 0L); } catch (TimeoutException toe) { throw new Error(toe); // cannot happen } }
接下来看看 dowait(false, 0L)
作了哪些事情 (这个方法内容有点多,别担忧,逻辑并不复杂,请看关键代码注释)
private int dowait(boolean timed, long nanos) throws InterruptedException, BrokenBarrierException, TimeoutException { final ReentrantLock lock = this.lock; // 还记得以前说过的 Lock 标准范式吗? JDK 内部都是这么使用的,你必定也要遵循范式 lock.lock(); try { final Generation g = generation; // broken 是静态内部类 Generation惟一的一个成员变量,用于记录当前屏障是否被打破,若是打破,则抛出 BrokenBarrierException 异常 // 这里感受挺困惑的,咱们要【冲破】屏障,这里【打破】屏障却抛出异常,注意我这里的用词 if (g.broken) throw new BrokenBarrierException(); // 若是线程被中断,则会经过 breakBarrier 方法将 broken 设置为true,也就是说,若是有线程收到中断通知,直接就打破屏障,中止 CyclicBarrier, 并唤醒全部线程 if (Thread.interrupted()) { breakBarrier(); throw new InterruptedException(); } // ************************************ // 由于 breakBarrier 方法在这里会被调用屡次,为了便于你们理解,我直接将 breakBarrier 代码插入到这里 private void breakBarrier() { // 将打破屏障标识 设置为 true generation.broken = true; // 重置计数器 count = parties; // 唤醒全部等待的线程 trip.signalAll(); } // ************************************ // 每当一个线程调用 await 方法,计数器 count 就会减1 int index = --count; // 当 count 值减到 0 时,说明这是最后一个调用 await() 的子线程,则会突破屏障 if (index == 0) { // tripped boolean ranAction = false; try { // 获取构造函数中的 barrierCommand,若是有值,则运行该方法 final Runnable command = barrierCommand; if (command != null) command.run(); ranAction = true; // 激活其余因调用 await 方法而被阻塞的线程,并重置 CyclicBarrier nextGeneration(); // ************************************ // 为了便于你们理解,我直接将 nextGeneration 实现插入到这里 private void nextGeneration() { // signal completion of last generation trip.signalAll(); // set up next generation count = parties; generation = new Generation(); } // ************************************ return 0; } finally { if (!ranAction) breakBarrier(); } } // index 不等于0, 说明当前不是最后一个线程调用 await 方法 // loop until tripped, broken, interrupted, or timed out for (;;) { try { // 没有设置超时时间 if (!timed) // 进入条件等待 trip.await(); else if (nanos > 0L) // 不然,判断超时时间,这个咱们在 AQS 中有说明过,包括为何最后超时阈值 spinForTimeoutThreshold 再也不比较的缘由,你们会看就好 nanos = trip.awaitNanos(nanos); } catch (InterruptedException ie) { // 条件等待被中断,则判断是否有其余线程已经使屏障破坏。若没有则进行屏障破坏处理,并抛出异常;不然再次中断当前线程 if (g == generation && ! g.broken) { breakBarrier(); throw ie; } else { Thread.currentThread().interrupt(); } } if (g.broken) throw new BrokenBarrierException(); // 若是新一轮回环结束,会经过 nextGeneration 方法新建 generation 对象 if (g != generation) return index; if (timed && nanos <= 0L) { breakBarrier(); throw new TimeoutException(); } } } finally { lock.unlock(); } }
doWait 就是 CyclicBarrier 的核心逻辑, 能够看出,该方法入口使用了 ReentrantLock,这也就是为何 Generation broken 变量没有被声明为 volatile 类型保持可见性,由于对其的更改都是在锁的内部,一样在锁的内部对计数器 count 作更新,也保证了原子性
doWait 方法中,是经过 nextGeneration 方法来从新初始化/重置 CyclicBarrier 状态的,该类中还有一个 reset() 方法,也是重置 CyclicBarrier 状态的
public void reset() { final ReentrantLock lock = this.lock; lock.lock(); try { breakBarrier(); // break the current generation nextGeneration(); // start a new generation } finally { lock.unlock(); } }
但 reset() 方法并无在 CyclicBarrier 内部被调用,显然是给 CyclicBarrier 使用者来调用的,那问题来了
何时调用 reset() 方法呢
正常状况下,CyclicBarrier 是会被自动重置状态的,从 reset 的方法实现中能够看出调用了 breakBarrier
方法,也就是说,调用 reset 会使当前处在等待中的线程最终抛出 BrokenBarrierException 并当即被唤醒,因此说 reset() 只会在你想打破屏障时才会使用
上述示例,咱们构建 CyclicBarrier 对象时,并无传递 barrierCommand 对象, 咱们修改示例传入一个 barrierCommand 对象,看看会有什么结果:
// 建立 CyclicBarrier 实例,计数器的值设置为2 private static CyclicBarrier cyclicBarrier = new CyclicBarrier(2, () -> { log.info("所有运行结束"); });
运行结果:
从运行结果中来看,每次冲破屏障后都会执行 CyclicBarrier 初始化 barrierCommand 的方法, 这与咱们对 doWait() 方法的分析彻底吻合,从上面的运行结果中能够看出,最后一个线程是运行 barrierCommand run() 方法的线程,咱们再来形象化的展现一下整个过程
从上图能够看出,barrierAction 与每次突破屏障是串行化的执行过程,假如 barrierAction 是很耗时的汇总操做,那这就是能够优化的点了,咱们继续修改代码
// 建立单线程线程池 private static Executor executor = Executors.newSingleThreadExecutor(); // 建立 CyclicBarrier 实例,计数器的值设置为2 private static CyclicBarrier cyclicBarrier = new CyclicBarrier(2, () -> { executor.execute(() -> gather()); }); private static void gather() { try { Thread.sleep(2000); } catch (InterruptedException e) { e.printStackTrace(); } log.info("所有运行结束"); }
咱们这里将 CyclicBarrier 的回调函数 barrierAction使用单线程的线程池,这样最后一个冲破屏障的线程就不用等待 barrierAction 的执行,直接分配个线程池里的线程异步执行,进一步提高效率
运行结果以下:
咱们再形象化的看一下整个过程:
这里使用了单一线程池,增长了并行操做,提升了程序运行效率,那问题来了:
若是 barrierAction 很是很是耗时,冲破屏障的任务就可能堆积在单一线程池的等待队列中,就存在 OOM 的风险,那怎么办呢?
这是就要须要必定的限流策略或者使用线程池的拒绝的略等
那把单一线程池换成非单一的固定线程池不就能够了嘛?好比 fixed(5)
乍一看确实能缓解单线程池可能引发的任务堆积问题,上面代码咱们看到的 gather() 方法,假如该方法内部没有使用锁或者说存在竟态条件,那 CyclicBarrier 的回调函数 barrierAction 使用多线程一定引发结果的不许确
因此在实际使用中还要结合具体的业务场景不断优化代码,使之更加健壮
本文讲解了 CountDownLatch 和 CyclicBarrier 的经典使用场景以及实现原理,以及在使用过程当中可能会遇到的问题,好比将大的 list 拆分做业就能够用到前者,读取多个 Excel 的sheet 页,最后进行结果汇总就能够用到后者 (文中完整示例代码已上传)
最后,再形象化的比喻一下
接下来,我们就聊聊那些可使用的 Future 特性
日拱一兵 | 原创