Vert.x系列(三)--ClusteredEventBus源码分析

 前言:由于ClusteredEventBus涉及集群,有必产生网络问题,从而引入了NetServer、ServerID等涉及网络,端口的类。在以前的EventBusImpl中, 使用的数据结构是以address-List<Handler>做为k-v的map容器。做为EventBusImpl的子类,ClusteredEventBus的逻辑结构上同样的。 不过把address-List<ServerID>做为k-v。node

原理:在start方法中,利用第三方框架(默认hazelcast)实现的集群同步map(变量subs) ,获取已有的节点信息。而后根据参数,对自身服务器的端口实现监听,把自身服务器信息放入前面的map,让其余节点感知。调用consumer方法时,以address-List<ServerID>做为k-v存在一个map的容器中。调用send方法时,以address为k从map里取出ServerID.而后把消息利用TCP协议发送给对应的服务器。服务器

代码:网络

public static final String CLUSTER_PUBLIC_HOST_PROP_NAME = "vertx.cluster.public.host";// 这2个字段是为了从System.getProperty()取值,优先级//1.System.getProperty()2.EventBusOptions

public static final String CLUSTER_PUBLIC_PORT_PROP_NAME = "vertx.cluster.public.port";

private static final Buffer PONG = Buffer.buffer(new byte[]{(byte) 1});

private static final String SERVER_ID_HA_KEY = "server_id";

private static final String SUBS_MAP_NAME = "__vertx.subs"; //集群数据存放在集群同步的map中,须要约定一个固定的key统一存取。

private final ClusterManager clusterManager;

private final HAManager haManager;

private final ConcurrentMap<ServerID, ConnectionHolder> connections = new ConcurrentHashMap<>();//根据socket长连接

private final Context sendNoContext;

private EventBusOptions options; // 建立时的参数

private AsyncMultiMap<String, ClusterNodeInfo> subs; // 集群核心数据 k是address,value是HazelcastClusterNodeInfo

private Set<String> ownSubs = new ConcurrentHashSet<>();// 自身订阅(Subscribe)的addrees

private ServerID serverID; // 自身服务器信息(IP和port)

private ClusterNodeInfo nodeInfo; // 自身集群信息(NodeID、IP和port)

private NetServer server;//

在 public void start(Handler<AsyncResult<Void>> resultHandler) 方法中。数据结构

作了不少事件,不少逻辑。框架

1.subs = ar2.result(); 获取集群数据。从集群拉取数据,ar2.succeeded() 为前置判断。直接排除网络、配置等错误的可能。socket

2.建立底层的端口监听。这里端口有大坑,有2个概念:async

actualPort 和 publicPort

actualPort是值真正监听的端口,从option传值过来,没有则随机产生。ide

publicPort是放到共享给集群的端口,为了通知别的节点让它们往这里发数据。官方的解释是为了容器状况考虑。在容器里运行时,和主机的端口是经过代理访问的。对于这2个port ,由于这里有好几个变量能够赋值,全部里面有优先级:代理

actualPort:
1.VertxOptions 也是 EventBusOptions 的setClusterPublicHost,查看VertxOptions.setClusterPort() / VertxOptions.setClusterPublicHost() 方法,发现其实就是对EventBusOptions操做。
2.随机产生。

 

publicPort
1.系统变量CLUSTER_PUBLIC_PORT_PROP_NAME
2.VertxOptions 也是 EventBusOptions 的setClusterPublicHost
3.上面的actualPort

这由于端口直接涉及到通讯,设置不对就没法使用。若是是集群内多节点的状况,须要设置host,不须要设置port. 由于host默认值是 "localhost",port默认值是随机产生的可用端口(假设为51854),host和port会产生ServerID。若是不设置host,A节点就会把 "localhost:51854"传到集群上。其余B节想要访问A时,会根据这个信息去访问 localhost:51854,结果访问到自身去了。code

 

下面重点就是consumer 和 send/poblish方法。

调用consumer方法时,会依次调用到addRegistration(),往集群共享的subs中放入信息,达到传播的目的。

@Override
protected <T> void addRegistration(boolean newAddress, String address,boolean replyHandler, boolean localOnly,Handler<AsyncResult<Void>> completionHandler) {
if (newAddress && subs != null && !replyHandler && !localOnly) {
    // Propagate the information
    subs.add(address, nodeInfo, completionHandler);
    ownSubs.add(address);
} else {
    completionHandler.handle(Future.succeededFuture());
}
}

调用send/poblish方法时,会依次调用到sendOrPub(),

@Override

protected <T> void sendOrPub(SendContextImpl<T> sendContext) {

String address = sendContext.message.address();
// 这里只是定义resultHandler,没有执行,若是要执行,还需
//要resultHandler.handler(AsyncResult)
Handler<AsyncResult<ChoosableIterable<ClusterNodeInfo>>> resultHandler = asyncResult -> {
if (asyncResult.succeeded()) {
// 重要的 server
ChoosableIterable<ClusterNodeInfo> serverIDs = asyncResult.result();
if (serverIDs != null && !serverIDs.isEmpty()) {
sendToSubs(serverIDs, sendContext);
} else {
if (metrics != null) {
metrics.messageSent(address, !sendContext.message.isSend(), true, false);
}
deliverMessageLocally(sendContext);
}
} else {
log.error("Failed to send message", asyncResult.cause());
}
};

// 这里才是处理 。subs存的是k-v是 address-List<HazelcastClusterNodeInfo>
// get(k)就是把List<HazelcastClusterNodeInfo>取出来,交给上面的handler
if (Vertx.currentContext() == null) {
// Guarantees the order when there is no current context
sendNoContext.runOnContext(v -> {
subs.get(address, resultHandler);
});
} else {
subs.get(address, resultHandler);
}
}

sendToSubs()方法是包含了 send/publish 的判断,这个逻辑原本是在deliverMessageLocally(MessageImpl msg)完成的。

protected MessageImpl createMessage(boolean send, String address, MultiMap headers, Object body, String codecName)方法里,单机版产生的是 MessageImpl, 集群版产生ClusteredMessage。 ClusteredMessage此类包含了对Buffer 的操做,帮助socket通讯。 

相关文章
相关标签/搜索