Netty,让咱们的编写TCP变的很是简单,而且它在业界运用极其普遍。Dubbo底层默认实现也是经过Netty。
org.apache.dubbo.remoting.transport.netty4.NettyServer就是其默认实现方式。它的类关系以下:
html
从上面的类图,咱们能够知道其设计的理念。apache
1)Map<String, Channel> channels 持有与该服务端通讯的TCP通讯Channel,key为host:port。它是NettyServerHandler内部channels的引用,NettyServerHandler就是Netty处理消息的句柄,由咱们本身编写。
2)io.netty.channel.Channel channel netty监听网络链接的channel,由bind()方法返回
3)内部监听线程为1个,线程名为NettyServerBoss。
4)内部IO线程为可用处理器+1,最多为32个线程,线程名为NettyServerWorker。
5)NettyServerHandler 为dubbo的消息处理器,编解码器对消息编解码后,就是扔给它处理。它内部持有一个ChannelHandler,便是NettyServer这个类,从类图上可知NettyServer实现了ChannelHandler 这个接口。
六、NettyServer 持有一个外部传进来的ChannelHandler。并会对其进行封装,截图以下(截图比较清晰):
6)NettyCodecAdapter,为dubbo内部的编解码器,内部持有编码器InternalEncoder,解码器为InternalDecoder。
7)Dubbo 的业务线程名为DubboServerHandler。以后再日志上看到这个名字不要在陌生了。api
答案,确定不是,直接在框架上new,以后确定没有扩展性可言。没事情,咱们包裹一层便可,命名为传输层Transporter,它就有bind,connect功能。
NettyTransporter就是其具体的实现之一。服务器
public class NettyTransporter implements Transporter { public static final String NAME = "netty"; @Override public RemotingServer bind(URL url, ChannelHandler handler) throws RemotingException { return new NettyServer(url, handler); } @Override public Client connect(URL url, ChannelHandler handler) throws RemotingException { return new NettyClient(url, handler); } }
既然是一个SPI扩展,必然须要经过ExtensionLoader.getExtensionLoader()进行加载,Transporters其工具类的静态方法getTransporter()就是经过ExtensionLoader进行加载的,而后其静态方法Transporters.bind()来获取RemotingServer,即NettyServer。网络
public static RemotingServer bind(URL url, ChannelHandler... handlers) throws RemotingException { if (url == null) { throw new IllegalArgumentException("url == null"); } if (handlers == null || handlers.length == 0) { throw new IllegalArgumentException("handlers == null"); } ChannelHandler handler; if (handlers.length == 1) { handler = handlers[0]; } else { handler = new ChannelHandlerDispatcher(handlers); } return getTransporter().bind(url, handler); }
值得注意的是,能够为其添加多个ChannelHandler,若是为多个,则对其封装为ChannelHandlerDispatcher,而后传递给NettyServer。框架
先引入一个名词Exchanger,我称为信息交换回执器,它实际上是对Transporter的封装。Exchanger 一样具备bind()和connect()方法。
Exchanger的bind()和connect()接收一个ExchangeHandler的消息处理句柄,ExchangeHandler 也是一个ChannelHandler,也是用来处理通道消息的,但它对其进行的加强,有一个回执的方法reply()。即接收一个消息后,能够进行回执,经过reply()。dom
和Transporters同样,工具类Exchangers一样的方式经过ExtensionLoader.getExtensionLoader()来获取特定的Exchanger,默认为HeaderExchanger。HeaderExchanger#bind()和connect()。
HeaderExchanger是默认的信息交换回执器。能够看到HeaderExchangeServer 和HeaderExchangeClient分别接收RemoteServer 和Client。而且经过Transporters调用传入。ExchangeHandler会被包裹成HeaderExchangeHandler,接着在被包裹为DecodeHandler。
从这里咱们就能够解答第五个问题,Transporters#bind()传入的ChannelHandler 实际上是DecodeHandler。为啥这里须要DecodeHandler,实际上是服务端或者客户端在收到对等端的消息字节数据后,首先解析的是头部信息,body信息是没有在netty的编解码中进行解析的,是到了真正处理消息的时候,经过Decodehandler#received()内部进行解码的。tcp
实际上,框架真正用到的是以下:
ide
其余最终都是会调到这个方法,有时间的同窗能够本身调用下其余的重载方法。工具
这个问题请查看DubboProtocol的成员变量ExchangeHandler。接下来,咱们来分析这个ExchangeHandler 到底作了什么。
DubboProtocol内部new 了一个ExchangeHandlerAdapter 对象,也就是ExchangeHandler。该handler主要处理已经Invocation类型的消息。
首先,看下received()方法。
@Override public void received(Channel channel, Object message) throws RemotingException { if (message instanceof Invocation) { reply((ExchangeChannel) channel, message); } else { super.received(channel, message); } }
若是消息的类型是Invocation,那么调用reply方法进行消息应答,若是不是调用超类,也就是
ExchangeHandlerAdapter的received方法,该方法是空的,因此即会丢弃该消息内容。
那咱们来看下,relpy方法主要干了什么?注释以下代码块。
public CompletableFuture<Object> reply(ExchangeChannel channel, Object message) throws RemotingException { /** * * 若是消息类型不是Invocation,那么会抛出异常,通常状况下不会,在received方法上已经进行的消息类型判断。 */ if (!(message instanceof Invocation)) { throw new RemotingException(channel, "Unsupported request: " + (message == null ? null : (message.getClass().getName() + ": " + message)) + ", channel: consumer: " + channel.getRemoteAddress() + " --> provider: " + channel.getLocalAddress()); } Invocation inv = (Invocation) message; //这里获得的就是服务Invoker根据inv。 Invoker<?> invoker = getInvoker(channel, inv); // need to consider backward-compatibility if it's a callback // 看是否是dubbo 回调,以后看下回调内容 if (Boolean.TRUE.toString().equals(inv.getObjectAttachments().get(IS_CALLBACK_SERVICE_INVOKE))) { String methodsStr = invoker.getUrl().getParameters().get("methods"); boolean hasMethod = false; if (methodsStr == null || !methodsStr.contains(",")) { hasMethod = inv.getMethodName().equals(methodsStr); } else { String[] methods = methodsStr.split(","); for (String method : methods) { if (inv.getMethodName().equals(method)) { hasMethod = true; break; } } } if (!hasMethod) { logger.warn(new IllegalStateException("The methodName " + inv.getMethodName() + " not found in callback service interface ,invoke will be ignored." + " please update the api interface. url is:" + invoker.getUrl()) + " ,invocation is :" + inv); return null; } } RpcContext.getContext().setRemoteAddress(channel.getRemoteAddress()); //调用,这里就是服务调用,这里会牵扯到dubbo filter 相关内容,在服务调用时具体坚定 Result result = invoker.invoke(inv); return result.thenApply(Function.identity()); }
从上面的可知道,reply主要就是进行服务的调用,核心语句就是 invoker.invoke(inv)。
那么是如何找到这个服务提供者的服务呢。来看下getInvoker()。
Invoker<?> getInvoker(Channel channel, Invocation inv) throws RemotingException { boolean isCallBackServiceInvoke = false; boolean isStubServiceInvoke = false; int port = channel.getLocalAddress().getPort(); String path = (String) inv.getObjectAttachments().get(PATH_KEY); // if it's callback service on client side //若是该调用是在客服端,可能会有配置Stub类,经过isStubServiceInvoke标注 isStubServiceInvoke = Boolean.TRUE.toString().equals(inv.getObjectAttachments().get(STUB_EVENT_KEY)); if (isStubServiceInvoke) { port = channel.getRemoteAddress().getPort(); } //callback // 查看是不是本地回调 isCallBackServiceInvoke = isClientSide(channel) && !isStubServiceInvoke; if (isCallBackServiceInvoke) { path += "." + inv.getObjectAttachments().get(CALLBACK_SERVICE_KEY); inv.getObjectAttachments().put(IS_CALLBACK_SERVICE_INVOKE, Boolean.TRUE.toString()); } // 构建serviceKey,经过端口port,路径(通常是接口的全限定名)path,版本号version,分组group String serviceKey = serviceKey( port, path, (String) inv.getObjectAttachments().get(VERSION_KEY), (String) inv.getObjectAttachments().get(GROUP_KEY) ); //全部的服务导出都会放在exporterMap对象里,而后根据key获取获得DubboExporter DubboExporter<?> exporter = (DubboExporter<?>) exporterMap.get(serviceKey); if (exporter == null) { throw new RemotingException(channel, "Not found exported service: " + serviceKey + " in " + exporterMap.keySet() + ", may be version or group mismatch " + ", channel: consumer: " + channel.getRemoteAddress() + " --> provider: " + channel.getLocalAddress() + ", message:" + getInvocationWithoutData(inv)); } // 接着返回Invoker。 return exporter.getInvoker(); }
当服务调用方与服务提供方创建链接和断开链接时,代码以下:
@Override public void connected(Channel channel) throws RemotingException { invoke(channel, ON_CONNECT_KEY); } @Override public void disconnected(Channel channel) throws RemotingException { if (logger.isDebugEnabled()) { logger.debug("disconnected from " + channel.getRemoteAddress() + ",url:" + channel.getUrl()); } invoke(channel, ON_DISCONNECT_KEY); }
都是进行调用invoke方法。那么invoke方法主要干了啥?
private void invoke(Channel channel, String methodKey) { Invocation invocation = createInvocation(channel, channel.getUrl(), methodKey); if (invocation != null) { try { received(channel, invocation); } catch (Throwable t) { logger.warn("Failed to invoke event method " + invocation.getMethodName() + "(), cause: " + t.getMessage(), t); } } } /** * FIXME channel.getUrl() always binds to a fixed service, and this service is random. * we can choose to use a common service to carry onConnect event if there's no easy way to get the specific * service this connection is binding to. * @param channel * @param url * @param methodKey * @return */ private Invocation createInvocation(Channel channel, URL url, String methodKey) { String method = url.getParameter(methodKey); if (method == null || method.length() == 0) { return null; } RpcInvocation invocation = new RpcInvocation(method, url.getParameter(INTERFACE_KEY), new Class<?>[0], new Object[0]); invocation.setAttachment(PATH_KEY, url.getPath()); invocation.setAttachment(GROUP_KEY, url.getParameter(GROUP_KEY)); invocation.setAttachment(INTERFACE_KEY, url.getParameter(INTERFACE_KEY)); invocation.setAttachment(VERSION_KEY, url.getParameter(VERSION_KEY)); if (url.getParameter(STUB_EVENT_KEY, false)) { invocation.setAttachment(STUB_EVENT_KEY, Boolean.TRUE.toString()); } return invocation; }
这一篇文章距离上一次已经好久了,主要是遇到了国庆,本身想放松一下,接下来还会继续努力的分析dubbo的一些内容。