[从源码学设计]蚂蚁金服SOFARegistry之消息总线异步处理

0x00 摘要

SOFARegistry 是蚂蚁金服开源的一个生产级、高时效、高可用的服务注册中心。node

本系列文章重点在于分析设计和架构,即利用多篇文章,从多个角度反推总结 DataServer 或者 SOFARegistry 的实现机制和架构思路,让你们借以学习阿里如何设计。数组

本文为第五篇,介绍SOFARegistry消息总线的异步处理。缓存

0x01 为什么分离

前文咱们讲述了SOFARegistry的消息总线,本文咱们讲讲一个变种 DataChangeEventCenter。网络

DataChangeEventCenter 是被独立出来的,专门处理数据变化相关的消息。session

为何要分离呢?由于:架构

  • 从架构说,DataChangeEventCenter 是专门处理数据变化消息,这是一种解耦;
  • 从技术上来讲,DataChangeEventCenter 也和 EventCenter 有具体实现技巧的不一样,因此须要分开处理;
  • 但更深刻的缘由是业务场景不一样,下面分析中咱们能够看出,DataChangeEventCenter 和业务耦合的至关紧密;

0x02 业务领域

2.1 应用场景

DataChangeEventCenter 的独特业务场景以下:异步

  • 须要提供归并功能。即短时间内会有多个通知来到,不须要逐一处理,只处理最后一个便可;
  • 异步处理消息;
  • 须要保证消息顺序;
  • 有延迟操做;
  • 须要提升处理能力,并行处理;

所以,DataChangeEventCenter 代码和业务联系至关紧密,前文的 EventCenter 已经不适合了。ide

2.2 延迟和归并

关于延迟和归并操做,咱们单独说明下。函数

2.2.1 业务特色

蚂蚁金服业务的一个特色是:经过链接敏感的特性对服务宕机作到秒级发现。学习

所以 SOFARegistry 在健康检测的设计方面决定“服务数据与服务发布者的实体链接绑定在一块儿,断连立刻清数据”,简称此特色叫作链接敏感性。链接敏感性是指在 SOFARegistry 里全部 Client 都与 SessionServer 保持长链接,每条长链接都设置基于 SOFABolt 的链接心跳,若是长链接断连客户端当即发起从新建连,时刻保持 Client 与 SessionServer 之间可靠的链接。

2.2.2 问题

但带来了一个问题就是:可能由于网络问题,短时间内会出现大量从新建连操做。好比只是网络问题致使链接断开,实际的服务进程没有宕机,此时客户端当即发起从新链接 SessionServer 而且从新注册全部服务数据。

可是 假如此过程耗时足够短暂(例如 500ms 内发生断连和重连),服务订阅者**应该**感觉不到服务下线。从而  SOFARegistry 内部应该作相应处理。

2.2.3 解决

SOFARegistry 内部作了归并和延迟操做来保证用户不受影响。好比 DataServer 内部的数据经过 mergeDatum 延迟合并变动的 Publisher 服务信息,version 是合并后最新的版本号。

对于 DataChangeEventCenter,就是经过消息的延迟和归并来协助完成这个功能。

2.3 蚂蚁金服实现

下面是 DataChangeEventCenter 整体的功能描述:

  • 当有数据发布者 publisher 上下线时,会分别触发 publishDataProcessor 或 unPublishDataHandler;
  • Handler 首先会判断当前节点的状态:
    • 如果非工做状态则返回请求失败;
    • 如果工做状态,Handler 会往 dataChangeEventCenter 中添加一个数据变动事件,则触发数据变化事件中心 DataChangeEventCenter 的 onChange 方法。用于异步地通知事件变动中心数据的变动;
  • 事件变动中心收到该事件以后,会往队列中加入事件。此时 dataChangeEventCenter 会根据不一样的事件类型异步地对上下线数据进行相应的处理;
  • 与此同时,DataChangeHandler 会把这个事件变动信息经过 ChangeNotifier 对外发布,通知其余节点进行数据同步;

0x03 DataChangeEventCenter

3.1 总述

DataChangeEventCenter具体分红四部分:

  • Event Center:组织成消息中心;
  • Event Queue:用于多路分别处理,增长处理能力;
  • Event Task:每个Queue内部启动一个线程,用于异步处理,增长处理能力;
  • Event Handler:用于处理内部ChangeData;

接下来咱们一一介绍,由于 DataChangeEventCenter 和业务结合紧密,因此咱们会深刻结合业务进行讲解。

3.2 DataChangeEventCenter

3.2.1 定义

DataChangeEventCenter 中维护着一个 DataChangeEventQueue 队列数组,这是核心。数组中的每一个元素是一个事件队列。具体定义以下:

public class DataChangeEventCenter {/**
     * count of DataChangeEventQueue
     */private int                    queueCount;/**
     * queues of DataChangeEvent
     */private DataChangeEventQueue[] dataChangeEventQueues;@Autowiredprivate DataServerConfig       dataServerConfig;@Autowiredprivate DatumCache             datumCache;
}复制代码

3.2.2 消息类型

DataChangeEventCenter 专门处理 IDataChangeEvent 类型消息,其具体实现为三种:

  • public class ClientChangeEvent implements IDataChangeEvent
  • public class DataChangeEvent implements IDataChangeEvent
  • public class DatumSnapshotEvent implements IDataChangeEvent

这些不一样类型的消息能够放入同一个队列,具体放入哪一个队列,是根据特定判别方式来决定,好比根据Publisher的DataInfoId来作hash,以此决定放入哪一个Queue。

即,当对应 handler 的 onChange 方法被触发时,会计算该变化服务的 dataInfoId 的 Hash 值,从而进一步肯定出该服务注册数据所在的队列编号,进而把该变化的数据封装成一个数据变化对象,传入到队列中。

3.2.3 初始化

在初始化函数中,构建了EventQueue,每个Queue启动了一个线程,用来处理消息。

@PostConstructpublic void init() {if (isInited.compareAndSet(false, true)) {
        queueCount = dataServerConfig.getQueueCount();
        dataChangeEventQueues = new DataChangeEventQueue[queueCount];for (int idx = 0; idx < queueCount; idx++) {
            dataChangeEventQueues[idx] = new DataChangeEventQueue(idx, dataServerConfig, this,datumCache);
            dataChangeEventQueues[idx].start();
        }
    }
}复制代码

3.2.4 Put 消息

put消息比较简单,具体如何判别应该把Event放入哪个Queue是根据具体方式来判断,好比根据Publisher的DataInfoId来作hash,以此决定放入哪一个Queue:

int idx = hash(publisher.getDataInfoId());
Datum datum = new Datum(publisher, dataCenter);
dataChangeEventQueues[idx].onChange(new DataChangeEvent(DataChangeTypeEnum.MERGE,
                DataSourceTypeEnum.PUB, datum));复制代码

3.2.5 如何处理消息

具体是经过 dataChangeEventQueues.onChange 来作处理,好比以下几个函数,分别处理不一样的消息类型。具体都是找到queue,而后调用:

public void onChange(Publisher publisher, String dataCenter) {int idx = hash(publisher.getDataInfoId());
    Datum datum = new Datum(publisher, dataCenter);if (publisher instanceof UnPublisher) {
        datum.setContainsUnPub(true);
    }if (publisher.getPublishType() != PublishType.TEMPORARY) {
        dataChangeEventQueues[idx].onChange(new DataChangeEvent(DataChangeTypeEnum.MERGE,
            DataSourceTypeEnum.PUB, datum));
    } else {
        dataChangeEventQueues[idx].onChange(new DataChangeEvent(DataChangeTypeEnum.MERGE,
            DataSourceTypeEnum.PUB_TEMP, datum));
    }
}public void onChange(ClientChangeEvent event) {for (DataChangeEventQueue dataChangeEventQueue : dataChangeEventQueues) {
        dataChangeEventQueue.onChange(event);
    }
}public void onChange(DatumSnapshotEvent event) {for (DataChangeEventQueue dataChangeEventQueue : dataChangeEventQueues) {
        dataChangeEventQueue.onChange(event);
    }
}public void sync(DataChangeTypeEnum changeType, DataSourceTypeEnum sourceType, Datum datum) {int idx = hash(datum.getDataInfoId());
    DataChangeEvent event = new DataChangeEvent(changeType, sourceType, datum);
    dataChangeEventQueues[idx].onChange(event);
}复制代码

3.3 DataChangeEvent

由于 DataChangeEvent 最经常使用,因此咱们单独拿出来讲明。

DataChangeEvent会根据DataChangeTypeEnum和DataSourceTypeEnum来进行区分,就是处理类型和消息来源。

DataChangeTypeEnum具体分为:

  • MERGE,若是变动类型是MERGE,则会更新缓存中须要更新的新Datum,而且更新版本号;
  • COVER,若是变动类型是 COVER,则会覆盖原有的缓存;

DataSourceTypeEnum 具体分为:

  • PUB :pub by client;
  • PUB_TEMP :pub temporary data;
  • SYNC:sync from dataservers in other datacenter;
  • BACKUP:from dataservers in the same datacenter;
  • CLEAN:local dataInfo check,not belong this node schedule remove;
  • SNAPSHOT:Snapshot data, after renew finds data inconsistent;

具体定义以下:

public class DataChangeEvent implements IDataChangeEvent {/**
     * type of changed data, MERGE or COVER
     */private DataChangeTypeEnum changeType;private DataSourceTypeEnum sourceType;/**
     * data changed
     */private Datum              datum;
}复制代码

3.4 DataChangeEventQueue

DataChangeEventQueue 是这个子模块的核心,用于多路分别处理,增长处理能力。每个Queue内部启动一个线程,用于异步处理,也能增长处理能力。

3.4.1 核心变量

这里的核心是:

  • BlockingQueue      eventQueue;

  • Map<String, Map<String, ChangeData>> CHANGE_DATA_MAP_FOR_MERGE = new ConcurrentHashMap<>();

  • DelayQueue               CHANGE_QUEUE              = new DelayQueue();

讲解以下:

  • 能够看到,这里操做的数据类型是ChangeData,把Datum转换成 ChangeData 能够把消息处理方式 或者 来源统一块儿来处理;
  • eventQueue 用来存储投放的消息,全部消息block在queue上,这能够保证消息的顺序处理;
  • CHANGE_DATA_MAP_FOR_MERGE。顾名思义,主要处理消息归并。这是按照 dataCenter,dataInfoId 做为维度,分别存储 ChangeData,能够理解为一个矩阵Map,使用putIfAbsent方法添加键值对,若是map集合中没有该key对应的值,则直接添加,并返回null,若是已经存在对应的值,则依旧为原来的值。这样若是短时间内向map中添加多个消息,这样就对多余的消息作了归并;
  • CHANGE_QUEUE 的做用是用于统一处理投放的ChangeData,不管是哪一个 data center的数据,都会统一在这里处理;这里须要注意的是使用了DelayQueue来进行延迟操做,就是咱们以前业务中提到的延迟操做;

具体定义以下:

public class DataChangeEventQueue {private final String                               name;/**
     * a block queue that stores all data change events
     */private final BlockingQueue<IDataChangeEvent>      eventQueue;private final Map<String, Map<String, ChangeData>> CHANGE_DATA_MAP_FOR_MERGE = new ConcurrentHashMap<>();private final DelayQueue<ChangeData>               CHANGE_QUEUE              = new DelayQueue();private final int                                  notifyIntervalMs;private final int                                  notifyTempDataIntervalMs;private final ReentrantLock                        lock                      = new ReentrantLock();private final int                                  queueIdx;private DataServerConfig                           dataServerConfig;private DataChangeEventCenter                      dataChangeEventCenter;private DatumCache                                 datumCache;
}复制代码

3.4.2 启动和引擎

DataChangeEventQueue#start 方法在 DataChangeEventCenter 初始化的时候被一个新的线程调用,该线程会源源不断地从队列中获取新增事件,而且进行分发。新增数据会由此添加进节点内,实现分片。由于 eventQueue 是一个 BlockingQueue,因此可使用while (true)来控制。

当event被取出以后,会根据 DataChangeScopeEnum.DATUM 的不一样,会作不一样的处理。

  • 若是是DataChangeScopeEnum.DATUM,则判断dataChangeEvent.getSourceType();
    • 若是是 DataSourceTypeEnum.PUB_TEMP,则addTempChangeData,就是往CHANGE_QUEUE添加ChangeData;
    • 若是不是,则handleDatum;
  • 若是是DataChangeScopeEnum.CLIENT,则handleClientOff((ClientChangeEvent) event);
  • 若是是DataChangeScopeEnum.SNAPSHOT,则handleSnapshot((DatumSnapshotEvent) event);

具体代码以下:

public void start() {
    Executor executor = ExecutorFactory
            .newSingleThreadExecutor(String.format("%s_%s", DataChangeEventQueue.class.getSimpleName(), getName()));
    executor.execute(() -> {while (true) {try {
                IDataChangeEvent event = eventQueue.take();
                DataChangeScopeEnum scope = event.getScope();if (scope == DataChangeScopeEnum.DATUM) {
                    DataChangeEvent dataChangeEvent = (DataChangeEvent) event;//Temporary push data will be notify as soon as,and not merge to normal pub data;if (dataChangeEvent.getSourceType() == DataSourceTypeEnum.PUB_TEMP) {
                        addTempChangeData(dataChangeEvent.getDatum(), dataChangeEvent.getChangeType(),
                                dataChangeEvent.getSourceType());
                    } else {
                        handleDatum(dataChangeEvent.getChangeType(), dataChangeEvent.getSourceType(),
                                dataChangeEvent.getDatum());
                    }
                } else if (scope == DataChangeScopeEnum.CLIENT) {
                    handleClientOff((ClientChangeEvent) event);
                } else if (scope == DataChangeScopeEnum.SNAPSHOT) {
                    handleSnapshot((DatumSnapshotEvent) event);
                }
            } 
        }
    });
}复制代码

具体以下图:

      +----------------------------+
      |   DataChangeEventCenter    |
      |                            |
      | +-----------------------+  |
      | | DataChangeEventQueue[]|  |
      | +-----------------------+  |
      +----------------------------+
                   |
                   |
                   v
+------------------+------------------------+
|          DataChangeEventQueue             |
|                                           |
| +---------------------------------------+ |
| |                                       | |
| |    BlockingQueue<IDataChangeEvent> +-------------+
| |                                       | |        |
| |                                       | |      +-v---------+
| | Map<String, Map<String, ChangeData<>  | | <--> |           |
| |                                       | |      | Executor  |
| |                                       | |      |           |
| |         start +------------------------------> |           |
| |                                       | |      +-+---------+
| |                                       | |        |
| |      DelayQueue<ChangeData>  <-------------------+
| |                                       | |
| +---------------------------------------+ |
+-------------------------------------------+复制代码

3.4.3 ChangeData

handleDatum 具体处理是把Datum转换为 ChangeData来处理,

为何要转换成 ChangeData来存储呢。

由于不管是消息处理方式或者来源,都有不一样的类型。好比在 NotifyFetchDatumHandler . fetchDatum 函数中,会先从其余 data server 获取 Datum,而后会根据 Datum 向dataChangeEventCenter中投放消息,通知本 Data Server 进行 BACKUP 操做,类型是 COVER 类型。

转换成 ChangeData就能够把消息处理方式或者来源统一块儿来处理。

用户会存储一个包含 datum 的消息。

dataChangeEventCenter.sync(DataChangeTypeEnum.COVER, DataSourceTypeEnum.BACKUP, datum);复制代码

DataChangeEventQueue 会从 DataChangeEvent 中获取 Datum,而后把 Datum 转换为 ChangeData,存储起来。

private void handleDatum(DataChangeTypeEnum changeType, DataSourceTypeEnum sourceType,
                         Datum targetDatum) {//get changed datumChangeData changeData = getChangeData(targetDatum.getDataCenter(),
                targetDatum.getDataInfoId(), sourceType, changeType);
            Datum cacheDatum = changeData.getDatum();if (changeType == DataChangeTypeEnum.COVER || cacheDatum == null) {
                changeData.setDatum(targetDatum);
            } 
}复制代码

ChangeData 定义以下:

public class ChangeData implements Delayed {/** data changed */private Datum              datum;/** change time */private Long               gmtCreate;/** timeout */private long               timeout;private DataSourceTypeEnum sourceType;private DataChangeTypeEnum changeType;
}复制代码

3.4.4 处理Datum

3.4.4.1 加入Datum

这里是处理真实ChangeData缓存,以及新加入的Datum。

  • 首先从 CHANGE_DATA_MAP_FOR_MERGE  获取以前存储的变动的ChangeData,若是没有,就生成一个加入,此时要为后续可能的归并作准备;
  • 拿到ChangeData以后
    • 若是变动类型是 COVER,则会覆盖原有的缓存。changeData.setDatum(targetDatum);
    • 不然是MERGE,则会更新缓存中须要更新的新Datum,而且更新版本号;

具体以下:

private void handleDatum(DataChangeTypeEnum changeType, DataSourceTypeEnum sourceType,
                         Datum targetDatum) {
    lock.lock();try {//get changed datumChangeData changeData = getChangeData(targetDatum.getDataCenter(),
            targetDatum.getDataInfoId(), sourceType, changeType);
        Datum cacheDatum = changeData.getDatum();if (changeType == DataChangeTypeEnum.COVER || cacheDatum == null) {
            changeData.setDatum(targetDatum);
        } else {
            Map<String, Publisher> targetPubMap = targetDatum.getPubMap();
            Map<String, Publisher> cachePubMap = cacheDatum.getPubMap();for (Publisher pub : targetPubMap.values()) {
                String registerId = pub.getRegisterId();
                Publisher cachePub = cachePubMap.get(registerId);if (cachePub != null) {// if the registerTimestamp of cachePub is greater than the registerTimestamp of pub, it means// that pub is not the newest data, should be ignoredif (pub.getRegisterTimestamp() < cachePub.getRegisterTimestamp()) {continue;
                    }// if pub and cachePub both are publisher, and sourceAddress of both are equal,// and version of cachePub is greater than version of pub, should be ignoredif (!(pub instanceof UnPublisher) && !(cachePub instanceof UnPublisher)
                        && pub.getSourceAddress().equals(cachePub.getSourceAddress())
                        && cachePub.getVersion() > pub.getVersion()) {continue;
                    }
                }
                cachePubMap.put(registerId, pub);
                cacheDatum.setVersion(targetDatum.getVersion());
            }
        }
    } finally {
        lock.unlock();
    }
}复制代码
3.4.4.2 提出Datum

当提取时候,使用take函数,从CHANGE_QUEUE 和 CHANGE_DATA_MAP_FOR_MERGE 提出ChangeData。

public ChangeData take() throws InterruptedException {
    ChangeData changeData = CHANGE_QUEUE.take();
    lock.lock();try {
        removeMapForMerge(changeData);return changeData;
    } finally {
        lock.unlock();
    }
}复制代码

具体提取Datum会在DataChangeHandler。

3.5 DataChangeHandler

DataChangeHandler 会按期提取DataChangeEventCenter中的消息,而后进行处理,主要功能就是执行ChangeNotifier 来通知相关模块:hi,这里有新数据变化来到了,兄弟们走起来。

3.5.1 类定义

public class DataChangeHandler {@Autowiredprivate DataServerConfig          dataServerConfig;@Autowiredprivate DataChangeEventCenter     dataChangeEventCenter;@Autowiredprivate DatumCache                datumCache;@Resourceprivate List<IDataChangeNotifier> dataChangeNotifiers;
}复制代码

3.5.2 执行引擎ChangeNotifier

DataChangeHandler 会遍历 DataChangeEventCenter 中全部 DataChangeEventQueue,而后从 DataChangeEventQueue 之中取出ChangeData,针对每个ChangeData,生成一个ChangeNotifier。

每一个ChangeNotifier都是一个处理线程。

每一个 dataChangeEventQueue 生成了 5 个 ChangeNotifier。

@PostConstructpublic void start() {
    DataChangeEventQueue[] queues = dataChangeEventCenter.getQueues();int queueCount = queues.length;
    Executor executor = ExecutorFactory.newFixedThreadPool(queueCount, DataChangeHandler.class.getSimpleName());
    Executor notifyExecutor = ExecutorFactory
            .newFixedThreadPool(dataServerConfig.getQueueCount() * 5, this.getClass().getSimpleName());  for (int idx = 0; idx < queueCount; idx++) {final DataChangeEventQueue dataChangeEventQueue = queues[idx];final String name = dataChangeEventQueue.getName();
        executor.execute(() -> {while (true) {                 final ChangeData changeData = dataChangeEventQueue.take();
                 notifyExecutor.execute(new ChangeNotifier(changeData, name));
            }
        });
    }
}复制代码

3.5.3 Notify

咱们回顾下业务:

当有数据发布者 publisher 上下线时,会分别触发 publishDataProcessor 或 unPublishDataHandler ,Handler 会往 dataChangeEventCenter 中添加一个数据变动事件,用于异步地通知事件变动中心数据的变动。事件变动中心收到该事件以后,会往队列中加入事件。此时 dataChangeEventCenter 会根据不一样的事件类型异步地对上下线数据进行相应的处理。

对于 ChangeData,会生成 ChangeNotifier 进行处理。会把这个事件变动信息经过 ChangeNotifier 对外发布,通知其余节点进行数据同步。

private class ChangeNotifier implements Runnable {private ChangeData changeData;private String     name;@Overridepublic void run() {if (changeData instanceof SnapshotData) {
           ......
        } else {
            Datum datum = changeData.getDatum();

            String dataCenter = datum.getDataCenter();
            String dataInfoId = datum.getDataInfoId();
            DataSourceTypeEnum sourceType = changeData.getSourceType();
            DataChangeTypeEnum changeType = changeData.getChangeType();if (changeType == DataChangeTypeEnum.MERGE
                && sourceType != DataSourceTypeEnum.BACKUP
                && sourceType != DataSourceTypeEnum.SYNC) {//update version for pub or unPub merge to cache//if the version product before merge to cache,it may be cause small version override big onedatum.updateVersion();
            }long version = datum.getVersion();try {if (sourceType == DataSourceTypeEnum.CLEAN) {if (datumCache.cleanDatum(dataCenter, dataInfoId)) {
                      ......
                    }

                } else if (sourceType == DataSourceTypeEnum.PUB_TEMP) {
                    notifyTempPub(datum, sourceType, changeType);
                } else {
                    MergeResult mergeResult = datumCache.putDatum(changeType, datum);
                    Long lastVersion = mergeResult.getLastVersion();if (lastVersion != null&& lastVersion.longValue() == LocalDatumStorage.ERROR_DATUM_VERSION) {return;
                    }//lastVersion null means first add datumif (lastVersion == null || version != lastVersion) {if (mergeResult.isChangeFlag()) {
                            notify(datum, sourceType, lastVersion);
                        }
                    }
                }
            } 
        }

    }
}复制代码

notify函数会遍历dataChangeNotifiers

private void notify(Datum datum, DataSourceTypeEnum sourceType, Long lastVersion) {for (IDataChangeNotifier notifier : dataChangeNotifiers) {if (notifier.getSuitableSource().contains(sourceType)) {
            notifier.notify(datum, lastVersion);
        }
    }
}复制代码

对应的Bean是:

@Bean(name = "dataChangeNotifiers")public List<IDataChangeNotifier> dataChangeNotifiers() {
    List<IDataChangeNotifier> list = new ArrayList<>();
    list.add(sessionServerNotifier());
    list.add(tempPublisherNotifier());
    list.add(backUpNotifier());return list;
}复制代码

至于如何处理通知,咱们后续会撰文处理。

至此,DataChangeEventCenter 总体逻辑以下图所示

                +----------------------------+
                |   DataChangeEventCenter    |
                |                            |
                | +-----------------------+  |
                | | DataChangeEventQueue[]|  |
                | +-----------------------+  |
                +----------------------------+
                             |
                             |
                             v
          +------------------+------------------------+
          |          DataChangeEventQueue             |
          |                                           |
          | +---------------------------------------+ |
          | |                                       | |
          | |    BlockingQueue<IDataChangeEvent> +-------------+
          | |                                       | |        |
          | |                                       | |      +-v---------+
          | | Map<String, Map<String, ChangeData<>  | | <--> |           |
          | |                                       | |      | Executor  |
          | |                                       | |      |           |
          | |         start +------------------------------> |           |
          | |                                       | |      +-+---------+
          | |                                       | |        |
+----------------+ DelayQueue<ChangeData>  <-------------------+
|         | |                                       | |
|         | +---------------------------------------+ |
|         +-------------------------------------------+
|
|
|         +--------------------------+
|  take   |                          |    notify   +-------------------+
+-------> |    DataChangeHandler     | +---------> |dataChangeNotifiers|
          |                          |             +-------------------+
          +--------------------------+复制代码

手机以下图:

0x04 结论

由于独特的业务场景,因此阿里把 DataChangeEventCenter 单独分离出来,知足了如下业务需求。若是你们在实际工做中有相似的需求,能够参考借鉴,具体处理方式以下:

  • 须要提升处理能力,并行处理;
    • 用queue数组实现,每个Queue均可以处理消息,增长处理能力;
  • 异步处理消息;
    • 每个Queue内部启动一个线程,用于异步处理;
  • 须要保证消息顺序;
    • eventQueue 用来存储投放的消息,全部消息block在queue上,这能够保证消息的顺序处理;
  • 有延迟操做;
    • 使用了DelayQueue来进行延迟操做;
  • 须要归并操做,即短时间内会有多个通知来到,不须要逐一处理,只处理最后一个便可;
    • 使用putIfAbsent方法添加键值对,若是map集合中没有该key对应的值,则直接添加,并返回null,若是已经存在对应的值,则依旧为原来的值。这样若是短时间内向map中添加多个消息,这样就对多余的消息作了归并;
相关文章
相关标签/搜索