Vert.x系列(四)-- HAManager源码分析

三.HA模式。node

前言:在集群模式下,Vert.x框架拥有HA(HighAvailability)能力。通俗的解释是集群中的一个节点跪了,原来运行在失败节点上的Verticle会在其余节点上自动启动。固然,前提是Verticle被设置了HA模式。服务器

原理:
首先,HA的构造方法里传入了ClusterManager ,利用这个类实现node的add/left 监听,
这样集群的每一个节点得到其余节点的add/left。每当add事件发生,把节点的信息存到集群贡献的map中,除非集群崩溃,信息都回在。每当left事件发生,存活的节点肯定本身是否与失败节点在同一个组中:
若是不是同一个组,那么它将不会做为故障转移节点的候选者。群集中的节点仅故障转移到同一组中的其余节点;
若是是同一个组,那么用失败节点的UUID计算Hashcode,而后用这个Hashcode对集群节点的个数作取模(%)运算,以取模的结果做为索引从集群全部节点的List拿到对应的节点,判断这个计算的出来的节点是否是自身。若是是自身就re-deploy那个失败的节点。app

其中有个重要的逻辑检查一个“集群最小节点数”quorum的值。(quorum本意:会议法定最小参加人数)。官方说能够避免quorum 丢失后致使的竞争条件,若是不检查就要使用排他锁,而排他锁太棘手,很容易致使死锁。(原文: avoid race conditions resulting in modules being deployed after a quorum has been lost,and without having to resort to exclusive locking which is actually quite tricky here, and prone to deadlock。)
让我来猜想、估计、YY这段话的原意:框架实现了最小quorum台服务器存活才称为集群,才实现HA模式。即 if(aliveServerNum > quorum) {
//  doHA
}
可是这个aliveServerNum 必然分别维护在这N台服务上。如何维护这个aliveServerNum?
一台服务器跪了,须要aliveServerNum -- 。一台服务器加了,须要aliveServerNum++。若是有多台服务器同时加入,代码须要设计为
// get lock 
{
 aliveServerNum++

// release lock
那么,能够想象的场景:S1,S2服务器发生重启,要各自通知到
A1, A2,A3...An 个alive的服务器。
S1对 A1 的aliveServerNum++锁定,请求 A2的锁。
S2对 A2 的aliveServerNum++锁定,请求 A1的锁。 框架

与其费工夫维护aliveServerNum,还不如用 checkQuorum()方法和boolean attainedQuorum代替。即
checkQuorum()
if(attainedQuorum) {
//  doHA
}
可是,这里我有个疑问是:
若是lock是集群锁才有上面的死锁问题,若是lock是单机锁synchronize,好像能解决问题又不会有死锁?但不管如何,不维护aliveServerNum是更明智的选择。async


代码:
private final VertxInternal vertx;   // vertx
private final DeploymentManager deploymentManager;  // deploymentManager
private final ClusterManager clusterManager;  //  接口,重要方法是 join leave nodeListeneride

private final int quorumSize;          // 集群最小节点数
private final String group;           // 相同group才有HA逻辑
private final JsonObject haInfo;                       
private final Map<String, String> clusterMap; //
// 别的属性是单机的,当服务器跪掉就丢失了
// 为了failover时还能取得当时的信息。须要把信息变成
// 集群共享,在须要时用clusterManager.getSyncMap()取得。 haInfo和clusterMap就是用来作这件事
// clusterMap 的存在乎义是方便用nodeID查找。ui

private final String nodeID;      // 节点ID,惟一性。
private final Queue<Runnable> toDeployOnQuorum = new ConcurrentLinkedQueue<>();
// 若是节点跪了时,集群不在 quorum 状态。 那么就把信息放在这个队列容器中。而后用定时器(vertx.setPeriodic)不停的检测。 等达到attainedQuorum再deploy.this

private final boolean enabled;  // 是否HA的总开关。线程

private long quorumTimerID;  //  见上面 toDeployOnQuorum 。定时器的ID,达到条件后调用vertx.cancelTimer(quorumTimerID);取消设计

private volatile boolean attainedQuorum; // 见 “原理”
private volatile FailoverCompleteHandler failoverCompleteHandler; // 提供自定义的重启逻辑插入
private volatile boolean failDuringFailover; // For testing:
private volatile boolean stopped; // 状态量
private volatile boolean killed; // 状态量
private Consumer<Set<String>> clusterViewChangedHandler;

HAManager 的构造方法

clusterManager.nodeListener(new NodeListener() {

  @Override
  public void nodeAdded(String nodeID) {
    HAManager.this.nodeAdded(nodeID);
  }

  @Override
  public void nodeLeft(String leftNodeID) {
    HAManager.this.nodeLeft(leftNodeID);
  }
});

存在 nodeAdded 和 nodeLeft的监听。clusterManager 是接口,具体还要看由实现类。

private void doDeployVerticle(final String verticleName, DeploymentOptions deploymentOptions,
                              final Handler<AsyncResult<String>> doneHandler) {
  final Handler<AsyncResult<String>> wrappedHandler = asyncResult -> {
    if (asyncResult.succeeded()) {
      // Tell the other nodes of the cluster about the verticle for HA purposes
      addToHA(asyncResult.result(), verticleName, deploymentOptions);
    }
    if (doneHandler != null) {
      doneHandler.handle(asyncResult);
    } else if (asyncResult.failed()) {
      log.error("Failed to deploy verticle", asyncResult.cause());
    }
  };
  deploymentManager.deployVerticle(verticleName, deploymentOptions, wrappedHandler);
}


这个方法一开始看了让人蒙圈,没有搞明白前面的asyncResult怎么就succeeded了。其实,方法前部全部的代码都是为了定义一个wrappedHandler ,定义完了在方法最后一句传到deploymentManager.deployVerticle里。跟踪代码,在若干方法栈后才有对这个wrappedHandler.handle()方法调用:

completionHandler.handle(result);

因此,额外说句,这个方法的名字其实有问题。通常名为doX的方法,都是处理最底层的X业务,而这里实际上是交付给了deploymentManager去作。

private String chooseHashedNode(String group, int hashCode)

这个方法会计算出从新部署的节点,而后每一个存活的节点都调用,判断结果是否是节点本身。

String chosen = chooseHashedNode(group, failedNodeID.hashCode());
if (chosen != null && chosen.equals(this.nodeID)) {

若是是本身,才继续恢复流程,调用核心业务方法。若是不是本身,没有事情发生。

private void processFailover(JsonObject failedVerticle)

processFailover()使用了 CountDownLatch类 处理线程阻塞和超时处理逻辑。

别的方法都很简单的,看名字就知道内容。

相关文章
相关标签/搜索