上一节讲到了CountDownLatch这个并发辅助类,它可以让一个线程等待其余并发线程执行完一组任务后再继续执行,也能够说是实现了并发线程在集合点同步。可是Java又给出了一个更强大的并发辅助类CyclicBarrier。java
CyclicBarrier也使用一个整形参数进行初始化,这个参数是须要再某点同步的线程数。当线程调用await()方法后CyclicBarrier类将把这个线程编为WAITING状态,并等待直到其余线程都到达集合点。当最后一个线程到达集合点后,调用CyclicBarrier类的await()方法时,CyclicBarrieer对象将唤醒全部经过await()方法进入等待的线程。算法
CyclicBarrier与CountDownLatch不一样的地方在于,CountDownLatch经过countDown()方法对计数器减1来标记一个线程已经到达集合点,而且这个线程不会阻塞会继续执行。而CyclicBarrier类则经过await()方法标记线程到达集合点,而且这个到达集合点的线程会被阻塞。另外CyclicBarrier还支持把一个Runnable对象做为一个初始化参数,当全部的线程都到达集合点的时候,这个线程会被启动。这很是相似以一个分治算法的实现,把一个大任务拆分红若干个子任务,并等待全部子任务结束后,输出执行结果。多线程
下面咱们用一个实例来演示如何使用CyclicBarrier来模拟使用分治算法在一个矩阵中查找一个数字出现的次数。并发
首先咱们建立一个矩阵类,构造函数接受矩阵的维度以及须要查找的数字,并采用随机数的方式构建这个矩阵而后记录下来构建矩阵过程当中这个待查找数字出现的次数。dom
public class MatrixMock { private int[][] data; public MatrixMock(int size, int length, int number) { int count = 0; data = new int[size][length]; Random random = new Random(); for (int i = 0; i < size; i++) { for (int j = 0; j < length; j++) { int temp = random.nextInt(10); data[i][j] = temp; if (temp == number) { count++; } } } System.out.printf("Mock: There are %d occurences of %d in generate\n", count, number); } public int[] getRow(int row) { if(row >= 0 && row < data.length) { return data[row]; } return null; } }
查找结果类,定义了每行中待查找数字出现的次数。ide
public class Results { int data[]; public Results(int size) { data = new int[size]; } public void setData(int position, int value) { data[position] = value; } public int[] getData() { return data; } }
接下来咱们建立查找任务线程,这个类接受几个参数做为构造方法。分别是MatrixMock(待查找矩阵),Results(查找结果),firstRow(此任务分配的开始查找位置),lastRow(此任务分配的结束查找位置),number(待查找数字),CyclicBarrier(并发辅助类)。而后咱们从firstRow开始查找,直到lastRow结束查找每行中待查找数字出现的次数并保存到Result中。函数
public class Searcher implements Runnable{ private MatrixMock matrixMock; private Results results; private int firstRow; private int lastRow; private int number; private final CyclicBarrier cyclicBarrier; public Searcher(CyclicBarrier cyclicBarrier, MatrixMock matrixMock, Results results, int firstRow, int lastRow, int number) { this.cyclicBarrier = cyclicBarrier; this.matrixMock = matrixMock; this.results = results; this.firstRow = firstRow; this.lastRow = lastRow; this.number = number; } @Override public void run() { System.out.printf("%s: Processing lines from %d to %d.\n", Thread.currentThread().getName(), firstRow, lastRow); for (int i = firstRow; i < lastRow; i++) { int row[] = matrixMock.getRow(i); int counter = 0; for (int j = 0; j < row.length; j++) { if(row[j] == number) { counter++; } } results.setData(i, counter); } System.out.printf("%s: Line processed.\n", Thread.currentThread().getName()); try { cyclicBarrier.await(); } catch (InterruptedException e) { e.printStackTrace(); } catch (BrokenBarrierException e) { e.printStackTrace(); } } }
查找任务合并线程,也就是等待全部全部查找子任务完成后,把全部子任务查找的结果作一个合计,并打印出一共找到多少个待查找数字。this
public class Grouper implements Runnable{ private Results results; public Grouper(Results results) { this.results = results; } @Override public void run() { System.out.printf("Grouper: Processing results...\n"); int count = 0; for (int i = 0; i < results.getData().length; i++) { count += results.getData()[i]; } System.out.printf("Grouper: Total result: %d\n", count); } }
在主方法类中建立一个10000*10000的矩阵,并启动5个Searcher线程去查找数字5在每行中出现的次数。而后使用Grouper线程做为CyclicBarrier的初始化参数,等待全部Searcher任务执行完毕后执行Grouper线程来合计每一个Searcher线程的查找结果。线程
public class Main { public static void main(String[] args) { int size = 10000; int length = 10000; int search = 5; int participants = 5; int lines_participants = size / participants; MatrixMock matrixMock = new MatrixMock(size, length, search); Results results = new Results(size); Grouper grouper = new Grouper(results); CyclicBarrier cyclicBarrier = new CyclicBarrier(participants, grouper); Searcher[] searchers = new Searcher[participants]; Thread[] searcherThreads = new Thread[participants]; for (int i = 0; i < participants; i++) { int firstRow = i * lines_participants; int lastRow = (i + 1) * lines_participants; searchers[i] = new Searcher(cyclicBarrier, matrixMock, results, firstRow, lastRow, search); searcherThreads[i] = new Thread(searchers[i]); searcherThreads[i].start(); } } }
查看任务执行日志,咱们发现每一个Searcher任务执行2000行数据的查找工做,最终找到9997834个带查找数字。日志
Mock: There are 9997834 occurences of 5 in generate Thread-0: Processing lines from 0 to 2000. Thread-1: Processing lines from 2000 to 4000. Thread-2: Processing lines from 4000 to 6000. Thread-3: Processing lines from 6000 to 8000. Thread-0: Line processed. Thread-2: Line processed. Thread-3: Line processed. Thread-4: Processing lines from 8000 to 10000. Thread-1: Line processed. Thread-4: Line processed. Grouper: Processing results... Grouper: Total result: 9997834
CyclicBarrier类提供了两个方法用来查看在CyclicBarrier上面等待的线程数和同步的任务数。
System.out.printf("CyclicBarrier: %d, %d\n", cyclicBarrier.getNumberWaiting(), cyclicBarrier.getParties());
CyclicBarrier类还支持重置,经过reset()方法完成操做。当重置发生后,await()方法将接受一个BrokenBarrierException异常,你捕获这个异常后能够用来执行一些复杂的操做,好比回滚数据或者从新执行。
CyclicBarrier类有一个特有的状态为Broken,当多线程并发等待的时候,有一个线程被中断,这个线程抛出InterruptedException异常,其余等待的线程将抛出BrokenBarrierException异常,因而CyclicBarrier对象属于Broken状态。你可使用isBroken()方法来判断CyclicBarrier对象是否处于损坏状态。