生产者发送消息有三种方式java
个人理解是分为三层:数组
一、Producer实例层:该层用于建立Topic、消息请求的建立、执行消息发送时的先后置处理、发送不一样方式的消息,有几个关键参数在DefaultMQProducer中。从类图中看出,DefaultMQProducerImpl是具体的实现者,DefaultMQProducer实际是提供一些参数的。缓存
参数 | 默认值 | 解释 |
defaultTopicQueueNums | 4 | 一个主题建立4个消息队列 |
sendMsgTimeout | 3000ms | 发送消息超时时间 |
compressMsgBodyOverHowmuch | 4K | 消息体(Msg Body)长度超过4K,须要压缩 |
retryTimesWhenSendFailed | 2 | 同步模式下消息发送失败重试次数 |
retryTimesWhenSendAsyncFailed | 2 | 异步模式下消息发送失败重试次数 |
maxMessageSize | 4M | 消息最大长度 |
二、实例管理&服务层:该层主要有MQClientInstance实现,生产者和消费者都会调用这个类,该类具体包含如下功能:安全
fetchNameServerAddr | 间隔2分钟利用Http获取NameServer的地址 |
updateTopicRouteInfoFromNameServer | 间隔30s请求NameServer,获取最新的Topic路由关系 |
cleanOfflineBroker | 间隔30s清理不在线的Broker |
sendHeartbeatToAllBrokerWithLock | 间隔30s向全部Broker发送心跳 |
persistAllConsumerOffset | 间隔5s把每一个队列消费到的位置保存到本地文件或者Broker |
adjustThreadPool | 间隔1分钟调整线程池,只针对于PushConsumer,调整策略是若是一个消费者未消费消息总和超过100000,增长线程CoreSize,小于80000,减少线程CoreSize,实际RocketMQ未实现该功能 |
三、网络通讯层网络
RocketMQ网络通讯依赖netty,所以RemotingClient接口的实际实现是NettyRemotingClient,不管是消息的消费仍是生产,最终都会由它执行,具体负责消息发送拉取、接受Broker回传结果并异步处理(执行回调)、消息处理器管理(不一样的请求类型使用不一样的处理器,思想很好,实现时做者仍是使用了同一个Processor)。负载均衡
【MQClientAPIImpl】 this.remotingClient.registerProcessor(RequestCode.CHECK_TRANSACTION_STATE, this.clientRemotingProcessor, null); this.remotingClient.registerProcessor(RequestCode.NOTIFY_CONSUMER_IDS_CHANGED, this.clientRemotingProcessor, null); this.remotingClient.registerProcessor(RequestCode.RESET_CONSUMER_CLIENT_OFFSET, this.clientRemotingProcessor, null); this.remotingClient.registerProcessor(RequestCode.GET_CONSUMER_STATUS_FROM_CLIENT, this.clientRemotingProcessor, null); this.remotingClient.registerProcessor(RequestCode.GET_CONSUMER_RUNNING_INFO, this.clientRemotingProcessor, null); this.remotingClient.registerProcessor(RequestCode.CONSUME_MESSAGE_DIRECTLY, this.clientRemotingProcessor, null);
在这一层,NettyRemotingClient不会感知到上层是发送消息仍是拉取消息,对它来讲,看到的都是RemotingCommand(具体封装了请求类型),只有同步、异步、单向执行,因此这三个方式和生产者的那三个(同步发送、异步发送、单向发送)仍是有所不一样的。异步
介绍完Producer发送消息所经历的层次,分析具体的发送流程。函数
一、查询本地缓存是否存储了TopicPublishInfo,不然从NameServer获取。
二、根据选择策略获取待发送队列。
三、获取消息队列对应的broker实际IP。
四、设置消息Unique ID,zip压缩消息。
五、检查信息合法性,调用NettyClient发送消息fetch
TopicPublishInfo包含队列优先级、消息队列列表、路由信息以及一个线程安全的index坐标。this
public class TopicPublishInfo { private boolean orderTopic = false; private boolean haveTopicRouterInfo = false; private List<MessageQueue> messageQueueList = new ArrayList<MessageQueue>(); private volatile ThreadLocalIndex sendWhichQueue = new ThreadLocalIndex(); private TopicRouteData topicRouteData; }
队列选择是经过MQFaultStrategy的selectOneMessageQueue方法完成。MQFaultStrategy重要的属性包含
//延迟容错对象,维护延迟Brokers的信息 LatencyFaultTolerance<String /*brokerName*/> latencyFaultTolerance = new LatencyFaultToleranceImpl(); //延迟容错开关 boolean sendLatencyFaultEnable = false; //延迟级别数组 long[] latencyMax = { 50L, 100L, 550L, 1000L, 2000L, 3000L, 15000L }; //不可用时长数组 long[] notAvailableDuration = {0L, 0L, 30000L, 60000L, 120000L, 180000L, 600000L};
MQFaultStrategy中最重要的属性是latencyFaultTolerance,它维护了那些消息发送延迟较高的brokers的信息,同时延迟的时间长短对应了延迟级别latencyMax 和时长notAvailableDuration ,sendLatencyFaultEnable 控制了是否开启发送消息延迟功能。LatencyFaultToleranceImpl负责判断队列是否可用、更新Broker的延迟时间。
public boolean isAvailable(final String name) { final FaultItem faultItem = this.faultItemTable.get(name); if (faultItem != null) { return faultItem.isAvailable(); } //若是队列中没找到,说明没有延迟记录 return true; } //计算Broker被禁用时间是否到了 public boolean isAvailable() { return (System.currentTimeMillis() - startTimestamp) >= 0; }
public void updateFaultItem(final String brokerName, final long currentLatency, boolean isolation) { if (this.sendLatencyFaultEnable) { //计算不可用时间持续多久 long duration = computeNotAvailableDuration(isolation ? 30000 : currentLatency); this.latencyFaultTolerance.updateFaultItem(brokerName, currentLatency, duration); } } private long computeNotAvailableDuration(final long currentLatency) { for (int i = latencyMax.length - 1; i >= 0; i--) { if (currentLatency >= latencyMax[i]) return this.notAvailableDuration[i]; } return 0; } public void updateFaultItem(final String name, final long currentLatency, final long notAvailableDuration) { FaultItem old = this.faultItemTable.get(name); if (null == old) { final FaultItem faultItem = new FaultItem(name); faultItem.setCurrentLatency(currentLatency); faultItem.setStartTimestamp(System.currentTimeMillis() + notAvailableDuration); old = this.faultItemTable.putIfAbsent(name, faultItem); if (old != null) { old.setCurrentLatency(currentLatency); old.setStartTimestamp(System.currentTimeMillis() + notAvailableDuration); } } else { old.setCurrentLatency(currentLatency); old.setStartTimestamp(System.currentTimeMillis() + notAvailableDuration); } }
最后再来看看selectOneMessageQueue究竟作了什么?
一、获取上一次使用以后的队列,从这个队列开始判断该队列所在的Broker是否可用
二、若是该Broker可用,则返回该队列
三、若是发现都不符合要求,则至少须要选择一个相对好的broker,并返回对应的队列
消息的发送先写到这里。下一篇会提到如何消费消息。