SOFARegistry is a production-grade, low-latency, highly available service registry open-sourced by Ant Financial.
This series focuses on design and architecture: over several articles and from multiple angles, it works backwards from the implementation of DataServer and SOFARegistry to summarize their mechanisms and architectural ideas, as a way of studying how Alibaba designs systems.
This is the second article, covering SOFARegistry's network encapsulation and operations.
Some readers may not have read the earlier MetaServer article, so let's first recap the overall SOFARegistry architecture.
Application server cluster. The Client layer is the application layer: each application system depends on the registry client jar and uses the registry's service publish and subscribe capabilities programmatically.
Session server cluster. As the name suggests, the Session layer is the session layer: it keeps long connections with the application servers in the Client layer and receives their service publish and subscribe requests. This layer only keeps the publish/subscribe relationships of each service in memory; the concrete service data is merely passed through between the Client layer and the Data layer. The Session layer is stateless and can be scaled out as the Client layer grows.
Data server cluster. The Data layer stores the service registration data of all applications with sharded storage. Data is sharded by consistent hashing on dataInfoId (the unique identifier of each piece of service data), with multiple replicas for high availability. A later article focuses on how the Data layer scales in and out smoothly, without affecting business, as the data volume grows.
Meta server cluster. This cluster manages the server information of the Session and Data server clusters, playing the role of the service registry inside the SOFARegistry architecture itself; whereas SOFARegistry as a registry serves the broad application layer, the Meta cluster serves SOFARegistry's own Session and Data clusters. The Meta layer senses changes to Session and Data nodes and notifies the other nodes in the cluster.
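The consistent-hash sharding by dataInfoId mentioned above can be sketched as a small hash ring. This is only an illustrative model (the MD5-based hash, the virtual-node count, and the node names are assumptions, not SOFARegistry's actual sharding code):

```java
import java.nio.charset.StandardCharsets;
import java.security.MessageDigest;
import java.util.*;

// Minimal consistent-hash ring: each DataServer node is mapped to several
// virtual positions on a ring, and a dataInfoId is owned by the first node
// clockwise from its hash. Virtual nodes smooth out the distribution and
// limit how much data moves when membership changes.
public class ConsistentHashSketch {
    private final TreeMap<Long, String> ring = new TreeMap<>();
    private final int virtualNodes;

    ConsistentHashSketch(Collection<String> nodes, int virtualNodes) {
        this.virtualNodes = virtualNodes;
        nodes.forEach(this::addNode);
    }

    void addNode(String node) {
        for (int i = 0; i < virtualNodes; i++) ring.put(hash(node + "#" + i), node);
    }

    void removeNode(String node) {
        for (int i = 0; i < virtualNodes; i++) ring.remove(hash(node + "#" + i));
    }

    // Owner of a dataInfoId: first ring entry at or after its hash, wrapping around.
    String nodeFor(String dataInfoId) {
        Map.Entry<Long, String> e = ring.ceilingEntry(hash(dataInfoId));
        return (e != null ? e : ring.firstEntry()).getValue();
    }

    private static long hash(String key) {
        try {
            byte[] d = MessageDigest.getInstance("MD5").digest(key.getBytes(StandardCharsets.UTF_8));
            return ((long) (d[3] & 0xFF) << 24) | ((d[2] & 0xFF) << 16)
                 | ((d[1] & 0xFF) << 8) | (d[0] & 0xFF);
        } catch (Exception ex) { throw new IllegalStateException(ex); }
    }

    public static void main(String[] args) {
        ConsistentHashSketch ring =
            new ConsistentHashSketch(List.of("data-1", "data-2", "data-3"), 100);
        String owner = ring.nodeFor("com.example.DemoService");
        // The same id always maps to the same node while membership is stable.
        System.out.println(owner.equals(ring.nodeFor("com.example.DemoService")));
    }
}
```

When a node is removed, only the keys that hashed to its virtual positions move; everything else stays put, which is why this scheme supports the smooth scale-in/scale-out discussed later.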
DataServer, SessionServer, and MetaServer are all, in essence, network applications, which makes network encapsulation and operations a foundational module of this system. Let's look at the application scenario.
SOFARegistry's application scenario is a unitized (单元化) deployment.
In a unitized deployment, a unit is a fully equipped, scaled-down copy of the whole site: it is all-capable, because every application is deployed in it, but it is not all-encompassing, because it only operates on a subset of the data. A system that can be unitized is easy to deploy across data centers: a few units can be placed in one data center and others elsewhere, and a traffic dispatcher at the business entry point adjusts the traffic ratio between units.
So what SOFARegistry must address is inter-node communication within an IDC private network: high-throughput, high-concurrency communication; management of a large number of connections (the C10K problem); convenient upgrade mechanisms; compatibility guarantees; flexible use of thread-pool models; careful exception handling and log instrumentation. All of this has to be solved at the level of the communication protocol and the implementation framework.
The servers also have some configuration needs, for which plain HTTP suffices.
In this intranet, unitized scenario, the problems one can anticipate are as follows:
For high-performance, high-concurrency scenarios in the Java world, non-blocking multiplexed I/O is the inevitable choice, which naturally means building on Netty.
Ant uses the SOFABolt communication framework to implement liveness detection over TCP long connections and push-mode change notification, so service online/offline notifications take effect within seconds.
sofa-bolt is Ant's open-source network communication framework based on Netty. It wraps the common concerns of network programming on top of Netty so that middleware developers can focus on their own products.
Its main features are:
SOFABolt can be understood as a set of Netty best practices, with some extra optimization work on top.
The SOFABolt framework may get a dedicated series of its own later. For now, we judge that SOFABolt meets our needs, so we only introduce it briefly and focus on how it is used and how the business logic is implemented.
Once SOFABolt is chosen, the problems listed earlier are mostly solved by SOFABolt itself; the remaining issues we can think of are roughly:
A quick preview: logically, Alibaba provides two levels of encapsulation.
From the connection perspective, there is an encapsulation built on netty.channel.Channel; from bottom to top:
From the application perspective, there are Server- and Client-level encapsulations; from bottom to top:
The overall structure is roughly as follows:
DataNodeExchanger / MetaNodeExchanger
        │
        v
BoltExchange
    ├── Map<String, Client> ─────────────────> BoltClient
    │                                              └── remoting.rpc.RpcClient
    │                                                    ├── ConnectionEventHandler
    │                                                    └── ConnectionFactory
    └── ConcurrentHashMap<Integer, Server> ──> BoltServer
                                                   └── remoting.rpc.RpcServer
                                                         ├── Map<String, Channel> channels
                                                         └── List<ChannelHandler> channelHandlers

BoltClient / BoltServer ──> ChannelHandler, CallbackHandler, BoltChannel
BoltChannel ──> com.alipay.remoting.Connection ──> netty.channel.Channel
SOFARegistry wraps the basic networking functionality and exposes it through an API. Below is the encapsulation module and its external interface, registry-remoting-api.

├── CallbackHandler.java
├── Channel.java
├── ChannelHandler.java
├── Client.java
├── Endpoint.java
├── RemotingException.java
├── Server.java
└── exchange
    ├── Exchange.java
    ├── NodeExchanger.java
    ├── RequestException.java
    └── message
        ├── Request.java
        └── Response.java
The four most important interfaces are Server, Client, Exchange, and Channel; these are the basic concepts of the network encapsulation.
Channel is a fairly general concept: it represents an open connection between an I/O source and a target, together with the operations on that connection. Let's first use Java's Channel as an illustration.
Java's Channel is defined in the java.nio.channels package. A Channel represents an open connection between an I/O source and a target; it resembles a traditional "stream", except that a Channel cannot access data directly and can only interact with a Buffer.
A Channel is used to transfer data efficiently between a byte buffer and the entity on the other side of the channel (usually a file or a socket). Channels are conduits for accessing I/O services: through a channel, we reach the operating system's I/O services with minimal overhead. Incidentally, buffers are the endpoints a channel uses to send and receive data.
From SOFARegistry's Channel definition, we can see that its basic functionality is mostly attribute-related.

public interface Channel {
    InetSocketAddress getRemoteAddress();
    InetSocketAddress getLocalAddress();
    boolean isConnected();
    Object getAttribute(String key);
    void setAttribute(String key, Object value);
    WebTarget getWebTarget();
    void close();
}
Server is the server-side abstraction; as its definition shows, its main capability is sending over a Channel.

public interface Server extends Endpoint {
    boolean isOpen();
    Collection<Channel> getChannels();
    Channel getChannel(InetSocketAddress remoteAddress);
    Channel getChannel(URL url);
    void close(Channel channel);
    int getChannelCount();
    Object sendSync(final Channel channel, final Object message, final int timeoutMillis);
    void sendCallback(final Channel channel, final Object message, CallbackHandler callbackHandler, final int timeoutMillis);
}
Client is the client-side abstraction; its interactions are likewise Channel-based.

public interface Client extends Endpoint {
    Channel getChannel(URL url);
    Channel connect(URL url);
    Object sendSync(final URL url, final Object message, final int timeoutMillis);
    Object sendSync(final Channel channel, final Object message, final int timeoutMillis);
    void sendCallback(final URL url, final Object message, CallbackHandler callbackHandler, final int timeoutMillis);
}
Exchange is the routing layer. As a further abstraction over Client/Server connections, it is responsible for connections to servers of a given type.

public interface Exchange<T> {
    String DATA_SERVER_TYPE = "dataServer";
    String META_SERVER_TYPE = "metaServer";
    Client connect(String serverType, URL serverUrl, T... channelHandlers);
    Client connect(String serverType, int connNum, URL serverUrl, T... channelHandlers);
    Server open(URL url, T... channelHandlers);
    Client getClient(String serverType);
    Server getServer(Integer port);
}
When establishing a connection, a series of handlers for different tasks (called ChannelHandlers) can be configured.
Some of these ChannelHandlers act as Listeners that handle connection events; others act as Processors that handle specific events, such as service data changes or Subscriber registration.

public interface ChannelHandler<T> {
    enum HandlerType { LISENTER, PROCESSER }
    enum InvokeType { SYNC, ASYNC }
    void connected(Channel channel) throws RemotingException;
    void disconnected(Channel channel) throws RemotingException;
    void received(Channel channel, T message) throws RemotingException;
    Object reply(Channel channel, T message) throws RemotingException;
    void caught(Channel channel, T message, Throwable exception) throws RemotingException;
    HandlerType getType();
    Class interest();
    default InvokeType getInvokeType() { return InvokeType.SYNC; }
    default Executor getExecutor() { return null; }
}

So the basic external network interfaces are:
[registry-remoting-api]

Exchange / NodeExchanger
    ├── Server ──> ChannelHandler, Channel
    └── Client ──> Channel, CallbackHandler
Since SOFARegistry is built chiefly on SOFABolt and cannot work around it, we first need a short introduction to SOFABolt.
Bolt is based on Netty, so the Netty Channel comes first.
In the Netty framework, Channel is one of the core concepts: it is the main body of Netty's network communication, responsible for communicating with the peer, registration, data operations, and so on.
Netty wraps and enhances the JDK's native ServerSocketChannel into NioXXXChannel; compared with the raw JDK channel, Netty's Channel adds the following components.
By server side versus client side, Channels fall into two categories:
inbound and outbound identify the kind of handler a Context corresponds to. In Netty, events are divided into Inbound and Outbound events; the class comment of ChannelPipeline contains the following diagram:
                                               I/O Request
                                          via Channel or
                                      ChannelHandlerContext
                                                    |
+---------------------------------------------------+---------------+
|                           ChannelPipeline         |               |
|                                                  \|/              |
|    +---------------------+            +-----------+----------+    |
|    |  Inbound Handler N  |            |  Outbound Handler 1  |    |
|    +----------+----------+            +-----------+----------+    |
|              /|\                                  |               |
|               |                                  \|/              |
|    +----------+----------+            +-----------+----------+    |
|    | Inbound Handler N-1 |            |  Outbound Handler 2  |    |
|    +----------+----------+            +-----------+----------+    |
|              /|\                                  .               |
|               .                                   .               |
| ChannelHandlerContext.fireIN_EVT() ChannelHandlerContext.OUT_EVT()|
|        [ method call]                       [method call]         |
|               .                                   .               |
|               .                                  \|/              |
|    +----------+----------+            +-----------+----------+    |
|    |  Inbound Handler 2  |            | Outbound Handler M-1 |    |
|    +----------+----------+            +-----------+----------+    |
|              /|\                                  |               |
|               |                                  \|/              |
|    +----------+----------+            +-----------+----------+    |
|    |  Inbound Handler 1  |            |  Outbound Handler M  |    |
|    +----------+----------+            +-----------+----------+    |
|              /|\                                  |               |
+---------------+-----------------------------------+---------------+
                |                                  \|/
+---------------+-----------------------------------+---------------+
|               |                                   |               |
|       [ Socket.read() ]                    [ Socket.write() ]     |
|                                                                   |
|  Netty Internal I/O Threads (Transport Implementation)            |
+-------------------------------------------------------------------+
A trimmed-down definition of Connection follows; as you can see, its main member is a Netty Channel instance:

public class Connection {
    private Channel channel;
    private final ConcurrentHashMap<Integer, InvokeFuture> invokeFutureMap = new ConcurrentHashMap<Integer, InvokeFuture>(4);
    /** Attribute key for connection */
    public static final AttributeKey<Connection> CONNECTION = AttributeKey.valueOf("connection");
    /** Attribute key for heartbeat count */
    public static final AttributeKey<Integer> HEARTBEAT_COUNT = AttributeKey.valueOf("heartbeatCount");
    /** Attribute key for heartbeat switch for each connection */
    public static final AttributeKey<Boolean> HEARTBEAT_SWITCH = AttributeKey.valueOf("heartbeatSwitch");
    /** Attribute key for protocol */
    public static final AttributeKey<ProtocolCode> PROTOCOL = AttributeKey.valueOf("protocol");
    /** Attribute key for version */
    public static final AttributeKey<Byte> VERSION = AttributeKey.valueOf("version");
    private Url url;
    private final ConcurrentHashMap<Integer/* id */, String/* poolKey */> id2PoolKey = new ConcurrentHashMap<Integer, String>(256);
    private Set<String> poolKeys = new ConcurrentHashSet<String>();
    private final ConcurrentHashMap<String/* attr key*/, Object /*attr value*/> attributes = new ConcurrentHashMap<String, Object>();
}
Connection has many helper classes; excerpts follow:
Also note the following:
Whether on the server side or the client side, the essence is the same: create a ConnectionEventHandler instance and add it to the Netty pipeline. The basic principle is:
ConnectionEventHandler handles two kinds of events.
When a ConnectionEvent later fires (whether a Netty-defined event or a SOFABolt-defined one), the ConnectionEventHandler notifies the ConnectionEventListener through an asynchronous executor, and the ConnectionEventListener dispatches the message to the concrete ConnectionEventProcessor implementations.
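The listener/processor dispatch described above can be modeled in a few lines. The types below only mimic the shape of the SOFABolt concepts (event types, listener, processors); they are stand-ins for illustration, not the real API:

```java
import java.util.*;
import java.util.concurrent.*;

// Self-contained model of the dispatch flow: the handler publishes events
// through an async executor to a listener, which fans them out to the
// registered processors for that event type.
public class EventDispatchSketch {
    enum ConnectionEventType { CONNECT, CLOSE, EXCEPTION }

    interface ConnectionEventProcessor { void onEvent(String remoteAddress); }

    // Listener keeps a list of processors per event type and fans events out.
    static class ConnectionEventListener {
        private final Map<ConnectionEventType, List<ConnectionEventProcessor>> processors =
            new ConcurrentHashMap<>();
        void addProcessor(ConnectionEventType type, ConnectionEventProcessor p) {
            processors.computeIfAbsent(type, k -> new CopyOnWriteArrayList<>()).add(p);
        }
        void onEvent(ConnectionEventType type, String remoteAddress) {
            for (ConnectionEventProcessor p : processors.getOrDefault(type, List.of())) {
                p.onEvent(remoteAddress);
            }
        }
    }

    // The "handler" side: publishes events to the listener on an async executor.
    static class ConnectionEventHandler {
        private final ExecutorService executor = Executors.newSingleThreadExecutor();
        private final ConnectionEventListener listener;
        ConnectionEventHandler(ConnectionEventListener l) { this.listener = l; }
        Future<?> fire(ConnectionEventType type, String remoteAddress) {
            return executor.submit(() -> listener.onEvent(type, remoteAddress));
        }
        void shutdown() { executor.shutdown(); }
    }

    public static void main(String[] args) throws Exception {
        List<String> log = new CopyOnWriteArrayList<>();
        ConnectionEventListener listener = new ConnectionEventListener();
        listener.addProcessor(ConnectionEventType.CONNECT, addr -> log.add("connected " + addr));
        listener.addProcessor(ConnectionEventType.CLOSE, addr -> log.add("closed " + addr));

        ConnectionEventHandler handler = new ConnectionEventHandler(listener);
        handler.fire(ConnectionEventType.CONNECT, "10.0.0.1:9600").get();
        handler.fire(ConnectionEventType.CLOSE, "10.0.0.1:9600").get();
        handler.shutdown();
        System.out.println(log);
    }
}
```

The single-threaded executor mirrors why the real framework dispatches asynchronously: slow processors must not stall the I/O thread that detected the connection event.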
RpcServer implements the basic machinery a Server must have and can be used directly, for example:
Of note:
The definition is as follows:

public class RpcServer extends AbstractRemotingServer {
    /** server bootstrap */
    private ServerBootstrap bootstrap;
    /** channelFuture */
    private ChannelFuture channelFuture;
    /** connection event handler */
    private ConnectionEventHandler connectionEventHandler;
    /** connection event listener */
    private ConnectionEventListener connectionEventListener = new ConnectionEventListener();
    /** user processors of rpc server */
    private ConcurrentHashMap<String, UserProcessor<?>> userProcessors = new ConcurrentHashMap<String, UserProcessor<?>>(4);
    /** boss event loop group, boss group should not be daemon, need shutdown manually */
    private final EventLoopGroup bossGroup = NettyEventLoopUtil.newEventLoopGroup(1, new NamedThreadFactory("Rpc-netty-server-boss", false));
    /** worker event loop group. Reuse I/O worker threads between rpc servers. */
    private static final EventLoopGroup workerGroup = NettyEventLoopUtil.newEventLoopGroup(Runtime.getRuntime().availableProcessors() * 2, new NamedThreadFactory("Rpc-netty-server-worker", true));
    /** address parser to get custom args */
    private RemotingAddressParser addressParser;
    /** connection manager */
    private DefaultConnectionManager connectionManager;
    /** rpc remoting */
    protected RpcRemoting rpcRemoting;
    /** rpc codec */
    private Codec codec = new RpcCodec();
}
RpcClient's main mechanisms are as follows:
The code:

public class RpcClient extends AbstractConfigurableInstance {
    private ConcurrentHashMap<String, UserProcessor<?>> userProcessors = new ConcurrentHashMap<String, UserProcessor<?>>();
    /** connection factory */
    private ConnectionFactory connectionFactory = new RpcConnectionFactory(userProcessors, this);
    /** connection event handler */
    private ConnectionEventHandler connectionEventHandler = new RpcConnectionEventHandler(switches());
    /** reconnect manager */
    private ReconnectManager reconnectManager;
    /** connection event listener */
    private ConnectionEventListener connectionEventListener = new ConnectionEventListener();
    /** address parser to get custom args */
    private RemotingAddressParser addressParser;
    /** connection select strategy */
    private ConnectionSelectStrategy connectionSelectStrategy = new RandomSelectStrategy(switches());
    /** connection manager */
    private DefaultConnectionManager connectionManager = new DefaultConnectionManager(connectionSelectStrategy, connectionFactory, connectionEventHandler, connectionEventListener, switches());
    /** rpc remoting */
    protected RpcRemoting rpcRemoting;
    /** task scanner */
    private RpcTaskScanner taskScanner = new RpcTaskScanner();
    /** connection monitor */
    private DefaultConnectionMonitor connectionMonitor;
    /** connection monitor strategy */
    private ConnectionMonitorStrategy monitorStrategy;
}
The basic abstractions above are implemented for both Bolt and HTTP. Below is the SOFABolt implementation module, registry-remoting-bolt.

├── AsyncUserProcessorAdapter.java
├── BoltChannel.java
├── BoltChannelUtil.java
├── BoltClient.java
├── BoltServer.java
├── ConnectionEventAdapter.java
├── SyncUserProcessorAdapter.java
└── exchange
    └── BoltExchange.java
BoltChannel mainly wraps com.alipay.remoting.Connection, which in turn wraps io.netty.channel.Channel.
The Channel feels incompletely encapsulated: the Connection is still exposed, and is used to obtain the local IP and port and the remote IP and port.

public class BoltChannel implements Channel {
    private Connection connection;
    private AsyncContext asyncContext;
    private BizContext bizContext;
    private final Map<String, Object> attributes = new ConcurrentHashMap<>();
}
BoltServer wraps com.alipay.remoting.rpc.RpcServer.
During initialization, it registers the handlers with the RpcServer via addConnectionEventProcessor, registerUserProcessor, and the like.
A ConcurrentHashMap records all the Channels connected to this Server, keyed by IP:port.

public class BoltServer implements Server {
    /** according server port */
    private final URL url;
    private final List<ChannelHandler> channelHandlers;
    /** bolt server */
    private RpcServer boltServer;
    /** started status */
    private AtomicBoolean isStarted = new AtomicBoolean(false);
    private Map<String, Channel> channels = new ConcurrentHashMap<>();
    private AtomicBoolean initHandler = new AtomicBoolean(false);
}

Its main functions, below, essentially delegate to Bolt:

@Override
public Object sendSync(Channel channel, Object message, int timeoutMillis) {
    if (channel != null && channel.isConnected()) {
        Url boltUrl = null;
        try {
            boltUrl = new Url(channel.getRemoteAddress().getAddress().getHostAddress(), channel.getRemoteAddress().getPort());
            return boltServer.invokeSync(boltUrl, message, timeoutMillis);
        }
    }
}

@Override
public void sendCallback(Channel channel, Object message, CallbackHandler callbackHandler, int timeoutMillis) {
    if (channel != null && channel.isConnected()) {
        Url boltUrl = null;
        try {
            boltUrl = new Url(channel.getRemoteAddress().getAddress().getHostAddress(), channel.getRemoteAddress().getPort());
            boltServer.invokeWithCallback(boltUrl, message, new InvokeCallback() {
                @Override
                public void onResponse(Object result) {
                    callbackHandler.onCallback(channel, result);
                }
                @Override
                public void onException(Throwable e) {
                    callbackHandler.onException(channel, e);
                }
                @Override
                public Executor getExecutor() {
                    return callbackHandler.getExecutor();
                }
            }, timeoutMillis);
            return;
        }
    }
}
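The InvokeCallback shape used above (onResponse/onException) can also be adapted into a future-style API. The sketch below uses a fake in-process transport; the InvokeCallback interface here merely mimics the SOFABolt one and is not the real class:

```java
import java.util.concurrent.*;

// Minimal model of the callback-style invoke used above, and how a
// CompletableFuture can adapt it. The "transport" just echoes on
// another thread to stand in for a real remote call.
public class CallbackAdapterSketch {
    interface InvokeCallback {
        void onResponse(Object result);
        void onException(Throwable e);
    }

    // Stand-in for boltServer.invokeWithCallback: completes asynchronously.
    static void invokeWithCallback(Object message, InvokeCallback cb) {
        CompletableFuture.runAsync(() -> {
            if (message == null) cb.onException(new IllegalArgumentException("null message"));
            else cb.onResponse("echo:" + message);
        });
    }

    // Adapter: expose the callback call as a CompletableFuture.
    static CompletableFuture<Object> invokeAsFuture(Object message) {
        CompletableFuture<Object> future = new CompletableFuture<>();
        invokeWithCallback(message, new InvokeCallback() {
            @Override public void onResponse(Object result) { future.complete(result); }
            @Override public void onException(Throwable e) { future.completeExceptionally(e); }
        });
        return future;
    }

    public static void main(String[] args) {
        System.out.println(invokeAsFuture("ping").join()); // echo:ping
    }
}
```

This is the same inversion the anonymous InvokeCallback above performs in reverse: sendCallback pushes results to a handler, while a future lets the caller pull them when convenient.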
BoltClient mainly wraps com.alipay.remoting.rpc.RpcClient.
During initialization, it registers the handlers with the RpcClient via addConnectionEventProcessor, registerUserProcessor, and the like.

public class BoltClient implements Client {
    private RpcClient rpcClient;
    private AtomicBoolean closed = new AtomicBoolean(false);
    private int connectTimeout = 2000;
    private final int connNum;
}

The main methods are:

@Override
public Channel connect(URL url) {
    try {
        Connection connection = getBoltConnection(rpcClient, url);
        BoltChannel channel = new BoltChannel();
        channel.setConnection(connection);
        return channel;
    }
}

protected Connection getBoltConnection(RpcClient rpcClient, URL url) throws RemotingException {
    Url boltUrl = createBoltUrl(url);
    try {
        Connection connection = rpcClient.getConnection(boltUrl, connectTimeout);
        if (connection == null || !connection.isFine()) {
            if (connection != null) {
                connection.close();
            }
        }
        return connection;
    }
}

@Override
public Object sendSync(URL url, Object message, int timeoutMillis) {
    return rpcClient.invokeSync(createBoltUrl(url), message, timeoutMillis);
}

@Override
public Object sendSync(Channel channel, Object message, int timeoutMillis) {
    if (channel != null && channel.isConnected()) {
        BoltChannel boltChannel = (BoltChannel) channel;
        return rpcClient.invokeSync(boltChannel.getConnection(), message, timeoutMillis);
    }
}

@Override
public void sendCallback(URL url, Object message, CallbackHandler callbackHandler, int timeoutMillis) {
    try {
        Connection connection = getBoltConnection(rpcClient, url);
        BoltChannel channel = new BoltChannel();
        channel.setConnection(connection);
        rpcClient.invokeWithCallback(connection, message, new InvokeCallback() {
            @Override
            public void onResponse(Object result) {
                callbackHandler.onCallback(channel, result);
            }
            @Override
            public void onException(Throwable e) {
                callbackHandler.onException(channel, e);
            }
            @Override
            public Executor getExecutor() {
                return callbackHandler.getExecutor();
            }
        }, timeoutMillis);
        return;
    }
}
BoltExchange's main job is to maintain two ConcurrentHashMaps, of Clients and Servers respectively: all the clients and all the servers.
This is the first level of connection bookkeeping.
Map<String, Client> clients distinguishes Clients by a String key, which is one of:

String DATA_SERVER_TYPE = "dataServer";
String META_SERVER_TYPE = "metaServer";

That is, if a Data Server uses BoltExchange, it holds only two BoltClients internally, reused for all interactions with other dataServers and with metaServers respectively.
ConcurrentHashMap<Integer, Server> serverMap distinguishes the locally started Servers by their port.

public class BoltExchange implements Exchange<ChannelHandler> {
    private Map<String, Client> clients = new ConcurrentHashMap<>();
    private ConcurrentHashMap<Integer, Server> serverMap = new ConcurrentHashMap<>();

    @Override
    public Client connect(String serverType, URL serverUrl, ChannelHandler... channelHandlers) {
        return this.connect(serverType, 1, serverUrl, channelHandlers);
    }

    @Override
    public Client connect(String serverType, int connNum, URL serverUrl, ChannelHandler... channelHandlers) {
        Client client = clients.computeIfAbsent(serverType, key -> newBoltClient(connNum, channelHandlers));
        client.connect(serverUrl);
        return client;
    }

    @Override
    public Server open(URL url, ChannelHandler... channelHandlers) {
        BoltServer server = createBoltServer(url, channelHandlers);
        setServer(server, url);
        server.startServer();
        return server;
    }

    @Override
    public Client getClient(String serverType) {
        return clients.get(serverType);
    }

    @Override
    public Server getServer(Integer port) {
        return serverMap.get(port);
    }

    /**
     * add server into serverMap
     * @param server
     * @param url
     */
    public void setServer(Server server, URL url) {
        serverMap.putIfAbsent(url.getPort(), server);
    }

    private BoltClient newBoltClient(int connNum, ChannelHandler[] channelHandlers) {
        BoltClient boltClient = createBoltClient(connNum);
        boltClient.initHandlers(Arrays.asList(channelHandlers));
        return boltClient;
    }

    protected BoltClient createBoltClient(int connNum) {
        return new BoltClient(connNum);
    }

    protected BoltServer createBoltServer(URL url, ChannelHandler[] channelHandlers) {
        return new BoltServer(url, Arrays.asList(channelHandlers));
    }
}
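The client-reuse rule above (one BoltClient per server type, created lazily) boils down to a single computeIfAbsent. A self-contained model, with a stand-in Client class instead of the real BoltClient:

```java
import java.util.concurrent.ConcurrentHashMap;

// Sketch of the per-server-type client pool: computeIfAbsent guarantees
// that even under concurrent calls, exactly one Client is created per type.
public class ClientPoolSketch {
    static class Client {
        final String serverType;
        Client(String serverType) { this.serverType = serverType; }
    }

    private final ConcurrentHashMap<String, Client> clients = new ConcurrentHashMap<>();

    // Mirrors BoltExchange.connect: reuse the existing client for this
    // server type, or create one atomically if it does not exist yet.
    Client connect(String serverType) {
        return clients.computeIfAbsent(serverType, Client::new);
    }

    public static void main(String[] args) {
        ClientPoolSketch exchange = new ClientPoolSketch();
        Client a = exchange.connect("dataServer");
        Client b = exchange.connect("dataServer");
        Client c = exchange.connect("metaServer");
        System.out.println(a == b); // true: same type, same client instance
        System.out.println(a == c); // false: different type, different client
    }
}
```

computeIfAbsent also sidesteps the race a plain get-then-put would have: two threads connecting to "dataServer" at the same time still end up sharing one client.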
Internally, the matching Bolt Client is fetched from boltExchange by server type.

String DATA_SERVER_TYPE = "dataServer";
String META_SERVER_TYPE = "metaServer";

A Client is put with:

Client client = clients.computeIfAbsent(serverType, key -> newBoltClient(connNum, channelHandlers));

And retrieved with:

Client client = boltExchange.getClient(Exchange.DATA_SERVER_TYPE);
Client client = boltExchange.getClient(Exchange.META_SERVER_TYPE);

With the Client in hand, a Channel is established from the url parameter, or a request is sent.

Channel channel = client.getChannel(url);
client.sendCallback(request.getRequestUrl()....;
At this point the overall structure is roughly:

BoltExchange
    ├── Map<String, Client> ─────────────────> BoltClient
    │                                              └── remoting.rpc.RpcClient
    │                                                    ├── ConnectionEventHandler
    │                                                    └── ConnectionFactory
    └── ConcurrentHashMap<Integer, Server> ──> BoltServer
                                                   └── remoting.rpc.RpcServer
                                                         ├── Map<String, Channel> channels
                                                         └── List<ChannelHandler> channelHandlers

BoltClient / BoltServer ──> ChannelHandler, CallbackHandler, BoltChannel
BoltChannel ──> com.alipay.remoting.Connection ──> netty.channel.Channel
Below is the HTTP module registry-remoting-http, built on Jetty.

├── JerseyChannel.java
├── JerseyClient.java
├── JerseyJettyServer.java
├── exchange
│   └── JerseyExchange.java
└── jetty
    └── server
        ├── HttpChannelOverHttpCustom.java
        ├── HttpConnectionCustom.java
        └── HttpConnectionCustomFactory.java

Since the HTTP service is not SOFARegistry's main feature, it is skipped here.
The directory structure gives a rough picture of how functionality is divided into modules.

├── remoting
│   ├── dataserver
│   │   ├── handler
│   │   └── task
│   ├── handler
│   ├── metaserver
│   └── sessionserver
Since the major functional modules are broadly similar, we focus below on the dataserver directory, touching on the special parts of the metaserver and sessionserver directories.
Data Server is relatively complex: it is both a server and a client, so separate components abstract these two roles.
The DataServerBootstrap#start method runs a series of initialization services; in it, several network services are started to provide the external interfaces.

public void start() {
    try {
        openDataServer();
        openDataSyncServer();
        openHttpServer();
        startRaftClient();
    }
}

The specific role of each Handler is described below.
DataServer and DataSyncServer are Bolt Servers, the inter-node bolt communication components, where:
The code is as follows:

private void openDataServer() {
    try {
        if (serverForSessionStarted.compareAndSet(false, true)) {
            server = boltExchange.open(new URL(NetUtil.getLocalAddress().getHostAddress(), dataServerConfig.getPort()), serverHandlers.toArray(new ChannelHandler[serverHandlers.size()]));
        }
    }
}

private void openDataSyncServer() {
    try {
        if (serverForDataSyncStarted.compareAndSet(false, true)) {
            dataSyncServer = boltExchange.open(new URL(NetUtil.getLocalAddress().getHostAddress(), dataServerConfig.getSyncDataPort()), serverSyncHandlers.toArray(new ChannelHandler[serverSyncHandlers.size()]));
        }
    }
}
The handlers of these two servers partly overlap; one suspects the developers are in the middle of a feature migration.

@Bean(name = "serverSyncHandlers")
public Collection<AbstractServerHandler> serverSyncHandlers() {
    Collection<AbstractServerHandler> list = new ArrayList<>();
    list.add(getDataHandler());
    list.add(publishDataProcessor());
    list.add(unPublishDataHandler());
    list.add(notifyFetchDatumHandler());
    list.add(notifyOnlineHandler());
    list.add(syncDataHandler());
    list.add(dataSyncServerConnectionHandler());
    return list;
}

@Bean(name = "serverHandlers")
public Collection<AbstractServerHandler> serverHandlers() {
    Collection<AbstractServerHandler> list = new ArrayList<>();
    list.add(getDataHandler());
    list.add(clientOffHandler());
    list.add(getDataVersionsHandler());
    list.add(publishDataProcessor());
    list.add(sessionServerRegisterHandler());
    list.add(unPublishDataHandler());
    list.add(dataServerConnectionHandler());
    list.add(renewDatumHandler());
    list.add(datumSnapshotHandler());
    return list;
}
We use DataSyncServer for the concrete walkthrough.
When DataSyncServer starts, the following handlers are registered to process bolt requests:
The handlers registered by DataSyncServer are:
This Handler is mainly used for data retrieval: when a request arrives, the DataCenter and DataInfoId in the request are used to fetch the corresponding data stored on the current DataServer node.
When a data publisher comes online or goes offline, publishDataProcessor or unPublishDataHandler is triggered respectively. The handler adds a data-change event to dataChangeEventCenter, asynchronously notifying the event-change center of the change. Upon receiving the event, the center enqueues it, and dataChangeEventCenter then processes the online/offline data asynchronously according to the event type.
At the same time, DataChangeHandler publishes the change information through ChangeNotifier, notifying other nodes to synchronize the data.
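The enqueue-then-process flow around dataChangeEventCenter can be sketched as a queue plus a background worker. All names below are stand-ins for the real SOFARegistry types, not the actual implementation:

```java
import java.util.*;
import java.util.concurrent.*;

// Illustrative model of the event-center flow: handlers enqueue change
// events and return immediately; a background worker drains the queue
// and "notifies" (here, just records the notification).
public class DataChangeEventCenterSketch {
    enum ChangeType { PUBLISH, UNPUBLISH }

    static class ChangeEvent {
        final ChangeType type;
        final String dataInfoId;
        ChangeEvent(ChangeType type, String dataInfoId) { this.type = type; this.dataInfoId = dataInfoId; }
    }

    private final BlockingQueue<ChangeEvent> queue = new LinkedBlockingQueue<>();
    private final List<String> notified = new CopyOnWriteArrayList<>();

    DataChangeEventCenterSketch() {
        Thread worker = new Thread(() -> {
            try {
                while (true) {
                    ChangeEvent e = queue.take();               // drain the queue
                    notified.add(e.type + ":" + e.dataInfoId);  // stand-in for ChangeNotifier
                }
            } catch (InterruptedException ignored) { }
        });
        worker.setDaemon(true);
        worker.start();
    }

    // Called by the publish/unpublish handlers: just enqueue and return.
    void onChange(ChangeType type, String dataInfoId) {
        queue.add(new ChangeEvent(type, dataInfoId));
    }

    List<String> notifications() { return notified; }

    public static void main(String[] args) throws Exception {
        DataChangeEventCenterSketch center = new DataChangeEventCenterSketch();
        center.onChange(ChangeType.PUBLISH, "app-a");
        center.onChange(ChangeType.UNPUBLISH, "app-a");
        Thread.sleep(200); // give the worker time to drain
        System.out.println(center.notifications());
    }
}
```

The point of the indirection is the same as in the real system: the bolt request handler returns quickly, while the potentially slow fan-out to other nodes happens off the request thread.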
A data-pull request: when this Handler is triggered, the current DataServer is told to compare version numbers; if the version in the request is higher than the one in the node's cache, data synchronization is performed so the data stays current.
This is the DataServer online-notification Handler: when another node comes online, it is triggered and the current node records the new node's information in its cache. It is used to manage node state, i.e. whether the node is INITIAL or WORKING.
The inter-node data synchronization Handler: when triggered, it compares version numbers; if the versions stored by the current DataServer contain the requested version, it returns all data whose version is greater than the requested one, so that nodes can synchronize.
The connection-management Handler: when another DataServer connects to the current one, the connect method is triggered and the connection info is registered in the local cache; when a DataServer disconnects, the disconnect method is triggered and the cache entry is removed. This way, the current DataServer always knows every DataServer connected to it.
The dataSyncServer call chain is as follows:
In DataServerBootstrap there is:

private void openDataSyncServer() {
    try {
        if (serverForDataSyncStarted.compareAndSet(false, true)) {
            dataSyncServer = boltExchange.open(new URL(NetUtil.getLocalAddress().getHostAddress(), dataServerConfig.getSyncDataPort()), serverSyncHandlers.toArray(new ChannelHandler[serverSyncHandlers.size()]));
        }
    }
}
Then there is:

public class BoltExchange implements Exchange<ChannelHandler> {
    @Override
    public Server open(URL url, ChannelHandler... channelHandlers) {
        BoltServer server = createBoltServer(url, channelHandlers);
        setServer(server, url);
        server.startServer();
        return server;
    }

    protected BoltServer createBoltServer(URL url, ChannelHandler[] channelHandlers) {
        return new BoltServer(url, Arrays.asList(channelHandlers));
    }
}

BoltServer starts as follows, with user-defined UserProcessors registered:

public class BoltServer implements Server {
    public BoltServer(URL url, List<ChannelHandler> channelHandlers) {
        this.channelHandlers = channelHandlers;
        this.url = url;
    }

    public void startServer() {
        if (isStarted.compareAndSet(false, true)) {
            boltServer = new RpcServer(url.getPort(), true);
            initHandler();
            boltServer.start();
        }
    }

    private void initHandler() {
        if (initHandler.compareAndSet(false, true)) {
            boltServer.addConnectionEventProcessor(ConnectionEventType.CONNECT, new ConnectionEventAdapter(ConnectionEventType.CONNECT, getConnectionEventHandler(), this));
            boltServer.addConnectionEventProcessor(ConnectionEventType.CLOSE, new ConnectionEventAdapter(ConnectionEventType.CLOSE, getConnectionEventHandler(), this));
            boltServer.addConnectionEventProcessor(ConnectionEventType.EXCEPTION, new ConnectionEventAdapter(ConnectionEventType.EXCEPTION, getConnectionEventHandler(), this));
            registerUserProcessorHandler();
        }
    }

    // sync and async are distinguished here
    private void registerUserProcessorHandler() {
        if (channelHandlers != null) {
            for (ChannelHandler channelHandler : channelHandlers) {
                if (HandlerType.PROCESSER.equals(channelHandler.getType())) {
                    if (InvokeType.SYNC.equals(channelHandler.getInvokeType())) {
                        boltServer.registerUserProcessor(new SyncUserProcessorAdapter(channelHandler));
                    } else {
                        boltServer.registerUserProcessor(new AsyncUserProcessorAdapter(channelHandler));
                    }
                }
            }
        }
    }
}
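The registration logic above splits handlers first by HandlerType, then by InvokeType. A self-contained model of just that decision (the adapter names come from the directory listing earlier; everything else is a stand-in):

```java
import java.util.*;

// Sketch of registerUserProcessorHandler: PROCESSER handlers are wrapped
// in a sync or async adapter, while LISENTER handlers take a different
// path (connection event processors). Enum spellings keep the typos from
// the real interface (LISENTER, PROCESSER).
public class HandlerRegistrationSketch {
    enum HandlerType { LISENTER, PROCESSER }
    enum InvokeType { SYNC, ASYNC }

    static class ChannelHandler {
        final String name;
        final HandlerType type;
        final InvokeType invokeType;
        ChannelHandler(String name, HandlerType type, InvokeType invokeType) {
            this.name = name; this.type = type; this.invokeType = invokeType;
        }
    }

    // Returns which adapter each processor-type handler would be wrapped in.
    static Map<String, String> register(List<ChannelHandler> handlers) {
        Map<String, String> registrations = new LinkedHashMap<>();
        for (ChannelHandler h : handlers) {
            if (h.type == HandlerType.PROCESSER) {
                registrations.put(h.name,
                    h.invokeType == InvokeType.SYNC ? "SyncUserProcessorAdapter"
                                                    : "AsyncUserProcessorAdapter");
            }
            // LISENTER handlers go through addConnectionEventProcessor instead.
        }
        return registrations;
    }

    public static void main(String[] args) {
        List<ChannelHandler> handlers = List.of(
            new ChannelHandler("getDataHandler", HandlerType.PROCESSER, InvokeType.SYNC),
            new ChannelHandler("publishDataProcessor", HandlerType.PROCESSER, InvokeType.ASYNC),
            new ChannelHandler("connectionHandler", HandlerType.LISENTER, InvokeType.SYNC));
        System.out.println(register(handlers));
    }
}
```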
HttpServer is the HTTP component used for control, along with its configuration; it provides a set of REST interfaces for dashboard management, data queries, and so on.

private void openHttpServer() {
    try {
        if (httpServerStarted.compareAndSet(false, true)) {
            bindResourceConfig();
            httpServer = jerseyExchange.open(new URL(NetUtil.getLocalAddress().getHostAddress(), dataServerConfig.getHttpServerPort()), new ResourceConfig[] { jerseyResourceConfig });
        }
    }
}

RaftClient is a Raft-protocol client, used to obtain the meta server leader information via Raft.

private void startRaftClient() {
    metaServerService.startRaftClient();
    eventCenter.post(new MetaServerChangeEvent(metaServerService.getMetaServerMap()));
}

The concrete implementation in DefaultMetaServiceImpl:

@Override
public void startRaftClient() {
    try {
        if (clientStart.compareAndSet(false, true)) {
            String serverConf = getServerConfig();
            raftClient = new RaftClient(getGroup(), serverConf);
            raftClient.start();
        }
    }
}
The final module structure is roughly:

DataServerBootstrap
    ├── DataSyncServer ──> getDataHandler, publishDataProcessor, unPublishDataHandler,
    │                      notifyFetchDatumHandler, notifyOnlineHandler, syncDataHandler,
    │                      dataSyncServerConnectionHandler
    ├── DataServer ──────> getDataHandler, clientOffHandler, getDataVersionsHandler,
    │                      publishDataProcessor, sessionServerRegisterHandler, unPublishDataHandler,
    │                      dataServerConnectionHandler, renewDatumHandler, datumSnapshotHandler
    ├── HttpServer
    └── RaftClient
Exchange, as the abstraction of Client/Server connections, is responsible for connections between nodes.
Data Server mainly uses DataNodeExchanger and MetaNodeExchanger, which:
wrap BoltExchange;
abstract over the Bolt Client and Bolt Channel;
provide a directly usable network API: scattered beans such as ForwardServiceImpl and GetSyncDataHandler can use DataNodeExchanger directly for network interaction.
From the external interface, we can see that:
Two questions arise here:
Probably because there can be many Session Servers, there is no need to keep a Bolt client and server for them; Session instead has a corresponding ConnectionFactory. ConnectionFactory is a lower-level encapsulation, covered later.
Take DataNodeExchanger as the example.
It gathers in one place all of Data Server's network operations that are not directly tied to the Server/Client concepts themselves.
As you can see, it is mainly a higher-level wrapper over boltExchange.
The code is as follows:

public class DataNodeExchanger implements NodeExchanger {
    @Autowired
    private Exchange boltExchange;
    @Autowired
    private DataServerConfig dataServerConfig;
    @Resource(name = "dataClientHandlers")
    private Collection<AbstractClientHandler> dataClientHandlers;

    @Override
    public Response request(Request request) {
        Client client = boltExchange.getClient(Exchange.DATA_SERVER_TYPE);
        if (null != request.getCallBackHandler()) {
            client.sendCallback(request.getRequestUrl(), request.getRequestBody(), request.getCallBackHandler(), request.getTimeout() != null ? request.getTimeout() : dataServerConfig.getRpcTimeout());
            return () -> Response.ResultStatus.SUCCESSFUL;
        } else {
            final Object result = client.sendSync(request.getRequestUrl(), request.getRequestBody(), dataServerConfig.getRpcTimeout());
            return () -> result;
        }
    }

    public Channel connect(URL url) {
        Client client = boltExchange.getClient(Exchange.DATA_SERVER_TYPE);
        if (client == null) {
            synchronized (this) {
                client = boltExchange.getClient(Exchange.DATA_SERVER_TYPE);
                if (client == null) {
                    client = boltExchange.connect(Exchange.DATA_SERVER_TYPE, url, dataClientHandlers.toArray(new ChannelHandler[dataClientHandlers.size()]));
                }
            }
        }
        Channel channel = client.getChannel(url);
        if (channel == null) {
            synchronized (this) {
                channel = client.getChannel(url);
                if (channel == null) {
                    channel = client.connect(url);
                }
            }
        }
        return channel;
    }
}
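The connect method above uses the check/lock/re-check pattern twice: once for the client, once for the channel. A runnable model of that double-checked lazy connect, with a stand-in Channel type:

```java
import java.util.concurrent.ConcurrentHashMap;

// Sketch of the double-checked lazy connection pattern: check without the
// lock first (fast path), then lock and re-check before actually creating,
// so concurrent callers never create two connections to the same url.
public class LazyConnectSketch {
    static class Channel {
        final String url;
        Channel(String url) { this.url = url; }
    }

    private final ConcurrentHashMap<String, Channel> channels = new ConcurrentHashMap<>();
    static int connectCalls = 0; // counts real connection attempts

    Channel getOrConnect(String url) {
        Channel channel = channels.get(url);
        if (channel == null) {
            synchronized (this) {
                channel = channels.get(url); // re-check under the lock
                if (channel == null) {
                    connectCalls++;          // only one thread actually connects
                    channel = new Channel(url);
                    channels.put(url, channel);
                }
            }
        }
        return channel;
    }

    public static void main(String[] args) {
        LazyConnectSketch exchanger = new LazyConnectSketch();
        Channel a = exchanger.getOrConnect("10.0.0.2:9620");
        Channel b = exchanger.getOrConnect("10.0.0.2:9620");
        System.out.println(a == b);       // true: the channel is reused
        System.out.println(connectCalls); // 1: only one real connect happened
    }
}
```

The re-check under the lock is what makes the pattern safe: without it, two threads that both saw null on the fast path would each open a connection.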
The dataClientHandlers used above belong to the BoltClient; when the Server pushes to this client, these two handlers process the pushes.

@Bean(name = "dataClientHandlers")
public Collection<AbstractClientHandler> dataClientHandlers() {
    Collection<AbstractClientHandler> list = new ArrayList<>();
    list.add(notifyDataSyncHandler());
    list.add(fetchDataHandler());
    return list;
}
The concrete network picture is now:

ForwardServiceImpl / GetSyncDataHandler /
LocalDataServerChangeEventHandler / DataServerChangeEventHandler
        │
        v
DataNodeExchanger ──> BoltExchange ──> BoltServer (server), BoltServer (dataSyncServer)
JerseyExchange ──> JerseyJettyServer (httpServer)
AbstractServerHandler and AbstractClientHandler are implementations of com.alipay.sofa.registry.remoting.ChannelHandler.
They need to be explained together with SOFABolt.
Taking RpcServer as an example, SOFABolt is used here through two kinds of processors: connection event processors and user processors.
The main code of the two handler base classes is as follows:
```java
public abstract class AbstractServerHandler<T> implements ChannelHandler<T> {

    protected NodeType getConnectNodeType() {
        return NodeType.DATA;
    }

    @Override
    public Object reply(Channel channel, T request) {
        try {
            logRequest(channel, request);
            checkParam(request);
            return doHandle(channel, request);
        } catch (Exception e) {
            return buildFailedResponse(e.getMessage());
        }
    }
}

public abstract class AbstractClientHandler<T> implements ChannelHandler<T> {

    @Override
    public Object reply(Channel channel, T request) {
        try {
            logRequest(channel, request);
            checkParam(request);
            return doHandle(channel, request);
        } catch (Exception e) {
            return buildFailedResponse(e.getMessage());
        }
    }
}
```
The system can implement all kinds of derived handler classes on top of these.
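These base classes follow the template-method pattern: reply() fixes the log / check / handle / build-failure skeleton, and subclasses only supply doHandle(). A self-contained sketch with simplified stand-in types (HandlerTemplate, EchoHandler, and the string-based failure response are hypothetical, not the real SOFARegistry classes):

```java
// Sketch of the template-method pattern used by AbstractServerHandler:
// the skeleton lives in the base class, subclasses fill in doHandle().
public class HandlerTemplate {

    abstract static class AbstractServerHandler<T> {
        // Fixed skeleton: validate, delegate, and turn exceptions into a failed response.
        public Object reply(T request) {
            try {
                checkParam(request);
                return doHandle(request);
            } catch (Exception e) {
                return "FAILED: " + e.getMessage();
            }
        }

        protected void checkParam(T request) {
            if (request == null) throw new IllegalArgumentException("null request");
        }

        protected abstract Object doHandle(T request);
    }

    // A derived handler: only the business step is implemented here.
    static class EchoHandler extends AbstractServerHandler<String> {
        @Override
        protected Object doHandle(String request) {
            return request.toUpperCase();
        }
    }

    public static void main(String[] args) {
        EchoHandler h = new EchoHandler();
        System.out.println(h.reply("ping"));   // PING
        System.out.println(h.reply(null));     // FAILED: null request
    }
}
```

The benefit is that logging, parameter checking, and error shaping are written once, so every derived handler gets consistent behavior for free.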
Note that ChannelHandlers are divided into two categories, corresponding to the listener and the userProcessor in RpcServer.
```java
enum HandlerType {
    LISENTER,   // spellings as they appear in the SOFARegistry source
    PROCESSER
}
```
Taking serverSyncHandlers as an example, only dataSyncServerConnectionHandler is a Listener; all the others are Processors.
This also makes sense: there should be only one handler responding to connection events.
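To illustrate, here is a minimal sketch of partitioning a mixed handler list by HandlerType, as a server would when wiring itself up. The Handler class and listeners() helper are hypothetical stand-ins; only the enum spellings mirror the source:

```java
import java.util.ArrayList;
import java.util.List;

// Sketch: split a mixed handler list into the single connection Listener
// and the per-request Processors, keyed on the HandlerType enum.
public class HandlerSplit {
    enum HandlerType { LISENTER, PROCESSER }   // spellings as in the source

    static class Handler {
        final String name;
        final HandlerType type;
        Handler(String name, HandlerType type) { this.name = name; this.type = type; }
    }

    static List<Handler> listeners(List<Handler> all) {
        List<Handler> out = new ArrayList<>();
        for (Handler h : all) {
            if (h.type == HandlerType.LISENTER) out.add(h);
        }
        return out;
    }

    public static void main(String[] args) {
        List<Handler> all = List.of(
            new Handler("getDataHandler", HandlerType.PROCESSER),
            new Handler("syncDataHandler", HandlerType.PROCESSER),
            new Handler("dataSyncServerConnectionHandler", HandlerType.LISENTER));
        System.out.println(listeners(all).size());   // 1
    }
}
```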
```java
@Bean(name = "serverSyncHandlers")
public Collection<AbstractServerHandler> serverSyncHandlers() {
    Collection<AbstractServerHandler> list = new ArrayList<>();
    list.add(getDataHandler());
    list.add(publishDataProcessor());
    list.add(unPublishDataHandler());
    list.add(notifyFetchDatumHandler());
    list.add(notifyOnlineHandler());
    list.add(syncDataHandler());
    list.add(dataSyncServerConnectionHandler()); // the only Listener; the rest are Processors
    return list;
}
```
In summary:
```
                       +--> getDataHandler
                       +--> publishDataProcessor
                       +--> unPublishDataHandler
+--------------------+ |
| serverSyncHandlers +-+--> notifyFetchDatumHandler
+--------------------+ |
                       +--> notifyOnlineHandler
                       +--> syncDataHandler
                       +--> dataSyncServerConnectionHandler (Listener)
```
At startup, serverSyncHandlers is used to bring up the BoltServer.
```java
private void openDataSyncServer() {
    try {
        if (serverForDataSyncStarted.compareAndSet(false, true)) {
            dataSyncServer = boltExchange.open(
                new URL(NetUtil.getLocalAddress().getHostAddress(),
                    dataServerConfig.getSyncDataPort()),
                serverSyncHandlers.toArray(new ChannelHandler[serverSyncHandlers.size()]));
        }
    } catch (Exception e) {
        // error handling elided in the original excerpt
        throw new RuntimeException("Open dataSyncServer error!", e);
    }
}
```
BoltExchange contains:
```java
@Override
public Server open(URL url, ChannelHandler... channelHandlers) {
    BoltServer server = createBoltServer(url, channelHandlers);
    setServer(server, url);
    server.startServer();
    return server;
}

protected BoltServer createBoltServer(URL url, ChannelHandler[] channelHandlers) {
    return new BoltServer(url, Arrays.asList(channelHandlers));
}
```
BoltServer contains the following code, whose main job is to install the handlers.
```java
public void startServer() {
    if (isStarted.compareAndSet(false, true)) {
        try {
            boltServer = new RpcServer(url.getPort(), true);
            initHandler();
            boltServer.start();
        } catch (Exception e) {
            // error handling elided in the original excerpt
            throw new RuntimeException("Start bolt server error!", e);
        }
    }
}

private void initHandler() {
    if (initHandler.compareAndSet(false, true)) {
        boltServer.addConnectionEventProcessor(ConnectionEventType.CONNECT,
            new ConnectionEventAdapter(ConnectionEventType.CONNECT,
                getConnectionEventHandler(), this));
        boltServer.addConnectionEventProcessor(ConnectionEventType.CLOSE,
            new ConnectionEventAdapter(ConnectionEventType.CLOSE,
                getConnectionEventHandler(), this));
        boltServer.addConnectionEventProcessor(ConnectionEventType.EXCEPTION,
            new ConnectionEventAdapter(ConnectionEventType.EXCEPTION,
                getConnectionEventHandler(), this));
        registerUserProcessorHandler();
    }
}
```
Ultimately this calls into RpcServer, registering the connection event processors and the user-defined processors.
```java
public void addConnectionEventProcessor(ConnectionEventType type,
                                        ConnectionEventProcessor processor) {
    this.connectionEventListener.addConnectionEventProcessor(type, processor);
}

@Override
public void registerUserProcessor(UserProcessor<?> processor) {
    UserProcessorRegisterHelper.registerUserProcessor(processor, this.userProcessors);
}
```
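The idea behind user-processor registration is that each processor declares which request class it is interested in, and dispatch is a lookup by the request's runtime class. A simplified, self-contained model of that mechanism (ProcessorRegistry and its method names are hypothetical, not SOFABolt's actual implementation):

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Function;

// Sketch: a request-class -> processor registry, dispatched by runtime class.
public class ProcessorRegistry {
    private final Map<Class<?>, Function<Object, Object>> processors = new ConcurrentHashMap<>();

    <T> void register(Class<T> interest, Function<Object, Object> processor) {
        // Registering two processors for the same request class is an error.
        if (processors.putIfAbsent(interest, processor) != null) {
            throw new IllegalStateException("duplicate processor for " + interest);
        }
    }

    Object dispatch(Object request) {
        Function<Object, Object> p = processors.get(request.getClass());
        if (p == null) {
            throw new IllegalStateException("no processor for " + request.getClass());
        }
        return p.apply(request);
    }

    public static void main(String[] args) {
        ProcessorRegistry r = new ProcessorRegistry();
        r.register(String.class, req -> "handled:" + req);
        r.register(Integer.class, req -> (Integer) req + 1);
        System.out.println(r.dispatch("sync"));   // handled:sync
        System.out.println(r.dispatch(41));       // 42
    }
}
```

This is why each request type in the diagram below maps to exactly one handler: the registry rejects duplicate registrations for the same interest.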
The resulting wiring into SOFABolt looks like this:
```
+---------------------------+
|        [RpcServer]        |
|                           |     +--> EXCEPTION +--> DataSyncServerConnectionHandler
|  connectionEventListener +----->+--> CONNECT   +--> DataSyncServerConnectionHandler
|                           |     +--> CLOSE     +--> DataSyncServerConnectionHandler
|                           |
|                           |     +--> EXCEPTION +--> DataSyncServerConnectionHandler
|  connectionEventHandler  +----->+--> CONNECT   +--> DataSyncServerConnectionHandler
|                           |     +--> CLOSE     +--> DataSyncServerConnectionHandler
|                           |
|                           |     +--> GetDataRequest          +--> getDataHandler
|                           |     +--> PublishDataRequest      +--> publishDataProcessor
|  userProcessors          +----->+--> UnPublishDataRequest    +--> unPublishDataHandler
|                           |     +--> NotifyFetchDatumRequest +--> notifyFetchDatumHandler
|                           |     +--> NotifyOnlineRequest     +--> notifyOnlineHandler
+---------------------------+     +--> SyncDataRequest         +--> syncDataHandler
```
At this point, we have given a rough overview of SOFARegistry's network encapsulation and operations.
Logically, Alibaba provides two levels of encapsulation:
From the connection perspective, Alibaba wraps netty.channel.Channel; viewed bottom-up, the layers are:
From the application perspective, Alibaba provides Server- and Client-level wrappers; viewed bottom-up, the layers are:
The overall logic is roughly as follows:
Alibaba's encapsulation is very meticulous. Because SOFARegistry is a complex system, wrapping the network concepts and functions this way is well worth it. In day-to-day development you may not need encapsulation this fine-grained; use Alibaba's approach as a reference and make your own choices and trims.
★★★★★★ Thoughts on life and technology ★★★★★★
WeChat public account: 罗西的思考
If you would like timely notifications of my articles, or to see the technical material I recommend, please follow this account.