CyclicBarrier是java推出的一个并发编程工具,它用在多个线程之间协同工做。线程约定到达某个点,到达这个点以后的线程都停下来,直到最后一个线程也到达了这个点以后,全部的线程才会获得释放。经常使用的场景是:多个worker线程,每一个线程都在循环地作一部分工做,并在最后用cyclicBarrier.await()设下约定点,当最后一个线程作完了工做也到达约定点后,全部线程获得释放,开始下一轮工做。也就是下面这样:html
1 while(!done()){ 2 //working 3 cyclicBarrier.await(); 4 }
CyclicBarrier还支持一个回调函数,每当一轮工做结束后,下一轮工做开始前,这个回调函数都会被调用一次。java
可是,使用CyclicBarrier必须准守最佳实践的使用方法,不然,就可能达不到想要的效果。好比,下面这样,就是一种典型的错误使用方法:编程
private void process(CyclicBarrier cyclicBarrier) { final int n = 100; Runnable worker= new Runnable() { @Override public void run() { try { //模拟工做 Thread.sleep(3000); } catch (InterruptedException ex) { ex.printStackTrace(); } try { cyclicBarrier.await(); } catch (BrokenBarrierException | InterruptedException ex) { ex.printStackTrace(); } } System.out.println("Worker is done"); System.out.println("Thread of Worker is "+ Thread.currentThread().getId()); }; for (int i = 0; i < n; i++) { Thread t1 = new Thread(worker); Thread t2 = new Thread(worker); t1.start(); t2.start(); } }
在上面的代码中,工做不在worker线程中循环,而是在开启工做的线程中循环,也就是说,它会不断地开启新的worker线程。这会致使的一个问题是,上一轮的回调还没执行完成,下一轮的工做就已经开始了。api
那么为何呢?下面来分析一下缘由。并发
首先,要知道CyclicBarrier是如何作到在上一轮工做结束后下一轮工做开始前执行回调函数的。查看jdoc文档,里面有这么一句话“A CyclicBarrier supports an optional Runnable
command that is run once per barrier point, after the last thread in the party arrives, but before any threads are released. ”这是描述回调函数的,从描述中能够看到,回调函数是在最后一个线程到达约定点后,线程释放前被执行的。也就是说,回调函数的执行时间发生在下一轮工做前,这是经过在执行完回调函数再释放工做线程来实现的。oracle
而后,咱们再来看看上面错误的使用方法。在错误的使用方法中,主线程的每一轮循环中都开启了新的worker线程,这样在回调函数结束以前,前面开启的worker线程确实没有获得释放,可是,新开启的工做线程却彻底能够执行下一轮工做,这就是为何在回调函数执行完毕以前,新一轮的工做就已经开始了的缘由。而且,错误方法中的每个工做线程只执行一轮工做就结束了,每一轮工做之间的线程互不影响,这也就失去了协做性,所以,千万要避免写出这种代码。ide
关于CyclicBarrier使用的最佳时间,基本上就是官方示例中的用法了,以下:函数
1 class Solver { 2 final int N; 3 final float[][] data; 4 final CyclicBarrier barrier; 5 6 class Worker implements Runnable { 7 int myRow; 8 Worker(int row) { myRow = row; } 9 public void run() { 10 while (!done()) { 11 processRow(myRow); 12 13 try { 14 barrier.await(); 15 } catch (InterruptedException ex) { 16 return; 17 } catch (BrokenBarrierException ex) { 18 return; 19 } 20 } 21 } 22 } 23 24 public Solver(float[][] matrix) { 25 data = matrix; 26 N = matrix.length; 27 barrier = new CyclicBarrier(N, 28 new Runnable() { 29 public void run() { 30 mergeRows(...); 31 } 32 }); 33 for (int i = 0; i < N; ++i) 34 new Thread(new Worker(i)).start(); 35 36 waitUntilDone(); 37 } 38 }
最后在有一个问题是,回调函数是在哪个线程里执行的?工具
根据个人demo测试发现,是在第一个到达的线程中执行的。固然,官方并无明确规定这一点,也许之后会有变化吧,因此,咱们也不能以来这一特征。个人demo以下:测试
public class Demo1 { public static main(String[] args){ Demo1 demo = new Demo1(); demo1.showInfThreadWhenDirectly(); }
private void process(CyclicBarrier cyclicBarrier) { final int n = 100; Runnable worker= new Runnable() { @Override public void run() { for (int i = 0; i < n; i++) { try { Thread.sleep(3000); } catch (InterruptedException ex) { ex.printStackTrace(); } try { int arrival_index=cyclicBarrier.await(); if(0==arrival_index){ System.out.println("first arrival Thread in this iteration is: " +Thread.currentThread().getId()); } } catch (BrokenBarrierException | InterruptedException ex) { ex.printStackTrace(); } } System.out.println("Worker is done"); System.out.println("Thread of Worker is "+ Thread.currentThread().getId()); } }; Thread t1 = new Thread(worker); Thread t2 = new Thread(worker); t1.start(); t2.start(); } public void showInfThreadWhenDirectly(){ CyclicBarrier cyclicBarrier = new CyclicBarrier(2, () -> System.out.println("[Directly] Thread in invert call function is" + Thread.currentThread().getId())); process(cyclicBarrier); System.out.println("[Directly] main Thread is "+ Thread.currentThread().getId()); } }
输出结果以下:
[Directly] main Thread is 1
[Directly] Thread in invert call function is10
first arrival Thread in this iteration is: 10
[Directly] Thread in invert call function is10
first arrival Thread in this iteration is: 10
[Directly] Thread in invert call function is10
first arrival Thread in this iteration is: 10
[Directly] Thread in invert call function is10
first arrival Thread in this iteration is: 10
[Directly] Thread in invert call function is11
first arrival Thread in this iteration is: 11
[Directly] Thread in invert call function is10
first arrival Thread in this iteration is: 10
[Directly] Thread in invert call function is10
first arrival Thread in this iteration is: 10
[Directly] Thread in invert call function is10
first arrival Thread in this iteration is: 10
[Directly] Thread in invert call function is11
first arrival Thread in this iteration is: 11
另外,官方还有一段:“
If the barrier action does not rely on the parties being suspended when it is executed, then any of the threads in the party could execute that action when it is released. To facilitate this, each invocation of
await()
returns the arrival index of that thread at the barrier. You can then choose which thread should execute the barrier action, for example:if (barrier.await() == 0) { // log the completion of this iteration }”
意思是说,若是回调动做“arrier action”不须要在全部工做线程都中止的状态下执行的话,那么能够随便找一个工做线程去作这个动做。为了支持这个,CyclicBarrier 的await( )方法有一个返回值,返回的就是当前线程是第几个到达约定点(barrier)的。
参考https://docs.oracle.com/javase/7/docs/api/java/util/concurrent/CyclicBarrier.html