《RocketMq》1、网络传输篇

你是否想知道一个分布式系统的网络传输解决方案,那你能够学习下RocketMQ的网络传输原理,从RocketMQ的Remoting网络处理部分,能够学习到如何进行高效的网络传输,这些思想能够应用到不一样的业务中。java

 

1、要解决的问题

         其实大部分应用的网络处理都要解决以下图所示的问题:json

 

 

 

 

那么就以RocketMQ的源码入手,看看它是如何架构如上的结构的。网络

 

2、RocketMQ-remoting详解

2.1首先给出其总体的结构图

 

2.2 编码解码数据结构

在RocketMQ中,全部的通信都是使用RemotingCommand这个结构,这个结构的内容以下:架构

 

[java] view plain copyapp

  1. private static final int RPC_TYPE = 0; // 0, REQUEST_COMMAND  
  2. // 1, RESPONSE_COMMAND  
  3.   
  4. private static final int RPC_ONEWAY = 1; // 0, RPC  
  5. // 1, Oneway  
  6.   
  7. /** 
  8.  * Header 部分 
  9.  */  
  10. private int code; // 用于标示请求类型,参见RequestCode,ResponseCode  
  11. private LanguageCode language = LanguageCode.JAVA;  
  12. private int version = 0;  
  13. private int opaque = RequestId.getAndIncrement(); // 每一个消息的惟一标志,request和response经过该字段匹配  
  14. private int flag = 0;  
  15. private String remark;  
  16. private HashMap<String, String> extFields; // 传输时使用,CommandCustomHeader转为该结构<key,value>后,再统一转为json传输。所以  

[java] view plain copy异步

  1. CommandCustomHeader只能是String,Int,Long等基础数据结构,不能是复合数据结构  

private transient CommandCustomHeader customHeader; // 业务逻辑中使用该结构,传输时,使用 extFields
/** * Body 部分 */private transient byte[] body;async

 

 

2.2.1 RemotingCommand转为网络传输数据分布式

在MQ中,全部数据传输都使用该数据结构进行数据传输,当把数据转为网络传输时,会将customHeader转为HashMap的extFields,再转为json串函数

 

2.2.2 传输格式:

Length

Header length

Header data

Body

 

2.2.3 编码过程(重点函数:makeCustomHeaderToNet)

A. 将业务上的CustomHeader转为extFields;

B. 而后调用RemotingSerializable的encode,将RemotingCommand的Header部分转为byte类型

C. 在按照传输格式,将数据转为最终的header+body结构进行传输

 

2.2.4 解码过程(重点函数:decodeCommandCustomHeader)

A. 首先将获取的byteBuffer,按照传输格式进行解包,获得其headerData和bodyData

B. 将HeaderData部分进行decode,解包为RemotingCommand

C. 业务层调用decodeCommandCustomHeader(m.class)将头部解析为对应的m类

 

 

备注:

transient:当序列化类时,有该属性的变量不进行序列化

 

2.3 通讯层处理

Netty在处理通讯层的事件时,将其NettyEventExecuter的eventQueue中,再起一个线程,不断地处理存入消息。

2.3.1 Put消息

在Netty的注册部分,handler在addLast的时候,将NettyConnetManageHandler注册进去;这里面对应了connect、disconnect、close、channelRegistered等等事件,对于这些事件,将会调用NettyEventExecuter.putNettyEvent将消息放入Queue中;
2.3.2 Get处理消息
NettyEventExecuter处理线程会不断从queue中读取消息进行处理,调用注册的ChannelEventListener进行处理;
 

2.4 业务层处理

2.4.1 NettyRemotingAbstract:它是做为NettyRemotingServer和NettyRemotingClient的基类,对发送和接收的公共部分进行了处理

A. 数据结构和基础函数

A.1 首先保存了RPC处理器:HashMap<Integer/*request code */, Pair<NettyRequestProcessor, ExecutorService>> processorTable

A.2 其次保存了全部对外请求ConcurrentHashMap<Integer/* opaque */, ResponseFuture> responseTable =
        new ConcurrentHashMap<Integer,ResponseFuture>(256);

A.3  scanResponseTable:扫描responseTable,将超时的ResponseFuture直接移除

 

B. 发送部分

B.1 invokeSyncImpl: 同步发送,发送时,生成ResponseFuture,放入responseTable中;而后发送后等待设置的timeout(3s)时间,若是对应的ResponseFuture为空,则报错;不然返回RemoteCommand进行业务逻辑处理;

B.2 invokeAsyncImpl:异步发送,发送时,生成ResponseFuture,放入responseTable中;若是超过scanResponseTable的timeout (30s),则报错;不然调用注册的invokeCallback进行回调处理;

B.3 invokeOnewayImpl:单向发送,不将消息写入responseTable中,直接返回;

 

C. 接收消息部分

C.1 processRequestCommand:接收消息,做为Server端,接收的消息是请求,那么调用processTable对应的事件进行处理

C.2 processResponseCommand:接收消息,做为Client端,接收的消息是回复,即接收到Server端的回复,那么从responseTable中,首先获取opaque对应的ResponseFuture,若是这个response是异步回调,则有InvokeCallback,那么调用invokeBack函数,而后将Response塞入ResponseFuture后返回;

 

 

 

2.4.2 NettyRemotingServer

处理过程以下:

首先全部的入口都在start函数:

若是是input方向,那么会先调用NettyDecoder->NettyConnectManageHandler->NettyServerHandler

NettyDecoder(底层编码):会将数据包从byte转为RemotingCommand

NettyConnectManageHandler(通讯层事件):会将请求转入channelRegistered、channelUnregistered、channelActive、channelInactive、userEventTriggered、exceptionCaught,对应的调用NettyRemotingAbstract.putNettyEvent将事件放入Queue中,等待NettyEventExecuter进行处理

NettyServerHandler(业务层事件):调用注册的<Integer/*request code */, Pair<NettyRequestProcessor, ExecutorService>> processorTable,进行业务逻辑处理,当processRequestCommand接收到消息时,进行对应的处理

 

2.4.3 NettyRemotingClient

首先全部的入口都在start函数:

若是是input方向,那么会先调用NettyDecoder->NettyConnectManageHandler->NettyClientHandler

NettyDecoder:会将数据包从byte转为RemotingCommand

NettyConnectManageHandler:会将请求转入channelRegistered、channelUnregistered、channelActive、channelInactive、userEventTriggered、exceptionCaught,对应的调用NettyRemotingAbstract.putNettyEvent将事件放入Queue中,等待NettyEventExecuter进行处理

NettyClientHandler:调用注册的<Integer/* request code */,Pair<NettyRequestProcessor, ExecutorService>> processorTable, 进行业务逻辑处理

ConcurrentMap<String /* addr */, ChannelWrapper> channelTables = new ConcurrentHashMap<String, ChannelWrapper>():一个remotingClient会管理不少个channel

最后,是client端的超时时间处理,若是链接超过120s没有接收到发送和请求,那么将会断开链接,不然将会是长链接的一个保持。

一个实例:在producer,consumer的链接保持中,虽然有120s的超时时间,可是他们基本都是长链接的一个保持,由于会经过心跳来保持全部的链接。

2.5 后台服务:

1. NettyEventExecutor和ChannelEventListener:主要负责处理connect,disconnect,close等消息。

2. scanResonseTable : 主要负责清理过时超时的response。

3. 异步回调:不算是标准的后台服务,当采用async的发送方式或sync的回调模式时,会在后台线程中执行。

 

三. 一些总结与想法

在整个网络传输部分,有以下值得思考借鉴的地方:

3.1 编解码:编码要节省资源,经常使用bit,位为单位进行编码,最终转为json或xml传输(固然还能够选择probuf等)

3.2 限流的使用,这里采用semaphore来进行限流处理

3.3 rpchook的设计,发送前,接受后的hook设计

3.4 发送与接收

发送:invokeAsyncImpl, invokeOnewayImpl, invokeSyncImpl

接收:processReceiveMessage, processRequestMessage, processResponseMessage

发送与接收使用opaque和responseFuture进行交互(即ConcurrentMap<Integer /* opaque */, ResponseFuture> responseTable),其中,opaque用于标示发送/接收对,responseFuture的countDownLatch字段用于通知客户端接收到消息,并控制超时时间。

3.5 后台服务的设计

有不少服务不须要是实时的,须要在一致性和可用性之间找到一个平衡,所以,不少非实时任务能够采用一个全局的单线程来维护,参考上面2.5的描述。

相关文章
相关标签/搜索