同步和异步,阻塞和非阻塞,这几个词已是老生常谈,可是仍是有不少同窗分不清楚,觉得同步确定就是阻塞,异步确定就是非阻塞,其实他们并非一回事。java
同步和异步关注的是结果消息的通讯机制:程序员
阻塞和非阻塞主要关注的是等待结果返回调用方的状态:web
能够看见同步和异步,阻塞和非阻塞主要关注的点不一样,有人会问同步还能非阻塞,异步还能阻塞?算法
固然是能够的,下面为了更好的说明它们的组合之间的意思,用几个简单的例子说明:spring
同步阻塞:同步阻塞基本也是编程中最多见的模型,打个比方你去商店买衣服,你去了以后发现衣服卖完了,那你就在店里面一直等,期间不作任何事(包括看手机),等着商家进货,直到有货为止,这个效率很低。数据库
这个时候不须要傻傻的等着,你能够去其余地方好比奶茶店,买杯水,可是你仍是须要时不时的去商店问老板新衣服到了吗。apache
有点像你去商店买衣服,这个时候发现衣服没有了,这个时候你就给老板留个电话,说衣服到了就给我打电话,而后你就守着这个电话,一直等着它响什么事也不作。这样感受的确有点傻,因此这个模式用得比较少。编程
比如你去商店买衣服,衣服没了,你只须要给老板说这是个人电话,衣服到了就打。而后你就为所欲为的去玩,也不用操心衣服何时到,衣服一到,电话一响就能够去买衣服了。api
上面已经看到了同步阻塞的效率是多么的低,若是使用同步阻塞的方式去买衣服,你有可能一天只能买一件衣服,其余什么事都不能干;若是用异步非阻塞的方式去买,买衣服只是你一天中进行的一个小事。缓存
咱们把这个映射到咱们代码中,当咱们的线程发生一次 RPC 调用或者 HTTP 调用,又或者其余的一些耗时的 IO 调用。
发起以后,若是是同步阻塞,咱们的这个线程就会被阻塞挂起,直到结果返回,试想一下,若是 IO 调用很频繁那咱们的 CPU 使用率会很低很低。
正所谓是物尽其用,既然 CPU 的使用率被 IO 调用搞得很低,那咱们就可使用异步非阻塞。
当发生 IO 调用时我并不立刻关心结果,我只须要把回调函数写入此次 IO 调用,这个时候线程能够继续处理新的请求,当 IO 调用结束时,会调用回调函数。
而咱们的线程始终处于忙碌之中,这样就能作更多的有意义的事了。这里首先要说明的是,异步化不是万能,异步化并不能缩短你整个链路调用时间长的问题,可是它能极大的提高你的最大 QPS。
通常咱们的业务中有两处比较耗时:
上面说了异步化是用于解决 IO 阻塞的问题,而咱们通常项目中可使用异步化的状况以下:
下面我会从上面几个方面进行异步化的介绍。
对于 Java 开发程序员来讲 Servlet 并不陌生,在项目中不论你使用 Struts2,仍是使用的 Spring MVC,本质上都是封装的 Servlet。
可是咱们通常的开发都是使用的同步阻塞,模式以下:
上面的模式优势在于编码简单,适合在项目启动初期,访问量较少,或者是 CPU 运算较多的项目。
缺点在于,业务逻辑线程和 Servlet 容器线程是同一个,通常的业务逻辑总得发生点 IO,好比查询数据库,好比产生 RPC 调用,这个时候就会发生阻塞。
而咱们的 Servlet 容器线程确定是有限的,当 Servlet 容器线程都被阻塞的时候咱们的服务这个时候就会发生拒绝访问,线程不够我固然能够经过增长机器的一系列手段来解决这个问题。
可是俗话说得好靠人不如靠本身,靠别人替我分担请求,还不如我本身搞定。
因此在 Servlet 3.0 以后支持了异步化,咱们采用异步化以后,模式变成以下:
在这里咱们采用新的线程处理业务逻辑,IO 调用的阻塞就不会影响咱们的 Serlvet 了,实现异步 Serlvet 的代码也比较简单,以下:
@WebServlet(name = "WorkServlet",urlPatterns = "/work",asyncSupported =true) public class WorkServlet extends HttpServlet{ private static final long serialVersionUID = 1L; @Override protected void doGet(HttpServletRequest req, HttpServletResponse resp) throws ServletException, IOException { this.doPost(req, resp); } @Override protected void doPost(HttpServletRequest req, HttpServletResponse resp) throws ServletException, IOException { //设置ContentType,关闭缓存 resp.setContentType("text/plain;charset=UTF-8"); resp.setHeader("Cache-Control","private"); resp.setHeader("Pragma","no-cache"); final PrintWriter writer= resp.getWriter(); writer.println("老师检查做业了"); writer.flush(); List<String> zuoyes=new ArrayList<String>(); for (int i = 0; i < 10; i++) { zuoyes.add("zuoye"+i);; } //开启异步请求 final AsyncContext ac=req.startAsync(); doZuoye(ac, zuoyes); writer.println("老师布置做业"); writer.flush(); } private void doZuoye(final AsyncContext ac, final List<String> zuoyes) { ac.setTimeout(1*60*60*1000L); ac.start(new Runnable() { @Override public void run() { //经过response得到字符输出流 try { PrintWriter writer=ac.getResponse().getWriter(); for (String zuoye:zuoyes) { writer.println("\""+zuoye+"\"请求处理中"); Thread.sleep(1*1000L); writer.flush(); } ac.complete(); } catch (Exception e) { e.printStackTrace(); } } }); } }
实现 Serlvet 的关键在于 HTTP 采起了长链接,也就是当请求打过来的时候就算有返回也不会关闭,由于可能还会有数据,直到返回关闭指令。
AsyncContext ac=req.startAsync();用于获取异步上下文,后续咱们经过这个异步上下文进行回调返回数据,有点像咱们买衣服的时候,留给老板一个电话。
而这个上下文也是一个电话,当有衣服到的时候,也就是当有数据准备好的时候就能够打电话发送数据了。ac.complete();用来进行长连接的关闭。
如今其实不多人来进行 Serlvet 编程,都是直接采用现成的一些框架,好比 Struts2,Spring MVC。下面介绍下使用 Spring MVC 如何进行异步化:
首先确认你的项目中的 Servlet 是 3.0 以上,其次 Spring MVC 4.0+:
<dependency> <groupId>javax.servlet</groupId> <artifactId>javax.servlet-api</artifactId> <version>3.1.0</version> <scope>provided</scope> </dependency> <dependency> <groupId>org.springframework</groupId> <artifactId>spring-webmvc</artifactId> <version>4.2.3.RELEASE</version> </dependency>
web.xml 头部声明,必需要 3.0,Filter 和 Serverlet 设置为异步:
<?xml version="1.0" encoding="UTF-8"?> <web-app version="3.0" xmlns="http://java.sun.com/xml/ns/javaee" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://java.sun.com/xml/ns/javaee http://java.sun.com/xml/ns/javaee/web-app_3_0.xsd"> <filter> <filter-name>testFilter</filter-name> <filter-class>com.TestFilter</filter-class> <async-supported>true</async-supported> </filter> <servlet> <servlet-name>mvc-dispatcher</servlet-name> <servlet-class>org.springframework.web.servlet.DispatcherServlet</servlet-class> ......... <async-supported>true</async-supported> </servlet>
使用 Spring MVC 封装了 Servlet 的 AsyncContext,使用起来比较简单。之前咱们同步的模式的 Controller 是返回 ModelAndView。
而异步模式直接生成一个 DeferredResult(支持咱们超时扩展)便可保存上下文,下面给出如何和咱们 HttpClient 搭配的简单 demo:
@RequestMapping(value="/asynctask", method = RequestMethod.GET) public DeferredResult<String> asyncTask() throws IOReactorException { IOReactorConfig ioReactorConfig = IOReactorConfig.custom().setIoThreadCount(1).build(); ConnectingIOReactor ioReactor = new DefaultConnectingIOReactor(ioReactorConfig); PoolingNHttpClientConnectionManager conManager = new PoolingNHttpClientConnectionManager(ioReactor); conManager.setMaxTotal(100); conManager.setDefaultMaxPerRoute(100); CloseableHttpAsyncClient httpclient = HttpAsyncClients.custom().setConnectionManager(conManager).build(); // Start the client httpclient.start(); //设置超时时间200ms final DeferredResult<String> deferredResult = new DeferredResult<String>(200L); deferredResult.onTimeout(new Runnable() { @Override public void run() { System.out.println("异步调用执行超时!thread id is : " + Thread.currentThread().getId()); deferredResult.setResult("超时了"); } }); System.out.println("/asynctask 调用!thread id is : " + Thread.currentThread().getId()); final HttpGet request2 = new HttpGet("http://www.apache.org/"); httpclient.execute(request2, new FutureCallback<HttpResponse>() { public void completed(final HttpResponse response2) { System.out.println(request2.getRequestLine() + "->" + response2.getStatusLine()); deferredResult.setResult(request2.getRequestLine() + "->" + response2.getStatusLine()); } public void failed(final Exception ex) { System.out.println(request2.getRequestLine() + "->" + ex); } public void cancelled() { System.out.println(request2.getRequestLine() + " cancelled"); } }); return deferredResult; }
注意:在 Serlvet 异步化中有个问题是 Filter 的后置结果处理,无法使用,对于咱们一些打点,结果统计直接使用 Serlvet 异步是无法用的。
在 Spring MVC 中就很好的解决了这个问题,Spring MVC 采用了一个比较取巧的方式经过请求转发,能让请求再次经过过滤器。
可是又引入了新的一个问题那就是过滤器会处理两次,这里能够经过 Spring MVC 源码中自身判断的方法。
咱们能够在 Filter 中使用下面这句话来进行判断是否是属于 Spring MVC 转发过来的请求,从而不处理 Filter 的前置事件,只处理后置事件:
Object asyncManagerAttr = servletRequest.getAttribute(WEB_ASYNC_MANAGER_ATTRIBUTE); return asyncManagerAttr instanceof WebAsyncManager ;
上面咱们介绍了 Serlvet 的异步化,相信细心的同窗都看出来彷佛并无解决根本的问题,个人 IO 阻塞依然存在,只是换了个位置而已。
当 IO 调用频繁一样会让业务线程池快速变满,虽然 Serlvet 容器线程不被阻塞,可是这个业务依然会变得不可用。
那么怎么才能解决上面的问题呢?答案就是全链路异步化,全链路异步追求的是没有阻塞,打满你的 CPU,把机器的性能压榨到极致。模型图以下:
具体的 NIO Client 到底作了什么事呢,具体以下面模型:
上面就是咱们全链路异步的图了(部分线程池能够优化)。全链路的核心在于只要咱们遇到 IO 调用的时候,咱们就可使用 NIO,从而避免阻塞,也就解决了以前说的业务线程池被打满的尴尬场景。
咱们通常远程调用使用 RPC 或者 HTTP:
下面简单介绍下 HTTP 异步化调用是怎么作的。首先来看一个例子:
public class HTTPAsyncClientDemo { public static void main(String[] args) throws ExecutionException, InterruptedException, IOReactorException { //具体参数含义下文会讲 //apache提供了ioReactor的参数配置,这里咱们配置IO 线程为1 IOReactorConfig ioReactorConfig = IOReactorConfig.custom().setIoThreadCount(1).build(); //根据这个配置建立一个ioReactor ConnectingIOReactor ioReactor = new DefaultConnectingIOReactor(ioReactorConfig); //asyncHttpClient使用PoolingNHttpClientConnectionManager管理咱们客户端链接 PoolingNHttpClientConnectionManager conManager = new PoolingNHttpClientConnectionManager(ioReactor); //设置总共的链接的最大数量 conManager.setMaxTotal(100); //设置每一个路由的链接的最大数量 conManager.setDefaultMaxPerRoute(100); //建立一个Client CloseableHttpAsyncClient httpclient = HttpAsyncClients.custom().setConnectionManager(conManager).build(); // Start the client httpclient.start(); // Execute request final HttpGet request1 = new HttpGet("http://www.apache.org/"); Future<HttpResponse> future = httpclient.execute(request1, null); // and wait until a response is received HttpResponse response1 = future.get(); System.out.println(request1.getRequestLine() + "->" + response1.getStatusLine()); // One most likely would want to use a callback for operation result final HttpGet request2 = new HttpGet("http://www.apache.org/"); httpclient.execute(request2, new FutureCallback<HttpResponse>() { //Complete成功后会回调这个方法 public void completed(final HttpResponse response2) { System.out.println(request2.getRequestLine() + "->" + response2.getStatusLine()); } public void failed(final Exception ex) { System.out.println(request2.getRequestLine() + "->" + ex); } public void cancelled() { System.out.println(request2.getRequestLine() + " cancelled"); } }); } }
下面给出 httpAsync 的整个类图:
对于咱们的 HTTPAysncClient 最后使用的是 InternalHttpAsyncClient,在 InternalHttpAsyncClient 中有个 ConnectionManager,这个就是咱们管理链接的管理器。
而在 httpAsync 中只有一个实现那就是 PoolingNHttpClientConnectionManager。
这个链接管理器中有两个咱们比较关心的,一个是 Reactor,一个是 Cpool:
在 DefaultConnectingIOReactor 有个 excutor 方法,生成 IOReactor 也就是咱们图中的 BaseIOReactor,进行 IO 的操做。这个模型就是咱们上面的 1.2.2 的模型。
若是每一个路由有满了,它会断开最老的一个连接;若是总共的 total 满了,它会放入 leased 队列,释放空间的时候就会将其从新链接。
对于数据库调用通常的框架并无提供异步化的方法,这里推荐本身封装或者使用网上开源的。
异步化并非高并发的银弹,可是有了异步化的确能提升你机器的 QPS,吞吐量等等。
上述讲的一些模型若是能合理的作一些优化,而后进行应用,相信能对你的服务有很大的帮助。
想必热爱游戏的同窗小时候都幻想过要是本身会分身之术,就能一边打游戏一边上课了。
惋惜现实中并无这个技术,你要么只有老老实实的上课,要么就只有逃课去打游戏了。
虽然在现实中咱们没法实现分身这样的技术,可是咱们能够在计算机世界中实现这样的愿望。
计算机中的分身术不是天生就有了。在 1971 年,英特尔推出的全球第一颗通用型微处理器 4004,由 2300 个晶体管构成。
当时,公司的联合创始人之一戈登摩尔就提出大名鼎鼎的“摩尔定律”——每过 18 个月,芯片上能够集成的晶体管数目将增长一倍。
最初的主频 740KHz(每秒运行 74 万次),如今过了快 50 年了,你们去买电脑的时候会发现如今的主频都能达到 4.0GHZ了(每秒 40 亿次)。
可是主频越高带来的收益倒是愈来愈小:
在单核主频遇到瓶颈的状况下,多核 CPU 应运而生,不只提高了性能,而且下降了功耗。
因此多核 CPU 逐渐成为如今市场的主流,这样让咱们的多线程编程也更加的容易。
说到了多核 CPU 就必定要说 GPU,你们可能对这个比较陌生,可是一说到显卡就确定不陌生,笔者搞过一段时间的 CUDA 编程,我才意识到这个才是真正的并行计算。
你们都知道图片像素点吧,好比 1920*1080 的图片有 210 万个像素点,若是想要把一张图片的每一个像素点都进行转换一下,那在咱们 Java 里面可能就要循环遍历 210 万次。
就算咱们用多线程 8 核 CPU,那也得循环几十万次。可是若是使用 Cuda,最多能够 365535*512 = 100661760(一亿)个线程并行执行,就这种级别的图片那也是立刻处理完成。
可是 Cuda 通常适合于图片这种,有大量的像素点须要同时处理,可是指令集不多因此逻辑不能太复杂。
一提及让你的服务高性能的手段,那么异步化,并行化这些确定会第一时间在你脑海中显现出来,并行化能够用来配合异步化,也能够用来单独作优化。
咱们能够想一想有这么一个需求,在你下外卖订单的时候,这笔订单可能还须要查用户信息,折扣信息,商家信息,菜品信息等。
用同步的方式调用,以下图所示:
设想一下这 5 个查询服务,平均每次消耗 50ms,那么本次调用至少是 250ms,咱们细想一下,这五个服务其实并无任何的依赖,谁先获取谁后获取均可以。
那么咱们能够想一想,是否能够用多重影分身之术,同时获取这五个服务的信息呢?
优化以下:
将这五个查询服务并行查询,在理想状况下能够优化至 50ms。固然提及来简单,咱们真正如何落地呢?
CountDownLatch 和 Phaser 是 JDK 提供的同步工具类。Phaser 是 1.7 版本以后提供的工具类。而 CountDownLatch 是 1.5 版本以后提供的工具类。
这里简单介绍一下 CountDownLatch,能够将其当作是一个计数器,await()方法能够阻塞至超时或者计数器减至 0,其余线程当完成本身目标的时候能够减小 1,利用这个机制咱们能够用来作并发。
能够用以下的代码实现咱们上面的下订单的需求:
public class CountDownTask { private static final int CORE_POOL_SIZE = 4; private static final int MAX_POOL_SIZE = 12; private static final long KEEP_ALIVE_TIME = 5L; private final static int QUEUE_SIZE = 1600; protected final static ExecutorService THREAD_POOL = new ThreadPoolExecutor(CORE_POOL_SIZE, MAX_POOL_SIZE, KEEP_ALIVE_TIME, TimeUnit.SECONDS, new LinkedBlockingQueue<>(QUEUE_SIZE)); public static void main(String[] args) throws InterruptedException { // 新建一个为5的计数器 CountDownLatch countDownLatch = new CountDownLatch(5); OrderInfo orderInfo = new OrderInfo(); THREAD_POOL.execute(() -> { System.out.println("当前任务Customer,线程名字为:" + Thread.currentThread().getName()); orderInfo.setCustomerInfo(new CustomerInfo()); countDownLatch.countDown(); }); THREAD_POOL.execute(() -> { System.out.println("当前任务Discount,线程名字为:" + Thread.currentThread().getName()); orderInfo.setDiscountInfo(new DiscountInfo()); countDownLatch.countDown(); }); THREAD_POOL.execute(() -> { System.out.println("当前任务Food,线程名字为:" + Thread.currentThread().getName()); orderInfo.setFoodListInfo(new FoodListInfo()); countDownLatch.countDown(); }); THREAD_POOL.execute(() -> { System.out.println("当前任务Tenant,线程名字为:" + Thread.currentThread().getName()); orderInfo.setTenantInfo(new TenantInfo()); countDownLatch.countDown(); }); THREAD_POOL.execute(() -> { System.out.println("当前任务OtherInfo,线程名字为:" + Thread.currentThread().getName()); orderInfo.setOtherInfo(new OtherInfo()); countDownLatch.countDown(); }); countDownLatch.await(1, TimeUnit.SECONDS); System.out.println("主线程:"+ Thread.currentThread().getName()); } }
创建一个线程池(具体配置根据具体业务,具体机器配置),进行并发的执行咱们的任务(生成用户信息,菜品信息等),最后利用 await 方法阻塞等待结果成功返回。
相信各位同窗已经发现,CountDownLatch 虽然能实现咱们须要知足的功能可是其仍然有个问题是,咱们的业务代码须要耦合 CountDownLatch 的代码。
好比在咱们获取用户信息以后,咱们会执行 countDownLatch.countDown(),很明显咱们的业务代码显然不该该关心这一部分逻辑,而且在开发的过程当中万一写漏了,那咱们的 await 方法将只会被各类异常唤醒。
因此在 JDK 1.8 中提供了一个类 CompletableFuture,它是一个多功能的非阻塞的 Future。(什么是 Future:用来表明异步结果,而且提供了检查计算完成,等待完成,检索结果完成等方法。)
咱们将每一个任务的计算完成的结果都用 CompletableFuture 来表示,利用 CompletableFuture.allOf 汇聚成一个大的 CompletableFuture,那么利用 get()方法就能够阻塞。
public class CompletableFutureParallel { private static final int CORE_POOL_SIZE = 4; private static final int MAX_POOL_SIZE = 12; private static final long KEEP_ALIVE_TIME = 5L; private final static int QUEUE_SIZE = 1600; protected final static ExecutorService THREAD_POOL = new ThreadPoolExecutor(CORE_POOL_SIZE, MAX_POOL_SIZE, KEEP_ALIVE_TIME, TimeUnit.SECONDS, new LinkedBlockingQueue<>(QUEUE_SIZE)); public static void main(String[] args) throws InterruptedException, ExecutionException, TimeoutException { OrderInfo orderInfo = new OrderInfo(); //CompletableFuture 的List List<CompletableFuture> futures = new ArrayList<>(); futures.add(CompletableFuture.runAsync(() -> { System.out.println("当前任务Customer,线程名字为:" + Thread.currentThread().getName()); orderInfo.setCustomerInfo(new CustomerInfo()); }, THREAD_POOL)); futures.add(CompletableFuture.runAsync(() -> { System.out.println("当前任务Discount,线程名字为:" + Thread.currentThread().getName()); orderInfo.setDiscountInfo(new DiscountInfo()); }, THREAD_POOL)); futures.add( CompletableFuture.runAsync(() -> { System.out.println("当前任务Food,线程名字为:" + Thread.currentThread().getName()); orderInfo.setFoodListInfo(new FoodListInfo()); }, THREAD_POOL)); futures.add(CompletableFuture.runAsync(() -> { System.out.println("当前任务Other,线程名字为:" + Thread.currentThread().getName()); orderInfo.setOtherInfo(new OtherInfo()); }, THREAD_POOL)); CompletableFuture allDoneFuture = CompletableFuture.allOf(futures.toArray(new CompletableFuture[futures.size()])); allDoneFuture.get(10, TimeUnit.SECONDS); System.out.println(orderInfo); } }
能够看见咱们使用 CompletableFuture 能很快的完成需求,固然这还不够。
咱们上面用 CompletableFuture 完成了对多组任务并行执行,可是它依然是依赖咱们的线程池。
在咱们的线程池中使用的是阻塞队列,也就是当咱们某个线程执行完任务的时候须要经过这个阻塞队列进行,那么确定会发生竞争,因此在 JDK 1.7 中提供了 ForkJoinTask 和 ForkJoinPool。
ForkJoinPool 中每一个线程都有本身的工做队列,而且采用 Work-Steal 算法防止线程饥饿。
Worker 线程用 LIFO 的方法取出任务,可是会用 FIFO 的方法去偷取别人队列的任务,这样就减小了锁的冲突。
网上这个框架的例子不少,咱们看看如何使用代码完成咱们上面的下订单需求:
public class OrderTask extends RecursiveTask<OrderInfo> { @Override protected OrderInfo compute() { System.out.println("执行"+ this.getClass().getSimpleName() + "线程名字为:" + Thread.currentThread().getName()); // 定义其余五种并行TasK CustomerTask customerTask = new CustomerTask(); TenantTask tenantTask = new TenantTask(); DiscountTask discountTask = new DiscountTask(); FoodTask foodTask = new FoodTask(); OtherTask otherTask = new OtherTask(); invokeAll(customerTask, tenantTask, discountTask, foodTask, otherTask); OrderInfo orderInfo = new OrderInfo(customerTask.join(), tenantTask.join(), discountTask.join(), foodTask.join(), otherTask.join()); return orderInfo; } public static void main(String[] args) { ForkJoinPool forkJoinPool = new ForkJoinPool(Runtime.getRuntime().availableProcessors() -1 ); System.out.println(forkJoinPool.invoke(new OrderTask())); } } class CustomerTask extends RecursiveTask<CustomerInfo>{ @Override protected CustomerInfo compute() { System.out.println("执行"+ this.getClass().getSimpleName() + "线程名字为:" + Thread.currentThread().getName()); return new CustomerInfo(); } } class TenantTask extends RecursiveTask<TenantInfo>{ @Override protected TenantInfo compute() { System.out.println("执行"+ this.getClass().getSimpleName() + "线程名字为:" + Thread.currentThread().getName()); return new TenantInfo(); } } class DiscountTask extends RecursiveTask<DiscountInfo>{ @Override protected DiscountInfo compute() { System.out.println("执行"+ this.getClass().getSimpleName() + "线程名字为:" + Thread.currentThread().getName()); return new DiscountInfo(); } } class FoodTask extends RecursiveTask<FoodListInfo>{ @Override protected FoodListInfo compute() { System.out.println("执行"+ this.getClass().getSimpleName() + "线程名字为:" + Thread.currentThread().getName()); return new FoodListInfo(); } } class OtherTask extends RecursiveTask<OtherInfo>{ @Override protected OtherInfo compute() { System.out.println("执行"+ this.getClass().getSimpleName() + "线程名字为:" + Thread.currentThread().getName()); return new OtherInfo(); } }
咱们定义一个 Order Task 而且定义五个获取信息的任务,在 Compute 中分别 Fork 执行这五个任务,最后在将这五个任务的结果经过 Join 得到,最后完成咱们的并行化的需求。
在 JDK 1.8 中提供了并行流的 API,当咱们使用集合的时候能很好的进行并行处理。
下面举了一个简单的例子从 1 加到 100:
public class ParallelStream { public static void main(String[] args) { ArrayList<Integer> list = new ArrayList<Integer>(); for (int i = 1; i <= 100; i++) { list.add(i); } LongAdder sum = new LongAdder(); list.parallelStream().forEach(integer -> { // System.out.println("当前线程" + Thread.currentThread().getName()); sum.add(integer); }); System.out.println(sum); } }
parallelStream 中底层使用的那一套也是 Fork/Join 的那一套,默认的并发程度是可用 CPU 数 -1。
在此我向你们推荐一个架构学习交流群。交流学习群号:821169538 里面会分享一些资深架构师录制的视频录像:有Spring,MyBatis,Netty源码分析,高并发、高性能、分布式、微服务架构的原理,JVM性能优化、分布式架构等这些成为架构师必备的知识体系。还能领取免费的学习资源,目前受益良多。能够想象有这么一个需求,天天定时对 ID 在某个范围之间的用户发券,好比这个范围之间的用户有几百万,若是给一台机器发的话,可能所有发完须要好久的时间。
因此分布式调度框架好比:elastic-job 都提供了分片的功能,好比你用 50 台机器,那么 id%50 = 0 的在第 0 台机器上;=1 的在第 1 台机器上发券,那么咱们的执行时间其实就分摊到了不一样的机器上了。
线程安全:在 parallelStream 中咱们列举的代码中使用的是 LongAdder,并无直接使用咱们的 Integer 和 Long,这个是由于在多线程环境下 Integer 和 Long 线程不安全。因此线程安全咱们须要特别注意。
合理参数配置:能够看见咱们须要配置的参数比较多,好比咱们的线程池的大小,等待队列大小,并行度大小以及咱们的等待超时时间等等。
咱们都须要根据本身的业务不断的调优防止出现队列不够用或者超时时间不合理等等。
上面介绍了什么是并行化,并行化的各类历史,在 Java 中如何实现并行化,以及并行化的注意事项。但愿你们对并行化有个比较全面的认识。