Package java.util.concurrent
---> AtomicInteger
Lock
ReadWriteLock
java
保证可见性、不保证原子性、禁止指令重排程序员
保证可见性算法
当多个线程访问同一个变量时,一个线程修改了这个变量的值,其余线程可以当即看到修改的值小程序
当不添加volatile关键字时示例:api
package com.jian8.juc;
import java.util.concurrent.TimeUnit;
/** * 1验证volatile的可见性 * 1.1 若是int num = 0,number变量没有添加volatile关键字修饰 * 1.2 添加了volatile,能够解决可见性 */
public class VolatileDemo {
public static void main(String[] args) {
visibilityByVolatile();//验证volatile的可见性
}
/** * volatile能够保证可见性,及时通知其余线程,主物理内存的值已经被修改 */
public static void visibilityByVolatile() {
MyData myData = new MyData();
//第一个线程
new Thread(() -> {
System.out.println(Thread.currentThread().getName() + "\t come in");
try {
//线程暂停3s
TimeUnit.SECONDS.sleep(3);
myData.addToSixty();
System.out.println(Thread.currentThread().getName() + "\t update value:" + myData.num);
} catch (Exception e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
}, "thread1").start();
//第二个线程是main线程
while (myData.num == 0) {
//若是myData的num一直为零,main线程一直在这里循环
}
System.out.println(Thread.currentThread().getName() + "\t mission is over, num value is " + myData.num);
}
}
class MyData {
// int num = 0;
volatile int num = 0;
public void addToSixty() {
this.num = 60;
}
}
复制代码
输出结果:数组
thread1 come in
thread1 update value:60
//线程进入死循环
复制代码
当咱们加上volatile
关键字后,volatile int num = 0;
输出结果为:缓存
thread1 come in
thread1 update value:60
main mission is over, num value is 60
//程序没有死循环,结束执行
复制代码
不保证原子性安全
原子性:不可分割、完整性,即某个线程正在作某个具体业务时,中间不能够被加塞或者被分割,须要总体完整,要么同时成功,要么同时失败bash
验证示例(变量添加volatile关键字,方法不添加synchronized):服务器
package com.jian8.juc;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
/** * 1验证volatile的可见性 * 1.1 若是int num = 0,number变量没有添加volatile关键字修饰 * 1.2 添加了volatile,能够解决可见性 * * 2.验证volatile不保证原子性 * 2.1 原子性指的是什么 * 不可分割、完整性,即某个线程正在作某个具体业务时,中间不能够被加塞或者被分割,须要总体完整,要么同时成功,要么同时失败 */
public class VolatileDemo {
public static void main(String[] args) {
// visibilityByVolatile();//验证volatile的可见性
atomicByVolatile();//验证volatile不保证原子性
}
/** * volatile能够保证可见性,及时通知其余线程,主物理内存的值已经被修改 */
//public static void visibilityByVolatile(){}
/** * volatile不保证原子性 * 以及使用Atomic保证原子性 */
public static void atomicByVolatile(){
MyData myData = new MyData();
for(int i = 1; i <= 20; i++){
new Thread(() ->{
for(int j = 1; j <= 1000; j++){
myData.addSelf();
myData.atomicAddSelf();
}
},"Thread "+i).start();
}
//等待上面的线程都计算完成后,再用main线程取得最终结果值
try {
TimeUnit.SECONDS.sleep(4);
} catch (InterruptedException e) {
e.printStackTrace();
}
while (Thread.activeCount()>2){
Thread.yield();
}
System.out.println(Thread.currentThread().getName()+"\t finally num value is "+myData.num);
System.out.println(Thread.currentThread().getName()+"\t finally atomicnum value is "+myData.atomicInteger);
}
}
class MyData {
// int num = 0;
volatile int num = 0;
public void addToSixty() {
this.num = 60;
}
public void addSelf(){
num++;
}
AtomicInteger atomicInteger = new AtomicInteger();
public void atomicAddSelf(){
atomicInteger.getAndIncrement();
}
}
复制代码
执行三次结果为:
//1.
main finally num value is 19580
main finally atomicnum value is 20000
//2.
main finally num value is 19999
main finally atomicnum value is 20000
//3.
main finally num value is 18375
main finally atomicnum value is 20000
//num并无达到20000
复制代码
禁止指令重排
有序性:在计算机执行程序时,为了提升性能,编译器和处理器经常会对指令作重排,通常分如下三种
graph LR
源代码 --> id1["编译器优化的重排"]
id1 --> id2[指令并行的重排]
id2 --> id3[内存系统的重排]
id3 --> 最终执行的指令
style id1 fill:#ff8000;
style id2 fill:#fab400;
style id3 fill:#ffd557;
复制代码
单线程环境里面确保程序最终执行结果和代码顺序执行的结果一致。
处理器在进行重排顺序是必需要考虑指令之间的数据依赖性
多线程环境中线程交替执行,因为编译器优化重排的存在,两个线程中使用的变量可否保证一致性时没法肯定的,结果没法预测
重排代码实例:
声明变量:int a,b,x,y=0
线程1 | 线程2 |
---|---|
x = a; | y = b; |
b = 1; | a = 2; |
结 果 | x = 0 y=0 |
若是编译器对这段程序代码执行重排优化后,可能出现以下状况:
线程1 | 线程2 |
---|---|
b = 1; | a = 2; |
x= a; | y = b; |
结 果 | x = 2 y=1 |
这个结果说明在多线程环境下,因为编译器优化重排的存在,两个线程中使用的变量可否保证一致性是没法肯定的
volatile实现禁止指令重排,从而避免了多线程环境下程序出现乱序执行的现象
内存屏障(Memory Barrier)又称内存栅栏,是一个CPU指令,他的做用有两个:
因为编译器和处理器都能执行指令重排优化。若是在指令前插入一条Memory Barrier,则会告诉编译器和CPU,无论什么指令都不能和这条Memory Barrier指令重排顺序,也就是说经过插入内存屏障禁止在内存屏障先后的指令执行重排序优化。内存屏障另一个做用是强制刷出各类CPU的缓存数据,所以任何CPU上的线程都能读取到这些数据的最新版本。
graph TB
subgraph
bbbb["对Volatile变量进行读操做时,<br>回在读操做以前加入一条load屏障指令,<br>从内存中读取共享变量"]
ids6[Volatile]-->red3[LoadLoad屏障]
red3-->id7["禁止下边全部普通读操做<br>和上面的volatile读重排序"]
red3-->red4[LoadStore屏障]
red4-->id9["禁止下边全部普通写操做<br>和上面的volatile读重排序"]
red4-->id8[普通读]
id8-->普通写
end
subgraph
aaaa["对Volatile变量进行写操做时,<br>回在写操做后加入一条store屏障指令,<br>将工做内存中的共享变量值刷新回到主内存"]
id1[普通读]-->id2[普通写]
id2-->red1[StoreStore屏障]
red1-->id3["禁止上面的普通写和<br>下面的volatile写重排序"]
red1-->id4["Volatile写"]
id4-->red2[StoreLoad屏障]
red2-->id5["防止上面的volatile写和<br>下面可能有的volatile读写重排序"]
end
style red1 fill:#ff0000;
style red2 fill:#ff0000;
style red4 fill:#ff0000;
style red3 fill:#ff0000;
style aaaa fill:#ffff00;
style bbbb fill:#ffff00;
复制代码
JMM(Java Memory Model)自己是一种抽象的概念,并不真实存在,他描述的时一组规则或规范,经过这组规范定义了程序中各个变量(包括实例字段,静态字段和构成数组对象的元素)的访问方式。
JMM关于同步的规定:
因为JVM运行程序的实体是线程,而每一个线程建立时JVM都会为其建立一个工做内存(有的成为栈空间),工做内存是每一个线程的私有数据区域,而java内存模型中规定全部变量都存储在主内存,主内存是共享内存区域,全部线程均可以访问,但线程对变量的操做(读取赋值等)必须在工做内存中进行,首先概要将变量从主内存拷贝到本身的工做内存空间,而后对变量进行操做,操做完成后再将变量写回主内存,不能直接操做主内存中的变量,各个线程中的工做内存中存储着主内存的变量副本拷贝,所以不一样的线程件没法访问对方的工做内存,线程间的通讯(传值)必须经过主内存来完成,期间要访问过程以下图:
当普通单例模式在多线程状况下:
public class SingletonDemo {
private static SingletonDemo instance = null;
private SingletonDemo() {
System.out.println(Thread.currentThread().getName() + "\t 构造方法SingletonDemo()");
}
public static SingletonDemo getInstance() {
if (instance == null) {
instance = new SingletonDemo();
}
return instance;
}
public static void main(String[] args) {
//构造方法只会被执行一次
// System.out.println(getInstance() == getInstance());
// System.out.println(getInstance() == getInstance());
// System.out.println(getInstance() == getInstance());
//并发多线程后,构造方法会在一些状况下执行屡次
for (int i = 0; i < 10; i++) {
new Thread(() -> {
SingletonDemo.getInstance();
}, "Thread " + i).start();
}
}
}
复制代码
其构造方法在一些状况下会被执行屡次
解决方式:
单例模式DCL代码
DCL (Double Check Lock双端检锁机制)在加锁前和加锁后都进行一次判断
public static SingletonDemo getInstance() {
if (instance == null) {
synchronized (SingletonDemo.class) {
if (instance == null) {
instance = new SingletonDemo();
}
}
}
return instance;
}
复制代码
大部分运行结果构造方法只会被执行一次,但指令重排机制会让程序很小的概率出现构造方法被执行屡次
DCL(双端检锁)机制不必定线程安全,缘由时有指令重排的存在,加入volatile能够禁止指令重排
缘由是在某一个线程执行到第一次检测,读取到instance不为null时,instance的引用对象可能没有完成初始化。instance=new SingleDemo();能够被分为一下三步(伪代码):
memory = allocate();//1.分配对象内存空间
instance(memory); //2.初始化对象
instance = memory; //3.设置instance执行刚分配的内存地址,此时instance!=null
复制代码
步骤2和步骤3不存在数据依赖关系,并且不管重排前仍是重排后程序的执行结果在单线程中并无改变,所以这种重排优化时容许的,若是3步骤提早于步骤2,可是instance尚未初始化完成
可是指令重排只会保证串行语义的执行的一致性(单线程),但并不关心多线程间的语义一致性。
因此当一条线程访问instance不为null时,因为instance示例未必已初始化完成,也就形成了线程安全问题。
单例模式volatile代码
为解决以上问题,能够将SingletongDemo实例上加上volatile
private static volatile SingletonDemo instance = null;
复制代码
AtomicInteger.conpareAndSet(int expect, indt update)
public final boolean compareAndSet(int expect, int update) {
return unsafe.compareAndSwapInt(this, valueOffset, expect, update);
}
复制代码
第一个参数为拿到的指望值,若是指望值和内存中变量的值一致,进行update赋值,若是指望值和内存中变量的值不一致,证实数据被修改过,返回fasle,取消赋值
例子:
package com.jian8.juc.cas;
import java.util.concurrent.atomic.AtomicInteger;
/** * 1.CAS是什么? * 1.1比较并交换 */
public class CASDemo {
public static void main(String[] args) {
checkCAS();
}
public static void checkCAS(){
AtomicInteger atomicInteger = new AtomicInteger(5);
System.out.println(atomicInteger.compareAndSet(5, 2019) + "\t current data is " + atomicInteger.get());
System.out.println(atomicInteger.compareAndSet(5, 2014) + "\t current data is " + atomicInteger.get());
}
}
复制代码
输出结果为:
true current data is 2019
false current data is 2019
复制代码
比较当前工做内存中的值和主内存中的值,若是相同则执行规定操做,不然继续比较知道主内存和工做内存中的值一直为止
atomicInteger.getAndIncrement();
public final int getAndIncrement() {
return unsafe.getAndAddInt(this, valueOffset, 1);
}
复制代码
Unsafe
是CAS核心类,因为Java方法没法直接访问地层系统,须要经过本地(native)方法来访问,Unsafe至关于一个后门,基于该类能够直接操做特定内存数据。Unsafe类存在于sun.misc
包中,其内部方法操做能够像C的指针同样直接操做内存,由于Java中CAS操做的执行依赖于Unsafe类的方法。
Unsafe类中的全部方法都是native修饰的,也就是说Unsafe类中的方法都直接调用操做系统底层资源执行相应任务
变量valueOffset,表示该变量值在内存中的偏移地址,由于Unsafe就是根据内存偏移地址获取数据的
变量value用volatile修饰,保证多线程之间的可见性
CAS是什么
CAS全称呼Compare-And-Swap,它是一条CPU并发原语
他的功能是判断内存某个位置的值是否为预期值,若是是则更改成新的值,这个过程是原子的。
CAS并发原语体如今JAVA语言中就是sun.misc.Unsafe类中各个方法。调用Unsafe类中的CAS方法,JVM会帮咱们实现CAS汇编指令。这是一种彻底依赖于硬件的功能,经过他实现了原子操做。因为CAS是一种系统原语,原语属于操做系统用语范畴,是由若干条指令组成的,用于完成某个功能的一个过程,而且原语的执行必须是连续的,在执行过程当中不容许被中断,也就是说CAS是一条CPU的原子指令,不会形成数据不一致问题。
//unsafe.getAndAddInt
public final int getAndAddInt(Object var1, long var2, int var4) {
int var5;
do {
var5 = this.getIntVolatile(var1, var2);
} while(!this.compareAndSwapInt(var1, var2, var5, var5 + var4));
return var5;
}
复制代码
var1 AtomicInteger对象自己
var2 该对象的引用地址
var4 须要变更的数据
var5 经过var1 var2找出的主内存中真实的值
用该对象前的值与var5比较;
若是相同,更新var5+var4而且返回true,
若是不一样,继续去之而后再比较,直到更新完成
循环时间长,开销大
例如getAndAddInt方法执行,有个do while循环,若是CAS失败,一直会进行尝试,若是CAS长时间不成功,可能会给CPU带来很大的开销
只能保证一个共享变量的原子操做
对多个共享变量操做时,循环CAS就没法保证操做的原子性,这个时候就能够用锁来保证原子性
ABA问题
CAS算法实现一个重要前提须要去取内存中某个时刻的数据并在当下时刻比较并替换,那么在这个时间差类会致使数据的变化。
好比线程1从内存位置V取出A,线程2同时也从内存取出A,而且线程2进行一些操做将值改成B,而后线程2又将V位置数据改为A,这时候线程1进行CAS操做发现内存中的值依然时A,而后线程1操做成功。
尽管线程1的CAS操做成功,可是不表明这个过程没有问题
示例代码:
package juc.cas;
import lombok.AllArgsConstructor;
import lombok.Getter;
import lombok.ToString;
import java.util.concurrent.atomic.AtomicReference;
public class AtomicRefrenceDemo {
public static void main(String[] args) {
User z3 = new User("张三", 22);
User l4 = new User("李四", 23);
AtomicReference<User> atomicReference = new AtomicReference<>();
atomicReference.set(z3);
System.out.println(atomicReference.compareAndSet(z3, l4) + "\t" + atomicReference.get().toString());
System.out.println(atomicReference.compareAndSet(z3, l4) + "\t" + atomicReference.get().toString());
}
}
@Getter
@ToString
@AllArgsConstructor
class User {
String userName;
int age;
}
复制代码
输出结果
true User(userName=李四, age=23) false User(userName=李四, age=23) 复制代码
新增机制,修改版本号
package com.jian8.juc.cas;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import java.util.concurrent.atomic.AtomicStampedReference;
/** * ABA问题解决 * AtomicStampedReference */
public class ABADemo {
static AtomicReference<Integer> atomicReference = new AtomicReference<>(100);
static AtomicStampedReference<Integer> atomicStampedReference = new AtomicStampedReference<>(100, 1);
public static void main(String[] args) {
System.out.println("=====如下时ABA问题的产生=====");
new Thread(() -> {
atomicReference.compareAndSet(100, 101);
atomicReference.compareAndSet(101, 100);
}, "Thread 1").start();
new Thread(() -> {
try {
//保证线程1完成一次ABA操做
TimeUnit.SECONDS.sleep(1);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println(atomicReference.compareAndSet(100, 2019) + "\t" + atomicReference.get());
}, "Thread 2").start();
try {
TimeUnit.SECONDS.sleep(2);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("=====如下时ABA问题的解决=====");
new Thread(() -> {
int stamp = atomicStampedReference.getStamp();
System.out.println(Thread.currentThread().getName() + "\t第1次版本号" + stamp);
try {
TimeUnit.SECONDS.sleep(2);
} catch (InterruptedException e) {
e.printStackTrace();
}
atomicStampedReference.compareAndSet(100, 101, atomicStampedReference.getStamp(), atomicStampedReference.getStamp() + 1);
System.out.println(Thread.currentThread().getName() + "\t第2次版本号" + atomicStampedReference.getStamp());
atomicStampedReference.compareAndSet(101, 100, atomicStampedReference.getStamp(), atomicStampedReference.getStamp() + 1);
System.out.println(Thread.currentThread().getName() + "\t第3次版本号" + atomicStampedReference.getStamp());
}, "Thread 3").start();
new Thread(() -> {
int stamp = atomicStampedReference.getStamp();
System.out.println(Thread.currentThread().getName() + "\t第1次版本号" + stamp);
try {
TimeUnit.SECONDS.sleep(4);
} catch (InterruptedException e) {
e.printStackTrace();
}
boolean result = atomicStampedReference.compareAndSet(100, 2019, stamp, stamp + 1);
System.out.println(Thread.currentThread().getName() + "\t修改是否成功" + result + "\t当前最新实际版本号:" + atomicStampedReference.getStamp());
System.out.println(Thread.currentThread().getName() + "\t当前最新实际值:" + atomicStampedReference.getReference());
}, "Thread 4").start();
}
}
复制代码
输出结果:
=====如下时ABA问题的产生=====
true 2019
=====如下时ABA问题的解决=====
Thread 3 第1次版本号1
Thread 4 第1次版本号1
Thread 3 第2次版本号2
Thread 3 第3次版本号3
Thread 4 修改是否成功false 当前最新实际版本号:3
Thread 4 当前最新实际值:100
复制代码
HashSet与ArrayList一致 HashMap
HashSet底层是一个HashMap,存储的值放在HashMap的key里,value存储了一个PRESENT的静态Object对象
package com.jian8.juc.collection;
import java.util.ArrayList;
import java.util.List;
import java.util.UUID;
/** * 集合类不安全问题 * ArrayList */
public class ContainerNotSafeDemo {
public static void main(String[] args) {
notSafe();
}
/** * 故障现象 * java.util.ConcurrentModificationException */
public static void notSafe() {
List<String> list = new ArrayList<>();
for (int i = 1; i <= 30; i++) {
new Thread(() -> {
list.add(UUID.randomUUID().toString().substring(0, 8));
System.out.println(list);
}, "Thread " + i).start();
}
}
}
复制代码
报错:
Exception in thread "Thread 10" java.util.ConcurrentModificationException
复制代码
并发正常修改致使
一我的正在写入,另外一个同窗来抢夺,致使数据不一致,并发修改异常
List<String> list = new Vector<>();//Vector线程安全
List<String> list = Collections.synchronizedList(new ArrayList<>());//使用辅助类
List<String> list = new CopyOnWriteArrayList<>();//写时复制,读写分离
Map<String, String> map = new ConcurrentHashMap<>();
Map<String, String> map = Collections.synchronizedMap(new HashMap<>());
复制代码
CopyOnWriteArrayList.add方法:
CopyOnWrite容器即写时复制,往一个元素添加容器的时候,不直接往当前容器Object[]添加,而是先将当前容器Object[]进行copy,复制出一个新的容器Object[] newElements,而后向新的容器添加元素,添加完元素以后,再将原容器的引用指向新的容器setArray(newElements),这样作能够对CopyOnWrite容器进行并发的读,而不须要加锁,由于当前容器不会添加任何元素,因此CopyOnWrite容器也是一种读写分离的思想,读和写不一样的容器
public boolean add(E e) {
final ReentrantLock lock = this.lock;
lock.lock();
try {
Object[] elements = getArray();
int len = elements.length;
Object[] newElements = Arrays.copyOf(elements, len + 1);
newElements[len] = e;
setArray(newElements);
return true;
} finally {
lock.unlock();
}
}
复制代码
是什么
公平锁就是先来后到、非公平锁就是容许加塞,Lock lock = new ReentrantLock(Boolean fair);
默认非公平。
公平锁是指多个线程按照申请锁的顺序来获取锁,相似排队打饭。
非公平锁是指多个线程获取锁的顺序并非按照申请锁的顺序,有可能后申请的线程优先获取锁,在高并发的状况下,有可能会形成优先级反转或者节现象。
二者区别
公平锁:Threads acquire a fair lock in the order in which they requested it
公平锁,就是很公平,在并发环境中,每一个线程在获取锁时,会先查看此锁维护的等待队列,若是为空,或者当前线程就是等待队列的第一个,就占有锁,不然就会加入到等待队列中,之后会按照FIFO的规则从队列中取到本身。
非公平锁:a nonfair lock permits barging: threads requesting a lock can jump ahead of the queue of waiting threads if the lock happens to be available when it is requested.
非公平锁比较粗鲁,上来就直接尝试占有额,若是尝试失败,就再采用相似公平锁那种方式。
other
对Java ReentrantLock而言,经过构造函数指定该锁是否公平,默认是非公平锁,非公平锁的优势在于吞吐量比公平锁大
对Synchronized而言,是一种非公平锁
递归锁是什么
指的时同一线程外层函数得到锁以后,内层递归函数仍然能获取该锁的代码,在同一个线程在外层方法获取锁的时候,在进入内层方法会自动获取锁,也就是说,线程能够进入任何一个它已经拥有的锁所同步着的代码块
ReentrantLock/Synchronized 就是一个典型的可重入锁
可重入锁最大的做用是避免死锁
代码示例
package com.jian8.juc.lock;
####
public static void main(String[] args) {
Phone phone = new Phone();
new Thread(() -> {
try {
phone.sendSMS();
} catch (Exception e) {
e.printStackTrace();
}
}, "Thread 1").start();
new Thread(() -> {
try {
phone.sendSMS();
} catch (Exception e) {
e.printStackTrace();
}
}, "Thread 2").start();
}
}
class Phone{
public synchronized void sendSMS()throws Exception{
System.out.println(Thread.currentThread().getName()+"\t -----invoked sendSMS()");
Thread.sleep(3000);
sendEmail();
}
public synchronized void sendEmail() throws Exception{
System.out.println(Thread.currentThread().getName()+"\t +++++invoked sendEmail()");
}
}
复制代码
package com.jian8.juc.lock;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
public class ReentrantLockDemo {
public static void main(String[] args) {
Mobile mobile = new Mobile();
new Thread(mobile).start();
new Thread(mobile).start();
}
}
class Mobile implements Runnable{
Lock lock = new ReentrantLock();
@Override
public void run() {
get();
}
public void get() {
lock.lock();
try {
System.out.println(Thread.currentThread().getName()+"\t invoked get()");
set();
}finally {
lock.unlock();
}
}
public void set(){
lock.lock();
try{
System.out.println(Thread.currentThread().getName()+"\t invoked set()");
}finally {
lock.unlock();
}
}
}
复制代码
概念
独占锁:指该锁一次只能被一个线程所持有,对ReentrantLock和Synchronized而言都是独占锁
共享锁:只该锁可被多个线程所持有
ReentrantReadWriteLock其读锁是共享锁,写锁是独占锁
互斥锁:读锁的共享锁能够保证并发读是很是高效的,读写、写读、写写的过程是互斥的
代码示例
package com.jian8.juc.lock;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
/** * 多个线程同时读一个资源类没有任何问题,因此为了知足并发量,读取共享资源应该能够同时进行。 * 可是 * 若是有一个线程象取写共享资源来,就不该该自由其余线程能够对资源进行读或写 * 总结 * 读读能共存 * 读写不能共存 * 写写不能共存 */
public class ReadWriteLockDemo {
public static void main(String[] args) {
MyCache myCache = new MyCache();
for (int i = 1; i <= 5; i++) {
final int tempInt = i;
new Thread(() -> {
myCache.put(tempInt + "", tempInt + "");
}, "Thread " + i).start();
}
for (int i = 1; i <= 5; i++) {
final int tempInt = i;
new Thread(() -> {
myCache.get(tempInt + "");
}, "Thread " + i).start();
}
}
}
class MyCache {
private volatile Map<String, Object> map = new HashMap<>();
private ReentrantReadWriteLock rwLock = new ReentrantReadWriteLock();
/** * 写操做:原子+独占 * 整个过程必须是一个完整的统一体,中间不准被分割,不准被打断 * * @param key * @param value */
public void put(String key, Object value) {
rwLock.writeLock().lock();
try {
System.out.println(Thread.currentThread().getName() + "\t正在写入:" + key);
TimeUnit.MILLISECONDS.sleep(300);
map.put(key, value);
System.out.println(Thread.currentThread().getName() + "\t写入完成");
} catch (Exception e) {
e.printStackTrace();
} finally {
rwLock.writeLock().unlock();
}
}
public void get(String key) {
rwLock.readLock().lock();
try {
System.out.println(Thread.currentThread().getName() + "\t正在读取:" + key);
TimeUnit.MILLISECONDS.sleep(300);
Object result = map.get(key);
System.out.println(Thread.currentThread().getName() + "\t读取完成: " + result);
} catch (Exception e) {
e.printStackTrace();
} finally {
rwLock.readLock().unlock();
}
}
public void clear() {
map.clear();
}
}
复制代码
spinlock
是指尝试获取锁的线程不会当即阻塞,而是采用循环的方式去尝试获取锁,这样的好处是减小线程上下文切换的消耗,缺点是循环会消耗CPU
public final int getAndAddInt(Object var1, long var2, int var4) {
int var5;
do {
var5 = this.getIntVolatile(var1, var2);
} while(!this.compareAndSwapInt(var1, var2, var5, var5 + var4));
return var5;
}
复制代码
手写自旋锁:
package com.jian8.juc.lock;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
/** * 实现自旋锁 * 自旋锁好处,循环比较获取直到成功为止,没有相似wait的阻塞 * * 经过CAS操做完成自旋锁,A线程先进来调用mylock方法本身持有锁5秒钟,B随后进来发现当前有线程持有锁,不是null,因此只能经过自旋等待,知道A释放锁后B随后抢到 */
public class SpinLockDemo {
public static void main(String[] args) {
SpinLockDemo spinLockDemo = new SpinLockDemo();
new Thread(() -> {
spinLockDemo.mylock();
try {
TimeUnit.SECONDS.sleep(3);
}catch (Exception e){
e.printStackTrace();
}
spinLockDemo.myUnlock();
}, "Thread 1").start();
try {
TimeUnit.SECONDS.sleep(3);
}catch (Exception e){
e.printStackTrace();
}
new Thread(() -> {
spinLockDemo.mylock();
spinLockDemo.myUnlock();
}, "Thread 2").start();
}
//原子引用线程
AtomicReference<Thread> atomicReference = new AtomicReference<>();
public void mylock() {
Thread thread = Thread.currentThread();
System.out.println(Thread.currentThread().getName() + "\t come in");
while (!atomicReference.compareAndSet(null, thread)) {
}
}
public void myUnlock() {
Thread thread = Thread.currentThread();
atomicReference.compareAndSet(thread, null);
System.out.println(Thread.currentThread().getName()+"\t invoked myunlock()");
}
}
复制代码
它容许一个或多个线程一直等待,知道其余线程的操做执行完后再执行。例如,应用程序的主线程但愿在负责启动框架服务的线程已经启动全部的框架服务以后再执行
CountDownLatch主要有两个方法,当一个或多个线程调用await()方法时,调用线程会被阻塞。其余线程调用countDown()方法会将计数器减1,当计数器的值变为0时,因调用await()方法被阻塞的线程才会被唤醒,继续执行
代码示例:
package com.jian8.juc.conditionThread;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
public class CountDownLatchDemo {
public static void main(String[] args) throws InterruptedException {
// general();
countDownLatchTest();
}
public static void general(){
for (int i = 1; i <= 6; i++) {
new Thread(() -> {
System.out.println(Thread.currentThread().getName()+"\t上完自习,离开教室");
}, "Thread-->"+i).start();
}
while (Thread.activeCount()>2){
try { TimeUnit.SECONDS.sleep(2); } catch (InterruptedException e) { e.printStackTrace(); }
}
System.out.println(Thread.currentThread().getName()+"\t=====班长最后关门走人");
}
public static void countDownLatchTest() throws InterruptedException {
CountDownLatch countDownLatch = new CountDownLatch(6);
for (int i = 1; i <= 6; i++) {
new Thread(() -> {
System.out.println(Thread.currentThread().getName()+"\t被灭");
countDownLatch.countDown();
}, CountryEnum.forEach_CountryEnum(i).getRetMessage()).start();
}
countDownLatch.await();
System.out.println(Thread.currentThread().getName()+"\t=====秦统一");
}
}
复制代码
CycliBarrier
可循环(Cyclic)使用的屏障。让一组线程到达一个屏障(也可叫同步点)时被阻塞,知道最后一个线程到达屏障时,屏障才会开门,全部被屏障拦截的线程才会继续干活,线程进入屏障经过CycliBarrier的await()方法
代码示例:
package com.jian8.juc.conditionThread;
import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.CyclicBarrier;
public class CyclicBarrierDemo {
public static void main(String[] args) {
cyclicBarrierTest();
}
public static void cyclicBarrierTest() {
CyclicBarrier cyclicBarrier = new CyclicBarrier(7, () -> {
System.out.println("====召唤神龙=====");
});
for (int i = 1; i <= 7; i++) {
final int tempInt = i;
new Thread(() -> {
System.out.println(Thread.currentThread().getName() + "\t收集到第" + tempInt + "颗龙珠");
try {
cyclicBarrier.await();
} catch (InterruptedException e) {
e.printStackTrace();
} catch (BrokenBarrierException e) {
e.printStackTrace();
}
}, "" + i).start();
}
}
}
复制代码
能够代替Synchronize和Lock
信号量主要用于两个目的,一个是用于多个共享资源的互斥做用,另外一个用于并发线程数的控制
代码示例:
抢车位示例:
package com.jian8.juc.conditionThread;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
public class SemaphoreDemo {
public static void main(String[] args) {
Semaphore semaphore = new Semaphore(3);//模拟三个停车位
for (int i = 1; i <= 6; i++) {//模拟6部汽车
new Thread(() -> {
try {
semaphore.acquire();
System.out.println(Thread.currentThread().getName() + "\t抢到车位");
try {
TimeUnit.SECONDS.sleep(3);//停车3s
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println(Thread.currentThread().getName() + "\t停车3s后离开车位");
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
semaphore.release();
}
}, "Car " + i).start();
}
}
}
复制代码
首先是一个队列,而一个阻塞队列在数据结构中所起的做用大体以下图
graph LR
Thread1-- put -->id1["阻塞队列"]
subgraph BlockingQueue
id1
end
id1-- Take -->Thread2
蛋糕师父--"放(柜满阻塞)"-->id2[蛋糕展现柜]
subgraph 柜
id2
end
id2--"取(柜空阻塞)"-->顾客
复制代码
线程1往阻塞队列中添加元素,而线程2从阻塞队列中移除元素
当阻塞队列是空时,从队列中获取元素的操做会被阻塞
当阻塞队列是满时,从队列中添加元素的操做会被阻塞
试图从空的阻塞队列中获取元素的线程将会被阻塞,直到其余的线程往空的队列插入新的元素。
试图往已满的阻塞队列中添加新元素的线程一样会被阻塞,直到其余的线程从列中移除一个或者多个元素或者彻底清空队列后使队列从新变得空闲起来并后续新增
在多线程领域:所谓阻塞,在某些状况下会挂起线程,一旦知足条件,被挂起的线程又会自动被唤醒
为何须要BlockingQueue
好处时咱们不须要关心何时须要阻塞线程,何时须要唤醒线程,由于这一切BlockingQueue都给你一手包办了
在concurrent包发布之前,在多线程环境下,咱们每一个程序员都必须本身控制这些细节,尤为还要兼顾效率和线程安全,而这回给咱们程序带来不小的复杂度
方法类型 | 抛出异常 | 特殊值 | 阻塞 | 超时 |
---|---|---|---|---|
插入 | add(e) | offer(e) | put(e) | offer(e,time,unit) |
移除 | remove() | poll() | take | poll(time,unit) |
检查 | element() | peek() | 不可用 | 不可用 |
方法类型 | status |
---|---|
抛出异常 | 当阻塞队列满时,再往队列中add会抛IllegalStateException: Queue full 当阻塞队列空时,在网队列里remove会抛 NoSuchElementException |
特殊值 | 插入方法,成功true失败false 移除方法,成功返回出队列的元素,队列里没有就返回null |
一直阻塞 | 当阻塞队列满时,生产者线程继续往队列里put元素,队列会一直阻塞线程知道put数据或响应中断退出 当阻塞队列空时,消费者线程试图从队列take元素,队列会一直阻塞消费者线程知道队列可用。 |
超时退出 | 当阻塞队列满时,队列会阻塞生产者线程必定时间,超过限时后生产者线程会退出 |
种类分析
Integer.MAX_VALUE
)阻塞队列。SychronousQueue
理论:SynchronousQueue没有容量,与其余BlockingQueue不一样,SychronousQueue是一个不存储元素的BlockingQueue,每个put操做必需要等待一个take操做,不然不能继续添加元素,反之亦然。
代码示例
package com.jian8.juc.queue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.TimeUnit;
/** * ArrayBlockingQueue是一个基于数组结构的有界阻塞队列,此队列按FIFO原则对元素进行排序 * LinkedBlockingQueue是一个基于链表结构的阻塞队列,此队列按FIFO排序元素,吞吐量一般要高于ArrayBlockingQueue * SynchronousQueue是一个不存储元素的阻塞队列,灭个插入操做必须等到另外一个线程调用移除操做,不然插入操做一直处于阻塞状态,吞吐量一般要高于 * 1.队列 * 2.阻塞队列 * 2.1 阻塞队列有没有好的一面 * 2.2 不得不阻塞,你如何管理 */
public class SynchronousQueueDemo {
public static void main(String[] args) throws InterruptedException {
BlockingQueue<String> blockingQueue = new SynchronousQueue<>();
new Thread(() -> {
try {
System.out.println(Thread.currentThread().getName() + "\t put 1");
blockingQueue.put("1");
System.out.println(Thread.currentThread().getName() + "\t put 2");
blockingQueue.put("2");
System.out.println(Thread.currentThread().getName() + "\t put 3");
blockingQueue.put("3");
} catch (InterruptedException e) {
e.printStackTrace();
}
}, "AAA").start();
new Thread(() -> {
try {
TimeUnit.SECONDS.sleep(5);
System.out.println(Thread.currentThread().getName() + "\ttake " + blockingQueue.take());
TimeUnit.SECONDS.sleep(5);
System.out.println(Thread.currentThread().getName() + "\ttake " + blockingQueue.take());
TimeUnit.SECONDS.sleep(5);
System.out.println(Thread.currentThread().getName() + "\ttake " + blockingQueue.take());
} catch (InterruptedException e) {
e.printStackTrace();
}
}, "BBB").start();
}
}
复制代码
生产者消费者模式
传统版
package com.jian8.juc.queue;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
/** * 一个初始值为零的变量,两个线程对其交替操做,一个加1一个减1,来5轮 * 1. 线程 操做 资源类 * 2. 判断 干活 通知 * 3. 防止虚假唤起机制 */
public class ProdConsumer_TraditionDemo {
public static void main(String[] args) {
ShareData shareData = new ShareData();
for (int i = 1; i <= 5; i++) {
new Thread(() -> {
try {
shareData.increment();
} catch (Exception e) {
e.printStackTrace();
}
}, "ProductorA " + i).start();
}
for (int i = 1; i <= 5; i++) {
new Thread(() -> {
try {
shareData.decrement();
} catch (Exception e) {
e.printStackTrace();
}
}, "ConsumerA " + i).start();
}
for (int i = 1; i <= 5; i++) {
new Thread(() -> {
try {
shareData.increment();
} catch (Exception e) {
e.printStackTrace();
}
}, "ProductorB " + i).start();
}
for (int i = 1; i <= 5; i++) {
new Thread(() -> {
try {
shareData.decrement();
} catch (Exception e) {
e.printStackTrace();
}
}, "ConsumerB " + i).start();
}
}
}
class ShareData {//资源类
private int number = 0;
private Lock lock = new ReentrantLock();
private Condition condition = lock.newCondition();
public void increment() throws Exception {
lock.lock();
try {
//1.判断
while (number != 0) {
//等待不能生产
condition.await();
}
//2.干活
number++;
System.out.println(Thread.currentThread().getName() + "\t" + number);
//3.通知
condition.signalAll();
} catch (Exception e) {
e.printStackTrace();
} finally {
lock.unlock();
}
}
public void decrement() throws Exception {
lock.lock();
try {
//1.判断
while (number == 0) {
//等待不能消费
condition.await();
}
//2.消费
number--;
System.out.println(Thread.currentThread().getName() + "\t" + number);
//3.通知
condition.signalAll();
} catch (Exception e) {
e.printStackTrace();
} finally {
lock.unlock();
}
}
}
复制代码
阻塞队列版
package com.jian8.juc.queue;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
public class ProdConsumer_BlockQueueDemo {
public static void main(String[] args) {
MyResource myResource = new MyResource(new ArrayBlockingQueue<>(10));
new Thread(() -> {
System.out.println(Thread.currentThread().getName() + "\t生产线程启动");
try {
myResource.myProd();
} catch (Exception e) {
e.printStackTrace();
}
}, "Prod").start();
new Thread(() -> {
System.out.println(Thread.currentThread().getName() + "\t消费线程启动");
try {
myResource.myConsumer();
} catch (Exception e) {
e.printStackTrace();
}
}, "Consumer").start();
try { TimeUnit.SECONDS.sleep(5); } catch (InterruptedException e) { e.printStackTrace(); }
System.out.println("5s后main叫停,线程结束");
try {
myResource.stop();
} catch (Exception e) {
e.printStackTrace();
}
}
}
class MyResource {
private volatile boolean flag = true;//默认开启,进行生产+消费
private AtomicInteger atomicInteger = new AtomicInteger();
BlockingQueue<String> blockingQueue = null;
public MyResource(BlockingQueue<String> blockingQueue) {
this.blockingQueue = blockingQueue;
System.out.println(blockingQueue.getClass().getName());
}
public void myProd() throws Exception {
String data = null;
boolean retValue;
while (flag) {
data = atomicInteger.incrementAndGet() + "";
retValue = blockingQueue.offer(data, 2, TimeUnit.SECONDS);
if (retValue) {
System.out.println(Thread.currentThread().getName() + "\t插入队列" + data + "成功");
} else {
System.out.println(Thread.currentThread().getName() + "\t插入队列" + data + "失败");
}
TimeUnit.SECONDS.sleep(1);
}
System.out.println(Thread.currentThread().getName() + "\t大老板叫停了,flag=false,生产结束");
}
public void myConsumer() throws Exception {
String result = null;
while (flag) {
result = blockingQueue.poll(2, TimeUnit.SECONDS);
if (null == result || result.equalsIgnoreCase("")) {
flag = false;
System.out.println(Thread.currentThread().getName() + "\t超过2s没有取到蛋糕,消费退出");
System.out.println();
return;
}
System.out.println(Thread.currentThread().getName() + "\t消费队列" + result + "成功");
}
}
public void stop() throws Exception {
flag = false;
}
}
复制代码
线程池
消息中间件
区别
原始构成
synchronized时关键字属于jvm
monitorenter,底层是经过monitor对象来完成,其实wait/notify等方法也依赖于monitor对象只有在同步或方法中才能调wait/notify等方法
monitorexit
Lock是具体类,是api层面的锁(java.util.)
使用方法
等待是否可中断
加锁是否公平
锁绑定多个条件Condition
package com.jian8.juc.lock;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
/** * synchronized和lock区别 * <p===lock可绑定多个条件=== * 对线程之间按顺序调用,实现A>B>C三个线程启动,要求以下: * AA打印5次,BB打印10次,CC打印15次 * 紧接着 * AA打印5次,BB打印10次,CC打印15次 * 。。。。 * 来十轮 */
public class SyncAndReentrantLockDemo {
public static void main(String[] args) {
ShareData shareData = new ShareData();
new Thread(() -> {
for (int i = 1; i <= 10; i++) {
shareData.print5();
}
}, "A").start();
new Thread(() -> {
for (int i = 1; i <= 10; i++) {
shareData.print10();
}
}, "B").start();
new Thread(() -> {
for (int i = 1; i <= 10; i++) {
shareData.print15();
}
}, "C").start();
}
}
class ShareData {
private int number = 1;//A:1 B:2 C:3
private Lock lock = new ReentrantLock();
private Condition condition1 = lock.newCondition();
private Condition condition2 = lock.newCondition();
private Condition condition3 = lock.newCondition();
public void print5() {
lock.lock();
try {
//判断
while (number != 1) {
condition1.await();
}
//干活
for (int i = 1; i <= 5; i++) {
System.out.println(Thread.currentThread().getName() + "\t" + i);
}
//通知
number = 2;
condition2.signal();
} catch (Exception e) {
e.printStackTrace();
} finally {
lock.unlock();
}
}
public void print10() {
lock.lock();
try {
//判断
while (number != 2) {
condition2.await();
}
//干活
for (int i = 1; i <= 10; i++) {
System.out.println(Thread.currentThread().getName() + "\t" + i);
}
//通知
number = 3;
condition3.signal();
} catch (Exception e) {
e.printStackTrace();
} finally {
lock.unlock();
}
}
public void print15() {
lock.lock();
try {
//判断
while (number != 3) {
condition3.await();
}
//干活
for (int i = 1; i <= 15; i++) {
System.out.println(Thread.currentThread().getName() + "\t" + i);
}
//通知
number = 1;
condition1.signal();
} catch (Exception e) {
e.printStackTrace();
} finally {
lock.unlock();
}
}
}
复制代码
package com.jian8.juc.thread;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.FutureTask;
import java.util.concurrent.TimeUnit;
/** * 多线程中,第三种得到多线程的方式 */
public class CallableDemo {
public static void main(String[] args) throws ExecutionException, InterruptedException {
//FutureTask(Callable<V> callable)
FutureTask<Integer> futureTask = new FutureTask<Integer>(new MyThread2());
new Thread(futureTask, "AAA").start();
// new Thread(futureTask, "BBB").start();//复用,直接取值,不要重启两个线程
int a = 100;
int b = 0;
//b = futureTask.get();//要求得到Callable线程的计算结果,若是没有计算完成就要去强求,会致使堵塞,直到计算完成
while (!futureTask.isDone()) {//当futureTask完成后取值
b = futureTask.get();
}
System.out.println("*******Result" + (a + b));
}
}
class MyThread implements Runnable {
@Override
public void run() {
}
}
class MyThread2 implements Callable<Integer> {
@Override
public Integer call() throws Exception {
System.out.println("Callable come in");
try {
TimeUnit.SECONDS.sleep(5);
} catch (InterruptedException e) {
e.printStackTrace();
}
return 1024;
}
}
复制代码
线程池作的工做主要是控制运行的线程的数量,处理过程当中将任务放入队列,而后在线程建立后启动给这些任务,若是线程数量超过了最大数量,超出数量的线程排队等候,等其余线程执行完毕,再从队列中取出任务来执行
主要特色
线程复用、控制最大并发数、管理线程
架构说明
Java中的线程池是经过Executor框架实现的,该框架中用到了Executor,Executors,ExecutorService,ThreadPoolExecutor
graph BT
类-Executors
类-ScheduledThreadPoolExecutor-->类-ThreadPoolExecutor
类-ThreadPoolExecutor-->类-AbstractExecutorService
类-AbstractExecutorService-.->接口-ExecutorService
类-ScheduledThreadPoolExecutor-.->接口-ScheduledExecutorService
接口-ScheduledExecutorService-->接口-ExecutorService
接口-ExecutorService-->接口-Executor
复制代码
编码实现
实现有五种,Executors.newScheduledThreadPool()是带时间调度的,java8新推出Executors.newWorkStealingPool(int),使用目前机器上可用的处理器做为他的并行级别
重点有三种
Executors.newFixedThreadPool(int)
执行长期的任务,性能好不少
建立一个定长线程池,可控制线程最大并发数,超出的线程回在队列中等待。
newFixedThreadPool建立的线程池corePoolSize和maximumPoolSize值是想到等的,他使用的是LinkedBlockingQueue
Executors.newSingleThreadExecutor()
一个任务一个任务执行的场景
建立一个单线程话的线程池,他只会用惟一的工做线程来执行任务,保证全部任务按照指定顺序执行
newSingleThreadExecutor将corePoolSize和maximumPoolSize都设置为1,使用LinkedBlockingQueue
Executors.newCachedThreadPool()
执行不少短时间异步的小程序或负载较轻的服务器
建立一个可缓存线程池,若是线程池长度超过处理须要,可灵活回收空闲县城,若无可回收,则新建线程。
newCachedThreadPool将corePoolSize设置为0,将maximumPoolSize设置为Integer.MAX_VALUE,使用的SynchronousQueue,也就是说来了任务就建立线程运行,当县城空闲超过60s,就销毁线程
ThreadPoolExecutor
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
ThreadFactory threadFactory,
RejectedExecutionHandler handler)
复制代码
graph LR
subgraph 使用者
main(提交任务)
end
main-->core{核心线程是否已满}
subgraph 线程池
core--是-->queue{队列是否已满}
queue--是-->pool{线程池是否已满}
pool--是-->reject["按照拒绝策略处理<br>没法执行的任务"]
core--否-->id[建立线程执行任务]
queue--否-->任务入队列等待
pool--否-->建立线程执行任务
end
复制代码
流程
在建立了线程池以后,等待提交过来的任务请求。
当调用execute()方法添加一个请求任务时,线程池会作出以下判断
2.1 若是正在运行的线程数量小于corePoolSize,那么立刻建立线程运行这个任务;
2.2 若是正在运行的线程数量大于或等于corePoolSize,那么将这个任务放入队列;
2.3若是此时队列满了且运行的线程数小于maximumPoolSize,那么仍是要建立非核心线程马上运行此任务
2.4若是队列满了且正在运行的线程数量大于或等于maxmumPoolSize,那么启动饱和拒绝策略来执行
当一个线程完成任务时,他会从队列中却下一个任务来执行
当一个线程无事可作超过必定的时间(keepAliveTime)时,线程池会判断:
若是当前运行的线程数大于corePoolSize,那么这个线程会被停掉;因此线程池的全部任务完成后他最大会收缩到corePoolSize的大小
什么是线程策略
等待队列也已经排满了,再也塞不下新任务了,同时线程池中的max线程也达到了,没法继续为新任务服务。这时咱们就须要拒绝策略机制合理的处理这个问题。
JDK内置的拒绝策略
AbortPolicy(默认)
直接抛出RejectedExecutionException异常阻止系统正常运行
CallerRunsPolicy
”调用者运行“一种调节机制,该策略既不会抛弃任务,也不会抛出异常,而是将某些任务回退到调用者,从而下降新任务的流量
DiscardOldestPolicy
抛弃队列中等待最久的任务,而后把当前任务加入队列中尝试再次提交当前任务
DiscardPolicy
直接丢弃任务,不予任何处理也不抛异常。若是容许任务丢失,这是最好的一种方案
均实现了RejectedExecutionHandler接口
一个都不用,咱们生产上只能使用自定义的!!!!
为何?
线程池不容许使用Executors建立,试试经过ThreadPoolExecutor的方式,指定任务队列的大小,规避资源耗尽风险
FixedThreadPool和SingleThreadPool容许请求队列长度为Integer.MAX_VALUE,可能会堆积大量请求;;CachedThreadPool和ScheduledThreadPool容许的建立线程数量为Integer.MAX_VALUE,可能会建立大量线程,致使OOM
package com.jian8.juc.thread;
import java.util.concurrent.*;
/** * 第四种得到java多线程的方式--线程池 */
public class MyThreadPoolDemo {
public static void main(String[] args) {
ExecutorService threadPool = new ThreadPoolExecutor(3, 5, 1L,
TimeUnit.SECONDS,
new LinkedBlockingDeque<>(3),
Executors.defaultThreadFactory(),
new ThreadPoolExecutor.DiscardPolicy());
//new ThreadPoolExecutor.AbortPolicy();
//new ThreadPoolExecutor.CallerRunsPolicy();
//new ThreadPoolExecutor.DiscardOldestPolicy();
//new ThreadPoolExecutor.DiscardPolicy();
try {
for (int i = 1; i <= 10; i++) {
threadPool.execute(() -> {
System.out.println(Thread.currentThread().getName() + "\t办理业务");
});
}
} catch (Exception e) {
e.printStackTrace();
} finally {
threadPool.shutdown();
}
}
}
复制代码
CPU密集型
CPU密集的意思是该任务须要大量的运算,而没有阻塞,CPU一直全速运行
CPU密集任务只有在真正多核CPU上才可能获得加速(经过多线程)
而在单核CPU上,不管你开几个模拟的多线程该任务都不可能获得加速,由于CPU总的运算能力就那些
CPU密集型任务配置尽量少的线程数量:
通常公式:CPU核数+1个线程的线程池
IO密集型
因为IO密集型任务线程并非一直在执行任务,则应配置经可能多的线程,如CPU核数 * 2
IO密集型,即该任务须要大量的IO,即大量的阻塞。
在单线程上运行IO密集型的任务会致使浪费大量的 CPU运算能力浪费在等待。
因此在IO密集型任务中使用多线程能够大大的加速程序运行,即便在单核CPU上,这种加速主要就是利用了被浪费掉的阻塞时间。
IO密集型时,大部分线程都阻塞,故须要多配置线程数:
参考公式:CPU核数/(1-阻塞系数) 阻塞系数在0.8~0.9之间
八核CPU:8/(1-0,9)=80
死锁是什么
死锁是指两个或两个以上的进程在执行过程当中,因争夺资源而形成的一种互相等待的现象,若无外力干涉那他们都将没法推动下去,若是系统资源充足,进程的资源请求都可以获得知足,死锁出现的可能性就很低,不然就会因争夺有限的资源而陷入死锁。
graph TD
threadA(线程A)
threadB(线程B)
lockA((锁A))
lockB((锁B))
threadA--持有-->lockA
threadB--试图获取-->lockA
threadB--持有-->lockB
threadA--试图获取-->lockB
复制代码
产生死锁的主要缘由
死锁示例
package com.jian8.juc.thread;
import java.util.concurrent.TimeUnit;
/** * 死锁是指两个或两个以上的进程在执行过程当中,因争夺资源而形成的一种互相等待的现象,若无外力干涉那他们都将没法推动下去, */
public class DeadLockDemo {
public static void main(String[] args) {
String lockA = "lockA";
String lockB = "lockB";
new Thread(new HoldThread(lockA,lockB),"Thread-AAA").start();
new Thread(new HoldThread(lockB,lockA),"Thread-BBB").start();
}
}
class HoldThread implements Runnable {
private String lockA;
private String lockB;
public HoldThread(String lockA, String lockB) {
this.lockA = lockA;
this.lockB = lockB;
}
@Override
public void run() {
synchronized (lockA) {
System.out.println(Thread.currentThread().getName() + "\t本身持有:" + lockA + "\t尝试得到:" + lockB);
try {
TimeUnit.SECONDS.sleep(2);
} catch (InterruptedException e) {
e.printStackTrace();
}
synchronized (lockB) {
System.out.println(Thread.currentThread().getName() + "\t本身持有:" + lockB + "\t尝试得到:" + lockA);
}
}
}
}
复制代码
解决
jps -l
定位进程号jstack 进程号
找到死锁查看jconsole
工具