Semaphore[ˈseməfɔ:(r)]意为信号量,比较书面的解释是用来控制同时访问特定资源的线程数量,它经过协调各个线程,以保证合理的使用公共资源。
Semaphore维护了信号量许可,线程只有得到了许可才可以访问资源,能够把Semaphore理解为风景区管理员,风景区有人数限制,达到了人数限制管理员就会让后来的游客等着直到风景区里面的游客离开,这里风景区至关于须要协调的公共资源,人数限制就至关于Semaphore维护的许可量,而游客就至关因而执行任务的线程。node
Semaphore是基于共享锁实现的,内部类Sync是同步器AQS的子类,Sync有两个子类:公平信号量FairSync和非公平信号量NonefairSync,Semaphore默认非公平策略。数据结构
public class SemaphoreTest { private static final int THREAD_COUNT = 10; private static ExecutorService pool = Executors.newFixedThreadPool(THREAD_COUNT); public static void main(String[] args) { Semaphore semaphore = new Semaphore(3); for (int i = 0; i < THREAD_COUNT; i++) { pool.execute(() -> { try { semaphore.acquire(); System.out.println("Thread " + Thread.currentThread().getId() + " is saving data"); Thread.sleep(1000); System.out.println("Thread " + Thread.currentThread().getId() + " finished"); semaphore.release(); } catch (InterruptedException e) { e.printStackTrace(); } }); } pool.shutdown(); } }
运行结果:
函数
结果代表:同时只有三个线程能执行,由于Semaphore许可只有3个,至关于只有三个现场能访问同步资源,只有当线程释放许可,其余线程才能获取许可访问同步资源。oop
Semaphore提供两种构造函数,默认是非公平策略的,会根据传入的许可permits设置同步状态state。ui
public class Semaphore implements java.io.Serializable { private static final long serialVersionUID = -3222578661600680210L; /** All mechanics via AbstractQueuedSynchronizer subclass */ private final Sync sync; public Semaphore(int permits) { sync = new NonfairSync(permits); } public Semaphore(int permits, boolean fair) { sync = fair ? new FairSync(permits) : new NonfairSync(permits); } abstract static class Sync extends AbstractQueuedSynchronizer { private static final long serialVersionUID = 1192457210091910933L; //根据permits设置AQSstate Sync(int permits) { setState(permits); } final int getPermits() { return getState(); } } }
Semaphore提供的获取许可permits方法有acquire方法,都是调用的Sync分类AQS的acquireSharedInterruptibly方法,首先介绍基于公平策略如何获取信号量的。线程
//获取一个许可 public void acquire() throws InterruptedException { sync.acquireSharedInterruptibly(1); } //获取多个许可 public void acquire(int permits) throws InterruptedException { if (permits < 0) throw new IllegalArgumentException(); sync.acquireSharedInterruptibly(permits); } //AQS方法 public final void acquireSharedInterruptibly(int arg) throws InterruptedException { //有中断则抛出异常 if (Thread.interrupted()) throw new InterruptedException(); //尝试获取“共享锁”;获取成功则直接返回,获取失败,则经过doAcquireSharedInterruptibly()获取。 if (tryAcquireShared(arg) < 0) doAcquireSharedInterruptibly(arg); }
Sync子类FairSync实现的tryAcquireShared方法,首先判断AQS同步队列还有没有其余正在等待的线程,若是当前线程前面没有等待线程,尝试CAS修改,采用的是循环+CAS的方式修改同步状态code
protected int tryAcquireShared(int acquires) { for (;;) { if (hasQueuedPredecessors()) return -1; //目前还有多少量可 int available = getState(); //当前线程得到acquires个许可后剩下的许可 int remaining = available - acquires; //剩下的许可大于0,CAS修改 if (remaining < 0 || compareAndSetState(available, remaining)) return remaining; } }
若是尝试获取失败,就会调用AQS的doAcquireSharedInterruptibly方法,首先会以当前线程构成共享型Node节点加入同步队列尾部,若是上一个节点是head节点,就尝试获取共享锁,不然就进入等待状态,等待前继节点成为head节点释放共享锁并唤醒后继节点。blog
private void doAcquireSharedInterruptibly(int arg) throws InterruptedException { // 建立”当前线程“的Node节点,且Node中记录的锁是”共享锁“类型;并将该节点添加到AQS同步队列末尾。 final Node node = addWaiter(Node.SHARED); boolean failed = true; try { for (;;) { //当前节点的上一个节点p final Node p = node.predecessor(); //节点p是头节点就尝试修改同步状态 if (p == head) { int r = tryAcquireShared(arg); if (r >= 0) { setHeadAndPropagate(node, r); p.next = null; // help GC failed = false; return; } } // 当前线程一直等待,直到获取到共享锁。 // 若是线程在等待过程当中被中断过,则再次中断该线程(还原以前的中断状态)。 if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt()) throw new InterruptedException(); } } finally { if (failed) cancelAcquire(node); } }
信号量的释放,本质上就是释放获取到的共享锁。与acquire方法对应,释放信号量也有两种release方法,都调用了AQS的releaseShared方法。队列
//释放一个许可 public void release() { sync.releaseShared(1); } //释放多个许可 public void release(int permits) { if (permits < 0) throw new IllegalArgumentException(); sync.releaseShared(permits); } //AQS方法 public final boolean releaseShared(int arg) { if (tryReleaseShared(arg)) { doReleaseShared(); return true; } return false; }
tryReleaseShared方法是有内部类Sync提供实现的,意味着公平方式与非公平方式释放共享锁的实现相同的。循环+CAS修改同步状态。
protected final boolean tryReleaseShared(int releases) { for (;;) { //当前同步状态/许可数 int current = getState(); //释放了releases个许可后剩余的许可数 int next = current + releases; if (next < current) // overflow throw new Error("Maximum permit count exceeded"); //CAS修改同步状态 if (compareAndSetState(current, next)) return true; } }
tryReleaseShared成功释放后,doReleaseShared唤醒等待线程
private void doReleaseShared() { for (;;) { //头节点 Node h = head; // 若是头节点不为null,而且头节点不等于tail节点。同步队列除了head节点还有其余等待节点 if (h != null && h != tail) { int ws = h.waitStatus; if (ws == Node.SIGNAL) { if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0)) continue; // loop to recheck cases unparkSuccessor(h); } else if (ws == 0 && !compareAndSetWaitStatus(h, 0, Node.PROPAGATE)) continue; // loop on failed CAS } if (h == head) // loop if head changed break; } }
非公平方式获取以及释放信号量的实现与公平方式只有tryAcquireShared的实现不一样,释放的逻辑是相同的。
protected int tryAcquireShared(int acquires) { return nonfairTryAcquireShared(acquires); } final int nonfairTryAcquireShared(int acquires) { for (;;) { int available = getState(); int remaining = available - acquires; if (remaining < 0 || compareAndSetState(available, remaining)) return remaining; } }
基于共享锁实现的Semaphore能够控制必定数量的线程同时访问同步资源,超过数量的线程须要等待直到有线程完成操做释放许可,从而保证合理使用同步资源。