rocketmq之源码分析netty流程及细节(七)

netty的标准设计流程为:编码,解码,检测,连接,其余handler,业务。按照这个流程将rocketmq的netty的实现流程进行细化。java

编码
    NettyEncoder
        继承MessageToByteEncoder,netty的编码规范要求
        将RemotingCommand的请求数据结构数据头:存数据的长度描述;数据体:存数据的内容
            RemotingCommand的功能强大:涉及ByteBuffer的频繁操做,实现数据协议的转换
            总体的协议格式:<length><header length><header data><body data>
            1,4个字节的int型数据存储数据的总长度
            2,4个字节的int型数据存储报文头的字节长度
            3,存储的头部数据内容
            4,存储的报文的数据内容数组

public class NettyEncoder extends MessageToByteEncoder<RemotingCommand> {
    private static final InternalLogger log = InternalLoggerFactory.getLogger(RemotingHelper.ROCKETMQ_REMOTING);

    @Override
    public void encode(ChannelHandlerContext ctx, RemotingCommand remotingCommand, ByteBuf out)
        throws Exception {
        try {
            //得到请求体消息头,封装为byte的协议要求
            ByteBuffer header = remotingCommand.encodeHeader();
            //写入头
            out.writeBytes(header);
            //得到消息体
            byte[] body = remotingCommand.getBody();
            if (body != null) {
                //写入协议
                out.writeBytes(body);
            }
        } catch (Exception e) {
            log.error("encode exception, " + RemotingHelper.parseChannelRemoteAddr(ctx.channel()), e);
            if (remotingCommand != null) {
                log.error(remotingCommand.toString());
            }
            RemotingUtil.closeChannel(ctx.channel());
        }
    }
}

解码
    NettyDecoder
        继承LengthFieldBasedFrameDecoder,netty的编码要求,该实现是基于长度的要求解决拆包粘包问题
        将ByteBuf的数据输入按照长度的要求,数据解码到ByteBuf的存储体内
            将netty封装的ByteBuf对象转换为java的Nio的ByteBuffer对象:ByteBuf.nioBuffer()
            全部的操做都是基于RemotingCommand的decode来实现
            解码的操做过程基于编码的规范来
            1,得到数据的最大有效位置
            2,得到第一个int位的值,该值为数据长度,包括头数据和报文数据
            3,byteBuffer操做得到头部的数据
            4,将数据的最大有效位置减去 4(标识),再减去头数据的长度,获得报文的长度
            5,根据报文长度,byteBuffer操做得到报文长度网络

public class NettyDecoder extends LengthFieldBasedFrameDecoder {
    private static final InternalLogger log = InternalLoggerFactory.getLogger(RemotingHelper.ROCKETMQ_REMOTING);

    //基于长度的解码器,定义长度的标准,友好处理粘包及拆包
    private static final int FRAME_MAX_LENGTH =
        Integer.parseInt(System.getProperty("com.rocketmq.remoting.frameMaxLength", "16777216"));

    public NettyDecoder() {
        super(FRAME_MAX_LENGTH, 0, 4, 0, 4);
    }

    @Override
    public Object decode(ChannelHandlerContext ctx, ByteBuf in) throws Exception {
        ByteBuf frame = null;
        try {
            //基于父类的解码,共性操做
            frame = (ByteBuf) super.decode(ctx, in);
            if (null == frame) {
                return null;
            }

            //将netty的bytebuf转换为java标准的butebuffer
            ByteBuffer byteBuffer = frame.nioBuffer();

            //按照编码协议,将结果转换为对象
            return RemotingCommand.decode(byteBuffer);
        } catch (Exception e) {
            log.error("decode exception, " + RemotingHelper.parseChannelRemoteAddr(ctx.channel()), e);
            RemotingUtil.closeChannel(ctx.channel());
        } finally {
            if (null != frame) {
                frame.release();
            }
        }

        return null;
    }
}

Idle
    netty的系统的IdleStateHandler,操做,只是配置了指定的最大idle时间
    能够实现心跳的检测数据结构

连接管理
    NettyConnectManageHandler
        继承ChannelDuplexHandler,netty的输入输入及写出的标准设计
        将重点对服务的注册,连接激活,连接关闭,连接的检测,连接的操做异常进行管理
            连接激活:若是本机的监听存在则设置到永久循环队列中,之间内部业务管理,事件为连接
            激活关闭:若是本机的监听存在则设置到永久循环队列中,之间内部业务管理,事件为关闭
            连接检测:若是本机的监听存在则设置到永久循环队列中,之间内部业务管理,事件为检测
            异常操做:若是本机的监听存在则设置到永久循环队列中,之间内部业务管理,事件为异常
        底层是基于无限循环的操做队列内容,对事件进行操做,基于接口设计,不一样的服务端有不一样的业务需求ide

class NettyConnectManageHandler extends ChannelDuplexHandler {
    @Override
    public void channelRegistered(ChannelHandlerContext ctx) throws Exception {
        final String remoteAddress = RemotingHelper.parseChannelRemoteAddr(ctx.channel());
        log.info("NETTY SERVER PIPELINE: channelRegistered {}", remoteAddress);
        super.channelRegistered(ctx);
    }

    @Override
    public void channelUnregistered(ChannelHandlerContext ctx) throws Exception {
        final String remoteAddress = RemotingHelper.parseChannelRemoteAddr(ctx.channel());
        log.info("NETTY SERVER PIPELINE: channelUnregistered, the channel[{}]", remoteAddress);
        super.channelUnregistered(ctx);
    }

    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        final String remoteAddress = RemotingHelper.parseChannelRemoteAddr(ctx.channel());
        log.info("NETTY SERVER PIPELINE: channelActive, the channel[{}]", remoteAddress);
        super.channelActive(ctx);

        //处理链接的事件
        if (NettyRemotingServer.this.channelEventListener != null) {
            NettyRemotingServer.this.putNettyEvent(new NettyEvent(NettyEventType.CONNECT, remoteAddress, ctx.channel()));
        }
    }

    @Override
    public void channelInactive(ChannelHandlerContext ctx) throws Exception {
        final String remoteAddress = RemotingHelper.parseChannelRemoteAddr(ctx.channel());
        log.info("NETTY SERVER PIPELINE: channelInactive, the channel[{}]", remoteAddress);
        super.channelInactive(ctx);

        //处理链接关闭的事件
        if (NettyRemotingServer.this.channelEventListener != null) {
            NettyRemotingServer.this.putNettyEvent(new NettyEvent(NettyEventType.CLOSE, remoteAddress, ctx.channel()));
        }
    }

    @Override
    public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
        if (evt instanceof IdleStateEvent) {
            IdleStateEvent event = (IdleStateEvent) evt;
            if (event.state().equals(IdleState.ALL_IDLE)) {
                final String remoteAddress = RemotingHelper.parseChannelRemoteAddr(ctx.channel());
                log.warn("NETTY SERVER PIPELINE: IDLE exception [{}]", remoteAddress);
                RemotingUtil.closeChannel(ctx.channel());
                if (NettyRemotingServer.this.channelEventListener != null) {
                    //处理检测的事件
                    NettyRemotingServer.this
                        .putNettyEvent(new NettyEvent(NettyEventType.IDLE, remoteAddress, ctx.channel()));
                }
            }
        }

        ctx.fireUserEventTriggered(evt);
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        final String remoteAddress = RemotingHelper.parseChannelRemoteAddr(ctx.channel());
        log.warn("NETTY SERVER PIPELINE: exceptionCaught {}", remoteAddress);
        log.warn("NETTY SERVER PIPELINE: exceptionCaught exception.", cause);

        //处理异常的事件
        if (NettyRemotingServer.this.channelEventListener != null) {
            NettyRemotingServer.this.putNettyEvent(new NettyEvent(NettyEventType.EXCEPTION, remoteAddress, ctx.channel()));
        }

        RemotingUtil.closeChannel(ctx.channel());
    }
}

业务处理
    NettyServerHandler:服务端;NettyClientHandler:客户端
    都继承SimpleChannelInboundHandler,基于netty的标准规范,主要实现数据的接受
        实现channelRead0方法,实现标准的数据接收
        封装服务端和客户端共享的方法processMessageReceived来处理数据
        处理数据中根据数据类型区分为请求类型和相应类型,若是是服务端,更多的处理请求数据,若是是发送端更多的是出来响应数据
        若是是请求数据,内部将不一样的事件封装成事件处理模型,根据事件类型得到对应的事件模型来处理
        若是是响应数据,主要是针对前面的请求数据,进行结果的设置this

class NettyServerHandler extends SimpleChannelInboundHandler<RemotingCommand> {

    //读取网络请求
    @Override
    protected void channelRead0(ChannelHandlerContext ctx, RemotingCommand msg) throws Exception {
        processMessageReceived(ctx, msg);
    }
}

=========================================================================================编码

下面将协议的实现细节增长介绍,主要是编码和解码部分设计

编码消息头的对象netty

public ByteBuffer encodeHeader() {
    return encodeHeader(this.body != null ? this.body.length : 0);
}

public ByteBuffer encodeHeader(final int bodyLength) {
    // 1> header length size
    //固定长度,头部长度的设置
    int length = 4;

    // 2> header data length
    //编码头部内容为byte数组,并得到长度
    byte[] headerData;
    headerData = this.headerEncode();

    //增长头部长度
    length += headerData.length;

    // 3> body data length
    //增长报文的长度
    length += bodyLength;

    //数据头的总长度
    ByteBuffer result = ByteBuffer.allocate(4 + length - bodyLength);

    // length 数据的总长度
    result.putInt(length);

    // header length 
    result.put(markProtocolType(headerData.length, serializeTypeCurrentRPC));

    // header data
    result.put(headerData);

    result.flip();

    return result;
}

解码是基于编码的规范要求和实现进行逆操做code

public static RemotingCommand decode(final ByteBuffer byteBuffer) {
    //得到消息走长度
    int length = byteBuffer.limit();
    //得到第一个int位值
    int oriHeaderLen = byteBuffer.getInt();
    //获取消息头内容的长度
    int headerLength = getHeaderLength(oriHeaderLen);

    //得到消息头内容
    byte[] headerData = new byte[headerLength];
    byteBuffer.get(headerData);

    //将消息头解码为对象
    RemotingCommand cmd = headerDecode(headerData, getProtocolType(oriHeaderLen));

    //得到报文体的长度
    int bodyLength = length - 4 - headerLength;
    byte[] bodyData = null;
    if (bodyLength > 0) {
        bodyData = new byte[bodyLength];
        byteBuffer.get(bodyData);
    }
    cmd.body = bodyData;

    return cmd;
}
相关文章
相关标签/搜索