JMM 自己是一种抽象的概念并非真实存在,它描述的是一组规定或则规范,经过这组规范定义了程序中的访问方式。java
JMM 同步规定api
因为 JVM 运行程序的实体是线程,而每一个线程建立时 JVM 都会为其建立一个工做内存,工做内存是每一个线程的私有数据区域,而 Java 内存模型中规定全部变量的储存在主内存,主内存是共享内存区域,全部的线程均可以访问,但线程对变量的操做(读取赋值等)必须都工做内存进行看。数组
首先要将变量从主内存拷贝的本身的工做内存空间,而后对变量进行操做,操做完成后再将变量写回主内存,不能直接操做主内存中的变量,工做内存中存储着主内存中的变量副本拷贝,前面说过,工做内存是每一个线程的私有数据区域,所以不一样的线程间没法访问对方的工做内存,线程间的通讯(传值)必须经过主内存来完成。缓存
内存模型图安全
可见性数据结构
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 |
/** * @Author: cuzz * @Date: 2019/4/16 21:29 * @Description: 可见性代码实例 */ public class VolatileDemo { public static void main(String[] args) { Data data = new Data(); new Thread(() -> { System.out.println(Thread.currentThread().getName() + " coming..."); try { Thread.sleep(3000); } catch (InterruptedException e) { e.printStackTrace(); } data.addOne(); System.out.println(Thread.currentThread().getName() + " updated..."); }).start(); while (data.a == 0) { // looping } System.out.println(Thread.currentThread().getName() + " job is done..."); } } class Data { // int a = 0; volatile int a = 0; void addOne() { this.a += 1; } } |
若是不加 volatile 关键字,则主线程会进入死循环,加 volatile 则主线程可以退出,说明加了 volatile 关键字变量,当有一个线程修改了值,会立刻被另外一个线程感知到,当前值做废,重新从主内存中获取值。对其余线程可见,这就叫可见性。多线程
原子性架构
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 |
public class VolatileDemo { public static void main(String[] args) { // test01(); test02(); } // 测试原子性 private static void test02() { Data data = new Data(); for (int i = 0; i < 20; i++) { new Thread(() -> { for (int j = 0; j < 1000; j++) { data.addOne(); } }).start(); } // 默认有 main 线程和 gc 线程 while (Thread.activeCount() > 2) { Thread.yield(); } System.out.println(data.a); } } class Data { volatile int a = 0; void addOne() { this.a += 1; } } |
发现并不能输入 20000并发
有序性dom
计算机在执行程序时,为了提升性能,编译器个处理器经常会对指令作重排,通常分为如下 3 种
单线程环境里面确保程序最终执行的结果和代码执行的结果一致
处理器在进行重排序时必须考虑指令之间的数据依赖性
多线程环境中线程交替执行,因为编译器优化重排的存在,两个线程中使用的变量可否保证用的变量可否一致性是没法肯定的,结果没法预测
代码示例
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 |
public class ReSortSeqDemo { int a = 0; boolean flag = false; public void method01() { a = 1; // flag = true; // ----线程切换---- flag = true; // a = 1; } public void method02() { if (flag) { a = a + 3; System.out.println("a = " + a); } } } |
若是两个线程同时执行,method01 和 method02 若是线程 1 执行 method01 重排序了,而后切换的线程 2 执行 method02 就会出现不同的结果。
volatile 实现禁止指令重排序的优化,从而避免了多线程环境下程序出现乱序的现象
先了解一个概念,内存屏障(Memory Barrier)又称内存栅栏,是一个 CPU 指令,他的做用有两个:
因为编译器个处理器都能执行指令重排序优化,若是在指令间插入一条 Memory Barrier 则会告诉编译器和 CPU,无论什么指令都不能个这条 Memory Barrier 指令重排序,也就是说经过插入内存屏障禁止在内存屏障先后执行重排序优化。内存屏障另外一个做用是强制刷出各类 CPU 缓存数据,所以任何 CPU 上的线程都能读取到这些数据的最新版本。
下面是保守策略下,volatile写插入内存屏障后生成的指令序列示意图:
下面是在保守策略下,volatile读插入内存屏障后生成的指令序列示意图:
多线程环境下可能存在的安全问题
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 |
@NotThreadSafe public class Singleton01 { private static Singleton01 instance = null; private Singleton01() { System.out.println(Thread.currentThread().getName() + " construction..."); } public static Singleton01 getInstance() { if (instance == null) { instance = new Singleton01(); } return instance; } public static void main(String[] args) { ExecutorService executorService = Executors.newFixedThreadPool(10); for (int i = 0; i < 10; i++) { executorService.execute(()-> Singleton01.getInstance()); } executorService.shutdown(); } } |
发现构造器里的内容会屡次输出
双重锁单例
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 |
public class Singleton02 { private static volatile Singleton02 instance = null; private Singleton02() { System.out.println(Thread.currentThread().getName() + " construction..."); } public static Singleton02 getInstance() { if (instance == null) { synchronized (Singleton01.class) { if (instance == null) { instance = new Singleton02(); } } } return instance; } public static void main(String[] args) { ExecutorService executorService = Executors.newFixedThreadPool(10); for (int i = 0; i < 10; i++) { executorService.execute(()-> Singleton02.getInstance()); } executorService.shutdown(); } } |
若是没有加 volatile 就不必定是线程安全的,缘由是指令重排序的存在,加入 volatile 能够禁止指令重排。
缘由是在于某一个线程执行到第一次检测,读取到的 instance 不为 null 时,instance 的引用对象可能尚未完成初始化。
instance = new Singleton()
能够分为如下三步完成
1 2 3 |
memory = allocate(); // 1.分配对象空间 instance(memory); // 2.初始化对象 instance = memory; // 3.设置instance指向刚分配的内存地址,此时instance != null |
步骤 2 和步骤 3 不存在依赖关系,并且不管重排前仍是重排后程序的执行结果在单线程中并无改变,所以这种优化是容许的。
发生重排
1 2 3 |
memory = allocate(); // 1.分配对象空间 instance = memory; // 3.设置instance指向刚分配的内存地址,此时instance != null,但对象尚未初始化完成 instance(memory); // 2.初始化对象 |
因此不加 volatile 返回的实例不为空,但多是未初始化的实例
1 2 3 4 5 6 7 8 9 10 11 |
public class CASDemo { public static void main(String[] args) { AtomicInteger atomicInteger = new AtomicInteger(666); // 获取真实值,并替换为相应的值 boolean b = atomicInteger.compareAndSet(666, 2019); System.out.println(b); // true boolean b1 = atomicInteger.compareAndSet(666, 2020); System.out.println(b1); // false atomicInteger.getAndIncrement(); } } |
1 2 3 4 5 6 7 8 |
/** * Atomically increments by one the current value. * * @return the previous value */ public final int getAndIncrement() { return unsafe.getAndAddInt(this, valueOffset, 1); } |
引出一个问题:UnSafe 类是什么?
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 |
public class AtomicInteger extends Number implements java.io.Serializable { private static final long serialVersionUID = 6214790243416807050L; // setup to use Unsafe.compareAndSwapInt for updates private static final Unsafe unsafe = Unsafe.getUnsafe(); private static final long valueOffset; static { try { // 获取下面 value 的地址偏移量 valueOffset = unsafe.objectFieldOffset (AtomicInteger.class.getDeclaredField("value")); } catch (Exception ex) { throw new Error(ex); } } private volatile int value; // ... } |
CAS 的全称 Compare-And-Swap,它是一条 CPU 并发。
它的功能是判断内存某一个位置的值是否为预期,若是是则更改这个值,这个过程就是原子的。
CAS 并发原体如今 JAVA 语言中就是 sun.misc.Unsafe 类中的各个方法。调用 UnSafe 类中的 CAS 方法,JVM 会帮咱们实现出 CAS 汇编指令。这是一种彻底依赖硬件的功能,经过它实现了原子操做。因为 CAS 是一种系统源语,源语属于操做系统用语范畴,是由若干条指令组成,用于完成某一个功能的过程,而且原语的执行必须是连续的,在执行的过程当中不容许被中断,也就是说 CAS 是一条原子指令,不会形成所谓的数据不一致的问题。
分析一下 getAndAddInt 这个方法
1 2 3 4 5 6 7 8 |
// unsafe.getAndAddInt public final int getAndAddInt(Object obj, long valueOffset, long expected, int val) { int temp; do { temp = this.getIntVolatile(obj, valueOffset); // 获取快照值 } while (!this.compareAndSwap(obj, valueOffset, temp, temp + val)); // 若是此时 temp 没有被修改,就能退出循环,不然从新获取 return temp; } |
原子引用
1 2 3 4 5 6 7 8 9 10 |
public class AtomicReferenceDemo { public static void main(String[] args) { User cuzz = new User("cuzz", 18); User faker = new User("faker", 20); AtomicReference<User> atomicReference = new AtomicReference<>(); atomicReference.set(cuzz); System.out.println(atomicReference.compareAndSet(cuzz, faker)); // true System.out.println(atomicReference.get()); // User(userName=faker, age=20) } } |
ABA 问题是怎么产生的
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 |
/** * @program: learn-demo * @description: ABA * @author: cuzz * @create: 2019-04-21 23:31 **/ public class ABADemo { private static AtomicReference<Integer> atomicReference = new AtomicReference<>(100); public static void main(String[] args) { new Thread(() -> { atomicReference.compareAndSet(100, 101); atomicReference.compareAndSet(101, 100); }).start(); new Thread(() -> { // 保证上面线程先执行 try { Thread.sleep(1000); } catch (InterruptedException e) { e.printStackTrace(); } atomicReference.compareAndSet(100, 2019); System.out.println(atomicReference.get()); // 2019 }).start(); } } |
当有一个值从 A 改成 B 又改成 A,这就是 ABA 问题。
时间戳原子引用
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 |
package com.cuzz.thread; import java.util.concurrent.atomic.AtomicReference; import java.util.concurrent.atomic.AtomicStampedReference; /** * @program: learn-demo * @description: ABA * @author: cuzz * @create: 2019-04-21 23:31 **/ public class ABADemo2 { private static AtomicStampedReference<Integer> atomicStampedReference = new AtomicStampedReference<>(100, 1); public static void main(String[] args) { new Thread(() -> { int stamp = atomicStampedReference.getStamp(); System.out.println(Thread.currentThread().getName() + " 的版本号为:" + stamp); try { Thread.sleep(1000); } catch (InterruptedException e) { e.printStackTrace(); } atomicStampedReference.compareAndSet(100, 101, atomicStampedReference.getStamp(), atomicStampedReference.getStamp() + 1 ); atomicStampedReference.compareAndSet(101, 100, atomicStampedReference.getStamp(), atomicStampedReference.getStamp() + 1 ); }).start(); new Thread(() -> { int stamp = atomicStampedReference.getStamp(); System.out.println(Thread.currentThread().getName() + " 的版本号为:" + stamp); try { Thread.sleep(3000); } catch (InterruptedException e) { e.printStackTrace(); } boolean b = atomicStampedReference.compareAndSet(100, 2019, stamp, stamp + 1); System.out.println(b); // false System.out.println(atomicStampedReference.getReference()); // 100 }).start(); } } |
咱们先保证两个线程的初始版本为一致,后面修改是因为版本不同就会修改失败。
故障现象
1 2 3 4 5 6 7 8 9 10 11 12 |
public class ContainerDemo { public static void main(String[] args) { List<Integer> list = new ArrayList<>(); Random random = new Random(); for (int i = 0; i < 100; i++) { new Thread(() -> { list.add(random.nextInt(10)); System.out.println(list); }).start(); } } } |
发现报 java.util.ConcurrentModificationException
致使缘由
解决方案
new Vector();
Collections.synchronizedList(new ArrayList<>());
new CopyOnWriteArrayList<>();
优化建议
是什么
代码实现
可重入锁
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 |
public class ReentrantLock { boolean isLocked = false; Thread lockedBy = null; int lockedCount = 0; public synchronized void lock() throws InterruptedException { Thread thread = Thread.currentThread(); while (isLocked && lockedBy != thread) { wait(); } isLocked = true; lockedCount++; lockedBy = thread; } public synchronized void unlock() { if (Thread.currentThread() == lockedBy) { lockedCount--; if (lockedCount == 0) { isLocked = false; notify(); } } } } |
测试
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 |
public class Count { // NotReentrantLock lock = new NotReentrantLock(); ReentrantLock lock = new ReentrantLock(); public void print() throws InterruptedException{ lock.lock(); doAdd(); lock.unlock(); } private void doAdd() throws InterruptedException { lock.lock(); // do something System.out.println("ReentrantLock"); lock.unlock(); } public static void main(String[] args) throws InterruptedException { Count count = new Count(); count.print(); } } |
发现能够输出 ReentrantLock,咱们设计两个线程调用 print() 方法,第一个线程调用 print() 方法获取锁,进入 lock() 方法,因为初始 lockedBy 是 null,因此不会进入 while 而挂起当前线程,而是是增量 lockedCount 并记录 lockBy 为第一个线程。接着第一个线程进入 doAdd() 方法,因为同一进程,因此不会进入 while 而挂起,接着增量 lockedCount,当第二个线程尝试lock,因为 isLocked=true,因此他不会获取该锁,直到第一个线程调用两次 unlock() 将 lockCount 递减为0,才将标记为 isLocked 设置为 false。
不可重入锁
1 2 3 4 5 6 7 8 9 10 11 12 13 |
public class NotReentrantLock { private boolean isLocked = false; public synchronized void lock() throws InterruptedException { while (isLocked) { wait(); } isLocked = true; } public synchronized void unlock() { isLocked = false; notify(); } } |
测试
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 |
public class Count { NotReentrantLock lock = new NotReentrantLock(); public void print() throws InterruptedException{ lock.lock(); doAdd(); lock.unlock(); } private void doAdd() throws InterruptedException { lock.lock(); // do something lock.unlock(); } public static void main(String[] args) throws InterruptedException { Count count = new Count(); count.print(); } } |
当前线程执行print()方法首先获取lock,接下来执行doAdd()方法就没法执行doAdd()中的逻辑,必须先释放锁。这个例子很好的说明了不可重入锁。
synchronized 和 ReentrantLock 都是可重入锁
synchronzied
1 2 3 4 5 6 7 8 9 10 11 12 13 14 |
public class SynchronziedDemo { private synchronized void print() { doAdd(); } private synchronized void doAdd() { System.out.println("doAdd..."); } public static void main(String[] args) { SynchronziedDemo synchronziedDemo = new SynchronziedDemo(); synchronziedDemo.print(); // doAdd... } } |
上面能够说明 synchronized 是可重入锁。
ReentrantLock
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 |
public class ReentrantLockDemo { private Lock lock = new ReentrantLock(); private void print() { lock.lock(); doAdd(); lock.unlock(); } private void doAdd() { lock.lock(); lock.lock(); System.out.println("doAdd..."); lock.unlock(); lock.unlock(); } public static void main(String[] args) { ReentrantLockDemo reentrantLockDemo = new ReentrantLockDemo(); reentrantLockDemo.print(); } } |
上面例子能够说明 ReentrantLock 是可重入锁,并且在 #doAdd 方法中加两次锁和解两次锁也能够。
是指定尝试获取锁的线程不会当即堵塞,而是采用循环的方式去尝试获取锁,这样的好处是减小线程上线文切换的消耗,缺点就是循环会消耗 CPU。
手动实现自旋锁
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 |
public class SpinLock { private AtomicReference<Thread> atomicReference = new AtomicReference<>(); private void lock () { System.out.println(Thread.currentThread() + " coming..."); while (!atomicReference.compareAndSet(null, Thread.currentThread())) { // loop } } private void unlock() { Thread thread = Thread.currentThread(); atomicReference.compareAndSet(thread, null); System.out.println(thread + " unlock..."); } public static void main(String[] args) throws InterruptedException { SpinLock spinLock = new SpinLock(); new Thread(() -> { spinLock.lock(); try { Thread.sleep(3000); } catch (InterruptedException e) { e.printStackTrace(); } System.out.println("hahaha"); spinLock.unlock(); }).start(); Thread.sleep(1); new Thread(() -> { spinLock.lock(); System.out.println("hehehe"); spinLock.unlock(); }).start(); } } |
输出:
1 2 3 4 5 6 |
Thread[Thread-0,5,main] coming... Thread[Thread-1,5,main] coming... hahaha Thread[Thread-0,5,main] unlock... hehehe Thread[Thread-1,5,main] unlock... |
获取锁的时候,若是原子引用为空就获取锁,不为空表示有人获取了锁,就循环等待。
是什么
对于 ReentrantLock 和 synchronized 都是独占锁;对与 ReentrantReadWriteLock 其读锁是共享锁而写锁是独占锁。读锁的共享可保证并发读是很是高效的,读写、写读和写写的过程是互斥的。
读写锁例子
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 |
public class MyCache { private volatile Map<String, Object> map = new HashMap<>(); private ReentrantReadWriteLock lock = new ReentrantReadWriteLock(); WriteLock writeLock = lock.writeLock(); ReadLock readLock = lock.readLock(); public void put(String key, Object value) { try { writeLock.lock(); System.out.println(Thread.currentThread().getName() + " 正在写入..."); try { Thread.sleep(1000); } catch (InterruptedException e) { e.printStackTrace(); } map.put(key, value); System.out.println(Thread.currentThread().getName() + " 写入完成,写入结果是 " + value); } finally { writeLock.unlock(); } } public void get(String key) { try { readLock.lock(); System.out.println(Thread.currentThread().getName() + " 正在读..."); try { Thread.sleep(1000); } catch (InterruptedException e) { e.printStackTrace(); } Object res = map.get(key); System.out.println(Thread.currentThread().getName() + " 读取完成,读取结果是 " + res); } finally { readLock.unlock(); } } } |
测试
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 |
public class ReadWriteLockDemo { public static void main(String[] args) { MyCache cache = new MyCache(); for (int i = 0; i < 5; i++) { final int temp = i; new Thread(() -> { cache.put(temp + "", temp + ""); }).start(); } for (int i = 0; i < 5; i++) { final int temp = i; new Thread(() -> { cache.get(temp + ""); }).start(); } } } |
输出结果
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 |
Thread-0 正在写入... Thread-0 写入完成,写入结果是 0 Thread-1 正在写入... Thread-1 写入完成,写入结果是 1 Thread-2 正在写入... Thread-2 写入完成,写入结果是 2 Thread-3 正在写入... Thread-3 写入完成,写入结果是 3 Thread-4 正在写入... Thread-4 写入完成,写入结果是 4 Thread-5 正在读... Thread-7 正在读... Thread-8 正在读... Thread-6 正在读... Thread-9 正在读... Thread-5 读取完成,读取结果是 0 Thread-7 读取完成,读取结果是 2 Thread-8 读取完成,读取结果是 3 Thread-6 读取完成,读取结果是 1 Thread-9 读取完成,读取结果是 4 |
能保证读写、写读和写写的过程是互斥的时候是独享的,读读的时候是共享的。
让一些线程堵塞直到另外一个线程完成一系列操做后才被唤醒。CountDownLatch 主要有两个方法,当一个或多个线程调用 await 方法时,调用线程会被堵塞,其余线程调用 countDown 方法会将计数减一(调用 countDown 方法的线程不会堵塞),当计数其值变为零时,因调用 await 方法被堵塞的线程会被唤醒,继续执行。
假设咱们有这么一个场景,教室里有班长和其余6我的在教室上自习,怎么保证班长等其余6我的都走出教室在把教室门给关掉。
1 2 3 4 5 6 7 8 9 10 |
public class CountDownLanchDemo { public static void main(String[] args) { for (int i = 0; i < 6; i++) { new Thread(() -> { System.out.println(Thread.currentThread().getName() + " 离开了教室..."); }, String.valueOf(i)).start(); } System.out.println("班长把门给关了,离开了教室..."); } } |
此时输出
1 2 3 4 5 6 7 |
0 离开了教室... 1 离开了教室... 2 离开了教室... 3 离开了教室... 班长把门给关了,离开了教室... 5 离开了教室... 4 离开了教室... |
发现班长都没有等其余人理他教室就把门给关了,此时咱们就可使用 CountDownLatch 来控制
1 2 3 4 5 6 7 8 9 10 11 12 13 |
public class CountDownLanchDemo { public static void main(String[] args) throws InterruptedException { CountDownLatch countDownLatch = new CountDownLatch(6); for (int i = 0; i < 6; i++) { new Thread(() -> { countDownLatch.countDown(); System.out.println(Thread.currentThread().getName() + " 离开了教室..."); }, String.valueOf(i)).start(); } countDownLatch.await(); System.out.println("班长把门给关了,离开了教室..."); } } |
此时输出
1 2 3 4 5 6 7 |
0 离开了教室... 1 离开了教室... 2 离开了教室... 3 离开了教室... 4 离开了教室... 5 离开了教室... 班长把门给关了,离开了教室... |
咱们假设有这么一个场景,每辆车只能坐我的,当车满了,就发车。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 |
public class CyclicBarrierDemo { public static void main(String[] args) { CyclicBarrier cyclicBarrier = new CyclicBarrier(4, () -> { System.out.println("车满了,开始出发..."); }); for (int i = 0; i < 8; i++) { new Thread(() -> { System.out.println(Thread.currentThread().getName() + " 开始上车..."); try { cyclicBarrier.await(); } catch (InterruptedException e) { e.printStackTrace(); } catch (BrokenBarrierException e) { e.printStackTrace(); } }).start(); } } } |
输出结果
1 2 3 4 5 6 7 8 9 10 |
Thread-0 开始上车... Thread-1 开始上车... Thread-3 开始上车... Thread-4 开始上车... 车满了,开始出发... Thread-5 开始上车... Thread-7 开始上车... Thread-2 开始上车... Thread-6 开始上车... 车满了,开始出发... |
假设咱们有 3 个停车位,6 辆车去抢
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 |
public class SemaphoreDemo { public static void main(String[] args) { Semaphore semaphore = new Semaphore(3); for (int i = 0; i < 6; i++) { new Thread(() -> { try { semaphore.acquire(); // 获取一个许可 System.out.println(Thread.currentThread().getName() + " 抢到车位..."); Thread.sleep(3000); System.out.println(Thread.currentThread().getName() + " 离开车位"); } catch (InterruptedException e) { e.printStackTrace(); } finally { semaphore.release(); // 释放一个许可 } }).start(); } } } |
输出
1 2 3 4 5 6 7 8 9 10 11 12 |
Thread-1 抢到车位... Thread-2 抢到车位... Thread-0 抢到车位... Thread-2 离开车位 Thread-0 离开车位 Thread-3 抢到车位... Thread-1 离开车位 Thread-4 抢到车位... Thread-5 抢到车位... Thread-3 离开车位 Thread-5 离开车位 Thread-4 离开车位 |
当阻塞队列是满时,往队列里添加元素的操做将会被阻塞。
核心方法
| 方法\行为 | 抛异常 | 特定的值 | 阻塞 | 超时 |
| :——-: | :——-: | :—————: | :—-: | :————————-: |
| 插入方法 | add(o) | offer(o) | put(o) | offer(o, timeout, timeunit) |
| 移除方法 | | poll()、remove(o) | take() | poll(timeout, timeunit) |
| 检查方法 | element() | peek() | | |
行为解释:
抛异常:若是操做不能立刻进行,则抛出异常
特定的值:若是操做不能立刻进行,将会返回一个特殊的值,通常是 true 或者 false
阻塞:若是操做不能立刻进行,操做会被阻塞
超时:若是操做不能立刻进行,操做会被阻塞指定的时间,若是指定时间没执行,则返回一个特殊值,通常是 true 或者 false
插入方法:
删除方法:
检查方法:
SynchronousQueue,实际上它不是一个真正的队列,由于它不会为队列中元素维护存储空间。与其余队列不一样的是,它维护一组线程,这些线程在等待着把元素加入或移出队列。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 |
public class SynchronousQueueDemo { public static void main(String[] args) { SynchronousQueue<Integer> synchronousQueue = new SynchronousQueue<>(); new Thread(() -> { try { synchronousQueue.put(1); Thread.sleep(3000); synchronousQueue.put(2); Thread.sleep(3000); synchronousQueue.put(3); } catch (InterruptedException e) { e.printStackTrace(); } }).start(); new Thread(() -> { try { Integer val = synchronousQueue.take(); System.out.println(val); Integer val2 = synchronousQueue.take(); System.out.println(val2); Integer val3 = synchronousQueue.take(); System.out.println(val3); } catch (InterruptedException e) { e.printStackTrace(); } }).start(); } } |
线程池用于多线程处理中,它能够根据系统的状况,能够有效控制线程执行的数量,优化运行效果。线程池作的工做主要是控制运行的线程的数量,处理过程当中将任务放入队列,而后在线程建立后启动这些任务,若是线程数量超过了最大数量,那么超出数量的线程排队等候,等其它线程执行完毕,再从队列中取出任务来执行。
主要特色为:
主要优势
继承 Thread
实现 Runnable 接口
实现 Callable
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 |
public class CallableDemo { public static void main(String[] args) throws ExecutionException, InterruptedException { // 在 FutureTask 中传入 Callable 的实现类 FutureTask<Integer> futureTask = new FutureTask<>(new Callable<Integer>() { @Override public Integer call() throws Exception { return 666; } }); // 把 futureTask 放入线程中 new Thread(futureTask).start(); // 获取结果 Integer res = futureTask.get(); System.out.println(res); } } |
ThreadPoolExecutor做为java.util.concurrent包对外提供基础实现,之内部线程池的形式对外提供管理任务执行,线程调度,线程池管理等等服务。
参数 | 做用 |
---|---|
corePoolSize | 核心线程池大小 |
maximumPoolSize | 最大线程池大小 |
keepAliveTime | 线程池中超过 corePoolSize 数目的空闲线程最大存活时间;能够allowCoreThreadTimeOut(true) 使得核心线程有效时间 |
TimeUnit | keepAliveTime 时间单位 |
workQueue | 阻塞任务队列 |
threadFactory | 新建线程工厂 |
RejectedExecutionHandler | 当提交任务数超过 maxmumPoolSize+workQueue 之和时,任务会交给RejectedExecutionHandler 来处理 |
说说线程池的底层工做原理?
重点讲解: 其中比较容易让人误解的是:corePoolSize,maximumPoolSize,workQueue之间关系。
当线程池小于corePoolSize时,新提交任务将建立一个新线程执行任务,即便此时线程池中存在空闲线程。
当线程池达到corePoolSize时,新提交任务将被放入 workQueue 中,等待线程池中任务调度执行。
当workQueue已满,且 maximumPoolSize 大于 corePoolSize 时,新提交任务会建立新线程执行任务。
当提交任务数超过 maximumPoolSize 时,新提交任务由 RejectedExecutionHandler 处理。
当线程池中超过corePoolSize 线程,空闲时间达到 keepAliveTime 时,关闭空闲线程 。
当设置allowCoreThreadTimeOut(true) 时,线程池中 corePoolSize 线程空闲时间达到 keepAliveTime 也将关闭。
若是读者对Java中的阻塞队列有所了解的话,看到这里或许就可以明白缘由了。
Java中的BlockingQueue主要有两种实现,分别是ArrayBlockingQueue 和 LinkedBlockingQueue。
ArrayBlockingQueue是一个用数组实现的有界阻塞队列,必须设置容量。
LinkedBlockingQueue是一个用链表实现的有界阻塞队列,容量能够选择进行设置,不设置的话,将是一个无边界的阻塞队列,最大长度为Integer.MAX_VALUE。
这里的问题就出在:不设置的话,将是一个无边界的阻塞队列,最大长度为Integer.MAX_VALUE。也就是说,若是咱们不设置LinkedBlockingQueue的容量的话,其默认容量将会是Integer.MAX_VALUE。
而newFixedThreadPool中建立LinkedBlockingQueue时,并未指定容量。此时,LinkedBlockingQueue就是一个无边界队列,对于一个无边界队列来讲,是能够不断的向队列中加入任务的,这种状况下就有可能由于任务过多而致使内存溢出问题。
上面提到的问题主要体如今newFixedThreadPool和newSingleThreadExecutor两个工厂方法上,并非说newCachedThreadPool和newScheduledThreadPool这两个方法就安全了,这两种方式建立的最大线程数多是Integer.MAX_VALUE,而建立这么多线程,必然就有可能致使OOM。
自定义线程池
1 2 3 4 5 6 7 8 9 |
public class ThreadPoolExecutorDemo { public static void main(String[] args) { Executor executor = new ThreadPoolExecutor(2, 3, 1L, TimeUnit.SECONDS, new LinkedBlockingQueue<>(5), Executors.defaultThreadFactory(), new ThreadPoolExecutor.DiscardPolicy()); } } |
产生死锁的缘由
代码
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 |
public class DeadLockDemo { public static void main(String[] args) { String lockA = "lockA"; String lockB = "lockB"; DeadLockDemo deadLockDemo = new DeadLockDemo(); Executor executor = Executors.newFixedThreadPool(2); executor.execute(() -> deadLockDemo.method(lockA, lockB)); executor.execute(() -> deadLockDemo.method(lockB, lockA)); } public void method(String lock1, String lock2) { synchronized (lock1) { System.out.println(Thread.currentThread().getName() + "--获取到:" + lock1 + "; 尝试获取:" + lock2); try { Thread.sleep(1000); } catch (InterruptedException e) { e.printStackTrace(); } synchronized (lock2) { System.out.println("获取到两把锁!"); } } } } |
解决
jps -l 命令查定位进程号
1 2 3 4 5 |
28519 org.jetbrains.jps.cmdline.Launcher 32376 com.intellij.idea.Main 28521 com.cuzz.thread.DeadLockDemo 27836 org.jetbrains.kotlin.daemon.KotlinCompileDaemon 28591 sun.tools.jps.Jps |
jstack 28521 找到死锁查看
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 |
2019-05-07 00:04:15 Full thread dump Java HotSpot(TM) 64-Bit Server VM (25.191-b12 mixed mode): "Attach Listener" #13 daemon prio=9 os_prio=0 tid=0x00007f7acc001000 nid=0x702a waiting on condition [0x0000000000000000] java.lang.Thread.State: RUNNABLE // ... Found one Java-level deadlock: ============================= "pool-1-thread-2": waiting to lock monitor 0x00007f7ad4006478 (object 0x00000000d71f60b0, a java.lang.String), which is held by "pool-1-thread-1" "pool-1-thread-1": waiting to lock monitor 0x00007f7ad4003be8 (object 0x00000000d71f60e8, a java.lang.String), which is held by "pool-1-thread-2" Java stack information for the threads listed above: =================================================== "pool-1-thread-2": at com.cuzz.thread.DeadLockDemo.method(DeadLockDemo.java:34) - waiting to lock <0x00000000d71f60b0> (a java.lang.String) - locked <0x00000000d71f60e8> (a java.lang.String) at com.cuzz.thread.DeadLockDemo.lambda$main$1(DeadLockDemo.java:21) at com.cuzz.thread.DeadLockDemo$$Lambda$2/2074407503.run(Unknown Source) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) at java.lang.Thread.run(Thread.java:748) "pool-1-thread-1": at com.cuzz.thread.DeadLockDemo.method(DeadLockDemo.java:34) - waiting to lock <0x00000000d71f60e8> (a java.lang.String) - locked <0x00000000d71f60b0> (a java.lang.String) at com.cuzz.thread.DeadLockDemo.lambda$main$0(DeadLockDemo.java:20) at com.cuzz.thread.DeadLockDemo$$Lambda$1/558638686.run(Unknown Source) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) at java.lang.Thread.run(Thread.java:748) Found 1 deadlock. |
最后发现一个死锁。