Java 8 并发: Threads 和 Executors

原文地址: Java 8 Concurrency Tutorial: Threads and Executorshtml

Java 5 初次引入了Concurrency API,并在随后的发布版本中不断优化和改进。这篇文章的大部分概念也适用于老的版本。个人代码示例主要聚焦在Java 8上,并大量适用 lambda 表达式和一些新特性。若是你还不熟悉 lambda 表达式,建议先阅读 Java 8 Tutorialjava

ThreadsRunnables

全部现代操做系统都是经过进程线程来支持并发的。进程一般是相互独立运行的程序实例。例如,你启动一个 Java 程序,操做系统会产生一个新的进程和其余程序并行运行。在这些进程中能够利用线程同时执行代码。这样咱们就能够充分利用 CPUshell

JavaJDK 1.0 开始就支持线程。在开始一个新线程以前,必须先指定运行的代码,一般称为 Task。下面是经过实现 Runnable 接口来启动一个新线程的例子:编程

Runnable task = () -> {
    String threadName = Thread.currentThread().getName();
    System.out.println("Hello " + threadName);
};

task.run();

Thread thread = new Thread(task);
thread.start();

System.out.println("Done!");
复制代码

因为 Runnable 是一个函数式接口,咱们可使用 lambda 表达式来打印线程的名字到控制台。咱们直接在主线程上执行Runnable,而后开始一个新线程。在控制台你将看到这样的结果:api

Hello main
Hello Thread-0
Done!
复制代码

或者:并发

Hello main
Done!
Hello Thread-0
复制代码

因为是并发执行,咱们没法预测 Runnable 是在打印 Done 以前仍是以后调用,顺序不是不肯定的,所以并发编程成为大型应用程序开发中一项复杂的任务。oracle

线程也能够休眠一段时间,例以下面的例子:异步

Runnable runnable = () -> {
    try {
        String name = Thread.currentThread().getName();
        System.out.println("Foo " + name);
        TimeUnit.SECONDS.sleep(1);
        System.out.println("Bar " + name);
    }
    catch (InterruptedException e) {
        e.printStackTrace();
    }
};

Thread thread = new Thread(runnable);
thread.start();
复制代码

执行上面的代码会在两个打印语句之间停留1秒钟。TimeUnit 是一个时间单位的枚举,或者能够经过调用 Thread.sleep(1000) 实现。函数

使用 Thread 类可能很是繁琐且容易出错。因为这个缘由,在2004年,Java 5版本引入了 Concurrency APIAPI 位于 java.util.concurrent 包下,包含了许多有用的有关并发编程的类。从那时起,每一个新发布的 Java 版本都增长了并发 APIJava 8 也提供了新的类和方法来处理并发。post

如今咱们来深刻了解一下Concurrency API中最重要的部分 - executor services

Executors

Concurrency API 引入了 ExecutorService 的概念,做为处理线程的高级别方式用来替代 ThreadsExecutors 可以异步的执行任务,而且一般管理一个线程池。这样咱们就不用手动的去建立线程了,线程池中的全部线程都将被重用。从而能够在一个 executor service 的整个应用程序生命周期中运行尽量多的并发任务。

下面是一个简单的 executors 例子:

ExecutorService executor = Executors.newSingleThreadExecutor();
executor.submit(() -> {
    String threadName = Thread.currentThread().getName();
    System.out.println("Hello " + threadName);
});

// => Hello pool-1-thread-1
复制代码

Executors 类提供了方便的工厂方法来建立不一样类型的 executor services 。在这个例子中使用了只执行一个线程的 executor

执行结果看起来和上面的示例相似,可是你会注意到一个重要区别:Java 进程永远不会中止,执行者必须明确的中止它,不然它会不断的接受新的任务。

ExecutorService 为此提供了两种方法:shutdown() 等待当前任务执行完毕,而 shutdownNow() 则中断全部正在执行的任务,并当即关闭执行程序。在 shudown 以后不能再提交任务到线程池。

下面是我关闭程序的首选方式:

try {
    System.out.println("attempt to shutdown executor");
    executor.shutdown();
    executor.awaitTermination(5, TimeUnit.SECONDS);
}
catch (InterruptedException e) {
    System.err.println("tasks interrupted");
}
finally {
    if (!executor.isTerminated()) {
        System.err.println("cancel non-finished tasks");
    }
    executor.shutdownNow();
    System.out.println("shutdown finished");
}
复制代码

执行者调用 shutdown 关闭 executor,在等待 5 秒钟钟后,无论任务有没有执行完毕都调用 shutdownNow 中断正在执行的任务而关闭。

Callables 和 Futures

除了 Runnable 之外,executors 还支持 Callable 任务,和 Runnable 同样是一个函数式接口,但它是有返回值的。

下面是一个使用 lambda 表达式定义的 Callable ,在睡眠 1 秒后返回一个整形值。

Callable<Integer> task = () -> {
    try {
        TimeUnit.SECONDS.sleep(1);
        return 123;
    }
    catch (InterruptedException e) {
        throw new IllegalStateException("task interrupted", e);
    }
};
复制代码

Runnable 同样,Callable 也能够提交到 executor services,可是执行的结果是什么?因为 submit() 不等待任务执行完成,executor service 不能直接返回调用的结果。相对应的,它返回一个 Future 类型的结果,使用 Future 能够检索实际执行结果。

ExecutorService executor = Executors.newFixedThreadPool(1);
Future<Integer> future = executor.submit(task);

System.out.println("future done? " + future.isDone());

Integer result = future.get();

System.out.println("future done? " + future.isDone());
System.out.print("result: " + result);
复制代码

在将 Callable 提交给 executor 后,首先经过 isDone() 来检查 future 是否执行完毕。我敢确定,状况并不是如此,由于上面的调用在返回整数以前睡眠了 1 秒钟。

调用方法 get() 会阻塞当前线程,直到 callable 执行完成返回结果,如今 future 执行完成,并在控制台输出下面的结果:

future done? false
future done? true
result: 123
复制代码

Futureexecutor service 紧密结合,若是关闭 executor service, 每一个 Future 都会抛出异常。

executor.shutdownNow();
future.get();
复制代码

这里建立 executor 的方式与前面的示例不一样,这里使用 newFixedThreadPool(1) 来建立一个线程数量为 1 的线程池来支持 executor, 这至关于 newSingleThreadExecutor() ,稍后咱们咱们会经过传递一个大于 1 的值来增长线程池的大小。

Timeouts

任何对 future.get()的调用都会阻塞并等待 Callable 被终止。 在最坏的状况下,一个可调用函数将永远运行,从而使应用程序没法响应。能够简单地经过超时来抵消这些状况:

ExecutorService executor = Executors.newFixedThreadPool(1);

Future<Integer> future = executor.submit(() -> {
    try {
        TimeUnit.SECONDS.sleep(2);
        return 123;
    }
    catch (InterruptedException e) {
        throw new IllegalStateException("task interrupted", e);
    }
});

future.get(1, TimeUnit.SECONDS);
复制代码

执行上面的代码会抛出 TimeoutException

Exception in thread "main" java.util.concurrent.TimeoutException
    at java.util.concurrent.FutureTask.get(FutureTask.java:205)
复制代码

指定了 1 秒钟的最长等待时间,可是在返回结果以前,可调用事实上须要 2 秒钟的时间。

InvokeAll

Executors 支持经过 invokeAll() 批量提交多个 Callable 。这个方法接受一个 Callable 类型集合的参数,并返回一个 Future 类型的 List

ExecutorService executor = Executors.newWorkStealingPool();

List<Callable<String>> callables = Arrays.asList(
        () -> "task1",
        () -> "task2",
        () -> "task3");

executor.invokeAll(callables)
    .stream()
    .map(future -> {
        try {
            return future.get();
        }
        catch (Exception e) {
            throw new IllegalStateException(e);
        }
    })
    .forEach(System.out::println);
复制代码

在这个例子中,咱们利用 Java 8 的流来处理 invokeAll 调用返回的全部 Future。 咱们首先映射每一个 Future 的返回值,而后将每一个值打印到控制台。 若是还不熟悉流,请阅读Java 8 Stream Tutorial

InvokeAny

批量提交可调用的另外一种方法是 invokeAny(),它与 invokeAll() 略有不一样。 该方法不会返回全部的 Future 对象,它只返回第一个执行完毕任务的结果。

Callable<String> callable(String result, long sleepSeconds) {
    return () -> {
        TimeUnit.SECONDS.sleep(sleepSeconds);
        return result;
    };
}
复制代码

咱们使用这种方法来建立一个有三个不一样睡眠时间的 Callable。 经过 invokeAny()将这些可调用对象提交给 executor,返回最快执行完毕结果,在这种状况下,task2:

ExecutorService executor = Executors.newWorkStealingPool();

List<Callable<String>> callables = Arrays.asList(
    callable("task1", 2),
    callable("task2", 1),
    callable("task3", 3));

String result = executor.invokeAny(callables);
System.out.println(result);

// => task2
复制代码

上面的例子使用经过 newWorkStealingPool() 建立的另外一种类型的 executor。 这个工厂方法是 Java 8 的一部分,而且返回一个类型为 ForkJoinPoolexecutor,它与正常的 executor 略有不一样。 它不使用固定大小的线程池,默认状况下是主机CPU的可用内核数。

Scheduled Executors

咱们已经学会了如何在 Executors 上提交和运行任务。 为了屡次按期运行任务,咱们可使用 scheduled thread pools

ScheduledExecutorService 可以安排任务按期运行或在一段时间事后运行一次。

下面代码示例一个任务在三秒钟后运行:

ScheduledExecutorService executor = Executors.newScheduledThreadPool(1);

Runnable task = () -> System.out.println("Scheduling: " + System.nanoTime());
ScheduledFuture<?> future = executor.schedule(task, 3, TimeUnit.SECONDS);

TimeUnit.MILLISECONDS.sleep(1337);

long remainingDelay = future.getDelay(TimeUnit.MILLISECONDS);
System.out.printf("Remaining Delay: %sms", remainingDelay);
复制代码

调度任务产生一个类型为 ScheduledFuture的值,除了 Future 以外,它还提供getDelay() 方法来检索任务执行的剩余时间。

为了定时执行的任务,executor 提供了两个方法 scheduleAtFixedRate()scheduleWithFixedDelay() 。 第一种方法可以执行具备固定时间间隔的任务,例如, 每秒一次:

ScheduledExecutorService executor = Executors.newScheduledThreadPool(1);

Runnable task = () -> System.out.println("Scheduling: " + System.nanoTime());

int initialDelay = 0;
int period = 1;
executor.scheduleAtFixedRate(task, initialDelay, period, TimeUnit.SECONDS);
复制代码

此外,此方法还能够设置延迟时间,该延迟描述了首次执行任务以前的等待时间。

scheduleWithFixedDelay() 方法与 scheduleAtFixedRate() 略有不一样,不一样之处是它们的等待时间,scheduleWithFixedDelay() 的等待时间是在上一个任务结束和下一个任务开始之间施加的。

ScheduledExecutorService executor = Executors.newScheduledThreadPool(1);

Runnable task = () -> {
    try {
        TimeUnit.SECONDS.sleep(2);
        System.out.println("Scheduling: " + System.nanoTime());
    }
    catch (InterruptedException e) {
        System.err.println("task interrupted");
    }
};

executor.scheduleWithFixedDelay(task, 0, 1, TimeUnit.SECONDS);
复制代码

本示例在执行结束和下一次执行开始之间延迟 1 秒。 初始延迟为 0,任务持续时间为 2 秒。 因此咱们结束了一个0s,3s,6s,9s等的执行间隔。

相关文章
相关标签/搜索