Java异步非阻塞编程的几种方式

简介: Java异步非阻塞编程的几种方式

1、 从一个同步的Http调用提及

一个很简单的业务逻辑,其余后端服务提供了一个接口,咱们须要经过接口调用,获取到响应的数据。java

逆地理接口:经过经纬度获取这个经纬度所在的省市区县以及响应的code:git

curl-i"http://xxx?latitude=31.08966221524924&channel=amap7a&near=false&longitude=105.13990312814713"
{"adcode":"510722"}

服务端执行,最简单的同步调用方式:apache

服务端响应以前,IO会阻塞在:
java.net.SocketInputStream#socketRead0 的native方法上:编程

经过jstack日志,能够发现,此时这个Thread会一直在runable的状态:后端

"main"#1 prio=5 os_prio=31 tid=0x00007fed0c810000 nid=0x1003 runnable [0x000070000ce14000]   java.lang.Thread.State: RUNNABLE
        at java.net.SocketInputStream.socketRead0(Native Method)
        at java.net.SocketInputStream.socketRead(SocketInputStream.java:116)
        at java.net.SocketInputStream.read(SocketInputStream.java:171)
        at java.net.SocketInputStream.read(SocketInputStream.java:141)
        at org.apache.http.impl.conn.LoggingInputStream.read(LoggingInputStream.java:84)
        at org.apache.http.impl.io.SessionInputBufferImpl.streamRead(SessionInputBufferImpl.java:137)
        at org.apache.http.impl.io.SessionInputBufferImpl.fillBuffer(SessionInputBufferImpl.java:153)
        at org.apache.http.impl.io.SessionInputBufferImpl.readLine(SessionInputBufferImpl.java:282)
        at org.apache.http.impl.conn.DefaultHttpResponseParser.parseHead(DefaultHttpResponseParser.java:138)
        at org.apache.http.impl.conn.DefaultHttpResponseParser.parseHead(DefaultHttpResponseParser.java:56)
        at org.apache.http.impl.io.AbstractMessageParser.parse(AbstractMessageParser.java:259)
        at org.apache.http.impl.DefaultBHttpClientConnection.receiveResponseHeader(DefaultBHttpClientConnection.java:163)
        at org.apache.http.impl.conn.CPoolProxy.receiveResponseHeader(CPoolProxy.java:165)
        at org.apache.http.protocol.HttpRequestExecutor.doReceiveResponse(HttpRequestExecutor.java:273)
        at org.apache.http.protocol.HttpRequestExecutor.execute(HttpRequestExecutor.java:125)
        at org.apache.http.impl.execchain.MainClientExec.execute(MainClientExec.java:272)
        at org.apache.http.impl.execchain.ProtocolExec.execute(ProtocolExec.java:185)
        at org.apache.http.impl.execchain.RetryExec.execute(RetryExec.java:89)
        at org.apache.http.impl.execchain.RedirectExec.execute(RedirectExec.java:110)
        at org.apache.http.impl.client.InternalHttpClient.doExecute(InternalHttpClient.java:185)
        at org.apache.http.impl.client.CloseableHttpClient.execute(CloseableHttpClient.java:83)
        at org.apache.http.impl.client.CloseableHttpClient.execute(CloseableHttpClient.java:108)
        at com.amap.aos.async.AsyncIO.blockingIO(AsyncIO.java:207)
                .......

线程模型示例:app

同步最大的问题是在IO等待的过程当中,线程资源没有获得充分的利用,对于大量IO场景的业务吞吐量会有必定限制。curl

二 、JDK NIO & Future

在JDK 1.5 中,JUC提供了Future抽象:异步

固然并非全部的Future都是这样实现的,如
io.netty.util.concurrent.AbstractFuture 就是经过线程轮询去。socket

这样作的好处是,主线程能够不用等待IO响应,能够去作点其余的,好比说再发送一个IO请求,能够等到一块儿返回:async

"main"#1 prio=5 os_prio=31 tid=0x00007fd7a500b000 nid=0xe03 waiting on condition [0x000070000a95d000]   java.lang.Thread.State: WAITING (parking)
        at sun.misc.Unsafe.park(Native Method)
- parking to wait for  <0x000000076ee2d768> (a java.util.concurrent.CountDownLatch$Sync)
        at java.util.concurrent.locks.LockSupport.park(LockSupport.java:175)
        at java.util.concurrent.locks.AbstractQueuedSynchronizer.parkAndCheckInterrupt(AbstractQueuedSynchronizer.java:836)
        at java.util.concurrent.locks.AbstractQueuedSynchronizer.doAcquireSharedInterruptibly(AbstractQueuedSynchronizer.java:997)
        at java.util.concurrent.locks.AbstractQueuedSynchronizer.acquireSharedInterruptibly(AbstractQueuedSynchronizer.java:1304)
        at java.util.concurrent.CountDownLatch.await(CountDownLatch.java:231)
        at org.asynchttpclient.netty.NettyResponseFuture.get(NettyResponseFuture.java:162)
        at com.amap.aos.async.AsyncIO.futureBlockingGet(AsyncIO.java:201)
        .....
"AsyncHttpClient-2-1"#11 prio=5 os_prio=31 tid=0x00007fd7a7247800 nid=0x340b runnable [0x000070000ba94000]   java.lang.Thread.State: RUNNABLE
        at sun.nio.ch.KQueueArrayWrapper.kevent0(Native Method)
        at sun.nio.ch.KQueueArrayWrapper.poll(KQueueArrayWrapper.java:198)
        at sun.nio.ch.KQueueSelectorImpl.doSelect(KQueueSelectorImpl.java:117)
        at sun.nio.ch.SelectorImpl.lockAndDoSelect(SelectorImpl.java:86)
- locked <0x000000076eb00ef0> (a io.netty.channel.nio.SelectedSelectionKeySet)
- locked <0x000000076eb00f10> (a java.util.Collections$UnmodifiableSet)
- locked <0x000000076eb00ea0> (a sun.nio.ch.KQueueSelectorImpl)
        at sun.nio.ch.SelectorImpl.select(SelectorImpl.java:97)
        at io.netty.channel.nio.NioEventLoop.select(NioEventLoop.java:693)
        at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:353)
        at io.netty.util.concurrent.SingleThreadEventExecutor$2.run(SingleThreadEventExecutor.java:140)
        at io.netty.util.concurrent.DefaultThreadFactory$DefaultRunnableDecorator.run(DefaultThreadFactory.java:144)
        at java.lang.Thread.run(Thread.java:748)

主线程在等待结果返回过程当中依然须要等待,没有根本解决此问题。

三 、使用Callback回调方式

第二节中,依然须要主线程等待,获取结果,那么可不能够在主线程完成发送请求后,不再用关心这个逻辑,去执行其余的逻辑?那就可使用Callback机制。

如此一来,主线程不再须要关心发起IO后的业务逻辑,发送完请求后,就能够完全去干其余事情,或者回到线程池中再供调度。若是是HttpServer,那么须要结合Servlet 3.1的异步Servlet。

使用Callback方式,从线程模型中看,发现线程资源已经获得了比较充分的利用,整个过程当中已经没有线程阻塞。

4、 Callback hell

回调地狱,当Callback的线程还须要执行下一个IO调用的时候,这个时候进入回调地狱模式。

典型的应用场景如,经过经纬度获取行政区域adcode(逆地理接口),而后再根据得到的adcode,获取当地的天气信息(天气接口)。

在同步的编程模型中,几乎不会涉及到此类问题。

Callback方式的核心缺陷

5、 JDK 1.8 CompletableFuture

那么有没有办法解决Callback Hell的问题?固然有,JDK 1.8中提供了CompletableFuture,先看看它是怎么解决这个问题的。

将逆地理的Callback逻辑,封装成一个独立的CompletableFuture,当异步线程回调时,调用 future.complete(T) ,将结果封装。

将天气执行的Call逻辑,也封装成为一个独立的CompletableFuture ,完成以后,逻辑同上。

compose衔接,whenComplete输出:

每个IO操做,都可以封装为独立的CompletableFuture,从而避免回调地狱。

CompletableFuture,只有两个属性:

  • result:Future的执行结果 (Either the result or boxed AltResult)。
  • stack:操做栈,用于定义这个Future接下来操做的行为 (Top of Treiber stack of dependent actions)。

weatherFuture这个方法是如何被调用的呢?

经过堆栈能够发现,是在
reverseCodeFuture.complete(result) 的时候,而且也将得到的adcode做为参数执行接下来的逻辑。

这样一来,就完美解决回调地狱问题,在主的逻辑中,看起来像是在同步的进行编码。

6、 Vert.x Future

Info-Service中,大量使用的 Vert.x Future 也是相似的解决的方案,不过设计上使用Handler的概念。

核心执行的逻辑差很少:

这固然不是Vertx的所有,固然这是题外话了。

七 、Reactive Streams

异步编程对吞吐量以及资源有好处,可是有没有统一的抽象去解决此类问题内,答案是 Reactive Streams。

核心抽象:Publisher Subscriber Processor Subscription ,整个包里面,只有这四个接口,没有实现类。

在JDK 9里面,已经被做为一种规范封装到 java.util.concurrent.Flow :

一个简单的例子:

8、 Reactor & Spring 5 & Spring WebFlux

Flux & Mono

做者:开发者小助手_LS
原文连接本文为阿里云原创内容,未经容许不得转载

相关文章
相关标签/搜索