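SOFARegistry is a production-grade, low-latency, highly available service registry open-sourced by Ant Financial.
This series focuses on design and architecture: across several articles, it works backwards from multiple angles to summarise the implementation mechanisms and architectural thinking behind DataServer and SOFARegistry, so that readers can learn how Alibaba approaches design.
This is the tenth article. It takes a business-logic view and looks at how DataServer interacts with MetaServer.
First, let us review why MetaServer matters.
MetaServer is the metadata server cluster. It is responsible for the server information of the Session server cluster and the Data server cluster, so it effectively acts as the service registry inside the SOFARegistry architecture. The difference is that SOFARegistry, as a registry, serves the application layer at large, whereas the Meta cluster serves SOFARegistry's own Session and Data clusters: the Meta layer detects changes in Session and Data nodes and notifies the other nodes in the cluster.
So, to learn about node changes, DataServer has to talk to MetaServer, and that interaction is what we study here.
Built on the Bolt protocol, DataServer uses a push-pull model when interacting with MetaServer.
The rest of this article analyses the design behind that interaction.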
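The module's directory layout is shown below. From it we can roughly infer that:
DefaultMetaServiceImpl is the main body of the MetaServer-related module;
MetaServerConnectionFactory handles connection management;
ConnectionRefreshMetaTask is a periodic task;
the handler directory holds three response handlers;
the provideData directory holds configuration-related functionality.
The directory structure is: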
│   ├── metaserver
│   │   ├── DefaultMetaServiceImpl.java
│   │   ├── IMetaServerService.java
│   │   ├── MetaServerConnectionFactory.java
│   │   ├── handler
│   │   │   ├── NotifyProvideDataChangeHandler.java
│   │   │   ├── ServerChangeHandler.java
│   │   │   └── StatusConfirmHandler.java
│   │   ├── provideData
│   │   │   ├── ProvideDataProcessor.java
│   │   │   ├── ProvideDataProcessorManager.java
│   │   │   └── processor
│   │   │       └── DatumExpireProvideDataProcessor.java
│   │   └── task
│   │       └── ConnectionRefreshMetaTask.java
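The MetaServer-related components are as follows.
One question arises here: why does DataServerBootstrap also contain startRaftClient? In principle DataServer should only need HTTP and Bolt.
It turns out that the Raft protocol is used to obtain information such as the address of the leader in the MetaServer cluster: raftClient.getLeader();
It is used, for example, by renewNodeTask.
The Raft-related startup happens in startRaftClient. As the code shows, it starts the Raft client and then posts a MetaServerChangeEvent built from metaServerService.getMetaServerMap().
The code is:

    private void startRaftClient() {
        metaServerService.startRaftClient();
        eventCenter.post(new MetaServerChangeEvent(metaServerService.getMetaServerMap()));
    }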
As mentioned above, once the system has started it proactively posts a MetaServerChangeEvent. Let us look at what it contains.

    public class MetaServerChangeEvent implements Event {

        private Map<String, Set<String>> ipMap;

        /**
         * constructor
         * @param ipMap
         */
        public MetaServerChangeEvent(Map<String, Set<String>> ipMap) {
            this.ipMap = ipMap;
        }

        public Map<String, Set<String>> getIpMap() {
            return ipMap;
        }
    }

Its runtime state looks like this:

    event = {MetaServerChangeEvent@5991}
     ipMap = {HashMap@5678}  size = 1
      "DefaultDataCenter" -> {ConcurrentHashMap$KeySetView@6007}  size = 1
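MetaServerChangeEvent has three sources: a proactive fetch at startup (startRaftClient), a periodic refresh (ConnectionRefreshMetaTask), and a push from MetaServer (ServerChangeHandler). We go through them in that order.
The first is the startup path mentioned above. At startup, the meta server configuration is read through metaServerService.getMetaServerMap(); a MetaServerChangeEvent is built from the result and posted to the EventCenter.
Once a DataServer node has initialised successfully, it starts a task that connects to MetaServer automatically. That task registers a DataServerChangeEvent with the EventCenter; once registered, the event is triggered, after which hash values are computed for the newly added nodes and their shards are taken over.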
The call stack is:

    register:44, MetaServerConnectionFactory (com.alipay.sofa.registry.server.data.remoting.metaserver)
    registerMetaServer:129, MetaServerChangeEventHandler (com.alipay.sofa.registry.server.data.event.handler)
    doHandle:92, MetaServerChangeEventHandler (com.alipay.sofa.registry.server.data.event.handler)
    doHandle:55, MetaServerChangeEventHandler (com.alipay.sofa.registry.server.data.event.handler)
    handle:51, AbstractEventHandler (com.alipay.sofa.registry.server.data.event.handler)
    post:56, EventCenter (com.alipay.sofa.registry.server.data.event)
    startRaftClient:197, DataServerBootstrap (com.alipay.sofa.registry.server.data.bootstrap)
    start:131, DataServerBootstrap (com.alipay.sofa.registry.server.data.bootstrap)
    start:47, DataServerInitializer (com.alipay.sofa.registry.server.data.bootstrap)
    doStart:173, DefaultLifecycleProcessor (org.springframework.context.support)
    access$200:50, DefaultLifecycleProcessor (org.springframework.context.support)
    start:350, DefaultLifecycleProcessor$LifecycleGroup (org.springframework.context.support)
    startBeans:149, DefaultLifecycleProcessor (org.springframework.context.support)
    onRefresh:112, DefaultLifecycleProcessor (org.springframework.context.support)
    finishRefresh:880, AbstractApplicationContext (org.springframework.context.support)
    refresh:546, AbstractApplicationContext (org.springframework.context.support)
    refresh:693, SpringApplication (org.springframework.boot)
    refreshContext:360, SpringApplication (org.springframework.boot)
    run:303, SpringApplication (org.springframework.boot)
    run:1118, SpringApplication (org.springframework.boot)
    run:1107, SpringApplication (org.springframework.boot)
    main:41, DataApplication (com.alipay.sofa.registry.server.data)
The periodic source is handled by ConnectionRefreshMetaTask, a periodic task configured in the bean named tasks.
StartTaskEventHandler drives tasks: when it receives a StartTaskEvent, it schedules those AbstractTask instances in tasks whose type matches the event.

    public class StartTaskEventHandler extends AbstractEventHandler<StartTaskEvent> {

        @Resource(name = "tasks")
        private List<AbstractTask> tasks;

        private ScheduledExecutorService executor = null;

        @Override
        public List<Class<? extends StartTaskEvent>> interest() {
            return Lists.newArrayList(StartTaskEvent.class);
        }

        @Override
        public void doHandle(StartTaskEvent event) {
            if (executor == null || executor.isShutdown()) {
                getExecutor();
            }
            for (AbstractTask task : tasks) {
                if (event.getSuitableTypes().contains(task.getStartTaskTypeEnum())) {
                    executor.scheduleWithFixedDelay(task, task.getInitialDelay(),
                        task.getDelay(), task.getTimeUnit());
                }
            }
        }

        private void getExecutor() {
            executor = ExecutorFactory.newScheduledThreadPool(tasks.size(),
                this.getClass().getSimpleName());
        }
    }

The tasks bean is defined as:

    @Bean(name = "tasks")
    public List<AbstractTask> tasks() {
        List<AbstractTask> list = new ArrayList<>();
        list.add(connectionRefreshTask());
        list.add(connectionRefreshMetaTask());
        list.add(renewNodeTask());
        return list;
    }
ConnectionRefreshMetaTask periodically posts a MetaServerChangeEvent to the EventCenter.
Each run calls metaServerService.getMetaServerMap(), wraps the result in a MetaServerChangeEvent, and posts it to the EventCenter.

    public class ConnectionRefreshMetaTask extends AbstractTask {

        @Autowired
        private IMetaServerService metaServerService;

        @Autowired
        private EventCenter eventCenter;

        @Override
        public void handle() {
            eventCenter.post(new MetaServerChangeEvent(metaServerService.getMetaServerMap()));
        }
    }
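The periodic source boils down to `scheduleWithFixedDelay`. As a minimal, self-contained sketch (not SOFARegistry code: `PeriodicRefreshSketch` and `runPeriodically` are hypothetical names, and the `Runnable` merely stands in for posting a `MetaServerChangeEvent`), the following runs a task repeatedly on a scheduled pool and stops once it has fired a given number of times:

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

// Hypothetical illustration; not part of SOFARegistry.
final class PeriodicRefreshSketch {

    // Runs `task` with a fixed delay between runs until it has completed `runs` times.
    // Returns true if that many runs finished within five seconds.
    static boolean runPeriodically(Runnable task, int runs, long delayMillis) {
        ScheduledExecutorService executor = Executors.newScheduledThreadPool(1);
        CountDownLatch remaining = new CountDownLatch(runs);
        try {
            executor.scheduleWithFixedDelay(() -> {
                task.run();              // stands in for eventCenter.post(...)
                remaining.countDown();
            }, 0, delayMillis, TimeUnit.MILLISECONDS);
            return remaining.await(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        } finally {
            executor.shutdownNow();
        }
    }
}
```

One property of `scheduleWithFixedDelay` worth keeping in mind: if a run throws, later runs are suppressed, so a periodic task of this kind normally has to catch its own exceptions.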
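ServerChangeHandler is one of the metaClientHandlers, that is, one of the response handlers of MetaNodeExchanger.
ServerChangeHandler extends AbstractClientHandler, and its interest() method declares that it responds to NodeChangeResult.
When Meta pushes something, ServerChangeHandler responds; this is the case where Meta Server notifies DataServer on its own initiative.
After receiving a NodeChangeResult, ServerChangeHandler checks the type of the changed node and, depending on the node type, produces either a DataServerChangeEvent or a MetaServerChangeEvent. For NodeType.META it posts to the event center with eventCenter.post(new MetaServerChangeEvent(map)); this is one of the sources of MetaServerChangeEvent.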
    public class ServerChangeHandler extends AbstractClientHandler<NodeChangeResult> {

        @Autowired
        private EventCenter eventCenter;

        @Autowired
        private DataServerConfig dataServerConfig;

        @Override
        public Object doHandle(Channel channel, NodeChangeResult request) {
            ExecutorFactory.getCommonExecutor().execute(() -> {
                if (request.getNodeType() == NodeType.DATA) {
                    eventCenter.post(new DataServerChangeEvent(request.getNodes(),
                        request.getDataCenterListVersions(), FromType.META_NOTIFY));
                } else if (request.getNodeType() == NodeType.META) {
                    Map<String, Map<String, MetaNode>> metaNodesMap = request.getNodes();
                    if (metaNodesMap != null && !metaNodesMap.isEmpty()) {
                        Map<String, MetaNode> metaNodeMap =
                            metaNodesMap.get(dataServerConfig.getLocalDataCenter());
                        if (metaNodeMap != null && !metaNodeMap.isEmpty()) {
                            HashMap<String, Set<String>> map = new HashMap<>();
                            map.put(dataServerConfig.getLocalDataCenter(), metaNodeMap.keySet());
                            eventCenter.post(new MetaServerChangeEvent(map));
                        }
                    }
                }
            });
            return CommonResponse.buildSuccessResponse();
        }

        @Override
        public Class interest() {
            return NodeChangeResult.class;
        }

        @Override
        public HandlerType getType() {
            return HandlerType.PROCESSER;
        }

        @Override
        protected Node.NodeType getConnectNodeType() {
            return Node.NodeType.DATA;
        }
    }
The logic at this point is shown below. It shows the three sources of MetaServerChangeEvent, and that ServerChangeHandler also produces DataServerChangeEvent:

+-------------------------------+
|[DataServerBootstrap]          |   MetaServerChangeEvent
|                               |
|                               +-------------------------+
|      startRaftClient          |                         |
|                               |                         |
|                               |                         |
+-------------------------------+                         |
+-------------------------------+                         |
| [Timer]                       |                         |
|                               |                         |      +-------------+
|   ConnectionRefreshMetaTask   +------------------------------> | EventCenter |
|                               | MetaServerChangeEvent   |      +-------+-----+
+-------------------------------+                         |              ^
+-------------------------------+                         |              |
| [Push<NodeChangeResult>]      |                         |              |
|                               |                         |              |
|                               +-------------------------+              |
|                               | MetaServerChangeEvent                  |
|     ServerChangeHandler       |                                        |
|                               +----------------------------------------+
+-------------------------------+   DataServerChangeEvent
MetaServerChangeEventHandler handles MetaServerChangeEvent messages. Because it extends AbstractEventHandler, it is already registered with the EventCenter.
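To make the post-and-dispatch idea concrete, here is a minimal sketch of an event center that routes a posted event to the handlers interested in its class. It is not SOFARegistry's EventCenter: `MiniEventCenter` and `MiniHandler` are hypothetical names, registration is explicit here rather than done through a base class, and dispatch is by exact event class only.

```java
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArrayList;

// Hypothetical stand-ins; not SOFARegistry classes.
interface MiniHandler<E> {
    Class<E> interest();   // the event class this handler wants to receive
    void handle(E event);
}

final class MiniEventCenter {
    private final Map<Class<?>, List<MiniHandler<?>>> handlers = new ConcurrentHashMap<>();

    // Index the handler under the event class it declares interest in.
    <E> void register(MiniHandler<E> handler) {
        handlers.computeIfAbsent(handler.interest(), k -> new CopyOnWriteArrayList<>())
                .add(handler);
    }

    // Deliver the event to every handler registered for its exact class.
    // Returns the number of handlers that were invoked.
    @SuppressWarnings("unchecked")
    <E> int post(E event) {
        List<MiniHandler<?>> interested =
            handlers.getOrDefault(event.getClass(), Collections.emptyList());
        for (MiniHandler<?> handler : interested) {
            ((MiniHandler<E>) handler).handle(event);
        }
        return interested.size();
    }
}
```

With this shape, the three producers described earlier only need a reference to the event center; they do not need to know which handler consumes the event.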
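Note that a further conversion into DataServerChangeEvent happens here: the handler proactively talks to MetaServer again, and if the response is a NodeChangeResult, it is converted into a DataServerChangeEvent.
This is because the push from Meta Server may really be telling the data server: "hi, the data servers have changed as well, come and pull again".
While handling the event, MetaServerChangeEventHandler contacts each MetaServer to check that it is usable and, if so, registers the connection.
The logic, as read from the code that follows, is:
for every dataCenter in the event, walk its MetaServer IPs; if there is no connection for an IP, or the connection is not healthy, call registerMetaServer;
registerMetaServer connects to the MetaServer and stores the connection in metaServerConnectionFactory;
if the IP is the Raft leader, it also registers this DataServer by sending new DataNode(new URL(DataServerConfig.IP), dataServerConfig.getLocalDataCenter());
if the reply is a NodeChangeResult, it posts a StartTaskEvent (RENEW) and a DataServerChangeEvent.
The code is: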
    public class MetaServerChangeEventHandler extends AbstractEventHandler<MetaServerChangeEvent> {

        @Autowired
        private DataServerConfig dataServerConfig;

        @Autowired
        private IMetaServerService metaServerService;

        @Autowired
        private MetaNodeExchanger metaNodeExchanger;

        @Autowired
        private EventCenter eventCenter;

        @Autowired
        private MetaServerConnectionFactory metaServerConnectionFactory;

        @Override
        public List<Class<? extends MetaServerChangeEvent>> interest() {
            return Lists.newArrayList(MetaServerChangeEvent.class);
        }

        @Override
        public void doHandle(MetaServerChangeEvent event) {
            Map<String, Set<String>> ipMap = event.getIpMap();
            for (Entry<String, Set<String>> ipEntry : ipMap.entrySet()) {
                String dataCenter = ipEntry.getKey();
                Set<String> ips = ipEntry.getValue();
                if (!CollectionUtils.isEmpty(ips)) {
                    // connect to MetaServers that have no healthy connection yet
                    for (String ip : ips) {
                        Connection connection = metaServerConnectionFactory.getConnection(dataCenter, ip);
                        if (connection == null || !connection.isFine()) {
                            registerMetaServer(dataCenter, ip);
                        }
                    }
                    // drop connections to MetaServers that are no longer listed
                    Set<String> ipSet = metaServerConnectionFactory.getIps(dataCenter);
                    for (String ip : ipSet) {
                        if (!ips.contains(ip)) {
                            metaServerConnectionFactory.remove(dataCenter, ip);
                        }
                    }
                } else {
                    // remove connections of dataCenter if the connectionMap of the dataCenter in ipMap is empty
                    removeDataCenter(dataCenter);
                }
            }
            // remove connections of dataCenter if the dataCenter not exist in ipMap
            Set<String> dataCenters = metaServerConnectionFactory.getAllDataCenters();
            for (String dataCenter : dataCenters) {
                if (!ipMap.containsKey(dataCenter)) {
                    removeDataCenter(dataCenter);
                }
            }
        }

        private void registerMetaServer(String dataCenter, String ip) {
            PeerId leader = metaServerService.getLeader();
            for (int tryCount = 0; tryCount < TRY_COUNT; tryCount++) {
                try {
                    Channel channel = metaNodeExchanger.connect(new URL(ip,
                        dataServerConfig.getMetaServerPort()));
                    // connect all meta server
                    if (channel != null && channel.isConnected()) {
                        metaServerConnectionFactory.register(dataCenter, ip,
                            ((BoltChannel) channel).getConnection());
                    }
                    // register leader meta node
                    if (ip.equals(leader.getIp())) {
                        Object obj = null;
                        try {
                            obj = metaNodeExchanger.request(new Request() {
                                @Override
                                public Object getRequestBody() {
                                    return new DataNode(new URL(DataServerConfig.IP),
                                        dataServerConfig.getLocalDataCenter());
                                }

                                @Override
                                public URL getRequestUrl() {
                                    return new URL(ip, dataServerConfig.getMetaServerPort());
                                }
                            }).getResult();
                        } catch (Exception e) {
                            // error handling omitted in this excerpt
                        }
                        if (obj instanceof NodeChangeResult) {
                            NodeChangeResult<DataNode> result = (NodeChangeResult<DataNode>) obj;
                            Map<String, Long> versionMap = result.getDataCenterListVersions();
                            // send renew after first register dataNode
                            Set<StartTaskTypeEnum> set = new HashSet<>();
                            set.add(StartTaskTypeEnum.RENEW);
                            eventCenter.post(new StartTaskEvent(set));
                            eventCenter.post(new DataServerChangeEvent(result.getNodes(), versionMap,
                                DataServerChangeEvent.FromType.REGISTER_META));
                            break;
                        }
                    }
                } catch (Exception e) {
                    // error handling omitted in this excerpt
                }
            }
        }
    }
The logic at this point is:

+-------------------------------+
|[DataServerBootstrap]          |   MetaServerChangeEvent
|                               |
|                               +-------------------------+
|      startRaftClient          |                         |
|                               |                         |              +---------------+
|                               |                         |              |               |
+-------------------------------+                         |              |               |
+-------------------------------+                         |              |               |
| [Timer]                       |                         |              v               |
|                               |                         |  1   +-------+-----+         |
|   ConnectionRefreshMetaTask   +------------------------------> | EventCenter +----+    |
|                               | MetaServerChangeEvent   |      +-------+-----+    |    |
+-------------------------------+                         |              ^          |    |
+-------------------------------+                         |              |          |    |
|                               |                         |              |          |    |
| [Push<NodeChangeResult>]      |                         |              |          |    |
|                               |                         |              |          |    |
|                               +-------------------------+              |          |    |
|                               | MetaServerChangeEvent                  |          |    |
|     ServerChangeHandler       |  2                                     |          |    |
|                               +----------------------------------------+          |    |
+-------------------------------+   DataServerChangeEvent                           |    |
                                                                                    |    |
                                                                                    |    |
                                        MetaServerChangeEvent                       |    |
                                        3                                           |    |
                               +----------------------------------------------------+    |
                               |                                                         |
                               v                                                         |
             +-----------------+--------------+  DataServerChangeEvent                   |
             |                                |  4                                       |
             | MetaServerChangeEventHandler   +------------------------------------------+
             |                                |
             +--------------------------------+
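Next we look at how DataServer manages its connections to MetaServer.
A TCP request has roughly three phases: establishing the connection, communicating, and closing the connection. Every new connection goes through a three-way handshake, which means three network transmissions; for a highly concurrent system that is a considerable burden, and closing a connection is costly in the same way. Managing and reusing connections cuts the per-call overhead and can greatly improve system performance.
To communicate efficiently we therefore want to reuse connections and avoid repeated TCP handshakes, which calls for a connection management mechanism.
SOFARegistry manages connections at two levels: Connection and Node.
A connection can be identified by the socket tuple (localIp, localPort, remoteIp, remotePort). Netty represents it as a Channel, and sofa-bolt abstracts it as a Connection object; one connection is represented by one Connection object on the client side and one on the server side.
With the Connection abstraction in place, an interface is naturally needed to manage Connections, and that interface is ConnectionFactory.
On both the server side and the client side, the essence is the same: create a ConnectionEventHandler instance and add it to Netty's pipeline.
Afterwards, whenever a ConnectionEvent fires (whether it is an event defined by Netty or one defined by SOFABolt), ConnectionEventHandler notifies ConnectionEventListener through an asynchronous executor, and ConnectionEventListener dispatches the message to the concrete ConnectionEventProcessor implementation.
metaServerConnectionFactory stores all MetaServer Connections. This is an application of Bolt's mechanism, which needs long-lived connections to be maintained.
The content of a MetaServerChangeEvent is a dataCenter together with the list of MetaServer IPs under it. The corresponding MAP in MetaServerConnectionFactory is:

    Map< dataCenter : Map<ip, Connection> >

It is defined as follows: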
    public class MetaServerConnectionFactory {

        private final Map<String, Map<String, Connection>> MAP = new ConcurrentHashMap<>();

        /**
         *
         * @param dataCenter
         * @param ip
         * @param connection
         */
        public void register(String dataCenter, String ip, Connection connection) {
            Map<String, Connection> connectionMap = MAP.get(dataCenter);
            if (connectionMap == null) {
                Map<String, Connection> newConnectionMap = new ConcurrentHashMap<>();
                connectionMap = MAP.putIfAbsent(dataCenter, newConnectionMap);
                if (connectionMap == null) {
                    connectionMap = newConnectionMap;
                }
            }
            connectionMap.put(ip, connection);
        }
    }
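The get / putIfAbsent sequence in register is the classic way to create the inner map exactly once under concurrency. As a hedged sketch of the same two-level registry (hypothetical class `ConnectionRegistrySketch`, with a type parameter `C` standing in for Bolt's Connection), `ConcurrentHashMap.computeIfAbsent` expresses the idea in one call:

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical illustration of a dataCenter -> (ip -> connection) registry.
final class ConnectionRegistrySketch<C> {

    private final Map<String, Map<String, C>> byDataCenter = new ConcurrentHashMap<>();

    // Creates the per-dataCenter map atomically on first use, then stores the connection.
    // A later registration for the same ip replaces the earlier one.
    void register(String dataCenter, String ip, C connection) {
        byDataCenter.computeIfAbsent(dataCenter, k -> new ConcurrentHashMap<>())
                    .put(ip, connection);
    }

    // Returns the stored connection, or null if the dataCenter or ip is unknown.
    C get(String dataCenter, String ip) {
        Map<String, C> connections = byDataCenter.get(dataCenter);
        return connections == null ? null : connections.get(ip);
    }
}
```

Both forms give the same result; the original spelling also works on JDKs older than Java 8, where computeIfAbsent is not available.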
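Connections are added in only one place: MetaServerChangeEventHandler.doHandle, which (through registerMetaServer) calls metaServerConnectionFactory.register.
In doHandle, the handler walks every dataCenter in the event together with the MetaServer IPs listed under it, and does the following, as the doHandle code shown earlier indicates:
if the IP set is not empty, for each IP without a healthy connection it calls registerMetaServer, and then removes stored connections whose IP is no longer listed;
if the IP set is empty, it calls removeDataCenter(dataCenter);
finally, it removes every stored dataCenter that does not appear in the event at all.
registerMetaServer uses metaNodeExchanger to connect to the MetaServer. The code is: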
    private void registerMetaServer(String dataCenter, String ip) {
        PeerId leader = metaServerService.getLeader();
        for (int tryCount = 0; tryCount < TRY_COUNT; tryCount++) {
            try {
                Channel channel = metaNodeExchanger.connect(new URL(ip,
                    dataServerConfig.getMetaServerPort()));
                // connect all meta server
                if (channel != null && channel.isConnected()) {
                    metaServerConnectionFactory.register(dataCenter, ip,
                        ((BoltChannel) channel).getConnection());
                }
                // other operations
            } catch (Exception e) {
                // error handling omitted in this excerpt
            }
        }
    }
At runtime, MetaServerConnectionFactory looks like this:

    metaServerConnectionFactory = {MetaServerConnectionFactory@5387}
     MAP = {ConcurrentHashMap@6154}  size = 1
      "DefaultDataCenter" -> {ConcurrentHashMap@6167}  size = 1
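The bookkeeping that doHandle performs per dataCenter is a small reconciliation between the IPs announced in the event and the connections currently held. A minimal sketch of just that set arithmetic, with hypothetical names (`ReconcileSketch`, `toConnect`, `toRemove`) and plain strings in place of connections:

```java
import java.util.Set;
import java.util.TreeSet;
import java.util.function.Predicate;

// Hypothetical illustration of the reconcile step; not SOFARegistry code.
final class ReconcileSketch {

    // Announced IPs that have no healthy connection yet: these need (re)connecting.
    static Set<String> toConnect(Set<String> announced, Predicate<String> hasHealthyConnection) {
        Set<String> result = new TreeSet<>();
        for (String ip : announced) {
            if (!hasHealthyConnection.test(ip)) {
                result.add(ip);
            }
        }
        return result;
    }

    // IPs we still hold a connection for although the event no longer lists them.
    static Set<String> toRemove(Set<String> announced, Set<String> connected) {
        Set<String> result = new TreeSet<>(connected);
        result.removeAll(announced);
        return result;
    }
}
```

For example, with announced IPs {a, b} and held connections {b, c}, all healthy, the sketch yields {a} to connect and {c} to remove.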
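DataServer and MetaServer interact through a push-pull model.
MetaNodeExchanger is a Bolt Exchange that gathers all MetaServer-related network operations in one place. Both MetaServerChangeEventHandler and DefaultMetaServiceImpl talk to Meta Server through it. In particular:
connect installs the response handlers, metaClientHandlers;
request, if it fails, calls metaServerService.refreshLeader().getIp() to refresh the address and sends the request again.
This is also where the usability of a MetaServer gets tested.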
    public class MetaNodeExchanger implements NodeExchanger {

        @Autowired
        private Exchange boltExchange;

        @Autowired
        private IMetaServerService metaServerService;

        @Autowired
        private DataServerConfig dataServerConfig;

        @Resource(name = "metaClientHandlers")
        private Collection<AbstractClientHandler> metaClientHandlers;

        @Override
        public Response request(Request request) {
            Client client = boltExchange.getClient(Exchange.META_SERVER_TYPE);
            try {
                final Object result = client.sendSync(request.getRequestUrl(),
                    request.getRequestBody(), dataServerConfig.getRpcTimeout());
                return () -> result;
            } catch (Exception e) {
                // retry
                URL url = new URL(metaServerService.refreshLeader().getIp(),
                    dataServerConfig.getMetaServerPort());
                final Object result = client.sendSync(url, request.getRequestBody(),
                    request.getTimeout() != null ? request.getTimeout()
                        : dataServerConfig.getRpcTimeout());
                return () -> result;
            }
        }

        public Channel connect(URL url) {
            Client client = boltExchange.getClient(Exchange.META_SERVER_TYPE);
            if (client == null) {
                synchronized (this) {
                    client = boltExchange.getClient(Exchange.META_SERVER_TYPE);
                    if (client == null) {
                        client = boltExchange.connect(Exchange.META_SERVER_TYPE, url,
                            metaClientHandlers.toArray(new ChannelHandler[metaClientHandlers.size()]));
                    }
                }
            }
            // try to connect data
            Channel channel = client.getChannel(url);
            if (channel == null) {
                synchronized (this) {
                    channel = client.getChannel(url);
                    if (channel == null) {
                        channel = client.connect(url);
                    }
                }
            }
            return channel;
        }
    }
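The retry in request follows a simple pattern: try the given address once and, if that fails, resolve the current leader and try exactly once more. A minimal sketch of that pattern, under the assumption that failures surface as runtime exceptions (`RetryOnLeaderRefreshSketch`, `send` and `refreshLeader` are hypothetical names, not Bolt or SOFARegistry APIs):

```java
import java.util.function.Function;
import java.util.function.Supplier;

// Hypothetical illustration of "retry once against the refreshed leader".
final class RetryOnLeaderRefreshSketch {

    // `send` performs the call against a URL; `refreshLeader` returns the leader's current URL.
    static <R> R request(String url, Function<String, R> send, Supplier<String> refreshLeader) {
        try {
            return send.apply(url);
        } catch (RuntimeException firstFailure) {
            // Single retry only: if the refreshed leader also fails, the exception propagates.
            return send.apply(refreshLeader.get());
        }
    }
}
```

The design choice to note is that the retry is not a loop: a second failure is reported to the caller, which in the article's code is typically a periodic task that will simply run again later.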
The response handlers of MetaNodeExchanger are listed below. This is the push half of the model: as mentioned earlier, serverChangeHandler responds to pushes.

    @Bean(name = "metaClientHandlers")
    public Collection<AbstractClientHandler> metaClientHandlers() {
        Collection<AbstractClientHandler> list = new ArrayList<>();
        list.add(serverChangeHandler());
        list.add(statusConfirmHandler());
        list.add(notifyProvideDataChangeHandler());
        return list;
    }
DefaultMetaServiceImpl is the core implementation of the Meta Server related services.
In it, raftClient is the entry point to Raft and metaNodeExchanger is the entry point to Bolt. metaServerConnectionFactory holds all current Bolt connections to meta servers.

    public class DefaultMetaServiceImpl implements IMetaServerService {

        @Autowired
        private DataServerConfig dataServerConfig;

        @Autowired
        private MetaNodeExchanger metaNodeExchanger;

        @Autowired
        private MetaServerConnectionFactory metaServerConnectionFactory;

        @Autowired
        private DataServerCache dataServerCache;

        private RaftClient raftClient;
    }
Refreshing is one of its key functions; it is how the Raft leader is obtained.

    @Override
    public PeerId getLeader() {
        if (raftClient == null) {
            startRaftClient();
        }
        PeerId leader = raftClient.getLeader();
        if (leader == null) {
            throw new RuntimeException(
                "[DefaultMetaServiceImpl] register MetaServer get no leader!");
        }
        return leader;
    }

    @Override
    public PeerId refreshLeader() {
        if (raftClient == null) {
            startRaftClient();
        }
        PeerId leader = raftClient.refreshLeader();
        if (leader == null) {
            throw new RuntimeException("[RaftClientManager] refresh MetaServer get no leader!");
        }
        return leader;
    }
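Another key function is reconnecting.
getMetaServerMap takes care of the reconnection. As the code shows, it does the following:
it obtains a connection to a MetaServer in the local data center: if none is stored, it connects to a randomly chosen configured address; otherwise it picks a stored connection at random and reconnects if that connection is not healthy;
it sends GetNodesRequest(NodeType.META) over that connection;
from the NodeChangeResult returned for GetNodesRequest(NodeType.META), it builds the map of dataCenter to MetaServer IPs;
the caller wraps that map in a MetaServerChangeEvent and posts it to the EventCenter: eventCenter.post(new MetaServerChangeEvent(metaServerService.getMetaServerMap()));
The code is: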
    @Override
    public Map<String, Set<String>> getMetaServerMap() {
        HashMap<String, Set<String>> map = new HashMap<>();
        Set<String> set = dataServerConfig.getMetaServerIpAddresses();

        Map<String, Connection> connectionMap = metaServerConnectionFactory
            .getConnections(dataServerConfig.getLocalDataCenter());
        Connection connection = null;
        try {
            if (connectionMap.isEmpty()) {
                // no stored connection: connect to a random configured MetaServer
                List<String> list = new ArrayList(set);
                Collections.shuffle(list);
                connection = ((BoltChannel) metaNodeExchanger.connect(new URL(list.iterator()
                    .next(), dataServerConfig.getMetaServerPort()))).getConnection();
            } else {
                // reuse a random stored connection, reconnecting if it is not healthy
                List<Connection> connections = new ArrayList<>(connectionMap.values());
                Collections.shuffle(connections);
                connection = connections.iterator().next();
                if (!connection.isFine()) {
                    connection = ((BoltChannel) metaNodeExchanger.connect(new URL(connection
                        .getRemoteIP(), dataServerConfig.getMetaServerPort()))).getConnection();
                }
            }

            GetNodesRequest request = new GetNodesRequest(NodeType.META);
            final Connection finalConnection = connection;
            Object obj = metaNodeExchanger.request(new Request() {
                @Override
                public Object getRequestBody() {
                    return request;
                }

                @Override
                public URL getRequestUrl() {
                    return new URL(finalConnection.getRemoteIP(), finalConnection.getRemotePort());
                }
            }).getResult();

            if (obj instanceof NodeChangeResult) {
                NodeChangeResult<MetaNode> result = (NodeChangeResult<MetaNode>) obj;
                Map<String, Map<String, MetaNode>> metaNodesMap = result.getNodes();
                if (metaNodesMap != null && !metaNodesMap.isEmpty()) {
                    Map<String, MetaNode> metaNodeMap = metaNodesMap.get(dataServerConfig
                        .getLocalDataCenter());
                    if (metaNodeMap != null && !metaNodeMap.isEmpty()) {
                        map.put(dataServerConfig.getLocalDataCenter(), metaNodeMap.keySet());
                    }
                }
            }
        } catch (Exception e) {
            // error handling omitted in this excerpt
        }
        return map;
    }
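The shuffle-then-take-first idiom in getMetaServerMap is a simple way to spread requests across MetaServers. A small sketch of it follows, with one addition that is my own assumption rather than something the excerpt does: an explicit check for the empty case, since calling `iterator().next()` on an empty list throws NoSuchElementException. `PickAddressSketch` and `pickAny` are hypothetical names.

```java
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.Random;

// Hypothetical illustration of picking one candidate at random.
final class PickAddressSketch {

    // Returns one element of `candidates`, chosen by shuffling a copy with `random`.
    // Fails with a descriptive message when there is nothing to choose from.
    static String pickAny(Collection<String> candidates, Random random) {
        if (candidates.isEmpty()) {
            throw new IllegalStateException("no MetaServer address available");
        }
        List<String> copy = new ArrayList<>(candidates);
        Collections.shuffle(copy, random);
        return copy.get(0);
    }
}
```

Passing the `Random` in makes the choice reproducible in tests; production code would typically share one instance or use the no-argument `Collections.shuffle`.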
The MetaServer addresses themselves are read from configuration in DataServerConfig.getMetaServerIpAddresses:

    @ConfigurationProperties(prefix = DataServerConfig.PRE_FIX)
    public class DataServerConfig {

        /**
         * Getter method for property <tt>metaServerIpAddress</tt>.
         *
         * @return property value of metaServerIpAddress
         */
        public Set<String> getMetaServerIpAddresses() {
            if (metaIps != null && !metaIps.isEmpty()) {
                return metaIps;
            }
            metaIps = new HashSet<>();
            if (commonConfig != null) {
                Map<String, Collection<String>> metaMap = commonConfig.getMetaNode();
                if (metaMap != null && !metaMap.isEmpty()) {
                    String localDataCenter = commonConfig.getLocalDataCenter();
                    if (localDataCenter != null && !localDataCenter.isEmpty()) {
                        Collection<String> metas = metaMap.get(localDataCenter);
                        if (metas != null && !metas.isEmpty()) {
                            metaIps = metas.stream().map(NetUtil::getIPAddressFromDomain)
                                .collect(Collectors.toSet());
                        }
                    }
                }
            }
            return metaIps;
        }
    }
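As this article has shown, a MetaServerChangeEvent can also end up as a DataServerChangeEvent posted to the EventCenter, as in steps 2 and 4 of the earlier diagram.
This is because a push from Meta Server may also be telling the data server: "hi, the data servers have changed as well". The next article therefore covers how DataServerChangeEvent is handled.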