1. 简介java
从诞生开始,Java 就支持线程、锁等关键的并发概念。这篇文章旨在为使用了多线程的 Java 开发者理解 Core Java 中的并发概念以及使用方法。安全
2. 概念数据结构
表1 并发概念
多线程
2.1 竞争条件并发
多个线程对共享资源执行一系列操做,根据每一个线程的操做顺序可能存在几种结果,这时出现竞争条件。下面的代码不是线程安全的,并且能够不止一次地初始化 value,由于 check-then-act(检查 null,而后初始化),因此延迟初始化的字段不具有原子性:app
class Lazy <T> {
private volatile T value;
T get() {
if (value == null)
value = initialize();
return value;
}
}
2.2 数据竞争异步
两个或多个线程试图访问同一个非 final 变量而且不加上同步机制,这时会发生数据竞争。没有同步机制可能致使这样的状况,线程执行过程当中作出其余线程没法看到的更改,于是致使读到修改前的数据。这样反过来可能又会致使无限循环、破坏数据结构或获得错误的计算结果。下面这段代码可能会无限循环,由于读线程可能永远不知道写线程所作的更改:async
class Waiter implements Runnable {
private boolean shouldFinish;
void finish() { shouldFinish = true; }
public void run() {
long iteration = 0;
while (!shouldFinish) {
iteration++;
}
System.out.println("Finished after: " + iteration);
}
}
class DataRace {
public static void main(String[] args) throws InterruptedException {
Waiter waiter = new Waiter();
Thread waiterThread = new Thread(waiter);
waiterThread.start();
waiter.finish();
waiterThread.join();
}
}
Java 内存模型定义基于一些操做,好比读写字段、 Monitor 同步等。这些操做能够按照 happens-before 关系进行排序。这种关系可用来推断一个线程什么时候看到另外一个线程的操做结果,以及构成一个程序同步后的全部信息。ide
happens-before 关系具有如下特性:函数
在线程开始全部操做前调用 Thread#start
在获取 Monitor 前,释放该 Monitor
在读取 volatile 变量前,对该变量执行一次写操做
在写入 final 变量前,确保在对象引用已存在
线程中的全部操做应在 Thread#join 返回以前完成
使用 synchronized 关键字能够防止不一样线程同时执行相同代码块。因为进入同步执行的代码块以前加锁,受该锁保护的数据能够在排他模式下操做,从而让操做具有原子性。此外,其余线程在得到相同的锁后也能看到操做结果。
class AtomicOperation {
private int counter0;
private int counter1;
void increment() {
synchronized (this) {
counter0++;
counter1++;
}
}
}
也能够在方法上加 synchronized 关键字。
表2 当整个方法都标记 synchronized 时使用的 Monitor
锁是可重入的。若是线程已经持有锁,它能够再次成功地得到该锁。
class Reentrantcy {
synchronized void doAll() {
doFirst();
doSecond();
}
synchronized void doFirst() {
System.out.println("First operation is successful.");
}
synchronized void doSecond() {
System.out.println("Second operation is successful.");
}
}
竞争的程度对获取 Monitor 的方式有影响:
表3: Monitor 状态
wait/notify/notifyAll 方法在 Object 类中声明。若是以前设置了超时,线程进入 WAITING 或 TIMED_WAITING 状态前保持 wait状态。要唤醒一个线程,能够执行下列任何操做:
另外一个线程调用 notify 将唤醒任意一个在 Monitor 上等待的线程。
另外一个线程调用 notifyAll 将唤醒全部在等待 Monitor 上等待的线程。
调用 Thread#interrupt 后会抛出 InterruptedException 异常。
最多见的模式是条件循环:
class ConditionLoop {
private boolean condition;
synchronized void waitForCondition() throws InterruptedException {
while (!condition) {
wait();
}
}
synchronized void satisfyCondition() {
condition = true;
notifyAll();
}
}
请记住,在对象上调用 wait/notify/notifyAll,须要首先得到该对象的锁
在检查等待条件的循环中保持等待:这解决了另外一个线程在等待开始以前即知足条件时的计时问题。 此外,这样作还可让你的代码免受可能(也的确会)发生的虚假唤醒
在调用 notify/notifyAll 前,要确保知足等待条件。若是不这样作会引起通知,然而没有线程可以避免等待循环
volatile 解决了可见性问题,让修改为为原子操做。因为存在 happens-before 关系,在接下来读取 volatile 变量前,先对 volatile 变量进行写操做。 从而保证了对该字段的任何读操做都能督读到最近一次修改后的值。
class VolatileFlag implements Runnable {
private volatile boolean shouldStop;
public void run() {
while (!shouldStop) {
// 执行操做
}
System.out.println("Stopped.");
}
void stop() {
shouldStop = true;
}
public static void main(String[] args) throws InterruptedException {
VolatileFlag flag = new VolatileFlag();
Thread thread = new Thread(flag);
thread.start();
flag.stop();
thread.join();
}
}
java.util.concurrent.atomic package 包含了一组类,它们用相似 volatile 的无锁方式支持单个值的原子复合操做。
使用 AtomicXXX 类,能够实现 check-then-act 原子操做:
class CheckThenAct {
private final AtomicReference<String> value = new AtomicReference<>();
void initialize() {
if (value.compareAndSet(null, "Initialized value")) {
System.out.println("Initialized only once.");
}
}
}
AtomicInteger 和 AtomicLong 都提供原子 increment/decrement 操做:
class Increment {
private final AtomicInteger state = new AtomicInteger();
void advance() {
int oldState = state.getAndIncrement();
System.out.println("Advanced: '" + oldState + "' -> '" + (oldState + 1) + "'.");
}
}
若是你但愿有这样一个计数器,不须要在获取计数的时候具有原子性,能够考虑用 LongAdder 取代 AtomicLong/AtomicInteger。 LongAdder 能在多个单元中存值并在须要时增长计数,所以在竞争激烈的状况下表现更好。
一种在线程中包含数据但不用锁的方法是使用 ThreadLocal 存储。从概念上讲,ThreadLocal 能够看作每一个 Thread 存有一份本身的变量。Threadlocal 一般用于保存每一个线程的值,好比“当前事务”或其余资源。 此外,还能够用于维护每一个线程的计数器、统计信息或 ID 生成器。
class TransactionManager {
private final ThreadLocal<Transaction> currentTransaction
= ThreadLocal.withInitial(NullTransaction::new);
Transaction currentTransaction() {
Transaction current = currentTransaction.get();
if (current.isNull()) {
current = new TransactionImpl();
currentTransaction.set(current);
}
return current;
}
}
想让一个对象在当前做用域外使用能够发布对象,例如从 getter 返回该对象的引用。 要确保安全地发布对象,仅在对象彻底构造好后发布,可能须要同步。 能够经过如下方式安全地发布:
静态初始化器。只有一个线程能够初始化静态变量,由于类的初始化在获取排他锁条件下完成。
class StaticInitializer {
// 无需额外初始化条件,发布一个不可变对象
public static final Year year = Year.of(2017);
public static final Set<String> keywords;
// 使用静态初始化器构造复杂对象
static {
// 建立可变集合
Set<String> keywordsSet = new HashSet<>();
// 初始化状态
keywordsSet.add("java");
keywordsSet.add("concurrency");
// 设置 set 不可修改
keywords = Collections.unmodifiableSet(keywordsSet);
}
}
volatile 字段。因为写入 volatile 变量发生在读操做以前,所以读线程总能读到最新的值。
class Volatile {
private volatile String state;
void setState(String state) {
this.state = state;
}
String getState() {
return state;
}
}
Atomic。例如 AtomicInteger 将值存储在 volatile 字段中,因此 volatile 变量的规则在这里也适用。
class Atomics {
private final AtomicInteger state = new AtomicInteger();
void initializeState(int state) {
this.state.compareAndSet(0, state);
}
int getState() {
return state.get();
}
}
final 字段
class Final {
private final String state;
Final(String state) {
this.state = state;
}
String getState() {
return state;
}
}
确保在对象构造期间不会修改此引用。
class ThisEscapes {
private final String name;
ThisEscapes(String name) {
Cache.putIntoCache(this);
this.name = name;
}
String getName() { return name; }
}
class Cache {
private static final Map<String, ThisEscapes> CACHE = new ConcurrentHashMap<>();
static void putIntoCache(ThisEscapes thisEscapes) {
// 'this' 引用在对象彻底构造以前发生了改变
CACHE.putIfAbsent(thisEscapes.getName(), thisEscapes);
}
}
正确同步字段
class Synchronization {
private String state;
synchronized String getState() {
if (state == null)
state = "Initial";
return state;
}
}
不可变对象的一个重要特征是线程安全,所以不须要同步。要成为不可变对象:
全部字段都标记 final
全部字段必须是可变或不可变的对象,注意不要改变对象做用域,不然构造后不能改变对象状态
this 引用在构造对象时不要泄露
类标记 final,子类没法重载改变类的行为
不可变对象示例:
// 标记为 final,禁止继承
public final class Artist {
// 不可变变量,字段标记 final
private final String name;
// 不可变变量集合, 字段标记 final
private final List<Track> tracks;
public Artist(String name, List<Track> tracks) {
this.name = name;
// 防护性拷贝
List<Track> copy = new ArrayList<>(tracks);
// 使可变集合不可修改
this.tracks = Collections.unmodifiableList(copy);
// 构造对象期间,'this' 不传递到任何其余地方
}
// getter、equals、hashCode、toString 方法
}
// 标记为 final,禁止继承
public final class Track {
// 不可变变量,字段标记 final
private final String title;
public Track(String title) {
this.title = title;
}
// getter、equals、hashCode、toString 方法
}
java.lang.Thread 类用于表示应用程序线程或 JVM 线程。 代码始终在某个 Thread 类的上下文中执行,使用 Thread#currentThread() 可返回本身的当前线程。
表4 线程状态
表5 线程协调方法
清理全部资源,并在当前运行级别尽量能完成线程执行
当前方法声明抛出 InterruptedException。
若是方法没有声明抛出 InterruptedException,那么应该经过调用 Thread.currentThread().interrupt() 将中断标志恢复为 true。 而且在这个级别上抛出更合适的异常。为了能在更高调用级别上处理中断,把中断标志设置为 true 很是重要
线程能够指定一个 UncaughtExceptionHandler 接收因为发生未捕获异常致使线程忽然终止的通知。
Thread thread = new Thread(runnable);
thread.setUncaughtExceptionHandler((failedThread, exception) -> {
logger.error("Caught unexpected exception in thread '{}'.",
failedThread.getName(), exception);
});
thread.start();
有多个线程,每一个线程都在等待另外一个线程持有的资源,造成一个获取资源的线程循环,这时会发生死锁。最典型的资源是对象 Monitor ,但也多是任何可能致使阻塞的资源,例如 wait/notify。
下面的代码可能产生死锁:
class Account {
private long amount;
void plus(long amount) { this.amount += amount; }
void minus(long amount) {
if (this.amount < amount)
throw new IllegalArgumentException();
else
this.amount -= amount;
}
static void transferWithDeadlock(long amount, Account first, Account second){
synchronized (first) {
synchronized (second) {
first.minus(amount);
second.plus(amount);
}
}
}
}
若是同时出现如下状况,就会发生死锁:
一个线程正试图从第一个账户切换到第二个账户,并已得到了第一个账户的锁
另外一个线程正试图从第二个账户切换到第一个账户,并已得到第二个账户的锁
避免死锁的方法:
按顺序加锁:老是以相同的顺序获取锁
class Account {
private long id;
private long amount;
// 此处略去了一些方法
static void transferWithLockOrdering(long amount, Account first, Account second){
boolean lockOnFirstAccountFirst = first.id < second.id;
Account firstLock = lockOnFirstAccountFirst ? first : second;
Account secondLock = lockOnFirstAccountFirst ? second : first;
synchronized (firstLock) {
synchronized (secondLock) {
first.minus(amount);
second.plus(amount);
}
}
}
}
锁定超时:获取锁时不要无限期阻塞,而是释放全部锁并重试
class Account {
private long amount;
// 此处略去了一些方法
static void transferWithTimeout(
long amount, Account first, Account second, int retries, long timeoutMillis
) throws InterruptedException {
for (int attempt = 0; attempt < retries; attempt++) {
if (first.lock.tryLock(timeoutMillis, TimeUnit.MILLISECONDS))
{
try {
if (second.lock.tryLock(timeoutMillis, TimeUnit.MILLISECONDS))
{
try {
first.minus(amount);
second.plus(amount);
}
finally {
second.lock.unlock();
}
}
}
finally {
first.lock.unlock();
}
}
}
}
}
Jvm 可以检测 Monitor 死锁,并以线程转储的形式打印死锁信息。
当线程将全部时间用于协商资源访问或者检测避免死锁,以致于没有线程可以访问资源时,会形成活锁(Livelock)。 线程饥饿发生在线程长时间持锁,致使一些线程没法继续执行被“饿死”。
线程池的核心接口是 ExecutorService。 java.util.concurrent 还提供了一个静态工厂类 Executors,其中包含了新建线程池的工厂方法,新建的线程池参数采用最多见的配置。
表6 静态工厂方法
译注:在并行计算中,work-stealing 是一种针对多线程计算机程序的调度策略。 它解决了在具备固定数量处理器或内核的静态多线程计算机上执行动态多线程计算的问题,这种计算能够“产生”新的执行线程。 在执行时间、内存使用和处理器间通讯方面都可以高效地完成任务。
在调整线程池的大小时,一般须要根据运行应用程序的计算机中的逻辑核心数量来肯定线程池的大小。 在 Java 中,能够经过调用 Runtime.getRuntime().availableProcessors() 读取。
表7 线程池实现
可经过 ExecutorService#submit、ExecutorService#invokeAll 或 ExecutorService#invokeAny 提交任务,可根据不一样任务进行屡次重载。
表8 任务的功能接口
Future 是对异步计算的一种抽象,表明计算结果。计算结果多是某个计算值或异常。ExecutorService 的大多数方法都使用 Future 做为返回类型。使用 Future 时,可经过提供的接口检查当前状态,或者一直阻塞直到结果计算完成。
ExecutorService executorService = Executors.newSingleThreadExecutor();
Future<String> future = executorService.submit(() -> "result");
try {
String result = future.get(1L, TimeUnit.SECONDS);
System.out.println("Result is '" + result + "'.");
}
catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw new RuntimeException(e);
}
catch (ExecutionException e) {
throw new RuntimeException(e.getCause());
}
catch (TimeoutException e) {
throw new RuntimeException(e);
}
assert future.isDone();
java.util.concurrent.locks package 提供了标准 Lock 接口。ReentrantLock 在实现 synchronized 关键字功能的同时还包含了其余功能,例如获取锁的状态信息、非阻塞 tryLock() 和可中断锁定。直接使用 ReentrantLock 示例以下:
class Counter {
private final Lock lock = new ReentrantLock();
private int value;
int increment() {
lock.lock();
try {
return ++value;
} finally {
lock.unlock();
}
}
}
java.util.concurrent.locks package 还包含 ReadWriteLock 接口(以及 Reentrantreadelock 实现)。该接口定义了一对锁进行读写操做,一般支持多个并发读取,但只容许一个写入。
class Statistic {
private final ReadWriteLock lock = new ReentrantReadWriteLock();
private int value;
void increment() {
lock.writeLock().lock();
try {
value++;
} finally {
lock.writeLock().unlock();
}
}
int current() {
lock.readLock().lock();
try {
return value;
} finally {
lock.readLock().unlock();
}
}
}
CountDownLatch 用一个计数器初始化。线程能够调用 await() 等待计数归零。其余线程(或同一线程)可能会调用 countDown() 来减少计数。一旦计数归零即不可重用。CountDownLatch 用于发生某些操做时触发一组未知的线程。
CompletableFuture 是对异步计算的一种抽象。 与普通 Future 不一样,CompletableFuture 仅支持阻塞方式得到结果。当结果产生或发生异常时,执行由已注册的回调函数建立的任务管道。不管是建立过程当中(经过 CompletableFuture#supplyAsync/runAsync),仍是在加入回调过程当中(*async 系列方法),若是没有指定标准的全局 ForkJoinPool#commonPool 均可以设置执行计算的执行器。
考虑 CompletableFuture 已执行完毕,那么经过非 *async 方法注册的回调将在调用者的线程中执行。
若是程序中有几个 future,可使用 CompletableFuture#allOf 得到一个 future,这个 future 在全部 future 完成时结束。也能够调用 CompletableFuture#anyOf 得到一个 future,这个 future 在其中任何一个 future 完成时结束。
ExecutorService executor0 = Executors.newWorkStealingPool();
ExecutorService executor1 = Executors.newWorkStealingPool();
// 当这两个 future 完成时结束
CompletableFuture<String> waitingForAll = CompletableFuture
.allOf(
CompletableFuture.supplyAsync(() -> "first"),
CompletableFuture.supplyAsync(() -> "second", executor1)
)
.thenApply(ignored -> " is completed.");
CompletableFuture<Void> future = CompletableFuture.supplyAsync(() -> "Concurrency Refcard", executor0)
// 使用同一个 executor
.thenApply(result -> "Java " + result)
// 使用不一样的 executor
.thenApplyAsync(result -> "Dzone " + result, executor1)
// 当前与其余 future 完成后结束
.thenCombine(waitingForAll, (first, second) -> first + second)
// 默认使用 ForkJoinPool#commonPool 做为 executor
.thenAcceptAsync(result -> {
System.out.println("Result is '" + result + "'.");
})
// 通用处理
.whenComplete((ignored, exception) -> {
if (exception != null)
exception.printStackTrace();
});
// 第一个阻塞调用:在 future 完成前保持阻塞
future.join();
future
// 在当前线程(main)中执行
.thenRun(() -> System.out.println("Current thread is '" + Thread.currentThread().getName() + "'."))
// 默认使用 ForkJoinPool#commonPool 做为 executor
.thenRunAsync(() -> System.out.println("Current thread is '" + Thread.currentThread().getName() + "'."))
使集合线程安全最简单方法是使用 Collections#synchronized* 系列方法。 因为这种解决方案在竞争激烈的状况下性能不好,因此 java.util.concurrent 提供了多种针对并发优化的数据结构。
表9:java.util.concurrent 中的 Lists
译注:copy-on-write(写入时复制)是一种计算机程序设计领域的优化策略。其核心思想是,若是有多个调用者同时请求相同资源(如内存或磁盘上的数据存储),他们会共同获取相同的指针指向相同的资源,直到某个调用者试图修改资源的内容时,系统才会真正复制一份专用副本给该调用者,而其余调用者所见到的最初的资源仍然保持不变。这个过程对其余的调用者透明。这种作法的主要优势是若是调用者没有修改该资源,就不会新建副本,所以多个调用者只是读取操做能够共享同一份资源。
表10 java.util.concurrent 中的 Map
表11 java.util.concurrent 中的 Set
封装 concurrent map 进而建立 concurrent set 的另外一种方法:
Set<T> concurrentSet = Collections.newSetFromMap(new ConcurrentHashMap<T, Boolean>());
队列就像是“生产者”和“消费者”之间的管道。按照“先进先出(FIFO)”顺序,将对象从管道的一端加入,从管道的另外一端取出。BlockingQueue 接口继承了 Queue接口,而且增长了(生产者添加对象时)队列满或(消费者读取或移除对象时)队列空的处理。 在这些状况下,BlockingQueue 提供的方法能够一直保持或在一段时间内保持阻塞状态,直到等待的条件由另外一个线程的操做改变。
表12 java.util.concurrent 中的 Queue