SOFARegistry 是蚂蚁金服开源的一个生产级、高时效、高可用的服务注册中心。html
本系列文章重点在于分析设计和架构,即利用多篇文章,从多个角度反推总结 DataServer 或者 SOFARegistry 的实现机制和架构思路,让你们借以学习阿里如何设计。java
本文为第六篇,介绍SOFARegistry的存储结构,本文与业务联系密切。node
首先,咱们从 Data Server 角度出发,看看自己可能涉及的存储结构。算法
哪些须要存储。缓存
因此咱们获得以下问题须要思考:服务器
可能有些问题脱离了本文研究范畴,可是咱们也一块儿罗列在这里。网络
时间和空间之间的平衡关系能够说是计算机系统中最为本质的关系之一。session
时间和空间这一对矛盾关系在推荐系统中的典型表现,主要体如今对缓存的使用上。数据结构
缓存一般用来存储一些计算代价较高以及相对静态变化较少的数据,经常在生产者和消费者之间起到缓冲的做用,使得两者能够解耦,各自异步进行。架构
利用缓存来解耦系统,带来性能上的提高以及开发的便利,是在系统架构设计中须要掌握的一种通用的思路。
在大部分的服务注册中心系统中,每台服务器都存储着全量的服务注册数据,服务器之间经过一致性协议(paxos、Raft 等)实现数据的复制,或者采用只保障最终一致性的算法,来实现异步数据复制。这样的设计对于通常业务规模的系统来讲没有问题,而当应用于有着海量服务的庞大的业务系统来讲,就会遇到性能瓶颈。
为解决这一问题,SOFARegistry 采用了数据分片的方法。全量服务注册数据再也不保存在单机里,而是分布于每一个节点中,每台服务器保存必定量的服务注册数据,同时进行多副本备份,从理论上实现了服务无限扩容,且实现了高可用,最终达到支撑海量数据的目的。
在各类数据分片算法中,SOFARegistry 采用了业界主流的一致性 Hash 算法作数据分片,当节点动态扩缩容时,数据仍能均匀分布,维持数据的平衡。
在数据同步时,没有采用与 Dynamo、Casandra、Tair、Codis、Redis cluster 等项目中相似的预分片机制,而是在 DataServer 内存里以 dataInfoId 为粒度进行操做日志记录,这种实现方式在某种程度上也实现了“预分片”,从而保障了数据同步的有效性。
为了更好的说明数据类型,咱们只能从SOFA博客中大段摘取文字。
和
<租户 instanceId>构成,例如在 SOFARPC 的场景下,一个 dataInfoId 一般是
com.alipay.sofa.rpc.example.HelloService#@#SOFA#@#00001`,其中SOFA 是 group 名称,00001 是租户 id。group 和 instance 主要是方便对服务数据作逻辑上的切分,使不一样 group 和 instance 的服务数据在逻辑上彻底独立。模型里有 group 和 instanceId 字段,但这里不额外列出来,读者只要理解 dataInfoId 的含义便可。bolt://192.168.1.100:8080?timeout=2000
”。这里使用 dataList,表示一个 PublisherRegister 能够容许同时发布多个服务数据(可是一般只会发布一个)。关于“zone”和“scope”的概念理解,这里再举个例子。以下图所示,物理机房内有 ZoneA 和 ZoneB 两个单元,PublisherA 处于 ZoneA 里,因此发布服务时指定了 zone=ZoneA,PublisherB 处于 ZoneB 里,因此发布服务时指定了 zone=ZoneB;此时 Subscriber 订阅时指定了 scope=datacenter 级别,所以它能够获取到 PublisherA 和 PublisherB (若是 Subscriber 订阅时指定了 scope=zone 级别,那么它只能获取到 PublisherA)。
Data 层是数据服务器集群。Data 层经过分片存储的方式保存着所用应用的服务注册数据。数据按照 dataInfoId(每一份服务数据的惟一标识)进行一致性 Hash 分片,多副本备份,保证数据的高可用。
SOFARegistry 最先选择了一致性哈希分片,因此一样遇到了数据分布不固定带来的数据同步难题。咱们如何解决的呢?咱们经过在 DataServer 内存中以 dataInfoId 的粒度记录操做日志,而且在 DataServer 之间也是以 dataInfoId 的粒度去作数据同步(一个服务就由一个 dataInfoId 惟标识)。其实这种日志记录的思想和虚拟桶是一致的,只是每一个 datainfoId 就至关于一个 slot 了,这是一种因历史缘由而采起的妥协方案。在服务注册中心的场景下,datainfoId 每每对应着一个发布的服务,因此总量仍是比较有限的,以蚂蚁金服目前的规模,每台 DataServer 中承载的 dataInfoId 数量也仅在数万的级别,勉强实现了 dataInfoId 做为 slot 的数据多副本同步方案。
最终一致性
SOFARegistry 在数据存储层面采用了相似 Eureka 的最终一致性的过程,可是存储内容上和 Eureka 在每一个节点存储相同内容特性不一样,采用每一个节点上的内容按照一致性 Hash 数据分片来达到数据容量无限水平扩展能力。
SOFARegistry 是一个 AP 分布式系统,代表了在已有条件 P 的前提下,选择了 A 可用性。当数据进行同步时,获取到的数据与实际数据不一致。但由于存储的信息为服务的注册节点,尽管会有短暂的不一致产生,但对于客户端来讲,大几率仍是能从这部分数据中找到可用的节点,不会由于数据暂时的不一致对业务系统带来致命性的伤害。
集群内部数据迁移过程
SOFARegistry 的 DataServer 选择了“一致性 Hash分片”来存储数据。在“一致性 Hash分片”的基础上,为了不“分片数据不固定”这个问题,SOFARegistry 选择了在 DataServer 内存里以 dataInfoId 的粒度记录操做日志,而且在 DataServer 之间也是以 dataInfoId 的粒度去作数据同步。
图 DataServer 之间进行异步数据同步
数据和副本分别分布在不一样的节点上,进行一致性 Hash 分片,当时对主副本进行写操做以后,主副本会把数据异步地更新到其余副本中,实现了集群内部不一样副本之间的数据迁移工做。
为了肯定服务发布数据的变动,SOFA对于一个服务不只定义了服务 ID,还对一个服务 ID 定义了对应的版本信息。
服务发布数据变动主动通知到 Session 时,Session 对服务 ID 版本变动比较,高版本覆盖低版本数据,而后进行推送。
由于有了服务 ID 的版本号,Session 能够按期发起版本号比较,若是Session 存储的的服务 ID 版本号高于dataServer存储的 ,Session再次拉取新版本数据进行推送,这样避免了某次变动通知没有通知到全部订阅方的状况。
首先,咱们讲讲一些基本概念。
DataCenter表明一个物理机房。一个数据中心包括多个DataNode,这些DataNode就是同机房数据节点。
nodeList.add(new DataNode(new URL("192.168.0.1", 9632), "DefaultDataCenter")); nodeList.add(new DataNode(new URL("192.168.0.2", 9632), "DefaultDataCenter")); nodeList.add(new DataNode(new URL("192.168.0.3", 9632), "DefaultDataCenter"));
DataNode是Server节点,能够表明任意类型的Server,不管是meta,data,session。
public class DataNode implements Node, HashNode { private final URL nodeUrl; private final String nodeName; private final String dataCenter; private String regionId; private NodeStatus nodeStatus; private long registrationTimestamp; }
这是 Data Server 概念。
为何要有DataNode和DataServerNode两个相似的数据结构类型。
原来这是分属于不一样的包,或者模块。
具体定义以下:
public class DataServerNode implements HashNode { private String ip; private String dataCenter; private Connection connection; }
Publisher 是服务概念,具体以下。
public class Publisher extends BaseInfo { private List<ServerDataBox> dataList; private PublishType publishType = PublishType.NORMAL; }
Datum类定义以下,能够看到里面有一个ConcurrentHashMap,其中放入了Datum所包括的Publisher。
Publisher 只是表明发布者本身业务服务器。Datum则是从SOFARegistry总体角度作了整理,就是一个Session Server包括的服务聚合。
public class Datum implements Serializable { private String dataInfoId; private String dataCenter; private String dataId; private String instanceId; private String group; private Map<String/*registerId*/, Publisher> pubMap = new ConcurrentHashMap<>(); private long version; private boolean containsUnPub = false; }
如下是关于本Data Server服务器的数据结构。
DataNodeStatus表明自己Data Server的状态。
com.alipay.sofa.registry.server.data.node.DataNodeStatus public enum LocalServerStatusEnum { INITIAL, WORKING } public class DataNodeStatus { private volatile LocalServerStatusEnum status = LocalServerStatusEnum.INITIAL; public LocalServerStatusEnum getStatus() { return status; } public void setStatus(LocalServerStatusEnum status) { LocalServerStatusEnum originStatus = this.status; this.status = status; } }
DataServerCache . updateDataServerStatus 中有设置DataNodeStatus的状态,好比:dataNodeStatus.setStatus(LocalServerStatusEnum.WORKING)。
private void updateDataServerStatus() { if (dataNodeStatus.getStatus() != LocalServerStatusEnum.WORKING) { dataNodeStatus.setStatus(LocalServerStatusEnum.WORKING); //after working update current dataCenter list to old DataServerChangeItem updateItem( newDataServerChangeItem.getServerMap().get(dataServerConfig.getLocalDataCenter()), newVersion, dataServerConfig.getLocalDataCenter()); } }
另外 addNotWorkingServer 有 addStatus 操做。
public void addNotWorkingServer(long version, String ip) { synchronized (DataServerCache.class) { if (version >= curVersion.get()) { addStatus(version, ip, LocalServerStatusEnum.INITIAL); if (dataNodeStatus.getStatus() != LocalServerStatusEnum.WORKING) { updateDataServerStatus(); } } } }
DataServerConfig包括了本DataServer所有配置,这里只摘录了相关信息。
public class DataServerConfig { public static final String IP = NetUtil.getLocalAddress().getHostAddress(); private CommonConfig commonConfig; private int numberOfReplicas = 1000; public int getNumberOfReplicas() { return numberOfReplicas; } public String getLocalDataCenter() { return commonConfig.getLocalDataCenter(); } }
对于 DataServerConfig 和 DataNodeStatus,系统作了beans。
@Configuration protected static class DataServerBootstrapConfigConfiguration { ... @Bean @ConditionalOnMissingBean public DataServerConfig dataServerConfig(CommonConfig commonConfig) { return new DataServerConfig(commonConfig); } @Bean public DataNodeStatus dataNodeStatus() { return new DataNodeStatus(); } ... }
具体以下:
+-----------------------------------------------+ | | | [Data Server] | | | | | | +---------------+ +------------------+ | | | DataNodeStatus| | DataServerConfig | | | +---------------+ +------------------+ | | | +-----------------------------------------------+
由于不涉及到 Meta Server 内部架构,因此从 Data Server 角度看,只要存储 Meta Server 对应的网络 Connection 便可。
其中逻辑意义是:Map<dataCenter, Map<ip, Connection>>
,就是哪些dataCenter中包括哪些Meta Server,对应于哪些ip。
public class MetaServerConnectionFactory { private final Map<String, Map<String, Connection>> MAP = new ConcurrentHashMap<>(); }
具体以下:
+-----------------------------------------+ | MetaServerConnectionFactory | | | | Map<dataCenter, Map<ip, Connection> > | | | +-----------------------------------------+
由于不涉及到 Session Server 内部架构,因此从 Data Server 角度看,只要存储 Session Server 对应的网络 Connection 便可。
其中逻辑含义从注释便可了解。
public class SessionServerConnectionFactory { private static final int DELAY = 30 * 1000; private static final Map EMPTY_MAP = new HashMap(0); /** * key : SessionServer address * value: SessionServer processId */ private final Map<String, String> SESSION_CONN_PROCESS_ID_MAP = new ConcurrentHashMap<>(); /** * key : SessionServer processId * value: ip:port of clients */ private final Map<String, Set<String>> PROCESS_ID_CONNECT_ID_MAP = new ConcurrentHashMap<>(); /** * key : SessionServer processId * value: pair(SessionServer address, SessionServer connection) */ private final Map<String, Pair> PROCESS_ID_SESSION_CONN_MAP = new ConcurrentHashMap<>(); @Autowired private DisconnectEventHandler disconnectEventHandler; }
具体以下:
+------------------------------------------------------------------------------------+ |[SessionServerConnectionFactory] | | | | | | | |EMPTY_MAP | | | |Map<SessionServer address, SessionServer processId> | | | |Map<SessionServer processId, Set<ip:port of clients> > | | | |Map<SessionServer processId, pair(SessionServer address, SessionServer connection)> | | | +------------------------------------------------------------------------------------+
由于涉及到与其余 Data Server 的深度交互,因此须要对其余 Data Server 的深度信息做存储。
分为两个部分:DataServerNodeFactory和DataServerCache。
为何要有DataServerNodeFactory和DataServerCache两个相似的数据结构类型。
从注释来看,是为了把功能分离细化,DataServerNodeFactory专一链接管理,DataServerCache注重dataServer的变化与版本管理。
DataServerNodeFactory:
the factory to hold other dataservers and connection connected to them
DataServerCache
cache of dataservers
因此也分别在不一样的包,或者模块。
DataServerNodeFactory 存储了其余 Data Server 的 DataServerNode,由于 DataServerNode 自己就包括了 Connection,因此 DataServerNodeFactory 也间接的包含了 Connection,这从其类定义注释能够看出,并且其定义是在remoting.dataserver包之中。
the factory to hold other dataservers and connection connected to them
DataServerNodeFactory 里面按照两个维度存储同一类东西,就是其余DataServer :
具体定义以下:
public class DataServerNodeFactory { /** * row: dataCenter * column: ip * value dataServerNode */ private static final Map<String, Map<String, DataServerNode>> MAP = new ConcurrentHashMap<>(); /** * key: dataCenter * value: consistentHash */ private static final Map<String, ConsistentHash<DataServerNode>> CONSISTENT_HASH_MAP = new ConcurrentHashMap<>(); private static AtomicBoolean init = new AtomicBoolean(false); }
在DataServerChangeEventHandler.doHandle里面会调用connectDataServer,其又会调用 DataServerNodeFactory.register(new DataServerNode(ip, dataCenter, conn), dataServerConfig); 来添加,其里面又会生成一致性Hash。
DefaultMetaServiceImpl 会调用 DataServerNodeFactory 来计算 consistentHash 来获取 DataServerNode。
public class DefaultMetaServiceImpl implements IMetaServerService { @Override public DataServerNode getDataServer(String dataCenter, String dataInfoId) { return DataServerNodeFactory.computeDataServerNode(dataCenter, dataInfoId); } }
由注释能够知道,这是其余dataservers的缓存。
cache of dataservers
几个关键变量:
nodeStatusMap 是本 Data Center 中全部 Data Server 的状态。
具体以下:
com.alipay.sofa.registry.server.data.cache.DataServerCache public class DataServerCache { @Autowired private DataNodeStatus dataNodeStatus; @Autowired private DataServerConfig dataServerConfig; @Autowired private AfterWorkingProcessHandler afterWorkingProcessHandler; /** current dataServer list and version */ private volatile DataServerChangeItem dataServerChangeItem = new DataServerChangeItem(); /** new input dataServer list and version */ private volatile DataServerChangeItem newDataServerChangeItem = new DataServerChangeItem(); private final AtomicBoolean HAS_NOTIFY_ALL = new AtomicBoolean(false); private AtomicLong curVersion = new AtomicLong(-1L); /** version -> Map(serverIp, serverStatus) */ private Map<Long, Map<String, LocalServerStatusEnum>> nodeStatusMap = new ConcurrentHashMap<>(); }
下面介绍几个DataServerCache的函数。
updateItem会被几个不一样地方调用,进行更新dataServerChangeItem,就是插入一个new DataServer。
好比:LocalDataServerChangeEvent 和 DataServerChangeEvent 的响应函数就会调用。
public void updateItem(Map<String, DataNode> localDataNodes, Long version, String dataCenter) { synchronized (DataServerCache.class) { Long oldVersion = dataServerChangeItem.getVersionMap().get(dataCenter); Map<String, DataNode> oldList = dataServerChangeItem.getServerMap().get(dataCenter); Set<String> oldIps = oldList == null ? new HashSet<>() : oldList.keySet(); Set<String> newIps = localDataNodes == null ? new HashSet<>() : localDataNodes.keySet(); dataServerChangeItem.getServerMap().put(dataCenter, localDataNodes); dataServerChangeItem.getVersionMap().put(dataCenter, version); } }
newDataServerChangeItem 用这个来获取全部的datacenters的全部DataServer。
/** * get all datacenters * * @return */ public Set<String> getAllDataCenters() { return newDataServerChangeItem.getVersionMap().keySet(); }
dataServerChangeItem 被用来获取某一个特定 data center的全部DataServer。
public Map<String, DataNode> getDataServers(String dataCenter) { return getDataServers(dataCenter, dataServerChangeItem); } public Map<String, DataNode> getDataServers(String dataCenter, DataServerChangeItem dataServerChangeItem) { return doGetDataServers(dataCenter, dataServerChangeItem); } private Map<String, DataNode> doGetDataServers(String dataCenter, DataServerChangeItem dataServerChangeItem) { synchronized (DataServerCache.class) { Map<String, Map<String, DataNode>> dataserverMap = dataServerChangeItem.getServerMap(); if (dataserverMap.containsKey(dataCenter)) { return dataserverMap.get(dataCenter); } else { return new HashMap<>(); } } }
此数据结构实际只用来获取local data center的data servers。
/** * change info of datacenters */ public class DataServerChangeItem { /** datacenter -> Map<ip, DataNode> */ private Map<String, Map<String, DataNode>> serverMap; /** datacenter -> version */ private Map<String, Long> versionMap; }
有些类会间接使用dataServerCache。
好比:DefaultMetaServiceImpl . dataServerCache 会被NotifyOnlineHandler调用。
public class NotifyOnlineHandler extends AbstractServerHandler<NotifyOnlineRequest> { @Autowired private DataServerCache dataServerCache; @Override public Object doHandle(Channel channel, NotifyOnlineRequest request) { long version = request.getVersion(); if (version >= dataServerCache.getCurVersion()) { dataServerCache.addNotWorkingServer(version, request.getIp()); } return CommonResponse.buildSuccessResponse(); } }
NotifyOnlineHandler其配置在
@Bean(name = "serverSyncHandlers") public Collection<AbstractServerHandler> serverSyncHandlers() { Collection<AbstractServerHandler> list = new ArrayList<>(); list.add(getDataHandler()); list.add(publishDataProcessor()); list.add(unPublishDataHandler()); list.add(notifyFetchDatumHandler()); list.add(notifyOnlineHandler()); //在这里 list.add(syncDataHandler()); list.add(dataSyncServerConnectionHandler()); return list; }
属于 dataSyncServer 响应函数的一部分。
private void openDataSyncServer() { if (serverForDataSyncStarted.compareAndSet(false, true)) { dataSyncServer = boltExchange.open(new URL(NetUtil.getLocalAddress() .getHostAddress(), dataServerConfig.getSyncDataPort()), serverSyncHandlers .toArray(new ChannelHandler[serverSyncHandlers.size()])); } }
具体以下:
+---------------------------------------------------+ | | | [DataServerNodeFactory] | | | | | | Map<dataCenter, Map<ip, dataServerNode> > | | | | Map<dataCenter, ConsistentHash<DataServerNode> > | | | +---------------------------------------------------+ +---------------------------------------------------+ | [DataServerCache] | | | | | | DataServerChangeItem dataServerChangeItem | | | | DataServerChangeItem newDataServerChangeItem | | | | Map<>ersion, Map<serverIp, serverStatus> > | | | +---------------------------------------------------+
从以前的MetaServer分析以及DataServerChangeItem,可知DataSever也是有版本的。
public class DataServerChangeItem { /** datacenter -> Map<ip, DataNode> */ private Map<String, Map<String, DataNode>> serverMap; /** datacenter -> version */ private Map<String, Long> versionMap; }
在DataServer中,DefaultMetaServiceImpl中有设置版本号。
if (obj instanceof NodeChangeResult) { NodeChangeResult<DataNode> result = (NodeChangeResult<DataNode>) obj; Map<String, Long> versionMap = result.getDataCenterListVersions(); versionMap.put(result.getLocalDataCenter(), result.getVersion()); return new DataServerChangeItem(result.getNodes(), versionMap); }
其来源是MetaServer的DataStoreService。
dataNodeRepositoryMap.forEach((dataCenter, dataNodeRepository) -> { if (localDataCenter.equalsIgnoreCase(dataCenter)) { nodeChangeResult.setVersion(dataNodeRepository.getVersion()); } versionMap.put(dataCenter, dataNodeRepository.getVersion()); Map<String, RenewDecorate<DataNode>> dataMap = dataNodeRepository.getNodeMap(); Map<String, DataNode> newMap = new ConcurrentHashMap<>(); dataMap.forEach((ip, dataNode) -> newMap.put(ip, dataNode.getRenewal())); pushNodes.put(dataCenter, newMap); });
以及DataStoreService。
metaRepositoryMap.forEach((dataCenter, metaNodeRepository) -> { if (localDataCenter.equalsIgnoreCase(dataCenter)) { nodeChangeResult.setVersion(metaNodeRepository.getVersion()); } versionMap.put(dataCenter, metaNodeRepository.getVersion()); Map<String, RenewDecorate<MetaNode>> dataMap = metaNodeRepository.getNodeMap(); Map<String, MetaNode> newMap = new ConcurrentHashMap<>(); dataMap.forEach((ip, dataNode) -> newMap.put(ip, dataNode.getRenewal())); pushNodes.put(dataCenter, newMap); });
都是提取dataServer的版本号,发送出去。
服务信息包括 Subscriber 和 Publisher,这些信息须要深度存储,本文仅以 Publisher 为例分析。
前面描述了,Publisher包括在Datum之中,因此咱们下面的讲解以Datum为主。
Datum类定义以下,能够看到里面有一个ConcurrentHashMap,其中放入了Datum所包括的Publisher。
Publisher 只是表明发布者本身业务服务器。Datum则是从SOFARegistry总体角度作了整理,就是一个Session Server包括的服务聚合。
public class Datum implements Serializable { private String dataInfoId; private String dataCenter; private String dataId; private String instanceId; private String group; private Map<String/*registerId*/, Publisher> pubMap = new ConcurrentHashMap<>(); private long version; private boolean containsUnPub = false; }
DatumCache缓存了全部Datum,就是本DataServer对应全部的SessionServer中全部的服务。
public class DatumCache { @Autowired private DatumStorage localDatumStorage; }
LocalDatumStorage负责Datum具体的存储。
public class LocalDatumStorage implements DatumStorage { /** * row: dataCenter * column: dataInfoId * value: datum */ protected final Map<String, Map<String, Datum>> DATUM_MAP = new ConcurrentHashMap<>(); /** * all datum index * * row: ip:port * column: registerId * value: publisher */ protected final Map<String, Map<String, Publisher>> ALL_CONNECT_ID_INDEX = new ConcurrentHashMap<>(); @Autowired private DataServerConfig dataServerConfig; }
相关的Bean以下:
@Configuration public static class DataServerStorageConfiguration { @Bean @ConditionalOnMissingBean public DatumCache datumCache() { return new DatumCache(); } @Bean @ConditionalOnMissingBean public LocalDatumStorage localDatumStorage() { return new LocalDatumStorage(); } }
首先,咱们讲讲Session Server 内部如何获取Datum
在 Session Server 内部,Datum存储在 SessionCacheService 之中。
好比在 DataChangeFetchCloudTask 内部,能够这样获取 Datum。
private Map<String, Datum> getDatumsCache() { Map<String, Datum> map = new HashMap<>(); NodeManager nodeManager = NodeManagerFactory.getNodeManager(NodeType.META); Collection<String> dataCenters = nodeManager.getDataCenters(); if (dataCenters != null) { Collection<Key> keys = dataCenters.stream(). map(dataCenter -> new Key(KeyType.OBJ, DatumKey.class.getName(), new DatumKey(fetchDataInfoId, dataCenter))). collect(Collectors.toList()); Map<Key, Value> values = null; values = sessionCacheService.getValues(keys); if (values != null) { values.forEach((key, value) -> { if (value != null && value.getPayload() != null) { map.put(((DatumKey) key.getEntityType()).getDataCenter(), (Datum) value.getPayload()); } }); } } return map; }
Session Server 会向 Data Server 发送 PublishDataRequest 请求。
在DataServer内部,PublishDataHandler 是用来处理 PublishDataRequest。
public class PublishDataHandler extends AbstractServerHandler<PublishDataRequest> { @Autowired private ForwardService forwardService; @Autowired private SessionServerConnectionFactory sessionServerConnectionFactory; @Autowired private DataChangeEventCenter dataChangeEventCenter; @Autowired private DataServerConfig dataServerConfig; @Autowired private DatumLeaseManager datumLeaseManager; @Autowired private ThreadPoolExecutor publishProcessorExecutor; @Override public Object doHandle(Channel channel, PublishDataRequest request) { Publisher publisher = Publisher.internPublisher(request.getPublisher()); if (forwardService.needForward()) { CommonResponse response = new CommonResponse(); response.setSuccess(false); response.setMessage("Request refused, Server status is not working"); return response; } dataChangeEventCenter.onChange(publisher, dataServerConfig.getLocalDataCenter()); if (publisher.getPublishType() != PublishType.TEMPORARY) { String connectId = WordCache.getInstance().getWordCache( publisher.getSourceAddress().getAddressString()); sessionServerConnectionFactory.registerConnectId(request.getSessionServerProcessId(), connectId); // record the renew timestamp datumLeaseManager.renew(connectId); } return CommonResponse.buildSuccessResponse(); } }
在 DataChangeEventCenter 的 onChange 函数中,会进行投放。
public void onChange(Publisher publisher, String dataCenter) { int idx = hash(publisher.getDataInfoId()); Datum datum = new Datum(publisher, dataCenter); if (publisher instanceof UnPublisher) { datum.setContainsUnPub(true); } if (publisher.getPublishType() != PublishType.TEMPORARY) { dataChangeEventQueues[idx].onChange(new DataChangeEvent(DataChangeTypeEnum.MERGE, DataSourceTypeEnum.PUB, datum)); } else { dataChangeEventQueues[idx].onChange(new DataChangeEvent(DataChangeTypeEnum.MERGE, DataSourceTypeEnum.PUB_TEMP, datum)); } }
在DataChangeEventQueue之中,会调用 handleDatum 来处理。在这里对Datum进行存储。
在 DataChangeHandler 之中,会提取ChangeData,而后进行Notify。
public void start() { DataChangeEventQueue[] queues = dataChangeEventCenter.getQueues(); int queueCount = queues.length; Executor executor = ExecutorFactory.newFixedThreadPool(queueCount, DataChangeHandler.class.getSimpleName()); Executor notifyExecutor = ExecutorFactory .newFixedThreadPool(dataServerConfig.getQueueCount() * 5, this.getClass().getSimpleName()); for (int idx = 0; idx < queueCount; idx++) { final DataChangeEventQueue dataChangeEventQueue = queues[idx]; final String name = dataChangeEventQueue.getName(); executor.execute(() -> { while (true) { final ChangeData changeData = dataChangeEventQueue.take(); notifyExecutor.execute(new ChangeNotifier(changeData, name)); } }); } }
具体以下:
+ Session Server | Data Server | | | | +--------------------------+ PublishDataRequest +--------------------+ | DataChangeFetchCloudTask +---------------+-----> | PublishDataHandler | +-----------+--------------+ | +------+-------------+ ^ | | | getValues | | onChange(Publisher) | | v | | +--------+--------------+ +---------+----------+ | | DataChangeEventCenter | |sessionCacheService | | +--------+--------------+ +--------------------+ | | | | Datum | | | v | +--------+-------------+ | | DataChangeEventQueue | | +--------+-------------+ | | | | | | ChangeData | v | +-------+-----------+ | | DataChangeHandler | + +-------------------+
至此,本文总结基本存储结构以下:
物理机房DataCenter
DataCenter表明一个物理机房。一个数据中心包括多个DataNode,这些DataNode就是同机房数据节点。
Server节点DataNode
DataNode是Server节点,能够表明任意类型的Server,不管是meta,data,session。
数据节点DataServerNode
这是 Data Server 概念。
服务Publisher
Publisher 是服务概念,
服务聚合Datum
Datum里面有一个ConcurrentHashMap,其中放入了Datum所包括的Publisher。
Publisher 只是表明发布者本身业务服务器。Datum则是从SOFARegistry总体角度作了整理,就是一个Session Server包括的服务聚合。
DataNodeStatus表明自己Data Server的状态。DataServerConfig包括了本DataServer所有配置。
由于不涉及到 Meta Server 内部架构,因此从 Data Server 角度看,只要存储 Meta Server 对应的网络 Connection 便可。
由于不涉及到 Session Server 内部架构,因此从 Data Server 角度看,只要存储 Session Server 对应的网络 Connection 便可。
由于涉及到与其余 Data Server 的深度交互,因此须要对其余 Data Server 的深度信息做存储。
分为两个部分:DataServerNodeFactory 和 DataServerCache。
为何要有DataServerNodeFactory和DataServerCache两个相似的数据结构类型。
从注释来看,是为了把功能分离细化,DataServerNodeFactory专一链接管理,DataServerCache注重dataServer的变化与版本管理。
蚂蚁金服服务注册中心如何实现 DataServer 平滑扩缩容
蚂蚁金服服务注册中心 SOFARegistry 解析 | 服务发现优化之路
服务注册中心 Session 存储策略 | SOFARegistry 解析
海量数据下的注册中心 - SOFARegistry 架构介绍
服务注册中心数据分片和同步方案详解 | SOFARegistry 解析
蚂蚁金服开源通讯框架SOFABolt解析之超时控制机制及心跳机制
蚂蚁金服服务注册中心数据一致性方案分析 | SOFARegistry 解析