Java SDK 并发包全面总结

1、Lock 和 Condition

Java 并发包中的 Lock 和 Condition 主要解决的是线程的互斥和同步问题,这二者的配合使用,至关于 synchronized、wait()、notify() 的使用。java

1. Lock 的优点

比起传统的 synchronized 关键字,Lock 最大的不一样(或者说优点)在于:编程

  • 阻塞的线程可以响应中断,这样可以有机会释放本身持有的锁,避免死锁
  • 支持超时,若是线程在必定时间内未获取到锁,不是进入阻塞状态,而是抛出异常
  • 非阻塞的获取锁,若是未获取到锁,不进入阻塞状态,而是直接返回

三种状况分别对应 Lock 的三个方法:void lockInterruptibly()boolean tryLock(long time, TimeUnit unit)boolean tryLock()安全

Lock 最经常使用的一个实现类是 ReentrantLock,表明可重入锁,意思是能够反复获取同一把锁。
除此以外,Lock 的构造方法能够传入一个 boolean 值,表示是不是公平锁。性能优化

2. Lock 和 Condition 的使用

前面实现的简单的阻塞队列就是使用 Lock 和 Condition ,如今其含义已经很是明确了:多线程

public class BlockingQueue<T> {
    private int capacity;
    private int size;

    //定义锁和条件
    private final Lock lock = new ReentrantLock();
    private final Condition notFull = lock.newCondition();
    private final Condition notEmpty = lock.newCondition();

    /**
     * 入队列
     */
    public void enqueue(T data){
        lock.lock();
        try {
            //若是队列满了,须要等待,直到队列不满
            while (size >= capacity){
                notFull.await();
            }
            //入队代码,省略
            //入队以后,通知队列已经不为空了
            notEmpty.signal();
        } catch (InterruptedException e) {
            e.printStackTrace();
        } finally {
            //在finally块中释放锁,避免死锁
            lock.unlock();
        }
    }

    /**
     * 出队列
     */
    public T dequeue(){
        lock.lock();
        try {
            //若是队列为空,须要等待,直到队列不为空
            while (size <= 0){
                notEmpty.await();
            }
            //出队代码,省略
            //出队列以后,通知队列已经不满了
            notFull.signal();
        } catch (InterruptedException e) {
            e.printStackTrace();
        } finally {
            lock.unlock();
        }
        //实际应该返回出队数据
        return null;
    }
}

能够看到,Lock 须要手动的加锁和解锁,而且解锁操做是放在 finally 块中的,这是一种编程范式,尽可能遵照。并发

2、ReadWriteLock

ReadWriteLock 表示读写锁,适用于读多写少的状况,读写锁通常有几个特征:app

  • 读锁与读锁之间不互斥,即容许多个线程同时读变量。
  • 写锁与读锁之间互斥,一个线程在写时,不容许读操做。
  • 写锁与写锁之间互斥,只容许 一个线程写操做。

读写锁减少了锁的粒度,在读多写少的场景下,对性能的提高较为明显。ReadWriteLock 的简单使用示例以下:框架

public class ReadWriteLockTest {

    private final ReadWriteLock lock = new ReentrantReadWriteLock();
    private final Lock readLock =lock.readLock();
    private final Lock writeLock =lock.writeLock();

    private int value;

    //加写锁
    private void addValue(){
        writeLock.lock();
        try {
            value += 1;
        }
        finally {
            writeLock.unlock();
        }
    }

    //加读锁
    private int getValue(){
        readLock.lock();
        try {
            return value;
        }
        finally {
            readLock.unlock();
        }
    }
}

读写锁的升级与降级异步

Java 中不容许锁的升级,即加写锁时必须释放读锁。ide

可是容许锁的降级,即加读锁时,能够不释放写锁,最后读锁和写锁一块儿释放。

3、StampedLock

1. StampedLock 的使用及特色

StampedLock 是 Java 1.8 版本中提供的锁,主要支持三种锁模式:写锁、悲观读锁、乐观读。

其中写锁和悲观读锁跟 ReadWriteLock 中的写锁和读锁的概念相似。StampedLock 在使用的时候不同,加锁的时候会返回一个参数,解锁的时候须要传入这个参数,示例以下:

public class StampedLockTest {

    private final StampedLock lock = new StampedLock();
    private int value;

    private void addValue(){
        long stamp = lock.writeLock();
        try {
            value += 1;
        }
        finally {
            lock.unlockWrite(stamp);
        }
    }
}

StampedLock 最主要的特色是支持“乐观读”,即当进行读操做的时候,并非全部的写操做都被阻塞,容许一个线程获取写锁。乐观读的使用示例以下:

public class StampedLockTest {

    private final StampedLock lock = new StampedLock();
    private int value;

    private void getValue(){
        //乐观读,读入变量
        long stamp = lock.tryOptimisticRead();
        int a = value;
        //若是验证失败
        if (!lock.validate(stamp)){
            //升级为悲观读锁,继续读入变量
            stamp = lock.readLock();
            try {
                a = value;
            }
            finally {
                lock.unlockRead(stamp);
            }
        }
    }
}

须要注意的是,这里使用 validate() 方法进行验证,若是乐观读失败,则升级为悲观读锁,继续获取变量。

2. StampedLock 的注意事项

StampedLock 不支持重入,即不可反复获取同一把锁。

在使用 StampedLock 的时候,不要调用中断操做。若是须要支持中断,能够调用 readLockInterruptibly 和 writeLockInterruptibly 方法。

4、Semaphore

Semaphore 表示信号量,初始化对象的时候,须要传一个参数,表示信号量的计数器值。acquire() 方法将计数器加 1,release() 方法减 1,这两个方法都可以保证原子性。

信号量的简单示例:

public class SemaphoreTest {

    private final Semaphore semaphore = new Semaphore(1);
    private int value;

    public void addValue() {
        try {
            semaphore.acquire();
            value += 1;
        }
        catch (InterruptedException e) {
            e.printStackTrace();
        }
        finally {
            semaphore.release();
        }
    }

程序中使用信号量实现了一个线程安全的方法,初始值设为了 1,当多个方法访问 addValue 方法的时候,因为 acquire 方法保证原子性,因此只能有一个线程将计数器减 1 并进入临界区,另外一个线程等待。

一个线程执行完后,调用 release 方法,计数器加 1,另外一个等待的线程被唤醒。

Semaphore 与 Lock 的一个不一样点即是信号量容许多个线程同时进入临界区,例如将初始值设置的更大一些。例以下面这个例子:

public class SemaphoreTest {

    //初始值 2,表示 2 个线程可同时进入临界区
    private final Semaphore semaphore = new Semaphore(2);

    public void test() {
        try {
            semaphore.acquire();
            System.out.println("线程" + Thread.currentThread().getName() + " 进入临界区 : " + System.currentTimeMillis());
            Thread.sleep(1000);
        }
        catch (InterruptedException e) {
            e.printStackTrace();
        }
        finally {
            semaphore.release();
        }
    }
}

5、CountDownLatch

CountDownLatch 是一个线程同步的工具,主要实现一个线程等待多个线程的功能。在原始的 Thread 中,能够调用 join() 方法来等待线程执行完毕,而 CountDownLatch 则能够用在线程池中的线程等待。

下面是 CountDownLatch 的使用示例:

public class CountDownLatchTest {

    //实际生产中不推荐使用这种建立线程的方式
    private final ExecutorService threadPool = Executors.newFixedThreadPool(2);

    public void test() throws InterruptedException {
        CountDownLatch latch = new CountDownLatch(2);
        threadPool.execute(() -> {
            System.out.println("线程1执行完毕");
            latch.countDown();
        });
        threadPool.execute(() -> {
            System.out.println("线程2执行完毕");
            latch.countDown();
        });
        latch.await();
        System.out.println("两个线程都执行完毕");

        threadPool.shutdown();
    }
}

CountDownLatch 的初始值为 2,线程执行完毕则调用 countDown 方法,计数器减 1。减到 0 的时候,会唤醒主线程继续执行。

6、CyclicBarrier

CyclicBarrier 也是一个线程同步工具类,主要实现多个线程之间的互相等待。

CyclicBarrier 有两个构造函数,能够传一个计数器的初始值,还能够加上一个 Runnable,表示计数器执行减到 0 的时候,须要执行的回调方法。

public class CyclicBarrierTest {

    private final ExecutorService threadPool = Executors.newFixedThreadPool(2);
    private final CyclicBarrier barrier = new CyclicBarrier(2, this::note);

    public void print(){
        threadPool.execute(() -> {
            System.out.println("线程1执行完毕");
            try {
                barrier.await();
            } catch (InterruptedException | BrokenBarrierException e) {
                e.printStackTrace();
            }
        });
        threadPool.execute(() -> {
            System.out.println("线程2执行完毕");
            try {
                barrier.await();
            } catch (InterruptedException | BrokenBarrierException e) {
                e.printStackTrace();
            }
        });
        threadPool.shutdown();
    }
    public void note(){
        System.out.println("两个线程执行完毕");
    }
}

示例中设置 CyclicBarrier 的初始值为 2,线程执行完毕调用 await 方法,计数器减 1。print() 方法中的两个线程执行完后,计数器减到 0,就会调用 note 方法。

7、ThreadPoolExecutor

1. 线程池的工做原理

因为线程是一种重量级对象,频繁的建立和销毁比较消耗系统资源,所以线程池的优点就显现出来了。线程池可有下降资源消耗,由于不用频繁建立和销毁线程;提升响应速度,须要执行任务时,可直接使用线程池中的线程资源;还可以有效的管理、监控线程池中的线程。

Java 中的线程池的实现是一种很典型的生产者-消费者模式,使用线程的一方是生产者,主要提供须要执行的任务,线程池是消费者,消费生产者提供的任务。

下面这段代码可以帮助理解线程池的实现原理(仅用于帮助理解,实际执行结果有出入):

public class ThreadPool {
    //保存任务的阻塞队列
    private BlockingQueue<Runnable> workQueue;
    //保存工做线程的列表
    private List<WorkThread> threadList = new ArrayList<>();

    //构造方法
    public ThreadPool(int poolSize, BlockingQueue<Runnable> workQueue) {
        this.workQueue = workQueue;
        //根据poolSize的数量建立工做线程,并执行线程
        for (int i = 0; i < poolSize; i++) {
            WorkThread thread = new WorkThread();
            thread.start();
            threadList.add(thread);
        }
    }

    //执行任务的方法,主要是将任务添加到队列中
    public void execute(Runnable task) {
        try {
            workQueue.put(task);
        }
        catch (InterruptedException e) {
            e.printStackTrace();
        }
    }

    //工做线程
    class WorkThread extends Thread{
        @Override
        public void run() {
            //循环取出任务执行
            while (!workQueue.isEmpty()) {
                try {
                    Runnable task = workQueue.take();
                    task.run();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        }
    }
}

上面的代码注释很详细了,主要是使用了一个阻塞队列,用来存储生产者的任务。而后在构造器中建立线程,并循环从队列中取出任务执行。

2. Java 中的线程池

Java 中提供了 Executors 这个类来快速建立线程池,简单使用示例以下:

Executors.newSingleThreadExecutor();//建立一个线程的线程池
Executors.newFixedThreadPool(5);//建立固定数量线程
Executors.newCachedThreadPool();//建立可调整数量的线程
Executors.newScheduledThreadPool(5);//建立定时任务线程池

可是在《阿里巴巴Java开发手册》中,明确禁止使用 Executors 建立线程池(甚至也不建议使用 Thread 显式建立线程),主要缘由是 Executors 的默认方法都是使用的无界队列,在高负载的状况下,很容易致使 OOM(Out Of Memory)。

因此在 Java 中建立线程池的正确姿式是使用 ThreadPoolExecutor ,其构造函数有七个:

public ThreadPoolExecutor(int corePoolSize,
                          int maximumPoolSize,
                          long keepAliveTime,
                          TimeUnit unit,
                          BlockingQueue<Runnable> workQueue,
                          ThreadFactory threadFactory,//可选
                          RejectedExecutionHandler handler//可选
                          ) { ...
  • corePoolSize:线程池中最少的线程数
  • maximumPoolSize:线程池中建立的最大的线程数
  • keepAliveTime:表示线程池中线程的活跃时间,若是线程在这个活跃时间内没有执行任务,而且线程数量超过了 corePoolSize,那么线程池就会回收多余的线程。
  • TimeUnit:上一个参数的时间单位
  • workQueue:保存任务的队列,为了不 OOM,建议使用有界队列
  • threadFactory:可选参数,不传的话就是默认值。也能够本身传一个实现了 ThreadFactory 接口的类,表示自定义线程,例如给线程指定名字,线程组等。
  • handler:可选参数。定义任务的拒绝策略,表示无空闲线程时,而且队列中的任务满了的,怎么拒绝新的任务。目前的拒绝策略有四种:

    • AbortPolicy:默认的拒绝策略,抛出 RejectedExecutionException 异常
    • CallerRunsPolicy:让提交任务的线程本身去执行这个任务
    • DiscardOldestPolicy:丢弃最老的任务,及最早加入队列中的任务,并添加新的任务
    • DiscardPolicy:直接丢弃任务,而且不会抛出任何异常

调用 ThreadPoolExecutor

线程池建立好了以后,就须要执行任务,ThreadPoolExecutor 提供了两个方法,一是 execute,二是 submit。execute 没有返回值,也就是说没法获取执行结果。使用示例以下:

public static void main(String[] args) {
    BlockingQueue<Runnable> queue = new LinkedBlockingQueue<>(5);
    ThreadPoolExecutor threadPool = new ThreadPoolExecutor(5, 10, 60L, TimeUnit.SECONDS, queue);
    threadPool.execute(() -> {
        System.out.println("In this world");
    });

    threadPool.shutdown();
}

而 submit 方法有一个 Future 接口的返回值,Future 接口有五个方法:

  • cancle:取消任务
  • isCancled:任务是否已取消
  • isDone:任务是否已执行完
  • get:获取任务执行结果
  • get(long timeout, TimeUnit unit):支持超时获取任务执行结果

下面代码展现了取消任务的方法:

public static void main(String[] args) {
    
    BlockingQueue<Runnable> queue = new LinkedBlockingQueue<>(5);
    ThreadPoolExecutor threadPool = new ThreadPoolExecutor(5, 10, 60L, TimeUnit.SECONDS, queue);

    Future<?> future = threadPool.submit(() -> {
        System.out.println("I am roseduan");
        try {
            Thread.sleep(5000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    });
    future.cancel(false);
    threadPool.shutdown();
}

程序的本意是打印语句而后休眠 5 秒,但因为调用了 cancle 方法 ,所以程序直接结束,不会有任何输出。

8、FutureTask

FutureTask 也是一个支持获取任务执行结果的工具类,FutureTask 实现了 Runnable 和 Future 接口。

因此能够将 FutureTask 做为任务提交给 ThreadPoolExecutor 或者 Thread 执行,而且能够获取执行结果。简单的使用以下:

public static void main(String[] args) throws ExecutionException, InterruptedException {
    //建立任务
    FutureTask<String> task = new FutureTask<>(() -> "Java and " + "Python");

    BlockingQueue<Runnable> queue = new LinkedBlockingQueue<>(5);
    ThreadPoolExecutor threadPool = new ThreadPoolExecutor(5, 10, 60L, TimeUnit.SECONDS, queue);
    
    threadPool.execute(task);
    //获取执行结果
    System.out.println(task.get());

    threadPool.shutdown();
}

传给 Thread 做为参数的使用示例以下:

public static void main(String[] args) throws ExecutionException, InterruptedException {
        FutureTask<Integer> task = new FutureTask<>(() -> 1 + 2);
        Thread thread = new Thread(task);
        thread.start();
        System.out.println(task.get());//输出3
    }

9、CompletableFuture

CompletableFuture 是一个异步编程的工具类,异步化可以最大化并行程序的执行,是多线程性能优化的基础。

1. 建立 CompletableFuture 对象

Completable 有四个静态方法,能够用来建立对象:

runAsync(Runnable runnable);//无返回值
runAsync(Runnable runnable, Executor executor);//无返回值,可指定线程池

supplyAsync(Supplier<U> supplier);//有返回值
supplyAsync(Supplier<U> supplier, Executor executor);//有返回值,可指定线程池

能够看到,四个方法分为了是否有返回值,和是否自定义线程池。若是不自定义线程池,那么 CompletableFuture 会使用公共的线程池,默认建立 CPU 核数的数量的线程池,当有多个任务的时候,仍是建议根据每一个任务自定义线程池。

一个简单的使用示例以下,其中 task3 会等待两个任务都执行完毕:

public static void main(String[] args) {
    CompletableFuture<Void> task1 = CompletableFuture.runAsync(() -> {
        System.out.println("任务1执行完毕");
    });
    CompletableFuture<Void> task2 = CompletableFuture.runAsync(() -> {
        try {
            Thread.sleep(2000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        System.out.println("任务2执行完毕");
    });
    CompletableFuture<String> task3 = task1.thenCombine(task2, (__, res) -> "两个任务执行完毕");

    System.out.println(task3.join());
}

CompletableFuture 实现了 Future 接口,所以能够查看任务执行的状况,而且能够获取返回值。

2. CompletionStage 接口中的方法

CompletableFuture 还实现了 CompletionStage 接口。这个接口描述了任务之间的时序关系,分别有串行、并行、聚合三种关系。须要注意的是,并行本就是其所具备的特性,因此再也不探讨了,而且聚合关系又分为了 AND 聚合关系和 OR 聚合关系。下面依次介绍串行、AND 聚合、OR 聚合这三种关系。

首先是串行关系,串行很简单,一个任务执行完后再执行另外一个任务,例以下图:

在这里插入图片描述

描述串行关系的几个方法是:thenApply、thenAccept、thenRun、thenCompose。

thenApply 既支持接收参数,又可以支持返回值。

thenAccept 支持接收参数,可是不支持返回值。

thenRun 既不能接收参数,也不能有返回值。

CompletionStage 中的大部分方法都有带有 Async 后缀的方法,表示可能会使用其余的线程来执行主体中的内容,后面介绍的方法都相似这样,再也不赘述。

简单的使用示例以下:

public static void main(String[] args) throws ExecutionException, InterruptedException {
    CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> {
        try {
            Thread.sleep(2000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        System.out.println("任务1执行完毕");
        return "Task1";
    }).thenApply((s) -> "接收到的参数 : " + s);;

    System.out.println(future.get());
}

其次是 AND 汇聚关系,典型的场景即是一个线程等待两个线程都执行完后再执行,例以下图:

在这里插入图片描述

描述 AND 聚合关系的有三个方法:thenCombine、thenAcceptBoth、runAfterBoth,其是否接收参数和支持返回值,和上面的三个方法对应。一个简单的使用示例以下:

public static void main(String[] args) {
    CompletableFuture<String> task1 = CompletableFuture.supplyAsync(() -> {
        try {
            Thread.sleep(2000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        System.out.println("任务1执行完毕");
        return "task1";
    });
    CompletableFuture<String> task2 = CompletableFuture.supplyAsync(() -> {
        System.out.println("任务2执行完毕");
        return "task2";
    });

    CompletableFuture<String> task3 = task1.thenCombine(task2, (r,s) -> r + " " + s);
    System.out.println(task3.join());
}

任务 1 休眠了 2 秒,任务 3 会等待前面两个任务执行完成以后再执行。

最后是 OR 聚合关系,表示线程等待其中一个线程知足条件以后,就能够继续执行了,不用等待所有的线程。

在这里插入图片描述

描述 OR 聚合关系的是 applyToEither、acceptEither、runAfterEither。使用示例和上面的相似,只须要将方法改一下就是了,这里再也不赘述了。

3. 处理异常

在异步编程中,CompletionStage 接口还提供了几个能够处理异常的方法,和 try() catch() finally() 相似。

这几个方法分别是 :

  • exceptionally:至关于 catch
  • whenComplete:至关于 finally
  • handle:至关于 finally ,支持返回值

使用示例以下:

public static void main(String[] args) {
    CompletableFuture<Integer> task = CompletableFuture.supplyAsync(() -> {
        String str = null;
        return str.length();
        //至关于catch
    }).exceptionally((e) -> {
        System.out.println("发生异常");
        return 0;
    });

    //至关于 finally
    task.whenComplete((s, r) -> {
        System.out.println("执行结束");
    });
    System.out.println(task.join());
}

10、CompletionService

CompletionService 是一个批量执行异步任务的工具类,先来看一个例子:

public static void main(String[] args) throws ExecutionException, InterruptedException {
    StringBuffer sb = new StringBuffer();

    ThreadPoolExecutor threadPool = new ThreadPoolExecutor(
            5, 5,
            10, TimeUnit.SECONDS,
            new LinkedBlockingQueue<>(5));

    Future<String> task1 = threadPool.submit(() -> {
        Thread.sleep(2000);
        return "Task1";
    });
    Future<String> task2 = threadPool.submit(() -> "Task2");
    Future<String> task3 = threadPool.submit(() -> "Task3");
    
    sb.append(task1.get());
    sb.append(task2.get());
    sb.append(task3.get());
}

程序的意思是,依次执行三个任务,并将其结果存储到 StringBuffer 中,因为 task1 休眠了 2 秒,因此 sb 会在这里阻塞。

因为这三个任务之间没有关联,因此等待的消耗彻底是不必的,解决的办法即是利用一个阻塞队列,先执行完的任务将结果保存在队列中,sb 从队列中取出就好了。

CompletionService 实际上就是将线程池和阻塞队列的功能整合了起来,解决了相似上面的问题。CompletionService 的实现类是 ExecutorCompletionService,这个类有两个构造方法:

public ExecutorCompletionService(Executor executor) {}
public ExecutorCompletionService(Executor executor,
    BlockingQueue<Future<V>> completionQueue) {}

若是不传一个阻塞队列,则会使用默认的无界队列。

CompletionService 主要有这几个方法:

submit() 提交任务、take() 从阻塞队列中获取执行结果(若是队列为空,线程阻塞)、poll() 也是从队列中获取执行结果(若是队列为空,则返回 null),另外 poll 还支持超时获取。

使用 CompletionService 改造后的程序示例以下:

public static void main(String[] args) throws ExecutionException, InterruptedException {
    StringBuffer sb = new StringBuffer();

    ThreadPoolExecutor threadPool = new ThreadPoolExecutor(
            5, 5,
            10, TimeUnit.SECONDS,
            new LinkedBlockingQueue<>(5));

    CompletionService<String> service = new ExecutorCompletionService<>(threadPool);
    service.submit(() -> {
        Thread.sleep(2000);
        return "Task1";
    });
    service.submit(() -> "Task2");
    service.submit(() -> "Task3");

    System.out.println(sb.append(service.take().get()).toString());
    System.out.println(sb.append(service.take().get()).toString());
    System.out.println(sb.append(service.take().get()).toString());
}

11、Fork/Join

1. Fork/Join 使用

Fork/Join 是一个处理分治任务的计算框架,所谓分治,即分而治之,将一个任务分解成子任务,求解子任务,而后将子任务的结果合并,就获得了最后的结果。分治思想的应用十分的普遍,例如常见的快速排序、归并排序,还有流行的大数据计算框架 MapReduce,都应用了分治思想。

Java 中,Fork 对应的是 任务分解,Join 则表示 子任务的结果合并。

Fork/Join 主要包含两个主要的实现类:

  • 一是线程池 ForkJoinPool,默认会建立 CPU核数数量的线程
  • 二是 ForkJoinTask,这是一个抽象类,主要的方法有 fork() 和 join(),前者表示执行子任务,后者表示阻塞等待子任务的执行结果。ForkJoinTask 还有两个子类:

    • RecursiveTask
    • RecursiveAction

这两个类也是抽象的,咱们须要自定义并继承这个类,并覆盖其 compute 方法。其中 RecursiveTask 有返回值,而 RecursiveAction 没有返回值。

下面是一个使用 ForkJoin 的示例,实现了 n 的阶乘,注释写得比较详细。

public class ForkJoinTest {
    public static void main(String[] args) {
        //建立线程池
        ForkJoinPool forkJoinPool = new ForkJoinPool(4);
        //建立任务
        Factorial task = new Factorial(6);
        //invoke 方法执行任务(还可使用 execute、submit),获得执行的结果
        Integer res = forkJoinPool.invoke(task);
        System.out.println(res);
    }

    static class Factorial extends RecursiveTask<Integer> {
        private final int n;
        Factorial(int n) {
            this.n = n;
        }
        @Override
        protected Integer compute() {
            if (n == 0){
                return 1;
            }
            Factorial f = new Factorial(n - 1);
            //执行子任务
            f.fork();
            //等待子任务结果
            return n * factorial.join();
        }
    }
}

2. ForkJoinPool 原理

和普通的线程池相似,ForkJoinPool 是一个特殊的线程池,而且也采用的是生产者 - 消费者模式。跟普通线程池共享一个队列不一样,ForkJoinPool 其中维护了多个双端队列,当一个线程对应的任务队列为空的时候,线程并不会空闲,而是“窃取”其余队列的任务执行。

因为是双端队列,正常执行任务和“窃取任务”能够从两端进行出队,这样避免了数据竞争。

采用“任务窃取”这种模式,也是 ForkJoinPool 比普通线程池更加智能的体现。

相关文章
相关标签/搜索