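Recently I ran into an OOM error whose message said that no more threads could be created. While tracking it down, I found that the culprit was code similar to the snippet below. Monitoring with JConsole showed the thread count suddenly spiking during a certain period, which prompted the analysis that follows.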
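```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;

@RestController // assumed: the original class annotation was not shown
public class DemoController {
    // Shared pool for the slow inner tasks: parallelism 20
    private ExecutorService executorService = Executors.newWorkStealingPool(20);

    @RequestMapping("/test")
    public String test() {
        // A new pool (parallelism 10) is created on every request and never shut down
        ExecutorService forkJoinPool = Executors.newWorkStealingPool(10);
        CompletableFuture<?>[] completableFutures = new CompletableFuture<?>[600];
        for (int i = 0; i < 600; i++) {
            completableFutures[i] = CompletableFuture.runAsync(() -> {
                getAssociatedInfo(forkJoinPool);
            }, forkJoinPool);
        }
        // First join: runs on the HTTP request thread
        CompletableFuture<Void> voidCompletableFuture = CompletableFuture.allOf(completableFutures);
        voidCompletableFuture.join();
        return "OK";
    }

    public String getAssociatedInfo(ExecutorService service) {
        // Note: `service` is unused; the inner task is submitted to executorService
        CompletableFuture<String> trialAssociatedInfoCompletableFuture
                = CompletableFuture.supplyAsync(() -> {
            try {
                System.out.println("You should be running by now, right?");
                TimeUnit.SECONDS.sleep(100);
                System.out.println("You're done");
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            return "a";
        }, executorService);
        // Second join: runs on a forkJoinPool worker thread
        CompletableFuture<Void> voidCompletableFuture = CompletableFuture.allOf(trialAssociatedInfoCompletableFuture);
        voidCompletableFuture.join();
        return "ok";
    }
}
```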
```java
completableFutures[i] = CompletableFuture.runAsync(() -> {
    getAssociatedInfo(forkJoinPool);
}, forkJoinPool);
```
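This statement starts an asynchronous task and hands it to the forkJoinPool pool. Each submission is queued in the pool. While the pool has fewer than 10 workers (its parallelism), it also starts a new worker, so the task runs almost immediately. Once 10 workers exist, further tasks wait in the queue until a worker becomes free.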
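The following sketch shows that bound in isolation. The class RunAsyncPoolDemo and its parameters are hypothetical. It builds the pool the same way Executors.newWorkStealingPool(n) does in the JDK sources, submits many short tasks that never block, and records the largest pool size observed from inside a task. Because nothing blocks, the pool should never need more workers than its parallelism.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.atomic.AtomicInteger;

class RunAsyncPoolDemo {
    // Runs `tasks` short, non-blocking tasks on a work-stealing pool with the given
    // parallelism and returns the largest pool size seen from inside any task.
    static int peakPoolSize(int parallelism, int tasks) {
        // Mirrors what Executors.newWorkStealingPool(parallelism) constructs
        ForkJoinPool pool = new ForkJoinPool(parallelism,
                ForkJoinPool.defaultForkJoinWorkerThreadFactory, null, true);
        AtomicInteger peak = new AtomicInteger();
        CompletableFuture<?>[] futures = new CompletableFuture<?>[tasks];
        try {
            for (int i = 0; i < tasks; i++) {
                // Each task records how many worker threads the pool has right now
                futures[i] = CompletableFuture.runAsync(
                        () -> peak.accumulateAndGet(pool.getPoolSize(), Math::max), pool);
            }
            CompletableFuture.allOf(futures).join();
            return peak.get();
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println("peak pool size: " + peakPoolSize(10, 600));
    }
}
```

The exact peak depends on timing, but for non-blocking tasks it stays at or below the parallelism. The later sections show what changes once the tasks themselves block.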
```java
CompletableFuture<Void> voidCompletableFuture = CompletableFuture.allOf(completableFutures);
```
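allOf recursively builds a tree of completions and combines all the futures into a single aggregate future, which completes only when every one of them has completed, as shown in the figure below.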
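Here is a minimal sketch of allOf in use. AllOfDemo and sumOfSquares are illustrative names and do not come from the original code. The combined future is a CompletableFuture<Void>, so it only signals that every part has finished, and the individual results are still read from each part.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

class AllOfDemo {
    // Starts n tasks, waits for all of them through a single allOf() future,
    // then reads each task's own result.
    static long sumOfSquares(int n) {
        ExecutorService pool = Executors.newFixedThreadPool(4);
        try {
            List<CompletableFuture<Integer>> parts = new ArrayList<>();
            for (int i = 0; i < n; i++) {
                int id = i; // effectively final copy for the lambda
                parts.add(CompletableFuture.supplyAsync(() -> id * id, pool));
            }
            // allOf() aggregates the parts into one CompletableFuture<Void>
            CompletableFuture<Void> all =
                    CompletableFuture.allOf(parts.toArray(new CompletableFuture<?>[0]));
            all.join(); // blocks the calling thread until every part has completed
            long sum = 0;
            for (CompletableFuture<Integer> part : parts) {
                sum += part.join(); // already complete, so this returns immediately
            }
            return sum;
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println(sumOfSquares(10)); // prints 285
    }
}
```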
```java
// From the calling thread's point of view: if the tasks have not completed, join() blocks
voidCompletableFuture.join();
return "OK";

// Relevant JDK 8 source excerpts follow

// CompletableFuture.join()
return reportJoin((r = result) == null ? waitingGet(false) : r);

// CompletableFuture.waitingGet()
Signaller q = null;
boolean queued = false;
int spins = -1;
Object r;
// Loop until the result is non-null
while ((r = result) == null) {
    if (spins < 0)
        spins = (Runtime.getRuntime().availableProcessors() > 1) ?
            1 << 8 : 0; // Use brief spin-wait on multiprocessors
    else if (spins > 0) {
        if (ThreadLocalRandom.nextSecondarySeed() >= 0)
            --spins;
    }
    else if (q == null)
        // Create a Signaller, the blocker that records the waiting thread --1
        q = new Signaller(interruptible, 0L, 0L);
    else if (!queued)
        queued = tryPushStack(q);
    else if (interruptible && q.interruptControl < 0) {
        q.thread = null;
        cleanStack();
        return null;
    }
    else if (q.thread != null && result == null) {
        try {
            // If the result still has not arrived, we end up here and
            // ForkJoinPool takes over managing the blocker (see below)
            ForkJoinPool.managedBlock(q);
        } catch (InterruptedException ie) {
            q.interruptControl = -1;
        }
    }
}

// ForkJoinPool.managedBlock()
Thread t = Thread.currentThread();
if ((t instanceof ForkJoinWorkerThread) &&
    (p = (wt = (ForkJoinWorkerThread)t).pool) != null) {
    WorkQueue w = wt.workQueue;
    while (!blocker.isReleasable()) {
        // Caller is a pool worker: try to compensate before blocking --2
        if (p.tryCompensate(w)) {
            try {
                do {} while (!blocker.isReleasable() &&
                             !blocker.block());
            } finally {
                U.getAndAddLong(p, CTL, AC_UNIT);
            }
            break;
        }
    }
}
else {
    // Caller is not a pool worker: simply block
    do {} while (!blocker.isReleasable() &&
                 !blocker.block());
}

// ForkJoinPool.tryCompensate() --2 (excerpt)
canBlock = add && createWorker(); // throws on exception
```
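The compensation in managedBlock() can be observed directly through the public ForkJoinPool.ManagedBlocker interface. In this sketch (ManagedBlockDemo and LatchBlocker are hypothetical names), a pool with parallelism 1 runs a task that blocks through managedBlock() on a latch that only a second task can release. If the pool did not add a spare worker, the second task could never run. In practice the JDK's pool does add one (the javadoc says managedBlock "may" expand the pool), so the two tasks end up on different threads. The 5-second timeouts are an arbitrary safety net.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.ForkJoinTask;
import java.util.concurrent.TimeUnit;

class ManagedBlockDemo {
    // A ManagedBlocker that waits on a latch. On a pool worker, managedBlock()
    // first tries to compensate (possibly adding a worker), then calls block().
    static final class LatchBlocker implements ForkJoinPool.ManagedBlocker {
        private final CountDownLatch latch;

        LatchBlocker(CountDownLatch latch) {
            this.latch = latch;
        }

        @Override
        public boolean block() throws InterruptedException {
            latch.await();
            return true; // no further blocking needed
        }

        @Override
        public boolean isReleasable() {
            return latch.getCount() == 0;
        }
    }

    // Returns {the worker that blocked, the worker that released it}.
    static Thread[] run() {
        ForkJoinPool pool = new ForkJoinPool(1); // parallelism 1: one worker normally
        CountDownLatch started = new CountDownLatch(1);
        CountDownLatch release = new CountDownLatch(1);
        Thread[] threads = new Thread[2];
        try {
            ForkJoinTask<Object> blocked = pool.submit(() -> {
                threads[0] = Thread.currentThread();
                started.countDown();
                // Without compensation this would wait forever: the only worker is here
                ForkJoinPool.managedBlock(new LatchBlocker(release));
                return null;
            });
            started.await(); // the single worker is now inside the blocking task
            ForkJoinTask<?> releaser = pool.submit(() -> {
                threads[1] = Thread.currentThread();
                release.countDown();
            });
            releaser.get(5, TimeUnit.SECONDS);
            blocked.get(5, TimeUnit.SECONDS);
            return threads;
        } catch (Exception e) {
            throw new IllegalStateException("demo did not complete", e);
        } finally {
            pool.shutdownNow();
        }
    }

    public static void main(String[] args) {
        Thread[] t = run();
        System.out.println("blocked on " + t[0].getName() + ", released by " + t[1].getName());
    }
}
```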
```java
// CompletableFuture.Signaller constructor (JDK 8) --1
Signaller(boolean interruptible, long nanos, long deadline) {
    // thread is the current thread, i.e. the one that is waiting
    this.thread = Thread.currentThread();
    this.interruptControl = interruptible ? 1 : 0;
    this.nanos = nanos;
    this.deadline = deadline;
}
```
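Signaller stores the waiting thread so that whoever completes the future can wake it. The hypothetical single-waiter cell below is a much-simplified sketch of the spin, register-thread, park sequence that waitingGet() follows. It is not the JDK implementation: it keeps no stack of waiters, does no interrupt or timeout handling, and treats null as "not yet completed".

```java
import java.util.concurrent.locks.LockSupport;

// Hypothetical single-waiter cell, loosely modelled on waitingGet():
// spin briefly, then record the waiting thread (the Signaller's job), then park.
class SpinThenParkCell<T> {
    private volatile T result;   // null means "not yet completed"
    private volatile Thread waiter;

    // value must be non-null
    void complete(T value) {
        result = value;          // publish the result first...
        Thread w = waiter;       // ...then wake the registered waiter, if any
        if (w != null) {
            LockSupport.unpark(w);
        }
    }

    T await() {
        int spins = Runtime.getRuntime().availableProcessors() > 1 ? 1 << 8 : 0;
        T r;
        while ((r = result) == null) {
            if (spins > 0) {
                --spins;                              // brief spin-wait
            } else if (waiter == null) {
                waiter = Thread.currentThread();      // like new Signaller(...)
            } else {
                LockSupport.park(this);               // like blocker.block()
            }
        }
        waiter = null;
        return r;
    }

    static String demo() {
        SpinThenParkCell<String> cell = new SpinThenParkCell<>();
        Thread producer = new Thread(() -> {
            try {
                Thread.sleep(50);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
            cell.complete("done");
        });
        producer.start();
        return cell.await(); // blocks the caller until the producer completes the cell
    }

    public static void main(String[] args) {
        System.out.println(demo()); // prints "done"
    }
}
```

Both fields are volatile, so the waiter either sees the result before it parks or the completer sees the registered thread and unparks it. A wakeup cannot be lost.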
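At the first voidCompletableFuture.join(), the calling thread is the HTTP request thread, while the 600 tasks it waits on are run by forkJoinPool, at most 10 at a time. The join then reaches waitingGet(). Because the HTTP thread is not a ForkJoinWorkerThread, managedBlock() takes the else branch, and the thread simply blocks until the result arrives.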
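The branch choice hinges on a single instanceof check. This small sketch (WorkerCheckDemo is a hypothetical name) runs that check in two places: on the calling thread, which stands in for the HTTP thread, and inside a task on a ForkJoinPool.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.ForkJoinWorkerThread;

class WorkerCheckDemo {
    // managedBlock() compensates only when the caller is a ForkJoinWorkerThread.
    // Returns {caller passes the check, pool task passes the check}.
    static boolean[] check() {
        ForkJoinPool pool = new ForkJoinPool(2);
        try {
            boolean callerIsWorker = Thread.currentThread() instanceof ForkJoinWorkerThread;
            boolean taskIsWorker = CompletableFuture
                    .supplyAsync(() -> Thread.currentThread() instanceof ForkJoinWorkerThread, pool)
                    .join();
            return new boolean[] {callerIsWorker, taskIsWorker};
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        boolean[] r = check();
        // Run from main: prints "caller=false, task=true"
        System.out.println("caller=" + r[0] + ", task=" + r[1]);
    }
}
```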
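At the second voidCompletableFuture.join(), the calling thread is a forkJoinPool worker executing one of those tasks. Every task calls getAssociatedInfo(), whose inner task runs on executorService, at most 20 at a time, and each inner task sleeps for 100 seconds. The join again reaches waitingGet(). This time the thread is a ForkJoinWorkerThread, so managedBlock() calls tryCompensate(), which creates a new worker to keep running the remaining tasks in forkJoinPool. Those extra workers cannot speed anything up, because the real work is capped by executorService's 20 threads. Each new worker takes another task, calls getAssociatedInfo(), blocks in join(), and triggers yet another compensating thread. As more and more getAssociatedInfo() calls block in join(), the thread count soars. None of the threads can be released promptly, and the process eventually fails with the OOM.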
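Here is a scaled-down reproduction of the spike, written as a sketch. ThreadSpikeRepro and its parameters are hypothetical. The numbers (outer parallelism 2, 20 tasks, a 100 ms sleep instead of 100 s, an inner pool of 2) are chosen only so that it finishes in about a second. The sketch counts how many distinct threads of the outer pool ran an outer task. With compensation, that count exceeds the outer pool's parallelism.

```java
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

// Scaled-down model of DemoController: outer tasks run on `outer`, and each one
// blocks in join() on an inner task that sleeps on a separate, small `inner` pool.
class ThreadSpikeRepro {
    static int distinctOuterThreads(int outerParallelism, int tasks, long sleepMillis) {
        ExecutorService outer = Executors.newWorkStealingPool(outerParallelism);
        ExecutorService inner = Executors.newWorkStealingPool(2);
        Set<Thread> outerThreads = ConcurrentHashMap.newKeySet();
        try {
            CompletableFuture<?>[] futures = new CompletableFuture<?>[tasks];
            for (int i = 0; i < tasks; i++) {
                futures[i] = CompletableFuture.runAsync(() -> {
                    outerThreads.add(Thread.currentThread());
                    // Like getAssociatedInfo(): the slow work runs on a different pool...
                    CompletableFuture<String> slow = CompletableFuture.supplyAsync(() -> {
                        sleepQuietly(sleepMillis);
                        return "a";
                    }, inner);
                    // ...and this worker blocks waiting for it, so `outer` may add a spare worker
                    slow.join();
                }, outer);
            }
            CompletableFuture.allOf(futures).join();
            return outerThreads.size();
        } finally {
            outer.shutdown();
            inner.shutdown();
        }
    }

    static void sleepQuietly(long millis) {
        try {
            TimeUnit.MILLISECONDS.sleep(millis);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    public static void main(String[] args) {
        // The outer pool's parallelism is 2, yet more than 2 threads run outer tasks
        System.out.println("outer threads used: " + distinctOuterThreads(2, 20, 100));
    }
}
```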
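Hypothesis: run the tasks submitted from the HTTP thread and the tasks created inside forkJoinPool in the same pool. Then every new thread that forkJoinPool creates can steal a pending task and execute it. As the thread count rises, more and more tasks get done, so fewer threads need to be created. The final result confirmed this.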
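One possible reading of the fix is sketched below. It assumes "the same pool" means submitting the inner task to the pool passed in as service, a parameter that the original getAssociatedInfo() accepts but never uses. SamePoolSketch and its parameters are hypothetical. The sketch only checks that every task completes and does not measure thread counts.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

// Hypothetical reading of the fix: the inner task goes to the same pool as the
// outer task, so any spare worker the pool adds while an outer task waits can
// also steal and run the pending inner tasks.
class SamePoolSketch {
    static String getAssociatedInfo(ExecutorService service, long sleepMillis) {
        CompletableFuture<String> inner = CompletableFuture.supplyAsync(() -> {
            try {
                TimeUnit.MILLISECONDS.sleep(sleepMillis);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
            return "a";
        }, service); // same pool as the caller, instead of a separate executorService
        inner.join();
        return "ok";
    }

    // Runs `tasks` outer tasks on one work-stealing pool; returns how many completed normally.
    static int run(int parallelism, int tasks, long sleepMillis) {
        ExecutorService pool = Executors.newWorkStealingPool(parallelism);
        try {
            CompletableFuture<?>[] futures = new CompletableFuture<?>[tasks];
            for (int i = 0; i < tasks; i++) {
                futures[i] = CompletableFuture.supplyAsync(
                        () -> getAssociatedInfo(pool, sleepMillis), pool);
            }
            CompletableFuture.allOf(futures).join();
            int completed = 0;
            for (CompletableFuture<?> f : futures) {
                if (f.isDone() && !f.isCompletedExceptionally()) {
                    completed++;
                }
            }
            return completed;
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println("completed: " + run(2, 20, 20));
    }
}
```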