netty的标准设计流程为:编码,解码,检测,连接,其余handler,业务。按照这个流程将rocketmq的netty的实现流程进行细化。java
编码
NettyEncoder
继承MessageToByteEncoder,netty的编码规范要求
将RemotingCommand的请求数据结构数据头:存数据的长度描述;数据体:存数据的内容
RemotingCommand的功能强大:涉及ByteBuffer的频繁操做,实现数据协议的转换
总体的协议格式:<length><header length><header data><body data>
1,4个字节的int型数据存储数据的总长度
2,4个字节的int型数据存储报文头的字节长度
3,存储的头部数据内容
4,存储的报文的数据内容数组
public class NettyEncoder extends MessageToByteEncoder<RemotingCommand> { private static final InternalLogger log = InternalLoggerFactory.getLogger(RemotingHelper.ROCKETMQ_REMOTING); @Override public void encode(ChannelHandlerContext ctx, RemotingCommand remotingCommand, ByteBuf out) throws Exception { try { //得到请求体消息头,封装为byte的协议要求 ByteBuffer header = remotingCommand.encodeHeader(); //写入头 out.writeBytes(header); //得到消息体 byte[] body = remotingCommand.getBody(); if (body != null) { //写入协议 out.writeBytes(body); } } catch (Exception e) { log.error("encode exception, " + RemotingHelper.parseChannelRemoteAddr(ctx.channel()), e); if (remotingCommand != null) { log.error(remotingCommand.toString()); } RemotingUtil.closeChannel(ctx.channel()); } } }
解码
NettyDecoder
继承LengthFieldBasedFrameDecoder,netty的编码要求,该实现是基于长度的要求解决拆包粘包问题
将ByteBuf的数据输入按照长度的要求,数据解码到ByteBuf的存储体内
将netty封装的ByteBuf对象转换为java的Nio的ByteBuffer对象:ByteBuf.nioBuffer()
全部的操做都是基于RemotingCommand的decode来实现
解码的操做过程基于编码的规范来
1,得到数据的最大有效位置
2,得到第一个int位的值,该值为数据长度,包括头数据和报文数据
3,byteBuffer操做得到头部的数据
4,将数据的最大有效位置减去 4(标识),再减去头数据的长度,获得报文的长度
5,根据报文长度,byteBuffer操做得到报文长度网络
public class NettyDecoder extends LengthFieldBasedFrameDecoder { private static final InternalLogger log = InternalLoggerFactory.getLogger(RemotingHelper.ROCKETMQ_REMOTING); //基于长度的解码器,定义长度的标准,友好处理粘包及拆包 private static final int FRAME_MAX_LENGTH = Integer.parseInt(System.getProperty("com.rocketmq.remoting.frameMaxLength", "16777216")); public NettyDecoder() { super(FRAME_MAX_LENGTH, 0, 4, 0, 4); } @Override public Object decode(ChannelHandlerContext ctx, ByteBuf in) throws Exception { ByteBuf frame = null; try { //基于父类的解码,共性操做 frame = (ByteBuf) super.decode(ctx, in); if (null == frame) { return null; } //将netty的bytebuf转换为java标准的butebuffer ByteBuffer byteBuffer = frame.nioBuffer(); //按照编码协议,将结果转换为对象 return RemotingCommand.decode(byteBuffer); } catch (Exception e) { log.error("decode exception, " + RemotingHelper.parseChannelRemoteAddr(ctx.channel()), e); RemotingUtil.closeChannel(ctx.channel()); } finally { if (null != frame) { frame.release(); } } return null; } }
Idle
netty的系统的IdleStateHandler,操做,只是配置了指定的最大idle时间
能够实现心跳的检测数据结构
连接管理
NettyConnectManageHandler
继承ChannelDuplexHandler,netty的输入输入及写出的标准设计
将重点对服务的注册,连接激活,连接关闭,连接的检测,连接的操做异常进行管理
连接激活:若是本机的监听存在则设置到永久循环队列中,之间内部业务管理,事件为连接
激活关闭:若是本机的监听存在则设置到永久循环队列中,之间内部业务管理,事件为关闭
连接检测:若是本机的监听存在则设置到永久循环队列中,之间内部业务管理,事件为检测
异常操做:若是本机的监听存在则设置到永久循环队列中,之间内部业务管理,事件为异常
底层是基于无限循环的操做队列内容,对事件进行操做,基于接口设计,不一样的服务端有不一样的业务需求ide
class NettyConnectManageHandler extends ChannelDuplexHandler { @Override public void channelRegistered(ChannelHandlerContext ctx) throws Exception { final String remoteAddress = RemotingHelper.parseChannelRemoteAddr(ctx.channel()); log.info("NETTY SERVER PIPELINE: channelRegistered {}", remoteAddress); super.channelRegistered(ctx); } @Override public void channelUnregistered(ChannelHandlerContext ctx) throws Exception { final String remoteAddress = RemotingHelper.parseChannelRemoteAddr(ctx.channel()); log.info("NETTY SERVER PIPELINE: channelUnregistered, the channel[{}]", remoteAddress); super.channelUnregistered(ctx); } @Override public void channelActive(ChannelHandlerContext ctx) throws Exception { final String remoteAddress = RemotingHelper.parseChannelRemoteAddr(ctx.channel()); log.info("NETTY SERVER PIPELINE: channelActive, the channel[{}]", remoteAddress); super.channelActive(ctx); //处理链接的事件 if (NettyRemotingServer.this.channelEventListener != null) { NettyRemotingServer.this.putNettyEvent(new NettyEvent(NettyEventType.CONNECT, remoteAddress, ctx.channel())); } } @Override public void channelInactive(ChannelHandlerContext ctx) throws Exception { final String remoteAddress = RemotingHelper.parseChannelRemoteAddr(ctx.channel()); log.info("NETTY SERVER PIPELINE: channelInactive, the channel[{}]", remoteAddress); super.channelInactive(ctx); //处理链接关闭的事件 if (NettyRemotingServer.this.channelEventListener != null) { NettyRemotingServer.this.putNettyEvent(new NettyEvent(NettyEventType.CLOSE, remoteAddress, ctx.channel())); } } @Override public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception { if (evt instanceof IdleStateEvent) { IdleStateEvent event = (IdleStateEvent) evt; if (event.state().equals(IdleState.ALL_IDLE)) { final String remoteAddress = RemotingHelper.parseChannelRemoteAddr(ctx.channel()); log.warn("NETTY SERVER PIPELINE: IDLE exception [{}]", remoteAddress); RemotingUtil.closeChannel(ctx.channel()); if (NettyRemotingServer.this.channelEventListener != null) { //处理检测的事件 NettyRemotingServer.this .putNettyEvent(new NettyEvent(NettyEventType.IDLE, remoteAddress, ctx.channel())); } } } ctx.fireUserEventTriggered(evt); } @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { final String remoteAddress = RemotingHelper.parseChannelRemoteAddr(ctx.channel()); log.warn("NETTY SERVER PIPELINE: exceptionCaught {}", remoteAddress); log.warn("NETTY SERVER PIPELINE: exceptionCaught exception.", cause); //处理异常的事件 if (NettyRemotingServer.this.channelEventListener != null) { NettyRemotingServer.this.putNettyEvent(new NettyEvent(NettyEventType.EXCEPTION, remoteAddress, ctx.channel())); } RemotingUtil.closeChannel(ctx.channel()); } }
业务处理
NettyServerHandler:服务端;NettyClientHandler:客户端
都继承SimpleChannelInboundHandler,基于netty的标准规范,主要实现数据的接受
实现channelRead0方法,实现标准的数据接收
封装服务端和客户端共享的方法processMessageReceived来处理数据
处理数据中根据数据类型区分为请求类型和相应类型,若是是服务端,更多的处理请求数据,若是是发送端更多的是出来响应数据
若是是请求数据,内部将不一样的事件封装成事件处理模型,根据事件类型得到对应的事件模型来处理
若是是响应数据,主要是针对前面的请求数据,进行结果的设置this
class NettyServerHandler extends SimpleChannelInboundHandler<RemotingCommand> { //读取网络请求 @Override protected void channelRead0(ChannelHandlerContext ctx, RemotingCommand msg) throws Exception { processMessageReceived(ctx, msg); } }
=========================================================================================编码
下面将协议的实现细节增长介绍,主要是编码和解码部分设计
编码消息头的对象netty
public ByteBuffer encodeHeader() { return encodeHeader(this.body != null ? this.body.length : 0); } public ByteBuffer encodeHeader(final int bodyLength) { // 1> header length size //固定长度,头部长度的设置 int length = 4; // 2> header data length //编码头部内容为byte数组,并得到长度 byte[] headerData; headerData = this.headerEncode(); //增长头部长度 length += headerData.length; // 3> body data length //增长报文的长度 length += bodyLength; //数据头的总长度 ByteBuffer result = ByteBuffer.allocate(4 + length - bodyLength); // length 数据的总长度 result.putInt(length); // header length result.put(markProtocolType(headerData.length, serializeTypeCurrentRPC)); // header data result.put(headerData); result.flip(); return result; }
解码是基于编码的规范要求和实现进行逆操做code
public static RemotingCommand decode(final ByteBuffer byteBuffer) { //得到消息走长度 int length = byteBuffer.limit(); //得到第一个int位值 int oriHeaderLen = byteBuffer.getInt(); //获取消息头内容的长度 int headerLength = getHeaderLength(oriHeaderLen); //得到消息头内容 byte[] headerData = new byte[headerLength]; byteBuffer.get(headerData); //将消息头解码为对象 RemotingCommand cmd = headerDecode(headerData, getProtocolType(oriHeaderLen)); //得到报文体的长度 int bodyLength = length - 4 - headerLength; byte[] bodyData = null; if (bodyLength > 0) { bodyData = new byte[bodyLength]; byteBuffer.get(bodyData); } cmd.body = bodyData; return cmd; }