ExecutorCompletionService: a memory leak waiting for the careless

While tracking down a memory leak in the Suning intelligent alerting platform, I found that the BlockingQueue inside an ExecutorCompletionService was holding 20% of the heap.

The code looked like this:

CompletionService<Integer> exec = new ExecutorCompletionService<>(
        ExecutorUtils.getMultiLiveCacheThreadPool());
List<Future<Integer>> results = Lists.newArrayList();
for (Map.Entry<String, GroupQueue> entry : queues.entrySet()) {
    try {
        entry.getValue().setAlertState(alertState);
        results.add(exec.submit(new Sender(entry.getValue())));
    } catch (Exception e) {
        logger.error("send task error", e);
    }
}
int sendSuccessCount = 0;

for (Future<Integer> fs : results) {
    try {
        // Problem: the result is read from the Future returned by submit(),
        // so nothing is ever removed from the completion queue.
        sendSuccessCount = sendSuccessCount + fs.get(20, TimeUnit.SECONDS);
    } catch (Exception e) {
        logger.error("get data error", e);
    }
}

At first glance nothing looks wrong: CompletionService.submit returns a Future, and the result is obtained by calling get on it.

The source of ExecutorCompletionService is as follows:
public class ExecutorCompletionService<V> implements CompletionService<V> {
    private final Executor executor;
    private final AbstractExecutorService aes;
    private final BlockingQueue<Future<V>> completionQueue;

    /**
     * FutureTask extension to enqueue upon completion
     */
    private class QueueingFuture extends FutureTask<Void> {
        QueueingFuture(RunnableFuture<V> task) {
            super(task, null);
            this.task = task;
        }
        protected void done() { completionQueue.add(task); }
        private final Future<V> task;
    }

    private RunnableFuture<V> newTaskFor(Callable<V> task) {
        if (aes == null)
            return new FutureTask<V>(task);
        else
            return aes.newTaskFor(task);
    }

    private RunnableFuture<V> newTaskFor(Runnable task, V result) {
        if (aes == null)
            return new FutureTask<V>(task, result);
        else
            return aes.newTaskFor(task, result);
    }

    /**
     * Creates an ExecutorCompletionService using the supplied
     * executor for base task execution and a
     * {@link LinkedBlockingQueue} as a completion queue.
     *
     * @param executor the executor to use
     * @throws NullPointerException if executor is {@code null}
     */
    public ExecutorCompletionService(Executor executor) {
        if (executor == null)
            throw new NullPointerException();
        this.executor = executor;
        this.aes = (executor instanceof AbstractExecutorService) ?
            (AbstractExecutorService) executor : null;
        this.completionQueue = new LinkedBlockingQueue<Future<V>>();
    }

    /**
     * Creates an ExecutorCompletionService using the supplied
     * executor for base task execution and the supplied queue as its
     * completion queue.
     *
     * @param executor the executor to use
     * @param completionQueue the queue to use as the completion queue
     *        normally one dedicated for use by this service. This
     *        queue is treated as unbounded -- failed attempted
     *        {@code Queue.add} operations for completed tasks cause
     *        them not to be retrievable.
     * @throws NullPointerException if executor or completionQueue are {@code null}
     */
    public ExecutorCompletionService(Executor executor,
                                     BlockingQueue<Future<V>> completionQueue) {
        if (executor == null || completionQueue == null)
            throw new NullPointerException();
        this.executor = executor;
        this.aes = (executor instanceof AbstractExecutorService) ?
            (AbstractExecutorService) executor : null;
        this.completionQueue = completionQueue;
    }

    public Future<V> submit(Callable<V> task) {
        if (task == null) throw new NullPointerException();
        RunnableFuture<V> f = newTaskFor(task);
        executor.execute(new QueueingFuture(f));
        return f;
    }

    public Future<V> submit(Runnable task, V result) {
        if (task == null) throw new NullPointerException();
        RunnableFuture<V> f = newTaskFor(task, result);
        executor.execute(new QueueingFuture(f));
        return f;
    }

    public Future<V> take() throws InterruptedException {
        return completionQueue.take();
    }

    public Future<V> poll() {
        return completionQueue.poll();
    }

    public Future<V> poll(long timeout, TimeUnit unit)
            throws InterruptedException {
        return completionQueue.poll(timeout, unit);
    }

}

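The submit method returns a Future to the caller, and when the task completes, QueueingFuture.done() also adds that same Future to the completion queue.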

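If the results are only ever read through future.get(), nothing is removed from that queue, so it keeps growing and eventually leaks memory.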
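The growth is easy to observe in isolation. The following is a minimal sketch, not the production code: the class name `CompletionQueueLeakDemo`, the fixed pool of 4 threads, and the trivial tasks are all assumptions made for illustration. It passes its own queue to the two-argument constructor shown in the source above, reads every result through Future.get(), and then reports how many completed futures are still sitting in the queue.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CompletionService;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorCompletionService;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

public class CompletionQueueLeakDemo {

    // Submits `tasks` trivial tasks, reads every result via Future.get(),
    // and returns how many completed futures remain in the completion queue.
    public static int leftoverAfterFutureGet(int tasks) {
        ExecutorService pool = Executors.newFixedThreadPool(4); // hypothetical pool
        // Supplying our own queue lets us inspect its size afterwards.
        BlockingQueue<Future<Integer>> completionQueue = new LinkedBlockingQueue<>();
        CompletionService<Integer> exec =
                new ExecutorCompletionService<>(pool, completionQueue);
        try {
            List<Future<Integer>> results = new ArrayList<>();
            for (int i = 0; i < tasks; i++) {
                final int value = i;
                results.add(exec.submit(() -> value));
            }
            for (Future<Integer> f : results) {
                // Same pattern as the problematic code: take()/poll() is never called.
                f.get(20, TimeUnit.SECONDS);
            }
            // Wait for the workers to finish so every done() callback has run.
            pool.shutdown();
            pool.awaitTermination(20, TimeUnit.SECONDS);
        } catch (InterruptedException | ExecutionException | TimeoutException e) {
            throw new IllegalStateException("demo task did not finish", e);
        } finally {
            pool.shutdownNow();
        }
        return completionQueue.size();
    }

    public static void main(String[] args) {
        System.out.println("futures left in queue: " + leftoverAfterFutureGet(1000));
    }
}
```

In this sketch every submitted task leaves one entry behind. Whether that turns into a real leak depends on how long the ExecutorCompletionService itself stays reachable.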

The correct way to write it:

// Everything before this point stays the same
for (Future<Integer> fs : results) {
    try {
        // Use exec.take() so that each completed future is removed from the queue
        sendSuccessCount = sendSuccessCount + exec.take().get(20, TimeUnit.SECONDS);
    } catch (Exception e) {
        logger.error("get data error", e);
    }
}
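One thing to keep in mind with the loop above: take() blocks until some task has completed, and the Future it returns is already done, so the 20-second timeout passed to get() no longer bounds the wait. If a bounded wait matters, poll(timeout, unit) from the source above is one option. The sketch below is a variant under assumptions, not the original fix: the class `CompletionServiceDrainDemo`, the method `sumWithPoll`, and the pool of 4 threads are hypothetical names chosen for illustration.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.CompletionService;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorCompletionService;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;

public class CompletionServiceDrainDemo {

    // Sums the results of all tasks, removing each completed future from the
    // completion queue and waiting at most `timeoutSeconds` for the next one.
    public static int sumWithPoll(List<Callable<Integer>> tasks, long timeoutSeconds) {
        ExecutorService pool = Executors.newFixedThreadPool(4); // hypothetical pool
        CompletionService<Integer> exec = new ExecutorCompletionService<>(pool);
        int submitted = 0;
        int sum = 0;
        try {
            for (Callable<Integer> task : tasks) {
                exec.submit(task);
                submitted++;
            }
            for (int i = 0; i < submitted; i++) {
                try {
                    // poll() returns null if nothing completes within the timeout.
                    Future<Integer> done = exec.poll(timeoutSeconds, TimeUnit.SECONDS);
                    if (done == null) {
                        break; // give up on the remaining tasks
                    }
                    sum += done.get(); // already completed, returns immediately
                } catch (ExecutionException e) {
                    // The task itself threw; log it and keep draining.
                    System.err.println("task failed: " + e.getCause());
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                    break;
                }
            }
        } finally {
            pool.shutdownNow();
        }
        return sum;
    }

    public static void main(String[] args) {
        List<Callable<Integer>> tasks = new ArrayList<>();
        tasks.add(() -> 1);
        tasks.add(() -> 2);
        tasks.add(() -> 3);
        System.out.println("sum = " + sumWithPoll(tasks, 20));
    }
}
```

Note that giving up after a timeout leaves any late results in the queue. That is harmless here because the service is discarded with the pool, but a long-lived service would still need to drain them later.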

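In fact, a careful read of the source shows that the author documents the correct usage in the class Javadoc (although I think the Future returned by submit is somewhat misleading):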

* <pre> {@code
 * void solve(Executor e,
 *            Collection<Callable<Result>> solvers)
 *     throws InterruptedException, ExecutionException {
 *     CompletionService<Result> ecs
 *         = new ExecutorCompletionService<Result>(e);
 *     for (Callable<Result> s : solvers)
 *         ecs.submit(s);
 *     int n = solvers.size();
 *     for (int i = 0; i < n; ++i) {
 *         Result r = ecs.take().get();
 *         if (r != null)
 *             use(r);
 *     }
 * }}</pre>

Finally

Thanks to the Suning decision analysis platform, we can see directly how memory usage changed after the fix:

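Each line is the heap memory used by one WildFly container. The right-hand side is the curve after the bug fix, which is clearly more than 200 MB lower than before the upgrade.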