dubbo超时重试和异常处理

dubbo超时重试和异常处理

dubbo超时重试和异常处理html

 

参考:前端

https://www.cnblogs.com/ASPNET2008/p/7292472.htmljava

https://www.tuicool.com/articles/YfA3Ubweb

https://www.cnblogs.com/binyue/p/5380322.html数据库

https://blog.csdn.net/mj158518/article/details/51228649express

 

 

dubbo源码分析:超时原理以及应用场景

本篇主要记录dubbo中关于超时的常见问题,实现原理,解决的问题以及如何在服务降级中体现做用等。apache

超时问题

为了检查对dubbo超时的理解,尝试回答以下几个问题,若是回答不上来或者不肯定那么说明此处须要再多研究研究。bootstrap

我只是针对我的的理解提问题,并不表明我理解的就是全面深刻的,但个人问题若是也回答不了,那至少说明理解的确是不够细的。后端

  • 超时是针对消费端仍是服务端?
  • 超时在哪设置?
  • 超时设置的优先级是什么?
  • 超时的实现原理是什么?
  • 超时解决的是什么问题?

问题解答

RPC场景

本文全部问题均如下图作为业务场景,一个web api作为前端请求,product service是产品服务,其中调用comment service(评论服务)获取产品相关评论,comment service从持久层中加载数据。api

超时是针对消费端仍是服务端?

  • 若是是争对消费端,那么当消费端发起一次请求后,若是在规定时间内未获得服务端的响应则直接返回超时异常,但服务端的代码依然在执行。

  • 若是是争取服务端,那么当消费端发起一次请求后,一直等待服务端的响应,服务端在方法执行到指定时间后若是未执行完,此时返回一个超时异常给到消费端。

dubbo的超时是争对客户端的,因为是一种NIO模式,消费端发起请求后获得一个ResponseFuture,而后消费端一直轮询这个ResponseFuture直至超时或者收到服务端的返回结果。虽然超时了,但仅仅是消费端再也不等待服务端的反馈并不表明此时服务端也中止了执行。

按上图的业务场景,看看生成的日志:

product service:报超时错误,由于comment service 加载数据须要5S,但接口只等1S 。

Caused by: com.alibaba.dubbo.remoting.TimeoutException: Waiting server-side response timeout. start time: 2017-08-05 18:14:52.751, end time: 2017-08-05 18:14:53.764, client elapsed: 6 ms, server elapsed: 1006 ms, timeout: 1000 ms, request: Request [id=0, version=2.0.0, twoway=true, event=false, broken=false, data=RpcInvocation [methodName=getCommentsByProductId, parameterTypes=[class java.lang.Long], arguments=[1], attachments={traceId=6299543007105572864, spanId=6299543007105572864, input=259, path=com.jim.framework.dubbo.core.service.CommentService, interface=com.jim.framework.dubbo.core.service.CommentService, version=0.0.0}]], channel: /192.168.10.222:53204 -> /192.168.10.222:7777
    at com.alibaba.dubbo.remoting.exchange.support.DefaultFuture.get(DefaultFuture.java:107) ~[dubbo-2.5.3.jar:2.5.3]
    at com.alibaba.dubbo.remoting.exchange.support.DefaultFuture.get(DefaultFuture.java:84) ~[dubbo-2.5.3.jar:2.5.3]
    at com.alibaba.dubbo.rpc.protocol.dubbo.DubboInvoker.doInvoke(DubboInvoker.java:96) ~[dubbo-2.5.3.jar:2.5.3]
    ... 42 common frames omitted

comment service : 并无异常,而是慢慢悠悠的执行本身的逻辑:

2017-08-05 18:14:52.760  INFO 846 --- [2:7777-thread-5] c.j.f.d.p.service.CommentServiceImpl     : getComments start:Sat Aug 05 18:14:52 CST 2017
2017-08-05 18:14:57.760  INFO 846 --- [2:7777-thread-5] c.j.f.d.p.service.CommentServiceImpl     : getComments end:Sat Aug 05 18:14:57 CST 2017

从日志来看,超时影响的是消费端,与服务端没有直接关系。

超时在哪设置?

消费端

  • 全局控制
<dubbo:consumer timeout="1000"></dubbo:consumer>
  • 接口控制
  • 方法控制

服务端

  • 全局控制
<dubbo:provider timeout="1000"></dubbo:provider>
  • 接口控制
  • 方法控制

能够看到dubbo针对超时作了比较精细化的支持,不管是消费端仍是服务端,不管是接口级别仍是方法级别都有支持。

超时设置的优先级是什么?

上面有提到dubbo支持多种场景下设置超时时间,也说过超时是针对消费端的。那么既然超时是针对消费端,为何服务端也能够设置超时呢?

这实际上是一种策略,其实服务端的超时配置是消费端的缺省配置,即若是服务端设置了超时,任务消费端能够不设置超时时间,简化了配置。

另外针对控制的粒度,dubbo支持了接口级别也支持方法级别,能够根据不一样的实际状况精确控制每一个方法的超时时间。因此最终的优先顺序为:客户端方法级>服务端方法级>客户端接口级>服务端接口级>客户端全局>服务端全局

超时的实现原理是什么?

以前有简单提到过, dubbo默认采用了netty作为网络组件,它属于一种NIO的模式。消费端发起远程请求后,线程不会阻塞等待服务端的返回,而是立刻获得一个ResponseFuture,消费端经过不断的轮询机制判断结果是否有返回。由于是经过轮询,轮询有个须要特别注要的就是避免死循环,因此为了解决这个问题就引入了超时机制,只在必定时间范围内作轮询,若是超时时间就返回超时异常。

源码

ResponseFuture接口定义

public interface ResponseFuture {

    /**
     * get result.
     * 
     * @return result.
     */
    Object get() throws RemotingException;

    /**
     * get result with the specified timeout.
     * 
     * @param timeoutInMillis timeout.
     * @return result.
     */
    Object get(int timeoutInMillis) throws RemotingException;

    /**
     * set callback.
     * 
     * @param callback
     */
    void setCallback(ResponseCallback callback);

    /**
     * check is done.
     * 
     * @return done or not.
     */
    boolean isDone();

}

ReponseFuture的实现类:DefaultFuture

只看它的get方法,能够清楚看到轮询的机制。

public Object get(int timeout) throws RemotingException {
        if (timeout <= 0) {
            timeout = Constants.DEFAULT_TIMEOUT;
        }
        if (! isDone()) {
            long start = System.currentTimeMillis();
            lock.lock();
            try {
                while (! isDone()) {
                    done.await(timeout, TimeUnit.MILLISECONDS);
                    if (isDone() || System.currentTimeMillis() - start > timeout) {
                        break;
                    }
                }
            } catch (InterruptedException e) {
                throw new RuntimeException(e);
            } finally {
                lock.unlock();
            }
            if (! isDone()) {
                throw new TimeoutException(sent > 0, channel, getTimeoutMessage(false));
            }
        }
        return returnFromResponse();
    }

超时解决的是什么问题?

设置超时主要是解决什么问题?若是没有超时机制会怎么样?

回答上面的问题,首先要了解dubbo这类rpc产品的线程模型。下图是我以前我的RPC学习产品的示例图,与dubbo的线程模型大体是相同的,有兴趣的可参考个人笔记:简单RPC框架-业务线程池

咱们从dubbo的源码看下这下线程模型是怎么用的:

netty boss

主要是负责socket链接之类的工做。

netty wokers

将一个请求分给后端的某个handle去处理,好比心跳handle ,执行业务请求的 handle等。

Netty Server中能够看到上述两个线程池是如何初始化的:

首选是open方法,能够看到一个boss一个worker线程池。

protected void doOpen() throws Throwable {
        NettyHelper.setNettyLoggerFactory();
        ExecutorService boss = Executors.newCachedThreadPool(new NamedThreadFactory("NettyServerBoss", true));
        ExecutorService worker = Executors.newCachedThreadPool(new NamedThreadFactory("NettyServerWorker", true));
        ChannelFactory channelFactory = new NioServerSocketChannelFactory(boss, worker, getUrl().getPositiveParameter(Constants.IO_THREADS_KEY, Constants.DEFAULT_IO_THREADS));
        bootstrap = new ServerBootstrap(channelFactory);
        // ......
}

再看ChannelFactory的构造函数:

public NioServerSocketChannelFactory(Executor bossExecutor, Executor workerExecutor, int workerCount) {
        this(bossExecutor, 1, workerExecutor, workerCount);
    }

能够看出,boss线程池的大小为1,worker线程池的大小也是能够配置的,默认大小是当前系统的核心数+1,也称为IO线程。

busines(业务线程池)

为何会有业务线程池,这里很少解释,能够参考我上面的文章。

缺省是采用固定大小的线程池,dubbo提供了三种不一样类型的线程池供用户选择。咱们看看这个类:AllChannelHandler,它是其中一种handle,处理全部请求,它的一个做用就是调用业务线程池去执行业务代码,其中有获取线程池的方法:

private ExecutorService getExecutorService() {
        ExecutorService cexecutor = executor;
        if (cexecutor == null || cexecutor.isShutdown()) { 
            cexecutor = SHARED_EXECUTOR;
        }
        return cexecutor;
    }

上面代码中的变量executor来自于AllChannelHandler的父类WrappedChannelHandler,看下它的构造函数:

public WrappedChannelHandler(ChannelHandler handler, URL url) {
       //......
        executor = (ExecutorService) ExtensionLoader.getExtensionLoader(ThreadPool.class).getAdaptiveExtension().getExecutor(url);

        //......
}

获取线程池来自于SPI技术,从代码中能够看出线程池的缺省配置就是上面提到的固定大小线程池。

@SPI("fixed")
public interface ThreadPool {
    
    /**
     * 线程池
     * 
     * @param url 线程参数
     * @return 线程池
     */
    @Adaptive({Constants.THREADPOOL_KEY})
    Executor getExecutor(URL url);

}

最后看下是如何将请求丢给线程池去执行的,在AllChannelHandler中有这样的方法:

public void received(Channel channel, Object message) throws RemotingException {
        ExecutorService cexecutor = getExecutorService();
        try {
            cexecutor.execute(new ChannelEventRunnable(channel, handler, ChannelState.RECEIVED, message));
        } catch (Throwable t) {
            throw new ExecutionException(message, channel, getClass() + " error when process received event .", t);
        }
    }

典型问题:拒绝服务

若是上面提到的dubbo线程池模型理解了,那么也就容易理解一个问题,当前端大量请求并发出现时,颇有能够将业务线程池中的线程消费完,由于默认缺省的线程池是固定大小(我如今版本缺省线程池大小为200),此时会出现服务没法按预期响应的结果,固然因为是固定大小的线程池,当核心线程滿了后也有队列可排,但默认是不排队的,须要排队须要单独配置,咱们能够从线程池的具体实现中看:

public class FixedThreadPool implements ThreadPool {

    public Executor getExecutor(URL url) {
        String name = url.getParameter(Constants.THREAD_NAME_KEY, Constants.DEFAULT_THREAD_NAME);
        int threads = url.getParameter(Constants.THREADS_KEY, Constants.DEFAULT_THREADS);
        int queues = url.getParameter(Constants.QUEUES_KEY, Constants.DEFAULT_QUEUES);
        return new ThreadPoolExecutor(threads, threads, 0, TimeUnit.MILLISECONDS, 
                queues == 0 ? new SynchronousQueue<Runnable>() : 
                    (queues < 0 ? new LinkedBlockingQueue<Runnable>() 
                            : new LinkedBlockingQueue<Runnable>(queues)),
                new NamedThreadFactory(name, true), new AbortPolicyWithReport(name, url));
    }

}

上面代码的结论是:

  • 默认线程池大小为200(不一样的dubbo版本可能此值不一样)
  • 默认线程池不排队,若是须要排队,须要指定队列的大小

当业务线程用完后,服务端会报以下的错误:

Caused by: java.util.concurrent.RejectedExecutionException: Thread pool is EXHAUSTED! Thread Name: DubboServerHandler-192.168.10.222:9999, Pool Size: 1 (active: 1, core: 1, max: 1, largest: 1), Task: 8 (completed: 7), Executor status:(isShutdown:false, isTerminated:false, isTerminating:false), in dubbo://192.168.10.222:9999!
    at com.alibaba.dubbo.common.threadpool.support.AbortPolicyWithReport.rejectedExecution(AbortPolicyWithReport.java:53) ~[dubbo-2.5.3.jar:2.5.3]
    at java.util.concurrent.ThreadPoolExecutor.reject(ThreadPoolExecutor.java:823) [na:1.8.0_121]
    at java.util.concurrent.ThreadPoolExecutor.execute(ThreadPoolExecutor.java:1369) [na:1.8.0_121]
    at com.alibaba.dubbo.remoting.transport.dispatcher.all.AllChannelHandler.caught(AllChannelHandler.java:65) ~[dubbo-2.5.3.jar:2.5.3]
    ... 17 common frames omitted

经过上面的分析,对调用的服务设置超时时间,是为了不由于某种缘由致使线程被长时间占用,最终出现线程池用完返回拒绝服务的异常。

超时与服务降级

按咱们文章以前的场景,web api 请求产品明细时调用product service,为了查询产品评论product service调用comment service。若是此时因为comment service异常,响应时间增大到10S(远大于上游服务设置的超时时间),会发生超时异常,进而致使整个获取产品明细的接口异常,这也就是日常说的强依赖。这类强依赖是超时不能解决的,解决方案通常是两种:

  • 调用comment service时作异常捕获,返回空值或者返回具体的错误码,消费端根据不一样的错误码作不一样的处理。
  • 调用coment service作服务降级,好比发生异常时返回一个mock的数据,dubbo默认支持mock。

只有经过作异常捕获或者服务降级才能确保某些不重要的依赖出问题时不影响主服务的稳定性。而超时就能够与服务降级结合起来,当消费端发生超时时自动触发服务降级, 这样即便咱们的评论服务一直慢,但不影响获取产品明细的主体功能,只不过会牺牲部分体验,用户看到的评论不是真实的,但评论相对是个边缘功能,相比看不到产品信息要轻的多,某种程度上是能够舍弃的。

 

 

 

Dubbo超时机制致使的雪崩链接

BUG做者: 许晓

Bug 标题: Dubbo超时机制致使的雪崩链接

Bug 影响: Dubbo 服务提供者出现没法获取 Dubbo 服务处理线程异常,后端 DB 爆出拿不到数据库链接池,致使前端响应时间异常飙高,系统处理能力降低,核心基础服务没法提供正常服务。

Bug 发现过程:

线 上,对于高并发的服务化接口应用,时常会出现Dubbo链接池爆满状况,一般,咱们理所应当的认为,这是客户端并发链接太高所致,一方面调整链接池大小, 一方面考虑去增长服务接口的机器,固然也会考虑去优化服务接口的应用。很天然的,当咱们在线上压测一个营销页面(为大促服务,具有高并发)时,咱们遇到了 这种状况。而经过不断的深刻研究,我发现了一个特别的状况。

场景描述:

alt

压力从Jmeter压至前端web应用marketingfront,场景是批量获取30个产品的信息。wsproductreadserver有一个批量接口,会循环从tair中获取产品信息,若缓存不存在,则命中db。

压测后有两个现象:

1) Dubbo的服务端爆出大量链接拿不到的异常,还伴随着没法获取数据库链接池的状况

alt

2) Dubbo Consumer端有大量的Dubbo超时和重试的异常,且重试3次后,均失败。

3) Dubbo Consumer端的最大并发时91个

alt

Dubbo Provider端的最大并发倒是600个,而服务端配置的dubbo最大线程数即为600。alt

这个时候,出于性能测试的警觉性,发现这两个并发数极为不妥。

按照正常的请求模式,DubboConsumer和DubboProvider展现出来的并发应该是一致的。此处为什么会出现服务端的并发数被放大6倍,甚至有可能不止6倍,由于服务端的dubbo链接数限制就是600。

此处开始发挥性能测试各类大胆猜测:

1)是不是由于服务端再dubboServerHandle处理请求时,开启了多线程,而这块儿的多线程会累计到Dubbo的链接上,dragoon采集的这个数据能够真实的反应目前应用活动的线程对系统的压力状况;

2)压测环境不纯洁?个人小伙伴们在偷偷和我一块儿压测?(这个被我生生排除了,性能测试基本环境仍是要保持独立性)

3)是不是由于超时所致?这里超时会重试3次,那么顺其天然的想,并发有可能最多会被放大到3倍,3*91=273<<600....仍是不止3倍?

有了猜测,就得当心求证!

首先经过和dubbo开发人员 【草谷】分析,Dubbo链接数爆满的缘由,猜测1被否决,Dubbo服务端链接池是计数DubboServerHandle个数的业务是否采用多线程无关。

经过在压测时,Dump provider端的线程数,也证实了这个。

alt

那么,可能仍是和超时有很大关系。

再观察wsproductreadserver接口的处理时间分布状况:

alt

从 RT 的分布来看 。基本上 78.5% 的响应时间是超过 1s 的。那么这个接口方法的 dubbo 超时时间是 500ms ,此时 dubbo 的重试机制会带来怎样的 雪崩效应 呢?

alt

若是按照上图,虽然客户端只有1个并发在作操做,可是因为服务端执行十分耗时,每一个请求的执行RT远远超过了超时时间500ms,此时服务端的最大并发会有多少呢?

和服务端处理的响应时间有特比特别大的关系。服务端处理时间变长,可是若是超时,客户端的阻塞时间却只有可怜的500ms,超过500ms,新一轮压力又将发起。

上图可直接看到的并发是8个,若是服务端RT再长些,那么并发可能还会再大些!

这也是为何从marketingfront consumer的dragoon监控来看,只有90个并发。可是到服务端,却致使dubbo链接池爆掉的直接缘由。

查看了wsproductreadserver的堆栈,600个dubboServerHandle大部分都在作数据库的读取和数据库链接获取以及tair的操做。

alt

因此,为何Dubbo服务端的链接池会爆掉?颇有可能就是由于你的服务接口,在高并发下的大部分RT分布已经超过了你的Dubbo设置的超时时间!这将直接致使Dubbo的重试机制会不断放大你的服务端请求并发。

所 以若是,你在线上曾经遇到过相似场景,您能够采起去除Dubbo的重试机器,而且合理的设置Dubbo的超时时间。目前国际站的服务中心,已经开始去除 Dubbo的重试机制。固然Dubbo的重试机制实际上是很是好的QOS保证,它的路由机制,是会帮你把超时的请求路由到其余机器上,而不是本机尝试,因此 dubbo的重试机器也能必定程度的保证服务的质量。可是请必定要综合线上的访问状况,给出综合的评估。

------------ 等等等,别着急,咱们彷佛又忽略了一些细节,元芳,你怎么看? ------------------------

咱们从新回顾刚才的业务流程架构,wsproductReadserver层有DB和tair两级存储。那么对于一样接口为何服务化的接口RT如此之差,按照前面提到的架构,包含tair缓存,怎么还会有数据库链接获取不到的状况?

接续深刻追踪,将问题暴露和开发讨论,他们拿出tair

能够看到,客户端提交批量查询30个产品的产品信息。在服务端,有一个缓存模块,缓存的key是产品的ID。当产品命中tair时,则直接返回,若不命中,那么回去db中取数,再放入缓存中。

这里能够发现一个潜在的性能问题:

客 户端提交30个产品的查询请求,而服务端,则经过for循环和tair交互,因此这个接口在一般状况下的性能估计也得超过60-100ms。若是不是30 个产品,而是50或者100,那么这个接口的性能将会衰减的很是厉害!(这纯属性能测试的yy,固然这个暂时还不是咱们本次关注的主要缘由)

那么如此的架构,请求打在db上的可能性是比较小的, 由缓存命中率来保证。从线上真实的监控数据来看,tair的命中率在70%,应该说还不错,为何在咱们的压测场景,DB的压力确是如此凶残,甚至致使db的链接池没法获取呢?

因此性能验证场景就呼之欲出了:

场景: 准备30个产品ID,保持不变,这样最多只会第一次会去访问DB,并将数据存入缓存,后面将会直接命中缓存,db就在后面喝喝茶好了!

alt

可是从测试结果来看,有两点能够观察到:

1)

2)

3)

因而开始检查这30个产品到底有哪几个没有存入缓存。

通 过开发Debug预发布环境代码,最终发现,这两个产品居然已经被用户移到垃圾箱了。而经过和李浩和跃波沟通SellerCoponList的业务来 看,DA推送过来的产品是存在被用户移除的可能性。于是,每次这两个数据的查询,因为数据库查询不到记录,tair也没有存储相关记录,致使这些查询都将 通过数据库。数据库压力缘由也找到了。

可是问题尚未结束,这彷佛只像是冰山表面,咱们但愿可以鸟瞰整个冰山!

细细品味这个问题的最终性能表象  这是一种变向击穿缓存的作法啊!也就是具有必定的通用性。若是接口始终传入数据库和缓存都不可能存在的数据,那么每次的访问都就落到db上,致使缓存变相击穿,这个现象颇有意思!

目前有一种解决方案,就是Null Object Pattern,将数据库不存在的记录也记录到缓存中,可是value为NULL,使得缓存能够有效的拦截。因为数据的超时时间是10min,因此若是数据有所改动,也能够接受。

我相信这只是一种方案,可能还会有其余方案,可是这种变向的缓存击穿却让我很兴奋。回过头来,若是让我本身去实现这样的缓存机制,数据库和缓存都不存在的 数据场景很容易被忽略,而且这个对于业务确实也不会有影响。在线上存在大量热点数据状况下,这样的机制,每每并不会暴露性能问题。巧合的是,特定的场景, 性能却会出现很大的误差,这考验的既是性能测试工程师的功力,也考验的是架构的功力!

Bug 解决办法:

其实这过程当中不只仅有一些方法论,也有一些是性能测试经验的功底,更重要的是产出了一些通用性的性能问题解决方案,以及部分参数和技术方案的设计对系统架构的影响。

1)对于核心的服务中心,去除dubbo超时重试机制,并从新评估设置超时时间。

2)对于存在tair或者其余中间件缓存产品,对NULL数据进行缓存,防止出现缓存的变相击穿问题

 

 

 

Dubbo超时和重连机制

dubbo启动时默认有重试机制和超时机制。
超时机制的规则是若是在必定的时间内,provider没有返回,则认为本次调用失败,
重试机制在出现调用失败时,会再次调用。若是在配置的调用次数内都失败,则认为这次请求异常,抛出异常。

若是出现超时,一般是业务处理太慢,可在服务提供方执行:jstack PID > jstack.log 分析线程都卡在哪一个方法调用上,这里就是慢的缘由。
若是不能调优性能,请将timeout设大。

某些业务场景下,若是不注意配置超时和重试,可能会引发一些异常。

1.超时设置

DUBBO消费端设置超时时间须要根据业务实际状况来设定,
若是设置的时间过短,一些复杂业务须要很长时间完成,致使在设定的超时时间内没法完成正常的业务处理。
这样消费端达到超时时间,那么dubbo会进行重试机制,不合理的重试在一些特殊的业务场景下可能会引起不少问题,须要合理设置接口超时时间。
好比发送邮件,可能就会发出多份重复邮件,执行注册请求时,就会插入多条重复的注册数据。

(1)合理配置超时和重连的思路

1.对于核心的服务中心,去除dubbo超时重试机制,并从新评估设置超时时间。
2.业务处理代码必须放在服务端,客户端只作参数验证和服务调用,不涉及业务流程处理

(2)Dubbo超时和重连配置示例

1
2
<!-- 服务调用超时设置为5秒,超时不重试--> 
< dubbo:service  interface="com.provider.service.DemoService" ref="demoService"  retries="0" timeout="5000"/>

2.重连机制

dubbo在调用服务不成功时,默认会重试2次。
Dubbo的路由机制,会把超时的请求路由到其余机器上,而不是本机尝试,因此 dubbo的重试机制也能必定程度的保证服务的质量。
可是若是不合理的配置重试次数,当失败时会进行重试屡次,这样在某个时间点出现性能问题,调用方再连续重复调用,
系统请求变为正常值的retries倍,系统压力会大增,容易引发服务雪崩,须要根据业务状况规划好如何进行异常处理,什么时候进行重试。

 

 

 

 

 

浅谈dubbo的ExceptionFilter异常处理

背景

咱们的项目使用了dubbo进行不一样系统之间的调用。
每一个项目都有一个全局的异常处理,对于业务异常,咱们会抛出自定义的业务异常(继承RuntimeException)。
全局的异常处理会根据不一样的异常类型进行不一样的处理。
最近咱们发现,某个系统调用dubbo请求,provider端(服务提供方)抛出了自定义的业务异常,但consumer端(服务消费方)拿到的并非自定义的业务异常。
这是为何呢?还须要从dubbo的ExceptionFilter提及。

ExceptionFilter

若是Dubbo的 provider端 抛出异常(Throwable),则会被 provider端 的ExceptionFilter拦截到,执行如下invoke方法:
[java]  view plain  copy
 
  1. /* 
  2.  * Copyright 1999-2011 Alibaba Group. 
  3.  *   
  4.  * Licensed under the Apache License, Version 2.0 (the "License"); 
  5.  * you may not use this file except in compliance with the License. 
  6.  * You may obtain a copy of the License at 
  7.  *   
  8.  *      http://www.apache.org/licenses/LICENSE-2.0 
  9.  *   
  10.  * Unless required by applicable law or agreed to in writing, software 
  11.  * distributed under the License is distributed on an "AS IS" BASIS, 
  12.  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
  13.  * See the License for the specific language governing permissions and 
  14.  * limitations under the License. 
  15.  */  
  16. package com.alibaba.dubbo.rpc.filter;  
  17.   
  18. import java.lang.reflect.Method;  
  19.   
  20. import com.alibaba.dubbo.common.Constants;  
  21. import com.alibaba.dubbo.common.extension.Activate;  
  22. import com.alibaba.dubbo.common.logger.Logger;  
  23. import com.alibaba.dubbo.common.logger.LoggerFactory;  
  24. import com.alibaba.dubbo.common.utils.ReflectUtils;  
  25. import com.alibaba.dubbo.common.utils.StringUtils;  
  26. import com.alibaba.dubbo.rpc.Filter;  
  27. import com.alibaba.dubbo.rpc.Invocation;  
  28. import com.alibaba.dubbo.rpc.Invoker;  
  29. import com.alibaba.dubbo.rpc.Result;  
  30. import com.alibaba.dubbo.rpc.RpcContext;  
  31. import com.alibaba.dubbo.rpc.RpcException;  
  32. import com.alibaba.dubbo.rpc.RpcResult;  
  33. import com.alibaba.dubbo.rpc.service.GenericService;  
  34.   
  35. /** 
  36.  * ExceptionInvokerFilter 
  37.  * <p> 
  38.  * 功能: 
  39.  * <ol> 
  40.  * <li>不指望的异常打ERROR日志(Provider端)<br> 
  41.  *     不指望的日志便是,没有的接口上声明的Unchecked异常。 
  42.  * <li>异常不在API包中,则Wrap一层RuntimeException。<br> 
  43.  *     RPC对于第一层异常会直接序列化传输(Cause异常会String化),避免异常在Client出不能反序列化问题。 
  44.  * </ol> 
  45.  *  
  46.  * @author william.liangf 
  47.  * @author ding.lid 
  48.  */  
  49. @Activate(group = Constants.PROVIDER)  
  50. public class ExceptionFilter implements Filter {  
  51.   
  52.     private final Logger logger;  
  53.       
  54.     public ExceptionFilter() {  
  55.         this(LoggerFactory.getLogger(ExceptionFilter.class));  
  56.     }  
  57.       
  58.     public ExceptionFilter(Logger logger) {  
  59.         this.logger = logger;  
  60.     }  
  61.       
  62.     public Result invoke(Invoker<?> invoker, Invocation invocation) throws RpcException {  
  63.         try {  
  64.             Result result = invoker.invoke(invocation);  
  65.             if (result.hasException() && GenericService.class != invoker.getInterface()) {  
  66.                 try {  
  67.                     Throwable exception = result.getException();  
  68.   
  69.                     // 若是是checked异常,直接抛出  
  70.                     if (! (exception instanceof RuntimeException) && (exception instanceof Exception)) {  
  71.                         return result;  
  72.                     }  
  73.                     // 在方法签名上有声明,直接抛出  
  74.                     try {  
  75.                         Method method = invoker.getInterface().getMethod(invocation.getMethodName(), invocation.getParameterTypes());  
  76.                         Class<?>[] exceptionClassses = method.getExceptionTypes();  
  77.                         for (Class<?> exceptionClass : exceptionClassses) {  
  78.                             if (exception.getClass().equals(exceptionClass)) {  
  79.                                 return result;  
  80.                             }  
  81.                         }  
  82.                     } catch (NoSuchMethodException e) {  
  83.                         return result;  
  84.                     }  
  85.   
  86.                     // 未在方法签名上定义的异常,在服务器端打印ERROR日志  
  87.                     logger.error("Got unchecked and undeclared exception which called by " + RpcContext.getContext().getRemoteHost()  
  88.                             + ". service: " + invoker.getInterface().getName() + ", method: " + invocation.getMethodName()  
  89.                             + ", exception: " + exception.getClass().getName() + ": " + exception.getMessage(), exception);  
  90.   
  91.                     // 异常类和接口类在同一jar包里,直接抛出  
  92.                     String serviceFile = ReflectUtils.getCodeBase(invoker.getInterface());  
  93.                     String exceptionFile = ReflectUtils.getCodeBase(exception.getClass());  
  94.                     if (serviceFile == null || exceptionFile == null || serviceFile.equals(exceptionFile)){  
  95.                         return result;  
  96.                     }  
  97.                     // 是JDK自带的异常,直接抛出  
  98.                     String className = exception.getClass().getName();  
  99.                     if (className.startsWith("java.") || className.startsWith("javax.")) {  
  100.                         return result;  
  101.                     }  
  102.                     // 是Dubbo自己的异常,直接抛出  
  103.                     if (exception instanceof RpcException) {  
  104.                         return result;  
  105.                     }  
  106.   
  107.                     // 不然,包装成RuntimeException抛给客户端  
  108.                     return new RpcResult(new RuntimeException(StringUtils.toString(exception)));  
  109.                 } catch (Throwable e) {  
  110.                     logger.warn("Fail to ExceptionFilter when called by " + RpcContext.getContext().getRemoteHost()  
  111.                             + ". service: " + invoker.getInterface().getName() + ", method: " + invocation.getMethodName()  
  112.                             + ", exception: " + e.getClass().getName() + ": " + e.getMessage(), e);  
  113.                     return result;  
  114.                 }  
  115.             }  
  116.             return result;  
  117.         } catch (RuntimeException e) {  
  118.             logger.error("Got unchecked and undeclared exception which called by " + RpcContext.getContext().getRemoteHost()  
  119.                     + ". service: " + invoker.getInterface().getName() + ", method: " + invocation.getMethodName()  
  120.                     + ", exception: " + e.getClass().getName() + ": " + e.getMessage(), e);  
  121.             throw e;  
  122.         }  
  123.     }  
  124.   
  125. }  

代码分析

按逻辑顺序进行分析,知足其中一个即返回,再也不继续执行判断。

逻辑0

[java]  view plain  copy
 
  1. if (result.hasException() && GenericService.class != invoker.getInterface()) {  
  2.     //...  
  3. }  
  4. return result;  
调用结果有异常且未实现GenericService接口,进入后续判断逻辑,不然直接返回结果。
[java]  view plain  copy
 
  1. /** 
  2.  * 通用服务接口 
  3.  *  
  4.  * @author william.liangf 
  5.  * @export 
  6.  */  
  7. public interface GenericService {  
  8.   
  9.     /** 
  10.      * 泛化调用 
  11.      *  
  12.      * @param method 方法名,如:findPerson,若是有重载方法,需带上参数列表,如:findPerson(java.lang.String) 
  13.      * @param parameterTypes 参数类型 
  14.      * @param args 参数列表 
  15.      * @return 返回值 
  16.      * @throws Throwable 方法抛出的异常 
  17.      */  
  18.     Object $invoke(String method, String[] parameterTypes, Object[] args) throws GenericException;  
  19.   
  20. }  
泛接口实现方式主要用于服务器端没有API接口及模型类元的状况,参数及返回值中的全部POJO均用Map表示,一般用于框架集成,好比:实现一个通用的远程服务Mock框架,可经过实现GenericService接口处理全部服务请求。
不适用于此场景,不在此处探讨。
 

逻辑1

[java]  view plain  copy
 
  1. // 若是是checked异常,直接抛出  
  2. if (! (exception instanceof RuntimeException) && (exception instanceof Exception)) {  
  3.     return result;  
  4. }  
不是RuntimeException类型的异常,而且是受检异常(继承Exception),直接抛出。
provider端想抛出受检异常,必须在api上明确写明抛出受检异常;consumer端若是要处理受检异常,也必须使用明确写明抛出受检异常的api。
provider端api新增 自定义的 受检异常, 全部的 consumer端api都必须升级,同时修改代码,不然没法处理这个特定异常。

consumer端DecodeableRpcResult的decode方法会对异常进行处理


此处会抛出IOException,上层catch后会作toString处理,放到mErrorMsg属性中:
[java]  view plain  copy
 
  1. try {  
  2.     decode(channel, inputStream);  
  3. catch (Throwable e) {  
  4.     if (log.isWarnEnabled()) {  
  5.         log.warn("Decode rpc result failed: " + e.getMessage(), e);  
  6.     }  
  7.     response.setStatus(Response.CLIENT_ERROR);  
  8.     response.setErrorMessage(StringUtils.toString(e));  
  9. finally {  
  10.     hasDecoded = true;  
  11. }  

DefaultFuture判断请求返回的结果,最后抛出RemotingException:
[java]  view plain  copy
 
  1. private Object returnFromResponse() throws RemotingException {  
  2.     Response res = response;  
  3.     if (res == null) {  
  4.         throw new IllegalStateException("response cannot be null");  
  5.     }  
  6.     if (res.getStatus() == Response.OK) {  
  7.         return res.getResult();  
  8.     }  
  9.     if (res.getStatus() == Response.CLIENT_TIMEOUT || res.getStatus() == Response.SERVER_TIMEOUT) {  
  10.         throw new TimeoutException(res.getStatus() == Response.SERVER_TIMEOUT, channel, res.getErrorMessage());  
  11.     }  
  12.     throw new RemotingException(channel, res.getErrorMessage());  
  13. }  

DubboInvoker捕获RemotingException,抛出RpcException:
[java]  view plain  copy
 
  1. try {  
  2.     boolean isAsync = RpcUtils.isAsync(getUrl(), invocation);  
  3.     boolean isOneway = RpcUtils.isOneway(getUrl(), invocation);  
  4.     int timeout = getUrl().getMethodParameter(methodName, Constants.TIMEOUT_KEY,Constants.DEFAULT_TIMEOUT);  
  5.     if (isOneway) {  
  6.         boolean isSent = getUrl().getMethodParameter(methodName, Constants.SENT_KEY, false);  
  7.         currentClient.send(inv, isSent);  
  8.         RpcContext.getContext().setFuture(null);  
  9.         return new RpcResult();  
  10.     } else if (isAsync) {  
  11.         ResponseFuture future = currentClient.request(inv, timeout) ;  
  12.         RpcContext.getContext().setFuture(new FutureAdapter<Object>(future));  
  13.         return new RpcResult();  
  14.     } else {  
  15.         RpcContext.getContext().setFuture(null);  
  16.         return (Result) currentClient.request(inv, timeout).get();  
  17.     }  
  18. catch (TimeoutException e) {  
  19.     throw new RpcException(RpcException.TIMEOUT_EXCEPTION, "Invoke remote method timeout. method: " + invocation.getMethodName() + ", provider: " + getUrl() + ", cause: " + e.getMessage(), e);  
  20. catch (RemotingException e) {  
  21.     throw new RpcException(RpcException.NETWORK_EXCEPTION, "Failed to invoke remote method: " + invocation.getMethodName() + ", provider: " + getUrl() + ", cause: " + e.getMessage(), e);  
  22. }  

调用栈:
FailOverClusterInvoker.doInvoke -...-> DubboInvoker.doInvoke -> ReferenceCountExchangeClient.request -> HeaderExchangeClient.request -> HeaderExchangeChannel.request -> AbstractPeer.send -> NettyChannel.send -> AbstractChannel.write -> Channels.write --back_to--> DubboInvoker.doInvoke -> DefaultFuture.get -> DefaultFuture.returnFromResponse -> throw new RemotingException
 
异常示例:
[java]  view plain  copy
 
  1. com.alibaba.dubbo.rpc.RpcException: Failed to invoke the method triggerCheckedException in the service com.xxx.api.DemoService. Tried 1 times of the providers [192.168.1.101:20880] (1/1) from the registry 127.0.0.1:2181 on the consumer 192.168.1.101 using the dubbo version 3.1.9. Last error is: Failed to invoke remote method: triggerCheckedException, provider: dubbo://192.168.1.101:20880/com.xxx.api.DemoService?xxx, cause: java.io.IOException: Response data error, expect Throwable, but get {cause=(this Map), detailMessage=null, suppressedExceptions=[], stackTrace=[Ljava.lang.StackTraceElement;@23b84919}  
  2. java.io.IOException: Response data error, expect Throwable, but get {cause=(this Map), detailMessage=null, suppressedExceptions=[], stackTrace=[Ljava.lang.StackTraceElement;@23b84919}  
  3.     at com.alibaba.dubbo.rpc.protocol.dubbo.DecodeableRpcResult.decode(DecodeableRpcResult.java:94)  

 

逻辑2

[java]  view plain  copy
 
  1. // 在方法签名上有声明,直接抛出  
  2. try {  
  3.     Method method = invoker.getInterface().getMethod(invocation.getMethodName(), invocation.getParameterTypes());  
  4.     Class<?>[] exceptionClassses = method.getExceptionTypes();  
  5.     for (Class<?> exceptionClass : exceptionClassses) {  
  6.         if (exception.getClass().equals(exceptionClass)) {  
  7.             return result;  
  8.         }  
  9.     }  
  10. catch (NoSuchMethodException e) {  
  11.     return result;  
  12. }  
若是在provider端的api明确写明抛出运行时异常,则会直接被抛出。
 
若是抛出了这种异常,可是consumer端又没有这种异常,会发生什么呢?
答案是和上面同样,抛出RpcException。

所以若是consumer端不care这种异常,则不须要任何处理;
consumer端有这种异常(路径要彻底一致,包名+类名),则不须要任何处理;
没有这种异常,又想进行处理,则须要引入这个异常进行处理(方法有多种,好比升级api,或引入/升级异常所在的包)。

逻辑3

[java]  view plain  copy
 
  1. // 异常类和接口类在同一jar包里,直接抛出  
  2. String serviceFile = ReflectUtils.getCodeBase(invoker.getInterface());  
  3. String exceptionFile = ReflectUtils.getCodeBase(exception.getClass());  
  4. if (serviceFile == null || exceptionFile == null || serviceFile.equals(exceptionFile)){  
  5.     return result;  
  6. }  
若是异常类和接口类在同一个jar包中,直接抛出。
 

逻辑4

[java]  view plain  copy
 
  1. // 是JDK自带的异常,直接抛出  
  2. String className = exception.getClass().getName();  
  3. if (className.startsWith("java.") || className.startsWith("javax.")) {  
  4.     return result;  
  5. }  
以java.或javax.开头的异常直接抛出。
 

逻辑5

[java]  view plain  copy
 
  1. // 是Dubbo自己的异常,直接抛出  
  2. if (exception instanceof RpcException) {  
  3.     return result;  
  4. }  
dubbo自身的异常,直接抛出。
 

逻辑6

[java]  view plain  copy
 
  1. // 不然,包装成RuntimeException抛给客户端  
  2. return new RpcResult(new RuntimeException(StringUtils.toString(exception)));  
不知足上述条件,会作toString处理并被封装成RuntimeException抛出。
 

核心思想

尽力避免反序列化时失败(只有在jdk版本或api版本不一致时才可能发生)。

如何正确捕获业务异常

了解了ExceptionFilter,解决上面提到的问题就很简单了。
有多种方法能够解决这个问题,每种都有优缺点,这里不作详细分析,仅列出供参考:
1. 将该异常的包名以"java.或者"javax. " 开头
2. 使用受检异常(继承Exception)
3. 不用异常,使用错误码
4. 把异常放到provider-api的jar包中
5. 判断异常message是否以XxxException.class.getName()开头(其中XxxException是自定义的业务异常)
6. provider实现GenericService接口
7. provider的api明确写明throws XxxException,发布provider(其中XxxException是自定义的业务异常)
8. 实现dubbo的filter,自定义provider的异常处理逻辑(方法可参考以前的文章 给dubbo接口添加白名单——dubbo Filter的使用
 

 给dubbo接口添加白名单——dubbo Filter的使用具体内容以下:

在开发中,有时候须要限制访问的权限,白名单就是一种方法。对于Java Web应用,Spring的拦截器能够拦截Web接口的调用;而对于dubbo接口,Spring的拦截器就无论用了。

dubbo提供了Filter扩展,能够经过自定义Filter来实现这个功能。本文经过一个事例来演示如何实现dubbo接口的IP白名单。

 

  1. 扩展Filter
    实现com.alibaba.dubbo.rpc.Filter接口:
    [java]  view plain  copy
     
    1. public class AuthorityFilter implements Filter {  
    2.     private static final Logger LOGGER = LoggerFactory.getLogger(AuthorityFilter.class);  
    3.   
    4.     private IpWhiteList ipWhiteList;  
    5.   
    6.     //dubbo经过setter方式自动注入  
    7.     public void setIpWhiteList(IpWhiteList ipWhiteList) {  
    8.         this.ipWhiteList = ipWhiteList;  
    9.     }  
    10.   
    11.     @Override  
    12.     public Result invoke(Invoker<?> invoker, Invocation invocation) throws RpcException {  
    13.         if (!ipWhiteList.isEnabled()) {  
    14.             LOGGER.debug("白名单禁用");  
    15.             return invoker.invoke(invocation);  
    16.         }  
    17.   
    18.         String clientIp = RpcContext.getContext().getRemoteHost();  
    19.         LOGGER.debug("访问ip为{}", clientIp);  
    20.         List<String> allowedIps = ipWhiteList.getAllowedIps();  
    21.         if (allowedIps.contains(clientIp)) {  
    22.             return invoker.invoke(invocation);  
    23.         } else {  
    24.             return new RpcResult();  
    25.         }  
    26.     }  
    27. }  
    注意:只能经过setter方式来注入其余的bean,且不要标注注解!
    dubbo本身会对这些bean进行注入,不须要再标注@Resource让Spring注入,参见扩展点加载
  2. 配置文件
    参考:调用拦截扩展
    在resources目录下添加纯文本文件META-INF/dubbo/com.alibaba.dubbo.rpc.Filter,内容以下:
    [plain]  view plain  copy
     
    1. xxxFilter=com.xxx.AuthorityFilter  
    修改dubbo的provider配置文件,在dubbo:provider中添加配置的filter,以下:
    [html]  view plain  copy
     
    1. <dubbo:provider filter="xxxFilter" />  

这样就能够实现dubbo接口的IP白名单功能了。