欢迎你们关注公众号「JAVA前线」查看更多精彩分享文章,主要包括源码分析、实际应用、架构思惟、职场分享、产品思考等等,同时欢迎你们加我我的微信「java_front」一块儿交流学习java
1 文章概述
咱们在服务端开发时若是须要实现异步调用,首先声明一个线程池,并将调用业务方法封装成一个任务提交至线程池,若是不须要获取返回值则封装为Runnable,须要获取返回值则封装为Callable并经过Future对象接受结果。spring
class CalcTask1 implements Callable<Integer> { @Override public Integer call() throws Exception { System.out.println("task1耗时计算"); Thread.sleep(1000L); return 100; } } class CalcTask2 implements Callable<Integer> { @Override public Integer call() throws Exception { System.out.println("task2耗时计算"); Thread.sleep(3000L); return 200; } } public class CallableTest { public static void test1() throws Exception { ExecutorService executor = Executors.newCachedThreadPool(); CalcTask1 task1 = new CalcTask1(); Future<Integer> f1 = executor.submit(task1); CalcTask2 task2 = new CalcTask2(); Future<Integer> f2 = executor.submit(task2); Integer result1 = f1.get(); Integer result2 = f2.get(); System.out.println("final result=" + (result1 + result2)); executor.shutdown(); } public static void test2() throws Exception { ExecutorService executor = Executors.newCachedThreadPool(); List<Callable<Integer>> tasks = new ArrayList<Callable<Integer>>(); CalcTask1 task1 = new CalcTask1(); CalcTask2 task2 = new CalcTask2(); tasks.add(task1); tasks.add(task2); for (int i = 0; i < tasks.size(); i++) { Future<Integer> future = executor.submit(tasks.get(i)); System.out.println("result=" + future.get()); } executor.shutdown(); } }
1.1 什么是消费异步化
在使用DUBBO进行异步化调用时不须要这么麻烦,DUBBO基于NIO非阻塞能力使得服务消费者无需启用多线程就能够实现并行调用多个服务,在此咱们给出基于2.7.0版本调用实例。编程
1.1.1 生产者
(1) 服务声明
public interface CalcSumService { public Integer sum(int a, int b); } public class CalcSumServiceImpl implements CalcSumService { @Override public Integer sum(int a, int b) { return a + b; } } public interface CalcSubtractionService { public Integer subtraction(int a, int b); } public class CalcSubtractionServiceImpl implements CalcSubtractionService { @Override public Integer subtraction(int a, int b) { return a - b; } }
(2) 配置文件
<beans> <dubbo:application name="java-front-provider" /> <dubbo:registry address="zookeeper://127.0.0.1:2181" /> <dubbo:protocol name="dubbo" port="9999" /> <bean id="calcSumService" class="com.java.front.dubbo.demo.provider.service.CalcSumServiceImpl" /> <bean id="calcSubtractionService" class="com.java.front.dubbo.demo.provider.service.CalcSubtractionServiceImpl" /> <dubbo:service interface="com.java.front.dubbo.demo.provider.service.CalcSumService" ref="calcSumService" /> <dubbo:service interface="com.java.front.dubbo.demo.provider.service.CalcSubtractionService" ref="calcSubtractionService" /> </beans>
(3) 服务发布
public class Provider { public static void main(String[] args) throws Exception { ClassPathXmlApplicationContext context = new ClassPathXmlApplicationContext("classpath*:META-INF/spring/dubbo-provider.xml"); context.start(); System.out.println(context); System.in.read(); } }
1.1.2 消费者
(1) 配置文件
<beans> <dubbo:application name="java-front-consumer" /> <dubbo:registry address="zookeeper://127.0.0.1:2181" /> <dubbo:reference id="calcSumService" interface="com.java.front.dubbo.demo.provider.service.CalcSumService" timeout="10000"> <dubbo:method name="sum" async="true" /> </dubbo:reference> <dubbo:reference id="calcSubtractionService" interface="com.java.front.dubbo.demo.provider.service.CalcSubtractionService" timeout="10000"> <dubbo:method name="subtraction" async="true" /> </dubbo:reference> </beans>
(2) 服务消费
public class Consumer { public static void main(String[] args) throws Exception { testAsync(); System.in.read(); } public static void testAsync() { try { ClassPathXmlApplicationContext context = new ClassPathXmlApplicationContext(new String[] { "classpath*:META-INF/spring/dubbo-consumer.xml" }); System.out.println(context); context.start(); /** 加法运算 **/ CalcSumService calcSumService = (CalcSumService) context.getBean("calcSumService"); calcSumService.sum(3, 2); CompletableFuture<Integer> futureSum = RpcContext.getContext().getCompletableFuture(); /** 减法运算 **/ CalcSubtractionService calcSubtractionService = (CalcSubtractionService) context.getBean("calcSubtractionService"); calcSubtractionService.subtraction(3, 2); CompletableFuture<Integer> futureSubtraction = RpcContext.getContext().getCompletableFuture(); /** 输出结果 **/ int sumResult = futureSum.get(); int subtractionResult = futureSubtraction.get(); System.out.println("sumResult=" + sumResult + ",subtractionResult=" + subtractionResult); } catch (Exception e) { e.printStackTrace(); } } }
1.2 为何消费异步化
异步化能够将本来串行的调用并行化,减小执行时间从而提高性能。假设上述实例加法服务须要100ms,减法服务须要200ms,那么串行化执行时间为两者之和300ms:设计模式

若是消费异步化那么执行时间减小为两者最大值200ms,异步化所带来的性能提高不言而喻:微信

2 保护性暂停模式
分析DUBBO源码以前咱们首先介绍一种多线程设计模式:保护性暂停模式。咱们设想这样一种场景:线程A生产数据,线程B读取这个数据。咱们必须面对一种状况:线程B准备读取数据时,此时线程A尚未生产出数据。在这种状况下线程B不能一直空转,也不能当即退出,线程B要等到生产数据完成并拿到数据以后才退出。多线程
那么在数据没有生产出这段时间,线程B须要执行一种等待机制,这样能够达到对系统保护目的,这就是保护性暂停。架构
public class MyData implements Serializable { private static final long serialVersionUID = 1L; private String message; public MyData(String message) { this.message = message; } } class Resource { private MyData data; private Object lock = new Object(); public MyData getData() { synchronized (lock) { while (data == null) { try { // 没有数据则释放锁并暂停等待被唤醒 lock.wait(); } catch (InterruptedException e) { e.printStackTrace(); } } return data; } } public void sendData(MyData data) { synchronized (lock) { // 生产数据后唤醒消费线程 this.data = data; lock.notifyAll(); } } } public class ProtectDesignTest { public static void main(String[] args) { Resource resource = new Resource(); new Thread(() -> { try { MyData data = new MyData("hello"); System.out.println(Thread.currentThread().getName() + "生产数据=" + data); // 模拟发送耗时 TimeUnit.SECONDS.sleep(3); resource.sendData(data); } catch (InterruptedException e) { e.printStackTrace(); } }, "t1").start(); new Thread(() -> { MyData data = resource.getData(); System.out.println(Thread.currentThread().getName() + "接收到数据=" + data); }, "t2").start(); } }
在上述代码实例中线程1生产数据,线程2消费数据,Resource类经过wait/notify实现了保护性暂停模式,关于保护性暂停模式请参看我以前《保护性暂停模式详解以及其在DUBBO应用源码分析》这篇文章。app
3 源码分析
本章节咱们分析对比2.6.9和2.7.0两个版本源码,之因此选取这两个版本是由于2.7.0是一个里程碑版本,异步化能力获得了明显加强。异步
3.1 version_2.6.9
3.1.1 异步调用
咱们首先看看这个版本异步调用使用方式,生产者内容和消费者配置文件同第一章节再也不赘述,咱们重点分析服务消费代码。async
public class AsyncConsumer { public static void main(String[] args) throws Exception { test1(); System.in.read(); } public static void test1() throws Exception { ClassPathXmlApplicationContext context = new ClassPathXmlApplicationContext(new String[] { "classpath*:META-INF/spring/dubbo-consumer.xml" }); System.out.println(context); context.start(); /** 加法运算 **/ CalcSumService calcSumService = (CalcSumService) context.getBean("calcSumService"); calcSumService.sum(3, 2); Future<Integer> futureSum = RpcContext.getContext().getFuture(); /** 减法运算 **/ CalcSubtractionService calcSubtractionService = (CalcSubtractionService) context.getBean("calcSubtractionService"); calcSubtractionService.subtraction(3, 2); Future<Integer> futureSubtraction = RpcContext.getContext().getFuture(); /** 输出结果 **/ int sumResult = futureSum.get(); int subtractionResult = futureSubtraction.get(); System.out.println("sumResult=" + sumResult + ",subtractionResult=" + subtractionResult); } }
消费者最终执行DubboInvoker.doInvoke,这个方法包含异步调用核心:
public class DubboInvoker<T> extends AbstractInvoker<T> { @Override protected Result doInvoke(final Invocation invocation) throws Throwable { RpcInvocation inv = (RpcInvocation) invocation; final String methodName = RpcUtils.getMethodName(invocation); inv.setAttachment(Constants.PATH_KEY, getUrl().getPath()); inv.setAttachment(Constants.VERSION_KEY, version); ExchangeClient currentClient; if (clients.length == 1) { currentClient = clients[0]; } else { currentClient = clients[index.getAndIncrement() % clients.length]; } try { boolean isAsync = RpcUtils.isAsync(getUrl(), invocation); boolean isOneway = RpcUtils.isOneway(getUrl(), invocation); int timeout = getUrl().getMethodParameter(methodName, Constants.TIMEOUT_KEY, Constants.DEFAULT_TIMEOUT); // 单向调用 if (isOneway) { boolean isSent = getUrl().getMethodParameter(methodName, Constants.SENT_KEY, false); currentClient.send(inv, isSent); RpcContext.getContext().setFuture(null); return new RpcResult(); } // 异步调用 else if (isAsync) { // 发起请求给生产者 ResponseFuture future = currentClient.request(inv, timeout); // 设置future对象至上下文 RpcContext.getContext().setFuture(new FutureAdapter<Object>(future)); // 返回空结果 return new RpcResult(); } // 同步调用 else { RpcContext.getContext().setFuture(null); return (Result) currentClient.request(inv, timeout).get(); } } catch (TimeoutException e) { throw new RpcException(RpcException.TIMEOUT_EXCEPTION, "Invoke remote method timeout. method: " + invocation.getMethodName() + ", provider: " + getUrl() + ", cause: " + e.getMessage(), e); } catch (RemotingException e) { throw new RpcException(RpcException.NETWORK_EXCEPTION, "Failed to invoke remote method: " + invocation.getMethodName() + ", provider: " + getUrl() + ", cause: " + e.getMessage(), e); } } }
若是包含async属性则表示异步调用,第一步发送调用请求给生产者,第二步设置Future对象至上下文,第三步当即返回空结果。那么在服务消费时关键一步就是获取Future对象,因此咱们在调用时要从上下文获取Future对象:
CalcSumService calcSumService = (CalcSumService) context.getBean("calcSumService"); calcSumService.sum(3, 2); Future<Integer> futureSum = RpcContext.getContext().getFuture();
使用Future对象获取结果:
int sumResult = futureSum.get();
进入FutureAdapter.get()方法:
public class FutureAdapter<V> implements Future<V> { private final ResponseFuture future; public V get() throws InterruptedException, ExecutionException { try { return (V) (((Result) future.get()).recreate()); } catch (RemotingException e) { throw new ExecutionException(e.getMessage(), e); } catch (Throwable e) { throw new RpcException(e); } } }
进入ResponseFuture.get()方法,咱们能够看到保护性暂停模式应用,当生产者线程没有返回数据则阻塞并等待被唤醒:
public class DefaultFuture implements ResponseFuture { private final Lock lock = new ReentrantLock(); private final Condition done = lock.newCondition(); @Override public Object get() throws RemotingException { return get(timeout); } @Override public Object get(int timeout) throws RemotingException { if (timeout <= 0) { timeout = Constants.DEFAULT_TIMEOUT; } if (!isDone()) { long start = System.currentTimeMillis(); lock.lock(); try { while (!isDone()) { // 远程调用未完成则等待被唤醒 done.await(timeout, TimeUnit.MILLISECONDS); // 超时时间未完成则退出 if (isDone() || System.currentTimeMillis() - start > timeout) { break; } } } catch (InterruptedException e) { throw new RuntimeException(e); } finally { lock.unlock(); } // 抛出超时异常 if (!isDone()) { throw new TimeoutException(sent > 0, channel, getTimeoutMessage(false)); } } return returnFromResponse(); } }
当消费者接收到生产者响应时会调用received方法唤醒相关阻塞线程,这时阻塞在get方法中的线程便可获取到数据:
public class DefaultFuture implements ResponseFuture { private final Lock lock = new ReentrantLock(); private final Condition done = lock.newCondition(); public static void received(Channel channel, Response response) { try { // 根据惟一请求号获取Future DefaultFuture future = FUTURES.remove(response.getId()); if (future != null) { future.doReceived(response); } else { logger.warn("The timeout response finally returned at " + (new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS").format(new Date())) + ", response " + response + (channel == null ? "" : ", channel: " + channel.getLocalAddress() + " -> " + channel.getRemoteAddress())); } } finally { CHANNELS.remove(response.getId()); } } private void doReceived(Response res) { lock.lock(); try { response = res; if (done != null) { // 唤醒相关阻塞线程 done.signal(); } } finally { lock.unlock(); } if (callback != null) { invokeCallback(callback); } } }
3.1.2 设置回调函数
咱们如今调用get方法会阻塞在那里等到结果,那么有没有一种方式当结果返回时就当即调用咱们设置的回调函数?答案是有。
public class AsyncConsumer { public static void main(String[] args) throws Exception { test2(); System.in.read(); } public static void test2() throws Exception { ClassPathXmlApplicationContext context = new ClassPathXmlApplicationContext(new String[] { "classpath*:META-INF/spring/dubbo-consumer.xml" }); System.out.println(context); context.start(); /** 加法运算 **/ CalcSumService calcSumService = (CalcSumService) context.getBean("calcSumService"); calcSumService.sum(3, 2); /** 执行回调函数 **/ ((FutureAdapter<Object>) RpcContext.getContext().getFuture()).getFuture().setCallback(new ResponseCallback() { @Override public void done(Object response) { System.out.println("sumResult=" + response); } @Override public void caught(Throwable exception) { exception.printStackTrace(); } }); /** 减法运算 **/ CalcSubtractionService calcSubtractionService = (CalcSubtractionService) context.getBean("calcSubtractionService"); calcSubtractionService.subtraction(3, 2); /** 执行回调函数 **/ ((FutureAdapter<Object>) RpcContext.getContext().getFuture()).getFuture().setCallback(new ResponseCallback() { @Override public void done(Object response) { System.out.println("subtractionResult=" + response); } @Override public void caught(Throwable exception) { exception.printStackTrace(); } }); } }
DefaultFuture能够设置callback回调函数,当结果返回时若是回调函数不为空则执行:
public class DefaultFuture implements ResponseFuture { private volatile ResponseCallback callback; private void doReceived(Response res) { lock.lock(); try { response = res; if (done != null) { done.signal(); } } finally { lock.unlock(); } if (callback != null) { // 执行回调函数 invokeCallback(callback); } } private void invokeCallback(ResponseCallback c) { ResponseCallback callbackCopy = c; if (callbackCopy == null) { throw new NullPointerException("callback cannot be null."); } c = null; Response res = response; if (res == null) { throw new IllegalStateException("response cannot be null. url:" + channel.getUrl()); } if (res.getStatus() == Response.OK) { try { // 执行成功回调 callbackCopy.done(res.getResult()); } catch (Exception e) { logger.error("callback invoke error .reasult:" + res.getResult() + ",url:" + channel.getUrl(), e); } } else if (res.getStatus() == Response.CLIENT_TIMEOUT || res.getStatus() == Response.SERVER_TIMEOUT) { try { TimeoutException te = new TimeoutException(res.getStatus() == Response.SERVER_TIMEOUT, channel, res.getErrorMessage()); // 发生超时回调 callbackCopy.caught(te); } catch (Exception e) { logger.error("callback invoke error ,url:" + channel.getUrl(), e); } } else { try { RuntimeException re = new RuntimeException(res.getErrorMessage()); callbackCopy.caught(re); } catch (Exception e) { logger.error("callback invoke error ,url:" + channel.getUrl(), e); } } } }
3.2 version_2.7.0
CompletableFuture在这个版本中被引入实现异步调用,能够使用此类强大的异步编程API加强异步能力,咱们首先回顾1.1.2章节实例:
public class Consumer { public static void testAsync() { try { ClassPathXmlApplicationContext context = new ClassPathXmlApplicationContext(new String[] { "classpath*:META-INF/spring/dubbo-consumer.xml" }); System.out.println(context); context.start(); /** 加法运算 **/ CalcSumService calcSumService = (CalcSumService) context.getBean("calcSumService"); calcSumService.sum(3, 2); CompletableFuture<Integer> futureSum = RpcContext.getContext().getCompletableFuture(); /** 减法运算 **/ CalcSubtractionService calcSubtractionService = (CalcSubtractionService) context.getBean("calcSubtractionService"); calcSubtractionService.subtraction(3, 2); CompletableFuture<Integer> futureSubtraction = RpcContext.getContext().getCompletableFuture(); /** 输出结果 **/ int sumResult = futureSum.get(); int subtractionResult = futureSubtraction.get(); System.out.println("sumResult=" + sumResult + ",subtractionResult=" + subtractionResult); } catch (Exception e) { e.printStackTrace(); } } }
在上述消费者代码的实例中咱们只是应用了CompletableFuture.get()方法,并无发挥其强大功能。咱们对上述实例稍加改造,两个CompletionStage任务都执行完成后,两个任务结果会一块儿交给thenCombine进行处理:
public class Consumer { public static void testAsync() { try { ClassPathXmlApplicationContext context = new ClassPathXmlApplicationContext(new String[] { "classpath*:META-INF/spring/dubbo-consumer2.xml" }); System.out.println(context); context.start(); /** 加法运算 **/ CalcSumService calcSumService = (CalcSumService) context.getBean("calcSumService"); calcSumService.sum(3, 2); CompletableFuture<Integer> futureSum = RpcContext.getContext().getCompletableFuture(); /** 减法运算 **/ CalcSubtractionService calcSubtractionService = (CalcSubtractionService) context.getBean("calcSubtractionService"); calcSubtractionService.subtraction(3, 2); CompletableFuture<Integer> futureSubtraction = RpcContext.getContext().getCompletableFuture(); /** 乘法运算 **/ CompletableFuture<Integer> multiplyResult = futureSum.thenCombine(futureSubtraction, new BiFunction<Integer, Integer, Integer>() { @Override public Integer apply(Integer t, Integer u) { return (t * u); } }); System.out.println("multiplyResult=" + multiplyResult.get()); } catch (Exception e) { e.printStackTrace(); } } }
DubboInvoker代码有所变化:
public class DubboInvoker<T> extends AbstractInvoker<T> { @Override protected Result doInvoke(final Invocation invocation) throws Throwable { RpcInvocation inv = (RpcInvocation) invocation; final String methodName = RpcUtils.getMethodName(invocation); inv.setAttachment(Constants.PATH_KEY, getUrl().getPath()); inv.setAttachment(Constants.VERSION_KEY, version); ExchangeClient currentClient; if (clients.length == 1) { currentClient = clients[0]; } else { currentClient = clients[index.getAndIncrement() % clients.length]; } try { // 是否为异步调用 boolean isAsync = RpcUtils.isAsync(getUrl(), invocation); // 是否为future异步方式 boolean isAsyncFuture = RpcUtils.isReturnTypeFuture(inv); // 是否须要响应结果 boolean isOneway = RpcUtils.isOneway(getUrl(), invocation); // 超时时间 int timeout = getUrl().getMethodParameter(methodName, Constants.TIMEOUT_KEY, Constants.DEFAULT_TIMEOUT); // 单向调用 if (isOneway) { boolean isSent = getUrl().getMethodParameter(methodName, Constants.SENT_KEY, false); currentClient.send(inv, isSent); RpcContext.getContext().setFuture(null); return new RpcResult(); } // 异步请求 else if (isAsync) { ResponseFuture future = currentClient.request(inv, timeout); FutureAdapter<Object> futureAdapter = new FutureAdapter<>(future); RpcContext.getContext().setFuture(futureAdapter); Result result; if (isAsyncFuture) { result = new AsyncRpcResult(futureAdapter, futureAdapter.getResultFuture(), false); } else { result = new SimpleAsyncRpcResult(futureAdapter, futureAdapter.getResultFuture(), false); } return result; } // 同步请求 else { RpcContext.getContext().setFuture(null); Result result = (Result) currentClient.request(inv, timeout).get(); return result; } } catch (TimeoutException e) { throw new RpcException(RpcException.TIMEOUT_EXCEPTION, "Invoke remote method timeout. method: " + invocation.getMethodName() + ", provider: " + getUrl() + ", cause: " + e.getMessage(), e); } catch (RemotingException e) { throw new RpcException(RpcException.NETWORK_EXCEPTION, "Failed to invoke remote method: " + invocation.getMethodName() + ", provider: " + getUrl() + ", cause: " + e.getMessage(), e); } } }
咱们看到与2.6.9版本相同的是FutureAdapter一样会被设置到上下文,可是FutureAdapter自己已经发生了变化:
public class FutureAdapter<V> extends CompletableFuture<V> { private final ResponseFuture future; private CompletableFuture<Result> resultFuture; public FutureAdapter(ResponseFuture future) { this.future = future; this.resultFuture = new CompletableFuture<>(); // 设置回调函数至DefaultFuture future.setCallback(new ResponseCallback() { // 设置响应结果至CompletableFuture @Override public void done(Object response) { Result result = (Result) response; FutureAdapter.this.resultFuture.complete(result); V value = null; try { value = (V) result.recreate(); } catch (Throwable t) { FutureAdapter.this.completeExceptionally(t); } FutureAdapter.this.complete(value); } // 设置异常结果至FutureAdapter @Override public void caught(Throwable exception) { FutureAdapter.this.completeExceptionally(exception); } }); } public ResponseFuture getFuture() { return future; } public CompletableFuture<Result> getResultFuture() { return resultFuture; } }
咱们在服务消费时经过getResultFuture方法获取CompletableFuture,这个对象值在回调时被设置,回调时机一样在DefaultFuture.doReceived方法里面:
public class DefaultFuture implements ResponseFuture { private volatile ResponseCallback callback; private void doReceived(Response res) { lock.lock(); try { response = res; if (done != null) { done.signal(); } } finally { lock.unlock(); } if (callback != null) { // 执行回调函数代码同version_2.6.9 invokeCallback(callback); } } }
4 文章总结
本文第一介绍了DUBBO消费异步化是什么,以及异步化为何会带来性能提高。第二介绍了保护性暂停模式,这是实现异步化的基础。最后咱们阅读了两个不一样版本异步化源码,了解了DUBBO异步化演进过程,但愿本文对你们有所帮助。
欢迎你们关注公众号「JAVA前线」查看更多精彩分享文章,主要包括源码分析、实际应用、架构思惟、职场分享、产品思考等等,同时欢迎你们加我我的微信「java_front」一块儿交流学习