对于斐波那契数的计算,咱们都知道最容易理解的就是递归的方法:算法
public long recursiveFibonacci(int n) { if (n < 2) { return 1; } return recursiveFibonacci(n - 1) + recursiveFibonacci(n - 2); }
固然这个递归也能够转化为迭代:并发
public long iterativeFibonacci(int n) { long n1 = 1, n2 = 1; long fi = 2; // n1 + n2 for (int i = 2; i <= n; i++) { fi = n1 + n2; n1 = n2; n2 = fi; } return fi; }
可是,对于以上两种方法,并不能并行化,由于后一项的值依赖于前一项,使得算法流程是串行的。因此引出了能够并行的计算斐波那契数的公式:性能
=> this
f0 和 f1 都是 1 —— 很明显咱们能够对 进行并行计算。spa
首先咱们定义一个 Matrix
类,用来表示一个 2*2 的矩阵:线程
public class Matrix { /** * 左上角的值 */ public final BigInteger a; /** * 右上角的值 */ public final BigInteger b; /** * 左下角的值 */ public final BigInteger c; /** * 右下角的值 */ public final BigInteger d; public Matrix(int a, int b, int c, int d) { this(BigInteger.valueOf(a), BigInteger.valueOf(b), BigInteger.valueOf(c), BigInteger.valueOf(d)); } public Matrix(BigInteger a, BigInteger b, BigInteger c, BigInteger d) { this.a = a; this.b = b; this.c = c; this.d = d; } /** * multiply * * @param m multiplier * @return */ public Matrix mul(Matrix m) { return new Matrix( a.multiply(m.a).add(b.multiply(m.c)), // a*a + b*c a.multiply(m.b).add(b.multiply(m.d)), // a*b + b*d c.multiply(m.a).add(d.multiply(m.c)), // c*a + d*c c.multiply(m.b).add(d.multiply(m.d)));// c*b + d*d } /** * power of exponent * * @param exponent * @return */ public Matrix pow(int exponent) { Matrix matrix = this.copy(); for (int i = 1; i < exponent; i++) { matrix = matrix.mul(this); } return matrix; } public Matrix copy() { return new Matrix(a, b, c, d); } }
而后咱们来比较迭代和并行的效率:code
咱们先设置并行使用的线程数为 1,即单线程。递归
public static void main(String[] args) throws Exception { final int ITEM_NUM = 500000; // 计算斐波那契数列的第 ITEM_NUM 项 System.out.println("开始迭代计算..."); long begin = System.nanoTime(); BigInteger fi1 = iterativeFibonacci(ITEM_NUM); long end = System.nanoTime(); double time = (end - begin) / 1E9; System.out.printf("迭代计算用时: %.3f\n\n", time); /* ------------------------------ */ System.out.println("开始并行计算..."); begin = System.nanoTime(); BigInteger fi2 = parallelFibonacci(ITEM_NUM, 1); end = System.nanoTime(); time = (end - begin) / 1E9; System.out.printf("并行计算用时: %.3f\n\n", time); System.out.println("fi1 == fi2:" + (fi1.equals(fi2))); } static BigInteger iterativeFibonacci(int n) { BigInteger n1 = BigInteger.ONE; BigInteger n2 = BigInteger.ONE; BigInteger fi = BigInteger.valueOf(2); // n1 + n2 for (int i = 2; i <= n; i++) { fi = n1.add(n2); n1 = n2; n2 = fi; } return fi; } static BigInteger parallelFibonacci(int itemNum, int threadNum) throws Exception { final Matrix matrix = new Matrix(1, 1, 1, 0); final Matrix primary = new Matrix(1, 0, 1, 0); // (f0, 0; f1, 0) final int workload = itemNum / threadNum; // 每一个线程要计算的 相乘的项数 // (num / threadNum) 可能存在除不尽的状况,因此最后一个任务计算全部剩下的项数 final int lastWorkload = itemNum - workload * (threadNum - 1); List<Callable<Matrix>> tasks = new ArrayList<>(threadNum); for (int i = 0; i < threadNum; i++) { if (i < threadNum - 1) { // 为了简洁,使用 Lambda 表达式替代要实现 Callable<Matrix> 的匿名内部类 tasks.add(() -> matrix.pow(workload)); } else { tasks.add(() -> matrix.pow(lastWorkload)); } } ExecutorService threadPool = Executors.newFixedThreadPool(threadNum); List<Future<Matrix>> futures = threadPool.invokeAll(tasks); // 执行全部任务,invokeAll 会阻塞直到全部任务执行完毕 Matrix result = primary.copy(); for (Future<Matrix> future : futures) { // (matrix ^ n) * (f0, 0; f1, 0) result = result.mul(future.get()); } threadPool.shutdown(); return result.c; }
能够看到单线程状况下,使用矩阵运算的效率大概只有迭代计算的 1/3 左右 —— 既然如此,那咱们耍流氓的把并行的线程数改成 10 线程吧:ip
BigInteger fi2 = parallelFibonacci(ITEM_NUM, 10); // 10 线程并行计算
能够看到,此时并行计算的用时碾压了迭代计算 —— 迭代计算委屈的哭了,并行计算这流氓耍的至关漂亮。ci
好像有点不对劲,我这篇文章的标题彷佛是 使用并行流 —— 并行流呢?
其实前面都是铺垫 :) 在 parallelFibonacci
方法中,咱们使用了线程池来并行的执行任务,咱们来尝试将 parallelFibonacci
改成流式(即基于 Stream
)风格的代码:
static BigInteger streamFibonacci(int itemNum, int threadNum) { final Matrix matrix = new Matrix(1, 1, 1, 0); final Matrix primary = new Matrix(1, 0, 1, 0); final int workload = itemNum / threadNum; final int lastWorkload = itemNum - workload * (threadNum - 1); // 流式 API return IntStream.range(0, threadNum) // 产生 [0, threadNum) 区间,用于将任务切分 .parallel() // 使流并行化 .map(i -> i < threadNum - 1 ? workload : lastWorkload) .mapToObj(w -> matrix.pow(w)) // map -> mN = matrix ^ workload .reduce((m1, m2) -> m1.mul(m2)) // reduce -> m = m1 * m2 * ... * mN .map(m -> m.mul(primary)) // map -> m = m * primary .get().c; // get -> m.c }
依旧在 10 线程的环境下运行下看看:
public static void main(String[] args) throws Exception { ... /* ------------------------------ */ System.out.println("开始流式并行计算..."); begin = System.nanoTime(); BigInteger fi3 = streamFibonacci(ITEM_NUM, 10); end = System.nanoTime(); time = (end - begin) / 1E9; System.out.printf("流式并行计算用时: %.3f\n\n", time); System.out.println("fi1 == fi2:" + (fi1.equals(fi2))); System.out.println("fi1 == fi3:" + (fi1.equals(fi3))); }
是的,使用并行流就是这么的简单,只要你会使用 Stream API —— 给它加上 .parallel()
—— 它就并行化了。写了这么多年的 Java 代码,从 Java6 到 Java7 再到 Java8,这一刻,我真的感动了(容我擦擦眼泪)。
并且咱们能够看到,在线程数相同的状况下,使用 streamFibonacci
(并行流)时,用时要比parallelFibonacci
方法更短。为了验证,我夸张一点,将线程数提升到 32:
BigInteger fi2 = parallelFibonacci(ITEM_NUM, 32); ... BigInteger fi3 = streamFibonacci(ITEM_NUM, 32);
能够看到,此时 parallelFibonacci
的运行时间反而比 10 线程的时候更长了,而 streamFibonacci
使用的时间却更短了 —— 流式 API 厉害了!
但这是什么缘由呢?这个问题留给有兴趣的读者思考和探究吧。
值得注意的是,并行流的底层实现是基于 ForkJoinPool
的,而且使用的是一个共享的 ForkJoinPool
—— ForkJoinPool.commonPool()
。为了充分利用处理器资源和提高程序性能,咱们应该尽可能使用并行流来执行 CPU 密集的任务,而不是 IO 密集的任务 —— 由于共享池中的线程数量是有限的,若是共享池中某些线程执行 IO 密集的任务,那么这些线程将长时间处于等待 IO 操做完成的状态,一旦共享池中的线程耗尽,那么程序中其余想继续使用并行流的地方就须要等待,直到有空闲的线程可用,这会在很大程度上影响到程序的性能。因此使用并行流以前,咱们要注意到这个细节。