你们好,今天开始给你们分享 — Dubbo 专题之 Dubbo 并发控制。在前一个章节中咱们介绍了 Dubbo 负载均衡,Dubbo 为咱们提供四种负载均衡算法分别是:加权随机算法、加权轮询算法、最少活跃调用数算法、一致性 Hash 算法。同时咱们也例举了常见的使用场景而且进行了源码解析来分析其实现原理。有的小伙伴学习了负载均衡算法后可能会想:当咱们有不少的消费线程时,若是服务提供端只有少数的实例,那么会不会把咱们的服务提供端线程消费殆尽呢?或者超出了咱们的业务处理线程池最大接收请求数又会发生什么呢?带着这些疑问咱们开始本章节学习,咱们会经过介绍什么是并发?怎样控制并发?Dubbo 中是怎样来解决这些问题。下面就让咱们快速开始吧!java
首先咱们得理解什么是并发
,这里有另一个概念并行
。下面是来自百科的解释:并发和并行是即类似又有区别的两个概念,并行是指两个或者多个事件在同一时刻发生;而并发是指两个或多个事件在同一时间间隔内发生。在多道程序环境下,并发性是指在一段时间内宏观上有多个程序在同时运行,但在单处理器系统中,每一时刻却仅能有一道程序执行,故微观上这些程序只能是分时地交替执行。假若在计算机系统中有多个处理机,则这些能够并发执行的程序即可被分配到多个处理机上,实现并行执行即利用每一个处理机来处理一个可并发执行的程序,这样多个程序即可以同时执行。下面经过示例图进行说明:git
在上图中咱们能够看到单核处理在多线程并发执行任务时,同一时刻只有一个线程在执行,在 CPU 时间片切换的时候会调度到其余线程进行执行这就叫作并发。同理当在多核处理器上多个线程同时执行且在不一样 CPU 上的时,这就叫作并行执行,每个线程都在一个CPU上执行且线程间互不影响。算法
在 Dubbo 中提供了两大类配置分别是:消费端控制配置、服务提供端控制配置。spring
<dubbo:service interface="com.muke.dubbocourse.common.api.BookFacade" executes="10" />
<dubbo:service interface="com.muke.dubbocourse.common.api.BookFacade"> <dubbo:method name="queryAll" executes="10" /> </dubbo:service>
1. 限定服务的全部方法
<dubbo:service interface="com.muke.dubbocourse.common.api.BookFacade" actives="10" />
或者shell
<dubbo:service interface="com.muke.dubbocourse.common.api.BookFacade"> <dubbo:method name="queryAll" actives="10" /> </dubbo:service>
限定服务的某个方法apache
<dubbo:reference interface="com.muke.dubbocourse.common.api.BookFacade" actives="10" />
或者编程
<dubbo:reference interface="com.muke.dubbocourse.common.api.BookFacade"> <dubbo:method name="queryAll" actives="10" /> </dubbo:service>
若是 <dubbo:service>
和 <dubbo:reference>
都配了actives
,<dubbo:reference>
优先,参见:覆盖策略参考。api
咱们从上面的配置能够看出服务提供者端经过executes
配置而消费端经过actives
配置。服务器
并发控制在咱们平常的工做中通常状况咱们不多会去直接配置,咱们通常是自定义业务处理线程池大小。在 Dubbo 中当接收到服务请求后会把请求转发到业务处理线程池去处理,因此接收请求的线程 IO 瓶颈不大。我能想到的使用场景以下:微信
咱们以获取图书列表为例来进行演示。项目结构以下:
这里咱们主要更改了服务提供者的 XML 配置文件dubbo-provider-xml.xml
:
<?xml version="1.0" encoding="UTF-8"?> <beans xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:dubbo="http://dubbo.apache.org/schema/dubbo" xmlns="http://www.springframework.org/schema/beans" xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-4.3.xsd http://dubbo.apache.org/schema/dubbo http://dubbo.apache.org/schema/dubbo/dubbo.xsd"> <dubbo:application name="demo-provider" metadata-type="remote"/> <dubbo:registry address="zookeeper://127.0.0.1:2181"/> <bean id="bookFacade" class="com.muke.dubbocourse.concurrent.provider.BookFacadeImpl"/> <!--暴露本地服务为Dubbo服务 ,executes="10" 表示限制每一个方法的并发数为10--> <dubbo:service interface="com.muke.dubbocourse.common.api.BookFacade" executes="10" ref="bookFacade" /> </beans>
上面主要增长了executes="10"
配置,对服务BookFacade
全部的方法进行最大并发数限制。
下面咱们看看消费端代码:
public static void main(String[] args) throws IOException { ClassPathXmlApplicationContext context = new ClassPathXmlApplicationContext("concurrent/consumer/spring/dubbo-consumer-xml.xml"); context.start(); BookFacade bookFacade = context.getBean(BookFacade.class); //循环启动30个线程进行并发访问 for ( int i = 0; i < 30; i++) { final int index = i; //开启线程 new Thread(()->{ List<Book> books = bookFacade.queryAll(); System.out.println("The invoker "+index+" result is "+ books); }).start(); } System.in.read(); //context.close(); }
同时咱们须要设置下queryAll
方法的执行时间稍微长一些这样才能看到演示效果,正常状况下咱们会看到以下错误:
cause: The service using threads greater than <dubbo:service executes="10" /> limited.
提示得很是明显,就是说咱们的服务最大并发为设置为10。
在讲解原理以前假设这个让咱们本身来实现的话咱们该怎样实现呢?我想小伙伴可能都会想到这里控制并发数量无非就是对调用方法或服务进行一个全局的计数统计,若是达到了阀值就开始执行限制。那咱们就来看看 Dubbo 中是怎样实现。在 Dubbo 中执行这个逻辑的类是ActiveLimitFilter
其核心代码以下:
public Result invoke(Invoker<?> invoker, Invocation invocation) throws RpcException { URL url = invoker.getUrl(); //执行的方法名称 String methodName = invocation.getMethodName(); //获取配置的并发参数配置 int max = invoker.getUrl().getMethodParameter(methodName, ACTIVES_KEY, 0); final RpcStatus rpcStatus = RpcStatus.getStatus(invoker.getUrl(), invocation.getMethodName()); //开始计数 if (!RpcStatus.beginCount(url, methodName, max)) { //计数失败,获取超时时间 long timeout = invoker.getUrl().getMethodParameter(invocation.getMethodName(), TIMEOUT_KEY, 0); //记录开始时间 long start = System.currentTimeMillis(); long remain = timeout; synchronized (rpcStatus) { //再次尝试计数 while (!RpcStatus.beginCount(url, methodName, max)) { try { //计数失败 阻塞等待 等待接收到onMessage、onError方法回调释放rpcStatus的阻塞 rpcStatus.wait(remain); } catch (InterruptedException e) { // ignore } long elapsed = System.currentTimeMillis() - start; remain = timeout - elapsed; //等待超时 if (remain <= 0) { throw new RpcException(RpcException.LIMIT_EXCEEDED_EXCEPTION, "Waiting concurrent invoke timeout in client-side for service: " + invoker.getInterface().getName() + ", method: " + invocation.getMethodName() + ", elapsed: " + elapsed + ", timeout: " + timeout + ". concurrent invokes: " + rpcStatus.getActive() + ". max concurrent invoke limit: " + max); } } } } invocation.put(ACTIVELIMIT_FILTER_START_TIME, System.currentTimeMillis()); //调用服务 return invoker.invoke(invocation); }
在上面的代码中咱们能够看到在调用服务前对当前调用的方法进行计数,若是计数失败会阻塞等待指定的超时时间,计数成功则调用远程服务。
在本小节中咱们主要学习了 Dubbo 并发控制以及并发和并行的区别,同时也分析了并发控制实现的原理,其本质上是经过对调用的方法或服务进行应用级别的计数统计,当达到阀值就限制访问。
本节课程的重点以下:
我的从事金融行业,就任过易极付、思建科技、某网约车平台等重庆一流技术团队,目前就任于某银行负责统一支付系统建设。自身对金融行业有强烈的爱好。同时也实践大数据、数据存储、自动化集成和部署、分布式微服务、响应式编程、人工智能等领域。同时也热衷于技术分享创立公众号和博客站点对知识体系进行分享。关注公众号: 青年IT男 获取最新技术文章推送!
博客地址: http://youngitman.tech
微信公众号: