CountdownLatch和CyclicBarrier都属于线程同步的工具,不过具体的实现以及使用的状况有所不一样,咱们先来看看不一样的使用状况bash
顾名思义CountdownLatch能够当作一个计数器来使用,好比某线程须要等待其余几个线程都执行过某个时间节点后才能继续执行 咱们来模拟一个场景,某公司一共有十我的,门卫要等十我的都来上班之后,才能够休息,代码实现以下微信
public static void main(String[] args) {
final CountDownLatch latch = new CountDownLatch(10);
for (int i = 0; i < 10; i++) {
//lambda中只能只用final的变量
final int times = i;
new Thread(() -> {
try {
System.out.println("子线程" + Thread.currentThread().getName() + "正在赶路");
Thread.sleep(1000 * times);
System.out.println("子线程" + Thread.currentThread().getName() + "到公司了");
//调用latch的countDown方法使计数器-1
latch.countDown();
System.out.println("子线程" + Thread.currentThread().getName() + "开始工做");
} catch (InterruptedException e) {
e.printStackTrace();
}
}).start();
}
try {
System.out.println("门卫等待员工上班中...");
//主线程阻塞等待计数器归零
latch.await();
System.out.println("员工都来了,门卫去休息了");
} catch (InterruptedException e) {
e.printStackTrace();
}
}
复制代码
运行后结果以下app
子线程Thread-0正在赶路
子线程Thread-2正在赶路
子线程Thread-0到公司了
子线程Thread-0开始工做
子线程Thread-1正在赶路
门卫等待员工上班中...
子线程Thread-4正在赶路
子线程Thread-9正在赶路
子线程Thread-5正在赶路
子线程Thread-6正在赶路
子线程Thread-7正在赶路
子线程Thread-8正在赶路
子线程Thread-3正在赶路
子线程Thread-1到公司了
子线程Thread-1开始工做
子线程Thread-2到公司了
子线程Thread-2开始工做
子线程Thread-3到公司了
子线程Thread-3开始工做
子线程Thread-4到公司了
子线程Thread-4开始工做
子线程Thread-5到公司了
子线程Thread-5开始工做
子线程Thread-6到公司了
子线程Thread-6开始工做
子线程Thread-7到公司了
子线程Thread-7开始工做
子线程Thread-8到公司了
子线程Thread-8开始工做
子线程Thread-9到公司了
子线程Thread-9开始工做
员工都来了,门卫去休息了
复制代码
能够看到子线程并无由于调用latch.countDown而阻塞,会继续进行该作的工做,只是通知计数器-1,即完成了咱们如上说的场景,只须要在全部进程都进行到某一节点后才会执行被阻塞的进程.若是咱们想要多个线程在同一时间进行就要用到CyclicBarrier了工具
咱们从新模拟一个新的场景,就用已经被说烂的跑步场景吧,十名运动员各自准备比赛,须要等待全部运动员都准备好之后,裁判才能说开始而后全部运动员一块儿跑,代码实现以下ui
public static void main(String[] args) {
final CyclicBarrier cyclicBarrier = new CyclicBarrier(10,()->{
System.out.println("全部人都准备好了裁判开始了");
});
for (int i = 0; i < 10; i++) {
//lambda中只能只用final的变量
final int times = i;
new Thread(() -> {
try {
System.out.println("子线程" + Thread.currentThread().getName() + "正在准备");
Thread.sleep(1000 * times);
System.out.println("子线程" + Thread.currentThread().getName() + "准备好了");
cyclicBarrier.await();
System.out.println("子线程" + Thread.currentThread().getName() + "开始跑了");
} catch (InterruptedException e) {
e.printStackTrace();
} catch (BrokenBarrierException e) {
e.printStackTrace();
}
}).start();
}
}
复制代码
执行结果以下this
子线程Thread-0正在准备
子线程Thread-2正在准备
子线程Thread-1正在准备
子线程Thread-3正在准备
子线程Thread-4正在准备
子线程Thread-0准备好了
子线程Thread-5正在准备
子线程Thread-6正在准备
子线程Thread-7正在准备
子线程Thread-8正在准备
子线程Thread-9正在准备
子线程Thread-1准备好了
子线程Thread-2准备好了
子线程Thread-3准备好了
子线程Thread-4准备好了
子线程Thread-5准备好了
子线程Thread-6准备好了
子线程Thread-7准备好了
子线程Thread-8准备好了
子线程Thread-9准备好了
全部人都准备好了裁判开始了
子线程Thread-9开始跑了
子线程Thread-0开始跑了
子线程Thread-2开始跑了
子线程Thread-1开始跑了
子线程Thread-7开始跑了
子线程Thread-6开始跑了
子线程Thread-5开始跑了
子线程Thread-4开始跑了
子线程Thread-3开始跑了
子线程Thread-8开始跑了
复制代码
能够看到全部线程在其余线程没有准备好以前都在被阻塞中,等到全部线程都准备好了才继续执行 咱们在建立CyclicBarrier对象时传入了一个方法,当调用CyclicBarrier的await方法后,当前线程会被阻塞等到全部线程都调用了await方法后 调用传入CyclicBarrier的方法,而后让全部的被阻塞的线程一块儿运行spa
应用场景咱们说完了,接下来看看两个工具的具体实现线程
咱们先来看看CountdownLatch的构造方法code
public CountDownLatch(int count) {
if (count < 0) throw new IllegalArgumentException("count < 0");
this.sync = new Sync(count);
}
复制代码
首先保证了count必定要大于零,而后初始化了一个Sync对象,在看看这个Sync对象是个什么cdn
private static final class Sync extends AbstractQueuedSynchronizer {
private static final long serialVersionUID = 4982264981922014374L;
Sync(int count) {
setState(count);
}
int getCount() {
return getState();
}
protected int tryAcquireShared(int acquires) {
return (getState() == 0) ? 1 : -1;
}
protected boolean tryReleaseShared(int releases) {
// Decrement count; signal when transition to zero
for (;;) {
int c = getState();
if (c == 0)
return false;
int nextc = c-1;
if (compareAndSetState(c, nextc))
return nextc == 0;
}
}
}
复制代码
Sync是CountdownLatch的静态内部类,继承了AbstractQueuedSynchronizer(即AQS,提供了一种实现阻塞锁和一系列依赖FIFO等待队列的同步器的工具,回头单讲)抽象类, 在Sync的构造方法中,调用了setState方法,能够视做初始化了一个标记来记录当前计数器的数量
咱们来看CountdownLatch的两个核心方法,await和countdown,先来看await
public void await() throws InterruptedException {
//能够视做将线程阻塞
sync.acquireSharedInterruptibly(1);
}
复制代码
await调用的是AQS的方法,能够视做阻塞线程,具体实如今分析AQS的章节中展开 再来看看countdown方法
public void countDown() {
sync.releaseShared(1);
}
复制代码
调用了sync的一个方法,再来看看这个方法的实现
public final boolean releaseShared(int arg) {
if (tryReleaseShared(arg)) {
doReleaseShared();
return true;
}
return false;
}
复制代码
再来看这个tryReleaseShared方法
protected boolean tryReleaseShared(int releases) {
for (;;) {
//获取标记位
int c = getState();
if (c == 0)
return false;
int nextc = c-1;
//用cas的方式更新标记位
if (compareAndSetState(c, nextc))
return nextc == 0;
}
}
复制代码
能够看到在调用tryReleaseShared其实是将标记位-1而且返回标记位是否为0,若是标记位为0 那么调用的doReleaseShared能够视做将阻塞的线程放行,这样整个的流程就通了
老规矩先看构造方法
public CyclicBarrier(int parties, Runnable barrierAction) {
if (parties <= 0) throw new IllegalArgumentException();
this.parties = parties;
this.count = parties;
this.barrierCommand = barrierAction;
}
复制代码
这边传入了两个对象简单的记录了一下存值,咱们直接查看一下关键的await方法
public int await() throws InterruptedException, BrokenBarrierException {
try {
return dowait(false, 0L);
} catch (TimeoutException toe) {
throw new Error(toe); // cannot happen
}
}
复制代码
再来看dowait的实现
/** The lock for guarding barrier entry */
private final ReentrantLock lock = new ReentrantLock();
/** Condition to wait on until tripped */
private final Condition trip = lock.newCondition();
/** 省略部分代码 **/
private int dowait(boolean timed, long nanos)
throws InterruptedException, BrokenBarrierException,
TimeoutException {
final ReentrantLock lock = this.lock;
lock.lock();
try {
final Generation g = generation;
//判断是否被打断
if (g.broken)
throw new BrokenBarrierException();
if (Thread.interrupted()) {
breakBarrier();
throw new InterruptedException();
}
//将计数器-1 即在构造方法中赋值的count
int index = --count;
if (index == 0) { // tripped
//若是全部的线程都执行完毕即count=0时
boolean ranAction = false;
try {
//执行传入的方法
final Runnable command = barrierCommand;
if (command != null)
command.run();
ranAction = true;
//唤醒全部线程
nextGeneration();
return 0;
} finally {
if (!ranAction)
breakBarrier();
}
}
//若是count没有到0那么阻塞当前线程
for (;;) {
try {
if (!timed)
trip.await();
else if (nanos > 0L)
nanos = trip.awaitNanos(nanos);
} catch (InterruptedException ie) {
if (g == generation && ! g.broken) {
breakBarrier();
throw ie;
} else {
// We're about to finish waiting even if we had not // been interrupted, so this interrupt is deemed to // "belong" to subsequent execution. Thread.currentThread().interrupt(); } } if (g.broken) throw new BrokenBarrierException(); if (g != generation) return index; if (timed && nanos <= 0L) { breakBarrier(); throw new TimeoutException(); } } } finally { lock.unlock(); } } 复制代码
从代码中能够看到,CyclicBarrier是利用Lock的condition方法来进行线程的阻塞和唤醒,相似Object.wait()和notifyAll()在count不为0时阻塞,在count=0时唤醒全部线程
1,CountdownLatch适用于全部线程经过某一点后通知方法,而CyclicBarrier则适合让全部线程在同一点同时执行 2,CountdownLatch利用继承AQS的共享锁来进行线程的通知,利用CAS来进行--,而CyclicBarrier则利用ReentrantLock的Condition来阻塞和通知线程
感谢阅读
有兴趣能够关注个人我的微信公众号,会按期推送关于Java的技术文章,并且目前不恰饭都是干货哈哈哈哈