想必热爱游戏的同窗小时候,都幻想过要是本身要是能像鸣人那样会多重影分身之术,就能一边打游戏一边上课了,惋惜漫画就是漫画,现实中并无这个技术,你要么只有老老实实的上课,要么就只有逃课去打游戏了。虽然在现实中咱们没法实现多重影分身这样的技术,可是咱们能够在计算机世界中实现咱们这样的愿望。java
计算机中的分身术不是天生就有了。在1971年,1971年,英特尔推出的全球第一颗通用型微处理器4004,由2300个晶体管构成。当时,公司的联合创始人之一戈登摩尔就提出大名鼎鼎的“摩尔定律”——每过18个月,芯片上能够集成的晶体管数目将增长一倍。最初的主频740kHz(每秒运行74万次),如今过了快50年了,你们去买电脑的时候会发现如今的主频都能达到4.0GHZ了(每秒40亿次)。可是主频越高带来的收益倒是愈来愈小:git
在单核主频遇到瓶颈的状况下,多核CPU应运而生,不只提高了性能,而且下降了功耗。因此多核CPU逐渐成为如今市场的主流,这样让咱们的多线程编程也更加的容易。github
说到了多核CPU就必定要说GPU,你们可能对这个比较陌生,可是一说到显卡就确定不陌生,笔者搞过一段时间的CUDA编程,我才意识到这个才是真正的并行计算,你们都知道图片像素点吧,好比19201080的图片有210万个像素点,若是想要把一张图片的每一个像素点都进行转换一下,那在咱们java里面可能就要循环遍历210万次。 就算咱们用多线程8核CPU,那也得循环几十万次。可是若是使用Cuda,最多能够365535*512=100661760(一亿)个线程并行执行,就这种级别的图片那也是立刻处理完成。可是Cuda通常适合于图片这种,有大量的像素点须要同时处理,可是其支持指令很少因此逻辑不能太复杂。GPU只是用来扩展介绍,感兴趣能够和笔者交流。算法
一提及让你的服务高性能的手段,那么异步化,并行化这些确定会第一时间在你脑海中显现出来,在以前的文章:《异步化,你的高并发大杀器》中已经介绍过了异步化的优化手段,有兴趣的朋友能够看看。并行化能够用来配合异步化,也能够用来单独作优化。编程
咱们能够想一想有这么一个需求,在你下外卖订单的时候,这笔订单可能还须要查,用户信息,折扣信息,商家信息,菜品信息等,用同步的方式调用,以下图所示:安全
设想一下这5个查询服务,平均每次消耗50ms,那么本次调用至少是250ms,咱们细想一下,在这个这五个服务其实并无任何的依赖,谁先获取谁后获取均可以,那么咱们能够想一想,是否能够用多重影分身之术,同时获取这五个服务的信息呢?优化以下:多线程
将这五个查询服务并行查询,在理想状况下能够优化至50ms。固然提及来简单,咱们真正如何落地呢?并发
CountDownLatch和Phaser是JDK提供的同步工具类Phaser是1.7版本以后提供的工具类而CountDownLatch是1.5版本以后提供的工具类。这里简单介绍一下CountDownLatch,能够将其当作是一个计数器,await()方法能够阻塞至超时或者计数器减至0,其余线程当完成本身目标的时候能够减小1,利用这个机制咱们能够将其用来作并发。 能够用以下的代码实现咱们上面的下订单的需求:框架
public class CountDownTask { private static final int CORE_POOL_SIZE = 4; private static final int MAX_POOL_SIZE = 12; private static final long KEEP_ALIVE_TIME = 5L; private final static int QUEUE_SIZE = 1600; protected final static ExecutorService THREAD_POOL = new ThreadPoolExecutor(CORE_POOL_SIZE, MAX_POOL_SIZE, KEEP_ALIVE_TIME, TimeUnit.SECONDS, new LinkedBlockingQueue<>(QUEUE_SIZE)); public static void main(String[] args) throws InterruptedException { // 新建一个为5的计数器 CountDownLatch countDownLatch = new CountDownLatch(5); OrderInfo orderInfo = new OrderInfo(); THREAD_POOL.execute(() -> { System.out.println("当前任务Customer,线程名字为:" + Thread.currentThread().getName()); orderInfo.setCustomerInfo(new CustomerInfo()); countDownLatch.countDown(); }); THREAD_POOL.execute(() -> { System.out.println("当前任务Discount,线程名字为:" + Thread.currentThread().getName()); orderInfo.setDiscountInfo(new DiscountInfo()); countDownLatch.countDown(); }); THREAD_POOL.execute(() -> { System.out.println("当前任务Food,线程名字为:" + Thread.currentThread().getName()); orderInfo.setFoodListInfo(new FoodListInfo()); countDownLatch.countDown(); }); THREAD_POOL.execute(() -> { System.out.println("当前任务Tenant,线程名字为:" + Thread.currentThread().getName()); orderInfo.setTenantInfo(new TenantInfo()); countDownLatch.countDown(); }); THREAD_POOL.execute(() -> { System.out.println("当前任务OtherInfo,线程名字为:" + Thread.currentThread().getName()); orderInfo.setOtherInfo(new OtherInfo()); countDownLatch.countDown(); }); countDownLatch.await(1, TimeUnit.SECONDS); System.out.println("主线程:"+ Thread.currentThread().getName()); } }
创建一个线程池(具体配置根据具体业务,具体机器配置),进行并发的执行咱们的任务(生成用户信息,菜品信息等),最后利用await方法阻塞等待结果成功返回。异步
相信各位同窗已经发现,CountDownLatch虽然能实现咱们须要知足的功能可是其任然有个问题是,在咱们的业务代码须要耦合CountDownLatch的代码,好比在咱们获取用户信息以后咱们会执行countDownLatch.countDown(),很明显咱们的业务代码显然不该该关心这一部分逻辑,而且在开发的过程当中万一写漏了,那咱们的await方法将只会被各类异常唤醒。
因此在JDK1.8中提供了一个类CompletableFuture,它是一个多功能的非阻塞的Future。(什么是Future:用来表明异步结果,而且提供了检查计算完成,等待完成,检索结果完成等方法。)在我以前的这篇文章中详细介绍了《异步技巧之CompletableFuture》,有兴趣的能够看这篇文章。咱们将每一个任务的计算完成的结果都用CompletableFuture来表示,利用CompletableFuture.allOf汇聚成一个大的CompletableFuture,那么利用get()方法就能够阻塞。
public class CompletableFutureParallel { private static final int CORE_POOL_SIZE = 4; private static final int MAX_POOL_SIZE = 12; private static final long KEEP_ALIVE_TIME = 5L; private final static int QUEUE_SIZE = 1600; protected final static ExecutorService THREAD_POOL = new ThreadPoolExecutor(CORE_POOL_SIZE, MAX_POOL_SIZE, KEEP_ALIVE_TIME, TimeUnit.SECONDS, new LinkedBlockingQueue<>(QUEUE_SIZE)); public static void main(String[] args) throws InterruptedException, ExecutionException, TimeoutException { OrderInfo orderInfo = new OrderInfo(); //CompletableFuture 的List List<CompletableFuture> futures = new ArrayList<>(); futures.add(CompletableFuture.runAsync(() -> { System.out.println("当前任务Customer,线程名字为:" + Thread.currentThread().getName()); orderInfo.setCustomerInfo(new CustomerInfo()); }, THREAD_POOL)); futures.add(CompletableFuture.runAsync(() -> { System.out.println("当前任务Discount,线程名字为:" + Thread.currentThread().getName()); orderInfo.setDiscountInfo(new DiscountInfo()); }, THREAD_POOL)); futures.add( CompletableFuture.runAsync(() -> { System.out.println("当前任务Food,线程名字为:" + Thread.currentThread().getName()); orderInfo.setFoodListInfo(new FoodListInfo()); }, THREAD_POOL)); futures.add(CompletableFuture.runAsync(() -> { System.out.println("当前任务Other,线程名字为:" + Thread.currentThread().getName()); orderInfo.setOtherInfo(new OtherInfo()); }, THREAD_POOL)); CompletableFuture allDoneFuture = CompletableFuture.allOf(futures.toArray(new CompletableFuture[futures.size()])); allDoneFuture.get(10, TimeUnit.SECONDS); System.out.println(orderInfo); } }
能够看见咱们使用CompletableFuture能很快的完成的需求,固然这还不够。
咱们上面用CompletableFuture完成了咱们对多组任务并行执行,可是其依然是依赖咱们的线程池,在咱们的线程池中使用的是阻塞队列,也就是当咱们某个线程执行完任务的时候须要经过这个阻塞队列进行,那么确定会发生竞争,因此在JDK1.7中提供了ForkJoinTask和ForkJoinPool。
ForkJoinPool中每一个线程都有本身的工做队列,而且采用Work-Steal算法防止线程饥饿。 Worker线程用LIFO的方法取出任务,可是会用FIFO的方法去偷取别人队列的任务,这样就减小了锁的冲突。
网上这个框架的例子不少,咱们看看如何使用代码其完成咱们上面的下订单需求:
public class OrderTask extends RecursiveTask<OrderInfo> { @Override protected OrderInfo compute() { System.out.println("执行"+ this.getClass().getSimpleName() + "线程名字为:" + Thread.currentThread().getName()); // 定义其余五种并行TasK CustomerTask customerTask = new CustomerTask(); TenantTask tenantTask = new TenantTask(); DiscountTask discountTask = new DiscountTask(); FoodTask foodTask = new FoodTask(); OtherTask otherTask = new OtherTask(); invokeAll(customerTask, tenantTask, discountTask, foodTask, otherTask); OrderInfo orderInfo = new OrderInfo(customerTask.join(), tenantTask.join(), discountTask.join(), foodTask.join(), otherTask.join()); return orderInfo; } public static void main(String[] args) { ForkJoinPool forkJoinPool = new ForkJoinPool(Runtime.getRuntime().availableProcessors() -1 ); System.out.println(forkJoinPool.invoke(new OrderTask())); } } class CustomerTask extends RecursiveTask<CustomerInfo>{ @Override protected CustomerInfo compute() { System.out.println("执行"+ this.getClass().getSimpleName() + "线程名字为:" + Thread.currentThread().getName()); return new CustomerInfo(); } } class TenantTask extends RecursiveTask<TenantInfo>{ @Override protected TenantInfo compute() { System.out.println("执行"+ this.getClass().getSimpleName() + "线程名字为:" + Thread.currentThread().getName()); return new TenantInfo(); } } class DiscountTask extends RecursiveTask<DiscountInfo>{ @Override protected DiscountInfo compute() { System.out.println("执行"+ this.getClass().getSimpleName() + "线程名字为:" + Thread.currentThread().getName()); return new DiscountInfo(); } } class FoodTask extends RecursiveTask<FoodListInfo>{ @Override protected FoodListInfo compute() { System.out.println("执行"+ this.getClass().getSimpleName() + "线程名字为:" + Thread.currentThread().getName()); return new FoodListInfo(); } } class OtherTask extends RecursiveTask<OtherInfo>{ @Override protected OtherInfo compute() { System.out.println("执行"+ this.getClass().getSimpleName() + "线程名字为:" + Thread.currentThread().getName()); return new OtherInfo(); } }
咱们定义一个OrderTask而且定义五个获取信息的任务,在compute中分别fork执行这五个任务,最后在将这五个任务的结果经过Join得到,最后完成咱们的并行化的需求。
在jdk1.8中提供了并行流的API,当咱们使用集合的时候能很好的进行并行处理,下面举了一个简单的例子从1加到100:
public class ParallelStream { public static void main(String[] args) { ArrayList<Integer> list = new ArrayList<Integer>(); for (int i = 1; i <= 100; i++) { list.add(i); } LongAdder sum = new LongAdder(); list.parallelStream().forEach(integer -> { // System.out.println("当前线程" + Thread.currentThread().getName()); sum.add(integer); }); System.out.println(sum); } }
parallelStream中底层使用的那一套也是Fork/Join的那一套,默认的并发程度是可用CPU数-1。
能够想象有这么一个需求,天天定时对id在某个范围之间的用户发券,好比这个范围之间的用户有几百万,若是给一台机器发的话,可能所有发完须要好久的时间,因此分布式调度框架好比:elastic-job。都提供了分片的功能,好比你用50台机器,那么id%50=0的在第0台机器上,=1的在第1台机器上发券,那么咱们的执行时间其实就分摊到了不一样的机器上了。
本文介绍了什么是并行化,并行化的各类历史,在Java中如何实现并行化,以及并行化的注意事项。但愿你们对并行化有个比较全面的认识。最后给你们提个两个小问题:
最后这篇文章被我收录于JGrowing,一个全面,优秀,由社区一块儿共建的Java学习路线,若是您想参与开源项目的维护,能够一块儿共建,github地址为:https://github.com/javagrowing/JGrowing 麻烦给个小星星哟。
若是你以为这篇文章对你有文章,能够关注个人技术公众号,你的关注和转发是对我最大的支持,O(∩_∩)O