血的教训之背景:使用线程池对存量数据进行迁移,可是总有一批数据迁移失败,无异常日志打印java
据说parallelStream
并行流是个好东西,因为平常开发stream
串行流的场景比较多,此次须要写迁移程序恰好能够用得上,那还不赶忙拿来装*一下,此时不装更待什么时候。机智的我还知道在 JVM 的后台,使用通用的 fork/join 池来完成上述功能,该池是全部并行流共享的,默认状况,fork/join 池会为每一个处理器分配一个线程,对应的变通方案就是建立本身的线程池如并发
ForkJoinPool pool = new ForkJoinPool(Runtime.getRuntime().availableProcessors());
pool.submit(() -> {
list.parallelStream().collect(Collectors.toList());
});
复制代码
因而地雷就是从这里埋下的。异步
public static void main(String[] args) throws InterruptedException, ExecutionException {
final ExecutorService pool = new ForkJoinPool(Runtime.getRuntime().availableProcessors());
List<Integer> list = Lists.newArrayList(1, 2, 3, null);
//1.使用submit
pool.submit(() -> {
list.parallelStream().map(a -> a.toString()).collect(Collectors.toList());
});
TimeUnit.SECONDS.sleep(3);
//2.使用 execute
pool.execute(() -> {
list.parallelStream().map(a -> a.toString()).collect(Collectors.toList());
});
//3.使用submit,调用get()
pool.submit(() -> {
list.parallelStream().map(a -> a.toString()).collect(Collectors.toList());
}).get();
TimeUnit.SECONDS.sleep(3);
}
复制代码
读者自行跑一下上面的用例,会发现单独使用submit
方法的并不会打印出错误日志,而使用execute
方法打印出了错误日志,可是对submit
返回的FutureJoinTask
调用get()
方法,又会抛出异常。因而真相大白,部分批次中的数据存在脏数据,为null值,遍历到该null值的时候出现了异常,可是异常日志在submit
方法中给catch住,没有打印出来(心痛的感受),而被捕获的异常,被包装在返回的结果类FutureJoinTask
中,并无再次抛出。工具
submit
方法 结论先行,我犯的错误就是,浅显的认为submit
和execute
的区别就只是一个有返回异步结果,一个没有返回一步结果,可是事实是残酷的。在submit()
中逻辑必定包含了将异步任务抛出的异常捕获,而由于使用方法不当而致使该异常没有再次抛出。post
如今提出一个问题,ForkJoinPool#submit()
中返回的ForkJoinTask
能够获取异步任务的结果,现这个异步抛出了异常,咱们尝试获取该任务的结果会是如何? 咱们直接看ForkJoinTask#get()
的源码。this
public final V get() throws InterruptedException, ExecutionException {
int s = (Thread.currentThread() instanceof ForkJoinWorkerThread) ?
doJoin() : externalInterruptibleAwaitDone();
Throwable ex;
if ((s &= DONE_MASK) == CANCELLED)
throw new CancellationException();
//这里能够直接看到,异步任务出现异常会在调用get()获取结果的时候,会被包装成ExecutionException再次抛出
if (s == EXCEPTIONAL && (ex = getThrowableException()) != null)
throw new ExecutionException(ex);
return getRawResult();
}
复制代码
异步任务出现异常会在调用get()获取结果的时候,会被包装成ExecutionException
再次抛出,可是异常是在哪里被捕获的呢?万变不离其宗,全部线程的线程都须要重写Thread#run()
方法, 投递到ForkJoinPool
的线程会被包装成ForkJoinWorkerThread
,所以咱们看一下ForkJoinWorkerThread#run()
的实现.spa
public void run() {
if (workQueue.array == null) { // only run once
Throwable exception = null;
try {
onStart();
pool.runWorker(workQueue);
} catch (Throwable ex) {
//出现异常,捕获,再次抛出会在调用ForkJoinTask#get()的时候
exception = ex;
} finally {
try {
onTermination(exception);
} catch (Throwable ex) {
if (exception == null)
exception = ex;
} finally {
pool.deregisterWorker(this, exception);
}
}
}
}
复制代码
上面的分析是基于ForkJoinPool
的,是否是全部的线程池的submit
和execute
方法的实现都是相似这样,咱们经常使用的线程池ThreadPoolThread
实现会是怎样的,一样的思路,咱们须要找到投递到ThreadPoolThread
的异步任务最终被包装为哪一个Thread
的子类或者是实现java.lang.Runnable#run
,答案就是java.util.concurrent.FutureTask
线程
public void run() {
...
try {
Callable<V> c = callable;
if (c != null && state == NEW) {
V result;
boolean ran;
try {
result = c.call();
ran = true;
} catch (Throwable ex) {
//捕获异常
result = null;
ran = false;
setException(ex);
}
if (ran)
set(result);
}
}
....
}
复制代码
java.util.concurrent.ExecutorService#submit(java.lang.Runnable)
为什么线程池会有这种设定,实际上咱们的思路不该该局限于线程池,而是放在获取异步任务结果,异常是否也是属于异步结果,FutureTask
做为JDK提供的并发工具类的实现中,已经给出了很好的答案,即获取异步任务结果,异常也是属于异步结果,若是异步任务出现运行时异常,那么在获取该任务的结果时,该异常会被从新包装抛出日志
做者:plz叫我红领巾code
本博客欢迎转载,但未经做者赞成必须保留此段声明,且在文章页面明显位置给出原文链接,不然保留追究法律责任的权利。码字不易,您的点赞是我写做的最大动力