以前咱们已经讲解过关于AQS的独占锁,这一章节主要讲解AQS的共享锁,以Semaphore
信号量来进行讲解,相信经过看了本章节内容的同窗能够对AQS的共享模式有一个了解,Semaphore
信号量提供了用于控制资源同时被访问的个数,也就是它会维护一个许可证,访问资源以前须要申请许可证,申请许可证成功后才能够进行访问,若是申请访问资源获取的了许可证,则能够进行资源访问,同时颁发许可证中心的许可证会进行增长,等到访问资源的线程释放资源后,许可证使用状况会进行减小。java
public class SemaphoreDemo {
private static final Semaphore semaphore = new Semaphore(3);
private static final AtomicInteger atomicInteger = new AtomicInteger();
public static void main(String[] args) {
ExecutorService executorService = Executors.newFixedThreadPool(10);
for (int i = 0; i < 10; i++) {
executorService.execute(() -> {
try {
semaphore.acquire();
System.out.println(Thread.currentThread().getName() + "开始执行");
Thread.sleep(100);
System.out.println(Thread.currentThread().getName() + "执行完毕");
semaphore.release();
} catch (InterruptedException e) {
e.printStackTrace();
}
});
}
}
}
复制代码
运行结果以下:node
pool-1-thread-2开始执行
pool-1-thread-3开始执行
pool-1-thread-1开始执行
pool-1-thread-4开始执行
pool-1-thread-6开始执行
pool-1-thread-5开始执行
pool-1-thread-7开始执行
pool-1-thread-9开始执行
pool-1-thread-8开始执行
pool-1-thread-10开始执行
复制代码
经过一个例子来拨开Semaphore
的面纱,上面咱们定义了信号量为3个,也就是同时能够得到锁的线程只有3个,经过调用semaphore.acquire();
申请信号量,若是申请成功则执行下面的逻辑,在后面经过semaphore.release();
释放掉信号量的占用,也就是说经过semaphore.acquice
从信号量中获取一个许可,若是许可经过则执行下面的语句,若是没有许可可发放则等待,而后经过semaphore.release();
归还一个许可,上面例子中的输出内容也可清晰的看到同事只有两个线程可以访问资源。并发
借助图示来讲明一下:ide
首先初始化的时候下面图示内容是放置了3个许可证,以下图所示: 函数
release
方法后会将许可归还给信号量中,以下图所示:
Semaphore`主要方法有如下内容:oop
方法名 | 描述 |
---|---|
acquire() | 尝试得到一个准入许可,如没法得到,则线程等待,直到有线程释放一个许可或当线程被中断。 |
acquire(int permits) | 尝试得到permits个准入许可,如没法得到,则线程等待,直到有线程释放permits个许可或当线程被中断。 |
acquireUninterruptibly() | 尝试得到一个准入许可,如没法得到,则线程等待,直到有线程释放一个许可,可是不响应中断请求 |
acquireUninterruptibly(int permits) | 尝试得到permits个准入许可,如没法得到,则线程等待,直到有线程释放permits个许可,可是不响应中断请求 |
release() | 用于在线程访问资源结束后,释放一个许可,以使其余等待许可的线程能够进行资源访问。 |
release(int permits) | 用于在线程访问资源结束后,释放permits个许可,以使其余等待许可的线程能够进行资源访问。 |
tryAcquire() | 尝试得到一个许可,若是得到许可成功返回true,若是失败则返回fasle,它不会等待,当即返回 |
tryAcquire(int permits) | 尝试得到permits个许可,若是得到许可成功返回true,若是失败则返回fasle,它不会等待,当即返回 |
tryAcquire(int permits, long timeout, TimeUnit unit) | 尝试在指定时间内得到permits个许可,若是在指定时间内没有得到许可则则返回false,反之返回true |
tryAcquire(long timeout, TimeUnit unit) | 尝试在指定时间内得到一个许可,若是在指定时间内没有得到许可则则返回false,反之返回true |
availablePermits(): | 当前可用的许可数 |
经过上面方法的大体介绍,Semaphore
提供了对信号量获取的操做,获取的过程当中有等待操做,也有当即返回的方法,有的响应中断有的又不响应中断,下面会以一些简单的例子,进行分析一下源码内容,针对下面的例子来进行分析:源码分析
public class SemaphoreDemo {
private static final Semaphore semaphore = new Semaphore(1);
private static final AtomicInteger atomicInteger = new AtomicInteger();
public static void main(String[] args) {
ExecutorService executorService = Executors.newFixedThreadPool(10);
for (int i = 0; i < 2; i++) {
Thread.sleep(100);
executorService.execute(() -> {
try {
semaphore.acquire();
System.out.println(Thread.currentThread().getName() + "开始执行");
Thread.sleep(10);
System.out.println(Thread.currentThread().getName() + "执行完毕");
// semaphore.release();
} catch (InterruptedException e) {
e.printStackTrace();
}
});
}
}
}
复制代码
针对上面的例子,咱们先初始化了1个信号量,线程池提交了2个任务,这时候你们也想到了运行结果,运行结果就是只有1一个线程可以执行完毕,其他的线程都须要等到操做,由于信号量被消耗了,下面是输出结果:ui
pool-1-thread-1开始执行
pool-1-thread-1执行完毕
复制代码
咱们这里特别在提交到线程池任务的时候睡眠了一会,其实想要达到的目的是可以让每一个线程执行按照顺序排下去,否则可能顺序就不定了,固然也没有太大影响,这里只是为了方便分析,当第一个线程提交任务到线程池时,它会先通过semaphore.acquire()
方法来进行得到一个许可操做,下面咱们来看一下源码:this
public void acquire() throws InterruptedException {
sync.acquireSharedInterruptibly(1);
}
复制代码
咱们能够看到它调用了sync.acquireSharedInterruptibly(1)
方法,这个snyc
实际上是Semaphore
内部类Sync
的实例对象,那么问题来了,这个sync
变量是何时初始化的呢?其实当咱们初始化Semaphore
,就已经将sync
变量初始化了,接下来咱们看一下Semaphore
构造函数:atom
// 非公平模式
public Semaphore(int permits) {
sync = new NonfairSync(permits);
}
// fair=true为公平模式,false=非公平模式
public Semaphore(int permits, boolean fair) {
sync = fair ? new FairSync(permits) : new NonfairSync(permits);
}
复制代码
/** * 非公平模式 */
static final class NonfairSync extends Sync {
private static final long serialVersionUID = -2694183684443567898L;
NonfairSync(int permits) {
super(permits);
}
//实现AQS的tryAcquireShared方法,尝试得到锁。
protected int tryAcquireShared(int acquires) {
return nonfairTryAcquireShared(acquires);
}
}
/** * 公平模式 */
static final class FairSync extends Sync {
private static final long serialVersionUID = 2014338818796000944L;
FairSync(int permits) {
super(permits);
}
//实现AQS的tryAcquireShared方法,尝试得到锁。
protected int tryAcquireShared(int acquires) {
for (;;) {
if (hasQueuedPredecessors())
return -1;
int available = getState();
int remaining = available - acquires;
if (remaining < 0 ||
compareAndSetState(available, remaining))
return remaining;
}
}
}
复制代码
咱们能够发现Semaphore
提供了两种模式的锁机制,一种是公平模式,一种是非公平模式,公平模式其实就是若是发现了有线程在排队等待,则自觉到后面去排队,而非公平模式则不同,它无论你有没有在排队的线程,谁先抢到是谁的,说到这里咱们发现上例子中当声明Semaphore
时,其实默认使用了非公平模式NonfairSync
,指定了信号量数量为1个,其实它内部Sync
中调用了AQS
的setState
方法,设置同步器状态state
为1,详细以下图所示:
接下来咱们在回到acquire
方法中,它调用了sync.acquireSharedInterruptibly(1);
,细心地朋友会发现NonfairSync
和父类Sync
中并无该方法,其实该方法是AQS
提供的方法,接下来咱们看一下这个方法到底作了什么?源码内容以下所示:
public final void acquireSharedInterruptibly(int arg) throws InterruptedException {
if (Thread.interrupted()) //若是线程被中断则抛出异常。
throw new InterruptedException();
if (tryAcquireShared(arg) < 0) //尝试获取信号量,若是得到信号量小于0,则表明获取失败则运行下面的语句。
doAcquireSharedInterruptibly(arg); //将当前线程节点加入到队列中,等到其余线程释放信号量。
}
复制代码
doAcquireSharedInterruptibly
方法,将当前线程挂起等待其余线程释放信号量。接下来咱们看一下tryAcquireShared
方法实现,tryAcquireShared
这个方法是AQS
提供给子类实现的方法,它自身并无实现,只是抛出了异常,实现它的类必然是Semaphore
的Sync
类,咱们发现实现该方法有两个,包括非公平模式的NonfairSync
,另一个是公平模式下的FairSync
,因为咱们上例子中采用的是非公平模式,咱们看一下非公平模式下的tryAcquireShared
实现逻辑:
protected int tryAcquireShared(int acquires) {
return nonfairTryAcquireShared(acquires);
}
复制代码
它(NonfairSync
)内部调用了父类Sync
的nonfairTryAcquireShared
方法,继续刨根问底看一下这个方法:
final int nonfairTryAcquireShared(int acquires) {
//这里上来就是个死循环
for (;;) {
//获取可用的信号量数量。
int available = getState();
//剩余线程数,其实就是当前可用信号量数量-申请的信号量数量
int remaining = available - acquires;
//1. 若是剩余信号量数量小于0,表明没有信号量可用
//2. 修改state成功,则表明申请信号量成功。
if (remaining < 0 ||
compareAndSetState(available, remaining))
return remaining;
}
}
复制代码
若是读过前面ReentrantLock
源码的朋友会发现,它对state
是增长的,也就是若是state
值被设置上了值,则表明已经有线程得到了锁,其余线程不容许获取当前锁,若是是当前线程从新得到锁,则state
值会增长,这也是重入锁的关键点,而Semaphore
与之不一样点在于,它是对state
同步器状态进行减小操做,换句话说先初始化若干信号量,若是得到信号量时,剩余信号量小于0,则表明没有可用的信号量,则直接返回,若是得到信号量成功则对state
值进行修改,回到上面的例子中,咱们刚分析道其中第一个线程,第一个线程得到到了信号量,此时剩余信号量为0,它会将state
值设置为0,设置以后回到了acquireSharedInterruptibly
的if (tryAcquireShared(arg) < 0)
语句中,if语句为true,不进入到if语句内部,此时AQS的状况以下图所示:
state
已经变成0了,当它执行到
tryAcquireShared
去获取信号量时,可用信号量为0个,当可用信号量减去申请的信号量个数1时,此时剩余信号量变成了-1,因此这时候if语句的条件
remaining < 0
是知足的,进入到if语句中,返回的是剩余信号量-1,此时会跳转到调用地方,也就是AQS的
acquireSharedInterruptibly
方法中,这时候发现if语句中(
tryAcquireShared(arg) < 0
)返回结果是-1,会进入到if语句内部执行
doAcquireSharedInterruptibly
方法,这个方法主要操做是将当前线程放入到等待队列中,等到其余线程释放信号量,接下来慢慢剖析一下内部源码
AQS->doAcquireSharedInterruptibly
:
/** * Acquires in shared interruptible mode. * @param arg the acquire argument */
private void doAcquireSharedInterruptibly(int arg) throws InterruptedException {
final Node node = addWaiter(Node.SHARED); //将节点添加到队列中
boolean failed = true; //失败标志位,若是出现中断,或者异常抛出时会进行取消操做
try {
for (;;) {
final Node p = node.predecessor(); //获取当前节点的前节点
if (p == head) { //若是当前节点的前节点为头节点
int r = tryAcquireShared(arg); //尝试得到锁
if (r >= 0) { //若是能够得到信号量
setHeadAndPropagate(node, r); //设置当前节点为头节点
p.next = null; // help GC //帮助GC回收
failed = false; //设置失败为false,也就是正常获取
return;
}
}
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
throw new InterruptedException();
}
} finally {
if (failed)
cancelAcquire(node);
}
}
复制代码
这时候当咱们的程序运行到addWaiter
,来看一下这个时候队列的状况:
Ref-405
是头节点,而后再将当前线程的节点指向头节点也就是上图中所示内容,由于这里
addWaiter
内部代码和以前分析的AQS的独占锁(也就是
ReentrantLock
源码)的时候已经分析过了,这里就不在赘述了,将线程加入到等待队列中以后,接下来进入到for死循环中去,首先上来获取当前节点的头节点,也就是上图的
Ref-405
,而后判断是否是头节点,这里面的内容其实就是再去尝试争抢一下信号量,看有没有信号量释放,若是返回的信号量剩余个数大于等于0,则表明争抢信号量成功,须要对节点进行处理,可是咱们这个例子中,当进行
tryAcquireShared
时,返回的值是-1,因此获取信号量失败,不会进入到下面内容,可是咱们在这里先进行分析分析一下这个方法
setHeadAndPropagate
,为后面埋下伏笔:
//从方法中也能够看出来大体意思是设置头节点,而且根据条件是否唤醒后继节点。
private void setHeadAndPropagate(Node node, int propagate) {
Node h = head; // 记录一下原来的head头节点。
setHead(node); // 设置新的节点设置为头节点。
if (propagate > 0 || //若是有信号量
h == null || //头节点为空状况下
h.waitStatus < 0 || //若是原来的头结点的状态是负数,这里指的是SIGNAL和PROPAGATE两个状态
(h = head) == null || // 从新阅读头节点防止额外的竞争。
h.waitStatus < 0) { //若是原来的头结点的状态是负数,这里指的是SIGNAL和PROPAGATE两个状态
Node s = node.next;
if (s == null || s.isShared())
doReleaseShared();
}
}
复制代码
当他没有进入到setHeadAndPropagate方法,它会走下面的步骤:
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
throw new InterruptedException();
复制代码
shouldParkAfterFailedAcquire
将头节点修改成SIGNAL
,parkAndCheckInterrupt
将线程进行阻塞,运行到这里是线程被挂起,等待其余线程唤醒,此时队列状态以下所示:
当咱们调用semaphore.release()
进行释放信号量时,它其实调用的是AbstractQueuedSynchronizer
中的releaseShared(int arg)
方法,咱们来看一下源码内容:
public void release() {
sync.releaseShared(1);
}
复制代码
接下来分析一下AQS中的releaseShared
方法:
public final boolean releaseShared(int arg) {
// 调用Semaphore实现的tryReleaseShared方法。
if (tryReleaseShared(arg)) {
// 唤醒后记节点
doReleaseShared();
return true;
}
return false;
}
复制代码
先进尝试释放信号量,若是信号量释放成功,则进行调用doReleaseShared来进行唤醒等待的节点,告知队列中等待的节点已经有信号量了能够进行获取了。
protected final boolean tryReleaseShared(int releases) {
for (;;) {
// 获取当前的state值
int current = getState();
// 将当前的state值添加releases个信号量
int next = current + releases;
if (next < current) // overflow
throw new Error("Maximum permit count exceeded");
// cas修改state值
if (compareAndSetState(current, next))
return true;
}
}
复制代码
其实最主要的方法是方法是doReleaseShared
方法,咱们来看一下源码:
/** * 唤醒队列中的节点,以及修改头结点的waitStatus状态为PROPAGATE * 1. 若是头节点等待状态为SIGNAL,则将头节点状态设为0,并唤醒后继节点 * 2. 若是头节点等待状态为0,则将头节点状态设为PROPAGATE,保证唤醒可以正常传播下去。 */
private void doReleaseShared() {
for (;;) {
Node h = head;
if (h != null && h != tail) {
int ws = h.waitStatus;
// 若是头结点的状态为SIGNAL则进行唤醒操做。
if (ws == Node.SIGNAL) {
if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))
continue; // loop to recheck cases
unparkSuccessor(h);
}
// 若是头节点状态为0,则将头节点状态修改成PROPAGATE,至于为何会变成0,为何要有PROPAGATE?,请看下文。
else if (ws == 0 &&
!compareAndSetWaitStatus(h, 0, Node.PROPAGATE))
continue; // loop on failed CAS
}
if (h == head) // loop if head changed
break;
}
}
复制代码
这里可能你们会有一个疑问,为何不是直接propagate > 0,而后就直接唤醒下一个节点呢?这里我要引用一下以前的版本中的一个bug来讲明一下: BUG-6801020
根据BUG中的描述影响的版本号是 JDK 6u11,6u17 两个版本,BUG中说起到了复现bug的代码以下所示:
import java.util.concurrent.Semaphore;
public class TestSemaphore {
private static Semaphore sem = new Semaphore(0);
private static class Thread1 extends Thread {
@Override
public void run() {
sem.acquireUninterruptibly();
}
}
private static class Thread2 extends Thread {
@Override
public void run() {
sem.release();
}
}
public static void main(String[] args) throws InterruptedException {
for (int i = 0; i < 10000000; i++) {
Thread t1 = new Thread1();
Thread t2 = new Thread1();
Thread t3 = new Thread2();
Thread t4 = new Thread2();
t1.start();
t2.start();
t3.start();
t4.start();
t1.join();
t2.join();
t3.join();
t4.join();
System.out.println(i);
}
}
}
复制代码
接下来看一下受影响版本号中的setHeadAndPropagate
和releaseShared
两个方法源码,以下:
private void setHeadAndPropagate(Node node, int propagate) {
setHead(node);
// 这里是区别点,他这里直接是比较的信号量若是存在,而且当前节点的等待状态不等于0,才会去唤醒下一个线程。
if (propagate > 0 && node.waitStatus != 0) {
/* * Don't bother fully figuring out successor. If it * looks null, call unparkSuccessor anyway to be safe. */
Node s = node.next;
if (s == null || s.isShared())
unparkSuccessor(node);
}
}
public final boolean releaseShared(int arg) {
if (tryReleaseShared(arg)) {
Node h = head;
if (h != null && h.waitStatus != 0)
unparkSuccessor(h);
return true;
}
return false;
}
复制代码
在JDK JDK 6u11,6u17 版本中发现中是没有PROPAGATE
这种状态的,在后面的版本中引入是为了解决共享模式下并发释放致使的线程hang住问题,上面例子运行一段时间后,偶尔会出现线程hang住的状况。上面例子中初始化了四个线程,信号量初始化时是0个,t1线程和t2线程是获取信号量,t3和t4线程是释放信号量,假设某种状况极端的状况下t1和t2添加到了队列中,以下图所示:
releaseShared
方法,会调用unparkSuccessor
,这个方法是用来通知等待线程,此时head中的waitStatus由-1变成0,而后唤醒线程t1,此时信号量为1。doAcquireSharedInterruptibly
方法里面,当线程唤醒的时候也是从这个方法中进行执行,当t1线程尝试得到信号量时,发现能够得到信号量,tryAcquireShared
返回的是0,由于消耗了一个信号量,而此时当前线程没有进行继续往下操做,而是进行了线程切换,此时线程状态以下:releaseShared
,此时头节点的waitStatus=0,直接返回false,并未调用unparkSuccessor,可是此时信号量变成了1。setHeadAndPropagate
方法的时候,他没有进入到if语句的内部,因此t2线程一直没有被唤醒,致使主线程挂起。jdk1.8中的setHeadAndPropagate
并无直接调用unparkSuccessor
方法,而是修改调用doReleaseShared
方法,咱们来看一下这个方法跟上面bug中有什么区别:
/** * 1.唤醒队列中的节点 * 2.若是队列头节点waitStatus=0,则将当前head头节点修改成PROPAGATE(-3)状态 */
private void doReleaseShared() {
/* * Ensure that a release propagates, even if there are other * in-progress acquires/releases. This proceeds in the usual * way of trying to unparkSuccessor of head if it needs * signal. But if it does not, status is set to PROPAGATE to * ensure that upon release, propagation continues. * Additionally, we must loop in case a new node is added * while we are doing this. Also, unlike other uses of * unparkSuccessor, we need to know if CAS to reset status * fails, if so rechecking. */
for (;;) {
Node h = head;
if (h != null && h != tail) {
int ws = h.waitStatus;
if (ws == Node.SIGNAL) {
if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))
continue; // loop to recheck cases
unparkSuccessor(h);
}
else if (ws == 0 &&
!compareAndSetWaitStatus(h, 0, Node.PROPAGATE))
continue; // loop on failed CAS
}
if (h == head) // loop if head changed
break;
}
}
复制代码
这里咱们回到上面第三步,此时切换到t4线程,t4调用releaseShared
,此时头节点的waitStatus=0,直接返回false,并未调用unparkSuccessor,可是此时信号量变成了1,而且将head头节点的waitStatus状态修改成-3。
13.png
回到上面第四步骤:此时t1线程被唤醒,继续执行将head节点指向了Ref-505
,而且当时的信号量只有1个,他本身消耗了信号量,虽然如今state=1,可是咱们能够看线程切换时,信号量的state=0
,因此线程切换回去以后,它的propagate=0
,调用setHeadAndPropagate
方法的时候,此时head头节点的状态是PROPAGATE(-3)
,会进入到if语句中执行doReleaseShared
方法,此时唤醒线程t2。
因为文章写的时间比较长,中间伴随着找工做,因此耽搁的时间有点长了,若是有错误的地方请指正,我这边及时作更正。写文章不易,读到这里的朋友都是好样的,因为文章篇幅有点长,在释放信号量地方没有图示来进行表示,可是后面大体讲解了释放的流程因此这里就不赘述了。谢谢