独占锁 | 共享锁 |
---|---|
tryAcquire(int arg) | tryAcquireShared(int arg) |
tryAcquireNanos(int arg, long nanosTimeout) | tryAcquireSharedNanos(int arg, long nanosTimeout) |
acquire(int arg) | acquireShared(int arg) |
acquireQueued(final Node node, int arg) | doAcquireShared |
acquireInterruptibly(int arg) | acquireSharedInterruptibly(int arg) |
doAcquireNanos(int arg, long nanosTimeout) | doAcquireSharedNanos(int arg, long nanosTimeout) |
release(int arg) | releaseShared(int arg) |
tryRelease(int arg) | tryReleaseShared(int arg) |
- | doReleaseShared() |
由表格咱们知道,除了最后一个doReleased()
是共享锁独有以外,其余的方法独占锁和共享锁基本都是一一对应的。html
在独占锁中,release
方法中尝试释放锁,若是成功就unparkSuccessor(h)
java
在共享锁中,releaseShared
方法中尝试释放锁,若是成功就doReleaseShared()
node
因此通常来讲unparkSuccessor(h)
和doReleaseShared()
通常是互相对应的,可是doReleaseShare()
要执行的逻辑比前者多。这点咱们到后面再看!app
这里提一下:ide
咱们以前说过函数
在独占模式中,当一个线程获取锁以后,只有释放锁以后才会去唤醒下一个线程。oop
在共享模式中,若是一个线程成功获取了共享锁,他就会立刻唤醒下一个等待的线程,并不须要该拥有锁的线程释放锁才唤醒下一个线程,由于共享锁的设定就是为了让多个线程同时拥有所资源的!这里有两种状况会唤醒后面等待的线程 1. 当前线程成功拥有锁 , 2. 拥有锁的线程释放锁 . 但愿读者记住这两点,这对于后面咱们分析源码的时候有指导性的做用。源码分析
前面咱们先说了独占锁和共享锁的区别,是为了让读者更好地去区分它们。下面咱们就经过分析CountDownLatch的源码,去体会AQS中共享模式是如何工做的。post
CountDownLatch类是典型的AQS的共享模式使用,而且是一个高频使用的类。学习
关于CountDownLatch的用法,咱们就不一一在这里赘述。若是有不了解的读者,能够先去百度一下再往下看源码分析,这样效果会更佳。
这里推荐一篇关于怎么用的博客,我的看的时候以为比较通俗易懂
须要传入一个不小于0的数,其实也就是设置AQS的共享资源
public CountDownLatch(int count) {
if (count < 0) throw new IllegalArgumentException("count < 0");
this.sync = new Sync(count);
}
//----------Sync是CountDownLatch的内部类
/** 咱们这里说一下,咱们看到Sync就是继承了AQS这个抽象类,而后重写了tryAcquireShared和 tryReleaseShared的方法,也就是本身加了个getCount()方法,因此后面看到Sync类调用的方法,除了重写那两个,其余的基本都是父类的方法,读者要时刻记住这一点! **/
private static final class Sync extends AbstractQueuedSynchronizer {
private static final long serialVersionUID = 4982264981922014374L;
Sync(int count) {
setState(count); //AQS
}
int getCount() {
return getState();
}
//重写AQS的方法
protected int tryAcquireShared(int acquires) {
return (getState() == 0) ? 1 : -1;
}
//重写AQS的方法
protected boolean tryReleaseShared(int releases) {
// Decrement count; signal when transition to zero
for (;;) {
int c = getState();
if (c == 0)
return false;
int nextc = c-1;
if (compareAndSetState(c, nextc))
return nextc == 0;
}
}
}
复制代码
咱们能够看到,在构造方法里面咱们就设置了AQS的state值。
全部调用了await()
方法的线程就会等待挂起,而后另外的线程就会执行state = state -1 的操做。
当state的值为0的时候,那么将state减为0的线程就会立刻唤醒以前调用了await()
的线程,而后这些线程就会去获取共享锁啦!
在咱们知道CountDownLatch的用法以后,咱们须要注意它的两个方法,一个是CountDown()
,另一个是await()
方法
countDown()
每次会把state的值减1,直到state的值变为0,唤醒调用await()
的线程
await()
顾名思义,调用这个方法的线程会阻塞,直到被唤醒。
上面说的但愿读者能够认真理解一下,理解到位了看下面的源码才不会半知半懂
咱们经过下面的源码,来分析countDown
和await
的源码
public static void main(String[] args) {
CountDownLatch latch = new CountDownLatch(2);
//t1和t2负责countDown,也就是将state减1
Thread t1 = new Thread(new Runnable() {
@Override
public void run() {
try {
Thread.sleep(5000);
} catch (InterruptedException e) {
e.printStackTrace();
}
latch.countDown();
}
},"t1");
Thread t2 = new Thread(new Runnable() {
@Override
public void run() {
try {
Thread.sleep(7000);
} catch (InterruptedException e) {
e.printStackTrace();
}
latch.countDown();
}
},"t2");
t1.start();
t2.start();
// t3 和 t4负责await,等待state为0而后被唤醒
Thread t3 = new Thread(new Runnable() {
@Override
public void run() {
try {
latch.await();
System.out.println(Thread.currentThread().getName() + "从await回来了");
} catch (InterruptedException e) {
e.printStackTrace();
}
}
},"t3");
Thread t4 = new Thread(new Runnable() {
@Override
public void run() {
try {
latch.await();
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println(Thread.currentThread().getName() + "从await回来了");
}
},"t4");
t3.start();
t4.start();
}
复制代码
结果(t3和t4不是按照绝对的顺序输出的,后面的分析咱们按照t3先入队)
下面咱们就按照步骤来
latch先await()阻塞,等待被唤醒(等待state变为0),而后await()返回去作其余事情!
public void await() throws InterruptedException {
//调用sync的方法,关于Sync咱们前面已经说过,这里就很少赘述了
sync.acquireSharedInterruptibly(1);
}
public final void acquireSharedInterruptibly(int arg) //AQS throws InterruptedException {
//这个方法是响应中断的有中断就抛出异常呗
if (Thread.interrupted())
throw new InterruptedException();
//t3和t4到这里的时候,确定是小于0的(此时state = 2)
//因此先看看tryAcquireShared方法
if (tryAcquireShared(arg) < 0)
doAcquireSharedInterruptibly(arg);
}
protected int tryAcquireShared(int acquires) { //重写AQS的方法
//只有state>0的时候才会返回1
return (getState() == 0) ? 1 : -1;
}
//从这个方法名咱们知道,这个方法是获取共享锁而且是响应中断的
private void doAcquireSharedInterruptibly(int arg) //AQS throws InterruptedException {
//步骤1. 将当前节点设置为共享模式,而后加入阻塞队列!
//关于这些知识,在第一篇和第二篇文章就说过啦,不懂的回去看看吧!
final Node node = addWaiter(Node.SHARED);
boolean failed = true;
try {
for (;;) {
final Node p = node.predecessor();
if (p == head) {
//这样也是同样,只要state > 0,就返回-1,(此时state = 2)
int r = tryAcquireShared(arg);
if (r >= 0) {
setHeadAndPropagate(node, r);
p.next = null; // help GC
failed = false;
return;
}
}
//步骤2. 由于是刚开始,因此不会进到前面那个if语句,因此会直接来到这
//这里咱们都很熟悉了,就是在阻塞队列找到一个地方让这个线程能够挂起!
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
throw new InterruptedException();
}
} finally {
if (failed)
cancelAcquire(node);
}
}
复制代码
咱们把上面的步骤用图来描述一下你们就清楚啦!
t3通过步骤1以后,也就是入队以后,会变成以下图所示
而后tryAcquiredShared
会返回-1,那么就执行shouldParkAfterFailedAcquire
就会将t3的pre指向的节点的WaitStatus(下面统称为ws)置为-1,以下图所示。(t3的ws为-1)
执行完shouldParkAfterFailedAcquire
以后执行parkAndCheckInterrupt
就会把t3挂起啦!
而后后面t4进来的时候和t3是同样的步骤,到后面就是t4加入阻塞队列,而后把t3的ws置为-1,结果以下图所示。(t4的ws为-1)
接着,t4就就被挂起啦,如今t3和t4就已经全挂起了,就等待着被唤醒啦!
这里说明一下,由于能够是由多个线程调用countDown
的,那么有些线程调用的时候state还不是0,因此这些线程不会唤醒await
的线程,只有当某个线程调用了countDown
方法,而后state从1变为0了,那么这个线程就唤醒其余await
的线程!明白这点很重要。
接下来咱们继续看countDown
方法
public void countDown() {
sync.releaseShared(1);
}
public final boolean releaseShared(int arg) { // AQS
//只有将state设置为0的时候才会返回true,不然只是每次将state-1而已
if (tryReleaseShared(arg)) {
//到这里说明已经成功把state置为0了,那么就开始唤醒如今还在等待的线程啦!
doReleaseShared();
return true;
}
//state原本就是0的话,确定释放不了,就返回false啦
return false;
}
protected boolean tryReleaseShared(int releases) {// 重写AQS
// Decrement count; signal when transition to zero
for (;;) {
int c = getState();
if (c == 0)
return false;
int nextc = c-1;
//每次都是CAS将state-1
if (compareAndSetState(c, nextc))
//若是state减1成功变为0的话,确定就返回true啦!
return nextc == 0;
}
}
复制代码
下面咱们来着重分析下doReleaseShared()
这个方法
//此时state为0,先把流程走一遍先,先看注释便可,其余先不看
private void doReleaseShared() {
for (;;) {
Node h = head;
if (h != null && h != tail) {
int ws = h.waitStatus;
//t3入队的时候,已经把head节点设置为-1啦,因此会到if语句里面
if (ws == Node.SIGNAL) {
//将head设置为0,也就是恢复到初始状态
if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))
continue; // loop to recheck cases
//到这里就是唤醒head节点的下一个节点,这时候也就是唤醒t3
unparkSuccessor(h);
}
else if (ws == 0 &&
!compareAndSetWaitStatus(h, 0, Node.PROPAGATE))
continue; // loop on failed CAS
}
if (h == head) // loop if head changed
break;
}
}
复制代码
在t3被唤醒以后,咱们回到t3被唤醒的地方和接下来要作的事情
private void doAcquireSharedInterruptibly(int arg) throws InterruptedException {
final Node node = addWaiter(Node.SHARED);
boolean failed = true;
try {
for (;;) {
final Node p = node.predecessor();
if (p == head) {//此时head就是t3的前缀啦,因此能够进来这里
int r = tryAcquireShared(arg);
//这里r > 0啦,由于以前说过state == 0的时候才会返回1
if (r >= 0) {
//那么此时t3就走到这一步了,下面咱们看下这个方法
setHeadAndPropagate(node, r);
p.next = null; // help GC
failed = false;
return;
}
}
//步骤1:t3被唤醒以后就会从这里继续执行啦,假设没有中断的状况,那么是不会进到if语句里面的啦
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
throw new InterruptedException();
}
} finally {
if (failed)
cancelAcquire(node);
}
}
复制代码
此时t3会进入setHeadAndPropagate这个方法
private void setHeadAndPropagate(Node node, int propagate) {
Node h = head; // Record old head for check below
setHead(node); //步骤1:t3先把本身设置为头结点先
if (propagate > 0 || h == null || h.waitStatus < 0 ||
(h = head) == null || h.waitStatus < 0) {
Node s = node.next;
if (s == null || s.isShared())
//基本都会执行这个函数,那么这里也就说明在共享模式中,当节点被唤醒
//的时候都会执行这个函数,也就是会唤醒下一个节点
//那么这里就是说t3把本身设置为头部以后,就会立刻去唤醒t4去抢锁啦!
doReleaseShared();
}
}
复制代码
那么在这里,咱们就好好分析一下doReleaseShared()
这个方法了,因此在这里咱们知道,t3已是head节点了
//调用这个方法的时候咱们已经知道此事state为0了
private void doReleaseShared() {
for (;;) {
Node h = head;
/** h == null 说明阻塞队列已经为空 h == tail 说明阻塞队列刚被初始化的头结点 还有一种状况就是 以前是普通的节点,可是此节点已是头结点,那么就是说该节点是已经被唤醒了, 后面阻塞队列已经没有节点了 因此上面两种状况都不须要被唤醒 */
if (h != null && h != tail) {
int ws = h.waitStatus;
//ws == -1,由于以前t4已经把t3设置为-1(SIGNAL)啦
if (ws == Node.SIGNAL) {
//问题1:这里若是将节点设置为初始化状态失败的缘由待会解释!
if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))
continue; // loop to recheck cases
//到达这里,就说明t3的ws已经设置为0,而后t3就去唤醒t4啦!
//那么t4接下来被唤醒,就像t3被唤醒后作的事情差很少是如出一辙,
//因此想分析t4接下来要干什么,看上面t3被唤醒以后作了什么就行了
unparkSuccessor(h);
}
else if (ws == 0 &&
!compareAndSetWaitStatus(h, 0, Node.PROPAGATE))
//问题2:
//在咱们举的例子中,t3是确定不会来到这里的,这里主要判断的是
//若是有节点加入进来,可是这个节点准备到这里的时候把它的前驱节点
//就设置为-1了,关于这里,咱们下面说明解释。
continue; // loop on failed CAS
}
//此时h指向的仍是t3那个节点,可是head节点可能已经变了
//若是是同一个节点,退出循环,让被唤醒的线程去唤醒后面的线程。
if (h == head) // loop if head changed
break;
}
}
复制代码
下面咱们来看一张图,就能够解释注释的内容了(但愿你们能够好好看看这张图,方便理解后面的解释)
接着咱们来解释一些内容
问题1:
就是第一个CAS为何会失败的,咱们看到'唤醒cur的next节点'的流程,好比说t3唤醒t4以后,t3和t4此时就同时进行执行各自的代码了(这里咱们把t3和cur对应,t4和next对应),因此在next被唤醒以后,就会把next节点设置为head节点,咱们以前也说过,在setHeadAndPropagate
里面有doReleased
方法,因此会有下面的状况
cur节点唤醒next节点以后,cur因为某种缘由阻塞了,next节点成功把本身设置为head节点,执行后面的操做,可是还没到第一个CAS.此时cur又继续往下走,判断此时的h已经不是执行head节点了,那么就会指向新的head,也就是此时的next节点.而后cur也继续往下走.因此如今cur和next这两个节点都是同时执行doReleased
方法的,因此也会有可能同时到达第一个CAS,因此此时确定只有一个线程CAS操做成功啦,另一个不成功的就根据状况往下走了!这里举几个例子。
好比说有t3->t4->t5->t6>......tn
(1)把t3和t4分别对应前面说的那个cur和next,那么此时假设是t3唤醒t5,但t5尚未把本身设置为head节点,那么head节点仍然t4。又由于t4以前CAS是失败的,因此会自旋一次,而后执行最后一个if语句的时候,会直接退出啦!t3执行最后一个if的时候,因为t5尚未把本身设置为head,因此此时t3也退出啦。那么就说明t5之后的节点没人唤醒了吗?非也,由于t5成功设置本身为头结点的话,也会执行doReleased()
方法啦!就由t5和它的后继去唤醒吧!
在(1)的中,若是t3唤醒t5以后,t5也成功设置本身为头结点了,那么t5此时会可能会唤醒后面的,这里先不考虑。而后t3和t4又指向新的head节点啦。这种状况又开始循环啦。
这里说明一下,t3和t4之中只能有一个CAS成功,另一个就continue再次自旋啦,因此是t4唤醒t5,那么t3就像(1)说的t4同样啦。
咱们看到在,在setHeadAndPropagate
里面有doReleased
方法,让后面的节点能够更快地唤醒,增大唤醒的吞吐量,不得不说设计者的厉害之处啊!
问题2:
在解释问题2以前,咱们经过下面的步骤慢慢分析
假设这是某一状况阻塞队列的状况
await
的节点可能在addWaiter
以前阻塞了,这时候可能又不阻塞了,而后执行了addWaiter方法
,可是没有执行shouldParkAfterFailedAcquire
,也就是t3没有把t2的ws设置为SIGNAL,因此就可能在执行if (h != null && h != tail)
以前会有以下图所示在doReleased
函数中,咱们已经把ws赋值为t3的ws,也就是ws为0,那么在2的前提下,咱们来到这段代码else if (ws == 0 &&!compareAndSetWaitStatus(h, 0, Node.PROPAGATE))
,咱们很容易知道,ws为0,那么此次CAS有可能失败,为何呢?在2 咱们说过shouldParkAfterFailedAcquire
可能还没执行,那么也有可能CAS以前,就已经执行了这个函数,也就是此时t3的ws为-1了,那么此时确定CAS失败啦!若是失败的话,就让t3再回去检查下状态,看看是否能够唤醒后继节点啦,因此在else if
以后就是让它continue啦!
若是CAS成功,那么就是说shouldParkAfterFailedAcquire
这个函数尚未执行,因此在执行最后一个if语句的时候t3可能退出啦,那么t4继续执行shouldParkAfterFailedAcquire
,CAS把t3设置为-1以后,就去执行到setHeadAndPropagate
这个方法,那么也会执行doRelease
,t4的使命就结束了!
通过上面4点的说明,咱们对问题2的分析就已经很清楚啦!
扩展
在解决问题2的时候,咱们若是把else if (ws == 0 &&!compareAndSetWaitStatus(h, 0, Node.PROPAGATE))
代码去掉的话,会有什么影响么?分析一下!
假如没有这段代码,假设此时阻塞队列以下所示
假设这是t1唤醒t2,但t2还不是head节点,那么在t4入队的时候,由于咱们假设没有那个else if
语句嘛,因此会执行shouldParkAfterFailedAcquire
,那么将t2的wsCAS为-1,此时head可能已经指向t2了,前面咱们也说过,在共享模式中,只要一个节点成为头结点,就会执行doReleased
方法,因此在t2设置为head节点的时候,可能在t4休眠以前就被t2park了,可是这个操做时能够容许的,当咱们unpark
一个并无被park
的线程时,该线程在下一次调用park
方法时就不会被挂起,而这一行为是符合咱们的场景的——由于当前的共享锁处于可获取的状态,后继的线程应该直接来获取锁,不该该被挂起。
下面咱们来讲下CyclicBarrier,和CountDownLatch差很少,CyclicBarrier 基于 Condition 来实现。因此若是没了解过Condition的话,建议你们看下前一篇文章初步了解AQS是什么(二)
啥都不说,开局一张图,(引用别人大佬画的图)
下面咱们开始分析咯,依然await
是最重要的方法
咱们先看下一些细节
public class CyclicBarrier {
//CyclicBarrier 是能够重复使用的,每次从开始使用到穿过栅栏当作"一代",或者"一个周期"
private static class Generation {
boolean broken = false;
}
/** The lock for guarding barrier entry */
private final ReentrantLock lock = new ReentrantLock();
//这里看出是基于Condition的,因此等待线程的条件就是全部线程都在栅栏上await啦
private final Condition trip = lock.newCondition();
//参与跨过线程的线程数
private final int parties;
/* 若是设置这个,那么跨过栅栏以前须要执行的操做*/
private final Runnable barrierCommand;
/** 当前所处的代 */
private Generation generation = new Generation();
复制代码
下面咱们看看如可开启新的一代吧
private void nextGeneration() {
// 须要唤醒全部等待在栅栏上的线程
trip.signalAll();
count = parties;
//开启新的一代!
generation = new Generation();
}
复制代码
基本能够看出,咱们开启新的一点,差很少和重写生成一个CyclicBarrier差很少
咱们接下来看下如何打破一个栅栏
private void breakBarrier() {
//设置标志位
generation.broken = true;
//从新设置count的值
count = parties;
//唤醒全部在等待的线程
trip.signalAll();
}
复制代码
如今已经有了铺垫了,那么咱们来看下await
方法吧
public int await() throws InterruptedException, BrokenBarrierException {
try {
return dowait(false, 0L);
} catch (TimeoutException toe) {
throw new Error(toe); // cannot happen
}
}
private int dowait(boolean timed, long nanos) throws InterruptedException, BrokenBarrierException, TimeoutException {
final ReentrantLock lock = this.lock;
//这里咱们先得到锁呗,在finally后面再释放锁
//由于在Condition中,await的线程须要先得到锁的啦!
lock.lock();
try {
final Generation g = generation;
//先检查栅栏是否有被打破,被打破就抛出异常呗
if (g.broken)
throw new BrokenBarrierException();
//检查中断,有则抛出
if (Thread.interrupted()) {
breakBarrier();
throw new InterruptedException();
}
//index 就是这个返回值
//index 存储当前还有多少count
int index = --count;
//若是index 为 0,那么就说明全部线程在栅栏啦,那么就准备打破而后下一代啦
if (index == 0) { // tripped
//用来标记barrierCommand在执行的时候有没有发生异常
boolean ranAction = false;
try {
final Runnable command = barrierCommand;
if (command != null)
//执行barrierCommand的操做
command.run();
ranAction = true;
//开启下一代啦,这里建议看下源码!
nextGeneration();
return 0;
} finally {
//若是barrierCommand执行的时候发生过异常,那就打破栅栏呗!
//这里也建议看下源码!
if (!ranAction)
breakBarrier();
}
}
//若是到达这里,说明上面的index不为0,救是说不是最后一个线程到达栅栏的
for (;;) {
try {
//这里是没有超时机制的
//到这里也就是说到达栅栏了,那么就等待呗,因此就调用Condition的
//await,等待最后一个线程,而后被唤醒(Condition的signal)
if (!timed)
trip.await();
else if (nanos > 0L)
nanos = trip.awaitNanos(nanos);
} catch (InterruptedException ie) {
//若是到达这里,就说明线程在await的时候被中断了
if (g == generation && ! g.broken) {
//到这里就打破栅栏呗
breakBarrier();
//打破栅栏以后,抛出异常信息,让外层本身去处理
throw ie;
} else {
//这里就是说g != genneration,就是说前面已经开启一个新的
//时代啦,说明最后一个线程await完成,全部线程就被唤醒啦!
//可是这个中断已经没有意义啦,因此记录下就好!
Thread.currentThread().interrupt();
}
}
//若是栅栏被打破,这里的被打破就是以前执行barrierCommand的时候
//发生异常,那么被就抛出异常啦
if (g.broken)
throw new BrokenBarrierException();
//到这里基本都要退出啦
//由于以前barrierCommand在执行完任务以后,就会 nextGeneration
//开启一个新的时代,而后释放锁,等待的线程都是await到这里的,因此肯
//定await以前的时代和被唤醒后的时代不同啦!
//若是以前的栅栏破了或者await的时候被唤醒了,都是在前面就抛出异常
//都会直接返回啦!
if (g != generation)
return index;
//这里是超时的操做了,就不作分析了
if (timed && nanos <= 0L) {
breakBarrier();
throw new TimeoutException();
}
}
} finally {
lock.unlock();
}
}
复制代码
上面的await方法咱们已经说得已经很清楚啦,咱们接下来看看如何获取在栅栏等待的线程数
public int getNumberWaiting() {
final ReentrantLock lock = this.lock;
lock.lock();
try {
//也就是运算一下就好啦!
return parties - count;
} finally {
lock.unlock();
}
}
复制代码
检查栅栏有没有被打破,其实也就是看看那个标志位有没有被设置就好啦!
public boolean isBroken() {
final ReentrantLock lock = this.lock;
lock.lock();
try {
return generation.broken;
} finally {
lock.unlock();
}
}
复制代码
下面咱们来总结一下何时栅栏会被打破吧!
await
的线程被中断,那么就会打破栅栏,而后抛出InterruptedException的异常啦咱们再来看下重置栅栏的源码
public void reset() {
final ReentrantLock lock = this.lock;
lock.lock();
try {
breakBarrier(); // break the current generation
nextGeneration(); // start a new generation
} finally {
lock.unlock();
}
}
复制代码
跟简单,就是打破栅栏,而后重建栅栏!也就是不破不立!
咱们来假设一个状况,加入parties设置为4,此时有3个线程await了,那么在第4个await以前重置
那么首先打破栅栏嘛,那么就会跑出BrokenBarrierException 异常,而后开启新的一代,再重置CyclicBarrier 的count和generation,一切从0开始!
关于AQS的核心功能的源码分析就到此结束啦!继续征战下一个知识!