[从源码学设计]蚂蚁金服SOFARegistry之网络封装和操做

0x00 摘要

SOFARegistry 是蚂蚁金服开源的生产级、高时效、高可用的服务注册中心。java

本系列文章重点在于分析设计和架构,即利用多篇文章,从多角度反推总结 DataServer 或 SOFARegistry 的实现机制和架构思路,借以学习阿里如何设计。git

本文为第二篇,介绍SOFARegistry网络封装和操做。github

0x01 业务领域

1.1 SOFARegistry 整体架构

有的兄弟可能没有读过前面MetaServer的文章,这里回忆下SOFARegistry 整体架构。编程

  • Client 层

应用服务器集群。Client 层是应用层,每一个应用系统经过依赖注册中心相关的客户端 jar 包,经过编程方式来使用服务注册中心的服务发布和服务订阅能力。bootstrap

  • Session 层

Session 服务器集群。顾名思义,Session 层是会话层,经过长链接和 Client 层的应用服务器保持通信,负责接收 Client 的服务发布和服务订阅请求。该层只在内存中保存各个服务的发布订阅关系,对于具体的服务信息,只在 Client 层和 Data 层之间透传转发。Session 层是无状态的,能够随着 Client 层应用规模的增加而扩容。api

  • Data 层

数据服务器集群。Data 层经过分片存储的方式保存着所用应用的服务注册数据。数据按照 dataInfoId(每一份服务数据的惟一标识)进行一致性 Hash 分片,多副本备份,保证数据的高可用。下文的重点也在于随着数据规模的增加,Data 层如何在不影响业务的前提下实现平滑的扩缩容。缓存

  • Meta 层

元数据服务器集群。这个集群管辖的范围是 Session 服务器集群和 Data 服务器集群的服务器信息,其角色就至关于 SOFARegistry 架构内部的服务注册中心,只不过 SOFARegistry 做为服务注册中心是服务于广大应用服务层,而 Meta 集群是服务于 SOFARegistry 内部的 Session 集群和 Data 集群,Meta 层可以感知到 Session 节点和 Data 节点的变化,并通知集群的其它节点。服务器

1.2 应用场景

DataServer,SessionServer,MetaServer 本质都是网络应用程序。这决定了网络封装和操做是本系统的基础模块及功能,下面咱们讲讲应用场景。微信

1.2.1 单元化状态

SOFARegistry 的应用场景是单元化状态。网络

在单元化状态,一个单元是一个五脏俱全的缩小版整站,它是全能的,由于部署了全部应用;但它不是全量的,由于只能操做一部分数据。可以单元化的系统,很容易在多机房中部署,由于能够轻易的把几个单元部署在一个机房,而把另外几个部署在其余机房。借由在业务入口处设置一个流量调配器,能够调整业务流量在单元之间的比例。

1.2.2 内网通信

因此 SOFARegistry 考虑的就是在 IDC 私网环境中如何进行节点间通讯。高吞吐、高并发的通讯,数量众多的链接管理(C10K 问题),便捷的升级机制,兼容性保障,灵活的线程池模型运用,细致的异常处理与日志埋点等,这些功能都须要在通讯协议和实现框架上作文章。

1.2.3 Http协议

服务器也有若干配置需求,用简单http协议便可。

1.3 问题点

在内网单元化场景下,可以想到的问题点以下:

  • 如何制定私有协议。私网环境若是全部应用的节点间,所有经过标准协议来通讯,会有不少问题:好比研发效率方面的影响,升级兼容性,无用字段的传输,功能定制也可能不那么灵活。
  • 如何进行连接管理(无锁建连、定时断连、自动重连);
  • 如何进行精细的线程模型的设计;
  • 如何进行超时控制;
  • 如何进行批量解包和批量提交处理;
  • 如何进行心跳控制;
  • 如何支持通讯模型(oneway、sync、callback、future);
  • 如何实现长链接;
  • 如何实现推拉模式;
  • 如何进行节点判活;

1.4 解决方案

对于高性能,高并发的场景,在Java体系下必然选择非阻塞IO复用,天然选择基于Netty开发。

1.5 阿里方案

阿里是借助 SOFABolt 通讯框架,实现基于TCP长链接的节点判活与推模式的变动推送,服务上下线通知时效性在秒级内。

sofa-bolt是蚂蚁开源的基于Netty的网络通讯框架。在Netty基础上对网络编程常见问题进行简单封装,让中间件开发者更关注于中间件产品自己。

大致功能为:

  1. 链接管理
  2. 请求处理

SOFABolt能够理解为Netty的最佳实践,并额外进行一些优化工做。

  • 基于Netty的高效的网络IO于线程模型的应用
  • 连接管理(无锁建连、定时断连、自动重连)
  • 通讯模型(oneway、sync、callback、future)
  • 超时控制
  • 批量解包和批量提交处理
  • 心跳与IDLE机制

SOFABolt框架后续可能会专门有系列进行分析,目前认为基于SOFABolt能够知足咱们需求,因此咱们会简单介绍SOFABolt,重点在于如何使用以及业务实现。

1.6 实现问题

在肯定了采用SOFABolt以后,以前提到的问题点就基本被SOFABolt解决,咱们暂时能想到的其余问题大体以下:

  • 网络中的各类功能模块是否须要封装,若是封装,到什么程度比较恰当;
  • 具体封装,纵向横向分别须要细化到什么程度;
  • Data Server 既要对外提供服务,也会做为客户端向其余模块发送请求,这两个功能是否须要抽象出来;
  • 是否须要按照具体业务功能进行分类封装;

1.7 总述

咱们提早剧透,即从逻辑上看,阿里提供了两个层级的封装

从链接角度看,阿里实现了基于 netty.channel.Channel 的封装,从下往上看是:

  • 由于SOFABolt基于Netty,因此封装的核心是netty.channel.Channel。
  • 在此基础上, SOFABolt封装了com.alipay.remoting.Connection。
  • 而后 SOFARegistry 基于SOFABolt 封装了 BoltChannel,做为链接以及链接操做的抽象。

从应用角度看,阿里实现了Server,Client层次的封装,从下往上看是:

  • SOFABolt构建了 RpcServer,RpcClient。
  • SOFARegisty 基于 RpcServer,RpcClient 构建了BoltClient 和 BoltServer。
  • 而后 SOFARegistry 基于此构建了 BoltExchange。这是路由,做为 Client / Server 链接的进一步抽象,负责节点之间的链接。
  • 最后构建了 XXXNodeExchanger,在 BoltExchange 基础上,把全部 Data Server 相关的 非 “Server,Client概念 强相关” 的网络操做统一集中在这里,用户能够直接使用。以DataServer业务模块为例,其内部按照业务不一样,实现了 DataNodeExchanger 和  MetaNodeExchanger。用以让:
    • DataServer 内部直接使用 DataNodeExchanger 与其余 DataServer 交互,
    • DataServer 内部直接使用 MetaNodeExchanger 与 MetaServer 交互。

具体逻辑大体以下:

+---------------------+                                 +---------------------+
|                     |                                 |                     |
|  DataNodeExchanger  |                                 |  MetaNodeExchanger  |
|                     |                                 |                     |
+----------------+----+                                 +--------+------------+
                 |                                               |
                 +-----------+                     +-------------+
                             |                     |
                             v                     v
                   +---------+---------------------+--------+
                   |  BoltExchange                          |
                   | +------------------------------------+ |
                   | |                                    | |
                   | |       Map<String, Client>          | |
                   | |                                    | |
                   | | ConcurrentHashMap<Integer, Server> | |
                   | |                                    | |
                   | +------------------------------------+ |
                   +----------------------------------------+
                          |                           |
                          |                           |
                          |                           |
                          v                           v
  +-----------------------+----------+      +---------+---------------------------------+
  | BoltClient                       |      | BoltServer                                |
  |                                  |      |                                           |
  | +------------------------------+ |      | +---------------------------------------+ |
  | |     remoting.rpc.RpcClient   | |      | |     remoting.rpc.RpcServer            | |
  | |  +-------------------------+ | |      | |                                       | |
  | |  | ConnectionEventHandler  | | |      | |   Map<String, Channel> channels       | |
  | |  |                         | | |      | |                                       | |
  | |  |  ConnectionFactory      | | |      | | List<ChannelHandler> channelHandlers  | |
  | |  +-------------------------+ | |      | |                                       | |
  | +------------------------------+ |      | +---------------------------------------+ |
  +----------------------------------+      +-------------------------------------------+
        |       |           |                 |                        |
        |       |           +---------------------------+              |
        |       v                             |         |              v
        |   +---+------------+ <--------------+   +-----v--------------+--------------+
        |   | ChannelHandler |                    |   BoltChannel                     |
        |   +----------------+                    |                                   |
        |                                         |  +------------------------------+ |
        |                                         |  |com.alipay.remoting.Connection| |
        |                                         |  +------------------------------+ |
        |                                         +-----------------------------------+
        |                                                       |
        v                                                       v
    +---+-------------+                               +---------+------------+
    | CallbackHandler |                               | netty.channel.Channel|
    +-----------------+                               +----------------------+复制代码

0x02 基础封装

SOFARegistry 对网络基础功能作了封装,也对外提供了API。如下是封装模块以及对外接口 registry-remoting-api。

├── CallbackHandler.java
├── Channel.java
├── ChannelHandler.java
├── Client.java
├── Endpoint.java
├── RemotingException.java
├── Server.java
└── exchange
    ├── Exchange.java
    ├── NodeExchanger.java
    ├── RequestException.java
    └── message
        ├── Request.java
        └── Response.java复制代码

其中比较关键的是四个接口:Server,Client,Exchange,Channel,所以这些就是网络封装的最基本概念。

2.1 Channel

Channel 这个概念比较广泛,表明了IO源与目标打开的链接以及链接的操做。咱们先以Java的Channel为例来进行说明。

2.1.1 Java Channel

Java 的Channel 由java.nio.channels包定义,Channel表示IO源与目标打开的链接,Channel相似于传统的“流”,不过Channel自己不能直接访问数据,Channel只能与Buffer进行交互。

Channel用于在字节缓冲区和位于通道另外一侧的实体(一般是一个文件或套接字)之间有效地传输数据。通道是访问IO服务的导管,经过通道,咱们以最小的开销来访问操做系统的I/O服务;顺便说下,缓冲区是通道内部发送数据和接收数据的端点。

由java.nio.channels包定义,Channel表示IO源与目标打开的链接,Channel相似于传统的“流”,只不过Channel自己不能直接访问数据,Channel只能与Buffer进行交互。通道主要用于传输数据,从缓冲区的一侧传到另外一侧的实体(如文件、套接字...),反之亦然;通道是访问IO服务的导管,经过通道,咱们能够以最小的开销来访问操做系统的I/O服务;

2.1.2 SOFA Channel

从SOFARegistry的Channel定义能够看出其基本功能主要是属性相关功能。

public interface Channel {InetSocketAddress getRemoteAddress();InetSocketAddress getLocalAddress();boolean isConnected();Object getAttribute(String key);void setAttribute(String key, Object value);WebTarget getWebTarget();void close();
}复制代码

2.2 Server

Server是服务器对应的封装,其基本功能由定义可知,主要是基于Channel发送功能。

public interface Server extends Endpoint {boolean isOpen();Collection<Channel> getChannels();Channel getChannel(InetSocketAddress remoteAddress);Channel getChannel(URL url);void close(Channel channel);int getChannelCount();Object sendSync(final Channel channel, final Object message, final int timeoutMillis);void sendCallback(final Channel channel, final Object message, CallbackHandler callbackHandler,final int timeoutMillis);
}复制代码

2.3 Client

Client是客户端对应的封装,其基本功能也是基于Channel进行交互。

public interface Client extends Endpoint {Channel getChannel(URL url);Channel connect(URL url);Object sendSync(final URL url, final Object message, final int timeoutMillis);Object sendSync(final Channel channel, final Object message, final int timeoutMillis);void sendCallback(final URL url, final Object message, CallbackHandler callbackHandler,                      final int timeoutMillis);
}复制代码

2.4 Exchange

Exchange 是路由,做为 Client / Server 链接的进一步抽象,负责同类型server之间的链接。

public interface Exchange<T> {

    String DATA_SERVER_TYPE = "dataServer";
    String META_SERVER_TYPE = "metaServer";Client connect(String serverType, URL serverUrl, T... channelHandlers);Client connect(String serverType, int connNum, URL serverUrl, T... channelHandlers);Server open(URL url, T... channelHandlers);Client getClient(String serverType);Server getServer(Integer port);
}复制代码

2.5 ChannelHandler

在创建链接中,能够设置一系列应对不一样任务的 handler (称之为 ChannelHandler)。

这些 ChannelHandler 有的做为 Listener 用来处理链接事件,有的做为 Processor 用来处理各类指定的事件,好比服务信息数据变化、Subscriber 注册等事件。

public interface ChannelHandler<T> {enum HandlerType {
        LISENTER,
        PROCESSER
    }enum InvokeType {
        SYNC,
        ASYNC
    }void connected(Channel channel) throws RemotingException;void disconnected(Channel channel) throws RemotingException;void received(Channel channel, T message) throws RemotingException;Object reply(Channel channel, T message) throws RemotingException;void caught(Channel channel, T message, Throwable exception) throws RemotingException;HandlerType getType();Class interest();default InvokeType getInvokeType() {return InvokeType.SYNC;
    }default Executor getExecutor() {return null;
    }
}复制代码

所以,网络基本对外接口以下:

+-------------------------------------------------------------------------+
|[registry+remoting+api]                                                  |
|                                                                         |
|              +----------+                      +-------------+          |
|              | Exchange |                      |NodeExchanger|          |
|              ++-----+--++                      +----+--------+          |
|               |     |  |                            |                   |
|               |     |  +----------------------+     |                   |
|               |     |                         |     |                   |
|        +------+     v                         v     v                   |
|        |         +--+-----+                 +-+-----++                  |
|        |         | Server |                 | Client |                  |
|        |         +-----+--+                 +-+----+-+                  |
|        |               |                      |    |                    |
|        |               +--------+     +-------+    |                    |
|        |                        |     |            |                    |
|        v                        v     v            v                    |
|   +----+-----------+            +-----+-+         ++--------------+     |
|   | ChannelHandler |            |Channel|         |CallbackHandler|     |
|   +----------------+            +-------+         +---------------+     |
+-------------------------------------------------------------------------+复制代码

0x03 SOFABolt

由于SOFARegistry主要是基于SOFABolt,无法绕开,因此咱们须要首先简单介绍SOFABolt。

Bolt是基于Netty,因此要先说明Netty Channel。

3.1 Netty Channel

在Netty框架中,Channel是其中核心概念之一,是Netty网络通讯的主体,由它负责同对端进行网络通讯、注册和数据操做等功能。

Netty对Jdk原生的ServerSocketChannel进行了封装和加强封装成了NioXXXChannel, 相对于原生的JdkChannel, Netty的Channel增长了以下的组件。

  • id 标识惟一身份信息
  • 可能存在的parent Channel
  • 管道 Pipeline
  • 用于数据读写的unsafe内部类
  • 关联上相伴终生的NioEventLoop

根据服务端和客户端,Channel能够分红两类:

  • 服务端: NioServerSocketChannel
  • 客户端: NioSocketChannel

其实inbound和outbound分别用于标识 Context 所对应的handler的类型, 在Netty中事件能够分为Inbound和Outbound事件,在ChannelPipeline的类注释中,有以下图示:

 * 
 *                                                 I/O Request
 *                                            via {@link Channel} or
 *                                        {@link ChannelHandlerContext}
 *                                                      |
 *  +---------------------------------------------------+---------------+
 *  |                           ChannelPipeline         |               |
 *  |                                                  \|/              |
 *  |    +---------------------+            +-----------+----------+    |
 *  |    | Inbound Handler  N  |            | Outbound Handler  1  |    |
 *  |    +----------+----------+            +-----------+----------+    |
 *  |              /|\                                  |               |
 *  |               |                                  \|/              |
 *  |    +----------+----------+            +-----------+----------+    |
 *  |    | Inbound Handler N-1 |            | Outbound Handler  2  |    |
 *  |    +----------+----------+            +-----------+----------+    |
 *  |              /|\                                  .               |
 *  |               .                                   .               |
 *  | ChannelHandlerContext.fireIN_EVT() ChannelHandlerContext.OUT_EVT()|
 *  |        [ method call]                       [method call]         |
 *  |               .                                   .               |
 *  |               .                                  \|/              |
 *  |    +----------+----------+            +-----------+----------+    |
 *  |    | Inbound Handler  2  |            | Outbound Handler M-1 |    |
 *  |    +----------+----------+            +-----------+----------+    |
 *  |              /|\                                  |               |
 *  |               |                                  \|/              |
 *  |    +----------+----------+            +-----------+----------+    |
 *  |    | Inbound Handler  1  |            | Outbound Handler  M  |    |
 *  |    +----------+----------+            +-----------+----------+    |
 *  |              /|\                                  |               |
 *  +---------------+-----------------------------------+---------------+
 *                  |                                  \|/
 *  +---------------+-----------------------------------+---------------+
 *  |               |                                   |               |
 *  |       [ Socket.read() ]                    [ Socket.write() ]     |
 *  |                                                                   |
 *  |  Netty Internal I/O Threads (Transport Implementation)            |
 *  +-------------------------------------------------------------------+复制代码

3.2 Connection

Connection其删减版定义以下,能够看到其主要成员就是 Netty channel 实例:

public class Connection {private Channel                                                               channel;private final ConcurrentHashMap<Integer, InvokeFuture>                        invokeFutureMap  = new ConcurrentHashMap<Integer, InvokeFuture>(4);/** Attribute key for connection */public static final AttributeKey<Connection>                                  CONNECTION       = AttributeKey.valueOf("connection");  /** Attribute key for heartbeat count */public static final AttributeKey<Integer>                                     HEARTBEAT_COUNT  = AttributeKey.valueOf("heartbeatCount");/** Attribute key for heartbeat switch for each connection */public static final AttributeKey<Boolean>                                     HEARTBEAT_SWITCH = AttributeKey.valueOf("heartbeatSwitch");/** Attribute key for protocol */public static final AttributeKey<ProtocolCode>                                PROTOCOL         = AttributeKey.valueOf("protocol");/** Attribute key for version */public static final AttributeKey<Byte> VERSION = AttributeKey.valueOf("version");private Url url;private final ConcurrentHashMap<Integer/* id */, String/* poolKey */>       id2PoolKey       = new ConcurrentHashMap<Integer, String>(256);private Set<String>                                                           poolKeys         = new ConcurrentHashSet<String>();private final ConcurrentHashMap<String/* attr key*/, Object /*attr value*/> attributes       = new ConcurrentHashMap<String, Object>();
}复制代码

Connection的辅助类不少,摘录以下:

  • ConnectionFactory 链接工厂:建立链接、检测链接等
  • ConnectionPool 链接池:存储 { uniqueKey, List } ,uniqueKey 默认为 ip:port;包含 ConnectionSelectStrategy,从 pool 中选择 Connection
  • ConnectionEventHandler 和 ConnectionEventListener:事件处理器和监听器
  • ConnectionManager 链接管理器:是对外的门面,包含全部与 Connection 相关的对外的接口操做
  • Scanner 扫描器:Bolt 提供的一个统一的扫描器,用于执行一些后台任务

另外,须要注意的点以下:

  • 服务端建立 Connection 只有一个时机:netty 链接刚刚创建时
  • 客户端真正建立链接的时候,是在发起第一次调用的时候。

3.3 Connection消息处理

不管是服务端仍是客户端,其实本质都在作一件事情:建立 ConnectionEventHandler 实例并添加到 Netty 的 pipeline 中,基本原理是:

  • ConnectionEventListener:Connection 事件监听器,存储处理对应 ConnectionEventType 的 ConnectionEventProcessor 列表;
  • ConnectionEventProcessor:真正的 Connection 事件处理器接口;能够继承 ConnectionEventProcessor,编写自定义的事件处理类
  • 将自定义的事件处理类添加到 ConnectionEventListener 中;

ConnectionEventHandler处理两类事件

  • Netty 定义的事件:例如 connect,channelActive 等;
  • SOFABolt 定义的事件:事件类型 ConnectionEventType;

以后当有 ConnectionEvent 触发时(不管是 Netty 定义的事件被触发,仍是 SOFABolt 定义的事件被触发),ConnectionEventHandler 会经过异步线程执行器通知 ConnectionEventListener,ConnectionEventListener 将消息派发给具体的 ConnectionEventProcessor 实现类。

3.4 RpcServer

RpcServer实现了一个Server所必须的基本机制,能够直接使用,好比:

  • 编解码,协议版本,地址处理;
  • workerGroup(static类变量,实现多个 RpcServer 实例共享 workerGroup)与 bossGroup;
  • 请求消息处理器;
  • 响应消息处理器;
  • 心跳消息处理器;
  • 用户处理器 UserProcessor;
  • 链接Manager;
  • RpcServerRemoting (发起底层调用实现类) 实例,由于SOFABolt 能够进行双向调用,server 端也能够调用 client 端,因此此处构建了 RpcServerRemoting 实例;
  • ServerBootstrap 实例用以设置一系列 netty 服务端配置;

其中,须要说明的是:

  • 请求链 RpcHandler -> RpcCommandHandler -> RpcRequestProcessor -> UserProcessor
  • 响应链 RpcHandler -> RpcCommandHandler -> RpcResponseProcessor
  • 心跳链 RpcHandler -> RpcCommandHandler -> RpcHeartBeatProcessor

具体定义以下:

public class RpcServer extends AbstractRemotingServer {/** server bootstrap */private ServerBootstrap                             bootstrap;/** channelFuture */private ChannelFuture                               channelFuture;/** connection event handler */private ConnectionEventHandler                      connectionEventHandler;/** connection event listener */private ConnectionEventListener                     connectionEventListener = new ConnectionEventListener();/** user processors of rpc server */private ConcurrentHashMap<String, UserProcessor<?>> userProcessors          = new ConcurrentHashMap<String, UserProcessor<?>>(4);/** boss event loop group, boss group should not be daemon, need shutdown manually*/private final EventLoopGroup                        bossGroup               = NettyEventLoopUtil.newEventLoopGroup(NamedThreadFactory("Rpc-netty-server-boss",false));  /** worker event loop group. Reuse I/O worker threads between rpc servers. */private static final EventLoopGroup                 workerGroup             = NettyEventLoopUtil.newEventLoopGroup(Runtime.getRuntime().availableProcessors() * 2,new NamedThreadFactory("Rpc-netty-server-worker",true));/** address parser to get custom args */private RemotingAddressParser                       addressParser;/** connection manager */private DefaultConnectionManager                    connectionManager;/** rpc remoting */protected RpcRemoting                               rpcRemoting;/** rpc codec */private Codec                                       codec                   = new RpcCodec();
}复制代码

3.5 RpcClient

RpcClient主要机制以下:

  • 用户处理器 UserProcessor ;RpcServer 能够主动向 RpcClient 发起请求,因此RpcClient 也须要建立 UserProcessor 来处理这些请求;
  • RpcConnectionFactory 工厂;
  • workerGroup(static类变量,实现多个 RpcClient 实例共享 workerGroup);
  • Codec 的实现类 RpcCodec 实例,用于建立 netty 的编解码器,实质上是一个工厂类;
  • HeartbeatHandler 心跳处理器;
  • RpcHandler 实例做为 netty 的业务逻辑处理器;
  • ConnectionSelectStrategy 链接选择器;
  • DefaultConnectionManager 链接管理器(是整个 Connection 设计的核心);
  • Bootstrap 实例用以设置一系列 netty 客户端配置;
  • Remote 层请求和响应封装实体的建立工厂 RpcCommandFactory 实例;
  • RpcClientRemoting (发起底层调用实现类) 实例;这是向 RpcServer 发起调用的工具类;

具体代码以下:

public class RpcClient extends AbstractConfigurableInstance {private ConcurrentHashMap<String, UserProcessor<?>> userProcessors           = new ConcurrentHashMap<String, UserProcessor<?>>();  /** connection factory */private ConnectionFactory                           connectionFactory        = new RpcConnectionFactory(userProcessors, this);/** connection event handler */private ConnectionEventHandler                      connectionEventHandler   = new RpcConnectionEventHandler(switches());/** reconnect manager */private ReconnectManager                            reconnectManager;/** connection event listener */private ConnectionEventListener                     connectionEventListener  = new ConnectionEventListener();/** address parser to get custom args */private RemotingAddressParser                       addressParser;/** connection select strategy */private ConnectionSelectStrategy                    connectionSelectStrategy = new RandomSelectStrategy(
                                                                                     switches());/** connection manager */private DefaultConnectionManager                    connectionManager        = new DefaultConnectionManager(connectionSelectStrategy,connectionFactory,connectionEventHandler,connectionEventListener,switches());/** rpc remoting */protected RpcRemoting                               rpcRemoting;/** task scanner */private RpcTaskScanner                              taskScanner              = new RpcTaskScanner();/** connection monitor */private DefaultConnectionMonitor                    connectionMonitor;/** connection monitor strategy */private ConnectionMonitorStrategy                   monitorStrategy;
}复制代码

0x04 Bolt封装

针对上述提到的基础封装,系统针对Bolt和Http都进行了实现。如下是SOFABolt的对应封装 registry-remoting-bolt。

├── AsyncUserProcessorAdapter.java
├── BoltChannel.java
├── BoltChannelUtil.java
├── BoltClient.java
├── BoltServer.java
├── ConnectionEventAdapter.java
├── SyncUserProcessorAdapter.java
└── exchange
    └── BoltExchange.java复制代码

4.1 BoltChannel

BoltChannel 主要是封装了com.alipay.remoting.Connection,而com.alipay.remoting.Connection又封装了io.netty.channel.Channel。

感受Channel封装的不够完全,仍是把Connection暴露出来了,以此获得本地IP,port,远端IP,port等。

public class BoltChannel implements Channel {private Connection                connection;private AsyncContext              asyncContext;private BizContext                bizContext;private final Map<String, Object> attributes = new ConcurrentHashMap<>();
}复制代码

4.2 BoltServer

BoltServer 封装了 com.alipay.remoting.rpc.RpcServer。

在初始化的时候,调用 addConnectionEventProcessor,registerUserProcessor等把  handler 注册到 RpcServer。

用 ConcurrentHashMap  记录了全部链接到本Server 的Channel,key是IP:port。

public class BoltServer implements Server {/**
     * accoding server port
     */private final URL                  url;private final List<ChannelHandler> channelHandlers;/**
     * bolt server
     */private RpcServer                  boltServer;/**
     * started status
     */private AtomicBoolean              isStarted   = new AtomicBoolean(false);private Map<String, Channel>       channels    = new ConcurrentHashMap<>();private AtomicBoolean              initHandler = new AtomicBoolean(false);
}复制代码

其主要功能以下,基本就是调用Bolt的功能:

@Overridepublic Object sendSync(Channel channel, Object message, int timeoutMillis) {if (channel != null && channel.isConnected()) {
        Url boltUrl = null;try {
            boltUrl = new Url(channel.getRemoteAddress().getAddress().getHostAddress(), channel
                .getRemoteAddress().getPort());return boltServer.invokeSync(boltUrl, message, timeoutMillis);
        } 
}@Overridepublic void sendCallback(Channel channel, Object message, CallbackHandler callbackHandler,                         int timeoutMillis) {if (channel != null && channel.isConnected()) {
        Url boltUrl = null;try {
            boltUrl = new Url(channel.getRemoteAddress().getAddress().getHostAddress(), channel
                .getRemoteAddress().getPort());
            boltServer.invokeWithCallback(boltUrl, message, new InvokeCallback() {@Overridepublic void onResponse(Object result) {
                    callbackHandler.onCallback(channel, result);
                }@Overridepublic void onException(Throwable e) {
                    callbackHandler.onException(channel, e);
                }@Overridepublic Executor getExecutor() {return callbackHandler.getExecutor();
                }
            }, timeoutMillis);return;
        } 
}复制代码

4.3 BoltClient

主要就是封装了 com.alipay.remoting.rpc.RpcClient。

在初始化的时候,调用 addConnectionEventProcessor,registerUserProcessor 等把  handler 注册到 RpcClient。

public class BoltClient implements Client {private RpcClient           rpcClient;private AtomicBoolean       closed         = new AtomicBoolean(false);private int                 connectTimeout = 2000;private final int           connNum;
}复制代码

主要函数以下:

    @Overridepublic Channel connect(URL url) {try {
            Connection connection = getBoltConnection(rpcClient, url);
            BoltChannel channel = new BoltChannel();
            channel.setConnection(connection);return channel;
        } 
    }protected Connection getBoltConnection(RpcClient rpcClient, URL url) throws RemotingException {
        Url boltUrl = createBoltUrl(url);try {
            Connection connection = rpcClient.getConnection(boltUrl, connectTimeout);if (connection == null || !connection.isFine()) {if (connection != null) {
                    connection.close();
                }
            }return connection;
        } 
    }@Overridepublic Object sendSync(URL url, Object message, int timeoutMillis) {return rpcClient.invokeSync(createBoltUrl(url), message, timeoutMillis);
    }@Overridepublic Object sendSync(Channel channel, Object message, int timeoutMillis) {if (channel != null && channel.isConnected()) {
            BoltChannel boltChannel = (BoltChannel) channel;return rpcClient.invokeSync(boltChannel.getConnection(), message, timeoutMillis); 
        }
    }@Overridepublic void sendCallback(URL url, Object message, CallbackHandler callbackHandler,                             int timeoutMillis) {try {
            Connection connection = getBoltConnection(rpcClient, url);
            BoltChannel channel = new BoltChannel();
            channel.setConnection(connection);
            rpcClient.invokeWithCallback(connection, message, new InvokeCallback() {@Overridepublic void onResponse(Object result) {
                    callbackHandler.onCallback(channel, result);
                }@Overridepublic void onException(Throwable e) {
                    callbackHandler.onException(channel, e);
                }@Overridepublic Executor getExecutor() {return callbackHandler.getExecutor();
                }
            }, timeoutMillis);return;
        } 
    }复制代码

4.4 BoltExchange

BoltExchange 的主要做用是维护了Client和Server两个ConcurrentHashMap。就是全部的Clients和Servers。

这里进行了第一层链接维护。

Map<String, Client>  clients 是依据String对Client作了区分,String包括以下:

String DATA_SERVER_TYPE = "dataServer";
String META_SERVER_TYPE = "metaServer";复制代码

就是说,假如 Data Server 使用了BoltExchange,则其内部只有两个BoltClient,这两个Client分别被 同 dataServer 和 metaServer 的交互 所复用。

ConcurrentHashMap<Integer, Server> serverMap 是依据自己端口对 自己启动的Server 作了区分。

public class BoltExchange implements Exchange<ChannelHandler> {private Map<String, Client>                clients   = new ConcurrentHashMap<>();private ConcurrentHashMap<Integer, Server> serverMap = new ConcurrentHashMap<>();@Overridepublic Client connect(String serverType, URL serverUrl, ChannelHandler... channelHandlers) {return this.connect(serverType, 1, serverUrl, channelHandlers);
    }@Overridepublic Client connect(String serverType, int connNum, URL serverUrl, ChannelHandler... channelHandlers) {
        Client client = clients.computeIfAbsent(serverType, key -> newBoltClient(connNum, channelHandlers));
        client.connect(serverUrl);return client;
    }@Overridepublic Server open(URL url, ChannelHandler... channelHandlers) {
        BoltServer server = createBoltServer(url, channelHandlers);
        setServer(server, url);
        server.startServer();return server;
    }@Overridepublic Client getClient(String serverType) {return clients.get(serverType);
    }@Overridepublic Server getServer(Integer port) {return serverMap.get(port);
    }/**
     * add server into serverMap
     * @param server
     * @param url
     */public void setServer(Server server, URL url) {
        serverMap.putIfAbsent(url.getPort(), server);
    }private BoltClient newBoltClient(int connNum, ChannelHandler[] channelHandlers) {
        BoltClient boltClient = createBoltClient(connNum);
        boltClient.initHandlers(Arrays.asList(channelHandlers));return boltClient;
    }protected BoltClient createBoltClient(int connNum) {return new BoltClient(connNum);
    }protected BoltServer createBoltServer(URL url, ChannelHandler[] channelHandlers) {return new BoltServer(url, Arrays.asList(channelHandlers));
    }
}复制代码

4.5 BoltExchange 获取Bolt Client

内部会根据不一样的server type从 boltExchange 取出对应Bolt Client。

String DATA_SERVER_TYPE = "dataServer";
String META_SERVER_TYPE = "metaServer";复制代码

用以下方法put  Client。

Client client = clients.computeIfAbsent(serverType, key -> newBoltClient(connNum, channelHandlers));复制代码

Bolt用以下办法获取Client

Client client = boltExchange.getClient(Exchange.DATA_SERVER_TYPE);

Client client = boltExchange.getClient(Exchange.META_SERVER_TYPE);复制代码

获得对应的Client以后,而后分别根据参数的url创建Channel,或者发送请求。

Channel channel = client.getChannel(url);

client.sendCallback(request.getRequestUrl()....;                  
复制代码

此时大体逻辑以下:

                 +----------------------------------------+
                 |  BoltExchange                          |
                 | +------------------------------------+ |
                 | |                                    | |
                 | |       Map<String, Client>          | |
                 | |                                    | |
                 | | ConcurrentHashMap<Integer, Server> | |
                 | |                                    | |
                 | +------------------------------------+ |
                 +----------------------------------------+
                        |                           |
                        |                           |
                        |                           |
                        v                           v
+-----------------------+----------+      +---------+---------------------------------+
| BoltClient                       |      | BoltServer                                |
|                                  |      |                                           |
| +------------------------------+ |      | +---------------------------------------+ |
| |     remoting.rpc.RpcClient   | |      | |     remoting.rpc.RpcServer            | |
| |  +-------------------------+ | |      | |                                       | |
| |  | ConnectionEventHandler  | | |      | |   Map<String, Channel> channels       | |
| |  |                         | | |      | |                                       | |
| |  |  ConnectionFactory      | | |      | | List<ChannelHandler> channelHandlers  | |
| |  +-------------------------+ | |      | |                                       | |
| +------------------------------+ |      | +---------------------------------------+ |
+----------------------------------+      +-------------------------------------------+
      |       |           |                 |                        |
      |       |           +---------------------------+              |
      |       v                             |         |              v
      |   +---+------------+ <--------------+   +-----v--------------+--------------+
      |   | ChannelHandler |                    |   BoltChannel                     |
      |   +----------------+                    |                                   |
      |                                         |  +------------------------------+ |
      |                                         |  |com.alipay.remoting.Connection| |
      |                                         |  +------------------------------+ |
      |                                         +-----------------------------------+
      |                                                       |
      v                                                       v
  +---+-------------+                               +---------+------------+
  | CallbackHandler |                               | netty.channel.Channel|
  +-----------------+                               +----------------------+复制代码

0x04 Http封装

如下是基于Jetty封装的Http模块 registry-remoting-http。

├── JerseyChannel.java
├── JerseyClient.java
├── JerseyJettyServer.java
├── exchange
│   └── JerseyExchange.java
└── jetty
    └── server
        ├── HttpChannelOverHttpCustom.java
        ├── HttpConnectionCustom.java
        └── HttpConnectionCustomFactory.java复制代码

由于 Http 服务不是SOFTRegistry主要功能,此处略去。

0x05 功能模块

从目录结构可大体看出功能模块划分。

├── remoting
│   ├── dataserver
│   │   ├── handler
│   │   └── task
│   ├── handler
│   ├── metaserver
│   └── sessionserver复制代码

由于每一个大功能模块大同小异,因此咱们下面主要以 dataserver 目录下为主,兼顾 metaserver 和 sessionserver目录下的特殊部分。

Data Server 比较复杂,便是服务器也是客户端,因此分别作了不一样的组件来抽象这两个概念。

5.1 Server组件

DataServerBootstrap#start 方法,用于启动一系列的初始化服务。在此函数中,启动了若干网络服务,用来提供 对外接口。

public void start() {try {
        openDataServer();
        openDataSyncServer();
        openHttpServer();
        startRaftClient();
    } 
}复制代码

各 Handler 具体做用如图所示:

5.2 Bolt Server

DataServer 和 DataSyncServer 是  Bolt Server,是节点间的 bolt 通讯组件,其中:

  • boltExchange。bolt组件通信组件,用来给server和dataSyncServer提供通信服务;
  • DataServer。dataServer 则负责数据相关服务,好比数据服务,获取数据的推送,服务上下线通知等;
  • dataSyncServer。dataSyncServer 主要是处理一些数据同步相关的服务;

具体代码以下:

private void openDataServer() {try {if (serverForSessionStarted.compareAndSet(false, true)) {
            server = boltExchange.open(new URL(NetUtil.getLocalAddress().getHostAddress(),
                dataServerConfig.getPort()), serverHandlers
                .toArray(new ChannelHandler[serverHandlers.size()]));
        }
    } 
}private void openDataSyncServer() {try {if (serverForDataSyncStarted.compareAndSet(false, true)) {
            dataSyncServer = boltExchange.open(new URL(NetUtil.getLocalAddress()
                .getHostAddress(), dataServerConfig.getSyncDataPort()), serverSyncHandlers
                .toArray(new ChannelHandler[serverSyncHandlers.size()]));
        }
    } 
}复制代码

这两个server的handlers有部分重复,怀疑开发者在作功能迁移。

@Bean(name = "serverSyncHandlers")public Collection<AbstractServerHandler> serverSyncHandlers() {
    Collection<AbstractServerHandler> list = new ArrayList<>();
    list.add(getDataHandler());
    list.add(publishDataProcessor());
    list.add(unPublishDataHandler());
    list.add(notifyFetchDatumHandler());
    list.add(notifyOnlineHandler());
    list.add(syncDataHandler());
    list.add(dataSyncServerConnectionHandler());return list;
}@Bean(name = "serverHandlers")public Collection<AbstractServerHandler> serverHandlers() {
    Collection<AbstractServerHandler> list = new ArrayList<>();
    list.add(getDataHandler());
    list.add(clientOffHandler());
    list.add(getDataVersionsHandler());
    list.add(publishDataProcessor());
    list.add(sessionServerRegisterHandler());
    list.add(unPublishDataHandler());
    list.add(dataServerConnectionHandler());
    list.add(renewDatumHandler());
    list.add(datumSnapshotHandler());return list;
}复制代码

5.2.1 DataSyncServer

用DataSyncServer作具体说明。

启动 DataSyncServer 时,注册以下几个 handler 处理 bolt 请求 :

图5 DayaSyncServer 注册的 Handler

DayaSyncServer 注册的 Handler 以下:

  • getDataHandler

该 Handler 主要用于数据的获取,当一个请求过来时,会经过请求中的 DataCenter 和 DataInfoId 获取当前 DataServer 节点存储的相应数据。

  • publishDataProcessor \ unPublishDataHandler

数据发布者 publisher 上下线会分别触发 publishDataProcessor 或 unPublishDataHandler ,Handler 会往 dataChangeEventCenter 中添加一个数据变动事件,用于异步地通知事件变动中心数据变动。事件变动中心收到该事件后,会往队列中加入事件。此时 dataChangeEventCenter 会根据不一样的事件类型异步地对上下线数据进行相应的处理。

与此同时,DataChangeHandler 把这个事件变动信息经过 ChangeNotifier 对外发布,通知其余节点进行数据同步。

  • notifyFetchDatumHandler

数据拉取请求,该 Handler 被触发时,通知当前 DataServer 节点进行版本号对比,若请求中数据的版本号高于当前节点缓存中的版本号,则会进行数据同步操做,保证数据是最新的。

  • notifyOnlineHandler

这是一个 DataServer 上线通知请求 Handler,当其余节点上线时,会触发该 Handler,从而当前节点在缓存中存储新增的节点信息。用于管理节点状态,到底是 INITIAL 仍是 WORKING 。

  • syncDataHandler

节点间数据同步 Handler,该 Handler 被触发时,会经过版本号进行比对,若当前 DataServer 所存储数据版本号含有当前请求版本号,则会返回全部大于当前请求数据版本号的全部数据,便于节点间进行数据同步。

  • dataSyncServerConnectionHandler

链接管理 Handler,当其余 DataServer 节点与当前 DataServer 节点链接时,会触发 connect 方法,从而在本地缓存中注册链接信息,而当其余 DataServer 节点与当前节点断连时,会触发 disconnect 方法,从而删除缓存信息,进而保证当前 DataServer 节点存储有全部与之链接的 DataServer 节点。

5.2.2 调用链

dataSyncServer 调用链以下:

在 DataServerBootstrap 中有

private void openDataSyncServer() {try {if (serverForDataSyncStarted.compareAndSet(false, true)) {
            dataSyncServer = boltExchange.open(new URL(NetUtil.getLocalAddress()
                .getHostAddress(), dataServerConfig.getSyncDataPort()), serverSyncHandlers
                .toArray(new ChannelHandler[serverSyncHandlers.size()]));
        }
    }
}复制代码

而后有

public class BoltExchange implements Exchange<ChannelHandler> {@Overridepublic Server open(URL url, ChannelHandler... channelHandlers) {
        BoltServer server = createBoltServer(url, channelHandlers);
        setServer(server, url);
        server.startServer();return server;
    }  protected BoltServer createBoltServer(URL url, ChannelHandler[] channelHandlers) {return new BoltServer(url, Arrays.asList(channelHandlers));
    }
}复制代码

BoltServer启动以下,而且用户自定义了UserProcessor。

public class BoltServer implements Server {public BoltServer(URL url, List<ChannelHandler> channelHandlers) {this.channelHandlers = channelHandlers;this.url = url;
    }  public void startServer() {if (isStarted.compareAndSet(false, true)) {
                boltServer = new RpcServer(url.getPort(), true);
                initHandler();
                boltServer.start();
        }
    }  
  private void initHandler() {if (initHandler.compareAndSet(false, true)) {
            boltServer.addConnectionEventProcessor(ConnectionEventType.CONNECT,new ConnectionEventAdapter(ConnectionEventType.CONNECT,
                    getConnectionEventHandler(), this));
            boltServer.addConnectionEventProcessor(ConnectionEventType.CLOSE,new ConnectionEventAdapter(ConnectionEventType.CLOSE, getConnectionEventHandler(),this));
            boltServer.addConnectionEventProcessor(ConnectionEventType.EXCEPTION,new ConnectionEventAdapter(ConnectionEventType.EXCEPTION,
                    getConnectionEventHandler(), this));

            registerUserProcessorHandler();
        }
    }  
  //这里分了同步和异步private void registerUserProcessorHandler() {if (channelHandlers != null) {for (ChannelHandler channelHandler : channelHandlers) {if (HandlerType.PROCESSER.equals(channelHandler.getType())) {if (InvokeType.SYNC.equals(channelHandler.getInvokeType())) {
                        boltServer.registerUserProcessor(new SyncUserProcessorAdapter(
                            channelHandler));
                    } else {
                        boltServer.registerUserProcessor(new AsyncUserProcessorAdapter(
                            channelHandler));
                    }
                }
            }
        }
    }
  
}复制代码

5.3 HttpServer

HttpServer 用于控制的Http 通讯组件以及其配置,提供一系列 REST 接口,用于 dashboard 管理、数据查询等;

  • jerseyExchange。jersey组件通信组件,提供服务;
  • httpServer 提供一系列 http 接口,用于 dashboard 管理、数据查询等;
private void openHttpServer() {try {if (httpServerStarted.compareAndSet(false, true)) {
            bindResourceConfig();
            httpServer = jerseyExchange.open(new URL(NetUtil.getLocalAddress().getHostAddress(), dataServerConfig
                    .getHttpServerPort()), new ResourceConfig[] { jerseyResourceConfig });
        }
    } 
}复制代码

5.4 RaftClient

RaftClient 是基于Raft协议的客户端,用来基于raft协议获取meta server leader信息。

private void startRaftClient() {
    metaServerService.startRaftClient();
    eventCenter.post(new MetaServerChangeEvent(metaServerService.getMetaServerMap()));
}复制代码

具体DefaultMetaServiceImpl中实现代码以下:

@Overridepublic void startRaftClient() {try {if (clientStart.compareAndSet(false, true)) {
            String serverConf = getServerConfig();
            raftClient = new RaftClient(getGroup(), serverConf);
            raftClient.start();
        }
    }
}复制代码

功能模块最后逻辑大体以下:

                                                  +->  getDataHandler
                                                  |
                                                  +->  publishDataProcessor
                                                  |
                                                  +--> unPublishDataHandler
                                                  |
                               +---------------+  +--> notifyFetchDatumHandler
                        +----> | DataSyncServer+->+
                        |      +---------------+  +--> notifyOnlineHandler
                        |                         |
                        |                         +--> syncDataHandler
                        |       +-------------+   |
+-------------------+   +---->  | HttpServer  |   +->  dataSyncServerConnectionHandler
|DataServerBootstrap+-->+       +-------------+
+-------------------+   |
                        |
                        |       +------------+    +->  getDataHandler
                        +-----> | RaftClient |    |
                        |       +------------+    +--> clientOffHandler
                        |                         |
                        |       +-------------+   +--> getDataVersionsHandler
                        +-----> |  DataServer +-->+
                                +-------------+   +--> publishDataProcessor
                                                  |
                                                  +--> sessionServerRegisterHandler
                                                  |
                                                  +--> unPublishDataHandler
                                                  |
                                                  +--> dataServerConnectionHandler
                                                  |
                                                  +--> renewDatumHandler
                                                  |
                                                  +->  datumSnapshotHandler复制代码

0x06 链接抽象 Exchange

Exchange 做为 Client / Server 链接的抽象,负责节点之间的链接。

Data Server 主要是 DataNodeExchanger 和 MetaNodeExchanger,用来:

  • 封装 BoltExchange

  • 把 Bolt Client 和 Bolt Channel 进行抽象。

  • 提供可直接使用的网络API,如ForwardServiceImpl,GetSyncDataHandler这些散落的Bean能够直接使用DataNodeExchanger来作网络交互。

从对外接口中能够看出,

  • connect 函数建立链接,返回Channel,这个设置了handler,用来处理服务器推送;
  • request 函数发起请求;

这里有两个问题:

  • 为何没有SessionNodeExchanger?
  • 一样是网络资源管理,这里和ConnectionFactory有什么区别?

可能由于Session Server会不少,不必保存Bolt client和Server,但Session对应的有ConnectionFactory。ConectionFactory 是低层次封装,下文讲解。

6.1 DataNodeExchanger

以 DataNodeExchanger为例。

把全部 Data Server 相关 非 “Server,Client概念 直接强相关” 的网络操做统一集中在这里。

6.1.1 具体实现

能够看到主要对 boltExchange 进行更高层次的封装。

具体代码以下:

public class DataNodeExchanger implements NodeExchanger {@Autowiredprivate Exchange                          boltExchange;@Autowiredprivate DataServerConfig                  dataServerConfig;@Resource(name = "dataClientHandlers")private Collection<AbstractClientHandler> dataClientHandlers;@Overridepublic Response request(Request request) {
        Client client = boltExchange.getClient(Exchange.DATA_SERVER_TYPE);if (null != request.getCallBackHandler()) {
            client.sendCallback(request.getRequestUrl(), request.getRequestBody(),
                    request.getCallBackHandler(),
                    request.getTimeout() != null ? request.getTimeout() : dataServerConfig.getRpcTimeout());return () -> Response.ResultStatus.SUCCESSFUL;
        } else {final Object result = client.sendSync(request.getRequestUrl(), request.getRequestBody(),
                    dataServerConfig.getRpcTimeout());return () -> result;
        }
    }public Channel connect(URL url) {
        Client client = boltExchange.getClient(Exchange.DATA_SERVER_TYPE);if (client == null) {synchronized (this) {
                client = boltExchange.getClient(Exchange.DATA_SERVER_TYPE);if (client == null) {
                    client = boltExchange.connect(Exchange.DATA_SERVER_TYPE, url,
                        dataClientHandlers.toArray(new ChannelHandler[dataClientHandlers.size()]));
                }
            }
        }

        Channel channel = client.getChannel(url);if (channel == null) {synchronized (this) {
                channel = client.getChannel(url);if (channel == null) {
                    channel = client.connect(url);
                }
            }
        }return channel;
    }
}复制代码

6.1.2 推送Handler相关Bean

上面代码中使用了 dataClientHandlers,其是BoltClient所使用的,Server会对此进行推送,这两个Handler会处理。

@Bean(name = "dataClientHandlers")public Collection<AbstractClientHandler> dataClientHandlers() {
    Collection<AbstractClientHandler> list = new ArrayList<>();
    list.add(notifyDataSyncHandler());
    list.add(fetchDataHandler());return list;
}复制代码

此时具体网络概念以下:

+-------------------+
| ForwardServiceImpl| +-----------------+
+-------------------+                   |
                                        |
+--------------------+                  |        +-------------------+
| GetSyncDataHandler | +-----------------------> | DataNodeExchanger |
+--------------------+                  |        +-------+-----------+
                                        |                |
+----------------------------------+    |                |
| LocalDataServerChangeEventHandler| +--+                |
+----------------------------------+    |                |
                                        |                |
+------------------------------+        |                v
| DataServerChangeEventHandler +--------+          +-----+--------+
+------------------------------+                   | BoltExchange |
                                                   +-----+--------+
                                                         |
        +-----------------+                              |
        |  JerseyExchange |                              |
        +-------+---------+                      +-------+-------+
                |                                |               |
                |                                |               |
                v                                v               v
       +--------+----------+              +------+-----+   +-----+------+
       | JerseyJettyServer |              | BoltServer |   | BoltServer |
       +-------------------+              +------------+   +------------+
             httpServer                       server       dataSyncServer复制代码

0x07 Handler

AbstractServerHandler 和 AbstractClientHandler 对 com.alipay.sofa.registry.remoting.ChannelHandler 进行了实现。

须要结合SOFABolt讲解。

7.1 SOFABolt

以 RpcServer 为例,SOFABolt在这里的使用是两种处理器:

  • 用户请求处理器 (UserProcessor) :SOFABolt 提供了两种请求处理器,SyncUserProcessor 与 AsyncUserProcessor。 两者的区别在于,前者须要在当前处理线程以return返回值的形式返回处理结果;然后者有一个 AsyncContext 存根,能够在当前线程,也能够在异步线程,调用 sendResponse 方法返回处理结果;
  • 链接事件处理器 (ConnectionEventProcessor)  :SOFABolt 提供了两种事件监听,建连事件(ConnectionEventType.CONNECT)与断连事件(ConnectionEventType.CLOSE),用户能够建立本身的事件处理器,并注册到客户端或者服务端。客户端与服务端均可以监听到各自的建连与断连事件;

7.2 定义

Handler主要代码分别以下:

public abstract class AbstractServerHandler<T> implements ChannelHandler<T> {protected NodeType getConnectNodeType() {return NodeType.DATA;
    }@Overridepublic Object reply(Channel channel, T request) {try {
            logRequest(channel, request);
            checkParam(request);return doHandle(channel, request);
        } catch (Exception e) {return buildFailedResponse(e.getMessage());
        }
    }
}public abstract class AbstractClientHandler<T> implements ChannelHandler<T> {@Overridepublic Object reply(Channel channel, T request) {try {
            logRequest(channel, request);
            checkParam(request);return doHandle(channel, request);
        } catch (Exception e) {return buildFailedResponse(e.getMessage());
        }
    }
}复制代码

系统能够据此实现各类派生类。

须要注意是:ChannelHandler之中分红两类,分别对应了RpcServer中的listener和userProcessor。

enum HandlerType {
    LISENTER,
    PROCESSER
} 
复制代码

以serverSyncHandlers为例,只有dataSyncServerConnectionHandler是Listener,其他是Processor。

这也符合常理,由于消息响应函数应该只有一个。

@Bean(name = "serverSyncHandlers")public Collection<AbstractServerHandler> serverSyncHandlers() {
    Collection<AbstractServerHandler> list = new ArrayList<>();
    list.add(getDataHandler());
    list.add(publishDataProcessor());
    list.add(unPublishDataHandler());
    list.add(notifyFetchDatumHandler());
    list.add(notifyOnlineHandler());
    list.add(syncDataHandler());
    list.add(dataSyncServerConnectionHandler());  只有这个是Listener,其他都是Processor。return list;
}复制代码

总结以下:

                         +->  getDataHandler
                         |
                         +->  publishDataProcessor
                         |
                         +--> unPublishDataHandler
+-------------------+    |
| serverSyncHandlers+-------> notifyFetchDatumHandler
+-------------------+    |
                         +--> notifyOnlineHandler
                         |
                         +--> syncDataHandler
                         |
                         +->  dataSyncServerConnectionHandler(Listener)复制代码

7.3 使用

在启动时,会使用serverSyncHandlers完成BoltServer的启动。

private void openDataSyncServer() {try {if (serverForDataSyncStarted.compareAndSet(false, true)) {
            dataSyncServer = boltExchange.open(new URL(NetUtil.getLocalAddress()
                .getHostAddress(), dataServerConfig.getSyncDataPort()), serverSyncHandlers
                .toArray(new ChannelHandler[serverSyncHandlers.size()]));
        }
    } 
}复制代码

BoltExchange中有:

@Overridepublic Server open(URL url, ChannelHandler... channelHandlers) {
    BoltServer server = createBoltServer(url, channelHandlers);
    setServer(server, url);
    server.startServer();return server;
}protected BoltServer createBoltServer(URL url, ChannelHandler[] channelHandlers) {		return new BoltServer(url, Arrays.asList(channelHandlers));
}复制代码

在BoltServer中有以下代码,主要是设置Handler。

public void startServer() {if (isStarted.compareAndSet(false, true)) {try {
            boltServer = new RpcServer(url.getPort(), true);
            initHandler();
            boltServer.start();
        } 
    }
}private void initHandler() {if (initHandler.compareAndSet(false, true)) {
            boltServer.addConnectionEventProcessor(ConnectionEventType.CONNECT,new ConnectionEventAdapter(ConnectionEventType.CONNECT,
                    getConnectionEventHandler(), this));
            boltServer.addConnectionEventProcessor(ConnectionEventType.CLOSE,new ConnectionEventAdapter(ConnectionEventType.CLOSE, getConnectionEventHandler(),this));
            boltServer.addConnectionEventProcessor(ConnectionEventType.EXCEPTION,new ConnectionEventAdapter(ConnectionEventType.EXCEPTION,
                    getConnectionEventHandler(), this));

            registerUserProcessorHandler();
        }
}复制代码

最终则是调用到RpcServer之中,注册了链接响应函数和用户定义函数。

public void addConnectionEventProcessor(ConnectionEventType type,
                                        ConnectionEventProcessor processor) {this.connectionEventListener.addConnectionEventProcessor(type, processor);
}@Overridepublic void registerUserProcessor(UserProcessor<?> processor) {
    UserProcessorRegisterHelper.registerUserProcessor(processor, this.userProcessors);
}复制代码

此时与SOFABolt逻辑以下:

+----------------------------+
| [RpcServer]                |
|                            |
|                            |  +--> EXCEPTION +--> DataSyncServerConnectionHandler
|                            |  |
|    connectionEventListener--->--->-CONNECT +----> DataSyncServerConnectionHandler
|                            |  |
|                            |  +--> CLOSE  +-----> DataSyncServerConnectionHandler
|                            |
|                            |
|                            |  +--> EXCEPTION +--> DataSyncServerConnectionHandler
|                            |  |
|    connectionEventHandler +-->---> CONNECT +----> DataSyncServerConnectionHandler
|                            |  |
|                            |  +--> CLOSE  +-----> DataSyncServerConnectionHandler
|                            |
|                            |
|                            |  +---->  GetDataRequest  +-------->  getDataHandler
|                            |  |
|                            |  +---->  PublishDataRequest +----->  publishDataProcessor
|                            |  |
|    userProcessors +---------------->  UnPublishDataRequest +----> unPublishDataHandler
|                            |  |
|                            |  +---->  NotifyOnlineRequest-----> notifyFetchDatumHandler
|                            |  |
|                            |  +---->  NotifyOnlineRequest +-----> notifyOnlineHandler
+----------------------------+  |
                                +---->  SyncDataRequest   +-------> syncDataHandler复制代码

0x08 总结

至此,咱们把SOFARegistry网络封装和操做大体梳理下。

从逻辑上看,阿里提供了两个层级的封装:

从链接角度看,阿里实现了基于 netty.channel.Channel 的封装,从下往上看是:

  • 由于SOFABolt基于Netty,因此封装的核心是netty.channel.Channel。
  • 在此基础上, SOFABolt封装了com.alipay.remoting.Connection
  • 而后 SOFARegistry 基于SOFABolt 封装了 BoltChannel

从应用角度看,阿里实现了Server,Client层次的封装,从下往上看是:

  • SOFABolt构建了 RpcServer,RpcClient。
  • SOFARegisty 基于 RpcServer,RpcClient 构建了BoltClient 和 BoltServer。
  • 而后 SOFARegistry 基于此构建了 BoltExchange。做为 Client / Server 链接的抽象,负责节点之间的链接。
  • 最后构建了 XXXNodeExchanger,在 BoltExchange 基础上,把全部 Data Server 内部的 非 “Server,Client概念 强相关” 的网络操做统一集中在这里,用户能够直接使用。以DataServer业务模块为例,其内部按照业务不一样,实现了 DataNodeExchanger 和  MetaNodeExchanger。用以让:
    • DataServer 内部直接使用 DataNodeExchanger 与其余 DataServer 交互,
    • DataServer 内部直接使用 MetaNodeExchanger 与 MetaServer 交互。

具体逻辑大体以下:

阿里封装的很是细致。由于SOFARegistry是繁杂系统,因此把网络概念,功能作封装至关有必要。你们在平常开发中可能不用这么细致封装,参考阿里的思路本身作选择和裁剪便可。

0xEE 我的信息

★★★★★★关于生活和技术的思考★★★★★★

微信公众帐号:罗西的思考

若是您想及时获得我的撰写文章的消息推送,或者想看看我的推荐的技术资料,敬请关注。

相关文章
相关标签/搜索