跟我学RocketMQ之批量消息发送源码解析

上篇文章 跟我学RocketMQ之消息发送源码解析 中,咱们已经对普通消息的发送流程进行了详细的解释,可是因为篇幅问题没有展开讲解批量消息的发送。本文中,咱们就一块儿来集中分析一下批量消息的发送是怎样的逻辑。java

DefaultProducer.send

RocketMQ提供了批量发送消息的API,一样在DefaultProducer.java中数组

@Override
    public SendResult send(
        Collection<Message> msgs) throws MQClientException, RemotingException, 
            MQBrokerException, InterruptedException {
        return this.defaultMQProducerImpl.send(batch(msgs));
    }复制代码

它的参数为Message集合,也就是一批消息。它的另一个重载方法提供了发送超时时间参数框架

@Override
    public SendResult send(Collection<Message> msgs,
        long timeout) throws MQClientException, RemotingException,
             MQBrokerException, InterruptedException {
        return this.defaultMQProducerImpl.send(batch(msgs), timeout);
    }复制代码

能够看到是将消息经过batch()方法打包为单条消息,咱们看一下batch方法的逻辑ide

DefaultProducer.batch

private MessageBatch batch(Collection<Message> msgs) throws MQClientException {复制代码
// 声明批量消息体
        MessageBatch msgBatch;
        try {复制代码
// 从Message的list生成批量消息体MessageBatch
            msgBatch = MessageBatch.generateFromList(msgs);
            for (Message message : msgBatch) {
                Validators.checkMessage(message, this);
                MessageClientIDSetter.setUniqID(message);
                message.setTopic(withNamespace(message.getTopic()));
            }
            // 设置消息体,此时的消息体已是处理事后的批量消息体
            msgBatch.setBody(msgBatch.encode());
        } catch (Exception e) {
            throw new MQClientException("Failed to initiate the MessageBatch", e);
        }
        // 设置topic
        msgBatch.setTopic(withNamespace(msgBatch.getTopic()));
        return msgBatch;
    }复制代码

从代码能够看到,核心思想是将一批消息(Collection msgs)打包为MessageBatch对象,咱们看下MessageBatch的声明 源码分析

public class MessageBatch extends Message implements Iterable<Message> {复制代码
private final List<Message> messages;复制代码
private MessageBatch(List<Message> messages) {
            this.messages = messages;
        }复制代码

能够看到MessageBatch继承自Message,持有List 引用。 学习

咱们接着看一下generateFromList方法this

MessageBatch.generateFromList

public static MessageBatch generateFromList(Collection<Message> messages) {
        assert messages != null;
        assert messages.size() > 0;复制代码
// 首先实例化一个Message的list
        List<Message> messageList = new ArrayList<Message>(messages.size());
        Message first = null;复制代码
// 对messages集合进行遍历
        for (Message message : messages) {复制代码
// 判断延时级别,若是大于0抛出异常,缘由为:批量消息发送不支持延时
            if (message.getDelayTimeLevel() > 0) {
                throw new UnsupportedOperationException
                    ("TimeDelayLevel in not supported for batching");
            }复制代码
// 判断topic是否以 **"%RETRY%"** 开头,若是是,
            // 则抛出异常,缘由为:批量发送消息不支持消息重试
            if (message.getTopic().startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX)) {
                throw new UnsupportedOperationException("Retry Group is not supported for batching");
            }复制代码
// 判断集合中的每一个Message的topic与批量发送topic是否一致,
            // 若是不一致则抛出异常,缘由为:
            // 批量消息中的每一个消息实体的Topic要和批量消息总体的topic保持一致。
            if (first == null) {
                first = message;
            } else {
                if (!first.getTopic().equals(message.getTopic())) {
                    throw new UnsupportedOperationException("The topic of the messages in one batch should be the same");
                }复制代码
// 判断批量消息的首个Message与其余的每一个Message实体的等待消息存储状态是否相同,
                // 若是不一样则报错,缘由为:批量消息中每一个消息的waitStoreMsgOK状态均应该相同。
                if (first.isWaitStoreMsgOK() != message.isWaitStoreMsgOK()) {
                    throw new UnsupportedOperationException("The waitStoreMsgOK of the messages in one batch should the same");
                }
            }复制代码
// 校验经过后,将message实体添加到messageList中
            messageList.add(message);
        }复制代码
// 将处理完成的messageList做为构造方法,
        // 初始化MessageBatch实体,并设置topic以及isWaitStoreMsgOK状态。
        MessageBatch messageBatch = new MessageBatch(messageList);复制代码
messageBatch.setTopic(first.getTopic());
        messageBatch.setWaitStoreMsgOK(first.isWaitStoreMsgOK());
        return messageBatch;
    }复制代码

总结一下,generateFromList方法对调用方设置的Collection 集合进行遍历,通过前置校验以后,转换为MessageBatch对象并返回给DefaultProducer.batch方法中,咱们接着看DefaultProducer.batch的逻辑。 编码

到此,经过MessageBatch.generateFromList方法,将发送端传入的一批消息集合转换为了MessageBatch实体。spa

DefaultProducer.batch

private MessageBatch batch(Collection<Message> msgs) throws MQClientException {复制代码
// 声明批量消息体
        MessageBatch msgBatch;
        try {
            // 从Message的list生成批量消息体MessageBatch
            msgBatch = MessageBatch.generateFromList(msgs);
            for (Message message : msgBatch) {
                Validators.checkMessage(message, this);
                MessageClientIDSetter.setUniqID(message);
                message.setTopic(withNamespace(message.getTopic()));
            }
            // 设置消息体,此时的消息体已是处理事后的批量消息体
            msgBatch.setBody(msgBatch.encode());
        } catch (Exception e) {
            throw new MQClientException("Failed to initiate the MessageBatch", e);
        }
        // 设置topic
        msgBatch.setTopic(withNamespace(msgBatch.getTopic()));
        return msgBatch;
    }复制代码

注意下面这行代码:.net

// 设置消息体,此时的消息体已是处理事后的批量消息体
        msgBatch.setBody(msgBatch.encode());复制代码

这里对MessageBatch进行消息编码处理,经过调用MessageBatch的encode方法实现,代码逻辑以下:

public byte[] encode() {
        return MessageDecoder.encodeMessages(messages);
    }复制代码

能够看到是经过静态方法 encodeMessages(List messages) 实现的。

咱们看一下encodeMessages方法的逻辑:

public static byte[] encodeMessages(List<Message> messages) {
        //TO DO refactor, accumulate in one buffer, avoid copies
        List<byte[]> encodedMessages = new ArrayList<byte[]>(messages.size());
        int allSize = 0;
        for (Message message : messages) {复制代码
// 遍历messages集合,分别对每一个Message实体进行编码操做,转换为byte[]
            byte[] tmp = encodeMessage(message);
            // 将转换后的单个Message的byte[]设置到encodedMessages中
            encodedMessages.add(tmp);
            // 批量消息的二进制数据长度随实际消息体递增
            allSize += tmp.length;
        }
        byte[] allBytes = new byte[allSize];
        int pos = 0;
        for (byte[] bytes : encodedMessages) {
            // 遍历encodedMessages,按序复制每一个Message的二进制格式消息体
            System.arraycopy(bytes, 0, allBytes, pos, bytes.length);
            pos += bytes.length;
        }
        // 返回批量消息总体的消息体二进制数组
        return allBytes;
    }复制代码

encodeMessages的逻辑在注释中分析的已经比较清楚了,其实就是遍历messages,并按序拼接每一个Message实体的二进制数组格式消息体并返回。

咱们能够继续看一下单个Message是如何进行编码的,调用了 MessageDecoder.encodeMessage(message) 方法,逻辑以下:

public static byte[] encodeMessage(Message message) {
        //only need flag, body, properties
        byte[] body = message.getBody();
        int bodyLen = body.length;
        String properties = messageProperties2String(message.getProperties());
        byte[] propertiesBytes = properties.getBytes(CHARSET_UTF8);
        //note properties length must not more than Short.MAX
        short propertiesLength = (short) propertiesBytes.length;
        int sysFlag = message.getFlag();
        int storeSize = 4 // 1 TOTALSIZE
            + 4 // 2 MAGICCOD
            + 4 // 3 BODYCRC
            + 4 // 4 FLAG
            + 4 + bodyLen // 4 BODY
            + 2 + propertiesLength;
        ByteBuffer byteBuffer = ByteBuffer.allocate(storeSize);
        // 1 TOTALSIZE
        byteBuffer.putInt(storeSize);复制代码
// 2 MAGICCODE
        byteBuffer.putInt(0);复制代码
// 3 BODYCRC
        byteBuffer.putInt(0);复制代码
// 4 FLAG
        int flag = message.getFlag();
        byteBuffer.putInt(flag);复制代码
// 5 BODY
        byteBuffer.putInt(bodyLen);
        byteBuffer.put(body);复制代码
// 6 properties
        byteBuffer.putShort(propertiesLength);
        byteBuffer.put(propertiesBytes);复制代码
return byteBuffer.array();
    }复制代码

这里其实就是将消息按照RocektMQ的消息协议进行编码,格式为:

消息总长度          ---  4字节
    魔数                --- 4字节
    bodyCRC校验码       --- 4字节
    flag标识            --- 4字节
    body长度            --- 4字节
    消息体              --- 消息体实际长度N字节
    属性长度            --- 2字节
    扩展属性            --- N字节复制代码

经过encodeMessage方法处理以后,消息便会被编码为固定格式,最终会被Broker端进行处理并持久化。

其余

到此即是批量消息发送的源码分析,实际上RocketMQ在处理批量消息的时候是将其解析为单个消息再发送的,这样就在底层统一了单条消息、批量消息发送的逻辑,让整个框架的设计更加健壮,也便于咱们进行理解学习。

后续的发送流程这里就再也不重复展开了,感兴趣的同窗能够移步咱们的上一篇文章查看

跟我学RocketMQ之消息发送源码解析

批量消息的源码分析就暂时告一段落,更多的源码分析随后奉上,感谢您的阅读。

版权声明: 原创不易,洗文可耻。除非注明,本博文章均为原创,转载请以连接形式标明本文地址。
相关文章
相关标签/搜索