以前分析AQS的时候,内部有两种模式,独占模式和共享模式,前面的ReentrantLock都是使用独占模式,而Semaphore一样做为一个基于AQS实现的并发组件,它是基于共享模式实现的,咱们先看看它的使用场景前端
假设有20我的去银行柜面办理业务,银行只有3个柜面,同时只能办理三我的,若是基于这种有限的、咱们须要控制资源的状况,使用Semaphore比较方便:node
public class SemaphoreTest {
//排队总人数
private static final int COUNT =20;
//只有三个柜台
private static final Semaphore AVALIABLECOUNT = new Semaphore(3);
public static void main(String[] args) {
//建立一个线程池
BlockingQueue<Runnable> workQueue = new LinkedBlockingQueue<>(COUNT);
BasicThreadFactory.Builder builder = new BasicThreadFactory.Builder().namingPattern("线程池");
ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(COUNT, COUNT, 30L, TimeUnit.SECONDS, workQueue,
builder.build());
for (int i = 0; i < COUNT; i++) {
final int count = i;
//排队的人都须要被服务,因此全部的人直接提交线程池处理
threadPoolExecutor.execute(() -> {
try {
//使用acquire获取共享锁
AVALIABLECOUNT.acquire();
System.out.println(Thread.currentThread().getName());
System.out.println("服务号"+count+"正在服务");
Thread.sleep(1000);
}catch (Exception e){
System.out.println(e.getMessage());
}
finally {
//获取完了以后释放资源
AVALIABLECOUNT.release();
}
});
}
threadPoolExecutor.shutdown();
}
}
复制代码
输出以下:咱们执行代码,能够发现每隔1秒几乎同一时间出现3条线程访,以下图 设计模式
在深刻分析Semaphore的内部原理前先看看一张类图结构 安全
一样的,咱们先看看构造方法:bash
public Semaphore(int permits) {
sync = new NonfairSync(permits);
}
/**
* Creates a {@code Semaphore} with the given number of
* permits and the given fairness setting.
*
* @param permits the initial number of permits available.
* This value may be negative, in which case releases
* must occur before any acquires will be granted.
* @param fair {@code true} if this semaphore will guarantee
* first-in first-out granting of permits under contention,
* else {@code false}
*/
public Semaphore(int permits, boolean fair) {
sync = fair ? new FairSync(permits) : new NonfairSync(permits);
}
复制代码
咱们经过默认构造函数建立时,诞生的就是非公平锁,接下来咱们看一下构造方法的入参permits的传递:并发
static final class NonfairSync extends Sync {
NonfairSync(int permits) {
super(permits);
}
//调用父类Sync的nonfairTryAcquireShared
protected int tryAcquireShared(int acquires) {
return nonfairTryAcquireShared(acquires);
}
}
复制代码
在Sync中:函数
abstract static class Sync extends AbstractQueuedSynchronizer {
private static final long serialVersionUID = 1192457210091910933L;
//直接将该值设置为AQS中的state的值
Sync(int permits) {
setState(permits);
}
复制代码
因此Semaphore的入参permit直接传入设置到AQS中的state中。 接下来咱们看看acquire()方法,咱们先通俗的解释一下它的执行过程: 当一个线程请求到来时,state值表明的许可数,那么请求线程将会得到同步状态即对共享资源的访问权,并更新state的值(通常是对state值减1),但若是请求线程过多,state值表明的许可数已减为0,则请求线程将没法获取同步状态,线程将被加入到同步队列并阻塞,直到其余线程释放同步状态(通常是对state值加1)才可能获取对共享资源的访问权。 调用Semaphore的acquire()方法后将会调用到AQS的acquireSharedInterruptibly():oop
//Semaphore的acquire()
public void acquire() throws InterruptedException {
sync.acquireSharedInterruptibly(1);
}
public final void acquireSharedInterruptibly(int arg)
throws InterruptedException {
//判断是否被中断
if (Thread.interrupted())
throw new InterruptedException();
//若是tryAcquireShared(arg)不小于0,则说明当前还有permit可被使用
if (tryAcquireShared(arg) < 0)
//若是许可被用完了,没有剩余许可 则加入同步队列等待
doAcquireSharedInterruptibly(arg);
}
复制代码
在acquireSharedInterruptibly()方法内部先进行了线程中断的判断,那么先尝试调用tryAcquireShared(arg)方法获取同步状态,若是此时许可获取成功,那么方法执行结束,若是获取失败,则说明没有剩余许可了,那么调用doAcquireSharedInterruptibly(arg);方法加入同步队列等待。 这里的tryAcquireShared(arg)是个模板方法设计模式,AQS内部没有提供具体实现,由子类实现,也就是有Semaphore内部本身实现,该方法在Semaphore内部非公平锁的实现以下post
final int nonfairTryAcquireShared(int acquires) {
for (;;) {
int available = getState();
int remaining = available - acquires;
//remaining < 0说明许可已经供不该求了,这个时候进来的线程须要被阻塞
//不然CAS操做更新avaliable的值,它表示剩余的许可数
if (remaining < 0 ||
compareAndSetState(available, remaining))
return remaining;
}
}
复制代码
nonfairTryAcquireShared(int acquires)方法内部,先获取state的值,并执行减法操做,获得remaining值,它能够理解为剩余的许可数,若是remaining<0,说明请求的许可数过大,此时直接返回一个负数的remaining;若是remaining大于0,说明还有剩余的许可数,则能够访问共享资源,后续将被加入同步队列(经过doAcquireSharedInterruptibly(arg))。 注意Semaphore的acquire()可能存在并发操做,所以nonfairTryAcquireShared()方法体内部采用死循环+无锁(CAS)并发的操做保证对state值修改的安全性。 例如:假设permit值为5,有多个线程并发accquire获取许可,线程1运行时获得的remainin是5-1=4,线程2运行时,获得的remaining一样是5-1=4,可是执行compareAndSetState时,线程2 更快一点,执行CAS操做:判断state如今是否为5,若是为5,则CAS更新为4. 这个时候线程1也执行CAS操做,判断state如今是否为5,发现不为5,因此CAS失败,这时候须要这个死循环去重试。ui
若是remaining大于0,说明还有剩余的许可数,则能够访问共享资源,后续将被加入同步队列,接下来看入队的操做,这一部分与ReentrantLock差很少:
private void doAcquireSharedInterruptibly(int arg)
throws InterruptedException {
//使用SHARED类型建立共享模式的Node
final Node node = addWaiter(Node.SHARED);
boolean failed = true;
try {
for (;;) {
//获取前序节点
final Node p = node.predecessor();
//若是前序节点是头节点,说明本身的Node在队列最前端,此时可能共享资源随时被释放
//因此须要再次尝试获取共享资源
if (p == head) {
int r = tryAcquireShared(arg);
//若是获取共享资源成功
if (r >= 0) {
//已经获取资源后,node已经没有意义,因此清理head节点并传播
setHeadAndPropagate(node, r);
p.next = null; // help GC
failed = false;
return;
}
}
//若是不是头节点
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
throw new InterruptedException();
}
} finally {
if (failed)
cancelAcquire(node);
}
}
复制代码
在方法中,因为当前线程没有获取同步状态,所以建立一个共享模式类型(Node.SHARED)的结点并经过addWaiter(Node.SHARED)加入同步队列,加入完成后,当前线程进入自旋状态,首先判断前驱结点是否为head,若是是,那么尝试获取同步状态并返回r值,若是r大于0,则说明获取同步状态成功,将当前线程设置为head并传播,传播指的是,通知后续结点继续获取同步状态,到此return结束,获取到同步状态的线程将会执行原定的任务。
private void setHeadAndPropagate(Node node, int propagate) {
Node h = head;
setHead(node);//设置为头结点
/*
* 尝试去唤醒队列中的下一个节点,若是知足以下条件:
* 还有剩余许可(propagate > 0),
* 或者h.waitStatus为PROPAGATE(被上一个操做设置)
* 而且
* 下一个节点处于共享模式或者为null。
*
* 这两项检查中的保守主义可能会致使没必要要的唤醒,但只有在有
* 有在多个线程争取得到/释放同步状态时才会发生,因此大多
* 数状况下会立马得到须要的信号
*/
if (propagate > 0 || h == null || h.waitStatus < 0 ||
(h = head) == null || h.waitStatus < 0) {
Node s = node.next;
if (s == null || s.isShared())
//唤醒后继节点,由于是共享模式,因此容许多个线程同时获取同步状态
doReleaseShared();
}
}
复制代码
但若是前驱结点不为head或前驱结点为head并尝试获取同步状态失败(与),那么调用shouldParkAfterFailedAcquire(p, node)方法判断前驱结点的waitStatus值是否为SIGNAL并调整同步队列中的node结点状态,若是返回true,那么执行parkAndCheckInterrupt()方法,将当前线程挂起。 shouldParkAfterFailedAcquire方法与ReentrantLock中的一模一样:
private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
//获取当前结点的等待状态
int ws = pred.waitStatus;
//若是为等待唤醒(SIGNAL)状态则返回true
if (ws == Node.SIGNAL)
return true;
//若是ws>0 则说明是结束状态,
//遍历前驱结点直到找到没有结束状态的结点
if (ws > 0) {
do {
node.prev = pred = pred.prev;
} while (pred.waitStatus > 0);
pred.next = node;
} else {
//若是ws小于0又不是SIGNAL状态,说明是node是首次加入的线程
//则将其前驱节点设置为SIGNAL状态。下次执行shouldParkAfterFailedAcquire方法时就
//知足ws == Node.SIGNAL条件了
compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
}
return false;
}
复制代码
这个方法是AQS中的,若是不懂的话,能够参考以前在ReentrantLock中也分析过:juejin.im/post/5c021b… 中自旋的部分。 到此,加入同步队列的整个过程完成。
在AQS中存在一个volatile变量state,当咱们建立Semaphore对象传入许可数值时,最终会赋值给state,state的数值表明可同时操做共享数据的线程数量,每当一个线程请求(如调用Semaphored的acquire()方法)获取同步状态成功,state的值将会减小1,直到state为0时,表示已没有可用的许可数,也就是对共享数据进行操做的线程数已达到最大值,其余后来线程将被阻塞,此时AQS内部会将线程封装成共享模式的Node结点,加入同步队列中等待并开启自旋操做。只有当持有对共享数据访问权限的线程执行完成任务并释放同步状态后,同步队列中的对于的结点线程才有可能获取同步状态并被唤醒执行同步操做,注意在同步队列中获取到同步状态的结点将被设置成head并清空相关线程数据(毕竟线程已在执行也就没有必要保存信息了),AQS经过这种方式便实现共享锁,用图表示以下:
##非公平锁的释放锁 接下来看一下释放锁:
public void release() {
sync.releaseShared(1);
}
//调用到AQS中的releaseShared(int arg)
public final boolean releaseShared(int arg) {
//调用子类Semaphore实现的tryReleaseShared方法尝试释放同步状态
if (tryReleaseShared(arg)) {
doReleaseShared();
return true;
}
return false;
}
复制代码
显然Semaphore间接调用了AQS中的releaseShared(int arg)方法,经过tryReleaseShared(arg)方法尝试释放同步状态,若是释放成功,那么将调用doReleaseShared()唤醒同步队列中后继结点的线程,tryReleaseShared(int releases)方法以下:
//在Semaphore的内部类Sync中实现的
protected final boolean tryReleaseShared(int releases) {
for (;;) {
//获取当前state
int current = getState();
//释放状态state增长releases
int next = current + releases;
if (next < current) // overflow
throw new Error("Maximum permit count exceeded");
//经过CAS更新state的值
if (compareAndSetState(current, next))
return true;
}
}
复制代码
逻辑很简单,释放同步状态,更新state的值,一样的,经过for死循环和CAS操做来保证线程安全问题,由于可能存在多个线程同时释放同步状态的场景。释放成功后经过doReleaseShared()方法唤醒后继结点。
private void doReleaseShared() {
/*
* 若是头节点的后继节点须要唤醒,那么执行唤醒
* 动做;若是不须要,将头结点的等待状态设置为PROPAGATE保证
* 唤醒传递。另外,为了防止过程当中有新节点进入(队列),这里必
* 需作循环,因此,和其余unparkSuccessor方法使用方式不同
* 的是,若是(头结点)等待状态设置失败,从新检测。
*/
for (;;) {
Node h = head;
if (h != null && h != tail) {
// 获取头节点对应的线程的状态
int ws = h.waitStatus;
// 若是头节点对应的线程是SIGNAL状态,则意味着头
//结点的后继结点所对应的线程须要被unpark唤醒。
if (ws == Node.SIGNAL) {
// 修改头结点对应的线程状态设置为0。失败的话,则继续循环。
if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))
continue;
// 唤醒头结点h的后继结点所对应的线程
unparkSuccessor(h);
}
else if (ws == 0 &&
!compareAndSetWaitStatus(h, 0, Node.PROPAGATE))
continue; // loop on failed CAS
}
// 若是头结点发生变化,则继续循环。不然,退出循环。
if (h == head) // loop if head changed
break;
}
}
//唤醒传入结点的后继结点对应的线程
private void unparkSuccessor(Node node) {
int ws = node.waitStatus;
if (ws < 0)
compareAndSetWaitStatus(node, ws, 0);
//拿到后继结点
Node s = node.next;
if (s == null || s.waitStatus > 0) {
s = null;
for (Node t = tail; t != null && t != node; t = t.prev)
if (t.waitStatus <= 0)
s = t;
}
if (s != null)
//唤醒该线程
LockSupport.unpark(s.thread);
}
复制代码
显然doReleaseShared()方法中经过调用unparkSuccessor(h)方法唤醒head的后继结点对应的线程。这个方法在以前获取资源时也会被调用:
if (propagate > 0 || h == null || h.waitStatus < 0 ||
(h = head) == null || h.waitStatus < 0) {
Node s = node.next;
if (s == null || s.isShared())
doReleaseShared();
}
复制代码
两种状况下都是为唤醒后继节点,由于是共享模式,因此容许多个线程同时获取同步状态。释放操做的过程仍是相对简单些的,即尝试更新state值,更新成功调用doReleaseShared()方法唤醒后继结点对应的线程。
公平锁的中的共享模式实现除了在获取同步状态时与非公平锁不一样外,其余基本同样:
static final class FairSync extends Sync {
FairSync(int permits) {
super(permits);
}
protected int tryAcquireShared(int acquires) {
for (;;) {
//这里是重点,先判断队列中是否有结点再执行
//同步状态获取。
if (hasQueuedPredecessors())
return -1;
int available = getState();
int remaining = available - acquires;
if (remaining < 0 ||
compareAndSetState(available, remaining))
return remaining;
}
}
}
相比之下,对于非公平锁:
final int nonfairTryAcquireShared(int acquires) {
//使用死循环
for (;;) {
//每当有线程获取共享资源时,就直接尝试CAS操做
int available = getState();
int remaining = available - acquires;
//判断信号量是否已小于0或者CAS执行是否成功
if (remaining < 0 ||
compareAndSetState(available, remaining))
return remaining;
}
}
复制代码
从代码中能够看出,与非公平锁tryAcquireShared(int acquires)方法实现的惟一不一样是,在尝试获取同步状态前,先调用了hasQueuedPredecessors()方法判断同步队列中是否存在结点,若是存在则返回-1,即将线程加入同步队列等待,后续经过Node结构保证唤醒的顺序。从而保证先到来的线程请求必定会先执行,也就是所谓的公平锁。其余操做,与前面分析的非公平锁同样。
AQS做为核心并发组件,它经过state值来控制对共享资源访问的线程数,内部的Node有独占模式(EXCLUSIVE)和共享模式(SHARED):
AQS是采用模板方法的设计模式构建的,它做为基础组件,封装的是核心并发操做,可是实现上分为两种模式,即共享模式(如Semaphore)与独占模式(如ReetrantLock,这两个模式的本质区别在于多个线程能不能共享一把锁),而这两种模式的加锁与解锁实现方式是不同的,但AQS只关注内部公共方法实现并不关心外部不一样模式的实现,因此提供了模板方法给子类使用:也就是说实现独占锁,如ReentrantLock须要本身实现tryAcquire()方法和tryRelease()方法,而实现共享模式的Semaphore,则须要实现tryAcquireShared()方法和tryReleaseShared()方法,这样作的好处是显而易见的,不管是共享模式仍是独占模式,其基础的实现都是同一套组件(AQS),只不过是加锁解锁的逻辑不一样罢了,更重要的是若是咱们须要自定义锁的话,也变得很是简单,只须要选择不一样的模式实现不一样的加锁和解锁的模板方法便可。 不论是ReentrantLock仍是Semaphore,公平锁与非公平锁的不一样之处在于公平锁会在线程请求同步状态前,判断同步队列是否存在Node,若是存在就将请求线程封装成Node结点加入同步队列,从而保证每一个线程获取同步状态都是先到先得的顺序执行的。非公平锁则是经过竞争的方式获取,无论同步队列是否存在Node结点,只有经过竞争获取就能够获取线程执行权。