强烈建议查看官方文档html
@Autowired @Qualifier("serviceImpl") private TestService testService;
这里咱们能够发现,testService是proxy0对象,也就是服务引用那篇里返回的,java
@Autowired TestService testService:spring会去加载该Bean,调用到ReferenceBean.getObject获取对象spring
-->InvokerInvocationHandler.invoke -->RpcInvocation //全部请求都会转为RpcInvocation -->MockClusterInvoker.invoke //1.进入集群 -->result = this.invoker.invoke(invocation); -->AbstractClusterInvoker.invoke -->list(invocation) -->directory.list //2.进入目录查找 从this.methodInvokerMap里面查找一个Invoker -->AbstractDirectory.list -->doList(invocation) -->RegistryDirectory.doList //从this.methodInvokerMap里面查找一个Invoker -->router.route //3.进入路由 -->MockInvokersSelector.route -->getNormalInvokers -->ExtensionLoader.getExtensionLoader(LoadBalance.class).getExtension("random") -->doInvoke -->FailoverClusterInvoker.doInvoke -->select //4.进入负载均衡 -->AbstractClusterInvoker.select -->doselect //这里,若是集群中只有一个服务,直接返回 -->loadbalance.select -->AbstractLoadBalance.select -->doSelect -->RoundRobinLoadBalance.doSelect -->invokers.get(currentSequence % length)//取模轮循 -->Result result = invoker.invoke(invocation) -------------扩展点---------------- -->InvokerWrapper.invoke -->ListenerInvokerWrapper.invoke -->ConsumerContextFilter.invoke -->ProtocolFilterWrapper.invoke -->MonitorFilter.invoke -->ProtocolFilterWrapper.invoke -->FutureFilter.invoke -->ListenerInvokerWrapper.invoke -->AbstractInvoker.invoke //将附加消息(attachment)添加到invocation, 将带到服务端去 ---------------扩展点--------------- -->doInvoke(invocation) -------------***------------ -->DubboInvoker.doInvoke //这里主线程会等待,直到被唤醒,并且有返回值(通常是这种) //为何DubboInvoker是个protocol? 由于 //registryDirectory.refreshInvoker.toInvokers:protocol.refer -------------------逻辑隔开线-------------------- -->ReferenceCountExchangeClient.request -->HeaderExchangeClient.request -----------------***------------------- -->HeaderExchangeChannel.request //建立request(自带ID) -->AbstractPeer.send -->AbstractClient.send -->NettyChannel.send //里面就是netty客户端向服务端发送消息的逻辑 //channel.writeAndFlush(message) // 这里会使用netty的worke线程池去调用 // NettyClientHandler#write
中间涉及到对数据的编码操做apache
ExchangeCodec#encode-->encodeRequest-->DubboCodec #encodeRequestData(序列化请求参数)c#
解码操做数组
ExchangeCodec#decode-->decodeBody-->DubboCodec -->decodeBody-->DecodeableRpcInvocation.decode并发
------------------------来到服务端---------- NettyClientHandler#write -->NettyServerHandler.channelRead //(这里dubbo官网说的是NettyHandler#messageReceived,可是个人2.6.6版本并无进入那个方法,而是这里写着的write,channelRead) //服务端接收消息 -->AbstractPeer.received -->MultiMessageHandler.received -->HeartbeatHandler.received -->AllChannelHandler.received -->ChannelEventRunnable.run -->DecodeHandler.received //中间插入解码操做(主要是针对运行时解码) -->HeaderExchangeHandler.received ---****--- -->handleRequest(这个方法执行目标对象的目标方法) //Object result = handler.reply(channel, msg); //handle是DubboProtocol.requestHandler属性 -->DubboProtocol.ExchangeHandler.reply -->Invoker.invoke // 执行过滤连 省略过滤链步骤 -->InvokerWrapper.invoke -->DelegateProviderMetaDataInvoker.invoke ----****---- -->AbstractProxyInvoker.invoke //new RpcResult(doInvoke(proxy, invocation.getMethodName(), //invocation.getParameterTypes(), invocation.getArguments())); -->wrapper.invokeMethod(proxy, methodName, parameterTypes, arguments); //JavassistProxyFactory.getInvoker.AbstractProxyInvoker.doInvoke -->getData (目标方法) -----------------逻辑分割线------------------- -->AbstractPeer.send -->NettyChannel.send //channel.writeAndFlush(message) 返回消息到客户端 -->NettyServerHandler.write ------------------服务端执行完毕---------------------- -->NettyClientHandler.channelRead(客户端接收消息) -->AbstractPeer.received -->MultiMessageHandler.received -->HeartbeatHandler.received -->AllChannelHandler.received -->ChannelEventRunnable.run -->DecodeHandler.received -->HeaderExchangeHandler.received //中间插入解码操做(对返回的信息) -->handleResponse -->DefaultFuture.received -->DefaultFuture.doReceived //response = res 将返回的信息保存在DefaultFuture中 //done.signal() 唤醒DubboInvoker.doInvoke中暂停的主线程
DubboInvoker#doInvokeapp
该方法在请求时经过get方法处于while(true)等待中负载均衡
当被唤醒并且有返回值后(一般使用的这一种)继续执行主线程dom
return (Result) currentClient.request(inv, timeout).get(); -->currentClient.request //这里就是执行的以上全部过程(请求与相应) -->DefaultFuture.get(timeout) -->returnFromResponse // retrun response.getResult()
DubboInvoker
public class DubboInvoker<T> extends AbstractInvoker<T> { private final ExchangeClient[] clients; protected Result doInvoke(final Invocation invocation) throws Throwable { RpcInvocation inv = (RpcInvocation) invocation; final String methodName = RpcUtils.getMethodName(invocation); // 设置 path 和 version 到 attachment 中 inv.setAttachment(Constants.PATH_KEY, getUrl().getPath()); inv.setAttachment(Constants.VERSION_KEY, version); ExchangeClient currentClient; if (clients.length == 1) { // 从 clients 数组中获取 ExchangeClient currentClient = clients[0]; } else { currentClient = clients[index.getAndIncrement() % clients.length]; } try { // 获取异步配置 boolean isAsync = RpcUtils.isAsync(getUrl(), invocation); // isOneway 为 true,表示“单向”通讯 boolean isOneway = RpcUtils.isOneway(getUrl(), invocation); int timeout = getUrl().getMethodParameter(methodName, Constants.TIMEOUT_KEY, Constants.DEFAULT_TIMEOUT); // 异步无返回值 if (isOneway) { boolean isSent = getUrl().getMethodParameter(methodName, Constants.SENT_KEY, false); // 发送请求 currentClient.send(inv, isSent); // 设置上下文中的 future 字段为 null RpcContext.getContext().setFuture(null); // 返回一个空的 RpcResult return new RpcResult(); } // 异步有返回值 else if (isAsync) { // 发送请求,并获得一个 ResponseFuture 实例 ResponseFuture future = currentClient.request(inv, timeout); // 设置 future 到上下文中 RpcContext.getContext().setFuture(new FutureAdapter<Object>(future)); // 暂时返回一个空结果 return new RpcResult(); } // 同步调用 else { RpcContext.getContext().setFuture(null); // 发送请求,获得一个 ResponseFuture 实例,并调用该实例的 get 方法进行等待 return (Result) currentClient.request(inv, timeout).get(); } } catch (TimeoutException e) { throw new RpcException(..., "Invoke remote method timeout...."); } catch (RemotingException e) { throw new RpcException(..., "Failed to invoke remote method: ..."); } } // 省略其余方法 }
问题: 通常状况下,服务消费方会并发调用多个服务,每一个用户线程发送请求后,会调用不一样 DefaultFuture 对象的 get 方法进行等待。 一段时间后,服务消费方的线程池会收到多个响应对象。这个时候要考虑一个问题,如何将每一个响应对象传递给相应的 DefaultFuture 对象,且不出错??
看到DubboInvoker.doInvoke中的同步有返回值的一段代码
return (Result) currentClient.request(inv, timeout).get();
get方法会等待被唤醒同时有返回结果
看到HeaderExchangeChannel#request方法
@Override public ResponseFuture request(Object request, int timeout) throws RemotingException { if (closed) { throw new RemotingException("is closed!"); } // 建立一个request对象,默认赋值了一个ID Request req = new Request(); req.setVersion(Version.getProtocolVersion()); req.setTwoWay(true); req.setData(request); DefaultFuture future = new DefaultFuture(channel, req, timeout); try { channel.send(req); } catch (RemotingException e) { future.cancel(); throw e; } //直接返回DefaultFuture return future; }
这里就直接返回了DefaultFuture ,对应是上面的currentClient.request, get方法就是调用的DefaultFuture .get
channel.send后面的调用会为每次调用开启不一样的线程
在请求时会将请求参数序列化到服务端
服务端接到请求后,会还原Request对象
ExchangeCodec.ExchangeCodec 服务端解码
// decode request. Request req = new Request(id); req.setVersion(Version.getProtocolVersion()); req.setTwoWay((flag & FLAG_TWOWAY) != 0); if ((flag & FLAG_EVENT) != 0) { req.setEvent(Request.HEARTBEAT_EVENT); } try { ObjectInput in = CodecSupport.deserialize(channel.getUrl(), is, proto); Object data; if (req.isHeartbeat()) { data = decodeHeartbeatData(channel, in); } else if (req.isEvent()) { data = decodeEventData(channel, in); } else { data = decodeRequestData(channel, in); } req.setData(data); } catch (Throwable t) { // bad request req.setBroken(true); req.setData(t); } return req;
在服务端调用目标方法完毕后会将请求返回的结果和id设置给Reponse对象
HeaderExchangeHandler.handleRequest
Response handleRequest(ExchangeChannel channel, Request req) throws RemotingException { //给response设置id(该ID是请求时Request中的ID) Response res = new Response(req.getId(), req.getVersion()); if (req.isBroken()) { Object data = req.getData(); String msg; if (data == null) msg = null; else if (data instanceof Throwable) msg = StringUtils.toString((Throwable) data); else msg = data.toString(); res.setErrorMessage("Fail to decode request due to: " + msg); res.setStatus(Response.BAD_REQUEST); return res; } // 请求方法的参数 Object msg = req.getData(); try { // result: 调用目标方法返回的结果 Object result = handler.reply(channel, msg); res.setStatus(Response.OK); res.setResult(result); } catch (Throwable e) { res.setStatus(Response.SERVICE_ERROR); res.setErrorMessage(StringUtils.toString(e)); } return res; }
最后来到服务端调用完后回到客户端调用DefaultFuture.received
public static void received(Channel channel, Response response) { try { // FUTURES保存着每次调用后返回的DefaultFuture对象,key是生成Request生成时的ID // 这里用response.getId()去获取,由于Request对应的Response有相同的ID DefaultFuture future = FUTURES.remove(response.getId()); if (future != null) { // 唤醒对应线程的DefaultFuture对象 future.doReceived(response); } else { logger.warn("The timeout response finally returned at " + (new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS").format(new Date())) + ", response " + response + (channel == null ? "" : ", channel: " + channel.getLocalAddress() + " -> " + channel.getRemoteAddress())); } } finally { CHANNELS.remove(response.getId()); } }
这里就唤醒了Response对应的DefaultFuture对象,一个请求的响应就完成了,过程颇为复杂。
在dubbo官网对服务调用用很是详细的讲解!!