深度解析RocketMQ Topic的建立机制

微信公众号「后端进阶」,专一后端技术分享:Java、Golang、WEB框架、分布式中间件、服务治理等等。
老司机倾囊相授,带你一路进阶,来不及解释了快上车!

我还记得第一次使用rocketmq的时候,须要去控制台预先建立topic,我当时就想为何要这么设计,因而我决定撸一波源码,带你们从根源上吃透rocketmq topic的建立机制。java

topic在rocketmq的设计思想里,是做为同一个业务逻辑消息的组织形式,它仅仅是一个逻辑上的概念,而在一个topic下又包含若干个逻辑队列,即消息队列,消息内容实际是存放在队列中,而队列又存储在broker中,下面我用一张图来讲明topic的存储模型:git

其实rocketmq中存在两种不一样的topic建立方式,一种是我刚刚说的预先建立,另外一种是自动建立,下面我开车带你们从源码的角度来详细地解读这两种建立机制。github

自动建立

默认状况下,topic不用手动建立,当producer进行消息发送时,会从nameserver拉取topic的路由信息,若是topic的路由信息不存在,那么会默认拉取broker启动时默认建立好名为“TBW102”的Topic:算法

org.apache.rocketmq.common.MixAll:apache

// Will be created at broker when isAutoCreateTopicEnable
public static final String AUTO_CREATE_TOPIC_KEY_TOPIC = "TBW102";

自动建立的开关配置在BrokerConfig中,经过autoCreateTopicEnable字段进行控制,后端

org.apache.rocketmq.common.BrokerConfig:缓存

@ImportantField
private boolean autoCreateTopicEnable = true;

在broker启动时,会调用TopicConfigManager的构造方法,autoCreateTopicEnable打开后,会将“TBW102”保存到topicConfigTable中:bash

org.apache.rocketmq.broker.topic.TopicConfigManager#TopicConfigManager:微信

// MixAll.AUTO_CREATE_TOPIC_KEY_TOPIC
if (this.brokerController.getBrokerConfig().isAutoCreateTopicEnable()) {
    String topic = MixAll.AUTO_CREATE_TOPIC_KEY_TOPIC;
    TopicConfig topicConfig = new TopicConfig(topic);
    this.systemTopicList.add(topic);
    topicConfig.setReadQueueNums(this.brokerController.getBrokerConfig()
                                 .getDefaultTopicQueueNums());
    topicConfig.setWriteQueueNums(this.brokerController.getBrokerConfig()
                                  .getDefaultTopicQueueNums());
    int perm = PermName.PERM_INHERIT | PermName.PERM_READ | PermName.PERM_WRITE;
    topicConfig.setPerm(perm);
    this.topicConfigTable.put(topicConfig.getTopicName(), topicConfig);
}

broker会经过发送心跳包将topicConfigTable的topic信息发送给nameserver,nameserver将topic信息注册到RouteInfoManager中。负载均衡

继续看消息发送时是如何从nameserver获取topic的路由信息:

org.apache.rocketmq.client.impl.producer.DefaultMQProducerImpl#tryToFindTopicPublishInfo:

private TopicPublishInfo tryToFindTopicPublishInfo(final String topic) {
  TopicPublishInfo topicPublishInfo = this.topicPublishInfoTable.get(topic);
  if (null == topicPublishInfo || !topicPublishInfo.ok()) {
    this.topicPublishInfoTable.putIfAbsent(topic, new TopicPublishInfo());
    // 生产者第一次发送消息,topic在nameserver中并不存在
    this.mQClientFactory.updateTopicRouteInfoFromNameServer(topic);
    topicPublishInfo = this.topicPublishInfoTable.get(topic);
  }

  if (topicPublishInfo.isHaveTopicRouterInfo() || topicPublishInfo.ok()) {
    return topicPublishInfo;
  } else {
    // 第二次请求会将isDefault=true,开启默认“TBW102”从namerserver获取路由信息
    this.mQClientFactory.updateTopicRouteInfoFromNameServer(topic, true, this.defaultMQProducer);
    topicPublishInfo = this.topicPublishInfoTable.get(topic);
    return topicPublishInfo;
  }
}

如上方法,topic首次发送消息,此时并不能从namserver获取topic的路由信息,那么接下来会进行第二次请求namserver,这时会将isDefault=true,开启默认“TBW102”从namerserver获取路由信息,此时的“TBW102”topic已经被broker默认注册到nameserver了:

org.apache.rocketmq.client.impl.factory.MQClientInstance#updateTopicRouteInfoFromNameServer:

if (isDefault && defaultMQProducer != null) {
  // 使用默认的“TBW102”topic获取路由信息
  topicRouteData = this.mQClientAPIImpl.getDefaultTopicRouteInfoFromNameServer(defaultMQProducer.getCreateTopicKey(),1000 * 3);
  if (topicRouteData != null) {
    for (QueueData data : topicRouteData.getQueueDatas()) {
      int queueNums = Math.min(defaultMQProducer.getDefaultTopicQueueNums(), data.getReadQueueNums());
      data.setReadQueueNums(queueNums);
      data.setWriteQueueNums(queueNums);
    }
  }
}

若是isDefault=true而且defaultMQProducer不为空,从nameserver中获取默认路由信息,此时会获取全部已开启自动建立开关的broker的默认“TBW102”topic路由信息,并保存默认的topic消息队列数量。

org.apache.rocketmq.client.impl.factory.MQClientInstance#updateTopicRouteInfoFromNameServer:

TopicRouteData old = this.topicRouteTable.get(topic);
boolean changed = topicRouteDataIsChange(old, topicRouteData);
if (!changed) {
  changed = this.isNeedUpdateTopicRouteInfo(topic);
} else {
  log.info("the topic[{}] route info changed, old[{}] ,new[{}]", topic, old, topicRouteData);
}

从本地缓存中取出topic的路由信息,因为topic是第一次发送消息,这时本地并无该topic的路由信息,因此对比该topic路由信息对比“TBW102”时changed为true,即有变化,进入如下逻辑:

org.apache.rocketmq.client.impl.factory.MQClientInstance#updateTopicRouteInfoFromNameServer:

// Update sub info
{
  Set<MessageQueue> subscribeInfo = topicRouteData2TopicSubscribeInfo(topic, topicRouteData);
  Iterator<Entry<String, MQConsumerInner>> it = this.consumerTable.entrySet().iterator();
  while (it.hasNext()) {
    Entry<String, MQConsumerInner> entry = it.next();
    MQConsumerInner impl = entry.getValue();
    if (impl != null) {
      impl.updateTopicSubscribeInfo(topic, subscribeInfo);
    }
  }
}

将“TBW102”topic路由信息构建TopicPublishInfo,并将用topic为key,TopicPublishInfo为value更新本地缓存,到这里就明白了,原来broker们千辛万苦建立“TBW102”topic并将其路由信息注册到nameserver,被新来的topic获取后当即用“TBW102”topic的路由信息构建出一个TopicPublishInfo而且据为己有,因为TopicPublishInfo的路由信息时默认“TBW102”topic,所以真正要发送消息的topic也会被负载发送到“TBW102”topic所在的broker中,这里咱们能够将其称之为偷梁换柱的作法。

当broker接收到消息后,会在msgCheck方法中调用createTopicInSendMessageMethod方法,将topic的信息塞进topicConfigTable缓存中,而且broker会定时发送心跳将topicConfigTable发送给nameserver进行注册。

自动建立与消息发送时获取topic信息的时序图:

预先建立

其实这个叫预先建立彷佛更加适合,即预先在broker中建立好topic的相关信息并注册到nameserver中,而后client端发送消息时直接从nameserver中获取topic的路由信息,可是手动建立从动做上来将更加形象通俗易懂,直接告诉你,你的topic信息须要在控制台上本身手动建立。

预先建立须要经过mqadmin提供的topic相关命令进行建立,执行:

./mqadmin updateTopic

官方给出的各项参数以下:

usage: mqadmin updateTopic [-b <arg>] [-c <arg>] [-h] [-n <arg>] [-o <arg>] [-p <arg>] [-r <arg>] [-s <arg>]
-t <arg> [-u <arg>] [-w <arg>]
-b,--brokerAddr <arg>       create topic to which broker
-c,--clusterName <arg>      create topic to which cluster
-h,--help                   Print help
-n,--namesrvAddr <arg>      Name server address list, eg: 192.168.0.1:9876;192.168.0.2:9876
-o,--order <arg>            set topic's order(true|false
-p,--perm <arg>             set topic's permission(2|4|6), intro[2:W 4:R; 6:RW]
-r,--readQueueNums <arg>    set read queue nums
-s,--hasUnitSub <arg>       has unit sub (true|false
-t,--topic <arg>            topic name
-u,--unit <arg>             is unit topic (true|false
-w,--writeQueueNums <arg>   set write queue nums

咱们直接定位到其实现类执行命令的方法:

经过broker模式建立:

org.apache.rocketmq.tools.command.topic.UpdateTopicSubCommand#execute:

// -b,--brokerAddr <arg>   create topic to which broker
if (commandLine.hasOption('b')) {
  String addr = commandLine.getOptionValue('b').trim();
  defaultMQAdminExt.start();
  defaultMQAdminExt.createAndUpdateTopicConfig(addr, topicConfig);
  return;
}

从commandLine命令行工具获取运行时-b参数重的broker的地址,defaultMQAdminExt是默认的rocketmq控制台执行的API,此时调用start方法,该方法建立了一个mqClientInstance,它封装了netty通讯的细节,接着就是最重要的一步,调用createAndUpdateTopicConfig将topic配置信息发送到指定的broker上,完成topic的建立。

经过集群模式建立:

org.apache.rocketmq.tools.command.topic.UpdateTopicSubCommand#execute:

// -c,--clusterName <arg>   create topic to which cluster
else if (commandLine.hasOption('c')) {
  String clusterName = commandLine.getOptionValue('c').trim();
  defaultMQAdminExt.start();
  Set<String> masterSet =
    CommandUtil.fetchMasterAddrByClusterName(defaultMQAdminExt, clusterName);
  for (String addr : masterSet) {
    defaultMQAdminExt.createAndUpdateTopicConfig(addr, topicConfig);
    System.out.printf("create topic to %s success.%n", addr);
  }
  return;
}

经过集群模式建立与经过broker模式建立的逻辑大体相同,多了根据集群从nameserver获取集群下全部broker的master地址这个步骤,而后在循环发送topic信息到集群中的每一个broker中,这个逻辑跟指定单个broker是一致的。

这也说明了当用集群模式去建立topic时,集群里面每一个broker的queue的数量相同,当用单个broker模式去建立topic时,每一个broker的queue数量能够不一致。

预先建立时序图:

什么时候须要预先建立Topic?

建议线下开启,线上关闭,不是我说的,是官方给出的建议:

rocketmq为何要这么设计呢?通过一波源码深度解析后,我获得了我想要的答案:

根据上面的源码分析,咱们得出,rocketmq在发送消息时,会先去获取topic的路由信息,若是topic是第一次发送消息,因为nameserver没有topic的路由信息,因此会再次以“TBW102”这个默认topic获取路由信息,假设broker都开启了自动建立开关,那么此时会获取全部broker的路由信息,消息的发送会根据负载算法选择其中一台Broker发送消息,消息到达broker后,发现本地没有该topic,会在建立该topic的信息塞进本地缓存中,同时会将topic路由信息注册到nameserver中,那么这样就会形成一个后果:之后全部该topic的消息,都将发送到这台broker上,若是该topic消息量很是大,会形成某个broker上负载过大,这样消息的存储就达不到负载均衡的目的了。

扫面下方二维码,关注「Java科表明」,开车带你临摹各类源码,来不及解释了快上车! 
公众号「后端进阶」,专一后端技术分享!

相关文章
相关标签/搜索