最近看完了ElasticSearch线程池模块的源码,感触颇深,而后也自不量力地借鉴ES的 EsThreadPoolExecutor 从新造了一把轮子(源码在这里),对线程池的理解又加深了一些。在继承 ThreadPoolExecutor实现自定义的线程池时,ES先重写了Runnable接口,提供了更灵活的任务运行过程当中出现异常处理逻辑。简而言之,它采用回调机制实现了线程在运行过程当中抛出未受检异常的统一处理逻辑,很是优美。实在忍不住把源码copy下来:html
/** * An extension to runnable. */ public abstract class AbstractRunnable implements Runnable { /** * Should the runnable force its execution in case it gets rejected? */ public boolean isForceExecution() { return false; } @Override public final void run() { try { doRun(); } catch (Exception t) { onFailure(t); } finally { onAfter(); } } /** * This method is called in a finally block after successful execution * or on a rejection. */ public void onAfter() { // nothing by default } /** * This method is invoked for all exception thrown by {@link #doRun()} */ public abstract void onFailure(Exception e); /** * This should be executed if the thread-pool executing this action rejected the execution. * The default implementation forwards to {@link #onFailure(Exception)} */ public void onRejection(Exception e) { onFailure(e); } /** * This method has the same semantics as {@link Runnable#run()} * @throws InterruptedException if the run method throws an InterruptedException */ protected abstract void doRun() throws Exception; }
统一的任务执行入口方法doRun(),由各个子类实现doRun()执行具体的业务逻辑java
try-catch中统一处理线程执行任务过程当中抛出的异常,由onFailure()处理git
任务执行完成(不论是正常结束仍是运行过程当中抛出了异常),统一由onAfter()处理程序员
isForceExecution
方法,用来支持任务在提交给线程池被拒绝了,强制执行。固然了,这须要线程池的任务队列提供相关的支持。我也是受这种方式的启发,实现了一个线程在执行任务过程当中抛出未受检异常时,先判断该任务是否容许强制执行isForceExecution,而后再从新提交任务运行的线程池。github
此外,ES内置了好几个默认实现的线程池,好比 EsThreadPoolExecutor 、QueueResizingEsThreadPoolExecutor 和 PrioritizedEsThreadPoolExecutor。apache
QueueResizingEsThreadPoolExecutorless
在建立线程池时会指定一个任务队列(BlockingQueue),日常都是直接用 LinkedBlockingQueue,它是一个无界队列,固然也能够在构造方法中指定队列的长度。可是,ES中几乎不用 LinkedBlockingQueue 做为任务队列,而是使用 LinkedTransferQueue ,可是 LinkedTransferQueue 又是一个无界队列,因而ES又基于LinkedTransferQueue 封装了一个任务队列,类名称为 ResizableBlockingQueue,它可以限制任务队列的长度。elasticsearch
那么问题来了,对于一个线程池,任务队列设置为多长合适呢?ide
答案就是Little's Law。在QueueResizingEsThreadPoolExecutor 线程池中重写了afterExecute()方法,里面统计了每一个任务的运行时间、等待时间(入队列到执行)。因此,你想知道如何统计一个任务的运行时间吗?你想统计线程池一共提交了多少个任务,全部任务的运行时间吗?看看QueueResizingEsThreadPoolExecutor 源码就明白了。性能
另外再提一个问题,为何ES用 LinkedTransferQueue 做为任务队列而不用 LinkedBlockingQueue 呢?
我想:很重要的一个缘由是LinkedBlockingQueue 是基于重量级的锁(ReentrantLock)实现的入队操做,而LinkedTransferQueue 是基于CAS原子指令实现的入队操做。LinkedBlockingQueue#offer()当队列长度达到最大值,此时不能提交任务给队列了,直接返回false,不然经过加锁方式将任务提交给队列。LinkedTransferQueue自己是无界的,所以添加任务到LinkedTransferQueue时,经过CAS实现避免了加锁带来的上下文开销的切换,在大部分竞争状况下,是会提高性能的。
PrioritizedEsThreadPoolExecutor
优先级任务的线程池,任务提交给线程池后是在任务队列里面排队,FIFO模式。而这个线程池则容许任务定义一个优先级,优先级高的任务先执行。
EsThreadPoolExecutor
这个线程池很是像JDK里面的ThreadPoolExecutor,不过,它实现了一些拒绝处理逻辑,提交任务若被拒绝(会抛出EsRejectedExecutionException异常),则进行相关处理
@Override public void execute(final Runnable command) { doExecute(wrapRunnable(command)); } protected void doExecute(final Runnable command) { try { super.execute(command); } catch (EsRejectedExecutionException ex) { if (command instanceof AbstractRunnable) { // If we are an abstract runnable we can handle the rejection // directly and don't need to rethrow it. try { ((AbstractRunnable) command).onRejection(ex); } finally { ((AbstractRunnable) command).onAfter(); } } else { throw ex; } } }
讲完了ES中经常使用的三个线程池实现,还想结合JDK源码,记录一下线程在执行任务过程当中抛出运行时异常,是如何处理的。我以为有二种方式(或者说有2个地方)来处理运行时异常。一种方式是:java.util.concurrent.ThreadPoolExecutor#afterExecute方法,另外一种方式是:java.lang.Thread.UncaughtExceptionHandler#uncaughtException
afterExecute
看ThreadPoolExecutor#afterExecute(Runnable r, Throwable t) 的源码注释:
Method invoked upon completion of execution of the given Runnable.This method is invoked by the thread that executed the task. If non-null, the Throwable is the uncaught RuntimeException or Error that caused execution to terminate abruptly.
提交给线程池的任务,执行完(不论是正常结束,仍是执行过程当中出现了异常)后都会自动调用afterExecute()方法。若是执行过程当中出现了异常,那么Throwable t 就不为null,而且致使执行终止(terminate abruptly.)。
This implementation does nothing, but may be customized in subclasses. Note: To properly nest multiple overridings, subclasses should generally invoke super.afterExecute at the beginning of this method.
默认的afterExecute(Runnable r, Throwable t) 方法是一个空实现,什么也没有。所以,在继承ThreadPoolExecutor实现本身的线程池时,若是重写该方法,则要记住:先调用 super.afterExecute
好比说这样干:
@Override protected void afterExecute(Runnable r, Throwable t) { super.afterExecute(r, t); if (t != null) { //出现了异常 if (r instanceof AbstractRunnable && ((AbstractRunnable)r).isForceExecution()) { //AbstractRunnable 设置为强制执行时从新拉起任务 execute(r); logger.error("AbstractRunnable task run time error:{}, restarted", t.getMessage()); } } }
看,重写afterExecute方法,当 Throwable 不为null时,代表线程执行任务过程当中出现了异常,这时就从新提交任务。
有个时候,在实现 Kafka 消费者线程的时候(while true循环),常常由于解析消息出错致使线程抛出异常,就会致使 Kafka消费者线程挂掉,这样就永久丢失了一个消费者了。而经过这种方式,当消费者线程挂了时,可从新拉起一个新任务。
uncaughtException
建立 ThreadPoolExecutor时,要传入ThreadFactory 做为参数,在而建立ThreadFactory 对象时,就能够设置线程的异常处理器java.lang.Thread.UncaughtExceptionHandler。
在用Google Guava包的时候,通常这么干:
//先 new Thread.UncaughtExceptionHandler对象 exceptionHandler private ThreadFactory threadFactory = new ThreadFactoryBuilder().setNameFormat("thread_name-%d").setUncaughtExceptionHandler(exceptionHandler).build();
在线程执行任务过程当中,若是抛出了异常,就会由JVM调用 Thread.UncaughtExceptionHandler 中实现的异常处理逻辑。看Thread.UncaughtExceptionHandler的JDK源码注释:
Interface for handlers invoked when a Thread abruptly. terminates due to an uncaught exception.
When a thread is about to terminate due to an uncaught exception the Java Virtual Machine will query the thread for its UncaughtExceptionHandler using getUncaughtExceptionHandler and will invoke the handler's uncaughtException method, passing the thread and the exception as arguments.
其大意就是:若是线程在执行Runnable任务过程由于 uncaught exception 而终止了,那么 JVM 就会调用getUncaughtExceptionHandler 方法查找是否设置了异常处理器,若是设置了,那就就会调用异常处理器的java.lang.Thread.UncaughtExceptionHandler#uncaughtException方法,这样咱们就能够在这个方法里面定义异常处理逻辑了。
ES的ThreadPool 模块是学习线程池的很是好的一个示例,实践出真知。它告诉你如何自定义线程池(用什么任务队列?cpu核数、任务队列长度等参数如何配置?)。在实现自定义任务队列过程当中,也进一步理解了CAS操做的原理,如何巧妙地使用CAS?是失败重试呢?仍是直接返回?。我想,这也是CAS与synchronized锁、ReentrantLock锁的一个最重要应用区别:多个线程在竞执行 synchronized锁 或者 ReentrantLock锁 锁住的代码(术语叫临界区)时,未抢到锁的进程会被挂起,会伴随上下文切换,而若能够把临界区中的代码逻辑基于CAS原子指令来实现,若是某个线程执行CAS操做失败了,它能够选择继续重试,仍是执行其它的处理逻辑,仍是sleep若干毫秒。所以,它把线程执行的主动权交回给了程序员。好比基于CAS实现自增操做,失败时继续重试(这里自增操做逻辑自己要求"失败重试直到加1成功"),直到加1成功,代码是这样的:
do{ v = value.get(); }while(v!=value.compareAndSwap(v,v+1));
有个时候,代码里面CAS失败,并不必定就须要当即重试,由于,CAS失败了,意味着此时有其余线程也在竞争,说明资源的竞争较激烈,那咱们是否是能够先 sleep 一下再重试呢?这样是否是更好?
线程在执行Runnable任务过程当中抛出了异常如何处理?这里提到了Thread.UncaughtExceptionHandler#uncaughtException 和 ThreadPoolExecutor#afterExecute。前者是由JVM自动调用的,后者则是在每一个任务执行结束后都会被调用。
Thread.UncaughtExceptionHandler#uncaughtException 和 RejectedExecutionHandler#rejectedExecution 是不一样的。RejectedExecutionHandler 用来处理任务在提交的时候,被线程池拒绝了,该怎么办的问题,默认是AbortPolicy,即:直接丢弃。
Lucene 源码 org.apache.lucene.util.CloseableThreadLocal 解决了使用JDK ThreadLocal 时 JAVA对象 长期驻留内存得不到及时清除的问题,也值得好好分析一番 :) 原文:https://www.cnblogs.com/hapjin/p/10617702.html