微信公众号:MyClass社区
若有问题或建议,请公众号留言java
前面咱们研究了线程池的使用,可是有人说有比线程池更快的多线程处理机制,今天咱们来看看java并发ForkJoinPool的使用。首先,我以为forkjoin不是用来替代线程池的,只是适用场景不同,下面进行简单的测试来看看forkjoin是怎样高效利用CPU的。简答说一下Forkjoin 只有你在将一个任务拆分红小任务时才有用处。fork-join池是是一个work-stealing工做窃取线程池。就是将大的任务拆分fork成小的任务,而后合并join结果的一种处理机制。微信
普通ForLoop处理
看看简单的批处理list,咱们平时可能都会遇到一个很大的队列,forloop处理起来很耗时,下面是最简单最直接的处理。不用想,这样的处理时间复杂度是O(n),基本上是最耗时的。这个没有什么好讨论的。多线程
private List<Integer> taskList = new ArrayList<>();
@Before
public void initList(){
for (int i = 0; i < 200; i++) {
taskList.add(i);
}
}
/**
* 普通循环
* @throws InterruptedException
*/
@Test
public void normalLoopTest() throws InterruptedException {
Instant start = Instant.now();
int resultSum = 0;
for (int i = 0; i < taskList.size(); i++) {
//模拟cpu计算
resultSum += taskList.get(i);
}
Instant end = Instant.now();
System.out.println("resultSum:"+resultSum+",耗费时间:"+ Duration.between( start,end ).toMillis());
}
并发
使用ThreadPoolExecutor
这里将list拆成步长为20的处理跨度,而后线程池提交10个任务,采用CountDownLatch 原子计数器记录线程执行是否完成,最后获取Future结果求和(200个任务里都须要一个for1000000的计算操做)。ide
/**
* 线程池处理
* @throws InterruptedException
*/
@Test
public void ThreadPoolExecutorTest() throws InterruptedException, ExecutionException {
ExecutorService executorService = Executors.newFixedThreadPool(10);
Instant start = Instant.now();
//这里能够设置一个外部其余线程池运行干扰,
//由于一个应用程序通常的可能有多个线程池。
//跨度
int spanSize = 20;
//处理批次
int branchSize = taskList.size()/20;
//任务执行计数器
CountDownLatch countDownLatch = new CountDownLatch(branchSize);
List<Future<Integer>> futures = new ArrayList<Future<Integer>>();
int resultSum = 0;
InterferenceThread();
for (int i = 0; i < branchSize; i++) {
final int finalI = i;
futures.add(executorService.submit(new Callable<Integer>() {
@Override
public Integer call() throws Exception {
int sum = 0;
int startIndex = finalI *spanSize;
int end = (finalI+1)*spanSize;
for (int j = startIndex; j < end; j++) {
//模拟cpu计算
sum += taskList.get(j);
//这里模拟IO等待,
//Thread.sleep(10);
//可是为了和FJ比较,最好设置为耗cpu的计算
int a = 0;
for (int n = 0; n < 1000000; n++) {
a++;
}
}
countDownLatch.countDown();
return sum;
}
}));
}
//阻塞等待执行完全部的任务,计数器==0,标识全部任务执行完毕,关闭线程池
countDownLatch.await();
if(countDownLatch.getCount() ==0){
executorService.shutdown();
for (int i = 0; i < futures.size(); i++) {
resultSum += futures.get(i).get();
}
}
Instant end = Instant.now();
System.out.println("resultSum:"+resultSum+",耗费时间:"+ Duration.between( start,end ).toMillis());
}oop
运行结果:
4个核心线程:resultSum:19900,耗费时间:38ms
10个线程:resultSum:19900,耗费时间:37ms单元测试
总结:我分别进行各类线程个数测试,改变线程池核心线程数,发现对处理时间彻底没有改进,时间在一个范围区间变更,没有任何明显的变化(我首先设置一4个核心线程,由于我电脑是4核CPU,下边ForkJoinPool使用的时候,默认poolsize也是和本电脑的核数一致)。测试结果能够看出,对于计算型cpu密集型任务,增长线程数是没有任何效果的。因此线程池大小设置和cpu个数和执行IO时间和cup处理时间都有关系。测试
使用ForkJoinPool
单元测试类,一样(200个任务里都须要一个for1000000的计算操做)。可是ForkJoinPool是将200任务进行拆分,而后合并结果:ui
@Test
public void ForkJoinPoolTest() throws InterruptedException {
ForkJoinPool forkJoinPool = new ForkJoinPool();
Instant start = Instant.now();
InterferenceThread();
int resultSum = forkJoinPool.invoke(new HandLerAction(taskList,0,taskList.size()-1));
System.out.println("forkJoinPool.getPoolSize:"+forkJoinPool.getPoolSize());
Instant end = Instant.now();
System.out.println("resultSum:"+resultSum+",耗费时间:"+ Duration.between( start,end ).toMillis());
}
this
执行类HandlerAction
public class HandLerAction extends RecursiveTask<Integer> {
private List<Integer> list;
private int start;
private int end;
public HandLerAction(List<Integer> list, int start, int end) {
this.list = list;
this.start = start;
this.end = end;
}
/**
* The main computation performed by this task.
*/
@Override
protected Integer compute() {
if(end -start < 1){
int resultSum = 0;
for (int i = start; i <= end; i++) {
try {
resultSum += list.get(i);
//Thread.sleep(10);
//可是为了和FJ比较,最好设置为耗cpu的计算
int a = 0;
for (int n = 0; n < 1000000; n++) {
a++;
}
} catch (InterruptedException e) {
e.printStackTrace();
}
}
return resultSum;
}else {
//二分法拆分任务队列
int middle = (start + end) / 2;
HandLerAction preAction = new HandLerAction(list,start,middle);
HandLerAction endAction = new HandLerAction(list,middle+1,end);
//fork执行
preAction.fork();
endAction.fork();
//结果合并
return preAction.join()+endAction.join();
}
}
}
执行结果:
forkJoinPool.getPoolSize:4
resultSum:19900,耗费时间:45
结果分析:使用forkJoinPool来跑这样的任务,才200个任务,二分任务队列,
1.当任务处理数拆分到小于20时,loop执行任务,发现执行耗时在45ms左右
2.当任务处理数拆分到小于10时,loop执行任务,发现执行耗时在30ms左右
3.当任务处理数拆分到小于5时,loop执行任务,发现执行耗时在25左右
4.当任务处理数拆分到小于1时,loop执行任务,发现执行耗时在22左右
总结:能够看出拆分的任务越多,cpu利用率越高,处理时间也越短,可是实际运用的时候拆分的粒度须要把握好,由于拆分越多,越压缩CPU使用,而且越小的粒度效果会愈来愈不明显。因此选择一个折中的粒度比较好,任务数很少很多,既能节省开辟任务内存消耗,又能合理利用CPU。
写在最后
在上面的例子中,处理Sum++求和的时候使用 Thread.sleep 来模拟业务处理时间,发现,线程池仍是线程池,处理起来比forkJoinPool快多了,因此forkJoinPool不适合IO等待型业务逻辑,适合CPU计算型的任务处理,因此了解了他们使用的区别,就能够帮助咱们更好的使用这两个多线程处理机制。下一篇将继续讨论forkJoinPool一些实现原理。
本文分享自微信公众号 - MyClass社区(MyClass_ZZ)。
若有侵权,请联系 support@oschina.cn 删除。
本文参与“OSC源创计划”,欢迎正在阅读的你也加入,一块儿分享。