「Java」记一次使用线程池出现的问题

背景

以前在工做中遇到一个问题,我定义了一个线程池来执行任务,程序执行结束后任务没有所有执行完。mysql

业务场景是这样的:因为统计业务须要,订单信息须要从主库中通过统计业务代码写入统计库。因为代码质量及历史缘由,目前的从新统计接口是单线程的,粗略算了算一共有100万条订单信息,每100条的处理大约是10秒,因此理论上处理彻底部信息须要28个小时,这还不算由于 mysql 中 limit 分页致使的后期查询时间以及可能出现的内存溢出致使停止统计的状况。sql

基于上述的缘由,以及最重要的一点:统计业务是根据订单所属的中心进行的,各个中心同时统计不会致使脏数据。因此,我计划使用线程池,为每个中心分配一条线程去执行统计业务。函数

业务实现

// 线程工厂,用于为线程池中的每条线程命名
ThreadFactory namedThreadFactory = new ThreadFactoryBuilder().setNameFormat("stats-pool-%d").build();

// 建立线程池,使用有界阻塞队列防止内存溢出
ExecutorService statsThreadPool = new ThreadPoolExecutor(5, 10,
                0L, TimeUnit.MILLISECONDS,
                new LinkedBlockingQueue<>(100), namedThreadFactory);
// 遍历全部中心,为每个centerId提交一条任务到线程池
statsThreadPool.submit(new StatsJob(centerId));

在建立完线程池后,为每个 centerId 提交一条任务到线程池。在个人预想中,因为线程池的核心线程数为5,最多5个中心同时进行统计业务,将大大缩短100万条数据的总统计时间,因而万分兴奋的我开始执行从新统计业务了。ui

问题

在跑了好久以后,当我查看统计进度时,我发现了一个十分诡异的问题(以下图)。蓝框标出的这条线程是 WAIT 状态,代表这条线程是空闲状态,可是从日志中我看到这条线程并无完成它的任务,由于这个中心的数据有10万条,可是日志显示它只跑到了一半,以后就再无关于此中心的日志了。this

2019-12-16-1.JPG

这是什么缘由?spa

调试及缘由

能够想到的是,这条线程由于某些缘由被阻塞了,而且没有继续进行下去,可是日志又没有任何异常信息...线程

可能有经验的工程师已经知道了缘由...调试

因为我的水平的线程,暂时没有找到缘由的我只能放弃使用线程池,乖乖用单线程跑...日志

幸运的是,单线程跑的任务居然抛错了(为何要说幸运?),因而立刻想到,以前那条 WAIT 状态的线程多是由于一样的抛错因此被中断了,致使任务没有继续进行下去。code

为何说幸运?由于若是单线程的任务没有抛错的话,我可能好久都想不到是这个缘由。

深刻探究线程池的异常处理

工做上的问题到这里就找到缘由了,以后的解决过程也十分简单,这里就不提了。

可是疑问又来了,为何使用线程池的时候,线程因异常被中断却没有抛出任何信息呢?还有平时若是是在 main 函数里面的异常也会被抛出来,而不是像线程池这样被吞掉。

若是子线程抛出了异常,线程池会如何进行处理呢?

我提交任务到线程池的方式是: threadPoolExecutor.submit(Runnbale task); ,后面了解到使用 execute() 方式提交任务会把异常日志给打出来,这里研究一下为何使用 submit 提交任务,在任务中的异常会被“吞掉”。

对于 submit() 形式提交的任务,咱们直接看源码:

public Future<?> submit(Runnable task) {
    if (task == null) throw new NullPointerException();
    // 被包装成 RunnableFuture 对象,而后准备添加到工做队列
    RunnableFuture<Void> ftask = newTaskFor(task, null);
    execute(ftask);
    return ftask;
}

它会被线程池包装成 RunnableFuture 对象,而最终它实际上是一个 FutureTask 对象,在被添加到线程池的工做队列,而后调用 start() 方法后, FutureTask 对象的 run() 方法开始运行,即本任务开始执行。

public void run() {
    if (state != NEW || !UNSAFE.compareAndSwapObject(this,runnerOffset,null, Thread.currentThread()))
        return;
    try {
        Callable<V> c = callable;
        if (c != null && state == NEW) {
            V result;
            boolean ran;
            try {
                result = c.call();
                ran = true;
            } catch (Throwable ex) {
                // 捕获子任务中的异常
                result = null;
                ran = false;
                setException(ex);
            }
            if (ran)
                set(result);
        }
    } finally {
        runner = null;
        int s = state;
        if (s >= INTERRUPTING)
            handlePossibleCancellationInterrupt(s);
    }
}

在 FutureTask 对象的 run() 方法中,该任务抛出的异常被捕获,而后在setException(ex); 方法中,抛出的异常会被放到 outcome 对象中,这个对象就是 submit() 方法会返回的 FutureTask 对象执行 get() 方法获得的结果。可是在线程池中,并无获取执行子线程的结果,因此异常也就没有被抛出来,即被“吞掉”了。

这就是线程池的 submit() 方法提交任务没有异常抛出的缘由。

线程池自定义异常处理方法

在定义 ThreadFactory 的时候调用setUncaughtExceptionHandler方法,自定义异常处理方法。
例如:

ThreadFactory namedThreadFactory = new ThreadFactoryBuilder()
                .setNameFormat("judge-pool-%d")
                .setUncaughtExceptionHandler((thread, throwable)-> logger.error("ThreadPool {} got exception", thread,throwable))
                .build();

这样,对于线程池中每条线程抛出的异常都会打下 error 日志,就不会看不到了。

后续

在修复了单个线程任务的异常以后,我继续使用线程池进行从新统计业务,终于跑完了,也终于完成了这个任务。

小结:使用线程池时须要注意,子线程的异常,若是没有被捕获就会丢失,可能会致使后期根据日志调试时没法找到缘由。

相关文章
相关标签/搜索