[从源码学设计]蚂蚁金服SOFARegistry之服务上线

[从源码学设计]蚂蚁金服SOFARegistry之服务上线

0x00 摘要

SOFARegistry 是蚂蚁金服开源的一个生产级、高时效、高可用的服务注册中心。html

本系列文章重点在于分析设计和架构,即利用多篇文章,从多个角度反推总结 DataServer 或者 SOFARegistry 的实现机制和架构思路,让你们借以学习阿里如何设计。java

本文为第十三篇,介绍从SessionServer角度看的服务上线。node

本文以介绍业务为主,顺便整理逻辑,设计和模式。由于注册过程牵扯模块太多,因此本文仅仅专一在注册过程当中Session Server的部分。算法

0x01 业务领域

1.1 应用场景

服务的上下线过程是指服务经过代码调用执行常规注册(Publisher#register) 和下线(Publisher#unregister)操做,不考虑由于服务宕机等意外状况致使的下线场景。设计模式

1.1.1 服务发布

一个典型的 “RPC 调用的服务寻址” 应用场景,服务的提供方经过以下两个步骤完成服务发布:数组

  1. 注册,将本身以 Publisher 的角色注册到 SOFARegistry;
  2. 发布,将须要发布的数据 (一般是IP 地址、端口、调用方式等) 发布到 SOFARegistry;

与此相对应的,服务的调用方经过以下步骤实现服务调用:session

  1. 注册,将本身以 Subscriber 的角色注册到 SOFARegistry;
  2. 订阅,收到 SOFARegistry 推送的服务数据;

1.1.2 SessionServer的必要性

在SOFARegistry中,全部 Client 在注册和订阅数据时,根据 dataInfoId 作一致性 Hash,计算出应该访问哪一台 DataServer,而后与该 DataServer 创建长链接。数据结构

因为每一个 Client 一般都会注册和订阅比较多的 dataInfoId 数据,所以咱们能够预见每一个 Client 均会与好几台 DataServer 创建链接。这个架构存在的问题是:“每台 DataServer 承载的链接数会随 Client 数量的增加而增加,每台 Client 极端的状况下须要与每台 DataServer 都建连,所以经过 DataServer 的扩容并不能线性的分摊 Client 链接数”。架构

因此,为数据分片层(DataServer)专门设计一个链接代理层是很是重要的,因此 SOFARegistry 就有了 SessionServer 这一层。随着 Client 数量的增加,能够经过扩容 SessionServer 就解决了单机的链接数瓶颈问题。并发

1.2 问题点

由于SessionServer是一个中间层,因此看起来好像比较简单,表面上看,就是接受,转发。

可是实际上,在大型系统中,应该如何在逻辑上,物理上实现模块分割,解耦都是很是有必要的。

1.3 阿里方案

咱们主要看看阿里方案的注册部分。

1.3.1 注册过程

一次服务的上线(注册)过程

服务的上下线过程,是指服务经过代码调用作正常的注册(publisher.register) 和 下线(publisher.unregister),不考虑由于服务宕机等意外状况致使的下线。如上图,大概呈现了“一次服务注册过程”的服务数据在内部流转过程。

  1. Client 调用 publisher.register 向 SessionServer 注册服务。
  2. SessionServer 收到服务数据 (PublisherRegister) 后,将其写入内存 (SessionServer 会存储 Client 的数据到内存,用于后续能够跟 DataServer 作按期检查),再根据 dataInfoId 的一致性 Hash 寻找对应的 DataServer,将 PublisherRegister 发给 DataServer。
  3. DataServer 接收到 PublisherRegister 数据,首先也是将数据写入内存 ,DataServer 会以 dataInfoId 的维度汇总全部 PublisherRegister。同时,DataServer 将该 dataInfoId 的变动事件通知给全部 SessionServer,变动事件的内容是 dataInfoId 和版本号信息 version。
  4. 同时,异步地,DataServer 以 dataInfoId 维度增量地同步数据给其余副本。由于 DataServer 在一致性 Hash 分片的基础上,对每一个分片保存了多个副本(默认是3个副本)。
  5. SessionServer 接收到变动事件通知后,对比 SessionServer 内存中存储的 dataInfoId 的 version,若发现比 DataServer 发过来的小,则主动向 DataServer 获取 dataInfoId 的完整数据,即包含了全部该 dataInfoId 具体的 PublisherRegister 列表。
  6. 最后,SessionServer 将数据推送给相应的 Client,Client 就接收到这一次服务注册以后的最新的服务列表数据。

1.3.2 图示

下图展现了 Publisher 注册的代码流转过程

这个过程也是采用了 Handler - Task & Strategy - Listener 的方式来处理,任务在代码内部的处理流程和订阅过程基本一致。

图 - 代码流转:Publisher 注册

0x02 Client SDK

PublisherRegistration 是Client的接口,发布数据的关键代码以下:

// 构造发布者注册表
PublisherRegistration registration = new PublisherRegistration("com.alipay.test.demo.service:1.0@DEFAULT");
registration.setGroup("TEST_GROUP");
registration.setAppName("TEST_APP");

// 将注册表注册进客户端并发布数据
Publisher publisher = registryClient.register(registration, "10.10.1.1:12200?xx=yy");

// 如需覆盖上次发布的数据可使用发布者模型从新发布数据
publisher.republish("10.10.1.1:12200?xx=zz");

发布数据的关键是构造 PublisherRegistration,该类包含三个属性:

属性名 属性类型 描述
dataId String 数据ID,发布订阅时须要使用相同值,数据惟一标识由 dataId + group + instanceId 组成。
group String 数据分组,发布订阅时须要使用相同值,数据惟一标识由 dataId + group + instanceId 组成,默认值 DEFAULT_GROUP。
appName String 应用 appName。

0x03 Session server

流程来到了Session server。

3.1 Bean

首先,能够经过Beans来入手。

@Bean(name = "serverHandlers")
public Collection<AbstractServerHandler> serverHandlers() {
    Collection<AbstractServerHandler> list = new ArrayList<>();
    list.add(publisherHandler());
    list.add(subscriberHandler());
    list.add(watcherHandler());
    list.add(clientNodeConnectionHandler());
    list.add(cancelAddressRequestHandler());
    list.add(syncConfigHandler());
    return list;
}

serverHandlers 是Bolt Server 的响应函数组合。

@Bean
@ConditionalOnMissingBean(name = "sessionRegistry")
public Registry sessionRegistry() {
    return new SessionRegistry();
}

从Bean角度看,目前的逻辑是如图所示,这里有了一次解耦Strategy:

Beans


+-----------------------------------+
| Bolt Server(in openSessionServer) |        +---------------------------------+
|                                   |    +-> | DefaultPublisherHandlerStrategy |
|    +----------------------+       |    |   +---------+-----------------------+
|    |    serverHandlers    |       |    |             |
|    |                      |       |    |             |
|    | +------------------+ |       |    |             |
|    | | PublisherHandle+----------------+             v
|    | |                  | |       |          +-------+-------+
|    | | watcherHandler   | |       |          |SessionRegistry|
|    | |                  | |       |          +---------------+
|    | |     ......       | |       |
|    | +------------------+ |       |
|    +----------------------+       |
+-----------------------------------+

服务发布者和Session Server通常都应该处于一个Data Center之中,这就是阿里等实践的单体概念.

3.2 入口

PublisherHandler 是 Session Server对Client的接口,是Bolt Server 的响应函数。

public class PublisherHandler extends AbstractServerHandler {
    @Autowired
    private ExecutorManager          executorManager;

    @Autowired
    private PublisherHandlerStrategy publisherHandlerStrategy;

    @Override
    public Object reply(Channel channel, Object message) throws RemotingException {

        RegisterResponse result = new RegisterResponse();
        PublisherRegister publisherRegister = (PublisherRegister) message;
        publisherHandlerStrategy.handlePublisherRegister(channel, publisherRegister, result);
        return result;
    }

逻辑以下图所示:

Publisher     +   Session Server Scope
 Scope         |
               |              +-----------------------------------+
               |              | Bolt Server(in openSessionServer) |
               |              |                                   |
               |              |    +----------------------+       |
               +              |    |    serverHandlers    |       |
                              |    |                      |       |
+--------+  PublisherRegister |    | +------------------+ |       |
| Client +---------------------------> PublisherHandler | |       |
+--------+          1         |    | |                  | |       |
               +              |    | |     ......       | |       |
               |              |    | +------------------+ |       |
               |              |    +----------------------+       |
               |              +-----------------------------------+
               |

3.3 策略

总体上,这里是采用 Handler - Task & Strategy - Listener 的方式来处理。

什么是策略模式(Strategy Pattern)

在软件开发过程当中经常遇到这样的状况,实现某一个功能有不少种算法或实现策略,咱们能够根据环境或者条件的不一样选择不一样的算法或者策略来完成该功能。若是将这些算法或者策略抽象出来,提供一个统一的接口,不一样的算法或者策略有不一样的实现类,这样在程序客户端就能够经过注入不一样的实现对象来实现算法或者策略的动态替换,这种模式的可扩展性和可维护性也更高,这就是策略模式。

策略模式的定义(Strategy Pattern)

  • 策略模式: 定义了算法族,分别封装起来,让它们之间能够相互替换,此模式让算法的变化独立与使用算法的客户。

  • 简单理解: 定义了一系列算法。每一个算法封装起来。各个算法之间能够互相替换。且算法的变化不会影响到使用算法的客户。属于行为型模式。

在策略模式(Strategy Pattern)中,一个类的行为或其算法能够在运行时更改。这种类型的设计模式属于行为型模式

在策略模式中,咱们建立表示各类策略的对象和一个行为随着策略对象改变而改变的 context 对象。策略对象改变 context 对象的执行算法。

3.3.1 目录结构

从目录结构看,有不少Strategy的定义和实现,应该蚂蚁内部但愿根据不一样状况制定不一样的策略,其中有些是目前留出的接口

com/alipay/sofa/registry/server/session/strategy

.
├── DataChangeRequestHandlerStrategy.java
├── PublisherHandlerStrategy.java
├── ReceivedConfigDataPushTaskStrategy.java
├── ReceivedDataMultiPushTaskStrategy.java
├── SessionRegistryStrategy.java
├── SubscriberHandlerStrategy.java
├── SubscriberMultiFetchTaskStrategy.java
├── SubscriberRegisterFetchTaskStrategy.java
├── SyncConfigHandlerStrategy.java
├── TaskMergeProcessorStrategy.java
├── WatcherHandlerStrategy.java
└── impl
    ├── DefaultDataChangeRequestHandlerStrategy.java
    ├── DefaultPublisherHandlerStrategy.java
    ├── DefaultPushTaskMergeProcessor.java
    ├── DefaultReceivedConfigDataPushTaskStrategy.java
    ├── DefaultReceivedDataMultiPushTaskStrategy.java
    ├── DefaultSessionRegistryStrategy.java
    ├── DefaultSubscriberHandlerStrategy.java
    ├── DefaultSubscriberMultiFetchTaskStrategy.java
    ├── DefaultSubscriberRegisterFetchTaskStrategy.java
    ├── DefaultSyncConfigHandlerStrategy.java
    └── DefaultWatcherHandlerStrategy.java

3.3.2 DefaultPublisherHandlerStrategy

从目前代码看,只是设置,分类,转发。即设置Publisher的缺省信息,而且根据 event type 不一样执行register或者unRegister。

public class DefaultPublisherHandlerStrategy implements PublisherHandlerStrategy {
    @Autowired
    private Registry            sessionRegistry;

    @Override
    public void handlePublisherRegister(Channel channel, PublisherRegister publisherRegister, RegisterResponse registerResponse) {
        try {
            String ip = channel.getRemoteAddress().getAddress().getHostAddress();
            int port = channel.getRemoteAddress().getPort();
            publisherRegister.setIp(ip);
            publisherRegister.setPort(port);

            if (StringUtils.isBlank(publisherRegister.getZone())) {
                publisherRegister.setZone(ValueConstants.DEFAULT_ZONE);
            }

            if (StringUtils.isBlank(publisherRegister.getInstanceId())) {
                publisherRegister.setInstanceId(DEFAULT_INSTANCE_ID);
            }

            Publisher publisher = PublisherConverter.convert(publisherRegister);
            publisher.setProcessId(ip + ":" + port);
            publisher.setSourceAddress(new URL(channel.getRemoteAddress()));
            if (EventTypeConstants.REGISTER.equals(publisherRegister.getEventType())) {
                sessionRegistry.register(publisher);
            } else if (EventTypeConstants.UNREGISTER.equals(publisherRegister.getEventType())) {
                sessionRegistry.unRegister(publisher);
            }
            registerResponse.setSuccess(true);
            registerResponse.setVersion(publisher.getVersion());
            registerResponse.setRegistId(publisherRegister.getRegistId());
            registerResponse.setMessage("Publisher register success!");
        } 
    }
}

逻辑以下图所示

Publisher     +   Session Server Scope
 Scope         |
               |              +-----------------------------------+
               |              | Bolt Server(in openSessionServer) |
               |              |                                   |
               |              |    +----------------------+       |
               +              |    |    serverHandlers    |       |                          +-------------------------------+
                              |    |                      |       |                          |DefaultPublisherHandlerStrategy|
+--------+  PublisherRegister |    | +------------------+ |       |  handlePublisherRegister |                               |
| Client +---------------------------> PublisherHandler+------------------------------------>+    EventType == REGISTER      |
+--------+          1         |    | |                  | |       |                          |                               |
               +              |    | |  watcherHandler  | |       |                          +-------------------------------+
               |              |    | |                  | |       |
               |              |    | |     ......       | |       |
               |              |    | +------------------+ |       |
               |              |    +----------------------+       |
                              +-----------------------------------+

手机如图

3.4 核心逻辑组件

前面代码中,策略会调用到 sessionRegistry.register(publisher),即注册功能。

从SessionRegistry的内部成员变量就可以看出来,这是 Session Server 核心逻辑所在。

主要提供了以下功能:

  • register(StoreData data) :注册新publisher或者subscriber data

  • cancel(List connectIds) :取消publisher或者subscriber data

  • remove(List connectIds) :移除publisher或者subscriber data

  • unRegister(StoreData data) :注销publisher或者subscriber data

  • .....

具体成员变量以下:

public class SessionRegistry implements Registry {

    /**
     * store subscribers
     */
    @Autowired
    private Interests                 sessionInterests;

    /**
     * store watchers
     */
    @Autowired
    private Watchers                  sessionWatchers;

    /**
     * store publishers
     */
    @Autowired
    private DataStore                 sessionDataStore;

    /**
     * transfer data to DataNode
     */
    @Autowired
    private DataNodeService           dataNodeService;

    /**
     * trigger task com.alipay.sofa.registry.server.meta.listener process
     */
    @Autowired
    private TaskListenerManager       taskListenerManager;

    /**
     * calculate data node url
     */
    @Autowired
    private NodeManager               dataNodeManager;

    @Autowired
    private SessionServerConfig       sessionServerConfig;

    @Autowired
    private Exchange                  boltExchange;

    @Autowired
    private SessionRegistryStrategy   sessionRegistryStrategy;

    @Autowired
    private WrapperInterceptorManager wrapperInterceptorManager;

    @Autowired
    private DataIdMatchStrategy       dataIdMatchStrategy;

    @Autowired
    private RenewService              renewService;

    @Autowired
    private WriteDataAcceptor         writeDataAcceptor;

    private volatile boolean          enableDataRenewSnapshot = true;
}

register函数生成一个WriteDataRequest,而后调用了 writeDataAcceptor.accept 完成处理。

@Override
public void register(StoreData storeData) {

    WrapperInvocation<StoreData, Boolean> wrapperInvocation = new WrapperInvocation(
            new Wrapper<StoreData, Boolean>() {
                @Override
                public Boolean call() {

                    switch (storeData.getDataType()) {
                        case PUBLISHER:
                            Publisher publisher = (Publisher) storeData;

                            sessionDataStore.add(publisher);

                            // All write operations to DataServer (pub/unPub/clientoff/renew/snapshot)
                            // are handed over to WriteDataAcceptor
                            writeDataAcceptor.accept(new WriteDataRequest() {
                                @Override
                                public Object getRequestBody() {
                                    return publisher;
                                }

                                @Override
                                public WriteDataRequestType getRequestType() {
                                    return WriteDataRequestType.PUBLISHER;
                                }

                                @Override
                                public String getConnectId() {
                                    return publisher.getSourceAddress().getAddressString();
                                }

                                @Override
                                public String getDataServerIP() {
                                    Node dataNode = dataNodeManager.getNode(publisher.getDataInfoId());
                                    return dataNode.getNodeUrl().getIpAddress();
                                }
                            });

                            sessionRegistryStrategy.afterPublisherRegister(publisher);
                            break;
                        case SUBSCRIBER:
                            Subscriber subscriber = (Subscriber) storeData;

                            sessionInterests.add(subscriber);

                            sessionRegistryStrategy.afterSubscriberRegister(subscriber);
                            break;
                        case WATCHER:
                            Watcher watcher = (Watcher) storeData;

                            sessionWatchers.add(watcher);

                            sessionRegistryStrategy.afterWatcherRegister(watcher);
                            break;
                        default:
                            break;
                    }
                    return null;
                }

                @Override
                public Supplier<StoreData> getParameterSupplier() {
                    return () -> storeData;
                }

            }, wrapperInterceptorManager);

    try {
        wrapperInvocation.proceed();
    } catch (Exception e) {
        throw new RuntimeException("Proceed register error!", e);
    }

}

目前逻辑以下图所示:

Publisher     +   Session Server Scope
 Scope         |
               |              +-----------------------------------+
               |              | Bolt Server(in openSessionServer) |
               |              |                                   |
               |              |    +----------------------+       |
               +              |    |    serverHandlers    |       |                          +-------------------------------+
                              |    |                      |       |                          |DefaultPublisherHandlerStrategy|
+--------+  PublisherRegister |    | +------------------+ |       |  handlePublisherRegister |                               |
| Client +---------------------------> PublisherHandler+------------------------------------>+    EventType == REGISTER      |
+--------+          1         |    | |                  | |       |          2               |                               |
               +              |    | |  watcherHandler  | |       |                          +------------+------------------+
               |              |    | |                  | |       |                                       |
               |              |    | |     ......       | |       |                                       |
               |              |    | +------------------+ |       |                                    3  | register
               |              |    +----------------------+       |                                       |
                              +-----------------------------------+                                       |
                                                                                                          v
                                                                                      +-------------------+-------------------+
                                                                                      |           SessionRegistry             |
                                                                                      |                                       |
                                                                                      |                                       |
                                                                                      |  storeData.getDataType() == PUBLISHER |
                                                                                      +---------------------------------------+

手机以下:

3.4.1 SessionRegistryStrategy

这里又出现一个策略,目前也只有一个实现,应该也是想要将来作成替换,目前功能只是简单的留下了接口为空。

咱们能够看出阿里到处想解耦的思路

public class DefaultSessionRegistryStrategy implements SessionRegistryStrategy {
    @Override
    public void afterPublisherRegister(Publisher publisher) {

    }
}

3.4.2 存储模块

前文在注册过程当中有:

sessionDataStore.add(publisher);

这里就是Session的 数据存储模块,也是系统的核心

public class SessionDataStore implements DataStore {
    /**
     * publisher store
     */
    private Map<String/*dataInfoId*/, Map<String/*registerId*/, Publisher>> registry      = new ConcurrentHashMap<>();

    /*** index */
    private Map<String/*connectId*/, Map<String/*registerId*/, Publisher>>  connectIndex  = new ConcurrentHashMap<>();
}

这里记录了两种存储方式,分别是按照 dataInfoId 和 connectId 来存储。

存储时候,会从版本号和时间戳两个维度来比较

@Override
public void add(Publisher publisher) {
    Publisher.internPublisher(publisher);

    write.lock();
    try {
        Map<String, Publisher> publishers = registry.get(publisher.getDataInfoId());

        if (publishers == null) {
            ConcurrentHashMap<String, Publisher> newmap = new ConcurrentHashMap<>();
            publishers = registry.putIfAbsent(publisher.getDataInfoId(), newmap);
            if (publishers == null) {
                publishers = newmap;
            }
        }

        Publisher existingPublisher = publishers.get(publisher.getRegisterId());

        if (existingPublisher != null) {

            if (existingPublisher.getVersion() != null) {
                long oldVersion = existingPublisher.getVersion();
                Long newVersion = publisher.getVersion();
                if (newVersion == null) {
                    return;
                } else if (oldVersion > newVersion) {
                    return;
                } else if (oldVersion == newVersion) {
                    Long newTime = publisher.getRegisterTimestamp();
                    long oldTime = existingPublisher.getRegisterTimestamp();
                    if (newTime == null) {
                        return;
                    }
                    if (oldTime > newTime) {
                        return;
                    }
                }
            }
        }
        publishers.put(publisher.getRegisterId(), publisher);
        addToConnectIndex(publisher);

    } finally {
        write.unlock();
    }
}

3.5 Acceptor模块

在SessionServer自己存储完成以后,接下来就是通知Data Server了。

3.5.1 整体Acceptor

WriteDataAcceptorImpl 负责处理具体Publisher的写入。首先须要把写入请求统一块儿来

使用 private Map<String, WriteDataProcessor> writeDataProcessors = new ConcurrentHashMap(); 来统一存储全部的写入请求

这里根据不一样的Connection来处理不一样链接的写入请求

具体以下:

public class WriteDataAcceptorImpl implements WriteDataAcceptor {

    @Autowired
    private TaskListenerManager             taskListenerManager;

    @Autowired
    private SessionServerConfig             sessionServerConfig;

    @Autowired
    private RenewService                    renewService;

    /**
     * acceptor for all write data request
     * key:connectId
     * value:writeRequest processor
     *
     */
    private Map<String, WriteDataProcessor> writeDataProcessors = new ConcurrentHashMap();

    public void accept(WriteDataRequest request) {
        String connectId = request.getConnectId();
        WriteDataProcessor writeDataProcessor = writeDataProcessors.computeIfAbsent(connectId,
                key -> new WriteDataProcessor(connectId, taskListenerManager, sessionServerConfig, renewService));

        writeDataProcessor.process(request);
    }
  
    public void remove(String connectId) {
        writeDataProcessors.remove(connectId);
    }
}

目前逻辑以下图所示

Publisher     +   Session Server Scope
 Scope         |
               |              +-----------------------------------+
               |              | Bolt Server(in openSessionServer) |
               |              |                                   |
               |              |    +----------------------+       |
               +              |    |    serverHandlers    |       |                          +-------------------------------+
                              |    |                      |       |                          |DefaultPublisherHandlerStrategy|
+--------+  PublisherRegister |    | +------------------+ |       |  handlePublisherRegister |                               |
| Client +---------------------------> PublisherHandler+------------------------------------>+    EventType == REGISTER      |
+--------+          1         |    | |                  | |       |          2               |                               |
               +              |    | |  watcherHandler  | |       |                          +------------+------------------+
               |              |    | |                  | |       |                                       |
               |              |    | |     ......       | |       |                                       |
               |              |    | +------------------+ |       |                              register | 3
               |              |    +----------------------+       |                                       |
               |              +-----------------------------------+                                       |
               |                                                                                          v
               | +-----------------------------------------------------+                    +-------------+-------------------------+
               | |           WriteDataAcceptorImpl                     |  WriteDataRequest  |           SessionRegistry             |
               | |                                                     | <------------------+                                       |
               | |                                                     |                    |                                       |
               | | Map<String, WriteDataProcessor> writeDataProcessors |                    |  storeData.getDataType() == PUBLISHER |
               | |                                                     |                    +---------------------------------------+
               + +-----------------------------------------------------+

手机如图

3.5.2 具体处理

前面已经把全部请求统一块儿来,如今就须要针对每个链接的写入继续处理

这里关键是以下数据结构,就是每个链接的写入请求 放到了queue中。

ConcurrentLinkedQueue<WriteDataRequest> acceptorQueue

针对每一个请求不一样,作不一样处理。

对于咱们的例子,处理以下:

case PUBLISHER: {
		doPublishAsync(request);
}

而最终是向taskListenerManager发送给请求TaskType.PUBLISH_DATA_TASK,该请求将被PublishDataTaskListener调用publishDataTask来处理。

这里有一个listener解耦,咱们接下来说解。

private void doPublishAsync(WriteDataRequest request) {
    sendEvent(request.getRequestBody(), TaskType.PUBLISH_DATA_TASK);
}

private void sendEvent(Object eventObj, TaskType taskType) {
		TaskEvent taskEvent = new TaskEvent(eventObj, taskType);
		taskListenerManager.sendTaskEvent(taskEvent);
}

具体代码以下:

public class WriteDataProcessor {
    private final TaskListenerManager               taskListenerManager;

    private final SessionServerConfig               sessionServerConfig;

    private final RenewService                      renewService;

    private final String                            connectId;

    private Map<String, AtomicLong>                 lastUpdateTimestampMap = new ConcurrentHashMap<>();

    private AtomicBoolean                           writeDataLock          = new AtomicBoolean(
                                                                               false);

    private ConcurrentLinkedQueue<WriteDataRequest> acceptorQueue          = new ConcurrentLinkedQueue();

    private AtomicInteger                           acceptorQueueSize      = new AtomicInteger(0);

    public void process(WriteDataRequest request) {
        // record the last update time by pub/unpub
        if (isWriteRequest(request)) {
            refreshUpdateTime(request.getDataServerIP());
        }

        if (request.getRequestType() == WriteDataRequestType.DATUM_SNAPSHOT) {
            // snapshot has high priority, so handle directly
            doHandle(request);
        } else {
            // If locked, insert the queue;
            // otherwise, try emptying the queue (to avoid residue) before processing the request.
            if (writeDataLock.get()) {
                addQueue(request);
            } else {
                flushQueue();
                doHandle(request);
            }
        }

    }

    private void doHandle(WriteDataRequest request) {
        switch (request.getRequestType()) {
            case PUBLISHER: {
                doPublishAsync(request);
            }
                break;
            case UN_PUBLISHER: {
                doUnPublishAsync(request);
            }
                break;
            case CLIENT_OFF: {
                doClientOffAsync(request);
            }
                break;
            case RENEW_DATUM: {
                if (renewAndSnapshotInSilence(request.getDataServerIP())) {
                    return;
                }
                doRenewAsync(request);
            }
                break;
            case DATUM_SNAPSHOT: {
                if (renewAndSnapshotInSilenceAndRefreshUpdateTime(request.getDataServerIP())) {
                    return;
                }
                halt();
                try {
                    doSnapshotAsync(request);
                } finally {
                    resume();
                }
            }
                break;
    }
      
    private void doPublishAsync(WriteDataRequest request) {
        sendEvent(request.getRequestBody(), TaskType.PUBLISH_DATA_TASK);
    }
      
    private void sendEvent(Object eventObj, TaskType taskType) {
        TaskEvent taskEvent = new TaskEvent(eventObj, taskType);
        taskListenerManager.sendTaskEvent(taskEvent);
    }
}

以下图所示

Publisher     +   Session Server Scope
 Scope         |
               |              +-----------------------------------+
               |              | Bolt Server(in openSessionServer) |
               |              |                                   |
               |              |    +----------------------+       |
               +              |    |    serverHandlers    |       |                          +-------------------------------+
                              |    |                      |       |                          |DefaultPublisherHandlerStrategy|
+--------+  PublisherRegister |    | +------------------+ |       |  handlePublisherRegister |                               |
| Client +---------------------------> PublisherHandler+------------------------------------>+    EventType == REGISTER      |
+--------+          1         |    | |                  | |       |          2               |                               |
               +              |    | |  watcherHandler  | |       |                          +------------+------------------+
               |              |    | |                  | |       |                                       |
               |              |    | |     ......       | |       |                                       |
               |              |    | +------------------+ |       |                              register | 3
               |              |    +----------------------+       |                                       |
               |              +-----------------------------------+                                       |
               |                                                                                          v
               | +---------------------------------------------------------+                    +---------+-----------------------------+
               | |           WriteDataAcceptorImpl                         |  WriteDataRequest  |           SessionRegistry             |
               | |                                                         | <------------------+                                       |
               | |                                                         |       4            |   sessionDataStore.add(publisher)     |
               | | Map<connectId , WriteDataProcessor> writeDataProcessors |                    |                                       |
               | |                                                         |                    |  storeData.getDataType() == PUBLISHER |
               | +----------------------+----------------------------------+                    |                                       |
               |                process | 5                                                     +---------------------------------------+
               |                        v
               |    +-------------------+---------------------+                     +--------------------------+
               |    |          WriteDataProcessor             |                     |  PublishDataTaskListener |
               |    |                                         |  PUBLISH_DATA_TASK  |                          |
               |    | ConcurrentLinkedQueue<WriteDataRequest> +-------------------> |      PublishDataTask     |
               |    |                                         |      6              +--------------------------+
               +    +-----------------------------------------+

手机如图 :

3.6 Listener 解耦

前面在逻辑上都是一体化的,在这里,进行了一次解耦。

3.6.1 解耦引擎

DefaultTaskListenerManager 是解耦的机制,能够看到,其中添加了listener,当用户调用sendTaskEvent时候,将遍历全部的listeners,调用对应的listener。

public class DefaultTaskListenerManager implements TaskListenerManager {

    private Multimap<TaskType, TaskListener> taskListeners = ArrayListMultimap.create();

    @Override
    public Multimap<TaskType, TaskListener> getTaskListeners() {
        return taskListeners;
    }

    @Override
    public void addTaskListener(TaskListener taskListener) {
        taskListeners.put(taskListener.support(), taskListener);
    }

    @Override
    public void sendTaskEvent(TaskEvent taskEvent) {
        Collection<TaskListener> taskListeners = this.taskListeners.get(taskEvent.getTaskType());
        for (TaskListener taskListener : taskListeners) {
            taskListener.handleEvent(taskEvent);
        }
    }
}

3.6.2 Listener

PublishDataTaskListener是对应的处理函数,在其support函数中,声明了支持PUBLISH_DATA_TASK。这样就完成了解耦。

public class PublishDataTaskListener implements TaskListener {

    @Autowired
    private DataNodeService dataNodeService;

    @Autowired
    private TaskProcessor   dataNodeSingleTaskProcessor;

    @Autowired
    private ExecutorManager executorManager;

    @Override
    public TaskType support() {
        return TaskType.PUBLISH_DATA_TASK;
    }

    @Override
    public void handleEvent(TaskEvent event) {

        SessionTask publishDataTask = new PublishDataTask(dataNodeService);

        publishDataTask.setTaskEvent(event);

        executorManager.getPublishDataExecutor().execute(()-> dataNodeSingleTaskProcessor.process(publishDataTask));
    }
}

3.7 Task调度

上面找到了Listener,Listener中经过以下代码启动了执行业务的task来处理。可是这背后的机制须要探究。

executorManager.getPublishDataExecutor().execute(()-> dataNodeSingleTaskProcessor.process(publishDataTask));

3.7.1 ExecutorManager

ExecutorManager 之中,对于线程池作了统一的启动,关闭。publishDataExecutor就是其中之一。

ExecutorManager相关代码摘取以下:

public class ExecutorManager {

    private final ScheduledThreadPoolExecutor scheduler;

    private final ThreadPoolExecutor          publishDataExecutor;

    private static final String               PUBLISH_DATA_EXECUTOR                      = "PublishDataExecutor";

    public ExecutorManager(SessionServerConfig sessionServerConfig) {
      
        publishDataExecutor = reportExecutors.computeIfAbsent(PUBLISH_DATA_EXECUTOR,
                k -> new SessionThreadPoolExecutor(PUBLISH_DATA_EXECUTOR,
                        sessionServerConfig.getPublishDataExecutorMinPoolSize(),
                        sessionServerConfig.getPublishDataExecutorMaxPoolSize(),
                        sessionServerConfig.getPublishDataExecutorKeepAliveTime(), TimeUnit.SECONDS,
                        new ArrayBlockingQueue<>(sessionServerConfig.getPublishDataExecutorQueueSize()),
                        new NamedThreadFactory("PublishData-executor", true)));
    }
  
		public ThreadPoolExecutor getPublishDataExecutor() {
        return publishDataExecutor;
    }
}

其中ExecutorManager的bean以下:

@Bean
public ExecutorManager executorManager(SessionServerConfig sessionServerConfig) {
    return new ExecutorManager(sessionServerConfig);
}

3.7.2 Processor

Processor是任务定义,内部封装了task。

public class DataNodeSingleTaskProcessor implements TaskProcessor<SessionTask> {

    @Override
    public ProcessingResult process(SessionTask task) {
        try {
            task.execute();
            return ProcessingResult.Success;
        } catch (Throwable throwable) {
            if (task instanceof Retryable) {
                Retryable retryAbleTask = (Retryable) task;
                if (retryAbleTask.checkRetryTimes()) {
                    return ProcessingResult.TransientError;
                }
            }
            return ProcessingResult.PermanentError;
        }
    }

    @Override
    public ProcessingResult process(List<SessionTask> tasks) {
        return null;
    }
}

3.7.3 业务Task

PublishDataTask的execute 之中 ,调用dataNodeService.register(publisher)进行注册。

public class PublishDataTask extends AbstractSessionTask {

    private final DataNodeService dataNodeService;

    private Publisher             publisher;

    public PublishDataTask(DataNodeService dataNodeService) {
        this.dataNodeService = dataNodeService;
    }

    @Override
    public void execute() {
        dataNodeService.register(publisher);
    }

    @Override
    public void setTaskEvent(TaskEvent taskEvent) {
        //taskId create from event
        if (taskEvent.getTaskId() != null) {
            setTaskId(taskEvent.getTaskId());
        }

        Object obj = taskEvent.getEventObj();
        if (obj instanceof Publisher) {
            this.publisher = (Publisher) obj;
        } 
    }
}

具体以下

+-------------------------------------------------+
|          DefaultTaskListenerManager             |
|                                                 |
|                                                 |
|  Multimap<TaskType, TaskListener> taskListeners |
|                                                 |
+----------------------+--------------------------+
                       |
                       |
     PUBLISH_DATA_TASK |
                       |
                       v
          +------------+--------------+
          |  PublishDataTaskListener  |
          +------------+--------------+
                       |
          setTaskEvent |
                       |
                       v
              +--------+--------+
              | PublishDataTask |
              +-----------------+

3.8 转发服务信息

通过listener解耦以后,PublishDataTask就调用了dataNodeService.register(publisher),因而接下来就是转发服务信息给Data Server

此处就是调用DataNodeServiceImpl的register函数来把请求转发给Data Server。

public class DataNodeServiceImpl implements DataNodeService {
    @Autowired
    private NodeExchanger         dataNodeExchanger;

    @Autowired
    private NodeManager           dataNodeManager;

    @Autowired
    private SessionServerConfig   sessionServerConfig;

    private AsyncHashedWheelTimer asyncHashedWheelTimer;
}

能够看到,创建了PublishDataRequest,而后经过Bolt Client,发送给Data Server。

@Override
public void register(final Publisher publisher) {
    String bizName = "PublishData";
    Request<PublishDataRequest> request = buildPublishDataRequest(publisher);
    try {
        sendRequest(bizName, request);
    } catch (RequestException e) {
        doRetryAsync(bizName, request, e, sessionServerConfig.getPublishDataTaskRetryTimes(),
            sessionServerConfig.getPublishDataTaskRetryFirstDelay(),
            sessionServerConfig.getPublishDataTaskRetryIncrementDelay());
    }
}

private CommonResponse sendRequest(String bizName, Request request) throws RequestException {
        Response response = dataNodeExchanger.request(request);
        Object result = response.getResult();
        CommonResponse commonResponse = (CommonResponse) result;
        return commonResponse;
}

以下:

+-------------------------------------------------+
|          DefaultTaskListenerManager             |
|                                                 |
|                                                 |
|  Multimap<TaskType, TaskListener> taskListeners |
|                                                 |
+----------------------+--------------------------+
                       |
     PUBLISH_DATA_TASK |
                       v
          +------------+--------------+
          |  PublishDataTaskListener  |
          +------------+--------------+
                       |
          setTaskEvent |
                       v
              +--------+--------+
              | PublishDataTask |
              +--------+--------+
              register |
                       |
            +----------v----------+
            | DataNodeServiceImpl |
            +----------+----------+
    PublishDataRequest |
                       v
            +----------+----------+  Client.sendSync   +------------+
            |  DataNodeExchanger  +------------------> | Data Server|
            +---------------------+ PublishDataRequest +------------+

如何知道发给哪个Data Sever?DataNodeExchanger 中有:

@Override
public Response request(Request request) throws RequestException {

    Response response;
    URL url = request.getRequestUrl();
    try {
        Client sessionClient = getClient(url);

        final Object result = sessionClient
                .sendSync(url, request.getRequestBody(), request.getTimeout() != null ? request.getTimeout() : sessionServerConfig.getDataNodeExchangeTimeOut());

        response = () -> result;
    } 

    return response;
}

因而去DataNodeServiceImpl寻找

private Request<PublishDataRequest> buildPublishDataRequest(Publisher publisher) {
    return new Request<PublishDataRequest>() {
        private AtomicInteger retryTimes = new AtomicInteger();

        @Override
        public PublishDataRequest getRequestBody() {
            PublishDataRequest publishDataRequest = new PublishDataRequest();
            publishDataRequest.setPublisher(publisher);
            publishDataRequest.setSessionServerProcessId(SessionProcessIdGenerator
                .getSessionProcessId());
            return publishDataRequest;
        }

        @Override
        public URL getRequestUrl() {
            return getUrl(publisher.getDataInfoId());
        }

        @Override
        public AtomicInteger getRetryTimes() {
            return retryTimes;
        }
    };
}

private URL getUrl(String dataInfoId) {
        Node dataNode = dataNodeManager.getNode(dataInfoId);
        //meta push data node has not port
        String dataIp = dataNode.getNodeUrl().getIpAddress();
        return new URL(dataIp, sessionServerConfig.getDataServerPort());
}

在 DataNodeManager中有:

@Override
public DataNode getNode(String dataInfoId) {
    DataNode dataNode = consistentHash.getNodeFor(dataInfoId);
    return dataNode;
}

可见是经过dataInfoId计算出hash,而后 从DataNodeManager之中获取对应的DataNode,获得其url

因而,上图拓展为:

+-------------------------------------------------+
|          DefaultTaskListenerManager             |
|                                                 |
|  Multimap<TaskType, TaskListener> taskListeners |
|                                                 |
+----------------------+--------------------------+
                       |
     PUBLISH_DATA_TASK |  1
                       v
          +------------+--------------+
          |  PublishDataTaskListener  |
          +------------+--------------+
                       |
          setTaskEvent |  2
                       v
              +--------+--------+        4     +---------------+
              | PublishDataTask |     +------> |DataNodeManager|
              +--------+--------+     |        +---------------+
              register |  3           |  consistentHash|
                       |              |                | 5
            +----------v----------+---+                v
            | DataNodeServiceImpl |       6      +-----+----+
            +----------+----------+ <------------+ DataNode |
    PublishDataRequest | 7              url      +----------+
                       v
            +----------+----------+
            |  DataNodeExchanger  |
            +----------+----------+
                       |
       Client.sendSync | PublishDataRequest
                       |
                       v 8
                 +-----+------+
                 | Data Server|
                 +------------+

0xFF 参考

蚂蚁金服服务注册中心如何实现 DataServer 平滑扩缩容

蚂蚁金服服务注册中心 SOFARegistry 解析 | 服务发现优化之路

服务注册中心 Session 存储策略 | SOFARegistry 解析

海量数据下的注册中心 - SOFARegistry 架构介绍

服务注册中心数据分片和同步方案详解 | SOFARegistry 解析

蚂蚁金服开源通讯框架SOFABolt解析之链接管理剖析

蚂蚁金服开源通讯框架SOFABolt解析之超时控制机制及心跳机制

蚂蚁金服开源通讯框架 SOFABolt 协议框架解析

蚂蚁金服服务注册中心数据一致性方案分析 | SOFARegistry 解析

蚂蚁通讯框架实践

sofa-bolt 远程调用

sofa-bolt学习

SOFABolt 设计总结 - 优雅简洁的设计之道

SofaBolt源码分析-服务启动到消息处理

SOFABolt 源码分析

SOFABolt 源码分析9 - UserProcessor 自定义处理器的设计

SOFARegistry 介绍

SOFABolt 源码分析13 - Connection 事件处理机制的设计

相关文章
相关标签/搜索