“消息队列的本质在于消息的发送、存储和接收”。那么,对于一款消息队列来讲,如何作到消息的高效发送与接收是重点和关键。api
1、RocketMQ中Remoting通讯模块概览数组
RocketMQ集群的一部分通讯以下:缓存
从上面(1)~(3)中能够看出在消息生产者, Broker和NameServer之间都会发生通讯(这里只说了MQ的部分通讯),所以如何设计一个良好的网络通讯模块在MQ中相当重要,它将决定RocketMQ集群总体的消息传输能力与最终的性能。bash
rocketmq-remoting 模块是 RocketMQ消息队列中负责网络通讯的模块,它几乎被其余全部须要网络通讯的模块(诸如rocketmq-client、rocketmq-server、rocketmq-namesrv)所依赖和引用。为了实现客户端与服务器之间高效的数据请求与接收,RocketMQ消息队列自定义了通讯协议并在Netty的基础之上扩展了通讯模块。服务器
ps:鉴于RocketMQ的通讯模块是创建在Netty基础之上的,所以在阅读RocketMQ的源码以前,读者最好先对Netty的多线程模型、JAVA NIO模型均有必定的了解,这样子理解RocketMQ源码会较为快一些。网络
源码部分主要能够分为rocketmq-broker,rocketmq-client,rocketmq-common,rocketmq-filterSrv,rocketmq-namesrv和rocketmq-remoting等模块,通讯框架就封装在rocketmq-remoting模块中。 本文主要从RocketMQ的协议格式,消息编解码,通讯方式(同步/异步/单向)、通讯流程和Remoting模块的Netty多线程处理架构等方面介绍RocketMQ的通讯模块。数据结构
2、RocketMQ中Remoting通讯模块的具体实现多线程
从类层次结构来看:架构
(1)RemotingService:为最上层的接口,提供了三个方法:框架
void start();
void shutdown();
void registerRPCHook(RPCHook rpcHook);
复制代码
(2)RemotingClient/RemotingSever:两个接口继承了最上层接口—RemotingService,分别各自为Client和Server提供所必需的方法,下面所列的是RemotingServer的方法:
/**
* 同RemotingClient端同样
*
* @param requestCode
* @param processor
* @param executor
*/
void registerProcessor(final int requestCode, final NettyRequestProcessor processor,
final ExecutorService executor);
/**
* 注册默认的处理器
*
* @param processor
* @param executor
*/
void registerDefaultProcessor(final NettyRequestProcessor processor, final ExecutorService executor);
int localListenPort();
/**
* 根据请求code来获取不一样的处理Pair
*
* @param requestCode
* @return
*/
Pair<NettyRequestProcessor, ExecutorService> getProcessorPair(final int requestCode);
/**
* 同RemotingClient端同样,同步通讯,有返回RemotingCommand
* @param channel
* @param request
* @param timeoutMillis
* @return
* @throws InterruptedException
* @throws RemotingSendRequestException
* @throws RemotingTimeoutException
*/
RemotingCommand invokeSync(final Channel channel, final RemotingCommand request,
final long timeoutMillis) throws InterruptedException, RemotingSendRequestException,
RemotingTimeoutException;
/**
* 同RemotingClient端同样,异步通讯,无返回RemotingCommand
*
* @param channel
* @param request
* @param timeoutMillis
* @param invokeCallback
* @throws InterruptedException
* @throws RemotingTooMuchRequestException
* @throws RemotingTimeoutException
* @throws RemotingSendRequestException
*/
void invokeAsync(final Channel channel, final RemotingCommand request, final long timeoutMillis,
final InvokeCallback invokeCallback) throws InterruptedException,
RemotingTooMuchRequestException, RemotingTimeoutException, RemotingSendRequestException;
/**
* 同RemotingClient端同样,单向通讯,诸如心跳包
*
* @param channel
* @param request
* @param timeoutMillis
* @throws InterruptedException
* @throws RemotingTooMuchRequestException
* @throws RemotingTimeoutException
* @throws RemotingSendRequestException
*/
void invokeOneway(final Channel channel, final RemotingCommand request, final long timeoutMillis)
throws InterruptedException, RemotingTooMuchRequestException, RemotingTimeoutException,
RemotingSendRequestException;
复制代码
(3)NettyRemotingAbstract:Netty通讯处理的抽象类,定义并封装了Netty处理的公共处理方法;
(4)NettyRemotingClient/NettyRemotingServer:分别实现了RemotingClient和RemotingServer, 都继承了NettyRemotingAbstract抽象类。RocketMQ中其余的组件(如client、nameServer、broker在进行消息的发送和接收时均使用这两个组件)
二、消息的协议设计与编码解码
在Client和Server之间完成一次消息发送时,须要对发送的消息进行一个协议约定,所以就有必要自定义RocketMQ的消息协议。同时,为了高效地在网络中传输消息和对收到的消息读取,就须要对消息进行编解码。在RocketMQ中,RemotingCommand这个类在消息传输过程当中对全部数据内容的封装,不但包含了全部的数据结构,还包含了编码解码操做。 RemotingCommand类的部分红员变量以下:
这里展现下Broker向NameServer发送一次心跳注册的报文:
[
code=103,//这里的103对应的code就是broker向nameserver注册本身的消息
language=JAVA,
version=137,
opaque=58,//这个就是requestId
flag(B)=0,
remark=null,
extFields={
brokerId=0,
clusterName=DefaultCluster,
brokerAddr=ip1: 10911,
haServerAddr=ip1: 10912,
brokerName=LAPTOP-SMF2CKDN
},
serializeTypeCurrentRPC=JSON
复制代码
RocketMQ通讯协议:
传输内容主要能够分为如下4部分:
1)消息长度:总长度,四个字节存储,占用一个int类型;
2)序列化类型&消息头长度:一样占用一个int类型,第一个字节表示序列化类型,后面三个字节表示消息头长度;
3)消息头数据:通过序列化后的消息头数据;
4)消息主体数据:消息主体的二进制字节数据内容;
消息的编码和解码分别在RemotingCommand类的encode和decode方法中完成,下面是消息编码encode方法的具体实现:
public ByteBuffer encode() {
// 1> header length size
int length = 4; //消息总长度
// 2> header data length
//将消息头编码成byte[]
byte[] headerData = this.headerEncode();
//计算头部长度
length += headerData.length;
// 3> body data length
if (this.body != null) {
//消息主体长度
length += body.length;
}
//分配ByteBuffer, 这边加了4,
//这是由于在消息总长度的计算中没有将存储头部长度的4个字节计算在内
ByteBuffer result = ByteBuffer.allocate(4 + length);
// length
//将消息总长度放入ByteBuffer
result.putInt(length);
// header length
//将消息头长度放入ByteBuffer
result.put(markProtocolType(headerData.length, serializeTypeCurrentRPC));
// header data
//将消息头数据放入ByteBuffer
result.put(headerData);
// body data;
if (this.body != null) {
//将消息主体放入ByteBuffer
result.put(this.body);
}
//重置ByteBuffer的position位置
result.flip();
return result;
}
/**
* markProtocolType方法是将RPC类型和headerData长度编码放到一个byte[4]数组中
*
* @param source
* @param type
* @return
*/
public static byte[] markProtocolType(int source, SerializeType type) {
byte[] result = new byte[4];
result[0] = type.getCode();
//右移16位后再和255与->“16-24位”
result[1] = (byte) ((source >> 16) & 0xFF);
//右移8位后再和255与->“8-16位”
result[2] = (byte) ((source >> 8) & 0xFF);
//右移0位后再和255与->“8-0位”
result[3] = (byte) (source & 0xFF);
return result;
}
复制代码
消息解码decode方法是编码的逆向过程,其具体实现以下:
public static RemotingCommand decode(final ByteBuffer byteBuffer) {
//获取byteBuffer的总长度
int length = byteBuffer.limit();
//获取前4个字节,组装int类型,该长度为总长度
int oriHeaderLen = byteBuffer.getInt();
//获取消息头的长度,这里和0xFFFFFF作与运算,编码时候的长度即为24位
int headerLength = getHeaderLength(oriHeaderLen);
byte[] headerData = new byte[headerLength];
byteBuffer.get(headerData);
RemotingCommand cmd = headerDecode(headerData, getProtocolType(oriHeaderLen));
int bodyLength = length - 4 - headerLength;
byte[] bodyData = null;
if (bodyLength > 0) {
bodyData = new byte[bodyLength];
byteBuffer.get(bodyData);
}
cmd.body = bodyData;
return cmd;
}
复制代码
三、消息的通讯方式和通讯流程
在RocketMQ消息队列中支持通讯的方式主要有如下三种:
(1)同步(sync)
(2)异步(async)
(3)单向(oneway)
其中“同步”通讯模式相对简单,通常用在发送心跳包场景下,无需关注其Response。本文将主要介绍RocketMQ的异步通讯流程(限于篇幅,读者能够按照一样的模式进行分析同步通讯流程)。
下面两小节内容主要介绍了Client端发送请求消息和Server端接收消息的具体实现,其中对于Client端的回调能够参考RocketMQ的源码来分析这里就不作详细介绍。
3.一、Client发送请求消息的具体实现
当客户端调用异步通讯接口—invokeAsync时候,先由RemotingClient的实现类—NettyRemotingClient根据addr获取相应的channel(若是本地缓存中没有则建立),随后调用invokeAsyncImpl方法,将数据流转给抽象类NettyRemotingAbstract处理(真正作完发送请求动做的是在NettyRemotingAbstract抽象类的invokeAsyncImpl方法里面)。具体发送请求消息的源代码以下所示:
/**
* invokeAsync(异步调用)
*
* @param channel
* @param request
* @param timeoutMillis
* @param invokeCallback
* @throws InterruptedException
* @throws RemotingTooMuchRequestException
* @throws RemotingTimeoutException
* @throws RemotingSendRequestException
*/
public void invokeAsyncImpl(final Channel channel, final RemotingCommand request, final long timeoutMillis,
final InvokeCallback invokeCallback)
throws InterruptedException, RemotingTooMuchRequestException, RemotingTimeoutException, RemotingSendRequestException {
//至关于request ID, RemotingCommand会为每个request产生一个request ID, 从0开始, 每次加1
final int opaque = request.getOpaque();
boolean acquired = this.semaphoreAsync.tryAcquire(timeoutMillis, TimeUnit.MILLISECONDS);
if (acquired) {
final SemaphoreReleaseOnlyOnce once = new SemaphoreReleaseOnlyOnce(this.semaphoreAsync);
//根据request ID构建ResponseFuture
final ResponseFuture responseFuture = new ResponseFuture(opaque, timeoutMillis, invokeCallback, once);
//将ResponseFuture放入responseTable
this.responseTable.put(opaque, responseFuture);
try {
//使用Netty的channel发送请求数据
channel.writeAndFlush(request).addListener(new ChannelFutureListener() {
//消息发送后执行
@Override
public void operationComplete(ChannelFuture f) throws Exception {
if (f.isSuccess()) {
//若是发送消息成功给Server,那么这里直接Set后return
responseFuture.setSendRequestOK(true);
return;
} else {
responseFuture.setSendRequestOK(false);
}
responseFuture.putResponse(null);
responseTable.remove(opaque);
try {
//执行回调
executeInvokeCallback(responseFuture);
} catch (Throwable e) {
log.warn("excute callback in writeAndFlush addListener, and callback throw", e);
} finally {
//释放信号量
responseFuture.release();
}
log.warn("send a request command to channel <{}> failed.", RemotingHelper.parseChannelRemoteAddr(channel));
}
});
} catch (Exception e) {
//异常处理
responseFuture.release();
log.warn("send a request command to channel <" + RemotingHelper.parseChannelRemoteAddr(channel) + "> Exception", e);
throw new RemotingSendRequestException(RemotingHelper.parseChannelRemoteAddr(channel), e);
}
} else {
if (timeoutMillis <= 0) {
throw new RemotingTooMuchRequestException("invokeAsyncImpl invoke too fast");
} else {
String info =
String.format("invokeAsyncImpl tryAcquire semaphore timeout, %dms, waiting thread nums: %d semaphoreAsyncValue: %d",
timeoutMillis,
this.semaphoreAsync.getQueueLength(),
this.semaphoreAsync.availablePermits()
);
log.warn(info);
throw new RemotingTimeoutException(info);
}
}
}
复制代码
在Client端发送请求消息时有个比较重要的数据结构须要注意下:
(1)responseTable—保存请求码与响应关联映射
protected final ConcurrentHashMap<Integer /* opaque */, ResponseFuture> responseTable
复制代码
opaque表示请求发起方在同个链接上不一样的请求标识代码,每次发送一个消息的时候,能够选择同步阻塞/异步非阻塞的方式。不管是哪一种通讯方式,都会保存请求操做码至ResponseFuture的Map映射—responseTable中。
(2)ResponseFuture—保存返回响应(包括回调执行方法和信号量)
public ResponseFuture(int opaque, long timeoutMillis, InvokeCallback invokeCallback,
SemaphoreReleaseOnlyOnce once) {
this.opaque = opaque;
this.timeoutMillis = timeoutMillis;
this.invokeCallback = invokeCallback;
this.once = once;
}
复制代码
对于同步通讯来讲,第3、四个参数为null;而对于异步通讯来讲,invokeCallback是在收到消息响应的时候可以根据responseTable找到请求码对应的回调执行方法,semaphore参数用做流控,当多个线程同时往一个链接写数据时能够经过信号量控制permit同时写许可的数量。
(3)异常发送流程处理—定时扫描responseTable本地缓存
在发送消息时候,若是遇到异常状况(好比服务端没有response返回给客户端或者response因网络而丢失),上面所述的responseTable的本地缓存Map将会出现堆积状况。这个时候须要一个定时任务来专门作responseTable的清理回收。在RocketMQ的客户端/服务端启动时候会产生一个频率为1s调用一次来的定时任务检查全部的responseTable缓存中的responseFuture变量,判断是否已经获得返回, 并进行相应的处理。
public void scanResponseTable() {
final List<ResponseFuture> rfList = new LinkedList<ResponseFuture>();
Iterator<Entry<Integer, ResponseFuture>> it = this.responseTable.entrySet().iterator();
while (it.hasNext()) {
Entry<Integer, ResponseFuture> next = it.next();
ResponseFuture rep = next.getValue();
if ((rep.getBeginTimestamp() + rep.getTimeoutMillis() + 1000) <= System.currentTimeMillis()) {
rep.release();
it.remove();
rfList.add(rep);
log.warn("remove timeout request, " + rep);
}
}
for (ResponseFuture rf : rfList) {
try {
executeInvokeCallback(rf);
} catch (Throwable e) {
log.warn("scanResponseTable, operationComplete Exception", e);
}
}
}
复制代码
3.二、Server端接收消息并进行处理的具体实现
Server端接收消息的处理入口在NettyServerHandler类的channelRead0方法中,其中调用了processMessageReceived方法(这里省略了Netty服务端消息流转的大部分流程和逻辑)。其中服务端最为重要的处理请求方法实现以下:
public void processRequestCommand(final ChannelHandlerContext ctx, final RemotingCommand cmd) {
//根据RemotingCommand中的code获取processor和ExecutorService
final Pair<NettyRequestProcessor, ExecutorService> matched = this.processorTable.get(cmd.getCode());
final Pair<NettyRequestProcessor, ExecutorService> pair = null == matched ? this.defaultRequestProcessor : matched;
final int opaque = cmd.getOpaque();
if (pair != null) {
Runnable run = new Runnable() {
@Override
public void run() {
try {
//rpc hook
RPCHook rpcHook = NettyRemotingAbstract.this.getRPCHook();
if (rpcHook != null) {
rpcHook.doBeforeRequest(RemotingHelper.parseChannelRemoteAddr(ctx.channel()), cmd);
}
//processor处理请求
final RemotingCommand response = pair.getObject1().processRequest(ctx, cmd);
//rpc hook
if (rpcHook != null) {
rpcHook.doAfterResponse(RemotingHelper.parseChannelRemoteAddr(ctx.channel()), cmd, response);
}
if (!cmd.isOnewayRPC()) {
if (response != null) {
response.setOpaque(opaque);
response.markResponseType();
try {
ctx.writeAndFlush(response);
} catch (Throwable e) {
PLOG.error("process request over, but response failed", e);
PLOG.error(cmd.toString());
PLOG.error(response.toString());
}
} else {
}
}
} catch (Throwable e) {
if (!"com.aliyun.openservices.ons.api.impl.authority.exception.AuthenticationException"
.equals(e.getClass().getCanonicalName())) {
PLOG.error("process request exception", e);
PLOG.error(cmd.toString());
}
if (!cmd.isOnewayRPC()) {
final RemotingCommand response = RemotingCommand.createResponseCommand(RemotingSysResponseCode.SYSTEM_ERROR, //
RemotingHelper.exceptionSimpleDesc(e));
response.setOpaque(opaque);
ctx.writeAndFlush(response);
}
}
}
};
if (pair.getObject1().rejectRequest()) {
final RemotingCommand response = RemotingCommand.createResponseCommand(RemotingSysResponseCode.SYSTEM_BUSY,
"[REJECTREQUEST]system busy, start flow control for a while");
response.setOpaque(opaque);
ctx.writeAndFlush(response);
return;
}
try {
//封装requestTask
final RequestTask requestTask = new RequestTask(run, ctx.channel(), cmd);
//想线程池提交requestTask
pair.getObject2().submit(requestTask);
} catch (RejectedExecutionException e) {
if ((System.currentTimeMillis() % 10000) == 0) {
PLOG.warn(RemotingHelper.parseChannelRemoteAddr(ctx.channel()) //
+ ", too many requests and system thread pool busy, RejectedExecutionException " //
+ pair.getObject2().toString() //
+ " request code: " + cmd.getCode());
}
if (!cmd.isOnewayRPC()) {
final RemotingCommand response = RemotingCommand.createResponseCommand(RemotingSysResponseCode.SYSTEM_BUSY,
"[OVERLOAD]system busy, start flow control for a while");
response.setOpaque(opaque);
ctx.writeAndFlush(response);
}
}
} else {
String error = " request type " + cmd.getCode() + " not supported";
//构建response
final RemotingCommand response =
RemotingCommand.createResponseCommand(RemotingSysResponseCode.REQUEST_CODE_NOT_SUPPORTED, error);
response.setOpaque(opaque);
ctx.writeAndFlush(response);
PLOG.error(RemotingHelper.parseChannelRemoteAddr(ctx.channel()) + error);
}
}
复制代码
上面的请求处理方法中根据RemotingCommand的请求业务码来匹配到相应的业务处理器;而后生成一个新的线程提交至对应的业务线程池进行异步处理。
1)processorTable—请求业务码与业务处理、业务线程池的映射变量
protected final HashMap<Integer/* request code */, Pair<NettyRequestProcessor, ExecutorService>> processorTable =
new HashMap<Integer, Pair<NettyRequestProcessor, ExecutorService>>(64);
复制代码
我想RocketMQ这种作法是为了给不一样类型的请求业务码指定不一样的处理器Processor处理,同时消息实际的处理并非在当前线程,而是被封装成task放到业务处理器Processor对应的线程池中完成异步执行。(在RocketMQ中能看到不少地方都是这样的处理,这样的设计可以最大程度的保证异步,保证每一个线程都专一处理本身负责的东西)
3、总结
刚开始看RocketMQ源码—RPC通讯模块可能以为略微有点复杂,可是只要可以抓住Client端发送请求消息、Server端接收消息并处理的流程以及回调过程来分析和梳理,那么总体来讲并不复杂。RPC通讯部分也是RocketMQ源码中最重要的部分之一,想要对其中的全过程和细节有更为深入的理解,还须要多在本地环境Debug和分析对应的日志。
note:在漫长的人生道路上,不管是暴风骤雨,或是激流险滩。最要紧的是人生的欲火不能熄灭。