java.util.concurrent(J.U.C)大大提升了并发性能,AQS 被认为是 J.U.C 的核心。html
用来控制一个或者多个线程等待多个线程。java
维护了一个计数器 cnt,每次调用 countDown() 方法会让计数器的值减 1,减到 0 的时候,那些由于调用 await() 方法而在等待的线程就会被唤醒。git
public class CountdownLatchExample {
public static void main(String[] args) throws InterruptedException {
final int totalThread = 10;
CountDownLatch countDownLatch = new CountDownLatch(totalThread);
ExecutorService executorService = Executors.newCachedThreadPool();
for (int i = 0; i < totalThread; i++) {
executorService.execute(() -> {// 建立 totalThread 条线程
System.out.print("run..");
countDownLatch.countDown();
});
}
countDownLatch.await();
System.out.println("end");
executorService.shutdown();
}
}
复制代码
run..run..run..run..run..run..run..run..run..run..end
复制代码
用来控制多个线程互相等待,只有当多个线程都到达时,这些线程才会继续执行。程序员
和 CountdownLatch 类似,都是经过维护计数器来实现的。线程执行 await() 方法以后计数器会减 1,并进行等待,直到计数器为 0,全部调用 await() 方法而在等待的线程才能继续执行。github
CyclicBarrier 和 CountdownLatch 的一个区别是,CyclicBarrier 的计数器经过调用 reset() 方法能够循环使用,因此它才叫作循环屏障。算法
CyclicBarrier 有两个构造函数,其中 parties 指示计数器的初始值,barrierAction 在全部线程都到达屏障的时候会执行一次。缓存
public CyclicBarrier(int parties, Runnable barrierAction) {
if (parties <= 0) throw new IllegalArgumentException();
this.parties = parties;
this.count = parties;
this.barrierCommand = barrierAction;
}
public CyclicBarrier(int parties) {
this(parties, null);
}
复制代码
public class CyclicBarrierExample {
public static void main(String[] args) {
final int totalThread = 10;
CyclicBarrier cyclicBarrier = new CyclicBarrier(totalThread);
ExecutorService executorService = Executors.newCachedThreadPool();
for (int i = 0; i < totalThread; i++) {
executorService.execute(() -> {
System.out.print("before..");
try {
cyclicBarrier.await();
} catch (InterruptedException | BrokenBarrierException e) {
e.printStackTrace();
}
System.out.print("after..");
});
}
executorService.shutdown();
}
}
复制代码
before..before..before..before..before..before..before..before..before..before..after..after..after..after..after..after..after..after..after..after..
复制代码
Semaphore 相似于操做系统中的信号量,能够控制对互斥资源的访问线程数。安全
如下代码模拟了对某个服务的并发请求,每次只能有 3 个客户端同时访问,请求总数为 10。bash
public class SemaphoreExample {
public static void main(String[] args) {
final int clientCount = 3;
final int totalRequestCount = 10;
Semaphore semaphore = new Semaphore(clientCount);
ExecutorService executorService = Executors.newCachedThreadPool();
for (int i = 0; i < totalRequestCount; i++) {
executorService.execute(()->{
try {
semaphore.acquire();
System.out.print(semaphore.availablePermits() + " ");
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
semaphore.release();
}
});
}
executorService.shutdown();
}
}
复制代码
2 1 2 2 2 2 2 1 2 2
复制代码
在介绍 Callable 时咱们知道它能够有返回值,返回值经过 Future 进行封装。FutureTask 实现了 RunnableFuture 接口,该接口继承自 Runnable 和 Future 接口,这使得 FutureTask 既能够当作一个任务执行,也能够有返回值。服务器
public class FutureTask<V> implements RunnableFuture<V> 复制代码
public interface RunnableFuture<V> extends Runnable, Future<V> 复制代码
FutureTask 可用于异步获取执行结果或取消执行任务的场景。当一个计算任务须要执行很长时间,那么就能够用 FutureTask 来封装这个任务,主线程在完成本身的任务以后再去获取结果。
public class FutureTaskExample {
public static void main(String[] args) throws ExecutionException, InterruptedException {
FutureTask<Integer> futureTask = new FutureTask<Integer>(new Callable<Integer>() {
@Override
public Integer call() throws Exception {
int result = 0;
for (int i = 0; i < 100; i++) {
Thread.sleep(10);
result += i;
}
return result;
}
});
Thread computeThread = new Thread(futureTask);
computeThread.start();
Thread otherThread = new Thread(() -> {
System.out.println("other task is running...");
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
});
otherThread.start();
System.out.println(futureTask.get());
}
}
复制代码
other task is running...
4950
复制代码
java.util.concurrent.BlockingQueue 接口有如下阻塞队列的实现:
提供了阻塞的 take() 和 put() 方法:若是队列为空 take() 将阻塞,直到队列中有内容;若是队列为满 put() 将阻塞,直到队列有空闲位置。
使用 BlockingQueue 实现生产者消费者问题
public class ProducerConsumer {
private static BlockingQueue<String> queue = new ArrayBlockingQueue<>(5);
private static class Producer extends Thread {
@Override
public void run() {
try {
queue.put("product");
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.print("produce..");
}
}
private static class Consumer extends Thread {
@Override
public void run() {
try {
String product = queue.take();
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.print("consume..");
}
}
}
复制代码
public static void main(String[] args) {
for (int i = 0; i < 2; i++) {
Producer producer = new Producer();
producer.start();
}
for (int i = 0; i < 5; i++) {
Consumer consumer = new Consumer();
consumer.start();
}
for (int i = 0; i < 3; i++) {
Producer producer = new Producer();
producer.start();
}
}
复制代码
produce..produce..consume..consume..produce..consume..produce..consume..produce..consume..
复制代码
主要用于并行计算中,和 MapReduce 原理相似,都是把大的计算任务拆分红多个小任务并行计算。
public class ForkJoinExample extends RecursiveTask<Integer> {
private final int threshold = 5;
private int first;
private int last;
public ForkJoinExample(int first, int last) {
this.first = first;
this.last = last;
}
@Override
protected Integer compute() {
int result = 0;
if (last - first <= threshold) {
// 任务足够小则直接计算
for (int i = first; i <= last; i++) {
result += i;
}
} else {
// 拆分红小任务
int middle = first + (last - first) / 2;
ForkJoinExample leftTask = new ForkJoinExample(first, middle);
ForkJoinExample rightTask = new ForkJoinExample(middle + 1, last);
leftTask.fork();
rightTask.fork();
result = leftTask.join() + rightTask.join();
}
return result;
}
}
复制代码
public static void main(String[] args) throws ExecutionException, InterruptedException {
ForkJoinExample example = new ForkJoinExample(1, 10000);
ForkJoinPool forkJoinPool = new ForkJoinPool();
Future result = forkJoinPool.submit(example);
System.out.println(result.get());
}
复制代码
ForkJoin 使用 ForkJoinPool 来启动,它是一个特殊的线程池,线程数量取决于 CPU 核数。
public class ForkJoinPool extends AbstractExecutorService 复制代码
ForkJoinPool 实现了工做窃取算法来提升 CPU 的利用率。每一个线程都维护了一个双端队列(在线性表的两端进行插入和删除),用来存储须要执行的任务。工做窃取算法容许空闲的线程从其它线程的双端队列中窃取一个任务来执行。窃取的任务必须是最晚的任务,避免和队列所属线程发生竞争。例以下图中,Thread2 从 Thread1 的队列中拿出最晚的 Task1 任务,Thread1 会拿出 Task2 来执行,这样就避免发生竞争。可是若是队列中只有一个任务时仍是会发生竞争。
若是多个线程对同一个共享数据进行访问而不采起同步操做的话,那么操做的结果是不一致的。
如下代码演示了 1000 个线程同时对 cnt 执行自增操做,操做结束以后它的值有可能小于 1000。
public class ThreadUnsafeExample {
private int cnt = 0;
public void add() {
cnt++;
}
public int get() {
return cnt;
}
}
复制代码
public static void main(String[] args) throws InterruptedException {
final int threadSize = 1000;
ThreadUnsafeExample example = new ThreadUnsafeExample();
final CountDownLatch countDownLatch = new CountDownLatch(threadSize);
ExecutorService executorService = Executors.newCachedThreadPool();
for (int i = 0; i < threadSize; i++) {
executorService.execute(() -> {
example.add();
countDownLatch.countDown();
});
}
countDownLatch.await();
executorService.shutdown();
System.out.println(example.get());
}
复制代码
997
复制代码
Java 内存模型试图屏蔽各类硬件和操做系统的内存访问差别,以实现让 Java 程序在各类平台下都能达到一致的内存访问效果。
处理器上的寄存器的读写的速度比内存快几个数量级,为了解决这种速度矛盾,在它们之间加入了高速缓存。
加入高速缓存带来了一个新的问题:缓存一致性。若是多个缓存共享同一块主内存区域,那么多个缓存的数据可能会不一致,须要一些协议来解决这个问题。
线程只能直接操做工做内存中的变量,不一样线程之间的变量值传递须要经过主内存来完成。
Java 内存模型定义了 8 个操做来完成主内存和工做内存的交互操做。
代表此操做是不可分割的,不可中断,要所有执行,要么所有不执行。
可见性指当一个线程修改了共享变量的值,其它线程可以当即得知这个修改。
主要有三种实现可见性的方式:
对前面的线程不安全示例中的 cnt 变量使用 volatile 修饰,不能解决线程不安全问题,由于 volatile 并不能保证操做的原子性。
有序性是指:在本线程内观察,全部操做都是有序的。在一个线程观察另外一个线程,全部操做都是无序的,无序是由于发生了指令重排序。
volatile 关键字经过添加内存屏障的方式来禁止指令重排,即重排序时不能把后面的指令放到内存屏障以前。
也能够经过 synchronized 来保证有序性,它保证每一个时刻只有一个线程执行同步代码,至关因而让线程顺序执行同步代码。
上面提到了能够用 volatile 和 synchronized 来保证有序性。除此以外,JVM 还规定了先行发生原则,让一个操做无需控制就能先于另外一个操做完成。
在一个线程内,在程序前面的操做先行发生于后面的操做。
一个 unlock 操做先行发生于后面对同一个锁的 lock 操做。
对一个 volatile 变量的写操做先行发生于后面对这个变量的读操做。
Thread 对象的 start() 方法调用先行发生于此线程的每个动做。
Thread 对象的结束先行发生于 join() 方法返回。
对线程 interrupt() 方法的调用先行发生于被中断线程的代码检测到中断事件的发生,能够经过 interrupted() 方法检测到是否有中断发生。
一个对象的初始化完成(构造函数执行结束)先行发生于它的 finalize() 方法的开始。
finalize()是Object中的方法,当垃圾回收器将要回收对象所占内存以前被调用,即当一个对象被虚拟机宣告死亡时会先调用它finalize()方法,让此对象处理它生前的最后事情(这个对象能够趁这个时机挣脱死亡的命运)。
若是操做 A 先行发生于操做 B,操做 B 先行发生于操做 C,那么操做 A 先行发生于操做 C。
多个线程无论以何种方式访问某个类,而且在主调代码中不须要进行同步,都能表现正确的行为。
线程安全有如下几种实现方式:
不可变(Immutable)的对象必定是线程安全的,不须要再采起任何的线程安全保障措施。只要一个不可变的对象被正确地构建出来,永远也不会看到它在多个线程之中处于不一致的状态。多线程环境下,应当尽可能使对象成为不可变,来知足线程安全。
不可变的类型:
对于集合类型,可使用 Collections.unmodifiableXXX() 方法来获取一个不可变的集合。
public class ImmutableExample {
public static void main(String[] args) {
Map<String, Integer> map = new HashMap<>();
Map<String, Integer> unmodifiableMap = Collections.unmodifiableMap(map);
unmodifiableMap.put("a", 1);
}
}
复制代码
Exception in thread "main" java.lang.UnsupportedOperationException
at java.util.Collections$UnmodifiableMap.put(Collections.java:1457)
at ImmutableExample.main(ImmutableExample.java:9)
复制代码
Collections.unmodifiableXXX() 先对原始的集合进行拷贝,须要对集合进行修改的方法都直接抛出异常。
public V put(K key, V value) {
throw new UnsupportedOperationException();
}
复制代码
synchronized 和 ReentrantLock。
互斥同步最主要的问题就是线程阻塞和唤醒所带来的性能问题,所以这种同步也称为阻塞同步。
互斥同步属于一种悲观的并发策略,老是认为只要不去作正确的同步措施,那就确定会出现问题。不管共享数据是否真的会出现竞争,它都要进行加锁(这里讨论的是概念模型,实际上虚拟机会优化掉很大一部分没必要要的加锁)、用户态核心态转换、维护锁计数器和检查是否有被阻塞的线程须要唤醒等操做。
随着硬件指令集的发展,咱们可使用基于冲突检测的乐观并发策略:先进行操做,若是没有其它线程争用共享数据,那操做就成功了,不然采起补偿措施(不断地重试,直到成功为止)。这种乐观的并发策略的许多实现都不须要将线程阻塞,所以这种同步操做称为非阻塞同步。
CAS 指令须要有 3 个操做数,分别是内存地址 V、旧的预期值 A 和新值 B。当执行操做时,只有当 V 的值等于 A,才将 V 的值更新为 B。
CAS的语义是“我认为V的值应该为A,若是是,那么将V的值更新为B,不然不修改并告诉V的值实际为多少”,CAS是项 乐观锁 技术,当多个线程尝试使用CAS同时更新同一个变量时,只有其中一个线程能更新变量的值,而其它线程都失败,失败的线程并不会被挂起,而是被告知此次竞争中失败,并能够再次尝试。
J.U.C 包里面的整数原子类 AtomicInteger 的方法调用了 Unsafe 类的 CAS 操做。
如下代码使用了 AtomicInteger 执行了自增的操做。
private AtomicInteger cnt = new AtomicInteger();
public void add() {
cnt.incrementAndGet();
}
复制代码
如下代码是 incrementAndGet() 的源码,它调用了 Unsafe 的 getAndAddInt() 。
public final int incrementAndGet() {
return unsafe.getAndAddInt(this, valueOffset, 1) + 1;
}
复制代码
如下代码是 getAndAddInt() 源码,var1 指示对象内存地址,var2 指示该字段相对对象内存地址的偏移,var4 指示操做须要加的数值,这里为 1。经过 getIntVolatile(var1, var2) 获得旧的预期值,经过调用 compareAndSwapInt() 来进行 CAS 比较,若是该字段内存地址中的值等于 var5,那么就更新内存地址为 var1+var2 的变量为 var5+var4。
能够看到 getAndAddInt() 在一个循环中进行,发生冲突的作法是不断的进行重试。
public final int getAndAddInt(Object var1, long var2, int var4) {
int var5;
do {
var5 = this.getIntVolatile(var1, var2);
} while(!this.compareAndSwapInt(var1, var2, var5, var5 + var4));
return var5;
}
复制代码
若是一个变量初次读取的时候是 A 值,它的值被改为了 B,后来又被改回为 A,那 CAS 操做就会误认为它历来没有被改变过。
J.U.C 包提供了一个带有标记的原子引用类 AtomicStampedReference 来解决这个问题,它能够经过控制变量值的版原本保证 CAS 的正确性。大部分状况下 ABA 问题不会影响程序并发的正确性,若是须要解决 ABA 问题,改用传统的互斥同步可能会比原子类更高效。
要保证线程安全,并非必定就要进行同步。若是一个方法原本就不涉及共享数据,那它天然就无须任何同步措施去保证正确性。
多个线程访问同一个方法的局部变量时,不会出现线程安全问题,由于局部变量存储在虚拟机栈中,属于线程私有的。
public class StackClosedExample {
public void add100() {
int cnt = 0;
for (int i = 0; i < 100; i++) {
cnt++;
}
System.out.println(cnt);
}
}
复制代码
public static void main(String[] args) {
StackClosedExample example = new StackClosedExample();
ExecutorService executorService = Executors.newCachedThreadPool();
executorService.execute(() -> example.add100());
executorService.execute(() -> example.add100());
executorService.shutdown();
}
复制代码
100
100
复制代码
若是一段代码中所须要的数据必须与其余代码共享,那就看看这些共享数据的代码是否能保证在同一个线程中执行。若是能保证,咱们就能够把共享数据的可见范围限制在同一个线程以内,这样,无须同步也能保证线程之间不出现数据争用的问题。
符合这种特色的应用并很多见,大部分使用消费队列的架构模式(如“生产者-消费者”模式)都会将产品的消费过程尽可能在一个线程中消费完。其中最重要的一个应用实例就是经典 Web 交互模型中的“一个请求对应一个服务器线程”(Thread-per-Request)的处理方式,这种处理方式的普遍应用使得不少 Web 服务端应用均可以使用线程本地存储来解决线程安全问题。
可使用 java.lang.ThreadLocal 类来实现线程本地存储功能。
对于如下代码,thread1 中设置 threadLocal 为 1,而 thread2 设置 threadLocal 为 2。过了一段时间以后,thread1 读取 threadLocal 依然是 1,不受 thread2 的影响。
public class ThreadLocalExample {
public static void main(String[] args) {
ThreadLocal threadLocal = new ThreadLocal();
Thread thread1 = new Thread(() -> {
threadLocal.set(1);
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println(threadLocal.get());
threadLocal.remove();
});
Thread thread2 = new Thread(() -> {
threadLocal.set(2);
threadLocal.remove();
});
thread1.start();
thread2.start();
}
}
复制代码
1
复制代码
这种代码也叫作纯代码(Pure Code),能够在代码执行的任什么时候刻中断它,转而去执行另一段代码(包括递归调用它自己),而在控制权返回后,原来的程序不会出现任何错误。
可重入代码有一些共同的特征,例如不依赖存储在堆上的数据和公用的系统资源、用到的状态量都由参数中传入、不调用非可重入的方法等。
这里的锁优化主要是指 JVM 对 synchronized 的优化。
互斥同步进入阻塞状态的开销都很大,应该尽可能避免。在许多应用中,共享数据的锁定状态只会持续很短的一段时间。自旋锁的思想是让一个线程在请求一个共享数据的锁时执行忙循环(自旋)一段时间,若是在这段时间内能得到锁,就能够避免进入阻塞状态。
忙循环: 忙循环就是程序员用循环让一个线程等待,不像传统方法wait(), sleep() 或 yield() 它们都放弃了CPU控制,而忙循环不会放弃CPU,它就是在运行一个空循环。这么作的目的是为了保留CPU缓存。
自旋锁虽然能避免进入阻塞状态从而减小开销,可是它须要进行忙循环操做占用 CPU 时间,它只适用于共享数据的锁定状态很短的场景。
在 JDK 1.6 中引入了自适应的自旋锁。自适应意味着自旋的次数再也不固定了,而是由前一次在同一个锁上的自旋次数及锁的拥有者的状态来决定。
锁消除是指对于被检测出不可能存在竞争的共享数据的锁进行消除。
锁消除主要是经过逃逸分析来支持,若是堆上的共享数据不可能逃逸出去被其它线程访问到,那么就能够把它们当成私有数据对待,也就能够将它们的锁进行消除。
对于一些看起来没有加锁的代码,其实隐式的加了不少锁。例以下面的字符串拼接代码就隐式加了锁:
public static String concatString(String s1, String s2, String s3) {
return s1 + s2 + s3;
}
复制代码
String 是一个不可变的类,编译器会对 String 的拼接自动优化。在 JDK 1.5 以前,会转化为 StringBuffer 对象的连续 append() 操做:
public static String concatString(String s1, String s2, String s3) {
StringBuffer sb = new StringBuffer();
sb.append(s1);
sb.append(s2);
sb.append(s3);
return sb.toString();
}
复制代码
每一个 append() 方法中都有一个同步块。虚拟机观察变量 sb,很快就会发现它的动态做用域被限制在 concatString() 方法内部。也就是说,sb 的全部引用永远不会逃逸到 concatString() 方法以外,其余线程没法访问到它,所以能够进行消除。
若是一系列的连续操做都对同一个对象反复加锁和解锁,频繁的加锁操做就会致使性能损耗。
上一节的示例代码中连续的 append() 方法就属于这类状况。若是虚拟机探测到由这样的一串零碎的操做都对同一个对象加锁,将会把加锁的范围扩展(粗化)到整个操做序列的外部。对于上一节的示例代码就是扩展到第一个 append() 操做以前直至最后一个 append() 操做以后,这样只须要加锁一次就能够了。
轻量级锁不是为了代替重量级锁,它的本意是在没有多线程竞争的前提下,减小传统的重量级锁使用操做系统互斥量产生的性能消耗
偏向锁的思想是偏向于让第一个获取锁对象的线程,这个线程在以后获取该锁就再也不须要进行同步操做,甚至连 CAS 操做也再也不须要。
本文参考聊聊并发(八)——Fork/Join 框架介绍、线程通讯、Threads and Locks、Threads and Locks、CS-Notes、Java内存模型 之三个特性、轻量级锁、、thread state java