今天来聊聊如何让项目异步化的一些事。java
同步和异步,阻塞和非阻塞, 这个几个词已是老生常谈,可是经常仍是有不少同窗分不清楚,觉得同步确定就是阻塞,异步确定就是非阻塞,其实他们不是一回事。git
同步和异步关注的是结果消息的通讯机制程序员
阻塞和非阻塞主要关注的是等待结果返回调用方的状态github
能够看见同步和异步,阻塞和非阻塞主要关注的点不一样,有人会问同步还能非阻塞,异步还能阻塞?固然是能够的,下面为了更好的说明他们的组合之间的意思,用几个简单的例子说明: 1.同步阻塞:同步阻塞基本也是编程中最多见的模型,打个比方你去商店买衣服,你去了以后发现衣服卖完了,那你就在店里面一直等,期间不作任何事(包括看手机),等着商家进货,直到有货为止,这个效率很低。web
2.同步非阻塞:同步非阻塞在编程中能够抽象为一个轮询模式,你去了商店以后,发现衣服卖完了,这个时候不须要傻傻的等着,你能够去其余地方好比奶茶店,买杯水,可是你仍是须要时不时的去商店问老板新衣服到了吗。算法
3.异步阻塞:异步阻塞这个编程里面用的较少,有点相似你写了个线程池,submit而后立刻future.get(),这样线程其实仍是挂起的。有点像你去商店买衣服,这个时候发现衣服没有了,这个时候你就给老板留给电话,说衣服到了就给我打电话,而后你就守着这个电话,一直等着他响什么事也不作。这样感受的确有点傻,因此这个模式用得比较少。spring
4.异步非阻塞:异步非阻塞这也是如今高并发编程的一个核心,也是今天主要讲的一个核心。比如你去商店买衣服,衣服没了,你只须要给老板说这是个人电话,衣服到了就打。而后你就为所欲为的去玩,也不用操心衣服何时到,衣服一到,电话一响就能够去买衣服了。数据库
上面已经看到了同步阻塞的效率是多么的低,若是使用同步阻塞的方式去买衣服,你有可能一天只能买一件衣服,其余什么事都不能干,若是用异步非阻塞的方式去买,买衣服只是你一天中进行的一个小事。apache
咱们把这个映射到咱们代码中,当咱们的线程发生一次rpc调用或者http调用,又或者其余的一些耗时的IO调用,发起以后,若是是同步阻塞,咱们的这个线程就会被阻塞挂起,直到结果返回,试想一下若是IO调用很频繁那咱们的CPU使用率实际上是很低很低。正所谓是物尽其用,既然CPU的使用率被IO调用搞得很低,那咱们就可使用异步非阻塞,当发生IO调用时我并不立刻关心结果,我只须要把回调函数写入此次IO调用,我这个时候线程能够继续处理新的请求,当IO调用结束结束时,会调用回调函数。而咱们的线程始终处于忙碌之中,这样就能作更多的有意义的事了。编程
这里首先要说明的是,异步化不是万能,异步化并不能缩短你整个链路调用时间长的问题,可是他能极大的提高你的最大qps。通常咱们的业务中有两处比较耗时:
上面说了异步化是用于解决IO阻塞的问题,而咱们通常项目中可使用异步化以下:
下面我会从上面几个方面进行异步化的介绍.
对于Java开发程序员来讲servlet并不陌生吧,在项目中不论你使用struts2,仍是使用的springmvc,本质上都是封装的servlet。可是咱们的通常的开发,其实都是使用的同步阻塞模式以下:
上面的模式优势在于编码简单,适合在项目启动初期,访问量较少,或者是CPU运算较多的项目
缺点在于,业务逻辑线程和servlet容器线程是同一个,通常的业务逻辑总得发生点IO,好比查询数据库,好比产生RPC调用,这个时候就会发生阻塞,而咱们的servlet容器线程确定是有限的,当servlet容器线程都被阻塞的时候咱们的服务这个时候就会发生拒绝访问,线程否则我固然们能够经过增长机器的一系列手段来解决这个问题,可是俗话说得好靠人不如靠本身,靠别人替我分担请求,还不如我本身搞定。因此在servlet3.0以后支持了异步化,咱们采用异步化以后就会变成以下:
在这里咱们采用新的线程处理业务逻辑,IO调用的阻塞就不会影响咱们的serlvet了,实现异步serlvet的代码也比较简单,以下:
@WebServlet(name = "WorkServlet",urlPatterns = "/work",asyncSupported =true) public class WorkServlet extends HttpServlet{ private static final long serialVersionUID = 1L; @Override protected void doGet(HttpServletRequest req, HttpServletResponse resp) throws ServletException, IOException { this.doPost(req, resp); } @Override protected void doPost(HttpServletRequest req, HttpServletResponse resp) throws ServletException, IOException { //设置ContentType,关闭缓存 resp.setContentType("text/plain;charset=UTF-8"); resp.setHeader("Cache-Control","private"); resp.setHeader("Pragma","no-cache"); final PrintWriter writer= resp.getWriter(); writer.println("老师检查做业了"); writer.flush(); List<String> zuoyes=new ArrayList<String>(); for (int i = 0; i < 10; i++) { zuoyes.add("zuoye"+i);; } //开启异步请求 final AsyncContext ac=req.startAsync(); doZuoye(ac, zuoyes); writer.println("老师布置做业"); writer.flush(); } private void doZuoye(final AsyncContext ac, final List<String> zuoyes) { ac.setTimeout(1*60*60*1000L); ac.start(new Runnable() { @Override public void run() { //经过response得到字符输出流 try { PrintWriter writer=ac.getResponse().getWriter(); for (String zuoye:zuoyes) { writer.println("\""+zuoye+"\"请求处理中"); Thread.sleep(1*1000L); writer.flush(); } ac.complete(); } catch (Exception e) { e.printStackTrace(); } } }); } }
实现serlvet的关键在于http采起了长链接,也就是当请求打过来的时候就算有返回也不会关闭,由于可能还会有数据,直到返回关闭指令。 AsyncContext ac=req.startAsync(); 用于获取异步上下文,后续咱们经过这个异步上下文进行回调返回数据,有点像咱们买衣服的时候,给老板一个电话,而这个上下文也是一个电话,当有衣服到的时候,也就是当有数据准备好的时候就能够打电话发送数据了。 ac.complete(); 用来进行长连接的关闭。
如今其实不多人来进行serlvet编程,都是直接采用现成的一些框架,好比struts2,springmvc。下面介绍下使用springmvc如何进行异步化:
<dependency> <groupId>javax.servlet</groupId> <artifactId>javax.servlet-api</artifactId> <version>3.1.0</version> <scope>provided</scope> </dependency> <dependency> <groupId>org.springframework</groupId> <artifactId>spring-webmvc</artifactId> <version>4.2.3.RELEASE</version> </dependency>
<?xml version="1.0" encoding="UTF-8"?> <web-app version="3.0" xmlns="http://java.sun.com/xml/ns/javaee" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://java.sun.com/xml/ns/javaee http://java.sun.com/xml/ns/javaee/web-app_3_0.xsd"> <filter> <filter-name>testFilter</filter-name> <filter-class>com.TestFilter</filter-class> <async-supported>true</async-supported> </filter> <servlet> <servlet-name>mvc-dispatcher</servlet-name> <servlet-class>org.springframework.web.servlet.DispatcherServlet</servlet-class> ......... <async-supported>true</async-supported> </servlet>
@RequestMapping(value="/asynctask", method = RequestMethod.GET) public DeferredResult<String> asyncTask() throws IOReactorException { IOReactorConfig ioReactorConfig = IOReactorConfig.custom().setIoThreadCount(1).build(); ConnectingIOReactor ioReactor = new DefaultConnectingIOReactor(ioReactorConfig); PoolingNHttpClientConnectionManager conManager = new PoolingNHttpClientConnectionManager(ioReactor); conManager.setMaxTotal(100); conManager.setDefaultMaxPerRoute(100); CloseableHttpAsyncClient httpclient = HttpAsyncClients.custom().setConnectionManager(conManager).build(); // Start the client httpclient.start(); //设置超时时间200ms final DeferredResult<String> deferredResult = new DeferredResult<String>(200L); deferredResult.onTimeout(new Runnable() { @Override public void run() { System.out.println("异步调用执行超时!thread id is : " + Thread.currentThread().getId()); deferredResult.setResult("超时了"); } }); System.out.println("/asynctask 调用!thread id is : " + Thread.currentThread().getId()); final HttpGet request2 = new HttpGet("http://www.apache.org/"); httpclient.execute(request2, new FutureCallback<HttpResponse>() { public void completed(final HttpResponse response2) { System.out.println(request2.getRequestLine() + "->" + response2.getStatusLine()); deferredResult.setResult(request2.getRequestLine() + "->" + response2.getStatusLine()); } public void failed(final Exception ex) { System.out.println(request2.getRequestLine() + "->" + ex); } public void cancelled() { System.out.println(request2.getRequestLine() + " cancelled"); } }); return deferredResult; }
注意: 在serlvet异步化中有个问题是filter的后置结果处理,无法使用,对于咱们一些打点,结果统计直接使用serlvet异步是无法用的。在springmvc中就很好的解决了这个问题,springmvc采用了一个比较取巧的方式经过请求转发,能让请求再次过滤器。可是又引入了新的一个问题那就是过滤器会处理两次,这里能够经过SpringMVC源码中自身判断的方法,咱们能够在filter中使用下面这句话来进行判断是否是属于springmvc转发过来的请求,从而不处理filter的前置事件,只处理后置事件:
Object asyncManagerAttr = servletRequest.getAttribute(WEB_ASYNC_MANAGER_ATTRIBUTE); return asyncManagerAttr instanceof WebAsyncManager ;
上面咱们介绍了serlvet的异步化,相信细心的同窗都看出来彷佛并无解决根本的问题,个人IO阻塞依然存在,只是换了个位置而已,当IO调用频繁一样会让业务线程池快速变满,虽然serlvet容器线程不被阻塞,可是这个业务依然会变得不可用。
那么怎么才能解决上面的问题呢?答案就是全链路异步化,全链路异步追求的是没有阻塞,打满你的CPU,把机器的性能压榨到极致模型图以下:
具体的NIO client到底作了什么事呢,具体以下面模型:
上面就是咱们全链路异步的图了(部分线程池能够优化)。全链路的核心在于只要咱们遇到IO调用的时候,咱们就可使用NIO,从而避免阻塞,也就解决了以前说的业务线程池被打满获得尴尬场景。
咱们通常远程调用使用rpc或者http。对于rpc来讲通常thrift,http,motan等支持都异步调用,其内部原理也都是采用事件驱动的NIO模型,对于http来讲通常的apachehttpclient和okhttp也都提供了异步调用。 下面简单介绍下Http异步化调用是怎么作的: 首先来看一个例子:
public class HTTPAsyncClientDemo { public static void main(String[] args) throws ExecutionException, InterruptedException, IOReactorException { //具体参数含义下文会讲 //apache提供了ioReactor的参数配置,这里咱们配置IO 线程为1 IOReactorConfig ioReactorConfig = IOReactorConfig.custom().setIoThreadCount(1).build(); //根据这个配置建立一个ioReactor ConnectingIOReactor ioReactor = new DefaultConnectingIOReactor(ioReactorConfig); //asyncHttpClient使用PoolingNHttpClientConnectionManager管理咱们客户端链接 PoolingNHttpClientConnectionManager conManager = new PoolingNHttpClientConnectionManager(ioReactor); //设置总共的链接的最大数量 conManager.setMaxTotal(100); //设置每一个路由的链接的最大数量 conManager.setDefaultMaxPerRoute(100); //建立一个Client CloseableHttpAsyncClient httpclient = HttpAsyncClients.custom().setConnectionManager(conManager).build(); // Start the client httpclient.start(); // Execute request final HttpGet request1 = new HttpGet("http://www.apache.org/"); Future<HttpResponse> future = httpclient.execute(request1, null); // and wait until a response is received HttpResponse response1 = future.get(); System.out.println(request1.getRequestLine() + "->" + response1.getStatusLine()); // One most likely would want to use a callback for operation result final HttpGet request2 = new HttpGet("http://www.apache.org/"); httpclient.execute(request2, new FutureCallback<HttpResponse>() { //Complete成功后会回调这个方法 public void completed(final HttpResponse response2) { System.out.println(request2.getRequestLine() + "->" + response2.getStatusLine()); } public void failed(final Exception ex) { System.out.println(request2.getRequestLine() + "->" + ex); } public void cancelled() { System.out.println(request2.getRequestLine() + " cancelled"); } }); } }
下面给出httpAsync的整个类图:
对于咱们的HTTPAysncClient 其实最后使用的是InternalHttpAsyncClient,在InternalHttpAsyncClient中有个ConnectionManager,这个就是咱们管理链接的管理器,而在httpAsync中只有一个实现那就是PoolingNHttpClientConnectionManager,这个链接管理器中有两个咱们比较关心的一个是Reactor,一个是Cpool。
Reactor :全部的Reactor这里都是实现了IOReactor接口。在PoolingNHttpClientConnectionManager中会有拥有一个Reactor,那就是DefaultConnectingIOReactor,这个DefaultConnectingIOReactor,负责处理Acceptor。在DefaultConnectingIOReactor有个excutor方法,生成IOReactor也就是咱们图中的BaseIOReactor,进行IO的操做。这个模型就是咱们上面的1.2.2的模型
CPool :在PoolingNHttpClientConnectionManager中有个CPool,主要是负责控制咱们链接,咱们上面所说的maxTotal和defaultMaxPerRoute,都是由其进行控制,若是每一个路由的满了他会断开最老的一个连接,若是总共的total满了他会放入leased队列,释放空间的时候就会将其从新链接。
对于数据库调用通常的框架并无提供异步化的方法,这里推荐本身封装或者使用网上开源的,这里咱们公司有个开源的 https://github.com/ainilife/zebra-dao/blob/master/README_ZH.md 能很好的支持异步化
异步化并非高并发的银弹,可是有了异步化的确能提升你机器的qps,吞吐量等等。上述讲的一些模型若是能合理的作一些优化,而后进行应用,相信能对你的服务有很大的帮助的。
最后这篇文章被我收录于JGrowing,一个全面,优秀,由社区一块儿共建的Java学习路线,若是您想参与开源项目的维护,能够一块儿共建,github地址为:https://github.com/javagrowing/JGrowing 麻烦给个小星星哟。