对最近遇到的业务应用dubbo线程池爆满(异常:RejectedExecutionException:Thread pool is EXHAUSTED)问题进行了分析。java
1、问题回顾:spring
业务应用dubbo配置以下:express
<dubbo:protocol name="dubbo" port="${dubbo.port}" /}
在dubbo的spring配置中,业务应用并无配置threadpool, threads等参数,查看dubbo.io用户文档,可知:默认配置为:apache
<dubbo:protocol name="dubbo" dispatcher="all" threadpool="fixed" threads="100" />
查看dubbo代码(版本为2.5.3)实现:FixedThreadPool.java缓存
/* * Copyright 1999-2011 Alibaba Group. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. * You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. */ package com.alibaba.dubbo.common.threadpool.support.fixed; import java.util.concurrent.Executor; import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.SynchronousQueue; import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit; import com.alibaba.dubbo.common.Constants; import com.alibaba.dubbo.common.URL; import com.alibaba.dubbo.common.threadpool.ThreadPool; import com.alibaba.dubbo.common.threadpool.support.AbortPolicyWithReport; import com.alibaba.dubbo.common.utils.NamedThreadFactory; /** * 此线程池启动时即建立固定大小的线程数,不作任何伸缩,来源于:<code>Executors.newFixedThreadPool()</code> * * @see java.util.concurrent.Executors#newFixedThreadPool(int) * @author william.liangf */ public class FixedThreadPool implements ThreadPool { public Executor getExecutor(URL url) { String name = url.getParameter(Constants.THREAD_NAME_KEY, Constants.DEFAULT_THREAD_NAME); int threads = url.getParameter(Constants.THREADS_KEY, Constants.DEFAULT_THREADS); int queues = url.getParameter(Constants.QUEUES_KEY, Constants.DEFAULT_QUEUES); return new ThreadPoolExecutor(threads, threads, 0, TimeUnit.MILLISECONDS, queues == 0 ? new SynchronousQueue<Runnable>() : (queues < 0 ? new LinkedBlockingQueue<Runnable>() : new LinkedBlockingQueue<Runnable>(queues)), new NamedThreadFactory(name, true), new AbortPolicyWithReport(name, url)); } }
其中:安全
public class Constants { // the other constant // ...... public static final String DEFAULT_THREAD_NAME = "Dubbo"; public static final int DEFAULT_THREADS = 200; public static final int DEFAULT_QUEUES = 0; }
由代码可知:dubbo的线程池采用jdk的ThreadPoolExecutor,默认threads数为200,默认队列长度为0,此时默认采用了SynchronousQueue队列,而若是用户配置的队列长度大于0时,则会采用LinkedBlockingQueue队列。【注意:因而可知,dubbo.io用户文档中,默认threads=100是错误的,实际为200】app
SynchronousQueue:是一个缓存为1的阻塞队列,某次添加元素后必须等待其余线程取走后才能继续添加。less
LinkedBlockingQueue:线程安全的基于链表的阻塞队列,实现了入队出队的分离(分别采用putLock和takeLock锁,所以能够同时入队和出队操做)位于java.util.concurrent包下,是一个无界队列。ide
ThreadPoolExecutor的完整构造方法的签名是:ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory, RejectedExecutionHandler handler) . corePoolSize - 池中所保存的线程数,包括空闲线程。 maximumPoolSize-池中容许的最大线程数。 keepAliveTime - 当线程数大于核心时,此为终止前多余的空闲线程等待新任务的最长时间。 unit - keepAliveTime 参数的时间单位。 workQueue - 执行前用于保持任务的队列。此队列仅保持由 execute方法提交的 Runnable任务。 threadFactory - 执行程序建立新线程时使用的工厂。 handler - 因为超出线程范围和队列容量而使执行被阻塞时所使用的处理程序。 ThreadPoolExecutor是Executors类的底层实现。
dubbo默认建立固定大小的线程池(200), 每次提交一个任务就建立一个线程,直到线程数达到线程池大小200,线程池的大小一旦达到最大值就保持不变。若是某个线程由于执行异常而结束,那么线程池会补充一个新的线程。ui
因为dubbo默认采用了直接提交的SynchronousQueue工做队列,因此,全部的task会直接提交给线程池中的某一worker线程,若是没有可用线程,那么会拒绝任务的处理,而抛出异常RejectedExecutionException:Thread pool is EXHAUSTED.
/* * Copyright 1999-2011 Alibaba Group. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. * You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. */ package com.alibaba.dubbo.common.threadpool.support; import java.util.concurrent.RejectedExecutionException; import java.util.concurrent.ThreadPoolExecutor; import com.alibaba.dubbo.common.URL; import com.alibaba.dubbo.common.logger.Logger; import com.alibaba.dubbo.common.logger.LoggerFactory; /** * Abort Policy. * Log warn info when abort. * * @author ding.lid */ public class AbortPolicyWithReport extends ThreadPoolExecutor.AbortPolicy { protected static final Logger logger = LoggerFactory.getLogger(AbortPolicyWithReport.class); private final String threadName; private final URL url; public AbortPolicyWithReport(String threadName, URL url) { this.threadName = threadName; this.url = url; } @Override public void rejectedExecution(Runnable r, ThreadPoolExecutor e) { String msg = String.format("Thread pool is EXHAUSTED!" + " Thread Name: %s, Pool Size: %d (active: %d, core: %d, max: %d, largest: %d), Task: %d (completed: %d)," + " Executor status:(isShutdown:%s, isTerminated:%s, isTerminating:%s), in %s://%s:%d!" , threadName, e.getPoolSize(), e.getActiveCount(), e.getCorePoolSize(), e.getMaximumPoolSize(), e.getLargestPoolSize(), e.getTaskCount(), e.getCompletedTaskCount(), e.isShutdown(), e.isTerminated(), e.isTerminating(), url.getProtocol(), url.getIp(), url.getPort()); logger.warn(msg); throw new RejectedExecutionException(msg); } }
所以,若是系统抛出异常RejectedExecutionException:Thread pool is EXHAUSTED.能够经过调大dubbo线程池解决该问题,或者下降对TPS的预期。
<dubbo:protocol name="dubbo" dispatcher="all" threadpool="fixed" threads="500" />
此处修改了dubbo线程池为500,以处理更多的任务。