在这篇文章中,咱们主要讨论一下死锁及其解决办法。java
在上一篇文章中,咱们讨论了如何使用一个互斥锁去保护多个资源,以银行帐户转帐为例,当时给出的解决方法是基于Class对象建立互斥锁。后端
这样虽然解决了同步的问题,可是能在现实中使用吗?答案是不能够,尤为是在高并发的状况下,缘由是咱们使用的互斥锁的范围太大,以转帐为例,咱们的作法会锁定整个帐户Class对象,这样会致使转帐操做只能串行进行,可是在实际场景中,大量的转帐操做业务中的双方是不相同的,直接在Class对象级别上加锁是不能接受的。并发
那若是在对象实例级别上加锁,使用细粒度锁,会有什么问题?可能会发生死锁。app
咱们接下来看一下形成死锁的缘由和可能的解决方案。高并发
什么是死锁?性能
死锁是指一组互相竞争资源的线程因互相等待,致使“永久”阻塞的现象。测试
通常来讲,当咱们使用细粒度锁时,它在提高性能的同时,也可能会致使死锁。this
咱们仍是以银行转帐为例,来看一下死锁是如何发生的。线程
首先,咱们先定义个BankAccount对象,来存储基本信息,代码以下。code
public class BankAccount { private int id; private double balance; private String password; public int getId() { return id; } public void setId(int id) { this.id = id; } public double getBalance() { return balance; } public void setBalance(double balance) { this.balance = balance; } }
接下来,咱们使用细粒度锁来尝试完成转帐操做,代码以下。
public class BankTransferDemo { public void transfer(BankAccount sourceAccount, BankAccount targetAccount, double amount) { synchronized(sourceAccount) { synchronized(targetAccount) { if (sourceAccount.getBalance() > amount) { System.out.println("Start transfer."); System.out.println(String.format("Before transfer, source balance:%s, target balance:%s", sourceAccount.getBalance(), targetAccount.getBalance())); sourceAccount.setBalance(sourceAccount.getBalance() - amount); targetAccount.setBalance(targetAccount.getBalance() + amount); System.out.println(String.format("After transfer, source balance:%s, target balance:%s", sourceAccount.getBalance(), targetAccount.getBalance())); } } } } }
咱们用下面的代码来作简单测试。
public static void main(String[] args) throws InterruptedException { BankAccount sourceAccount = new BankAccount(); sourceAccount.setId(1); sourceAccount.setBalance(50000); BankAccount targetAccount = new BankAccount(); targetAccount.setId(2); targetAccount.setBalance(20000); BankTransferDemo obj = new BankTransferDemo(); Thread t1 = new Thread(() ->{ for (int i = 0; i < 10000; i++) { obj.transfer(sourceAccount, targetAccount, 1); } }); Thread t2 = new Thread(() ->{ for (int i = 0; i < 10000; i++) { obj.transfer(targetAccount, sourceAccount, 1); } }); t1.start(); t2.start(); t1.join(); t2.join(); System.out.println("Finished."); }
测试代码中包含了2个线程,其中t1线程循环从sourceAccount向targetAccount转帐,而t2线程会循环从targetAccount向sourceAccount转帐。
从运行结果来看,t1线程中的循环在运行600次左右时,t2线程也建立好,开始循环转帐了,这时就会发生死锁,致使t1线程和t2线程都没法继续执行。
咱们能够用下面的资源分配图来更直观的描述死锁。
并发程序一旦死锁,通常没有特别好的办法,不少时候咱们只能重启应用,所以,解决死锁问题的最好办法是规避死锁。
咱们先来看一下死锁发生的条件,一个叫Coffman的牛人,于1971年在ACM Computing Surveys发表了一篇名为System Deadlocks的文章,他总结了只有如下四个条件所有知足的状况下,才会发生死锁:
经过上述描述,咱们可以推导出,只要破坏上面其中一个条件,就能够避免死锁的发生。
可是第一个条件互斥,是不能够被破坏的,不然咱们就没有用锁的必要了,那么咱们来看如何破坏其余三个条件。
若是要破坏占用且等待条件,咱们能够尝试一次性申请所有资源,这样就不须要等待了。
在实现过程当中,咱们须要建立一个新的角色,负责同时申请和同时释放所有资源,咱们能够将其称为Allocator。
咱们来看一下具体的代码实现。
public class Allocator { private volatile static Allocator instance; private Allocator() {} public static Allocator getInstance() { if (instance == null) { synchronized(Allocator.class) { if (instance == null) { instance = new Allocator(); } } } return instance; } private Set<Object> lockObjs = new HashSet<Object>(); public synchronized boolean apply(Object... objs) { for (Object obj : objs) { if (lockObjs.contains(obj)) { return false; } } for (Object obj : objs) { lockObjs.add(obj); } return true; } public synchronized void free(Object... objs) { for (Object obj : objs) { if (lockObjs.contains(obj)) { lockObjs.remove(obj); } } } }
Allocator是一个单例模式,它会使用一个Set对象来保存全部须要处理的资源,而后使用apply()和free()来同时锁定或者释放全部资源,它们会接收不固定参数。
咱们来看一下新的transfer()方法应该怎么写。
public void transfer(BankAccount sourceAccount, BankAccount targetAccount, double amount) { Allocator allocator = Allocator.getInstance(); while(!allocator.apply(sourceAccount, targetAccount)); try { synchronized(sourceAccount) { synchronized(targetAccount) { if (sourceAccount.getBalance() > amount) { System.out.println("Start transfer."); System.out.println(String.format("Before transfer, source balance:%s, target balance:%s", sourceAccount.getBalance(), targetAccount.getBalance())); sourceAccount.setBalance(sourceAccount.getBalance() - amount); targetAccount.setBalance(targetAccount.getBalance() + amount); System.out.println(String.format("After transfer, source balance:%s, target balance:%s", sourceAccount.getBalance(), targetAccount.getBalance())); } } } } finally { allocator.free(sourceAccount, targetAccount); } }
咱们能够看到,transfer()方法中,首先获取Allocator实例,而后调用apply(),传入sourceAccount和targetAccount实例,请注意这里使用了while循环,即直到apply()返回true,才会退出循环,此时,Allocator已经锁定了sourceAccount和targetAccount,接下来,咱们使用synchronized关键字来锁定sourceAccount和targetAccount,而后执行转帐的业务逻辑。这里并非必需要用synchronized,可是这样作能够避免其余操做来影响转帐操做,例如若是转帐的过程当中对sourceAccount实例进行取钱操做,若是不用synchronized,就有可能引起并发问题。
下面是测试代码。
public static void main(String[] args) throws InterruptedException { BankAccount sourceAccount = new BankAccount(); sourceAccount.setId(1); sourceAccount.setBalance(50000); BankAccount targetAccount = new BankAccount(); targetAccount.setId(2); targetAccount.setBalance(20000); BankTransferDemo obj = new BankTransferDemo(); Thread t1 = new Thread(() ->{ for (int i = 0; i < 10000; i++) { obj.transfer(sourceAccount, targetAccount, 1); } }); Thread t2 = new Thread(() ->{ for (int i = 0; i < 10000; i++) { obj.transfer(targetAccount, sourceAccount, 1); } }); t1.start(); t2.start(); t1.join(); t2.join(); System.out.println("Finished."); }
程序是能够正常执行的,结果和咱们预期一致。
在这里,咱们须要保证锁对象的不可变性,对于BankAccount对象来讲,id属性能够看作是其主键,id相同的BankAccount实例,从业务角度来讲,指向的都是同一个帐户,可是对于锁对象来讲,id相同的不一样实例,会产生不一样的锁,从而引起并发问题。
咱们来看下面修改后的测试代码。
public static void main(String[] args) throws InterruptedException { BankTransferDemo obj = new BankTransferDemo(); Thread t1 = new Thread(() ->{ for (int i = 0; i < 10000; i++) { // 这里应该从后端获取帐户实例,此处只作演示。 BankAccount sourceAccount = new BankAccount(); sourceAccount.setId(1); sourceAccount.setBalance(50000); BankAccount targetAccount = new BankAccount(); targetAccount.setId(2); targetAccount.setBalance(20000); obj.transfer(sourceAccount, targetAccount, 1); } }); Thread t2 = new Thread(() ->{ for (int i = 0; i < 10000; i++) { // 这里应该从后端获取帐户实例,此处只作演示。 BankAccount sourceAccount = new BankAccount(); sourceAccount.setId(1); sourceAccount.setBalance(50000); BankAccount targetAccount = new BankAccount(); targetAccount.setId(2); targetAccount.setBalance(20000); obj.transfer(targetAccount, sourceAccount, 1); } }); t1.start(); t2.start(); t1.join(); t2.join(); System.out.println("Finished."); }
上述代码中,每次转帐都建立新的BankAccount实例,而后将其传入Allocator,这样作,是不可以正常处理的,由于每次使用的互斥锁都做用在不一样的实例上,这一点,须要特别注意。
破坏不可抢占条件很简单,解决的关键在于可以主动释放它占有的资源,可是synchronized是不能作到这一点的。
synchronized申请资源的时候,若是申请失败,线程会直接进入阻塞状态,什么都不能作,已经锁定的资源也没法释放。
咱们可使用java.util.concurrent包中的Lock对象来实现这一点,相关代码以下。
private Lock lock = new ReentrantLock(); public void transfer(BankAccount sourceAccount, BankAccount targetAccount, double amount) { try { lock.lock(); if (sourceAccount.getBalance() > amount) { System.out.println("Start transfer."); System.out.println(String.format("Before transfer, source balance:%s, target balance:%s", sourceAccount.getBalance(), targetAccount.getBalance())); sourceAccount.setBalance(sourceAccount.getBalance() - amount); targetAccount.setBalance(targetAccount.getBalance() + amount); System.out.println(String.format("After transfer, source balance:%s, target balance:%s", sourceAccount.getBalance(), targetAccount.getBalance())); } } finally { lock.unlock(); } }
破坏循环条件,须要对资源进行排序,而后按序申请资源。
咱们来看下面的代码。
public void transfer(BankAccount sourceAccount, BankAccount targetAccount, double amount) { BankAccount left = sourceAccount; BankAccount right = targetAccount; if (sourceAccount.getId() > targetAccount.getId()) { left = targetAccount; right = sourceAccount; } synchronized(left) { synchronized(right) { if (sourceAccount.getBalance() > amount) { System.out.println("Start transfer."); System.out.println(String.format("Before transfer, source balance:%s, target balance:%s", sourceAccount.getBalance(), targetAccount.getBalance())); sourceAccount.setBalance(sourceAccount.getBalance() - amount); targetAccount.setBalance(targetAccount.getBalance() + amount); System.out.println(String.format("After transfer, source balance:%s, target balance:%s", sourceAccount.getBalance(), targetAccount.getBalance())); } } } }
在这里,咱们假设BankAccount中的id
是主键,咱们按照id
对sourceAccount和targetAccount进行排序,以后按照id
从小到大申请资源,这样就不会有死锁发生了。
咱们在解决并发问题的时候,可能会有多种方式,咱们须要评估一下各个解决方案,从中选择一个成本最低的方案。
对于咱们一直谈论的转帐示例,破坏循环条件多是一个比较好的解决方法。
咱们上面在破坏占用且等待条件时,使用了以下的死循环:
while(!allocator.apply(sourceAccount, targetAccount));
在并发量不高的状况下,这样写没有问题,可是在高并发的状况下,这样写可能须要循环太屡次才能拿到锁,太消耗CPU了,属于蛮干型。
在这种状况下,一种合理的方案是:若是线程要求的条件不知足,那么线程阻塞本身,进入等待状态,当线程要求的条件知足后,通知等待的线程从新执行,这里线程阻塞就避免了循环消耗CPU的问题。
这就是咱们要讨论的等待-通知机制。
Java中的等待-通知机制流程是怎样的?
线程首先获取互斥锁,当线程要求的条件不知足时,释放互斥锁,进入等待状态;当要求的条件知足时,通知等待的线程,从新获取互斥锁。
Java使用synchronized关键字配合wait()、notify()、notifyAll()三个方法实现等待-通知机制。
在并发程序中,当一个线程进入临界区后,因为某些条件没有知足,须要进入等待状态,Java对象的wait()方法可以实现这一点。当线程要求的条件知足时,Java对象的notify()和notifyAll()方法就能够通知等待的线程,它会告诉线程,你须要的条件曾经知足过,之因此说曾经,是由于notify()只能保证在通知的那一时刻,条件是知足的,而被通知线程的执行时刻和通知时刻通常不会重合,因此在线程开始执行的时候,可能条件又不知足了。
另外须要注意,被通知的线程从新执行时,还须要获取互斥锁,由于以前在调用wait()方法时,互斥锁已经被释放了。
wait()、notify()和notifyAll()三个方法可以被调用的前提是已经获取了响应的互斥锁,因此这三个方法都是在synchronized{}内部被调用的。
下面咱们来看一下修改后的Allocator,其中apply()和free()方法的代码以下。
public synchronized void apply(Object... objs) { for (Object obj : objs) { while (lockObjs.contains(obj)) { try { this.wait(); } catch (InterruptedException e) { System.out.println(e.getMessage()); } } } for (Object obj : objs) { lockObjs.add(obj); } } public synchronized void free(Object... objs) { for (Object obj : objs) { if (lockObjs.contains(obj)) { lockObjs.remove(obj); } } this.notifyAll(); }
对应的transfer()方法的代码以下。
public void transfer(BankAccount sourceAccount, BankAccount targetAccount, double amount) { Allocator allocator = Allocator.getInstance(); allocator.apply(sourceAccount, targetAccount); try { synchronized(sourceAccount) { synchronized(targetAccount) { if (sourceAccount.getBalance() > amount) { System.out.println("Start transfer."); System.out.println(String.format("Before transfer, source balance:%s, target balance:%s", sourceAccount.getBalance(), targetAccount.getBalance())); sourceAccount.setBalance(sourceAccount.getBalance() - amount); targetAccount.setBalance(targetAccount.getBalance() + amount); System.out.println(String.format("After transfer, source balance:%s, target balance:%s", sourceAccount.getBalance(), targetAccount.getBalance())); } } } } finally { allocator.free(sourceAccount, targetAccount); } }
运行结果和咱们指望是一致的。
在上述代码中,咱们能够发现,apply()方法中的判断条件以前是if,如今改为了while, while (lockObjs.contains(obj))
,这样作能够解决条件曾经知足的问题。
由于当wait()返回时,有可能条件已经发生了变化,曾经条件知足,可是如今已经不知足了,因此要从新检验条件是否知足。
这是一种范式,是一种经典的作法。
notify()和notifyAll()有什么区别?
notify()会随机的通知等待队列中的一个线程, 而notifyAll()会通知等待队列中的全部线程。
咱们尽可能使用notifyAll()方法,由于notify()可能会致使某些线程永远不会被通知到。
假设咱们有一个实例,它有资源 A、B、C、D,咱们使用实例对象来建立互斥锁。
wait()方法与sleep()方法的不一样之处在于,wait()方法会释放对象的“锁标志”。当调用某一对象的wait()方法后,会使当前线程暂停执行,并将当前线程放入对象等待池中,直到调用了notify()方法后,将从对象等待池中移出任意一个线程并放入锁标志等待池中,只有锁标志等待池中的线程能够获取锁标志,它们随时准备争夺锁的拥有权。当调用了某个对象的notifyAll()方法,会将对象等待池中的全部线程都移动到该对象的锁标志等待池。
sleep()方法须要指定等待的时间,它可让当前正在执行的线程在指定的时间内暂停执行,进入阻塞状态,该方法既可让其余同优先级或者高优先级的线程获得执行的机会,也可让低优先级的线程获得执行机会。可是sleep()方法不会释放“锁标志”,也就是说若是有synchronized同步块,其余线程仍然不能访问共享数据。
总结一下,wait()和sleep()区别以下。
wait()和sleep()都会让渡CPU执行时间,等待再次调度!