dubbo 线程池

在dubbo调用过程当中被调用方有两个线程池:io线程池,业务线程池。缓存

这也是dubbo调优的点。性能

配置信息:url

<dubbo:protocol name="dubbo" dispatcher="all" threadpool="fixed" threads="100" />
Dispatcher
  • all 全部消息都派发到线程池,包括请求,响应,链接事件,断开事件,心跳等。
  • direct 全部消息都不派发到线程池,所有在 IO 线程上直接执行。
  • message 只有请求响应消息派发到线程池,其它链接断开事件,心跳等消息,直接在 IO 线程上执行。
  • execution 只请求消息派发到线程池,不含响应,响应和其它链接断开事件,心跳等消息,直接在 IO 线程上执行。
  • connection 在 IO 线程上,将链接断开事件放入队列,有序逐个执行,其它消息派发到线程池。

 

ThreadPool
  • fixed 固定大小线程池,启动时创建线程,不关闭,一直持有。(缺省)
public class FixedThreadPool implements ThreadPool {
    public Executor getExecutor(URL url) {
        String name = url.getParameter(Constants.THREAD_NAME_KEY, Constants.DEFAULT_THREAD_NAME);
        int threads = url.getParameter(Constants.THREADS_KEY, Constants.DEFAULT_THREADS);
        int queues = url.getParameter(Constants.QUEUES_KEY, Constants.DEFAULT_QUEUES);
        return new ThreadPoolExecutor(threads, threads, 0, TimeUnit.MILLISECONDS,
                queues == 0 ? new SynchronousQueue<Runnable>() :
                        (queues < 0 ? new LinkedBlockingQueue<Runnable>()
                                : new LinkedBlockingQueue<Runnable>(queues)),
                new NamedThreadFactory(name, true), new AbortPolicyWithReport(name, url));
    }
}

 

  • cached 缓存线程池,空闲一分钟自动删除,须要时重建。
public class CachedThreadPool implements ThreadPool {
    public Executor getExecutor(URL url) {
        String name = url.getParameter(Constants.THREAD_NAME_KEY, Constants.DEFAULT_THREAD_NAME);
        int cores = url.getParameter(Constants.CORE_THREADS_KEY, Constants.DEFAULT_CORE_THREADS);
        int threads = url.getParameter(Constants.THREADS_KEY, Integer.MAX_VALUE);
        int queues = url.getParameter(Constants.QUEUES_KEY, Constants.DEFAULT_QUEUES);
        int alive = url.getParameter(Constants.ALIVE_KEY, Constants.DEFAULT_ALIVE);
        return new ThreadPoolExecutor(cores, threads, alive, TimeUnit.MILLISECONDS,
                queues == 0 ? new SynchronousQueue<Runnable>() :
                        (queues < 0 ? new LinkedBlockingQueue<Runnable>()
                                : new LinkedBlockingQueue<Runnable>(queues)),
                new NamedThreadFactory(name, true), new AbortPolicyWithReport(name, url));
    }
}

 

  • limited 可伸缩线程池,但池中的线程数只会增加不会收缩。只增加不收缩的目的是为了不收缩时忽然来了大流量引发的性能问题。
public class LimitedThreadPool implements ThreadPool {
    public Executor getExecutor(URL url) {
        String name = url.getParameter(Constants.THREAD_NAME_KEY, Constants.DEFAULT_THREAD_NAME);
        int cores = url.getParameter(Constants.CORE_THREADS_KEY, Constants.DEFAULT_CORE_THREADS);
        int threads = url.getParameter(Constants.THREADS_KEY, Constants.DEFAULT_THREADS);
        int queues = url.getParameter(Constants.QUEUES_KEY, Constants.DEFAULT_QUEUES);
        return new ThreadPoolExecutor(cores, threads, Long.MAX_VALUE, TimeUnit.MILLISECONDS,
                queues == 0 ? new SynchronousQueue<Runnable>() :
                        (queues < 0 ? new LinkedBlockingQueue<Runnable>()
                                : new LinkedBlockingQueue<Runnable>(queues)),
                new NamedThreadFactory(name, true), new AbortPolicyWithReport(name, url));
    }
}
相关文章
相关标签/搜索