使用无界队列的线程池会致使内存飙升吗?面试官常常会问这个问题,本文将基于源码,去分析newFixedThreadPool线程池致使的内存飙升问题,但愿能加深你们的理解。java
ExecutorService executor = Executors.newFixedThreadPool(10);
for (int i = 0; i < Integer.MAX_VALUE; i++) {
executor.execute(() -> {
try {
Thread.sleep(10000);
} catch (InterruptedException e) {
//do nothing
}
});
}
复制代码
IDE指定JVM参数:-Xmx8m -Xms8m : node
run以上代码,会抛出OOM: 面试
以上的实例代码,就一个newFixedThreadPool和一个execute方法。首先,咱们先来看一下newFixedThreadPool方法的源码算法
public static ExecutorService newFixedThreadPool(int nThreads) {
return new ThreadPoolExecutor(nThreads, nThreads,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>());
}
复制代码
该段源码以及结合线程池特色,咱们能够知道newFixedThreadPool:编程
线程池特色了解不是很清楚的朋友,能够看我这篇文章,面试必备:Java线程池解析bash
接下来,咱们再来看看线程池执行方法execute的源码。并发
execute的源码以及相关解释以下:函数
public void execute(Runnable command) {
if (command == null)
throw new NullPointerException();
int c = ctl.get();
if (workerCountOf(c) < corePoolSize) { //步骤一:判断当前正在工做的线程是否比核心线程数量小
if (addWorker(command, true)) // 以核心线程的身份,添加到工做集合
return;
c = ctl.get();
}
//步骤二:不知足步骤一,线程池还在RUNNING状态,阻塞队列也没满的状况下,把执行任务添加到阻塞队列workQueue。
if (isRunning(c) && workQueue.offer(command)) {
int recheck = ctl.get();
//来个double check ,检查线程池是否忽然被关闭
if (! isRunning(recheck) && remove(command))
reject(command);
else if (workerCountOf(recheck) == 0)
addWorker(null, false);
}
//步骤三:若是阻塞队列也满了,执行任务以非核心线程的身份,添加到工做集合
else if (!addWorker(command, false))
reject(command);
}
复制代码
纵观以上代码,咱们能够发现就addWorker 以及workQueue.offer(command) 可能在建立对象。那咱们先分析addWorker方法。oop
addWorker源码以及相关解释以下源码分析
private boolean addWorker(Runnable firstTask, boolean core) {
retry:
for (;;) {
int c = ctl.get();
//获取当前线程池的状态
int rs = runStateOf(c);
//若是线程池状态是STOP,TIDYING,TERMINATED状态的话,则会返回false。
// 若是如今状态是SHUTDOWN,可是firstTask不为空或者workQueue为空的话,那么直接返回false
if (rs >= SHUTDOWN &&
! (rs == SHUTDOWN &&
firstTask == null &&
! workQueue.isEmpty()))
return false;
//自旋
for (;;) {
//获取当前工做线程的数量
int wc = workerCountOf(c);
//判断线程数量是否符合要求,若是要建立的是核心工做线程,判断当前工做线程数量是否已经超过coreSize,
// 若是要建立的是非核心线程,判断当前工做线程数量是否超过maximumPoolSize,是的话就返回false
if (wc >= CAPACITY ||
wc >= (core ? corePoolSize : maximumPoolSize))
return false;
//若是线程数量符合要求,就经过CAS算法,将WorkerCount加1,成功就跳出retry自旋
if (compareAndIncrementWorkerCount(c))
break retry;
c = ctl.get(); // Re-read ctl
if (runStateOf(c) != rs)
continue retry;
retry inner loop
}
}
//线程启动标志
boolean workerStarted = false;
//线程添加进集合workers标志
boolean workerAdded = false;
Worker w = null;
try {
//由(Runnable 构造Worker对象
w = new Worker(firstTask);
final Thread t = w.thread;
if (t != null) {
//获取线程池的重入锁
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
//获取线程池状态
int rs = runStateOf(ctl.get());
//若是状态知足,将Worker对象添加到workers集合
if (rs < SHUTDOWN ||
(rs == SHUTDOWN && firstTask == null)) {
if (t.isAlive())
throw new IllegalThreadStateException();
workers.add(w);
int s = workers.size();
if (s > largestPoolSize)
largestPoolSize = s;
workerAdded = true;
}
} finally {
mainLock.unlock();
}
//启动Worker中的线程开始执行任务
if (workerAdded) {
t.start();
workerStarted = true;
}
}
} finally {
//线程启动失败,执行addWorkerFailed方法
if (! workerStarted)
addWorkerFailed(w);
}
return workerStarted;
}
复制代码
addWorker执行流程
大概就是判断线程池状态是否OK,若是OK,在判断当前工做中的线程数量是否知足(小于coreSize/maximumPoolSize),若是不知足,不添加,若是知足,就将执行任务添加到工做集合workers,,并启动执行该线程。
再看一下workers的类型:
/**
* Set containing all worker threads in pool. Accessed only when
* holding mainLock.
*/
private final HashSet<Worker> workers = new HashSet<Worker>();
复制代码
workers是一个HashSet集合,它由coreSize/maximumPoolSize控制着,那么addWorker方法会致使OOM?结合实例代码demo,coreSize=maximumPoolSize=10,若是超过10,不会再添加到workers了,因此它不是致使newFixedThreadPool内存飙升的缘由。那么,问题应该就在于workQueue.offer(command) 方法了。为了让整个流程清晰,咱们画一下execute执行的流程图。
根据以上execute以及addWork源码分析,咱们把流程图画出来:
看完execute的执行流程,我猜想,内存飙升问题就是workQueue塞满了。接下来,进行阻塞队列源码分析,揭开内存飙升问题的神秘面纱。
public LinkedBlockingQueue() {
this(Integer.MAX_VALUE);
}
public LinkedBlockingQueue(int capacity) {
if (capacity <= 0) throw new IllegalArgumentException();
this.capacity = capacity;
last = head = new Node<E>(null);
}
复制代码
LinkedBlockingQueue无参构造函数,默认构造Integer.MAX_VALUE(那么大) 的链表,看到这里,你回想一下execute流程,是否是阻塞队列一直不会满了,这队列来者不拒,把全部阻塞任务收于麾下。。。是否是内存飙升问题水落石出啦。
线程池中,插入队列用了offer方法,咱们来看一下阻塞队列LinkedBlockingQueue的offer骚操做吧
public boolean offer(E e) {
//为空元素则抛出空指针异常
if (e == null) throw new NullPointerException();
final AtomicInteger count = this.count;
//如采当前队列满则丢弃将要放入的元素, 而后返回false
if (count.get() == capacity)
return false;
int c = -1;
//构造新节点,获取putLock独占锁
Node<E> node = new Node<E>(e);
final ReentrantLock putLock = this.putLock;
putLock.lock();
try {
//如采队列不满则进队列,并递增元素计数
if (count.get() < capacity) {
enqueue(node);
c = count.getAndIncrement();
//新元素入队后队列还有空闲空间,则
唤醒 notFull 的条件队列中一条阻塞线程
if (c + 1 < capacity)
notFull.signal();
}
} finally {
//释放锁
putLock.unlock();
}
if (c == 0)
signalNotEmpty();
return c >= 0;
}
复制代码
offer操做向队列尾部插入一个元素,若是队列中有空闲则插入成功后返回 true,若是队列己满 则丢弃当前元素而后返回 false。 若是 e 元素为 null 则抛出 Nul!PointerException 异常。另外, 该方法是非阻塞的。
newFixedThreadPool线程池的核心线程数是固定的,它使用了近乎于无界的LinkedBlockingQueue阻塞队列。当核心线程用完后,任务会入队到阻塞队列,若是任务执行的时间比较长,没有释放,会致使愈来愈多的任务堆积到阻塞队列,最后致使机器的内存使用不停的飙升,形成JVM OOM。