前言:由于ClusteredEventBus涉及集群,有必产生网络问题,从而引入了NetServer、ServerID等涉及网络,端口的类。在以前的EventBusImpl中, 使用的数据结构是以address-List<Handler>做为k-v的map容器。做为EventBusImpl的子类,ClusteredEventBus的逻辑结构上同样的。 不过把address-List<ServerID>做为k-v。node
原理:在start方法中,利用第三方框架(默认hazelcast)实现的集群同步map(变量subs) ,获取已有的节点信息。而后根据参数,对自身服务器的端口实现监听,把自身服务器信息放入前面的map,让其余节点感知。调用consumer方法时,以address-List<ServerID>做为k-v存在一个map的容器中。调用send方法时,以address为k从map里取出ServerID.而后把消息利用TCP协议发送给对应的服务器。服务器
代码:网络
public static final String CLUSTER_PUBLIC_HOST_PROP_NAME = "vertx.cluster.public.host";// 这2个字段是为了从System.getProperty()取值,优先级//1.System.getProperty()2.EventBusOptions public static final String CLUSTER_PUBLIC_PORT_PROP_NAME = "vertx.cluster.public.port"; private static final Buffer PONG = Buffer.buffer(new byte[]{(byte) 1}); private static final String SERVER_ID_HA_KEY = "server_id"; private static final String SUBS_MAP_NAME = "__vertx.subs"; //集群数据存放在集群同步的map中,须要约定一个固定的key统一存取。 private final ClusterManager clusterManager; private final HAManager haManager; private final ConcurrentMap<ServerID, ConnectionHolder> connections = new ConcurrentHashMap<>();//根据socket长连接 private final Context sendNoContext; private EventBusOptions options; // 建立时的参数 private AsyncMultiMap<String, ClusterNodeInfo> subs; // 集群核心数据 k是address,value是HazelcastClusterNodeInfo private Set<String> ownSubs = new ConcurrentHashSet<>();// 自身订阅(Subscribe)的addrees private ServerID serverID; // 自身服务器信息(IP和port) private ClusterNodeInfo nodeInfo; // 自身集群信息(NodeID、IP和port) private NetServer server;//
在 public void start(Handler<AsyncResult<Void>> resultHandler) 方法中。数据结构
作了不少事件,不少逻辑。框架
1.subs = ar2.result(); 获取集群数据。从集群拉取数据,ar2.succeeded() 为前置判断。直接排除网络、配置等错误的可能。socket
2.建立底层的端口监听。这里端口有大坑,有2个概念:async
actualPort 和 publicPort
actualPort是值真正监听的端口,从option传值过来,没有则随机产生。ide
publicPort是放到共享给集群的端口,为了通知别的节点让它们往这里发数据。官方的解释是为了容器状况考虑。在容器里运行时,和主机的端口是经过代理访问的。对于这2个port ,由于这里有好几个变量能够赋值,全部里面有优先级:代理
actualPort: 1.VertxOptions 也是 EventBusOptions 的setClusterPublicHost,查看VertxOptions.setClusterPort() / VertxOptions.setClusterPublicHost() 方法,发现其实就是对EventBusOptions操做。 2.随机产生。
publicPort 1.系统变量CLUSTER_PUBLIC_PORT_PROP_NAME 2.VertxOptions 也是 EventBusOptions 的setClusterPublicHost 3.上面的actualPort
这由于端口直接涉及到通讯,设置不对就没法使用。若是是集群内多节点的状况,须要设置host,不须要设置port. 由于host默认值是 "localhost",port默认值是随机产生的可用端口(假设为51854),host和port会产生ServerID。若是不设置host,A节点就会把 "localhost:51854"传到集群上。其余B节想要访问A时,会根据这个信息去访问 localhost:51854,结果访问到自身去了。code
下面重点就是consumer 和 send/poblish方法。
调用consumer方法时,会依次调用到addRegistration(),往集群共享的subs中放入信息,达到传播的目的。
@Override protected <T> void addRegistration(boolean newAddress, String address,boolean replyHandler, boolean localOnly,Handler<AsyncResult<Void>> completionHandler) { if (newAddress && subs != null && !replyHandler && !localOnly) { // Propagate the information subs.add(address, nodeInfo, completionHandler); ownSubs.add(address); } else { completionHandler.handle(Future.succeededFuture()); } }
调用send/poblish方法时,会依次调用到sendOrPub(),
@Override protected <T> void sendOrPub(SendContextImpl<T> sendContext) { String address = sendContext.message.address(); // 这里只是定义resultHandler,没有执行,若是要执行,还需 //要resultHandler.handler(AsyncResult) Handler<AsyncResult<ChoosableIterable<ClusterNodeInfo>>> resultHandler = asyncResult -> { if (asyncResult.succeeded()) { // 重要的 server ChoosableIterable<ClusterNodeInfo> serverIDs = asyncResult.result(); if (serverIDs != null && !serverIDs.isEmpty()) { sendToSubs(serverIDs, sendContext); } else { if (metrics != null) { metrics.messageSent(address, !sendContext.message.isSend(), true, false); } deliverMessageLocally(sendContext); } } else { log.error("Failed to send message", asyncResult.cause()); } }; // 这里才是处理 。subs存的是k-v是 address-List<HazelcastClusterNodeInfo> // get(k)就是把List<HazelcastClusterNodeInfo>取出来,交给上面的handler if (Vertx.currentContext() == null) { // Guarantees the order when there is no current context sendNoContext.runOnContext(v -> { subs.get(address, resultHandler); }); } else { subs.get(address, resultHandler); } }
sendToSubs()方法是包含了 send/publish 的判断,这个逻辑原本是在deliverMessageLocally(MessageImpl msg)完成的。
protected MessageImpl createMessage(boolean send, String address, MultiMap headers, Object body, String codecName)方法里,单机版产生的是 MessageImpl, 集群版产生ClusteredMessage。 ClusteredMessage此类包含了对Buffer 的操做,帮助socket通讯。