你是否想知道一个分布式系统的网络传输解决方案,那你能够学习下RocketMQ的网络传输原理,从RocketMQ的Remoting网络处理部分,能够学习到如何进行高效的网络传输,这些思想能够应用到不一样的业务中。java
其实大部分应用的网络处理都要解决以下图所示的问题:json
那么就以RocketMQ的源码入手,看看它是如何架构如上的结构的。网络
2.2 编码解码数据结构
在RocketMQ中,全部的通信都是使用RemotingCommand这个结构,这个结构的内容以下:架构
[java] view plain copyapp
[java] view plain copy异步
private transient CommandCustomHeader customHeader; // 业务逻辑中使用该结构,传输时,使用 extFields
/** * Body 部分 */private transient byte[] body;async
2.2.1 RemotingCommand转为网络传输数据分布式
在MQ中,全部数据传输都使用该数据结构进行数据传输,当把数据转为网络传输时,会将customHeader转为HashMap的extFields,再转为json串函数
2.2.2 传输格式:
Length |
Header length |
Header data |
Body |
2.2.3 编码过程(重点函数:makeCustomHeaderToNet)
A. 将业务上的CustomHeader转为extFields;
B. 而后调用RemotingSerializable的encode,将RemotingCommand的Header部分转为byte类型
C. 在按照传输格式,将数据转为最终的header+body结构进行传输
2.2.4 解码过程(重点函数:decodeCommandCustomHeader)
A. 首先将获取的byteBuffer,按照传输格式进行解包,获得其headerData和bodyData
B. 将HeaderData部分进行decode,解包为RemotingCommand
C. 业务层调用decodeCommandCustomHeader(m.class)将头部解析为对应的m类
备注:
transient:当序列化类时,有该属性的变量不进行序列化
Netty在处理通讯层的事件时,将其NettyEventExecuter的eventQueue中,再起一个线程,不断地处理存入消息。
2.3.1 Put消息
在Netty的注册部分,handler在addLast的时候,将NettyConnetManageHandler注册进去;这里面对应了connect、disconnect、close、channelRegistered等等事件,对于这些事件,将会调用NettyEventExecuter.putNettyEvent将消息放入Queue中;
2.3.2 Get处理消息
NettyEventExecuter处理线程会不断从queue中读取消息进行处理,调用注册的ChannelEventListener进行处理;
2.4.1 NettyRemotingAbstract:它是做为NettyRemotingServer和NettyRemotingClient的基类,对发送和接收的公共部分进行了处理
A. 数据结构和基础函数
A.1 首先保存了RPC处理器:HashMap<Integer/*request code */, Pair<NettyRequestProcessor, ExecutorService>> processorTable
A.2 其次保存了全部对外请求ConcurrentHashMap<Integer/* opaque */, ResponseFuture> responseTable =
new ConcurrentHashMap<Integer,ResponseFuture>(256);
A.3 scanResponseTable:扫描responseTable,将超时的ResponseFuture直接移除
B. 发送部分
B.1 invokeSyncImpl: 同步发送,发送时,生成ResponseFuture,放入responseTable中;而后发送后等待设置的timeout(3s)时间,若是对应的ResponseFuture为空,则报错;不然返回RemoteCommand进行业务逻辑处理;
B.2 invokeAsyncImpl:异步发送,发送时,生成ResponseFuture,放入responseTable中;若是超过scanResponseTable的timeout (30s),则报错;不然调用注册的invokeCallback进行回调处理;
B.3 invokeOnewayImpl:单向发送,不将消息写入responseTable中,直接返回;
C. 接收消息部分
C.1 processRequestCommand:接收消息,做为Server端,接收的消息是请求,那么调用processTable对应的事件进行处理
C.2 processResponseCommand:接收消息,做为Client端,接收的消息是回复,即接收到Server端的回复,那么从responseTable中,首先获取opaque对应的ResponseFuture,若是这个response是异步回调,则有InvokeCallback,那么调用invokeBack函数,而后将Response塞入ResponseFuture后返回;
2.4.2 NettyRemotingServer
处理过程以下:
首先全部的入口都在start函数:
若是是input方向,那么会先调用NettyDecoder->NettyConnectManageHandler->NettyServerHandler
NettyDecoder(底层编码):会将数据包从byte转为RemotingCommand
NettyConnectManageHandler(通讯层事件):会将请求转入channelRegistered、channelUnregistered、channelActive、channelInactive、userEventTriggered、exceptionCaught,对应的调用NettyRemotingAbstract.putNettyEvent将事件放入Queue中,等待NettyEventExecuter进行处理
NettyServerHandler(业务层事件):调用注册的<Integer/*request code */, Pair<NettyRequestProcessor, ExecutorService>> processorTable,进行业务逻辑处理,当processRequestCommand接收到消息时,进行对应的处理
2.4.3 NettyRemotingClient
首先全部的入口都在start函数:
若是是input方向,那么会先调用NettyDecoder->NettyConnectManageHandler->NettyClientHandler
NettyDecoder:会将数据包从byte转为RemotingCommand
NettyConnectManageHandler:会将请求转入channelRegistered、channelUnregistered、channelActive、channelInactive、userEventTriggered、exceptionCaught,对应的调用NettyRemotingAbstract.putNettyEvent将事件放入Queue中,等待NettyEventExecuter进行处理
NettyClientHandler:调用注册的<Integer/* request code */,Pair<NettyRequestProcessor, ExecutorService>> processorTable, 进行业务逻辑处理
ConcurrentMap<String /* addr */, ChannelWrapper> channelTables = new ConcurrentHashMap<String, ChannelWrapper>():一个remotingClient会管理不少个channel
最后,是client端的超时时间处理,若是链接超过120s没有接收到发送和请求,那么将会断开链接,不然将会是长链接的一个保持。
一个实例:在producer,consumer的链接保持中,虽然有120s的超时时间,可是他们基本都是长链接的一个保持,由于会经过心跳来保持全部的链接。
2.5 后台服务:
1. NettyEventExecutor和ChannelEventListener:主要负责处理connect,disconnect,close等消息。
2. scanResonseTable : 主要负责清理过时超时的response。
3. 异步回调:不算是标准的后台服务,当采用async的发送方式或sync的回调模式时,会在后台线程中执行。
在整个网络传输部分,有以下值得思考借鉴的地方:
3.1 编解码:编码要节省资源,经常使用bit,位为单位进行编码,最终转为json或xml传输(固然还能够选择probuf等)
3.2 限流的使用,这里采用semaphore来进行限流处理
3.3 rpchook的设计,发送前,接受后的hook设计
3.4 发送与接收
发送:invokeAsyncImpl, invokeOnewayImpl, invokeSyncImpl
接收:processReceiveMessage, processRequestMessage, processResponseMessage
发送与接收使用opaque和responseFuture进行交互(即ConcurrentMap<Integer /* opaque */, ResponseFuture> responseTable),其中,opaque用于标示发送/接收对,responseFuture的countDownLatch字段用于通知客户端接收到消息,并控制超时时间。
3.5 后台服务的设计
有不少服务不须要是实时的,须要在一致性和可用性之间找到一个平衡,所以,不少非实时任务能够采用一个全局的单线程来维护,参考上面2.5的描述。