RocketMQ的消息投递分分为两种:一种是生产者
往MQ Broker中投递;另一种则是MQ broker 往消费者
投递(这种投递
的说法是从消息传递的角度阐述的,实际上底层是消费者
从MQ broker 中Pull拉取的)。本文将从模型的角度来阐述这两种机制。html
RocketMQ 的消息模型总体并不复杂,以下图所示:java
一个Topic(消息主题)
可能对应多个实际的消息队列(MessgeQueue)
在底层实现上,为了提升MQ的可用性和灵活性,一个Topic在实际存储的过程当中,采用了多队列的方式,具体形式如上图所示。每一个消息队列在使用中应当保证先入先出(FIFO,First In First Out)的方式进行消费。
那么,基于这种模型,就会引伸出两个问题:web
消息的系统间传递时,会跨越不一样的网络载体,这会致使消息的传播没法保证其有序请算法
Queue队列
轮询算法投递默认状况下,采用了最简单的轮询算法,这种算法有个很好的特性就是,保证每个Queue队列
的消息投递数量尽量均匀,算法以下图所示:apache
/** * 根据 TopicPublishInfo Topic发布信息对象中维护的index,每次选择队列时,都会递增 * 而后根据 index % queueSize 进行取余,达到轮询的效果 * */ public MessageQueue selectOneMessageQueue(final TopicPublishInfo tpInfo, final String lastBrokerName) { return tpInfo.selectOneMessageQueue(lastBrokerName); } /** * TopicPublishInfo Topic发布信息对象中 */ public class TopicPublishInfo { //基于线程上下文的计数递增,用于轮询目的 private volatile ThreadLocalIndex sendWhichQueue = new ThreadLocalIndex(); public MessageQueue selectOneMessageQueue(final String lastBrokerName) { if (lastBrokerName == null) { return selectOneMessageQueue(); } else { int index = this.sendWhichQueue.getAndIncrement(); for (int i = 0; i < this.messageQueueList.size(); i++) { //轮询计算 int pos = Math.abs(index++) % this.messageQueueList.size(); if (pos < 0) pos = 0; MessageQueue mq = this.messageQueueList.get(pos); if (!mq.getBrokerName().equals(lastBrokerName)) { return mq; } } return selectOneMessageQueue(); } } public MessageQueue selectOneMessageQueue() { int index = this.sendWhichQueue.getAndIncrement(); int pos = Math.abs(index) % this.messageQueueList.size(); if (pos < 0) pos = 0; return this.messageQueueList.get(pos); } }
Queue队列
轮询算法和消息投递延迟最小
的策略投递默认的投递方式比较简单,可是也暴露了一个问题,就是有些Queue队列
可能因为自身数量积压等缘由,可能在投递的过程比较长,对于这样的Queue队列
会影响后续投递的效果。
基于这种现象,RocketMQ在每发送一个MQ消息后,都会统计一下消息投递的时间延迟
,根据这个时间延迟
,能够知道往哪些Queue队列
投递的速度快。
在这种场景下,会优先使用消息投递延迟最小
的策略,若是没有生效,再使用Queue队列轮询
的方式。设计模式
public class MQFaultStrategy { /** * 根据 TopicPublishInfo 内部维护的index,在每次操做时,都会递增, * 而后根据 index % queueList.size(),使用了轮询的基础算法 * */ public MessageQueue selectOneMessageQueue(final TopicPublishInfo tpInfo, final String lastBrokerName) { if (this.sendLatencyFaultEnable) { try { // 从queueid 为 0 开始,依次验证broker 是否有效,若是有效 int index = tpInfo.getSendWhichQueue().getAndIncrement(); for (int i = 0; i < tpInfo.getMessageQueueList().size(); i++) { //基于index和队列数量取余,肯定位置 int pos = Math.abs(index++) % tpInfo.getMessageQueueList().size(); if (pos < 0) pos = 0; MessageQueue mq = tpInfo.getMessageQueueList().get(pos); if (latencyFaultTolerance.isAvailable(mq.getBrokerName())) { if (null == lastBrokerName || mq.getBrokerName().equals(lastBrokerName)) return mq; } } // 从延迟容错broker列表中挑选一个容错性最好的一个 broker final String notBestBroker = latencyFaultTolerance.pickOneAtLeast(); int writeQueueNums = tpInfo.getQueueIdByBroker(notBestBroker); if (writeQueueNums > 0) { // 取余挑选其中一个队列 final MessageQueue mq = tpInfo.selectOneMessageQueue(); if (notBestBroker != null) { mq.setBrokerName(notBestBroker); mq.setQueueId(tpInfo.getSendWhichQueue().getAndIncrement() % writeQueueNums); } return mq; } else { latencyFaultTolerance.remove(notBestBroker); } } catch (Exception e) { log.error("Error occurred when selecting message queue", e); } // 取余挑选其中一个队列 return tpInfo.selectOneMessageQueue(); } return tpInfo.selectOneMessageQueue(lastBrokerName); } }
上述两种投递方式属于对消息投递的时序性没有要求的场景,这种投递的速度和效率比较高。而在有些场景下,须要保证同类型消息投递和消费的顺序性。
例如,假设如今有TOPIC TOPIC_SALE_ORDER
,该 Topic下有4个Queue队列
,该Topic用于传递订单的状态变迁,假设订单有状态:未支付
、已支付
、发货中(处理中)
、发货成功
、发货失败
。
在时序上,生产者从时序上能够生成以下几个消息:
订单T0000001:未支付
--> 订单T0000001:已支付
--> 订单T0000001:发货中(处理中)
--> 订单T0000001:发货失败
消息发送到MQ中以后,可能因为轮询投递的缘由,消息在MQ的存储可能以下:
网络
这种状况下,咱们但愿消费者
消费消息的顺序和咱们发送是一致的,然而,有上述MQ的投递和消费机制,咱们没法保证顺序是正确的,对于顺序异常的消息,消费者
即便有必定的状态容错,也不能彻底处理好这么多种随机出现组合状况。架构
基于上述的状况,RockeMQ
采用了这种实现方案:对于相同订单号的消息,经过必定的策略,将其放置在一个 queue队列中
,而后消费者
再采用必定的策略(一个线程独立处理一个queue
,保证处理消息的顺序性),可以保证消费的顺序性
app
至于消费者是如何保证消费的顺序行的,后续再详细展开,咱们先看生产者
是如何能将相同订单号的消息发送到同一个queue队列
的:
生产者在消息投递的过程当中,使用了 MessageQueueSelector
做为队列选择的策略接口,其定义以下:dom
package org.apache.rocketmq.client.producer; import java.util.List; import org.apache.rocketmq.common.message.Message; import org.apache.rocketmq.common.message.MessageQueue; public interface MessageQueueSelector { /** * 根据消息体和参数,从一批消息队列中挑选出一个合适的消息队列 * @param mqs 待选择的MQ队列选择列表 * @param msg 待发送的消息体 * @param arg 附加参数 * @return 选择后的队列 */ MessageQueue select(final List<MessageQueue> mqs, final Message msg, final Object arg); }
相应地,目前RocketMQ提供了以下几种实现:
默认实现:
投递策略 | 策略实现类 | 说明 |
---|---|---|
随机分配策略 | SelectMessageQueueByRandom | 使用了简单的随机数选择算法 |
基于Hash分配策略 | SelectMessageQueueByHash | 根据附加参数的Hash值,按照消息队列列表的大小取余数,获得消息队列的index |
基于机器机房位置分配策略 | SelectMessageQueueByMachineRoom | 开源的版本没有具体的实现,基本的目的应该是机器的就近原则分配 |
如今大概看下策略的代码实现:
public class SelectMessageQueueByHash implements MessageQueueSelector { @Override public MessageQueue select(List<MessageQueue> mqs, Message msg, Object arg) { int value = arg.hashCode(); if (value < 0) { value = Math.abs(value); } value = value % mqs.size(); return mqs.get(value); } }
实际的操做代码样例以下,经过订单号做为hash运算对象,就能保证相同订单号的消息可以落在相同的queue队列上
。
rocketMQTemplate.asyncSendOrderly(saleOrderTopic + ":" + tag, msg,saleOrderId /*传入订单号做为hash运算对象*/, new SendCallback() { @Override public void onSuccess(SendResult sendResult) { log.info("SALE ORDER NOTIFICATION SUCCESS:{}",sendResult.getMsgId()); } @Override public void onException(Throwable throwable) { //exception happens } });
queue队列
?RocketMQ对于消费者消费消息有两种形式:
BROADCASTING
:广播式消费,这种模式下,一个消息会被通知到每个消费者
CLUSTERING
: 集群式消费,这种模式下,一个消息最多只会被投递到一个消费者
上进行消费广播式
的消息模式比较简单,下面咱们介绍下集群式
。对于使用了消费模式为MessageModel.CLUSTERING
进行消费时,须要保证一个消息
在整个集群中只须要被消费一次。实际上,在RoketMQ底层,消息指定分配给消费者的实现,是经过queue队列
分配给消费者
的方式完成的:也就是说,消息
分配的单位是消息所在的queue队列
。即:
将
queue队列
指定给特定的消费者
后,queue队列
内的全部消息将会被指定到消费者
进行消费。
RocketMQ定义了策略接口AllocateMessageQueueStrategy
,对于给定的消费者分组
,和消息队列列表
、消费者列表
,当前消费者
应当被分配到哪些queue队列
,定义以下:
/** * 为消费者分配queue的策略算法接口 */ public interface AllocateMessageQueueStrategy { /** * Allocating by consumer id * * @param consumerGroup 当前 consumer群组 * @param currentCID 当前consumer id * @param mqAll 当前topic的全部queue实例引用 * @param cidAll 当前 consumer群组下全部的consumer id set集合 * @return 根据策略给当前consumer分配的queue列表 */ List<MessageQueue> allocate( final String consumerGroup, final String currentCID, final List<MessageQueue> mqAll, final List<String> cidAll ); /** * 算法名称 * * @return The strategy name */ String getName(); }
相应地,RocketMQ提供了以下几种实现:
算法名称 | 含义 |
---|---|
AllocateMessageQueueAveragely |
平均分配算法 |
AllocateMessageQueueAveragelyByCircle |
基于环形平均分配算法 |
AllocateMachineRoomNearby |
基于机房临近原则算法 |
AllocateMessageQueueByMachineRoom |
基于机房分配算法 |
AllocateMessageQueueConsistentHash |
基于一致性hash算法 |
AllocateMessageQueueByConfig |
基于配置分配算法 |
为了讲述清楚上述算法的基本原理,咱们先假设一个例子,下面全部的算法将基于这个例子讲解。
假设当前同一个topic下有queue队列
10
个,消费者共有4
个,以下图所示:
下面依次介绍其原理:
AllocateMessageQueueAveragely
- 平均分配算法这里所谓的平均分配算法,并非指的严格意义上的彻底平均,如上面的例子中,10个queue,而消费者只有4个,没法是整除关系,除了整除以外的多出来的queue,将依次根据消费者的顺序均摊。
按照上述例子来看,10/4=2
,即表示每一个消费者
平均均摊2个queue;而10%4=2
,即除了均摊以外,多出来2个queue
尚未分配,那么,根据消费者的顺序consumer-1
、consumer-2
、consumer-3
、consumer-4
,则多出来的2个queue
将分别给consumer-1
和consumer-2
。最终,分摊关系以下:
consumer-1
:3个
;consumer-2
:3个
;consumer-3
:2个
;consumer-4
:2个
,以下图所示:
其代码实现很是简单:
public class AllocateMessageQueueAveragely implements AllocateMessageQueueStrategy { private final InternalLogger log = ClientLogger.getLog(); @Override public List<MessageQueue> allocate(String consumerGroup, String currentCID, List<MessageQueue> mqAll, List<String> cidAll) { if (currentCID == null || currentCID.length() < 1) { throw new IllegalArgumentException("currentCID is empty"); } if (mqAll == null || mqAll.isEmpty()) { throw new IllegalArgumentException("mqAll is null or mqAll empty"); } if (cidAll == null || cidAll.isEmpty()) { throw new IllegalArgumentException("cidAll is null or cidAll empty"); } List<MessageQueue> result = new ArrayList<MessageQueue>(); if (!cidAll.contains(currentCID)) { log.info("[BUG] ConsumerGroup: {} The consumerId: {} not in cidAll: {}", consumerGroup, currentCID, cidAll); return result; } int index = cidAll.indexOf(currentCID); int mod = mqAll.size() % cidAll.size(); int averageSize = mqAll.size() <= cidAll.size() ? 1 : (mod > 0 && index < mod ? mqAll.size() / cidAll.size() + 1 : mqAll.size() / cidAll.size()); int startIndex = (mod > 0 && index < mod) ? index * averageSize : index * averageSize + mod; int range = Math.min(averageSize, mqAll.size() - startIndex); for (int i = 0; i < range; i++) { result.add(mqAll.get((startIndex + i) % mqAll.size())); } return result; } @Override public String getName() { return "AVG"; } }
AllocateMessageQueueAveragelyByCircle
-基于环形平均算法环形平均算法,是指根据消费者的顺序,依次在由queue队列
组成的环形图中逐个分配。具体流程以下所示:
这种算法最终分配的结果是:
consumer-1
: #0,#4,#8
consumer-2
: #1, #5, # 9
consumer-3
: #2,#6
consumer-4
: #3,#7
其代码实现以下所示:
/** * Cycle average Hashing queue algorithm */ public class AllocateMessageQueueAveragelyByCircle implements AllocateMessageQueueStrategy { private final InternalLogger log = ClientLogger.getLog(); @Override public List<MessageQueue> allocate(String consumerGroup, String currentCID, List<MessageQueue> mqAll, List<String> cidAll) { if (currentCID == null || currentCID.length() < 1) { throw new IllegalArgumentException("currentCID is empty"); } if (mqAll == null || mqAll.isEmpty()) { throw new IllegalArgumentException("mqAll is null or mqAll empty"); } if (cidAll == null || cidAll.isEmpty()) { throw new IllegalArgumentException("cidAll is null or cidAll empty"); } List<MessageQueue> result = new ArrayList<MessageQueue>(); if (!cidAll.contains(currentCID)) { log.info("[BUG] ConsumerGroup: {} The consumerId: {} not in cidAll: {}", consumerGroup, currentCID, cidAll); return result; } int index = cidAll.indexOf(currentCID); for (int i = index; i < mqAll.size(); i++) { if (i % cidAll.size() == index) { result.add(mqAll.get(i)); } } return result; } @Override public String getName() { return "AVG_BY_CIRCLE"; } }
AllocateMachineRoomNearby
-基于机房临近原则算法该算法使用了装饰者设计模式
,对分配策略进行了加强。通常在生产环境,若是是微服务架构下,RocketMQ集群的部署多是在不一样的机房中部署,其基本结构可能以下图所示:
对于跨机房的场景,会存在网络、稳定性和隔离心的缘由,该算法会根据queue
的部署机房位置和消费者consumer
的位置,过滤出当前消费者consumer
相同机房的queue队列
,而后再结合上述的算法,如基于平均分配算法在queue队列
子集的基础上再挑选。相关代码实现以下:
@Override public List<MessageQueue> allocate(String consumerGroup, String currentCID, List<MessageQueue> mqAll, List<String> cidAll) { //省略部分代码 List<MessageQueue> result = new ArrayList<MessageQueue>(); //将MQ按照 机房进行分组 Map<String/*machine room */, List<MessageQueue>> mr2Mq = new TreeMap<String, List<MessageQueue>>(); for (MessageQueue mq : mqAll) { String brokerMachineRoom = machineRoomResolver.brokerDeployIn(mq); if (StringUtils.isNoneEmpty(brokerMachineRoom)) { if (mr2Mq.get(brokerMachineRoom) == null) { mr2Mq.put(brokerMachineRoom, new ArrayList<MessageQueue>()); } mr2Mq.get(brokerMachineRoom).add(mq); } else { throw new IllegalArgumentException("Machine room is null for mq " + mq); } } //将消费者 按照机房进行分组 Map<String/*machine room */, List<String/*clientId*/>> mr2c = new TreeMap<String, List<String>>(); for (String cid : cidAll) { String consumerMachineRoom = machineRoomResolver.consumerDeployIn(cid); if (StringUtils.isNoneEmpty(consumerMachineRoom)) { if (mr2c.get(consumerMachineRoom) == null) { mr2c.put(consumerMachineRoom, new ArrayList<String>()); } mr2c.get(consumerMachineRoom).add(cid); } else { throw new IllegalArgumentException("Machine room is null for consumer id " + cid); } } List<MessageQueue> allocateResults = new ArrayList<MessageQueue>(); //1.过滤出当前机房内的MQ队列子集,在此基础上使用分配算法挑选 String currentMachineRoom = machineRoomResolver.consumerDeployIn(currentCID); List<MessageQueue> mqInThisMachineRoom = mr2Mq.remove(currentMachineRoom); List<String> consumerInThisMachineRoom = mr2c.get(currentMachineRoom); if (mqInThisMachineRoom != null && !mqInThisMachineRoom.isEmpty()) { allocateResults.addAll(allocateMessageQueueStrategy.allocate(consumerGroup, currentCID, mqInThisMachineRoom, consumerInThisMachineRoom)); } //2.不在同一机房,按照通常策略进行操做 for (String machineRoom : mr2Mq.keySet()) { if (!mr2c.containsKey(machineRoom)) { // no alive consumer in the corresponding machine room, so all consumers share these queues allocateResults.addAll(allocateMessageQueueStrategy.allocate(consumerGroup, currentCID, mr2Mq.get(machineRoom), cidAll)); } } return allocateResults; }
AllocateMessageQueueByMachineRoom
- 基于机房分配算法该算法适用于属于同一个机房内部的消息,去分配queue。这种方式很是明确,基于上面的机房临近分配算法
的场景,这种更完全,直接指定基于机房消费的策略。这种方式具备强约定性,好比broker
名称按照机房的名称进行拼接,在算法中经过约定解析进行分配。
其代码实现以下:
/** * Computer room Hashing queue algorithm, such as Alipay logic room */ public class AllocateMessageQueueByMachineRoom implements AllocateMessageQueueStrategy { private Set<String> consumeridcs; @Override public List<MessageQueue> allocate(String consumerGroup, String currentCID, List<MessageQueue> mqAll, List<String> cidAll) { List<MessageQueue> result = new ArrayList<MessageQueue>(); int currentIndex = cidAll.indexOf(currentCID); if (currentIndex < 0) { return result; } List<MessageQueue> premqAll = new ArrayList<MessageQueue>(); for (MessageQueue mq : mqAll) { String[] temp = mq.getBrokerName().split("@"); if (temp.length == 2 && consumeridcs.contains(temp[0])) { premqAll.add(mq); } } int mod = premqAll.size() / cidAll.size(); int rem = premqAll.size() % cidAll.size(); int startIndex = mod * currentIndex; int endIndex = startIndex + mod; for (int i = startIndex; i < endIndex; i++) { result.add(mqAll.get(i)); } if (rem > currentIndex) { result.add(premqAll.get(currentIndex + mod * cidAll.size())); } return result; } @Override public String getName() { return "MACHINE_ROOM"; } public Set<String> getConsumeridcs() { return consumeridcs; } public void setConsumeridcs(Set<String> consumeridcs) { this.consumeridcs = consumeridcs; }
AllocateMessageQueueConsistentHash
基于一致性hash算法使用这种算法,会将consumer消费者
做为Node节点构形成一个hash环,而后queue队列
经过这个hash环来决定被分配给哪一个consumer消费者
。
其基本模式以下:
什么是一致性hash 算法 ?
一致性hash算法用于在分布式系统中,保证数据的一致性而提出的一种基于hash环实现的算法,限于文章篇幅,不在这里展开描述,有兴趣的同窗能够参考下 别人的博文:一致性哈希算法原理
算法实现上也不复杂,以下图所示:
public List<MessageQueue> allocate(String consumerGroup, String currentCID, List<MessageQueue> mqAll, List<String> cidAll) { //省略部分代码 List<MessageQueue> result = new ArrayList<MessageQueue>(); if (!cidAll.contains(currentCID)) { log.info("[BUG] ConsumerGroup: {} The consumerId: {} not in cidAll: {}", consumerGroup, currentCID, cidAll); return result; } Collection<ClientNode> cidNodes = new ArrayList<ClientNode>(); for (String cid : cidAll) { cidNodes.add(new ClientNode(cid)); } //使用consumer id 构造hash环 final ConsistentHashRouter<ClientNode> router; //for building hash ring if (customHashFunction != null) { router = new ConsistentHashRouter<ClientNode>(cidNodes, virtualNodeCnt, customHashFunction); } else { router = new ConsistentHashRouter<ClientNode>(cidNodes, virtualNodeCnt); } //依次为 队列分配 consumer List<MessageQueue> results = new ArrayList<MessageQueue>(); for (MessageQueue mq : mqAll) { ClientNode clientNode = router.routeNode(mq.toString()); if (clientNode != null && currentCID.equals(clientNode.getKey())) { results.add(mq); } } return results; }
AllocateMessageQueueByConfig
–基于配置分配算法这种算法单纯基于配置的,很是简单,实际使用中可能用途不大。代码以下:
public class AllocateMessageQueueByConfig implements AllocateMessageQueueStrategy { private List<MessageQueue> messageQueueList; @Override public List<MessageQueue> allocate(String consumerGroup, String currentCID, List<MessageQueue> mqAll, List<String> cidAll) { return this.messageQueueList; } @Override public String getName() { return "CONFIG"; } public List<MessageQueue> getMessageQueueList() { return messageQueueList; } public void setMessageQueueList(List<MessageQueue> messageQueueList) { this.messageQueueList = messageQueueList; } }
默认状况下,消费者使用的是AllocateMessageQueueAveragely
算法,也能够本身指定:
public class DefaultMQPushConsumer{ /** * Default constructor. */ public DefaultMQPushConsumer() { this(MixAll.DEFAULT_CONSUMER_GROUP, null, new AllocateMessageQueueAveragely()); } /** * Constructor specifying consumer group, RPC hook and message queue allocating algorithm. * * @param consumerGroup Consume queue. * @param rpcHook RPC hook to execute before each remoting command. * @param allocateMessageQueueStrategy message queue allocating algorithm. */ public DefaultMQPushConsumer(final String consumerGroup, RPCHook rpcHook, AllocateMessageQueueStrategy allocateMessageQueueStrategy) { this.consumerGroup = consumerGroup; this.allocateMessageQueueStrategy = allocateMessageQueueStrategy; defaultMQPushConsumerImpl = new DefaultMQPushConsumerImpl(this, rpcHook); } }
以上是从设计上简单介绍了RocketMQ的投递机制,若是想了解详细的设计原理,可关注下方的个人公众帐号,会同步更新,谢谢支持 !
做者水平有限,欢迎留言指正吐槽!