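SOFARegistry is a production-grade service registry open-sourced by Ant Financial. It is designed for timely updates and high availability.
This series focuses on design and architecture. Across several articles, and from several angles, it works backward from the code to summarize the implementation mechanisms and architectural ideas of DataServer and SOFARegistry, so that readers can learn how Alibaba designs such systems.
This is the eleventh article. It describes how SOFARegistry handles Data node changes, that is, how it processes the DataServerChangeEvent message.
The previous article mentioned that a MetaServerChangeEvent is also converted into a DataServerChangeEvent and posted to EventCenter.
This is because a push from the Meta Server may be telling the Data Server: "hi, the set of data servers has changed too." This article therefore covers how DataServerChangeEvent is processed, and it should be read together with the previous one.
First, let us go over a few business concepts.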
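To support massive amounts of data, SOFARegistry uses consistent hashing to shard Publisher data. This avoids the capacity bottleneck that would arise if a single server stored the full data set. In this model each data shard has multiple replicas. When the DataServers that store registration data scale out or in, MetaServer notifies the DataServers and SessionServers of the change, and the shards are migrated and synchronized inside the cluster. This is where the problem of keeping data consistent inside DataServer arises.
MetaServer detects nodes going online or offline through its network connections. Every DataServer runs a scheduled connection-refresh task, ConnectionRefreshTask, which periodically polls MetaServer for data node information. DataServer does not only pull node information from MetaServer: MetaServer also actively pushes NodeChangeResult requests to every node when node information changes. Push and pull end up with the same result.
The overall logic of this part is as follows:
When the polling result shows that data nodes have changed, a DataServerChangeEvent is posted to EventCenter. Its handler checks whether the change concerns nodes of the local data center and, if so, posts a new LocalDataServerChangeEvent.
The handler of that event, LocalDataServerChangeEventHandler, checks whether the current node has just joined. If it has, it sends a NotifyOnlineRequest to the other nodes, as shown in the figure: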
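To make the sharding idea concrete, below is a minimal consistent-hash ring with virtual nodes and replicas. It is not SOFARegistry's implementation. The class name `ConsistentHashRing`, the FNV-1a hash and the virtual-node count are assumptions for illustration. Each DataServer IP is placed on the ring several times. A dataInfoId is owned by the first IP found clockwise from its hash, and the next distinct IPs hold its replicas.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.TreeMap;

// Hypothetical sketch of consistent hashing with virtual nodes and replicas.
// NOT SOFARegistry's implementation; names and the hash function are assumptions.
final class ConsistentHashRing {
    // Position on the ring -> DataServer IP that owns that point.
    private final TreeMap<Integer, String> ring = new TreeMap<>();
    private final int virtualNodes;

    ConsistentHashRing(int virtualNodes) {
        this.virtualNodes = virtualNodes;
    }

    // 32-bit FNV-1a: deterministic and good enough for an illustration.
    private static int hash(String key) {
        int h = 0x811C9DC5;
        for (int i = 0; i < key.length(); i++) {
            h ^= key.charAt(i);
            h *= 0x01000193;
        }
        return h;
    }

    void addNode(String ip) {
        for (int i = 0; i < virtualNodes; i++) {
            ring.put(hash(ip + "#" + i), ip);
        }
    }

    void removeNode(String ip) {
        for (int i = 0; i < virtualNodes; i++) {
            // Only remove the point if it still belongs to this IP.
            ring.remove(hash(ip + "#" + i), ip);
        }
    }

    // Walk clockwise from the key's position and collect up to `replicas` distinct IPs:
    // the first one is the primary owner, the following ones hold the replicas.
    List<String> nodesFor(String dataInfoId, int replicas) {
        int start = hash(dataInfoId);
        List<String> walk = new ArrayList<>(ring.tailMap(start, true).values());
        walk.addAll(ring.headMap(start, false).values()); // wrap around the ring
        List<String> result = new ArrayList<>();
        for (String ip : walk) {
            if (result.size() == replicas) {
                break;
            }
            if (!result.contains(ip)) {
                result.add(ip);
            }
        }
        return result;
    }
}
```

Adding or removing one IP only reassigns the keys whose clockwise successor changes. This is why scaling a DataServer cluster out or in migrates only part of the shards rather than all of them.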
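Figure: logic of the new node when a DataServer node goes online
This article mainly explains the logic from DataServerChangeEvent to LocalDataServerChangeEvent.
DataServerChangeEvent has three sources: an active fetch at startup, periodic polling, and push from MetaServer. Each is described below.
The previous article showed that the startup fetch and the push both work through MetaServerChangeEvent. Combined with the logic diagram from that article, the flow can be summarized as follows: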
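The decision described above can be sketched in isolation. The handler checks whether the current node is among the newly joined IPs and, if so, picks every other DataServer as a target for NotifyOnlineRequest. The class `OnlineNotifyPlanner` and its method are hypothetical names for illustration, and actually sending the request is not modelled.

```java
import java.util.Collections;
import java.util.List;
import java.util.Set;
import java.util.stream.Collectors;

// Hypothetical helper: the decision LocalDataServerChangeEventHandler is described as making.
// Sending the actual NotifyOnlineRequest is out of scope here.
final class OnlineNotifyPlanner {
    private OnlineNotifyPlanner() {
    }

    /**
     * @param selfIp       IP of the current DataServer
     * @param newJoined    IPs that joined in this change
     * @param allDataNodes all DataServer IPs in the local data center after the change
     * @return peers that should receive NotifyOnlineRequest; empty if this node is not new
     */
    static List<String> peersToNotify(String selfIp, Set<String> newJoined, Set<String> allDataNodes) {
        if (!newJoined.contains(selfIp)) {
            return Collections.emptyList();
        }
        return allDataNodes.stream()
                .filter(ip -> !ip.equals(selfIp))
                .sorted() // deterministic order, only for readability
                .collect(Collectors.toList());
    }
}
```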
Diagram: how startup, timer and push lead to DataServerChangeEvent
(a) [DataServerBootstrap] startRaftClient --MetaServerChangeEvent--> EventCenter
(b) [Timer] ConnectionRefreshMetaTask --MetaServerChangeEvent--> EventCenter
(1) EventCenter receives the MetaServerChangeEvent from (a) or (b)
(c) [Push<NodeChangeResult>] --> ServerChangeHandler
(2) ServerChangeHandler --MetaServerChangeEvent / DataServerChangeEvent--> EventCenter
(3) EventCenter --MetaServerChangeEvent--> MetaServerChangeEventHandler
(4) MetaServerChangeEventHandler --DataServerChangeEvent--> EventCenter
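Once a DataServer node has initialized successfully, it starts a task that automatically connects to MetaServer. At startup it reads the meta server configuration via metaServerService.getMetaServerMap(). It then builds a MetaServerChangeEvent from that configuration and posts the event to EventCenter.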
```java
private void startRaftClient() {
    metaServerService.startRaftClient();
    eventCenter.post(new MetaServerChangeEvent(metaServerService.getMetaServerMap()));
}
```
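MetaServerChangeEventHandler responds to MetaServerChangeEvent messages. It extends AbstractEventHandler, so it is already registered with EventCenter.
After processing the MetaServerChangeEvent, this handler posts a DataServerChangeEvent to EventCenter. Once posted, the event is triggered. The hash values of newly added nodes are then computed, and their shards are taken under management.
This corresponds to path a, 1, 3, 4 in the figure above and is source 1 of DataServerChangeEvent.
This source is produced by converting another message, NodeChangeResult, and the conversion happens in two steps.
As mentioned above, DataServer does not only pull node information from MetaServer: MetaServer also pushes NodeChangeResult requests to each node when node information changes, and push and pull end up with the same result.
ServerChangeHandler is part of metaClientHandler. It is the response handler of MetaNodeExchanger.
After ServerChangeHandler receives a NodeChangeResult, it checks the type of the changed node. Depending on the node type, it produces either a DataServerChangeEvent or a MetaServerChangeEvent.
If the type is NodeType.DATA, it posts a DataServerChangeEvent directly. If it is NodeType.META, it posts a MetaServerChangeEvent to EventCenter via eventCenter.post(new MetaServerChangeEvent(map));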
This is one of the sources of both MetaServerChangeEvent and DataServerChangeEvent. It corresponds to line 2 in the figure above and is source 2 of DataServerChangeEvent.
```java
public class ServerChangeHandler extends AbstractClientHandler<NodeChangeResult> {

    @Autowired
    private EventCenter       eventCenter;

    @Autowired
    private DataServerConfig  dataServerConfig;

    @Override
    public Object doHandle(Channel channel, NodeChangeResult request) {
        ExecutorFactory.getCommonExecutor().execute(() -> {
            if (request.getNodeType() == NodeType.DATA) {
                eventCenter.post(new DataServerChangeEvent(request.getNodes(),
                    request.getDataCenterListVersions(), FromType.META_NOTIFY));
            } else if (request.getNodeType() == NodeType.META) {
                Map<String, Map<String, MetaNode>> metaNodesMap = request.getNodes();
                if (metaNodesMap != null && !metaNodesMap.isEmpty()) {
                    Map<String, MetaNode> metaNodeMap = metaNodesMap.get(dataServerConfig
                        .getLocalDataCenter());
                    if (metaNodeMap != null && !metaNodeMap.isEmpty()) {
                        HashMap<String, Set<String>> map = new HashMap<>();
                        map.put(dataServerConfig.getLocalDataCenter(), metaNodeMap.keySet());
                        eventCenter.post(new MetaServerChangeEvent(map));
                    }
                }
            }
        });
        return CommonResponse.buildSuccessResponse();
    }

    @Override
    public Class interest() {
        return NodeChangeResult.class;
    }

    @Override
    public HandlerType getType() {
        return HandlerType.PROCESSER;
    }

    @Override
    protected Node.NodeType getConnectNodeType() {
        return Node.NodeType.DATA;
    }
}
```
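The branch inside doHandle can be modelled without any SOFARegistry dependencies. In the sketch below, `SketchNodeType` and `NodeChangeRouting` are hypothetical stand-ins, and nodes are modelled as dataCenter -> (ip -> node description). A DATA change always yields a DataServerChangeEvent. A META change yields a MetaServerChangeEvent only when the local data center has meta nodes, and its payload keeps only the local data center's meta server IPs.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Optional;
import java.util.Set;

// Simplified stand-in for Node.NodeType (hypothetical, for this sketch only).
enum SketchNodeType { DATA, META, SESSION }

// Standalone model of the branch in ServerChangeHandler.doHandle.
final class NodeChangeRouting {
    private NodeChangeRouting() {
    }

    // Which event would be posted for a pushed NodeChangeResult (empty = nothing posted).
    static Optional<String> eventFor(SketchNodeType type, Map<String, Map<String, String>> nodes,
                                     String localDataCenter) {
        if (type == SketchNodeType.DATA) {
            return Optional.of("DataServerChangeEvent");
        }
        if (type == SketchNodeType.META && nodes != null && !nodes.isEmpty()) {
            Map<String, String> local = nodes.get(localDataCenter);
            if (local != null && !local.isEmpty()) {
                return Optional.of("MetaServerChangeEvent");
            }
        }
        return Optional.empty();
    }

    // Payload of the MetaServerChangeEvent: only the local data center's meta server IPs.
    static Map<String, Set<String>> metaPayload(Map<String, Map<String, String>> nodes,
                                                String localDataCenter) {
        Map<String, String> local = nodes.getOrDefault(localDataCenter, Collections.emptyMap());
        Map<String, Set<String>> map = new HashMap<>();
        map.put(localDataCenter, new HashSet<>(local.keySet()));
        return map;
    }
}
```

The real handler runs this branch on ExecutorFactory.getCommonExecutor() and immediately returns a success response. MetaServer's push is therefore acknowledged before the events are processed.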
The MetaServerChangeEvent is then handled by MetaServerChangeEventHandler, which, as noted above, is registered with EventCenter through AbstractEventHandler.
Note that a second conversion into DataServerChangeEvent happens here. MetaServerChangeEventHandler actively contacts MetaServer again, because the Meta Server's push may be telling the Data Server: "hi, the data servers have changed too."
If the reply is a NodeChangeResult, it is converted into a DataServerChangeEvent, which is posted to EventCenter.
This corresponds to path b, 1, 3, 4 in the figure above and is source 3 of DataServerChangeEvent.
```java
public class MetaServerChangeEventHandler extends AbstractEventHandler<MetaServerChangeEvent> {

    private void registerMetaServer(String dataCenter, String ip) {

        ......

                if (obj instanceof NodeChangeResult) {
                    NodeChangeResult<DataNode> result = (NodeChangeResult<DataNode>) obj;
                    Map<String, Long> versionMap = result.getDataCenterListVersions();

                    //send renew after first register dataNode
                    Set<StartTaskTypeEnum> set = new HashSet<>();
                    set.add(StartTaskTypeEnum.RENEW);
                    eventCenter.post(new StartTaskEvent(set));

                    eventCenter.post(new DataServerChangeEvent(result.getNodes(), versionMap,
                        DataServerChangeEvent.FromType.REGISTER_META));
                    // exits the enclosing loop, which is part of the elided code above
                    break;
                }
    }
}
```
This article focuses on the periodic polling (pull) source of DataServerChangeEvent.
As described earlier, every DataServer runs ConnectionRefreshTask. This scheduled task periodically polls MetaServer for data node information.
ConnectionRefreshTask is registered in the `tasks` bean.
```java
@Bean(name = "tasks")
public List<AbstractTask> tasks() {
    List<AbstractTask> list = new ArrayList<>();
    list.add(connectionRefreshTask());
    list.add(connectionRefreshMetaTask());
    list.add(renewNodeTask());
    return list;
}
```
The tasks are started indirectly from startScheduler:
```java
eventCenter.post(new StartTaskEvent(
    Arrays.stream(StartTaskTypeEnum.values()).filter(type -> type != StartTaskTypeEnum.RENEW)
        .collect(Collectors.toSet())));
```
StartTaskEventHandler responds to StartTaskEvent and starts the tasks one by one.
```java
public class StartTaskEventHandler extends AbstractEventHandler<StartTaskEvent> {

    @Resource(name = "tasks")
    private List<AbstractTask>       tasks;

    private ScheduledExecutorService executor = null;

    @Override
    public void doHandle(StartTaskEvent event) {
        if (executor == null || executor.isShutdown()) {
            getExecutor();
        }

        for (AbstractTask task : tasks) {
            if (event.getSuitableTypes().contains(task.getStartTaskTypeEnum())) {
                executor.scheduleWithFixedDelay(task, task.getInitialDelay(), task.getDelay(),
                    task.getTimeUnit());
            }
        }
    }

    private void getExecutor() {
        executor = ExecutorFactory.newScheduledThreadPool(tasks.size(), this.getClass()
            .getSimpleName());
    }
}
```
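There is a trick here.
ConnectionRefreshTask declares that its start type is CONNECT_DATA. When StartTaskEventHandler handles the event, it checks each task's declared type against the types carried by the event. CONNECT_DATA is among them, so ConnectionRefreshTask gets scheduled.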
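A self-contained sketch of this trick follows. Each task declares a type, and the startup event carries the set of suitable types: every type except RENEW. Only the matching tasks are scheduled with scheduleWithFixedDelay. `TaskType`, `SketchTask` and `TaskStarter` are hypothetical stand-ins for StartTaskTypeEnum, AbstractTask and StartTaskEventHandler. Only ScheduledExecutorService is the real JDK API, and the delays used below are made up apart from the 30 seconds of ConnectionRefreshTask.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.EnumSet;
import java.util.List;
import java.util.Set;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;

// Hypothetical stand-in for StartTaskTypeEnum.
enum TaskType { CONNECT_META, CONNECT_DATA, RENEW }

// Hypothetical stand-in for AbstractTask: a Runnable that declares its start type and delays.
final class SketchTask implements Runnable {
    final String name;
    final TaskType type;
    final long initialDelaySeconds;
    final long delaySeconds;

    SketchTask(String name, TaskType type, long initialDelaySeconds, long delaySeconds) {
        this.name = name;
        this.type = type;
        this.initialDelaySeconds = initialDelaySeconds;
        this.delaySeconds = delaySeconds;
    }

    @Override
    public void run() {
        System.out.println("running " + name);
    }
}

final class TaskStarter {
    private TaskStarter() {
    }

    // Same filter as the startup StartTaskEvent: every type except RENEW.
    static Set<TaskType> startupTypes() {
        return Arrays.stream(TaskType.values())
                .filter(type -> type != TaskType.RENEW)
                .collect(Collectors.toCollection(() -> EnumSet.noneOf(TaskType.class)));
    }

    // Keep only the tasks whose declared type is in the event's suitable set.
    static List<SketchTask> select(List<SketchTask> tasks, Set<TaskType> suitableTypes) {
        List<SketchTask> selected = new ArrayList<>();
        for (SketchTask task : tasks) {
            if (suitableTypes.contains(task.type)) {
                selected.add(task);
            }
        }
        return selected;
    }

    // Schedule the selected tasks with a fixed delay between the end of one run and the next.
    static ScheduledExecutorService start(List<SketchTask> tasks, Set<TaskType> suitableTypes) {
        ScheduledExecutorService executor = Executors.newScheduledThreadPool(Math.max(1, tasks.size()));
        for (SketchTask task : select(tasks, suitableTypes)) {
            executor.scheduleWithFixedDelay(task, task.initialDelaySeconds, task.delaySeconds,
                    TimeUnit.SECONDS);
        }
        return executor;
    }
}
```

RENEW is deliberately left out at startup. In the excerpt above, MetaServerChangeEventHandler posts a second StartTaskEvent containing only RENEW after the first successful registration, and that event starts the renew task. Callers of `start` in this sketch are responsible for calling `shutdown()` on the returned executor.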
AbstractEventHandler calls eventCenter.register(this) in afterPropertiesSet, so all of its subclasses are registered with EventCenter by default.
```java
public abstract class AbstractEventHandler<Event> implements InitializingBean {

    @Autowired
    private EventCenter eventCenter;

    @Override
    public void afterPropertiesSet() throws Exception {
        eventCenter.register(this);
    }

    /**
     * event handle func
     * @param event
     */
    public void handle(Event event) {
        doHandle(event);
    }

    public abstract List<Class<? extends Event>> interest();

    public abstract void doHandle(Event event);
}
```
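To show how self-registration plus interest() lets an event bus route events, here is a minimal synchronous sketch. `MiniEventCenter` and `MiniHandler` are hypothetical. The real EventCenter has its own implementation and may dispatch differently, for example asynchronously. A handler registers once, after construction, which is the analogue of afterPropertiesSet. post() then delivers each event to every handler whose interest() list contains the event's class. A handler may post a follow-up event, much as DataServerChangeEventHandler posts LocalDataServerChangeEvent.

```java
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArrayList;

// Hypothetical handler base class in the spirit of AbstractEventHandler.
abstract class MiniHandler<E> {
    // Analogue of afterPropertiesSet(): called once the handler is fully constructed.
    void afterPropertiesSet(MiniEventCenter center) {
        center.register(this);
    }

    abstract List<Class<? extends E>> interest();

    abstract void doHandle(E event);

    void handle(E event) {
        doHandle(event);
    }
}

// Hypothetical, synchronous event bus; the real EventCenter may dispatch differently.
final class MiniEventCenter {
    // Event class -> handlers interested in exactly that class.
    private final Map<Class<?>, List<MiniHandler<?>>> handlers = new ConcurrentHashMap<>();

    void register(MiniHandler<?> handler) {
        for (Class<?> type : handler.interest()) {
            handlers.computeIfAbsent(type, k -> new CopyOnWriteArrayList<>()).add(handler);
        }
    }

    @SuppressWarnings("unchecked")
    void post(Object event) {
        List<MiniHandler<?>> interested =
                handlers.getOrDefault(event.getClass(), Collections.emptyList());
        for (MiniHandler<?> handler : interested) {
            // Safe because the handler registered for event.getClass().
            ((MiniHandler<Object>) handler).handle(event);
        }
    }
}
```

This sketch dispatches on the exact event.getClass(), so an event subclass would not reach a handler that registered for its superclass.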
This is how connectionRefreshTask gets started.
ConnectionRefreshTask polls the Meta Server. As the code shows, it also posts a DataServerChangeEvent.
```java
public class ConnectionRefreshTask extends AbstractTask {

    @Autowired
    private IMetaServerService metaServerService;

    @Autowired
    private EventCenter        eventCenter;

    @Override
    public void handle() {
        DataServerChangeItem dataServerChangeItem = metaServerService.getDateServers();
        if (dataServerChangeItem != null) {
            eventCenter
                .post(new DataServerChangeEvent(dataServerChangeItem, FromType.CONNECT_TASK));
        }
    }

    @Override
    public int getDelay() {
        return 30;
    }

    @Override
    public int getInitialDelay() {
        return 0;
    }

    @Override
    public TimeUnit getTimeUnit() {
        return TimeUnit.SECONDS;
    }

    @Override
    public StartTaskTypeEnum getStartTaskTypeEnum() {
        return StartTaskTypeEnum.CONNECT_DATA;
    }
}
```
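ConnectionRefreshTask calls metaServerService.getDateServers(). getDateServers first looks up the connection to the current MetaServer leader in the local data center. If that connection is healthy, it sends a GetNodesRequest for NodeType.DATA. If the reply is a NodeChangeResult, it adds the local data center's version to the version map and returns a DataServerChangeItem built from the nodes and the versions. Otherwise it refreshes the leader and returns null.
The code is as follows: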
```java
@Override
public DataServerChangeItem getDateServers() {
    Map<String, Connection> connectionMap = metaServerConnectionFactory
        .getConnections(dataServerConfig.getLocalDataCenter());
    String leader = getLeader().getIp();
    if (connectionMap.containsKey(leader)) {
        Connection connection = connectionMap.get(leader);
        if (connection.isFine()) {
            try {
                GetNodesRequest request = new GetNodesRequest(NodeType.DATA);
                Object obj = metaNodeExchanger.request(new Request() {
                    @Override
                    public Object getRequestBody() {
                        return request;
                    }

                    @Override
                    public URL getRequestUrl() {
                        return new URL(connection.getRemoteIP(), connection.getRemotePort());
                    }
                }).getResult();
                if (obj instanceof NodeChangeResult) {
                    NodeChangeResult<DataNode> result = (NodeChangeResult<DataNode>) obj;
                    Map<String, Long> versionMap = result.getDataCenterListVersions();
                    versionMap.put(result.getLocalDataCenter(), result.getVersion());
                    return new DataServerChangeItem(result.getNodes(), versionMap);
                }
            } catch (Exception e) {
                // exception handling is omitted in this excerpt
            }
        }
    }
    String newip = refreshLeader().getIp();
    return null;
}
```
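The two details worth isolating are how the version map is assembled and what happens on failure. The local data center's version from getVersion() is put into the per-data-center map from getDataCenterListVersions(). When nothing is fetched, null is returned, and ConnectionRefreshTask then posts nothing. The stand-alone model below uses hypothetical stub classes (`NodeChangeResultStub`, `DataServerChangeItemStub`, `DataServerPoller`), not the real SOFARegistry types. As a design choice it copies the version map instead of mutating the result's map in place the way the original code does.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.Consumer;
import java.util.function.Supplier;

// Hypothetical, simplified stand-in for NodeChangeResult<DataNode>.
final class NodeChangeResultStub {
    final String localDataCenter;
    final long version;                               // version of the local data center
    final Map<String, Long> dataCenterListVersions;   // per-data-center versions
    final Map<String, Map<String, String>> nodes;     // dataCenter -> (ip -> node)

    NodeChangeResultStub(String localDataCenter, long version,
                         Map<String, Long> dataCenterListVersions,
                         Map<String, Map<String, String>> nodes) {
        this.localDataCenter = localDataCenter;
        this.version = version;
        this.dataCenterListVersions = dataCenterListVersions;
        this.nodes = nodes;
    }
}

// Hypothetical stand-in for DataServerChangeItem.
final class DataServerChangeItemStub {
    final Map<String, Map<String, String>> nodes;
    final Map<String, Long> versionMap;

    DataServerChangeItemStub(Map<String, Map<String, String>> nodes, Map<String, Long> versionMap) {
        this.nodes = nodes;
        this.versionMap = versionMap;
    }
}

final class DataServerPoller {
    private DataServerPoller() {
    }

    // Mirrors the tail of getDateServers: null when the leader connection is not usable.
    static DataServerChangeItemStub toChangeItem(NodeChangeResultStub result, boolean leaderConnectionFine) {
        if (!leaderConnectionFine || result == null) {
            return null; // the real code refreshes the leader at this point and returns null
        }
        // Copy instead of mutating the result's map in place.
        Map<String, Long> versionMap = new HashMap<>(result.dataCenterListVersions);
        versionMap.put(result.localDataCenter, result.version);
        return new DataServerChangeItemStub(result.nodes, versionMap);
    }

    // Mirrors ConnectionRefreshTask.handle(): post only when something was fetched.
    static void pollOnce(Supplier<DataServerChangeItemStub> fetch, Consumer<DataServerChangeItemStub> post) {
        DataServerChangeItemStub item = fetch.get();
        if (item != null) {
            post.accept(item);
        }
    }
}
```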
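After a DataServerChangeEvent is triggered, DataServerChangeEventHandler processes it in the following steps:
1. Initialize the consistent hash with the current node first (DataServerNodeFactory.initConsistent). This runs only once.
2. Obtain the changed DataServers by comparing the DataServerChangeItem carried by the event with the cache (dataServerCache.compareAndSet).
3. For each changed data center that still has DataServers, connect to every listed DataServer without a healthy connection and remove nodes that are no longer listed. Then post a LocalDataServerChangeEvent if the data center is the local one, and a RemoteDataServerChangeEvent otherwise. A non-local data center with no DataServers left is removed.
4. If nothing changed, reconnect to any known DataServer whose connection is missing or broken.
SOFA mainly handles LocalDataServerChangeEvent here; the part for remote data centers is not open-sourced.
Step 3 above, in more detail:
The handler extracts the DataServerChangeItem from the DataServerChangeEvent. If one of the changed data centers is the local one, it obtains the newly joined DataServers with the following statements: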
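The change detection in step 2 lives in dataServerCache.compareAndSet, whose body this article does not show. The sketch below illustrates the idea under an assumption: only data centers whose incoming version is strictly newer than the cached one count as changed. The class `VersionedDataServerCache` is hypothetical, and the real method also takes the FromType and may behave differently. In this sketch, version gating makes re-applying the same or an older version a no-op. This is one way a pushed update and a later poll carrying the same data can coexist without producing duplicate change events.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.Set;
import java.util.TreeSet;

// Hypothetical version-gated cache; NOT the real DataServerCache.
final class VersionedDataServerCache {
    private final Map<String, Long> versions = new HashMap<>();
    private final Map<String, Set<String>> servers = new HashMap<>();

    /**
     * Applies an update and returns dataCenter -> new IP set for every data center
     * whose incoming version is strictly newer than the cached one.
     */
    synchronized Map<String, Set<String>> compareAndSet(Map<String, Set<String>> incomingServers,
                                                        Map<String, Long> incomingVersions) {
        Map<String, Set<String>> changed = new HashMap<>();
        for (Map.Entry<String, Long> entry : incomingVersions.entrySet()) {
            String dataCenter = entry.getKey();
            long newVersion = entry.getValue();
            long oldVersion = versions.getOrDefault(dataCenter, Long.MIN_VALUE);
            if (newVersion > oldVersion) {
                Set<String> ips = new TreeSet<>(
                        incomingServers.getOrDefault(dataCenter, Collections.emptySet()));
                versions.put(dataCenter, newVersion);
                servers.put(dataCenter, ips);
                changed.put(dataCenter, new TreeSet<>(ips)); // copy so callers cannot mutate the cache
            }
        }
        return changed;
    }

    synchronized Set<String> serversOf(String dataCenter) {
        return Collections.unmodifiableSet(servers.getOrDefault(dataCenter, Collections.emptySet()));
    }
}
```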
```java
Set<String> newjoined = new HashSet<>(ips);
newjoined.removeAll(localDataServers);
```
These newly joined DataServers are then used to build a LocalDataServerChangeEvent.
See the following fragment:
```java
//get changed dataservers
Map<String, Set<String>> changedMap = dataServerCache.compareAndSet(
    dataServerChangeItem, event.getFromType());
if (!changedMap.isEmpty()) {
    for (Entry<String, Set<String>> changeEntry : changedMap.entrySet()) {
        String dataCenter = changeEntry.getKey();
        Set<String> ips = changeEntry.getValue();
        // ... connection handling and the computation of newVersion, newDataNodes and map
        // are elided here; see the full handler below

        //if the dataCenter is self, post LocalDataServerChangeEvent
        if (dataServerConfig.isLocalDataCenter(dataCenter)) {
            Set<String> newjoined = new HashSet<>(ips);
            newjoined.removeAll(localDataServers);
            eventCenter.post(new LocalDataServerChangeEvent(map, newjoined,
                dataServerChangeItem.getVersionMap().get(dataCenter), newVersion));
        } else {
            dataServerCache.updateItem(newDataNodes, newVersion, dataCenter);
            eventCenter.post(new RemoteDataServerChangeEvent(dataCenter, map,
                dataServerChangeItem.getVersionMap().get(dataCenter), newVersion));
        }
    }
}
```
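The set arithmetic in the fragment is plain Java collections: newly joined = changed IPs minus the IPs known before the change. The hypothetical helper `ChangeDiff` below computes that set and the complementary "left" set. The "left" set plays the role of the nodes the full handler drops via DataServerNodeFactory.remove, although the real code takes the old IPs from DataServerNodeFactory.getIps. The helper also decides whether a Local or a Remote event applies. All names here are illustrative.

```java
import java.util.Collections;
import java.util.HashSet;
import java.util.Set;

// Hypothetical helper mirroring the set arithmetic in DataServerChangeEventHandler.
final class ChangeDiff {
    final Set<String> joined; // "newjoined" in the original code
    final Set<String> left;   // known before, no longer in the change map
    final boolean local;      // true if the changed data center is the local one

    private ChangeDiff(Set<String> joined, Set<String> left, boolean local) {
        this.joined = joined;
        this.left = left;
        this.local = local;
    }

    static ChangeDiff of(String dataCenter, String localDataCenter,
                         Set<String> previousIps, Set<String> changedIps) {
        Set<String> joined = new HashSet<>(changedIps);
        joined.removeAll(previousIps);
        Set<String> left = new HashSet<>(previousIps);
        left.removeAll(changedIps);
        return new ChangeDiff(Collections.unmodifiableSet(joined),
                Collections.unmodifiableSet(left), dataCenter.equals(localDataCenter));
    }

    // Which event the handler would post for this data center.
    String eventName() {
        return local ? "LocalDataServerChangeEvent" : "RemoteDataServerChangeEvent";
    }
}
```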
The full code is as follows:
```java
public class DataServerChangeEventHandler extends AbstractEventHandler<DataServerChangeEvent> {

    private static final int   TRY_COUNT = 5;

    @Autowired
    private DataServerConfig   dataServerConfig;

    @Autowired
    private DataServerCache    dataServerCache;

    @Autowired
    private DataNodeExchanger  dataNodeExchanger;

    @Autowired
    private EventCenter        eventCenter;

    @Override
    public List<Class<? extends DataServerChangeEvent>> interest() {
        return Lists.newArrayList(DataServerChangeEvent.class);
    }

    @Override
    public void doHandle(DataServerChangeEvent event) {
        synchronized (this) {
            //register self first,execute once
            DataServerNodeFactory.initConsistent(dataServerConfig);

            DataServerChangeItem dataServerChangeItem = event.getDataServerChangeItem();
            Set<String> localDataServers = dataServerCache.getDataServers(
                dataServerConfig.getLocalDataCenter()).keySet();
            //get changed dataservers
            Map<String, Set<String>> changedMap = dataServerCache.compareAndSet(
                dataServerChangeItem, event.getFromType());
            if (!changedMap.isEmpty()) {
                for (Entry<String, Set<String>> changeEntry : changedMap.entrySet()) {
                    String dataCenter = changeEntry.getKey();
                    Set<String> ips = changeEntry.getValue();
                    Long newVersion = dataServerCache.getDataCenterNewVersion(dataCenter);
                    if (!CollectionUtils.isEmpty(ips)) {
                        for (String ip : ips) {
                            if (!StringUtils.equals(ip, DataServerConfig.IP)) {
                                DataServerNode dataServerNode = DataServerNodeFactory
                                    .getDataServerNode(dataCenter, ip);
                                if (dataServerNode == null
                                    || dataServerNode.getConnection() == null
                                    || !dataServerNode.getConnection().isFine()) {
                                    connectDataServer(dataCenter, ip);
                                }
                            }
                        }
                        //remove all old DataServerNode not in change map
                        Set<String> ipSet = DataServerNodeFactory.getIps(dataCenter);
                        for (String ip : ipSet) {
                            if (!ips.contains(ip)) {
                                DataServerNodeFactory.remove(dataCenter, ip, dataServerConfig);
                            }
                        }

                        Map<String, DataNode> newDataNodes = dataServerCache
                            .getNewDataServerMap(dataCenter);

                        //avoid input map reference operation DataServerNodeFactory MAP
                        Map<String, DataNode> map = new ConcurrentHashMap<>(newDataNodes);

                        //if the dataCenter is self, post LocalDataServerChangeEvent
                        if (dataServerConfig.isLocalDataCenter(dataCenter)) {
                            //why is updateItem not called for the local data center?
                            Set<String> newjoined = new HashSet<>(ips);
                            newjoined.removeAll(localDataServers);
                            eventCenter.post(new LocalDataServerChangeEvent(map, newjoined,
                                dataServerChangeItem.getVersionMap().get(dataCenter), newVersion));
                        } else {
                            dataServerCache.updateItem(newDataNodes, newVersion, dataCenter);
                            eventCenter.post(new RemoteDataServerChangeEvent(dataCenter, map,
                                dataServerChangeItem.getVersionMap().get(dataCenter), newVersion));
                        }
                    } else {
                        //if the dataCenter which has no dataServers is not self, remove it
                        if (!dataServerConfig.isLocalDataCenter(dataCenter)) {
                            removeDataCenter(dataCenter);
                            eventCenter.post(new RemoteDataServerChangeEvent(dataCenter,
                                Collections.EMPTY_MAP, dataServerChangeItem.getVersionMap().get(
                                    dataCenter), newVersion));
                        }
                        Map<String, DataNode> newDataNodes = dataServerCache
                            .getNewDataServerMap(dataCenter);
                        dataServerCache.updateItem(newDataNodes, newVersion, dataCenter);
                    }
                }
            } else {
                //refresh for keep connect other dataServers
                //if no DataServer has changed, reconnect to the existing DataServers
                Set<String> allDataCenter = new HashSet<>(dataServerCache.getAllDataCenters());
                for (String dataCenter : allDataCenter) {
                    Map<String, DataNode> dataNodes = dataServerCache
                        .getNewDataServerMap(dataCenter);
                    if (dataNodes != null) {
                        for (DataNode dataNode : dataNodes.values()) {
                            if (!StringUtils.equals(dataNode.getIp(), DataServerConfig.IP)) {
                                DataServerNode dataServerNode = DataServerNodeFactory
                                    .getDataServerNode(dataCenter, dataNode.getIp());
                                Connection connection = dataServerNode != null ? dataServerNode
                                    .getConnection() : null;
                                if (connection == null || !connection.isFine()) {
                                    connectDataServer(dataCenter, dataNode.getIp());
                                }
                            }
                        }
                    }
                }
            }
        }
    }

    /**
     * connect specific dataserver
     *
     * @param dataCenter
     * @param ip
     */
    private void connectDataServer(String dataCenter, String ip) {
        Connection conn = null;
        for (int tryCount = 0; tryCount < TRY_COUNT; tryCount++) {
            try {
                conn = ((BoltChannel) dataNodeExchanger.connect(new URL(ip, dataServerConfig
                    .getSyncDataPort()))).getConnection();
                break;
            } catch (Exception e) {
                TimeUtil.randomDelay(3000);
            }
        }
        if (conn == null || !conn.isFine()) {
            throw new RuntimeException(
                String.format(
                    "[DataServerChangeEventHandler] connect dataServer %s in %s failed five times,dataServer will not work,please check connect!",
                    ip, dataCenter));
        }
        //maybe get dataNode from metaServer,current has not start! register dataNode info to factory,wait for connect task next execute
        DataServerNodeFactory.register(new DataServerNode(ip, dataCenter, conn), dataServerConfig);
    }

    /**
     * remove dataCenter, and close connections of dataServers in this dataCenter
     *
     * @param dataCenter
     */
    private void removeDataCenter(String dataCenter) {
        DataServerNodeFactory.getDataServerNodes(dataCenter).values().stream()
            .map(DataServerNode::getConnection)
            .filter(connection -> connection != null && connection.isFine())
            .forEach(Connection::close);
        DataServerNodeFactory.remove(dataCenter);
    }
}
```
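connectDataServer follows a common bounded-retry pattern. It makes up to TRY_COUNT attempts, pauses for a random time between failures (TimeUtil.randomDelay(3000)), and fails hard if no healthy connection is obtained. The generic version below uses only JDK classes. `BoundedRetry` and its `Connector` interface are hypothetical names, and `IllegalStateException` stands in for the original's RuntimeException. Like the original, it stops after the first attempt that does not throw and only then checks whether the connection is healthy.

```java
import java.util.concurrent.ThreadLocalRandom;
import java.util.function.Predicate;

// Hypothetical generic form of the retry loop in connectDataServer.
final class BoundedRetry {
    private BoundedRetry() {
    }

    interface Connector<C> {
        C connect() throws Exception;
    }

    static <C> C connectWithRetry(Connector<C> connector, Predicate<C> isFine,
                                  int tryCount, long maxDelayMillis) {
        C conn = null;
        for (int attempt = 0; attempt < tryCount; attempt++) {
            try {
                conn = connector.connect();
                break; // first attempt that does not throw ends the loop, as in the original
            } catch (Exception e) {
                if (maxDelayMillis > 0) {
                    try {
                        // Random back-off in [0, maxDelayMillis) before the next attempt.
                        Thread.sleep(ThreadLocalRandom.current().nextLong(maxDelayMillis));
                    } catch (InterruptedException ie) {
                        Thread.currentThread().interrupt(); // keep interrupt status, stop retrying
                        break;
                    }
                }
            }
        }
        if (conn == null || !isFine.test(conn)) {
            throw new IllegalStateException("connect failed after " + tryCount + " attempts");
        }
        return conn;
    }
}
```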
Our logic diagram is therefore extended as follows.
DataServerChangeEvent has four sources in total.
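The first three sources are related to MetaServerChangeEvent.
Source 1: at startup, DataServerBootstrap calls eventCenter.post(new MetaServerChangeEvent(metaServerService.getMetaServerMap())); MetaServerChangeEventHandler then turns the reply from MetaServer into a DataServerChangeEvent. This corresponds to path a, 1, 3, 4 in the figure below.
Source 2: ServerChangeHandler receives a pushed NodeChangeResult. For NodeType.DATA it posts a DataServerChangeEvent directly; for NodeType.META it calls eventCenter.post(new MetaServerChangeEvent(map));. This corresponds to line 2 in the figure below.
Source 3: there is a second conversion into DataServerChangeEvent. MetaServerChangeEventHandler actively contacts MetaServer again, and if the reply is a NodeChangeResult, converts it into a DataServerChangeEvent and posts it to EventCenter. This corresponds to path b, 1, 3, 4 in the figure below.
The fourth source is periodic polling.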
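Every DataServer runs ConnectionRefreshTask, a scheduled connection-refresh task that periodically polls MetaServer for data node information.
ConnectionRefreshTask calls metaServerService.getDateServers(), which contacts MetaServer and builds a DataServerChangeItem from the response.
The task then posts DataServerChangeEvent(dataServerChangeItem, FromType.CONNECT_TASK) to EventCenter. This corresponds to line 5 in the figure below and is source 4 of DataServerChangeEvent.
Finally, once a DataServerChangeEvent is triggered, DataServerChangeEventHandler processes it. This corresponds to lines 6 and 7 in the figure below.
If the changed data center is the local one, a LocalDataServerChangeEvent is posted. SOFA mainly handles LocalDataServerChangeEvent here; the part for remote data centers is not open-sourced.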
Diagram: all sources of DataServerChangeEvent and its handling
(a) [DataServerBootstrap] startRaftClient --MetaServerChangeEvent--> EventCenter
(b) [Timer] ConnectionRefreshMetaTask --MetaServerChangeEvent--> EventCenter
(1) EventCenter receives the MetaServerChangeEvent from (a) or (b)
(c) [Push<NodeChangeResult>] --> ServerChangeHandler
(2) ServerChangeHandler --MetaServerChangeEvent / DataServerChangeEvent--> EventCenter
(3) EventCenter --MetaServerChangeEvent--> MetaServerChangeEventHandler
(4) MetaServerChangeEventHandler --DataServerChangeEvent--> EventCenter
(5) ConnectionRefreshTask --DataServerChangeEvent--> EventCenter
(6) EventCenter --DataServerChangeEvent--> DataServerChangeEventHandler
(7) DataServerChangeEventHandler --LocalDataServerChangeEvent / RemoteDataServerChangeEvent--> EventCenter
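This article explained how SOFARegistry handles Data node changes.
It mainly covered the logic from DataServerChangeEvent to LocalDataServerChangeEvent. SOFA mainly handles LocalDataServerChangeEvent here, and the part for remote data centers is not open-sourced, so the next article covers LocalDataServerChangeEvent.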