先推荐你们阅读dubbo官网源码解读服务调用流程
一节,传送门: dubbo.apache.org/zh-cn/docs/…html
Dubbo同步调用仍是异步调用的逻辑是在DubboInvoker中,Dubbo 实现同步和异步调用比较关键的一点就在于由谁调用 ResponseFuture 的 get
方法。前端
ResponseFuture是一个接口,ResponseFuture的默认实现是DefaultFuture,当服务消费者还未接收到调用结果时,用户线程调用 get 方法会被阻塞住。apache
通常状况下,服务调用方会并发调用多个服务,每一个用户线程发送请求后,会调用不一样 DefaultFuture 对象的 get 方法进行等待。 一段时间后,服务调用方的线程池会收到多个响应对象。这个时候要考虑一个问题,如何将每一个响应对象传递给相应的 DefaultFuture 对象,且不出错。答案是经过调用编号。DefaultFuture 被建立时,会要求传入一个 Request 对象。此时 DefaultFuture 可从 Request 对象中获取调用编号,并将 <调用编号, DefaultFuture 对象> 映射关系存入到静态 Map 中,即 FUTURES。线程池中的线程在收到 Response 对象后,会根据 Response 对象中的调用编号到 FUTURES 集合中取出相应的 DefaultFuture 对象,而后再将 Response 对象设置到 DefaultFuture 对象中。最后再唤醒用户线程,这样用户线程便可从 DefaultFuture 对象中获取调用结果了。bash
DefaultFuture
中的sent变量在客户端向服务端发送请求成功后会写入,以代表消息发送完成。并发
-->NettyChannel#send
-->io.netty.channel.ChannelOutboundInvoker#writeAndFlush
-->NettyClientHandler#write
-->.DefaultFuture#sent
复制代码
若是客户端没有成功发送消息,服务端不会返回响应(Response),DefaultFuture
中的sent
变量也没有被写入,在DefaultFuture#getTimeoutMessage
会根据sent
是否大于0,输出客户端超时异常。框架
若是客户端成功发送消息,服务端返回响应(Response),DefaultFuture
中的sent
变量被写入,在DefaultFuture#getTimeoutMessage
会根据sent
是否大于0,服务端超时异常。异步
private String getTimeoutMessage(boolean scan) {
long nowTimestamp = System.currentTimeMillis();
return (sent > 0 ? "Waiting server-side response timeout" : "Sending request timeout in client-side")
+ (scan ? " by scan timer" : "") + ". start time: "
+ (new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS").format(new Date(start))) + ", end time: "
+ (new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS").format(new Date())) + ","
+ (sent > 0 ? " client elapsed: " + (sent - start)
+ " ms, server elapsed: " + (nowTimestamp - sent)
: " elapsed: " + (nowTimestamp - start)) + " ms, timeout: "
+ timeout + " ms, request: " + request + ", channel: " + channel.getLocalAddress()
+ " -> " + channel.getRemoteAddress();
}
复制代码
当发生超时异常的时候是没有Response返回的,dubbo的客户端在建立DefaultFuture的时候会建立一个TimeoutCheckTask
的延时任务,当超时时间到达后就会执行。这段代码不难理解。ide
public static DefaultFuture newFuture(Channel channel, Request request, int timeout) {
final DefaultFuture future = new DefaultFuture(channel, request, timeout);
// timeout check
timeoutCheck(future);
return future;
}
private static class TimeoutCheckTask implements TimerTask {
private DefaultFuture future;
TimeoutCheckTask(DefaultFuture future) {
this.future = future;
}
@Override
public void run(Timeout timeout) {
if (future == null || future.isDone()) {
return;
}
// create exception response.
Response timeoutResponse = new Response(future.getId());
// set timeout status.
timeoutResponse.setStatus(future.isSent() ? Response.SERVER_TIMEOUT : Response.CLIENT_TIMEOUT);
timeoutResponse.setErrorMessage(future.getTimeoutMessage(true));
// handle response.
DefaultFuture.received(future.getChannel(), timeoutResponse);
}
}
private Object returnFromResponse() throws RemotingException {
Response res = response;
if (res == null) {
throw new IllegalStateException("response cannot be null");
}
// 若是响应正常则返回调用结果
if (res.getStatus() == Response.OK) {
return res.getResult();
}
// 抛出超时异常
if (res.getStatus() == Response.CLIENT_TIMEOUT || res.getStatus() == Response.SERVER_TIMEOUT) {
throw new TimeoutException(res.getStatus() == Response.SERVER_TIMEOUT, channel, res.getErrorMessage());
}
// 消费方调用异常抛出RemotingException
throw new RemotingException(channel, res.getErrorMessage());
}
复制代码
使用延时任务的方式会在调用超时的时候也会使RPC调用流程完整,而不至于一直停留在!isDone()
状态,相对来讲这种方式可能更好一些。ui