上篇解释了Dubbo源码中降级及容错处理
Dubbo服务调用——Cluster组件(服务降级,容错)java
这篇文章主要是关于Dubbo源码中的限流组件,Dubbo限流除了限流(并发限制)的入口ThreadPool 以外,还有更细粒度的限流功能。首先先记录客户端限流组价ActiveLimitFilter 的限流原理。并发
经过它的名字,咱们知道它是Dubbo中的Filter过滤器,接下来它的Activate注解信息以下:ide
@Activate(group = Constants.CONSUMER, value = Constants.ACTIVES_KEY)
所以它是用于消费方的Filter组件的扩展 , 源码以下:url
@Activate(group = Constants.CONSUMER, value = Constants.ACTIVES_KEY) public class ActiveLimitFilter implements Filter { public Result invoke(Invoker<?> invoker, Invocation invocation) throws RpcException { URL url = invoker.getUrl(); String methodName = invocation.getMethodName(); //获取设置的acvites的值,默认为0 int max = invoker.getUrl().getMethodParameter(methodName, Constants.ACTIVES_KEY, 0); // 获取当前方法目前并发请求数量 RpcStatus count = RpcStatus.getStatus(invoker.getUrl(), invocation.getMethodName()); if (max > 0) { //说明设置了actives变量 long timeout = invoker.getUrl().getMethodParameter(invocation.getMethodName(), Constants.TIMEOUT_KEY, 0); long start = System.currentTimeMillis(); long remain = timeout; int active = count.getActive(); if (active >= max) { synchronized (count) { while ((active = count.getActive()) >= max) { try { count.wait(remain); } catch (InterruptedException e) { } // 等待超时则抛异常 long elapsed = System.currentTimeMillis() - start; remain = timeout - elapsed; if (remain <= 0) { throw new RpcException("Waiting concurrent invoke timeout in client-side for service: " + invoker.getInterface().getName() + ", method: " + invocation.getMethodName() + ", elapsed: " + elapsed + ", timeout: " + timeout + ". concurrent invokes: " + active + ". max concurrent invoke limit: " + max); } } } } } // 调用业务 try { long begin = System.currentTimeMillis(); RpcStatus.beginCount(url, methodName); try { Result result = invoker.invoke(invocation); RpcStatus.endCount(url, methodName, System.currentTimeMillis() - begin, true); return result; } catch (RuntimeException t) { RpcStatus.endCount(url, methodName, System.currentTimeMillis() - begin, false); throw t; } } finally { if (max > 0) { // 唤醒等待线程 synchronized (count) { count.notify(); } } } } }
经过源码能够看出它的调用信息实际存储在 RpcStatus 中,为何不在类ActiveLimitFilter声明 存储的变量信息呢?spa
客户端并发控制配置方式:.net
<dubbo:reference id="demoService" interface="com.alibaba.dubbo.demo.DemoService" check="false" group="dev" version="1.0.0" timeout="3000" actives="10"/>
设置com.alibaba.dubbo.demo.DemoService接口中全部方法,每一个方法最多同时并发请求10个请求。线程
也能够使用下面方法设置接口中的单个方法的并发请求个数,以下:code
<dubbo:reference id="demoService" interface="com.alibaba.dubbo.demo.DemoService" group="dev" version="1.0.0" timeout="3000"> <dubbo:method name="sayHello" actives="10" /> </dubbo:reference>
如上设置了sayHello的并发请求数量为10 , 若是客户端请求该方法并发超过10 则会阻塞 , 当请求并发小于10时 , 该请求才会发送到请求提供方。xml