【RocketMQ】消息的发送

Producer

发送方式

生产者发送消息有三种方式java

  • 同步(Sync):发送方线程发送后同步堵塞等待SendResult,若failed则重试下一个broker
  • 异步(Async):发送方线程发送后无需等待返回结果(返回的是null),当Broker返回结果后,生产者使用回调函数异步处理,超时可抛出Timeout异常
  • 单向(OneWay):发送方不等待broker响应也没有回调函数触发,速度快但可靠性弱

生产者类图设计

个人理解是分为三层:数组

一、Producer实例层:该层用于建立Topic、消息请求的建立、执行消息发送时的先后置处理、发送不一样方式的消息,有几个关键参数在DefaultMQProducer中。从类图中看出,DefaultMQProducerImpl是具体的实现者,DefaultMQProducer实际是提供一些参数的。缓存

参数 默认值 解释
defaultTopicQueueNums 4 一个主题建立4个消息队列
sendMsgTimeout 3000ms 发送消息超时时间
compressMsgBodyOverHowmuch 4K 消息体(Msg Body)长度超过4K,须要压缩
retryTimesWhenSendFailed 2 同步模式下消息发送失败重试次数
retryTimesWhenSendAsyncFailed 2 异步模式下消息发送失败重试次数
maxMessageSize 4M 消息最大长度

二、实例管理&服务层:该层主要有MQClientInstance实现,生产者和消费者都会调用这个类,该类具体包含如下功能:安全

  • 注册&取消生产者(消费者)
  • 寻找&维护Broker路由
  • 发送&查询消息
  • 基础服务:包含向全部Broker发送心跳、定时任务执行(见表格)、拉取消息、负载均衡、启动生产者。
fetchNameServerAddr 间隔2分钟利用Http获取NameServer的地址
updateTopicRouteInfoFromNameServer 间隔30s请求NameServer,获取最新的Topic路由关系
cleanOfflineBroker 间隔30s清理不在线的Broker
sendHeartbeatToAllBrokerWithLock 间隔30s向全部Broker发送心跳
persistAllConsumerOffset 间隔5s把每一个队列消费到的位置保存到本地文件或者Broker
adjustThreadPool 间隔1分钟调整线程池,只针对于PushConsumer,调整策略是若是一个消费者未消费消息总和超过100000,增长线程CoreSize,小于80000,减少线程CoreSize,实际RocketMQ未实现该功能

三、网络通讯层网络

RocketMQ网络通讯依赖netty,所以RemotingClient接口的实际实现是NettyRemotingClient,不管是消息的消费仍是生产,最终都会由它执行,具体负责消息发送拉取、接受Broker回传结果并异步处理(执行回调)、消息处理器管理(不一样的请求类型使用不一样的处理器,思想很好,实现时做者仍是使用了同一个Processor)。负载均衡

【MQClientAPIImpl】
this.remotingClient.registerProcessor(RequestCode.CHECK_TRANSACTION_STATE, this.clientRemotingProcessor, null);

this.remotingClient.registerProcessor(RequestCode.NOTIFY_CONSUMER_IDS_CHANGED, this.clientRemotingProcessor, null);

this.remotingClient.registerProcessor(RequestCode.RESET_CONSUMER_CLIENT_OFFSET, this.clientRemotingProcessor, null);

this.remotingClient.registerProcessor(RequestCode.GET_CONSUMER_STATUS_FROM_CLIENT, this.clientRemotingProcessor, null);

this.remotingClient.registerProcessor(RequestCode.GET_CONSUMER_RUNNING_INFO, this.clientRemotingProcessor, null);

this.remotingClient.registerProcessor(RequestCode.CONSUME_MESSAGE_DIRECTLY, this.clientRemotingProcessor, null);

在这一层,NettyRemotingClient不会感知到上层是发送消息仍是拉取消息,对它来讲,看到的都是RemotingCommand(具体封装了请求类型),只有同步、异步、单向执行,因此这三个方式和生产者的那三个(同步发送、异步发送、单向发送)仍是有所不一样的。异步

发送流程

介绍完Producer发送消息所经历的层次,分析具体的发送流程。函数

一、查询本地缓存是否存储了TopicPublishInfo,不然从NameServer获取。
二、根据选择策略获取待发送队列。
三、获取消息队列对应的broker实际IP。
四、设置消息Unique ID,zip压缩消息。
五、检查信息合法性,调用NettyClient发送消息fetch

TopicPublishInfo包含队列优先级、消息队列列表、路由信息以及一个线程安全的index坐标。this

public class TopicPublishInfo {
    private boolean orderTopic = false;
    private boolean haveTopicRouterInfo = false;
    private List<MessageQueue> messageQueueList = new ArrayList<MessageQueue>();
    private volatile ThreadLocalIndex sendWhichQueue = new ThreadLocalIndex();
    private TopicRouteData topicRouteData;
}

队列选择

队列选择是经过MQFaultStrategy的selectOneMessageQueue方法完成。MQFaultStrategy重要的属性包含

//延迟容错对象,维护延迟Brokers的信息
LatencyFaultTolerance<String /*brokerName*/> latencyFaultTolerance = new LatencyFaultToleranceImpl();
//延迟容错开关
boolean sendLatencyFaultEnable = false;
//延迟级别数组
long[] latencyMax = { 50L, 100L, 550L, 1000L, 2000L, 3000L, 15000L };
//不可用时长数组
long[] notAvailableDuration = {0L, 0L, 30000L, 60000L, 120000L, 180000L, 600000L};

MQFaultStrategy中最重要的属性是latencyFaultTolerance,它维护了那些消息发送延迟较高的brokers的信息,同时延迟的时间长短对应了延迟级别latencyMax 和时长notAvailableDuration ,sendLatencyFaultEnable 控制了是否开启发送消息延迟功能。LatencyFaultToleranceImpl负责判断队列是否可用、更新Broker的延迟时间。

public boolean isAvailable(final String name) {
     final FaultItem faultItem = this.faultItemTable.get(name);
     if (faultItem != null) {
         return faultItem.isAvailable();
     }
     //若是队列中没找到,说明没有延迟记录
     return true;
}

//计算Broker被禁用时间是否到了
public boolean isAvailable() {
    return (System.currentTimeMillis() - startTimestamp) >= 0;
}

 

public void updateFaultItem(final String brokerName, final long currentLatency, boolean isolation) {
        if (this.sendLatencyFaultEnable) {
            //计算不可用时间持续多久
            long duration = computeNotAvailableDuration(isolation ? 30000 : currentLatency);
            this.latencyFaultTolerance.updateFaultItem(brokerName, currentLatency, duration);
        }
}

private long computeNotAvailableDuration(final long currentLatency) {
        for (int i = latencyMax.length - 1; i >= 0; i--) {
            if (currentLatency >= latencyMax[i])
                return this.notAvailableDuration[i];
        }

        return 0;
}
public void updateFaultItem(final String name, final long currentLatency, final long notAvailableDuration) {
        FaultItem old = this.faultItemTable.get(name);
        if (null == old) {
            final FaultItem faultItem = new FaultItem(name);
            faultItem.setCurrentLatency(currentLatency);
            faultItem.setStartTimestamp(System.currentTimeMillis() + notAvailableDuration);

            old = this.faultItemTable.putIfAbsent(name, faultItem);
            if (old != null) {
                old.setCurrentLatency(currentLatency);
                old.setStartTimestamp(System.currentTimeMillis() + notAvailableDuration);
            }
        } else {
            old.setCurrentLatency(currentLatency);
            old.setStartTimestamp(System.currentTimeMillis() + notAvailableDuration);
        }
}

最后再来看看selectOneMessageQueue究竟作了什么?

一、获取上一次使用以后的队列,从这个队列开始判断该队列所在的Broker是否可用

二、若是该Broker可用,则返回该队列

三、若是发现都不符合要求,则至少须要选择一个相对好的broker,并返回对应的队列

消息的发送先写到这里。下一篇会提到如何消费消息。

相关文章
相关标签/搜索