【并发编程】JUC组件扩展(Callable、FutureTask、Fork/Join 框架、BlockingQueue)

前言

以前咱们已经学习了建立线程的2种方式,一种是直接继承Thread,另一种就是实现Runnable接口。java

这2种方式都有一个缺陷就是:在执行完任务以后没法获取执行结果。算法

若是要得到返回值就必须经过共享变量或者线程间通讯的方式,实现起来较复杂。数组

所以在Java5开始提供了CallableFuture,经过它们能够在任务执行完毕以后获得任务执行结果。框架

Callable

首先咱们来看一下Runnable的源码ide

@FunctionalInterface
public interface Runnable {
    /** * When an object implementing interface <code>Runnable</code> is used * to create a thread, starting the thread causes the object's * <code>run</code> method to be called in that separately executing * thread. * <p> * The general contract of the method <code>run</code> is that it may * take any action whatsoever. * * @see java.lang.Thread#run() */
    public abstract void run();
}
复制代码

能够看到run方法的返回值为void函数

下面来看一下Callable的源码学习

@FunctionalInterface
public interface Callable<V> {
    /** * Computes a result, or throws an exception if unable to do so. * * @return computed result * @throws Exception if unable to compute a result */
    V call() throws Exception;
}
复制代码

能够看到,这是一个泛型接口,call()函数返回的类型就是传递进来的V类型。this

Callable的使用会在后面的代码演示中给出。spa

Future

Future类位于java.util.concurrent包下,它是一个接口.线程

public interface Future<V> {
    boolean cancel(boolean mayInterruptIfRunning);
    boolean isCancelled();
    boolean isDone();
    V get() throws InterruptedException, ExecutionException;
    V get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException;
}
复制代码

Future接口中声明了5个方法,下面依次解释每一个方法的做用:

  • cancel方法:用来取消任务,若是取消任务成功则返回true,若是取消任务失败则返回false。参数mayInterruptIfRunning表示是否容许取消正在执行却没有执行完毕的任务,若是任务已经完成,则不管mayInterruptIfRunningtrue仍是false,此方法确定返回false,即若是取消已经完成的任务会返回false;若是任务正在执行,若mayInterruptIfRunning设置为true,则返回true,若mayInterruptIfRunning设置为false,则返回false;若是任务尚未执行,则不管mayInterruptIfRunning为true是什么,确定返回true

  • isCancelled方法:表示任务是否被取消成功,若是在任务正常完成前被取消成功,则返回 true

  • isDone方法:表示任务是否已经完成,若任务完成,则返回true

  • get()方法:用来获取执行结果,这个方法会产生阻塞,会一直等到任务执行完毕才返回;

  • get(long timeout, TimeUnit unit)用来获取执行结果,若是在指定时间内,还没获取到结果,就直接返回null。   

也就是说Future提供了三种功能:

  • 1.判断任务是否完成;

  • 2.可以中断任务;

  • 3.可以获取任务执行结果。

Future的使用

@Slf4j
public class FutureExample {

    static class MyCallable implements Callable<String> {

        @Override
        public String call() throws Exception {
            log.info("do sth in callable");
            Thread.sleep(5000);
            return "done";
        }
    }

    public static void main(String[] args) throws InterruptedException, ExecutionException {

        ExecutorService executorService = Executors.newCachedThreadPool();
        Future<String> future = executorService.submit(new MyCallable());
        log.info("do sth in main");
        Thread.sleep(1000);
        String result = future.get();
        log.info("result: {}", result);
    }
}
复制代码

运行结果

FutureTask

FutureTask位于JUC包内但不是AQS的子类。

FutureTask实现了FutureRunnable接口,能够获取线程的返回值。

FutureTask的使用

@Slf4j
public class FutureTaskExample {


    public static void main(String[] args) throws InterruptedException, ExecutionException {

        FutureTask<String> futureTask = new FutureTask<String>(new Callable<String>() {
            @Override
            public String call() throws Exception {
                log.info("do sth. in callable");
                Thread.sleep(5000);
                return "Done";
            }
        });

        new Thread(futureTask).start();
        log.info("do sth. in main");
        Thread.sleep(1000);
        String result = futureTask.get();
        log.info("result: {}", result);
    }
}
复制代码

运行结果与上面相同。

从使用例子中能够看出FutureTask是很是方便的,何时想用就何时启动线程就能够了。

FutureTask还能够传入多种参数类型,咱们进入它的源码看一下

/** * Creates a {@code FutureTask} that will, upon running, execute the * given {@code Callable}. * * @param callable the callable task * @throws NullPointerException if the callable is null */
public FutureTask(Callable<V> callable) {
    if (callable == null)
        throw new NullPointerException();
    this.callable = callable;
    this.state = NEW;       // ensure visibility of callable
}

/** * Creates a {@code FutureTask} that will, upon running, execute the * given {@code Runnable}, and arrange that {@code get} will return the * given result on successful completion. * * @param runnable the runnable task * @param result the result to return on successful completion. If * you don't need a particular result, consider using * constructions of the form: * {@code Future<?> f = new FutureTask<Void>(runnable, null)} * @throws NullPointerException if the runnable is null */
public FutureTask(Runnable runnable, V result) {
    this.callable = Executors.callable(runnable, result);
    this.state = NEW;       // ensure visibility of callable
}
复制代码

能够看到FutureTask能够传入CallableRunnable,传入Runnable时还能够指定返回值类型。

Fork/Join 框架

Fork/Join 框架Java7提供的用于并行执行任务的框架,经过把大任务分红多个小任务最终汇总每一个小任务结果来获得最终结果。

它主要采用的是工做窃取算法,这个算法是指某个线程从其余队列里窃取任务来执行,过程以下图

线程的任务采用了双端队列,窃取任务时只能从尾部窃取。这个算法的优势就是能够充分利用线程进行并行计算,并减小了线程间的竞争。缺点是在极端状况下仍是存在竞争,如队列中只有一个任务时。

Fork/Join框架有必定的局限性:

  • 任务只能使用fork和join操做做为同步机制,若是使用了其余同步机制,工做线程执行时就不能执行其余任务了。
  • 任务不该该执行IO操做
  • 任务不能抛出检查异常,必须经过必要的代码来处理它们。

Coding演示

@Slf4j
public class ForkJoinTaskExample extends RecursiveTask<Integer> {

    public static final int threshold = 2;
    private int start;
    private int end;

    public ForkJoinTaskExample (int start, int end) {
        this.start = start;
        this.end = end;
    }

    @Override
    protected Integer compute() {
        int sum = 0;

        //若是任务足够小就计算任务
        boolean canCompute = (end - start) <= threshold;
        if (canCompute) {
            for (int i = start; i <= end; i++) {
                sum += i;
            }
        } else {
            //若是任务大于阈值,就分裂成两个子任务计算
            int middle = (start + end) / 2;
            ForkJoinTaskExample leftTask = new ForkJoinTaskExample(start, middle);
            ForkJoinTaskExample rightTask = new ForkJoinTaskExample(middle + 1, end);

            //执行任务
            leftTask.fork();
            rightTask.fork();

            //任务结束后合并结果
            int leftResult = leftTask.join();
            int rightResult = rightTask.join();

            //合并子任务
            sum = leftResult + rightResult;
        }
        return sum;
    }

    public static void main(String[] args) {
        ForkJoinPool forkJoinPool = new ForkJoinPool();

        ForkJoinTaskExample task = new ForkJoinTaskExample(1, 100);

        Future<Integer> result = forkJoinPool.submit(task);

        try{
            log.info("result:{}", result.get());
        } catch (Exception e) {
            log.error("exception", e);
        }
    }
}
复制代码

重写的compute方法就是递归调用自身不断将大任务拆分红小任务,最后汇总结果获得最终结果的。

BlockingQueue

BlockingQueue即阻塞队列。

当阻塞队列满时进行入队操做和当阻塞队列空时进行出队操做都会使线程进入阻塞状态。

主要用于生产者消费者场景。

ArrayBlockingQueue

大小固定,内部实现是一个数组,在初始化时须要指定容量且不能改变。

先进先出的方式存储数据,最新数据在尾部。

DelayQueue

阻塞的是内部元素。DelayQueue中的元素必须实现JUC里的Delay接口(Delay接口继承了Comparable接口),通常都以元素过时时间的优先级进行排序。

内部实现是优先队列和lock。

LinkedBlockingQueue

大小配置可选,初始化时指定大小就是有边界的,若不指定就是无边界的。

内部实现是链表,其余特色与ArrayBlockingQueue一致。

PriorityBlockingQueue

没有边界,有排序规则,容许插入null。

全部插入PriorityBlockingQueue的对象必须实现Comparable接口。

SynchronousQueue

同步队列

内部仅容许容纳一个元素,内部插入一个元素后就会被阻塞,除非这个元素被其余线程消费。

Written by Autu

2019.7.21

相关文章
相关标签/搜索