在ElasticSearch 线程池类型分析之SizeBlockingQueue这篇文章中分析了ES的fixed类型的线程池。本文分析scaling类型的线程池,以及该线程池所使用的任务队列:ExecutorScalingQueue
从ThreadPool类中可看出,scaling线程池主要用来执行ES的系统操做:FLUSH、FORCE_MERGE、REFRESH、SNAPSHOT...而fixed类型的线程池则执行用户发起的操做:SEARCH、INDEX、GET、WRITE。系统操做有什么特色呢?系统操做请求量小、可容忍必定的延时。从线程池的角度看,执行系统操做的任务不会被线程池的拒绝策略拒绝,而这正是由ExecutorScalingQueue任务队列和ForceQueuePolicy拒绝策略实现的。html
org.elasticsearch.common.util.concurrent.EsExecutors.newScalingjava
public static EsThreadPoolExecutor newScaling(String name, int min, int max, long keepAliveTime, TimeUnit unit, ThreadFactory threadFactory, ThreadContext contextHolder) { ExecutorScalingQueue<Runnable> queue = new ExecutorScalingQueue<>(); EsThreadPoolExecutor executor = new EsThreadPoolExecutor(name, min, max, keepAliveTime, unit, queue, threadFactory, new ForceQueuePolicy(), contextHolder); queue.executor = executor; return executor; }
线程池对象是 EsThreadPoolExecutor、任务队列是 ExecutorScalingQueue、拒绝策略是 ForceQueuePolicyelasticsearch
ForceQueuePolicy和ExecutorScalingQueue都是org.elasticsearch.common.util.concurrent.EsExecutors.EsExecutors 的内部类。EsExecutors是一个工具类,用来建立ThreadPoolExecutor对象。ide
org.elasticsearch.common.util.concurrent.EsExecutors.newScaling
org.elasticsearch.common.util.concurrent.EsExecutors.newFixed
org.elasticsearch.common.util.concurrent.EsExecutors.newAutoQueueFixed
再加上 private static final ExecutorService DIRECT_EXECUTOR_SERVICE = new AbstractExecutorService()...
ES中全部的线程池对象都由EsExecutors建立了。工具
当向 EsThreadPoolExecutor 提交任务时,若是触发了拒绝策略,则会执行以下的rejectedExecution方法:将任务再添加到任务队列中。ui
/** * A handler for rejected tasks that adds the specified element to this queue, * waiting if necessary for space to become available. */ static class ForceQueuePolicy implements XRejectedExecutionHandler { @Override public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) { try { // force queue policy should only be used with a scaling queue assert executor.getQueue() instanceof ExecutorScalingQueue; //将被"拒绝"的任务再put到任务队列中 executor.getQueue().put(r); } catch (final InterruptedException e) { // a scaling queue never blocks so a put to it can never be interrupted throw new AssertionError(e); } } //由于任务不会被拒绝,因此这里的被拒绝的任务计数老是返回0 @Override public long rejected() { return 0; } }
ExecutorScalingQueue 继承了LinkedTransferQueue,因此是一个无界队列。它和 SizeBlockingQueue 所不一样的是:SizeBlockingQueue的容量是有限制的,而ExecutorScalingQueue没有长度限制,这意味着能够将任意多个任务提交到 ExecutorScalingQueue中排队等待,这与它一块儿搭配使用的拒绝策略ForceQueuePolicy是吻合的。同时,这也代表FLUSH、REFRESH、SNAPSHOT等这些操做都不会被拒绝,不过这些操做的执行频率都很低。
试想,对于SEARCH(搜索请求)、INDEX(索引文档请求)、WRITE(添加文档请求)这些由用户触发的操做,可能QPS会很是大,而REFRESH(刷新段segment)、FLUSH这样的操做是系统层面的操做,执行频率很低。所以分开交由不一样的线程池处理是很是有必要的,这样就能够为线程池配置不一样的特色(有界队列、无界队列)的任务队列以及拒绝处理策略了。this
在任务入队列时,ExecutorScalingQueue的offer方法先判断线程池中是否有空闲线程,如有空闲线程,tryTransfer方法会当即成功返回true,任务直接交由线程处理而不须要入队列再排队等待了。
这里也能够看出: LinkedBlockingQueue 与 LinkedTransferQueue 的区别,我想这也是为何ES选择LinkedTransferQueue做为任务队列的缘由之一吧。若线程池中没有空闲的线程,再判断线程池中当前已有线程数量是否达到了最大线程数量(max pool size),若未达到,则新建线程来处理任务,不然任务就进入队列排队等待处理,而因为ExecutorScalingQueue是个无界队列,没有长度限制,而REFRESH这样的操做又没有低响应时间要求,所以长时间排队也可以接受。编码
/** * ExecutorScalingQueue 必须与 ForceQueuePolicy 拒绝策略搭配使用. * * 采用 ExecutorScalingQueue 做为任务队列的线程池它的 core pool size 和 max pool size 能够不相等 * 当不断地向线程池提交任务,线程的个数达到了core pool size但还没有达到 max pool size时, left大于0成立,返回false * 触发 ThreadPoolExecutor#execute方法中if语句 workQueue.offer(command) 为false,从而致使if语句不成立 * 因而执行 addWorker 方法建立新线程来执行任务,若是 addWorker 不当心失败了,会执行 rejected(command),可是这个任务是不能 * 被拒绝的,由于咱们只是想让 线程池 优先建立 max pool size个线程来处理任务. * 因而采用 ForceQueuePolicy 保证任务必定是提交到队列里,从而保证任务"不被拒绝" * @param e * @return */ static class ExecutorScalingQueue<E> extends LinkedTransferQueue<E> { ThreadPoolExecutor executor; ExecutorScalingQueue() { } @Override public boolean offer(E e) { // first try to transfer to a waiting worker thread //若是线程池中有空闲的线程,tryTransfer会当即成功,直接将任务交由线程处理(省去了任务的排队过程) if (!tryTransfer(e)) { // check if there might be spare capacity in the thread // pool executor int left = executor.getMaximumPoolSize() - executor.getCorePoolSize(); if (left > 0) { //线程池当前已有的线程数量还没有达到 max pool size, 返回false, 触发ThreadPoolExecutor的addWorker方法被调用,从而建立新线程 // reject queuing the task to force the thread pool // executor to add a worker if it can; combined // with ForceQueuePolicy, this causes the thread // pool to always scale up to max pool size and we // only queue when there is no spare capacity return false; } else { //线程池当前已有的线程数量 已是 max pool size了, 任务入队列排队等待 return super.offer(e); } } else { return true; } } }
本文分析了 ES中FLUSH、FORCE_MERGE、REFRESH、SNAPSHOT...操做所使用的线程池及其任务队列、拒绝策略。理解线程池的实现原理有助于各类操做的调优,有时候写数据到ES或执行大量的查询请求时,可能会发现ES的日志里面有一些操做被拒绝的提示,这时,就能针对性地去调整线程池的配置了。
不论是refresh刷新segment,仍是 snapshot 快照备份,这些操做可理解为"系统操做",这与用户操做(search、get)是有区别的:write/get 须要良好的响应时间,这意味着任务不能长时间排队过久。write/get 请求量可能很是大、QPS很是高,须要一些限制,因此这也是为何它们的任务队列容量是固定的,当wirte/get的请求量大处处理不过来时,就会触发拒绝策略,任务被拒绝执行了。而对于refresh这类操做,执行不是太频繁,有些系统操做还很重要,这种任务提交时就不能被拒绝,所以ForcePolicy是一个很好的选择。从这里也能够看出:在一个大系统里面,有各类类型的操做,所以有必要使用多个线程池来分别处理这些操做。而如何协调统一管理多个线程池(EsExecutors类、ExecutorBuilder类),及时回收空闲线程,设置合适的任务队列长度(各类类型的任务队列:ExecutorScalingQueue、SizeBlockingQueue、ResizableBlockingQueue),将全部的任务处理操做都统一到一套代码流程逻辑(AbstractRunnable类、EsThreadPoolExecutor类的doExecute()方法)下执行,这些都须要很强的编码能力。
最后,提一下search操做,很特殊。ES主要是用来作搜索的,那么负责执行search操做的线程池是如何实现的呢?它又采用了什么任务队列呢?它的拒绝策略又是什么呢?提早透露一下:search操做的线程池的任务队列可动态调整任务队列的长度,而且以一种十分巧妙的方式统计每一个任务的执行时间。读完源码以后,感叹这些代码的设计思路是那么优美。spa
参考文章:
ElasticSearch 线程池类型分析之SizeBlockingQueue线程
原文:https://www.cnblogs.com/hapjin/p/11005676.html