[从源码学设计]蚂蚁金服SOFARegistry之网络封装和操做

[从源码学设计]蚂蚁金服SOFARegistry之网络封装和操做

0x00 摘要

SOFARegistry 是蚂蚁金服开源的一个生产级、高时效、高可用的服务注册中心。html

本系列文章重点在于分析设计和架构,即利用多篇文章,从多个角度反推总结 DataServer 或者 SOFARegistry 的实现机制和架构思路,让你们借以学习阿里如何设计。java

本文为第二篇,介绍SOFARegistry的网络封装和操做。node

0x01 业务领域

1.1 SOFARegistry 整体架构

由于有的兄弟可能没有读过前面MetaServer的文章,因此这里回忆下SOFARegistry 整体架构。git

  • Client 层

应用服务器集群。Client 层是应用层,每一个应用系统经过依赖注册中心相关的客户端 jar 包,经过编程方式来使用服务注册中心的服务发布和服务订阅能力。github

  • Session 层

Session 服务器集群。顾名思义,Session 层是会话层,经过长链接和 Client 层的应用服务器保持通信,负责接收 Client 的服务发布和服务订阅请求。该层只在内存中保存各个服务的发布订阅关系,对于具体的服务信息,只在 Client 层和 Data 层之间透传转发。Session 层是无状态的,能够随着 Client 层应用规模的增加而扩容。编程

  • Data 层

数据服务器集群。Data 层经过分片存储的方式保存着所用应用的服务注册数据。数据按照 dataInfoId(每一份服务数据的惟一标识)进行一致性 Hash 分片,多副本备份,保证数据的高可用。下文的重点也在于随着数据规模的增加,Data 层如何在不影响业务的前提下实现平滑的扩缩容。bootstrap

  • Meta 层

元数据服务器集群。这个集群管辖的范围是 Session 服务器集群和 Data 服务器集群的服务器信息,其角色就至关于 SOFARegistry 架构内部的服务注册中心,只不过 SOFARegistry 做为服务注册中心是服务于广大应用服务层,而 Meta 集群是服务于 SOFARegistry 内部的 Session 集群和 Data 集群,Meta 层可以感知到 Session 节点和 Data 节点的变化,并通知集群的其它节点。api

1.2 应用场景

DataServer,SessionServer,MetaServer 本质上都是网络应用程序。这就决定了网络封装和操做是本系统的基础模块及功能,下面咱们讲讲其应用场景。缓存

1.2.1 单元化状态

SOFARegistry 的应用场景是单元化状态下。服务器

在单元化状态下,一个单元,是一个五脏俱全的缩小版整站,它是全能的,由于部署了全部应用;但它不是全量的,由于只能操做一部分数据。可以单元化的系统,很容易在多机房中部署,由于能够轻易的把几个单元部署在一个机房,而把另外几个部署在其余机房。借由在业务入口处设置一个流量调配器,能够调整业务流量在单元之间的比例。

1.2.2 内网通信

因此 SOFARegistry 考虑的就是在 IDC 私网环境中如何进行节点间通讯。高吞吐、高并发的通讯,数量众多的链接管理(C10K 问题),便捷的升级机制,兼容性保障,灵活的线程池模型运用,细致的异常处理与日志埋点等,这些功能都须要在通讯协议和实现框架上作文章。

1.2.3 Http协议

服务器也有若干配置需求,这用简单的http协议便可。

1.3 问题点

在这种内网单元化场景下,可以想到的问题点以下:

  • 如何制定私有协议。对于私网环境,若是全部应用的节点间,所有经过标准协议来通讯,会有不少问题:好比研发效率方面的影响,升级兼容性,无用字段的传输,功能定制也可能不那么灵活。
  • 如何进行连接管理(无锁建连、定时断连、自动重连);
  • 如何进行精细的线程模型的设计;
  • 如何进行超时控制;
  • 如何进行批量解包和批量提交处理;
  • 如何进行心跳控制;
  • 如何支持通讯模型(oneway、sync、callback、future);
  • 如何实现长链接;
  • 如何实现推拉模式;
  • 如何进行节点判活;

1.4 解决方案

对于这种高性能,高并发的场景,在Java体系下,必然选择非阻塞IO复用,那么天然选择基于Netty进行开发。

1.5 阿里方案

阿里就是借助 SOFABolt 通讯框架,实现基于TCP长链接的节点判活与推模式的变动推送,服务上下线通知时效性在秒级之内。

sofa-bolt是蚂蚁开源的一款基于Netty的网络通讯框架。在Netty的基础上对网络编程常见问题进行了一层简单封装,让中间件开发者更关注于中间件产品自己。

大致功能为:

  1. 链接管理
  2. 请求处理

SOFABolt能够理解为Netty的最佳实践,并额外进行了一些优化工做。

  • 基于Netty的高效的网络IO于线程模型的应用
  • 连接管理(无锁建连、定时断连、自动重连)
  • 通讯模型(oneway、sync、callback、future)
  • 超时控制
  • 批量解包和批量提交处理
  • 心跳于IDLE机制

SOFABolt框架咱们后续可能会专门有系列进行分析,目前认为基于SOFABolt此能够知足咱们需求,因此咱们会简单介绍SOFABolt,重点在于如何使用以及业务实现

1.6 实现问题

在肯定了采用SOFABolt以后,以前提到的问题点就基本被SOFABolt解决了,因此咱们暂时能想到的其余问题大体以下:

  • 对于网络中的各类功能模块,是否须要封装,若是封装,到什么程度比较恰当;
  • 具体封装,纵向横向分别须要细化到什么程度;
  • Data Server 既要对外提供服务,也会做为客户端向其余模块发送请求,这两个功能是否须要抽象出来;
  • 是否须要按照具体业务功能进行分类封装;

1.7 总述

咱们提早剧透,即从逻辑上看,阿里提供了两个层级的封装

从链接角度看,阿里实现了基于 netty.channel.Channel 的封装,从下往上看是:

  • 由于SOFABolt基于Netty,因此封装的核心是netty.channel.Channel。
  • 在此基础上, SOFABolt封装了com.alipay.remoting.Connection
  • 而后 SOFARegistry 基于SOFABolt 封装了 BoltChannel

从应用角度看,阿里实现了Server,Client层次的封装,从下往上看是:

  • SOFABolt构建了 RpcServer,RpcClient。
  • SOFARegisty 基于 RpcServer,RpcClient 构建了BoltClient 和 BoltServer。
  • 而后 SOFARegistry 基于此构建了 BoltExchange。做为 Client / Server 链接的抽象,负责节点之间的链接。
  • 最后构建了 XXXNodeExchanger,在 BoltExchange 基础上,把全部 Data Server 相关的 “Server,Client概念 强相关” 的网络操做统一集中在这里,用户能够直接使用。以DataServer业务模块为例,其内部按照业务不一样,实现了 DataNodeExchanger 和 MetaNodeExchanger。用以让:
    • DataServer 内部直接使用 DataNodeExchanger 与其余 DataServer 交互,
    • DataServer 内部直接使用 MetaNodeExchanger 与 MetaServer 交互。

具体逻辑大体以下:

+---------------------+                                 +---------------------+
|                     |                                 |                     |
|  DataNodeExchanger  |                                 |  MetaNodeExchanger  |
|                     |                                 |                     |
+----------------+----+                                 +--------+------------+
                 |                                               |
                 +-----------+                     +-------------+
                             |                     |
                             v                     v
                   +---------+---------------------+--------+
                   |  BoltExchange                          |
                   | +------------------------------------+ |
                   | |                                    | |
                   | |       Map<String, Client>          | |
                   | |                                    | |
                   | | ConcurrentHashMap<Integer, Server> | |
                   | |                                    | |
                   | +------------------------------------+ |
                   +----------------------------------------+
                          |                           |
                          |                           |
                          |                           |
                          v                           v
  +-----------------------+----------+      +---------+---------------------------------+
  | BoltClient                       |      | BoltServer                                |
  |                                  |      |                                           |
  | +------------------------------+ |      | +---------------------------------------+ |
  | |     remoting.rpc.RpcClient   | |      | |     remoting.rpc.RpcServer            | |
  | |  +-------------------------+ | |      | |                                       | |
  | |  | ConnectionEventHandler  | | |      | |   Map<String, Channel> channels       | |
  | |  |                         | | |      | |                                       | |
  | |  |  ConnectionFactory      | | |      | | List<ChannelHandler> channelHandlers  | |
  | |  +-------------------------+ | |      | |                                       | |
  | +------------------------------+ |      | +---------------------------------------+ |
  +----------------------------------+      +-------------------------------------------+
        |       |           |                 |                        |
        |       |           +---------------------------+              |
        |       v                             |         |              v
        |   +---+------------+ <--------------+   +-----v--------------+--------------+
        |   | ChannelHandler |                    |   BoltChannel                     |
        |   +----------------+                    |                                   |
        |                                         |  +------------------------------+ |
        |                                         |  |com.alipay.remoting.Connection| |
        |                                         |  +------------------------------+ |
        |                                         +-----------------------------------+
        |                                                       |
        v                                                       v
    +---+-------------+                               +---------+------------+
    | CallbackHandler |                               | netty.channel.Channel|
    +-----------------+                               +----------------------+

0x02 基础封装

SOFARegistry 对网络基础功能作了封装,也对外提供了API。如下是封装模块以及对外接口 registry-remoting-api。

├── CallbackHandler.java
├── Channel.java
├── ChannelHandler.java
├── Client.java
├── Endpoint.java
├── RemotingException.java
├── Server.java
└── exchange
    ├── Exchange.java
    ├── NodeExchanger.java
    ├── RequestException.java
    └── message
        ├── Request.java
        └── Response.java

其中比较关键的是四个接口:Server,Client,Exchange,Channel,所以这些就是网络封装的最基本概念。

2.1 Channel

Channel 这个概念比较广泛,表明了IO源与目标打开的链接。咱们先以Java的Channel为例来进行说明。

2.1.1 Java Channel

Java 的Channel 由java.nio.channels包定义的,Channel表示IO源与目标打开的链接,Channel相似于传统的“流”,只不过Channel自己不能直接访问数据,Channel只能与Buffer进行交互。

Channel用于在字节缓冲区和位于通道另外一侧的实体(一般是一个文件或套接字)之间有效地传输数据。通道是访问IO服务的导管,经过通道,咱们能够以最小的开销来访问操做系统的I/O服务;顺便说下,缓冲区是通道内部发送数据和接收数据的端点。

由java.nio.channels包定义的,Channel表示IO源与目标打开的链接,Channel相似于传统的“流”,只不过Channel自己不能直接访问数据,Channel只能与Buffer进行交互。通道主要用于传输数据,从缓冲区的一侧传到另外一侧的实体(如文件、套接字...),反之亦然;通道是访问IO服务的导管,经过通道,咱们能够以最小的开销来访问操做系统的I/O服务;顺便说下,缓冲区是通道内部发送数据和接收数据的端点。

2.1.2 SOFA Channel

从SOFARegistry的Channel定义能够看出其基本功能主要是属性相关功能。

public interface Channel {
    InetSocketAddress getRemoteAddress();
    InetSocketAddress getLocalAddress();
    boolean isConnected();
    Object getAttribute(String key);
    void setAttribute(String key, Object value);
    WebTarget getWebTarget();
    void close();
}

2.2 Server

Server是服务器对应的封装,其基本功能由定义可知,主要是基于Channel发送功能。

public interface Server extends Endpoint {
    boolean isOpen();
    Collection<Channel> getChannels();
    Channel getChannel(InetSocketAddress remoteAddress);
    Channel getChannel(URL url);
    void close(Channel channel);
    int getChannelCount();

    Object sendSync(final Channel channel, final Object message, final int timeoutMillis);

    void sendCallback(final Channel channel, final Object message, CallbackHandler callbackHandler,final int timeoutMillis);
}

2.3 Client

Client是客户端对应的封装,其基本功能也是基于Channel进行交互。

public interface Client extends Endpoint {

    Channel getChannel(URL url);

    Channel connect(URL url);

    Object sendSync(final URL url, final Object message, final int timeoutMillis);

    Object sendSync(final Channel channel, final Object message, final int timeoutMillis);

    void sendCallback(final URL url, final Object message, CallbackHandler callbackHandler,
                      final int timeoutMillis);
}

2.4 Exchange

Exchange 做为 Client / Server 链接的进一步抽象,负责同类型server之间的链接

public interface Exchange<T> {

    String DATA_SERVER_TYPE = "dataServer";
    String META_SERVER_TYPE = "metaServer";

    /**
     * connect same type server,one server ip one connection
     * such as different server on data server,serverOne and serverTwo,different type server must match different channelHandlers,
     * so we must connect by serverType,and get Client instance by serverType
     * @param serverType
     * @param serverUrl
     * @param channelHandlers
     */
    Client connect(String serverType, URL serverUrl, T... channelHandlers);

    /**
     * connect same type server,one server ip one connection
     * such as different server on data server,serverOne and serverTwo,different type server must match different channelHandlers,
     * so we must connect by serverType,and get Client instance by serverType
     * @param serverType
     * @param connNum connection number per serverUrl
     * @param serverUrl
     * @param channelHandlers
     */
    Client connect(String serverType, int connNum, URL serverUrl, T... channelHandlers);

    /**
     * bind server by server port in url parameter,one port must by same server type
     * @param url
     * @param channelHandlers
     */
    Server open(URL url, T... channelHandlers);

    Client getClient(String serverType);

    Server getServer(Integer port);
}

2.5 ChannelHandler

在创建链接中,能够设置一系列应对不一样任务的 handler (称之为 ChannelHandler)。

这些 ChannelHandler 有的做为 Listener 用来处理链接事件,有的做为 Processor 用来处理各类指定的事件,好比服务信息数据变化、Subscriber 注册等事件。

public interface ChannelHandler<T> {

    enum HandlerType {
        LISENTER,
        PROCESSER
    }

    enum InvokeType {
        SYNC,
        ASYNC
    }

    /**
     * on channel connected.
     * @param channel
     */
    void connected(Channel channel) throws RemotingException;

    /**
     * on channel disconnected.
     *
     * @param channel channel.
     */
    void disconnected(Channel channel) throws RemotingException;

    /**
     * on message received.
     * @param channel channel.
     * @param message message.
     */
    void received(Channel channel, T message) throws RemotingException;

    /**
     * on message reply.
     *
     * @param channel
     * @param message
     */
    Object reply(Channel channel, T message) throws RemotingException;

    /**
     * on exception caught.
     * @param channel channel.
     * @param message message.
     * @param exception exception.
     * @throws RemotingException
     */
    void caught(Channel channel, T message, Throwable exception) throws RemotingException;

    HandlerType getType();

    /**
     * return processor request class name
     */
    Class interest();

    /**
     * Select Sync process by reply or Async process by received
     */
    default InvokeType getInvokeType() {
        return InvokeType.SYNC;
    }

    /**
     * specify executor for processor handler
     */
    default Executor getExecutor() {
        return null;
    }
}

所以,网络基本对外接口以下:

+-------------------------------------------------------------------------+
|[registry+remoting+api]                                                  |
|                                                                         |
|              +----------+                      +-------------+          |
|              | Exchange |                      |NodeExchanger|          |
|              ++-----+--++                      +----+--------+          |
|               |     |  |                            |                   |
|               |     |  +----------------------+     |                   |
|               |     |                         |     |                   |
|        +------+     v                         v     v                   |
|        |         +--+-----+                 +-+-----++                  |
|        |         | Server |                 | Client |                  |
|        |         +-----+--+                 +-+----+-+                  |
|        |               |                      |    |                    |
|        |               +--------+     +-------+    |                    |
|        |                        |     |            |                    |
|        v                        v     v            v                    |
|   +----+-----------+            +-----+-+         ++--------------+     |
|   | ChannelHandler |            |Channel|         |CallbackHandler|     |
|   +----------------+            +-------+         +---------------+     |
+-------------------------------------------------------------------------+

0x03 SOFABolt

由于SOFARegistry主要是基于SOFABolt,无法绕开,因此咱们须要首先简单介绍SOFABolt。

Bolt是基于Netty,因此要先说明Netty Channel。

3.1 Netty Channel

在Netty框架中,Channel是其中核心概念之一,是Netty网络通讯的主体,由它负责同对端进行网络通讯、注册和数据操做等功能。

Netty对Jdk原生的ServerSocketChannel进行了封装和加强封装成了NioXXXChannel, 相对于原生的JdkChannel, Netty的Channel增长了以下的组件。

  • id 标识惟一身份信息
  • 可能存在的parent Channel
  • 管道 Pipeline
  • 用于数据读写的unsafe内部类
  • 关联上相伴终生的NioEventLoop

根据服务端和客户端,Channel能够分红两类:

  • 服务端: NioServerSocketChannel
  • 客户端: NioSocketChannel

其实inbound和outbound分别用于标识 Context 所对应的handler的类型, 在Netty中事件能够分为Inbound和Outbound事件,在ChannelPipeline的类注释中,有以下图示:

* 
 *                                                 I/O Request
 *                                            via {@link Channel} or
 *                                        {@link ChannelHandlerContext}
 *                                                      |
 *  +---------------------------------------------------+---------------+
 *  |                           ChannelPipeline         |               |
 *  |                                                  \|/              |
 *  |    +---------------------+            +-----------+----------+    |
 *  |    | Inbound Handler  N  |            | Outbound Handler  1  |    |
 *  |    +----------+----------+            +-----------+----------+    |
 *  |              /|\                                  |               |
 *  |               |                                  \|/              |
 *  |    +----------+----------+            +-----------+----------+    |
 *  |    | Inbound Handler N-1 |            | Outbound Handler  2  |    |
 *  |    +----------+----------+            +-----------+----------+    |
 *  |              /|\                                  .               |
 *  |               .                                   .               |
 *  | ChannelHandlerContext.fireIN_EVT() ChannelHandlerContext.OUT_EVT()|
 *  |        [ method call]                       [method call]         |
 *  |               .                                   .               |
 *  |               .                                  \|/              |
 *  |    +----------+----------+            +-----------+----------+    |
 *  |    | Inbound Handler  2  |            | Outbound Handler M-1 |    |
 *  |    +----------+----------+            +-----------+----------+    |
 *  |              /|\                                  |               |
 *  |               |                                  \|/              |
 *  |    +----------+----------+            +-----------+----------+    |
 *  |    | Inbound Handler  1  |            | Outbound Handler  M  |    |
 *  |    +----------+----------+            +-----------+----------+    |
 *  |              /|\                                  |               |
 *  +---------------+-----------------------------------+---------------+
 *                  |                                  \|/
 *  +---------------+-----------------------------------+---------------+
 *  |               |                                   |               |
 *  |       [ Socket.read() ]                    [ Socket.write() ]     |
 *  |                                                                   |
 *  |  Netty Internal I/O Threads (Transport Implementation)            |
 *  +-------------------------------------------------------------------+

3.2 Connection

Connection其删减版定义以下,能够看到其主要成员就是 Netty channel 实例

public class Connection {

    private Channel                                                               channel;

    private final ConcurrentHashMap<Integer, InvokeFuture>                        invokeFutureMap  = new ConcurrentHashMap<Integer, InvokeFuture>(4);

    /** Attribute key for connection */
    public static final AttributeKey<Connection>                                  CONNECTION       = AttributeKey.valueOf("connection");
  
    /** Attribute key for heartbeat count */
    public static final AttributeKey<Integer>                                     HEARTBEAT_COUNT  = AttributeKey.valueOf("heartbeatCount");

    /** Attribute key for heartbeat switch for each connection */
    public static final AttributeKey<Boolean>                                     HEARTBEAT_SWITCH = AttributeKey.valueOf("heartbeatSwitch");

    /** Attribute key for protocol */
    public static final AttributeKey<ProtocolCode>                                PROTOCOL         = AttributeKey.valueOf("protocol");

    /** Attribute key for version */
    public static final AttributeKey<Byte> VERSION = AttributeKey.valueOf("version");

    private Url url;

    private final ConcurrentHashMap<Integer/* id */, String/* poolKey */>       id2PoolKey       = new ConcurrentHashMap<Integer, String>(256);

    private Set<String>                                                           poolKeys         = new ConcurrentHashSet<String>();

    private final ConcurrentHashMap<String/* attr key*/, Object /*attr value*/> attributes       = new ConcurrentHashMap<String, Object>();
}

Connection的辅助类不少,摘录以下:

  • ConnectionFactory 链接工厂:建立链接、检测链接等
  • ConnectionPool 链接池:存储 { uniqueKey, List } ,uniqueKey 默认为 ip:port;包含 ConnectionSelectStrategy,从 pool 中选择 Connection
  • ConnectionEventHandler 和 ConnectionEventListener:事件处理器和监听器
  • ConnectionManager 链接管理器:是对外的门面,包含全部与 Connection 相关的对外的接口操做
  • Scanner 扫描器:Bolt 提供的一个统一的扫描器,用于执行一些后台任务

另外,须要注意的点以下:

  • 服务端建立 Connection 只有一个时机:netty 链接刚刚创建时
  • 客户端真正建立链接的时候,是在发起第一次调用的时候。

3.3 Connection消息处理

不管是服务端仍是客户端,其实本质都在作一件事情:建立 ConnectionEventHandler 实例并添加到 Netty 的 pipeline 中,基本原理是:

  • ConnectionEventListener:Connection 事件监听器,存储处理对应 ConnectionEventType 的 ConnectionEventProcessor 列表;
  • ConnectionEventProcessor:真正的 Connection 事件处理器接口;能够继承 ConnectionEventProcessor,编写自定义的事件处理类
  • 将自定义的事件处理类添加到 ConnectionEventListener 中;

ConnectionEventHandler处理两类事件

  • Netty 定义的事件:例如 connect,channelActive 等;
  • SOFABolt 定义的事件:事件类型 ConnectionEventType;

以后当有 ConnectionEvent 触发时(不管是 Netty 定义的事件被触发,仍是 SOFABolt 定义的事件被触发),ConnectionEventHandler 会经过异步线程执行器通知 ConnectionEventListener,ConnectionEventListener 将消息派发给具体的 ConnectionEventProcessor 实现类。

3.4 RpcServer

RpcServer实现了一个Server所必须的基本机制,能够直接使用,好比:

  • 编解码,协议版本,地址处理;
  • workerGroup(static类变量,实现多个 RpcServer 实例共享 workerGroup)与 bossGroup;
  • 请求消息处理器;
  • 响应消息处理器;
  • 心跳消息处理器;
  • 用户处理器 UserProcessor;
  • 链接Manager;
  • RpcServerRemoting (发起底层调用实现类) 实例,由于SOFABolt 能够进行双向调用,server 端也能够调用 client 端,因此此处构建了 RpcServerRemoting 实例;
  • ServerBootstrap 实例用以设置一系列 netty 服务端配置;

其中,须要说明的是:

  • 请求链 RpcHandler -> RpcCommandHandler -> RpcRequestProcessor -> UserProcessor
  • 响应链 RpcHandler -> RpcCommandHandler -> RpcResponseProcessor
  • 心跳链 RpcHandler -> RpcCommandHandler -> RpcHeartBeatProcessor

具体定义以下:

public class RpcServer extends AbstractRemotingServer {

    /** server bootstrap */
    private ServerBootstrap                             bootstrap;

    /** channelFuture */
    private ChannelFuture                               channelFuture;

    /** connection event handler */
    private ConnectionEventHandler                      connectionEventHandler;

    /** connection event listener */
    private ConnectionEventListener                     connectionEventListener = new ConnectionEventListener();

    /** user processors of rpc server */
    private ConcurrentHashMap<String, UserProcessor<?>> userProcessors          = new ConcurrentHashMap<String, UserProcessor<?>>(
                                                                                    4);

    /** boss event loop group, boss group should not be daemon, need shutdown manually*/
    private final EventLoopGroup                        bossGroup               = NettyEventLoopUtil.newEventLoopGroup(NamedThreadFactory("Rpc-netty-server-boss",false));
  
    /** worker event loop group. Reuse I/O worker threads between rpc servers. */
    private static final EventLoopGroup                 workerGroup             = NettyEventLoopUtil.newEventLoopGroup(Runtime.getRuntime().availableProcessors() * 2,new NamedThreadFactory("Rpc-netty-server-worker",true));

    /** address parser to get custom args */
    private RemotingAddressParser                       addressParser;

    /** connection manager */
    private DefaultConnectionManager                    connectionManager;

    /** rpc remoting */
    protected RpcRemoting                               rpcRemoting;

    /** rpc codec */
    private Codec                                       codec                   = new RpcCodec();
}

3.5 RpcClient

RpcClient主要机制以下:

  • 用户处理器 UserProcessor ;RpcServer 能够主动向 RpcClient 发起请求,因此RpcClient 也须要建立 UserProcessor 来处理这些请求;
  • RpcConnectionFactory 工厂;
  • workerGroup(static类变量,实现多个 RpcClient 实例共享 workerGroup);
  • Codec 的实现类 RpcCodec 实例,用于建立 netty 的编解码器,实质上是一个工厂类;
  • HeartbeatHandler 心跳处理器;
  • RpcHandler 实例做为 netty 的业务逻辑处理器;
  • ConnectionSelectStrategy 链接选择器;
  • DefaultConnectionManager 链接管理器(是整个 Connection 设计的核心);
  • Bootstrap 实例用以设置一系列 netty 客户端配置;
  • Remote 层请求和响应封装实体的建立工厂 RpcCommandFactory 实例;
  • RpcClientRemoting (发起底层调用实现类) 实例;这是向 RpcServer 发起调用的工具类;

具体代码以下:

public class RpcClient extends AbstractConfigurableInstance {

    private ConcurrentHashMap<String, UserProcessor<?>> userProcessors           = new ConcurrentHashMap<String, UserProcessor<?>>();
  
    /** connection factory */
    private ConnectionFactory                           connectionFactory        = new RpcConnectionFactory(userProcessors, this);

    /** connection event handler */
    private ConnectionEventHandler                      connectionEventHandler   = new RpcConnectionEventHandler(switches());

    /** reconnect manager */
    private ReconnectManager                            reconnectManager;

    /** connection event listener */
    private ConnectionEventListener                     connectionEventListener  = new ConnectionEventListener();

    /** address parser to get custom args */
    private RemotingAddressParser                       addressParser;

    /** connection select strategy */
    private ConnectionSelectStrategy                    connectionSelectStrategy = new RandomSelectStrategy(
                                                                                     switches());

    /** connection manager */
    private DefaultConnectionManager                    connectionManager        = new DefaultConnectionManager(connectionSelectStrategy,connectionFactory,connectionEventHandler,connectionEventListener,switches());

    /** rpc remoting */
    protected RpcRemoting                               rpcRemoting;

    /** task scanner */
    private RpcTaskScanner                              taskScanner              = new RpcTaskScanner();

    /** connection monitor */
    private DefaultConnectionMonitor                    connectionMonitor;

    /** connection monitor strategy */
    private ConnectionMonitorStrategy                   monitorStrategy;
}

0x04 Bolt封装

针对上述提到的基础封装,系统针对Bolt和Http都进行了实现。如下是SOFABolt的对应封装 registry-remoting-bolt。

├── AsyncUserProcessorAdapter.java
├── BoltChannel.java
├── BoltChannelUtil.java
├── BoltClient.java
├── BoltServer.java
├── ConnectionEventAdapter.java
├── SyncUserProcessorAdapter.java
└── exchange
    └── BoltExchange.java

4.1 BoltChannel

BoltChannel 主要是封装了com.alipay.remoting.Connection,而com.alipay.remoting.Connection又封装了io.netty.channel.Channel。

感受Channel封装的不够完全,仍是把Connection暴露出来了,以此获得本地IP,port,远端IP,port等。

public class BoltChannel implements Channel {
    private Connection                connection;
    private AsyncContext              asyncContext;
    private BizContext                bizContext;
    private final Map<String, Object> attributes = new ConcurrentHashMap<>();
}

4.2 BoltServer

BoltServer 封装了 com.alipay.remoting.rpc.RpcServer。

在初始化的时候,调用 addConnectionEventProcessor,registerUserProcessor等把 handler 注册到 RpcServer。

用 ConcurrentHashMap 记录了全部链接到本Server 的Channel,key是IP:port。

public class BoltServer implements Server {
    /**
     * accoding server port
     * can not be null
     */
    private final URL                  url;
    private final List<ChannelHandler> channelHandlers;
    /**
     * bolt server
     */
    private RpcServer                  boltServer;
    /**
     * started status
     */
    private AtomicBoolean              isStarted   = new AtomicBoolean(false);
    private Map<String, Channel>       channels    = new ConcurrentHashMap<>();

    private AtomicBoolean              initHandler = new AtomicBoolean(false);
}

其主要功能以下,基本就是调用Bolt的功能:

@Override
public Object sendSync(Channel channel, Object message, int timeoutMillis) {
    if (channel != null && channel.isConnected()) {
        Url boltUrl = null;
        try {
            boltUrl = new Url(channel.getRemoteAddress().getAddress().getHostAddress(), channel
                .getRemoteAddress().getPort());
            return boltServer.invokeSync(boltUrl, message, timeoutMillis);
        } 
}

@Override
public void sendCallback(Channel channel, Object message, CallbackHandler callbackHandler,
                         int timeoutMillis) {
    if (channel != null && channel.isConnected()) {
        Url boltUrl = null;
        try {
            boltUrl = new Url(channel.getRemoteAddress().getAddress().getHostAddress(), channel
                .getRemoteAddress().getPort());
            boltServer.invokeWithCallback(boltUrl, message, new InvokeCallback() {
                @Override
                public void onResponse(Object result) {
                    callbackHandler.onCallback(channel, result);
                }

                @Override
                public void onException(Throwable e) {
                    callbackHandler.onException(channel, e);
                }

                @Override
                public Executor getExecutor() {
                    return callbackHandler.getExecutor();
                }
            }, timeoutMillis);
            return;
        } 
}

4.3 BoltClient

主要就是封装了 com.alipay.remoting.rpc.RpcClient。

在初始化的时候,调用 addConnectionEventProcessor,registerUserProcessor 等把 handler 注册到 RpcClient。

public class BoltClient implements Client {
    private RpcClient           rpcClient;
    private AtomicBoolean       closed         = new AtomicBoolean(false);
    private int                 connectTimeout = 2000;
    private final int           connNum;
}

主要函数以下:

@Override
    public Channel connect(URL url) {
        try {
            Connection connection = getBoltConnection(rpcClient, url);
            BoltChannel channel = new BoltChannel();
            channel.setConnection(connection);
            return channel;
        } 
    }

    protected Connection getBoltConnection(RpcClient rpcClient, URL url) throws RemotingException {
        Url boltUrl = createBoltUrl(url);
        try {
            Connection connection = rpcClient.getConnection(boltUrl, connectTimeout);
            if (connection == null || !connection.isFine()) {
                if (connection != null) {
                    connection.close();
                }
            }
            return connection;
        } 
    }

    @Override
    public Object sendSync(URL url, Object message, int timeoutMillis) {
            return rpcClient.invokeSync(createBoltUrl(url), message, timeoutMillis);
    }

    @Override
    public Object sendSync(Channel channel, Object message, int timeoutMillis) {
        if (channel != null && channel.isConnected()) {
            BoltChannel boltChannel = (BoltChannel) channel;
            return rpcClient.invokeSync(boltChannel.getConnection(), message, timeoutMillis); 
        }
    }

    @Override
    public void sendCallback(URL url, Object message, CallbackHandler callbackHandler,
                             int timeoutMillis) {
        try {
            Connection connection = getBoltConnection(rpcClient, url);
            BoltChannel channel = new BoltChannel();
            channel.setConnection(connection);
            rpcClient.invokeWithCallback(connection, message, new InvokeCallback() {

                @Override
                public void onResponse(Object result) {
                    callbackHandler.onCallback(channel, result);
                }

                @Override
                public void onException(Throwable e) {
                    callbackHandler.onException(channel, e);
                }

                @Override
                public Executor getExecutor() {
                    return callbackHandler.getExecutor();
                }
            }, timeoutMillis);
            return;
        } 
    }

4.4 BoltExchange

BoltExchange 的主要做用是维护了Client和Server两个ConcurrentHashMap。就是全部的Clients和Servers

这里进行了第一层链接维护。

Map<String, Client> clients 是依据String对Client作了区分,String包括以下:

String DATA_SERVER_TYPE = "dataServer";
String META_SERVER_TYPE = "metaServer";

就是说,假如 Data Server 使用了BoltExchange,则其内部只有两个BoltClient,这两个Client分别被 同 dataServer 和 metaServer 的交互 所复用。

ConcurrentHashMap<Integer, Server> serverMap 是依据自己端口对 自己启动的Server 作了区分

public class BoltExchange implements Exchange<ChannelHandler> {

    private Map<String, Client>                clients   = new ConcurrentHashMap<>();

    private ConcurrentHashMap<Integer, Server> serverMap = new ConcurrentHashMap<>();

    @Override
    public Client connect(String serverType, URL serverUrl, ChannelHandler... channelHandlers) {
        return this.connect(serverType, 1, serverUrl, channelHandlers);
    }

    @Override
    public Client connect(String serverType, int connNum, URL serverUrl, ChannelHandler... channelHandlers) {
        Client client = clients.computeIfAbsent(serverType, key -> newBoltClient(connNum, channelHandlers));
        client.connect(serverUrl);
        return client;
    }

    @Override
    public Server open(URL url, ChannelHandler... channelHandlers) {
        BoltServer server = createBoltServer(url, channelHandlers);
        setServer(server, url);
        server.startServer();
        return server;
    }

    @Override
    public Client getClient(String serverType) {
        return clients.get(serverType);
    }

    @Override
    public Server getServer(Integer port) {
        return serverMap.get(port);
    }

    /**
     * add server into serverMap
     * @param server
     * @param url
     */
    public void setServer(Server server, URL url) {
        serverMap.putIfAbsent(url.getPort(), server);
    }

    private BoltClient newBoltClient(int connNum, ChannelHandler[] channelHandlers) {
        BoltClient boltClient = createBoltClient(connNum);
        boltClient.initHandlers(Arrays.asList(channelHandlers));
        return boltClient;
    }

    protected BoltClient createBoltClient(int connNum) {
        return new BoltClient(connNum);
    }

    protected BoltServer createBoltServer(URL url, ChannelHandler[] channelHandlers) {
        return new BoltServer(url, Arrays.asList(channelHandlers));
    }
}

4.5 BoltExchange 获取Bolt Client

内部会根据不一样的server type从 boltExchange 取出对应Bolt Client。

String DATA_SERVER_TYPE = "dataServer";
String META_SERVER_TYPE = "metaServer";

用以下方法put Client。

Client client = clients.computeIfAbsent(serverType, key -> newBoltClient(connNum, channelHandlers));

Bolt用以下办法获取Client

Client client = boltExchange.getClient(Exchange.DATA_SERVER_TYPE);

Client client = boltExchange.getClient(Exchange.META_SERVER_TYPE);

获得对应的Client以后,而后分别根据参数的url创建Channel,或者发送请求。

Channel channel = client.getChannel(url);

client.sendCallback(request.getRequestUrl()....;

此时大体逻辑以下:

+----------------------------------------+
                 |  BoltExchange                          |
                 | +------------------------------------+ |
                 | |                                    | |
                 | |       Map<String, Client>          | |
                 | |                                    | |
                 | | ConcurrentHashMap<Integer, Server> | |
                 | |                                    | |
                 | +------------------------------------+ |
                 +----------------------------------------+
                        |                           |
                        |                           |
                        |                           |
                        v                           v
+-----------------------+----------+      +---------+---------------------------------+
| BoltClient                       |      | BoltServer                                |
|                                  |      |                                           |
| +------------------------------+ |      | +---------------------------------------+ |
| |     remoting.rpc.RpcClient   | |      | |     remoting.rpc.RpcServer            | |
| |  +-------------------------+ | |      | |                                       | |
| |  | ConnectionEventHandler  | | |      | |   Map<String, Channel> channels       | |
| |  |                         | | |      | |                                       | |
| |  |  ConnectionFactory      | | |      | | List<ChannelHandler> channelHandlers  | |
| |  +-------------------------+ | |      | |                                       | |
| +------------------------------+ |      | +---------------------------------------+ |
+----------------------------------+      +-------------------------------------------+
      |       |           |                 |                        |
      |       |           +---------------------------+              |
      |       v                             |         |              v
      |   +---+------------+ <--------------+   +-----v--------------+--------------+
      |   | ChannelHandler |                    |   BoltChannel                     |
      |   +----------------+                    |                                   |
      |                                         |  +------------------------------+ |
      |                                         |  |com.alipay.remoting.Connection| |
      |                                         |  +------------------------------+ |
      |                                         +-----------------------------------+
      |                                                       |
      v                                                       v
  +---+-------------+                               +---------+------------+
  | CallbackHandler |                               | netty.channel.Channel|
  +-----------------+                               +----------------------+

0x04 Http封装

如下是基于Jetty封装的Http模块 registry-remoting-http。

├── JerseyChannel.java
├── JerseyClient.java
├── JerseyJettyServer.java
├── exchange
│   └── JerseyExchange.java
└── jetty
    └── server
        ├── HttpChannelOverHttpCustom.java
        ├── HttpConnectionCustom.java
        └── HttpConnectionCustomFactory.java

由于 Http 服务不是SOFTRegistry主要功能,因此此处略去。

0x05 功能模块

咱们从目录结构能够大体看出功能模块划分。

├── remoting
│   ├── DataNodeExchanger.java
│   ├── MetaNodeExchanger.java
│   ├── dataserver
│   │   ├── DataServerConnectionFactory.java
│   │   ├── DataServerNodeFactory.java
│   │   ├── GetSyncDataHandler.java
│   │   ├── SyncDataCallback.java
│   │   ├── handler
│   │   │   ├── DataSyncServerConnectionHandler.java
│   │   │   ├── FetchDataHandler.java
│   │   │   ├── NotifyDataSyncHandler.java
│   │   │   ├── NotifyFetchDatumHandler.java
│   │   │   ├── NotifyOnlineHandler.java
│   │   │   └── SyncDataHandler.java
│   │   └── task
│   │       ├── AbstractTask.java
│   │       ├── ConnectionRefreshTask.java
│   │       └── RenewNodeTask.java
│   ├── handler
│   │   ├── AbstractClientHandler.java
│   │   └── AbstractServerHandler.java
│   ├── metaserver
│   │   ├── handler
│   │   ├── provideData
│   │   └── task
│   └── sessionserver
│       ├── disconnect
│       ├── forward
│       └── handler

由于每一个大功能模块大同小异,因此咱们下面主要以 dataserver 目录下为主,兼顾 metaserver 和 sessionserver目录下的特殊部分。

Data Server 比较复杂,便是服务器也是客户端,因此分别作了不一样的组件来抽象这两个概念

5.1 Server组件

DataServerBootstrap#start 方法,用于启动一系列的初始化服务。在此函数中,启动了若干网络服务,用来提供 对外接口。

public void start() {
    try {
        openDataServer();
        openDataSyncServer();
        openHttpServer();
        startRaftClient();
    } 
}

各 Handler 具体做用如图所示:

图 3 各 Handler 做用

5.2 Bolt Server

DataServer 和 DataSyncServer 是 Bolt Server,是节点间的 bolt 通讯组件,其中:

  • boltExchange。bolt组件通信组件,用来给server和dataSyncServer提供通信服务;
  • DataServer。dataServer 则负责数据相关服务,好比数据服务,获取数据的推送,服务上下线通知等;
  • dataSyncServer。dataSyncServer 主要是处理一些数据同步相关的服务;

具体代码以下:

private void openDataServer() {
    try {
        if (serverForSessionStarted.compareAndSet(false, true)) {
            server = boltExchange.open(new URL(NetUtil.getLocalAddress().getHostAddress(),
                dataServerConfig.getPort()), serverHandlers
                .toArray(new ChannelHandler[serverHandlers.size()]));
        }
    } 
}

private void openDataSyncServer() {
    try {
        if (serverForDataSyncStarted.compareAndSet(false, true)) {
            dataSyncServer = boltExchange.open(new URL(NetUtil.getLocalAddress()
                .getHostAddress(), dataServerConfig.getSyncDataPort()), serverSyncHandlers
                .toArray(new ChannelHandler[serverSyncHandlers.size()]));
        }
    } 
}

这两个server的handlers有部分重复,怀疑开发者在作功能迁移。

@Bean(name = "serverSyncHandlers")
public Collection<AbstractServerHandler> serverSyncHandlers() {
    Collection<AbstractServerHandler> list = new ArrayList<>();
    list.add(getDataHandler());
    list.add(publishDataProcessor());
    list.add(unPublishDataHandler());
    list.add(notifyFetchDatumHandler());
    list.add(notifyOnlineHandler());
    list.add(syncDataHandler());
    list.add(dataSyncServerConnectionHandler());
    return list;
}

@Bean(name = "serverHandlers")
public Collection<AbstractServerHandler> serverHandlers() {
    Collection<AbstractServerHandler> list = new ArrayList<>();
    list.add(getDataHandler());
    list.add(clientOffHandler());
    list.add(getDataVersionsHandler());
    list.add(publishDataProcessor());
    list.add(sessionServerRegisterHandler());
    list.add(unPublishDataHandler());
    list.add(dataServerConnectionHandler());
    list.add(renewDatumHandler());
    list.add(datumSnapshotHandler());
    return list;
}

5.2.1 DataSyncServer

这里用DataSyncServer作具体说明。

启动 DataSyncServer 时,注册了以下几个 handler 用于处理 bolt 请求 :

图5 DayaSyncServer 注册的 Handler

DayaSyncServer 注册的 Handler 以下:

  • getDataHandler

该 Handler 主要用于数据的获取,当一个请求过来时,会经过请求中的 DataCenter 和 DataInfoId 获取当前 DataServer 节点存储的相应数据。

  • publishDataProcessor \ unPublishDataHandler

当有数据发布者 publisher 上下线时,会分别触发 publishDataProcessor 或 unPublishDataHandler ,Handler 会往 dataChangeEventCenter 中添加一个数据变动事件,用于异步地通知事件变动中心数据的变动。事件变动中心收到该事件以后,会往队列中加入事件。此时 dataChangeEventCenter 会根据不一样的事件类型异步地对上下线数据进行相应的处理。

与此同时,DataChangeHandler 会把这个事件变动信息经过 ChangeNotifier 对外发布,通知其余节点进行数据同步。

  • notifyFetchDatumHandler

这是一个数据拉取请求,当该 Handler 被触发时,通知当前 DataServer 节点进行版本号对比,若请求中数据的版本号高于当前节点缓存中的版本号,则会进行数据同步操做,保证数据是最新的。

  • notifyOnlineHandler

这是一个 DataServer 上线通知请求 Handler,当其余节点上线时,会触发该 Handler,从而当前节点在缓存中存储新增的节点信息。用于管理节点状态,到底是 INITIAL 仍是 WORKING 。

  • syncDataHandler

节点间数据同步 Handler,该 Handler 被触发时,会经过版本号进行比对,若当前 DataServer 所存储数据版本号含有当前请求版本号,则会返回全部大于当前请求数据版本号的全部数据,便于节点间进行数据同步。

  • dataSyncServerConnectionHandler

链接管理 Handler,当其余 DataServer 节点与当前 DataServer 节点链接时,会触发 connect 方法,从而在本地缓存中注册链接信息,而当其余 DataServer 节点与当前节点断连时,则会触发 disconnect 方法,从而删除缓存信息,进而保证当前 DataServer 节点存储有全部与之链接的 DataServer 节点。

5.2.2 调用链

dataSyncServer 调用链以下:

在 DataServerBootstrap 中有

private void openDataSyncServer() {
    try {
        if (serverForDataSyncStarted.compareAndSet(false, true)) {
            dataSyncServer = boltExchange.open(new URL(NetUtil.getLocalAddress()
                .getHostAddress(), dataServerConfig.getSyncDataPort()), serverSyncHandlers
                .toArray(new ChannelHandler[serverSyncHandlers.size()]));
        }
    }
}

而后有

public class BoltExchange implements Exchange<ChannelHandler> {
    @Override
    public Server open(URL url, ChannelHandler... channelHandlers) {
        BoltServer server = createBoltServer(url, channelHandlers);
        setServer(server, url);
        server.startServer();
        return server;
    }
  
    protected BoltServer createBoltServer(URL url, ChannelHandler[] channelHandlers) {
        return new BoltServer(url, Arrays.asList(channelHandlers));
    }
}

BoltServer启动以下,而且用户自定义了UserProcessor。

public class BoltServer implements Server {
    public BoltServer(URL url, List<ChannelHandler> channelHandlers) {
        this.channelHandlers = channelHandlers;
        this.url = url;
    }
  
    public void startServer() {
        if (isStarted.compareAndSet(false, true)) {
                boltServer = new RpcServer(url.getPort(), true);
                initHandler();
                boltServer.start();
        }
    }  
  
    private void initHandler() {
        if (initHandler.compareAndSet(false, true)) {
            boltServer.addConnectionEventProcessor(ConnectionEventType.CONNECT,
                new ConnectionEventAdapter(ConnectionEventType.CONNECT,
                    getConnectionEventHandler(), this));
            boltServer.addConnectionEventProcessor(ConnectionEventType.CLOSE,
                new ConnectionEventAdapter(ConnectionEventType.CLOSE, getConnectionEventHandler(),
                    this));
            boltServer.addConnectionEventProcessor(ConnectionEventType.EXCEPTION,
                new ConnectionEventAdapter(ConnectionEventType.EXCEPTION,
                    getConnectionEventHandler(), this));

            registerUserProcessorHandler();
        }
    }  
  
    //这里分了同步和异步
    private void registerUserProcessorHandler() {
        if (channelHandlers != null) {
            for (ChannelHandler channelHandler : channelHandlers) {
                if (HandlerType.PROCESSER.equals(channelHandler.getType())) {
                    if (InvokeType.SYNC.equals(channelHandler.getInvokeType())) {
                        boltServer.registerUserProcessor(new SyncUserProcessorAdapter(
                            channelHandler));
                    } else {
                        boltServer.registerUserProcessor(new AsyncUserProcessorAdapter(
                            channelHandler));
                    }
                }
            }
        }
    }
  
}

5.3 HttpServer

HttpServer 是用于控制的Http 通讯组件以及其配置,提供一系列 REST 接口,用于 dashboard 管理、数据查询等;

  • jerseyExchange。jersey组件通信组件,提供服务;
  • httpServer 主要提供一系列 http 接口,用于 dashboard 管理、数据查询等;
private void openHttpServer() {
    try {
        if (httpServerStarted.compareAndSet(false, true)) {
            bindResourceConfig();
            httpServer = jerseyExchange.open(
                new URL(NetUtil.getLocalAddress().getHostAddress(), dataServerConfig
                    .getHttpServerPort()), new ResourceConfig[] { jerseyResourceConfig });
        }
    } 
}

5.4 RaftClient

RaftClient 是基于Raft协议的客户端,用来基于raft协议获取meta server leader信息。

private void startRaftClient() {
    metaServerService.startRaftClient();
    eventCenter.post(new MetaServerChangeEvent(metaServerService.getMetaServerMap()));
}

具体DefaultMetaServiceImpl中实现代码以下:

@Override
public void startRaftClient() {
    try {
        if (clientStart.compareAndSet(false, true)) {
            String serverConf = getServerConfig();
            raftClient = new RaftClient(getGroup(), serverConf);
            raftClient.start();
        }
    }
}

功能模块最后逻辑大体以下:

+->  getDataHandler
                                                  |
                                                  +->  publishDataProcessor
                                                  |
                                                  +--> unPublishDataHandler
                                                  |
                               +---------------+  +--> notifyFetchDatumHandler
                        +----> | DataSyncServer+->+
                        |      +---------------+  +--> notifyOnlineHandler
                        |                         |
                        |                         +--> syncDataHandler
                        |       +-------------+   |
+-------------------+   +---->  | HttpServer  |   +->  dataSyncServerConnectionHandler
|DataServerBootstrap+-->+       +-------------+
+-------------------+   |
                        |
                        |       +------------+    +->  getDataHandler
                        +-----> | RaftClient |    |
                        |       +------------+    +--> clientOffHandler
                        |                         |
                        |       +-------------+   +--> getDataVersionsHandler
                        +-----> |  DataServer +-->+
                                +-------------+   +--> publishDataProcessor
                                                  |
                                                  +--> sessionServerRegisterHandler
                                                  |
                                                  +--> unPublishDataHandler
                                                  |
                                                  +--> dataServerConnectionHandler
                                                  |
                                                  +--> renewDatumHandler
                                                  |
                                                  +->  datumSnapshotHandler

0x06 链接抽象 Exchange

Exchange 做为 Client / Server 链接的抽象,负责节点之间的链接。

Data Server 这里主要是 DataNodeExchanger 和 MetaNodeExchanger,用来:

  • 封装 BoltExchange

  • 把 Bolt Client 和 Bolt Channel 进行抽象。

  • 提供能够直接使用的网络API,好比ForwardServiceImpl,GetSyncDataHandler这些散落的Bean能够直接使用DataNodeExchanger来作网络交互。

从对外接口中能够看出来,

  • connect 函数用来建立链接,返回Channel,这个设置了handler,用来处理服务器推送;
  • request 函数用来发起请求;

这里有两个问题:

  • 为何没有 SessionNodeExchanger?
  • 一样是网络资源的管理,这里和 ConnectionFactory有什么区别?

多是由于Session Server可能会不少,不必保存Bolt client和Server,可是Session对应的有ConnectionFactory。ConectionFactory 是较低层次的封装,下文会讲解。

6.1 DataNodeExchanger

咱们以 DataNodeExchanger 为例。

把全部 Data Server 相关的 “Server,Client概念 直接强相关” 的网络操做统一集中在这里。

6.1.1 具体实现

能够看到,主要是对 boltExchange 进行了更高层次的封装。

具体代码以下:

import com.alipay.sofa.registry.remoting.Channel;
import com.alipay.sofa.registry.remoting.ChannelHandler;
import com.alipay.sofa.registry.remoting.Client;
import com.alipay.sofa.registry.remoting.exchange.Exchange;
import com.alipay.sofa.registry.remoting.exchange.NodeExchanger;
import com.alipay.sofa.registry.remoting.exchange.message.Request;
import com.alipay.sofa.registry.remoting.exchange.message.Response;

public class DataNodeExchanger implements NodeExchanger {

    @Autowired
    private Exchange                          boltExchange;

    @Autowired
    private DataServerConfig                  dataServerConfig;

    @Resource(name = "dataClientHandlers")
    private Collection<AbstractClientHandler> dataClientHandlers;

    @Override
    public Response request(Request request) {
        Client client = boltExchange.getClient(Exchange.DATA_SERVER_TYPE);

        if (null != request.getCallBackHandler()) {
            client.sendCallback(request.getRequestUrl(), request.getRequestBody(),
                    request.getCallBackHandler(),
                    request.getTimeout() != null ? request.getTimeout() : dataServerConfig.getRpcTimeout());
            return () -> Response.ResultStatus.SUCCESSFUL;
        } else {
            final Object result = client.sendSync(request.getRequestUrl(), request.getRequestBody(),
                    dataServerConfig.getRpcTimeout());
            return () -> result;
        }
    }

    public Channel connect(URL url) {
        Client client = boltExchange.getClient(Exchange.DATA_SERVER_TYPE);
        if (client == null) {
            synchronized (this) {
                client = boltExchange.getClient(Exchange.DATA_SERVER_TYPE);
                if (client == null) {
                    client = boltExchange.connect(Exchange.DATA_SERVER_TYPE, url,
                        dataClientHandlers.toArray(new ChannelHandler[dataClientHandlers.size()]));
                }
            }
        }

        Channel channel = client.getChannel(url);
        if (channel == null) {
            synchronized (this) {
                channel = client.getChannel(url);
                if (channel == null) {
                    channel = client.connect(url);
                }
            }
        }

        return channel;
    }
}

6.1.2 推送Handler相关Bean

上面代码中使用了 dataClientHandlers,其是BoltClient所使用的,Server会对此进行推送,这两个Handler会处理。

@Bean(name = "dataClientHandlers")
public Collection<AbstractClientHandler> dataClientHandlers() {
    Collection<AbstractClientHandler> list = new ArrayList<>();
    list.add(notifyDataSyncHandler());
    list.add(fetchDataHandler());
    return list;
}

此时具体网络概念以下:

+-------------------+
| ForwardServiceImpl| +-----------------+
+-------------------+                   |
                                        |
+--------------------+                  |        +-------------------+
| GetSyncDataHandler | +-----------------------> | DataNodeExchanger |
+--------------------+                  |        +-------+-----------+
                                        |                |
+----------------------------------+    |                |
| LocalDataServerChangeEventHandler| +--+                |
+----------------------------------+    |                |
                                        |                |
+------------------------------+        |                v
| DataServerChangeEventHandler +--------+          +-----+--------+
+------------------------------+                   | BoltExchange |
                                                   +-----+--------+
                                                         |
        +-----------------+                              |
        |  JerseyExchange |                              |
        +-------+---------+                      +-------+-------+
                |                                |               |
                |                                |               |
                v                                v               v
       +--------+----------+              +------+-----+   +-----+------+
       | JerseyJettyServer |              | BoltServer |   | BoltServer |
       +-------------------+              +------------+   +------------+
             httpServer                       server       dataSyncServer

0x07 Handler

AbstractServerHandler 和 AbstractClientHandler 对 com.alipay.sofa.registry.remoting.ChannelHandler 进行了实现。

这里须要结合SOFABolt来说解。

7.1 SOFABolt

以 RpcServer 为例,SOFABolt在这里的使用是两种处理器:

  • 用户请求处理器 (UserProcessor) :SOFABolt 提供了两种用户请求处理器,SyncUserProcessor 与 AsyncUserProcessor。 两者的区别在于,前者须要在当前处理线程以return返回值的形式返回处理结果;然后者,有一个 AsyncContext 存根,能够在当前线程,也能够在异步线程,调用 sendResponse 方法返回处理结果;
  • 链接事件处理器 (ConnectionEventProcessor) :SOFABolt 提供了两种事件监听,建连事件(ConnectionEventType.CONNECT)与断连事件(ConnectionEventType.CLOSE),用户能够建立本身的事件处理器,并注册到客户端或者服务端。客户端与服务端,均可以监听到各自的建连与断连事件;

7.2 定义

Handler主要代码分别以下:

public abstract class AbstractServerHandler<T> implements ChannelHandler<T> {
    protected NodeType getConnectNodeType() {
        return NodeType.DATA;
    }

    @Override
    public Object reply(Channel channel, T request) {
        try {
            logRequest(channel, request);
            checkParam(request);
            return doHandle(channel, request);
        } catch (Exception e) {
            return buildFailedResponse(e.getMessage());
        }
    }
}

public abstract class AbstractClientHandler<T> implements ChannelHandler<T> {
    @Override
    public Object reply(Channel channel, T request) {
        try {
            logRequest(channel, request);
            checkParam(request);
            return doHandle(channel, request);
        } catch (Exception e) {
            return buildFailedResponse(e.getMessage());
        }
    }
}

系统能够据此实现各类派生类。

这里须要注意的是:ChannelHandler之中分红两类,分别以下,分别对应了RpcServer中的listener和userProcessor。

enum HandlerType {
    LISENTER,
    PROCESSER
}

以serverSyncHandlers为例,只有dataSyncServerConnectionHandler是Listener,其他都是Processor。

这也符合常理,由于消息响应函数就是应该只有一个

@Bean(name = "serverSyncHandlers")
public Collection<AbstractServerHandler> serverSyncHandlers() {
    Collection<AbstractServerHandler> list = new ArrayList<>();
    list.add(getDataHandler());
    list.add(publishDataProcessor());
    list.add(unPublishDataHandler());
    list.add(notifyFetchDatumHandler());
    list.add(notifyOnlineHandler());
    list.add(syncDataHandler());
    list.add(dataSyncServerConnectionHandler());  只有这个是Listener,其他都是Processor。
    return list;
}

总结以下:

+->  getDataHandler
                         |
                         +->  publishDataProcessor
                         |
                         +--> unPublishDataHandler
+-------------------+    |
| serverSyncHandlers+-------> notifyFetchDatumHandler
+-------------------+    |
                         +--> notifyOnlineHandler
                         |
                         +--> syncDataHandler
                         |
                         +->  dataSyncServerConnectionHandler(Listener)

7.3 使用

在启动时,会使用serverSyncHandlers完成BoltServer的启动。

private void openDataSyncServer() {
    try {
        if (serverForDataSyncStarted.compareAndSet(false, true)) {
            dataSyncServer = boltExchange.open(new URL(NetUtil.getLocalAddress()
                .getHostAddress(), dataServerConfig.getSyncDataPort()), serverSyncHandlers
                .toArray(new ChannelHandler[serverSyncHandlers.size()]));
        }
    } 
}

BoltExchange中有:

@Override
public Server open(URL url, ChannelHandler... channelHandlers) {
    BoltServer server = createBoltServer(url, channelHandlers);
    setServer(server, url);
    server.startServer();
    return server;
}

protected BoltServer createBoltServer(URL url, ChannelHandler[] channelHandlers) {
		return new BoltServer(url, Arrays.asList(channelHandlers));
}

在BoltServer中有以下代码,主要是设置Handler。

public void startServer() {
    if (isStarted.compareAndSet(false, true)) {
        try {
            boltServer = new RpcServer(url.getPort(), true);
            initHandler();
            boltServer.start();
        } 
    }
}

private void initHandler() {
        if (initHandler.compareAndSet(false, true)) {
            boltServer.addConnectionEventProcessor(ConnectionEventType.CONNECT,
                new ConnectionEventAdapter(ConnectionEventType.CONNECT,
                    getConnectionEventHandler(), this));
            boltServer.addConnectionEventProcessor(ConnectionEventType.CLOSE,
                new ConnectionEventAdapter(ConnectionEventType.CLOSE, getConnectionEventHandler(),
                    this));
            boltServer.addConnectionEventProcessor(ConnectionEventType.EXCEPTION,
                new ConnectionEventAdapter(ConnectionEventType.EXCEPTION,
                    getConnectionEventHandler(), this));

            registerUserProcessorHandler();
        }
}

最终则是调用到RpcServer之中,注册了链接响应函数和用户定义函数。

/**
 * Add processor to process connection event.
 */
public void addConnectionEventProcessor(ConnectionEventType type,
                                        ConnectionEventProcessor processor) {
    this.connectionEventListener.addConnectionEventProcessor(type, processor);
}

/**
 * Use UserProcessorRegisterHelper{@link UserProcessorRegisterHelper} to help register user processor for server side.
 */
@Override
public void registerUserProcessor(UserProcessor<?> processor) {
    UserProcessorRegisterHelper.registerUserProcessor(processor, this.userProcessors);
}

此时与SOFABolt逻辑以下:

+----------------------------+
| [RpcServer]                |
|                            |
|                            |  +--> EXCEPTION +--> DataSyncServerConnectionHandler
|                            |  |
|    connectionEventListener--->--->-CONNECT +----> DataSyncServerConnectionHandler
|                            |  |
|                            |  +--> CLOSE  +-----> DataSyncServerConnectionHandler
|                            |
|                            |
|                            |  +--> EXCEPTION +--> DataSyncServerConnectionHandler
|                            |  |
|    connectionEventHandler +-->---> CONNECT +----> DataSyncServerConnectionHandler
|                            |  |
|                            |  +--> CLOSE  +-----> DataSyncServerConnectionHandler
|                            |
|                            |
|                            |  +---->  GetDataRequest  +-------->  getDataHandler
|                            |  |
|                            |  +---->  PublishDataRequest +----->  publishDataProcessor
|                            |  |
|    userProcessors +---------------->  UnPublishDataRequest +----> unPublishDataHandler
|                            |  |
|                            |  +---->  NotifyOnlineRequest-----> notifyFetchDatumHandler
|                            |  |
|                            |  +---->  NotifyOnlineRequest +-----> notifyOnlineHandler
+----------------------------+  |
                                +---->  SyncDataRequest   +-------> syncDataHandler

0x08 总结

至此,咱们把SOFARegistry网络封装和操做大体梳理了下。

从逻辑上看,阿里提供了两个层级的封装:

从链接角度看,阿里实现了基于 netty.channel.Channel 的封装,从下往上看是:

  • 由于SOFABolt基于Netty,因此封装的核心是netty.channel.Channel。
  • 在此基础上, SOFABolt封装了com.alipay.remoting.Connection
  • 而后 SOFARegistry 基于SOFABolt 封装了 BoltChannel

从应用角度看,阿里实现了Server,Client层次的封装,从下往上看是:

  • SOFABolt构建了 RpcServer,RpcClient。
  • SOFARegisty 基于 RpcServer,RpcClient 构建了BoltClient 和 BoltServer。
  • 而后 SOFARegistry 基于此构建了 BoltExchange。做为 Client / Server 链接的抽象,负责节点之间的链接。
  • 最后构建了 XXXNodeExchanger,在 BoltExchange 基础上,把全部 Data Server 内部的 “Server,Client概念 强相关” 的网络操做统一集中在这里,用户能够直接使用。以DataServer业务模块为例,其内部按照业务不一样,实现了 DataNodeExchanger 和 MetaNodeExchanger。用以让:
    • DataServer 内部直接使用 DataNodeExchanger 与其余 DataServer 交互,
    • DataServer 内部直接使用 MetaNodeExchanger 与 MetaServer 交互。

具体逻辑大体以下:

+---------------------+                                 +---------------------+
|                     |                                 |                     |
|  DataNodeExchanger  |                                 |  MetaNodeExchanger  |
|                     |                                 |                     |
+----------------+----+                                 +--------+------------+
                 |                                               |
                 +-----------+                     +-------------+
                             |                     |
                             v                     v
                   +---------+---------------------+--------+
                   |  BoltExchange                          |
                   | +------------------------------------+ |
                   | |                                    | |
                   | |       Map<String, Client>          | |
                   | |                                    | |
                   | | ConcurrentHashMap<Integer, Server> | |
                   | |                                    | |
                   | +------------------------------------+ |
                   +----------------------------------------+
                          |                           |
                          |                           |
                          |                           |
                          v                           v
  +-----------------------+----------+      +---------+---------------------------------+
  | BoltClient                       |      | BoltServer                                |
  |                                  |      |                                           |
  | +------------------------------+ |      | +---------------------------------------+ |
  | |     remoting.rpc.RpcClient   | |      | |     remoting.rpc.RpcServer            | |
  | |  +-------------------------+ | |      | |                                       | |
  | |  | ConnectionEventHandler  | | |      | |   Map<String, Channel> channels       | |
  | |  |                         | | |      | |                                       | |
  | |  |  ConnectionFactory      | | |      | | List<ChannelHandler> channelHandlers  | |
  | |  +-------------------------+ | |      | |                                       | |
  | +------------------------------+ |      | +---------------------------------------+ |
  +----------------------------------+      +-------------------------------------------+
        |       |           |                 |                        |
        |       |           +---------------------------+              |
        |       v                             |         |              v
        |   +---+------------+ <--------------+   +-----v--------------+--------------+
        |   | ChannelHandler |                    |   BoltChannel                     |
        |   +----------------+                    |                                   |
        |                                         |  +------------------------------+ |
        |                                         |  |com.alipay.remoting.Connection| |
        |                                         |  +------------------------------+ |
        |                                         +-----------------------------------+
        |                                                       |
        v                                                       v
    +---+-------------+                               +---------+------------+
    | CallbackHandler |                               | netty.channel.Channel|
    +-----------------+                               +----------------------+

阿里这里封装的很是细致。由于SOFARegistry是比较繁杂的系统,因此把网络概念,功能作封装是至关有必要的。你们在平常开发中可能不用这么细致的封装,能够参考阿里的思路,本身作选择和裁剪便可。

0xFF 参考

https://timyang.net/architecture/cell-distributed-system/

SOFABolt 源码分析12 - Connection 链接管理设计

SOFABolt 源码分析2 - RpcServer 服务端启动的设计

SOFABolt 源码分析3 - RpcClient 客户端启动的设计

蚂蚁通讯框架实践

sofa-bolt 远程调用

sofa-bolt学习

SOFABolt 设计总结 - 优雅简洁的设计之道

SofaBolt源码分析-服务启动到消息处理

SOFABolt 源码分析

SOFABolt 源码分析9 - UserProcessor 自定义处理器的设计

SOFARegistry 介绍

SOFABolt 源码分析13 - Connection 事件处理机制的设计

相关文章
相关标签/搜索