自定义 ForkJoinPool 提高并行流 ParallelStream 执行速度

简介

在 java8 中 添加了流Stream,可让你以一种声明的方式处理数据。使用起来很是简单优雅。ParallelStream 则是一个并行执行的流,采用 ForkJoinPool 并行执行任务,提升执行速度。java

下面咱们看看2个简单的示例:git

示例1 (list)

Arrays.asList(1,2,3,4,5,6)
	.parallelStream()
	.forEach((value) -> {
		String name = Thread.currentThread().getName();
		System.out.println("示例1 Thread:" + name + " value:" + value);
	});
复制代码

示例2 (array)

Stream.of(1,2,3,4,5,6)
	.parallel()
	.forEach((value) -> {
		String name = Thread.currentThread().getName();
		System.out.println("示例2 Thread:" + name + " value:" + value);
	});
复制代码

问题引出

笔者最近在作一些爬虫相关的业务,其核心工具已开源 mica-http:gitee.com/596392912/m… ,通过2个版本的迭代已经发展成了一个强大非帐号爬虫利器,赶忙来试试吧。数据库

image.png

image.png

咱们采集了大量的代理 ip 用来供爬虫使用,其中有个定时任务每 5 分钟去检测代理是否失效,代理 ip 检测比较费时,咱们给每一个检测的请求 设定了 2s 的超时,这样单线程的话 1000 个 ip 就得消耗半个多小时,固然笔者在校验的时候采用的 parallel Stream 简化开发。bash

而后发现效果并不明显,代理 ip 数量上来以后 5 分钟彻底检测不完,致使任务堆积。明明用了并发流为何没有明显的提升执行速度呢?多线程

001.png

下面咱们来看看刚刚的“示例”打印出的信息:架构

示例1 Thread:main value:4
示例1 Thread:ForkJoinPool.commonPool-worker-2 value:1
示例1 Thread:main value:6
示例1 Thread:ForkJoinPool.commonPool-worker-2 value:5
示例1 Thread:main value:3
示例1 Thread:ForkJoinPool.commonPool-worker-1 value:2
示例2 Thread:main value:4
示例2 Thread:ForkJoinPool.commonPool-worker-3 value:3
示例2 Thread:ForkJoinPool.commonPool-worker-2 value:5
示例2 Thread:ForkJoinPool.commonPool-worker-4 value:1
示例2 Thread:ForkJoinPool.commonPool-worker-5 value:2
示例2 Thread:ForkJoinPool.commonPool-worker-1 value:6
复制代码

咱们能够看到 Parallel Stream,默认采用的是一个 ForkJoinPool.commonPool 的线程池,这样咱们就算使用了 Parallel Stream, 整个 jvm 共用一个 common pool 线程池,一不当心就职务堆积了,在校验代理 ip 的时候咱们还有采集代理等其余的任务中也大量使用了并发流, 这样也就印证了为何会任务堆积了。并发

解决问题

使用自定义 ForkJoinPool 执行速度。示例代码以下:jvm

// 示例:自定义线程池
ForkJoinPool forkJoinPool = new ForkJoinPool(8);

// 这里是从数据库里查出来的一批代理 ip
List<ProxyList> records = new ArrayList<>();

// 找出失效的代理 ip
List<String> needDeleteList = forkJoinPool.submit(() -> records.parallelStream()
	.map(ProxyList::getIpPort)
	.filter(IProxyListTask::isFailed)
	.collect(Collectors.toList())
).join();

// 删除失效的代理
复制代码

整个代码依然比较优雅,在使用自定义的 ForkJoin 线程池以后,执行速度有了明显的提高。之前 5 分钟执行不完的任务如今 2 分钟以内就能所有执行完毕。微服务

结论

java8 的并发流在大批量数据处理时可简化多线程的使用,在遇到耗时业务或者重度使用并发流不妨根据业务状况采用自定义线程池来提示处理速度。工具

开源推荐

关注咱们

如梦技术-公众号.jpg

扫描上面二维码,更多精彩内容天天推荐!

相关文章
相关标签/搜索