Semaphore是计数信号量。Semaphore管理一系列许可证。每一个acquire方法阻塞,直到有一个许可证能够得到而后拿走一个许可证;每一个release方法增长一个许可证,这可能会释放一个阻塞的acquire方法。然而,其实并无实际的许可证这个对象,Semaphore只是维持了一个可得到许可证的数量。java
Semaphore常常用于限制获取某种资源的线程数量。下面举个例子,好比说操场上有5个跑道,一个跑道一次只能有一个学生在上面跑步,一旦全部跑道在使用,那么后面的学生就须要等待,直到有一个学生不跑了,下面是这个例子:ui
public class Playground { private String[] tracks = {"跑道1","跑道2","跑道3","跑道4","跑道5"};//一共有5个跑道 private volatile boolean[] used = new boolean[5];//标记跑道是否被占用 private Semaphore semaphore = new Semaphore(5, true); //获取一个跑道 public String getTrack() throws InterruptedException { semaphore.acquire(1); return getNextAvailableTrack(); } //返回一个跑道 public void releaseTrack(String track) { if (makeAsUnused(track)) semaphore.release(1); } //遍历,找到一个没人用的跑道 private String getNextAvailableTrack() { for (int i = 0; i < used.length; i++) { if (!used[i]) { used[i] = true; return tracks[i]; } } return null; } //释放跑道,将使用标志设置为false private boolean makeAsUnused(String track) { for (int i = 0; i < used.length; i++) { if (tracks[i].equals(track)) { if (used[i]) { used[i] = false; return true; } else { return false; } } } return false; } public static void main(String[] args) { Executor executor = Executors.newCachedThreadPool(); Playground playground = new Playground(); Runnable runnable = ()->{ try { String track = playground.getTrack();//获取跑道 if (track != null) { System.out.println("学生" + Thread.currentThread().getId() + "在" + track.toString() + "上跑步"); TimeUnit.SECONDS.sleep(2); System.out.println("学生" + Thread.currentThread().getId() + "释放" + track.toString()); playground.releaseTrack(track);//释放跑道 } } catch (InterruptedException e) { e.printStackTrace(); } }; for (int i = 0; i < 100; i++) { executor.execute(runnable); } } }
public Semaphore(int permits) { sync = new NonfairSync(permits);//提供许可数量,默认为非公平模式 } public Semaphore(int permits, boolean fair) { //提供许可数量,指定是否为公平模式 sync = fair ? new FairSync(permits) : new NonfairSync(permits); }
Semaphore内部基于AQS的共享模式,因此实现都委托给了Sync类。.net
NonfairSync(int permits) { super(permits); }
Sync(int permits) { setState(permits);//AQS的state表示许可证的数量 }
public void acquire(int permits) throws InterruptedException { if (permits < 0) throw new IllegalArgumentException(); sync.acquireSharedInterruptibly(permits); }
public final void acquireSharedInterruptibly(int arg) throws InterruptedException { //若是线程被中断了,抛出异常 if (Thread.interrupted()) throw new InterruptedException(); //获取许可失败,将线程加入到等待队列中 if (tryAcquireShared(arg) < 0) doAcquireSharedInterruptibly(arg); }
AQS子类若是要使用共享模式的话,须要实现tryAcquireShared方法,下面看NonfairSync的该方法实现:线程
protected int tryAcquireShared(int acquires) { return nonfairTryAcquireShared(acquires); }
该方法调用了父类中的nonfairTyAcquireShared方法,以下:code
final int nonfairTryAcquireShared(int acquires) { for (;;) { //获取剩余许可数量 int available = getState(); //计算给完此次许可数量后的个数 int remaining = available - acquires; //若是许可不够(获取许可失败)或者能够将许可数量重置(获取许可成功)的话,返回。 //只有在许可不够时返回值才会小于0,其他返回的都是剩余许可数量,这也就解释了,一旦许可不够,后面的线程将会阻塞。 if (remaining < 0 || compareAndSetState(available, remaining)) return remaining; } }
protected int tryAcquireShared(int acquires) { for (;;) { //若是前面有线程再等待,直接返回-1 if (hasQueuedPredecessors()) return -1; //后面与非公平同样 int available = getState(); int remaining = available - acquires; if (remaining < 0 || compareAndSetState(available, remaining)) return remaining; } }
FairSync与NonFairSync的区别就在于会首先判断当前队列中有没有线程在等待,若是有,就老老实实进入到等待队列;而不像NonfairSync同样首先试一把,说不定就刚好得到了一个许可,这样就能够插队了。对象
public void release(int permits) { if (permits < 0) throw new IllegalArgumentException(); sync.releaseShared(permits); }
releaseShared方法在AQS中,以下:blog
public final boolean releaseShared(int arg) { //若是改变许可数量成功 if (tryReleaseShared(arg)) { doReleaseShared();//一旦CAS改变许可数量成功,就调用该方法释放阻塞的线程。 return true; } return false; }
AQS子类实现共享模式的类须要实现tryReleaseShared类来判断是否释放成功,实现以下:队列
protected final boolean tryReleaseShared(int releases) { for (;;) { //获取当前许可数量 int current = getState(); //计算回收后的数量 int next = current + releases; if (next < current) // overflow throw new Error("Maximum permit count exceeded"); //CAS改变许可数量成功,返回true if (compareAndSetState(current, next)) return true; } }
protected void reducePermits(int reduction) { if (reduction < 0) throw new IllegalArgumentException(); sync.reducePermits(reduction); }
能够看到,委托给了Sync,Sync的reducePermits方法以下:资源
final void reducePermits(int reductions) { for (;;) { //获得当前剩余许可数量 int current = getState(); //获得减完以后的许可数量 int next = current - reductions; if (next > current) // underflow throw new Error("Permit count underflow"); //若是CAS改变成功 //CAS改变AQS中的state变量,由于该变量表明许可证的数量。 if (compareAndSetState(current, next)) return; } }
Semaphore还能够一次将剩余的许可数量所有取走,该方法是drain方法,以下:rem
public int drainPermits() { return sync.drainPermits(); }
Sync的实现以下:
final int drainPermits() { for (;;) { int current = getState(); if (current == 0 || compareAndSetState(current, 0))//CAS将许可数量置为0。 return current; } }
Semaphore是信号量,用于管理一组资源。其内部是基于AQS的共享模式,AQS的状态表示许可证的数量,在许可证数量不够时,线程将会被挂起;而一旦有一个线程释放一个资源,那么就有可能从新唤醒等待队列中的线程继续执行。
参考地址: