Dubbo 消息派发的线程模型

欢迎访问个人博客,同步更新: 枫山别院html

今天咱们要介绍的是Dubbo消息派发的时候,使用的线程模型,Dubbo版本2.8.4。那么什么是Dubbo的消息派发呢?好比,Dubbo提供者收到消费者的请求以后,要将这个请求派发给哪一个线程来处理。这个派发给某个线程的规则,就是咱们今天要讨论的消息派发的线程模型。apache

请求流程

你们请看下图,下图来自Dubbo官方文档,线程模型api

image.png网络

咱们解释一下:框架

  1. 消费者的代理类发起请求ide

  2. 请求通过编码和序列化,而后经由Netty(或者Mina等)发送给提供者源码分析

  3. 提供者收到了请求,而后解码和反序列化等编码

  4. 根据Dubbo配置,选择相应的线程,处理业务逻辑url

这里的第四步,就是咱们今天要讨论的内容,Dubbo有哪些处理业务的线程模型以及如何选择。spa

消息分类

首先,咱们讨论下Dubbo有哪些消息,大致上能够分为两类:非业务消息、业务消息。

  • 非业务消息包括:创建链接,断开链接,心跳

  • 业务消息包括:请求,响应,异常

这些消息咱们能够在com.alibaba.dubbo.remoting.ChannelHandler中找到,不包括心跳。

public interface ChannelHandler {
        void connected(Channel channel) throws RemotingException;
        void disconnected(Channel channel) throws RemotingException;
        void sent(Channel channel, Object message) throws RemotingException;
        void received(Channel channel, Object message) throws RemotingException;
        void caught(Channel channel, Throwable exception) throws RemotingException;
}

线程池

针对两种分类的消息,咱们的线程也能够分为两类:Netty的线程池、业务线程池

  • Netty(Mina等)的线程池:就是底层网络框架用于处理网络传输的线程池,网络消息编码解码,发送网络数据等

  • 业务线程池:这个线程池是Dubbo框架建立的,用于处理除了网络传输以外,Dubbo相关的业务。

消息咱们都理清楚了,线程池也都知道了,那么,哪些消息应该派发到哪一个线程池,这个规则,就是今天的主角。

消息处理线程模型

Dubbo有五种处理的线程模型,

  1. all :全部的消息都派发到业务线程池,包括请求,响应,链接事件,断开事件,心跳等。

  2. direct:全部消息都不派发到业务线程池,所有在Netty(或者Mina等)线程上直接执行。

  3. message:只有请求响应等业务消息派发到业务线程池,其它链接断开事件,心跳等消息,直接在Netty(或者Mina等)线程上执行。

  4. execution:只有请求消息派发到Netty线程池,不含响应,响应和其它链接断开事件,心跳等消息,直接在业务线程上执行。

  5. connection: 将链接断开事件放入队列,由一个只有1个线程的其余线程池处理,有序逐个执行,其它消息派发到业务线程池。

源码分析

派发接口是一个SPI拓展接口,com.alibaba.dubbo.remoting.Dispatcher,这个拓展接口中默认的线程模型是all

@SPI(AllDispatcher.NAME)
public interface Dispatcher {

    /**
     * dispatch the message to threadpool.
     * 
     * @param handler
     * @param url
     * @return channel handler
     */
    @Adaptive({Constants.DISPATCHER_KEY, "dispather", "channel.handler"}) // 后两个参数为兼容旧配置
    ChannelHandler dispatch(ChannelHandler handler, URL url);

}

若是某个Dubbo服务接口单独配置了线程模型,那么会从URL中,按照dispatcherdispatherchannel.handler的key值,找对应的线程模型来处理。后面的两个参数是为了兼容老版本Dubbo,dispather就彻底是在老版Dubbo中,这个单词拼错了,后来在新版本中改过来了,就必需要兼容这个错误的配置。

OK,这个接口有5种实现,就是对应上面的5中线程模型:

image.png

这5种Dispatcher分别有对应的5种ChannelHandler,逻辑都在handler中,以下图

image.png

以AllChannelHandler为例,里面就是对应的connected,disconnected,received,caught的实现,很是的简单,就是把任务提交到线程池里。

image.png

每一个任务都是ChannelEventRunnable类,实现了Runnable接口,线程会执行它的run方法。在run方法中,是一个switch,根据消息类型,调用handler对应的方法。咱们以received为例:

@Override
        public void received(Channel channel, Object message) throws RemotingException {
            if (message instanceof Invocation) {
                reply((ExchangeChannel) channel, message);
            } else {
                super.received(channel, message);
            }
        }

received调用了reply方法,咱们继续看看reply方法:

public Object reply(ExchangeChannel channel, Object message) throws RemotingException {
            if (message instanceof Invocation) {
                Invocation inv = (Invocation) message;
                Invoker<?> invoker = getInvoker(channel, inv);
                //若是是callback 须要处理高版本调用低版本的问题
                if (Boolean.TRUE.toString().equals(inv.getAttachments().get(IS_CALLBACK_SERVICE_INVOKE))){
                    String methodsStr = invoker.getUrl().getParameters().get("methods");
                    boolean hasMethod = false;
                    if (methodsStr == null || methodsStr.indexOf(",") == -1){
                        hasMethod = inv.getMethodName().equals(methodsStr);
                    } else {
                        String[] methods = methodsStr.split(",");
                        for (String method : methods){
                            if (inv.getMethodName().equals(method)){
                                hasMethod = true;
                                break;
                            }
                        }
                    }
                    if (!hasMethod){
                        logger.warn(new IllegalStateException("The methodName "+inv.getMethodName()+" not found in callback service interface ,invoke will be ignored. please update the api interface. url is:" + invoker.getUrl()) +" ,invocation is :"+inv );
                        return null;
                    }
                }
                RpcContext.getContext().setRemoteAddress(channel.getRemoteAddress());
                return invoker.invoke(inv);
            }
            throw new RemotingException(channel, "Unsupported request: " + message == null ? null : (message.getClass().getName() + ": " + message) + ", channel: consumer: " + channel.getRemoteAddress() + " --> provider: " + channel.getLocalAddress());
        }

reply中就是查找invoker,调用业务逻辑了,后面就不是本篇的重点了。

相关文章
相关标签/搜索