『并发包入坑指北』之向大佬汇报任务

前言

在面试过程当中聊到并发相关的内容时,很多面试官都喜欢问这类问题:java

当 N 个线程同时完成某项任务时,如何知道他们都已经执行完毕了。git

这也是本次讨论的话题之一,因此本篇为『并发包入坑指北』的第二篇;来聊聊常见的并发工具。github

本身实现

其实这类问题的核心论点都是:如何在一个线程中得知其余线程是否执行完毕。面试

假设如今有 3 个线程在运行,须要在主线程中得知他们的运行结果;能够分为如下几步:安全

  • 定义一个计数器为 3。
  • 每一个线程完成任务后计数减一。
  • 一旦计数器减为 0 则通知等待的线程。

因此也很容易想到能够利用等待通知机制来实现,和上文的『并发包入坑指北』之阻塞队列的相似。多线程

按照这个思路自定义了一个 MultipleThreadCountDownKit 工具,构造函数以下:并发

考虑到并发的前提,这个计数器天然须要保证线程安全,因此采用了 AtomicInteger框架

因此在初始化时须要根据线程数量来构建对象。ide

计数器减一

当其中一个业务线程完成后须要将这个计数器减一,直到减为0为止。函数

/**
     * 线程完成后计数 -1
     */
    public void countDown(){

        if (counter.get() <= 0){
            return;
        }

        int count = this.counter.decrementAndGet();
        if (count < 0){
            throw new RuntimeException("concurrent error") ;
        }

        if (count == 0){
            synchronized (notify){
                notify.notify();
            }
        }

    }

利用 counter.decrementAndGet() 来保证多线程的原子性,当减为 0 时则利用等待通知机制来 notify 其余线程。

等待全部线程完成

而须要知道业务线程执行完毕的其余线程则须要在未完成以前一直处于等待状态,直到上文提到的在计数器变为 0 时获得通知。

/**
     * 等待全部的线程完成
     * @throws InterruptedException
     */
    public void await() throws InterruptedException {
        synchronized (notify){
            while (counter.get() > 0){
                notify.wait();
            }

            if (notifyListen != null){
                notifyListen.notifyListen();
            }

        }
    }

原理也很简单,一旦计数器还存在时则会利用 notify 对象进行等待,直到被业务线程唤醒。

同时这里新增了一个通知接口能够自定义实现唤醒后的一些业务逻辑,后文会作演示。

并发测试

主要就是这两个函数,下面来作一个演示。

  • 初始化了三个计数器的并发工具 MultipleThreadCountDownKit
  • 建立了三个线程分别执行业务逻辑,完毕后执行 countDown()
  • 线程 3 休眠了 2s 用于模拟业务耗时。
  • 主线程执行 await() 等待他们三个线程执行完毕。

经过执行结果能够看出主线程会等待最后一个线程完成后才会退出;从而达到了主线程等待其他线程的效果。

MultipleThreadCountDownKit multipleThreadKit = new MultipleThreadCountDownKit(3);
    multipleThreadKit.setNotify(() -> LOGGER.info("三个线程完成了任务"));

也能够在初始化的时候指定一个回调接口,用于接收业务线程执行完毕后的通知。

固然和在主线程中执行这段逻辑效果是同样的(和执行 await() 方法处于同一个线程)。

CountDownLatch

固然咱们本身实现的代码没有通过大量生产环境的验证,因此主要的目的仍是尝试窥探官方的实现原理。

因此咱们如今来看看 juc 下的 CountDownLatch 是如何实现的。

经过构造函数会发现有一个 内部类 Sync,他是继承于 AbstractQueuedSynchronizer ;这是 Java 并发包中的基础框架,均可以单独拿来说了,因此此次重点不是它,从此咱们再着重介绍。

这里就能够把他简单理解为提供了和上文相似的一个计数器及线程通知工具就好了。

countDown

其实他的核心逻辑和咱们本身实现的区别不大。

public void countDown() {
        sync.releaseShared(1);
    }
    
    public final boolean releaseShared(int arg) {
        if (tryReleaseShared(arg)) {
            doReleaseShared();
            return true;
        }
        return false;
    }

利用这个内部类的 releaseShared 方法,咱们能够理解为他想要将计数器减一。

看到这里有没有似曾相识的感受。

没错,在 JDK1.7 中的 AtomicInteger 自减就是这样实现的(利用 CAS 保证了线程安全)。

只是一旦计数器减为 0 时则会执行 doReleaseShared 唤醒其余的线程。


这里咱们只须要关心红框部分(其余的暂时不用关心,这里涉及到了 AQS 中的队列相关),最终会调用 LockSupport.unpark 来唤醒线程;就至关于上文调用 object.notify()

因此其实本质上仍是相同的。

await

其中的 await() 也是借用 Sync 对象的方法实现的。

public void await() throws InterruptedException {
        sync.acquireSharedInterruptibly(1);
    }

    public final void acquireSharedInterruptibly(int arg)
            throws InterruptedException {
        if (Thread.interrupted())
            throw new InterruptedException();
        //判断计数器是否还未完成    
        if (tryAcquireShared(arg) < 0)
            doAcquireSharedInterruptibly(arg);
    }

    protected int tryAcquireShared(int acquires) {
        return (getState() == 0) ? 1 : -1;
    }

一旦还存在未完成的线程时,则会调用 doAcquireSharedInterruptibly 进入阻塞状态。

private final boolean parkAndCheckInterrupt() {
        LockSupport.park(this);
        return Thread.interrupted();
    }

一样的因为这也是 AQS 中的方法,咱们只须要关心红框部分;其实最终就是调用了 LockSupport.park 方法,也就至关于执行了 object.wait()

  • 全部的业务线程执行完毕后会在计数器减为 0 时调用 LockSupport.unpark 来唤醒线程。
  • 等待线程一旦计数器 > 0 时则会利用 LockSupport.park 来等待唤醒。

这样整个流程也就串起来了,它的使用方法也和上文的相似。

就不作过多介绍了。

实际案例

一样的来看一个实际案例。

在上一篇《一次分表踩坑实践的探讨》提到了对于全表扫描的状况下,须要利用多线程来提升查询效率。

好比咱们这里分为了 64 张表,计划利用 8 个线程来分别处理这些表的数据,伪代码以下:

CountDownLatch count = new CountDownLatch(64);
ConcurrentHashMap total = new ConcurrentHashMap();
for(Integer i=0;i<=63;i++){
    executor.execute(new Runnable(){
        @Override
        public void run(){
            List value = queryTable(i);
            total.put(value,NULL);
            count.countDown();
        }
    }) ;
    
}

count.await();
System.out.println("查询完毕");

这样就能够实现全部数据都查询完毕后再作统一汇总;代码挺简单,也好理解(固然也可使用线程池的 API)。

总结

CountDownLatch 算是 juc 中一个高频使用的工具,学会和理解他的使用会帮助咱们更容易编写并发应用。

文中涉及到的源码:

https://github.com/crossoverJie/JCSprout/blob/master/src/main/java/com/crossoverjie/concurrent/communication/MultipleThreadCountDownKit.java

你的点赞与分享是对我最大的支持

相关文章
相关标签/搜索