分析源码,学会正确使用 Java 线程池

本文做者:oschina_2020javascript

在平常的开发工做当中,线程池每每承载着一个应用中最重要的业务逻辑,所以咱们有必要更多地去关注线程池的执行状况,包括异常的处理和分析等。本文主要聚焦在如何正确使用线程池上,以及提供一些实用的建议。文中会稍微涉及到一些线程池实现原理方面的知识,可是不会过多展开。java

线程池的异常处理

UncaughtExceptionHandler数据库

咱们都知道Runnable接口中的run方法是不容许抛出异常的,所以派生出这个线程的主线程可能没法直接得到该线程在执行过程当中的异常信息。以下例:数组

public static void main(String[] args) throws Exception {
        Thread thread = new Thread(() -> {
            Uninterruptibles.sleepUninterruptibly(2, TimeUnit.SECONDS);
            System.out.println(1 / 0); // 这行会致使报错!
        });
        thread.setUncaughtExceptionHandler((t, e) -> {
            e.printStackTrace(); //若是你把这一行注释掉,这个程序将不会抛出任何异常.
        });
        thread.start();
    }

为何会这样呢?其实咱们看一下Thread中的源码就会发现,Thread在执行过程当中若是遇到了异常,会先判断当前线程是否有设置UncaughtExceptionHandler,若是没有,则会从线程所在的ThreadGroup中获取。多线程

注意:每一个线程都有本身的ThreadGroup,即便你没有指定,而且它实现了UncaughtExceptionHandler接口。并发

咱们看下ThreadGroup中默认的对UncaughtExceptionHandler接口的实现:ide

public void uncaughtException(Thread t, Throwable e) {
        if (parent != null) {
            parent.uncaughtException(t, e);
        } else {
            Thread.UncaughtExceptionHandler ueh =
                Thread.getDefaultUncaughtExceptionHandler();
            if (ueh != null) {
                ueh.uncaughtException(t, e);
            } else if (!(e instanceof ThreadDeath)) {
                System.err.print("Exception in thread \""
                                 + t.getName() + "\" ");
                e.printStackTrace(System.err);
            }
        }
    }

这个ThreadGroup若是有父ThreadGroup,则调用父ThreadGroup的uncaughtException,不然调用全局默认的Thread.DefaultUncaughtExceptionHandler,若是全局的handler也没有设置,则只是简单地将异常信息定位到System.err中,这就是为何咱们应当在建立线程的时候,去实现它的UncaughtExceptionHandler接口的缘由,这么作可让你更方便地去排查问题。高并发

经过execute提交任务给线程池oop

回到线程池这个话题,若是咱们向线程池提交的任务中,没有对异常进行try...catch处理,而且运行的时候出现了异常,那会对线程池形成什么影响呢?答案是没有影响,线程池依旧能够正常工做,可是异常却被吞掉了。这一般来讲不是一个好事情,由于咱们须要拿到原始的异常对象去分析问题。性能

那么怎样才能拿到原始的异常对象呢?咱们从线程池的源码着手开始研究这个问题。固然网上关于线程池的源码解析文章有不少,这里限于篇幅,直接给出最相关的部分代码:

final void runWorker(Worker w) {
        Thread wt = Thread.currentThread();
        Runnable task = w.firstTask;
        w.firstTask = null;
        w.unlock(); // allow interrupts
        boolean completedAbruptly = true;
        try {
            while (task != null || (task = getTask()) != null) {
                w.lock();
                // If pool is stopping, ensure thread is interrupted;
                // if not, ensure thread is not interrupted.  This
                // requires a recheck in second case to deal with
                // shutdownNow race while clearing interrupt
                if ((runStateAtLeast(ctl.get(), STOP) ||
                     (Thread.interrupted() &&
                      runStateAtLeast(ctl.get(), STOP))) &&
                    !wt.isInterrupted())
                    wt.interrupt();
                try {
                    beforeExecute(wt, task);
                    Throwable thrown = null;
                    try {
                        task.run();
                    } catch (RuntimeException x) {
                        thrown = x; throw x;
                    } catch (Error x) {
                        thrown = x; throw x;
                    } catch (Throwable x) {
                        thrown = x; throw new Error(x);
                    } finally {
                        afterExecute(task, thrown);
                    }
                } finally {
                    task = null;
                    w.completedTasks++;
                    w.unlock();
                }
            }
            completedAbruptly = false;
        } finally {
            processWorkerExit(w, completedAbruptly);
        }
    }

这个方法就是真正去执行提交给线程池的任务的代码。

这里咱们略去其中不相关的逻辑,重点关注第19行到第32行的逻辑,其中第23行是真正开始执行提交给线程池的任务,那么第20行是干什么的呢?其实就是在执行提交给线程池的任务以前能够作一些前置工做,一样的,咱们看到第31行,这个是在执行完提交的任务以后,能够作一些后置工做。

beforeExecute这个咱们暂且无论,重点关注下afterExecute这个方法。咱们能够看到,在执行任务过程当中,一旦抛出任何类型的异常,都会提交给afterExecute这个方法,然而查看线程池的源代码咱们能够发现,默认的afterExecute是个空实现,所以,咱们有必要继承ThreadPoolExecutor去实现这个afterExecute方法。

看源码咱们能够发现这个afterExecute方法是protected类型的,从官方注释上也能够看到,这个方法就是推荐子类去实现的。

固然,这个方法不能随意去实现,须要遵循必定的步骤,具体的官方注释也有讲,这里摘抄以下:

*  <pre> {@code
	 * class ExtendedExecutor extends ThreadPoolExecutor {
	 *   // ...
	 *   protected void afterExecute(Runnable r, Throwable t) {
	 *     super.afterExecute(r, t);
	 *     if (t == null && r instanceof Future<?>) {
	 *       try {
	 *         Object result = ((Future<?>) r).get();
	 *       } catch (CancellationException ce) {
	 *           t = ce;
	 *       } catch (ExecutionException ee) {
	 *           t = ee.getCause();
	 *       } catch (InterruptedException ie) {
	 *           Thread.currentThread().interrupt(); // ignore/reset
	 *       }
	 *     }
	 *     if (t != null)
	 *       System.out.println(t);
	 *   }
	 * }}</pre>

那么经过这种方式,就能够将原先可能被线程池吞掉的异常成功捕获到,从而便于排查问题。

可是这里还有个小问题,咱们注意到在runWorker方法中,执行task.run();语句以后,各类类型的异常都被抛出了,那这些被抛出的异常去了哪里?事实上这里的异常对象最终会被传入到Thread的dispatchUncaughtException方法中,源码以下:

private void dispatchUncaughtException(Throwable e) {
        getUncaughtExceptionHandler().uncaughtException(this, e);
    }

能够看到它会去获取UncaughtExceptionHandler的实现类,而后调用其中的uncaughtException方法,这也就回到了咱们上一小节所分析的UncaughtExceptionHandler实现的具体逻辑。那么为了拿到最原始的异常对象,除了实现UncaughtExceptionHandler接口以外,也能够考虑实现afterExecute方法。

经过submit提交任务到线程池

这个一样很简单,咱们仍是先回到submit方法的源码:

public <T> Future<T> submit(Callable<T> task) {
        if (task == null) throw new NullPointerException();
        RunnableFuture<T> ftask = newTaskFor(task);
        execute(ftask);
        return ftask;
    }

这里的execute方法调用的是ThreadPoolExecutor中的execute方法,执行逻辑跟经过execute提交任务到线程池是同样的。咱们先重点关注这里的newTaskFor方法,其源码以下:

protected <T> RunnableFuture<T> newTaskFor(Callable<T> callable) {
        return new FutureTask<T>(callable);
    }

能够看到提交的Callable对象用FutureTask封装起来了。咱们知道最终会执行到上述runWorker这个方法中,而且最核心的执行逻辑就是task.run();这行代码。咱们知道这里的task实际上是FutureTask类型,所以咱们有必要看一下FutureTask中的run方法的实现:

public void run() {
        if (state != NEW ||
            !UNSAFE.compareAndSwapObject(this, runnerOffset,
                                         null, Thread.currentThread()))
            return;
        try {
            Callable<V> c = callable;
            if (c != null && state == NEW) {
                V result;
                boolean ran;
                try {
                    result = c.call();
                    ran = true;
                } catch (Throwable ex) {
                    result = null;
                    ran = false;
                    setException(ex);
                }
                if (ran)
                    set(result);
            }
        } finally {
            // runner must be non-null until state is settled to
            // prevent concurrent calls to run()
            runner = null;
            // state must be re-read after nulling runner to prevent
            // leaked interrupts
            int s = state;
            if (s >= INTERRUPTING)
                handlePossibleCancellationInterrupt(s);
        }
    }

能够看到这其中跟异常相关的最关键的代码就在第17行,也就是setException(ex);这个地方。咱们看一下这个地方的实现:

protected void setException(Throwable t) {
        if (UNSAFE.compareAndSwapInt(this, stateOffset, NEW, COMPLETING)) {
            outcome = t;
            UNSAFE.putOrderedInt(this, stateOffset, EXCEPTIONAL); // final state
            finishCompletion();
        }
    }

这里最关键的地方就是将异常对象赋值给了outcome,outcome是FutureTask中的成员变量,咱们经过调用submit方法,拿到一个Future对象以后,再调用它的get方法,其中最核心的方法就是report方法,下面给出每一个方法的源码:

首先是get方法:

public V get() throws InterruptedException, ExecutionException {
        int s = state;
        if (s <= COMPLETING)
            s = awaitDone(false, 0L);
        return report(s);
    }

能够看到最终调用了report方法,其源码以下:

private V report(int s) throws ExecutionException {
        Object x = outcome;
        if (s == NORMAL)
            return (V)x;
        if (s >= CANCELLED)
            throw new CancellationException();
        throw new ExecutionException((Throwable)x);
    }

上面是一些状态判断,若是当前任务不是正常执行完毕,或者被取消的话,那么这里的x其实就是原始的异常对象,能够看到会被ExecutionException包装。所以在你调用get方法时,可能会抛出ExecutionException异常,那么调用它的getCause方法就能够拿到最原始的异常对象了。

综上所述,针对提交给线程池的任务可能会抛出异常这一问题,主要有如下两种处理思路:

  1. 在提交的任务当中自行try...catch,但这里有个很差的地方就是若是你会提交多种类型的任务到线程池中,每种类型的任务都须要自行将异常try...catch住,比较繁琐。并且若是你只是catch(Exception e),可能依然会漏掉一些包括Error类型的异常,那为了保险起见,能够考虑catch(Throwable t)。
  2. 自行实现线程池的afterExecute方法,或者实现Thread的UncaughtExceptionHandler接口。

下面给出我我的建立线程池的一个示例,供你们参考:

BlockingQueue<Runnable> queue = new ArrayBlockingQueue<>(DEFAULT_QUEUE_SIZE);
    statisticsThreadPool = new ThreadPoolExecutor(DEFAULT_CORE_POOL_SIZE, DEFAULT_MAX_POOL_SIZE,
            60, TimeUnit.SECONDS, queue, new ThreadFactoryBuilder()
            .setThreadFactory(new ThreadFactory() {
                private int count = 0;
                private String prefix = "StatisticsTask";

                @Override
                public Thread newThread(Runnable r) {
                    return new Thread(r, prefix + "-" + count++);
                }
            }).setUncaughtExceptionHandler((t, e) -> {
                String threadName = t.getName();
                logger.error("statisticsThreadPool error occurred! threadName: {}, error msg: {}", threadName, e.getMessage(), e);
            }).build(), (r, executor) -> {
        if (!executor.isShutdown()) {
            logger.warn("statisticsThreadPool is too busy! waiting to insert task to queue! ");
            Uninterruptibles.putUninterruptibly(executor.getQueue(), r);
        }
    }) {
        @Override
        protected void afterExecute(Runnable r, Throwable t) {
            super.afterExecute(r, t);
            if (t == null && r instanceof Future<?>) {
                try {
                    Future<?> future = (Future<?>) r;
                    future.get();
                } catch (CancellationException ce) {
                    t = ce;
                } catch (ExecutionException ee) {
                    t = ee.getCause();
                } catch (InterruptedException ie) {
                    Thread.currentThread().interrupt(); // ignore/reset
                }
            }
            if (t != null) {
                logger.error("statisticsThreadPool error msg: {}", t.getMessage(), t);
            }
        }
    };
    statisticsThreadPool.prestartAllCoreThreads();

线程数的设置

咱们知道任务通常有两种:CPU密集型和IO密集型。那么面对CPU密集型的任务,线程数不宜过多,通常选择CPU核心数+1或者核心数的2倍是比较合理的一个值。所以咱们能够考虑将corePoolSize设置为CPU核心数+1,maxPoolSize设置为核心数的2倍。

一样的,面对IO密集型任务时,咱们能够考虑以核心数乘以4倍做为核心线程数,而后核心数乘以5倍做为最大线程数的方式去设置线程数,这样的设置会比直接拍脑壳设置一个值会更合理一些。

固然总的线程数不宜过多,控制在100个线程之内比较合理,不然线程数过多可能会致使频繁地上下文切换,致使系统性能反不如前。

如何正确关闭一个线程池

说到如何正确去关闭一个线程池,这里面也有点讲究。为了实现优雅停机的目标,咱们应当先调用shutdown方法,调用这个方法也就意味着,这个线程池不会再接收任何新的任务,可是已经提交的任务还会继续执行,包括队列中的。因此,以后你还应当调用awaitTermination方法,这个方法能够设定线程池在关闭以前的最大超时时间,若是在超时时间结束以前线程池可以正常关闭,这个方法会返回true,不然,一旦超时,就会返回false。一般来讲咱们不可能无限制地等待下去,所以须要咱们事先预估一个合理的超时时间,而后去使用这个方法。

若是awaitTermination方法返回false,你又但愿尽量在线程池关闭以后再作其余资源回收工做,能够考虑再调用一下shutdownNow方法,此时队列中全部还没有被处理的任务都会被丢弃,同时会设置线程池中每一个线程的中断标志位。shutdownNow并不保证必定可让正在运行的线程中止工做,除非提交给线程的任务可以正确响应中断。到了这一步,能够考虑继续调用awaitTermination方法,或者直接放弃,去作接下来要作的事情。

线程池中的其余有用方法

你们可能有留意到,我在建立线程池的时候,还调用了这个方法:prestartAllCoreThreads。这个方法有什么做用呢?咱们知道一个线程池建立出来以后,在没有给它提交任何任务以前,这个线程池中的线程数为0。有时候咱们事先知道会有不少任务会提交给这个线程池,可是等它一个个去建立新线程开销太大,影响系统性能,所以能够考虑在建立线程池的时候就将全部的核心线程所有一次性建立完毕,这样系统起来以后就能够直接使用了。

其实线程池中还提供了其余一些比较有意思的方法。好比咱们如今设想一个场景,当一个线程池负载很高,快要撑爆致使触发拒绝策略时,有没有什么办法能够缓解这一问题?实际上是有的,由于线程池提供了设置核心线程数和最大线程数的方法,它们分别是setCorePoolSize方法setMaximumPoolSize方法。是的,线程池建立完毕以后也是能够更改其线程数的!所以,面对线程池高负荷运行的状况,咱们能够这么处理:

  1. 起一个定时轮询线程(守护类型),定时检测线程池中的线程数,具体来讲就是调用getActiveCount方法。
  2. 当发现线程数超过了核心线程数大小时,能够考虑将CorePoolSize和MaximumPoolSize的数值同时乘以2,固然这里不建议设置很大的线程数,由于并非线程越多越好的,能够考虑设置一个上限值,好比50、100之类的。
  3. 同时,去获取队列中的任务数,具体来讲是调用getQueue方法再调用size方法。当队列中的任务数少于队列大小的二分之一时,咱们能够认为如今线程池的负载没有那么高了,所以能够考虑在线程池先前有扩容过的状况下,将CorePoolSize和MaximumPoolSize还原回去,也就是除以2。

具体来讲以下图:

以上是我我的建议的一种使用线程池的方式。

线程池必定是最佳方案吗?

线程池并不是在任何状况下都是性能最优的方案。若是是一个追求极致性能的场景,能够考虑使用Disruptor,这是一个高性能队列。排除Disruptor不谈,单纯基于JDK的话会不会有更好的方案?答案是有的。

咱们知道在一个线程池中,多个线程是共用一个队列的,所以在任务不少的状况下,须要对这个队列进行频繁读写,为了防止冲突所以须要加锁。事实上在阅读线程池源代码的时候就能够发现,里面充斥着各类加锁的代码,那有没有更好的实现方式呢?

其实咱们能够考虑建立一个由单线程线程池构成的列表,每一个线程池都使用有界队列这种方式去实现多线程。这么作的好处是,每一个线程池中的队列都只会被一个线程去操做,这样就没有竞争的问题。

其实这种用空间换时间的思路借鉴了Netty中EventLoop的实现机制。试想,若是线程池的性能真的有那么好,为何Netty不用呢?

其余须要注意的地方

  1. 任何状况下都不该该使用可伸缩线程池(线程的建立和销毁开销是很大的)。
  2. 任何状况下都不该该使用无界队列,单测除外。有界队列经常使用的有ArrayBlockingQueue和LinkedBlockingQueue,前者基于数组实现,后者基于链表。从性能表现上来看,LinkedBlockingQueue的吞吐量更高可是性能并不稳定,实际状况下应当使用哪种建议自行测试以后决定。顺便说一句,Executors的newFixedThreadPool采用的是LinkedBlockingQueue。
  3. 推荐自行实现RejectedExecutionHandler,JDK自带的都不是很好用,你能够在里面实现本身的逻辑。若是须要一些特定的上下文信息,能够在Runnable实现类中添加一些本身的东西,这样在RejectedExecutionHandler中就能够直接使用了。

怎样作到不丢任务

这里其实指的是一种特殊状况,就是好比忽然遇到了一股流量尖峰,致使线程池负载已经很是高了,即快要触发拒绝策略的时候,咱们能够怎么作来尽可能防止提交的任务丢失。通常来讲当遇到这种状况的时候,应当尽快触发报警通知研发人员来处理。以后无论是限流也好,仍是增长机器也好,甚至是上Kafka、Redis甚至是数据库用来暂存任务数据也是能够的,但毕竟远水救不了近火,若是咱们但愿在正式解决这个问题以前,先尽量地缓解,能够考虑怎么作呢?

首先能够考虑的就是我前面提到的动态增大线程池中的线程数,可是假如已经扩容过了,此时不该继续扩容,不然可能致使系统的吞吐量更低。在这种状况下,应当自行实现RejectedExecutionHandler,具体来讲就是在实现类中,单独开一个单线程的线程池,而后调用原线程池的getQueue方法的put方法,将塞不进去的任务再次尝试塞进去。固然在队列满的时候是塞不进去的,但那至少也只是阻塞了这个单独的线程而已,并不影响主流程。

固然,这种方案是治标不治本的,面对流量激增这种场景其实业界有不少成熟的作法,只是单纯从线程池的角度来看的话,这种方式不失为一种临时有效的解决方案。

做者简介

吕亚东,某风控领域互联网公司技术专家,主要关注高性能,高并发以及中间件底层原理和调优等领域。email:523144643@qq.com

原文连接地址:https://developer.baidu.com/topic/show/290668

相关文章
相关标签/搜索