📦 本文以及示例源码已归档在 javacorehtml
确保线程安全最多见的作法是利用锁机制(Lock
、sychronized
)来对共享数据作互斥同步,这样在同一个时刻,只有一个线程能够执行某个方法或者某个代码块,那么操做必然是原子性的,线程安全的。java
在工做、面试中,常常会听到各类五花八门的锁,听的人云里雾里。锁的概念术语不少,它们是针对不一样的问题所提出的,经过简单的梳理,也不难理解。git
可重入锁又名递归锁,是指 同一个线程在外层方法获取了锁,在进入内层方法会自动获取锁。github
可重入锁能够在必定程度上避免死锁。面试
ReentrantLock
、ReentrantReadWriteLock
是可重入锁。这点,从其命名也不难看出。 synchronized
也是一个可重入锁。 synchronized void setA() throws Exception{
Thread.sleep(1000);
setB();
}
synchronized void setB() throws Exception{
Thread.sleep(1000);
}复制代码
上面的代码就是一个典型场景:若是使用的锁不是可重入锁的话,setB
可能不会被当前线程执行,从而形成死锁。编程
公平锁为了保证线程申请顺序,势必要付出必定的性能代价,所以其吞吐量通常低于非公平锁。数组
公平锁与非公平锁 在 Java 中的典型实现:缓存
synchronized
只支持非公平锁。 ReentrantLock
、ReentrantReadWriteLock
,默认是非公平锁,但支持公平锁。 独享锁与共享锁是一种广义上的说法,从实际用途上来看,也常被称为互斥锁与读写锁。安全
独享锁与共享锁在 Java 中的典型实现:markdown
synchronized
、ReentrantLock
只支持独享锁。 ReentrantReadWriteLock
其写锁是独享锁,其读锁是共享锁。读锁是共享锁使得并发读是很是高效的,读写,写读 ,写写的过程是互斥的。 乐观锁与悲观锁不是指具体的什么类型的锁,而是处理并发同步的策略。
悲观锁与乐观锁在 Java 中的典型实现:
synchronized
和 Lock
显示加锁来进行互斥同步,这是一种阻塞同步。 Unsafe
类提供,但这个类不直接暴露为 API,因此都是间接使用,如各类原子类)。 所谓轻量级锁与重量级锁,指的是锁控制粒度的粗细。显然,控制粒度越细,阻塞开销越小,并发性也就越高。
Java 1.6 之前,重量级锁通常指的是 synchronized
,而轻量级锁指的是 volatile
。
Java 1.6 之后,针对 synchronized
作了大量优化,引入 4 种锁状态: 无锁状态、偏向锁、轻量级锁和重量级锁。锁能够单向的从偏向锁升级到轻量级锁,再从轻量级锁升级到重量级锁 。
分段锁实际上是一种锁的设计,并非具体的一种锁。所谓分段锁,就是把锁的对象分红多段,每段独立控制,使得锁粒度更细,减小阻塞开销,从而提升并发性。这其实很好理解,就像高速公路上的收费站,若是只有一个收费口,那全部的车只能排成一条队缴费;若是有多个收费口,就能够分流了。
Hashtable
使用 synchronized
修饰方法来保证线程安全性,那么面对线程的访问,Hashtable 就会锁住整个对象,全部的其它线程只能等待,这种阻塞方式的吞吐量显然很低。
Java 1.7 之前的 ConcurrentHashMap
就是分段锁的典型案例。ConcurrentHashMap
维护了一个 Segment
数组,通常称为分段桶。
final Segment<K,V>[] segments;复制代码
当有线程访问 ConcurrentHashMap
的数据时,ConcurrentHashMap
会先根据 hashCode 计算出数据在哪一个桶(即哪一个 Segment),而后锁住这个 Segment
。
Java 1.5 以前,协调对共享对象的访问时可使用的机制只有 synchronized
和 volatile
。这两个都属于内置锁,即锁的申请和释放都是由 JVM 所控制。
Java 1.5 以后,增长了新的机制:ReentrantLock
、ReentrantReadWriteLock
,这类锁的申请和释放均可以由程序所控制,因此常被称为显示锁。
💡
synchronized
的用法和原理能够参考:Java 并发基础机制 - synchronized 。:bell: 注意:若是不须要
ReentrantLock
、ReentrantReadWriteLock
所提供的高级同步特性,应该优先考虑使用synchronized
。理由以下:- Java 1.6 之后,
synchronized
作了大量的优化,其性能已经与ReentrantLock
、ReentrantReadWriteLock
基本上持平。- 从趋势来看,Java 将来更可能会优化
synchronized
,而不是ReentrantLock
、ReentrantReadWriteLock
,由于synchronized
是 JVM 内置属性,它能执行一些优化。-
ReentrantLock
、ReentrantReadWriteLock
申请和释放锁都是由程序控制,若是使用不当,可能形成死锁,这是很危险的。
如下对比一下显示锁和内置锁的差别:
synchronized
不能主动获取锁和释放锁。获取锁和释放锁都是 JVM 控制的。 ReentrantLock
能够主动获取锁和释放锁。(若是忘记释放锁,就可能产生死锁)。 synchronized
不能响应中断。 ReentrantLock
能够响应中断。 synchronized
没有超时机制。 ReentrantLock
有超时机制。ReentrantLock
能够设置超时时间,超时后自动释放锁,避免一直等待。 synchronized
只支持非公平锁。 ReentrantLock
支持非公平锁和公平锁。 synchronized
修饰的方法或代码块,只能被一个线程访问(独享)。若是这个线程被阻塞,其余线程也只能等待 ReentrantLock
能够基于 Condition
灵活的控制同步条件。 synchronized
不支持读写锁分离; ReentrantReadWriteLock
支持读写锁,从而使阻塞读写的操做分开,有效提升并发性。
AbstractQueuedSynchronizer
(简称 AQS)是队列同步器,顾名思义,其主要做用是处理同步。它是并发锁和不少同步工具类的实现基石(如ReentrantLock
、ReentrantReadWriteLock
、Semaphore
等)。所以,要想深刻理解
ReentrantLock
、ReentrantReadWriteLock
等并发锁和同步工具,必须先理解 AQS 的要点和原理。
在 java.util.concurrent.locks
包中的相关锁(经常使用的有 ReentrantLock
、 ReadWriteLock
)都是基于 AQS 来实现。这些锁都没有直接继承 AQS,而是定义了一个 Sync
类去继承 AQS。为何要这样呢?由于锁面向的是使用用户,而同步器面向的则是线程控制,那么在锁的实现中聚合同步器而不是直接继承 AQS 就能够很好的隔离两者所关注的事情。
AQS 提供了对独享锁与共享锁的支持。
获取、释放独享锁的主要 API 以下:
public final void acquire(int arg)
public final void acquireInterruptibly(int arg)
public final boolean tryAcquireNanos(int arg, long nanosTimeout)
public final boolean release(int arg)复制代码
acquire
- 获取独占锁。 acquireInterruptibly
- 获取可中断的独占锁。 tryAcquireNanos
- 尝试在指定时间内获取可中断的独占锁。在如下三种状况下回返回:
release
- 释放独占锁。 获取、释放共享锁的主要 API 以下:
public final void acquireShared(int arg)
public final void acquireSharedInterruptibly(int arg)
public final boolean tryAcquireSharedNanos(int arg, long nanosTimeout)
public final boolean releaseShared(int arg)复制代码
acquireShared
- 获取共享锁。 acquireSharedInterruptibly
- 获取可中断的共享锁。 tryAcquireSharedNanos
- 尝试在指定时间内获取可中断的共享锁。 release
- 释放共享锁。 阅读 AQS 的源码,能够发现:AQS 继承自 AbstractOwnableSynchronize
。
public abstract class AbstractQueuedSynchronizer
extends AbstractOwnableSynchronizer
implements java.io.Serializable {
/** 等待队列的队头,懒加载。只能经过 setHead 方法修改。 */
private transient volatile Node head;
/** 等待队列的队尾,懒加载。只能经过 enq 方法添加新的等待节点。*/
private transient volatile Node tail;
/** 同步状态 */
private volatile int state;
}复制代码
state
- AQS 使用一个整型的 volatile
变量来 维护同步状态。
ReentrantLock
中该状态值表示全部者线程已经重复获取该锁的次数,Semaphore
中该状态值表示剩余的许可数量。 head
和 tail
- AQS 维护了一个 Node
类型(AQS 的内部类)的双链表来完成同步状态的管理。这个双链表是一个双向的 FIFO 队列,经过 head
和 tail
指针进行访问。当 有线程获取锁失败后,就被添加到队列末尾。 再来看一下 Node
的源码
static final class Node {
/** 该等待同步的节点处于共享模式 */
static final Node SHARED = new Node();
/** 该等待同步的节点处于独占模式 */
static final Node EXCLUSIVE = null;
/** 线程等待状态,状态值有: 0、一、-一、-二、-3 */
volatile int waitStatus;
static final int CANCELLED = 1;
static final int SIGNAL = -1;
static final int CONDITION = -2;
static final int PROPAGATE = -3;
/** 前驱节点 */
volatile Node prev;
/** 后继节点 */
volatile Node next;
/** 等待锁的线程 */
volatile Thread thread;
/** 和节点是否共享有关 */
Node nextWaiter;
}复制代码
很显然,Node 是一个双链表结构。
waitStatus
- Node
使用一个整型的 volatile
变量来 维护 AQS 同步队列中线程节点的状态。waitStatus
有五个状态值:
CANCELLED(1)
- 此状态表示:该节点的线程可能因为超时或被中断而 处于被取消(做废)状态,一旦处于这个状态,表示这个节点应该从等待队列中移除。 SIGNAL(-1)
- 此状态表示:后继节点会被挂起,所以在当前节点释放锁或被取消以后,必须唤醒(unparking
)其后继结点。 CONDITION(-2)
- 此状态表示:该节点的线程 处于等待条件状态,不会被看成是同步队列上的节点,直到被唤醒(signal
),设置其值为 0,再从新进入阻塞状态。 PROPAGATE(-3)
- 此状态表示:下一个 acquireShared
应无条件传播。 AQS 中使用 acquire(int arg)
方法获取独占锁,其大体流程以下:
详细流程能够用下图来表示,请结合源码来理解(一图胜千言):
AQS 中使用 release(int arg)
方法释放独占锁,其大体流程以下:
AQS 中使用 acquireInterruptibly(int arg)
方法获取可中断的独占锁。
acquireInterruptibly(int arg)
实现方式相较于获取独占锁方法( acquire
)很是类似,区别仅在于它会经过 Thread.interrupted
检测当前线程是否被中断,若是是,则当即抛出中断异常(InterruptedException
)。
AQS 中使用 tryAcquireNanos(int arg)
方法获取超时等待的独占锁。
doAcquireNanos 的实现方式 相较于获取独占锁方法( acquire
)很是类似,区别在于它会根据超时时间和当前时间计算出截止时间。在获取锁的流程中,会不断判断是否超时,若是超时,直接返回 false;若是没超时,则用 LockSupport.parkNanos
来阻塞当前线程。
AQS 中使用 acquireShared(int arg)
方法获取共享锁。
acquireShared
方法和 acquire
方法的逻辑很类似,区别仅在于自旋的条件以及节点出队的操做有所不一样。
成功得到共享锁的条件以下:
tryAcquireShared(arg)
返回值大于等于 0 (这意味着共享锁的 permit 尚未用完)。 AQS 中使用 releaseShared(int arg)
方法释放共享锁。
releaseShared
首先会尝试释放同步状态,若是成功,则解锁一个或多个后继线程节点。释放共享锁和释放独享锁流程大致类似,区别在于:
对于独享模式,若是须要 SIGNAL,释放仅至关于调用头节点的 unparkSuccessor
。
AQS 中使用 acquireSharedInterruptibly(int arg)
方法获取可中断的共享锁。
acquireSharedInterruptibly
方法与 acquireInterruptibly
几乎一致,再也不赘述。
AQS 中使用 tryAcquireSharedNanos(int arg)
方法获取超时等待式的共享锁。
tryAcquireSharedNanos
方法与 tryAcquireNanos
几乎一致,再也不赘述。
ReentrantLock
类是Lock
接口的具体实现,它是一个可重入锁。与内置锁synchronized
不一样,ReentrantLock
提供了一组无条件的、可轮询的、定时的以及可中断的锁操做,全部获取锁、释放锁的操做都是显式的操做。
ReentrantLock
的特性以下:
ReentrantLock
提供了与 synchronized
相同的互斥性、内存可见性和可重入性。 ReentrantLock
支持公平锁和非公平锁(默认)两种模式。 ReentrantLock
实现了 Lock
接口,支持了 synchronized
所不具有的灵活性。
synchronized
没法中断一个正在等待获取锁的线程 synchronized
没法在请求获取一个锁时无休止地等待 Lock
的接口定义以下:
public interface Lock {
void lock();
void lockInterruptibly() throws InterruptedException;
boolean tryLock();
boolean tryLock(long time, TimeUnit unit) throws InterruptedException;
void unlock();
Condition newCondition();
}复制代码
lock()
- 获取锁。 unlock()
- 释放锁。 tryLock()
- 尝试获取锁,仅在调用时锁未被另外一个线程持有的状况下,才获取该锁。 tryLock(long time, TimeUnit unit)
- 和 tryLock()
相似,区别仅在于限定时间,若是限定时间内未获取到锁,视为失败。 lockInterruptibly()
- 锁未被另外一个线程持有,且线程没有被中断的状况下,才能获取锁。 newCondition()
- 返回一个绑定到 Lock
对象上的 Condition
实例。 前文了解了 ReentrantLock
的特性,接下来,咱们要讲述其具体用法。
ReentrantLock
有两个构造方法:
public ReentrantLock() {}
public ReentrantLock(boolean fair) {}复制代码
ReentrantLock()
- 默认构造方法会初始化一个非公平锁(NonfairSync); ReentrantLock(boolean)
- new ReentrantLock(true)
会初始化一个公平锁(FairSync)。 lock()
- 无条件获取锁。若是当前线程没法获取锁,则当前线程进入休眠状态不可用,直至当前线程获取到锁。若是该锁没有被另外一个线程持有,则获取该锁并当即返回,将锁的持有计数设置为 1。 unlock()
- 用于释放锁。 :bell: 注意:请务必牢记,获取锁操做
lock()
必须在try catch
块中进行,而且将释放锁操做unlock()
放在finally
块中进行,以保证锁必定被被释放,防止死锁的发生。
示例:ReentrantLock
的基本操做
public class ReentrantLockDemo {
public static void main(String[] args) {
Task task = new Task();
MyThread tA = new MyThread("Thread-A", task);
MyThread tB = new MyThread("Thread-B", task);
MyThread tC = new MyThread("Thread-C", task);
tA.start();
tB.start();
tC.start();
}
static class MyThread extends Thread {
private Task task;
public MyThread(String name, Task task) {
super(name);
this.task = task;
}
@Override
public void run() {
task.execute();
}
}
static class Task {
private ReentrantLock lock = new ReentrantLock();
public void execute() {
lock.lock();
try {
for (int i = 0; i < 3; i++) {
System.out.println(lock.toString());
// 查询当前线程 hold 住此锁的次数
System.out.println("\t holdCount: " + lock.getHoldCount());
// 查询正等待获取此锁的线程数
System.out.println("\t queuedLength: " + lock.getQueueLength());
// 是否为公平锁
System.out.println("\t isFair: " + lock.isFair());
// 是否被锁住
System.out.println("\t isLocked: " + lock.isLocked());
// 是否被当前线程持有锁
System.out.println("\t isHeldByCurrentThread: " + lock.isHeldByCurrentThread());
try {
Thread.sleep(500);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
} finally {
lock.unlock();
}
}
}
}复制代码
输出结果:
java.util.concurrent.locks.ReentrantLock@64fcd88a[Locked by thread Thread-A]
holdCount: 1
queuedLength: 2
isFair: false
isLocked: true
isHeldByCurrentThread: true
java.util.concurrent.locks.ReentrantLock@64fcd88a[Locked by thread Thread-C]
holdCount: 1
queuedLength: 1
isFair: false
isLocked: true
isHeldByCurrentThread: true
// ...复制代码
与无条件获取锁相比,tryLock 有更完善的容错机制。
tryLock()
- 可轮询获取锁。若是成功,则返回 true;若是失败,则返回 false。也就是说,这个方法不管成败都会当即返回,获取不到锁(锁已被其余线程获取)时不会一直等待。 tryLock(long, TimeUnit)
- 可定时获取锁。和 tryLock()
相似,区别仅在于这个方法在获取不到锁时会等待必定的时间,在时间期限以内若是还获取不到锁,就返回 false。若是若是一开始拿到锁或者在等待期间内拿到了锁,则返回 true。 示例:ReentrantLock
的 tryLock()
操做
修改上个示例中的 execute()
方法
public void execute() {
if (lock.tryLock()) {
try {
for (int i = 0; i < 3; i++) {
// 略...
}
} finally {
lock.unlock();
}
} else {
System.out.println(Thread.currentThread().getName() + " 获取锁失败");
}
}复制代码
示例:ReentrantLock
的 tryLock(long, TimeUnit)
操做
修改上个示例中的 execute()
方法
public void execute() {
try {
if (lock.tryLock(2, TimeUnit.SECONDS)) {
try {
for (int i = 0; i < 3; i++) {
// 略...
}
} finally {
lock.unlock();
}
} else {
System.out.println(Thread.currentThread().getName() + " 获取锁失败");
}
} catch (InterruptedException e) {
System.out.println(Thread.currentThread().getName() + " 获取锁超时");
e.printStackTrace();
}
}复制代码
lockInterruptibly()
- 可中断获取锁。可中断获取锁能够在得到锁的同时保持对中断的响应。可中断获取锁比其它获取锁的方式稍微复杂一些,须要两个 try-catch
块(若是在获取锁的操做中抛出了 InterruptedException
,那么可使用标准的 try-finally
加锁模式)。
lock.lockInterruptibly()
获取某个锁时,若线程 A 获取到了锁,则线程 B 只能等待。若此时对线程 B 调用 threadB.interrupt()
方法可以中断线程 B 的等待过程。因为 lockInterruptibly()
的声明中抛出了异常,因此 lock.lockInterruptibly()
必须放在 try
块中或者在调用 lockInterruptibly()
的方法外声明抛出 InterruptedException
。
:bell: 注意:当一个线程获取了锁以后,是不会被
interrupt()
方法中断的。单独调用interrupt()
方法不能中断正在运行状态中的线程,只能中断阻塞状态中的线程。所以当经过lockInterruptibly()
方法获取某个锁时,若是未获取到锁,只有在等待的状态下,才能够响应中断。
示例:ReentrantLock
的 lockInterruptibly()
操做
修改上个示例中的 execute()
方法
public void execute() {
try {
lock.lockInterruptibly();
for (int i = 0; i < 3; i++) {
// 略...
}
} catch (InterruptedException e) {
System.out.println(Thread.currentThread().getName() + "被中断");
e.printStackTrace();
} finally {
lock.unlock();
}
}复制代码
newCondition()
- 返回一个绑定到 Lock
对象上的 Condition
实例。Condition
的特性和具体方法请阅读下文 Condition
。
阅读 ReentrantLock
的源码,能够发现它有一个核心字段:
private final Sync sync;复制代码
sync
- 内部抽象类 ReentrantLock.Sync
对象,Sync
继承自 AQS。它有两个子类: ReentrantLock.FairSync
- 公平锁。 ReentrantLock.NonfairSync
- 非公平锁。 查看源码能够发现,ReentrantLock
实现 Lock
接口实际上是调用 ReentrantLock.FairSync
或 ReentrantLock.NonfairSync
中各自的实现,这里不一一列举。
ReentrantLock 获取锁和释放锁的接口,从表象看,是调用 ReentrantLock.FairSync
或 ReentrantLock.NonfairSync
中各自的实现;从本质上看,是基于 AQS 的实现。
仔细阅读源码很容易发现:
void lock()
调用 Sync 的 lock() 方法。 void lockInterruptibly()
直接调用 AQS 的 获取可中断的独占锁 方法 lockInterruptibly()
。 boolean tryLock()
调用 Sync 的 nonfairTryAcquire()
。 boolean tryLock(long time, TimeUnit unit)
直接调用 AQS 的 获取超时等待式的独占锁 方法 tryAcquireNanos(int arg, long nanosTimeout)
。 void unlock()
直接调用 AQS 的 释放独占锁 方法 release(int arg)
。 直接调用 AQS 接口的方法就再也不赘述了,其原理在 AQS 的原理 中已经用很大篇幅进行过讲解。
nonfairTryAcquire
方法源码以下:
// 公平锁和非公平锁都会用这个方法区尝试获取锁
final boolean nonfairTryAcquire(int acquires) {
final Thread current = Thread.currentThread();
int c = getState();
if (c == 0) {
if (compareAndSetState(0, acquires)) {
// 若是同步状态为0,将其设为 acquires,并设置当前线程为排它线程
setExclusiveOwnerThread(current);
return true;
}
}
else if (current == getExclusiveOwnerThread()) {
int nextc = c + acquires;
if (nextc < 0) // overflow
throw new Error("Maximum lock count exceeded");
setState(nextc);
return true;
}
return false;
}复制代码
处理流程很简单:
lock 方法在公平锁和非公平锁中的实现:
两者的区别仅在于申请非公平锁时,若是同步状态为 0,尝试将其设为 1,若是成功,直接将当前线程置为排它线程;不然和公平锁同样,调用 AQS 获取独占锁方法 acquire
。
// 非公平锁实现
final void lock() {
if (compareAndSetState(0, 1))
// 若是同步状态为0,将其设为1,并设置当前线程为排它线程
setExclusiveOwnerThread(Thread.currentThread());
else
// 调用 AQS 获取独占锁方法 acquire
acquire(1);
}
// 公平锁实现
final void lock() {
// 调用 AQS 获取独占锁方法 acquire
acquire(1);
}复制代码
ReentrantReadWriteLock
类是ReadWriteLock
接口的具体实现,它是一个可重入的读写锁。ReentrantReadWriteLock
维护了一对读写锁,将读写锁分开,有利于提升并发效率。
ReentrantLock
实现了一种标准的互斥锁:每次最多只有一个线程能持有ReentrantLock
。但对于维护数据的完整性来讲,互斥一般是一种过于强硬的加锁策略,所以也就没必要要地限制了并发性。大多数场景下,读操做比写操做频繁,只要保证每一个线程都能读取到最新数据,而且在读数据时不会有其它线程在修改数据,那么就不会出现线程安全问题。这种策略减小了互斥同步,天然也提高了并发性能,ReentrantReadWriteLock
就是这种策略的具体实现。
ReentrantReadWriteLock 的特性以下:
ReentrantReadWriteLock
适用于读多写少的场景。若是是写多读少的场景,因为 ReentrantReadWriteLock
其内部实现比 ReentrantLock
复杂,性能可能反而要差一些。若是存在这样的问题,须要具体问题具体分析。因为 ReentrantReadWriteLock
的读写锁(ReadLock
、WriteLock
)都实现了 Lock
接口,因此要替换为 ReentrantLock
也较为容易。 ReentrantReadWriteLock
实现了 ReadWriteLock
接口,支持了 ReentrantLock
所不具有的读写锁分离。ReentrantReadWriteLock
维护了一对读写锁(ReadLock
、WriteLock
)。将读写锁分开,有利于提升并发效率。ReentrantReadWriteLock
的加锁策略是:容许多个读操做并发执行,但每次只容许一个写操做。 ReentrantReadWriteLock
为读写锁都提供了可重入的加锁语义。 ReentrantReadWriteLock
支持公平锁和非公平锁(默认)两种模式。 ReadWriteLock
接口定义以下:
public interface ReadWriteLock {
Lock readLock();
Lock writeLock();
}复制代码
readLock
- 返回用于读操做的锁(ReadLock
)。 writeLock
- 返回用于写操做的锁(WriteLock
)。 在读写锁和写入锁之间的交互能够采用多种实现方式,ReadWriteLock
的一些可选实现包括:
前文了解了 ReentrantReadWriteLock
的特性,接下来,咱们要讲述其具体用法。
ReentrantReadWriteLock
和 ReentrantLock
同样,也有两个构造方法,且用法类似。
public ReentrantReadWriteLock() {}
public ReentrantReadWriteLock(boolean fair) {}复制代码
ReentrantReadWriteLock()
- 默认构造方法会初始化一个非公平锁(NonfairSync)。在非公平的锁中,线程得到锁的顺序是不肯定的。写线程降级为读线程是能够的,但读线程升级为写线程是不能够的(这样会致使死锁)。 ReentrantReadWriteLock(boolean)
- new ReentrantLock(true)
会初始化一个公平锁(FairSync)。对于公平锁,等待时间最长的线程将优先得到锁。若是这个锁是读线程持有,则另外一个线程请求写锁,那么其余读线程都不能得到读锁,直到写线程释放写锁。 在 ReentrantReadWriteLock` 的特
中已经介绍过,ReentrantReadWriteLock
的读写锁(ReadLock
、WriteLock
)都实现了 Lock
接口,因此其各自独立的使用方式与 ReentrantLock
同样,这里再也不赘述。
ReentrantReadWriteLock
与 ReentrantLock
用法上的差别,主要在于读写锁的配合使用。本文以一个典型使用场景来进行讲解。
示例:基于 ReentrantReadWriteLock
实现一个简单的本地缓存
/**
* 简单的无界缓存实现
* <p>
* 使用 WeakHashMap 存储键值对。WeakHashMap 中存储的对象是弱引用,JVM GC 时会自动清除没有被引用的弱引用对象。
*/
static class UnboundedCache<K, V> {
private final Map<K, V> cacheMap = new WeakHashMap<>();
private final ReentrantReadWriteLock cacheLock = new ReentrantReadWriteLock();
public V get(K key) {
cacheLock.readLock().lock();
V value;
try {
value = cacheMap.get(key);
String log = String.format("%s 读数据 %s:%s", Thread.currentThread().getName(), key, value);
System.out.println(log);
} finally {
cacheLock.readLock().unlock();
}
return value;
}
public V put(K key, V value) {
cacheLock.writeLock().lock();
try {
cacheMap.put(key, value);
String log = String.format("%s 写入数据 %s:%s", Thread.currentThread().getName(), key, value);
System.out.println(log);
} finally {
cacheLock.writeLock().unlock();
}
return value;
}
public V remove(K key) {
cacheLock.writeLock().lock();
try {
return cacheMap.remove(key);
} finally {
cacheLock.writeLock().unlock();
}
}
public void clear() {
cacheLock.writeLock().lock();
try {
this.cacheMap.clear();
} finally {
cacheLock.writeLock().unlock();
}
}
}复制代码
说明:
WeakHashMap
而不是 HashMap
来存储键值对。WeakHashMap
中存储的对象是弱引用,JVM GC 时会自动清除没有被引用的弱引用对象。 Map
写数据前加写锁,写完后,释放写锁。 Map
读数据前加读锁,读完后,释放读锁。 测试其线程安全性:
/**
* @author <a href="mailto:forbreak@163.com">Zhang Peng</a>
* @since 2020-01-01
*/
public class ReentrantReadWriteLockDemo {
static UnboundedCache<Integer, Integer> cache = new UnboundedCache<>();
public static void main(String[] args) {
ExecutorService executorService = Executors.newCachedThreadPool();
for (int i = 0; i < 20; i++) {
executorService.execute(new MyThread());
cache.get(0);
}
executorService.shutdown();
}
/** 线程任务每次向缓存中写入 3 个随机值,key 固定 */
static class MyThread implements Runnable {
@Override
public void run() {
Random random = new Random();
for (int i = 0; i < 3; i++) {
cache.put(i, random.nextInt(100));
}
}
}
}复制代码
说明:示例中,经过线程池启动 20 个并发任务。任务每次向缓存中写入 3 个随机值,key 固定;而后主线程每次固定读取缓存中第一个 key 的值。
输出结果:
main 读数据 0:null
pool-1-thread-1 写入数据 0:16
pool-1-thread-1 写入数据 1:58
pool-1-thread-1 写入数据 2:50
main 读数据 0:16
pool-1-thread-1 写入数据 0:85
pool-1-thread-1 写入数据 1:76
pool-1-thread-1 写入数据 2:46
pool-1-thread-2 写入数据 0:21
pool-1-thread-2 写入数据 1:41
pool-1-thread-2 写入数据 2:63
main 读数据 0:21
main 读数据 0:21
// ...复制代码
前面了解了 ReentrantLock
的原理,理解 ReentrantReadWriteLock
就容易多了。
阅读 ReentrantReadWriteLock 的源码,能够发现它有三个核心字段:
/** Inner class providing readlock */
private final ReentrantReadWriteLock.ReadLock readerLock;
/** Inner class providing writelock */
private final ReentrantReadWriteLock.WriteLock writerLock;
/** Performs all synchronization mechanics */
final Sync sync;
public ReentrantReadWriteLock.WriteLock writeLock() { return writerLock; }
public ReentrantReadWriteLock.ReadLock readLock() { return readerLock; }复制代码
sync
- 内部类 ReentrantReadWriteLock.Sync
对象。与 ReentrantLock
相似,它有两个子类:ReentrantReadWriteLock.FairSync
和 ReentrantReadWriteLock.NonfairSync
,分别表示公平锁和非公平锁的实现。 readerLock
- 内部类 ReentrantReadWriteLock.ReadLock
对象,这是一把读锁。 writerLock
- 内部类 ReentrantReadWriteLock.WriteLock
对象,这是一把写锁。 public static class ReadLock implements Lock, java.io.Serializable {
// 调用 AQS 获取共享锁方法
public void lock() {
sync.acquireShared(1);
}
// 调用 AQS 释放共享锁方法
public void unlock() {
sync.releaseShared(1);
}
}
public static class WriteLock implements Lock, java.io.Serializable {
// 调用 AQS 获取独占锁方法
public void lock() {
sync.acquire(1);
}
// 调用 AQS 释放独占锁方法
public void unlock() {
sync.release(1);
}
}复制代码
前文中提过 Lock
接口中 有一个 newCondition()
方法用于返回一个绑定到 Lock
对象上的 Condition
实例。Condition
是什么?有什么做用?本节将一一讲解。
在单线程中,一段代码的执行可能依赖于某个状态,若是不知足状态条件,代码就不会被执行(典型的场景,如:if ... else ...)。在并发环境中,当一个线程判断某个状态条件时,其状态多是因为其余线程的操做而改变,这时就须要有必定的协调机制来确保在同一时刻,数据只能被一个线程锁修改,且修改的数据状态被全部线程所感知。
Java 1.5 以前,主要是利用 Object
类中的 wait
、notify
、notifyAll
配合 synchronized
来进行线程间通讯(若是不了解其特性,能够参考:Java 线程基础 - wait/notify/notifyAll)。
wait
、notify
、notifyAll
须要配合 synchronized
使用,不适用于 Lock
。而使用 Lock
的线程,彼此间通讯应该使用 Condition
。这能够理解为,什么样的锁配什么样的钥匙。内置锁(synchronized
)配合内置条件队列(wait
、notify
、notifyAll
),显式锁(Lock
)配合显式条件队列(Condition
)。
Condition
接口定义以下:
public interface Condition {
void await() throws InterruptedException;
void awaitUninterruptibly();
long awaitNanos(long nanosTimeout) throws InterruptedException;
boolean await(long time, TimeUnit unit) throws InterruptedException;
boolean awaitUntil(Date deadline) throws InterruptedException;
void signal();
void signalAll();
}复制代码
其中,await
、signal
、signalAll
与 wait
、notify
、notifyAll
相对应,功能也类似。除此之外,Condition
相比内置条件队列( wait
、notify
、notifyAll
),提供了更为丰富的功能:
Lock
)上能够存在多个 Condition
,这意味着锁的状态条件能够有多个。 awaitUninterruptibly()
。 awaitNanos(long)
、await(long, TimeUnit)
、awaitUntil(Date)
。 这里以 Condition
来实现一个消费者、生产者模式。
:bell: 注意:事实上,解决此类问题使用
CountDownLatch
、Semaphore
等工具更为便捷、安全。想了解详情,能够参考 Java 并发工具类 。
产品类
class Message {
private final Lock lock = new ReentrantLock();
private final Condition producedMsg = lock.newCondition();
private final Condition consumedMsg = lock.newCondition();
private String message;
private boolean state;
private boolean end;
public void consume() {
//lock
lock.lock();
try {
// no new message wait for new message
while (!state) { producedMsg.await(); }
System.out.println("consume message : " + message);
state = false;
// message consumed, notify waiting thread
consumedMsg.signal();
} catch (InterruptedException ie) {
System.out.println("Thread interrupted - viewMessage");
} finally {
lock.unlock();
}
}
public void produce(String message) {
lock.lock();
try {
// last message not consumed, wait for it be consumed
while (state) { consumedMsg.await(); }
System.out.println("produce msg: " + message);
this.message = message;
state = true;
// new message added, notify waiting thread
producedMsg.signal();
} catch (InterruptedException ie) {
System.out.println("Thread interrupted - publishMessage");
} finally {
lock.unlock();
}
}
public boolean isEnd() {
return end;
}
public void setEnd(boolean end) {
this.end = end;
}
}复制代码
消费者
class MessageConsumer implements Runnable {
private Message message;
public MessageConsumer(Message msg) {
message = msg;
}
@Override
public void run() {
while (!message.isEnd()) { message.consume(); }
}
}复制代码
生产者
class MessageProducer implements Runnable {
private Message message;
public MessageProducer(Message msg) {
message = msg;
}
@Override
public void run() {
produce();
}
public void produce() {
List<String> msgs = new ArrayList<>();
msgs.add("Begin");
msgs.add("Msg1");
msgs.add("Msg2");
for (String msg : msgs) {
message.produce(msg);
try {
Thread.sleep(100);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
message.produce("End");
message.setEnd(true);
}
}复制代码
测试
public class LockConditionDemo {
public static void main(String[] args) {
Message msg = new Message();
Thread producer = new Thread(new MessageProducer(msg));
Thread consumer = new Thread(new MessageConsumer(msg));
producer.start();
consumer.start();
}
}复制代码