Dubbo学习笔记8:Dubbo的线程模型与线程池策略

Dubbo默认的底层网络通信使用的是Netty,服务提供方NettyServer使用两级线程池,其中 EventLoopGroup(boss) 主要用来接受客户端的连接请求,并把接受的请求分发给 EventLoopGroup(worker) 来处理,boss和worker线程组咱们称之为IO线程。数据库

若是服务提供方的逻辑能迅速完成,而且不会发起新的IO请求,那么直接在IO线程上处理会更快,由于这减小了线程池调度。缓存

但若是处理逻辑很慢,或者须要发起新的IO请求,好比须要查询数据库,则IO线程必须派发请求到新的线程池进行处理,不然IO线程会阻塞,将致使不能接收其它请求。网络

Dubbo提供的线程模型app

根据请求的消息类被IO线程处理仍是被业务线程池处理,Dubbo提供了下面几种线程模型:oop

  •  all : (AllDispatcher类)全部消息都派发到业务线程池,这些消息包括请求/响应/链接事件/断开事件/心跳等,这些线程模型以下图:

  • direct : (DirectDispacher类)全部消息都不派发到业务线程池,所有在IO线程上直接执行,模型以下图:

  • message : (MessageOnlyDispatcher类)只有请求响应消息派发到业务线程池,其余链接断开事件/心跳等消息,直接在IO线程上执行,模型图以下:

  • execution:(ExecutionDispatcher类)只把请求类消息派发到业务线程池处理,可是响应和其它链接断开事件,心跳等消息直接在IO线程上执行,模型以下图:

  • connection:(ConnectionOrderedDispatcher类)在IO线程上,将链接断开事件放入队列,有序逐个执行,其它消息派发到业务线程池处理,模型以下图:

其中AllDispatcher对应的handler代码以下:性能

public class AllChannelHandler extends WrappedChannelHandler{
    public AllChannelHandler(ChannelHandler handler , URL url){
        super(handler,url);
    }

    // 连接事件,交给业务线程池处理
    public void connected(Channel channel) throws RemotingExcecption{
        ExecutorService cexecutor = getExecutorService();
        try{
            cexecutor.execute(new ChannelEventRunnable(channel,handler,ChannelState.CONNECTED));
        }catch(Throwable t){
            throw new ExecutionException("connect event" , channel , getClass() + "error when process connected event.",t);
        }
    }

    // 连接断开事件,交给业务线程池处理
    public void disconnected(Channel channel) throws RemotingException{
        ExecutorService cexecutor = getExecutorService();
        try{
            cexecutor.execute(new ChannelEventRunnable(channel,handler,ChannelState.DISCONNECTED));
        }catch(Throwable t){
            throw new ExecutionException("disconnect event",channel,getClass()+" error when process disconnected event.",t);
        }
    }

    // 请求响应事件,交给业务线程池处理
    public void received(Channel channel , Object message) throws RemotingException{        
        ExecutorService cexecutor = getExecutorService();
        try{
            cexecutor.execute(new ChannelEventRunnable(channel,handler,ChannelState.RECEIVED,message));
        }catch(Throwable t){
            // TODO 临时解决线程池满后异常信息没法发送到对端的问题。待重构
            // fix 线程池满了拒绝调用不返回,致使消费者一直等待超时
            if(message instanceof Request && t instanceof RejectedExecutionException ){
                ...
            }
            throw new ExecutionException(message , channel ,getClass() + " error when process received event.",t);
        }
    }

    // 异常处理事件,交给业务线程池处理
    public void caught(Channel channel , Throwable exception) throws RemotingException {
        ExecutorService cexecutor = getExecutorService();
        try{
            cexecutor.execute(new ChannelEventRunnable(channel,handler,ChannelState.CAUGHT,exception));
        }catch(Throwable t){
            throw new ExecutionException("caught event",channel,getClass() + " error when process caught event .");
        }
    }
    ...
}

可知全部事件都直接交给业务线程池进行处理了。url

Dubbo提供了经常使用的线程池模型,这些模型能够知足咱们绝大多数的需求,可是您能够根据本身的须要进行扩展定制。在服务提供者启动线程时,咱们会看到何时加载的线程模型的实现。spa

Dubbo提供的线程池策略线程

扩展接口 ThreadPool 的SPI实现有以下几种:3d

  • fixed:固定大小线程池,启动时创建线程,不关闭,一直持有(缺省)。
  • cached:缓存线程池,空闲一分钟自动删除,须要时重建。
  • limited:可伸缩线程池,但池中的线程数只会增加不会收缩。只增加不收缩的目的是为了不收缩时忽然带来大流量引发性能问题。

其中fixed策略对应扩展实现类是FixedThreadPool,代码以下:

public class FixedThreadPool implements ThreadPool{
    public Executor getExecutor(URL url){
        String name = url.getParameter(Constants.THREAD_NAME_KEY,Constants.DEFAULT_THREAD_NAME);
        int threads = url.getParameter(Constants.THREADS_KEY,Constants.DEFAULT_THREADS);
        int queues = url.getParameter(Constants.QUEUES_KEY,Constants.DEFAULT_QUEUES);
        return new ThreadPoolExecutor(threads , threads , 0 , TimeUnit.MILLISECONDS , queues==0 ? new SynchronousQueue<Runnable>() : (queue < 0 ? new LinkedBlockingQueue<Runnable>(queues)) , new NamedThreadFactory(name,true) , new AbortPolicyWithReport(name,url));
    }
}

可知使用ThreadPoolExecutor建立了核心线程数=最大线程池数=threads的线程池。

Dubbo线程池扩展,这些扩展能够知足咱们绝大多数的需求,可是您能够根据本身的须要进行扩展定制。在服务提供者启动流程时,咱们会看到何时加载的线程池扩展实现。  

相关文章
相关标签/搜索