欢迎访问个人博客,同步更新: 枫山别院html
今天咱们要介绍的是Dubbo消息派发的时候,使用的线程模型,Dubbo版本2.8.4。那么什么是Dubbo的消息派发呢?好比,Dubbo提供者收到消费者的请求以后,要将这个请求派发给哪一个线程来处理。这个派发给某个线程的规则,就是咱们今天要讨论的消息派发的线程模型。apache
请求流程
你们请看下图,下图来自Dubbo官方文档,线程模型:api
image.png网络
咱们解释一下:框架
-
消费者的代理类发起请求ide
-
请求通过编码和序列化,而后经由Netty(或者Mina等)发送给提供者源码分析
-
提供者收到了请求,而后解码和反序列化等编码
-
根据Dubbo配置,选择相应的线程,处理业务逻辑url
这里的第四步,就是咱们今天要讨论的内容,Dubbo有哪些处理业务的线程模型以及如何选择。spa
消息分类
首先,咱们讨论下Dubbo有哪些消息,大致上能够分为两类:非业务消息、业务消息。
-
非业务消息包括:创建链接,断开链接,心跳
-
业务消息包括:请求,响应,异常
这些消息咱们能够在com.alibaba.dubbo.remoting.ChannelHandler中找到,不包括心跳。
public interface ChannelHandler { void connected(Channel channel) throws RemotingException; void disconnected(Channel channel) throws RemotingException; void sent(Channel channel, Object message) throws RemotingException; void received(Channel channel, Object message) throws RemotingException; void caught(Channel channel, Throwable exception) throws RemotingException; }
线程池
针对两种分类的消息,咱们的线程也能够分为两类:Netty的线程池、业务线程池
-
Netty(Mina等)的线程池:就是底层网络框架用于处理网络传输的线程池,网络消息编码解码,发送网络数据等
-
业务线程池:这个线程池是Dubbo框架建立的,用于处理除了网络传输以外,Dubbo相关的业务。
消息咱们都理清楚了,线程池也都知道了,那么,哪些消息应该派发到哪一个线程池,这个规则,就是今天的主角。
消息处理线程模型
Dubbo有五种处理的线程模型,
-
all
:全部的消息都派发到业务线程池,包括请求,响应,链接事件,断开事件,心跳等。 -
direct
:全部消息都不派发到业务线程池,所有在Netty(或者Mina等)线程上直接执行。 -
message
:只有请求响应等业务消息派发到业务线程池,其它链接断开事件,心跳等消息,直接在Netty(或者Mina等)线程上执行。 -
execution
:只有请求消息派发到Netty线程池,不含响应,响应和其它链接断开事件,心跳等消息,直接在业务线程上执行。 -
connection
: 将链接断开事件放入队列,由一个只有1个线程的其余线程池处理,有序逐个执行,其它消息派发到业务线程池。
源码分析
派发接口是一个SPI拓展接口,com.alibaba.dubbo.remoting.Dispatcher,这个拓展接口中默认的线程模型是all
。
@SPI(AllDispatcher.NAME) public interface Dispatcher { /** * dispatch the message to threadpool. * * @param handler * @param url * @return channel handler */ @Adaptive({Constants.DISPATCHER_KEY, "dispather", "channel.handler"}) // 后两个参数为兼容旧配置 ChannelHandler dispatch(ChannelHandler handler, URL url); }
若是某个Dubbo服务接口单独配置了线程模型,那么会从URL中,按照dispatcher
,dispather
,channel.handler
的key值,找对应的线程模型来处理。后面的两个参数是为了兼容老版本Dubbo,dispather
就彻底是在老版Dubbo中,这个单词拼错了,后来在新版本中改过来了,就必需要兼容这个错误的配置。
OK,这个接口有5种实现,就是对应上面的5中线程模型:
image.png
这5种Dispatcher分别有对应的5种ChannelHandler,逻辑都在handler中,以下图
image.png
以AllChannelHandler为例,里面就是对应的connected,disconnected,received,caught的实现,很是的简单,就是把任务提交到线程池里。
image.png
每一个任务都是ChannelEventRunnable类,实现了Runnable接口,线程会执行它的run方法。在run方法中,是一个switch,根据消息类型,调用handler对应的方法。咱们以received为例:
@Override public void received(Channel channel, Object message) throws RemotingException { if (message instanceof Invocation) { reply((ExchangeChannel) channel, message); } else { super.received(channel, message); } }
received调用了reply方法,咱们继续看看reply方法:
public Object reply(ExchangeChannel channel, Object message) throws RemotingException { if (message instanceof Invocation) { Invocation inv = (Invocation) message; Invoker<?> invoker = getInvoker(channel, inv); //若是是callback 须要处理高版本调用低版本的问题 if (Boolean.TRUE.toString().equals(inv.getAttachments().get(IS_CALLBACK_SERVICE_INVOKE))){ String methodsStr = invoker.getUrl().getParameters().get("methods"); boolean hasMethod = false; if (methodsStr == null || methodsStr.indexOf(",") == -1){ hasMethod = inv.getMethodName().equals(methodsStr); } else { String[] methods = methodsStr.split(","); for (String method : methods){ if (inv.getMethodName().equals(method)){ hasMethod = true; break; } } } if (!hasMethod){ logger.warn(new IllegalStateException("The methodName "+inv.getMethodName()+" not found in callback service interface ,invoke will be ignored. please update the api interface. url is:" + invoker.getUrl()) +" ,invocation is :"+inv ); return null; } } RpcContext.getContext().setRemoteAddress(channel.getRemoteAddress()); return invoker.invoke(inv); } throw new RemotingException(channel, "Unsupported request: " + message == null ? null : (message.getClass().getName() + ": " + message) + ", channel: consumer: " + channel.getRemoteAddress() + " --> provider: " + channel.getLocalAddress()); }
reply中就是查找invoker,调用业务逻辑了,后面就不是本篇的重点了。