Elasticsearch分布式一致性-节点选主

1.概述

        全部的分布式系统都须要考虑一致性问题,ES采用的是主从模式的一致性解决方案。本文将主要基于Elasticsearch 6.5.4版本的discovery模块从:ES节点类型、启动流程、Master选举、节点监测等方面介绍ES的主从模式的一致性解决方案。Elasticsearch6.5.4 debug环境搭建能够参考:java

Elasticsearch源码编译运行node

2.ES节点类型

        一个Elasticsearch集群是由许多Node组成的,Node能够在Elasticsearch的启动脚本elasticsearch.yml进行设置。算法

node.master: true/false
node.data: true/false复制代码

        根据配置,ES Node类型一共有四种:MasterNode+DataNode、MasterNode、DataNode和最后一种Coordinating节点。bash

MasterNode:是可以成为master的候选节点,能够参与选选举,主要是存储集群的元数据。网络

DataNode:主要存储shard的数据,而且负责这些shard数据的读写。app

Coordinating Node:节点既不做为master也不存储数据,接受请求转发,聚合等操做。elasticsearch

3.启动流程

        ZenDiscovery.java是Elasticsearch discovery的默认实现,选举相关的逻辑基本在该类中。启动的时候,ZenDiscovery类由Node.java,初始化,调用ZenDiscovery#startInitialJoin()方法启动。分布式

4.Master选举

4.1 脑裂问题        

        为了集群不出现脑裂问题,通常会在启动的时候配置quorum策略。ide

discovery.zen.minimum_master_nodes: (master_num/2)+1复制代码

        master_num为集群中master候选节点的个数。该配置表示:当投票的人超过半数,节点才能被选举为master。post

        以下图所示的集群,若是出现了网络分区。Node1和Node2忽然掉除了集群,此时Node1和Node2会发起选举,假设选举除了Node1成为master,右边也发起了一轮选举,选出了Node3做为master。这样集群被分为两部分,每一部分都会维护本身的集群状态。若是配置了quorum策略,则左边小于3,没法进行选主,那左边的集群暂时没法工做,右边集群能够正常提供服务。

4.2 什么时候发起选举

  • 集群启动的时候

        集群启动的时候,经过 joinThreadControl#startNewThreadIfNotRunning() 来选出Master节点。

  • 检测到和原来master的链接断开

        经过ZenDiscovery#handleMasterGone来处理,最终调用 joinThreadControl#startNewThreadIfNotRunning()从新发起选举

        咱们看到startNewThreadIfNotRunning会启动一个线程去执行innerJoinCluster(),innerJoinCluster()中有一个while()循环,会一直等待findMaster()方法找到master节点。

  • 监测到node节点断开
          每次断开节点后会检查当前集群节点数是否知足quorum配置,若是不知足,从新发起选举。

4.3 候选节点

        咱们看到findMaster的第一步,调用pingAndWait()获取集群中的节点。集群的节点能够在

elasticsearch.yml中配置:

discovery.zen.ping.unicast.hosts: [1.1.1.1, 1.1.1.2, 1.1.1.3]
复制代码
private DiscoveryNode findMaster() {    
    logger.trace("starting to ping");    
     //查找当前活跃的master 
     List<ZenPing.PingResponse> fullPingResponses = pingAndWait(pingTimeout).toList();
        if (fullPingResponses == null) {
            logger.trace("No full ping responses");
            return null;
        }
        if (logger.isTraceEnabled()) {
            StringBuilder sb = new StringBuilder();
            if (fullPingResponses.size() == 0) {
                sb.append(" {none}");
            } else {
                for (ZenPing.PingResponse pingResponse : fullPingResponses) {
                    sb.append("\n\t--> ").append(pingResponse);
                }
            }
            logger.trace("full ping responses:{}", sb);
        }

        final DiscoveryNode localNode = transportService.getLocalNode();

        // add our selves
        assert fullPingResponses.stream().map(ZenPing.PingResponse::node)
            .filter(n -> n.equals(localNode)).findAny().isPresent() == false;

        //加入当前节点
        fullPingResponses.add(new ZenPing.PingResponse(localNode, null, this.clusterState()));

        // filter responses
        final List<ZenPing.PingResponse> pingResponses = filterPingResponses(fullPingResponses, masterElectionIgnoreNonMasters, logger);

        List<DiscoveryNode> activeMasters = new ArrayList<>();
        //收集ping到的节点的master信息,这里先不考虑本身,Discovery的策略是非直到最后一刻都不会选本身为master,可能预防脑裂在一开始就发生吧。
        for (ZenPing.PingResponse pingResponse : pingResponses) {
            // We can't include the local node in pingMasters list, otherwise we may up electing ourselves without
            // any check / verifications from other nodes in ZenDiscover#innerJoinCluster()
            if (pingResponse.master() != null && !localNode.equals(pingResponse.master())) {
                activeMasters.add(pingResponse.master());
            }
        }

        // nodes discovered during pinging master 候选者=> 可以ping到的全部master设置为true的节点
        List<ElectMasterService.MasterCandidate> masterCandidates = new ArrayList<>();
        for (ZenPing.PingResponse pingResponse : pingResponses) {
            if (pingResponse.node().isMasterNode()) {
                masterCandidates.add(new ElectMasterService.MasterCandidate(pingResponse.node(), pingResponse.getClusterStateVersion()));
            }
        }

        //若是收集到的节点没有master信息,怎开始选举
        if (activeMasters.isEmpty()) {
            //master 为空,代表节点刚启动,进行选主
            if (electMaster.hasEnoughCandidates(masterCandidates)) {
                //选主
                final ElectMasterService.MasterCandidate winner = electMaster.electMaster(masterCandidates);
                logger.trace("candidate {} won election", winner);
                return winner.getNode();
            } else {
                //若是没有足够多候选节点,选主失败
                // if we don't have enough master nodes, we bail, because there are not enough master to elect from
                logger.warn("not enough master nodes discovered during pinging (found [{}], but needed [{}]), pinging again",
                            masterCandidates, electMaster.minimumMasterNodes());
                return null;
            }
        } else {
            //master列表不为空,选择一个nodeid最小的做为master
            assert !activeMasters.contains(localNode) : "local node should never be elected as master when other nodes indicate an active master";
            // lets tie break between discovered nodes
            return electMaster.tieBreakActiveMasters(activeMasters);
        }
}复制代码

        获得的fullPingResponses表示,如今集群中的全部节点信息,包括这些节点当前的master信息(有可能为空)。

       接下来以下图所示对fullPingResponse进行过滤,若是配置了ignore_non_master_pings为true,则要把那些node.master配置为false的过滤掉,而后判断过滤后的结果是否为当前节点,若是是当前节点,也过滤掉(ZenDiscovery通常最后才考虑当前节点,多是为了防止脑裂)。最后拿到一个activeMasters的名单,该名单表示目前集群中存活的master节点,通常个数为0或者1。

    

4.4 选主

        有了activeMasters,还要作一件事情,就是拿到集群中全部配置了node.master为true的节点,从刚才的fullPingResponses能够很容易的找到配置为node.master为true的节点,最后生成一个masterCandidates。有了这两个列表以后,就能够正在的选主工做了。


  •         如上图所示:先判断activeMasters是否为空,若是activeMasters不为空,则从activeMasters中选出nodeId最小的节点做为master(正常健康的集群只会有一个,若是出现了脑裂,则会存在多个)。若是activeMasters为空,则判断当前集群是否达到选举个数要求,若是达到,则从masterCandidates中选举一个版本号最新的节点,若是版本号一致选择nodeId小的那个。若是集群没有达到规定个数,选举失败。

4.5 肯定选主

        当前节点选出master后,并不能肯定这个master就能成为整个集群的master,这只是当前节点认为的master。这时还须要判断master的状况:

//若是master选出来是本身
        if (transportService.getLocalNode().equals(masterNode)) {
            //须要等待discovery.zen.minimum_master_nodes-1个节点加入才算成功
            final int requiredJoins = Math.max(0, electMaster.minimumMasterNodes() - 1); // we count as one
            logger.debug("elected as master, waiting for incoming joins ([{}] needed)", requiredJoins);
            nodeJoinController.waitToBeElectedAsMaster(requiredJoins, masterElectionWaitForJoinsTimeout,
                    new NodeJoinController.ElectionCallback() {
                        //选举本身成功
                        @Override
                        public void onElectedAsMaster(ClusterState state) {
                            synchronized (stateMutex) {
                                joinThreadControl.markThreadAsDone(currentThread);
                            }
                        }

                        //选举本身失败,从新发起一轮ping
                        @Override
                        public void onFailure(Throwable t) {
                            logger.trace("failed while waiting for nodes to join, rejoining", t);
                            synchronized (stateMutex) {
                                joinThreadControl.markThreadAsDoneAndStartNew(currentThread);
                            }
                        }
                    }

            );
        } else {
            // process any incoming joins (they will fail because we are not the master)
            //阻止其余节点加入,localNode
            nodeJoinController.stopElectionContext(masterNode + " elected");

            // send join request
            final boolean success = joinElectedMaster(masterNode);

            synchronized (stateMutex) {
                if (success) {
                    DiscoveryNode currentMasterNode = this.clusterState().getNodes().getMasterNode();
                    if (currentMasterNode == null) {
                        // Post 1.3.0, the master should publish a new cluster state before acking our join request. we now should have
                        // a valid master.
                        logger.debug("no master node is set, despite of join request completing. retrying pings.");
                        joinThreadControl.markThreadAsDoneAndStartNew(currentThread);
                    } else if (currentMasterNode.equals(masterNode) == false) {
                        // update cluster state
                        joinThreadControl.stopRunningThreadAndRejoin("master_switched_while_finalizing_join");
                    }

                    joinThreadControl.markThreadAsDone(currentThread);
                } else {
                    // failed to join. Try again...
                    joinThreadControl.markThreadAsDoneAndStartNew(currentThread);
                }
            }
        }复制代码

1)若是选出来的master为当前节点,则当前节点须要等待其余节点的加入,等待的数目为discovery.zen.minimum_master_nodes-1,加上当前节点maste,恰好投票大于半数。若是在必定时间内没有收到足够多的投票(即其余节点的加入),则选举失败,从新开始选举。若是加入的节点达到数目,则选举成功。

2)若是当前选举的master是其余节点,则当前节点关闭其余节点的加入请求,假设当前节为node1,目标master节点为node2。则此时有三种可能:

  • node2已是集群master,怎把node1做为一个新节点加入,同步集群信息,node1选举成功
  • node2正在参与选举,即1)所描述的状态,则node2会把此次链接当成一个投票,node1继续等待,若是node1在规定时间内没有等到node2成为master,则node1选举失败,node1从新选举
  • node2认为node3才是master,此时node3会拒接链接请求,node1选举失败,从新选举。

5.节点监测

        选举流程结束后,为了保证集群服务过程当中节点的意外退出,须要启动两个重要的task。分别是masterFaultDetection和NodeFaultDetection。相似于心跳机制,按期监测node和master的状态。若是node监测不到master心跳,调用,会notifyMasterFailure进行选举。若是master检测不到NodeFaultDetection心跳,调用notifyNodeFailure,将node移除,发布新的cluster_state,执行相应的primary和replica操做。移除node的时候会监测当前节点数据是否足够,若是不足,则从新发起选举。

if (electMasterService.hasEnoughMasterNodes(remainingNodesClusterState.nodes()) == false) {
         final int masterNodes = electMasterService.countMasterNodes(remainingNodesClusterState.nodes());
         rejoin.accept(LoggerMessageFormat.format("not enough master nodes (has [{}], but needed [{}])",
                                                         masterNodes, electMasterService.minimumMasterNodes()));
         return resultBuilder.build(currentState);
} else {
         return resultBuilder.build(allocationService.deassociateDeadNodes(remainingNodesClusterState, true, describeTasks(tasks)));
}复制代码

6.总结

        相较于zookeeper的选举,es的选举有点像Bully算法,比较简单。zookeeper基于Paxos的算法则比较复杂。Es的discovery模块代码量不大,核心的ZenDiscovery.java一共才1000多行代码,认真看几遍就能明白Elasticsearch选举的主要思想。

相关文章
相关标签/搜索