SOFARegistry 是蚂蚁金服开源的一个生产级、高时效、高可用的服务注册中心。node
本系列文章重点在于分析设计和架构,即利用多篇文章,从多个角度反推总结 DataServer 或者 SOFARegistry 的实现机制和架构思路,让你们借以学习阿里如何设计。数组
本文为第五篇,介绍SOFARegistry消息总线的异步处理。缓存
前文咱们讲述了SOFARegistry的消息总线,本文咱们讲讲一个变种 DataChangeEventCenter。网络
DataChangeEventCenter 是被独立出来的,专门处理数据变化相关的消息。session
为何要分离呢?由于:架构
DataChangeEventCenter 的独特业务场景以下:异步
所以,DataChangeEventCenter 代码和业务联系至关紧密,前文的 EventCenter 已经不适合了。ide
关于延迟和归并操做,咱们单独说明下。函数
蚂蚁金服业务的一个特色是:经过链接敏感的特性对服务宕机作到秒级发现。学习
所以 SOFARegistry 在健康检测的设计方面决定“服务数据与服务发布者的实体链接绑定在一块儿,断连立刻清数据”,简称此特色叫作链接敏感性。链接敏感性是指在 SOFARegistry 里全部 Client 都与 SessionServer 保持长链接,每条长链接都设置基于 SOFABolt 的链接心跳,若是长链接断连客户端当即发起从新建连,时刻保持 Client 与 SessionServer 之间可靠的链接。
但带来了一个问题就是:可能由于网络问题,短时间内会出现大量从新建连操做。好比只是网络问题致使链接断开,实际的服务进程没有宕机,此时客户端当即发起从新链接 SessionServer 而且从新注册全部服务数据。
可是 假如此过程耗时足够短暂(例如 500ms 内发生断连和重连),服务订阅者**应该**感觉不到服务下线。从而 SOFARegistry 内部应该作相应处理。
SOFARegistry 内部作了归并和延迟操做来保证用户不受影响。好比 DataServer 内部的数据经过 mergeDatum 延迟合并变动的 Publisher 服务信息,version 是合并后最新的版本号。
对于 DataChangeEventCenter,就是经过消息的延迟和归并来协助完成这个功能。
下面是 DataChangeEventCenter 整体的功能描述:
DataChangeEventCenter具体分红四部分:
接下来咱们一一介绍,由于 DataChangeEventCenter 和业务结合紧密,因此咱们会深刻结合业务进行讲解。
DataChangeEventCenter 中维护着一个 DataChangeEventQueue 队列数组,这是核心。数组中的每一个元素是一个事件队列。具体定义以下:
public class DataChangeEventCenter {/** * count of DataChangeEventQueue */private int queueCount;/** * queues of DataChangeEvent */private DataChangeEventQueue[] dataChangeEventQueues;@Autowiredprivate DataServerConfig dataServerConfig;@Autowiredprivate DatumCache datumCache; }复制代码
DataChangeEventCenter 专门处理 IDataChangeEvent 类型消息,其具体实现为三种:
这些不一样类型的消息能够放入同一个队列,具体放入哪一个队列,是根据特定判别方式来决定,好比根据Publisher的DataInfoId来作hash,以此决定放入哪一个Queue。
即,当对应 handler 的 onChange 方法被触发时,会计算该变化服务的 dataInfoId 的 Hash 值,从而进一步肯定出该服务注册数据所在的队列编号,进而把该变化的数据封装成一个数据变化对象,传入到队列中。
在初始化函数中,构建了EventQueue,每个Queue启动了一个线程,用来处理消息。
@PostConstructpublic void init() {if (isInited.compareAndSet(false, true)) { queueCount = dataServerConfig.getQueueCount(); dataChangeEventQueues = new DataChangeEventQueue[queueCount];for (int idx = 0; idx < queueCount; idx++) { dataChangeEventQueues[idx] = new DataChangeEventQueue(idx, dataServerConfig, this,datumCache); dataChangeEventQueues[idx].start(); } } }复制代码
put消息比较简单,具体如何判别应该把Event放入哪个Queue是根据具体方式来判断,好比根据Publisher的DataInfoId来作hash,以此决定放入哪一个Queue:
int idx = hash(publisher.getDataInfoId()); Datum datum = new Datum(publisher, dataCenter); dataChangeEventQueues[idx].onChange(new DataChangeEvent(DataChangeTypeEnum.MERGE, DataSourceTypeEnum.PUB, datum));复制代码
具体是经过 dataChangeEventQueues.onChange 来作处理,好比以下几个函数,分别处理不一样的消息类型。具体都是找到queue,而后调用:
public void onChange(Publisher publisher, String dataCenter) {int idx = hash(publisher.getDataInfoId()); Datum datum = new Datum(publisher, dataCenter);if (publisher instanceof UnPublisher) { datum.setContainsUnPub(true); }if (publisher.getPublishType() != PublishType.TEMPORARY) { dataChangeEventQueues[idx].onChange(new DataChangeEvent(DataChangeTypeEnum.MERGE, DataSourceTypeEnum.PUB, datum)); } else { dataChangeEventQueues[idx].onChange(new DataChangeEvent(DataChangeTypeEnum.MERGE, DataSourceTypeEnum.PUB_TEMP, datum)); } }public void onChange(ClientChangeEvent event) {for (DataChangeEventQueue dataChangeEventQueue : dataChangeEventQueues) { dataChangeEventQueue.onChange(event); } }public void onChange(DatumSnapshotEvent event) {for (DataChangeEventQueue dataChangeEventQueue : dataChangeEventQueues) { dataChangeEventQueue.onChange(event); } }public void sync(DataChangeTypeEnum changeType, DataSourceTypeEnum sourceType, Datum datum) {int idx = hash(datum.getDataInfoId()); DataChangeEvent event = new DataChangeEvent(changeType, sourceType, datum); dataChangeEventQueues[idx].onChange(event); }复制代码
由于 DataChangeEvent 最经常使用,因此咱们单独拿出来讲明。
DataChangeEvent会根据DataChangeTypeEnum和DataSourceTypeEnum来进行区分,就是处理类型和消息来源。
DataChangeTypeEnum具体分为:
DataSourceTypeEnum 具体分为:
具体定义以下:
public class DataChangeEvent implements IDataChangeEvent {/** * type of changed data, MERGE or COVER */private DataChangeTypeEnum changeType;private DataSourceTypeEnum sourceType;/** * data changed */private Datum datum; }复制代码
DataChangeEventQueue 是这个子模块的核心,用于多路分别处理,增长处理能力。每个Queue内部启动一个线程,用于异步处理,也能增长处理能力。
这里的核心是:
BlockingQueue eventQueue;
Map<String, Map<String, ChangeData>> CHANGE_DATA_MAP_FOR_MERGE = new ConcurrentHashMap<>();
DelayQueue CHANGE_QUEUE = new DelayQueue();
讲解以下:
具体定义以下:
public class DataChangeEventQueue {private final String name;/** * a block queue that stores all data change events */private final BlockingQueue<IDataChangeEvent> eventQueue;private final Map<String, Map<String, ChangeData>> CHANGE_DATA_MAP_FOR_MERGE = new ConcurrentHashMap<>();private final DelayQueue<ChangeData> CHANGE_QUEUE = new DelayQueue();private final int notifyIntervalMs;private final int notifyTempDataIntervalMs;private final ReentrantLock lock = new ReentrantLock();private final int queueIdx;private DataServerConfig dataServerConfig;private DataChangeEventCenter dataChangeEventCenter;private DatumCache datumCache; }复制代码
DataChangeEventQueue#start 方法在 DataChangeEventCenter 初始化的时候被一个新的线程调用,该线程会源源不断地从队列中获取新增事件,而且进行分发。新增数据会由此添加进节点内,实现分片。由于 eventQueue 是一个 BlockingQueue,因此可使用while (true)来控制。
当event被取出以后,会根据 DataChangeScopeEnum.DATUM 的不一样,会作不一样的处理。
具体代码以下:
public void start() { Executor executor = ExecutorFactory .newSingleThreadExecutor(String.format("%s_%s", DataChangeEventQueue.class.getSimpleName(), getName())); executor.execute(() -> {while (true) {try { IDataChangeEvent event = eventQueue.take(); DataChangeScopeEnum scope = event.getScope();if (scope == DataChangeScopeEnum.DATUM) { DataChangeEvent dataChangeEvent = (DataChangeEvent) event;//Temporary push data will be notify as soon as,and not merge to normal pub data;if (dataChangeEvent.getSourceType() == DataSourceTypeEnum.PUB_TEMP) { addTempChangeData(dataChangeEvent.getDatum(), dataChangeEvent.getChangeType(), dataChangeEvent.getSourceType()); } else { handleDatum(dataChangeEvent.getChangeType(), dataChangeEvent.getSourceType(), dataChangeEvent.getDatum()); } } else if (scope == DataChangeScopeEnum.CLIENT) { handleClientOff((ClientChangeEvent) event); } else if (scope == DataChangeScopeEnum.SNAPSHOT) { handleSnapshot((DatumSnapshotEvent) event); } } } }); }复制代码
具体以下图:
+----------------------------+ | DataChangeEventCenter | | | | +-----------------------+ | | | DataChangeEventQueue[]| | | +-----------------------+ | +----------------------------+ | | v +------------------+------------------------+ | DataChangeEventQueue | | | | +---------------------------------------+ | | | | | | | BlockingQueue<IDataChangeEvent> +-------------+ | | | | | | | | | +-v---------+ | | Map<String, Map<String, ChangeData<> | | <--> | | | | | | | Executor | | | | | | | | | start +------------------------------> | | | | | | +-+---------+ | | | | | | | DelayQueue<ChangeData> <-------------------+ | | | | | +---------------------------------------+ | +-------------------------------------------+复制代码
handleDatum 具体处理是把Datum转换为 ChangeData来处理,
为何要转换成 ChangeData来存储呢。
由于不管是消息处理方式或者来源,都有不一样的类型。好比在 NotifyFetchDatumHandler . fetchDatum 函数中,会先从其余 data server 获取 Datum,而后会根据 Datum 向dataChangeEventCenter中投放消息,通知本 Data Server 进行 BACKUP 操做,类型是 COVER 类型。
转换成 ChangeData就能够把消息处理方式或者来源统一块儿来处理。
用户会存储一个包含 datum 的消息。
dataChangeEventCenter.sync(DataChangeTypeEnum.COVER, DataSourceTypeEnum.BACKUP, datum);复制代码
DataChangeEventQueue 会从 DataChangeEvent 中获取 Datum,而后把 Datum 转换为 ChangeData,存储起来。
private void handleDatum(DataChangeTypeEnum changeType, DataSourceTypeEnum sourceType, Datum targetDatum) {//get changed datumChangeData changeData = getChangeData(targetDatum.getDataCenter(), targetDatum.getDataInfoId(), sourceType, changeType); Datum cacheDatum = changeData.getDatum();if (changeType == DataChangeTypeEnum.COVER || cacheDatum == null) { changeData.setDatum(targetDatum); } }复制代码
ChangeData 定义以下:
public class ChangeData implements Delayed {/** data changed */private Datum datum;/** change time */private Long gmtCreate;/** timeout */private long timeout;private DataSourceTypeEnum sourceType;private DataChangeTypeEnum changeType; }复制代码
这里是处理真实ChangeData缓存,以及新加入的Datum。
具体以下:
private void handleDatum(DataChangeTypeEnum changeType, DataSourceTypeEnum sourceType, Datum targetDatum) { lock.lock();try {//get changed datumChangeData changeData = getChangeData(targetDatum.getDataCenter(), targetDatum.getDataInfoId(), sourceType, changeType); Datum cacheDatum = changeData.getDatum();if (changeType == DataChangeTypeEnum.COVER || cacheDatum == null) { changeData.setDatum(targetDatum); } else { Map<String, Publisher> targetPubMap = targetDatum.getPubMap(); Map<String, Publisher> cachePubMap = cacheDatum.getPubMap();for (Publisher pub : targetPubMap.values()) { String registerId = pub.getRegisterId(); Publisher cachePub = cachePubMap.get(registerId);if (cachePub != null) {// if the registerTimestamp of cachePub is greater than the registerTimestamp of pub, it means// that pub is not the newest data, should be ignoredif (pub.getRegisterTimestamp() < cachePub.getRegisterTimestamp()) {continue; }// if pub and cachePub both are publisher, and sourceAddress of both are equal,// and version of cachePub is greater than version of pub, should be ignoredif (!(pub instanceof UnPublisher) && !(cachePub instanceof UnPublisher) && pub.getSourceAddress().equals(cachePub.getSourceAddress()) && cachePub.getVersion() > pub.getVersion()) {continue; } } cachePubMap.put(registerId, pub); cacheDatum.setVersion(targetDatum.getVersion()); } } } finally { lock.unlock(); } }复制代码
当提取时候,使用take函数,从CHANGE_QUEUE 和 CHANGE_DATA_MAP_FOR_MERGE 提出ChangeData。
public ChangeData take() throws InterruptedException { ChangeData changeData = CHANGE_QUEUE.take(); lock.lock();try { removeMapForMerge(changeData);return changeData; } finally { lock.unlock(); } }复制代码
具体提取Datum会在DataChangeHandler。
DataChangeHandler 会按期提取DataChangeEventCenter中的消息,而后进行处理,主要功能就是执行ChangeNotifier 来通知相关模块:hi,这里有新数据变化来到了,兄弟们走起来。
public class DataChangeHandler {@Autowiredprivate DataServerConfig dataServerConfig;@Autowiredprivate DataChangeEventCenter dataChangeEventCenter;@Autowiredprivate DatumCache datumCache;@Resourceprivate List<IDataChangeNotifier> dataChangeNotifiers; }复制代码
DataChangeHandler 会遍历 DataChangeEventCenter 中全部 DataChangeEventQueue,而后从 DataChangeEventQueue 之中取出ChangeData,针对每个ChangeData,生成一个ChangeNotifier。
每一个ChangeNotifier都是一个处理线程。
每一个 dataChangeEventQueue 生成了 5 个 ChangeNotifier。
@PostConstructpublic void start() { DataChangeEventQueue[] queues = dataChangeEventCenter.getQueues();int queueCount = queues.length; Executor executor = ExecutorFactory.newFixedThreadPool(queueCount, DataChangeHandler.class.getSimpleName()); Executor notifyExecutor = ExecutorFactory .newFixedThreadPool(dataServerConfig.getQueueCount() * 5, this.getClass().getSimpleName()); for (int idx = 0; idx < queueCount; idx++) {final DataChangeEventQueue dataChangeEventQueue = queues[idx];final String name = dataChangeEventQueue.getName(); executor.execute(() -> {while (true) { final ChangeData changeData = dataChangeEventQueue.take(); notifyExecutor.execute(new ChangeNotifier(changeData, name)); } }); } }复制代码
咱们回顾下业务:
当有数据发布者 publisher 上下线时,会分别触发 publishDataProcessor 或 unPublishDataHandler ,Handler 会往 dataChangeEventCenter 中添加一个数据变动事件,用于异步地通知事件变动中心数据的变动。事件变动中心收到该事件以后,会往队列中加入事件。此时 dataChangeEventCenter 会根据不一样的事件类型异步地对上下线数据进行相应的处理。
对于 ChangeData,会生成 ChangeNotifier 进行处理。会把这个事件变动信息经过 ChangeNotifier 对外发布,通知其余节点进行数据同步。
private class ChangeNotifier implements Runnable {private ChangeData changeData;private String name;@Overridepublic void run() {if (changeData instanceof SnapshotData) { ...... } else { Datum datum = changeData.getDatum(); String dataCenter = datum.getDataCenter(); String dataInfoId = datum.getDataInfoId(); DataSourceTypeEnum sourceType = changeData.getSourceType(); DataChangeTypeEnum changeType = changeData.getChangeType();if (changeType == DataChangeTypeEnum.MERGE && sourceType != DataSourceTypeEnum.BACKUP && sourceType != DataSourceTypeEnum.SYNC) {//update version for pub or unPub merge to cache//if the version product before merge to cache,it may be cause small version override big onedatum.updateVersion(); }long version = datum.getVersion();try {if (sourceType == DataSourceTypeEnum.CLEAN) {if (datumCache.cleanDatum(dataCenter, dataInfoId)) { ...... } } else if (sourceType == DataSourceTypeEnum.PUB_TEMP) { notifyTempPub(datum, sourceType, changeType); } else { MergeResult mergeResult = datumCache.putDatum(changeType, datum); Long lastVersion = mergeResult.getLastVersion();if (lastVersion != null&& lastVersion.longValue() == LocalDatumStorage.ERROR_DATUM_VERSION) {return; }//lastVersion null means first add datumif (lastVersion == null || version != lastVersion) {if (mergeResult.isChangeFlag()) { notify(datum, sourceType, lastVersion); } } } } } } }复制代码
notify函数会遍历dataChangeNotifiers
private void notify(Datum datum, DataSourceTypeEnum sourceType, Long lastVersion) {for (IDataChangeNotifier notifier : dataChangeNotifiers) {if (notifier.getSuitableSource().contains(sourceType)) { notifier.notify(datum, lastVersion); } } }复制代码
对应的Bean是:
@Bean(name = "dataChangeNotifiers")public List<IDataChangeNotifier> dataChangeNotifiers() { List<IDataChangeNotifier> list = new ArrayList<>(); list.add(sessionServerNotifier()); list.add(tempPublisherNotifier()); list.add(backUpNotifier());return list; }复制代码
至于如何处理通知,咱们后续会撰文处理。
至此,DataChangeEventCenter 总体逻辑以下图所示
+----------------------------+ | DataChangeEventCenter | | | | +-----------------------+ | | | DataChangeEventQueue[]| | | +-----------------------+ | +----------------------------+ | | v +------------------+------------------------+ | DataChangeEventQueue | | | | +---------------------------------------+ | | | | | | | BlockingQueue<IDataChangeEvent> +-------------+ | | | | | | | | | +-v---------+ | | Map<String, Map<String, ChangeData<> | | <--> | | | | | | | Executor | | | | | | | | | start +------------------------------> | | | | | | +-+---------+ | | | | | +----------------+ DelayQueue<ChangeData> <-------------------+ | | | | | | | +---------------------------------------+ | | +-------------------------------------------+ | | | +--------------------------+ | take | | notify +-------------------+ +-------> | DataChangeHandler | +---------> |dataChangeNotifiers| | | +-------------------+ +--------------------------+复制代码
手机以下图:
由于独特的业务场景,因此阿里把 DataChangeEventCenter 单独分离出来,知足了如下业务需求。若是你们在实际工做中有相似的需求,能够参考借鉴,具体处理方式以下: