简介: 《Java开发手册》中强调,线程资源必须经过线程池提供,而建立线程池必须使用ThreadPoolExecutor。手册主要强调利用线程池避免两个问题,一是线程过渡切换,二是避免请求过多时形成OOM。可是若是参数配置错误,仍是会引起上面的两个问题。因此本节咱们主要是讨论ThreadPoolExecutor的一些技术细节,而且给出几个经常使用的最佳实践建议。缓存
做者 | 风楼
来源 | 阿里技术公众号服务器
《Java开发手册》中强调,线程资源必须经过线程池提供,而建立线程池必须使用ThreadPoolExecutor。手册主要强调利用线程池避免两个问题,一是线程过渡切换,二是避免请求过多时形成OOM。可是若是参数配置错误,仍是会引起上面的两个问题。因此本节咱们主要是讨论ThreadPoolExecutor的一些技术细节,而且给出几个经常使用的最佳实践建议。多线程
我在查找资料的过程当中,发现有些问题存在争议。后面发现,一部分缘由是由于不一样JDK版本的现实是有差别的。所以,下面的分析是基于当下最经常使用的版本JDK1.8,而且对于存在争议的问题,咱们分析源码,源码才是最准确的。并发
这是一个争议点。我发现大部分博文,不管是国内的仍是国外的,都是这样回答这个问题的:框架
按照上面的描述,若是corePoolSize=0,则会判断等待队列的容量,若是还有容量,则排队,而且不会建立新的线程。异步
—— 但其实,这是老版本的实现方式,从1.6以后,实现方式就变了。咱们直接看execute的源码(submit也依赖它),我备注出了关键一行:ide
int c = ctl.get(); if (workerCountOf(c) < corePoolSize) { if (addWorker(command, true)) return; c = ctl.get(); } if (isRunning(c) && workQueue.offer(command)) { int recheck = ctl.get(); if (! isRunning(recheck) && remove(command)) reject(command); // 注意这一行代码,添加到等待队列成功后,判断当前池内线程数是否为0,若是是则建立一个firstTask为null的worker,这个worker会从等待队列中获取任务并执行。 else if (workerCountOf(recheck) == 0) addWorker(null, false); } else if (!addWorker(command, false)) reject(command);
答函数
上述问题需区分JDK版本。在1.6版本以后,若是corePoolSize=0,提交任务时若是线程池为空,则会当即建立一个线程来执行任务(先排队再获取);若是提交任务的时候,线程池不为空,则先在等待队列中排队,只有队列满了才会建立新线程。工具
因此,优化在于,在队列没有满的这段时间内,会有一个线程在消费提交的任务;1.6以前的实现是,必须等队列满了以后,才开始消费。优化
以前有人问过我这个问题,由于他发现应用中有些Bean建立了线程池,可是这个Bean通常状况下用不到,因此咨询我是否须要把这个线程池注释掉,以减小应用运行时的线程数(该应用运行时线程过多。)
答
不会。从上面的源码能够看出,在刚刚建立ThreadPoolExecutor的时候,线程并不会当即启动,而是要等到有任务提交时才会启动,除非调用了prestartCoreThread/prestartAllCoreThreads事先启动核心线程。
这个问题有点tricky。首先咱们要明确一下概念,虽然在JavaDoc中也使用了“core/non-core threads”这样的描述,但其实这是一个动态的概念,JDK并无给一部分线程打上“core”的标记,作什么特殊化的处理。这个问题我认为想要探讨的是闲置线程终结策略的问题。
在JDK1.6以前,线程池会尽可能保持corePoolSize个核心线程,即便这些线程闲置了很长时间。这一点曾被开发者诟病,因此从JDK1.6开始,提供了方法allowsCoreThreadTimeOut,若是传参为true,则容许闲置的核心线程被终止。
请注意这种策略和corePoolSize=0的区别。我总结的区别是:
因此corePoolSize=0的效果,基本等同于allowsCoreThreadTimeOut=true && corePoolSize=1,但实现细节其实不一样。
答
在JDK1.6以后,若是allowsCoreThreadTimeOut=true,核心线程也能够被终止。
首先咱们要明确一下线程池模型。线程池有个内部类Worker,它实现了Runnable接口,首先,它本身要run起来。而后它会在合适的时候获取咱们提交的Runnable任务,而后调用任务的run()接口。一个Worker不终止的话能够不断执行任务。
咱们前面说的“线程池中的线程”,其实就是Worker;等待队列中的元素,是咱们提交的Runnable任务。
每个Worker在建立出来的时候,会调用它自己的run()方法,实现是runWorker(this),这个实现的核心是一个while循环,这个循环不结束,Worker线程就不会终止,就是这个基本逻辑。
final void runWorker(Worker w) { Thread wt = Thread.currentThread(); Runnable task = w.firstTask; w.firstTask = null; w.unlock(); // allow interrupts boolean completedAbruptly = true; try { // 看这里,核心逻辑在这里 while (task != null || (task = getTask()) != null) { w.lock(); // If pool is stopping, ensure thread is interrupted; // if not, ensure thread is not interrupted. This // requires a recheck in second case to deal with // shutdownNow race while clearing interrupt if ((runStateAtLeast(ctl.get(), STOP) || (Thread.interrupted() && runStateAtLeast(ctl.get(), STOP))) && !wt.isInterrupted()) wt.interrupt(); try { beforeExecute(wt, task); Throwable thrown = null; try { task.run(); } catch (RuntimeException x) { thrown = x; throw x; } catch (Error x) { thrown = x; throw x; } catch (Throwable x) { thrown = x; throw new Error(x); } finally { afterExecute(task, thrown); } } finally { task = null; w.completedTasks++; w.unlock(); } } completedAbruptly = false; } finally { processWorkerExit(w, completedAbruptly); } } private Runnable getTask() { boolean timedOut = false; // Did the last poll() time out? for (;;) { int c = ctl.get(); int rs = runStateOf(c); // Check if queue empty only if necessary. if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) { decrementWorkerCount(); return null; } int wc = workerCountOf(c); // Are workers subject to culling? boolean timed = allowCoreThreadTimeOut || wc > corePoolSize; if ((wc > maximumPoolSize || (timed && timedOut)) && (wc > 1 || workQueue.isEmpty())) { if (compareAndDecrementWorkerCount(c)) return null; continue; } try { // 注意,核心中的核心在这里 Runnable r = timed ? workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) : workQueue.take(); if (r != null) return r; timedOut = true; } catch (InterruptedException retry) { timedOut = false; } } }
答
实现方式很是巧妙,核心线程(Worker)即便一直空闲也不终止,是经过workQueue.take()实现的,它会一直阻塞到从等待队列中取到新的任务。非核心线程空闲指定时间后终止是经过workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS)实现的,一个空闲的Worker只等待keepAliveTime,若是尚未取到任务则循环终止,线程也就运行结束了。
引伸思考
Worker自己就是个线程,它再调用咱们传入的Runnable.run(),会启动一个子线程么?若是你尚未答案,再回想一下Runnable和Thread的关系。
笼统地回答是会占用内存,咱们分析一下占用了哪些内存。首先,比较普通的一部分,一个线程的内存模型:
我想额外强调是下面这几个内存占用,须要当心:
答
线程池保持空闲的核心线程是它的默认配置,通常来说是没有问题的,由于它占用的内存通常不大。怕的就是业务代码中使用ThreadLocal缓存的数据过大又不清理。
若是你的应用线程数处于高位,那么须要观察一下YoungGC的状况,估算一下Eden大小是否足够。若是不够的话,可能要谨慎地建立新线程,而且让空闲的线程终止;必要的时候,可能须要对JVM进行调参。
这也是个争议点。有的博文说等于0表示空闲线程永远不会终止,有的说表示执行完马上终止。还有的说等于-1表示空闲线程永远不会终止。其实稍微看一下源码知道了,这里我直接抛出答案。
答
在JDK1.8中,keepAliveTime=0表示非核心线程执行完马上终止。
默认状况下,keepAliveTime小于0,初始化的时候才会报错;但若是allowsCoreThreadTimeOut,keepAliveTime必须大于0,否则初始化报错。
不少代码的写法,咱们都习惯按照常见范式去编写,而没有去思考为何。好比:
—— 可是在上面,我提到过,submit()底层实现依赖execute(),二者应该统一呀,为何有差别呢?下面再扒一扒submit()的源码,它的实现蛮有意思。
首先,ThreadPoolExecutor中没有submit的代码,而是在它的父类AbstractExecutorService中,有三个submit的重载方法,代码很是简单,关键代码就两行:
public Future<?> submit(Runnable task) { if (task == null) throw new NullPointerException(); RunnableFuture<Void> ftask = newTaskFor(task, null); execute(ftask); return ftask; } public <T> Future<T> submit(Runnable task, T result) { if (task == null) throw new NullPointerException(); RunnableFuture<T> ftask = newTaskFor(task, result); execute(ftask); return ftask; } public <T> Future<T> submit(Callable<T> task) { if (task == null) throw new NullPointerException(); RunnableFuture<T> ftask = newTaskFor(task); execute(ftask); return ftask; }
正是由于这三个重载方法,都调用了execute,因此我才说submit底层依赖execute。经过查看这里execute的实现,咱们不难发现,它就是ThreadPoolExecutor中的实现,因此,形成submit和execute的差别化的代码,不在这。那么形成差别的必定在newTaskFor方法中。这个方法也就new了一个FutureTask而已,FutureTask实现RunnableFuture接口,RunnableFuture接口继承Runnable接口和Future接口。而Callable只是FutureTask的一个成员变量。
因此讲到这里,就有另外一个Java基础知识点:Callable和Future的关系。咱们通常用Callable编写任务代码,Future是异步返回对象,经过它的get方法,阻塞式地获取结果。FutureTask的核心代码就是实现了Future接口,也就是get方法的实现:
public V get() throws InterruptedException, ExecutionException { int s = state; if (s <= COMPLETING) // 核心代码 s = awaitDone(false, 0L); return report(s); } private int awaitDone(boolean timed, long nanos) throws InterruptedException { final long deadline = timed ? System.nanoTime() + nanos : 0L; WaitNode q = null; boolean queued = false; // 死循环 for (;;) { if (Thread.interrupted()) { removeWaiter(q); throw new InterruptedException(); } int s = state; // 只有任务的状态是’已完成‘,才会跳出死循环 if (s > COMPLETING) { if (q != null) q.thread = null; return s; } else if (s == COMPLETING) // cannot time out yet Thread.yield(); else if (q == null) q = new WaitNode(); else if (!queued) queued = UNSAFE.compareAndSwapObject(this, waitersOffset, q.next = waiters, q); else if (timed) { nanos = deadline - System.nanoTime(); if (nanos <= 0L) { removeWaiter(q); return state; } LockSupport.parkNanos(this, nanos); } else LockSupport.park(this); } }
get的核心实现是有个awaitDone方法,这是一个死循环,只有任务的状态是“已完成”,才会跳出死循环;不然会依赖UNSAFE包下的LockSupport.park原语进行阻塞,等待LockSupport.unpark信号量。而这个信号量只有当运行结束得到结果、或者出现异常的状况下,才会发出来。分别对应方法set和setException。这就是异步执行、阻塞获取的原理,扯得有点远了。
回到最初咱们的疑问,为何submit以后,经过get方法能够获取到异常?缘由是FutureTask有一个Object类型的outcome成员变量,用来记录执行结果。这个结果能够是传入的泛型,也能够是Throwable异常:
public void run() { if (state != NEW || !UNSAFE.compareAndSwapObject(this, runnerOffset, null, Thread.currentThread())) return; try { Callable<V> c = callable; if (c != null && state == NEW) { V result; boolean ran; try { result = c.call(); ran = true; } catch (Throwable ex) { result = null; ran = false; setException(ex); } if (ran) set(result); } } finally { // runner must be non-null until state is settled to // prevent concurrent calls to run() runner = null; // state must be re-read after nulling runner to prevent // leaked interrupts int s = state; if (s >= INTERRUPTING) handlePossibleCancellationInterrupt(s); } }
// get方法中依赖的,报告执行结果
private V report(int s) throws ExecutionException { Object x = outcome; if (s == NORMAL) return (V)x; if (s >= CANCELLED) throw new CancellationException(); throw new ExecutionException((Throwable)x); }
FutureTask的另外一个巧妙的地方就是借用RunnableAdapter内部类,将submit的Runnable封装成Callable。因此就算你submit的是Runnable,同样能够用get获取到异常。
答
答
通常来说,线程池的生命周期跟随服务的生命周期。若是一个服务(Service)中止服务了,那么须要调用shutdown方法进行关闭。因此ExecutorService.shutdown在Java以及一些中间件的源码中,是封装在Service的shutdown方法内的。
若是是Server端不重启就不中止提供服务,我认为是不须要特殊处理的。
答
shutdown => 平缓关闭,等待全部已添加到线程池中的任务执行完再关闭。
shutdownNow => 马上关闭,中止正在执行的任务,并返回队列中未执行的任务。
原本想分析一下二者的源码的,可是发现本文的篇幅已通过长了,源码也贴了很多。感兴趣的朋友本身看一下便可。
答
这里我想着重强调的就是SimpleAsyncTaskExecutor,Spring中使用的@Async注解,底层就是基于SimpleAsyncTaskExecutor去执行任务,只不过它不是线程池,而是每次都新开一个线程。
另外想要强调的是Executor接口。Java初学者容易想固然的觉得Executor结尾的类就是一个线程池,而上面的都是反例。咱们能够在JDK的execute方法上看到这个注释:
/**
最佳实践总结
【强制】使用ThreadPoolExecutor的构造函数声明线程池,避免使用Executors类的 newFixedThreadPool和newCachedThreadPool。
【强制】 建立线程或线程池时请指定有意义的线程名称,方便出错时回溯。即threadFactory参数要构造好。
【建议】建议不一样类别的业务用不一样的线程池。
【建议】CPU密集型任务(N+1):这种任务消耗的主要是CPU资源,能够将线程数设置为N(CPU核心数)+1,比CPU核心数多出来的一个线程是为了防止线程偶发的缺页中断,或者其它缘由致使的任务暂停而带来的影响。一旦任务暂停,CPU就会处于空闲状态,而在这种状况下多出来的一个线程就能够充分利用CPU的空闲时间。
【建议】I/O密集型任务(2N):这种任务应用起来,系统会用大部分的时间来处理I/O交互,而线程在处理I/O的时间段内不会占用CPU来处理,这时就能够将CPU交出给其它线程使用。所以在I/O密集型任务的应用中,咱们能够多配置一些线程,具体的计算方法是2N。
【建议】workQueue不要使用无界队列,尽可能使用有界队列。避免大量任务等待,形成OOM。
【建议】若是是资源紧张的应用,使用allowsCoreThreadTimeOut能够提升资源利用率。
【建议】虽然使用线程池有多种异常处理的方式,但在任务代码中,使用try-catch最通用,也能给不一样任务的异常处理作精细化。
【建议】对于资源紧张的应用,若是担忧线程池资源使用不当,能够利用ThreadPoolExecutor的API实现简单的监控,而后进行分析和优化。
线程池初始化示例:
private static final ThreadPoolExecutor pool; static { ThreadFactory threadFactory = new ThreadFactoryBuilder().setNameFormat("po-detail-pool-%d").build(); pool = new ThreadPoolExecutor(4, 8, 60L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<>(512), threadFactory, new ThreadPoolExecutor.AbortPolicy()); pool.allowCoreThreadTimeOut(true); }
本文为阿里云原创内容,未经容许不得转载。