java多线程我我的以为是javaSe中最难的一部分,我之前也是感受学会了,可是真正有多线程的需求殊不知道怎么下手,实际上仍是对多线程这块知识了解不深入,不知道多线程api的应用场景,不知道多线程的运行流程等等,本篇文章将使用实例+图解+源码的方式来解析java多线程。java
文章篇幅较长,你们也能够有选择的看具体章节,建议多线程的代码所有手敲,永远不要相信你看到的结论,本身编码后运行出来的,才是本身的。数据库
进程api
线程缓存
并发:单核cpu运行多线程时,时间片进行很快的切换。线程轮流执行cputomcat
并行:多核cpu运行 多线程时,真正的在同一时刻运行安全
java提供了丰富的api来支持多线程。服务器
多线程能实现的均可以用单线程来完成,那单线程运行的好好的,为何java要引入多线程的概念呢?网络
多线程的好处:多线程
单线程只有一条执行线,过程容易理解,能够在大脑中清晰的勾勒出代码的执行流程并发
多线程倒是多条线,并且通常多条线之间有交互,多条线之间须要通讯,通常难点有如下几点
有时候但愿本身变成一个字节穿梭于服务器中,搞清楚前因后果,就像无敌破坏王同样(没看过这部电影的能够看下,脑洞大开)。
任务: 线程的执行体。也就是咱们的核心代码逻辑
定义任务
Thread实现任务的局限性
Runnable和Callable解决了Thread的局限性
可是Runbale相比Callable有如下的局限性
以下代码 几种定义线程的方式
@Slf4j class T extends Thread { @Override public void run() { log.info("我是继承Thread的任务"); } } @Slf4j class R implements Runnable { @Override public void run() { log.info("我是实现Runnable的任务"); } } @Slf4j class C implements Callable<String> { @Override public String call() throws Exception { log.info("我是实现Callable的任务"); return "success"; } }
建立线程的方式
启动线程的方式
// 启动继承Thread类的任务 new T().start(); // 启动继承Thread匿名内部类的任务 可用lambda优化 Thread t = new Thread(){ @Override public void run() { log.info("我是Thread匿名内部类的任务"); } }; // 启动实现Runnable接口的任务 new Thread(new R()).start(); // 启动实现Runnable匿名实现类的任务 new Thread(new Runnable() { @Override public void run() { log.info("我是Runnable匿名内部类的任务"); } }).start(); // 启动实现Runnable的lambda简化后的任务 new Thread(() -> log.info("我是Runnable的lambda简化后的任务")).start(); // 启动实现了Callable接口的任务 结合FutureTask 能够获取线程执行的结果 FutureTask<String> target = new FutureTask<>(new C()); new Thread(target).start(); log.info(target.get());
以上各个线程相关的类的类图以下
多核cpu下,多线程是并行工做的,若是线程数多,单个核又会并发的调度线程,运行时会有上下文切换的概念
cpu执行线程的任务时,会为线程分配时间片,如下几种状况会发生上下文切换。
当发生上下文切换时,操做系统会保存当前线程的状态,并恢复另外一个线程的状态,jvm中有块内存地址叫程序计数器,用于记录线程执行到哪一行代码,是线程私有的。
idea打断点的时候能够设置为Thread模式,idea的debug模式能够看出栈帧的变化
yield()方法会让运行中的线程切换到就绪状态,从新争抢cpu的时间片,争抢时是否获取到时间片看cpu的分配。
代码以下
// 方法的定义 public static native void yield(); Runnable r1 = () -> { int count = 0; for (;;){ log.info("---- 1>" + count++); } }; Runnable r2 = () -> { int count = 0; for (;;){ Thread.yield(); log.info(" ---- 2>" + count++); } }; Thread t1 = new Thread(r1,"t1"); Thread t2 = new Thread(r2,"t2"); t1.start(); t2.start(); // 运行结果 11:49:15.796 [t1] INFO thread.TestYield - ---- 1>129504 11:49:15.796 [t1] INFO thread.TestYield - ---- 1>129505 11:49:15.796 [t1] INFO thread.TestYield - ---- 1>129506 11:49:15.796 [t1] INFO thread.TestYield - ---- 1>129507 11:49:15.796 [t1] INFO thread.TestYield - ---- 1>129508 11:49:15.796 [t1] INFO thread.TestYield - ---- 1>129509 11:49:15.796 [t1] INFO thread.TestYield - ---- 1>129510 11:49:15.796 [t1] INFO thread.TestYield - ---- 1>129511 11:49:15.796 [t1] INFO thread.TestYield - ---- 1>129512 11:49:15.798 [t2] INFO thread.TestYield - ---- 2>293 11:49:15.798 [t1] INFO thread.TestYield - ---- 1>129513 11:49:15.798 [t1] INFO thread.TestYield - ---- 1>129514 11:49:15.798 [t1] INFO thread.TestYield - ---- 1>129515 11:49:15.798 [t1] INFO thread.TestYield - ---- 1>129516 11:49:15.798 [t1] INFO thread.TestYield - ---- 1>129517 11:49:15.798 [t1] INFO thread.TestYield - ---- 1>129518
如上述结果所示,t2线程每次执行时进行了yield(),线程1执行的机会明显比线程2要多。
线程的优先级
线程内部用1~10的数来调整线程的优先级,默认的线程优先级为NORM_PRIORITY:5
cpu比较忙时,优先级高的线程获取更多的时间片
cpu比较闲时,优先级设置基本没用
public final static int MIN_PRIORITY = 1; public final static int NORM_PRIORITY = 5; public final static int MAX_PRIORITY = 10; // 方法的定义 public final void setPriority(int newPriority) { }
cpu比较忙时
Runnable r1 = () -> { int count = 0; for (;;){ log.info("---- 1>" + count++); } }; Runnable r2 = () -> { int count = 0; for (;;){ log.info(" ---- 2>" + count++); } }; Thread t1 = new Thread(r1,"t1"); Thread t2 = new Thread(r2,"t2"); t1.setPriority(Thread.NORM_PRIORITY); t2.setPriority(Thread.MAX_PRIORITY); t1.start(); t2.start(); // 可能的运行结果 11:59:00.696 [t1] INFO thread.TestYieldPriority - ---- 1>44102 11:59:00.696 [t2] INFO thread.TestYieldPriority - ---- 2>135903 11:59:00.696 [t2] INFO thread.TestYieldPriority - ---- 2>135904 11:59:00.696 [t2] INFO thread.TestYieldPriority - ---- 2>135905 11:59:00.696 [t2] INFO thread.TestYieldPriority - ---- 2>135906
cpu比较闲时
Runnable r1 = () -> { int count = 0; for (int i = 0; i < 10; i++) { log.info("---- 1>" + count++); } }; Runnable r2 = () -> { int count = 0; for (int i = 0; i < 10; i++) { log.info(" ---- 2>" + count++); } }; Thread t1 = new Thread(r1,"t1"); Thread t2 = new Thread(r2,"t2"); t1.setPriority(Thread.MIN_PRIORITY); t2.setPriority(Thread.MAX_PRIORITY); t1.start(); t2.start(); // 可能的运行结果 线程1优先级低 却先运行完 12:01:09.916 [t1] INFO thread.TestYieldPriority - ---- 1>7 12:01:09.916 [t1] INFO thread.TestYieldPriority - ---- 1>8 12:01:09.916 [t1] INFO thread.TestYieldPriority - ---- 1>9 12:01:09.916 [t2] INFO thread.TestYieldPriority - ---- 2>2 12:01:09.916 [t2] INFO thread.TestYieldPriority - ---- 2>3 12:01:09.916 [t2] INFO thread.TestYieldPriority - ---- 2>4 12:01:09.916 [t2] INFO thread.TestYieldPriority - ---- 2>5 12:01:09.916 [t2] INFO thread.TestYieldPriority - ---- 2>6 12:01:09.916 [t2] INFO thread.TestYieldPriority - ---- 2>7 12:01:09.916 [t2] INFO thread.TestYieldPriority - ---- 2>8 12:01:09.916 [t2] INFO thread.TestYieldPriority - ---- 2>9
默认状况下,java进程须要等待全部线程都运行结束,才会结束,有一种特殊线程叫守护线程,当全部的非守护线程都结束后,即便它没有执行完,也会强制结束。
默认的线程都是非守护线程。
垃圾回收线程就是典型的守护线程
// 方法的定义 public final void setDaemon(boolean on) { } Thread thread = new Thread(() -> { while (true) { } }); // 具体的api。设为true表示未守护线程,当主线程结束后,守护线程也结束。 // 默认是false,当主线程结束后,thread继续运行,程序不中止 thread.setDaemon(true); thread.start(); log.info("结束");
线程的阻塞能够分为好多种,从操做系统层面和java层面阻塞的定义可能不一样,可是广义上使得线程阻塞的方式有下面几种
使线程休眠,会将运行中的线程进入阻塞状态。当休眠时间结束后,从新争抢cpu的时间片继续运行
// 方法的定义 native方法 public static native void sleep(long millis) throws InterruptedException; try { // 休眠2秒 // 该方法会抛出 InterruptedException异常 即休眠过程当中可被中断,被中断后抛出异常 Thread.sleep(2000); } catch (InterruptedException异常 e) { } try { // 使用TimeUnit的api可替代 Thread.sleep TimeUnit.SECONDS.sleep(1); } catch (InterruptedException e) { }
join是指调用该方法的线程进入阻塞状态,等待某线程执行完成后恢复运行
// 方法的定义 有重载 // 等待线程执行完才恢复运行 public final void join() throws InterruptedException { } // 指定join的时间。指定时间内 线程还未执行完 调用方线程不继续等待就恢复运行 public final synchronized void join(long millis) throws InterruptedException{}
Thread t = new Thread(() -> { try { Thread.sleep(1000); } catch (InterruptedException e) { e.printStackTrace(); } r = 10; }); t.start(); // 让主线程阻塞 等待t线程执行完才继续执行 // 去除该行,执行结果为0,加上该行 执行结果为10 t.join(); log.info("r:{}", r); // 运行结果 13:09:13.892 [main] INFO thread.TestJoin - r:10
// 相关方法的定义 public void interrupt() { } public boolean isInterrupted() { } public static boolean interrupted() { }
打断标记:线程是否被打断,true表示被打断了,false表示没有
isInterrupted() 获取线程的打断标记 ,调用后不会修改线程的打断标记
interrupt()方法用于中断线程
interrupted() 获取线程的打断标记,调用后清空打断标记 即若是获取为true 调用后打断标记为false (不经常使用)
interrupt实例: 有个后台监控线程不停的监控,当外界打断它时,就结束运行。代码以下
@Slf4j class TwoPhaseTerminal{ // 监控线程 private Thread monitor; public void start(){ monitor = new Thread(() ->{ // 不停的监控 while (true){ Thread thread = Thread.currentThread(); // 判断当前线程是否被打断 if (thread.isInterrupted()){ log.info("当前线程被打断,结束运行"); break; } try { Thread.sleep(1000); // 监控逻辑中被打断后,打断标记为true log.info("监控"); } catch (InterruptedException e) { // 睡眠时被打断时抛出异常 在该处捕获到 此时打断标记仍是false // 在调用一次中断 使得中断标记为true thread.interrupt(); } } }); monitor.start(); } public void stop(){ monitor.interrupt(); } }
上面说了一些基本的api的使用,调用上面的方法后都会使得线程有对应的状态。
线程的状态可从 操做系统层面分为五种状态 从java api层面分为六种状态。
Thread类中的内部枚举State
public enum State { NEW, RUNNABLE, BLOCKED, WAITING, TIMED_WAITING, TERMINATED; }
Runnable 线程调用了start()方法后进入该状态,该状态包含了三种状况
六种线程状态和方法的对应关系
主要总结Thread类中的核心方法
方法名称 | 是否static | 方法说明 |
---|---|---|
start() | 否 | 让线程启动,进入就绪状态,等待cpu分配时间片 |
run() | 否 | 重写Runnable接口的方法,线程获取到cpu时间片时执行的具体逻辑 |
yield() | 是 | 线程的礼让,使得获取到cpu时间片的线程进入就绪状态,从新争抢时间片 |
sleep(time) | 是 | 线程休眠固定时间,进入阻塞状态,休眠时间完成后从新争抢时间片,休眠可被打断 |
join()/join(time) | 否 | 调用线程对象的join方法,调用者线程进入阻塞,等待线程对象执行完或者到达指定时间才恢复,从新争抢时间片 |
isInterrupted() | 否 | 获取线程的打断标记,true:被打断,false:没有被打断。调用后不会修改打断标记 |
interrupt() | 否 | 打断线程,抛出InterruptedException异常的方法都可被打断,可是打断后不会修改打断标记,正常执行的线程被打断后会修改打断标记 |
interrupted() | 否 | 获取线程的打断标记。调用后会清空打断标记 |
stop() | 否 | 中止线程运行 不推荐 |
suspend() | 否 | 挂起线程 不推荐 |
resume() | 否 | 恢复线程运行 不推荐 |
currentThread() | 是 | 获取当前线程 |
Object中与线程相关方法
方法名称 | 方法说明 |
---|---|
wait()/wait(long timeout) | 获取到锁的线程进入阻塞状态 |
notify() | 随机唤醒被wait()的一个线程 |
notifyAll(); | 唤醒被wait()的全部线程,从新争抢时间片 |
问题有可能出如今多个线程访问共享资源
临界区: 一段代码若是对共享资源的多线程读写操做,这段代码就被称为临界区。
注意的是 指令交错指的是 java代码在解析成字节码文件时,java代码的一行代码在字节码中可能有多行,在线程上下文切换时就有可能交错。
线程安全指的是多线程调用同一个对象的临界区的方法时,对象的属性值必定不会发生错误,这就是保证了线程安全。
以下面不安全的代码
// 对象的成员变量 private static int count = 0; public static void main(String[] args) throws InterruptedException { // t1线程对变量+5000次 Thread t1 = new Thread(() -> { for (int i = 0; i < 5000; i++) { count++; } }); // t2线程对变量-5000次 Thread t2 = new Thread(() -> { for (int i = 0; i < 5000; i++) { count--; } }); t1.start(); t2.start(); // 让t1 t2都执行完 t1.join(); t2.join(); System.out.println(count); } // 运行结果 -1399
上面的代码 两个线程,一个+5000次,一个-5000次,若是线程安全,count的值应该仍是0。
可是运行不少次,每次的结果不一样,且都不是0,因此是线程不安全的。
线程安全的类必定全部的操做都线程安全吗?
开发中常常会说到一些线程安全的类,如ConcurrentHashMap,线程安全指的是类里每个独立的方法是线程安全的,可是方法的组合就不必定是线程安全的。
成员变量和静态变量是否线程安全?
若是存在多线程共享
局部变量是否线程安全?
局部变量引用的对象未必是线程安全的
同步锁也叫对象锁,是锁在对象上的,不一样的对象就是不一样的锁。
该关键字是用于保证线程安全的,是阻塞式的解决方案。
让同一个时刻最多只有一个线程能持有对象锁,其余线程在想获取这个对象锁就会被阻塞,不用担忧上下文切换的问题。
注意: 不要理解为一个线程加了锁 ,进入 synchronized代码块中就会一直执行下去。若是时间片切换了,也会执行其余线程,再切换回来会紧接着执行,只是不会执行到有竞争锁的资源,由于当前线程还未释放锁。
当一个线程执行完synchronized的代码块后 会唤醒正在等待的线程
synchronized实际上使用对象锁保证临界区的原子性 临界区的代码是不可分割的 不会由于线程切换所打断
基本使用
// 加在方法上 实际是对this对象加锁 private synchronized void a() { } // 同步代码块,锁对象能够是任意的,加在this上 和a()方法做用相同 private void b(){ synchronized (this){ } } // 加在静态方法上 实际是对类对象加锁 private synchronized static void c() { } // 同步代码块 实际是对类对象加锁 和c()方法做用相同 private void d(){ synchronized (TestSynchronized.class){ } } // 上述b方法对应的字节码源码 其中monitorenter就是加锁的地方 0 aload_0 1 dup 2 astore_1 3 monitorenter 4 aload_1 5 monitorexit 6 goto 14 (+8) 9 astore_2 10 aload_1 11 monitorexit 12 aload_2 13 athrow 14 return
线程安全的代码
private static int count = 0; private static Object lock = new Object(); private static Object lock2 = new Object(); // t1线程和t2对象都是对同一对象加锁。保证了线程安全。此段代码不管执行多少次,结果都是0 public static void main(String[] args) throws InterruptedException { Thread t1 = new Thread(() -> { for (int i = 0; i < 5000; i++) { synchronized (lock) { count++; } } }); Thread t2 = new Thread(() -> { for (int i = 0; i < 5000; i++) { synchronized (lock) { count--; } } }); t1.start(); t2.start(); // 让t1 t2都执行完 t1.join(); t2.join(); System.out.println(count); }
重点:加锁是加在对象上,必定要保证是同一对象,加锁才能生效
线程间通讯能够经过共享变量+wait()¬ify()来实现
wait()将线程进入阻塞状态,notify()将线程唤醒
当多线程竞争访问对象的同步方法时,锁对象会关联一个底层的Monitor对象(重量级锁的实现)
以下图所示 Thread0,1先竞争到锁执行了代码后,2,3,4,5线程同时来执行临界区的代码,开始竞争锁
注意:
static final Object lock = new Object(); new Thread(() -> { synchronized (lock) { log.info("开始执行"); try { // 同步代码内部才能调用 lock.wait(); } catch (InterruptedException e) { e.printStackTrace(); } log.info("继续执行核心逻辑"); } }, "t1").start(); new Thread(() -> { synchronized (lock) { log.info("开始执行"); try { lock.wait(); } catch (InterruptedException e) { e.printStackTrace(); } log.info("继续执行核心逻辑"); } }, "t2").start(); try { Thread.sleep(2000); } catch (InterruptedException e) { e.printStackTrace(); } log.info("开始唤醒"); synchronized (lock) { // 同步代码内部才能调用 lock.notifyAll(); } // 执行结果 14:29:47.138 [t1] INFO TestWaitNotify - 开始执行 14:29:47.141 [t2] INFO TestWaitNotify - 开始执行 14:29:49.136 [main] INFO TestWaitNotify - 开始唤醒 14:29:49.136 [t2] INFO TestWaitNotify - 继续执行核心逻辑 14:29:49.136 [t1] INFO TestWaitNotify - 继续执行核心逻辑
wait 和 sleep的区别?
两者都会让线程进入阻塞状态,有如下区别
LockSupport是juc下的工具类,提供了park和unpark方法,能够实现线程通讯
与wait和notity相比的不一样点
指的是有生产者来生产数据,消费者来消费数据,生产者生产满了就不生产了,通知消费者取,等消费了再进行生产。
消费者消费不到了就不消费了,通知生产者生产,生产到了再继续消费。
public static void main(String[] args) throws InterruptedException { MessageQueue queue = new MessageQueue(2); // 三个生产者向队列里存值 for (int i = 0; i < 3; i++) { int id = i; new Thread(() -> { queue.put(new Message(id, "值" + id)); }, "生产者" + i).start(); } Thread.sleep(1000); // 一个消费者不停的从队列里取值 new Thread(() -> { while (true) { queue.take(); } }, "消费者").start(); } } // 消息队列被生产者和消费者持有 class MessageQueue { private LinkedList<Message> list = new LinkedList<>(); // 容量 private int capacity; public MessageQueue(int capacity) { this.capacity = capacity; } /** * 生产 */ public void put(Message message) { synchronized (list) { while (list.size() == capacity) { log.info("队列已满,生产者等待"); try { list.wait(); } catch (InterruptedException e) { e.printStackTrace(); } } list.addLast(message); log.info("生产消息:{}", message); // 生产后通知消费者 list.notifyAll(); } } public Message take() { synchronized (list) { while (list.isEmpty()) { log.info("队列已空,消费者等待"); try { list.wait(); } catch (InterruptedException e) { e.printStackTrace(); } } Message message = list.removeFirst(); log.info("消费消息:{}", message); // 消费后通知生产者 list.notifyAll(); return message; } } } // 消息 class Message { private int id; private Object value; }
为了更形象的表达加同步锁的概念,这里举一个生活中的例子,尽可能把以上的概念具体化出来。
这里举一个每一个人很是感兴趣的一件东西。 钱!!!(马老师除外)。
现实中,咱们去银行门口的自动取款机取钱,取款机的钱就是共享变量,为了保障安全,不可能两个陌生人同时进入同一个取款机内取钱,因此只能一我的进入取钱,而后锁上取款机的门,其余人只能在取款机门口等待。
取款机有多个,里面的钱互不影响,锁也有多个(多个对象锁),取钱人在多个取款机里同时取钱也没有安全问题。
假如每一个取钱的陌生人都是线程,当取钱人进入取款机锁了门后(线程得到锁),取到钱后出门(线程释放锁),下一我的竞争到锁来取钱。
假设工做人员也是一个线程,若是取钱人进入后发现取款机钱不足了,这时通知工做人员来向取款机里加钱(调用notifyAll方法),取钱人暂停取钱,进入银行大堂阻塞等待(调用wait方法)。
银行大堂里的工做人员和取钱人都被唤醒,从新竞争锁,进入后若是是取钱人,因为取款机没钱,还得进入银行大堂等待。
当工做人员得到取款机的锁进入后,加了钱后会通知大厅里的人来取钱(调用notifyAll方法)。本身暂停加钱,进入银行大堂等待唤醒加钱(调用wait方法)。
这时大堂里等待的人都来竞争锁,谁获取到谁进入继续取钱。
和现实中不一样的就是这里没有排队的概念,谁抢到锁谁进去取。
可重入锁 : 一个线程获取到对象的锁后,执行方法内部在须要获取锁的时候是能够获取到的。如如下代码
private static final ReentrantLock LOCK = new ReentrantLock(); private static void m() { LOCK.lock(); try { log.info("begin"); // 调用m1() m1(); } finally { // 注意锁的释放 LOCK.unlock(); } } public static void m1() { LOCK.lock(); try { log.info("m1"); m2(); } finally { // 注意锁的释放 LOCK.unlock(); } }
synchronized 也是可重入锁,ReentrantLock有如下优势
api
// 默认非公平锁,参数传true 表示未公平锁 ReentrantLock lock = new ReentrantLock(false); // 尝试获取锁 lock() // 释放锁 应放在finally块中 必须执行到 unlock() try { // 获取锁时可被打断,阻塞中的线程可被打断 LOCK.lockInterruptibly(); } catch (InterruptedException e) { return; } // 尝试获取锁 获取不到就返回false LOCK.tryLock() // 支持超时时间 一段时间没获取到就返回false tryLock(long timeout, TimeUnit unit) // 指定条件变量 休息室 一个锁能够建立多个休息室 Condition waitSet = ROOM.newCondition(); // 释放锁 进入waitSet等待 释放后其余线程能够抢锁 yanWaitSet.await() // 唤醒具体休息室的线程 唤醒后 重写竞争锁 yanWaitSet.signal()
实例:一个线程输出a,一个线程输出b,一个线程输出c,abc按照顺序输出,连续输出5次
这个考的就是线程的通讯,利用 wait()/notify()和控制变量能够实现,此处使用ReentrantLock便可实现该功能。
public static void main(String[] args) { AwaitSignal awaitSignal = new AwaitSignal(5); // 构建三个条件变量 Condition a = awaitSignal.newCondition(); Condition b = awaitSignal.newCondition(); Condition c = awaitSignal.newCondition(); // 开启三个线程 new Thread(() -> { awaitSignal.print("a", a, b); }).start(); new Thread(() -> { awaitSignal.print("b", b, c); }).start(); new Thread(() -> { awaitSignal.print("c", c, a); }).start(); try { Thread.sleep(1000); } catch (InterruptedException e) { e.printStackTrace(); } awaitSignal.lock(); try { // 先唤醒a a.signal(); } finally { awaitSignal.unlock(); } } } class AwaitSignal extends ReentrantLock { // 循环次数 private int loopNumber; public AwaitSignal(int loopNumber) { this.loopNumber = loopNumber; } /** * @param print 输出的字符 * @param current 当前条件变量 * @param next 下一个条件变量 */ public void print(String print, Condition current, Condition next) { for (int i = 0; i < loopNumber; i++) { lock(); try { try { // 获取锁以后等待 current.await(); System.out.print(print); } catch (InterruptedException e) { } next.signal(); } finally { unlock(); } } }
说到死锁,先举个例子,
下面是代码实现
static Beer beer = new Beer(); static Story story = new Story(); public static void main(String[] args) { new Thread(() ->{ synchronized (beer){ log.info("我有酒,给我故事"); try { Thread.sleep(1000); } catch (InterruptedException e) { e.printStackTrace(); } synchronized (story){ log.info("小王开始喝酒讲故事"); } } },"小王").start(); new Thread(() ->{ synchronized (story){ log.info("我有故事,给我酒"); try { Thread.sleep(1000); } catch (InterruptedException e) { e.printStackTrace(); } synchronized (beer){ log.info("老王开始喝酒讲故事"); } } },"老王").start(); } class Beer { } class Story{ }
死锁致使程序没法正常运行下去
检测工具能够检查到死锁信息
jmm 体如今如下三个方面
停不下来的程序
static boolean run = true; public static void main(String[] args) throws InterruptedException { Thread t = new Thread(() -> { while (run) { // .... } }); t.start(); Thread.sleep(1000); // 线程t不会如预想的停下来 run = false; }
如上图所示,线程有本身的工做缓存,当主线程修改了变量并同步到主内存时,t线程没有读取到,因此程序停不下来
JVM在不影响程序正确性的状况下可能会调整语句的执行顺序,该状况也称为 指令重排序
static int i; static int j; // 在某个线程内执行以下赋值操做 i = ...; j = ...; 有可能将j先赋值
原子性你们应该比较熟悉,上述同步锁的synchronized代码块就是保证了原子性,就是一段代码是一个总体,原子性保证了线程安全,不会受到上下文切换的影响。
该关键字解决了可见性和有序性,volatile经过内存屏障来实现的
会在对象写操做以后加写屏障,会对写屏障的以前的数据都同步到主存,而且保证写屏障的执行顺序在写屏障以前
会在对象读操做以前加读屏障,会在读屏障以后的语句都从主存读,并保证读屏障以后的代码执行在读屏障以后
注意: volatile不能解决原子性,即不能经过该关键字实现线程安全。
volatile应用场景:一个线程读取变量,另外的线程操做变量,加了该关键字后保证写变量后,读变量的线程能够及时感知。
cas (compare and swap) 比较并交换
为变量赋值时,从内存中读取到的值v,获取到要交换的新值n,执行 compareAndSwap()方法时,比较v和当前内存中的值是否一致,若是一致则将n和v交换,若是不一致,则自旋重试。
cas底层是cpu层面的,即不使用同步锁也能够保证操做的原子性。
private AtomicInteger balance; // 模拟cas的具体操做 @Override public void withdraw(Integer amount) { while (true) { // 获取当前值 int pre = balance.get(); // 进行操做后获得新值 int next = pre - amount; // 比较并设置成功 则中断 不然自旋重试 if (balance.compareAndSet(pre, next)) { break; } } }
无锁的效率是要高于以前的锁的,因为无锁不会涉及线程的上下文切换
cas是乐观锁的思想,sychronized是悲观锁的思想
cas适合不多有线程竞争的场景,若是竞争很强,重试常常发生,反而下降效率
juc并发包下包含了实现了cas的原子类
经常使用api
new AtomicInteger(balance) get() compareAndSet(pre, next) // i.incrementAndGet() ++i // i.decrementAndGet() --i // i.getAndIncrement() i++ // i.getAndDecrement() ++i i.addAndGet() // 传入函数式接口 修改i int getAndUpdate(IntUnaryOperator updateFunction) // cas 的核心方法 compareAndSet(int expect, int update)
cas存在ABA问题,即比较并交换时,若是原值为A,有其余线程将其修改成B,在有其余线程将其修改成A。
此时实际发生过交换,可是比较和交换因为值没改变能够交换成功
解决方式
AtomicStampedReference/AtomicMarkableReference
上面两个类解决ABA问题,原理就是为对象增长版本号,每次修改时增长版本号,就能够避免ABA问题
或者增长个布尔变量标识,修改后调整布尔变量值,也能够避免ABA问题
线程池是java并发最重要的一个知识点,也是难点,是实际应用最普遍的。
线程的资源很宝贵,不可能无限的建立,必需要有管理线程的工具,线程池就是一种管理线程的工具,java开发中常常有池化的思想,如 数据库链接池、Redis链接池等。
预先建立好一些线程,任务提交时直接执行,既能够节约建立线程的时间,又能够控制线程的数量。
线程池的好处
public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory, RejectedExecutionHandler handler) { }
构造器参数的意义
参数名 | 参数意义 |
---|---|
corePoolSize | 核心线程数 |
maximumPoolSize | 最大线程数 |
keepAliveTime | 救急线程的空闲时间 |
unit | 救急线程的空闲时间单位 |
workQueue | 阻塞队列 |
threadFactory | 建立线程的工厂,主要定义线程名 |
handler | 拒绝策略 |
下面 咱们经过一个实例来理解线程池的参数以及线程池的接收任务的过程
如上图 银行办理业务。
线程池经过一个int变量的高3位来表示线程池的状态,低29位来存储线程池的数量
状态名称 | 高三位 | 接收新任务 | 处理阻塞队列任务 | 说明 |
---|---|---|---|---|
Running | 111 | Y | Y | 正常接收任务,正常处理任务 |
Shutdown | 000 | N | Y | 不会接收任务,会执行完正在执行的任务,也会处理阻塞队列里的任务 |
stop | 001 | N | N | 不会接收任务,会中断正在执行的任务,会放弃处理阻塞队列里的任务 |
Tidying | 010 | N | N | 任务所有执行完毕,当前活动线程是0,即将进入终结 |
Termitted | 011 | N | N | 终结状态 |
// runState is stored in the high-order bits private static final int RUNNING = -1 << COUNT_BITS; private static final int SHUTDOWN = 0 << COUNT_BITS; private static final int STOP = 1 << COUNT_BITS; private static final int TIDYING = 2 << COUNT_BITS; private static final int TERMINATED = 3 << COUNT_BITS;
线程池建立、接收任务、执行任务、回收线程的步骤
注意: 不是刚建立的线程是核心线程,后面建立的线程是非核心线程,线程是没有核心非核心的概念的,这是我长期以来的误解。
拒绝策略
提交任务的方法
// 执行Runnable public void execute(Runnable command) { if (command == null) throw new NullPointerException(); int c = ctl.get(); if (workerCountOf(c) < corePoolSize) { if (addWorker(command, true)) return; c = ctl.get(); } if (isRunning(c) && workQueue.offer(command)) { int recheck = ctl.get(); if (! isRunning(recheck) && remove(command)) reject(command); else if (workerCountOf(recheck) == 0) addWorker(null, false); } else if (!addWorker(command, false)) reject(command); } // 提交Callable public <T> Future<T> submit(Callable<T> task) { if (task == null) throw new NullPointerException(); // 内部构建FutureTask RunnableFuture<T> ftask = newTaskFor(task); execute(ftask); return ftask; } // 提交Runnable,指定返回值 public Future<?> submit(Runnable task) { if (task == null) throw new NullPointerException(); // 内部构建FutureTask RunnableFuture<Void> ftask = newTaskFor(task, null); execute(ftask); return ftask; } // 提交Runnable,指定返回值 public <T> Future<T> submit(Runnable task, T result) { if (task == null) throw new NullPointerException(); // 内部构建FutureTask RunnableFuture<T> ftask = newTaskFor(task, result); execute(ftask); return ftask; } protected <T> RunnableFuture<T> newTaskFor(Runnable runnable, T value) { return new FutureTask<T>(runnable, value); }
注意: 下面几种方式都不推荐使用
1.newFixedThreadPool
public static ExecutorService newFixedThreadPool(int nThreads) { return new ThreadPoolExecutor(nThreads, nThreads, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>()); }
2.newCachedThreadPool
public static ExecutorService newCachedThreadPool() { return new ThreadPoolExecutor(0, Integer.MAX_VALUE, 60L, TimeUnit.SECONDS, new SynchronousQueue<Runnable>()); }
3.newSingleThreadExecutor
public static ExecutorService newSingleThreadExecutor() { return new FinalizableDelegatedExecutorService (new ThreadPoolExecutor(1, 1, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>())); }
4.newScheduledThreadPool
public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize) { return new ScheduledThreadPoolExecutor(corePoolSize); }
shutdown()
会让线程池状态为shutdown,不能接收任务,可是会将工做线程和阻塞队列里的任务执行完 至关于优雅关闭
public void shutdown() { final ReentrantLock mainLock = this.mainLock; mainLock.lock(); try { checkShutdownAccess(); advanceRunState(SHUTDOWN); interruptIdleWorkers(); onShutdown(); // hook for ScheduledThreadPoolExecutor } finally { mainLock.unlock(); } tryTerminate(); }
shutdownNow()
会让线程池状态为stop, 不能接收任务,会当即中断执行中的工做线程,而且不会执行阻塞队列里的任务, 会返回阻塞队列的任务列表
public List<Runnable> shutdownNow() { List<Runnable> tasks; final ReentrantLock mainLock = this.mainLock; mainLock.lock(); try { checkShutdownAccess(); advanceRunState(STOP); interruptWorkers(); tasks = drainQueue(); } finally { mainLock.unlock(); } tryTerminate(); return tasks; }
线程池难就难在参数的配置,有一套理论配置参数
cpu密集型 : 指的是程序主要发生cpu的运算
核心线程数: CPU核心数+1
IO密集型: 远程调用RPC,操做数据库等,不须要使用cpu进行大量的运算。 大多数应用的场景
核心线程数=核数cpu指望利用率 总时间/cpu运算时间
可是基于以上理论仍是很难去配置,由于cpu运算时间很差估算
实际配置大小可参考下表
cpu密集型 | io密集型 | |
---|---|---|
线程数数量 | 核数<=x<=核数*2 | 核心数50<=x<=核心数 100 |
队列长度 | y>=100 | 1<=y<=10 |
1.线程池参数经过分布式配置,修改配置无需重启应用
线程池参数是根据线上的请求数变化而变化的,最好的方式是 核心线程数、最大线程数 队列大小都是可配置的
主要配置 corePoolSize maxPoolSize queueSize
java提供了可方法覆盖参数,线程池内部会处理好参数 进行平滑的修改
public void setCorePoolSize(int corePoolSize) { }
2.增长线程池的监控
3.io密集型可调整为先新增任务到最大线程数后再将任务放到阻塞队列
代码 主要可重写阻塞队列 加入任务的方法
public boolean offer(Runnable runnable) { if (executor == null) { throw new RejectedExecutionException("The task queue does not have executor!"); } final ReentrantLock lock = this.lock; lock.lock(); try { int currentPoolThreadSize = executor.getPoolSize(); // 若是提交任务数小于当前建立的线程数, 说明还有空闲线程, if (executor.getTaskCount() < currentPoolThreadSize) { // 将任务放入队列中,让线程去处理任务 return super.offer(runnable); } // 核心改动 // 若是当前线程数小于最大线程数,则返回 false ,让线程池去建立新的线程 if (currentPoolThreadSize < executor.getMaximumPoolSize()) { return false; } // 不然,就将任务放入队列中 return super.offer(runnable); } finally { lock.unlock(); } }
3.拒绝策略 建议使用tomcat的拒绝策略(给一次机会)
// tomcat的源码 @Override public void execute(Runnable command) { if ( executor != null ) { try { executor.execute(command); } catch (RejectedExecutionException rx) { // 捕获到异常后 在从队列获取,至关于重试1取不到任务 在执行拒绝任务 if ( !( (TaskQueue) executor.getQueue()).force(command) ) throw new RejectedExecutionException("Work queue full."); } } else throw new IllegalStateException("StandardThreadPool not started."); }
建议修改从队列取任务的方式: 增长超时时间,超时1分钟取不到在进行返回
public boolean offer(E e, long timeout, TimeUnit unit){}
文章篇幅较长,给看到这里的小伙伴点个大大的赞!因为做者水平有限,加之第一次写博客,文章中不免会有错误之处,欢迎小伙伴们反馈指正。
若是以为文章对你有帮助,麻烦 点赞、评论、转发、在看 走起