能够经过parallel或者parallelStream方法实现数据并行化操做java
经过手动使用线程模拟掷骰子事件数组
import java.util.ArrayList; import java.util.List; import java.util.Map; import java.util.concurrent.*; /** * 计算2次筛子投掷的和的几率 */ public class ManualDiceRools { private static final int N = 100000000; private final double fraction; private final Map<Integer, Double> results; private final int numberOfThreads; private final ExecutorService executor; private final int workPerThread; public static void main(String[] args) { ManualDiceRools rools = new ManualDiceRools(); rools.simulateDiceRoles(); } public ManualDiceRools() { // 出现一次的几率 this.fraction = 1.0 / N; this.results = new ConcurrentHashMap<>(); // 获取可用处理器的Java虚拟机的数量 this.numberOfThreads = Runtime.getRuntime().availableProcessors(); this.executor = Executors.newFixedThreadPool(numberOfThreads); // 每一个线程处理的数据量 this.workPerThread = N / numberOfThreads; } public void simulateDiceRoles() { List<Future<?>> futures = submitJobs(); awaitCompletion(futures); printResults(); } /** * 打印结果 */ private void printResults() { results.entrySet().forEach(System.out::println); } /** * 执行线程 * * @return */ private List<Future<?>> submitJobs() { List<Future<?>> futures = new ArrayList<>(); for (int i=0;i<numberOfThreads;i++) { futures.add(executor.submit(makeJob())); } return futures; } private Runnable makeJob() { return () -> { // 使用安全随机生成器 ThreadLocalRandom random = ThreadLocalRandom.current(); for (int i=0;i<workPerThread;i++) { int entry = twoDiceThrows(random); accumulateResult(entry); } }; } /** * 计算生成的几率,即fraction + 以前的几率值 * * @param entry */ private void accumulateResult(int entry) { results.compute(entry, (key, previous) -> previous == null ? fraction : previous + fraction); } /** * 生成2次筛子随机的和 * * @param random * @return */ private int twoDiceThrows(ThreadLocalRandom random) { int firstThrow = random.nextInt(1, 7); int secondThrow = random.nextInt(1, 7); return firstThrow + secondThrow; } /** * get每一个Future的值,获取每一个线程计算的结果 * * @param futures */ private void awaitCompletion(List<Future<?>> futures) { try { futures.forEach(future -> { try { future.get(); } catch (InterruptedException e) { e.printStackTrace(); } catch (ExecutionException e) { e.printStackTrace(); } }); } finally { executor.shutdown(); } } }
使用并行化安全
ThreadLocalRandom random = ThreadLocalRandom.current(); double fraction = 1.0 / N; Map<Integer, Double> collect = IntStream.range(0, N) .parallel() .mapToObj(n -> random.nextInt(1, 7) + random.nextInt(1, 7))// 计算随机产生的和 .collect(Collectors.groupingBy(side -> side, Collectors.summingDouble(n -> fraction)));// 相同的值进行分组,并计算几率值,几率步长为fraction collect.entrySet().forEach(System.out::println);
以前调用reduce方法,初始值能够为任意值,为了让其在并行化时能正常工做,初始值必须为组合函数的恒等值(好比组合函数(acc, element) -> acc + element时,初始值必须为0,由于任何数字加0值不变)数据结构
reduce操做的另外一个限制是组合操做必须符合结合律。意味着只要序列的值不变,组合操做的顺序不重要,好比:(4 + 2) +1 = 4 + (2 + 1) =7框架
要避免的是持有锁,流框架会在须要时,本身处理同步操做dom
还有个叫sequential的方法,串行的意思,在对流求值时,不能同时处于两种模式,要么是并行要么是串行,若是同时用了这个2个方法,最后调用的那个方法起效ide
影响并行流性能的主要因素有5个:函数
在底层,并行流仍是沿用了fork/join框架,fork递归式的分解问题,而后每段并行执行,最终由join合并返回结果性能
parallelSetAll:使用Lambda表达式更新数据元素this
// 使用for循环初始化数组 public static double[] imperativeInitilize(int size) { double[] values = new double[size]; for (int i=0;i<values.length;i++) { values[i] = i; } return values; } // 使用并行化数组操做初始化数组,它改变了传入的数组,但没有建立一个新的数组 public static double[] parallelInitialize(int size) { double[] values = new double[size]; Arrays.parallelSetAll(values, i -> i); return values; }
parallelPrefix:会更新一个数组,将每个元素替换为当前元素和前驱元素的和,这里的“和”不必定是加法,能够说任意一个BinaryOperator
public static double[] simpleMovingAverage(double[] values, int n) { double[] sums = Arrays.copyOf(values, values.length); // 和前一个值进行相加 Arrays.parallelPrefix(sums, Double::sum); return sums; }
parallelSort:并行化对数组元素排序