java5 中 ,提供了几个并发工具类 ,Semaphore CountDownLatch CyclicBarrier,在并发编程中很是实用。前二者经过 内部类sync 继承AQS,使用共享资源的模式,AQS的实现可参考个人另外一篇 AQS 实现分析,前二者根据各自功能需求 , 各自内部实现tryAcquireShared(获取资源)、tryReleaseShared(释放)。来定义什么条件 下来获取与释放。而CyclicBarrier内部经过Reentrantlock与Condition组合的方式实现。java
与Reentrantlock相似,也有公平和非公平的机制。这里就不在分析了,默认是非公平的。编程
经过acquire 获取并发
Semphore public void acquire() throws InterruptedException { sync.acquireSharedInterruptibly(1); //非独占模式 }
AQS public final void acquireSharedInterruptibly(int arg) throws InterruptedException { if (Thread.interrupted()) throw new InterruptedException(); if (tryAcquireShared(arg) < 0) //当返回值不小于0时,得到资源。 doAcquireSharedInterruptibly(arg); //资源获取失败,加入队列尾部,阻塞 }
从AQS的分析中获得,当tryAcquireShared 返回值小于0,那么认为获取失败。而对于tryAcquiredShared的实现中,让其执行时,其返回值需大于0,对于Semaphore信号量,构造时会设置许可的大小 。表示可以获取的资源,通常大于1。app
默认是非公平 的,然后会调用以下方法,会返回工具
Semaphore NonFairSync final int nonfairTryAcquireShared(int acquires) { for (;;) { int available = getState(); int remaining = available - acquires; if (remaining < 0 || compareAndSetState(available, remaining)) return remaining; } }
减去本次尝试的获取的许可数,当结果小于0,直接返回,而若是不小于0 , cas更新许可,若是更新失败,代表其它线程也在更新,然后 进入下一次循环,直到可用的小于0或者cas成功。当返回值小于0时,阻塞,等待唤醒资源释放。返回值大于 0,获取许可成功,继续执行。oop
释放ui
当获取许可的线程执行完时,必须释放占有的许可量,this
Semaphore public void release() { sync.releaseShared(1); //释放时需CAS更新,独占模式不需cas }
AQS public final boolean releaseShared(int arg) { if (tryReleaseShared(arg)) { doReleaseShared(); //唤醒head的后继节点 return true; } return false; }
protected final boolean tryReleaseShared(int releases) { for (;;) { int current = getState(); int next = current + releases; if (next < current) // overflow throw new Error("Maximum permit count exceeded"); if (compareAndSetState(current, next)) //由于是 共享锁,可能有多个线程释放 return true; } }
示例:spa
项目中的示例:在开发的项目中,一个数据中心有不少个集群,集群中有不少个虚拟机实例,同一个集群中有多个虚拟机要开机,可是开机前须要使用调度策略,须要更新集群中的信息,可是只容许一个集群只有一个虚拟机能执行策略。这里就能够经过Semaphore实现。.net
clusterLockMap.putIfAbsent(cluster.getId(), new Semaphore(1)); //为每一个集群初始一个许可 clusterLockMap.get(cluster.getId()).acquire(); //尝试获取许可 //执行逻辑 //释放资源,这里调用drainPermits的缘由是,在释放前清空许可, 由于其它 在阻塞的线程若是被中断了,会将许可值扩大,因此在释放前,将其清空。 finally { //保证了在任一时刻,只有一个虚拟机能调用策略 synchronized (clusterLockMap.get(cluster.getId())) { clusterLockMap.get(cluster.getId()).drainPermits(); //清空,返回清空数,为空时直接返回0 clusterLockMap.get(cluster.getId()).release(); } }
上面的示例中若是不加锁,异常发生时可能致使许可大于一。
相似于计数器,好比经常使用于 一个或几个线程,要等待其它的线程执行完才能继续执行。内部只有一个继承了AQS的sync。只能使用一次。
经常使用方法 await ,用来等待执行信号。
public void await() throws InterruptedException { sync.acquireSharedInterruptibly(1); //状态不为1时,阻塞。等待被唤醒 }
boolean await(long timeout, TimeUnit unit) 超时尚未为0,返回false
CountDownLatch sync protected int tryAcquireShared(int acquires) { return (getState() == 0) ? 1 : -1; }
判断当前是否等于0 ,也就是说 countDown 的调用次数是否等于初始化的数量。
countDown 信号的释放
CountDownLatch public void countDown() { sync.releaseShared(1); }
CountDownLatch sync protected boolean tryReleaseShared(int releases) { // Decrement count; signal when transition to zero for (;;) { int c = getState(); if (c == 0) return false; int nextc = c-1; if (compareAndSetState(c, nextc)) return nextc == 0; } }
p实例:
F1 比赛前须要等待发车信号,全部的车才能出发。 CountDownLatch(1) ,车比如线程,发车前await。
信号发出,countDown。
F1车进入修息区,必须 全部的检测 灯都为绿才能出发。车出发前 await,每个必检项检测完调用countDown,才能出发(线程才能运行)。
CyclicBarrier 最大的特别 之处就是,在构造时能够指定一个线程,而且能够重复使用。
final Runnable barrierCommand;
该线程的做用在于,当调用await时,且许可为0 时,执行完 barrierCommand的线程 才能继续执行。
实现细节,类的结构图以下:
构造方法时,能够指定barrierCommand,若是不指定,则功能相似于CountDownLatch,也有点计数的意思 。其从成员变量 能够看出,内部有ReentrantLock的属性,内部多是经过ReentrantLock的调用实现。接下来分析源码。
分析源码前,几个关键变量注意下,
lock 同步锁,
trip = lock.newCondition()
count 计数器
generation 内部实现的类,用来表示是否新的计数的开始
构造时指定count,线程调用await,count减1 阻塞,直到count等于0,且barrierCommand执行完若是有的话。
调用方法有两种:
public int await() throws InterruptedException, BrokenBarrierException { try { return dowait(false, 0L); } catch (TimeoutException toe) { throw new Error(toe); // cannot happen } }
public int await(long timeout, TimeUnit unit) throws InterruptedException, BrokenBarrierException, TimeoutException { return dowait(true, unit.toNanos(timeout)); }
内部实现
private int dowait(boolean timed, long nanos) throws InterruptedException, BrokenBarrierException, TimeoutException { final ReentrantLock lock = this.lock; lock.lock(); try { final Generation g = generation; if (g.broken) throw new BrokenBarrierException(); if (Thread.interrupted()) { breakBarrier(); //被中断,本次计数无效,并将标志置为broken throw new InterruptedException(); } int index = --count; // 计数减一 if (index == 0) { // tripped boolean ranAction = false; try { final Runnable command = barrierCommand; if (command != null) command.run(); //计数为0时,执行run方法 ranAction = true; nextGeneration(); //本次计数结束,trip.singnalAll(),计数重置。 return 0; } finally { if (!ranAction) breakBarrier(); //出现异常,本次计数无效。唤醒其它await线程, } } // loop until tripped, broken, interrupted, or timed out for (;;) { try { if (!timed) trip.await(); // 阻塞 else if (nanos > 0L) nanos = trip.awaitNanos(nanos); // 限时阻塞,调用await(long timeout, TimeUnit unit), } catch (InterruptedException ie) { if (g == generation && ! g.broken) { breakBarrier(); throw ie; } else { // We're about to finish waiting even if we had not // been interrupted, so this interrupt is deemed to // "belong" to subsequent execution. Thread.currentThread().interrupt(); } } if (g.broken) throw new BrokenBarrierException(); if (g != generation) // 是否仍是同一个计数周期内 return index; // 顺利执行,返回 if (timed && nanos <= 0L) { breakBarrier(); throw new TimeoutException(); //只有调用超时await方法,且超时,抛出。 } } } finally { lock.unlock(); } }
generation 的操做
顺利执行后,当计数为0,且周期重置。
private void nextGeneration() { // signal completion of last generation trip.signalAll(); // set up next generation count = parties; generation = new Generation(); }
执行时出现异常或者等待超时,唤醒。计数无效,使唤醒的线程检测到generation的broken 标识为true,抛出BrokenBarrierException 。
private void breakBarrier() { generation.broken = true; count = parties; trip.signalAll(); }
总结:
从这上面的分析能够得出如下状况
大部分的并发工具类,是经过构造内部类继承AQS,并根据工具类设计的功能,实现对应的获取与释放资源的方法。来使得 调用线程什么时候阻塞,什么时候释放。如CountDownLatch 就不一样于Semaphore的设计思路,Semaphore是先尝试获取资源,获取到才能释放,获取不到,阻塞。虽然实现方式(继承AQS)同样。而CountDownLatch 释放资源 前没有经过相应的方法获取,而是直接将当前的许可数减一,实现方法 tryReleaseShared ,然后根据是否计数为0决定是否唤醒线程。而对应的获取资源方法 tryAcquireShared,判断计数器是否已经到达0来判断是否执行。或者阻塞。获取后不须要释放。
而CyclicBarrier 并无直接经过内部sync继承AQS的方式,而是经过现有的工具类Reentantlock,与Condition组合来实现功能,且还能重用。