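While tracking down a memory leak in the Suning intelligent alerting platform, I found that the BlockingQueue inside ExecutorCompletionService was occupying 20% of the heap.
The code looked like this: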
    CompletionService<Integer> exec = new ExecutorCompletionService<>(
            ExecutorUtils.getMultiLiveCacheThreadPool());
    List<Future<Integer>> results = Lists.newArrayList();
    for (Map.Entry<String, GroupQueue> entry : queues.entrySet()) {
        try {
            entry.getValue().setAlertState(alertState);
            results.add(exec.submit(new Sender(entry.getValue())));
        } catch (Exception e) {
            logger.error("send task error", e);
        }
    }
    int sendSuccessCount = 0;
    for (Future<Integer> fs : results) {
        try {
            // The problem is here
            sendSuccessCount = sendSuccessCount + fs.get(20, TimeUnit.SECONDS);
        } catch (Exception e) {
            logger.error("get data error", e);
        }
    }
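At first glance nothing looks wrong: the submit method of CompletionService returns a Future, and the result is obtained through its get method.
The source of ExecutorCompletionService is as follows: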
    public class ExecutorCompletionService<V> implements CompletionService<V> {
        private final Executor executor;
        private final AbstractExecutorService aes;
        private final BlockingQueue<Future<V>> completionQueue;

        /**
         * FutureTask extension to enqueue upon completion
         */
        private class QueueingFuture extends FutureTask<Void> {
            QueueingFuture(RunnableFuture<V> task) {
                super(task, null);
                this.task = task;
            }
            protected void done() { completionQueue.add(task); }
            private final Future<V> task;
        }

        private RunnableFuture<V> newTaskFor(Callable<V> task) {
            if (aes == null)
                return new FutureTask<V>(task);
            else
                return aes.newTaskFor(task);
        }

        private RunnableFuture<V> newTaskFor(Runnable task, V result) {
            if (aes == null)
                return new FutureTask<V>(task, result);
            else
                return aes.newTaskFor(task, result);
        }

        /**
         * Creates an ExecutorCompletionService using the supplied
         * executor for base task execution and a
         * {@link LinkedBlockingQueue} as a completion queue.
         *
         * @param executor the executor to use
         * @throws NullPointerException if executor is {@code null}
         */
        public ExecutorCompletionService(Executor executor) {
            if (executor == null)
                throw new NullPointerException();
            this.executor = executor;
            this.aes = (executor instanceof AbstractExecutorService) ?
                (AbstractExecutorService) executor : null;
            this.completionQueue = new LinkedBlockingQueue<Future<V>>();
        }

        /**
         * Creates an ExecutorCompletionService using the supplied
         * executor for base task execution and the supplied queue as its
         * completion queue.
         *
         * @param executor the executor to use
         * @param completionQueue the queue to use as the completion queue
         *        normally one dedicated for use by this service. This
         *        queue is treated as unbounded -- failed attempted
         *        {@code Queue.add} operations for completed tasks cause
         *        them not to be retrievable.
         * @throws NullPointerException if executor or completionQueue are {@code null}
         */
        public ExecutorCompletionService(Executor executor,
                                         BlockingQueue<Future<V>> completionQueue) {
            if (executor == null || completionQueue == null)
                throw new NullPointerException();
            this.executor = executor;
            this.aes = (executor instanceof AbstractExecutorService) ?
                (AbstractExecutorService) executor : null;
            this.completionQueue = completionQueue;
        }

        public Future<V> submit(Callable<V> task) {
            if (task == null) throw new NullPointerException();
            RunnableFuture<V> f = newTaskFor(task);
            executor.execute(new QueueingFuture(f));
            return f;
        }

        public Future<V> submit(Runnable task, V result) {
            if (task == null) throw new NullPointerException();
            RunnableFuture<V> f = newTaskFor(task, result);
            executor.execute(new QueueingFuture(f));
            return f;
        }

        public Future<V> take() throws InterruptedException {
            return completionQueue.take();
        }

        public Future<V> poll() {
            return completionQueue.poll();
        }

        public Future<V> poll(long timeout, TimeUnit unit)
                throws InterruptedException {
            return completionQueue.poll(timeout, unit);
        }
    }
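The submit method returns a Future to the caller, but it also wraps the task in a QueueingFuture whose done() callback adds that same Future to completionQueue once the task finishes.
If results are read through Future.get, nothing is ever removed from that queue: only take and poll dequeue. The queue therefore keeps growing for as long as the service instance is alive, which causes the memory leak.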
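The growth described above can be observed directly by handing the service a queue we keep a reference to. The following is a minimal sketch, not the platform's code: the class name `CompletionQueueGrowthDemo`, the fixed pool of 4 threads, and the trivial `() -> 1` tasks are all assumptions made for illustration.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CompletionService;
import java.util.concurrent.ExecutorCompletionService;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

// Hypothetical demo class; none of these names come from the original code.
class CompletionQueueGrowthDemo {

    /** Reads results via the Futures returned by submit(); returns the leftover queue size. */
    static int leftoverAfterFutureGet(int taskCount) {
        return run(taskCount, false);
    }

    /** Reads results via take(); returns the leftover queue size. */
    static int leftoverAfterTake(int taskCount) {
        return run(taskCount, true);
    }

    private static int run(int taskCount, boolean useTake) {
        ExecutorService pool = Executors.newFixedThreadPool(4);
        // Supplying our own queue lets us inspect its size afterwards.
        BlockingQueue<Future<Integer>> completionQueue = new LinkedBlockingQueue<>();
        CompletionService<Integer> exec =
                new ExecutorCompletionService<>(pool, completionQueue);
        try {
            List<Future<Integer>> results = new ArrayList<>();
            for (int i = 0; i < taskCount; i++) {
                results.add(exec.submit(() -> 1));
            }
            for (Future<Integer> f : results) {
                if (useTake) {
                    exec.take().get();           // dequeues one completed Future
                } else {
                    f.get(20, TimeUnit.SECONDS); // reads the result, dequeues nothing
                }
            }
            // Wait for the workers so that every done() callback has run.
            pool.shutdown();
            pool.awaitTermination(20, TimeUnit.SECONDS);
            return completionQueue.size();
        } catch (Exception e) {
            throw new IllegalStateException(e);
        } finally {
            pool.shutdownNow();
        }
    }

    public static void main(String[] args) {
        System.out.println("Future.get(): " + leftoverAfterFutureGet(1000) + " futures left in queue");
        System.out.println("take().get(): " + leftoverAfterTake(1000) + " futures left in queue");
    }
}
```

With Future.get every completed Future should still be sitting in the queue at the end, while the take variant should leave it empty. In this sketch the queue becomes garbage when the method returns; the leak only bites when the service or its queue outlives the batch of tasks.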
The correct way to write it:
    // The preceding code stays the same
    for (Future<Integer> fs : results) {
        try {
            // Use exec.take() so the completed Future is removed from the queue
            sendSuccessCount = sendSuccessCount + exec.take().get(20, TimeUnit.SECONDS);
        } catch (Exception e) {
            logger.error("get data error", e);
        }
    }
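In fact, if you read the source carefully, you will find that the author gives the correct usage in the Javadoc (although I think the submit method is somewhat misleading):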
     * <pre> {@code
     * void solve(Executor e,
     *            Collection<Callable<Result>> solvers)
     *     throws InterruptedException, ExecutionException {
     *     CompletionService<Result> ecs
     *         = new ExecutorCompletionService<Result>(e);
     *     for (Callable<Result> s : solvers)
     *         ecs.submit(s);
     *     int n = solvers.size();
     *     for (int i = 0; i < n; ++i) {
     *         Result r = ecs.take().get();
     *         if (r != null)
     *             use(r);
     *     }
     * }}</pre>
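One thing to keep in mind with take(): it blocks until some task has completed, and the Future it returns is already done, so a timeout passed to get() afterwards no longer bounds the wait. If a bounded wait matters, poll(timeout, unit) may be a better fit. Below is a hedged sketch of that variant; the class `BoundedDrainSketch`, the method names, the pool size, and the choice to skip failed tasks are all assumptions for illustration, not the platform's code.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.CompletionService;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorCompletionService;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;

// Hypothetical sketch; names and sizes are illustrative assumptions.
class BoundedDrainSketch {

    /** Sums the results that complete in time, waiting at most perTaskTimeout for each one. */
    static int sumCompleted(List<Callable<Integer>> tasks, long perTaskTimeout, TimeUnit unit) {
        ExecutorService pool = Executors.newFixedThreadPool(4);
        CompletionService<Integer> exec = new ExecutorCompletionService<>(pool);
        int submitted = 0;
        for (Callable<Integer> task : tasks) {
            exec.submit(task);
            submitted++;
        }
        int sum = 0;
        try {
            for (int i = 0; i < submitted; i++) {
                // poll() dequeues a completed Future, or returns null on timeout.
                Future<Integer> done = exec.poll(perTaskTimeout, unit);
                if (done == null) {
                    break; // nothing finished in time; stop waiting
                }
                try {
                    sum += done.get(); // already completed, returns immediately
                } catch (ExecutionException e) {
                    // The task itself failed; skip it in this sketch.
                }
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        } finally {
            pool.shutdownNow(); // interrupt whatever is still running
        }
        return sum;
    }

    /** Three fast tasks plus one slow task that misses the 500 ms window. */
    static int demo() {
        List<Callable<Integer>> tasks = new ArrayList<>();
        tasks.add(() -> 1);
        tasks.add(() -> 1);
        tasks.add(() -> 1);
        tasks.add(() -> { Thread.sleep(5000); return 1; });
        return sumCompleted(tasks, 500, TimeUnit.MILLISECONDS);
    }

    public static void main(String[] args) {
        System.out.println("sum of completed tasks: " + demo());
    }
}
```

Note that the timeout here applies to each poll call separately, not to the batch as a whole. With a long-lived shared pool you would not call shutdownNow; that is only done here to keep the sketch self-contained.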
Finally, thanks to the Suning decision analysis platform, we can see directly how memory usage changed after the fix:
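Each line is the heap memory used by one WildFly container. The right-hand side shows the memory curve after the bug fix, which is clearly more than 200 MB lower than before the upgrade.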