多线程编程CompletableFuture与parallelStream

1、简介

日常在页面中咱们会使用异步调用$.ajax()函数,若是是多个的话他会并行执行相互不影响,实际上Completable我理解也是和它相似,是java 8里面新出的异步实现类,CompletableFuture类实现了Future接口,CompletableFuture与Stream的设计都遵循了相似的设计模式:使用Lambda表达式以及流水线的思想,从这个角度能够说CompletableFuture与Future的关系相似于Stream与Collection的关系。java

2、代码

直接上代码,运行以后能够看出CompletableFuture是调用的时候就开始执行,当后续代码调到get的取值方法时,若是内部已经返回结果则直接拿到,若是尚未返回将阻塞线程等待结果,能够设置超时时间避免长时间等待。ajax

如下是模拟并行调用多个方法的场景,好比查询页可能会有多个条件选择,这些条件须要后台数据相互之间有没有联系的场景,就不须要串行执行,异步执行能够节省大量时间编程

import org.joda.time.LocalDateTime;
import org.junit.Test;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;

public class AppTest {

    public void printlnConsole(String msg) {
        String time = new LocalDateTime().toString("yyyy-MM-dd HH:mm:ss");
        System.out.println(String.format("[%s]%s", time, msg));
    }

    /**
     * 多任务单次异步执行
     */
    @Test
    public void testManyFunAsync() {
        long start = System.nanoTime();//程序开始时间
        try {
            int id = 1;//模拟一个参数,如学校Id
            printlnConsole("调用异步任务...");
            //使用异步方式调用方法【调用时就会开始执行方法】
            CompletableFuture futureClassCount = CompletableFuture.supplyAsync(() -> getClassCount(id));
            CompletableFuture futureStudentCount = CompletableFuture.supplyAsync(() -> getStudentCount(id));

            //do something 作了一些其余的事情超过了异步任务执行的时间
            printlnConsole("作一些其余的事情...");
            Thread.sleep(3000);
            printlnConsole("其余事情完成");

            //下面获取异步任务的结果,就会当即拿到返回值
            printlnConsole("获取异步任务结果...");
            Object classCount = futureClassCount.get();
            //Object classCount = futureClassCount.get(2, TimeUnit.SECONDS);//能够设置超时时间,超过这个时间时将再也不等待,返回异常
            Object studentCount = futureStudentCount.get();
            //Object studentCount = futureStudentCount.get(2, TimeUnit.SECONDS);
            printlnConsole("异步任务结果获取完成");

            printlnConsole("ClassCount:" + classCount);
            printlnConsole("StudentCount:" + studentCount);

        } catch (Exception e) {
            e.printStackTrace();
        }
        long end = System.nanoTime();//程序结束时间
        long time = (end - start) / 1000000;//总耗时
        System.out.println("运行时间:" + time);
    }

    public int getClassCount(int id) {
        try {
            Thread.sleep(2000);
            printlnConsole("getClassCount(" + id + ")执行完毕");
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        return 20;
    }

    public int getStudentCount(int id) {
        try {
            Thread.sleep(1000);
            printlnConsole("getStudentCount(" + id + ")执行完毕");
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        return 100;
    }
}

anyOf()为任意一个子任务线程执行完毕后返回
allOf()为等待全部子任务线程所有执行完毕后返回
getNow()表示我须要当即拿到结果,若是当前的线程并未执行完成,则使用我传入的值进行任务调用,参数为没法获取结果时使用我传入的值
get()获取子线程运算的结果,会抛出检查到的异常
join()获取子线程运算的结果,不会抛出异常segmentfault

package com.ysl;

import org.joda.time.LocalDateTime;
import org.junit.Test;

import java.util.Arrays;
import java.util.List;
import java.util.concurrent.*;
import java.util.stream.Collectors;

public class AppTest {

    public void printlnConsole(String msg) {
        String time = new LocalDateTime().toString("yyyy-MM-dd HH:mm:ss");
        System.out.println(String.format("[%s]%s", time, msg));
    }

    /**
     * 并行执行等待所有结果或等待任意结果
     */
    @Test
    public void testAllOfAnyOf() {
        long start = System.nanoTime();
        try {
            printlnConsole("调用异步任务...");
            List<Integer> ids = Arrays.asList(1, 3, 5);//准备的请求参数
            //建立异步方法数组
            CompletableFuture[] futures = ids.stream().map(id -> CompletableFuture.supplyAsync(() -> getClassName(id))).toArray(size -> new CompletableFuture[size]);
            //指定该异步方法数组的子任务线程等待类型
            CompletableFuture.anyOf(futures).join();//anyOf()为任意一个子任务线程执行完毕后返回
            //CompletableFuture.allOf(futures).join();//allOf()为等待全部子任务线程所有执行完毕后返回

            printlnConsole("作一些其余的事情...");
            Thread.sleep(2000);
            printlnConsole("其余事情完成");

            printlnConsole("获取异步任务结果:");
            for (CompletableFuture f : futures) {
                //Object obj = f.getNow(1);//getNow()表示我须要当即拿到结果,若是当前的线程并未执行完成,则使用我传入的值进行任务调用,参数为没法获取结果时使用我传入的值
                Object obj = f.get();//get()获取子线程运算的结果,会抛出检查到的异常
                //Object obj = f.join();//join()获取子线程运算的结果,不会抛出异常
                printlnConsole(String.valueOf(obj));
            }
        } catch (Exception e) {
            e.printStackTrace();
        }

        long end = System.nanoTime();
        long time = (end - start) / 1000000;
        System.out.println("运行时间:" + time);
    }

    public String getClassName(int id) {
        try {
            Thread.sleep(id * 1000);
            printlnConsole("getClassName(" + id + ")执行完毕");
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        return "taiyonghai-" + id;
    }
}

下面是并行流的演示parallelStream也是java 8新特性设计模式

 ids.stream()转化为流.map()映射每一个元素对应的结果.collect(Collectors.toList)把结果概括为List;还有.filter()过滤元素.sorted()对元素进行排序.limit()获取指定数量元素;也能够toArray(size -> new Class[size])转化为数组数组

如下是模拟根据Id查询学生名称的场景,接收到的是一个集合又都是调用同一个方法获取,就可使用并行流同时异步请求等待返回结果安全

import org.joda.time.LocalDateTime;
import org.junit.Test;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;

public class AppTest {

    public void printlnConsole(String msg) {
        String time = new LocalDateTime().toString("yyyy-MM-dd HH:mm:ss");
        System.out.println(String.format("[%s]%s", time, msg));
    }

    /**
     * 单任务屡次并行流执行
     */
    @Test
    public void testParallelStream() {
        long start = System.nanoTime();
        try {
            printlnConsole("调用异步任务...");
            List<Integer> ids = Arrays.asList(1, 2, 3, 4, 5);//准备的请求参数
            //串行执行会等待每个方法执行完毕后在继续执行下一个
            //List<String> names = ids.stream().map(id -> getStudentName(id)).collect(Collectors.toList());
            //并行执行会同时调用多个方法待所有执行完毕后一块儿返回(parallelStream是非线程安全的,配合collect达到线程安全,后续验证一下)
            List<String> names = ids.parallelStream().map(id -> getStudentName(id)).collect(Collectors.toList());
            //不管stream()或者parallelStream()调用时均会阻断线程执行
            printlnConsole("作一些其余的事情...");
            Thread.sleep(3000);
            printlnConsole("其余事情完成");

            printlnConsole("获取异步任务结果:");
            names.forEach(item -> printlnConsole(item));
        } catch (Exception e) {
            e.printStackTrace();
        }
        long end = System.nanoTime();
        long time = (end - start) / 1000000;
        System.out.println("运行时间:" + time);
    }

    public String getStudentName(int id) {
        try {
            Thread.sleep(2000);
            printlnConsole("getStudentName(" + id + ")执行完毕");
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        return "taiyonghai-" + id;
    }
}

 上面能看到并行流虽然是并行执行但等待结果是阻塞线程的,因此能够利用异步CompletableFuture配合串行流来实现网络

如下是采用串行流配合异步实现的并发处理并发

import org.joda.time.LocalDateTime;
import org.junit.Test;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;

public class AppTest {

    public void printlnConsole(String msg) {
        String time = new LocalDateTime().toString("yyyy-MM-dd HH:mm:ss");
        System.out.println(String.format("[%s]%s", time, msg));
    }

    /**
     * 单任务屡次异步执行
     */
    @Test
    public void testOneFunAsync() {
        long start = System.nanoTime();
        try {
            printlnConsole("调用异步任务...");
            List<Integer> ids = Arrays.asList(1, 2, 3, 4, 5);//准备的请求参数
            //ids.stream()转化为流.map()映射每一个元素对应的结果.collect()把结果概括为List;还有.filter()过滤元素.sorted()对元素进行排序.limit()获取指定数量元素;
            List<CompletableFuture<String>> futures = ids.stream().map(id -> CompletableFuture.supplyAsync(() -> getStudentName(id))).collect(Collectors.toList());

            //不用并行流parallelStream()调用时就不会阻断线程执行
            printlnConsole("作一些其余的事情...");
            Thread.sleep(3000);
            printlnConsole("其余事情完成");

            printlnConsole("获取异步任务结果:");
            futures.forEach(f -> {
                try {
                    Object obj = f.get();
                    printlnConsole(String.valueOf(obj));
                } catch (InterruptedException e) {
                    e.printStackTrace();
                } catch (ExecutionException e) {
                    e.printStackTrace();
                }
            });
        }catch (Exception e){
            e.printStackTrace();
        }

        long end = System.nanoTime();
        long time = (end - start) / 1000000;
        System.out.println("运行时间:" + time);
    }

    public String getStudentName(int id) {
        try {
            Thread.sleep(2000);
            printlnConsole("getStudentName(" + id + ")执行完毕");
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        return "taiyonghai-" + id;
    }
}

 当个人并行任务数量超过了我机器的核心数就会产生等待,我电脑是8核使用并行流执行数量就能够开8个子线程,当多余这个数量时剩下的就须要等待前面线程执行完再执行异步

当须要并行执行的任务数量大于核心数的时候,产生的等待是咱们不想看到的,这时CompletableFuture就更加适用,它能够手动这只线程池大小,避免并行任务过多时的等待

咱们将代码作些修正

如下是源码,这样就能够提升对多任务并行处理的支持了

import org.joda.time.LocalDateTime;
import org.junit.Test;

import java.util.Arrays;
import java.util.List;
import java.util.concurrent.*;
import java.util.stream.Collectors;

public class AppTest {

    public void printlnConsole(String msg) {
        String time = new LocalDateTime().toString("yyyy-MM-dd HH:mm:ss");
        System.out.println(String.format("[%s]%s", time, msg));
    }

    /**
     * 手动配置线程执行器的线程池大小
     */
    private final Executor myExecutor = Executors.newFixedThreadPool(20, new ThreadFactory() {
        @Override
        public Thread newThread(Runnable r) {
            Thread t = new Thread(r);
            //使用守护线程保证不会阻止程序的关停
            t.setDaemon(true);
            return t;
        }
    });
    /**
     * 单任务屡次异步执行
     */
    @Test
    public void testOneFunAsync() {
        long start = System.nanoTime();
        try {
            printlnConsole("调用异步任务...");
            List<Integer> ids = Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12);//准备的请求参数
            //ids.stream()转化为流.map()映射每一个元素对应的结果.collect()把结果概括为List;还有.filter()过滤元素.sorted()对元素进行排序.limit()获取指定数量元素;
            List<CompletableFuture<String>> futures = ids.stream().map(id -> CompletableFuture.supplyAsync(() -> getStudentName(id), myExecutor)).collect(Collectors.toList());

            //不用并行流parallelStream()调用时就不会阻断线程执行
            printlnConsole("作一些其余的事情...");
            Thread.sleep(3000);
            printlnConsole("其余事情完成");

            printlnConsole("获取异步任务结果:");
            futures.forEach(f -> {
                try {
                    Object obj = f.get();
                    printlnConsole(String.valueOf(obj));
                } catch (InterruptedException e) {
                    e.printStackTrace();
                } catch (ExecutionException e) {
                    e.printStackTrace();
                }
            });
        } catch (Exception e) {
            e.printStackTrace();
        }

        long end = System.nanoTime();
        long time = (end - start) / 1000000;
        System.out.println("运行时间:" + time);
    }

    public String getStudentName(int id) {
        try {
            Thread.sleep(2000);
            printlnConsole("getStudentName(" + id + ")执行完毕");
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        return "taiyonghai-" + id;
    }
}

  

java 8的新特性也只作到了会用,不少深刻的还不了解,还望指导谢谢,下面备份一下别人的总结我以为挺有用的:

选择正确的线程池大小
《Java并发编程实战》中给出以下公式:

Number = NCpu * Ucpu * ( 1 + W/C)
Number : 线程数量
NCpu : 处理器核数
UCpu : 指望cpu利用率
W/C : 等待时间与计算时间比
咱们这里:99%d的时间是等待商店响应 W/C = 99 ,cpu利用率指望 100% ,NCpu = 9,推断出 number = 800。可是为了不过多的线程搞死计算机,咱们选择商店数与计算值中较小的一个。

并行流与CompletableFuture
目前,咱们对集合进行计算有两种方式:1.并行流 2.CompletableFuture;

一、而CompletableFuture更加的灵活,咱们能够配置线程池的大小确保总体的计算不会由于等待IO而发生阻塞。

书上给出的建议以下:若是是计算密集型的操做而且没有IO推荐stream接口,由于实现简单效率也高,若是全部的线程都是计算密集型的也就没有必要建立比核数更多的线程。

二、反之,若是任务涉及到IO,网络等操做:CompletableFuture灵活性更好,由于大部分线程处于等待状态,须要让他们更加忙碌,而且再逻辑中加入异常处理能够更有效的监控是什么缘由触发了等待。

 参考地址:http://www.javashuo.com/article/p-siujpsba-nc.html

相关文章
相关标签/搜索