本文首发于我的微信公众号《andyqian》,期待你的关注!面试
以前文章《Java线程池ThreadPoolExecutor》《ThreadPoolExecutor 原理解析》中,分别讲述了ThreadPoolExecutor 的概念以及原理,今天就一块儿来看看其在 Dubbo 框架中的应用。微信
Dubbo 为咱们提供了几种不一样类型的线程池实现,其底层均使用的是 JDK 中的 ThreadPoolExecutor 线程池。ThreadPoolExecutor 咱们都已经很是熟悉,其构造函数中有几个很是重要的参数。其中就包括:拒绝策略( ThreadPoolExecutor.AbortPolicy ) 以及 ThreadFactory,在 Dubbo 中自定义了 ThreadPoolExecutor.AbortPolicy 以及 ThreadFactory。在学习线程池以前,咱们先来看看这二者的实现,更有益于后面的理解。多线程
在Dubbo中NamedInternalThreadFactory 为自定义的线程 ThreadFactory 的子类。其类图以下:框架
其中 NamedInternalThreadFactory 类,其实现以下所示:ide
public class NamedInternalThreadFactory extends NamedThreadFactory { public NamedInternalThreadFactory() { super(); } public NamedInternalThreadFactory(String prefix) { super(prefix, false); } public NamedInternalThreadFactory(String prefix, boolean daemon) { super(prefix, daemon); } @Override public Thread newThread(Runnable runnable) { String name = mPrefix + mThreadNum.getAndIncrement(); InternalThread ret = new InternalThread(mGroup, runnable, name, 0); ret.setDaemon(mDaemon); return ret; } }
其中 NamedThreadFactory 类的实现以下:函数
public class NamedThreadFactory implements ThreadFactory { protected static final AtomicInteger POOL_SEQ = new AtomicInteger(1); protected final AtomicInteger mThreadNum = new AtomicInteger(1); protected final String mPrefix; protected final boolean mDaemon; protected final ThreadGroup mGroup; public NamedThreadFactory() { this("pool-" + POOL_SEQ.getAndIncrement(), false); } public NamedThreadFactory(String prefix) { this(prefix, false); } public NamedThreadFactory(String prefix, boolean daemon) { mPrefix = prefix + "-thread-"; mDaemon = daemon; SecurityManager s = System.getSecurityManager(); mGroup = (s == null) ? Thread.currentThread().getThreadGroup() : s.getThreadGroup(); } @Override public Thread newThread(Runnable runnable) { String name = mPrefix + mThreadNum.getAndIncrement(); Thread ret = new Thread(mGroup, runnable, name, 0); ret.setDaemon(mDaemon); return ret; } public ThreadGroup getThreadGroup() { return mGroup; }
到这里,上述代码描述的是Dubbo对线程池中线程的命名规则,其做用是为了方便追踪信息。工具
接下来,咱们来看下拒绝策略 AbortPolicyWithReport 类的实现,其类图以下所示:学习
源码以下:ui
public class AbortPolicyWithReport extends ThreadPoolExecutor.AbortPolicy { protected static final Logger logger = LoggerFactory.getLogger(AbortPolicyWithReport.class); private final String threadName; private final URL url; private static volatile long lastPrintTime = 0; private static final long TEN_MINUTES_MILLS = 10 * 60 * 1000; private static final String OS_WIN_PREFIX = "win"; private static final String OS_NAME_KEY = "os.name"; private static final String WIN_DATETIME_FORMAT = "yyyy-MM-dd_HH-mm-ss"; private static final String DEFAULT_DATETIME_FORMAT = "yyyy-MM-dd_HH:mm:ss"; private static Semaphore guard = new Semaphore(1); public AbortPolicyWithReport(String threadName, URL url) { this.threadName = threadName; this.url = url; } // 覆盖 父类 ThreadPoolExecutor.AbortPolicy 的 rejectedExecution 方法。 @Override public void rejectedExecution(Runnable r, ThreadPoolExecutor e) { // 构造 warn 参数,其中包括:线程状态,线程池数量,活跃数量,核心线程池数量,最大线程池数量 等信息。 String msg = String.format("Thread pool is EXHAUSTED!" + " Thread Name: %s, Pool Size: %d (active: %d, core: %d, max: %d, largest: %d), Task: %d (completed: " + "%d)," + " Executor status:(isShutdown:%s, isTerminated:%s, isTerminating:%s), in %s://%s:%d!", threadName, e.getPoolSize(), e.getActiveCount(), e.getCorePoolSize(), e.getMaximumPoolSize(), e.getLargestPoolSize(), e.getTaskCount(), e.getCompletedTaskCount(), e.isShutdown(), e.isTerminated(), e.isTerminating(), url.getProtocol(), url.getIp(), url.getPort()); logger.warn(msg); // dump 堆栈信息 dumpJStack(); throw new RejectedExecutionException(msg); } // 当执行 rejectedExecution 方法时,会执行该方法。将会 dump 堆栈信息 至 DUMP_DIRECTORY 目录,默认为:user.name 目录下。 private void dumpJStack() { long now = System.currentTimeMillis(); //dump every 10 minutes if (now - lastPrintTime < TEN_MINUTES_MILLS) { return; } if (!guard.tryAcquire()) { return; } ExecutorService pool = Executors.newSingleThreadExecutor(); pool.execute(() -> { String dumpPath = url.getParameter(DUMP_DIRECTORY, System.getProperty("user.home")); SimpleDateFormat sdf; String os = System.getProperty(OS_NAME_KEY).toLowerCase(); // window system don't support ":" in file name if (os.contains(OS_WIN_PREFIX)) { sdf = new SimpleDateFormat(WIN_DATETIME_FORMAT); } else { sdf = new SimpleDateFormat(DEFAULT_DATETIME_FORMAT); } String dateStr = sdf.format(new Date()); //try-with-resources try (FileOutputStream jStackStream = new FileOutputStream( new File(dumpPath, "Dubbo_JStack.log" + "." + dateStr))) { // 工具类,此处实现省略,有兴趣的能够查看。 JVMUtil.jstack(jStackStream); } catch (Throwable t) { logger.error("dump jStack error", t); } finally { guard.release(); } lastPrintTime = System.currentTimeMillis(); }); //must shutdown thread pool ,if not will lead to OOM pool.shutdown(); } }
上面的代码不难,都是打日志,dump 堆栈信息,其目的就是:用于在线程池被打满时,也就是记录执行AbortPolicy时现场信息,主要是便于后期的分析与问题排查。this
上面讲述了Dubbo线程池中自定义的 ThreadFactory 类 以及 AbortPolicyWithReport 类。接下来,咱们继续讲解 Dubbo 提供的不一样线程池实现,其类图以下所示:
1. LimitedThreadPool 线程池
源码以下:
public class LimitedThreadPool implements ThreadPool { @Override public Executor getExecutor(URL url) { String name = url.getParameter(THREAD_NAME_KEY, DEFAULT_THREAD_NAME); int cores = url.getParameter(CORE_THREADS_KEY, DEFAULT_CORE_THREADS); int threads = url.getParameter(THREADS_KEY, DEFAULT_THREADS); int queues = url.getParameter(QUEUES_KEY, DEFAULT_QUEUES); return new ThreadPoolExecutor(cores, threads, Long.MAX_VALUE, TimeUnit.MILLISECONDS, queues == 0 ? new SynchronousQueue<Runnable>() : (queues < 0 ? new LinkedBlockingQueue<Runnable>() : new LinkedBlockingQueue<Runnable>(queues)), new NamedInternalThreadFactory(name, true), new AbortPolicyWithReport(name, url)); }
其中:
THREAD_NAME_KEY 值为:threadname ,表示为:线程名,其默认值为:Dubbo。
CORE_THREADS_KEY 值为:corethreads,表示:核心线程池数量,其默认值为:0。
THREADS_KEY 值为:threads 表示:最大线程数,默认值为:200。
QUEUES_KEY 值为:queues 表示:阻塞队列大小,默认值为:0。
备注:
该线程池中的 cores,threads 参数由外部制定,其中 keepAliveTime 值为:Long.MAX_VALUE,TimeUnit 为 TimeUnit.MILLISECONDS (毫秒)。(意味着线程池中的全部线程永不过时,理论上大于Long.MAX_VALUE 即会过时,由于其足够大,这里能够看为是永不过时 )。
此处使用了三目运算符:
当 queues = 0 时,BlockingQueue为SynchronousQueue。
当 queues < 0 时,则构造一个新的LinkedBlockingQueue。
当 queues > 0 时,构造一个指定元素的LinkedBlockingQueue。
queues == 0 ? new SynchronousQueue<Runnable>() : (queues < 0 ? new LinkedBlockingQueue<Runnable>() : new LinkedBlockingQueue<Runnable>(queues)
该线程池的特色是:能够建立若干个线程,其默认值为 200,线程池中的线程生命周期很是长,甚至能够看作是永不过时。
2. CachedThreadPool 线程池
源码:
public class CachedThreadPool implements ThreadPool { @Override public Executor getExecutor(URL url) { String name = url.getParameter(THREAD_NAME_KEY, DEFAULT_THREAD_NAME); int cores = url.getParameter(CORE_THREADS_KEY, DEFAULT_CORE_THREADS); int threads = url.getParameter(THREADS_KEY, Integer.MAX_VALUE); int queues = url.getParameter(QUEUES_KEY, DEFAULT_QUEUES); int alive = url.getParameter(ALIVE_KEY, DEFAULT_ALIVE); return new ThreadPoolExecutor(cores, threads, alive, TimeUnit.MILLISECONDS, queues == 0 ? new SynchronousQueue<Runnable>() : (queues < 0 ? new LinkedBlockingQueue<Runnable>() : new LinkedBlockingQueue<Runnable>(queues)), new NamedInternalThreadFactory(name, true), new AbortPolicyWithReport(name, url)); } }
其中:
THREAD_NAME_KEY 值为:threadname , 表示为:线程名,其默认值为:Dubbo。
CORE_THREADS_KEY 值为:corethreads,表示为:核心线程池数量,其默认值为:0。
THREADS_KEY 值为:threads,表示:最大线程数,默认值为:Integer.MAX_VALUE。
QUEUES_KEY 值为:queues,表示:阻塞队列大小,默认值为:0。
ALIVE_KEY 值为:alive, 表示: keepAliveTime 表示线程池中线程的存活时间,其默认值为:60 * 1000 (毫秒) 也就是一分钟。
该线程池的特色是:可建立无限多线程(在操做系统的限制下,会远远低于Integer.MAX_VALUE值,这里视为无限大),其线程的最大存活时间默认为 1 分钟。意味着能够建立无限多线程,可是线程的生命周期默认较短!
3. FixedThreadPool 线程池
public class FixedThreadPool implements ThreadPool { @Override public Executor getExecutor(URL url) { String name = url.getParameter(THREAD_NAME_KEY, DEFAULT_THREAD_NAME); int threads = url.getParameter(THREADS_KEY, DEFAULT_THREADS); int queues = url.getParameter(QUEUES_KEY, DEFAULT_QUEUES); return new ThreadPoolExecutor(threads, threads, 0, TimeUnit.MILLISECONDS, queues == 0 ? new SynchronousQueue<Runnable>() : (queues < 0 ? new LinkedBlockingQueue<Runnable>() : new LinkedBlockingQueue<Runnable>(queues)), new NamedInternalThreadFactory(name, true), new AbortPolicyWithReport(name, url)); }
其中:
THREADS_KEY 值为:threads,表示:最大线程数,默认值为:200。
QUEUES_KEY 值为:queues,表示:阻塞队列大小,默认值为:0。
corePoolSize,maximumPoolSize 的线程数量均为:threads,(也就意味着核心线程数等于最大线程数)。
keepAliveTime 的默认值为0,当线程数大于corePoolSize 时,多余的空闲线程会当即终止。
该线程池的特色是:该线程池中corePoolSize 数量 与 maxinumPoolSize 数量一致,当提交的任务大于核心线程池时,则会将其放入到LinkedBlockingQueue队列中等待执行,也是Dubbo中默认使用的线程池。
4. EagerThreadPool 线程池
源码:
public class EagerThreadPool implements ThreadPool { @Override public Executor getExecutor(URL url) { String name = url.getParameter(THREAD_NAME_KEY, DEFAULT_THREAD_NAME); int cores = url.getParameter(CORE_THREADS_KEY, DEFAULT_CORE_THREADS); int threads = url.getParameter(THREADS_KEY, Integer.MAX_VALUE); int queues = url.getParameter(QUEUES_KEY, DEFAULT_QUEUES); int alive = url.getParameter(ALIVE_KEY, DEFAULT_ALIVE); // init queue and executor TaskQueue<Runnable> taskQueue = new TaskQueue<Runnable>(queues <= 0 ? 1 : queues); EagerThreadPoolExecutor executor = new EagerThreadPoolExecutor(cores, threads, alive, TimeUnit.MILLISECONDS, taskQueue, new NamedInternalThreadFactory(name, true), new AbortPolicyWithReport(name, url)); taskQueue.setExecutor(executor); return executor; } }
备注:
该线程池与上面的线程池实现方式有些不同,上面是直接使用了ThreadPoolExecutor 类的构造函数。在该线程池实现中,首先构造了一个自定义的 EagerThreadPoolExecutor 线程池,其底层实现也是基于 ThreadPoolExecutor 类的,其代码以下所示:
public class EagerThreadPoolExecutor extends ThreadPoolExecutor { /** * task count */ private final AtomicInteger submittedTaskCount = new AtomicInteger(0); public EagerThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, TaskQueue<Runnable> workQueue, ThreadFactory threadFactory, RejectedExecutionHandler handler) { super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, threadFactory, handler); } /** * @return current tasks which are executed */ public int getSubmittedTaskCount() { return submittedTaskCount.get(); } @Override protected void afterExecute(Runnable r, Throwable t) { // 执行任务数依次递减 submittedTaskCount.decrementAndGet(); } @Override public void execute(Runnable command) { if (command == null) { throw new NullPointerException(); } // do not increment in method beforeExecute! // 提交任务书 依次递加。 submittedTaskCount.incrementAndGet(); try { // 调用父类方法执行线程任务 super.execute(command); } catch (RejectedExecutionException rx) { // 将任务从新添加到队列中 final TaskQueue queue = (TaskQueue) super.getQueue(); try { //若是添加失败,则减小任务数,并抛出异常。 if (!queue.retryOffer(command, 0, TimeUnit.MILLISECONDS)) { submittedTaskCount.decrementAndGet(); throw new RejectedExecutionException("Queue capacity is full.", rx); } } catch (InterruptedException x) { submittedTaskCount.decrementAndGet(); throw new RejectedExecutionException(x); } } catch (Throwable t) { // decrease any way submittedTaskCount.decrementAndGet(); throw t; } }
在这里咱们发现,在 EagerThreadPoolExecutor 类中,重载了父类ThreadPoolExecutor 类的几个方法,分别以下:afterExecute,execute方法。分别加入 submittedTaskCount 属性进行任务的统计,当父类的execute方法抛出 RejectedExecutionExcetion 异常时,则会将任务从新放入队列中执行,其TaskQueue代码以下:
public class TaskQueue<R extends Runnable> extends LinkedBlockingQueue<Runnable> { private static final long serialVersionUID = -2635853580887179627L; private EagerThreadPoolExecutor executor; public TaskQueue(int capacity) { super(capacity); } public void setExecutor(EagerThreadPoolExecutor exec) { executor = exec; } @Override public boolean offer(Runnable runnable) { if (executor == null) { throw new RejectedExecutionException("The task queue does not have executor!"); } int currentPoolThreadSize = executor.getPoolSize(); // have free worker. put task into queue to let the worker deal with task. // 当提交的任务数,小于 当前线程时,则之间调用父类的offer 方法。 if (executor.getSubmittedTaskCount() < currentPoolThreadSize) { return super.offer(runnable); } // return false to let executor create new worker. // 当当前线程数大小小于,最大线程数时,则直接返回false,建立worker。 if (currentPoolThreadSize < executor.getMaximumPoolSize()) { return false; } // currentPoolThreadSize >= max return super.offer(runnable); } /** * retry offer task * * @param o task * @return offer success or not * @throws RejectedExecutionException if executor is terminated. */ public boolean retryOffer(Runnable o, long timeout, TimeUnit unit) throws InterruptedException { if (executor.isShutdown()) { throw new RejectedExecutionException("Executor is shutdown!"); } return super.offer(o, timeout, unit); } }
该线程池的特色是:能够从新将拒绝掉的task,从新添加的work queue中执行。至关于有一个重试机制!
经过上面的分析,我相信你们对Dubbo中线程池应该有所了解。若是还有不清楚的地方,能够经过debug的方式进行跟踪分析。其实在不少的开源框架中,都有自定义的线程池,但其底层最终使用的仍是 ThreadPoolExecutor 线程池,这个知识点建议你们必定要掌握,不管是实际工做仍是面试,都是一个经常使用的知识点。
相关阅读: