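At the end of the previous article, "ElasticSearch thread pool type analysis: ExecutorScalingQueue", I touched on some implementation details of the thread pool that handles ES search operations. This article analyzes the thread pool for SEARCH operations around several questions.
The SEARCH thread pool is built in the constructor of the ThreadPool class: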
builders.put(Names.SEARCH, new AutoQueueAdjustingExecutorBuilder(settings, Names.SEARCH, searchThreadPoolSize(availableProcessors), 1000, 1000, 1000, 2000));
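The core thread count of the SEARCH thread pool depends on the number of CPUs on the machine running the ES node. The capacity of its task queue can be adjusted dynamically, and the initial queue length is 1000. The concrete implementation class of the SEARCH thread pool is QueueResizingEsThreadPoolExecutor, the task queue is ResizableBlockingQueue, and the rejection policy is EsAbortPolicy. ResizableBlockingQueue extends SizeBlockingQueue and adds the ability to adjust the queue capacity at runtime. For SizeBlockingQueue, see the article "ElasticSearch thread pool type analysis: SizeBlockingQueue".
org.elasticsearch.common.util.concurrent.EsExecutors.newAutoQueueFixed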
ResizableBlockingQueue<Runnable> queue =
    new ResizableBlockingQueue<>(ConcurrentCollections.<Runnable>newBlockingQueue(), initialQueueCapacity);
return new QueueResizingEsThreadPoolExecutor(name, size, size, 0, TimeUnit.MILLISECONDS,
    queue, minQueueSize, maxQueueSize, TimedRunnable::new, frameSize, targetedResponseTime,
    threadFactory, new EsAbortPolicy(), contextHolder);
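Each submitted Runnable is wrapped in a TimedRunnable so that its execution time can be measured. When a TimedRunnable is constructed, this.creationTimeNanos = System.nanoTime(); records the task's creation time. finishTimeNanos-startTimeNanos is the task's execution time, and startTimeNanos-creationTimeNanos is the time it spent waiting in the queue. This way both the queue time and the execution time of every Runnable are recorded, which is a neat design.
org.elasticsearch.common.util.concurrent.TimedRunnable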
// TimedRunnable constructor
TimedRunnable(final Runnable original) {
    this.original = original;
    this.creationTimeNanos = System.nanoTime();
}

@Override
public void doRun() {
    try {
        // time at which the task starts executing
        startTimeNanos = System.nanoTime();
        // the task's actual logic
        original.run();
    } finally {
        // time at which the task finishes executing
        finishTimeNanos = System.nanoTime();
    }
}
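The timing idea can be tried outside ES with a small stand-alone wrapper. This is a minimal sketch, not the ES class: TimedTask and its accessor names (queueNanos, executionNanos, totalNanos) are hypothetical, chosen only to mirror the three timestamps described above.

```java
public class TimedTaskExample {
    /** Hypothetical wrapper modelled on the idea behind TimedRunnable. */
    static final class TimedTask implements Runnable {
        private final Runnable original;
        private final long creationTimeNanos;
        private long startTimeNanos = -1;
        private long finishTimeNanos = -1;

        TimedTask(Runnable original) {
            this.original = original;
            // The clock starts as soon as the wrapper is created, i.e. at submission time.
            this.creationTimeNanos = System.nanoTime();
        }

        @Override
        public void run() {
            try {
                startTimeNanos = System.nanoTime();
                original.run();
            } finally {
                // Recorded even if the wrapped task throws.
                finishTimeNanos = System.nanoTime();
            }
        }

        long queueNanos() { return startTimeNanos - creationTimeNanos; }
        long executionNanos() { return finishTimeNanos - startTimeNanos; }
        long totalNanos() { return finishTimeNanos - creationTimeNanos; }
    }

    public static void main(String[] args) {
        TimedTask task = new TimedTask(() -> System.out.println("working"));
        task.run();
        System.out.println("queued for " + task.queueNanos() + " ns, ran for " + task.executionNanos() + " ns");
    }
}
```

The total time is by construction the sum of the queue time and the execution time, which is the relationship the afterExecute statistics rely on.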
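Next, let me go through in detail how the execution time of Runnable tasks submitted to the thread pool is measured. Start with the constructor parameters of QueueResizingEsThreadPoolExecutor, in particular the runnableWrapper parameter, which I think of as a piece of "processing logic".
In the new QueueResizingEsThreadPoolExecutor call shown earlier, TimedRunnable::new is assigned to runnableWrapper. Since runnableWrapper is a java.util.function.Function, calling java.util.function.Function.apply runs that processing logic, that is, it creates a new TimedRunnable. As the TimedRunnable constructor shows, the task's creation time has already been recorded at that point.
The reason for going into this much detail is that the ES source makes heavy use of functional interfaces and lambda expressions. When I first read the source, I could never tell where the "processing logic" represented by a lambda was actually executed. Once I got used to this style, I saw how much flexibility it adds to the code.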
// runnableWrapper is declared as the functional interface Function: it takes a Runnable,
// applies the wrapper logic, and returns a Runnable
private final Function<Runnable, Runnable> runnableWrapper;
private final ResizableBlockingQueue<Runnable> workQueue;

QueueResizingEsThreadPoolExecutor(String name, int corePoolSize, int maximumPoolSize,
                                  long keepAliveTime, TimeUnit unit,
                                  ResizableBlockingQueue<Runnable> workQueue,
                                  int minQueueSize, int maxQueueSize,
                                  Function<Runnable, Runnable> runnableWrapper,
                                  final int tasksPerFrame, TimeValue targetedResponseTime,
                                  ThreadFactory threadFactory, XRejectedExecutionHandler handler,
                                  ThreadContext contextHolder) {
    super(name, corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
          threadFactory, handler, contextHolder);
    this.runnableWrapper = runnableWrapper;
    this.workQueue = workQueue;
    this.tasksPerFrame = tasksPerFrame;
    this.startNs = System.nanoTime();
    this.minQueueSize = minQueueSize;
    this.maxQueueSize = maxQueueSize;
    this.targetedResponseTimeNanos = targetedResponseTime.getNanos();
    this.executionEWMA = new ExponentiallyWeightedMovingAverage(EWMA_ALPHA, 0);
    logger.debug(
        "thread pool [{}] will adjust queue by [{}] when determining automatic queue size",
        getName(), QUEUE_ADJUSTMENT_AMOUNT);
}
When a task is submitted, the doExecute() method of QueueResizingEsThreadPoolExecutor runs:
@Override
protected void doExecute(final Runnable command) {
    // we are submitting a task, it has not yet started running (because super.execute() has not
    // been called), but it could be immediately run, or run at a later time. We need the time
    // this task entered the queue, which we get by creating a TimedRunnable, which starts the
    // clock as soon as it is created.
    // apply() triggers TimedRunnable::new, which creates the TimedRunnable object
    super.doExecute(this.runnableWrapper.apply(command));
}
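To see where a Function<Runnable, Runnable> wrapper actually runs, here is a minimal sketch built on the plain JDK ThreadPoolExecutor. WrappingExecutor and countWrapped are hypothetical names, and the sketch overrides execute() directly, whereas ES routes through its own doExecute().

```java
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Function;

public class WrappingExecutorExample {
    /** Hypothetical executor that applies a wrapper to every submitted task. */
    static final class WrappingExecutor extends ThreadPoolExecutor {
        private final Function<Runnable, Runnable> wrapper;

        WrappingExecutor(int threads, Function<Runnable, Runnable> wrapper) {
            super(threads, threads, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<>());
            this.wrapper = wrapper;
        }

        @Override
        public void execute(Runnable command) {
            // apply() runs in the submitting thread, before the task is queued or started.
            super.execute(wrapper.apply(command));
        }
    }

    /** Submits n empty tasks and returns how many times the wrapper was applied. */
    static int countWrapped(int n) {
        AtomicInteger wrapped = new AtomicInteger();
        WrappingExecutor pool = new WrappingExecutor(2, task -> {
            wrapped.incrementAndGet();
            return task;
        });
        for (int i = 0; i < n; i++) {
            pool.execute(() -> { });
        }
        pool.shutdown();
        try {
            pool.awaitTermination(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return wrapped.get();
    }

    public static void main(String[] args) {
        System.out.println("wrapper applied " + countWrapped(5) + " times");
    }
}
```

Because the wrapper runs at submission time, a timestamp taken inside it marks the moment the task entered the pool, not the moment a worker picked it up.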
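With the above, the execution time of every task can be recorded. But what capacity should the task queue have? That is determined by Little's law from queueing theory. Look it up if you are not familiar with it.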
/**
 * Calculate Little's Law (L), which is the "optimal" queue size for a particular task rate (lambda) and targeted response time.
 *
 * @param lambda the arrival rate of tasks in nanoseconds
 * @param targetedResponseTimeNanos nanoseconds for the average targeted response rate of requests
 * @return the optimal queue size for the give task rate and targeted response time
 */
static int calculateL(final double lambda, final long targetedResponseTimeNanos) {
    assert targetedResponseTimeNanos > 0 : "cannot calculate for instantaneous requests";
    // L = λ * W
    return Math.toIntExact((long)(lambda * targetedResponseTimeNanos));
}
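Little's law needs two parameters: lambda, the rate at which tasks arrive, and W, the average time a task spends in the system (here, the targeted average response time).
In ES, this average response time can be set in the configuration file. If it is not set, it defaults to 1s. The AutoQueueAdjustingExecutorBuilder constructor configures the response time as 1s: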
final String targetedResponseTimeKey = settingsKey(prefix, "target_response_time");
this.targetedResponseTimeSetting = Setting.timeSetting(targetedResponseTimeKey,
    TimeValue.timeValueSeconds(1), TimeValue.timeValueMillis(10), Setting.Property.NodeScope);
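A small worked example makes L = λ * W concrete. This is a sketch under one assumption: the body of calculateLambda is not shown in this article, so lambda is taken here to be the number of tasks divided by the nanoseconds they took. LittlesLawExample is a hypothetical class name.

```java
public class LittlesLawExample {
    // Assumed form of lambda: tasks completed divided by the nanoseconds they took (tasks per ns).
    static double calculateLambda(int totalTasks, long totalTaskNanos) {
        return (double) totalTasks / totalTaskNanos;
    }

    // L = lambda * W, truncated to an int in the same way as calculateL above.
    static int calculateL(double lambda, long targetedResponseTimeNanos) {
        return Math.toIntExact((long) (lambda * targetedResponseTimeNanos));
    }

    public static void main(String[] args) {
        long threeSeconds = 3_000_000_000L;
        long oneSecond = 1_000_000_000L; // the default target response time
        double lambda = calculateLambda(2000, threeSeconds);
        // 2000 tasks over 3 s is about 666.67 tasks per second, so L is truncated to 666.
        System.out.println(calculateL(lambda, oneSecond));
    }
}
```

With a frame of 2000 tasks that took 3 seconds in total and a 1 second target, the "optimal" queue size comes out at 666 entries: the faster tasks complete, the larger the queue that still meets the target.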
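Counting the executed tasks and their total time is done in afterExecute. ES's custom thread pool overrides ThreadPoolExecutor.afterExecute, which is called automatically every time a task in the pool finishes, to do some post-processing: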
@Override
protected void afterExecute(Runnable r, Throwable t) {
    // When overriding afterExecute, call super.afterExecute first
    super.afterExecute(r, t);
    // A task has been completed, it has left the building. We should now be able to get the
    // total time as a combination of the time in the queue and time spent running the task. We
    // only want runnables that did not throw errors though, because they could be fast-failures
    // that throw off our timings, so only check when t is null.
    // Only tasks of type TimedRunnable are timed and counted
    assert r instanceof TimedRunnable : "expected only TimedRunnables in queue";
    // Time taken by a single task (queue time plus execution time)
    final long taskNanos = ((TimedRunnable) r).getTotalNanos();
    // Total time of all tasks (the sum over every task)
    final long totalNanos = totalTaskNanos.addAndGet(taskNanos);
    // Execution time of a single task (its total time minus its queue time)
    final long taskExecutionNanos = ((TimedRunnable) r).getTotalExecutionNanos();
    assert taskExecutionNanos >= 0 : "expected task to always take longer than 0 nanoseconds, got: " + taskExecutionNanos;
    executionEWMA.addValue(taskExecutionNanos);
    // tasksPerFrame defaults to 2000. Each time the pool finishes a batch of tasksPerFrame tasks,
    // it adjusts the task queue length once.
    if (taskCount.incrementAndGet() == this.tasksPerFrame) {
        final long endTimeNs = System.nanoTime();
        // How long the pool has been running since startNs (note that not only Runnable tasks
        // have a lifecycle, thread pools have one too)
        final long totalRuntime = endTimeNs - this.startNs;
        // Reset the start time for all tasks. At first glance this appears to need to be
        // volatile, since we are reading from a different thread when it is set, but it
        // is protected by the taskCount memory barrier.
        // See: https://docs.oracle.com/javase/8/docs/api/java/util/concurrent/atomic/package-summary.html
        startNs = endTimeNs;
        // Calculate the new desired queue size
        try {
            // Compute lambda: the total time of the tasksPerFrame successfully executed tasks
            // is totalNanos, so lambda can be read as the processing rate
            final double lambda = calculateLambda(tasksPerFrame, Math.max(totalNanos, 1L));
            // Ideal queue capacity according to Little's law (the maximum length the queue may reach)
            final int desiredQueueSize = calculateL(lambda, targetedResponseTimeNanos);
            // Current capacity of the task queue
            final int oldCapacity = workQueue.capacity();
            if (logger.isDebugEnabled()) {
                final long avgTaskTime = totalNanos / tasksPerFrame;
                logger.debug("[{}]: there were [{}] tasks in [{}], avg task time [{}], EWMA task execution [{}], " +
                        "[{} tasks/s], optimal queue is [{}], current capacity [{}]",
                    getName(), tasksPerFrame,
                    TimeValue.timeValueNanos(totalRuntime),
                    TimeValue.timeValueNanos(avgTaskTime),
                    TimeValue.timeValueNanos((long)executionEWMA.getAverage()),
                    String.format(Locale.ROOT, "%.2f", lambda * TimeValue.timeValueSeconds(1).nanos()),
                    desiredQueueSize, oldCapacity);
            }
            // Adjust the queue size towards the desired capacity using an adjust of
            // QUEUE_ADJUSTMENT_AMOUNT (either up or down), keeping in mind the min and max
            // values the queue size can have.
            // The capacity moves from oldCapacity to newCapacity; it does not jump straight to desiredQueueSize
            final int newCapacity =
                workQueue.adjustCapacity(desiredQueueSize, QUEUE_ADJUSTMENT_AMOUNT, minQueueSize, maxQueueSize);
            if (oldCapacity != newCapacity && logger.isDebugEnabled()) {
                logger.debug("adjusted [{}] queue size by [{}], old capacity: [{}], new capacity: [{}]",
                    getName(),
                    newCapacity > oldCapacity ? QUEUE_ADJUSTMENT_AMOUNT : -QUEUE_ADJUSTMENT_AMOUNT,
                    oldCapacity, newCapacity);
            }
        } catch (ArithmeticException e) {
            // There was an integer overflow, so just log about it, rather than adjust the queue size
            logger.warn(() -> new ParameterizedMessage(
                "failed to calculate optimal queue size for [{}] thread pool, " +
                    "total frame time [{}ns], tasks [{}], task execution time [{}ns]",
                getName(), totalRuntime, tasksPerFrame, totalNanos), e);
        } finally {
            // Finally, decrement the task count and time back to their starting values. We
            // do this at the end so there is no concurrent adjustments happening. We also
            // decrement them instead of resetting them back to zero, as resetting them back
            // to zero causes operations that came in during the adjustment to be uncounted
            int tasks = taskCount.addAndGet(-this.tasksPerFrame);
            assert tasks >= 0 : "tasks should never be negative, got: " + tasks;
            if (tasks >= this.tasksPerFrame) {
                // Start over, because we can potentially reach a "never adjusting" state,
                //
                // consider the following:
                // - If the frame window is 10, and there are 10 tasks, then an adjustment will begin. (taskCount == 10)
                // - Prior to the adjustment being done, 15 more tasks come in, the taskCount is now 25
                // - Adjustment happens and we decrement the tasks by 10, taskCount is now 15
                // - Since taskCount will now be incremented forever, it will never be 10 again,
                //   so there will be no further adjustments
                logger.debug(
                    "[{}]: too many incoming tasks while queue size adjustment occurs, resetting measurements to 0",
                    getName());
                // After the adjustment, reset the total task time to 1 so that the next round of measurement can start
                totalTaskNanos.getAndSet(1);
                taskCount.getAndSet(0);
                startNs = System.nanoTime();
            } else {
                // Do a regular adjustment
                totalTaskNanos.addAndGet(-totalNanos);
            }
        }
    }
}
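The afterExecute code feeds each task's execution time into executionEWMA, an exponentially weighted moving average. The class itself is not shown in this article, so the following is only a sketch of the standard EWMA update rule, average = alpha * newValue + (1 - alpha) * average. The Ewma class is hypothetical, not thread-safe, and the alpha of 0.5 is chosen purely for round numbers.

```java
public class EwmaExample {
    /** Hypothetical, single-threaded EWMA; not the ES ExponentiallyWeightedMovingAverage class. */
    static final class Ewma {
        private final double alpha;
        private double average;

        Ewma(double alpha, double initialAverage) {
            if (alpha < 0 || alpha > 1) {
                throw new IllegalArgumentException("alpha must be in [0, 1]");
            }
            this.alpha = alpha;
            this.average = initialAverage;
        }

        // A larger alpha gives recent samples more weight.
        void addValue(double newValue) {
            average = alpha * newValue + (1 - alpha) * average;
        }

        double getAverage() {
            return average;
        }
    }

    public static void main(String[] args) {
        Ewma ewma = new Ewma(0.5, 0);
        ewma.addValue(100); // average becomes 50.0
        ewma.addValue(100); // average becomes 75.0
        System.out.println(ewma.getAverage());
    }
}
```

Unlike the plain average logged as avgTaskTime, an EWMA reacts to recent changes in task duration without storing any history.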
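The code comments above roughly describe how the length of the task queue is adjusted dynamically. A few more details are recorded below to make the whole process easier to follow.
For a description of thread pool states, see the source comments of java.util.concurrent.ThreadPoolExecutor. In the RUNNING state, the pool accepts newly submitted tasks and processes tasks already waiting in the queue. In the SHUTDOWN state, it no longer accepts new tasks but still processes the tasks waiting in the queue. In the STOP state, it neither accepts new tasks nor processes queued tasks, and tasks that are already running are interrupted. So to shut a thread pool down "correctly", do it in steps. Here is how ES shuts down its thread pool for scheduled tasks: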
org.elasticsearch.threadpool.Scheduler.terminate
static boolean terminate(ScheduledThreadPoolExecutor scheduledThreadPoolExecutor, long timeout, TimeUnit timeUnit) {
    // Call shutdown() first: the pool stops accepting newly submitted tasks
    scheduledThreadPoolExecutor.shutdown();
    // Wait with a timeout: returns true if all queued and running tasks finish within timeout, false otherwise
    if (awaitTermination(scheduledThreadPoolExecutor, timeout, timeUnit)) {
        return true;
    }
    // last resort. Tasks are still running after the awaitTermination timeout above,
    // so call shutdownNow to interrupt them and shut the pool down
    scheduledThreadPoolExecutor.shutdownNow();
    return awaitTermination(scheduledThreadPoolExecutor, timeout, timeUnit);
}
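This three-step way of shutting down a thread pool calls shutdown first, then awaitTermination, and finally shutdownNow. awaitTermination acts as a buffer that reduces, as far as possible, the uncertainty in task results caused by shutting the pool down. The JDK source of java.util.concurrent.ScheduledThreadPoolExecutor.shutdownNow makes the point: when shutting down a thread pool, it is best not to call shutdownNow right away but to shut the pool down in steps.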
/**
 * Attempts to stop all actively executing tasks, halts the
 * processing of waiting tasks, and returns a list of the tasks
 * that were awaiting execution.
 *
 * <p>This method does not wait for actively executing tasks to
 * terminate. Use {@link #awaitTermination awaitTermination} to
 * do that.
 *
 * <p>There are no guarantees beyond best-effort attempts to stop
 * processing actively executing tasks. This implementation
 * cancels tasks via {@link Thread#interrupt}, so any task that
 * fails to respond to interrupts may never terminate.
 *
 * @return list of tasks that never commenced execution.
 *         Each element of this list is a {@link ScheduledFuture},
 *         including those tasks submitted using {@code execute},
 *         which are for scheduling purposes used as the basis of a
 *         zero-delay {@code ScheduledFuture}.
 * @throws SecurityException {@inheritDoc}
 */
public List<Runnable> shutdownNow() {
    return super.shutdownNow();
}
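shutdownNow attempts to stop all running tasks (threads): "stop all actively executing tasks". It also stops processing the tasks that are still waiting: "halts the processing of waiting tasks". The waiting tasks here are the tasks sitting in the queue that have not started yet, not threads in the WAITING state of java.lang.Thread.State. These queued tasks are handed back to the caller: "returns a list of the tasks that were awaiting execution".
shutdownNow does not wait for running tasks to finish. It requests their interruption directly: "This method does not wait for actively executing tasks to terminate". Because some tasks (threads) may ignore or even suppress interrupt requests, it can only make a best-effort attempt to end them. For threads that do not respond to interrupts, the task they are running may never finish: "so any task that fails to respond to interrupts may never terminate".
It follows that when we implement the Runnable interface, the logic in run() should respond to interruption, rather than catching every exception, merely printing it, and never propagating it.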
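The two points above, shutting down in steps and writing tasks that respond to interrupts, can be combined in one small sketch using only JDK classes. GracefulShutdownExample, terminate, and demo are hypothetical names, and the 200 ms timeout is arbitrary.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;

public class GracefulShutdownExample {
    /** shutdown, then awaitTermination, then shutdownNow as a last resort. */
    static boolean terminate(ExecutorService pool, long timeout, TimeUnit unit) {
        pool.shutdown(); // stop accepting new tasks, let queued and running ones finish
        try {
            if (pool.awaitTermination(timeout, unit)) {
                return true;
            }
            pool.shutdownNow(); // interrupt whatever is still running
            return pool.awaitTermination(timeout, unit);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // keep the caller's interrupt status
            pool.shutdownNow();
            return false;
        }
    }

    /** Runs an endless but interrupt-aware task, then shuts the pool down. */
    static boolean demo() {
        ExecutorService pool = Executors.newSingleThreadExecutor();
        AtomicBoolean sawInterrupt = new AtomicBoolean(false);
        pool.execute(() -> {
            try {
                while (true) {
                    TimeUnit.MILLISECONDS.sleep(10); // throws InterruptedException when interrupted
                }
            } catch (InterruptedException e) {
                sawInterrupt.set(true);
                Thread.currentThread().interrupt(); // restore the flag and let run() return
            }
        });
        boolean terminated = terminate(pool, 200, TimeUnit.MILLISECONDS);
        return terminated && sawInterrupt.get();
    }

    public static void main(String[] args) {
        System.out.println("terminated cleanly: " + demo());
    }
}
```

If the task instead swallowed InterruptedException inside the loop and kept going, the second awaitTermination would time out and the worker thread would never exit.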
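The queue is not resized after every single task, which would clearly be too expensive. Instead, the adjustment happens after a batch of tasks has finished. A batch is 2000 tasks by default, and the batch size is controlled by the tasksPerFrame variable.
The queue length is also not set directly to the ideal length computed by Little's law (desiredQueueSize). Each adjustment is bounded: the length changes by at most QUEUE_ADJUSTMENT_AMOUNT.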
if (optimalCapacity > capacity + adjustmentAmount) {
    // adjust up
    final int newCapacity = Math.min(maxCapacity, capacity + adjustmentAmount);
    this.capacity = newCapacity;
    return newCapacity;
} else if (optimalCapacity < capacity - adjustmentAmount) {
    // adjust down
    final int newCapacity = Math.max(minCapacity, capacity - adjustmentAmount);
    this.capacity = newCapacity;
    return newCapacity;
} else {
    return this.capacity;
}
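The bounded step is easier to see with numbers. The sketch below restates the same branching as a pure function so it can be run in isolation. QueueAdjustExample is a hypothetical name, and the step of 50 is taken from the "will adjust queue by [50]" line in the startup log.

```java
public class QueueAdjustExample {
    // Same branching as the adjustCapacity snippet, without the mutable field.
    static int adjustCapacity(int capacity, int optimalCapacity, int adjustmentAmount,
                              int minCapacity, int maxCapacity) {
        if (optimalCapacity > capacity + adjustmentAmount) {
            return Math.min(maxCapacity, capacity + adjustmentAmount); // one step up
        } else if (optimalCapacity < capacity - adjustmentAmount) {
            return Math.max(minCapacity, capacity - adjustmentAmount); // one step down
        } else {
            return capacity; // already within one step of the optimum
        }
    }

    public static void main(String[] args) {
        System.out.println(adjustCapacity(1000, 1500, 50, 500, 2000)); // 1050
        System.out.println(adjustCapacity(1000, 1020, 50, 500, 2000)); // 1000
        System.out.println(adjustCapacity(1000, 100, 50, 500, 2000));  // 950
        System.out.println(adjustCapacity(1000, 1500, 50, 1000, 1000)); // 1000
    }
}
```

The last call is worth noting: if the three 1000 arguments in the builder call at the top of this article are the initial, minimum, and maximum queue sizes (my reading of the argument order, not something stated here), then with those values the capacity stays pinned at 1000 until the min and max settings are changed.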
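This article walked through the source of the SEARCH thread pool in ES 6.3.2. A search request issued by a user is wrapped as a SEARCH operation. SEARCH tasks are handled by QueueResizingEsThreadPoolExecutor, whose task queue is ResizableBlockingQueue. ResizableBlockingQueue wraps a LinkedTransferQueue but adds a capacity limit.
As search requests keep being processed, the capacity of the task queue can be adjusted dynamically. The SEARCH thread pool uses EsAbortPolicy as its rejection policy, so when search requests arrive faster than the pool can handle them, they are rejected.
Wrapping each Runnable in a TimedRunnable makes it possible to measure the execution time and the queue time of every search task. These statistics are all collected in the pool's afterExecute() method.
This article also covered how to shut down a thread pool correctly, and the uncertainty that an improper shutdown introduces into task results. Reading the source of the ES thread pool module deepened my understanding of thread pools considerably. A later article will analyze how ES implements thread pools for scheduled and periodic tasks, which can be used for things like periodic ping commands (heartbeats between nodes) and other communication between ES nodes. If anything above is wrong, corrections are welcome.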
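This concludes the source analysis of the ES thread pool module. Overall, ES manages its thread pools in a centralized way. Consider a large system with all kinds of complex operations: should thread pools be scattered throughout the code, or created at startup and managed in one place?
Also, java.util.concurrent.Future#get() must block to obtain a task's result, and java.util.concurrent.Future#get(long, java.util.concurrent.TimeUnit) blocks with a timeout. So after submitting a task to a thread pool, the step of fetching the result has to block and wait. Is there a way to obtain the result without blocking? That is what the Listener mechanism is for. A Listener is really a piece of processing logic: how to handle a result (the result of a finished Runnable or Callable). The idea is that once the Runnable (Callable) has finished and a result is available, the Listener is called back and the result-handling logic runs. This avoids calling java.util.concurrent.Future#get(), blocking until the result arrives, and only then running the logic that handles it.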
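A minimal sketch of the listener idea, using only the JDK: ResultListener and submit are hypothetical names loosely modelled on the onResponse/onFailure style of callback, not an ES API.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class ListenerExample {
    /** Hypothetical callback: the "processing logic" for a result or a failure. */
    interface ResultListener<T> {
        void onResponse(T result);
        void onFailure(Exception e);
    }

    /** Runs the task on the executor and calls the listener back instead of returning a Future. */
    static <T> void submit(Executor executor, Callable<T> task, ResultListener<T> listener) {
        executor.execute(() -> {
            T result;
            try {
                result = task.call();
            } catch (Exception e) {
                listener.onFailure(e);
                return;
            }
            // Called outside the try block so a listener error is not reported as a task failure.
            listener.onResponse(result);
        });
    }

    public static void main(String[] args) {
        ExecutorService pool = Executors.newSingleThreadExecutor();
        submit(pool, () -> 6 * 7, new ResultListener<Integer>() {
            @Override
            public void onResponse(Integer result) {
                System.out.println("got " + result);
            }

            @Override
            public void onFailure(Exception e) {
                System.out.println("failed: " + e);
            }
        });
        // The submitting thread never blocks on a get(); it only shuts the pool down.
        pool.shutdown();
    }
}
```

The callback runs on the pool thread that executed the task, so the listener itself should be quick or hand off to another executor.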
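Speaking of asynchronous callbacks, ES has another kind of thread pool that can execute prioritized tasks. Its task queue is java.util.concurrent.PriorityBlockingQueue and its implementation is org.elasticsearch.common.util.concurrent.PrioritizedEsThreadPoolExecutor. This pool is mainly used to apply cluster state updates. Going one step further, org.elasticsearch.cluster.service.TaskBatcher wraps PrioritizedEsThreadPoolExecutor to process prioritized tasks in batches. When a new index is created or a shard is relocated, the cluster state is updated, and an org.elasticsearch.cluster.service.MasterService.Batcher.UpdateTask is created. UpdateTask wraps an org.elasticsearch.cluster.ClusterStateTaskListener instance, so that after the Runnable has run, the Listener is used for the notification callback. Multiple UpdateTasks are submitted to the PrioritizedEsThreadPoolExecutor, which is how cluster state updates are carried out. In addition, both the core pool size and the max pool size of PrioritizedEsThreadPoolExecutor are set to 1, so tasks submitted to it are executed sequentially by a single thread. This avoids the data inconsistency that concurrent state updates could cause, and it avoids using locks for synchronization, an approach well worth borrowing. How org.elasticsearch.cluster.service.MasterService implements cluster state updates in detail is a topic for another time.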
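The combination of a single worker thread and a priority queue can be sketched with plain JDK classes. This is not how PrioritizedEsThreadPoolExecutor is written; PrioritizedTask and runDemo are hypothetical names, and the sketch only shows the ordering and serialization behaviour described above.

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.PriorityBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

public class SingleThreadPriorityExample {
    /** Hypothetical task with a priority; a lower number runs first. */
    static final class PrioritizedTask implements Runnable, Comparable<PrioritizedTask> {
        final int priority;
        final Runnable body;

        PrioritizedTask(int priority, Runnable body) {
            this.priority = priority;
            this.body = body;
        }

        @Override
        public void run() {
            body.run();
        }

        @Override
        public int compareTo(PrioritizedTask other) {
            return Integer.compare(priority, other.priority);
        }
    }

    static List<String> runDemo() {
        // core == max == 1: every task runs on the same thread, one after another, without locks.
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
            1, 1, 0L, TimeUnit.MILLISECONDS, new PriorityBlockingQueue<Runnable>());
        List<String> order = new CopyOnWriteArrayList<>();
        CountDownLatch gate = new CountDownLatch(1);
        // Keep the single worker busy so the next three tasks pile up in the queue.
        pool.execute(new PrioritizedTask(0, () -> {
            try {
                gate.await();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }));
        // Use execute(), not submit(): submit() wraps tasks in a FutureTask, which is not Comparable.
        pool.execute(new PrioritizedTask(3, () -> order.add("low")));
        pool.execute(new PrioritizedTask(1, () -> order.add("urgent")));
        pool.execute(new PrioritizedTask(2, () -> order.add("normal")));
        gate.countDown();
        pool.shutdown();
        try {
            pool.awaitTermination(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return order;
    }

    public static void main(String[] args) {
        System.out.println(runDemo()); // [urgent, normal, low]
    }
}
```

Although "low" was submitted first, the queued tasks run in priority order, and because there is only one worker no two of them ever run at the same time.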
The thread pools created when ES starts:
[2019-08-15T18:30:38,829][DEBUG][o.e.t.ThreadPool ] [debug_node] created thread pool: name [force_merge], size [1], queue size [unbounded]
[2019-08-15T18:30:44,782][DEBUG][o.e.t.ThreadPool ] [debug_node] created thread pool: name [fetch_shard_started], core [1], max [8], keep alive [5m]
[2019-08-15T18:30:48,517][DEBUG][o.e.t.ThreadPool ] [debug_node] created thread pool: name [listener], size [2], queue size [unbounded]
[2019-08-15T18:30:48,535][DEBUG][o.e.t.ThreadPool ] [debug_node] created thread pool: name [index], size [4], queue size [200]
[2019-08-15T18:30:48,536][DEBUG][o.e.t.ThreadPool ] [debug_node] created thread pool: name [refresh], core [1], max [2], keep alive [5m]
[2019-08-15T18:30:48,537][DEBUG][o.e.t.ThreadPool ] [debug_node] created thread pool: name [generic], core [4], max [128], keep alive [30s]
[2019-08-15T18:30:48,537][DEBUG][o.e.t.ThreadPool ] [debug_node] created thread pool: name [rollup_indexing], size [4], queue size [4]
[2019-08-15T18:30:48,538][DEBUG][o.e.t.ThreadPool ] [debug_node] created thread pool: name [warmer], core [1], max [2], keep alive [5m]
[2019-08-15T18:30:48,551][DEBUG][o.e.c.u.c.QueueResizingEsThreadPoolExecutor] thread pool [debug_node/search] will adjust queue by [50] when determining automatic queue size
[2019-08-15T18:30:48,552][DEBUG][o.e.t.ThreadPool ] [debug_node] created thread pool: name [search], size [7], queue size [1k]
[2019-08-15T18:30:48,553][DEBUG][o.e.t.ThreadPool ] [debug_node] created thread pool: name [flush], core [1], max [2], keep alive [5m]
[2019-08-15T18:30:48,553][DEBUG][o.e.t.ThreadPool ] [debug_node] created thread pool: name [fetch_shard_store], core [1], max [8], keep alive [5m]
[2019-08-15T18:30:48,554][DEBUG][o.e.t.ThreadPool ] [debug_node] created thread pool: name [management], core [1], max [5], keep alive [5m]
[2019-08-15T18:30:48,554][DEBUG][o.e.t.ThreadPool ] [debug_node] created thread pool: name [get], size [4], queue size [1k]
[2019-08-15T18:30:48,555][DEBUG][o.e.t.ThreadPool ] [debug_node] created thread pool: name [analyze], size [1], queue size [16]
[2019-08-15T18:30:48,556][DEBUG][o.e.t.ThreadPool ] [debug_node] created thread pool: name [write], size [4], queue size [200]
[2019-08-15T18:30:48,556][DEBUG][o.e.t.ThreadPool ] [debug_node] created thread pool: name [snapshot], core [1], max [2], keep alive [5m]
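A few closing thoughts on reading source code. You usually start reading a system's source after you have already used it. Through use you get to know some of its features, and then, not content with that, you want to understand the principles behind them. Faced with a system of several hundred thousand lines of code, where do you start? I think the following points may be useful:
Original: https://www.cnblogs.com/hapjin/p/11011712.html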