<!-- GFM-TOC -->php
<!-- GFM-TOC -->
建立后还没有启动。
可能正在运行,也可能正在等待 CPU 时间片。
包含了操做系统线程状态中的 Running 和 Ready。
等待获取一个排它锁,若是其线程释放了锁就会结束此状态。
等待其它线程显式地唤醒,不然不会被分配 CPU 时间片。
进入方法 | 退出方法 |
---|---|
没有设置 Timeout 参数的 Object.wait() 方法 | Object.notify() / Object.notifyAll() |
没有设置 Timeout 参数的 Thread.join() 方法 | 被调用的线程执行完毕 |
LockSupport.park() 方法 | LockSupport.unpark(Thread) |
无需等待其它线程显式地唤醒,在必定时间以后会被系统自动唤醒。
调用 Thread.sleep() 方法使线程进入限期等待状态时,经常用“使一个线程睡眠”进行描述。
调用 Object.wait() 方法使线程进入限期等待或者无限期等待时,经常用“挂起一个线程”进行描述。
睡眠和挂起是用来描述行为,而阻塞和等待用来描述状态。
阻塞和等待的区别在于,阻塞是被动的,它是在等待获取一个排它锁。而等待是主动的,经过调用 Thread.sleep() 和 Object.wait() 等方法进入。
进入方法 | 退出方法 |
---|---|
Thread.sleep() 方法 | 时间结束 |
设置了 Timeout 参数的 Object.wait() 方法 | 时间结束 / Object.notify() / Object.notifyAll() |
设置了 Timeout 参数的 Thread.join() 方法 | 时间结束 / 被调用的线程执行完毕 |
LockSupport.parkNanos() 方法 | LockSupport.unpark(Thread) |
LockSupport.parkUntil() 方法 | LockSupport.unpark(Thread) |
能够是线程结束任务以后本身结束,或者产生了异常而结束。
有三种使用线程的方法:
实现 Runnable 和 Callable 接口的类只能当作一个能够在线程中运行的任务,不是真正意义上的线程,所以最后还须要经过 Thread 来调用。能够说任务是经过线程驱动从而执行的。
须要实现 run() 方法。
经过 Thread 调用 start() 方法来启动线程。
public class MyRunnable implements Runnable { public void run() { // ... } }
public static void main(String[] args) { MyRunnable instance = new MyRunnable(); Thread thread = new Thread(instance); thread.start(); }
与 Runnable 相比,Callable 能够有返回值,返回值经过 FutureTask 进行封装。
public class MyCallable implements Callable<Integer> { public Integer call() { return 123; } }
public static void main(String[] args) throws ExecutionException, InterruptedException { MyCallable mc = new MyCallable(); FutureTask<Integer> ft = new FutureTask<>(mc); Thread thread = new Thread(ft); thread.start(); System.out.println(ft.get()); }
一样也是须要实现 run() 方法,由于 Thread 类也实现了 Runable 接口。
当调用 start() 方法启动一个线程时,虚拟机会将该线程放入就绪队列中等待被调度,当一个线程被调度时会执行该线程的 run() 方法。
public class MyThread extends Thread { public void run() { // ... } }
public static void main(String[] args) { MyThread mt = new MyThread(); mt.start(); }
实现接口会更好一些,由于:
Executor 管理多个异步任务的执行,而无需程序员显式地管理线程的生命周期。这里的异步是指多个任务的执行互不干扰,不须要进行同步操做。
主要有三种 Executor:
public static void main(String[] args) { ExecutorService executorService = Executors.newCachedThreadPool(); for (int i = 0; i < 5; i++) { executorService.execute(new MyRunnable()); } executorService.shutdown(); }
守护线程是程序运行时在后台提供服务的线程,不属于程序中不可或缺的部分。
当全部非守护线程结束时,程序也就终止,同时会杀死全部守护线程。
main() 属于非守护线程。
使用 setDaemon() 方法将一个线程设置为守护线程。
public static void main(String[] args) { Thread thread = new Thread(new MyRunnable()); thread.setDaemon(true); }
Thread.sleep(millisec) 方法会休眠当前正在执行的线程,millisec 单位为毫秒。
sleep() 可能会抛出 InterruptedException,由于异常不能跨线程传播回 main() 中,所以必须在本地进行处理。线程中抛出的其它异常也一样须要在本地进行处理。
public void run() { try { Thread.sleep(3000); } catch (InterruptedException e) { e.printStackTrace(); } }
对静态方法 Thread.yield() 的调用声明了当前线程已经完成了生命周期中最重要的部分,能够切换给其它线程来执行。该方法只是对线程调度器的一个建议,并且也只是建议具备相同优先级的其它线程能够运行。
public void run() { Thread.yield(); }
一个线程执行完毕以后会自动结束,若是在运行过程当中发生异常也会提早结束。
经过调用一个线程的 interrupt() 来中断该线程,若是该线程处于阻塞、限期等待或者无限期等待状态,那么就会抛出 InterruptedException,从而提早结束该线程。可是不能中断 I/O 阻塞和 synchronized 锁阻塞。
对于如下代码,在 main() 中启动一个线程以后再中断它,因为线程中调用了 Thread.sleep() 方法,所以会抛出一个 InterruptedException,从而提早结束线程,不执行以后的语句。
public class InterruptExample { private static class MyThread1 extends Thread { @Override public void run() { try { Thread.sleep(2000); System.out.println("Thread run"); } catch (InterruptedException e) { e.printStackTrace(); } } } }
public static void main(String[] args) throws InterruptedException { Thread thread1 = new MyThread1(); thread1.start(); thread1.interrupt(); System.out.println("Main run"); }
Main run java.lang.InterruptedException: sleep interrupted at java.lang.Thread.sleep(Native Method) at InterruptExample.lambda$main$0(InterruptExample.java:5) at InterruptExample$$Lambda$1/713338599.run(Unknown Source) at java.lang.Thread.run(Thread.java:745)
若是一个线程的 run() 方法执行一个无限循环,而且没有执行 sleep() 等会抛出 InterruptedException 的操做,那么调用线程的 interrupt() 方法就没法使线程提早结束。
可是调用 interrupt() 方法会设置线程的中断标记,此时调用 interrupted() 方法会返回 true。所以能够在循环体中使用 interrupted() 方法来判断线程是否处于中断状态,从而提早结束线程。
public class InterruptExample { private static class MyThread2 extends Thread { @Override public void run() { while (!interrupted()) { // .. } System.out.println("Thread end"); } } }
public static void main(String[] args) throws InterruptedException { Thread thread2 = new MyThread2(); thread2.start(); thread2.interrupt(); }
Thread end
调用 Executor 的 shutdown() 方法会等待线程都执行完毕以后再关闭,可是若是调用的是 shutdownNow() 方法,则至关于调用每一个线程的 interrupt() 方法。
如下使用 Lambda 建立线程,至关于建立了一个匿名内部线程。
public static void main(String[] args) { ExecutorService executorService = Executors.newCachedThreadPool(); executorService.execute(() -> { try { Thread.sleep(2000); System.out.println("Thread run"); } catch (InterruptedException e) { e.printStackTrace(); } }); executorService.shutdownNow(); System.out.println("Main run"); }
Main run java.lang.InterruptedException: sleep interrupted at java.lang.Thread.sleep(Native Method) at ExecutorInterruptExample.lambda$main$0(ExecutorInterruptExample.java:9) at ExecutorInterruptExample$$Lambda$1/1160460865.run(Unknown Source) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) at java.lang.Thread.run(Thread.java:745)
若是只想中断 Executor 中的一个线程,能够经过使用 submit() 方法来提交一个线程,它会返回一个 Future<?> 对象,经过调用该对象的 cancel(true) 方法就能够中断线程。
Future<?> future = executorService.submit(() -> { // .. }); future.cancel(true);
Java 提供了两种锁机制来控制多个线程对共享资源的互斥访问,第一个是 JVM 实现的 synchronized,而另外一个是 JDK 实现的 ReentrantLock。
1. 同步一个代码块
public void func() { synchronized (this) { // ... } }
它只做用于同一个对象,若是调用两个对象上的同步代码块,就不会进行同步。
对于如下代码,使用 ExecutorService 执行了两个线程,因为调用的是同一个对象的同步代码块,所以这两个线程会进行同步,当一个线程进入同步语句块时,另外一个线程就必须等待。
public class SynchronizedExample { public void func1() { synchronized (this) { for (int i = 0; i < 10; i++) { System.out.print(i + " "); } } } }
public static void main(String[] args) { SynchronizedExample e1 = new SynchronizedExample(); ExecutorService executorService = Executors.newCachedThreadPool(); executorService.execute(() -> e1.func1()); executorService.execute(() -> e1.func1()); }
0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9
对于如下代码,两个线程调用了不一样对象的同步代码块,所以这两个线程就不须要同步。从输出结果能够看出,两个线程交叉执行。
public static void main(String[] args) { SynchronizedExample e1 = new SynchronizedExample(); SynchronizedExample e2 = new SynchronizedExample(); ExecutorService executorService = Executors.newCachedThreadPool(); executorService.execute(() -> e1.func1()); executorService.execute(() -> e2.func1()); }
0 0 1 1 2 2 3 3 4 4 5 5 6 6 7 7 8 8 9 9
2. 同步一个方法
public synchronized void func () { // ... }
它和同步代码块同样,做用于同一个对象。
3. 同步一个类
public void func() { synchronized (SynchronizedExample.class) { // ... } }
做用于整个类,也就是说两个线程调用同一个类的不一样对象上的这种同步语句,也会进行同步。
public class SynchronizedExample { public void func2() { synchronized (SynchronizedExample.class) { for (int i = 0; i < 10; i++) { System.out.print(i + " "); } } } }
public static void main(String[] args) { SynchronizedExample e1 = new SynchronizedExample(); SynchronizedExample e2 = new SynchronizedExample(); ExecutorService executorService = Executors.newCachedThreadPool(); executorService.execute(() -> e1.func2()); executorService.execute(() -> e2.func2()); }
0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9
4. 同步一个静态方法
public synchronized static void fun() { // ... }
做用于整个类。
ReentrantLock 是 java.util.concurrent(J.U.C)包中的锁。
public class LockExample { private Lock lock = new ReentrantLock(); public void func() { lock.lock(); try { for (int i = 0; i < 10; i++) { System.out.print(i + " "); } } finally { lock.unlock(); // 确保释放锁,从而避免发生死锁。 } } }
public static void main(String[] args) { LockExample lockExample = new LockExample(); ExecutorService executorService = Executors.newCachedThreadPool(); executorService.execute(() -> lockExample.func()); executorService.execute(() -> lockExample.func()); }
0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9
1. 锁的实现
synchronized 是 JVM 实现的,而 ReentrantLock 是 JDK 实现的。
2. 性能
新版本 Java 对 synchronized 进行了不少优化,例如自旋锁等,synchronized 与 ReentrantLock 大体相同。
3. 等待可中断
当持有锁的线程长期不释放锁的时候,正在等待的线程能够选择放弃等待,改成处理其余事情。
ReentrantLock 可中断,而 synchronized 不行。
4. 公平锁
公平锁是指多个线程在等待同一个锁时,必须按照申请锁的时间顺序来依次得到锁。
synchronized 中的锁是非公平的,ReentrantLock 默认状况下也是非公平的,可是也能够是公平的。
5. 锁绑定多个条件
一个 ReentrantLock 能够同时绑定多个 Condition 对象。
除非须要使用 ReentrantLock 的高级功能,不然优先使用 synchronized。这是由于 synchronized 是 JVM 实现的一种锁机制,JVM 原生地支持它,而 ReentrantLock 不是全部的 JDK 版本都支持。而且使用 synchronized 不用担忧没有释放锁而致使死锁问题,由于 JVM 会确保锁的释放。
当多个线程能够一块儿工做去解决某个问题时,若是某些部分必须在其它部分以前完成,那么就须要对线程进行协调。
在线程中调用另外一个线程的 join() 方法,会将当前线程挂起,而不是忙等待,直到目标线程结束。
对于如下代码,虽然 b 线程先启动,可是由于在 b 线程中调用了 a 线程的 join() 方法,b 线程会等待 a 线程结束才继续执行,所以最后可以保证 a 线程的输出先于 b 线程的输出。
public class JoinExample { private class A extends Thread { @Override public void run() { System.out.println("A"); } } private class B extends Thread { private A a; B(A a) { this.a = a; } @Override public void run() { try { a.join(); } catch (InterruptedException e) { e.printStackTrace(); } System.out.println("B"); } } public void test() { A a = new A(); B b = new B(a); b.start(); a.start(); } }
public static void main(String[] args) { JoinExample example = new JoinExample(); example.test(); }
A B
调用 wait() 使得线程等待某个条件知足,线程在等待时会被挂起,当其余线程的运行使得这个条件知足时,其它线程会调用 notify() 或者 notifyAll() 来唤醒挂起的线程。
它们都属于 Object 的一部分,而不属于 Thread。
只能用在同步方法或者同步控制块中使用,不然会在运行时抛出 IllegalMonitorStateException。
使用 wait() 挂起期间,线程会释放锁。这是由于,若是没有释放锁,那么其它线程就没法进入对象的同步方法或者同步控制块中,那么就没法执行 notify() 或者 notifyAll() 来唤醒挂起的线程,形成死锁。
public class WaitNotifyExample { public synchronized void before() { System.out.println("before"); notifyAll(); } public synchronized void after() { try { wait(); } catch (InterruptedException e) { e.printStackTrace(); } System.out.println("after"); } }
public static void main(String[] args) { ExecutorService executorService = Executors.newCachedThreadPool(); WaitNotifyExample example = new WaitNotifyExample(); executorService.execute(() -> example.after()); executorService.execute(() -> example.before()); }
before after
wait() 和 sleep() 的区别
java.util.concurrent 类库中提供了 Condition 类来实现线程之间的协调,能够在 Condition 上调用 await() 方法使线程等待,其它线程调用 signal() 或 signalAll() 方法唤醒等待的线程。
相比于 wait() 这种等待方式,await() 能够指定等待的条件,所以更加灵活。
使用 Lock 来获取一个 Condition 对象。
public class AwaitSignalExample { private Lock lock = new ReentrantLock(); private Condition condition = lock.newCondition(); public void before() { lock.lock(); try { System.out.println("before"); condition.signalAll(); } finally { lock.unlock(); } } public void after() { lock.lock(); try { condition.await(); System.out.println("after"); } catch (InterruptedException e) { e.printStackTrace(); } finally { lock.unlock(); } } }
public static void main(String[] args) { ExecutorService executorService = Executors.newCachedThreadPool(); AwaitSignalExample example = new AwaitSignalExample(); executorService.execute(() -> example.after()); executorService.execute(() -> example.before()); }
before after
java.util.concurrent(J.U.C)大大提升了并发性能,AQS 被认为是 J.U.C 的核心。
用来控制一个线程等待多个线程。
维护了一个计数器 cnt,每次调用 countDown() 方法会让计数器的值减 1,减到 0 的时候,那些由于调用 await() 方法而在等待的线程就会被唤醒。
public class CountdownLatchExample { public static void main(String[] args) throws InterruptedException { final int totalThread = 10; CountDownLatch countDownLatch = new CountDownLatch(totalThread); ExecutorService executorService = Executors.newCachedThreadPool(); for (int i = 0; i < totalThread; i++) { executorService.execute(() -> { System.out.print("run.."); countDownLatch.countDown(); }); } countDownLatch.await(); System.out.println("end"); executorService.shutdown(); } }
run..run..run..run..run..run..run..run..run..run..end
用来控制多个线程互相等待,只有当多个线程都到达时,这些线程才会继续执行。
和 CountdownLatch 类似,都是经过维护计数器来实现的。线程执行 await() 方法以后计数器会减 1,并进行等待,直到计数器为 0,全部调用 await() 方法而在等待的线程才能继续执行。
CyclicBarrier 和 CountdownLatch 的一个区别是,CyclicBarrier 的计数器经过调用 reset() 方法能够循环使用,因此它才叫作循环屏障。
CyclicBarrier 有两个构造函数,其中 parties 指示计数器的初始值,barrierAction 在全部线程都到达屏障的时候会执行一次。
public CyclicBarrier(int parties, Runnable barrierAction) { if (parties <= 0) throw new IllegalArgumentException(); this.parties = parties; this.count = parties; this.barrierCommand = barrierAction; } public CyclicBarrier(int parties) { this(parties, null); }
public class CyclicBarrierExample { public static void main(String[] args) { final int totalThread = 10; CyclicBarrier cyclicBarrier = new CyclicBarrier(totalThread); ExecutorService executorService = Executors.newCachedThreadPool(); for (int i = 0; i < totalThread; i++) { executorService.execute(() -> { System.out.print("before.."); try { cyclicBarrier.await(); } catch (InterruptedException | BrokenBarrierException e) { e.printStackTrace(); } System.out.print("after.."); }); } executorService.shutdown(); } }
before..before..before..before..before..before..before..before..before..before..after..after..after..after..after..after..after..after..after..after..
Semaphore 相似于操做系统中的信号量,能够控制对互斥资源的访问线程数。
如下代码模拟了对某个服务的并发请求,每次只能有 3 个客户端同时访问,请求总数为 10。
public class SemaphoreExample { public static void main(String[] args) { final int clientCount = 3; final int totalRequestCount = 10; Semaphore semaphore = new Semaphore(clientCount); ExecutorService executorService = Executors.newCachedThreadPool(); for (int i = 0; i < totalRequestCount; i++) { executorService.execute(()->{ try { semaphore.acquire(); System.out.print(semaphore.availablePermits() + " "); } catch (InterruptedException e) { e.printStackTrace(); } finally { semaphore.release(); } }); } executorService.shutdown(); } }
2 1 2 2 2 2 2 1 2 2
在介绍 Callable 时咱们知道它能够有返回值,返回值经过 Future<V> 进行封装。FutureTask 实现了 RunnableFuture 接口,该接口继承自 Runnable 和 Future<V> 接口,这使得 FutureTask 既能够当作一个任务执行,也能够有返回值。
public class FutureTask<V> implements RunnableFuture<V>
public interface RunnableFuture<V> extends Runnable, Future<V>
FutureTask 可用于异步获取执行结果或取消执行任务的场景。当一个计算任务须要执行很长时间,那么就能够用 FutureTask 来封装这个任务,主线程在完成本身的任务以后再去获取结果。
public class FutureTaskExample { public static void main(String[] args) throws ExecutionException, InterruptedException { FutureTask<Integer> futureTask = new FutureTask<Integer>(new Callable<Integer>() { @Override public Integer call() throws Exception { int result = 0; for (int i = 0; i < 100; i++) { Thread.sleep(10); result += i; } return result; } }); Thread computeThread = new Thread(futureTask); computeThread.start(); Thread otherThread = new Thread(() -> { System.out.println("other task is running..."); try { Thread.sleep(1000); } catch (InterruptedException e) { e.printStackTrace(); } }); otherThread.start(); System.out.println(futureTask.get()); } }
other task is running... 4950
java.util.concurrent.BlockingQueue 接口有如下阻塞队列的实现:
提供了阻塞的 take() 和 put() 方法:若是队列为空 take() 将阻塞,直到队列中有内容;若是队列为满 put() 将阻塞,直到队列有空闲位置。
使用 BlockingQueue 实现生产者消费者问题
public class ProducerConsumer { private static BlockingQueue<String> queue = new ArrayBlockingQueue<>(5); private static class Producer extends Thread { @Override public void run() { try { queue.put("product"); } catch (InterruptedException e) { e.printStackTrace(); } System.out.print("produce.."); } } private static class Consumer extends Thread { @Override public void run() { try { String product = queue.take(); } catch (InterruptedException e) { e.printStackTrace(); } System.out.print("consume.."); } } }
public static void main(String[] args) { for (int i = 0; i < 2; i++) { Producer producer = new Producer(); producer.start(); } for (int i = 0; i < 5; i++) { Consumer consumer = new Consumer(); consumer.start(); } for (int i = 0; i < 3; i++) { Producer producer = new Producer(); producer.start(); } }
produce..produce..consume..consume..produce..consume..produce..consume..produce..consume..
主要用于并行计算中,和 MapReduce 原理相似,都是把大的计算任务拆分红多个小任务并行计算。
public class ForkJoinExample extends RecursiveTask<Integer> { private final int threshold = 5; private int first; private int last; public ForkJoinExample(int first, int last) { this.first = first; this.last = last; } @Override protected Integer compute() { int result = 0; if (last - first <= threshold) { // 任务足够小则直接计算 for (int i = first; i <= last; i++) { result += i; } } else { // 拆分红小任务 int middle = first + (last - first) / 2; ForkJoinExample leftTask = new ForkJoinExample(first, middle); ForkJoinExample rightTask = new ForkJoinExample(middle + 1, last); leftTask.fork(); rightTask.fork(); result = leftTask.join() + rightTask.join(); } return result; } }
public static void main(String[] args) throws ExecutionException, InterruptedException { ForkJoinExample example = new ForkJoinExample(1, 10000); ForkJoinPool forkJoinPool = new ForkJoinPool(); Future result = forkJoinPool.submit(example); System.out.println(result.get()); }
ForkJoin 使用 ForkJoinPool 来启动,它是一个特殊的线程池,线程数量取决于 CPU 核数。
public class ForkJoinPool extends AbstractExecutorService
ForkJoinPool 实现了工做窃取算法来提升 CPU 的利用率。每一个线程都维护了一个双端队列,用来存储须要执行的任务。工做窃取算法容许空闲的线程从其它线程的双端队列中窃取一个任务来执行。窃取的任务必须是最晚的任务,避免和队列所属线程发生竞争。例以下图中,Thread2 从 Thread1 的队列中拿出最晚的 Task1 任务,Thread1 会拿出 Task2 来执行,这样就避免发生竞争。可是若是队列中只有一个任务时仍是会发生竞争。
若是多个线程对同一个共享数据进行访问而不采起同步操做的话,那么操做的结果是不一致的。
如下代码演示了 1000 个线程同时对 cnt 执行自增操做,操做结束以后它的值有可能小于 1000。
public class ThreadUnsafeExample { private int cnt = 0; public void add() { cnt++; } public int get() { return cnt; } }
public static void main(String[] args) throws InterruptedException { final int threadSize = 1000; ThreadUnsafeExample example = new ThreadUnsafeExample(); final CountDownLatch countDownLatch = new CountDownLatch(threadSize); ExecutorService executorService = Executors.newCachedThreadPool(); for (int i = 0; i < threadSize; i++) { executorService.execute(() -> { example.add(); countDownLatch.countDown(); }); } countDownLatch.await(); executorService.shutdown(); System.out.println(example.get()); }
997
Java 内存模型试图屏蔽各类硬件和操做系统的内存访问差别,以实现让 Java 程序在各类平台下都能达到一致的内存访问效果。
处理器上的寄存器的读写的速度比内存快几个数量级,为了解决这种速度矛盾,在它们之间加入了高速缓存。
加入高速缓存带来了一个新的问题:缓存一致性。若是多个缓存共享同一块主内存区域,那么多个缓存的数据可能会不一致,须要一些协议来解决这个问题。
全部的变量都存储在主内存中,每一个线程还有本身的工做内存,工做内存存储在高速缓存或者寄存器中,保存了该线程使用的变量的主内存副本拷贝。
线程只能直接操做工做内存中的变量,不一样线程之间的变量值传递须要经过主内存来完成。
Java 内存模型定义了 8 个操做来完成主内存和工做内存的交互操做。
Java 内存模型保证了 read、load、use、assign、store、write、lock 和 unlock 操做具备原子性,例如对一个 int 类型的变量执行 assign 赋值操做,这个操做就是原子性的。可是 Java 内存模型容许虚拟机将没有被 volatile 修饰的 64 位数据(long,double)的读写操做划分为两次 32 位的操做来进行,即 load、store、read 和 write 操做能够不具有原子性。
有一个错误认识就是,int 等原子性的类型在多线程环境中不会出现线程安全问题。前面的线程不安全示例代码中,cnt 属于 int 类型变量,1000 个线程对它进行自增操做以后,获得的值为 997 而不是 1000。
为了方便讨论,将内存间的交互操做简化为 3 个:load、assign、store。
下图演示了两个线程同时对 cnt 进行操做,load、assign、store 这一系列操做总体上看不具有原子性,那么在 T1 修改 cnt 而且尚未将修改后的值写入主内存,T2 依然能够读入旧值。能够看出,这两个线程虽然执行了两次自增运算,可是主内存中 cnt 的值最后为 1 而不是 2。所以对 int 类型读写操做知足原子性只是说明 load、assign、store 这些单个操做具有原子性。
AtomicInteger 能保证多个线程修改的原子性。
使用 AtomicInteger 重写以前线程不安全的代码以后获得如下线程安全实现:
public class AtomicExample { private AtomicInteger cnt = new AtomicInteger(); public void add() { cnt.incrementAndGet(); } public int get() { return cnt.get(); } }
public static void main(String[] args) throws InterruptedException { final int threadSize = 1000; AtomicExample example = new AtomicExample(); // 只修改这条语句 final CountDownLatch countDownLatch = new CountDownLatch(threadSize); ExecutorService executorService = Executors.newCachedThreadPool(); for (int i = 0; i < threadSize; i++) { executorService.execute(() -> { example.add(); countDownLatch.countDown(); }); } countDownLatch.await(); executorService.shutdown(); System.out.println(example.get()); }
1000
除了使用原子类以外,也可使用 synchronized 互斥锁来保证操做的原子性。它对应的内存间交互操做为:lock 和 unlock,在虚拟机实现上对应的字节码指令为 monitorenter 和 monitorexit。
public class AtomicSynchronizedExample { private int cnt = 0; public synchronized void add() { cnt++; } public synchronized int get() { return cnt; } }
public static void main(String[] args) throws InterruptedException { final int threadSize = 1000; AtomicSynchronizedExample example = new AtomicSynchronizedExample(); final CountDownLatch countDownLatch = new CountDownLatch(threadSize); ExecutorService executorService = Executors.newCachedThreadPool(); for (int i = 0; i < threadSize; i++) { executorService.execute(() -> { example.add(); countDownLatch.countDown(); }); } countDownLatch.await(); executorService.shutdown(); System.out.println(example.get()); }
1000
可见性指当一个线程修改了共享变量的值,其它线程可以当即得知这个修改。Java 内存模型是经过在变量修改后将新值同步回主内存,在变量读取前从主内存刷新变量值来实现可见性的。
主要有有三种实现可见性的方式:
对前面的线程不安全示例中的 cnt 变量使用 volatile 修饰,不能解决线程不安全问题,由于 volatile 并不能保证操做的原子性。
有序性是指:在本线程内观察,全部操做都是有序的。在一个线程观察另外一个线程,全部操做都是无序的,无序是由于发生了指令重排序。在 Java 内存模型中,容许编译器和处理器对指令进行重排序,重排序过程不会影响到单线程程序的执行,却会影响到多线程并发执行的正确性。
volatile 关键字经过添加内存屏障的方式来禁止指令重排,即重排序时不能把后面的指令放到内存屏障以前。
也能够经过 synchronized 来保证有序性,它保证每一个时刻只有一个线程执行同步代码,至关因而让线程顺序执行同步代码。
上面提到了能够用 volatile 和 synchronized 来保证有序性。除此以外,JVM 还规定了先行发生原则,让一个操做无需控制就能先于另外一个操做完成。
Single Thread rule
在一个线程内,在程序前面的操做先行发生于后面的操做。
Monitor Lock Rule
一个 unlock 操做先行发生于后面对同一个锁的 lock 操做。
Volatile Variable Rule
对一个 volatile 变量的写操做先行发生于后面对这个变量的读操做。
Thread Start Rule
Thread 对象的 start() 方法调用先行发生于此线程的每个动做。
Thread Join Rule
Thread 对象的结束先行发生于 join() 方法返回。
Thread Interruption Rule
对线程 interrupt() 方法的调用先行发生于被中断线程的代码检测到中断事件的发生,能够经过 interrupted() 方法检测到是否有中断发生。
Finalizer Rule
一个对象的初始化完成(构造函数执行结束)先行发生于它的 finalize() 方法的开始。
Transitivity
若是操做 A 先行发生于操做 B,操做 B 先行发生于操做 C,那么操做 A 先行发生于操做 C。
多个线程无论以何种方式访问某个类,而且在主调代码中不须要进行同步,都能表现正确的行为。
线程安全有如下几种实现方式:
不可变(Immutable)的对象必定是线程安全的,不须要再采起任何的线程安全保障措施。只要一个不可变的对象被正确地构建出来,永远也不会看到它在多个线程之中处于不一致的状态。多线程环境下,应当尽可能使对象成为不可变,来知足线程安全。
不可变的类型:
对于集合类型,可使用 Collections.unmodifiableXXX() 方法来获取一个不可变的集合。
public class ImmutableExample { public static void main(String[] args) { Map<String, Integer> map = new HashMap<>(); Map<String, Integer> unmodifiableMap = Collections.unmodifiableMap(map); unmodifiableMap.put("a", 1); } }
Exception in thread "main" java.lang.UnsupportedOperationException at java.util.Collections$UnmodifiableMap.put(Collections.java:1457) at ImmutableExample.main(ImmutableExample.java:9)
Collections.unmodifiableXXX() 先对原始的集合进行拷贝,须要对集合进行修改的方法都直接抛出异常。
public V put(K key, V value) { throw new UnsupportedOperationException(); }
synchronized 和 ReentrantLock。
互斥同步最主要的问题就是线程阻塞和唤醒所带来的性能问题,所以这种同步也称为阻塞同步。
互斥同步属于一种悲观的并发策略,老是认为只要不去作正确的同步措施,那就确定会出现问题。不管共享数据是否真的会出现竞争,它都要进行加锁(这里讨论的是概念模型,实际上虚拟机会优化掉很大一部分没必要要的加锁)、用户态核心态转换、维护锁计数器和检查是否有被阻塞的线程须要唤醒等操做。
随着硬件指令集的发展,咱们可使用基于冲突检测的乐观并发策略:先进行操做,若是没有其它线程争用共享数据,那操做就成功了,不然采起补偿措施(不断地重试,直到成功为止)。这种乐观的并发策略的许多实现都不须要将线程阻塞,所以这种同步操做称为非阻塞同步。
乐观锁须要操做和冲突检测这两个步骤具有原子性,这里就不能再使用互斥同步来保证了,只能靠硬件来完成。硬件支持的原子性操做最典型的是:比较并交换(Compare-and-Swap,CAS)。CAS 指令须要有 3 个操做数,分别是内存地址 V、旧的预期值 A 和新值 B。当执行操做时,只有当 V 的值等于 A,才将 V 的值更新为 B。
J.U.C 包里面的整数原子类 AtomicInteger 的方法调用了 Unsafe 类的 CAS 操做。
如下代码使用了 AtomicInteger 执行了自增的操做。
private AtomicInteger cnt = new AtomicInteger(); public void add() { cnt.incrementAndGet(); }
如下代码是 incrementAndGet() 的源码,它调用了 Unsafe 的 getAndAddInt() 。
public final int incrementAndGet() { return unsafe.getAndAddInt(this, valueOffset, 1) + 1; }
如下代码是 getAndAddInt() 源码,var1 指示对象内存地址,var2 指示该字段相对对象内存地址的偏移,var4 指示操做须要加的数值,这里为 1。经过 getIntVolatile(var1, var2) 获得旧的预期值,经过调用 compareAndSwapInt() 来进行 CAS 比较,若是该字段内存地址中的值等于 var5,那么就更新内存地址为 var1+var2 的变量为 var5+var4。
能够看到 getAndAddInt() 在一个循环中进行,发生冲突的作法是不断的进行重试。
public final int getAndAddInt(Object var1, long var2, int var4) { int var5; do { var5 = this.getIntVolatile(var1, var2); } while(!this.compareAndSwapInt(var1, var2, var5, var5 + var4)); return var5; }
若是一个变量初次读取的时候是 A 值,它的值被改为了 B,后来又被改回为 A,那 CAS 操做就会误认为它历来没有被改变过。
J.U.C 包提供了一个带有标记的原子引用类 AtomicStampedReference 来解决这个问题,它能够经过控制变量值的版原本保证 CAS 的正确性。大部分状况下 ABA 问题不会影响程序并发的正确性,若是须要解决 ABA 问题,改用传统的互斥同步可能会比原子类更高效。
要保证线程安全,并非必定就要进行同步。若是一个方法原本就不涉及共享数据,那它天然就无须任何同步措施去保证正确性。
多个线程访问同一个方法的局部变量时,不会出现线程安全问题,由于局部变量存储在虚拟机栈中,属于线程私有的。
public class StackClosedExample { public void add100() { int cnt = 0; for (int i = 0; i < 100; i++) { cnt++; } System.out.println(cnt); } }
public static void main(String[] args) { StackClosedExample example = new StackClosedExample(); ExecutorService executorService = Executors.newCachedThreadPool(); executorService.execute(() -> example.add100()); executorService.execute(() -> example.add100()); executorService.shutdown(); }
100 100
若是一段代码中所须要的数据必须与其余代码共享,那就看看这些共享数据的代码是否能保证在同一个线程中执行。若是能保证,咱们就能够把共享数据的可见范围限制在同一个线程以内,这样,无须同步也能保证线程之间不出现数据争用的问题。
符合这种特色的应用并很多见,大部分使用消费队列的架构模式(如“生产者-消费者”模式)都会将产品的消费过程尽可能在一个线程中消费完。其中最重要的一个应用实例就是经典 Web 交互模型中的“一个请求对应一个服务器线程”(Thread-per-Request)的处理方式,这种处理方式的普遍应用使得不少 Web 服务端应用均可以使用线程本地存储来解决线程安全问题。
可使用 java.lang.ThreadLocal 类来实现线程本地存储功能。
对于如下代码,thread1 中设置 threadLocal 为 1,而 thread2 设置 threadLocal 为 2。过了一段时间以后,thread1 读取 threadLocal 依然是 1,不受 thread2 的影响。
public class ThreadLocalExample { public static void main(String[] args) { ThreadLocal threadLocal = new ThreadLocal(); Thread thread1 = new Thread(() -> { threadLocal.set(1); try { Thread.sleep(1000); } catch (InterruptedException e) { e.printStackTrace(); } System.out.println(threadLocal.get()); threadLocal.remove(); }); Thread thread2 = new Thread(() -> { threadLocal.set(2); threadLocal.remove(); }); thread1.start(); thread2.start(); } }
1
为了理解 ThreadLocal,先看如下代码:
public class ThreadLocalExample1 { public static void main(String[] args) { ThreadLocal threadLocal1 = new ThreadLocal(); ThreadLocal threadLocal2 = new ThreadLocal(); Thread thread1 = new Thread(() -> { threadLocal1.set(1); threadLocal2.set(1); }); Thread thread2 = new Thread(() -> { threadLocal1.set(2); threadLocal2.set(2); }); thread1.start(); thread2.start(); } }
它所对应的底层结构图为:
每一个 Thread 都有一个 ThreadLocal.ThreadLocalMap 对象。
/* ThreadLocal values pertaining to this thread. This map is maintained * by the ThreadLocal class. */ ThreadLocal.ThreadLocalMap threadLocals = null;
当调用一个 ThreadLocal 的 set(T value) 方法时,先获得当前线程的 ThreadLocalMap 对象,而后将 ThreadLocal->value 键值对插入到该 Map 中。
public void set(T value) { Thread t = Thread.currentThread(); ThreadLocalMap map = getMap(t); if (map != null) map.set(this, value); else createMap(t, value); }
get() 方法相似。
public T get() { Thread t = Thread.currentThread(); ThreadLocalMap map = getMap(t); if (map != null) { ThreadLocalMap.Entry e = map.getEntry(this); if (e != null) { @SuppressWarnings("unchecked") T result = (T)e.value; return result; } } return setInitialValue(); }
ThreadLocal 从理论上讲并非用来解决多线程并发问题的,由于根本不存在多线程竞争。
在一些场景 (尤为是使用线程池) 下,因为 ThreadLocal.ThreadLocalMap 的底层数据结构致使 ThreadLocal 有内存泄漏的状况,应该尽量在每次使用 ThreadLocal 后手动调用 remove(),以免出现 ThreadLocal 经典的内存泄漏甚至是形成自身业务混乱的风险。
这种代码也叫作纯代码(Pure Code),能够在代码执行的任什么时候刻中断它,转而去执行另一段代码(包括递归调用它自己),而在控制权返回后,原来的程序不会出现任何错误。
可重入代码有一些共同的特征,例如不依赖存储在堆上的数据和公用的系统资源、用到的状态量都由参数中传入、不调用非可重入的方法等。
这里的锁优化主要是指 JVM 对 synchronized 的优化。
互斥同步进入阻塞状态的开销都很大,应该尽可能避免。在许多应用中,共享数据的锁定状态只会持续很短的一段时间。自旋锁的思想是让一个线程在请求一个共享数据的锁时执行忙循环(自旋)一段时间,若是在这段时间内能得到锁,就能够避免进入阻塞状态。
自旋锁虽然能避免进入阻塞状态从而减小开销,可是它须要进行忙循环操做占用 CPU 时间,它只适用于共享数据的锁定状态很短的场景。
在 JDK 1.6 中引入了自适应的自旋锁。自适应意味着自旋的次数再也不固定了,而是由前一次在同一个锁上的自旋次数及锁的拥有者的状态来决定。
锁消除是指对于被检测出不可能存在竞争的共享数据的锁进行消除。
锁消除主要是经过逃逸分析来支持,若是堆上的共享数据不可能逃逸出去被其它线程访问到,那么就能够把它们当成私有数据对待,也就能够将它们的锁进行消除。
对于一些看起来没有加锁的代码,其实隐式的加了不少锁。例以下面的字符串拼接代码就隐式加了锁:
public static String concatString(String s1, String s2, String s3) { return s1 + s2 + s3; }
String 是一个不可变的类,编译器会对 String 的拼接自动优化。在 JDK 1.5 以前,会转化为 StringBuffer 对象的连续 append() 操做:
public static String concatString(String s1, String s2, String s3) { StringBuffer sb = new StringBuffer(); sb.append(s1); sb.append(s2); sb.append(s3); return sb.toString(); }
每一个 append() 方法中都有一个同步块。虚拟机观察变量 sb,很快就会发现它的动态做用域被限制在 concatString() 方法内部。也就是说,sb 的全部引用永远不会逃逸到 concatString() 方法以外,其余线程没法访问到它,所以能够进行消除。
若是一系列的连续操做都对同一个对象反复加锁和解锁,频繁的加锁操做就会致使性能损耗。
上一节的示例代码中连续的 append() 方法就属于这类状况。若是虚拟机探测到由这样的一串零碎的操做都对同一个对象加锁,将会把加锁的范围扩展(粗化)到整个操做序列的外部。对于上一节的示例代码就是扩展到第一个 append() 操做以前直至最后一个 append() 操做以后,这样只须要加锁一次就能够了。
JDK 1.6 引入了偏向锁和轻量级锁,从而让锁拥有了四个状态:无锁状态(unlocked)、偏向锁状态(biasble)、轻量级锁状态(lightweight locked)和重量级锁状态(inflated)。
如下是 HotSpot 虚拟机对象头的内存布局,这些数据被称为 Mark Word。其中 tag bits 对应了五个状态,这些状态在右侧的 state 表格中给出。除了 marked for gc 状态,其它四个状态已经在前面介绍过了。
下图左侧是一个线程的虚拟机栈,其中有一部分称为 Lock Record 的区域,这是在轻量级锁运行过程建立的,用于存放锁对象的 Mark Word。而右侧就是一个锁对象,包含了 Mark Word 和其它信息。
轻量级锁是相对于传统的重量级锁而言,它使用 CAS 操做来避免重量级锁使用互斥量的开销。对于绝大部分的锁,在整个同步周期内都是不存在竞争的,所以也就不须要都使用互斥量进行同步,能够先采用 CAS 操做进行同步,若是 CAS 失败了再改用互斥量进行同步。
当尝试获取一个锁对象时,若是锁对象标记为 0 01,说明锁对象的锁未锁定(unlocked)状态。此时虚拟机在当前线程的虚拟机栈中建立 Lock Record,而后使用 CAS 操做将对象的 Mark Word 更新为 Lock Record 指针。若是 CAS 操做成功了,那么线程就获取了该对象上的锁,而且对象的 Mark Word 的锁标记变为 00,表示该对象处于轻量级锁状态。
若是 CAS 操做失败了,虚拟机首先会检查对象的 Mark Word 是否指向当前线程的虚拟机栈,若是是的话说明当前线程已经拥有了这个锁对象,那就能够直接进入同步块继续执行,不然说明这个锁对象已经被其余线程线程抢占了。若是有两条以上的线程争用同一个锁,那轻量级锁就再也不有效,要膨胀为重量级锁。
偏向锁的思想是偏向于让第一个获取锁对象的线程,这个线程在以后获取该锁就再也不须要进行同步操做,甚至连 CAS 操做也再也不须要。
当锁对象第一次被线程得到的时候,进入偏向状态,标记为 1 01。同时使用 CAS 操做将线程 ID 记录到 Mark Word 中,若是 CAS 操做成功,这个线程之后每次进入这个锁相关的同步块就不须要再进行任何同步操做。
当有另一个线程去尝试获取这个锁对象时,偏向状态就宣告结束,此时撤销偏向(Revoke Bias)后恢复到未锁定状态或者轻量级锁状态。
以为文章不错的欢迎关注个人WX公众号: 程序员乔戈里
我是 百度后台开发工程师,哈工大计算机本硕,专一分享技术干货/编程资源/求职面试/成长感悟等,关注送3000G编程资源,免费下载CSDN资源。