以前咱们经过三篇文章初步分析了 MetaServer 的基本架构,MetaServer 这三篇文章为咱们接下来的工做作了坚实的铺垫。java
本系列咱们接着分析 Data Server,顺带会涉及一些 Session Server。由于 DataServer 和 MetaServer 代码实现和架构的基本套路相似,因此咱们主要关心差别点和DataServer的特色。node
本文会分析DataServer程序的基本架构。算法
前面文章专一于系统业务自己,本系列文章会换一种思路,重点在于分析设计和架构,即利用多篇文章,从多个角度反推总结 DataServer 或者 SOFARegistry 的实现机制和架构思路,让你们借以学习阿里如何设计。编程
具体学习方法是:bootstrap
学习时注意点是:缓存
由于会从多个维度来分析设计,好比业务维度和架构维度,所以在本系列中,可能有的文章会集中在模式的总结提取,有的文章会集中在业务实现,有的文章会集中在具体知识点的运用,也会出现 某一个业务模块或者代码段由于业务和实现 在不一样文章中被说起的现象,但愿你们事先有所了解。服务器
首先,咱们要回忆下SOFARegistry 整体架构网络
应用服务器集群。Client 层是应用层,每一个应用系统经过依赖注册中心相关的客户端 jar 包,经过编程方式来使用服务注册中心的服务发布和服务订阅能力。session
Session 服务器集群。顾名思义,Session 层是会话层,经过长链接和 Client 层的应用服务器保持通信,负责接收 Client 的服务发布和服务订阅请求。该层只在内存中保存各个服务的发布订阅关系,对于具体的服务信息,只在 Client 层和 Data 层之间透传转发。Session 层是无状态的,能够随着 Client 层应用规模的增加而扩容。架构
数据服务器集群。Data 层经过分片存储的方式保存着所用应用的服务注册数据。数据按照 dataInfoId(每一份服务数据的惟一标识)进行一致性 Hash 分片,多副本备份,保证数据的高可用。下文的重点也在于随着数据规模的增加,Data 层如何在不影响业务的前提下实现平滑的扩缩容。
元数据服务器集群。这个集群管辖的范围是 Session 服务器集群和 Data 服务器集群的服务器信息,其角色就至关于 SOFARegistry 架构内部的服务注册中心,只不过 SOFARegistry 做为服务注册中心是服务于广大应用服务层,而 Meta 集群是服务于 SOFARegistry 内部的 Session 集群和 Data 集群,Meta 层可以感知到 Session 节点和 Data 节点的变化,并通知集群的其它节点。
对于一个程序来讲,什么样才算是优秀的架构,其实没有一个放之四海而皆准的标准,关于这方面的书或者文章也有不少,因此咱们就从最简单直接的角度,即从结果来想:即静态和动态两方面。
好比,假设你程序是基于SpringBoot,那么Bean的构建和分类就很是重要,若是Bean处理得很好,对你整理动态架构是很是有帮助。
下面就开始分析DataServer程序的基本架构。
目录结构以下,咱们能够看出来SOFAReistry大体思路,固然由于业务和架构耦合,因此个人分类不必定彻底恰当,也有其余分类的方式,具体取决于你本身的思考方式。
程序基础业务功能:
业务功能:
具体目录以下:
. ├── DataApplication.java ├── bootstrap ├── cache ├── change ├── datasync │ └── sync ├── event │ └── handler ├── executor ├── node ├── remoting │ ├── dataserver │ │ ├── handler │ │ └── task │ ├── handler │ ├── metaserver │ │ ├── handler │ │ ├── provideData │ │ │ └── processor │ │ └── task │ └── sessionserver │ ├── disconnect │ ├── forward │ └── handler ├── renew ├── resource └── util复制代码
依然是相似MetaServer的路数,使用SpringBoot框架来进行整体搭建。
@EnableDataServer@SpringBootApplicationpublic class DataApplication {public static void main(String[] args) { SpringApplication.run(DataApplication.class, args); } }复制代码
EnableDataServer这个注解将引入基本配置 DataServerBeanConfiguration。
@Target(ElementType.TYPE)@Retention(RetentionPolicy.RUNTIME)@Documented@Import(DataServerBeanConfiguration.class)public @interface EnableDataServer { }复制代码
DataServer是SpringBoot程序。因此大量使用Bean。
DataServerBeanConfiguration 的做用是构建各类相关配置,从其中能够看出来DataServer相关模块和功能。
系统初始化时的 bean 都在 DataServerBeanConfiguration 里面经过 JavaConfig 来注册,主要以以下几个配置类体现(配置类会有变动,具体内容能够参照源码实现):
部分Bean的功能以下:
缩减版代码以下 :
@Configuration@Import(DataServerInitializer.class)@EnableConfigurationPropertiespublic class DataServerBeanConfiguration {@Bean@ConditionalOnMissingBeanpublic DataServerBootstrap dataServerBootstrap() {}@Configurationprotected static class DataServerBootstrapConfigConfiguration {}@Configurationpublic static class DataServerStorageConfiguration {}@Configurationpublic static class LogTaskConfigConfiguration {}@Configurationpublic static class SessionRemotingConfiguration {}@Configurationpublic static class DataServerNotifyBeanConfiguration {}@Configurationpublic static class DataServerSyncBeanConfiguration {}@Configurationpublic static class DataServerEventBeanConfiguration {}@Configurationpublic static class DataServerRemotingBeanConfiguration {}@Configurationpublic static class ResourceConfiguration {}@Configurationpublic static class ExecutorConfiguration {}@Configurationpublic static class DataProvideDataConfiguration {} }复制代码
DataServer 模块启动入口类为 DataServerInitializer,该类不禁 JavaConfig 管理配置,而是继承了 SmartLifecycle 接口,在启动时由 Spring 框架调用其 start 方法。其简略版代码以下:
public class DataServerInitializer implements SmartLifecycle {@Autowiredprivate DataServerBootstrap dataServerBootstrap;@Overridepublic void start() { dataServerBootstrap.start();this.isRunning = true; } }复制代码
该方法中调用了 DataServerBootstrap#start 方法,用于启动一系列的初始化服务。
public void start() {try { openDataServer(); openDataSyncServer(); openHttpServer(); startRaftClient(); fetchProviderData(); startScheduler(); Runtime.getRuntime().addShutdownHook(new Thread(this::doStop)); } }复制代码
DataServerBootstrap负责程序的启动,具体以下:
@EnableConfigurationPropertiespublic class DataServerBootstrap {// 节点间的 bolt 通讯组件以及其配置@Autowiredprivate DataServerConfig dataServerConfig; @Resource(name = "serverHandlers")private Collection<AbstractServerHandler> serverHandlers;@Resource(name = "serverSyncHandlers")private Collection<AbstractServerHandler> serverSyncHandlers; @Autowiredprivate Exchange boltExchange;private Server server;private Server dataSyncServer; // 用于控制的Http 通讯组件以及其配置@Autowiredprivate ApplicationContext applicationContext; @Autowiredprivate ResourceConfig jerseyResourceConfig;@Autowiredprivate Exchange jerseyExchange; private Server httpServer; // JVM 内部的事件通讯组件以及其配置@Autowiredprivate EventCenter eventCenter; // MetaServer Raft相关组件@Autowiredprivate IMetaServerService metaServerService; @Autowiredprivate DatumLeaseManager datumLeaseManager; // 定时器组件以及其配置@Autowiredprivate Scheduler syncDataScheduler;@Autowiredprivate CacheDigestTask cacheDigestTask;/** * start dataserver */public void start() { openDataServer(); // 节点间的 bolt 通讯组件以及其配置openDataSyncServer(); openHttpServer(); // 用于控制的Http 通讯组件以及其配置startRaftClient(); // MetaServer Raft相关组件fetchProviderData(); startScheduler(); // 定时器组件以及其配置Runtime.getRuntime().addShutdownHook(new Thread(this::doStop)); }// 节点间的 bolt 通讯组件以及其配置private void openDataServer() {if (serverForSessionStarted.compareAndSet(false, true)) { server = boltExchange.open(new URL(NetUtil.getLocalAddress().getHostAddress(), dataServerConfig.getPort()), serverHandlers .toArray(new ChannelHandler[serverHandlers.size()])); } }private void openDataSyncServer() {if (serverForDataSyncStarted.compareAndSet(false, true)) { dataSyncServer = boltExchange.open(new URL(NetUtil.getLocalAddress() .getHostAddress(), dataServerConfig.getSyncDataPort()), serverSyncHandlers .toArray(new ChannelHandler[serverSyncHandlers.size()])); } }// 用于控制的Http 通讯组件以及其配置private void openHttpServer() {if (httpServerStarted.compareAndSet(false, true)) { bindResourceConfig(); httpServer = jerseyExchange.open(new URL(NetUtil.getLocalAddress().getHostAddress(), dataServerConfig .getHttpServerPort()), new ResourceConfig[] { jerseyResourceConfig }); } }// MetaServer Raft相关组件private void startRaftClient() { metaServerService.startRaftClient(); eventCenter.post(new MetaServerChangeEvent(metaServerService.getMetaServerMap())); }private void fetchProviderData() { ProvideData provideData = metaServerService .fetchData(ValueConstants.ENABLE_DATA_DATUM_EXPIRE);boolean enableDataDatumExpire = Boolean.parseBoolean((String) provideData.getProvideData() .getObject()); datumLeaseManager.setRenewEnable(enableDataDatumExpire); }// 定时器组件以及其配置private void startScheduler() {if (schedulerStarted.compareAndSet(false, true)) { syncDataScheduler.startScheduler();// start all startTask except correction taskeventCenter.post(new StartTaskEvent( Arrays.stream(StartTaskTypeEnum.values()).filter(type -> type != StartTaskTypeEnum.RENEW) .collect(Collectors.toSet())));//start dump logcacheDigestTask.start(); } } }复制代码
DataServer 的核心启动类是 DataServerBootstrap,对于其内部模块分类,官方博客主要说起其主要组件 :
该类主要包含了三类组件:节点间的 bolt 通讯组件、JVM 内部的事件通讯组件、定时器组件。
我这里划分的更加细致,把组件划分为以下:
dataServer 负责数据相关服务,好比数据服务,获取数据的推送,服务上下线通知等;
DataServer是基于Bolt进行通信。
private void openDataServer() {try {if (serverForSessionStarted.compareAndSet(false, true)) { server = boltExchange.open(new URL(NetUtil.getLocalAddress().getHostAddress(), dataServerConfig.getPort()), serverHandlers .toArray(new ChannelHandler[serverHandlers.size()])); } } }复制代码
其响应函数为serverHandlers
@Bean(name = "serverHandlers")public Collection<AbstractServerHandler> serverHandlers() { Collection<AbstractServerHandler> list = new ArrayList<>(); list.add(getDataHandler()); list.add(clientOffHandler()); list.add(getDataVersionsHandler()); list.add(publishDataProcessor()); list.add(sessionServerRegisterHandler()); list.add(unPublishDataHandler()); list.add(dataServerConnectionHandler()); list.add(renewDatumHandler()); list.add(datumSnapshotHandler());return list; }复制代码
其具体功能以下 :
dataSyncServer 主要是处理一些数据同步相关的服务;也是基于Bolt进行通信。
private void openDataSyncServer() {try {if (serverForDataSyncStarted.compareAndSet(false, true)) { dataSyncServer = boltExchange.open(new URL(NetUtil.getLocalAddress() .getHostAddress(), dataServerConfig.getSyncDataPort()), serverSyncHandlers .toArray(new ChannelHandler[serverSyncHandlers.size()])); } } }复制代码
其响应函数为serverSyncHandlers。
@Bean(name = "serverSyncHandlers")public Collection<AbstractServerHandler> serverSyncHandlers() { Collection<AbstractServerHandler> list = new ArrayList<>(); list.add(getDataHandler()); list.add(publishDataProcessor()); list.add(unPublishDataHandler()); list.add(notifyFetchDatumHandler()); list.add(notifyOnlineHandler()); list.add(syncDataHandler()); list.add(dataSyncServerConnectionHandler());return list; }复制代码
其具体功能以下 :
HttpServer 是 Http 通讯组件,提供一系列 REST 接口,用于 dashboard 管理、数据查询等。
其基于Jersey进行通信。
private void openHttpServer() {try {if (httpServerStarted.compareAndSet(false, true)) { bindResourceConfig(); httpServer = jerseyExchange.open(new URL(NetUtil.getLocalAddress().getHostAddress(), dataServerConfig .getHttpServerPort()), new ResourceConfig[] { jerseyResourceConfig }); } } }复制代码
各 Handler 具体做用如图 3 所示:
图 各 Handler 做用
Raft相关的是:
private void startRaftClient() { metaServerService.startRaftClient(); eventCenter.post(new MetaServerChangeEvent(metaServerService.getMetaServerMap())); }复制代码
这个模块辅助各类按期任务,具体做用是:
private void startScheduler() {try {if (schedulerStarted.compareAndSet(false, true)) { syncDataScheduler.startScheduler();// start all startTask except correction taskeventCenter.post(new StartTaskEvent( Arrays.stream(StartTaskTypeEnum.values()).filter(type -> type != StartTaskTypeEnum.RENEW) .collect(Collectors.toSet())));//start dump logcacheDigestTask.start(); } } }复制代码
启动了versionCheckExecutor和scheduler,具体会调用LocalAcceptorStore中的函数进行按期检测。
public class Scheduler {public final ExecutorService versionCheckExecutor;private final ScheduledExecutorService scheduler;private final ThreadPoolExecutor expireCheckExecutor;@Autowiredprivate AcceptorStore localAcceptorStore;/** * constructor */public Scheduler() { scheduler = new ScheduledThreadPoolExecutor(4, new NamedThreadFactory("SyncDataScheduler")); expireCheckExecutor = new ThreadPoolExecutor(1, 3, 0, TimeUnit.SECONDS,new SynchronousQueue<>(), new NamedThreadFactory("SyncDataScheduler-expireChangeCheck")); versionCheckExecutor = new ThreadPoolExecutor(2, 2, 0L, TimeUnit.MILLISECONDS,new LinkedBlockingQueue<>(), new NamedThreadFactory("SyncDataScheduler-versionChangeCheck")); }/** * start scheduler */public void startScheduler() { scheduler.schedule(new TimedSupervisorTask("FetchDataLocal", scheduler, expireCheckExecutor, 3, TimeUnit.SECONDS, 10, () -> localAcceptorStore.checkAcceptorsChangAndExpired()),30, TimeUnit.SECONDS); versionCheckExecutor.execute(() -> localAcceptorStore.changeDataCheck()); }/** * stop scheduler */public void stopScheduler() {if (scheduler != null && !scheduler.isShutdown()) { scheduler.shutdown(); }if (versionCheckExecutor != null && !versionCheckExecutor.isShutdown()) { versionCheckExecutor.shutdown(); } } }复制代码
StartTaskEventHandler内部有一个ScheduledExecutorService 和 tasks,一旦StartTaskEventHandler收到一个StartTaskEvent,就会按期调用tasks中的task执行;
@Bean(name = "tasks")public List<AbstractTask> tasks() { List<AbstractTask> list = new ArrayList<>(); list.add(connectionRefreshTask()); list.add(connectionRefreshMetaTask()); list.add(renewNodeTask());return list; }复制代码
具体代码以下:
public class StartTaskEventHandler extends AbstractEventHandler<StartTaskEvent> {@Resource(name = "tasks")private List<AbstractTask> tasks;private ScheduledExecutorService executor = null;@Overridepublic List<Class<? extends StartTaskEvent>> interest() {return Lists.newArrayList(StartTaskEvent.class); }@Overridepublic void doHandle(StartTaskEvent event) {if (executor == null || executor.isShutdown()) { getExecutor(); }for (AbstractTask task : tasks) {if (event.getSuitableTypes().contains(task.getStartTaskTypeEnum())) { executor.scheduleWithFixedDelay(task, task.getInitialDelay(), task.getDelay(), task.getTimeUnit()); } } }private void getExecutor() { executor = ExecutorFactory.newScheduledThreadPool(tasks.size(), this.getClass() .getSimpleName()); } }复制代码
这里专门就StartTaskEventHandler作简要说明,其就是针对 tasks Bean 里面声明的task,进行启动。
可是具体启动哪些task,则须要依据event里面的设置决定,下面代码中的循环就是看看tasks和event中如何匹配。
for (AbstractTask task : tasks) {if (event.getSuitableTypes().contains(task.getStartTaskTypeEnum())) { executor.scheduleWithFixedDelay(task, task.getInitialDelay(), task.getDelay(),task.getTimeUnit()); } }复制代码
具体代码以下:
public class StartTaskEventHandler extends AbstractEventHandler<StartTaskEvent> {@Resource(name = "tasks")private List<AbstractTask> tasks;private ScheduledExecutorService executor = null;@Overridepublic List<Class<? extends StartTaskEvent>> interest() {return Lists.newArrayList(StartTaskEvent.class); }@Overridepublic void doHandle(StartTaskEvent event) {if (executor == null || executor.isShutdown()) { getExecutor(); }for (AbstractTask task : tasks) {if (event.getSuitableTypes().contains(task.getStartTaskTypeEnum())) { executor.scheduleWithFixedDelay(task, task.getInitialDelay(), task.getDelay(), task.getTimeUnit()); } } }private void getExecutor() { executor = ExecutorFactory.newScheduledThreadPool(tasks.size(), this.getClass() .getSimpleName()); } }复制代码
对应的beans,一共三个task。
@Bean(name = "tasks")public List<AbstractTask> tasks() { List<AbstractTask> list = new ArrayList<>(); list.add(connectionRefreshTask()); list.add(connectionRefreshMetaTask()); list.add(renewNodeTask());return list; }复制代码
对应了StartTaskTypeEnum中的枚举,其中VersionCompareTask没实现。
public enum StartTaskTypeEnum {/** * ConnectionRefreshMetaTask */CONNECT_META,/** * ConnectionRefreshDataTask */CONNECT_DATA,/** * RenewNodeTask */RENEW,/** * VersionCompareTask */VERSION_COMPARE }复制代码
咱们用 StartTaskEvent 举例,这里使用Set来指定本消息适用什么task处理。
public class StartTaskEvent implements Event {private final Set<StartTaskTypeEnum> suitableTypes;public StartTaskEvent(Set<StartTaskTypeEnum> suitableTypes) {this.suitableTypes = suitableTypes; }public Set<StartTaskTypeEnum> getSuitableTypes() {return suitableTypes; } }复制代码
在 MetaServerChangeEventHandler 之中,则启动了renew task。
if (obj instanceof NodeChangeResult) { NodeChangeResult<DataNode> result = (NodeChangeResult<DataNode>) obj; Map<String, Long> versionMap = result.getDataCenterListVersions();//send renew after first register dataNodeSet<StartTaskTypeEnum> set = new HashSet<>(); set.add(StartTaskTypeEnum.RENEW); eventCenter.post(new StartTaskEvent(set)); eventCenter.post(new DataServerChangeEvent(result.getNodes(), versionMap, DataServerChangeEvent.FromType.REGISTER_META));break; }复制代码
在启动时候,post了event,可是指定了启动非RENEW task。
private void startScheduler() {try {if (schedulerStarted.compareAndSet(false, true)) { syncDataScheduler.startScheduler();// start all startTask except correction taskeventCenter.post(new StartTaskEvent( Arrays.stream(StartTaskTypeEnum.values()).filter(type -> type != StartTaskTypeEnum.RENEW) .collect(Collectors.toSet())));//start dump logcacheDigestTask.start(); } } catch (Exception e) { schedulerStarted.set(false);throw new RuntimeException("Data Scheduler start error!", e); } }复制代码
最后动态架构以下,咱们也大体知道,DataServer就是一个SpringBoot程序,有几个Server,有若干Bean,有若干定时服务,具体有一些其余业务模块等等,这对咱们接下来的理解有帮助。
+---------------------------------------------------------------------------+ | [DataServerBootstrap] | | | | | | +------------------------------------------+ +------------------------+ | | | [Bolt related] | | [http relatged] | | | | | | | | | | DataServerConfig | | httpServer | | | | | | | | | | boltExchange | | jerseyExchange | | | | | | | | | | server +-----------> serverHandlers | | applicationContext | | | | | | | | | | dataSyncServer+----> serverSyncHandlers | | jerseyResourceConfig | | | | | | | | | +------------------------------------------+ +------------------------+ | | +---------------------+ +----------------+ +------------------------+ | | |[meta related] | |[JVM related] | |[Timer related] | | | | | | | | | | | | metaServerService | | | | syncDataScheduler | | | | | | EventCenter | | | | | | datumLeaseManager | | | | CacheDigestTask | | | +---------------------+ +----------------+ | | | | +------------------------+ | +---------------------------------------------------------------------------+复制代码
由于从问题出发更有帮助,因此咱们总结出一些问题列表,这些咱们指望在之后的分析中陆续解决。