RocketMq中MessageQueue的分配

都知道Rocketmq中有ConsumerGroup的概念。在集群模式下,多台服务器配置相同的ConsumerGroup,可以使得每次只有一台服务器消费消息(注意,但不保证只消费一次,存在网络抖动的状况)。那么,笔者就很疑惑,Rocketmq是如何实现这个模式的?如何保证只有一台服务器消费?java

虽然答案很简单,但倒是一个很好的带着问题看源码的机会。web

RocketMq结构

RocketMQ架构示意图

从图中能够看到,MQ主要投递消息和拉取消息两个环节。安全

众多的架构都是顺应时代潮流而来,Rocketmq的结构体系固然也不是阿里所首创的,而是依据AMQP协议而来。Rocketmq中的Producer,Broker,以及Consumer都是依据AMQP中的概念衍生出来的。因此这里不妨讲讲AMQP(Advanced Message Queuing Protocal,高级消息队列协议),便于你们更好的理解技术的发展过程。服务器

paper下载 http://www.amqp.org/specification/0-9-1/amqp-org-download网络

  • Broker: 接收和分发的应用
  • Virtual host:出于多租户和安全因素,把AMQP的基本组件划分到一个虚拟分组中。各个租户之间是网络隔离的,相似Linux中的namespace概念(可自行Google)
  • Connection:publisher/consumer 和broker之间的TCP链接
  • Channel:是相较于Connection更加轻量的链接,是Connection上的逻辑链接
  • Exchange: 负责将message分发到不一样的Queue中
  • Queue: 消息最终会落到Queue中,消息由Broker push给Consumer或者由Consumer来pull消息
  • Binding:exchange和queue之间的消息路由策略
AMQP架构示意图

消息队列的3大类型

固然基于这样一个协议,不仅仅是RocketMq一个闪耀在消息队列选型中,还有不一样的消息队列。架构

https://mp.weixin.qq.com/s/B1D-J_1wpaqj0sxcmaArbQ框架

主要分为了3大阵营:dom

  • 有Broker 重Topic流:kafka,JMS
  • 有Broker 轻Topic流: RocketMQ
  • 无Broker: ZeroMQ

固然,若是熟悉了AMQP协议,你也能够选择自研一个消息队列ui

https://zhuanlan.zhihu.com/p/28967866this

了解了一些背景,来看下RocketMQ中消息的投递过程。仍是那个具体的问题,RocketMQ是如何选择一个队列来投递的呢?

Producer如何投递消息到不一样队列

这里提一下,RocketMq中全部关于生产者和消费者的代码都在client包下。打开源码,能够看到Procuder下有个selector包,看到这个包是否是感到就是它的感受。

能够看到selector下的三个类都是实现了MessageQueueSelector,来看下MessageQueueSelector的代码。

public interface MessageQueueSelector {
    MessageQueue select(final List<MessageQueue> mqs, final Message msg, final Object arg);
}

public class MessageQueue {
	private String topic;
	private String brokerName;
	private int queueId;
}
复制代码

看一下哪里调用了MessageQueueSelector.select(),发现是DefaultMQProducerImpl,那么能够确认就是由MessageQueueSelector提供了选择哪一个队列。

RocketMq提供了3种不一样的选择队列方式:

  • SelectMessageQueueByHash
  • SelectMessageQueueByMachineRoom
  • SelectMessageQueueByRandom

默认队列数量

细心的同窗确定会问那么队列数量是无限大的吗?这个能够查阅RocketMq的使用手册,默认的队列数量是4 (defaultTopicQueueNums: 4),固然你也能够选择本身配置。

同时不知道有没有同窗找错地儿,笔者刚开始是找错地儿了,在TopicPublishInfo中也找到了个selectOneMessageQueue,代码以下。

public class TopicPublishInfo{
    // 不一样版本,代码有些不一样,逻辑相似
    public MessageQueue selectOneMessageQueue(final String lastBrokerName) {
        if (lastBrokerName != null) {
            int index = this.sendWhichQueue.getAndIncrement();
            for (int i = 0; i < this.messageQueueList.size(); i++) {
                int pos = Math.abs(index++) % this.messageQueueList.size();
                MessageQueue mq = this.messageQueueList.get(pos);
                if (!mq.getBrokerName().equals(lastBrokerName)) {
                    return mq;
                }
            }

            return null;
        }
        else {
            int index = this.sendWhichQueue.getAndIncrement();
            int pos = Math.abs(index) % this.messageQueueList.size();
            return this.messageQueueList.get(pos);
        }
    }
}
复制代码

查了下调用方发现是MQFaultStrategy,看来是Rocketmq消费失败时候,会将消息从新投递到不一样的队列,这样在集群模式下可以保证分布到不一样机器消费。(是否是还有疑惑,为何能保证到不一样机器,请往下看)

Consumer如何从消息队列获取消息

这里是比较难理解的一步,首先查阅RocketMQ手册能够看到:

RocketMQ 的 Consumer 都是从 Broker 拉消息来消费,可是为了能作到实时收消息,RocketMQ 使用长轮询方式,能够保证消息实时性同 Push 方式一致。返种长轮询方式相似亍 Web QQ 收収消息机制。请参考如下信息了解更多。http://www.ibm.com/developerworks/cn/web/wa-lo-comet/

虽然解释的很详细,可是对新手仍是不是很友好。简单的来讲,就是使用长轮询,客户端发起请求和服务端先链接上,可是若是服务端没有数据,这是链接仍是hold住,当有数据push给客户端的时候才关闭链接。这样不但保证了消费者不会被上游的消息打垮,也保证了消息的实时性。

那么还有个问题,Consumer如何从MessageQueue上拉取消息呢?是随机拉吗?

不妨来看下MQPullConsumer,DefaultMQPullConsumer就是继承于它。

public class MQPullConsumer {

    // 拉消息,非阻塞
    // 
    // @param mq from which message queue
    // @param subExpression 订阅的tag,只支持"tag1 || tag2 || tag3"
    // @param offset 标志位
    // @param maxNums 消费最大数量
    PullResult pull(final MessageQueue mq, final String subExpression, final long offset, final int maxNums) throws MQClientException, RemotingException, MQBrokerException, InterruptedException;
}
复制代码

能够看到MessageQueue是传进来的,这就比较尴尬了,实在没法理解是何时决定好从哪一个队列拉取消息的。幸好有万能的搜索引擎,

https://zhuanlan.zhihu.com/p/25140744

RocketMq有专门的类AllocateMessageQueueStrategy.class,就藏在Client.Consumer.rebalance包下。

  • AllocateMessageQueueAveragely
  • AllocateMessageQueueAveragelyByCircle
  • AllocateMessageQueueByConfig
  • AllocateMessageQueueByMachineRoom
  • AllocateMessageQueueConsistentHash

每一次Consumer数量的变动,都会触发AllocateMessageQueueStrategy。也就是每一次Consumer拉取的队列都是固定好的。

如今,在回过头来看看第一张RocketMQ的架构图,是否是以为画的很透彻。

总结

  1. 任何的框架都有它衍生变化的历史,了解架构变化的历史,才能更好的理解一个框架
  2. 好好研读使用手册,包含了不少架构的细节
  3. 带着问题去研读源码
相关文章
相关标签/搜索