rocketmq怎么作序列化的?

首先看一下RemotingCommand的几个重要属性:json

private int code;
    private LanguageCode language = LanguageCode.JAVA;
    private int version = 0;
    private int opaque = requestId.getAndIncrement();
    private int flag = 0;
    private String remark;
    private HashMap<String, String> extFields;
    private transient CommandCustomHeader customHeader;

    private SerializeType serializeTypeCurrentRPC = serializeTypeConfigInThisServer;

    private transient byte[] body;

  

除了static以外,还有body、extfields是transitent,除此以外都是要直接进行序列化的,默认用fastjson直接序列化。app

这里面的extfields跟customHeader是互相转换的,也就是序列化的时候用前者传入,在代码里面用反序列化后的customer对象作操做。customHeader是一个接口,他有不少实现类,不一样的request请求体除了body不一样以外,还有一个不一样就在于customHeader不一样。ide

 

好比注册broker的:this

 

final RegisterBrokerRequestHeader requestHeader = new RegisterBrokerRequestHeader();
            requestHeader.setBrokerAddr(brokerAddr);
            requestHeader.setBrokerId(brokerId);
            requestHeader.setBrokerName(brokerName);
            requestHeader.setClusterName(clusterName);
            requestHeader.setHaServerAddr(haServerAddr);
            requestHeader.setCompressed(compressed);

            RegisterBrokerBody requestBody = new RegisterBrokerBody();
            requestBody.setTopicConfigSerializeWrapper(topicConfigWrapper);
            requestBody.setFilterServerList(filterServerList);
            final byte[] body = requestBody.encode(compressed);
            final int bodyCrc32 = UtilAll.crc32(body);

  这里的requestHeader属于RegisterBrokerRequestHeader,就是一种特殊的commandheader,他有本身独特的属性,好比brokeraddr、brokerId等等。通通塞入header里面。netty

  再构造须要的body体,把header跟序列化后的body一块儿:code

 

RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.REGISTER_BROKER, requestHeader);
 request.setBody(body);

  

public static RemotingCommand createRequestCommand(int code, CommandCustomHeader customHeader) {
        RemotingCommand cmd = new RemotingCommand();
        cmd.setCode(code);
        cmd.customHeader = customHeader;
        setCmdVersion(cmd);
        return cmd;
    }  

 

把header、body都放入RemotingCommand里面,回到command的属性,只有code、version、remark、opaque才是通用的属性,其余通通经过header、body实现个性化。对于注册broker来讲,个性化的body就是注册的具体内容。对象

 

拿到一个完整的command之后,就是如何在netty里面进行序列化和反序列化了,这里面的body已经被序列化了,还剩下通用属性和header属性。blog

回到encode方法:接口

@Override
    public void encode(ChannelHandlerContext ctx, RemotingCommand remotingCommand, ByteBuf out)
        throws Exception {
        try {
            ByteBuffer header = remotingCommand.encodeHeader();
            out.writeBytes(header);
            byte[] body = remotingCommand.getBody();
            if (body != null) {
                out.writeBytes(body);
            }
        } catch (Exception e) {
            log.error("encode exception, " + RemotingHelper.parseChannelRemoteAddr(ctx.channel()), e);
            if (remotingCommand != null) {
                log.error(remotingCommand.toString());
            }
            RemotingUtil.closeChannel(ctx.channel());
        }
    }

  

简单来看就是header跟body一块儿放入out里面。进入encodeHeader方法:ip

 

public ByteBuffer encodeHeader() {
        return encodeHeader(this.body != null ? this.body.length : 0);
    }

    public ByteBuffer encodeHeader(final int bodyLength) {
        // 1> header length size
        int length = 4;

        // 2> header data length
        byte[] headerData;
        headerData = this.headerEncode();

        length += headerData.length;

        // 3> body data length
        length += bodyLength;

        ByteBuffer result = ByteBuffer.allocate(4 + length - bodyLength);

        // length
        result.putInt(length);

        // header length
        result.put(markProtocolType(headerData.length, serializeTypeCurrentRPC));

        // header data
        result.put(headerData);

        result.flip();

        return result;
    }

 

使用LengthFieldBasedFrameDecoder的时候,第一个put的数据就是整体长度:全部header(包括公共的和extrafields的)+ body+markprotocol获得的长度(全部header长度),这里的整体长度不须要包括整体长度自己,也就是putint(length)自己不须要额外算4个字节,这里面多出来的4是因为markprotocol的长度。

 

private byte[] headerEncode() {
        this.makeCustomHeaderToNet();
        if (SerializeType.ROCKETMQ == serializeTypeCurrentRPC) {
            return RocketMQSerializable.rocketMQProtocolEncode(this);
        } else {
            return RemotingSerializable.encode(this);
        }
    }

  在makeCustomHeaderToNet方法里面,经过反射把header的内容所有放入extrafields里面,这样再经过encode(this)就能够直接把header的内容(已经转换成extrafields)和公共属性所有序列化成result。

 

 

 

再看看反序列化:

public static RemotingCommand decode(final ByteBuffer byteBuffer) {
        int length = byteBuffer.limit();
        int oriHeaderLen = byteBuffer.getInt();
        int headerLength = getHeaderLength(oriHeaderLen);

        byte[] headerData = new byte[headerLength];
        byteBuffer.get(headerData);

        RemotingCommand cmd = headerDecode(headerData, getProtocolType(oriHeaderLen));

        int bodyLength = length - 4 - headerLength;
        byte[] bodyData = null;
        if (bodyLength > 0) {
            bodyData = new byte[bodyLength];
            byteBuffer.get(bodyData);
        }
        cmd.body = bodyData;

        return cmd;
    }

  虽然咱们在encode的时候塞入了整体length,可是LengthFieldBasedFrameDecoder已经帮咱们解析了去掉了,咱们第一个取数据的时候是后面的markeprotocolType获得的header长度,

根据header长度之后拿到header,他就是除了body以外全部的数据。body能够经过后面的buff拿到,header反序列化后获得的extrafields能够进一步转换成commandHeader,因为在公共属性里面咱们塞入了code,因而咱们能够知道应该转换成具体哪种commandHeader。

相关文章
相关标签/搜索