上一篇关于《Dubbo客户端并发控制——ActiveLimitFilter》 做用,设计原理,及配置方式。java
这篇是关于Dubbo服务端Filter组件扩展 ExecuteLimitFilter ,它能够限制服务端的方法级别的并发处理请求数。 当请求数超过限制时,服务端采用的是非阻塞处理,若是超出并发数量,则直接进行失败处理(这里抛RPCException异常),这里与客户端限流ActiveLimitFilter 的wait不一样的是,这里采用Semaphore 信号量的方式,而且是抢占式的(NonFairSync) , 不明白的能够看下信号量相关源码。编程
同分析ActiveLimitFilter同样,首先看它的Activate注解信息 :并发
@Activate(group = Constants.PROVIDER, value = Constants.EXECUTES_KEY)
这里能够得知它是用于服务端限流控制。ide
ActiveLimitFilter源码:ui
/** * ThreadLimitInvokerFilter * * @author william.liangf */ @Activate(group = Constants.PROVIDER, value = Constants.EXECUTES_KEY) public class ExecuteLimitFilter implements Filter { public Result invoke(Invoker<?> invoker, Invocation invocation) throws RpcException { URL url = invoker.getUrl(); String methodName = invocation.getMethodName(); Semaphore executesLimit = null; boolean acquireResult = false; //默认不设置executes时候,其值为0 int max = url.getMethodParameter(methodName, Constants.EXECUTES_KEY, 0); if (max > 0) { //max>0说明设置了executes值 RpcStatus count = RpcStatus.getStatus(url, invocation.getMethodName()); // if (count.getActive() >= max) { /** * http://manzhizhen.iteye.com/blog/2386408 * 经过信号量来作并发控制(即限制能使用的线程数量) * 2017-08-21 yizhenqiang */ executesLimit = count.getSemaphore(max); //可知若是并发处理数量大于设置的值,则直接会抛出异常 if(executesLimit != null && !(acquireResult = executesLimit.tryAcquire())) { throw new RpcException("Failed to invoke method " + invocation.getMethodName() + " in provider " + url + ", cause: The service using threads greater than <dubbo:service executes=\"" + max + "\" /> limited."); } } long begin = System.currentTimeMillis(); boolean isSuccess = true; RpcStatus.beginCount(url, methodName); try { // 进行接下来的功能/业务处理 Result result = invoker.invoke(invocation); return result; } catch (Throwable t) { isSuccess = false; if (t instanceof RuntimeException) { throw (RuntimeException) t; } else { throw new RpcException("unexpected exception when ExecuteLimitFilter", t); } } finally { RpcStatus.endCount(url, methodName, System.currentTimeMillis() - begin, isSuccess); if(acquireResult) { executesLimit.release(); } } } }
当请求并发数大于最大并发数时,则直接失败处理。this
服务提供方进行并发控制配置方式以下:url
<dubbo:service interface="com.alibaba.dubbo.demo.DemoService" ref="demoService" group="dev" version="1.0.0" timeout="3000" executes="10" />
设置com.alibaba.dubbo.demo.DemoService 接口中的全部方法,最多同时处理10个并发请求。spa
也能够经过以下方式单独对每一个方法进行并发控制:.net
<dubbo:service interface="com.alibaba.dubbo.demo.DemoService" ref="demoService" group="dev" version="1.0.0" timeout="3000" > <dubbo:method name="sayHello" executes="10"/> </dubbo:service>
这里咱们采用Semaphore来进行服务端请求的并发控制, 而不是采用 sync 同步代码块 , wait notify 方式的目的是什么呢?线程
这里Semaphore 代替 sync 其实是 cas 代替 锁 + wait notify , 虽然Semaphore中底层采用的是单线程CAS , 等待线程LockSupport.park(this); 防止全部线程同时获取令牌的CPU资源消耗。 源码参考以前写的一篇 AQS源码介绍:
《java并发编程之:ReentrantLock实现原理与深刻研究》