对顺序流调用parallel方法:java
public static long parallelSum(long n) { return Stream.iterate(1L, i -> i + 1) .limit(n) .parallel() .reduce(0L, Long::sum); }
它在内部实际上就是设了一个boolean标志,表示你想让调用parallel以后进行的全部操做都并行执行。相似地,你只须要对并行流调用sequential方法就能够把它变成顺序流。但最后一次parallel或sequential调用会影响整个流水线。算法
iterate很难分割成可以独立执行的小块,由于每次应用这个函数都要依赖前一次应用的结果,整张数字列表在概括过程开始时没有准备好,于是没法有效地把流划分为小块来并行处理。把流标记成并行,你实际上是给顺序处理增长了开销,它还要把每次求和操做分到一个不一样的线程上。segmentfault
错用并行流而产生错误的首要缘由,就是使用的算法改变了某些共享状态。数据结构
public class Accumulator { public long total = 0; public void add(long value) { total += value; } } public static long sideEffectParallelSum(long n) { Accumulator accumulator = new Accumulator(); LongStream.rangeClosed(1, n).parallel().forEach(accumulator::add); return accumulator.total; }
上面的示例在本质上就是顺序的,每次访问total都会出现数据竞争.因为多个线程在同时访问累加器,执行total += value,而这一句虽然看似简单,却不是一个原子操做。所得的结果也是不可控的(错误的)。
详见第六章相关内容
注意:不该该在RecursiveTask内部使用ForkJoinPool的invoke方法。相反,你应该始终直接调用compute或fork方法,只有顺序代码才应该用invoke来启动并行计算。框架
Spliterator是Java 8中加入的另外一个新接口;这个名字表明“可分迭代器”(splitable iterator)。和Iterator同样,Spliterator也用于遍历数据源中的元素,但它是为了并行执行而设计的。
Spliterator接口ide
public interface Spliterator<T> { boolean tryAdvance(Consumer<? super T> action); Spliterator<T> trySplit(); long estimateSize(); int characteristics(); }
与往常同样,T是Spliterator遍历的元素的类型。tryAdvance方法的行为相似于普通的Iterator,由于它会按顺序一个一个使用Spliterator中的元素,而且若是还有其余元素要遍历就返回true。但trySplit是专为Spliterator接口设计的,由于它能够把一些元素划出去分给第二个Spliterator(由该方法返回),让它们两个并行处理。Spliterator还可经过estimateSize方法估计还剩下多少元素要遍历,由于即便不那么确切,能快速算出来是一个值也有助于让拆分均匀一点.函数
将Stream拆分红多个部分的算法是一个递归过程,如图所示。第一步是对第一个Spliterator调用trySplit,生成第二个Spliterator。第二步对这两个Spliterator调用trysplit,这样总共就有了四个Spliterator。这个框架不断对Spliterator调用trySplit直到它返回null,代表它处理的数据结构不能再分割,如第三步所示。最后,这个递归拆分过程到第四步就终止了,这时全部的Spliterator在调用trySplit时都返回了null。性能
文中提到了reduce的三参数重载方法this
<U> U reduce(U identity,BiFunction<U, ? super T, U> accumulator,BinaryOperator<U> combiner)它的三个参数:spa
- identity: 一个初始化的值;这个初始化的值其类型是泛型U,与Reduce方法返回的类型一致;注意此时Stream中元素的类型是T,与U能够不同也能够同样,这样的话操做空间就大了;无论Stream中存储的元素是什么类型,U均可以是任何类型,如U能够是一些基本数据类型的包装类型Integer、Long等;或者是String,又或者是一些集合类型ArrayList等;后面会说到这些用法。
- accumulator: 其类型是BiFunction,输入是U与T两个类型的数据,而返回的是U类型;也就是说返回的类型与输入的第一个参数类型是同样的,而输入的第二个参数类型与Stream中元素类型是同样的。
- combiner: 其类型是BinaryOperator,支持的是对U类型的对象进行操做;
第三个参数combiner主要是使用在并行计算的场景下;若是Stream是非并行时,第三个参数其实是不生效的。
代码实现:
class WordCounter { private final int counter; private final boolean lastSpace; public WordCounter(int counter, boolean lastSpace) { this.counter = counter; this.lastSpace = lastSpace; } public WordCounter accumulate(Character c) { if (Character.isWhitespace(c)) { return lastSpace ? this : new WordCounter(counter, true); } else { return lastSpace ? new WordCounter(counter + 1, false) : this; } } public WordCounter combine(WordCounter wordCounter) { return new WordCounter(counter + wordCounter.counter, wordCounter.lastSpace); } public int getCounter() { return counter; } }
class WordCounterSpliterator implements Spliterator<Character> { private final String string; private int currentChar = 0; public WordCounterSpliterator(String string) { this.string = string; } @Override public boolean tryAdvance(Consumer<?super Character> action) { action.accept(string.charAt(currentChar++)); return currentChar < string.length(); } @Override public Spliterator<Character> trySplit() { int currentSize = string.length() - currentChar; if (currentSize < 10) { return null; } for (int splitPos = (currentSize / 2) + currentChar; splitPos < string.length(); splitPos++) { if (Character.isWhitespace(string.charAt(splitPos))) { Spliterator<Character> spliterator = new WordCounterSpliterator(string.substring( currentChar, splitPos)); currentChar = splitPos; return spliterator; } } return null; } @Override public long estimateSize() { return string.length() - currentChar; } @Override public int characteristics() { return ORDERED + SIZED + SUBSIZED + NONNULL + IMMUTABLE; } }
final String SENTENCE = " Nel mezzo del cammin di nostra vita " + "mi ritrovai in una selva oscura" + " ché la dritta via era smarrita "; private int countWords(Stream<Character> stream) { WordCounter wordCounter = stream.reduce(new WordCounter(0, true), WordCounter::accumulate, WordCounter::combine); return wordCounter.getCounter(); } Spliterator<Character> spliterator = new WordCounterSpliterator(SENTENCE); Stream<Character> stream = StreamSupport.stream(spliterator, true); System.out.println("Found " + countWords(stream) + " words");
最后打印显示
Found 19 words