以前介绍过并发问题的解决方式就是通常经过锁,concurrent包中最重要的接口就是lock接口,它能够显示的获取或者释放锁,对于lock接口来讲最多见的实现就是ReetrantLock(可重入锁),而ReetrantLock的实现又离不开AQS。node
AQS是concurrent包中最核心的并发组件,在读本文以前建议先阅读:设计模式
https://juejin.im/post/5c021da16fb9a049e65ffcbf 完全理解CAS机制,由于CAS在整个ReetrantLock中随处可见,它是lock的基础。安全
网上有许多相似文章,可是这一部分的东西比较抽象,须要不断理解,本文将基于源码分析concurrent包的最核心的组件AQS,将很差理解的部分尽可能用图片来分析完全理解ReetrantLock的原理性能优化
这部分是concurrent包的核心,理解了以后再去理解SemaPhore LinkedBlockingQueue ArrayBlockingQueue 等就信手拈来了,因此会花比较多的篇幅bash
先大概看一看lock接口数据结构
public interface Lock {
// 加锁
void lock();
// 可中断获取锁,获取锁的过程当中能够中断。
void lockInterruptibly() throws InterruptedException;
//当即返回的获取锁,返回true表示成功,false表示失败
boolean tryLock();
//根据传入时间当即返回的获取锁,返回true表示成功,false表示失败
boolean tryLock(long time, TimeUnit unit) throws InterruptedException;
//解锁
void unlock();
//获取等待的条件队列(以后会详细分析)
Condition newCondition();
}复制代码
而咱们通常使用ReetrantLock:多线程
Lock lock = new ReentrantLock();
lock.lock();
try{
//业务代码......
}finally{
lock.unlock();
}复制代码
它在使用上是比较简单的,在正式分析以前咱们先看看什么是公平锁和非公平锁并发
公平锁是指多个线程按照申请锁的顺序来获取锁,线程直接进入FIFO队列,队列中的第一个线程才能得到锁。框架
用一个打水的例子来理解:函数
公平锁的优势是等待锁的线程不会夯死。缺点是吞吐效率相对非公平锁要低,等待队列中除第一个线程之外的全部线程都会阻塞,CPU唤醒阻塞线程的开销比非公平锁大。
非公平锁是多个线程加锁时直接尝试获取锁,获取不到才会到等待队列的队尾等待。但若是此时锁恰好可用,那么这个线程能够无需阻塞直接获取到锁,因此非公平锁有可能出现后申请锁的线程先获取锁的场景。
非公平锁的优势是能够减小唤起线程的开销(由于可能有的线程能够直接获取到锁,CPU也就不用唤醒它),因此总体的吞吐效率高。缺点是处于等待队列中的线程可能会夯死(试想刚好每次有新线程来,它恰巧都每次获取到锁,此时还在排队等待获取锁的线程就悲剧了),或者等好久才会得到锁。
公平锁和非公平锁的差别在因而否按照申请锁的顺序来获取锁,非公平锁可能会出现有多个线程等待时,有一我的品特别的好的线程直接没有等待而直接获取到了锁的状况,他们各有利弊;ReetrantLock在构造时默认是非公平的,能够经过参数控制。
这里以ReentrantLock为例,简单讲解ReentrantLock与AQS的关系
从上图咱们能够总结:
1. 首先为何要有Sync这个内部类呢?
2. AQS为何要声明为Abstract,内部却没有任何abstract方法?
这是由于AQS只是做为一个基础组件,从上图能够看出countDownLatch,Semaphore等并发组件都依赖了它,它并不但愿直接做为直接操做类对外输出,而更倾向于做为一个基础并发组件,为真正的实现类提供基础设施,例如构建同步队列,控制同步状态等。
AQS是采用模板方法的设计模式,它做为基础组并发件,封装了一层核心并发操做(好比获取资源成功后封装成Node加入队列,对队列双向链表的处理),可是实现上分为两种模式,即共享模式(如Semaphore)与独占模式(如ReetrantLock,这两个模式的本质区别在于多个线程能不能共享一把锁),而这两种模式的加锁与解锁实现方式是不同的,但AQS只关注内部公共方法实现并不关心外部不一样模式的实现,因此提供了模板方法给子类使用:例如:
ReentrantLock须要本身实现tryAcquire()方法和tryRelease()方法,而实现共享模式的Semaphore,则须要实现tryAcquireShared()方法和tryReleaseShared()方法,这样作的好处?由于不管是共享模式仍是独占模式,其基础的实现都是同一套组件(AQS),只不过是加锁解锁的逻辑不一样罢了,更重要的是若是咱们须要自定义锁的话,也变得很是简单,只须要选择不一样的模式实现不一样的加锁和解锁的模板方法便可。
ReetrantLock:实现了lock接口,内部类有Sync、NonfairSync、FairSync(他们三个是继承了AQS)这里用了模板方法的设计模式。
以前介绍AQS是提供基础设施,如构建同步队列,控制同步状态等,它的工做原理是怎样的呢?
咱们先看看AQS类中几个重要的字段:
public abstract class AbstractQueuedSynchronizer
extends AbstractOwnableSynchronizer{
//指向同步队列队头
private transient volatile Node head;
//指向同步的队尾
private transient volatile Node tail;
//同步状态,0表明锁未被占用,1表明锁已被占用
private volatile int state;
//省略.
}
复制代码
再看看Node这个内部类:它是对每个访问同步代码块的线程的封装
关于等待状态,咱们暂时只需关注SIGNAL 和初始化状态便可
AQS本质上就是由node构成的双向链表,内部有node head和node tail。
AQS经过定义的state字段来控制同步状态,当state=0时,说明没有锁资源被站东,当state=1时,说明有线程目前正在使用锁的资源,这个时候其余线程必须加入同步队列进行等待;
既然要加入队列,那么AQS是内部经过内部类Node构成FIFO的同步队列实现线程获取锁排队,同时利用内部类ConditionObject构建条件队列,当调用condition.wait()方法后,线程将会加入条件队列中,而当调用signal()方法后,线程将从条件队列移动到同步队列中进行锁竞争。注意这里涉及到两种队列,一种的同步队列,当锁资源已经被占用,而又有线程请求锁而等待的后将加入同步队列等待,而另外一种则是条件队列(可有多个),经过Condition调用await()方法释放锁后,将加入等待队列。
条件队列能够暂时先放一边,下一节再详细分析,由于当咱们调用ReetrantLock.lock()方法时,实际操做的是基于node结构的同步队列,此时AQS中的state变量则是表明同步状态,加锁后,若是此时state的值为0,则说明当前线程能够获取到锁,同时将state设置为1,表示获取成功。若是调用ReetrantLock.lock()方法时state已为1,也就是当前锁已被其余线程持有,那么当前执行线程将被封装为Node结点加入同步队列等待。
如上图所示为AQS的同步队列模型;
接下来咱们看详细实现
AQS的实现依赖于内部的同步队列(就是一个由node构成的FIFO的双向链表对列)来完成对同步状态(state)的管理,当前线程获取锁失败时,AQS会将该线程封装成一个Node并将其加入同步队列,同时会阻塞当前线程,当同步资源释放时,又会将头结点head中的线程唤醒,让其尝试获取同步状态。这里从ReetrantLock入手分析AQS的具体实现,咱们先以非公平锁为例进行分析。
来看ReetrantLock的源码:
//默认构造,建立非公平锁NonfairSync
public ReentrantLock() {
sync = new NonfairSync();
}
//根据传入参数建立锁类型
public ReentrantLock(boolean fair) {
sync = fair ? new FairSync() : new NonfairSync();
}
//加锁操做
public void lock() {
sync.lock();
}
复制代码
这里说明ReetrantLock默认构造方法就是构造一个非公平锁,调用lock方法时候:
/**
* 非公平锁实现
*/
static final class NonfairSync extends Sync {
//加锁
final void lock() {
//执行CAS操做,本质就是CAS更新state:
//判断state是否为0,若是为0则把0更新为1,并返回true不然返回false
if (compareAndSetState(0, 1))
//成功则将独占锁线程设置为当前线程
setExclusiveOwnerThread(Thread.currentThread());
else
//不然再次请求同步状态
acquire(1);
}
}
复制代码
也就是说,经过CAS机制保证并发的状况下只有一个线程能够成功将state设置为1,获取到锁;
此时,其它线程在执行compareAndSetState时,由于state此时不是0,因此会失败并返回false,执行acquire(1);
public final void acquire(int arg) {
//再次尝试获取同步状态
if (!tryAcquire(arg) &&
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
selfInterrupt();
}
复制代码
这里传入参数arg是state的值,由于要获取锁,而status为0时是释放锁,1则是获取锁,因此这里通常传递参数为1,进入方法后首先会执行tryAcquire(1)方法,在前面分析过该方法在AQS中并无具体实现,而是交由子类实现,所以该方法是由ReetrantLock类内部类实现的
//NonfairSync类
static final class NonfairSync extends Sync {
protected final boolean tryAcquire(int acquires) {
return nonfairTryAcquire(acquires);
}
}
复制代码
假设有三个线程:线程1已经得到到了锁,线程2正在同步队列中排队,此时线程3执行lock方法尝试获取锁的时,线程1正好释放了锁,将state更新为0,那么线程3就可能在线程2尚未被唤醒以前去获取到这个锁。
若是此时尚未获取到锁(nonfairTryAcquire返回false),那么接下来会把该线程封装成node去同步队列里排队,代码层面上执行的是acquireQueued(addWaiter(Node.EXCLUSIVE), arg)
ReetrantLock为独占锁,因此传入的参数为Node.EXCLUSIVE
private Node addWaiter(Node mode) {
//将请求同步状态失败的线程封装成结点
Node node = new Node(Thread.currentThread(), mode);
Node pred = tail;
//若是是第一个结点加入确定为空,跳过。
//若是非第一个结点则直接执行CAS入队操做,尝试在尾部快速添加
if (pred != null) {
node.prev = pred;
//使用CAS执行尾部结点替换,尝试在尾部快速添加
if (compareAndSetTail(pred, node)) {
pred.next = node;
return node;
}
}
//若是第一次加入或者CAS操做没有成功执行enq入队操做
enq(node);
return node;
}
复制代码
其中tail是AQS的成员变量,指向队尾(这点前面的咱们分析过AQS维持的是一个双向的链表结构同步队列),若是第一次获取到锁,AQS尚未初始化,则为tail确定为空,那么将执行enq(node)操做,若是非第一个结点即tail指向不为null,直接尝试执行CAS操做加入队尾(再一次使用CAS操做实现线程安全),若是CAS操做失败或第一次加入同步队列仍是会执行enq(node),继续看enq(node):
private Node enq(final Node node) {
//死循环
for (;;) {
Node t = tail;
//若是队列为null,即没有头结点
if (t == null) { // Must initialize
//建立并使用CAS设置头结点
if (compareAndSetHead(new Node()))
tail = head;
} else {//队尾添加新结点
node.prev = t;
if (compareAndSetTail(t, node)) {
t.next = node;
return t;
}
}
}
}
复制代码
这个方法使用一个死循环进行CAS操做,能够解决多线程并发问题。这里作了两件事:
一是队列不存在的建立新结点并初始化tail、head:使用compareAndSetHead设置头结点,head和tail都指向head。
二是队列已存在,则将新结点node添加到队尾。
注意addWaiter和enq这两个方法都存在一样的代码将线程设置为同步队列的队尾:
node.prev = t;
if (compareAndSetTail(t, node)) {
t.next = node;
return t;
}复制代码
这是由于,在多线程环境中,假设线程一、二、三、4同时执行addWaiter()方法入队,而此时头节点不为null,那么他们会同时执行addWaiter中的compareAndSetTail方法将队尾指向它,添加到队尾。
但这个时候CAS操做保证只有一个能够成功,假设此时线程1成功添加到队尾,那么线程二、三、4执行CAS都会失败,那么线程二、三、4会在enq这个方法内部死循环执行compareAndSetTail方法将队尾指向它,直到成功添加到队尾为止。enq这个方法在内部对并发状况下进行补偿。
回到以前的acquire()
方法,添加到同步队列后,结点就会进入一个自旋过程,自旋的意思就是原地转圈圈:即结点都在观察时机准备获取同步状态,自旋过程是在acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
方法中执行的,先看前半部分
final boolean acquireQueued(final Node node, int arg) {
boolean failed = true;
try {
boolean interrupted = false;
//自旋,死循环
for (;;) {
//获取前结点
final Node p = node.predecessor();
当且仅当p为头结点才尝试获取同步状态,FIFO
if (p == head && tryAcquire(arg)) {
//此时当前node前驱节点为head且已经tryAcquire获取到了锁,正在执行了它的相关信息
//已经没有任何用处了,因此如今须要考虑将它GC掉
//将node设置为头结点
setHead(node);
//清空原来头结点的引用便于GC
p.next = null; // help GC
failed = false;
return interrupted;
}
//若是前驱结点不是head,判断是否挂起线程
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
interrupted = true;
}
} finally {
if (failed)
//最终都没能获取同步状态,结束该线程的请求
cancelAcquire(node);
}
}
复制代码
//设置为头结点
private void setHead(Node node) {
head = node;
//清空结点数据以便于GC
node.thread = null;
node.prev = null;
}
复制代码
死循环中,若是知足了if (p == head && tryAcquire(arg))
以下图,会执行sethead方法:
固然若是前驱结点不是head而它又没有获取到锁,那么执行以下:
//若是前驱结点不是head,判断是否挂起线程
if (shouldParkAfterFailedAcquire(p, node) &&parkAndCheckInterrupt())
interrupted = true;
}
private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
//获取当前结点的等待状态
int ws = pred.waitStatus;
//若是为等待唤醒(SIGNAL)状态则返回true
if (ws == Node.SIGNAL)
return true;
//若是ws>0 则说明是结束状态,
//遍历前驱结点直到找到没有结束状态的结点
if (ws > 0) {
do {
node.prev = pred = pred.prev;
} while (pred.waitStatus > 0);
pred.next = node;
} else {
//若是ws小于0又不是SIGNAL状态,
//则将其设置为SIGNAL状态,表明该结点的线程正在等待唤醒。
compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
}
return false;
}
private final boolean parkAndCheckInterrupt() {
//将当前线程挂起,线程会阻塞住
LockSupport.park(this);
//获取线程中断状态,interrupted()是判断当前中断状态,
//并不是中断线程,所以可能true也可能false,并返回
return Thread.interrupted();
}
复制代码
这段代码有个设计比较好的点:
一般咱们在设计队列时,咱们须要考虑如何最大化的减小后续排队节点对于CPU的消耗,而在AQS中,只要当前节点的前驱节点不是头结点,再把当前节点加到队列后就会执行LockSupport.park(this);将当前线程挂起,这样能够最大程度减小CPU消耗。
是否是仍是有点一头雾水?
不要紧,为了方便理解:咱们假设ABC三个线程如今同时去获取锁,A首先获取到锁后一直不释放,BC加入队列。那么对于AQS的同步队列结构是如何变化的呢?
一、A直接获取到锁:
代码执行路径:
(ReetranLock.lock()-> compareAndSetState(0, 1) -> setExclusiveOwnerThread(Thread.currentThread())
此时AQS结构尚未初始化:
二、B尝试获取锁:
由于A存在把state设置为1,因此B获取锁失败,进行入队操做加入同步队列,入队时发现AQS尚未初始化(AQS中的tail为null),会在入队前初始化代码在enq方法的死循环中:
初始化以后改变tail的prev指向,把本身加到队尾:
接着会执行acquireQueued方法:
final boolean acquireQueued(final Node node, int arg) {
boolean failed = true;
try {
boolean interrupted = false;
for (;;) {
final Node p = node.predecessor();
if (p == head && tryAcquire(arg)) {
setHead(node);
p.next = null; // help GC
failed = false;
return interrupted;
}
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
interrupted = true;
}
} finally {
if (failed)
cancelAcquire(node);
}
}复制代码
第一次执行:发现本身前序节点是head节点,因而乎再次尝试获取锁,获取失败后再shouldParkAfterFailedAcquire方法中把前序节点设置为Singal状态
第二次执行:再次尝试获取锁,但由于前序节点是Signal状态了,因此执行parkAndCheckInterrupt把本身休眠起来进行自旋
三、C尝试获取锁:
C获取锁和B彻底同样,不一样的是它的前序节点是B,因此它并不会一直尝试获取锁,只会呆在B后面park住
AQS经过最简单的CAS和LockSupport的park,设计出了高效的队列模型和机制:
一、AQS结构实际上是在第二个线程获取锁的时候再初始化的,就是lazy-Init的思想,最大程度减小没必要要的代码执行的开销
二、为了最大程度上提高效率,尽可能避免线程间的通信,采用了双向链表的Node结构去存储线程
三、为了最大程度上避免CPU上下文切换执行的消耗,在设计排队线程时,只有头结点的下一个的线程在一直重复执行获取锁,队列后面的线程会经过LockSupport进行休眠。
上代码:
//ReentrantLock类的unlock
public void unlock() {
sync.release(1);
}
//AQS类的release()方法
public final boolean release(int arg) {
//尝试释放锁
if (tryRelease(arg)) {
Node h = head;
if (h != null && h.waitStatus != 0)
//唤醒后继结点的线程
unparkSuccessor(h);
return true;
}
return false;
}
//ReentrantLock类中的内部类Sync实现的tryRelease(int releases)
protected final boolean tryRelease(int releases) {
int c = getState() - releases;
if (Thread.currentThread() != getExclusiveOwnerThread())
throw new IllegalMonitorStateException();
boolean free = false;
//判断状态是否为0,若是是则说明已释放同步状态
if (c == 0) {
free = true;
//设置Owner为null
setExclusiveOwnerThread(null);
}
//设置更新同步状态
setState(c);
return free;
}
复制代码
一句话总结:释放锁首先就是把volatile类型的变量state减1。state从1变成0.
unparkSuccessor(h)的做用的唤醒后续的节点:
private void unparkSuccessor(Node node) {
//这里,node是head节点。
int ws = node.waitStatus;
if (ws < 0)//置零当前线程所在的结点状态,容许失败。
compareAndSetWaitStatus(node, ws, 0);
Node s = node.next;//找到下一个须要唤醒的结点s
if (s == null || s.waitStatus > 0) {//若是为空或已取消
s = null;
for (Node t = tail; t != null && t != node; t = t.prev)
if (t.waitStatus <= 0)//从这里能够看出,<=0的结点,都是还有效的结点。
s = t;
}
if (s != null)
LockSupport.unpark(s.thread);//唤醒
}
复制代码
从代码执行操做来看,这里主要做用是用unpark()唤醒同步队列中最前边未放弃线程(也就是状态为CANCELLED的线程结点s)。此时,回忆前面分析进入自旋的函数acquireQueued(),s结点的线程被唤醒后,会进入acquireQueued()函数的if (p == head && tryAcquire(arg))的判断,而后s把本身设置成head结点,表示本身已经获取到资源了,最终acquire()也返回了,这就是独占锁释放的过程。
回到以前的图:A B C三个线程获取锁,A已经获取到了锁,BC在队列里面,此时A释放锁
执行b.unpark,B被唤醒后继续执行
if (p == head && tryAcquire(arg))
复制代码
由于B的前序节点是head,因此会执行tryAcquire方法尝试获取锁,获取到锁以后执行setHead方法把本身设置为头节点,而且把以前的头结点也就是上图中的new Node()设置为null以便于GC:
final boolean acquireQueued(final Node node, int arg) {
boolean failed = true;
try {
boolean interrupted = false;
for (;;) {
final Node p = node.predecessor();
if (p == head && tryAcquire(arg)) {
//获取到锁以后将当前node设置为头结点 head指向当前节点node
setHead(node);
//p.next就是以前的头结点,它没有用了,因此把它gc掉
p.next = null; // help GC
failed = false;
return interrupted;
}
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
interrupted = true;
}
} finally {
if (failed)
cancelAcquire(node);
}
}复制代码
总之,AQS内部有一个同步队列,线程获取同步状态失败以后会被封装成node经过park进行自旋,而在释放同步状态时,经过unpark进行唤醒后面一个线程,让后面线程得以继续获取锁。
了解完ReetrantLock中非公平锁的实现后,咱们再来看看公平锁。与非公平锁不一样的是,在获取锁的时,公平锁的获取顺序是彻底遵循时间上的FIFO规则,也就是说先请求的线程必定会先获取锁,后来的线程确定须要排队。
下面比较一下公平锁和非公平锁lock方法:
再比较一下公平锁和非公平锁lock方法:tryAcquire方法:
惟一的差异就是hasQueuedPredecessors()
判断同步队列是否存在结点,这就是非公平锁与公平锁最大的区别,即公平锁在线程请求到来时先会判断同步队列是否存在结点,若是存在先执行同步队列中的结点线程,当前线程将封装成node加入同步队列等待。而非公平锁呢,当线程请求到来时,无论同步队列是否存在线程结点,直接上去尝试获取同步状态,获取成功直接访问共享资源,但请注意在绝大多数状况下,非公平锁才是咱们理想的选择,毕竟从效率上来讲非公平锁老是胜于公平锁。
以上即是ReentrantLock的内部实现原理,这里咱们简单进行小结,重入锁ReentrantLock,是一个基于AQS并发框架的并发控制类,其内部实现了3个类,分别是Sync、NoFairSync以及FairSync类,其中Sync继承自AQS,实现了释放锁的模板方法tryRelease(int),而NoFairSync和FairSync都继承自Sync,实现各类获取锁的方法tryAcquire(int)。
ReentrantLock的全部方法实现几乎都间接调用了这3个类,所以当咱们在使用ReentrantLock时,大部分使用都是在间接调用AQS同步器中的方法。
AQS在设计时将性能优化到了极致,具体体如今同步队列的park和unpark,初始化AQS时的懒加载,以及线程之间经过Node这样的数据结构从而避免线程间通信形成的额外开销,这种由释放锁的线程主动唤醒后续线程的方式也是咱们再实际过程当中能够借鉴的。
AQS还不止于同步队列,接下来咱们会继续探讨AQS的条件队列