SOFARegistry 是蚂蚁金服开源的一个生产级、高时效、高可用的服务注册中心。html
本系列文章重点在于分析设计和架构,即利用多篇文章,从多个角度反推总结 DataServer 或者 SOFARegistry 的实现机制和架构思路,让你们借以学习阿里如何设计。java
本文为第十七篇,介绍SOFARegistry的延迟操做。数组
为何要有AfterWorkingProcess?缓存
AfterWorkingProcess 的做用是延迟操做。猜想大体是由于某些状况下,没法执行业务,只能在后续时机进行弥补。session
在官方博客有相似论述也支持咱们的判断 :架构
在数据未同步完成以前,全部对新节点的读数据操做,将转发到拥有该数据分片的数据节点。框架
在数据未同步完成以前,禁止对新节点的写数据操做,防止在数据同步过程当中出现新的数据不一致状况。异步
能够看到相似这种业务上延迟操做应该如何实现。async
接口定义以下:ide
public interface AfterWorkingProcess { void afterWorkingProcess(); int getOrder(); }
这个 afterWorkProcessors 会做为 AfterWorkingProcessHandler 的成员变量进行处理。用于处理一些业务逻辑结束后的处理动做。
@Bean(name = "afterWorkProcessors") public List<AfterWorkingProcess> afterWorkingProcessors() { List<AfterWorkingProcess> list = new ArrayList<>(); list.add(renewDatumHandler()); list.add(datumLeaseManager()); list.add(disconnectEventHandler()); list.add(notifyDataSyncHandler()); return list; } @Bean public AfterWorkingProcessHandler afterWorkingProcessHandler() { return new AfterWorkingProcessHandler(); }
这里用法比较少见。AfterWorkingProcessHandler 也是 AfterWorkingProcess 的实现类。
在其 afterWorkingProcess 函数中,会对 Bean afterWorkingProcessors 中间注册的实现类一一调用其 afterWorkingProcess 业务函数。
其中,getOrder 会指定执行优先级,这是一个常见套路。
public class AfterWorkingProcessHandler implements AfterWorkingProcess { @Resource(name = "afterWorkProcessors") private List<AfterWorkingProcess> afterWorkingProcessors; @Override public void afterWorkingProcess() { if(afterWorkingProcessors != null){ List<AfterWorkingProcess> list = afterWorkingProcessors.stream().sorted(Comparator.comparing(AfterWorkingProcess::getOrder)).collect(Collectors.toList()); list.forEach(AfterWorkingProcess::afterWorkingProcess); } } @Override public int getOrder() { return 0; } }
只有在 DataServerCache # updateDataServerStatus 函数中有调用:
afterWorkingProcessHandler.afterWorkingProcess();
而在 DataServerCache 中有以下函数都会调用到 updateDataServerStatus:
图示以下:
+------------------------------------------+ | DataServerCache | +----------------------------------------------+ | | | AfterWorkingProcess | | synced +----------------------+ | | | | | | +----------------------------+ | +------------------------------------------+ | | | | | AfterWorkingProcessHandler | | |renewDatumHandler.afterWorkingProcess | | | | | | | | | | | | v | | | | |datumLeaseManager.afterWorkingProcess | | | notifiedAll +--->updateDataServerStatus +------> afterWorkingProcess +------>+ | | | ^ ^ | | | | |disconnectEventHandler.afterWorkingProcess| | | | | | +----------------------------+ | | | | | | | | | |notifyDataSyncHandler.afterWorkingProcess | | | checkAndUpdateStatus+-----------+ | | | +------------------------------------------+ | | | | +----------------------------------------------+ | addNotWorkingServer +---------------+ | | | +------------------------------------------+
手机以下:
由于是业务关联,因此不须要什么定时,异步之类。
public class DisconnectEventHandler implements InitializingBean, AfterWorkingProcess { /** * a DelayQueue that contains client disconnect events */ private final DelayQueue<DisconnectEvent> EVENT_QUEUE = new DelayQueue<>(); @Autowired private SessionServerConnectionFactory sessionServerConnectionFactory; @Autowired private DataChangeEventCenter dataChangeEventCenter; @Autowired private DataServerConfig dataServerConfig; @Autowired private DataNodeStatus dataNodeStatus; private static final int BLOCK_FOR_ALL_SYNC = 5000; private static final BlockingQueue<DisconnectEvent> noWorkQueue = new LinkedBlockingQueue<>(); }
在receive的正常业务操做中,若是发现自己状态不是 WORKING,则把event放入 BlockingQueue 之中。
public void receive(DisconnectEvent event) { if (event.getType() == DisconnectTypeEnum.SESSION_SERVER) { SessionServerDisconnectEvent sessionServerDisconnectEvent = (SessionServerDisconnectEvent) event; sessionServerDisconnectEvent.getProcessId()); } else if (event.getType() == DisconnectTypeEnum.CLIENT) { ClientDisconnectEvent clientDisconnectEvent = (ClientDisconnectEvent) event; } if (dataNodeStatus.getStatus() != LocalServerStatusEnum.WORKING) { noWorkQueue.add(event); return; } EVENT_QUEUE.add(event); }
当时机来到时候,系统再次调用afterWorkingProcess。这里会始终Block在noWorkQueue上,若是不为空,则会执行请求。
public void afterWorkingProcess() { try { /* * After the snapshot data is synchronized during startup, it is queued and then placed asynchronously into * DatumCache. When the notification becomes WORKING, there may be data in the queue that is not executed * to DatumCache. So it need to sleep for a while. */ TimeUnit.MILLISECONDS.sleep(BLOCK_FOR_ALL_SYNC); while (!noWorkQueue.isEmpty()) { DisconnectEvent event = noWorkQueue.poll(1, TimeUnit.SECONDS); if (event != null) { receive(event); } } } }
图示以下:
+----------------------------------------------------------+ | DisconnectEventHandler | | +-------------------------+ | | | receive | | | | | NOT WORKING | | | dataNodeStatus.getStatus+---------------+ | | | + | | | | | | WORKING | | add | | | | | | | | | v | | | | | EVENT_QUEUE.add(event) | | | | | | +---v---------+ | | +-------------------------+ | | | | | noWorkQueue | | | | | | | +-----------------------+ +-----+-------+ | | | afterWorkingProcess | | | | | | | poll | | | | NOT isEmpty | | | | receive(event) <----------------------+ | | | | | | | | | | +-----------------------+ | +----------------------------------------------------------+
DisconnectEventHandler 和 NotifyDataSyncHandler 的实现相似。
依托一个 LinkedBlockingQueue 作缓存queue。
public class NotifyDataSyncHandler extends AbstractClientHandler<NotifyDataSyncRequest> implements AfterWorkingProcess { private static final BlockingQueue<SyncDataRequestForWorking> noWorkQueue = new LinkedBlockingQueue<>(); }
在doHandle的正常业务操做中,若是发现自己状态不是 WORKING,则用业务逻辑SyncDataRequestForWorking 构建一个消息 SyncDataRequestForWorking,放入 LinkedBlockingQueue 之中。
@Override public Object doHandle(Channel channel, NotifyDataSyncRequest request) { final Connection connection = ((BoltChannel) channel).getConnection(); if (dataNodeStatus.getStatus() != LocalServerStatusEnum.WORKING) { noWorkQueue.add(new SyncDataRequestForWorking(connection, request)); return CommonResponse.buildSuccessResponse(); } executorRequest(connection, request); return CommonResponse.buildSuccessResponse(); }
当时机来到时候,系统再次调用afterWorkingProcess。这里会始终Block在noWorkQueue上,若是不为空,则会执行请求。
@Override public void afterWorkingProcess() { while (!noWorkQueue.isEmpty()) { SyncDataRequestForWorking event = noWorkQueue.poll(1, TimeUnit.SECONDS); if (event != null) { executorRequest(event.getConnection(), event.getRequest()); } } } }
图示以下:
+----------------------------------------------------------+ | NotifyDataSyncHandler | | +-------------------------+ | | | doHandle | | | | | NOT WORKING | | | dataNodeStatus.getStatus+---------------+ | | | + | | | | | | WORKING | | add | | | | | | | | | v | | | | | executorRequest | | | | | | +---v---------+ | | +-------------------------+ | | | | | noWorkQueue | | | | | | | +-----------------------+ +-----+-------+ | | | afterWorkingProcess | | | | | | | poll | | | | NOT isEmpty | | | | executorRequest <----------------------+ | | | | | | | | | | +-----------------------+ | +----------------------------------------------------------+
RenewDatumHandler 同 DatumLeaseManager 这二者很相似。并无使用queue,只是提交一个线程。
其实现目的在注释中写的很清楚:
/* * After the snapshot data is synchronized during startup, it is queued and then placed asynchronously into * DatumCache. When the notification becomes WORKING, there may be data in the queue that is not executed * to DatumCache. So it need to sleep for a while. */
可是细节又有所不一样,这两个类是同一个做者,怀疑此君在实验比较两种不一样实现方式。
RenewDatumHandler 基于 ThreadPoolExecutorDataServer 来实现。
public class RenewDatumHandler extends AbstractServerHandler<RenewDatumRequest> implements AfterWorkingProcess { @Autowired private ThreadPoolExecutor renewDatumProcessorExecutor; }
renewDatumProcessorExecutor 是一个Bean,具体代码以下,ArrayBlockingQueue:是一个基于数组结构的有界阻塞队列,按FIFO原则进行排序。
@Bean(name = "renewDatumProcessorExecutor") public ThreadPoolExecutor renewDatumProcessorExecutor(DataServerConfig dataServerConfig) { return new ThreadPoolExecutorDataServer("RenewDatumProcessorExecutor", dataServerConfig.getRenewDatumExecutorMinPoolSize(), dataServerConfig.getRenewDatumExecutorMaxPoolSize(), 300, TimeUnit.SECONDS, new ArrayBlockingQueue<>(dataServerConfig.getRenewDatumExecutorQueueSize()), new NamedThreadFactory("DataServer-RenewDatumProcessor-executor", true)); }
ThreadPoolExecutorDataServer 主要代码以下,就是简单继承了ThreadPoolExecutor,估计这里后续会有新功能添加,如今只是占坑:
public class ThreadPoolExecutorDataServer extends ThreadPoolExecutor { @Override public void execute(Runnable command) { super.execute(command); } }
对于afterWorkingProcess,就是提交了一个线程,其业务是:等待一段时间,而后设置renewEnabled。
@Override public void afterWorkingProcess() { renewDatumProcessorExecutor.submit(() -> { TimeUnit.MILLISECONDS.sleep(dataServerConfig.getRenewEnableDelaySec()); renewEnabled.set(true); }); }
蚂蚁金服服务注册中心如何实现 DataServer 平滑扩缩容
蚂蚁金服服务注册中心 SOFARegistry 解析 | 服务发现优化之路
服务注册中心 Session 存储策略 | SOFARegistry 解析
海量数据下的注册中心 - SOFARegistry 架构介绍
服务注册中心数据分片和同步方案详解 | SOFARegistry 解析
蚂蚁金服开源通讯框架SOFABolt解析之超时控制机制及心跳机制
蚂蚁金服服务注册中心数据一致性方案分析 | SOFARegistry 解析