SOFARegistry 是蚂蚁金服开源的一个生产级、高时效、高可用的服务注册中心。html
本系列文章重点在于分析设计和架构,即利用多篇文章,从多个角度反推总结 DataServer 或者 SOFARegistry 的实现机制和架构思路,让你们借以学习阿里如何设计。java
本文为第二篇,介绍SOFARegistry的网络封装和操做。node
由于有的兄弟可能没有读过前面MetaServer的文章,因此这里回忆下SOFARegistry 整体架构。git
应用服务器集群。Client 层是应用层,每一个应用系统经过依赖注册中心相关的客户端 jar 包,经过编程方式来使用服务注册中心的服务发布和服务订阅能力。github
Session 服务器集群。顾名思义,Session 层是会话层,经过长链接和 Client 层的应用服务器保持通信,负责接收 Client 的服务发布和服务订阅请求。该层只在内存中保存各个服务的发布订阅关系,对于具体的服务信息,只在 Client 层和 Data 层之间透传转发。Session 层是无状态的,能够随着 Client 层应用规模的增加而扩容。编程
数据服务器集群。Data 层经过分片存储的方式保存着所用应用的服务注册数据。数据按照 dataInfoId(每一份服务数据的惟一标识)进行一致性 Hash 分片,多副本备份,保证数据的高可用。下文的重点也在于随着数据规模的增加,Data 层如何在不影响业务的前提下实现平滑的扩缩容。bootstrap
元数据服务器集群。这个集群管辖的范围是 Session 服务器集群和 Data 服务器集群的服务器信息,其角色就至关于 SOFARegistry 架构内部的服务注册中心,只不过 SOFARegistry 做为服务注册中心是服务于广大应用服务层,而 Meta 集群是服务于 SOFARegistry 内部的 Session 集群和 Data 集群,Meta 层可以感知到 Session 节点和 Data 节点的变化,并通知集群的其它节点。api
DataServer,SessionServer,MetaServer 本质上都是网络应用程序。这就决定了网络封装和操做是本系统的基础模块及功能,下面咱们讲讲其应用场景。缓存
SOFARegistry 的应用场景是单元化状态下。服务器
在单元化状态下,一个单元,是一个五脏俱全的缩小版整站,它是全能的,由于部署了全部应用;但它不是全量的,由于只能操做一部分数据。可以单元化的系统,很容易在多机房中部署,由于能够轻易的把几个单元部署在一个机房,而把另外几个部署在其余机房。借由在业务入口处设置一个流量调配器,能够调整业务流量在单元之间的比例。
因此 SOFARegistry 考虑的就是在 IDC 私网环境中如何进行节点间通讯。高吞吐、高并发的通讯,数量众多的链接管理(C10K 问题),便捷的升级机制,兼容性保障,灵活的线程池模型运用,细致的异常处理与日志埋点等,这些功能都须要在通讯协议和实现框架上作文章。
服务器也有若干配置需求,这用简单的http协议便可。
在这种内网单元化场景下,可以想到的问题点以下:
对于这种高性能,高并发的场景,在Java体系下,必然选择非阻塞IO复用,那么天然选择基于Netty进行开发。
阿里就是借助 SOFABolt 通讯框架,实现基于TCP长链接的节点判活与推模式的变动推送,服务上下线通知时效性在秒级之内。
sofa-bolt是蚂蚁开源的一款基于Netty的网络通讯框架。在Netty的基础上对网络编程常见问题进行了一层简单封装,让中间件开发者更关注于中间件产品自己。
大致功能为:
SOFABolt能够理解为Netty的最佳实践,并额外进行了一些优化工做。
SOFABolt框架咱们后续可能会专门有系列进行分析,目前认为基于SOFABolt此能够知足咱们需求,因此咱们会简单介绍SOFABolt,重点在于如何使用以及业务实现。
在肯定了采用SOFABolt以后,以前提到的问题点就基本被SOFABolt解决了,因此咱们暂时能想到的其余问题大体以下:
咱们提早剧透,即从逻辑上看,阿里提供了两个层级的封装
从链接角度看,阿里实现了基于 netty.channel.Channel 的封装,从下往上看是:
从应用角度看,阿里实现了Server,Client层次的封装,从下往上看是:
具体逻辑大体以下:
+---------------------+ +---------------------+ | | | | | DataNodeExchanger | | MetaNodeExchanger | | | | | +----------------+----+ +--------+------------+ | | +-----------+ +-------------+ | | v v +---------+---------------------+--------+ | BoltExchange | | +------------------------------------+ | | | | | | | Map<String, Client> | | | | | | | | ConcurrentHashMap<Integer, Server> | | | | | | | +------------------------------------+ | +----------------------------------------+ | | | | | | v v +-----------------------+----------+ +---------+---------------------------------+ | BoltClient | | BoltServer | | | | | | +------------------------------+ | | +---------------------------------------+ | | | remoting.rpc.RpcClient | | | | remoting.rpc.RpcServer | | | | +-------------------------+ | | | | | | | | | ConnectionEventHandler | | | | | Map<String, Channel> channels | | | | | | | | | | | | | | | ConnectionFactory | | | | | List<ChannelHandler> channelHandlers | | | | +-------------------------+ | | | | | | | +------------------------------+ | | +---------------------------------------+ | +----------------------------------+ +-------------------------------------------+ | | | | | | | +---------------------------+ | | v | | v | +---+------------+ <--------------+ +-----v--------------+--------------+ | | ChannelHandler | | BoltChannel | | +----------------+ | | | | +------------------------------+ | | | |com.alipay.remoting.Connection| | | | +------------------------------+ | | +-----------------------------------+ | | v v +---+-------------+ +---------+------------+ | CallbackHandler | | netty.channel.Channel| +-----------------+ +----------------------+
SOFARegistry 对网络基础功能作了封装,也对外提供了API。如下是封装模块以及对外接口 registry-remoting-api。
├── CallbackHandler.java ├── Channel.java ├── ChannelHandler.java ├── Client.java ├── Endpoint.java ├── RemotingException.java ├── Server.java └── exchange ├── Exchange.java ├── NodeExchanger.java ├── RequestException.java └── message ├── Request.java └── Response.java
其中比较关键的是四个接口:Server,Client,Exchange,Channel,所以这些就是网络封装的最基本概念。
Channel 这个概念比较广泛,表明了IO源与目标打开的链接。咱们先以Java的Channel为例来进行说明。
Java 的Channel 由java.nio.channels包定义的,Channel表示IO源与目标打开的链接,Channel相似于传统的“流”,只不过Channel自己不能直接访问数据,Channel只能与Buffer进行交互。
Channel用于在字节缓冲区和位于通道另外一侧的实体(一般是一个文件或套接字)之间有效地传输数据。通道是访问IO服务的导管,经过通道,咱们能够以最小的开销来访问操做系统的I/O服务;顺便说下,缓冲区是通道内部发送数据和接收数据的端点。
由java.nio.channels包定义的,Channel表示IO源与目标打开的链接,Channel相似于传统的“流”,只不过Channel自己不能直接访问数据,Channel只能与Buffer进行交互。通道主要用于传输数据,从缓冲区的一侧传到另外一侧的实体(如文件、套接字...),反之亦然;通道是访问IO服务的导管,经过通道,咱们能够以最小的开销来访问操做系统的I/O服务;顺便说下,缓冲区是通道内部发送数据和接收数据的端点。
从SOFARegistry的Channel定义能够看出其基本功能主要是属性相关功能。
public interface Channel { InetSocketAddress getRemoteAddress(); InetSocketAddress getLocalAddress(); boolean isConnected(); Object getAttribute(String key); void setAttribute(String key, Object value); WebTarget getWebTarget(); void close(); }
Server是服务器对应的封装,其基本功能由定义可知,主要是基于Channel发送功能。
public interface Server extends Endpoint { boolean isOpen(); Collection<Channel> getChannels(); Channel getChannel(InetSocketAddress remoteAddress); Channel getChannel(URL url); void close(Channel channel); int getChannelCount(); Object sendSync(final Channel channel, final Object message, final int timeoutMillis); void sendCallback(final Channel channel, final Object message, CallbackHandler callbackHandler,final int timeoutMillis); }
Client是客户端对应的封装,其基本功能也是基于Channel进行交互。
public interface Client extends Endpoint { Channel getChannel(URL url); Channel connect(URL url); Object sendSync(final URL url, final Object message, final int timeoutMillis); Object sendSync(final Channel channel, final Object message, final int timeoutMillis); void sendCallback(final URL url, final Object message, CallbackHandler callbackHandler, final int timeoutMillis); }
Exchange 做为 Client / Server 链接的进一步抽象,负责同类型server之间的链接。
public interface Exchange<T> { String DATA_SERVER_TYPE = "dataServer"; String META_SERVER_TYPE = "metaServer"; /** * connect same type server,one server ip one connection * such as different server on data server,serverOne and serverTwo,different type server must match different channelHandlers, * so we must connect by serverType,and get Client instance by serverType * @param serverType * @param serverUrl * @param channelHandlers */ Client connect(String serverType, URL serverUrl, T... channelHandlers); /** * connect same type server,one server ip one connection * such as different server on data server,serverOne and serverTwo,different type server must match different channelHandlers, * so we must connect by serverType,and get Client instance by serverType * @param serverType * @param connNum connection number per serverUrl * @param serverUrl * @param channelHandlers */ Client connect(String serverType, int connNum, URL serverUrl, T... channelHandlers); /** * bind server by server port in url parameter,one port must by same server type * @param url * @param channelHandlers */ Server open(URL url, T... channelHandlers); Client getClient(String serverType); Server getServer(Integer port); }
在创建链接中,能够设置一系列应对不一样任务的 handler (称之为 ChannelHandler)。
这些 ChannelHandler 有的做为 Listener 用来处理链接事件,有的做为 Processor 用来处理各类指定的事件,好比服务信息数据变化、Subscriber 注册等事件。
public interface ChannelHandler<T> { enum HandlerType { LISENTER, PROCESSER } enum InvokeType { SYNC, ASYNC } /** * on channel connected. * @param channel */ void connected(Channel channel) throws RemotingException; /** * on channel disconnected. * * @param channel channel. */ void disconnected(Channel channel) throws RemotingException; /** * on message received. * @param channel channel. * @param message message. */ void received(Channel channel, T message) throws RemotingException; /** * on message reply. * * @param channel * @param message */ Object reply(Channel channel, T message) throws RemotingException; /** * on exception caught. * @param channel channel. * @param message message. * @param exception exception. * @throws RemotingException */ void caught(Channel channel, T message, Throwable exception) throws RemotingException; HandlerType getType(); /** * return processor request class name */ Class interest(); /** * Select Sync process by reply or Async process by received */ default InvokeType getInvokeType() { return InvokeType.SYNC; } /** * specify executor for processor handler */ default Executor getExecutor() { return null; } }
所以,网络基本对外接口以下:
+-------------------------------------------------------------------------+ |[registry+remoting+api] | | | | +----------+ +-------------+ | | | Exchange | |NodeExchanger| | | ++-----+--++ +----+--------+ | | | | | | | | | | +----------------------+ | | | | | | | | | +------+ v v v | | | +--+-----+ +-+-----++ | | | | Server | | Client | | | | +-----+--+ +-+----+-+ | | | | | | | | | +--------+ +-------+ | | | | | | | | | v v v v | | +----+-----------+ +-----+-+ ++--------------+ | | | ChannelHandler | |Channel| |CallbackHandler| | | +----------------+ +-------+ +---------------+ | +-------------------------------------------------------------------------+
由于SOFARegistry主要是基于SOFABolt,无法绕开,因此咱们须要首先简单介绍SOFABolt。
Bolt是基于Netty,因此要先说明Netty Channel。
在Netty框架中,Channel是其中核心概念之一,是Netty网络通讯的主体,由它负责同对端进行网络通讯、注册和数据操做等功能。
Netty对Jdk原生的ServerSocketChannel
进行了封装和加强封装成了NioXXXChannel
, 相对于原生的JdkChannel, Netty的Channel增长了以下的组件。
根据服务端和客户端,Channel能够分红两类:
NioServerSocketChannel
NioSocketChannel
其实inbound和outbound分别用于标识 Context 所对应的handler的类型, 在Netty中事件能够分为Inbound和Outbound事件,在ChannelPipeline的类注释中,有以下图示:
* * I/O Request * via {@link Channel} or * {@link ChannelHandlerContext} * | * +---------------------------------------------------+---------------+ * | ChannelPipeline | | * | \|/ | * | +---------------------+ +-----------+----------+ | * | | Inbound Handler N | | Outbound Handler 1 | | * | +----------+----------+ +-----------+----------+ | * | /|\ | | * | | \|/ | * | +----------+----------+ +-----------+----------+ | * | | Inbound Handler N-1 | | Outbound Handler 2 | | * | +----------+----------+ +-----------+----------+ | * | /|\ . | * | . . | * | ChannelHandlerContext.fireIN_EVT() ChannelHandlerContext.OUT_EVT()| * | [ method call] [method call] | * | . . | * | . \|/ | * | +----------+----------+ +-----------+----------+ | * | | Inbound Handler 2 | | Outbound Handler M-1 | | * | +----------+----------+ +-----------+----------+ | * | /|\ | | * | | \|/ | * | +----------+----------+ +-----------+----------+ | * | | Inbound Handler 1 | | Outbound Handler M | | * | +----------+----------+ +-----------+----------+ | * | /|\ | | * +---------------+-----------------------------------+---------------+ * | \|/ * +---------------+-----------------------------------+---------------+ * | | | | * | [ Socket.read() ] [ Socket.write() ] | * | | * | Netty Internal I/O Threads (Transport Implementation) | * +-------------------------------------------------------------------+
Connection其删减版定义以下,能够看到其主要成员就是 Netty channel 实例:
public class Connection { private Channel channel; private final ConcurrentHashMap<Integer, InvokeFuture> invokeFutureMap = new ConcurrentHashMap<Integer, InvokeFuture>(4); /** Attribute key for connection */ public static final AttributeKey<Connection> CONNECTION = AttributeKey.valueOf("connection"); /** Attribute key for heartbeat count */ public static final AttributeKey<Integer> HEARTBEAT_COUNT = AttributeKey.valueOf("heartbeatCount"); /** Attribute key for heartbeat switch for each connection */ public static final AttributeKey<Boolean> HEARTBEAT_SWITCH = AttributeKey.valueOf("heartbeatSwitch"); /** Attribute key for protocol */ public static final AttributeKey<ProtocolCode> PROTOCOL = AttributeKey.valueOf("protocol"); /** Attribute key for version */ public static final AttributeKey<Byte> VERSION = AttributeKey.valueOf("version"); private Url url; private final ConcurrentHashMap<Integer/* id */, String/* poolKey */> id2PoolKey = new ConcurrentHashMap<Integer, String>(256); private Set<String> poolKeys = new ConcurrentHashSet<String>(); private final ConcurrentHashMap<String/* attr key*/, Object /*attr value*/> attributes = new ConcurrentHashMap<String, Object>(); }
Connection的辅助类不少,摘录以下:
另外,须要注意的点以下:
不管是服务端仍是客户端,其实本质都在作一件事情:建立 ConnectionEventHandler 实例并添加到 Netty 的 pipeline 中,基本原理是:
ConnectionEventHandler处理两类事件
以后当有 ConnectionEvent 触发时(不管是 Netty 定义的事件被触发,仍是 SOFABolt 定义的事件被触发),ConnectionEventHandler 会经过异步线程执行器通知 ConnectionEventListener,ConnectionEventListener 将消息派发给具体的 ConnectionEventProcessor 实现类。
RpcServer实现了一个Server所必须的基本机制,能够直接使用,好比:
其中,须要说明的是:
RpcHandler -> RpcCommandHandler -> RpcRequestProcessor -> UserProcessor
RpcHandler -> RpcCommandHandler -> RpcResponseProcessor
RpcHandler -> RpcCommandHandler -> RpcHeartBeatProcessor
具体定义以下:
public class RpcServer extends AbstractRemotingServer { /** server bootstrap */ private ServerBootstrap bootstrap; /** channelFuture */ private ChannelFuture channelFuture; /** connection event handler */ private ConnectionEventHandler connectionEventHandler; /** connection event listener */ private ConnectionEventListener connectionEventListener = new ConnectionEventListener(); /** user processors of rpc server */ private ConcurrentHashMap<String, UserProcessor<?>> userProcessors = new ConcurrentHashMap<String, UserProcessor<?>>( 4); /** boss event loop group, boss group should not be daemon, need shutdown manually*/ private final EventLoopGroup bossGroup = NettyEventLoopUtil.newEventLoopGroup(NamedThreadFactory("Rpc-netty-server-boss",false)); /** worker event loop group. Reuse I/O worker threads between rpc servers. */ private static final EventLoopGroup workerGroup = NettyEventLoopUtil.newEventLoopGroup(Runtime.getRuntime().availableProcessors() * 2,new NamedThreadFactory("Rpc-netty-server-worker",true)); /** address parser to get custom args */ private RemotingAddressParser addressParser; /** connection manager */ private DefaultConnectionManager connectionManager; /** rpc remoting */ protected RpcRemoting rpcRemoting; /** rpc codec */ private Codec codec = new RpcCodec(); }
RpcClient主要机制以下:
整个 Connection 设计的核心
);具体代码以下:
public class RpcClient extends AbstractConfigurableInstance { private ConcurrentHashMap<String, UserProcessor<?>> userProcessors = new ConcurrentHashMap<String, UserProcessor<?>>(); /** connection factory */ private ConnectionFactory connectionFactory = new RpcConnectionFactory(userProcessors, this); /** connection event handler */ private ConnectionEventHandler connectionEventHandler = new RpcConnectionEventHandler(switches()); /** reconnect manager */ private ReconnectManager reconnectManager; /** connection event listener */ private ConnectionEventListener connectionEventListener = new ConnectionEventListener(); /** address parser to get custom args */ private RemotingAddressParser addressParser; /** connection select strategy */ private ConnectionSelectStrategy connectionSelectStrategy = new RandomSelectStrategy( switches()); /** connection manager */ private DefaultConnectionManager connectionManager = new DefaultConnectionManager(connectionSelectStrategy,connectionFactory,connectionEventHandler,connectionEventListener,switches()); /** rpc remoting */ protected RpcRemoting rpcRemoting; /** task scanner */ private RpcTaskScanner taskScanner = new RpcTaskScanner(); /** connection monitor */ private DefaultConnectionMonitor connectionMonitor; /** connection monitor strategy */ private ConnectionMonitorStrategy monitorStrategy; }
针对上述提到的基础封装,系统针对Bolt和Http都进行了实现。如下是SOFABolt的对应封装 registry-remoting-bolt。
├── AsyncUserProcessorAdapter.java ├── BoltChannel.java ├── BoltChannelUtil.java ├── BoltClient.java ├── BoltServer.java ├── ConnectionEventAdapter.java ├── SyncUserProcessorAdapter.java └── exchange └── BoltExchange.java
BoltChannel 主要是封装了com.alipay.remoting.Connection,而com.alipay.remoting.Connection又封装了io.netty.channel.Channel。
感受Channel封装的不够完全,仍是把Connection暴露出来了,以此获得本地IP,port,远端IP,port等。
public class BoltChannel implements Channel { private Connection connection; private AsyncContext asyncContext; private BizContext bizContext; private final Map<String, Object> attributes = new ConcurrentHashMap<>(); }
BoltServer 封装了 com.alipay.remoting.rpc.RpcServer。
在初始化的时候,调用 addConnectionEventProcessor,registerUserProcessor等把 handler 注册到 RpcServer。
用 ConcurrentHashMap 记录了全部链接到本Server 的Channel,key是IP:port。
public class BoltServer implements Server { /** * accoding server port * can not be null */ private final URL url; private final List<ChannelHandler> channelHandlers; /** * bolt server */ private RpcServer boltServer; /** * started status */ private AtomicBoolean isStarted = new AtomicBoolean(false); private Map<String, Channel> channels = new ConcurrentHashMap<>(); private AtomicBoolean initHandler = new AtomicBoolean(false); }
其主要功能以下,基本就是调用Bolt的功能:
@Override public Object sendSync(Channel channel, Object message, int timeoutMillis) { if (channel != null && channel.isConnected()) { Url boltUrl = null; try { boltUrl = new Url(channel.getRemoteAddress().getAddress().getHostAddress(), channel .getRemoteAddress().getPort()); return boltServer.invokeSync(boltUrl, message, timeoutMillis); } } @Override public void sendCallback(Channel channel, Object message, CallbackHandler callbackHandler, int timeoutMillis) { if (channel != null && channel.isConnected()) { Url boltUrl = null; try { boltUrl = new Url(channel.getRemoteAddress().getAddress().getHostAddress(), channel .getRemoteAddress().getPort()); boltServer.invokeWithCallback(boltUrl, message, new InvokeCallback() { @Override public void onResponse(Object result) { callbackHandler.onCallback(channel, result); } @Override public void onException(Throwable e) { callbackHandler.onException(channel, e); } @Override public Executor getExecutor() { return callbackHandler.getExecutor(); } }, timeoutMillis); return; } }
主要就是封装了 com.alipay.remoting.rpc.RpcClient。
在初始化的时候,调用 addConnectionEventProcessor,registerUserProcessor 等把 handler 注册到 RpcClient。
public class BoltClient implements Client { private RpcClient rpcClient; private AtomicBoolean closed = new AtomicBoolean(false); private int connectTimeout = 2000; private final int connNum; }
主要函数以下:
@Override public Channel connect(URL url) { try { Connection connection = getBoltConnection(rpcClient, url); BoltChannel channel = new BoltChannel(); channel.setConnection(connection); return channel; } } protected Connection getBoltConnection(RpcClient rpcClient, URL url) throws RemotingException { Url boltUrl = createBoltUrl(url); try { Connection connection = rpcClient.getConnection(boltUrl, connectTimeout); if (connection == null || !connection.isFine()) { if (connection != null) { connection.close(); } } return connection; } } @Override public Object sendSync(URL url, Object message, int timeoutMillis) { return rpcClient.invokeSync(createBoltUrl(url), message, timeoutMillis); } @Override public Object sendSync(Channel channel, Object message, int timeoutMillis) { if (channel != null && channel.isConnected()) { BoltChannel boltChannel = (BoltChannel) channel; return rpcClient.invokeSync(boltChannel.getConnection(), message, timeoutMillis); } } @Override public void sendCallback(URL url, Object message, CallbackHandler callbackHandler, int timeoutMillis) { try { Connection connection = getBoltConnection(rpcClient, url); BoltChannel channel = new BoltChannel(); channel.setConnection(connection); rpcClient.invokeWithCallback(connection, message, new InvokeCallback() { @Override public void onResponse(Object result) { callbackHandler.onCallback(channel, result); } @Override public void onException(Throwable e) { callbackHandler.onException(channel, e); } @Override public Executor getExecutor() { return callbackHandler.getExecutor(); } }, timeoutMillis); return; } }
BoltExchange 的主要做用是维护了Client和Server两个ConcurrentHashMap。就是全部的Clients和Servers。
这里进行了第一层链接维护。
Map<String, Client> clients 是依据String对Client作了区分,String包括以下:
String DATA_SERVER_TYPE = "dataServer"; String META_SERVER_TYPE = "metaServer";
就是说,假如 Data Server 使用了BoltExchange,则其内部只有两个BoltClient,这两个Client分别被 同 dataServer 和 metaServer 的交互 所复用。
ConcurrentHashMap<Integer, Server> serverMap 是依据自己端口对 自己启动的Server 作了区分。
public class BoltExchange implements Exchange<ChannelHandler> { private Map<String, Client> clients = new ConcurrentHashMap<>(); private ConcurrentHashMap<Integer, Server> serverMap = new ConcurrentHashMap<>(); @Override public Client connect(String serverType, URL serverUrl, ChannelHandler... channelHandlers) { return this.connect(serverType, 1, serverUrl, channelHandlers); } @Override public Client connect(String serverType, int connNum, URL serverUrl, ChannelHandler... channelHandlers) { Client client = clients.computeIfAbsent(serverType, key -> newBoltClient(connNum, channelHandlers)); client.connect(serverUrl); return client; } @Override public Server open(URL url, ChannelHandler... channelHandlers) { BoltServer server = createBoltServer(url, channelHandlers); setServer(server, url); server.startServer(); return server; } @Override public Client getClient(String serverType) { return clients.get(serverType); } @Override public Server getServer(Integer port) { return serverMap.get(port); } /** * add server into serverMap * @param server * @param url */ public void setServer(Server server, URL url) { serverMap.putIfAbsent(url.getPort(), server); } private BoltClient newBoltClient(int connNum, ChannelHandler[] channelHandlers) { BoltClient boltClient = createBoltClient(connNum); boltClient.initHandlers(Arrays.asList(channelHandlers)); return boltClient; } protected BoltClient createBoltClient(int connNum) { return new BoltClient(connNum); } protected BoltServer createBoltServer(URL url, ChannelHandler[] channelHandlers) { return new BoltServer(url, Arrays.asList(channelHandlers)); } }
内部会根据不一样的server type从 boltExchange 取出对应Bolt Client。
String DATA_SERVER_TYPE = "dataServer"; String META_SERVER_TYPE = "metaServer";
用以下方法put Client。
Client client = clients.computeIfAbsent(serverType, key -> newBoltClient(connNum, channelHandlers));
Bolt用以下办法获取Client
Client client = boltExchange.getClient(Exchange.DATA_SERVER_TYPE); Client client = boltExchange.getClient(Exchange.META_SERVER_TYPE);
获得对应的Client以后,而后分别根据参数的url创建Channel,或者发送请求。
Channel channel = client.getChannel(url); client.sendCallback(request.getRequestUrl()....;
此时大体逻辑以下:
+----------------------------------------+ | BoltExchange | | +------------------------------------+ | | | | | | | Map<String, Client> | | | | | | | | ConcurrentHashMap<Integer, Server> | | | | | | | +------------------------------------+ | +----------------------------------------+ | | | | | | v v +-----------------------+----------+ +---------+---------------------------------+ | BoltClient | | BoltServer | | | | | | +------------------------------+ | | +---------------------------------------+ | | | remoting.rpc.RpcClient | | | | remoting.rpc.RpcServer | | | | +-------------------------+ | | | | | | | | | ConnectionEventHandler | | | | | Map<String, Channel> channels | | | | | | | | | | | | | | | ConnectionFactory | | | | | List<ChannelHandler> channelHandlers | | | | +-------------------------+ | | | | | | | +------------------------------+ | | +---------------------------------------+ | +----------------------------------+ +-------------------------------------------+ | | | | | | | +---------------------------+ | | v | | v | +---+------------+ <--------------+ +-----v--------------+--------------+ | | ChannelHandler | | BoltChannel | | +----------------+ | | | | +------------------------------+ | | | |com.alipay.remoting.Connection| | | | +------------------------------+ | | +-----------------------------------+ | | v v +---+-------------+ +---------+------------+ | CallbackHandler | | netty.channel.Channel| +-----------------+ +----------------------+
如下是基于Jetty封装的Http模块 registry-remoting-http。
├── JerseyChannel.java ├── JerseyClient.java ├── JerseyJettyServer.java ├── exchange │ └── JerseyExchange.java └── jetty └── server ├── HttpChannelOverHttpCustom.java ├── HttpConnectionCustom.java └── HttpConnectionCustomFactory.java
由于 Http 服务不是SOFTRegistry主要功能,因此此处略去。
咱们从目录结构能够大体看出功能模块划分。
├── remoting │ ├── DataNodeExchanger.java │ ├── MetaNodeExchanger.java │ ├── dataserver │ │ ├── DataServerConnectionFactory.java │ │ ├── DataServerNodeFactory.java │ │ ├── GetSyncDataHandler.java │ │ ├── SyncDataCallback.java │ │ ├── handler │ │ │ ├── DataSyncServerConnectionHandler.java │ │ │ ├── FetchDataHandler.java │ │ │ ├── NotifyDataSyncHandler.java │ │ │ ├── NotifyFetchDatumHandler.java │ │ │ ├── NotifyOnlineHandler.java │ │ │ └── SyncDataHandler.java │ │ └── task │ │ ├── AbstractTask.java │ │ ├── ConnectionRefreshTask.java │ │ └── RenewNodeTask.java │ ├── handler │ │ ├── AbstractClientHandler.java │ │ └── AbstractServerHandler.java │ ├── metaserver │ │ ├── handler │ │ ├── provideData │ │ └── task │ └── sessionserver │ ├── disconnect │ ├── forward │ └── handler
由于每一个大功能模块大同小异,因此咱们下面主要以 dataserver 目录下为主,兼顾 metaserver 和 sessionserver目录下的特殊部分。
Data Server 比较复杂,便是服务器也是客户端,因此分别作了不一样的组件来抽象这两个概念。
DataServerBootstrap#start 方法,用于启动一系列的初始化服务。在此函数中,启动了若干网络服务,用来提供 对外接口。
public void start() { try { openDataServer(); openDataSyncServer(); openHttpServer(); startRaftClient(); } }
各 Handler 具体做用如图所示:
DataServer 和 DataSyncServer 是 Bolt Server,是节点间的 bolt 通讯组件,其中:
具体代码以下:
private void openDataServer() { try { if (serverForSessionStarted.compareAndSet(false, true)) { server = boltExchange.open(new URL(NetUtil.getLocalAddress().getHostAddress(), dataServerConfig.getPort()), serverHandlers .toArray(new ChannelHandler[serverHandlers.size()])); } } } private void openDataSyncServer() { try { if (serverForDataSyncStarted.compareAndSet(false, true)) { dataSyncServer = boltExchange.open(new URL(NetUtil.getLocalAddress() .getHostAddress(), dataServerConfig.getSyncDataPort()), serverSyncHandlers .toArray(new ChannelHandler[serverSyncHandlers.size()])); } } }
这两个server的handlers有部分重复,怀疑开发者在作功能迁移。
@Bean(name = "serverSyncHandlers") public Collection<AbstractServerHandler> serverSyncHandlers() { Collection<AbstractServerHandler> list = new ArrayList<>(); list.add(getDataHandler()); list.add(publishDataProcessor()); list.add(unPublishDataHandler()); list.add(notifyFetchDatumHandler()); list.add(notifyOnlineHandler()); list.add(syncDataHandler()); list.add(dataSyncServerConnectionHandler()); return list; } @Bean(name = "serverHandlers") public Collection<AbstractServerHandler> serverHandlers() { Collection<AbstractServerHandler> list = new ArrayList<>(); list.add(getDataHandler()); list.add(clientOffHandler()); list.add(getDataVersionsHandler()); list.add(publishDataProcessor()); list.add(sessionServerRegisterHandler()); list.add(unPublishDataHandler()); list.add(dataServerConnectionHandler()); list.add(renewDatumHandler()); list.add(datumSnapshotHandler()); return list; }
这里用DataSyncServer作具体说明。
启动 DataSyncServer 时,注册了以下几个 handler 用于处理 bolt 请求 :
DayaSyncServer 注册的 Handler 以下:
该 Handler 主要用于数据的获取,当一个请求过来时,会经过请求中的 DataCenter 和 DataInfoId 获取当前 DataServer 节点存储的相应数据。
当有数据发布者 publisher 上下线时,会分别触发 publishDataProcessor 或 unPublishDataHandler ,Handler 会往 dataChangeEventCenter 中添加一个数据变动事件,用于异步地通知事件变动中心数据的变动。事件变动中心收到该事件以后,会往队列中加入事件。此时 dataChangeEventCenter 会根据不一样的事件类型异步地对上下线数据进行相应的处理。
与此同时,DataChangeHandler 会把这个事件变动信息经过 ChangeNotifier 对外发布,通知其余节点进行数据同步。
这是一个数据拉取请求,当该 Handler 被触发时,通知当前 DataServer 节点进行版本号对比,若请求中数据的版本号高于当前节点缓存中的版本号,则会进行数据同步操做,保证数据是最新的。
这是一个 DataServer 上线通知请求 Handler,当其余节点上线时,会触发该 Handler,从而当前节点在缓存中存储新增的节点信息。用于管理节点状态,到底是 INITIAL 仍是 WORKING 。
节点间数据同步 Handler,该 Handler 被触发时,会经过版本号进行比对,若当前 DataServer 所存储数据版本号含有当前请求版本号,则会返回全部大于当前请求数据版本号的全部数据,便于节点间进行数据同步。
链接管理 Handler,当其余 DataServer 节点与当前 DataServer 节点链接时,会触发 connect 方法,从而在本地缓存中注册链接信息,而当其余 DataServer 节点与当前节点断连时,则会触发 disconnect 方法,从而删除缓存信息,进而保证当前 DataServer 节点存储有全部与之链接的 DataServer 节点。
dataSyncServer 调用链以下:
在 DataServerBootstrap 中有
private void openDataSyncServer() { try { if (serverForDataSyncStarted.compareAndSet(false, true)) { dataSyncServer = boltExchange.open(new URL(NetUtil.getLocalAddress() .getHostAddress(), dataServerConfig.getSyncDataPort()), serverSyncHandlers .toArray(new ChannelHandler[serverSyncHandlers.size()])); } } }
而后有
public class BoltExchange implements Exchange<ChannelHandler> { @Override public Server open(URL url, ChannelHandler... channelHandlers) { BoltServer server = createBoltServer(url, channelHandlers); setServer(server, url); server.startServer(); return server; } protected BoltServer createBoltServer(URL url, ChannelHandler[] channelHandlers) { return new BoltServer(url, Arrays.asList(channelHandlers)); } }
BoltServer启动以下,而且用户自定义了UserProcessor。
public class BoltServer implements Server { public BoltServer(URL url, List<ChannelHandler> channelHandlers) { this.channelHandlers = channelHandlers; this.url = url; } public void startServer() { if (isStarted.compareAndSet(false, true)) { boltServer = new RpcServer(url.getPort(), true); initHandler(); boltServer.start(); } } private void initHandler() { if (initHandler.compareAndSet(false, true)) { boltServer.addConnectionEventProcessor(ConnectionEventType.CONNECT, new ConnectionEventAdapter(ConnectionEventType.CONNECT, getConnectionEventHandler(), this)); boltServer.addConnectionEventProcessor(ConnectionEventType.CLOSE, new ConnectionEventAdapter(ConnectionEventType.CLOSE, getConnectionEventHandler(), this)); boltServer.addConnectionEventProcessor(ConnectionEventType.EXCEPTION, new ConnectionEventAdapter(ConnectionEventType.EXCEPTION, getConnectionEventHandler(), this)); registerUserProcessorHandler(); } } //这里分了同步和异步 private void registerUserProcessorHandler() { if (channelHandlers != null) { for (ChannelHandler channelHandler : channelHandlers) { if (HandlerType.PROCESSER.equals(channelHandler.getType())) { if (InvokeType.SYNC.equals(channelHandler.getInvokeType())) { boltServer.registerUserProcessor(new SyncUserProcessorAdapter( channelHandler)); } else { boltServer.registerUserProcessor(new AsyncUserProcessorAdapter( channelHandler)); } } } } } }
HttpServer 是用于控制的Http 通讯组件以及其配置,提供一系列 REST 接口,用于 dashboard 管理、数据查询等;
private void openHttpServer() { try { if (httpServerStarted.compareAndSet(false, true)) { bindResourceConfig(); httpServer = jerseyExchange.open( new URL(NetUtil.getLocalAddress().getHostAddress(), dataServerConfig .getHttpServerPort()), new ResourceConfig[] { jerseyResourceConfig }); } } }
RaftClient 是基于Raft协议的客户端,用来基于raft协议获取meta server leader信息。
private void startRaftClient() { metaServerService.startRaftClient(); eventCenter.post(new MetaServerChangeEvent(metaServerService.getMetaServerMap())); }
具体DefaultMetaServiceImpl中实现代码以下:
@Override public void startRaftClient() { try { if (clientStart.compareAndSet(false, true)) { String serverConf = getServerConfig(); raftClient = new RaftClient(getGroup(), serverConf); raftClient.start(); } } }
功能模块最后逻辑大体以下:
+-> getDataHandler | +-> publishDataProcessor | +--> unPublishDataHandler | +---------------+ +--> notifyFetchDatumHandler +----> | DataSyncServer+->+ | +---------------+ +--> notifyOnlineHandler | | | +--> syncDataHandler | +-------------+ | +-------------------+ +----> | HttpServer | +-> dataSyncServerConnectionHandler |DataServerBootstrap+-->+ +-------------+ +-------------------+ | | | +------------+ +-> getDataHandler +-----> | RaftClient | | | +------------+ +--> clientOffHandler | | | +-------------+ +--> getDataVersionsHandler +-----> | DataServer +-->+ +-------------+ +--> publishDataProcessor | +--> sessionServerRegisterHandler | +--> unPublishDataHandler | +--> dataServerConnectionHandler | +--> renewDatumHandler | +-> datumSnapshotHandler
Exchange 做为 Client / Server 链接的抽象,负责节点之间的链接。
Data Server 这里主要是 DataNodeExchanger 和 MetaNodeExchanger,用来:
封装 BoltExchange
把 Bolt Client 和 Bolt Channel 进行抽象。
提供能够直接使用的网络API,好比ForwardServiceImpl,GetSyncDataHandler这些散落的Bean能够直接使用DataNodeExchanger来作网络交互。
从对外接口中能够看出来,
这里有两个问题:
多是由于Session Server可能会不少,不必保存Bolt client和Server,可是Session对应的有ConnectionFactory。ConectionFactory 是较低层次的封装,下文会讲解。
咱们以 DataNodeExchanger 为例。
把全部 Data Server 相关的 非 “Server,Client概念 直接强相关” 的网络操做统一集中在这里。
能够看到,主要是对 boltExchange 进行了更高层次的封装。
具体代码以下:
import com.alipay.sofa.registry.remoting.Channel; import com.alipay.sofa.registry.remoting.ChannelHandler; import com.alipay.sofa.registry.remoting.Client; import com.alipay.sofa.registry.remoting.exchange.Exchange; import com.alipay.sofa.registry.remoting.exchange.NodeExchanger; import com.alipay.sofa.registry.remoting.exchange.message.Request; import com.alipay.sofa.registry.remoting.exchange.message.Response; public class DataNodeExchanger implements NodeExchanger { @Autowired private Exchange boltExchange; @Autowired private DataServerConfig dataServerConfig; @Resource(name = "dataClientHandlers") private Collection<AbstractClientHandler> dataClientHandlers; @Override public Response request(Request request) { Client client = boltExchange.getClient(Exchange.DATA_SERVER_TYPE); if (null != request.getCallBackHandler()) { client.sendCallback(request.getRequestUrl(), request.getRequestBody(), request.getCallBackHandler(), request.getTimeout() != null ? request.getTimeout() : dataServerConfig.getRpcTimeout()); return () -> Response.ResultStatus.SUCCESSFUL; } else { final Object result = client.sendSync(request.getRequestUrl(), request.getRequestBody(), dataServerConfig.getRpcTimeout()); return () -> result; } } public Channel connect(URL url) { Client client = boltExchange.getClient(Exchange.DATA_SERVER_TYPE); if (client == null) { synchronized (this) { client = boltExchange.getClient(Exchange.DATA_SERVER_TYPE); if (client == null) { client = boltExchange.connect(Exchange.DATA_SERVER_TYPE, url, dataClientHandlers.toArray(new ChannelHandler[dataClientHandlers.size()])); } } } Channel channel = client.getChannel(url); if (channel == null) { synchronized (this) { channel = client.getChannel(url); if (channel == null) { channel = client.connect(url); } } } return channel; } }
上面代码中使用了 dataClientHandlers,其是BoltClient所使用的,Server会对此进行推送,这两个Handler会处理。
@Bean(name = "dataClientHandlers") public Collection<AbstractClientHandler> dataClientHandlers() { Collection<AbstractClientHandler> list = new ArrayList<>(); list.add(notifyDataSyncHandler()); list.add(fetchDataHandler()); return list; }
此时具体网络概念以下:
+-------------------+ | ForwardServiceImpl| +-----------------+ +-------------------+ | | +--------------------+ | +-------------------+ | GetSyncDataHandler | +-----------------------> | DataNodeExchanger | +--------------------+ | +-------+-----------+ | | +----------------------------------+ | | | LocalDataServerChangeEventHandler| +--+ | +----------------------------------+ | | | | +------------------------------+ | v | DataServerChangeEventHandler +--------+ +-----+--------+ +------------------------------+ | BoltExchange | +-----+--------+ | +-----------------+ | | JerseyExchange | | +-------+---------+ +-------+-------+ | | | | | | v v v +--------+----------+ +------+-----+ +-----+------+ | JerseyJettyServer | | BoltServer | | BoltServer | +-------------------+ +------------+ +------------+ httpServer server dataSyncServer
AbstractServerHandler 和 AbstractClientHandler 对 com.alipay.sofa.registry.remoting.ChannelHandler
进行了实现。
这里须要结合SOFABolt来说解。
以 RpcServer 为例,SOFABolt在这里的使用是两种处理器:
sendResponse
方法返回处理结果;Handler主要代码分别以下:
public abstract class AbstractServerHandler<T> implements ChannelHandler<T> { protected NodeType getConnectNodeType() { return NodeType.DATA; } @Override public Object reply(Channel channel, T request) { try { logRequest(channel, request); checkParam(request); return doHandle(channel, request); } catch (Exception e) { return buildFailedResponse(e.getMessage()); } } } public abstract class AbstractClientHandler<T> implements ChannelHandler<T> { @Override public Object reply(Channel channel, T request) { try { logRequest(channel, request); checkParam(request); return doHandle(channel, request); } catch (Exception e) { return buildFailedResponse(e.getMessage()); } } }
系统能够据此实现各类派生类。
这里须要注意的是:ChannelHandler之中分红两类,分别以下,分别对应了RpcServer中的listener和userProcessor。
enum HandlerType { LISENTER, PROCESSER }
以serverSyncHandlers为例,只有dataSyncServerConnectionHandler是Listener,其他都是Processor。
这也符合常理,由于消息响应函数就是应该只有一个。
@Bean(name = "serverSyncHandlers") public Collection<AbstractServerHandler> serverSyncHandlers() { Collection<AbstractServerHandler> list = new ArrayList<>(); list.add(getDataHandler()); list.add(publishDataProcessor()); list.add(unPublishDataHandler()); list.add(notifyFetchDatumHandler()); list.add(notifyOnlineHandler()); list.add(syncDataHandler()); list.add(dataSyncServerConnectionHandler()); 只有这个是Listener,其他都是Processor。 return list; }
总结以下:
+-> getDataHandler | +-> publishDataProcessor | +--> unPublishDataHandler +-------------------+ | | serverSyncHandlers+-------> notifyFetchDatumHandler +-------------------+ | +--> notifyOnlineHandler | +--> syncDataHandler | +-> dataSyncServerConnectionHandler(Listener)
在启动时,会使用serverSyncHandlers完成BoltServer的启动。
private void openDataSyncServer() { try { if (serverForDataSyncStarted.compareAndSet(false, true)) { dataSyncServer = boltExchange.open(new URL(NetUtil.getLocalAddress() .getHostAddress(), dataServerConfig.getSyncDataPort()), serverSyncHandlers .toArray(new ChannelHandler[serverSyncHandlers.size()])); } } }
BoltExchange中有:
@Override public Server open(URL url, ChannelHandler... channelHandlers) { BoltServer server = createBoltServer(url, channelHandlers); setServer(server, url); server.startServer(); return server; } protected BoltServer createBoltServer(URL url, ChannelHandler[] channelHandlers) { return new BoltServer(url, Arrays.asList(channelHandlers)); }
在BoltServer中有以下代码,主要是设置Handler。
public void startServer() { if (isStarted.compareAndSet(false, true)) { try { boltServer = new RpcServer(url.getPort(), true); initHandler(); boltServer.start(); } } } private void initHandler() { if (initHandler.compareAndSet(false, true)) { boltServer.addConnectionEventProcessor(ConnectionEventType.CONNECT, new ConnectionEventAdapter(ConnectionEventType.CONNECT, getConnectionEventHandler(), this)); boltServer.addConnectionEventProcessor(ConnectionEventType.CLOSE, new ConnectionEventAdapter(ConnectionEventType.CLOSE, getConnectionEventHandler(), this)); boltServer.addConnectionEventProcessor(ConnectionEventType.EXCEPTION, new ConnectionEventAdapter(ConnectionEventType.EXCEPTION, getConnectionEventHandler(), this)); registerUserProcessorHandler(); } }
最终则是调用到RpcServer之中,注册了链接响应函数和用户定义函数。
/** * Add processor to process connection event. */ public void addConnectionEventProcessor(ConnectionEventType type, ConnectionEventProcessor processor) { this.connectionEventListener.addConnectionEventProcessor(type, processor); } /** * Use UserProcessorRegisterHelper{@link UserProcessorRegisterHelper} to help register user processor for server side. */ @Override public void registerUserProcessor(UserProcessor<?> processor) { UserProcessorRegisterHelper.registerUserProcessor(processor, this.userProcessors); }
此时与SOFABolt逻辑以下:
+----------------------------+ | [RpcServer] | | | | | +--> EXCEPTION +--> DataSyncServerConnectionHandler | | | | connectionEventListener--->--->-CONNECT +----> DataSyncServerConnectionHandler | | | | | +--> CLOSE +-----> DataSyncServerConnectionHandler | | | | | | +--> EXCEPTION +--> DataSyncServerConnectionHandler | | | | connectionEventHandler +-->---> CONNECT +----> DataSyncServerConnectionHandler | | | | | +--> CLOSE +-----> DataSyncServerConnectionHandler | | | | | | +----> GetDataRequest +--------> getDataHandler | | | | | +----> PublishDataRequest +-----> publishDataProcessor | | | | userProcessors +----------------> UnPublishDataRequest +----> unPublishDataHandler | | | | | +----> NotifyOnlineRequest-----> notifyFetchDatumHandler | | | | | +----> NotifyOnlineRequest +-----> notifyOnlineHandler +----------------------------+ | +----> SyncDataRequest +-------> syncDataHandler
至此,咱们把SOFARegistry网络封装和操做大体梳理了下。
从逻辑上看,阿里提供了两个层级的封装:
从链接角度看,阿里实现了基于 netty.channel.Channel 的封装,从下往上看是:
从应用角度看,阿里实现了Server,Client层次的封装,从下往上看是:
具体逻辑大体以下:
+---------------------+ +---------------------+ | | | | | DataNodeExchanger | | MetaNodeExchanger | | | | | +----------------+----+ +--------+------------+ | | +-----------+ +-------------+ | | v v +---------+---------------------+--------+ | BoltExchange | | +------------------------------------+ | | | | | | | Map<String, Client> | | | | | | | | ConcurrentHashMap<Integer, Server> | | | | | | | +------------------------------------+ | +----------------------------------------+ | | | | | | v v +-----------------------+----------+ +---------+---------------------------------+ | BoltClient | | BoltServer | | | | | | +------------------------------+ | | +---------------------------------------+ | | | remoting.rpc.RpcClient | | | | remoting.rpc.RpcServer | | | | +-------------------------+ | | | | | | | | | ConnectionEventHandler | | | | | Map<String, Channel> channels | | | | | | | | | | | | | | | ConnectionFactory | | | | | List<ChannelHandler> channelHandlers | | | | +-------------------------+ | | | | | | | +------------------------------+ | | +---------------------------------------+ | +----------------------------------+ +-------------------------------------------+ | | | | | | | +---------------------------+ | | v | | v | +---+------------+ <--------------+ +-----v--------------+--------------+ | | ChannelHandler | | BoltChannel | | +----------------+ | | | | +------------------------------+ | | | |com.alipay.remoting.Connection| | | | +------------------------------+ | | +-----------------------------------+ | | v v +---+-------------+ +---------+------------+ | CallbackHandler | | netty.channel.Channel| +-----------------+ +----------------------+
阿里这里封装的很是细致。由于SOFARegistry是比较繁杂的系统,因此把网络概念,功能作封装是至关有必要的。你们在平常开发中可能不用这么细致的封装,能够参考阿里的思路,本身作选择和裁剪便可。
https://timyang.net/architecture/cell-distributed-system/
SOFABolt 源码分析12 - Connection 链接管理设计
SOFABolt 源码分析2 - RpcServer 服务端启动的设计
SOFABolt 源码分析3 - RpcClient 客户端启动的设计