Java并发编程实战(4)- 死锁

在这篇文章中,咱们主要讨论一下死锁及其解决办法。java

概述

在上一篇文章中,咱们讨论了如何使用一个互斥锁去保护多个资源,以银行帐户转帐为例,当时给出的解决方法是基于Class对象建立互斥锁。后端

这样虽然解决了同步的问题,可是能在现实中使用吗?答案是不能够,尤为是在高并发的状况下,缘由是咱们使用的互斥锁的范围太大,以转帐为例,咱们的作法会锁定整个帐户Class对象,这样会致使转帐操做只能串行进行,可是在实际场景中,大量的转帐操做业务中的双方是不相同的,直接在Class对象级别上加锁是不能接受的。并发

那若是在对象实例级别上加锁,使用细粒度锁,会有什么问题?可能会发生死锁。app

咱们接下来看一下形成死锁的缘由和可能的解决方案。高并发

死锁案例

什么是死锁?性能

死锁是指一组互相竞争资源的线程因互相等待,致使“永久”阻塞的现象。测试

通常来讲,当咱们使用细粒度锁时,它在提高性能的同时,也可能会致使死锁。this

咱们仍是以银行转帐为例,来看一下死锁是如何发生的。线程

首先,咱们先定义个BankAccount对象,来存储基本信息,代码以下。code

public class BankAccount {
	private int id;
	private double balance;
	private String password;
	public int getId() {
		return id;
	}
	public void setId(int id) {
		this.id = id;
	}
	public double getBalance() {
		return balance;
	}
	public void setBalance(double balance) {
		this.balance = balance;
	}
}

接下来,咱们使用细粒度锁来尝试完成转帐操做,代码以下。

public class BankTransferDemo {
	
	public void transfer(BankAccount sourceAccount, BankAccount targetAccount, double amount) {
		synchronized(sourceAccount) {
			synchronized(targetAccount) {
				if (sourceAccount.getBalance() > amount) {
					System.out.println("Start transfer.");
					System.out.println(String.format("Before transfer, source balance:%s, target balance:%s", sourceAccount.getBalance(), targetAccount.getBalance()));
					sourceAccount.setBalance(sourceAccount.getBalance() - amount);
					targetAccount.setBalance(targetAccount.getBalance() + amount);
					System.out.println(String.format("After transfer, source balance:%s, target balance:%s", sourceAccount.getBalance(), targetAccount.getBalance()));
				}
			}
		}
	}
}

咱们用下面的代码来作简单测试。

public static void main(String[] args) throws InterruptedException {
		BankAccount sourceAccount = new BankAccount();
		sourceAccount.setId(1);
		sourceAccount.setBalance(50000);
		
		BankAccount targetAccount = new BankAccount();
		targetAccount.setId(2);
		targetAccount.setBalance(20000);
		
		BankTransferDemo obj = new BankTransferDemo();
		
		Thread t1 = new Thread(() ->{
			for (int i = 0; i < 10000; i++) {
				obj.transfer(sourceAccount, targetAccount, 1);
			}
		});
		
		Thread t2 = new Thread(() ->{
			for (int i = 0; i < 10000; i++) {
				obj.transfer(targetAccount, sourceAccount, 1);
			}
		});
		
		t1.start();
		t2.start();
		
		t1.join();
		t2.join();
		
		System.out.println("Finished.");
	}

测试代码中包含了2个线程,其中t1线程循环从sourceAccount向targetAccount转帐,而t2线程会循环从targetAccount向sourceAccount转帐。

从运行结果来看,t1线程中的循环在运行600次左右时,t2线程也建立好,开始循环转帐了,这时就会发生死锁,致使t1线程和t2线程都没法继续执行。

咱们能够用下面的资源分配图来更直观的描述死锁。

死锁的缘由和预防

并发程序一旦死锁,通常没有特别好的办法,不少时候咱们只能重启应用,所以,解决死锁问题的最好办法是规避死锁。

咱们先来看一下死锁发生的条件,一个叫Coffman的牛人,于1971年在ACM Computing Surveys发表了一篇名为System Deadlocks的文章,他总结了只有如下四个条件所有知足的状况下,才会发生死锁:

  • 互斥,共享资源X和Y只能被一个线程占用。
  • 占有且等待,线程t1已经取得共享资源X,在等待共享资源Y的时候,不释放共享资源X。
  • 不可抢占,其余线程不能强行抢占线程t1占有的资源。
  • 循环等待,线程t1等待线程t2占有的资源,线程t2等待线程t1占有的资源,就是循环等待。

经过上述描述,咱们可以推导出,只要破坏上面其中一个条件,就能够避免死锁的发生。

可是第一个条件互斥,是不能够被破坏的,不然咱们就没有用锁的必要了,那么咱们来看如何破坏其余三个条件。

破坏占用且等待条件

若是要破坏占用且等待条件,咱们能够尝试一次性申请所有资源,这样就不须要等待了。

在实现过程当中,咱们须要建立一个新的角色,负责同时申请和同时释放所有资源,咱们能够将其称为Allocator。

咱们来看一下具体的代码实现。

public class Allocator {
	
	private volatile static Allocator instance;
	
	private Allocator() {}
	
	public static Allocator getInstance() {
		if (instance == null) {
			synchronized(Allocator.class) {
				if (instance == null) {
					instance = new Allocator();
				}
			}
		}
		
		return instance;
	}
	
	private Set<Object> lockObjs = new HashSet<Object>();
	
	public synchronized boolean apply(Object... objs) {
		for (Object obj : objs) {
			if (lockObjs.contains(obj)) {
				return false;
			}
		}
		for (Object obj : objs) {
			lockObjs.add(obj);
		}
		
		return true;
	}
	
	public synchronized void free(Object... objs) {
		for (Object obj : objs) {
			if (lockObjs.contains(obj)) {
				lockObjs.remove(obj);
			}
		}
	}
}

Allocator是一个单例模式,它会使用一个Set对象来保存全部须要处理的资源,而后使用apply()和free()来同时锁定或者释放全部资源,它们会接收不固定参数。

咱们来看一下新的transfer()方法应该怎么写。

public void transfer(BankAccount sourceAccount, BankAccount targetAccount, double amount) {
		Allocator allocator = Allocator.getInstance();
		while(!allocator.apply(sourceAccount, targetAccount));
		try {
			synchronized(sourceAccount) {
				synchronized(targetAccount) {
					if (sourceAccount.getBalance() > amount) {
						System.out.println("Start transfer.");
						System.out.println(String.format("Before transfer, source balance:%s, target balance:%s", sourceAccount.getBalance(), targetAccount.getBalance()));
						sourceAccount.setBalance(sourceAccount.getBalance() - amount);
						targetAccount.setBalance(targetAccount.getBalance() + amount);
						System.out.println(String.format("After transfer, source balance:%s, target balance:%s", sourceAccount.getBalance(), targetAccount.getBalance()));
					}
				}
			}
		}
		finally {
			allocator.free(sourceAccount, targetAccount);
		}
	}

咱们能够看到,transfer()方法中,首先获取Allocator实例,而后调用apply(),传入sourceAccount和targetAccount实例,请注意这里使用了while循环,即直到apply()返回true,才会退出循环,此时,Allocator已经锁定了sourceAccount和targetAccount,接下来,咱们使用synchronized关键字来锁定sourceAccount和targetAccount,而后执行转帐的业务逻辑。这里并非必需要用synchronized,可是这样作能够避免其余操做来影响转帐操做,例如若是转帐的过程当中对sourceAccount实例进行取钱操做,若是不用synchronized,就有可能引起并发问题。

下面是测试代码。

public static void main(String[] args) throws InterruptedException {
		BankAccount sourceAccount = new BankAccount();
		sourceAccount.setId(1);
		sourceAccount.setBalance(50000);
		
		BankAccount targetAccount = new BankAccount();
		targetAccount.setId(2);
		targetAccount.setBalance(20000);
		
		BankTransferDemo obj = new BankTransferDemo();
		
		Thread t1 = new Thread(() ->{
			for (int i = 0; i < 10000; i++) {
				obj.transfer(sourceAccount, targetAccount, 1);
			}
		});
		
		Thread t2 = new Thread(() ->{
			for (int i = 0; i < 10000; i++) {
				obj.transfer(targetAccount, sourceAccount, 1);
			}
		});
		
		t1.start();
		t2.start();
		
		t1.join();
		t2.join();
		
		System.out.println("Finished.");
	}

程序是能够正常执行的,结果和咱们预期一致。

在这里,咱们须要保证锁对象的不可变性,对于BankAccount对象来讲,id属性能够看作是其主键,id相同的BankAccount实例,从业务角度来讲,指向的都是同一个帐户,可是对于锁对象来讲,id相同的不一样实例,会产生不一样的锁,从而引起并发问题。

咱们来看下面修改后的测试代码。

public static void main(String[] args) throws InterruptedException {
		
		
		BankTransferDemo obj = new BankTransferDemo();
		
		Thread t1 = new Thread(() ->{
			for (int i = 0; i < 10000; i++) {
				// 这里应该从后端获取帐户实例,此处只作演示。
				BankAccount sourceAccount = new BankAccount();
				sourceAccount.setId(1);
				sourceAccount.setBalance(50000);
				
				BankAccount targetAccount = new BankAccount();
				targetAccount.setId(2);
				targetAccount.setBalance(20000);
				obj.transfer(sourceAccount, targetAccount, 1);
			}
		});
		
		Thread t2 = new Thread(() ->{
			for (int i = 0; i < 10000; i++) {
				// 这里应该从后端获取帐户实例,此处只作演示。
				BankAccount sourceAccount = new BankAccount();
				sourceAccount.setId(1);
				sourceAccount.setBalance(50000);
				
				BankAccount targetAccount = new BankAccount();
				targetAccount.setId(2);
				targetAccount.setBalance(20000);
				obj.transfer(targetAccount, sourceAccount, 1);
			}
		});
		
		t1.start();
		t2.start();
		
		t1.join();
		t2.join();
		
		System.out.println("Finished.");
	}

上述代码中,每次转帐都建立新的BankAccount实例,而后将其传入Allocator,这样作,是不可以正常处理的,由于每次使用的互斥锁都做用在不一样的实例上,这一点,须要特别注意。

破坏不可抢占条件

破坏不可抢占条件很简单,解决的关键在于可以主动释放它占有的资源,可是synchronized是不能作到这一点的。

synchronized申请资源的时候,若是申请失败,线程会直接进入阻塞状态,什么都不能作,已经锁定的资源也没法释放。

咱们可使用java.util.concurrent包中的Lock对象来实现这一点,相关代码以下。

private Lock lock = new ReentrantLock();
	
    public void transfer(BankAccount sourceAccount, BankAccount targetAccount, double amount) {
        try {
            lock.lock();
            if (sourceAccount.getBalance() > amount) {
                System.out.println("Start transfer.");
                System.out.println(String.format("Before transfer, source balance:%s, target balance:%s", sourceAccount.getBalance(), targetAccount.getBalance()));
                sourceAccount.setBalance(sourceAccount.getBalance() - amount);
                targetAccount.setBalance(targetAccount.getBalance() + amount);
                System.out.println(String.format("After transfer, source balance:%s, target balance:%s", sourceAccount.getBalance(), targetAccount.getBalance()));
            }
        }
        finally {
            lock.unlock();
        }
    }

破坏循环条件

破坏循环条件,须要对资源进行排序,而后按序申请资源。

咱们来看下面的代码。

public void transfer(BankAccount sourceAccount, BankAccount targetAccount, double amount) {
		BankAccount left = sourceAccount;
		BankAccount right = targetAccount;
		if (sourceAccount.getId() > targetAccount.getId()) {
			left = targetAccount;
			right = sourceAccount;
		}
		synchronized(left) {
			synchronized(right) {
				if (sourceAccount.getBalance() > amount) {
					System.out.println("Start transfer.");
					System.out.println(String.format("Before transfer, source balance:%s, target balance:%s", sourceAccount.getBalance(), targetAccount.getBalance()));
					sourceAccount.setBalance(sourceAccount.getBalance() - amount);
					targetAccount.setBalance(targetAccount.getBalance() + amount);
					System.out.println(String.format("After transfer, source balance:%s, target balance:%s", sourceAccount.getBalance(), targetAccount.getBalance()));
				}
			}
		}
	}

在这里,咱们假设BankAccount中的id是主键,咱们按照id对sourceAccount和targetAccount进行排序,以后按照id从小到大申请资源,这样就不会有死锁发生了。

咱们在解决并发问题的时候,可能会有多种方式,咱们须要评估一下各个解决方案,从中选择一个成本最低的方案。

对于咱们一直谈论的转帐示例,破坏循环条件多是一个比较好的解决方法。

使用等待-通知机制

咱们上面在破坏占用且等待条件时,使用了以下的死循环:

while(!allocator.apply(sourceAccount, targetAccount));

在并发量不高的状况下,这样写没有问题,可是在高并发的状况下,这样写可能须要循环太屡次才能拿到锁,太消耗CPU了,属于蛮干型。

在这种状况下,一种合理的方案是:若是线程要求的条件不知足,那么线程阻塞本身,进入等待状态,当线程要求的条件知足后,通知等待的线程从新执行,这里线程阻塞就避免了循环消耗CPU的问题。

这就是咱们要讨论的等待-通知机制。

Java中的等待-通知机制

Java中的等待-通知机制流程是怎样的?

线程首先获取互斥锁,当线程要求的条件不知足时,释放互斥锁,进入等待状态;当要求的条件知足时,通知等待的线程,从新获取互斥锁。

Java使用synchronized关键字配合wait()、notify()、notifyAll()三个方法实现等待-通知机制。

在并发程序中,当一个线程进入临界区后,因为某些条件没有知足,须要进入等待状态,Java对象的wait()方法可以实现这一点。当线程要求的条件知足时,Java对象的notify()和notifyAll()方法就能够通知等待的线程,它会告诉线程,你须要的条件曾经知足过,之因此说曾经,是由于notify()只能保证在通知的那一时刻,条件是知足的,而被通知线程的执行时刻和通知时刻通常不会重合,因此在线程开始执行的时候,可能条件又不知足了。

另外须要注意,被通知的线程从新执行时,还须要获取互斥锁,由于以前在调用wait()方法时,互斥锁已经被释放了。

wait()、notify()和notifyAll()三个方法可以被调用的前提是已经获取了响应的互斥锁,因此这三个方法都是在synchronized{}内部被调用的。

下面咱们来看一下修改后的Allocator,其中apply()和free()方法的代码以下。

public synchronized void apply(Object... objs) {
		for (Object obj : objs) {
			while (lockObjs.contains(obj)) {
				try {
					this.wait();
				} catch (InterruptedException e) {
					System.out.println(e.getMessage());
				}
			}
		}
		for (Object obj : objs) {
			lockObjs.add(obj);
		}
	}
	
	public synchronized void free(Object... objs) {
		for (Object obj : objs) {
			if (lockObjs.contains(obj)) {
				lockObjs.remove(obj);
			}
		}
		this.notifyAll();
	}

对应的transfer()方法的代码以下。

public void transfer(BankAccount sourceAccount, BankAccount targetAccount, double amount) {
	Allocator allocator = Allocator.getInstance();
	allocator.apply(sourceAccount, targetAccount);
	try {
		synchronized(sourceAccount) {
			synchronized(targetAccount) {
				if (sourceAccount.getBalance() > amount) {
					System.out.println("Start transfer.");
					System.out.println(String.format("Before transfer, source balance:%s, target balance:%s", sourceAccount.getBalance(), targetAccount.getBalance()));
					sourceAccount.setBalance(sourceAccount.getBalance() - amount);
					targetAccount.setBalance(targetAccount.getBalance() + amount);
					System.out.println(String.format("After transfer, source balance:%s, target balance:%s", sourceAccount.getBalance(), targetAccount.getBalance()));
				}
			}
		}
	}
	finally {
		allocator.free(sourceAccount, targetAccount);
	}
}

运行结果和咱们指望是一致的。

条件曾经知足

在上述代码中,咱们能够发现,apply()方法中的判断条件以前是if,如今改为了while, while (lockObjs.contains(obj)),这样作能够解决条件曾经知足的问题。

由于当wait()返回时,有可能条件已经发生了变化,曾经条件知足,可是如今已经不知足了,因此要从新检验条件是否知足。

这是一种范式,是一种经典的作法。

notify() vs notifyAll()

notify()和notifyAll()有什么区别?

notify()会随机的通知等待队列中的一个线程, 而notifyAll()会通知等待队列中的全部线程。

咱们尽可能使用notifyAll()方法,由于notify()可能会致使某些线程永远不会被通知到。

假设咱们有一个实例,它有资源 A、B、C、D,咱们使用实例对象来建立互斥锁。

  • 线程t1申请到了A、B
  • 线程t2申请到了C、D
  • 线程t3试图申请A、B,失败,进入等待队列
  • 线程t4试图申请C、D,失败,进入等待队列
  • 此时,线程t1执行结束,释放锁
  • 线程t1调用实例的notify()来通知等待队列中的线程,有可能被通知的是线程t4,但线程t4申请的是C、D还被线程t2占用,因此线程t4只能继续等待
  • 此时,线程t2执行结束,释放锁
  • 线程t2调用实例的notify()来通知等待队列中的线程,t3或者t4只能有1个被唤醒并正常执行,另外1个则再也没有机会被唤醒

wait()和sleep()的区别

wait()方法与sleep()方法的不一样之处在于,wait()方法会释放对象的“锁标志”。当调用某一对象的wait()方法后,会使当前线程暂停执行,并将当前线程放入对象等待池中,直到调用了notify()方法后,将从对象等待池中移出任意一个线程并放入锁标志等待池中,只有锁标志等待池中的线程能够获取锁标志,它们随时准备争夺锁的拥有权。当调用了某个对象的notifyAll()方法,会将对象等待池中的全部线程都移动到该对象的锁标志等待池。

sleep()方法须要指定等待的时间,它可让当前正在执行的线程在指定的时间内暂停执行,进入阻塞状态,该方法既可让其余同优先级或者高优先级的线程获得执行的机会,也可让低优先级的线程获得执行机会。可是sleep()方法不会释放“锁标志”,也就是说若是有synchronized同步块,其余线程仍然不能访问共享数据。

总结一下,wait()和sleep()区别以下。

  • wait()释放资源,sleep()不释放资源
  • wait()须要被唤醒,sleep()不须要
  • wait()是object顶级父类的方法,sleep()则是Thread的方法

wait()和sleep()都会让渡CPU执行时间,等待再次调度!

相关文章
相关标签/搜索