合理利用线程池可以带来三个好处。java
第一:下降资源消耗。经过重复利用已建立的线程下降线程建立和销毁形成的消耗。git
第二:提升响应速度。当任务到达时,任务能够不须要等到线程建立就能当即执行。github
第三:提升线程的可管理性。线程是稀缺资源,若是无限制的建立,不只会消耗系统资源,还会下降系统的稳定性,使用线程池能够进行统一的分配,调优和监控。可是要作到合理的利用线程池,必须对其原理了如执掌。数据库
咱们能够经过ThreadPoolExecutor来建立一个线程池数组
1 new ThreadPoolExecutor(corePoolSize, maximumPoolSize, keepAliveTime, milliseconds,runnableTaskQueue, handler);
建立一个线程池须要输入几个参数:缓存
咱们可使用execute提交的任务,可是execute方法没有返回值,因此没法判断任务是否被线程池执行成功。经过如下代码可知execute方法输入的任务是一个Runnable类的实例。服务器
1 threadsPool.execute(new Runnable() { 2 @Override 3 public void run() { 4 // TODO Auto-generated method stub 5 } 6 });
咱们也可使用submit 方法来提交任务,它会返回一个future,那么咱们能够经过这个future来判断任务是否执行成功,经过future的get方法来获取返回值,get方法会阻塞住直到任务完成,而使用get(long timeout, TimeUnit unit)方法则会阻塞一段时间后当即返回,这时有可能任务没有执行完。并发
1 Future<Object> future = executor.submit(harReturnValuetask); 2 try { 3 Object s = future.get(); 4 } catch (InterruptedException e) { 5 // 处理中断异常 6 } catch (ExecutionException e) { 7 // 处理没法执行任务异常 8 } finally { 9 // 关闭线程池 10 executor.shutdown(); 11 }
咱们能够经过调用线程池的shutdown或shutdownNow方法来关闭线程池,它们的原理是遍历线程池中的工做线程,而后逐个调用线程的interrupt方法来中断线程,因此没法响应中断的任务可能永远没法终止。可是它们存在必定的区别,shutdownNow首先将线程池的状态设置成STOP,而后尝试中止全部的正在执行或暂停任务的线程,并返回等待执行任务的列表,而shutdown只是将线程池的状态设置成SHUTDOWN状态,而后中断全部没有正在执行任务的线程。ide
只要调用了这两个关闭方法的其中一个,isShutdown方法就会返回true。当全部的任务都已关闭后,才表示线程池关闭成功,这时调用isTerminaed方法会返回true。至于咱们应该调用哪种方法来关闭线程池,应该由提交到线程池的任务特性决定,一般调用shutdown来关闭线程池,若是任务不必定要执行完,则能够调用shutdownNow。源码分析
流程分析: 线程池的主要工做流程以下图:
从上图咱们能够看出,当提交一个新任务到线程池时,线程池的处理流程以下:
源码分析。上面的流程分析让咱们很直观的了解了线程池的工做原理,让咱们再经过源代码来看看是如何实现的。线程池执行任务的方法以下:
1 public void execute(Runnable command) { 2 if (command == null) 3 throw new NullPointerException(); 4 //若是线程数小于基本线程数,则建立线程并执行当前任务 5 if (poolSize >= corePoolSize || !addIfUnderCorePoolSize(command)) { 6 //如线程数大于等于基本线程数或线程建立失败,则将当前任务放到工做队列中。 7 if (runState == RUNNING && workQueue.offer(command)) { 8 if (runState != RUNNING || poolSize == 0) 9 ensureQueuedTaskHandled(command); 10 } 11 //若是线程池不处于运行中或任务没法放入队列,而且当前线程数量小于最大容许的线程数量, 12 则建立一个线程执行任务。 13 else if (!addIfUnderMaximumPoolSize(command)) 14 //抛出RejectedExecutionException异常 15 reject(command); // is shutdown or saturated 16 } 17 }
工做线程。线程池建立线程时,会将线程封装成工做线程Worker,Worker在执行完任务后,还会无限循环获取工做队列里的任务来执行。咱们能够从Worker的run方法里看到这点:
1 public void run() { 2 try { 3 Runnable task = firstTask; 4 firstTask = null; 5 while (task != null || (task = getTask()) != null) { 6 runTask(task); 7 task = null; 8 } 9 } finally { 10 workerDone(this); 11 } 12 }
要想合理的配置线程池,就必须首先分析任务特性,能够从如下几个角度来进行分析:
任务的性质:CPU密集型任务,IO密集型任务和混合型任务。
任务的优先级:高,中和低。
任务的执行时间:长,中和短。
任务的依赖性:是否依赖其余系统资源,如数据库链接。
任务性质不一样的任务能够用不一样规模的线程池分开处理。CPU密集型任务配置尽量小的线程,如配置Ncpu+1个线程的线程池。IO密集型任务则因为线程并非一直在执行任务,则配置尽量多的线程,如2*Ncpu。混合型的任务,若是能够拆分,则将其拆分红一个CPU密集型任务和一个IO密集型任务,只要这两个任务执行的时间相差不是太大,那么分解后执行的吞吐率要高于串行执行的吞吐率,若是这两个任务执行时间相差太大,则不必进行分解。咱们能够经过Runtime.getRuntime().availableProcessors()方法得到当前设备的CPU个数。
优先级不一样的任务可使用优先级队列PriorityBlockingQueue来处理。它可让优先级高的任务先获得执行,须要注意的是若是一直有优先级高的任务提交到队列里,那么优先级低的任务可能永远不能执行。
执行时间不一样的任务能够交给不一样规模的线程池来处理,或者也可使用优先级队列,让执行时间短的任务先执行。
依赖数据库链接池的任务,由于线程提交SQL后须要等待数据库返回结果,若是等待的时间越长CPU空闲时间就越长,那么线程数应该设置越大,这样才能更好的利用CPU。
建议使用有界队列,有界队列能增长系统的稳定性和预警能力,能够根据须要设大一点,好比几千。有一次咱们组使用的后台任务线程池的队列和线程池全满了,不断的抛出抛弃任务的异常,经过排查发现是数据库出现了问题,致使执行SQL变得很是缓慢,由于后台任务线程池里的任务全是须要向数据库查询和插入数据的,因此致使线程池里的工做线程所有阻塞住,任务积压在线程池里。若是当时咱们设置成无界队列,线程池的队列就会愈来愈多,有可能会撑满内存,致使整个系统不可用,而不仅是后台任务出现问题。固然咱们的系统全部的任务是用的单独的服务器部署的,而咱们使用不一样规模的线程池跑不一样类型的任务,可是出现这样问题时也会影响到其余任务。
经过线程池提供的参数进行监控。线程池里有一些属性在监控线程池的时候可使用
经过扩展线程池进行监控。经过继承线程池并重写线程池的beforeExecute,afterExecute和terminated方法,咱们能够在任务执行前,执行后和线程池关闭前干一些事情。如监控任务的平均执行时间,最大执行时间和最小执行时间等。这几个方法在线程池里是空方法。以下:
1 protected void beforeExecute(Thread t, Runnable r) { }
在dubbo-common 模块的threadpool包下体现,以下图所示:
com.alibaba.dubbo.common.threadpool.ThreadPool ,线程池接口。代码以下:
1 //@SPI("fixed")注解,Dubbo SPI扩展点,默认为"fixed"。 2 @SPI("fixed") 3 public interface ThreadPool { 4 /** 5 * @Adaptive({Constants.THREADPOOL_KEY}) 注解,基于Dubbo SPI Adaptive机制,加载对应的线程池实现,使用URL.threadpool属性。 6 * getExecutor(url)方法,得到对应的线程池的执行器 7 * 8 */ 9 @Adaptive({Constants.THREADPOOL_KEY}) 10 Executor getExecutor(URL url); 11 12 }
com.alibaba.dubbo.common.threadpool.support.fixed.FixedThreadPool ,实现ThreadPool接口,固定大小线程池,启动时创建线程,不关闭,一直持有。代码以下:
1 public class FixedThreadPool implements ThreadPool { 2 3 @Override 4 public Executor getExecutor(URL url) { 5 //线程名 6 String name = url.getParameter(Constants.THREAD_NAME_KEY, Constants.DEFAULT_THREAD_NAME); 7 //线程数 8 int threads = url.getParameter(Constants.THREADS_KEY, Constants.DEFAULT_THREADS); 9 //队列数 10 int queues = url.getParameter(Constants.QUEUES_KEY, Constants.DEFAULT_QUEUES); 11 //建立执行器 12 return new ThreadPoolExecutor(threads, threads, 0, TimeUnit.MILLISECONDS, 13 /** 14 * 根据不一样的队列数,使用不一样的队列实现: 15 * queues == 0,SynchronousQueue对象。 16 * queues < 0,LinkedBlockingQueue对象。 17 * queues > 0,带队列数的LinkedBlockingQueue对象。 18 */ 19 queues == 0 ? new SynchronousQueue<Runnable>() : 20 (queues < 0 ? new LinkedBlockingQueue<Runnable>() 21 : new LinkedBlockingQueue<Runnable>(queues)), 22 /** 23 * 建立NamedThreadFactory对象,用于生成线程名 24 * 建立AbortPolicyWithReport对象,用于当任务添加到线程池中被拒绝时。 25 */ 26 new NamedInternalThreadFactory(name, true), new AbortPolicyWithReport(name, url)); 27 } 28 }
推荐阅读:
com.alibaba.dubbo.common.threadpool.support.cached.CachedThreadPool ,实现ThreadPool接口,缓存线程池,空闲必定时长,自动删除,须要时重建。代码以下:
1 public class CachedThreadPool implements ThreadPool { 2 3 @Override 4 public Executor getExecutor(URL url) { 5 //线程池名 6 String name = url.getParameter(Constants.THREAD_NAME_KEY, Constants.DEFAULT_THREAD_NAME); 7 //核心线程数 8 int cores = url.getParameter(Constants.CORE_THREADS_KEY, Constants.DEFAULT_CORE_THREADS); 9 //最大线程数 10 int threads = url.getParameter(Constants.THREADS_KEY, Integer.MAX_VALUE); 11 //队列数 12 int queues = url.getParameter(Constants.QUEUES_KEY, Constants.DEFAULT_QUEUES); 13 //线程存活时长 14 int alive = url.getParameter(Constants.ALIVE_KEY, Constants.DEFAULT_ALIVE); 15 //建立执行器 16 return new ThreadPoolExecutor(cores, threads, alive, TimeUnit.MILLISECONDS, 17 queues == 0 ? new SynchronousQueue<Runnable>() : 18 (queues < 0 ? new LinkedBlockingQueue<Runnable>() 19 : new LinkedBlockingQueue<Runnable>(queues)), 20 new NamedInternalThreadFactory(name, true), new AbortPolicyWithReport(name, url)); 21 } 22 }
com.alibaba.dubbo.common.threadpool.support.limited.LimitedThreadPool ,实现ThreadPool接口,可伸缩线程池,但池中的线程池只会增加不会收缩。只增加不收缩的目的是为了不收缩时忽然来了大流量引发的性能问题。代码以下:
1 public class LimitedThreadPool implements ThreadPool { 2 3 @Override 4 public Executor getExecutor(URL url) { 5 //线程池名 6 String name = url.getParameter(Constants.THREAD_NAME_KEY, Constants.DEFAULT_THREAD_NAME); 7 //核心线程数 8 int cores = url.getParameter(Constants.CORE_THREADS_KEY, Constants.DEFAULT_CORE_THREADS); 9 //最大线程数 10 int threads = url.getParameter(Constants.THREADS_KEY, Constants.DEFAULT_THREADS); 11 //队列数 12 int queues = url.getParameter(Constants.QUEUES_KEY, Constants.DEFAULT_QUEUES); 13 /** 14 * 和CachedThreadPool实现是基本一致的,差别点在alive == Integer.MAX_VALUE,空闲时间无限大,即不会删除。 15 */ 16 return new ThreadPoolExecutor(cores, threads, Long.MAX_VALUE, TimeUnit.MILLISECONDS, 17 queues == 0 ? new SynchronousQueue<Runnable>() : 18 (queues < 0 ? new LinkedBlockingQueue<Runnable>() 19 : new LinkedBlockingQueue<Runnable>(queues)), 20 new NamedInternalThreadFactory(name, true), new AbortPolicyWithReport(name, url)); 21 } 22 23 }
com.alibaba.dubbo.common.threadpool.support.AbortPolicyWithReport ,实现 java.util.concurrent.ThreadPoolExecutor.AbortPolicy,拒绝策略实现类。打印JStack,分析线程状态 代码以下:
1 /** 2 * AbortPolicyWithReport实现自ThreadPoolExecutor.AbortPolicy,拒绝策略实现类, 3 * 打印JStack,分析线程状态。 4 */ 5 public class AbortPolicyWithReport extends ThreadPoolExecutor.AbortPolicy { 6 7 8 protected static final Logger logger = LoggerFactory.getLogger(AbortPolicyWithReport.class); 9 /** 10 * 线程名 11 */ 12 private final String threadName; 13 14 /** 15 * URL 对象 16 */ 17 private final URL url; 18 19 /** 20 * 最后打印时间 21 */ 22 private static volatile long lastPrintTime = 0; 23 24 /** 25 * 信号量,大小为1。 26 */ 27 private static Semaphore guard = new Semaphore(1); 28 29 public AbortPolicyWithReport(String threadName, URL url) { 30 this.threadName = threadName; 31 this.url = url; 32 } 33 34 @Override 35 public void rejectedExecution(Runnable r, ThreadPoolExecutor e) { 36 /** 37 * 打印告警日志 38 */ 39 String msg = String.format("Thread pool is EXHAUSTED!" + 40 " Thread Name: %s, Pool Size: %d (active: %d, core: %d, max: %d, largest: %d), Task: %d (completed: %d)," + 41 " Executor status:(isShutdown:%s, isTerminated:%s, isTerminating:%s), in %s://%s:%d!", 42 threadName, e.getPoolSize(), e.getActiveCount(), e.getCorePoolSize(), e.getMaximumPoolSize(), e.getLargestPoolSize(), 43 e.getTaskCount(), e.getCompletedTaskCount(), e.isShutdown(), e.isTerminated(), e.isTerminating(), 44 url.getProtocol(), url.getIp(), url.getPort()); 45 logger.warn(msg); 46 // 打印 JStack,分析线程状态。 47 dumpJStack(); 48 //抛出 RejectedExecutionException 异常 49 throw new RejectedExecutionException(msg); 50 } 51 52 private void dumpJStack() { 53 long now = System.currentTimeMillis(); 54 //每 10 分钟,打印一次。 55 //dump every 10 minutes 56 if (now - lastPrintTime < 10 * 60 * 1000) { 57 return; 58 } 59 //得到信号量 60 if (!guard.tryAcquire()) { 61 return; 62 } 63 //建立线程池,后台执行打印JStack 64 Executors.newSingleThreadExecutor().execute(new Runnable() { 65 @Override 66 public void run() { 67 //得到路径 68 String dumpPath = url.getParameter(Constants.DUMP_DIRECTORY, System.getProperty("user.home")); 69 70 SimpleDateFormat sdf; 71 //得到系统 72 String OS = System.getProperty("os.name").toLowerCase(); 73 74 // window system don't support ":" in file name 75 if(OS.contains("win")){ 76 sdf = new SimpleDateFormat("yyyy-MM-dd_HH-mm-ss"); 77 }else { 78 sdf = new SimpleDateFormat("yyyy-MM-dd_HH:mm:ss"); 79 } 80 81 String dateStr = sdf.format(new Date()); 82 //得到输出流 83 FileOutputStream jstackStream = null; 84 try { 85 jstackStream = new FileOutputStream(new File(dumpPath, "Dubbo_JStack.log" + "." + dateStr)); 86 //打印JStack 87 JVMUtil.jstack(jstackStream); 88 } catch (Throwable t) { 89 logger.error("dump jstack error", t); 90 } finally { 91 //释放信号量 92 guard.release(); 93 //释放输出流 94 if (jstackStream != null) { 95 try { 96 jstackStream.flush(); 97 jstackStream.close(); 98 } catch (IOException e) { 99 } 100 } 101 } 102 //记录最后打印时间 103 lastPrintTime = System.currentTimeMillis(); 104 } 105 }); 106 107 } 108 109 }
推荐阅读: