ElasticSearch如何更新集群的状态

ElasticSearch如何更新集群的状态

最近发生了不少事情,甚至对本身的技术能力和学习方式产生了怀疑,因此有一段时间没更新文章了,估计之后更新的频率会愈来愈少,但愿有更多的沉淀而不是简单地分享。让我有感悟的是,最近看到一篇关于ES集群状态更新的文章Elasticsearch Distributed Consistency Principles Analysis (2) - Meta,和 “提交给线程池的Runnable任务是以怎样的顺序执行的?”这个问题,所以,结合ES6.3.2源码,分析一下ES的Master节点是如何更新集群状态的。html

分布式系统的集群状态通常是指各类元数据信息,通俗地讲,在ES中建立了一个Index,这个Index的Mapping结构信息、Index由几个分片组成,这些分片分布在哪些节点上,这样的信息就组成了集群的状态。当Client建立一个新索引、或者删除一个索、或者进行快照备份、或者集群又进行了一次Master选举,这些都会致使集群状态的变化。归纳一下就是:发生了某个事件,致使集群状态发生了变化,产生了新集群状态后,如何将新的状态应用到各个节点上去,而且保证一致性。java

在ES中,各个模块发生一些事件,会致使集群状态变化,并由org.elasticsearch.cluster.service.ClusterService#submitStateUpdateTask(java.lang.String, T)提交集群状态变化更新任务。当任务执行完成时,就产生了新的集群状态,而后经过"二阶段提交协议"将新的集群状态应用到各个节点上。这里可大概了解一下有哪些模块的操做会提交一个更新任务,好比:数组

  • MetaDataDeleteIndexService#deleteIndices 删除索引
  • org.elasticsearch.snapshots.SnapshotsService#createSnapshot 建立快照
  • org.elasticsearch.cluster.metadata.MetaDataIndexTemplateService#putTemplate 建立索引模板

所以各个Service(好比:MetaDataIndexTemplateService)都持有org.elasticsearch.cluster.service.ClusterService实例引用,经过ClusterService#submitStateUpdateTask方法提交更新集群状态的任务。安全

既然建立新索引、删除索引、修改索引模板、建立快照等都会触发集群状态更新,那么如何保证这些更新操做是"安全"的?好比操做A是删除索引,操做B是对索引作快照备份,操做A、B的顺序不当,就会引起错误!好比,索引都已经删除了,那还怎么作快照?所以,为了防止这种并发操做对集群状态更新的影响,org.elasticsearch.cluster.service.MasterService中采用单线程执行方式提交更新集群状态的任务的。状态更新任务由org.elasticsearch.cluster.service.MasterService.Batcher.UpdateTask表示,它本质上是一个具备优先级特征的Runnable任务:数据结构

//PrioritizedRunnable 实现了Comparable接口,compareTo方法比较任务的优先级
public abstract class PrioritizedRunnable implements Runnable, Comparable<PrioritizedRunnable> {
    
    private final Priority priority;//Runnable任务优先级
    private final long creationDate;
    private final LongSupplier relativeTimeProvider;
    
     @Override
    public int compareTo(PrioritizedRunnable pr) {
        return priority.compareTo(pr.priority);
    }
}

而单线程的执行方式,则是经过org.elasticsearch.common.util.concurrent.PrioritizedEsThreadPoolExecutor线程池实现的。看org.elasticsearch.common.util.concurrent.EsExecutors#newSinglePrioritizing线程池的建立:并发

public static PrioritizedEsThreadPoolExecutor newSinglePrioritizing(String name, ThreadFactory threadFactory, ThreadContext contextHolder, ScheduledExecutorService timer) {
    //core pool size == max pool size ==1,说明该线程池里面只有一个工做线程
        return new PrioritizedEsThreadPoolExecutor(name, 1, 1, 0L, TimeUnit.MILLISECONDS, threadFactory, contextHolder, timer);
    }

而线程池的任务队列则是采用:PriorityBlockingQueue(底层是个数组,数据结构是:堆 Heap),经过compareTo方法比较Priority,从而决定任务的排队顺序。app

//PrioritizedEsThreadPoolExecutor#PrioritizedEsThreadPoolExecutor
PrioritizedEsThreadPoolExecutor(String name, int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit,ThreadFactory threadFactory, ThreadContext contextHolder, ScheduledExecutorService timer) {
        super(name, corePoolSize, maximumPoolSize, keepAliveTime, unit, new PriorityBlockingQueue<>(), threadFactory, contextHolder);
        this.timer = timer;
    }

这里想提一下这种只采用一个线程执行任务状态更新的思路,它与Redis采用单线程执行Client的操做命令是一致的。各个Redis Client向Redis Server发起操做请求,Redis Server最终是以一个线程来"顺序地"执行各个命令。单线程执行方式,避免了数据并发操做致使的不一致性,而且不须要线程同步。毕竟同步须要加锁,而加锁会影响程序性能。elasticsearch

在这里,我想插一个问题:JDK线程池执行任务的顺序是怎样的?经过java.util.concurrent.ThreadPoolExecutor#execute方法先提交到线程池中的任务,必定会优先执行吗?这个问题常常被人问到,哈哈。可是,真正地理解,却不容易。由于它涉及到线程池参数,core pool size、max pool size 、任务队列的长度以及任务到来的时机。其实JDK源码中的注释已经讲得很清楚了:分布式

/*
         * Proceed in 3 steps:
         *
         * 1. If fewer than corePoolSize threads are running, try to
         * start a new thread with the given command as its first
         * task.  The call to addWorker atomically checks runState and
         * workerCount, and so prevents false alarms that would add
         * threads when it shouldn't, by returning false.
         *
         * 2. If a task can be successfully queued, then we still need
         * to double-check whether we should have added a thread
         * (because existing ones died since last checking) or that
         * the pool shut down since entry into this method. So we
         * recheck state and if necessary roll back the enqueuing if
         * stopped, or start a new thread if there are none.
         *
         * 3. If we cannot queue task, then we try to add a new
         * thread.  If it fails, we know we are shut down or saturated
         * and so reject the task.
         */
  • 任务提交到线程池,若是线程池的活跃线程数量小于 core pool size,那么直接建立新线程执行任务,这种状况下任务是不会入队列的。
  • 当线程池中的活跃线程数量已经达到core pool size时,继续提交任务,这时的任务就会入队列排队。
  • 当任务队列已经满了时,同时又有新任务提交过来,若是线程池的活跃线程数小于 max pool size,那么会建立新的线程,执行这些刚提交过来的任务,此时的任务也不会入队列排队。(注意:这里新建立的线程并非从任务队列中取任务,而是直接执行刚刚提交过来的任务,而那些前面已经提交了的在任务队列中排队的任务反而不能优先执行,换句话说:任务的执行顺序并非严格按提交顺序来执行的)

代码验证一下以下,会发现:后提交的任务,反而可能先执行完成。由于,先提交的任务在队列中排队,然后提交的任务直接被新建立的线程执行了,省去了排队过程。ide

import com.google.common.util.concurrent.ThreadFactoryBuilder;
import java.util.concurrent.*;

/**
 * @author psj
 * @date 2019/11/14
 */
public class ThreadPoolTest {
    public static void main(String[] args) throws InterruptedException{

        ThreadFactory threadFactory = new ThreadFactoryBuilder().setNameFormat("test-%d").build();
        BlockingQueue<Runnable> workQueue = new LinkedBlockingQueue<>(4);
        ThreadPoolExecutor executorSevice = new ThreadPoolExecutor(1, 4, 0, TimeUnit.HOURS,
                workQueue, threadFactory, new ThreadPoolExecutor.DiscardPolicy());

        for (int i = 1; i <=8; i++) {
            MyRunnable task = new MyRunnable(i, workQueue);
            executorSevice.execute(task);
            sleepMills(200);
            System.out.println("submit: " + i  + ", queue size:" + workQueue.size() + ", active count:" + executorSevice.getActiveCount());
        }
        Thread.currentThread().join();
    }


    public static class MyRunnable implements Runnable {
        private int sequence;
        private BlockingQueue taskQueue;
        public MyRunnable(int sequence, BlockingQueue taskQueue) {
            this.sequence = sequence;
            this.taskQueue = taskQueue;
        }
        @Override
        public void run() {
            //模拟任务须要1秒钟才能执行完成
            sleepMills(1000);
            System.out.println("task :" + sequence + " finished, current queue size:" + taskQueue.size());
        }
    }

    public static void sleepMills(int mills) {
        try {
            TimeUnit.MILLISECONDS.sleep(mills);
        } catch (InterruptedException e) {

        }
    }
}

OK,分析完了线程池执行任务的顺序,再看看ES的PrioritizedEsThreadPoolExecutor线程池的参数:将 core pool size 和 max pool size 都设置成1,避免了这种"插队"的现象。各个模块触发的集群状态更新最终在org.elasticsearch.cluster.service.MasterService#submitStateUpdateTasks方法中构造UpdateTask对象实例,并经过submitTasks方法提交任务执行。额外须要注意的是:集群状态更新任务能够以批量执行方式提交,具体看org.elasticsearch.cluster.service.TaskBatcher的实现吧。

try {
            List<Batcher.UpdateTask> safeTasks = tasks.entrySet().stream()
                .map(e -> taskBatcher.new UpdateTask(config.priority(), source, e.getKey(), safe(e.getValue()), executor))
                .collect(Collectors.toList());
            taskBatcher.submitTasks(safeTasks, config.timeout());
        } catch (EsRejectedExecutionException e) {
            // ignore cases where we are shutting down..., there is really nothing interesting
            // to be done here...
            if (!lifecycle.stoppedOrClosed()) {
                throw e;
            }
        }

最后来分析一下 org.elasticsearch.cluster.service.ClusterService类,在ES节点启动的时候,在Node#start()方法中会启动ClusterService,当其它各个模块执行一些操做触发集群状态改变时,就是经过ClusterService来提交集群状态更新任务。而ClusterService其实就是封装了 MasterService和ClusterApplierService,MasterService提供任务提交接口,内部维护一个线程池处理更新任务,而ClusterApplierService则负责通知各个模块应用新生成的集群状态。

相关文章
相关标签/搜索