都知道Rocketmq中有ConsumerGroup的概念。在集群模式下,多台服务器配置相同的ConsumerGroup,可以使得每次只有一台服务器消费消息(注意,但不保证只消费一次,存在网络抖动的状况)。那么,笔者就很疑惑,Rocketmq是如何实现这个模式的?如何保证只有一台服务器消费?java
虽然答案很简单,但倒是一个很好的带着问题看源码的机会。web
从图中能够看到,MQ主要投递消息和拉取消息两个环节。安全
众多的架构都是顺应时代潮流而来,Rocketmq的结构体系固然也不是阿里所首创的,而是依据AMQP协议而来。Rocketmq中的Producer,Broker,以及Consumer都是依据AMQP中的概念衍生出来的。因此这里不妨讲讲AMQP(Advanced Message Queuing Protocal,高级消息队列协议),便于你们更好的理解技术的发展过程。服务器
paper下载 http://www.amqp.org/specification/0-9-1/amqp-org-download网络
固然基于这样一个协议,不仅仅是RocketMq一个闪耀在消息队列选型中,还有不一样的消息队列。架构
https://mp.weixin.qq.com/s/B1D-J_1wpaqj0sxcmaArbQ框架
主要分为了3大阵营:dom
固然,若是熟悉了AMQP协议,你也能够选择自研一个消息队列ui
https://zhuanlan.zhihu.com/p/28967866this
了解了一些背景,来看下RocketMQ中消息的投递过程。仍是那个具体的问题,RocketMQ是如何选择一个队列来投递的呢?
这里提一下,RocketMq中全部关于生产者和消费者的代码都在client包下。打开源码,能够看到Procuder下有个selector包,看到这个包是否是感到就是它的感受。
能够看到selector下的三个类都是实现了MessageQueueSelector,来看下MessageQueueSelector的代码。
public interface MessageQueueSelector {
MessageQueue select(final List<MessageQueue> mqs, final Message msg, final Object arg);
}
public class MessageQueue {
private String topic;
private String brokerName;
private int queueId;
}
复制代码
看一下哪里调用了MessageQueueSelector.select(),发现是DefaultMQProducerImpl,那么能够确认就是由MessageQueueSelector提供了选择哪一个队列。
RocketMq提供了3种不一样的选择队列方式:
细心的同窗确定会问那么队列数量是无限大的吗?这个能够查阅RocketMq的使用手册,默认的队列数量是4 (defaultTopicQueueNums: 4),固然你也能够选择本身配置。
同时不知道有没有同窗找错地儿,笔者刚开始是找错地儿了,在TopicPublishInfo中也找到了个selectOneMessageQueue,代码以下。
public class TopicPublishInfo{
// 不一样版本,代码有些不一样,逻辑相似
public MessageQueue selectOneMessageQueue(final String lastBrokerName) {
if (lastBrokerName != null) {
int index = this.sendWhichQueue.getAndIncrement();
for (int i = 0; i < this.messageQueueList.size(); i++) {
int pos = Math.abs(index++) % this.messageQueueList.size();
MessageQueue mq = this.messageQueueList.get(pos);
if (!mq.getBrokerName().equals(lastBrokerName)) {
return mq;
}
}
return null;
}
else {
int index = this.sendWhichQueue.getAndIncrement();
int pos = Math.abs(index) % this.messageQueueList.size();
return this.messageQueueList.get(pos);
}
}
}
复制代码
查了下调用方发现是MQFaultStrategy,看来是Rocketmq消费失败时候,会将消息从新投递到不一样的队列,这样在集群模式下可以保证分布到不一样机器消费。(是否是还有疑惑,为何能保证到不一样机器,请往下看)
这里是比较难理解的一步,首先查阅RocketMQ手册能够看到:
RocketMQ 的 Consumer 都是从 Broker 拉消息来消费,可是为了能作到实时收消息,RocketMQ 使用长轮询方式,能够保证消息实时性同 Push 方式一致。返种长轮询方式相似亍 Web QQ 收収消息机制。请参考如下信息了解更多。http://www.ibm.com/developerworks/cn/web/wa-lo-comet/
虽然解释的很详细,可是对新手仍是不是很友好。简单的来讲,就是使用长轮询,客户端发起请求和服务端先链接上,可是若是服务端没有数据,这是链接仍是hold住,当有数据push给客户端的时候才关闭链接。这样不但保证了消费者不会被上游的消息打垮,也保证了消息的实时性。
那么还有个问题,Consumer如何从MessageQueue上拉取消息呢?是随机拉吗?
不妨来看下MQPullConsumer,DefaultMQPullConsumer就是继承于它。
public class MQPullConsumer {
// 拉消息,非阻塞
//
// @param mq from which message queue
// @param subExpression 订阅的tag,只支持"tag1 || tag2 || tag3"
// @param offset 标志位
// @param maxNums 消费最大数量
PullResult pull(final MessageQueue mq, final String subExpression, final long offset, final int maxNums) throws MQClientException, RemotingException, MQBrokerException, InterruptedException;
}
复制代码
能够看到MessageQueue是传进来的,这就比较尴尬了,实在没法理解是何时决定好从哪一个队列拉取消息的。幸好有万能的搜索引擎,
https://zhuanlan.zhihu.com/p/25140744
RocketMq有专门的类AllocateMessageQueueStrategy.class,就藏在Client.Consumer.rebalance包下。
每一次Consumer数量的变动,都会触发AllocateMessageQueueStrategy。也就是每一次Consumer拉取的队列都是固定好的。
如今,在回过头来看看第一张RocketMQ的架构图,是否是以为画的很透彻。