谈谈Java任务的并行处理

前言

谈到并行,咱们可能最早想到的是线程,多个线程一块儿运行,来提升咱们系统的总体处理速度;为何使用多个线程就能提升处理速度,由于如今计算机广泛都是多核处理器,咱们须要充分利用cpu资源;若是站的更高一点来看,咱们每台机器均可以是一个处理节点,多台机器并行处理;并行的处理方式能够说无处不在,本文主要来谈谈Java在并行处理方面的努力。java

无处不在的并行

Java的垃圾回收器,咱们能够看到每一代版本的更新,伴随着GC更短的延迟,从serial到cms再到如今的G1,一直在摘掉Java慢的帽子;消息队列从早期的ActiveMQ到如今的kafka和RocketMQ,引入的分区的概念,提升了消息的并行性;数据库单表数据到必定量级以后,访问速度会很慢,咱们会对表进行分表处理,引入数据库中间件;Redis你可能以为自己处理是单线程的,可是Redis的集群方案中引入了slot(槽)的概念;更广泛的就是咱们不少的业务系统,一般会部署多台,经过负载均衡器来进行分发;好了还有其余的一些例子,此处不在一一例举。数据库

如何并行

我以为并行的核心在于"拆分",把大任务变成小任务,而后利用多核CPU也好,仍是多节点也好,同时并行的处理,Java历代版本的更新,都在为咱们开发者提供更方便的并行处理,从开始的Thread,到线程池,再到fork/join框架,最后到流处理,下面使用简单的求和例子来看看各类方式是如何并行处理的;bash

单线程处理

首先看一下最简单的单线程处理方式,直接使用主线程进行求和操做;并发

public class SingleThread {

    public static long[] numbers;

    public static void main(String[] args) {
        numbers = LongStream.rangeClosed(1, 10_000_000).toArray();
        long sum = 0;
        for (int i = 0; i < numbers.length; i++) {
            sum += numbers[i];
        }
        System.out.println("sum = " + sum);
    }

}
复制代码

求和自己是一个计算密集型任务,可是如今已是多核时代,只用单线程,至关于只使用了其中一个cpu,其余cpu被闲置,资源的浪费;负载均衡

Thread方式

咱们把任务拆分红多个小任务,而后每一个小任务分别启动一个线程,以下所示:框架

public class ThreadTest {

    public static final int THRESHOLD = 10_000;
    public static long[] numbers;
    private static long allSum;

    public static void main(String[] args) throws Exception {
        numbers = LongStream.rangeClosed(1, 10_000_000).toArray();
        int taskSize = (int) (numbers.length / THRESHOLD);
        for (int i = 1; i <= taskSize; i++) {
            final int key = i;
            new Thread(new Runnable() {
                public void run() {
                    sumAll(sum((key - 1) * THRESHOLD, key * THRESHOLD));
                }
            }).start();
        }
        Thread.sleep(100);
        System.out.println("allSum = " + getAllSum());
    }

    private static synchronized long sumAll(long threadSum) {
        return allSum += threadSum;
    }

    public static synchronized long getAllSum() {
        return allSum;
    }

    private static long sum(int start, int end) {
        long sum = 0;
        for (int i = start; i < end; i++) {
            sum += numbers[i];
        }
        return sum;
    }
}
复制代码

以上指定了一个拆分阀值,计算拆分多少个认为,同时启动多少线程;这种处理就是启动的线程数过多,而CPU数有限,更重要的是求和是一个计算密集型任务,启动过多的线程只会带来更多的线程上下文切换;同时线程处理完一个任务就终止了,也是对资源的浪费;另外能够看到主线程不知道什么时候子任务已经处理完了,须要作额外的处理;全部Java后续引入了线程池。ide

线程池方式

jdk1.5引入了并发包,其中包括了ThreadPoolExecutor,相关代码以下:测试

public class ExecutorServiceTest {

    public static final int THRESHOLD = 10_000;
    public static long[] numbers;

    public static void main(String[] args) throws Exception {
        numbers = LongStream.rangeClosed(1, 10_000_000).toArray();
        ExecutorService executor = Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors() + 1);
        CompletionService<Long> completionService = new ExecutorCompletionService<Long>(executor);
        int taskSize = (int) (numbers.length / THRESHOLD);
        for (int i = 1; i <= taskSize; i++) {
            final int key = i;
            completionService.submit(new Callable<Long>() {

                @Override
                public Long call() throws Exception {
                    return sum((key - 1) * THRESHOLD, key * THRESHOLD);
                }
            });
        }
        long sumValue = 0;
        for (int i = 0; i < taskSize; i++) {
            sumValue += completionService.take().get();
        }
        // 全部任务已经完成,关闭线程池
        System.out.println("sumValue = " + sumValue);
        executor.shutdown();
    }

    private static long sum(int start, int end) {
        long sum = 0;
        for (int i = start; i < end; i++) {
            sum += numbers[i];
        }
        return sum;
    }
}
复制代码

上面已经分析了计算密集型并非线程越多越好,这里建立了JDK默认的线程数:CPU数+1,这是一个通过大量测试之后给出的一个结果;线程池顾名思义,能够重复利用现有的线程;同时利用CompletionService来对子任务进行汇总;合理的使用线程池已经能够充分的并行处理任务,只是在写法上有点繁琐,此时JDK1.7中引入了fork/join框架;ui

fork/join框架

分支/合并框架的目的是以递归的方式将能够并行的认为拆分红更小的任务,而后将每一个子任务的结果合并起来生成总体结果;相关代码以下:this

public class ForkJoinTest extends java.util.concurrent.RecursiveTask<Long> {
    
    private static final long serialVersionUID = 1L;
    private final long[] numbers;
    private final int start;
    private final int end;
    public static final long THRESHOLD = 10_000;

    public ForkJoinTest(long[] numbers) {
        this(numbers, 0, numbers.length);
    }

    private ForkJoinTest(long[] numbers, int start, int end) {
        this.numbers = numbers;
        this.start = start;
        this.end = end;
    }

    @Override
    protected Long compute() {
        int length = end - start;
        if (length <= THRESHOLD) {
            return computeSequentially();
        }
        ForkJoinTest leftTask = new ForkJoinTest(numbers, start, start + length / 2);
        leftTask.fork();
        ForkJoinTest rightTask = new ForkJoinTest(numbers, start + length / 2, end);
        Long rightResult = rightTask.compute();
        // 注:join方法会阻塞,所以有必要在两个子任务的计算都开始以后才执行join方法
        Long leftResult = leftTask.join();
        return leftResult + rightResult;
    }

    private long computeSequentially() {
        long sum = 0;
        for (int i = start; i < end; i++) {
            sum += numbers[i];
        }
        return sum;
    }

    public static void main(String[] args) {
        System.out.println(forkJoinSum(10_000_000));
    }

    public static long forkJoinSum(long n) {
        long[] numbers = LongStream.rangeClosed(1, n).toArray();
        ForkJoinTask<Long> task = new ForkJoinTest(numbers);
        return new ForkJoinPool().invoke(task);
    }
}
复制代码

ForkJoinPool是ExecutorService接口的一个实现,子认为分配给线程池中的工做线程;同时须要把任务提交到此线程池中,须要建立RecursiveTask的一个子类;大致逻辑就是经过fork进行拆分,而后经过join进行结果的合并,JDK为咱们提供了一个框架,咱们只须要在里面填充便可,更加方便;有没有更简单的方式,连拆分都省了,自动拆分合并,jdk在1.8中引入了流的概念;

流方式

Java8引入了stream的概念,可让咱们更好的利用并行,使用流代码以下:

public class StreamTest {

    public static void main(String[] args) {
        System.out.println("sum = " + parallelRangedSum(10_000_000));
    }

    public static long parallelRangedSum(long n) {
        return LongStream.rangeClosed(1, n).parallel().reduce(0L, Long::sum);
    }
}
复制代码

以上代码是否是很是简单,对于开发者来讲彻底不须要手动拆分,使用同步机制等方式,就可让任务并行处理,只须要对流使用parallel()方法,系统自动会对任务进行拆分,固然前提是没有共享可变状态;其实并行流内部使用的也是fork/join框架;

总结

本文使用一个求和的实例,来介绍了jdk为开发者提供并行处理的各类方式,能够看到Java一直在为提供更方便的并行处理而努力。

参考

<<java8实战>>

相关文章
相关标签/搜索