Semaphore是一种同步辅助工具,翻译过来就是信号量,用来实现流量控制,它能够控制同一时间内对资源的访问次数.bash
不管是Synchroniezd仍是ReentrantLock,一次都只容许一个线程访问一个资源,可是Semaphore能够指定多个线程同时访问某一个资源.并发
Semaphore有一个构造函数,能够传入一个int型整数n,表示某段代码最多只有n个线程能够访问,若是超出了n,那么请等待,等到某个线程执行完毕这段代码块,下一个线程再进入。dom
信号量上定义两种操做:ide
信号量主要用于两个目的:函数
如下的例子:5个线程抢3个车位,同时最多只有3个线程能抢到车位,等其余线程释放信号量后,才能抢到车位.工具
public static void main(String[] args) {
Semaphore semaphore = new Semaphore(3);
for (int i = 0; i < 5; i++) {
new Thread(new Runnable() {
@Override
public void run() {
try {
semaphore.acquire();//申请资源
System.out.println(Thread.currentThread().getName()+"抢到车位");
ThreadUtil.sleep(RandomUtil.randomInt(1000,5000));
System.out.println(Thread.currentThread().getName()+"归还车位");
} catch (InterruptedException e) {
e.printStackTrace();
}finally {
//释放资源
semaphore.release();
}
}
},"线程"+i).start();
}
}
复制代码
abstract static class Sync extends AbstractQueuedSynchronizer {
//省略
}
复制代码
Semaphore内部使用Sync类,Sync又是继承AbstractQueuedSynchronizer,因此Sync底层仍是使用AQS实现的.Sync有两个实现类NonfairSync和FairSync,用来指定获取信号量时是否采用公平策略.ui
public Semaphore(int permits) {
sync = new NonfairSync(permits);
}
public Semaphore(int permits, boolean fair) {
sync = fair ? new FairSync(permits) : new NonfairSync(permits);
}
Sync(int permits) {
setState(permits);
}
复制代码
如上所示,Semaphore默认采用非公平策略,若是须要使用公平策略则可使用带两个参数的构造函数来构造Semaphore对象。spa
参数permits被传递给AQS的state值,用来表示当前持有的信号量个数.线程
当前线程调用该方法的目的是但愿获取一个信号量资源。翻译
若是当前信号量个数大于0,则当前信号量的计数会减1,而后该方法直接返回。不然若是当前信号量个数等0,则当前线程会被放入AQS的阻塞队列。当其余线程调用了当前线程的interrupt()方法中断了当前线程时,则当前线程会抛出InterruptedException异常返回。
//Semaphore方法
public void acquire() throws InterruptedException {
//传递参数为1,说明要获取1个信号量资源
sync.acquireSharedInterruptibly(1);
}
//AQS的方法
public final void acquireSharedInterruptibly(int arg)
throws InterruptedException {
//(1)若是线程被中断,则抛出中断异常
if (Thread.interrupted())
throw new InterruptedException();
//(2)不然调用Sync子类方法尝试获取,这里根据构造函数肯定使用公平策略
if (tryAcquireShared(arg) < 0)
//若是获取失败则放入阻塞队列.而后再次尝试,若是使用则调用park方法挂起当前线程
doAcquireSharedInterruptibly(arg);
}
复制代码
由如上代码可知,acquire()在内部调用了Sync的acquireSharedlnterruptibly方法,后者会对中断进行响应(若是当前线程被中断,则抛出中断异常)。尝试获取信号量资源的AQS的方法 tryAcquireShared是由Sync的子类实现的,因此这里分别从两 方面来讨论。
先讨论非公平策略NonfairSync类的tryAcquireShared方法,代码以下:
protected int tryAcquireShared(int acquires) {
return nonfairTryAcquireShared(acquires);
}
final int nonfairTryAcquireShared(int acquires) {
for (;;) {
//获取当前信号量值
int available = getState();
//计算当前剩余值
int remaining = available - acquires;
//若是当前剩余值小于0或则CAS设置成功则返回
if (remaining < 0 ||
compareAndSetState(available, remaining))
return remaining;
}
}
复制代码
如上代码先获取当前信号量值(available),而后减去须要获取的值(acquires),获得剩余的信号量个数(remaining),若是剩余值小于0则说明当前信号量个数知足不了需求,那么直接返回负数,这时当前线程会被放入AQS的阻塞队列而被挂起。若是剩余值大于0,则使用CAS操做设置当前信号量值为剩余值,而后返回剩余值。
另外,因为NonFairSync是非公平获取的,也就是说先调用aquire方法获取信号量的线程不必定比后来者先获取到信号量。
考虑下面场景,若是线程A先调用了aquire()方法获取信号量,可是当前信号量个数为0,那么线程A会被放入AQS的阻塞队列 。过一段时间后线程C调用了release()方法释放了一个信号量,若是当前没有其余线程获取信号量,那么线程A就会被激活,而后获取该信号量,可是假如线程C释放信号量后,线程C调用了aquire方法,那么线程C就会和线程A去竞争这个信号量资源 。 若是采用非公平策略,由nonfairTryAcquireShared的代码可知,线程C彻底能够在线程A被激活前,或者激活后先于线程 A获取到该信号量,也就是在这种模式下阻塞线程和当前请求的线程是竞争关系,而不遵循先来先得的策略。
下面看公平性的FairSync类是如何保证公平性的。
protected int tryAcquireShared(int acquires) {
for (;;) {
//查询是否当前线程节点的前驱节点也在等待获取该资源,有的话直接返回
if (hasQueuedPredecessors())
return -1;
int available = getState();
int remaining = available - acquires;
if (remaining < 0 ||
compareAndSetState(available, remaining))
return remaining;
}
}
复制代码
可见公平性仍是靠hasQueuedPredecessors这个函数来保证的。因此Semaphore的公平策略是看当前线程节点的前驱节点是否也在等待获取该资源,若是是则本身放弃获取的权限,而后当前线程会被放入AQS阻塞队列,不然就去获取。
该方法与acquire()方法不一样,后者只须要获取一个信号量值, 而前者则获取permits个。
public void acquire(int permits) throws InterruptedException {
if (permits < 0) throw new IllegalArgumentException();
sync.acquireSharedInterruptibly(permits);
}
复制代码
该方法与acquire()相似,不一样之处在于该方法对中断不响应,也就是当当前线程调用了 acquireUninterruptibly获取资源时(包含被阻塞后),其余线程调用了当前线程的interrupt() 方法设置了当前线程的中断标志,此时当前线程并不会抛出IllegalArgumentException异常而返回。
public void acquireUninterruptibly(int permits) {
if (permits < 0) throw new IllegalArgumentException();
sync.acquireShared(permits);
}
复制代码
该方法的做用是把当前Semaphore对象的信号量值增长1,若是当前有线程由于调用aquire方法被阻塞而被放入了AQS的阻塞 队列,则会根据公平策略选择一个信号量个数能被知足的线程进行激活, 激活的线程会尝试获取刚增长的信号量.
public void release() {
//(1)arg=1
sync.releaseShared(1);
}
public final boolean releaseShared(int arg) {
//(2)尝试释放资源
if (tryReleaseShared(arg)) {
//(3)资源释放成功则调用park方法唤醒AQS队列里面最早挂起的线程
doReleaseShared();
return true;
}
return false;
}
protected final boolean tryReleaseShared(int releases) {
for (;;) {
//获取当前信号量值
int current = getState();
//将当前信号量值增长releases,这里为增长1
int next = current + releases;
//移除处理
if (next < current) // overflow
throw new Error("Maximum permit count exceeded");
//使用CAS保证更新信号量值的原子性
if (compareAndSetState(current, next))
return true;
}
}
复制代码
由代码release()->sync.releaseShared(1),可知,release方法每次只会对信号量值增长1,tryReleaseShared方法是无限循环,使用CAS保证了release方法对信号量递增1的原子性操做.tryReleaseShared方法增长信号量值成功后会执行代码(3),即调用AQS的方法来激活由于调用acquire方法而被阻塞的线程。
该方法与不带参数的release方法的不一样之处在于,前者每次调用会在信号量值原来的基础上增长 permits,然后者每次增长l。
public void release(int permits) {
if (permits < 0) throw new IllegalArgumentException();
sync.releaseShared(permits);
}
复制代码
另外能够看到,这里的sync.releaseShared是共享方法,这说明该信号量是线程共享的,信号量没有和固定线程绑定,多个线程能够同时使用CAS去更新信号量的值而不会被阻塞。