SOFARegistry is a production-grade, low-latency, highly available service registry open-sourced by Ant Financial.
This series focuses on design and architecture: across multiple articles and from multiple angles, we reverse-engineer and summarize the implementation mechanisms and architectural ideas of DataServer and SOFARegistry, so you can learn from how Alibaba approaches such designs.
This is the fourteenth article, covering service registration (going online) and the operation log in SOFARegistry. The previous article took the Session Server's perspective; this one takes the Data Server's perspective.
Let us first review the overall business flow; this part concerns data sharding.
Recall how service data flows internally during "one service registration".
Due to space constraints, the previous article discussed the first two points; this article covers the third and fourth.
When a service goes online, the Hash value of the new service's dataInfoId is computed to shard the service; the nearest node is then located, and the data is stored on that node.
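As a hedged illustration of this sharding step (not SOFARegistry's actual code; the class, method names, and hash function below are all made up for the sketch), a consistent-hash ring that maps a dataInfoId to the nearest node clockwise could look like:

```java
import java.util.SortedMap;
import java.util.TreeMap;

// Illustrative sketch only: pick the storage node for a dataInfoId via a
// consistent-hash ring. Names and the hash function are assumptions, not
// SOFARegistry's real implementation.
public class ConsistentHashSketch {
    private final TreeMap<Integer, String> ring = new TreeMap<>();

    public void addNode(String nodeIp) {
        ring.put(hash(nodeIp), nodeIp);
    }

    // Find the nearest node clockwise from the dataInfoId's hash position.
    public String selectNode(String dataInfoId) {
        int h = hash(dataInfoId);
        SortedMap<Integer, String> tail = ring.tailMap(h);
        return tail.isEmpty() ? ring.firstEntry().getValue()
                              : tail.get(tail.firstKey());
    }

    private static int hash(String key) {
        return key.hashCode() & 0x7fffffff; // force non-negative
    }
}
```

The same dataInfoId always maps to the same node, which is what makes the sharding stable across requests.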
When the DataServer service starts, it registers a publishDataProcessor to handle publish requests from service publishers; this publishDataProcessor is the PublishDataHandler. When a new service publisher comes online, the DataServer's PublishDataHandler is triggered.
The handler first checks the current node's status: if the node is not in the working state, the request fails. Otherwise it triggers the onChange method of the data change event center, DataChangeEventCenter.
DataChangeEventCenter maintains an array of DataChangeEventQueue instances, each element being an event queue. When the onChange method above is triggered, the Hash value of the changed service's dataInfoId is computed to determine which queue that service's registration data belongs to; the changed data is then wrapped into a data change object and put into that queue.
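The queue-selection step can be sketched as follows; the modulo reduction is an assumption about how the hash is mapped to an index, not the verbatim implementation:

```java
// Minimal sketch: route a change for a dataInfoId to one of N event queues by
// hashing, so all changes of the same service always land in the same queue.
// The modulo reduction here is an assumption, not SOFARegistry's exact code.
public class QueueIndexSketch {
    public static int queueIndexOf(String dataInfoId, int queueCount) {
        return (dataInfoId.hashCode() & 0x7fffffff) % queueCount;
    }
}
```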
The DataChangeEventQueue#start method is invoked on a new thread when DataChangeEventCenter initializes; it continuously takes new events from the queue and dispatches them. New data is thereby added into the node, implementing sharding.
Meanwhile, DataChangeHandler publishes the change event through ChangeNotifier, notifying other nodes to synchronize the data.
We first need to introduce a few related data structures.
Publisher is the information of a data publisher.
public class Publisher extends BaseInfo {
    private List<ServerDataBox> dataList;
    private PublishType publishType = PublishType.NORMAL;
}
Datum is the publisher data aggregated from SOFARegistry's own perspective. Its core is the dataInfoId, which is composed of <dataId> + <group name> + <tenant instanceId>. For example, in the SOFARPC scenario a dataInfoId is typically `com.alipay.sofa.rpc.example.HelloService#@#SOFA#@#00001`, where SOFA is the group name and 00001 is the tenant id. group and instance mainly make it convenient to partition service data logically, so that data of different groups and instances is completely independent logically. The model has group and instanceId fields, which are not listed separately here; it is enough for the reader to understand the meaning of dataInfoId. The code is as follows:
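A minimal sketch of composing and splitting a dataInfoId with the `#@#` separator, following the field order of the example above (the helper class and method names are hypothetical, not SOFARegistry API):

```java
// Hypothetical helper: build and parse a dataInfoId of the form
// dataId#@#group#@#instanceId, matching the field order of the example above.
public class DataInfoIdSketch {
    private static final String SEP = "#@#";

    public static String toDataInfoId(String dataId, String group, String instanceId) {
        return dataId + SEP + group + SEP + instanceId;
    }

    // Returns [dataId, group, instanceId]
    public static String[] parse(String dataInfoId) {
        return dataInfoId.split(SEP);
    }
}
```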
public class Datum implements Serializable {
    private String dataInfoId;
    private String dataCenter;
    private String dataId;
    private String instanceId;
    private String group;
    private Map<String/*registerId*/, Publisher> pubMap = new ConcurrentHashMap<>();
    private long version;
    private boolean containsUnPub = false;
}
DatumCache holds the latest Datum.
public class DatumCache {
    @Autowired
    private DatumStorage localDatumStorage;
}
The actual storage is done in LocalDatumStorage.
public class LocalDatumStorage implements DatumStorage {
    /**
     * row:    dataCenter
     * column: dataInfoId
     * value:  datum
     */
    protected final Map<String, Map<String, Datum>> DATUM_MAP = new ConcurrentHashMap<>();

    /**
     * all datum index
     *
     * row:    ip:port
     * column: registerId
     * value:  publisher
     */
    protected final Map<String, Map<String, Publisher>> ALL_CONNECT_ID_INDEX = new ConcurrentHashMap<>();

    @Autowired
    private DataServerConfig dataServerConfig;
}
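The two-level index above (dataCenter -> dataInfoId -> Datum) can be sketched in isolation; `computeIfAbsent` is used here for brevity, while the real code achieves the same race-free inner-map creation with `putIfAbsent`, and the value type is simplified to `String`:

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Sketch of a two-level concurrent index: dataCenter -> dataInfoId -> value.
// Simplified stand-in for DATUM_MAP; not the real LocalDatumStorage code.
public class NestedMapSketch {
    private final Map<String, Map<String, String>> datumMap = new ConcurrentHashMap<>();

    public void put(String dataCenter, String dataInfoId, String datum) {
        // computeIfAbsent creates the inner map atomically on first use
        datumMap.computeIfAbsent(dataCenter, k -> new ConcurrentHashMap<>())
                .put(dataInfoId, datum);
    }

    public String get(String dataCenter, String dataInfoId) {
        Map<String, String> inner = datumMap.get(dataCenter);
        return inner == null ? null : inner.get(dataInfoId);
    }
}
```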
An Operator is the operation corresponding to each Datum step.
public class Operator {
    private Long version;
    private Long sourceVersion;
    private Datum datum;
    private DataSourceTypeEnum sourceType;
}
Acceptor records all Datum operations. Among its members:
public class Acceptor {
    private final String dataInfoId;
    private final String dataCenter;

    private int maxBufferSize;

    static final int DEFAULT_DURATION_SECS = 30;

    private final Deque<Long/*version*/> logOperatorsOrder = new ConcurrentLinkedDeque<>();

    private Map<Long/*version*/, Operator> logOperators = new ConcurrentHashMap<>();

    private final DatumCache datumCache;
}
To summarize how these data structures relate:
Let us first review where a Datum comes from and where it goes.
First, how does the Session Server obtain a Datum internally?
Inside the Session Server, Datum is stored in SessionCacheService.
For example, inside DataChangeFetchCloudTask, a Datum can be obtained like this:
private Map<String, Datum> getDatumsCache() {
    Map<String, Datum> map = new HashMap<>();
    NodeManager nodeManager = NodeManagerFactory.getNodeManager(NodeType.META);
    Collection<String> dataCenters = nodeManager.getDataCenters();
    if (dataCenters != null) {
        Collection<Key> keys = dataCenters.stream()
            .map(dataCenter -> new Key(KeyType.OBJ, DatumKey.class.getName(),
                new DatumKey(fetchDataInfoId, dataCenter)))
            .collect(Collectors.toList());

        Map<Key, Value> values = sessionCacheService.getValues(keys);
        if (values != null) {
            values.forEach((key, value) -> {
                if (value != null && value.getPayload() != null) {
                    map.put(((DatumKey) key.getEntityType()).getDataCenter(),
                        (Datum) value.getPayload());
                }
            });
        }
    }
    return map;
}
The Session Server sends a PublishDataRequest to the Data Server.
Inside the DataServer, PublishDataHandler handles the PublishDataRequest.
public class PublishDataHandler extends AbstractServerHandler<PublishDataRequest> {
    @Autowired
    private ForwardService forwardService;

    @Autowired
    private SessionServerConnectionFactory sessionServerConnectionFactory;

    @Autowired
    private DataChangeEventCenter dataChangeEventCenter;

    @Autowired
    private DataServerConfig dataServerConfig;

    @Autowired
    private DatumLeaseManager datumLeaseManager;

    @Autowired
    private ThreadPoolExecutor publishProcessorExecutor;

    @Override
    public Object doHandle(Channel channel, PublishDataRequest request) {
        Publisher publisher = Publisher.internPublisher(request.getPublisher());
        if (forwardService.needForward()) {
            CommonResponse response = new CommonResponse();
            response.setSuccess(false);
            response.setMessage("Request refused, Server status is not working");
            return response;
        }

        dataChangeEventCenter.onChange(publisher, dataServerConfig.getLocalDataCenter());
        if (publisher.getPublishType() != PublishType.TEMPORARY) {
            String connectId = WordCache.getInstance().getWordCache(
                publisher.getSourceAddress().getAddressString());
            sessionServerConnectionFactory.registerConnectId(request.getSessionServerProcessId(),
                connectId);
            // record the renew timestamp
            datumLeaseManager.renew(connectId);
        }

        return CommonResponse.buildSuccessResponse();
    }
}
In DataChangeEventCenter's onChange function, the event is dispatched into a queue.
public void onChange(Publisher publisher, String dataCenter) {
    int idx = hash(publisher.getDataInfoId());
    Datum datum = new Datum(publisher, dataCenter);
    if (publisher instanceof UnPublisher) {
        datum.setContainsUnPub(true);
    }
    if (publisher.getPublishType() != PublishType.TEMPORARY) {
        dataChangeEventQueues[idx].onChange(new DataChangeEvent(DataChangeTypeEnum.MERGE,
            DataSourceTypeEnum.PUB, datum));
    } else {
        dataChangeEventQueues[idx].onChange(new DataChangeEvent(DataChangeTypeEnum.MERGE,
            DataSourceTypeEnum.PUB_TEMP, datum));
    }
}
Within DataChangeEventQueue, handleDatum is called for processing; this is where the Datum gets stored.
Within DataChangeHandler, the ChangeData is extracted and then notified.
public void start() {
    DataChangeEventQueue[] queues = dataChangeEventCenter.getQueues();
    int queueCount = queues.length;
    Executor executor = ExecutorFactory.newFixedThreadPool(queueCount,
        DataChangeHandler.class.getSimpleName());
    Executor notifyExecutor = ExecutorFactory.newFixedThreadPool(
        dataServerConfig.getQueueCount() * 5, this.getClass().getSimpleName());
    for (int idx = 0; idx < queueCount; idx++) {
        final DataChangeEventQueue dataChangeEventQueue = queues[idx];
        final String name = dataChangeEventQueue.getName();
        executor.execute(() -> {
            while (true) {
                final ChangeData changeData = dataChangeEventQueue.take();
                notifyExecutor.execute(new ChangeNotifier(changeData, name));
            }
        });
    }
}
The flow is as follows:
 Session Server                                  Data Server

 DataChangeFetchCloudTask --PublishDataRequest--> PublishDataHandler
          |                                              |
          | getValues                                    | onChange(Publisher)
          v                                              v
 sessionCacheService                             DataChangeEventCenter
                                                         | Datum
                                                         v
                                                 DataChangeEventQueue
                                                         | ChangeData
                                                         v
                                                  DataChangeHandler
So we continue with the DataChangeHandler processing, i.e., as stated in the overview: DataChangeHandler publishes this change event information.
Below we start from the first part, turning the change event into an Operator and putting it into AbstractAcceptorStore, to explain the log operations.
As shown in the figure:
 Session Server                                  Data Server

 DataChangeFetchCloudTask --PublishDataRequest--> PublishDataHandler
          |                                              |
          | getValues                                    | onChange(Publisher)
          v                                              v
 sessionCacheService                             DataChangeEventCenter
                                                         | Datum
                                                         v
                                                 DataChangeEventQueue
                                                         | ChangeData
                                                         v
                                                  DataChangeHandler
                                                         |
                                                         v
                                                   ChangeNotifier
                                                         |
                                                         v
                                               AbstractAcceptorStore
Who calls the Acceptor's appendOperator? The Notifiers do, for example:
public class BackUpNotifier implements IDataChangeNotifier {
    @Autowired
    private SyncDataService syncDataService;

    @Override
    public void notify(Datum datum, Long lastVersion) {
        syncDataService.appendOperator(new Operator(datum.getVersion(), lastVersion, datum,
            DataSourceTypeEnum.BACKUP));
    }
}
And another one:
public class SnapshotBackUpNotifier implements IDataChangeNotifier {
    @Autowired
    private SyncDataService syncDataService;

    @Override
    public void notify(Datum datum, Long lastVersion) {
        syncDataService.appendOperator(new SnapshotOperator(datum.getVersion(), lastVersion,
            datum, DataSourceTypeEnum.BACKUP));
    }
}
AbstractAcceptorStore is the log store; we analyze it in detail below.
For the operation information, a bean is provided as the store.
@Bean
public AcceptorStore localAcceptorStore() {
    return new LocalAcceptorStore();
}
StoreServiceFactory keeps the various AcceptorStore instances in storeServiceMap; currently LocalAcceptorStore is the only one.
public class StoreServiceFactory implements ApplicationContextAware {

    private static Map<String/*supportType*/, AcceptorStore> storeServiceMap = new HashMap<>();

    /**
     * get AcceptorStore by storeType
     * @param storeType
     * @return
     */
    public static AcceptorStore getStoreService(String storeType) {
        return storeServiceMap.get(storeType);
    }

    @Override
    public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
        Map<String, AcceptorStore> map = applicationContext.getBeansOfType(AcceptorStore.class);
        map.forEach((key, value) -> storeServiceMap.put(value.getType(), value));
    }
}
AbstractAcceptorStore is the base implementation of the store. Its main members are:
acceptors: a matrix keyed by dataCenter and dataInfoId, storing the Acceptor for each such pair; that is, for every combination of dataCenter and dataInfoId there is one Acceptor, which stores the Operators under it.
notifyAcceptorsCache: a matrix keyed by dataCenter and dataInfoId, caching the Acceptors under each pair that need further processing.
delayQueue: used together with notifyAcceptorsCache. For each new acceptor added to notifyAcceptorsCache, a message is put into this queue; when the delay expires, the message is taken out, the corresponding acceptor is fetched from notifyAcceptorsCache, and it is processed accordingly.
Normally, when the queue dequeues, the cache has an entry to take out; if several entries for the same key were put into the cache in the meantime, the existing cache value is merely replaced, and it is still taken out once the delay expires.
notifyAcceptorsCache is likewise organized per data center, and is only drained periodically via removeCache.
public abstract class AbstractAcceptorStore implements AcceptorStore {
    private static final int DEFAULT_MAX_BUFFER_SIZE = 30;

    @Autowired
    protected IMetaServerService metaServerService;

    @Autowired
    private Exchange boltExchange;

    @Autowired
    private DataServerConfig dataServerConfig;

    @Autowired
    private DataServerConnectionFactory dataServerConnectionFactory;

    @Autowired
    private DatumCache datumCache;

    private Map<String/*dataCenter*/, Map<String/*dataInfoId*/, Acceptor>> acceptors = new ConcurrentHashMap<>();

    private Map<String/*dataCenter*/, Map<String/*dataInfoId*/, Acceptor>> notifyAcceptorsCache = new ConcurrentHashMap<>();

    private DelayQueue<DelayItem<Acceptor>> delayQueue = new DelayQueue<>();
}
As shown in the figure below:
[AbstractAcceptorStore]
 |
 |-- acceptors ------------------> dataCenter --> dataInfoId --> Acceptor --> Map<version, Operator>
 |
 +-- notifyAcceptorsCache -------> dataCenter --> dataInfoId --> Acceptor --> Map<version, Operator>
One point needs explanation: why does delayQueue need to be a delay queue? This stems from SOFA's "second-level service on/offline notification" feature.
Implementing that feature raises a connection-sensitivity issue: in SOFARegistry, all Clients keep long-lived connections to the SessionServer, each with bolt-based connection heartbeats; if a connection breaks, the Client immediately reconnects, ensuring a reliable connection between Client and SessionServer at all times.
Because of this strong connection sensitivity, if a connection is broken merely by a network problem while the process itself has not crashed, the Client will immediately reconnect to the SessionServer and re-register all service data. A flood of such brief offline-then-online transitions would confuse and burden users.
Therefore the DataServer implements delayed data merging internally, which is precisely this DelayQueue.
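A minimal sketch of the delay idea with `java.util.concurrent.DelayQueue` (the `DelayItem` below is a simplified stand-in for SOFARegistry's own class, not the real one): an element only becomes takeable after its delay elapses, which is what gives changes arriving within the window time to merge before notification.

```java
import java.util.concurrent.DelayQueue;
import java.util.concurrent.Delayed;
import java.util.concurrent.TimeUnit;

// Sketch of the delay-merge idea. DelayItem here is a hypothetical stand-in
// for SOFARegistry's DelayItem; only the Delayed contract matters.
public class DelaySketch {
    static class DelayItem implements Delayed {
        final String payload;
        final long expireAtNanos;

        DelayItem(String payload, long delayMs) {
            this.payload = payload;
            this.expireAtNanos = System.nanoTime() + TimeUnit.MILLISECONDS.toNanos(delayMs);
        }

        @Override
        public long getDelay(TimeUnit unit) {
            return unit.convert(expireAtNanos - System.nanoTime(), TimeUnit.NANOSECONDS);
        }

        @Override
        public int compareTo(Delayed o) {
            return Long.compare(getDelay(TimeUnit.NANOSECONDS), o.getDelay(TimeUnit.NANOSECONDS));
        }
    }

    public static String takeAfterDelay(long delayMs) throws InterruptedException {
        DelayQueue<DelayItem> queue = new DelayQueue<>();
        queue.add(new DelayItem("acceptor-1", delayMs));
        return queue.take().payload; // take() blocks until the delay elapses
    }
}
```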
The basic logic of addOperator is:
The operations all use putIfAbsent, so if several identical values are inserted within a short period, the existing value is not replaced; this achieves merging.
@Override
public void addOperator(Operator operator) {
    Datum datum = operator.getDatum();
    String dataCenter = datum.getDataCenter();
    String dataInfoId = datum.getDataInfoId();
    try {
        Map<String/*dataInfoId*/, Acceptor> acceptorMap = acceptors.get(dataCenter);
        if (acceptorMap == null) {
            Map<String/*dataInfoId*/, Acceptor> newMap = new ConcurrentHashMap<>();
            acceptorMap = acceptors.putIfAbsent(dataCenter, newMap);
            if (acceptorMap == null) {
                acceptorMap = newMap;
            }
        }

        Acceptor existAcceptor = acceptorMap.get(dataInfoId);
        if (existAcceptor == null) {
            Acceptor newAcceptor = new Acceptor(DEFAULT_MAX_BUFFER_SIZE, dataInfoId, dataCenter,
                datumCache);
            existAcceptor = acceptorMap.putIfAbsent(dataInfoId, newAcceptor);
            if (existAcceptor == null) {
                existAcceptor = newAcceptor;
            }
        }

        if (operator instanceof SnapshotOperator) {
            //snapshot: clear the queue, make other data retrieve the latest memory data
            existAcceptor.clearBefore();
        } else {
            existAcceptor.appendOperator(operator);
        }

        //put cache
        putCache(existAcceptor);
    } catch (Exception e) {
        LOGGER.error("Append operator error!", e); // catch block elided in the original excerpt
    }
}
The role of putCache is as follows.
It also uses putIfAbsent, so several identical values inserted within a short period do not replace the existing value, again achieving merging.
private void putCache(Acceptor acceptor) {
    String dataCenter = acceptor.getDataCenter();
    String dataInfoId = acceptor.getDataInfoId();
    try {
        Map<String/*dataInfoId*/, Acceptor> acceptorMap = notifyAcceptorsCache.get(dataCenter);
        if (acceptorMap == null) {
            Map<String/*dataInfoId*/, Acceptor> newMap = new ConcurrentHashMap<>();
            acceptorMap = notifyAcceptorsCache.putIfAbsent(dataCenter, newMap);
            if (acceptorMap == null) {
                acceptorMap = newMap;
            }
        }
        Acceptor existAcceptor = acceptorMap.putIfAbsent(dataInfoId, acceptor);
        if (existAcceptor == null) {
            addQueue(acceptor);
        }
    } catch (Exception e) {
        LOGGER.error("Put acceptor cache error!", e); // catch block elided in the original excerpt
    }
}
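The merging effect of `putIfAbsent` described above can be demonstrated in isolation; the counter below stands in for `addQueue(acceptor)` and is purely illustrative, not SOFARegistry code:

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

// Sketch: only the first cache insert for a dataInfoId schedules a delayed
// notification; repeated inserts within the window find the existing entry
// and schedule nothing, so many changes collapse into one notification.
public class PutIfAbsentSketch {
    private final ConcurrentMap<String, String> cache = new ConcurrentHashMap<>();
    private int enqueued = 0; // stands in for addQueue(acceptor)

    public void putCache(String dataInfoId, String acceptor) {
        // putIfAbsent returns null only when no mapping existed before
        if (cache.putIfAbsent(dataInfoId, acceptor) == null) {
            enqueued++;
        }
    }

    public int enqueuedCount() {
        return enqueued;
    }
}
```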
The actual consumption happens in scheduled tasks. The purpose of consuming the log is to synchronize the log operations to other DataServers.
The Scheduler class runs the periodic tasks; it starts two thread pools that periodically invoke the AcceptorStore's functions.
public class Scheduler {

    private final ScheduledExecutorService scheduler;

    public final ExecutorService versionCheckExecutor;

    private final ThreadPoolExecutor expireCheckExecutor;

    @Autowired
    private AcceptorStore localAcceptorStore;

    public Scheduler() {
        scheduler = new ScheduledThreadPoolExecutor(4, new NamedThreadFactory("SyncDataScheduler"));

        expireCheckExecutor = new ThreadPoolExecutor(1, 3, 0, TimeUnit.SECONDS,
            new SynchronousQueue<>(), new NamedThreadFactory("SyncDataScheduler-expireChangeCheck"));

        versionCheckExecutor = new ThreadPoolExecutor(2, 2, 0L, TimeUnit.MILLISECONDS,
            new LinkedBlockingQueue<>(), new NamedThreadFactory(
                "SyncDataScheduler-versionChangeCheck"));
    }

    public void startScheduler() {
        scheduler.schedule(
            new TimedSupervisorTask("FetchDataLocal", scheduler, expireCheckExecutor, 3,
                TimeUnit.SECONDS, 10, () -> localAcceptorStore.checkAcceptorsChangAndExpired()),
            30, TimeUnit.SECONDS);

        versionCheckExecutor.execute(() -> localAcceptorStore.changeDataCheck());
    }
}
The functions in AbstractAcceptorStore are as follows.
changeDataCheck is internally a while(true) loop, so no additional thread pool is needed.
changeDataCheck is bound to the delayQueue: when a new message arrives it takes out the Acceptor, also removes the Acceptor from notifyAcceptorsCache, and calls notifyChange(acceptor) to process it.
@Override
public void changeDataCheck() {
    while (true) {
        try {
            DelayItem<Acceptor> delayItem = delayQueue.take();
            Acceptor acceptor = delayItem.getItem();
            removeCache(acceptor); // compare and remove
        } catch (InterruptedException e) {
            break;
        } catch (Throwable e) {
            LOGGER.error(e.getMessage(), e);
        }
    }
}
Consuming the cache is done via removeCache.
private void removeCache(Acceptor acceptor) {
    String dataCenter = acceptor.getDataCenter();
    String dataInfoId = acceptor.getDataInfoId();
    try {
        Map<String/*dataInfoId*/, Acceptor> acceptorMap = notifyAcceptorsCache.get(dataCenter);
        if (acceptorMap != null) {
            boolean result = acceptorMap.remove(dataInfoId, acceptor);
            if (result) {
                //data change notify
                notifyChange(acceptor);
            }
        }
    } catch (Exception e) {
        LOGGER.error("Remove acceptor cache error!", e); // catch block elided in the original excerpt
    }
}
Inside removeCache, notifyChange performs the notification; its logic is:
private void notifyChange(Acceptor acceptor) {
    Long lastVersion = acceptor.getLastVersion();
    NotifyDataSyncRequest request = new NotifyDataSyncRequest(acceptor.getDataInfoId(),
        acceptor.getDataCenter(), lastVersion, getType());
    List<String> targetDataIps = getTargetDataIp(acceptor.getDataInfoId());
    for (String targetDataIp : targetDataIps) {
        if (DataServerConfig.IP.equals(targetDataIp)) {
            continue;
        }
        Server syncServer = boltExchange.getServer(dataServerConfig.getSyncDataPort());
        for (int tryCount = 0; tryCount < dataServerConfig.getDataSyncNotifyRetry(); tryCount++) {
            try {
                Connection connection = dataServerConnectionFactory.getConnection(targetDataIp);
                if (connection == null) {
                    TimeUtil.randomDelay(1000);
                    continue;
                }
                syncServer.sendSync(syncServer.getChannel(connection.getRemoteAddress()),
                    request, 1000);
                break;
            } catch (Exception e) {
                LOGGER.error("Notify data sync error!", e); // catch block elided in the original excerpt
            }
        }
    }
}
The call chain here is: versionCheckExecutor.execute -> localAcceptorStore.changeDataCheck -> removeCache -> notifyChange -> NotifyDataSyncRequest.
As shown in the figure below:
+----------------------+     +--------------------------------------------------------+
| versionCheckExecutor |     | [AbstractAcceptorStore]                                |
+----------+-----------+     |                                                        |
           |                 |  Map<dataCenter, Map<dataInfoId, Acceptor>> acceptors  |
           | changeDataCheck |                                                        |
           +---------------->|  Map<dataCenter, Map<dataInfoId, Acceptor>>            |
             removeCache /   |                        notifyAcceptorsCache            |
             notifyChange    +---------------------------+----------------------------+
                                                         |
                                                         | NotifyDataSyncRequest
                                                         v
                                              +----------+--------+
                                              | Other DataServer  |
                                              +-------------------+
checkAcceptorsChangAndExpired traverses every acceptor in acceptors, checks whether it has expired entries, and processes them.
@Override
public void checkAcceptorsChangAndExpired() {
    acceptors.forEach((dataCenter, acceptorMap) -> {
        if (acceptorMap != null && !acceptorMap.isEmpty()) {
            acceptorMap.forEach((dataInfoId, acceptor) -> acceptor.checkExpired(0));
        }
    });
}
At this point, the logic is:
+----------------------+     +--------------------------------------------------------+     +---------------------+
| versionCheckExecutor |     | [AbstractAcceptorStore]                                |     | expireCheckExecutor |
+----------+-----------+     |                                                        |     +----------+----------+
           |                 |  Map<dataCenter, Map<dataInfoId, Acceptor>>            |                |
           |                 |                  acceptors <---------------------------+----------------+
           | changeDataCheck |                                                        |  checkAcceptorsChangAndExpired
           +---------------->|  Map<dataCenter, Map<dataInfoId, Acceptor>>            |
             removeCache /   |                  notifyAcceptorsCache                  |
             notifyChange    +---------------------------+----------------------------+
                                                         |
                                                         | NotifyDataSyncRequest
                                                         v
                                              +----------+--------+
                                              | Other DataServer  |
                                              +-------------------+
The Acceptor records the log, i.e., all Datum operations.
The operation log is stored as a queue. When fetching the log, the position of the current version number in the queue is used to synchronize and apply all operation logs after that version.
public class Acceptor {

    private final String dataInfoId;
    private final String dataCenter;

    private int maxBufferSize;

    static final int DEFAULT_DURATION_SECS = 30;

    private final Deque<Long/*version*/> logOperatorsOrder = new ConcurrentLinkedDeque<>();

    private Map<Long/*version*/, Operator> logOperators = new ConcurrentHashMap<>();

    private final DatumCache datumCache;
}
The key fields are shown above: logOperatorsOrder keeps the version ordering, and logOperators maps each version to its Operator.
An Operator is the Datum corresponding to each operation step.
public class Operator {
    private Long version;
    private Long sourceVersion;
    private Datum datum;
    private DataSourceTypeEnum sourceType;
}
This function appends an operation log entry.
The code is as follows:
/**
 * append operator to queue; if queue is full, poll the first element and append.
 * Process will check version sequence: it must append with a consequent increase in version,
 * otherwise queue will be cleaned
 *
 * @param operator
 */
public void appendOperator(Operator operator) {
    write.lock();
    try {
        if (isFull()) {
            logOperators.remove(logOperatorsOrder.poll());
        }
        if (operator.getSourceVersion() == null) {
            operator.setSourceVersion(0L);
        }
        Long tailVersion = logOperatorsOrder.peekLast();
        if (tailVersion != null) {
            //operation add not by solid sequence
            if (tailVersion.longValue() != operator.getSourceVersion().longValue()) {
                clearBefore();
            }
        }
        Operator previousOperator = logOperators.put(operator.getVersion(), operator);
        if (previousOperator == null) {
            logOperatorsOrder.add(operator.getVersion());
        }
    } finally {
        write.unlock();
    }
}
Who calls appendOperator? The Notifiers do, for example:
public class BackUpNotifier implements IDataChangeNotifier {
    @Autowired
    private SyncDataService syncDataService;

    @Override
    public void notify(Datum datum, Long lastVersion) {
        syncDataService.appendOperator(new Operator(datum.getVersion(), lastVersion, datum,
            DataSourceTypeEnum.BACKUP));
    }
}
And:
public class SnapshotBackUpNotifier implements IDataChangeNotifier {
    @Autowired
    private SyncDataService syncDataService;

    @Override
    public void notify(Datum datum, Long lastVersion) {
        syncDataService.appendOperator(new SnapshotOperator(datum.getVersion(), lastVersion,
            datum, DataSourceTypeEnum.BACKUP));
    }
}
This method removes expired log entries. The version is a timestamp, so we can check periodically and remove entries that have expired.
public void checkExpired(int durationSEC) {
    write.lock();
    try {
        //check all expired
        Long peekVersion = logOperatorsOrder.peek();
        if (peekVersion != null && isExpired(durationSEC, peekVersion)) {
            logOperators.remove(logOperatorsOrder.poll());
            checkExpired(durationSEC);
        }
    } finally {
        write.unlock();
    }
}
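Assuming versions are millisecond timestamps as stated above, the expiry predicate that checkExpired relies on boils down to the following sketch (the helper name `isExpired` mirrors the call above, but this body is an illustrative assumption, not the real implementation):

```java
import java.util.concurrent.TimeUnit;

// Sketch of the expiry test: an operator is expired when its version
// (a millisecond timestamp) is older than the configured duration.
public class ExpirySketch {
    public static boolean isExpired(long nowMillis, long versionMillis, int durationSecs) {
        return nowMillis - versionMillis > TimeUnit.SECONDS.toMillis(durationSecs);
    }
}
```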
This request notifies the receiving side to synchronize data.
Recall the call chain for this part: versionCheckExecutor.execute -> localAcceptorStore.changeDataCheck -> removeCache -> notifyChange -> NotifyDataSyncRequest.
The receiving data server handles it with NotifyDataSyncHandler.
public class NotifyDataSyncHandler extends AbstractClientHandler<NotifyDataSyncRequest>
    implements AfterWorkingProcess {

    @Autowired
    private DataServerConfig dataServerConfig;

    @Autowired
    private GetSyncDataHandler getSyncDataHandler;

    @Autowired
    private DataChangeEventCenter dataChangeEventCenter;

    private Executor executor = ExecutorFactory.newFixedThreadPool(10,
        NotifyDataSyncHandler.class.getSimpleName());

    private ThreadPoolExecutor notifyExecutor;

    @Autowired
    private DataNodeStatus dataNodeStatus;

    @Autowired
    private DatumCache datumCache;
}
The doHandle method continues the processing.
@Override
public Object doHandle(Channel channel, NotifyDataSyncRequest request) {
    final Connection connection = ((BoltChannel) channel).getConnection();
    if (dataNodeStatus.getStatus() != LocalServerStatusEnum.WORKING) {
        noWorkQueue.add(new SyncDataRequestForWorking(connection, request));
        return CommonResponse.buildSuccessResponse();
    }
    executorRequest(connection, request);
    return CommonResponse.buildSuccessResponse();
}
Having received the sync notification NotifyDataSyncRequest from the originating DataServer, the receiving DataServer actively pulls the data to synchronize, i.e., it calls GetSyncDataHandler to send a SyncDataRequest.
private void executorRequest(Connection connection, NotifyDataSyncRequest request) {
    executor.execute(() -> fetchSyncData(connection, request));
}

protected void fetchSyncData(Connection connection, NotifyDataSyncRequest request) {
    String dataInfoId = request.getDataInfoId();
    String dataCenter = request.getDataCenter();
    Datum datum = datumCache.get(dataCenter, dataInfoId);
    Long version = (datum == null) ? null : datum.getVersion();
    Long requestVersion = request.getVersion();

    if (version == null || requestVersion == 0L || version < requestVersion) {
        getSyncDataHandler.syncData(new SyncDataCallback(getSyncDataHandler, connection,
            new SyncDataRequest(dataInfoId, dataCenter, version, request.getDataSourceType()),
            dataChangeEventCenter));
    }
}
GetSyncDataHandler and SyncDataCallback work together.
That is, GetSyncDataHandler sends the SyncDataRequest, and SyncDataCallback receives the sync result.
├── remoting
│   ├── dataserver
│   │   ├── DataServerConnectionFactory.java
│   │   ├── DataServerNodeFactory.java
│   │   ├── GetSyncDataHandler.java
│   │   ├── SyncDataCallback.java
│   │   ├── handler
│   │   └── task
The placement of the two helper classes GetSyncDataHandler and SyncDataCallback is a bit odd; presumably, as functional classes, they were put under the dataserver directory. Personally I think a dedicated directory might be better.
public class GetSyncDataHandler {
    @Autowired
    private DataNodeExchanger dataNodeExchanger;

    public void syncData(SyncDataCallback callback) {
        int tryCount = callback.getRetryCount();
        if (tryCount > 0) {
            try {
                callback.setRetryCount(--tryCount);
                dataNodeExchanger.request(new Request() {
                    @Override
                    public Object getRequestBody() {
                        return callback.getRequest();
                    }

                    @Override
                    public URL getRequestUrl() {
                        return new URL(callback.getConnection().getRemoteIP(),
                            callback.getConnection().getRemotePort());
                    }

                    @Override
                    public CallbackHandler getCallBackHandler() {
                        return new CallbackHandler() {
                            @Override
                            public void onCallback(Channel channel, Object message) {
                                callback.onResponse(message);
                            }

                            @Override
                            public void onException(Channel channel, Throwable exception) {
                                callback.onException(exception);
                            }

                            @Override
                            public Executor getExecutor() {
                                return callback.getExecutor();
                            }
                        };
                    }
                });
            } catch (Exception e) {
                LOGGER.error("Sync data request error!", e); // catch block elided in the original excerpt
            }
        }
    }
}
The sync result is received here.
public class SyncDataCallback implements InvokeCallback {

    private static final Executor EXECUTOR = ExecutorFactory.newFixedThreadPool(5,
        SyncDataCallback.class.getSimpleName());

    private static final int RETRY_COUNT = 3;

    private Connection connection;

    private SyncDataRequest request;

    private GetSyncDataHandler getSyncDataHandler;

    private int retryCount;

    private DataChangeEventCenter dataChangeEventCenter;

    @Override
    public void onResponse(Object obj) {
        GenericResponse<SyncData> response = (GenericResponse) obj;
        if (!response.isSuccess()) {
            getSyncDataHandler.syncData(this);
        } else {
            SyncData syncData = response.getData();
            Collection<Datum> datums = syncData.getDatums();
            DataSourceTypeEnum dataSourceTypeEnum = DataSourceTypeEnum.valueOf(request
                .getDataSourceType());
            if (syncData.getWholeDataTag()) {
                //handle all data, replace cache with these datum directly
                for (Datum datum : datums) {
                    if (datum == null) {
                        datum = new Datum();
                        datum.setDataInfoId(syncData.getDataInfoId());
                        datum.setDataCenter(syncData.getDataCenter());
                    }
                    Datum.internDatum(datum);
                    dataChangeEventCenter.sync(DataChangeTypeEnum.COVER, dataSourceTypeEnum, datum);
                    break;
                }
            } else {
                //handle incremental data one by one
                if (!CollectionUtils.isEmpty(datums)) {
                    for (Datum datum : datums) {
                        if (datum != null) {
                            Datum.internDatum(datum);
                            dataChangeEventCenter.sync(DataChangeTypeEnum.MERGE,
                                dataSourceTypeEnum, datum);
                        }
                    }
                }
            }
        }
    }
}
At this point the logic is:
[Sender DataServer]
+----------------------+     +--------------------------------------------------------+     +---------------------+
| versionCheckExecutor |     | [AbstractAcceptorStore]                                |     | expireCheckExecutor |
+----------+-----------+     |  acceptors / notifyAcceptorsCache                      |     +----------+----------+
           | changeDataCheck |                                                        |                |
           +---------------->|                                                        |<---------------+
             removeCache /   +----------+--------------------------------^------------+  checkAcceptorsChangAndExpired
             notifyChange               |                                |
                      1 NotifyDataSyncRequest                   2 SyncDataRequest
                                        v                                |
                             +----------+--------------------------------+------------+
                             | [Other DataServer]                                     |
                             |    GetSyncDataHandler            SyncDataCallback      |
                             +--------------------------------------------------------+
The SyncDataRequest is sent back to the notifier, i.e., the other DataServer sends it to the sender DataServer.
public class SyncDataRequest implements Serializable {

    private String dataInfoId;

    private String dataCenter;

    private String dataSourceType;

    /**
     * be null when dataInfoId not exist in local datumCache
     */
    private Long version;
}
Recall: where does the SyncDataRequest come from? It is produced in NotifyDataSyncHandler's response handler, which takes the requested info, fetches the version for that dataInfoId from the local cache, and then sends the request.
public class NotifyDataSyncHandler extends AbstractClientHandler<NotifyDataSyncRequest>
    implements AfterWorkingProcess {

    protected void fetchSyncData(Connection connection, NotifyDataSyncRequest request) {
        String dataInfoId = request.getDataInfoId();
        String dataCenter = request.getDataCenter();
        Datum datum = datumCache.get(dataCenter, dataInfoId);
        Long version = (datum == null) ? null : datum.getVersion();
        Long requestVersion = request.getVersion();

        if (version == null || requestVersion == 0L || version < requestVersion) {
            getSyncDataHandler.syncData(new SyncDataCallback(getSyncDataHandler, connection,
                new SyncDataRequest(dataInfoId, dataCenter, version, request.getDataSourceType()),
                dataChangeEventCenter));
        }
    }
}
And further, in AbstractAcceptorStore:
private void notifyChange(Acceptor acceptor) {
    Long lastVersion = acceptor.getLastVersion();
    //may be deleted by expiry
    if (lastVersion == null) {
        lastVersion = 0L;
    }
    NotifyDataSyncRequest request = new NotifyDataSyncRequest(acceptor.getDataInfoId(),
        acceptor.getDataCenter(), lastVersion, getType());

    // ... target selection and retry loop elided ...
    syncServer.sendSync(syncServer.getChannel(connection.getRemoteAddress()), request, 1000);
}
The notification originator handles the request with SyncDataHandler.
This is the inter-node data sync handler. When triggered, it compares version numbers: if the data stored on the current DataServer contains the requested version, it returns all data with versions greater than the requested one, enabling data synchronization between nodes.
public class SyncDataHandler extends AbstractServerHandler<SyncDataRequest> {

    @Autowired
    private SyncDataService syncDataService;

    @Override
    public Object doHandle(Channel channel, SyncDataRequest request) {
        SyncData syncData = syncDataService.getSyncDataChange(request);
        return new GenericResponse<SyncData>().fillSucceed(syncData);
    }

    @Override
    public HandlerType getType() {
        return HandlerType.PROCESSER;
    }

    @Override
    public Class interest() {
        return SyncDataRequest.class;
    }

    @Override
    protected Node.NodeType getConnectNodeType() {
        return Node.NodeType.DATA;
    }
}
The concrete business service is SyncDataServiceImpl. It fetches data from the acceptorStore via the getSyncDataChange method.
public class SyncDataServiceImpl implements SyncDataService {

    @Override
    public void appendOperator(Operator operator) {
        AcceptorStore acceptorStore = StoreServiceFactory.getStoreService(
            operator.getSourceType().toString());
        if (acceptorStore != null) {
            acceptorStore.addOperator(operator);
        }
    }

    @Override
    public SyncData getSyncDataChange(SyncDataRequest syncDataRequest) {
        AcceptorStore acceptorStore = StoreServiceFactory.getStoreService(
            syncDataRequest.getDataSourceType());
        if (acceptorStore != null) {
            return acceptorStore.getSyncData(syncDataRequest);
        }
        return null; // no matching store (this branch is elided in the original excerpt)
    }
}
How appendOperator gets called was described earlier.
SyncDataServiceImpl then calls into AbstractAcceptorStore.
It fetches the Acceptor by dataCenter and dataInfoId, then returns the Acceptor's processed data.
@Override
public SyncData getSyncData(SyncDataRequest syncDataRequest) {
    String dataCenter = syncDataRequest.getDataCenter();
    String dataInfoId = syncDataRequest.getDataInfoId();
    Long currentVersion = syncDataRequest.getVersion();
    try {
        Map<String/*dataInfoId*/, Acceptor> acceptorMap = acceptors.get(dataCenter);
        Acceptor existAcceptor = acceptorMap.get(dataInfoId);
        return existAcceptor.process(currentVersion);
    } catch (Exception e) {
        throw new RuntimeException("Get sync data error!", e); // catch block elided in the original excerpt
    }
}
Then the Acceptor does the processing.
It processes the sender's current version number: if the current version exists in the queue, it returns all Operators with versions greater than that version; otherwise it falls back to returning the whole Datum.
public SyncData process(Long currentVersion) {
    read.lock();
    try {
        Collection<Operator> operators = acceptOperator(currentVersion);

        List<Datum> retList = new LinkedList<>();
        SyncData syncData;
        boolean wholeDataTag = false;
        if (operators != null) {
            //first get all data
            if (operators.isEmpty()) {
                wholeDataTag = true;
                retList.add(datumCache.get(dataCenter, dataInfoId));
            } else {
                for (Operator operator : operators) {
                    retList.add(operator.getDatum());
                }
            }
            syncData = new SyncData(dataInfoId, dataCenter, wholeDataTag, retList);
        } else {
            //no match, get all data
            wholeDataTag = true;
            retList.add(datumCache.get(dataCenter, dataInfoId));
            syncData = new SyncData(dataInfoId, dataCenter, wholeDataTag, retList);
        }
        return syncData;
    } finally {
        read.unlock();
    }
}
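The acceptOperator helper called at the top of process is not shown in the article; the replay semantics it must implement can be sketched as follows (types reduced to String, names hypothetical; not the real SOFARegistry code):

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Sketch of version replay over an operation log: if the requested version is
// still in the log, return every operator recorded after it; otherwise return
// null so the caller falls back to sending the whole Datum.
public class ReplaySketch {
    private final Deque<Long> order = new ArrayDeque<>(); // version order
    private final Map<Long, String> ops = new HashMap<>(); // version -> operator

    public void append(long version, String op) {
        order.add(version);
        ops.put(version, op);
    }

    public List<String> acceptOperator(long currentVersion) {
        if (!ops.containsKey(currentVersion)) {
            return null; // version fell out of the log: caller sends whole data
        }
        List<String> result = new ArrayList<>();
        boolean after = false;
        for (Long v : order) {
            if (after) {
                result.add(ops.get(v));
            }
            if (v == currentVersion) {
                after = true;
            }
        }
        return result;
    }
}
```

An empty (but non-null) result means the peer is already at the latest version, which matches the wholeDataTag branch in process above.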
The sync data structure is as follows:
public class SyncData implements Serializable {

    private String dataInfoId;

    private String dataCenter;

    private Collection<Datum> datums;

    private boolean wholeDataTag;
}
The picture now is:
[Sender DataServer]                                      [Other DataServer]

AbstractAcceptorStore.notifyChange
        |
        | 1 NotifyDataSyncRequest
        +----------------------------------------> NotifyDataSyncHandler / GetSyncDataHandler
                                                                 |
        +--------------------------------------------------------+
        | 2 SyncDataRequest
        v
  SyncDataHandler
        |
        | 3 getSyncDataChange
        v
  SyncDataServiceImpl
        |
        | 4 getSyncData (AbstractAcceptorStore / Acceptor.process)
        |
        | 5 SyncData
        +----------------------------------------> SyncDataCallback
Back on the receiving side, the callback iterates over all received Datum objects and, for each, makes one of the following calls:
If the response carries the whole datum, it calls

```java
dataChangeEventCenter.sync(DataChangeTypeEnum.COVER, dataSourceTypeEnum, datum);
```

otherwise it calls

```java
dataChangeEventCenter.sync(DataChangeTypeEnum.MERGE, dataSourceTypeEnum, datum);
```
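The two branches can be sketched as a small dispatch rule. This is a minimal illustration (the `dispatch` helper and its types are invented for the sketch, not SOFARegistry code): a full snapshot overwrites the local cache once (`COVER`, mirroring the `break` after the first datum in `SyncDataCallback`), while an incremental batch is merged datum by datum (`MERGE`), skipping nulls.

```java
import java.util.*;

public class SyncDispatchSketch {
    enum ChangeType { COVER, MERGE }

    // Decide how each received datum should be applied locally.
    static List<ChangeType> dispatch(boolean wholeDataTag, List<String> datums) {
        List<ChangeType> applied = new ArrayList<>();
        if (wholeDataTag) {
            // Full data: replace the cache once, then stop.
            applied.add(ChangeType.COVER);
        } else {
            // Incremental data: merge each non-null datum.
            for (String datum : datums) {
                if (datum != null) {
                    applied.add(ChangeType.MERGE);
                }
            }
        }
        return applied;
    }

    public static void main(String[] args) {
        System.out.println(dispatch(true, Arrays.asList("a", "b")));        // [COVER]
        System.out.println(dispatch(false, Arrays.asList("a", null, "b"))); // [MERGE, MERGE]
    }
}
```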
The implementation is as follows:
```java
public class SyncDataCallback implements InvokeCallback {

    private static final Executor EXECUTOR     = ExecutorFactory.newFixedThreadPool(5,
                                                   SyncDataCallback.class.getSimpleName());

    private static final int      RETRY_COUNT  = 3;

    private Connection            connection;
    private SyncDataRequest       request;
    private GetSyncDataHandler    getSyncDataHandler;
    private int                   retryCount;
    private DataChangeEventCenter dataChangeEventCenter;

    @Override
    public void onResponse(Object obj) {
        GenericResponse<SyncData> response = (GenericResponse) obj;
        if (!response.isSuccess()) {
            getSyncDataHandler.syncData(this);
        } else {
            SyncData syncData = response.getData();
            Collection<Datum> datums = syncData.getDatums();
            DataSourceTypeEnum dataSourceTypeEnum = DataSourceTypeEnum.valueOf(request
                .getDataSourceType());
            if (syncData.getWholeDataTag()) {
                // handle all data: replace the cache with this datum directly
                for (Datum datum : datums) {
                    if (datum == null) {
                        datum = new Datum();
                        datum.setDataInfoId(syncData.getDataInfoId());
                        datum.setDataCenter(syncData.getDataCenter());
                    }
                    Datum.internDatum(datum);
                    dataChangeEventCenter.sync(DataChangeTypeEnum.COVER, dataSourceTypeEnum,
                        datum);
                    break;
                }
            } else {
                // handle incremental data one by one
                if (!CollectionUtils.isEmpty(datums)) {
                    for (Datum datum : datums) {
                        if (datum != null) {
                            Datum.internDatum(datum);
                            dataChangeEventCenter.sync(DataChangeTypeEnum.MERGE,
                                dataSourceTypeEnum, datum);
                        }
                    }
                }
            }
        }
    }
}
```
DataChangeEventCenter is invoked as follows:
```java
public void sync(DataChangeTypeEnum changeType, DataSourceTypeEnum sourceType, Datum datum) {
    int idx = hash(datum.getDataInfoId());
    DataChangeEvent event = new DataChangeEvent(changeType, sourceType, datum);
    dataChangeEventQueues[idx].onChange(event);
}
```
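The point of hashing here is routing, not storage: every event for the same `dataInfoId` lands in the same queue, so events for one service are handled serially and in order. The sketch below is an assumption-laden illustration — the real `hash` implementation and queue count come from SOFARegistry's configuration, while `QUEUE_COUNT` and `queueIndexOf` are invented for the example.

```java
public class QueueShardSketch {
    static final int QUEUE_COUNT = 4; // illustrative; the real count comes from config

    // Map a dataInfoId onto a fixed queue index, as sync(...) does, so that
    // all events for the same service are processed serially by one queue.
    static int queueIndexOf(String dataInfoId) {
        int h = dataInfoId.hashCode();
        return Math.abs(h % QUEUE_COUNT); // h % 4 is in [-3, 3], abs maps it to [0, 3]
    }

    public static void main(String[] args) {
        String id = "com.alipay.sofa.rpc.example.HelloService#@#SOFA#@#00001";
        // The same dataInfoId always resolves to the same queue.
        System.out.println(queueIndexOf(id) == queueIndexOf(id)); // true
    }
}
```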
DataChangeEventQueue then calls handleDatum to process the event. This has already been covered in an earlier article of this series, so the code is only listed here.
```java
@Override
public void run() {
    if (changeData instanceof SnapshotData) {
        SnapshotData snapshotData = (SnapshotData) changeData;
        String dataInfoId = snapshotData.getDataInfoId();
        Map<String, Publisher> toBeDeletedPubMap = snapshotData.getToBeDeletedPubMap();
        Map<String, Publisher> snapshotPubMap = snapshotData.getSnapshotPubMap();
        Datum oldDatum = datumCache.get(dataServerConfig.getLocalDataCenter(), dataInfoId);
        long lastVersion = oldDatum != null ? oldDatum.getVersion() : 0l;
        Datum datum = datumCache.putSnapshot(dataInfoId, toBeDeletedPubMap, snapshotPubMap);
        long version = datum != null ? datum.getVersion() : 0l;
        notify(datum, changeData.getSourceType(), null);
    } else {
        Datum datum = changeData.getDatum();
        String dataCenter = datum.getDataCenter();
        String dataInfoId = datum.getDataInfoId();
        DataSourceTypeEnum sourceType = changeData.getSourceType();
        DataChangeTypeEnum changeType = changeData.getChangeType();
        if (changeType == DataChangeTypeEnum.MERGE && sourceType != DataSourceTypeEnum.BACKUP
            && sourceType != DataSourceTypeEnum.SYNC) {
            // update version for pub or unPub merged into the cache;
            // if the version were produced before the merge, a small version
            // could override a bigger one
            datum.updateVersion();
        }
        long version = datum.getVersion();
        try {
            if (sourceType == DataSourceTypeEnum.CLEAN) {
                if (datumCache.cleanDatum(dataCenter, dataInfoId)) {
                }
            } else if (sourceType == DataSourceTypeEnum.PUB_TEMP) {
                notifyTempPub(datum, sourceType, changeType);
            } else {
                MergeResult mergeResult = datumCache.putDatum(changeType, datum);
                Long lastVersion = mergeResult.getLastVersion();
                if (lastVersion != null
                    && lastVersion.longValue() == LocalDatumStorage.ERROR_DATUM_VERSION) {
                    return;
                }
                // lastVersion == null means this is the first time the datum is added
                if (lastVersion == null || version != lastVersion) {
                    if (mergeResult.isChangeFlag()) {
                        notify(datum, sourceType, lastVersion);
                    }
                }
            }
        } catch (Exception e) {
            // exception handling elided in this excerpt
        }
    }
}
```
DataChangeHandler periodically pulls change events from the DataChangeEventCenter and processes them.
The ChangeNotifier has already stored this Datum. Because the version number was updated earlier, no duplicate notification is sent, and the flow ends here.
```java
MergeResult mergeResult = datumCache.putDatum(changeType, datum);
// lastVersion == null means this is the first time the datum is added
if (lastVersion == null || version != lastVersion) {
    if (mergeResult.isChangeFlag()) {
        notify(datum, sourceType, lastVersion);
    }
}
```
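The guard around `notify(...)` is what prevents redundant notifications. The following is a minimal sketch of that condition only — `shouldNotify` is a hypothetical helper, not a SOFARegistry method: notify only when this is the first datum (`lastVersion == null`) or the version actually moved, and the merge reported a real change.

```java
public class NotifyGuardSketch {
    // Mirrors the guard around notify(...): first add, or version moved,
    // and the merge actually changed something.
    static boolean shouldNotify(Long lastVersion, long version, boolean changeFlag) {
        return (lastVersion == null || version != lastVersion) && changeFlag;
    }

    public static void main(String[] args) {
        System.out.println(shouldNotify(null, 5L, true)); // true: first add of the datum
        System.out.println(shouldNotify(5L, 5L, true));   // false: version unchanged, no re-notify
        System.out.println(shouldNotify(4L, 5L, false));  // false: merge changed nothing
    }
}
```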
The logic at this point is as follows:
(Diagram: the same sync flow as above, extended onto the receiving side. Sender: `AbstractAcceptorStore` with its `acceptors` and `notifyAcceptorsCache` maps, `versionCheckExecutor` / `expireCheckExecutor`, and `syncDataHandler` (1) delegating to `SyncDataServiceImpl` (3, 4) upon a `SyncDataRequest` (2). On the other DataServer, the response (5) arrives at `GetSyncDataHandler` / `SyncDataCallback`, which forwards each datum to `DataChangeEventCenter` (6); the event is enqueued into a `DataChangeEventQueue` (7) and finally processed by `DataChangeHandler` (8).)
To recap, this series has been tracing how service data flows internally during "a single service registration." Due to space constraints, the previous article covered the first two steps and this article covered the third and fourth; the remaining two will be covered in a later article if time permits.