Java 8 并发教程:线程和执行器

Java 8 并发教程:线程和执行器

原文:Java 8 Concurrency Tutorial: Threads and Executorsjava

译者:BlankKellygit

来源:Java8并发教程:Threads和Executorsgithub

欢迎阅读个人Java8并发教程的第一部分。这份指南将会以简单易懂的代码示例来教给你如何在Java8中进行并发编程。这是一系列教程中的第一部分。在接下来的15分钟,你将会学会如何经过线程,任务(tasks)和 exector services来并行执行代码。编程

并发在Java5中首次被引入并在后续的版本中不断获得加强。在这篇文章中介绍的大部分概念一样适用于之前的Java版本。不过个人代码示例聚焦于Java8,大量使用lambda表达式和其余新特性。若是你对lambda表达式不属性,我推荐你首先阅读个人Java 8 教程post

ThreadRunnable

全部的现代操做系统都经过进程和线程来支持并发。进程是一般彼此独立运行的程序的实例,好比,若是你启动了一个Java程序,操做系统产生一个新的进程,与其余程序一块儿并行执行。在这些进程的内部,咱们使用线程并发执行代码,所以,咱们能够最大限度的利用CPU可用的核心(core)。性能

Java从JDK1.0开始执行线程。在开始一个新的线程以前,你必须指定由这个线程执行的代码,一般称为task。这能够经过实现Runnable——一个定义了一个无返回值无参数的run()方法的函数接口,以下面的代码所示:学习

Runnable task = () -> {
    String threadName = Thread.currentThread().getName();
    System.out.println("Hello " + threadName);
};

task.run();

Thread thread = new Thread(task);
thread.start();

System.out.println("Done!");

由于Runnable是一个函数接口,因此咱们利用lambda表达式将当前的线程名打印到控制台。首先,在开始一个线程前咱们在主线程中直接运行runnable。

控制台输出的结果可能像下面这样:

Hello main
Hello Thread-0
Done!

或者这样:

Hello main
Done!
Hello Thread-0

因为咱们不能预测这个runnable是在打印'done'前执行仍是在以后执行。顺序是不肯定的,所以在大的程序中编写并发程序是一个复杂的任务。

咱们能够将线程休眠肯定的时间。在这篇文章接下来的代码示例中咱们能够经过这种方法来模拟长时间运行的任务。

Runnable runnable = () -> {
    try {
        String name = Thread.currentThread().getName();
        System.out.println("Foo " + name);
        TimeUnit.SECONDS.sleep(1);
        System.out.println("Bar " + name);
    }
    catch (InterruptedException e) {
        e.printStackTrace();
    }
};

Thread thread = new Thread(runnable);
thread.start();

当你运行上面的代码时,你会注意到在第一条打印语句和第二条打印语句之间存在一分钟的延迟。TimeUnit在处理单位时间时一个有用的枚举类。你能够经过调用Thread.sleep(1000)来达到一样的目的。

使用Thread类是很单调的且容易出错。因为并发API在2004年Java5发布的时候才被引入。这些API位于java.util.concurrent包下,包含不少处理并发编程的有用的类。自从这些并发API引入以来,在随后的新的Java版本发布过程当中获得不断的加强,甚至Java8提供了新的类和方法来处理并发。

接下来,让咱们走进并发API中最重要的一部——executor services。

Executor

并发API引入了ExecutorService做为一个在程序中直接使用Thread的高层次的替换方案。Executos支持运行异步任务,一般管理一个线程池,这样一来咱们就不须要手动去建立新的线程。在不断地处理任务的过程当中,线程池内部线程将会获得复用,所以,在咱们可使用一个executor service来运行和咱们想在咱们整个程序中执行的同样多的并发任务。

下面是使用executors的第一个代码示例:

ExecutorService executor = Executors.newSingleThreadExecutor();
executor.submit(() -> {
String threadName = Thread.currentThread().getName();
System.out.println("Hello " + threadName);
});

// => Hello pool-1-thread-1

Executors类提供了便利的工厂方法来建立不一样类型的 executor services。在这个示例中咱们使用了一个单线程线程池的 executor。

代码运行的结果相似于上一个示例,可是当运行代码时,你会注意到一个很大的差异:Java进程从没有中止!Executors必须显式的中止-不然它们将持续监听新的任务。

ExecutorService提供了两个方法来达到这个目的——shutdwon()会等待正在执行的任务执行完而shutdownNow()会终止全部正在执行的任务并当即关闭execuotr。

这是我喜欢的一般关闭executors的方式:

try {
    System.out.println("attempt to shutdown executor");
    executor.shutdown();
    executor.awaitTermination(5, TimeUnit.SECONDS);
    }
catch (InterruptedException e) {
    System.err.println("tasks interrupted");
}
finally {
    if (!executor.isTerminated()) {
        System.err.println("cancel non-finished tasks");
    }
    executor.shutdownNow();
    System.out.println("shutdown finished");
}

executor经过等待指定的时间让当前执行的任务终止来“温柔的”关闭executor。在等待最长5分钟的时间后,execuote最终会经过中断全部的正在执行的任务关闭。

CallableFuture

除了Runnable,executor还支持另外一种类型的任务——Callable。Callables也是相似于runnables的函数接口,不一样之处在于,Callable返回一个值。

下面的lambda表达式定义了一个callable:在休眠一分钟后返回一个整数。

Callable<Integer> task = () -> {
    try {
        TimeUnit.SECONDS.sleep(1);
        return 123;
    }
    catch (InterruptedException e) {
        throw new IllegalStateException("task interrupted", e);
    }
};

Callbale也能够像runnbales同样提交给 executor services。可是callables的结果怎么办?由于submit()不会等待任务完成,executor service不能直接返回callable的结果。不过,executor 能够返回一个Future类型的结果,它能够用来在稍后某个时间取出实际的结果。

ExecutorService executor = Executors.newFixedThreadPool(1);
Future<Integer> future = executor.submit(task);

System.out.println("future done? " + future.isDone());

Integer result = future.get();

System.out.println("future done? " + future.isDone());
System.out.print("result: " + result);

在将callable提交给exector以后,咱们先经过调用isDone()来检查这个future是否已经完成执行。我十分肯定这会发生什么,由于在返回那个整数以前callable会休眠一分钟、

在调用get()方法时,当前线程会阻塞等待,直到callable在返回实际的结果123以前执行完成。如今future执行完毕,咱们能够在控制台看到以下的结果:

future done? false
future done? true
result: 123

Future与底层的executor service紧密的结合在一块儿。记住,若是你关闭executor,全部的未停止的future都会抛出异常。

executor.shutdownNow();
future.get();

你可能注意到咱们此次建立executor的方式与上一个例子稍有不一样。咱们使用newFixedThreadPool(1)来建立一个单线程线程池的 execuot service。
这等同于使用newSingleThreadExecutor不过使用第二种方式咱们能够稍后经过简单的传入一个比1大的值来增长线程池的大小。

超时

任何future.get()调用都会阻塞,而后等待直到callable停止。在最糟糕的状况下,一个callable持续运行——所以使你的程序将没有响应。咱们能够简单的传入一个时长来避免这种状况。

ExecutorService executor = Executors.newFixedThreadPool(1);

    Future<Integer> future = executor.submit(() -> {
    try {
        TimeUnit.SECONDS.sleep(2);
        return 123;
    }
    catch (InterruptedException e) {
        throw new IllegalStateException("task interrupted", e);
    }
});

    future.get(1, TimeUnit.SECONDS);

运行上面的代码将会产生一个TimeoutException

Exception in thread "main" java.util.concurrent.TimeoutException
    at java.util.concurrent.FutureTask.get(FutureTask.java:205)

你可能已经猜到俄为何会排除这个异常。咱们指定的最长等待时间为1分钟,而这个callable在返回结果以前实际须要两分钟。

invokeAll

Executors支持经过invokeAll()一次批量提交多个callable。这个方法结果一个callable的集合,而后返回一个future的列表。

ExecutorService executor = Executors.newWorkStealingPool();

List<Callable<String>> callables = Arrays.asList(
        () -> "task1",
        () -> "task2",
        () -> "task3");

executor.invokeAll(callables)
    .stream()
    .map(future -> {
        try {
            return future.get();
        }
        catch (Exception e) {
            throw new IllegalStateException(e);
        }
    })
    .forEach(System.out::println);

在这个例子中,咱们利用Java8中的函数流(stream)来处理invokeAll()调用返回的全部future。咱们首先将每个future映射到它的返回值,而后将每一个值打印到控制台。若是你还不属性stream,能够阅读个人Java8 Stream 教程

invokeAny

批量提交callable的另外一种方式就是invokeAny(),它的工做方式与invokeAll()稍有不一样。在等待future对象的过程当中,这个方法将会阻塞直到第一个callable停止而后返回这一个callable的结果。

为了测试这种行为,咱们利用这个帮助方法来模拟不一样执行时间的callable。这个方法返回一个callable,这个callable休眠指定 的时间直到返回给定的结果。

Callable<String> callable(String result, long sleepSeconds) {
    return () -> {
        TimeUnit.SECONDS.sleep(sleepSeconds);
        return result;
    };
}

咱们利用这个方法建立一组callable,这些callable拥有不一样的执行时间,从1分钟到3分钟。经过invokeAny()将这些callable提交给一个executor,返回最快的callable的字符串结果-在这个例子中为任务2:

ExecutorService executor = Executors.newWorkStealingPool();

List<Callable<String>> callables = Arrays.asList(
callable("task1", 2),
callable("task2", 1),
callable("task3", 3));

String result = executor.invokeAny(callables);
System.out.println(result);

// => task2

上面这个例子又使用了另外一种方式来建立executor——调用newWorkStealingPool()。这个工厂方法是Java8引入的,返回一个ForkJoinPool类型的 executor,它的工做方法与其余常见的execuotr稍有不一样。与使用一个固定大小的线程池不一样,ForkJoinPools使用一个并行因子数来建立,默认值为主机CPU的可用核心数。

ForkJoinPools 在Java7时引入,将会在这个系列后面的教程中详细讲解。让咱们深刻了解一下 scheduled executors 来结束本次教程。

ScheduledExecutor

咱们已经学习了如何在一个 executor 中提交和运行一次任务。为了持续的屡次执行常见的任务,咱们能够利用调度线程池。

ScheduledExecutorService支持任务调度,持续执行或者延迟一段时间后执行。

下面的实例,调度一个任务在延迟3分钟后执行:

ScheduledExecutorService executor =                 Executors.newScheduledThreadPool(1);

Runnable task = () -> System.out.println("Scheduling: " + System.nanoTime());
ScheduledFuture<?> future = executor.schedule(task, 3, TimeUnit.SECONDS);

TimeUnit.MILLISECONDS.sleep(1337);

long remainingDelay = future.getDelay(TimeUnit.MILLISECONDS);
System.out.printf("Remaining Delay: %sms", remainingDelay);

调度一个任务将会产生一个专门的future类型——ScheduleFuture,它除了提供了Future的全部方法以外,他还提供了getDelay()方法来得到剩余的延迟。在延迟消逝后,任务将会并发执行。

为了调度任务持续的执行,executors 提供了两个方法scheduleAtFixedRate()scheduleWithFixedDelay()。第一个方法用来以固定频率来执行一个任务,好比,下面这个示例中,每分钟一次:

ScheduledExecutorService executor =     Executors.newScheduledThreadPool(1);

Runnable task = () -> System.out.println("Scheduling: " + System.nanoTime());

int initialDelay = 0;
int period = 1;
executor.scheduleAtFixedRate(task, initialDelay, period, TimeUnit.SECONDS);

另外,这个方法还接收一个初始化延迟,用来指定这个任务首次被执行等待的时长。

请记住:scheduleAtFixedRate()并不考虑任务的实际用时。因此,若是你指定了一个period为1分钟而任务须要执行2分钟,那么线程池为了性能会更快的执行。

在这种状况下,你应该考虑使用scheduleWithFixedDelay()。这个方法的工做方式与上咱们上面描述的相似。不一样之处在于等待时间 period 的应用是在一次任务的结束和下一个任务的开始之间。例如:

ScheduledExecutorService executor =         Executors.newScheduledThreadPool(1);

Runnable task = () -> {
    try {
        TimeUnit.SECONDS.sleep(2);
        System.out.println("Scheduling: " + System.nanoTime());
    }
    catch (InterruptedException e) {
        System.err.println("task interrupted");
    }
};

executor.scheduleWithFixedDelay(task, 0, 1, TimeUnit.SECONDS);

这个例子调度了一个任务,并在一次执行的结束和下一次执行的开始之间设置了一个1分钟的固定延迟。初始化延迟为0,任务执行时间为0。因此咱们分别在0s,3s,6s,9s等间隔处结束一次执行。如你所见,scheduleWithFixedDelay()在你不能预测调度任务的执行时长时是颇有用的。

这是并发系列教程的第一部分。我推荐你亲手实践一下上面的代码示例。你能够从 Github 上找到这篇文章中全部的代码示例,因此欢迎你fork这个仓库,并收藏它

我但愿你会喜欢这篇文章。若是你有任何的问题均可以在下面评论或者经过 Twitter 向我反馈。

相关文章
相关标签/搜索