一 概述
二 源码总览
三 acquire-请求令牌
四 release-释放令牌
五 总结
复制代码
semaphore是信号的意思,在并发包中则表示持有指定数量令牌的信号量。它一般用于多线程同时请求令牌的控制。提供了acquire方法用于获取令牌,当令牌发放完后则进行阻塞等待,持有令牌的线程完成任务后须要调用release方法归还令牌。semaphore的使用很简单,如今经过学习它的源码来了解它的实现原理是怎样的。java
public class Semaphore implements java.io.Serializable {
private static final long serialVersionUID = -3222578661600680210L;
// 全部机制都经过AbstractQueuedSynchronizer子类Sync来完成的
private final Sync sync;
abstract static class Sync extends AbstractQueuedSynchronizer {
Sync(int permits) {
// 采用AQS中的state来统计令牌数
setState(permits);
}
……
}
// 非公平锁
static final class NonfairSync extends Sync {……}
// 公平锁
static final class FairSync extends Sync{……}
// 默认构造方法-默认非公平锁
public Semaphore(int permits) {
sync = new NonfairSync(permits);
}
// 提供选择公平锁仍是非公平锁的构造方法
public Semaphore(int permits, boolean fair) {
sync = fair ? new FairSync(permits) : new NonfairSync(permits);
}
// 请求一个令牌-响应中断+阻塞
public void acquire() throws InterruptedException {……}
// 指定获取领牌数-响应中断且阻塞
public void acquire(int permits) throws InterruptedException {……}
// 请求一个令牌-不响应中断+阻塞
public void acquireUninterruptibly() {……}
// 尝试请求一个令牌-非阻塞,失败当即返回
public boolean tryAcquire(){……}
// 尝试请求一个令牌,阻塞指定时间,超时后返回
public boolean tryAcquire(long timeout, TimeUnit unit){……}
// 释放令牌
public void release(){……}
}
复制代码
整体来讲,Semaphore内部有一个继承于AQS的内部类Sync,利用AQS的共享锁来实现对共享变量state进行操做,并将state做为令牌的计数, 并提供了公平和非公平锁的方式来获取令牌,总体的设计跟ReentrantLock很像。下面以最经常使用的acquire和release方法为例,详细了解他们的原理;node
先看acquire的方法体:微信
public void acquire() throws InterruptedException {
sync.acquireSharedInterruptibly(1);
}
复制代码
里面实际调用的是AQS的请求共享锁方法:多线程
public final void acquireSharedInterruptibly(int arg) throws InterruptedException {
if (Thread.interrupted())
throw new InterruptedException();
// 先进行一遍尝试获取锁,当返回小于0说明令牌不足了,则须要将当前线程加入到等待队列中
if (tryAcquireShared(arg) < 0)
doAcquireSharedInterruptibly(arg);
}
复制代码
接着先回调在Semaphore中重写的tryAcquireShared()方法尝试获取锁:并发
protected int tryAcquireShared(int acquires) {
return nonfairTryAcquireShared(acquires);
}
final int nonfairTryAcquireShared(int acquires) {
// 循环获取令牌
for (;;) {
// 获取当前可用的令牌数
int available = getState();
// 当前获取完后剩下的令牌数
int remaining = available - acquires;
// 剩下领牌数小于0或者大于等于0并CAS更新成功则直接返回剩余令牌数
if (remaining < 0 || compareAndSetState(available, remaining))
return remaining;
}
}
复制代码
简单尝试获取令牌失败后则再CAS尝试几回后加入同步队列中休眠等待:oop
private void doAcquireSharedInterruptibly(int arg) throws InterruptedException {
// 在同步队列中增长等待节点
final Node node = addWaiter(Node.SHARED);
boolean failed = true;
try {
for (;;) {
// 获取当前节点的前驱节点
final Node p = node.predecessor();
// 若是前驱节点为head节点,表示当前节点是同步等待队列中的第一个,故继续尝试一次获取锁
if (p == head) {
// 尝试获取令牌,此时会跳转到semaphore中(由于重写了该方法)
int r = tryAcquireShared(arg);
// 返回大于0则表示成功获取到令牌了
if (r >= 0) {
// 将当前节点设为head节点
setHeadAndPropagate(node, r);
p.next = null; // help GC
failed = false;
return;
}
}
// 自旋几回后为避免强占CPU,则对该线程进行休眠处理
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
throw new InterruptedException();
}
} finally {
// 因中断请求则取消排队请求
if (failed)
cancelAcquire(node);
}
}
复制代码
简单总结下acquire方法流程为:学习
release方法体:ui
public void release() {
sync.releaseShared(1);
}
复制代码
实际调用的是AQS的方法:this
public final boolean releaseShared(int arg) {
if (tryReleaseShared(arg)) {
doReleaseShared();
return true;
}
return false;
}
复制代码
先调用在Semaphore中重写的尝试释放令牌方法,而且释放成功后返回true:spa
protected final boolean tryReleaseShared(int releases) {
for (;;) {
// 获取当前令牌数量
int current = getState();
// 计算释放后的令牌数量
int next = current + releases;
// 若是本次释放的令牌为负数则抛出异常
if (next < current) // overflow
throw new Error("Maximum permit count exceeded");
// CAS更新令牌数成功后返回true
if (compareAndSetState(current, next))
return true;
}
}
复制代码
释放令牌成功后,唤醒在同步队列中等待的线程:
private void doReleaseShared() {
/* * Ensure that a release propagates, even if there are other * in-progress acquires/releases. This proceeds in the usual * way of trying to unparkSuccessor of head if it needs * signal. But if it does not, status is set to PROPAGATE to * ensure that upon release, propagation continues. * Additionally, we must loop in case a new node is added * while we are doing this. Also, unlike other uses of * unparkSuccessor, we need to know if CAS to reset status * fails, if so rechecking. */
for (;;) {
Node h = head;
if (h != null && h != tail) {
int ws = h.waitStatus;
if (ws == Node.SIGNAL) {
if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))
continue; // loop to recheck cases
unparkSuccessor(h);
}
else if (ws == 0 &&
!compareAndSetWaitStatus(h, 0, Node.PROPAGATE))
continue; // loop on failed CAS
}
if (h == head) // loop if head changed
break;
}
}
复制代码
简单总结下release流程:
Semaphore利用AQS中的共享锁来操做共享变量state,并使用state做为令牌的计数。每一个线程调用required请求获取令牌,调用release则释放领牌。当令牌取完时则剩下的线程加入AQS队列中阻塞等待,当有令牌释放时会唤醒等待的线程。