源码:html
package java.util.concurrent; import java.util.Collection; import java.util.concurrent.locks.AbstractQueuedSynchronizer; public class Semaphore implements java.io.Serializable { private static final long serialVersionUID = -3222578661600680210L;//版本号 private final Sync sync;//内部类 abstract static class Sync extends AbstractQueuedSynchronizer { private static final long serialVersionUID = 1192457210091910933L;//版本号 //构造器,将容许运行的数量设为permits Sync(int permits) { setState(permits); } //根据state获得容许运行的数量 final int getPermits() { return getState(); } //不公平共享锁获取(状态减去acquires) final int nonfairTryAcquireShared(int acquires) { for (;;) { int available = getState();//获取当前状态 int remaining = available - acquires;//状态值减acquires //若状态值小于0则不更新,直接返回-1,表示获取锁失败 //状态值大于等于0,更新当前状态值,返回大于等于0则表示成功 if (remaining < 0 || compareAndSetState(available, remaining)) return remaining;//只有此处才能退出循环 } } //公平锁释放(状态增长acquires) protected final boolean tryReleaseShared(int releases) { for (;;) { int current = getState();//获取当前状态 int next = current + releases;//状态值加acquires if (next < current)//若状态值有误,抛出异常 throw new Error("Maximum permit count exceeded"); if (compareAndSetState(current, next))//只有修改为功,才能退出循环 return true; } } //容许运行数减小reductions final void reducePermits(int reductions) { for (;;) { int current = getState();//获得当前容许运行数 int next = current - reductions;//将容许运行数减去reductions if (next > current)//若发现容许运行数发生了变化,则抛出异常 throw new Error("Permit count underflow"); if (compareAndSetState(current, next))//只有修改为功,才能退出循环 return; } } //返回容许运行的数,并将容许运行数置为0(“耗尽”全部剩余共享资源) final int drainPermits() { for (;;) { int current = getState();//获得当前状态值 if (current == 0 || compareAndSetState(current, 0))//只有容许修改为功或剩余为0,才能退出循环 return current;//返回当前状态值 } } } //不公平策略 static final class NonfairSync extends Sync { private static final long serialVersionUID = -2694183684443567898L;//版本号 //和公平策略构造器相同 NonfairSync(int permits) {//调用sync的构造器初始化容许运行数 super(permits); } //直接调用nonfairTryAcquireShared,尝试获取不公平的共享锁 protected int tryAcquireShared(int acquires) { return nonfairTryAcquireShared(acquires); } } //公平策略 static final class FairSync extends Sync { private static final long serialVersionUID = 2014338818796000944L;//版本号 //和不公平策略构造器相同 FairSync(int permits) {//调用sync的构造器初始化容许运行数 super(permits); } //尝试获取公平的共享锁。和不公平的共享锁不一样的关键方法 protected int tryAcquireShared(int acquires) { for (;;) { if (hasQueuedPredecessors())//存在前继等待节点则直接返回 return -1; int available = getState();//获得当前状态值 int remaining = available - acquires;//当前状态减去acquires if (remaining < 0 || compareAndSetState(available, remaining)) return remaining;//修改为功 } } } //permits:一次性容许运行的线程数 //一个参数构造默认使用不公平策略. public Semaphore(int permits) { sync = new NonfairSync(permits); } //permits:一次性容许运行的线程数 //fair:是否使用公平策略. public Semaphore(int permits, boolean fair) { sync = fair ? new FairSync(permits) : new NonfairSync(permits); } //可响应中断的获取共享锁1个(公平和不公平) public void acquire() throws InterruptedException { sync.acquireSharedInterruptibly(1); } //不支持响应中断的获取共享锁1个(公平和不公平) public void acquireUninterruptibly() { sync.acquireShared(1); } //不公平的尝试获取共享锁1个(不支持中断) public boolean tryAcquire() { return sync.nonfairTryAcquireShared(1) >= 0;//大于等于0,则获取成功 } //支持中断在指定时间内获取共享锁1个 public boolean tryAcquire(long timeout, TimeUnit unit) throws InterruptedException { return sync.tryAcquireSharedNanos(1, unit.toNanos(timeout)); } //计数值增长1 public void release() { sync.releaseShared(1);//计数值增长1 } /******************不使用时,每次调用一次性减1;**************************/ /******************传入permits时,每次调用一次性减permits;***************/ //可响应中断的获取共享锁(公平和不公平)permits个 public void acquire(int permits) throws InterruptedException { if (permits < 0) throw new IllegalArgumentException();//permits小于0,则抛出非法参数异常 sync.acquireSharedInterruptibly(permits); } //不支持响应中断的获取共享锁permits个(公平和不公平) public void acquireUninterruptibly(int permits) { if (permits < 0) throw new IllegalArgumentException();//permits小于0,则抛出非法参数异常 sync.acquireShared(permits); } //不公平的尝试获取共享锁permits个(不支持中断) public boolean tryAcquire(int permits) { if (permits < 0) throw new IllegalArgumentException();//permits小于0,则抛出非法参数异常 return sync.nonfairTryAcquireShared(permits) >= 0;//大于等于0,则获取成功 } //支持中断在指定时间内获取共享锁permits个 public boolean tryAcquire(int permits, long timeout, TimeUnit unit) throws InterruptedException { if (permits < 0) throw new IllegalArgumentException();//permits小于0,则抛出非法参数异常 return sync.tryAcquireSharedNanos(permits, unit.toNanos(timeout)); } //增长指定数目permits(增长计数值permits) public void release(int permits) { if (permits < 0) throw new IllegalArgumentException();//permits小于0,则抛出非法参数异常 sync.releaseShared(permits);//增长计数值 } //获得当前运行的数 public int availablePermits() { return sync.getPermits(); } //返回容许运行的数,并将容许运行数置为0(“耗尽”全部剩余共享资源) public int drainPermits() { return sync.drainPermits(); } //将容许运行数减小指定数目reduction.(“缩减”剩余共享资源) protected void reducePermits(int reduction) { if (reduction < 0) throw new IllegalArgumentException();//reduction小于0,则抛出非法参数异常 sync.reducePermits(reduction); } //根据判断sync 是否为FairSync类型,返回是否为公平锁 public boolean isFair() { return sync instanceof FairSync; } //返回队列中是否存在等待状态的节点 public final boolean hasQueuedThreads() { return sync.hasQueuedThreads(); } //获得队列元素的总数 public final int getQueueLength() { return sync.getQueueLength(); } //获得队列中的线程集合 protected Collection<Thread> getQueuedThreads() { return sync.getQueuedThreads(); } //获得字符串表示 public String toString() { return super.toString() + "[Permits = " + sync.getPermits() + "]"; } }
一个计数信号量,信号量维护了一个许可集。java
在许可可用前会阻塞每个调用acquire()的线程。编程
已获取共享锁的线程,执行 release()
添加一个许可,从而可能释放一个正在阻塞的获取者。api
Semaphore 一般用于限制能够访问某些资源(物理或逻辑的)的线程数目。数据结构
Semaphore(int permits) 建立具备给定的许可数和非公平的公平设置的 Semaphore 。 |
Semaphore(int permits, boolean fair) 建立具备给定的许可数和给定的公平设置的 Semaphore 。 |
构造方法可选地接受一个公平 参数。当设置为 false 时,此类不对线程获取许可的顺序作任何保证。ide
特别地,闯入 是容许的,也就是说能够在已经等待的线程前为调用 acquire()
的线程分配一个许可,从逻辑上说,就是新线程将本身置于等待线程队列的头部。测试
当公平设置为 true 时,信号量保证对于任何调用获取
方法的线程而言,都按照处理它们调用这些方法的顺序(即先进先出;FIFO)来选择线程、得到许可。ui
非同步的 tryAcquire
方法不使用公平设置,而是使用任意可用的许可。spa
一般,应该将用于控制资源访问的信号量初始化为公平的,以确保全部线程均可访问资源。.net
为其余的种类的同步控制使用信号量时,非公平排序的吞吐量优点一般要比公平考虑更为重要。
void |
acquire() 今后信号量获取一个许可,在提供一个许可前一直将线程阻塞,不然线程被中断。 |
void |
acquire(int permits) 今后信号量获取给定数目的许可,在提供这些许可前一直将线程阻塞,或者线程已被中断。 |
void |
acquireUninterruptibly() 今后信号量中获取许可,在有可用的许可前将其阻塞。 |
void |
acquireUninterruptibly(int permits) 今后信号量获取给定数目的许可,在提供这些许可前一直将线程阻塞。 |
int |
availablePermits() 返回此信号量中当前可用的许可数。 |
int |
drainPermits() 获取并返回当即可用的全部许可。 |
protected Collection<Thread> |
getQueuedThreads() 返回一个 collection,包含可能等待获取的线程。 |
int |
getQueueLength() 返回正在等待获取的线程的估计数目。 |
boolean |
hasQueuedThreads() 查询是否有线程正在等待获取。 |
boolean |
isFair() 若是此信号量的公平设置为 true,则返回 true 。 |
protected void |
reducePermits(int reduction) 根据指定的缩减量减少可用许可的数目。 |
void |
release() 释放一个许可,将其返回给信号量。 |
void |
release(int permits) 释放给定数目的许可,将其返回到信号量。 |
String |
toString() 返回标识此信号量的字符串,以及信号量的状态。 |
boolean |
tryAcquire() 仅在调用时此信号量存在一个可用许可,才从信号量获取许可。 |
boolean |
tryAcquire(int permits) 仅在调用时此信号量中有给定数目的许可时,才今后信号量中获取这些许可。 |
boolean |
tryAcquire(int permits, long timeout, TimeUnit unit) 若是在给定的等待时间内此信号量有可用的全部许可,而且当前线程未被中断,则今后信号量获取给定数目的许可。 |
boolean |
tryAcquire(long timeout, TimeUnit unit) 若是在给定的等待时间内,此信号量有可用的许可而且当前线程未被中断,则今后信号量获取一个许可。 |
public Semaphore(int permits)
建立具备给定的许可数和非公平的公平设置的 Semaphore
。
参数:
permits
- 初始的可用许可数目。此值可能为负数,在这种状况下,必须在授予任何获取前进行释放。
public Semaphore(int permits, boolean fair)
建立具备给定的许可数和给定的公平设置的 Semaphore
。
参数:
permits
- 初始的可用许可数目。此值可能为负数,在这种状况下,必须在授予任何获取前进行释放。
fair
- 若是此信号量保证在争用时按先进先出的顺序授予许可,则为 true
;不然为 false
。
public void acquire() throws InterruptedException
今后信号量获取一个许可,在提供一个许可前一直将线程阻塞,不然线程被 中断。
获取一个许可(若是提供了一个)并当即返回,将可用的许可数减 1。
若是没有可用的许可,则在发生如下两种状况之一前,禁止将当前线程用于线程安排目的并使其处于休眠状态:
若是当前线程:
中断
。 则抛出 InterruptedException
,而且清除当前线程的已中断状态。
抛出:
InterruptedException
- 若是当前线程被中断
public void acquireUninterruptibly()
今后信号量中获取许可,在有可用的许可前将其阻塞。
获取一个许可(若是提供了一个)并当即返回,将可用的容许数减 1。
若是没有可用的许可,则在其余某些线程调用此信号量的 release()
方法,而且当前线程是下一个要被分配许可的线程前,禁止当前线程用于线程安排目的并使其处于休眠状态。
若是当前线程在等待许可时被中断,那么它将继续等待,可是与没有发生中断,其将接收容许的时间相比,为该线程分配许可的时间可能改变。当线程确实今后方法返回后,将设置其中断状态。
public boolean tryAcquire()
仅在调用时此信号量存在一个可用许可,才从信号量获取许可。
获取一个许可(若是提供了一个)并当即返回,其值为 true
,将可用的许可数减 1。
若是没有可用的许可,则此方法当即返回而且值为 false
。
即便已将此信号量设置为使用公平排序策略,可是调用 tryAcquire()
也将 当即获取许可(若是有一个可用),而无论当前是否有正在等待的线程。在某些状况下,此“闯入”行为可能颇有用,即便它会打破公平性也如此。若是但愿遵照公平设置,则使用 tryAcquire(0, TimeUnit.SECONDS)
,它几乎是等效的(它也检测中断)。
返回:
若是获取了许可,则返回 true
;不然返回 false
。
public boolean tryAcquire(long timeout,TimeUnit unit) throws InterruptedException
若是在给定的等待时间内,此信号量有可用的许可而且当前线程未被 中断,则今后信号量获取一个许可。
获取一个许可(若是提供了一个)并当即返回,其值为 true
,将可用的许可数减 1。
若是没有可用的容许,则在发生如下三种状况之一前,禁止将当前线程用于线程安排目的并使其处于休眠状态:
release()
方法而且当前线程是下一个被分配许可的线程;或者 若是获取了许可,则返回值为 true
。
若是当前线程:
则抛出 InterruptedException
,而且清除当前线程的已中断状态。
若是超出了指定的等待时间,则返回值为 false
。若是该时间小于等于 0,则方法根本不等待。
参数:
timeout
- 等待许可的最多时间
unit
- timeout
参数的时间单位
返回:
若是获取了许可,则返回 true
;若是获取许可前超出了等待时间,则返回 false
抛出:
InterruptedException
- 若是当前线程是已中断的
public void release()
释放一个许可,将其返回给信号量。
释放一个许可,将可用的许可数增长 1。若是任意线程试图获取许可,则选中一个线程并将刚刚释放的许可给予它。而后针对线程安排目的启用(或再启用)该线程。
不要求释放许可的线程必须经过调用 acquire()
来获取许可。经过应用程序中的编程约定来创建信号量的正确用法。
public void acquire(int permits) throws InterruptedException
今后信号量获取给定数目的许可,在提供这些许可前一直将线程阻塞,或者线程已被 中断。
获取给定数目的许可(若是提供了)并当即返回,将可用的许可数减去给定的量。
若是没有足够的可用许可,则在发生如下两种状况之一前,禁止将当前线程用于线程安排目的并使其处于休眠状态:
若是当前线程:
则抛出 InterruptedException
,而且清除当前线程的已中断状态。任何本来应该分配给此线程的许可将被分配给其余试图获取许可的线程,就好像已经过调用 release()
而使许可可用同样。
参数:
permits
- 要获取的许可数
抛出:
InterruptedException
- 若是当前线程已被中断
IllegalArgumentException
- 若是 permits
为负
public void acquireUninterruptibly(int permits)
今后信号量获取给定数目的许可,在提供这些许可前一直将线程阻塞。
获取给定数目的许可(若是提供了)并当即返回,将可用的许可数减去给定的量。
若是没有足够的可用许可,则在其余某些线程调用此信号量的某个释放
方法,当前线程是下一个要被分配许可的线程,而且可用的许可数目知足此请求前,禁止当前线程用于线程安排目的并使其处于休眠状态。
若是当前的线程在等待许可时被中断,则它会继续等待而且它在队列中的位置不受影响。当线程确实今后方法返回后,将其设置为中断状态。
参数:
permits
- 要获取的许可数
抛出:
IllegalArgumentException
- 若是 permits
为负
public boolean tryAcquire(int permits)
仅在调用时此信号量中有给定数目的许可时,才今后信号量中获取这些许可。
获取给定数目的许可(若是提供了)并当即返回,其值为 true
,将可用的许可数减去给定的量。
若是没有足够的可用许可,则此方法当即返回,其值为 false
,而且不改变可用的许可数。
即便已将此信号量设置为使用公平排序策略,可是调用 tryAcquire
也将 当即获取许可(若是有一个可用),而无论当前是否有正在等待的线程。在某些状况下,此“闯入”行为可能颇有用,即便它会打破公平性也如此。若是但愿遵照公平设置,则使用 tryAcquire(permits, 0, TimeUnit.SECONDS)
,它几乎是等效的(它也检测中断)。
参数:
permits
- 要获取的许可数
返回:
若是获取了许可,则返回 true
;不然返回 false
抛出:
IllegalArgumentException
- 若是 permits
为负
public boolean tryAcquire(int permits, long timeout, TimeUnit unit) throws InterruptedException
若是在给定的等待时间内此信号量有可用的全部许可,而且当前线程未被 中断,则今后信号量获取给定数目的许可。
获取给定数目的许可(若是提供了)并当即返回,其值为 true
,将可用的许可数减去给定的量。
若是没有足够的可用许可,则在发生如下三种状况之一前,禁止将当前线程用于线程安排目的并使其处于休眠状态:
若是获取了许可,则返回值为 true
。
若是当前线程:
则抛出 InterruptedException
,而且清除当前线程的已中断状态。任何本来应该分配给此线程的许可将被分配给其余试图获取许可的线程,就好像已经过调用 release()
而使许可可用同样。
若是超出了指定的等待时间,则返回值为 false
。若是该时间小于等于 0,则方法根本不等待。任何本来应该分配给此线程的许可将被分配给其余试图获取许可的线程,就好像已经过调用 release()
而使许可可用同样。
参数:
permits
- 要获取的许可数
timeout
- 等待许可的最多时间
unit
- timeout
参数的时间单位
返回:
若是获取了许可,则返回 true
;若是获取全部许可前超出了等待时间,则返回 false
抛出:
InterruptedException
- 若是当前线程是已中断的
IllegalArgumentException
- 若是 permits
为负
public void release(int permits)
释放给定数目的许可,将其返回到信号量。
释放给定数目的许可,将可用的许可数增长该量。若是任意线程试图获取许可,则选中某个线程并将刚刚释放的许可给予该线程。若是可用许可的数目知足该线程的请求,则针对线程安排目的启用(或再启用)该线程;不然在有足够的可用许可前线程将一直等待。若是知足此线程的请求后仍有可用的许可,则依次将这些许可分配给试图获取许可的其余线程。
不要求释放许可的线程必须经过调用获取
来获取该许可。经过应用程序中的编程约定来创建信号量的正确用法。
参数:
permits
- 要释放的许可数
抛出:
IllegalArgumentException
- 若是 permits
为负
public int availablePermits()
返回此信号量中当前可用的许可数。
此方法一般用于调试和测试目的。
返回:
此信号量中的可用许可数
public int drainPermits()
获取并返回当即可用的全部许可。
返回:
获取的许可数
protected void reducePermits(int reduction)
根据指定的缩减量减少可用许可的数目。此方法在使用信号量来跟踪那些变为不可用资源的子类中颇有用。此方法不一样于 acquire
,在许可变为可用的过程当中,它不会阻塞等待。
参数:
reduction
- 要移除的许可数
抛出:
IllegalArgumentException
- 若是 reduction
是负数
public boolean isFair()
若是此信号量的公平设置为 true,则返回 true
。
返回:
若是此信号量的公平设置为 true,则返回 true
public final boolean hasQueuedThreads()
查询是否有线程正在等待获取。注意,由于同时可能发生取消,因此返回 true
并不保证有其余线程等待获取许可。此方法主要用于监视系统状态。
返回:
若是可能有其余线程正在等待获取锁,则返回 true
public final int getQueueLength()
返回正在等待获取的线程的估计数目。该值仅是估计的数字,由于在此方法遍历内部数据结构的同时,线程的数目可能动态地变化。此方法用于监视系统状态,不用于同步控制。
返回:
正在等待此锁的线程的估计数目
protected Collection<Thread> getQueuedThreads()
返回一个 collection,包含可能等待获取的线程。由于在构造此结果的同时实际的线程 set 可能动态地变化,因此返回的 collection 仅是尽力的估计值。所返回 collection 中的元素没有特定的顺序。此方法用于加快子类的构造速度,提供更多的监视设施。
返回:
线程 collection
public String toString()
返回标识此信号量的字符串,以及信号量的状态。括号中的状态包括 String 类型的 "Permits ="
,后跟许可数。
覆盖:
返回:
标识此信号量的字符串,以及信号量的状态
package com.thread; import java.util.concurrent.Semaphore; public class SemaphoreDemo implements Runnable{ private Semaphore smp = new Semaphore(3); @Override public void run() { try { System.out.println("Thread " + Thread.currentThread().getName() + " start"); smp.acquire(); System.out.println("Thread " + Thread.currentThread().getName() + " is working"); Thread.sleep(1000); smp.release(); System.out.println("Thread " + Thread.currentThread().getName() + " is over"); } catch (InterruptedException e) { e.printStackTrace(); } } public static void main(String[] args){ SemaphoreDemo semaphoreDemo = new SemaphoreDemo(); for (int i=1;i<=9;i++){ new Thread(semaphoreDemo).start(); } } }
运行结果:
Thread Thread-0 start
Thread Thread-3 start
Thread Thread-0 is working
Thread Thread-1 start
Thread Thread-2 start
Thread Thread-1 is working
Thread Thread-6 start
Thread Thread-5 start
Thread Thread-3 is working
Thread Thread-4 start
Thread Thread-7 start
Thread Thread-8 start
Thread Thread-0 is over
Thread Thread-2 is working
Thread Thread-3 is over
Thread Thread-5 is working
Thread Thread-1 is over
Thread Thread-6 is working
Thread Thread-2 is over
Thread Thread-5 is over
Thread Thread-8 is working
Thread Thread-4 is working
Thread Thread-7 is working
Thread Thread-6 is over
Thread Thread-8 is over
Thread Thread-7 is over
Thread Thread-4 is over
红色部分说明:只能一次性运行3个线程,必需要等待某一个线程执行 release() 以后,才能唤醒等待的某一个线程继续执行。
青色部分说明:因为打印语句的执行可能会滞后,所以此处的运行结果不能说明 Semaphore 设计有误,因此运行结果只能做为参考。
package com.thread; import java.util.concurrent.Semaphore; public class SemaphoreDemo implements Runnable{ private Semaphore smp = new Semaphore(3); @Override public void run() { try { System.out.println("Thread " + Thread.currentThread().getName() + " start"); smp.acquire(3); System.out.println("Thread " + Thread.currentThread().getName() + " is working"); Thread.sleep(1000); smp.release(3); System.out.println("Thread " + Thread.currentThread().getName() + " is over"); } catch (InterruptedException e) { e.printStackTrace(); } } public static void main(String[] args){ SemaphoreDemo semaphoreDemo = new SemaphoreDemo(); for (int i=1;i<=9;i++){ new Thread(semaphoreDemo).start(); } } }
运行结果:
Thread Thread-0 start
Thread Thread-3 start
Thread Thread-4 start
Thread Thread-6 start
Thread Thread-1 start
Thread Thread-7 start
Thread Thread-8 start
Thread Thread-0 is working
Thread Thread-5 start
Thread Thread-2 start
Thread Thread-0 is over
Thread Thread-3 is working
Thread Thread-3 is over
Thread Thread-4 is working
Thread Thread-4 is over
Thread Thread-6 is working
Thread Thread-6 is over
Thread Thread-1 is working
Thread Thread-1 is over
Thread Thread-7 is working
Thread Thread-7 is over
Thread Thread-8 is working
Thread Thread-8 is over
Thread Thread-5 is working
Thread Thread-5 is over
Thread Thread-2 is working
Thread Thread-2 is over
因为须要一次性获取3个,因此只有一个线程执行完毕,才能执行下一个线程。