RPC-非阻塞通讯下的同步API实现原理,以Dubbo为例

    Netty在Java NIO领域基本算是独占鳌头,涉及到高性能网络通讯,基本都会以Netty为底层通讯框架,Dubbo 也不例外。如下将以Dubbo实现为例介绍其是如何在NIO非阻塞通讯基础上实现同步通讯的。java

    Dubbo为一种RPC通讯框架,提供进程间的通讯,在使用dubbo协议+Netty做为传输层时,提供三种API调用方式:react

  1. 同步接口
  2. 异步带回调接口
  3. 异步不带回调接口

    同步接口适用在大部分环境,通讯方式简单、可靠,客户端发起调用,等待服务端处理,调用结果同步返回。这种方式下,在高吞吐、高性能(响应时间很快)的服务接口场景中最为适用,能够减小异步带来的额外的消耗,也方便客户端作一致性保证。网络

 

    异步带回调接口,用在任务处理时间较长,客户端应用线程不肯阻塞等待,而是为了提升自身处理能力但愿服务端处理完成后能够异步通知应用线程。这种方式能够大大提高客户端的吞吐量,避免由于服务端的耗时问题拖死客户端。框架

    异步不带回调接口,一些场景为了进一步提高客户端的吞吐能力,只需发起一次服务端调用,不需关系调用结果,可使用此种通讯方式。通常在不须要严格保证数据一致性或者有其余补偿措施的状况下,选用这种,能够最小化远程调用带来的性能损耗。异步

    

    来看一下Dubbo是如何实现这三种API的。核心代码在com.alibaba.dubbo.rpc.protocol.dubbo.DubboInvoker,以下图对应的位置,属于协议层的实现部分。为方便你们能够准肯定位代码所在位置,使用截图的方式,而不是直接贴代码了。性能

    上文描述的是三种API方式,Dubbo里面经过参数isOneway、isAsync来控制,isOneway=true表示异步不带回调,isAsync=true表示异步带回调,不然是同步API。具体是如何控制,看如下代码:线程

    isOneway==true时,客户端send完请求后,直接return一个空结果的RpcResult;isAsync==true时,客户端发起请求,设置一个ResponseFuture,直接return一个空结果的RpcResult,接下来当服务端处理完成,客户端Netty层在收到响应后会经过Future通知应用线程;最后是同步状况下,客户端发起请求,并经过get()方法阻塞等待服务端的响应结果。日志

    异步API状况下,结合NIO模型比较好理解是如何实现的(固然须要先了解NIO的reactor模型),接下来重点理解下,这个get()阻塞方法是如何作到基于非阻塞NIO实现同步阻塞效果。code

    直接进入get()方法内部。对象

    能够看到是利用Java的锁机制实现,循环判断是否收到响应,若是收到或者等待超时则返回。done的实例对象以下:

private final Lock                            lock = new ReentrantLock();
private final Condition                       done = lock.newCondition();

    使用可重入锁ReentrantLock,获取一个Condition对象在其上作await操做。这里有await操做,什么时候被唤醒呢,有两个条件,第一个是等待timeout超时,默认dubbo是1s,第二个就是被其余线程唤醒,即收到了服务端的响应。

    signal信号一发出,上文循环检测内的await操做会当即返回,下一次isDone判断会变成true,直接跳出循环。

    仔细看代码会发现,被唤醒的地方还有一个是在DefaultFuture内部有一个超时轮询检测的线程,这个线程主要是处理响应超时后触发资源回收、记录异常日志等操做。    

private static class RemotingInvocationTimeoutScan implements Runnable {

        public void run() {
            while (true) {
                try {
                    for (DefaultFuture future : FUTURES.values()) {
                        if (future == null || future.isDone()) {
                            continue;
                        }
                        if (System.currentTimeMillis() - future.getStartTimestamp() > future.getTimeout()) {
                            // create exception response.
                            Response timeoutResponse = new Response(future.getId());
                            // set timeout status.
                            timeoutResponse.setStatus(future.isSent() ? Response.SERVER_TIMEOUT : Response.CLIENT_TIMEOUT);
                            timeoutResponse.setErrorMessage(future.getTimeoutMessage(true));
                            // handle response.
                            DefaultFuture.received(future.getChannel(), timeoutResponse);
                        }
                    }
                    Thread.sleep(30);
                } catch (Throwable e) {
                    logger.error("Exception when scan the timeout invocation of remoting.", e);
                }
            }
        }
    }

    static {
        Thread th = new Thread(new RemotingInvocationTimeoutScan(), "DubboResponseTimeoutScanTimer");
        th.setDaemon(true);
        th.start();
    }

    可能会有疑问,这个触发操做为什么不直接在get()方法内部检测到超时直接调用DefaultFuture.received(Channel channel, Response response)来清理,而是要额外开启一个后台线程。

    单独启动一个超时线程有两个好处:

  1.  提升超时精度

    get()方法内部的轮询有一个timeout,每次超时唤醒的时间间隔至少是timeout时长,最差的状况可能会等待2*timeout做出超时反应。在超时轮询线程中,每隔30ms遍历检测一次,能够很大程度的提高超时精度。

    2.  提高性能,下降响应时间

    剥离超时处理逻辑到一个单独线程,能够减小对业务线程的时间占用,这个超时后的处理对应用来讲并没有直接做用,彻底能够放到后台异步去处理。另外单独在一个线程中,实际上有批量处理的表现。

    以上是就NIO通讯基础上实现三种API调用的实现原理,或许有更多优于Dubbo的处理方式,能够拿出来讨论。

相关文章
相关标签/搜索