须要提早了解的知识点: AbstractQueuedSynchronizer 实现原理并发
Semaphore(信号量)是用来控制同时访问特定资源的线程数量,它经过协调各个线程,以保证合理的使用公共资源。好比控制用户的访问量,同一时刻只容许1000个用户同时使用系统,若是超过1000个并发,则须要等待。dom
好比模拟一个停车场停车信号,假设停车场只有两个车位,一开始两个车位都是空的。这时若是同时来了两辆车,看门人容许它们进入停车场,而后放下车拦。之后来的车必须在入口等待,直到停车场中有车辆离开。这时,若是有一辆车离开停车场,看门人得知后,打开车拦,放入一辆,若是又离开一辆,则又能够放入一辆,如此往复。ide
public class SemaphoreDemo { private static Semaphore s = new Semaphore(2); public static void main(String[] args) { ExecutorService pool = Executors.newCachedThreadPool(); pool.submit(new ParkTask("1")); pool.submit(new ParkTask("2")); pool.submit(new ParkTask("3")); pool.submit(new ParkTask("4")); pool.submit(new ParkTask("5")); pool.submit(new ParkTask("6")); pool.shutdown(); } static class ParkTask implements Runnable { private String name; public ParkTask(String name) { this.name = name; } @Override public void run() { try { s.acquire(); System.out.println("Thread "+this.name+" start..."); TimeUnit.SECONDS.sleep(new Random().nextInt(10)); } catch (InterruptedException e) { e.printStackTrace(); } finally { s.release(); } } } }
Semaphore 经过使用内部类Sync继承AQS来实现。
支持公平锁和非公平锁。内部使用的AQS的共享锁。
具体实现可参考 AbstractQueuedSynchronizer 源码分析源码分析
Semaphore 的结构以下:ui
public Semaphore(int permits) { sync = new NonfairSync(permits); } public Semaphore(int permits, boolean fair) { sync = fair ? new FairSync(permits) : new NonfairSync(permits); }
构造方法指定信号量的许可数量,默认采用的是非公平锁,也只能够指定为公平锁。
permits赋值给AQS中的state变量。this
public void acquire() throws InterruptedException { sync.acquireSharedInterruptibly(1); } public void acquire(int permits) throws InterruptedException { if (permits < 0) throw new IllegalArgumentException(); sync.acquireSharedInterruptibly(permits); }
得到信号量方法,这两个方法支持 Interrupt中断机制,可以使用acquire() 方法每次获取一个信号量,也可使用acquire(int permits) 方法获取指定数量的信号量 。spa
public void acquireUninterruptibly() { sync.acquireShared(1); } public void acquireUninterruptibly(int permits) { if (permits < 0) throw new IllegalArgumentException(); sync.acquireShared(permits); }
这两个方法不响应Interrupt中断机制,其它功能同acquire方法机制。.net
public boolean tryAcquire() { return sync.nonfairTryAcquireShared(1) >= 0; } public boolean tryAcquire(long timeout, TimeUnit unit) throws InterruptedException { return sync.tryAcquireSharedNanos(1, unit.toNanos(timeout)); } public boolean tryAcquire(int permits, long timeout, TimeUnit unit) throws InterruptedException { if (permits < 0) throw new IllegalArgumentException(); return sync.tryAcquireSharedNanos(permits, unit.toNanos(timeout)); }
尝试得到信号量有三个方法。线程
public void release() { sync.releaseShared(1); }
调用AQS中的releaseShared方法,使得state每次减一来控制信号量。code
public int availablePermits() { return sync.getPermits(); } //=========Sync类======== final int getPermits() { return getState(); }
该方法返回AQS中state变量的值,当前剩余的信号量个数
public int drainPermits() { return sync.drainPermits(); } //=========Sync类======== final int drainPermits() { for (;;) { int current = getState(); if (current == 0 || compareAndSetState(current, 0)) return current; } }
获取并返回当即可用的全部许可。Sync类的drainPermits方法,获取1个信号量后将可用的信号量个数置为0。例如总共有10个信号量,已经使用了5个,再调用drainPermits方法后,能够得到一个信号量,剩余4个信号量就消失了,总共可用的信号量就变成6个了。
protected void reducePermits(int reduction) { if (reduction < 0) throw new IllegalArgumentException(); sync.reducePermits(reduction); } //=========Sync类======== final void reducePermits(int reductions) { for (;;) { int current = getState(); int next = current - reductions; if (next > current) // underflow throw new Error("Permit count underflow"); if (compareAndSetState(current, next)) return; } }
该方法是protected 方法,减小信号量个数
public final boolean hasQueuedThreads() { return sync.hasQueuedThreads(); } //=========AbstractQueuedSynchronizer类======== public final boolean hasQueuedThreads() { //头结点不等于尾节点就说明链表中还有元素 return head != tail; }
protected Collection<Thread> getQueuedThreads() { return sync.getQueuedThreads(); } //=========AbstractQueuedSynchronizer类======== public final Collection<Thread> getQueuedThreads() { ArrayList<Thread> list = new ArrayList<Thread>(); for (Node p = tail; p != null; p = p.prev) { Thread t = p.thread; if (t != null) list.add(t); } return list; }
该方法获取AQS中等待队列中全部未获取信号量的线程相关的信息(等待获取信号量的线程相关信息)。