源码角度分析-newFixedThreadPool线程池致使的内存飙升问题

前言

使用无界队列的线程池会致使内存飙升吗?面试官常常会问这个问题,本文将基于源码,去分析newFixedThreadPool线程池致使的内存飙升问题,但愿能加深你们的理解。java

内存飙升问题复现

实例代码

ExecutorService executor = Executors.newFixedThreadPool(10);
        for (int i = 0; i < Integer.MAX_VALUE; i++) {
            executor.execute(() -> {
                try {
                    Thread.sleep(10000);
                } catch (InterruptedException e) {
                    //do nothing
                }
            });
        }
复制代码

配置Jvm参数

IDE指定JVM参数:-Xmx8m -Xms8m : node

执行结果

run以上代码,会抛出OOM: 面试

JVM OOM问题通常是 建立太多对象,同时 GC 垃圾来不及回收致使的,那么什么缘由 致使线程池的OOM呢?带着发现新大陆的心情,咱们从源码角度分析这个问题,去找找实例代码中哪里创了太多对象。

线程池源码分析

以上的实例代码,就一个newFixedThreadPool和一个execute方法。首先,咱们先来看一下newFixedThreadPool方法的源码算法

newFixedThreadPool源码

public static ExecutorService newFixedThreadPool(int nThreads) {
        return new ThreadPoolExecutor(nThreads, nThreads,
                                      0L, TimeUnit.MILLISECONDS,
                                      new LinkedBlockingQueue<Runnable>());
    }
复制代码

该段源码以及结合线程池特色,咱们能够知道newFixedThreadPool编程

  • 核心线程数coreSize和最大线程数maximumPoolSize大小同样,都是nThreads。
  • 空闲时间为0,即keepAliveTime为0
  • 阻塞队列为无参构造的LinkedBlockingQueue

线程池特色了解不是很清楚的朋友,能够看我这篇文章,面试必备:Java线程池解析bash

接下来,咱们再来看看线程池执行方法execute的源码。并发

线程池执行方法execute的源码

execute的源码以及相关解释以下:函数

public void execute(Runnable command) {
        if (command == null)
            throw new NullPointerException();
        int c = ctl.get();
        if (workerCountOf(c) < corePoolSize) {   //步骤一:判断当前正在工做的线程是否比核心线程数量小
            if (addWorker(command, true))    // 以核心线程的身份,添加到工做集合
                return;
            c = ctl.get();
        }
         //步骤二:不知足步骤一,线程池还在RUNNING状态,阻塞队列也没满的状况下,把执行任务添加到阻塞队列workQueue。
        if (isRunning(c) && workQueue.offer(command)) {  
            int recheck = ctl.get();
            //来个double check ,检查线程池是否忽然被关闭
            if (! isRunning(recheck) && remove(command))  
                reject(command);
            else if (workerCountOf(recheck) == 0)
                addWorker(null, false);
        }
        //步骤三:若是阻塞队列也满了,执行任务以非核心线程的身份,添加到工做集合
        else if (!addWorker(command, false))
            reject(command);
    }
复制代码

纵观以上代码,咱们能够发现就addWorker 以及workQueue.offer(command) 可能在建立对象。那咱们先分析addWorker方法。oop

addWorker源码分析

addWorker源码以及相关解释以下源码分析

private boolean addWorker(Runnable firstTask, boolean core) {
        retry:
        for (;;) {
            int c = ctl.get();
            //获取当前线程池的状态
            int rs = runStateOf(c);

            //若是线程池状态是STOP,TIDYING,TERMINATED状态的话,则会返回false。
            // 若是如今状态是SHUTDOWN,可是firstTask不为空或者workQueue为空的话,那么直接返回false
            if (rs >= SHUTDOWN &&
                ! (rs == SHUTDOWN &&
                   firstTask == null &&
                   ! workQueue.isEmpty()))
                return false;
           //自旋
            for (;;) {
                //获取当前工做线程的数量
                int wc = workerCountOf(c);
                //判断线程数量是否符合要求,若是要建立的是核心工做线程,判断当前工做线程数量是否已经超过coreSize,
               // 若是要建立的是非核心线程,判断当前工做线程数量是否超过maximumPoolSize,是的话就返回false
                if (wc >= CAPACITY ||
                    wc >= (core ? corePoolSize : maximumPoolSize))
                    return false;
               //若是线程数量符合要求,就经过CAS算法,将WorkerCount加1,成功就跳出retry自旋
                if (compareAndIncrementWorkerCount(c))
                    break retry;
                c = ctl.get();  // Re-read ctl
                if (runStateOf(c) != rs)
                    continue retry;
                  retry inner loop
            }
        }
        //线程启动标志
        boolean workerStarted = false;
        //线程添加进集合workers标志
        boolean workerAdded = false;
        Worker w = null;
        try {
            //由(Runnable 构造Worker对象
            w = new Worker(firstTask);
            final Thread t = w.thread;
            if (t != null) {
                //获取线程池的重入锁
                final ReentrantLock mainLock = this.mainLock;
                mainLock.lock();
                try {
                   //获取线程池状态
                    int rs = runStateOf(ctl.get());
                    //若是状态知足,将Worker对象添加到workers集合
                    if (rs < SHUTDOWN ||
                        (rs == SHUTDOWN && firstTask == null)) {
                        if (t.isAlive()) 
                            throw new IllegalThreadStateException();
                        workers.add(w);
                        int s = workers.size();
                        if (s > largestPoolSize)
                            largestPoolSize = s;
                        workerAdded = true;
                    }
                } finally {
                    mainLock.unlock();
                }
               //启动Worker中的线程开始执行任务
                if (workerAdded) {
                    t.start();
                    workerStarted = true;
                }
            }
        } finally {
            //线程启动失败,执行addWorkerFailed方法
            if (! workerStarted)
                addWorkerFailed(w);
        }
        return workerStarted;
    }
复制代码

addWorker执行流程

大概就是判断线程池状态是否OK,若是OK,在判断当前工做中的线程数量是否知足(小于coreSize/maximumPoolSize),若是不知足,不添加,若是知足,就将执行任务添加到工做集合workers,,并启动执行该线程。

再看一下workers的类型:

/**
     * Set containing all worker threads in pool. Accessed only when
     * holding mainLock.
     */
    private final HashSet<Worker> workers = new HashSet<Worker>();
复制代码

workers是一个HashSet集合,它由coreSize/maximumPoolSize控制着,那么addWorker方法会致使OOM?结合实例代码demo,coreSize=maximumPoolSize=10,若是超过10,不会再添加到workers了,因此它不是致使newFixedThreadPool内存飙升的缘由。那么,问题应该就在于workQueue.offer(command) 方法了。为了让整个流程清晰,咱们画一下execute执行的流程图。

线程池执行方法execute的流程

根据以上execute以及addWork源码分析,咱们把流程图画出来:

  • 提交一个任务command,线程池里存活的核心线程数小于线程数corePoolSize时,调用addWorker方法,线程池会建立一个核心线程去处理提交的任务。
  • 若是线程池核心线程数已满,即线程数已经等于corePoolSize,一个新提交的任务,会被放进任务队列workQueue排队等待执行。
  • 当线程池里面存活的线程数已经等于corePoolSize了,而且任务队列workQueue也满,判断线程数是否达到maximumPoolSize,即最大线程数是否已满,若是没到达,建立一个非核心线程执行提交的任务。
  • 若是当前的线程数达到了maximumPoolSize,还有新的任务过来的话,直接采用拒绝策略处理 。

看完execute的执行流程,我猜想,内存飙升问题就是workQueue塞满了。接下来,进行阻塞队列源码分析,揭开内存飙升问题的神秘面纱。

阻塞队列源码分析

回到newFixedThreadPool构造函数,发现阻塞队列就是LinkedBlockingQueue,并且是个 无参的LinkedBlockingQueue队列。OK,那咱们直接分析LinkedBlockingQueue源码。

LinkedBlockingQueue类图

由类图能够看到:

  • LinkedBlockingQueue 是使用单向链表实现的,其有两个 Node,分别用来存放首、尾节点, 而且还有一个初始值为 0 的原子变量 count,用来记录 队列元素个数。
  • 另外还有两个 ReentrantLock 的实例,分别用来控制元素入队和出队的原 子性,其中 takeLock 用来控制同时只有一个线程能够从队列头获取元素,其余线程必须 等待, putLock 控制同时只能有一个线程能够获取锁,在队列尾部添加元素,其余线程必 须等待。
  • 另外, notEmpty 和 notFull 是条件变量,它们内部都有一个条件队列用来存放进 队和出队时被阻塞的线程,其实这是生产者一消费者模型。

LinkedBlockingQueue无参构造函数

public LinkedBlockingQueue() {
        this(Integer.MAX_VALUE);
    }
    public LinkedBlockingQueue(int capacity) {
        if (capacity <= 0) throw new IllegalArgumentException();
        this.capacity = capacity;
        last = head = new Node<E>(null);
    }
复制代码

LinkedBlockingQueue无参构造函数,默认构造Integer.MAX_VALUE(那么大) 的链表,看到这里,你回想一下execute流程,是否是阻塞队列一直不会满了,这队列来者不拒,把全部阻塞任务收于麾下。。。是否是内存飙升问题水落石出啦。

LinkedBlockingQueue的offer函数

线程池中,插入队列用了offer方法,咱们来看一下阻塞队列LinkedBlockingQueue的offer骚操做吧

public boolean offer(E e) {
        //为空元素则抛出空指针异常
        if (e == null) throw new NullPointerException();
        final AtomicInteger count = this.count;
        //如采当前队列满则丢弃将要放入的元素, 而后返回false 
        if (count.get() == capacity)
            return false;
        int c = -1;
        //构造新节点,获取putLock独占锁
        Node<E> node = new Node<E>(e);
        final ReentrantLock putLock = this.putLock;
        putLock.lock();
        try {
            //如采队列不满则进队列,并递增元素计数 
            if (count.get() < capacity) {
                enqueue(node);
                c = count.getAndIncrement();
                //新元素入队后队列还有空闲空间,则
                唤醒 notFull 的条件队列中一条阻塞线程
                if (c + 1 < capacity)
                    notFull.signal();
            }
        } finally {
            //释放锁 
            putLock.unlock();
        }
        if (c == 0)
            signalNotEmpty();
        return c >= 0;
    }
复制代码

offer操做向队列尾部插入一个元素,若是队列中有空闲则插入成功后返回 true,若是队列己满 则丢弃当前元素而后返回 false。 若是 e 元素为 null 则抛出 Nul!PointerException 异常。另外, 该方法是非阻塞的。

内存飙升问题结果揭晓

newFixedThreadPool线程池的核心线程数是固定的,它使用了近乎于无界的LinkedBlockingQueue阻塞队列。当核心线程用完后,任务会入队到阻塞队列,若是任务执行的时间比较长,没有释放,会致使愈来愈多的任务堆积到阻塞队列,最后致使机器的内存使用不停的飙升,形成JVM OOM。

参考与感谢

我的公众号

  • 若是你是个爱学习的好孩子,能够关注我公众号,一块儿学习讨论。
  • 若是你以为本文有哪些不正确的地方,能够评论,也能够关注我公众号,私聊我,你们一块儿学习进步哈。
相关文章
相关标签/搜索