不少网友会问,为何明明集群中有多台Broker服务器,autoCreateTopicEnable设置为true,表示开启Topic自动建立,但新建立的Topic的路由信息只包含在其中一台Broker服务器上,这是为何呢?json
指望值:为了消息发送的高可用,但愿新建立的Topic在集群中的每台Broker上建立对应的队列,避免Broker的单节点故障。缓存
现象截图以下:
正如上图所示,自动建立的topicTest5的路由信息:服务器
咱们再来看一下RocketMQ默认topic的路由信息截图以下:
从图中能够默认Topic的路由信息为broker-a、broker-b上各8个队列。3d
默认Topic的路由信息是如何建立的?server
回到本文的主题:autoCreateTopicEnable,开启自动建立主题,试想一下,若是生产者向一个不存在的主题发送消息时,上面的任何一个步骤都没法获取一个不存在的主题的路由信息,那该如何处理这种状况呢?中间件
在RocketMQ中,若是autoCreateTopicEnable设置为true,消息发送者向NameServer查询主题的路由消息返回空时,会尝试用一个系统默认的主题名称(MixAll.AUTO_CREATE_TOPIC_KEY_TOPIC),此时消息发送者获得的路由信息为:
但问题就来了,默认Topic在集群的每一台Broker上建立8个队列,那问题来了,为啥新建立的Topic只在一个Broker上建立4个队列?对象
舒适提示:本文不会详细跟踪整个建立过程,只会点出源码的关键入口点,如想详细了解NameServer路由消息、消息发送高可用的实现原理,建议查阅笔者的书籍《RocketMQ技术内幕》第2、三章。blog
Step1:在Broker启动流程中,会构建TopicConfigManager对象,其构造方法中首先会判断是否开启了容许自动建立主题,若是启用了自动建立主题,则向topicConfigTable中添加默认主题的路由信息。
TopicConfigManager构造方法
队列
备注:该topicConfigTable中全部的路由信息,会随着Broker向Nameserver发送心跳包中,Nameserver收到这些信息后,更新对应Topic的路由信息表。图片
BrokerConfig的defaultTopicQueueNum默认为8。两台Broker服务器都会运行上面的过程,故最终Nameserver中关于默认主题的路由信息中,会包含两个Broker分别各8个队列信息。
Step2:生产者寻找路由信息
生产者首先向NameServer查询路由信息,因为是一个不存在的主题,故此时返回的路由信息为空,RocketMQ会使用默认的主题再次寻找,因为开启了自动建立路由信息,NameServer会向生产者返回默认主题的路由信息。而后从返回的路由信息中选择一个队列(默认轮询)。消息发送者从Nameserver获取到默认的Topic的队列信息后,队列的个数会改变吗?答案是会的,其代码以下:
MQClientInstance#updateTopicRouteInfoFromNameServer
舒适提示:消息发送者在到默认路由信息时,其队列数量,会选择DefaultMQProducer#defaultTopicQueueNums与Nameserver返回的的队列数取最小值,DefaultMQProducer#defaultTopicQueueNums默认值为4,故自动建立的主题,其队列数量默认为4。
Step3:发送消息
DefaultMQProducerImpl#sendKernelImpl
在消息发送时的请求报文中,设置默认topic名称,消息发送topic名称,使用的队列数量为DefaultMQProducer#defaultTopicQueueNums,即默认为4。
Step4:Broker端收到消息后的处理流程
服务端收到消息发送的处理器为:SendMessageProcessor,在处理消息发送时,会调用super.msgCheck方法:
AbstractSendMessageProcessor#msgCheck
在Broker端,首先会使用TopicConfigManager根据topic查询路由信息,若是Broker端不存在该主题的路由配置(路由信息),此时若是Broker中存在默认主题的路由配置信息,则根据消息发送请求中的队列数量,在Broker建立新Topic的路由信息。这样Broker服务端就会存在主题的路由信息。
在Broker端的topic配置管理器中存在的路由信息,一会向Nameserver发送心跳包,汇报到Nameserver,另外一方面会有一个定时任务,定时存储在broker端,具体路径为${ROCKET_HOME}/store/config/topics.json中,这样在Broker关闭后再重启,并不会丢失路由信息。
广大读者朋友,跟踪到这一步的时候,你们应该对启用自动建立主题机制时,新主题是的路由信息是如何建立的,为了方便理解,给出建立主题序列图:
通过上面自动建立路由机制的建立流程,咱们能够比较容易的分析得出以下结论:
由于开启了自动建立路由信息,消息发送者根据Topic去NameServer没法获得路由信息,但接下来根据默认Topic从NameServer是能拿到路由信息(在每一个Broker中,存在8个队列),由于两个Broker在启动时都会向NameServer汇报路由信息。此时消息发送者缓存的路由信息是2个Broker,每一个Broker默认4个队列(缘由见3.2.1:Step2的分析)。消息发送者而后按照轮询机制,发送第一条消息选择(broker-a的messageQueue:0),向Broker发送消息,Broker服务器在处理消息时,首先会查看本身的路由配置管理器(TopicConfigManager)中的路由信息,此时不存在对应的路由信息,而后尝试查询是否存在默认Topic的路由信息,若是存在,说明启用了autoCreateTopicEnable,则在TopicConfigManager中建立新Topic的路由信息,此时存在与Broker服务端的内存中,而后本次消息发送结束。此时,在NameServer中还不存在新建立的Topic的路由信息。
这里有三个关键点:
缘由就分析到这里了,如今咱们还能够的大胆假设,开启autoCreateTopicEnable机制,什么状况会在两个Broker上都建立队列,其实,咱们只须要连续快速的发送9条消息,就有可能在2个Broker上都建立队列,验证代码以下:
~~~
public static void main(String[] args) throws MQClientException, InterruptedException {
DefaultMQProducer producer = new DefaultMQProducer("please_rename_unique_group_name");
producer.setNamesrvAddr("127.0.0.1:9876");
producer.start();
for (int i = 0; i < 9; i++) {
try {
Message msg = new Message("TopicTest10" ,"TagA" , ("Hello RocketMQ " + i).getBytes(RemotingHelper.DEFAULT_CHARSET));
SendResult sendResult = producer.send(msg);
System.out.printf("%s%n", sendResult);
} catch (Exception e) {
e.printStackTrace();
Thread.sleep(1000);
}
}
producer.shutdown();
}
~~~
验证结果如图所示:
本文就分析到这里了,你们若是喜欢这篇文章,麻烦你们帮忙点点赞,同时你们也能够给做者留言,告知在使用RocketMQ的过程当中遇到的疑难杂症,与做者互动。
做者简介:《RocketMQ技术内幕》做者,维护公众号:中间件兴趣圈,主要关注目前主流的开源中间件,例如Netty、Mycat、Dubbo、ElasticSearch、ElasticJob、RocketMQ、Mybatis等。