1.一般咱们在获取到一个list列表后须要一个挨着一个的进行遍历处理数据,若是每次处理都须要长时间操做,那整个流程下来时间就是每一次处理时间的总和。java
2.Java8的stream接口极大地减小了for循环写法的复杂性,stream提供了map/reduce/collect等一系列聚合接口,还支持并发操做:parallelStream。web
定义一个位置类和服务,其中建立10个地址位置算法
package cn.chinotan.dto; /** * @program: test * @description: 位置 * @author: xingcheng * @create: 2018-11-17 19:36 **/ public class Location { String name; public Location() { } public Location(String name) { this.name = name; } public String getName() { return name; } public void setName(String name) { this.name = name; } }
package cn.chinotan.controller; import cn.chinotan.dto.Location; import org.apache.commons.lang3.StringUtils; import org.springframework.web.bind.annotation.GetMapping; import org.springframework.web.bind.annotation.RestController; import java.util.ArrayList; import java.util.List; /** * @program: test * @description: 位置服务 * @author: xingcheng * @create: 2018-11-17 19:42 **/ @RestController public class LocationController { @GetMapping("/location") public List<Location> getLocations() { List<Location> locations = new ArrayList<>(); for (int i = 0; i < 10; i++) { locations.add(new Location(StringUtils.join("London", i))); } return locations; } }
定义一个温度类和服务,其中温度值介于30到50之间。 在实现中添加500 ms的延迟以模拟耗时操做spring
package cn.chinotan.dto; /** * @program: test * @description: 温度 * @author: xingcheng * @create: 2018-11-17 19:35 **/ public class Temperature { private Double temperature; private String scale; public Double getTemperature() { return temperature; } public void setTemperature(Double temperature) { this.temperature = temperature; } public String getScale() { return scale; } public void setScale(String scale) { this.scale = scale; } }
package cn.chinotan.controller; import cn.chinotan.dto.Temperature; import org.springframework.web.bind.annotation.GetMapping; import org.springframework.web.bind.annotation.PathVariable; import org.springframework.web.bind.annotation.RestController; import java.util.Random; /** * @program: test * @description: 温度服务 * @author: xingcheng * @create: 2018-11-17 19:45 **/ @RestController public class TemperatureController { @GetMapping("/temperature/{city}") public Temperature getAverageTemperature(@PathVariable("city") String city) { Temperature temperature = new Temperature(); temperature.setTemperature((double) (new Random().nextInt(20) + 30)); temperature.setScale("Celsius"); try { Thread.sleep(500); } catch (InterruptedException ignored) { } return temperature; } }
定义一个天气预报类和响应apache
package cn.chinotan.dto; /** * @program: test * @description: 天气预报 * @author: xingcheng * @create: 2018-11-17 19:37 **/ public class Forecast implements Comparable<Forecast> { private Location location; private Temperature temperature; public Forecast(Location location) { this.location = location; } public Forecast setTemperature(final Temperature temperature) { this.temperature = temperature; return this; } public Location getLocation() { return location; } public void setLocation(Location location) { this.location = location; } public Temperature getTemperature() { return temperature; } @Override public int compareTo(Forecast o) { if (o.getTemperature().getTemperature() > temperature.getTemperature()) { return 1; } else { return -1; } } }
package cn.chinotan.dto.response; import cn.chinotan.dto.Forecast; import java.util.ArrayList; import java.util.List; /** * @program: test * @description: 服务响应 * @author: xingcheng * @create: 2018-11-17 19:39 **/ public class ServiceResponse { private long processingTime; private List<Forecast> forecasts = new ArrayList<>(); public void setProcessingTime(long processingTime) { this.processingTime = processingTime; } public ServiceResponse forecasts(List<Forecast> forecasts) { this.forecasts = forecasts; return this; } public long getProcessingTime() { return processingTime; } public List<Forecast> getForecasts() { return forecasts; } public void setForecasts(List<Forecast> forecasts) { this.forecasts = forecasts; } }
package cn.chinotan.controller; import cn.chinotan.dto.Forecast; import cn.chinotan.dto.Location; import cn.chinotan.dto.Temperature; import cn.chinotan.dto.response.ServiceResponse; import org.apache.commons.lang3.StringUtils; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.core.ParameterizedTypeReference; import org.springframework.http.HttpEntity; import org.springframework.http.HttpMethod; import org.springframework.web.bind.annotation.GetMapping; import org.springframework.web.bind.annotation.RestController; import org.springframework.web.client.RestTemplate; import rx.Observable; import java.util.Comparator; import java.util.List; import java.util.stream.Collectors; /** * @program: test * @description: 天气预报 * @author: xingcheng * @create: 2018-11-17 19:47 **/ @RestController public class ForecastController { @Autowired RestTemplate restTemplate; private final static String SERVER_ADDRESS = "http://localhost:9000"; @GetMapping("/v1/forecast") public ServiceResponse getLocationsWithTemperatureV1() { long startTime = System.currentTimeMillis(); ServiceResponse response = new ServiceResponse(); List<Location> locations = restTemplate.exchange(StringUtils.join(SERVER_ADDRESS, "/location"), HttpMethod.GET, HttpEntity.EMPTY, new ParameterizedTypeReference<List<Location>>() { }).getBody(); locations.forEach(location -> { Temperature temperature = restTemplate.exchange(StringUtils.join(SERVER_ADDRESS, "/temperature/", location.getName()), HttpMethod.GET, HttpEntity.EMPTY, Temperature.class).getBody(); response.getForecasts().add(new Forecast(location).setTemperature(temperature)); }); long endTime = System.currentTimeMillis(); response.setProcessingTime(endTime - startTime); return response; } @GetMapping("/v2/forecast") public ServiceResponse getLocationsWithTemperatureV2() { long startTime = System.currentTimeMillis(); ServiceResponse response = new ServiceResponse(); List<Location> locations = restTemplate.exchange(StringUtils.join(SERVER_ADDRESS, "/location"), HttpMethod.GET, HttpEntity.EMPTY, new ParameterizedTypeReference<List<Location>>() { }).getBody(); List<Forecast> collect = locations.parallelStream().map(location -> { Temperature temperature = restTemplate.exchange(StringUtils.join(SERVER_ADDRESS, "/temperature/", location.getName()), HttpMethod.GET, HttpEntity.EMPTY, Temperature.class).getBody(); return new Forecast(location).setTemperature(temperature); }).collect(Collectors.toList()); long sortStart = System.currentTimeMillis(); List<Forecast> collectResponse = collect.parallelStream().sorted().collect(Collectors.toList()); long sortEnd = System.currentTimeMillis(); System.out.println("排序花费时间:" + (sortEnd - sortStart)); response.setForecasts(collectResponse); long endTime = System.currentTimeMillis(); response.setProcessingTime(endTime - startTime); return response; } @GetMapping("/v3/forecast") public ServiceResponse getLocationsWithTemperatureV3() { long startTime = System.currentTimeMillis(); ServiceResponse response = new ServiceResponse(); List<Location> locations = restTemplate.exchange(StringUtils.join(SERVER_ADDRESS, "/location"), HttpMethod.GET, HttpEntity.EMPTY, new ParameterizedTypeReference<List<Location>>() { }).getBody(); locations.parallelStream().forEachOrdered(location -> { Temperature temperature = restTemplate.exchange(StringUtils.join(SERVER_ADDRESS, "/temperature/", location.getName()), HttpMethod.GET, HttpEntity.EMPTY, Temperature.class).getBody(); response.getForecasts().add(new Forecast(location).setTemperature(temperature)); }); long endTime = System.currentTimeMillis(); response.setProcessingTime(endTime - startTime); return response; } }
其中,v1接口是普通的list遍历实现,接口响应:api
能够看到接口响应时间是每次http调用的时间(500毫秒)总和多一些数组
接下来调用v2接口:安全
能够看到时间缩短了5倍服务器
先了解什么是流?数据结构
Stream是java8中新增长的一个特性, 统称为流.
Stream 不是集合元素,它不是数据结构并不保存数据,它是有关算法和计算的,它更像一个高级版本的 Iterator。原始版本的 Iterator,用户只能显式地一个一个遍历元素并对其执行某些操做;高级版本的 Stream,用户只要给出须要对其包含的元素执行什么操做,好比 “过滤掉长度大于 10 的字符串”、“获取每一个字符串的首字母”等,Stream 会隐式地在内部进行遍历,作出相应的数据转换。
Stream 就如同一个迭代器(Iterator),单向,不可往复,数据只能遍历一次,遍历过一次后即用尽了,就比如流水从面前流过,一去不复返。
而和迭代器又不一样的是,Stream 能够并行化操做,迭代器只能命令式地、串行化操做。顾名思义,当使用串行方式去遍历时,每一个 item 读完后再读下一个 item。而使用并行去遍历时,数据会被分红多个段,其中每个都在不一样的线程中处理,而后将结果一块儿输出。Stream 的并行操做依赖于 Java7 中引入的 Fork/Join 框架(JSR166y)来拆分任务和加速处理过程。Java 的并行 API 演变历程基本以下:
.0-1.4 中的 java.lang.Thread
5.0 中的 java.util.concurrent
6.0 中的 Phasers 等
7.0 中的 Fork/Join 框架
8.0 中的 Lambda
parallelStream其实就是一个并行执行的流.它经过默认的ForkJoinPool,可能提升你的多线程任务的速度.
Stream具备平行处理能力,处理的过程会分而治之,也就是将一个大任务切分红多个小任务,这表示每一个任务都是一个操做,所以像如下的程式片断:
List<Integer> numbers = Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8, 9); numbers.parallelStream() .forEach(out::println);
你获得的展现顺序不必定会是一、二、三、四、五、六、七、八、9,而多是任意的顺序,就forEach()这个操做來讲,若是平行处理时,但愿最后顺序是按照原来Stream的数据顺序,那能够调用forEachOrdered()。例如:
List<Integer> numbers = Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8, 9); numbers.parallelStream() .forEachOrdered(out::println);
注意:若是forEachOrdered()中间有其余如filter()的中介操做,会试着平行化处理,而后最终forEachOrdered()会以原数据顺序处理,所以,使用forEachOrdered()这类的有序处理,可能会(或彻底失去)失去平行化的一些优点,实际上中介操做亦有可能如此,例如sorted()方法。
例如:执行v3接口:
能够看到执行时间和普通的流同样。。。
要想深刻的研究parallelStream以前,那么咱们必须先了解ForkJoin框架和ForkJoinPool.由于两种关系甚密,故在此简单介绍一下ForkJoinPool,若有兴趣能够更深刻的去了解下ForkJoin***(固然,若是你想真正的搞透parallelStream,那么你依然须要先搞透ForkJoinPool).*
ForkJoin框架是从jdk7中新特性,它同ThreadPoolExecutor同样,也实现了Executor和ExecutorService接口。它使用了一个无限队列来保存须要执行的任务,而线程的数量则是经过构造函数传入,若是没有向构造函数中传入但愿的线程数量,那么当前计算机可用的CPU数量会被设置为线程数量做为默认值。
ForkJoinPool主要用来使用分治法(Divide-and-Conquer Algorithm)来解决问题。典型的应用好比快速排序算法。这里的要点在于,ForkJoinPool须要使用相对少的线程来处理大量的任务。好比要对1000万个数据进行排序,那么会将这个任务分割成两个500万的排序任务和一个针对这两组500万数据的合并任务。以此类推,对于500万的数据也会作出一样的分割处理,到最后会设置一个阈值来规定当数据规模到多少时,中止这样的分割处理。好比,当元素的数量小于10时,会中止分割,转而使用插入排序对它们进行排序。那么到最后,全部的任务加起来会有大概2000000+个。问题的关键在于,对于一个任务而言,只有当它全部的子任务完成以后,它才可以被执行。
因此当使用ThreadPoolExecutor时,使用分治法会存在问题,由于ThreadPoolExecutor中的线程没法像任务队列中再添加一个任务而且在等待该任务完成以后再继续执行。而使用ForkJoinPool时,就可以让其中的线程建立新的任务,并挂起当前的任务,此时线程就可以从队列中选择子任务执行。
那么使用ThreadPoolExecutor或者ForkJoinPool,会有什么性能的差别呢?
首先,使用ForkJoinPool可以使用数量有限的线程来完成很是多的具备父子关系的任务,好比使用4个线程来完成超过200万个任务。可是,使用ThreadPoolExecutor时,是不可能完成的,由于ThreadPoolExecutor中的Thread没法选择优先执行子任务,须要完成200万个具备父子关系的任务时,也须要200万个线程,显然这是不可行的。
forkjoin最核心的地方就是利用了现代硬件设备多核,在一个操做时候会有空闲的cpu,那么如何利用好这个空闲的cpu就成了提升性能的关键,而这里咱们要提到的工做窃取(work-stealing)算法就是整个forkjion框架的核心理念,工做窃取(work-stealing)算法是指某个线程从其余队列里窃取任务来执行。
那么为何须要使用工做窃取算法呢?
假如咱们须要作一个比较大的任务,咱们能够把这个任务分割为若干互不依赖的子任务,为了减小线程间的竞争,因而把这些子任务分别放到不一样的队列里,并为每一个队列建立一个单独的线程来执行队列里的任务,线程和队列一一对应,好比A线程负责处理A队列里的任务。可是有的线程会先把本身队列里的任务干完,而其余线程对应的队列里还有任务等待处理。干完活的线程与其等着,不如去帮其余线程干活,因而它就去其余线程的队列里窃取一个任务来执行。而在这时它们会访问同一个队列,因此为了减小窃取任务线程和被窃取任务线程之间的竞争,一般会使用双端队列,被窃取任务线程永远从双端队列的头部拿任务执行,而窃取任务的线程永远从双端队列的尾部拿任务执行。
工做窃取算法的优势是充分利用线程进行并行计算,并减小了线程间的竞争,其缺点是在某些状况下仍是存在竞争,好比双端队列里只有一个任务时。而且消耗了更多的系统资源,好比建立多个线程和多个双端队列。
上文中已经提到了在Java 8引入了自动并行化的概念。它可以让一部分Java代码自动地以并行的方式执行,也就是咱们使用了ForkJoinPool的ParallelStream。
Java 8为ForkJoinPool添加了一个通用线程池,这个线程池用来处理那些没有被显式提交到任何线程池的任务。它是ForkJoinPool类型上的一个静态元素,它拥有的默认线程数量等于运行计算机上的处理器数量。当调用Arrays类上添加的新方法时,自动并行化就会发生。好比用来排序一个数组的并行快速排序,用来对一个数组中的元素进行并行遍历。自动并行化也被运用在Java 8新添加的Stream API中。
对于列表中的元素的操做都会以并行的方式执行。forEach方法会为每一个元素的计算操做建立一个任务,该任务会被前文中提到的ForkJoinPool中的通用线程池处理。以上的并行计算逻辑固然也可使用ThreadPoolExecutor完成,可是就代码的可读性和代码量而言,使用ForkJoinPool明显更胜一筹。
对于ForkJoinPool通用线程池的线程数量,一般使用默认值就能够了,即运行时计算机的处理器数量。我这里提供了一个示例的代码让你了解jvm所使用的ForkJoinPool的线程数量, 你能够能够经过设置系统属性:-Djava.util.concurrent.ForkJoinPool.common.parallelism=N (N为线程数量),来调整ForkJoinPool的线程数量,能够尝试调整成不一样的参数来观察每次的输出结果。
可是它会将执行forEach自己的线程也做为线程池中的一个工做线程。所以,即便将ForkJoinPool的通用线程池的线程数量设置为1,实际上也会有2个工做线程。所以在使用forEach的时候,线程数为1的ForkJoinPool通用线程池和线程数为2的ThreadPoolExecutor是等价的。
因此当ForkJoinPool通用线程池实际须要4个工做线程时,能够将它设置成3,那么在运行时可用的工做线程就是4了。
1. 当须要处理递归分治算法时,考虑使用ForkJoinPool。
2. 仔细设置再也不进行任务划分的阈值,这个阈值对性能有影响。
3. Java 8中的一些特性会使用到ForkJoinPool中的通用线程池。在某些场合下,须要调整该线程池的默认的线程数量。
上文中咱们已经看到了ParallelStream他强大无比的特性,但这里咱们就讲告诉你ParallelStreams不是万金油,而是一把双刃剑,若是错误的使用反倒可能伤人伤己.
可能有不少朋友在jdk7用future配合countDownLatch本身实现的这个功能,可是jdk8的朋友基本都会用上面的实现方式,那么自信深究一下究竟本身用future实现的这个功能和利用jdk8的parallelStream来实现这个功能有什么不一样点呢?坑又在哪里呢?
让咱们细思思考一下整个功能到底是如何运转的。首先咱们的集合元素engines 由ParallelStreams并行的去进行map操做(ParallelStreams使用JVM默认的forkJoin框架的线程池由当前线程去执行并行操做).
然而,这里须要注意的一地方是咱们在调用第三方的api请求是一个响应略慢并且会阻塞操做的一个过程。因此在某时刻全部线程都会调用 get() 方法而且在那里等待结果返回.
再回过头仔细思考一下这个功能的实现过程是咱们一开始想要的吗?咱们是在同一时间等待全部的结果,而不是遍历这个列表按顺序等待每一个回答.然而,因为ForkJoinPool workders的存在,这样平行的等待相对于使用主线程的等待会产生的一种反作用.
如今ForkJoin pool (关于forkjion的更多实现你能够去搜索引擎中去看一下他的具体实现方式) 的实现是: 它并不会由于产生了新的workers而抵消掉阻塞的workers。那么在某个时间全部 ForkJoinPool.common() 的线程都会被用光.也就是说,下一次你调用这个查询方法,就可能会在一个时间与其余的parallel stream同时运行,而致使第二个任务的性能大大受损。或者说,例如你在这个功能里是用来快速返回调用的第三方api的,而在其余的功能里是用于一些简单的数据并行计算的,可是假如你先调用了这个功能,同一时间以后调用计算的函数,那么这里forkjionPool的实现会让你计算的函数大打折扣.
不过也不要急着去吐槽ForkJoinPool的实现,在不一样的状况下你能够给它一个ManagedBlocker实例而且确保它知道在一个阻塞调用中应该何时去抵消掉卡住的workers.如今有意思的一点是,在一个parallel stream处理中并不必定是阻塞调用会拖延程序的性能。任何被用于映射在一个集合上的长时间运行的函数都会产生一样的问题.
正如咱们上面那个列子的状况分析得知,lambda的执行并非瞬间完成的,全部使用parallel streams的程序都有可能成为阻塞程序的源头,而且在执行过程当中程序中的其余部分将没法访问这些workers,这意味着任何依赖parallel streams的程序在什么别的东西占用着common ForkJoinPool时将会变得不可预知而且暗藏危机.
此外,parallelStream是并行操做,不是线程安全的,那么是否是在其中的进行的非原子操做都要加锁呢?
答案是:paralleStream的forEach接口确实不能保证同步,同时也提出了解决方案:使用collect和reduce接口,Collections框架提供了同步的包装,使得其中的操做线程安全。
即代码中的:
若是你正在写一个其余地方都是单线程的程序而且准确地知道何时你应该要使用parallel streams,这样的话你可能会以为这个问题有一点肤浅。然而,咱们不少人是在处理web应用、各类不一样的框架以及重量级应用服务。一个服务器是怎样被设计成一个能够支持多种独立应用的主机的?谁知道呢,给你一个能够并行的却不能控制输入的parallel stream.
很抱歉,请原谅我用的标注[怎么正确使用parallelStream],由于目前为止我也没有发现一个好的方式来让我真正的正确使用parallelStream.下面的网上写的两种方式:
一种方式是限制ForkJoinPool提供的并行数。能够经过使用-Djava.util.concurrent.ForkJoinPool.common.parallelism=1 来限制线程池的大小为1。再也不从并行化中获得好处能够杜绝错误的使用它(其实这个方式仍是有点搞笑的,既然这样搞那我还不如不去使用并行流)。
另外一种方式就是,一个被称为工做区的可让ForkJoinPool平行放置的 parallelStream() 实现。不幸的是如今的JDK尚未实现。
Parallel streams 是没法预测的,并且想要正确地使用它有些棘手。几乎任何parallel streams的使用都会影响程序中无关部分的性能,并且是一种没法预测的方式。。可是在调用stream.parallel() 或者parallelStream()时候在个人代码里以前我仍然会从新审视一遍他给个人程序究竟会带来什么问题,他能有多大的提高,是否有使用他的意义.
上面咱们也看到了parallelStream所带来的隐患和好处,那么,在从stream和parallelStream方法中进行选择时,咱们能够考虑如下几个问题:
1. 是否须要并行?
2. 任务之间是不是独立的?是否会引发任何竞态条件?
3. 结果是否取决于任务的调用顺序?
对于问题1,在回答这个问题以前,你须要弄清楚你要解决的问题是什么,数据量有多大,计算的特色是什么?并非全部的问题都适合使用并发程序来求解,好比当数据量不大时,顺序执行每每比并行执行更快。毕竟,准备线程池和其它相关资源也是须要时间的。可是,当任务涉及到I/O操做而且任务之间不互相依赖时,那么并行化就是一个不错的选择。一般而言,将这类程序并行化以后,执行速度会提高好几个等级。
对于问题2,若是任务之间是独立的,而且代码中不涉及到对同一个对象的某个状态或者某个变量的更新操做,那么就代表代码是能够被并行化的。
对于问题3,因为在并行环境中任务的执行顺序是不肯定的,所以对于依赖于顺序的任务而言,并行化也许不能给出正确的结果。
参考文章:https://blog.csdn.net/u011001723/article/details/52794455