@(RocketMQ source code walkthrough)
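First, let me explain the title. Suppose we have one `Producer` and two `Consumer`s in the same consumer group. The `Producer` sends messages to `TOPICA` and `TOPICB`, and each `Consumer` subscribes to one of the two `topic`s. Let's look at the problems that show up in this setup, and then use the source code to work out why they happen.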
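The symptoms are fairly subtle. The `broker` prints log lines such as `the consumer's subscription not exist, group ...`, and the `Consumer` side prints similar ones. There are also lines such as `subscription changed, group: ...`. If you look closely, you will notice something else: while one consumer is consuming messages, the other one consumes nothing.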
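Why does this happen? Reading the code cold or stepping through it in a debugger, it was hard to know where to start. So I reached for my secret weapon, the kind you don't usually share: asking. I searched everywhere, Google, Baidu and Bing, and in the end went straight to an expert, 芋艿. He said this setup does cause problems. He no longer remembered the exact reason, but he believed the symptoms come from the consumption relationships repeatedly overwriting each other.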
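That hint gave us an entry point: at least we knew to start looking on the `Broker`. Following the trail led to the cause, so let's walk through the source together.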
First, both consumer implementations (push and pull) hold an `MQClientInstance`. This class is central: starting a consumer also starts it. Its startup code contains this part:
```java
// Start various schedule tasks
this.startScheduledTask();
```
This starts a number of scheduled tasks. Let's step into it:
```java
this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
    @Override
    public void run() {
        try {
            MQClientInstance.this.cleanOfflineBroker();
            // periodically send heartbeats to the brokers
            MQClientInstance.this.sendHeartbeatToAllBrokerWithLock();
        } catch (Exception e) {
            log.error("ScheduledTask sendHeartbeatToAllBroker exception", e);
        }
    }
}, 1000, this.clientConfig.getHeartbeatBrokerInterval(), TimeUnit.MILLISECONDS);
```
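The scheduling pattern above can be tried on its own. The sketch below is not RocketMQ code. `HeartbeatSchedulerSketch`, `sendHeartbeat` and `runFor` are hypothetical names, and the "heartbeat" only increments a counter and deliberately fails once. It shows why the task body is wrapped in `try/catch`. Under the `ScheduledExecutorService` contract, if one run of a `scheduleAtFixedRate` task throws, all later runs are suppressed, so an uncaught exception would silently stop the heartbeats.

```java
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

class HeartbeatSchedulerSketch {
    // Counts heartbeat rounds; stands in for sendHeartbeatToAllBrokerWithLock().
    static final AtomicInteger rounds = new AtomicInteger();

    // Hypothetical heartbeat body: throws on the 2nd round to simulate a broker error.
    static void sendHeartbeat() {
        int n = rounds.incrementAndGet();
        if (n == 2) {
            throw new IllegalStateException("simulated broker failure");
        }
    }

    // Schedules the heartbeat at a fixed rate, lets it run for about runMillis,
    // then shuts the executor down and returns how many rounds were executed.
    static int runFor(long initialDelayMs, long periodMs, long runMillis) {
        rounds.set(0);
        ScheduledExecutorService ses = Executors.newSingleThreadScheduledExecutor();
        ses.scheduleAtFixedRate(new Runnable() {
            @Override
            public void run() {
                try {
                    sendHeartbeat();
                } catch (Exception e) {
                    // Catch and log: letting it escape would cancel every future run.
                    System.err.println("heartbeat failed: " + e.getMessage());
                }
            }
        }, initialDelayMs, periodMs, TimeUnit.MILLISECONDS);
        try {
            Thread.sleep(runMillis);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        } finally {
            ses.shutdownNow();
        }
        return rounds.get();
    }

    public static void main(String[] args) {
        // Several rounds run even though round 2 threw.
        System.out.println("rounds = " + runFor(10, 20, 200));
    }
}
```

If you remove the `try/catch` in `run()`, the count stops at 2, because the executor cancels the task after the first uncaught exception.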
Here we see that the consumer periodically sends heartbeats to the `Broker`. Digging further, we end up in the `sendHeartbeatToAllBroker` method:
```java
// heartbeatData and consumerEmpty are set up earlier in this method (not shown)
// send a heartbeat to every known broker
if (!this.brokerAddrTable.isEmpty()) {
    long times = this.sendHeartbeatTimesTotal.getAndIncrement();
    Iterator<Entry<String, HashMap<Long, String>>> it = this.brokerAddrTable.entrySet().iterator();
    while (it.hasNext()) {
        Entry<String, HashMap<Long, String>> entry = it.next();
        String brokerName = entry.getKey();
        HashMap<Long, String> oneTable = entry.getValue();
        if (oneTable != null) {
            for (Map.Entry<Long, String> entry1 : oneTable.entrySet()) {
                Long id = entry1.getKey();
                String addr = entry1.getValue();
                if (addr != null) {
                    if (consumerEmpty) {
                        if (id != MixAll.MASTER_ID)
                            continue;
                    }
                    try {
                        // this is where the heartbeat is actually sent
                        int version = this.mQClientAPIImpl.sendHearbeat(addr, heartbeatData, 3000);
                        if (!this.brokerVersionTable.containsKey(brokerName)) {
                            this.brokerVersionTable.put(brokerName, new HashMap<String, Integer>(4));
                        }
                        this.brokerVersionTable.get(brokerName).put(addr, version);
                        if (times % 20 == 0) {
                            log.info("send heart beat to broker[{} {} {}] success", brokerName, id, addr);
                            log.info(heartbeatData.toString());
                        }
                    } catch (Exception e) {
                        if (this.isBrokerInNameServer(addr)) {
                            log.info("send heart beat to broker[{} {} {}] failed", brokerName, id, addr);
                        } else {
                            log.info("send heart beat to broker[{} {} {}] exception, because the broker not up, forget it", brokerName,
                                id, addr);
                        }
                    }
                }
            }
        }
    }
}
```
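Here is a stripped-down sketch of the traversal above. It assumes `brokerAddrTable` maps a broker name to a `brokerId -> address` table, and that id `0` marks the master, which is the role `MixAll.MASTER_ID` plays in the excerpt. `heartbeatTargets` and `demoTable` are hypothetical helpers. The sketch only computes which addresses would be contacted and leaves out the network call and the version bookkeeping.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

class HeartbeatTargetsSketch {
    // Assumed to mirror MixAll.MASTER_ID: broker id 0 is the master of a broker group.
    static final long MASTER_ID = 0L;

    // brokerAddrTable shape: brokerName -> (brokerId -> address).
    // Returns the addresses that would receive a heartbeat in one round.
    static List<String> heartbeatTargets(Map<String, Map<Long, String>> brokerAddrTable,
                                         boolean consumerEmpty) {
        List<String> targets = new ArrayList<>();
        for (Map.Entry<String, Map<Long, String>> entry : brokerAddrTable.entrySet()) {
            Map<Long, String> oneTable = entry.getValue();
            if (oneTable == null) {
                continue;
            }
            for (Map.Entry<Long, String> e : oneTable.entrySet()) {
                String addr = e.getValue();
                if (addr == null) {
                    continue;
                }
                // A client without consumers only heartbeats the masters.
                if (consumerEmpty && e.getKey() != MASTER_ID) {
                    continue;
                }
                targets.add(addr);
            }
        }
        return targets;
    }

    // Hypothetical topology: one broker group with a master (id 0) and a slave (id 1).
    static Map<String, Map<Long, String>> demoTable() {
        Map<Long, String> brokerA = new TreeMap<>();
        brokerA.put(0L, "192.168.0.1:10911");
        brokerA.put(1L, "192.168.0.2:10911");
        Map<String, Map<Long, String>> table = new TreeMap<>();
        table.put("broker-a", brokerA);
        return table;
    }

    public static void main(String[] args) {
        System.out.println(heartbeatTargets(demoTable(), false)); // [192.168.0.1:10911, 192.168.0.2:10911]
        System.out.println(heartbeatTargets(demoTable(), true));  // [192.168.0.1:10911]
    }
}
```

A client that hosts consumers, as in our example, heartbeats every broker it knows about, masters and slaves alike. The master-only shortcut applies only when no consumer is registered. Presumably this is because a producer-only client never needs to pull from slaves.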
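This sends a heartbeat to every `Broker`. In our example there is only one `Broker`, so let's switch to the `Broker` and see how it handles the heartbeat. The request type is `HEART_BEAT`, and on the `Broker` we can see that such requests are handled by `ClientManageProcessor`. Here is the part that processes the heartbeat (the `heartBeat` method):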
```java
// iterate over every consumer entry carried by the heartbeat
for (ConsumerData data : heartbeatData.getConsumerDataSet()) {
    // look up the subscription group config the broker has recorded for this consumer group
    SubscriptionGroupConfig subscriptionGroupConfig = this.brokerController.getSubscriptionGroupManager().findSubscriptionGroupConfig(data.getGroupName());
    boolean isNotifyConsumerIdsChangedEnable = true;
    if (null != subscriptionGroupConfig) {
        isNotifyConsumerIdsChangedEnable = subscriptionGroupConfig.isNotifyConsumerIdsChangedEnable();
        int topicSysFlag = 0;
        if (data.isUnitMode()) {
            topicSysFlag = TopicSysFlag.buildSysFlag(false, true);
        }
        // make sure this group's retry topic exists
        String newTopic = MixAll.getRetryTopic(data.getGroupName());
        this.brokerController.getTopicConfigManager().createTopicInSendMessageBackMethod(
            newTopic,
            subscriptionGroupConfig.getRetryQueueNums(),
            PermName.PERM_WRITE | PermName.PERM_READ, topicSysFlag);
    }
    // register the consumer
    boolean changed = this.brokerController.getConsumerManager().registerConsumer(
        data.getGroupName(),
        clientChannelInfo,
        data.getConsumeType(),
        data.getMessageModel(),
        data.getConsumeFromWhere(),
        data.getSubscriptionDataSet(),
        isNotifyConsumerIdsChangedEnable
    );
    if (changed) {
        log.info("registerConsumer info changed {} {}",
            data.toString(),
            RemotingHelper.parseChannelRemoteAddr(ctx.channel())
        );
    }
}
```
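Here is a much simplified model of the loop in `heartBeat` that makes the group-keyed handling concrete. `ConsumerInfo` is a hypothetical stand-in for `ConsumerData` that carries only a group name and its topics. The retry topic name assumes that `MixAll.getRetryTopic` prepends `%RETRY%` to the group name. The broker works group by group, so two clients that report the same group name end up on the same retry topic and the same registration key, whatever topics they subscribe to.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Set;

class HeartbeatProcessingSketch {
    // Hypothetical stand-in for ConsumerData: one entry per consumer group in a client.
    static class ConsumerInfo {
        final String groupName;
        final Set<String> topics;

        ConsumerInfo(String groupName, String... topics) {
            this.groupName = groupName;
            this.topics = new LinkedHashSet<>(Arrays.asList(topics));
        }
    }

    // Assumed naming rule for a group's retry topic.
    static String retryTopic(String group) {
        return "%RETRY%" + group;
    }

    // Mimics the heartBeat() loop: each entry is handled under its group name only.
    static List<String> handleHeartbeat(List<ConsumerInfo> consumerDataSet) {
        List<String> actions = new ArrayList<>();
        for (ConsumerInfo data : consumerDataSet) {
            actions.add("ensure " + retryTopic(data.groupName));
            actions.add("register " + data.groupName + " -> " + data.topics);
        }
        return actions;
    }

    public static void main(String[] args) {
        // Two clients from our example: same group, different topics.
        System.out.println(handleHeartbeat(Arrays.asList(new ConsumerInfo("GROUP_X", "TOPICA"))));
        System.out.println(handleHeartbeat(Arrays.asList(new ConsumerInfo("GROUP_X", "TOPICB"))));
    }
}
```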
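As we can see, the `broker` takes the data sent by the `consumer` and looks up the subscription information it has recorded for that consumer. Note that the lookup is done by consumer group. Now let's look at `registerConsumer`: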
```java
// look up the consumer info by consumer group
ConsumerGroupInfo consumerGroupInfo = this.consumerTable.get(group);
if (null == consumerGroupInfo) {
    ConsumerGroupInfo tmp = new ConsumerGroupInfo(group, consumeType, messageModel, consumeFromWhere);
    // note: the key of consumerTable is the group name
    ConsumerGroupInfo prev = this.consumerTable.putIfAbsent(group, tmp);
    consumerGroupInfo = prev != null ? prev : tmp;
}
boolean r1 =
    consumerGroupInfo.updateChannel(clientChannelInfo, consumeType, messageModel,
        consumeFromWhere);
boolean r2 = consumerGroupInfo.updateSubscription(subList);
if (r1 || r2) {
    if (isNotifyConsumerIdsChangedEnable) {
        this.consumerIdsChangeListener.handle(ConsumerGroupEvent.CHANGE, group, consumerGroupInfo.getAllChannel());
    }
}
this.consumerIdsChangeListener.handle(ConsumerGroupEvent.REGISTER, group, subList);
return r1 || r2;
```
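The `get`/`putIfAbsent` sequence in `registerConsumer` is the usual race-safe get-or-create idiom on a `ConcurrentMap`. If another thread inserted first, `putIfAbsent` returns that winner and the locally built `tmp` is thrown away. The sketch below uses hypothetical `GroupTableSketch` and `GroupInfo` classes, not RocketMQ ones. It races several threads on one group name and checks that they all receive the same instance. Because the key is only the group name, every consumer in a group is handed the same `ConsumerGroupInfo`.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.CountDownLatch;

class GroupTableSketch {
    // Hypothetical stand-in for ConsumerGroupInfo; identity equality on purpose.
    static class GroupInfo {
        final String group;

        GroupInfo(String group) {
            this.group = group;
        }
    }

    final ConcurrentMap<String, GroupInfo> consumerTable = new ConcurrentHashMap<>();

    // Same idiom as registerConsumer: build a candidate, try putIfAbsent,
    // and keep whichever instance actually ended up in the map.
    GroupInfo getOrCreate(String group) {
        GroupInfo info = consumerTable.get(group);
        if (info == null) {
            GroupInfo tmp = new GroupInfo(group);
            GroupInfo prev = consumerTable.putIfAbsent(group, tmp);
            info = prev != null ? prev : tmp;
        }
        return info;
    }

    // Races `threads` threads on one group and returns how many distinct instances they saw.
    static int distinctInstancesUnderRace(int threads) {
        GroupTableSketch table = new GroupTableSketch();
        Set<GroupInfo> seen = ConcurrentHashMap.newKeySet();
        CountDownLatch start = new CountDownLatch(1);
        List<Thread> workers = new ArrayList<>();
        for (int i = 0; i < threads; i++) {
            Thread t = new Thread(() -> {
                try {
                    start.await(); // release all threads together to maximize contention
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                    return;
                }
                seen.add(table.getOrCreate("GROUP_X"));
            });
            workers.add(t);
            t.start();
        }
        start.countDown();
        for (Thread t : workers) {
            try {
                t.join();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }
        return seen.size();
    }

    public static void main(String[] args) {
        System.out.println("distinct instances: " + distinctInstancesUnderRace(8));
    }
}
```

On Java 8 and later, `consumerTable.computeIfAbsent(group, GroupInfo::new)` expresses the same get-or-create in one call.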
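Pay attention to `ConsumerGroupInfo prev = this.consumerTable.putIfAbsent(group, tmp);`. It tells us that `consumerTable` stores consumer information per consumer group. All members of a group therefore share one `ConsumerGroupInfo`, and `updateSubscription(subList)` updates that shared record with whatever the latest heartbeat carries. Now suppose the members of one group report different subscriptions, as in our example. The heartbeat from the consumer subscribed to `TOPICA` tells the `Broker`: "our group subscribes to `TOPICA`!", and the `Broker` records it. A little later, the heartbeat from the consumer subscribed to `TOPICB` tells the `Broker`: "our group subscribes to `TOPICB`!"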
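This is how the subscriptions end up overwriting each other. When messages are pulled, one of the consumers is bound to get nothing, because the `Broker` cannot find a subscription for it.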
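The overwriting can be reproduced with a toy model of the broker's per-group subscription table. This is a sketch under stated assumptions, not RocketMQ's code. `onHeartbeat` models `updateSubscription` in two steps: it adds every topic in the incoming list, then drops any recorded topic the list does not contain. That matches my reading of `ConsumerGroupInfo.updateSubscription`, but check it against your RocketMQ version. `canPull` is a hypothetical stand-in for the broker's subscription lookup at pull time.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.Map;
import java.util.Set;

class SubscriptionOverwriteSketch {
    // group -> topics currently recorded for that group (models the shared subscription table)
    final Map<String, Set<String>> subscriptionByGroup = new HashMap<>();

    // Assumed updateSubscription semantics: add incoming topics, then remove recorded
    // topics missing from subList. Returns true if the recorded set changed.
    boolean onHeartbeat(String group, Set<String> subList) {
        Set<String> recorded = subscriptionByGroup.computeIfAbsent(group, g -> new HashSet<>());
        boolean changed = recorded.addAll(subList);
        Iterator<String> it = recorded.iterator();
        while (it.hasNext()) {
            if (!subList.contains(it.next())) {
                it.remove(); // presumably where "subscription changed, group: ..." gets logged
                changed = true;
            }
        }
        return changed;
    }

    // Hypothetical pull-time check: the broker needs a recorded (group, topic) subscription.
    boolean canPull(String group, String topic) {
        Set<String> recorded = subscriptionByGroup.get(group);
        return recorded != null && recorded.contains(topic);
    }

    public static void main(String[] args) {
        SubscriptionOverwriteSketch broker = new SubscriptionOverwriteSketch();
        Set<String> consumerA = Collections.singleton("TOPICA");
        Set<String> consumerB = Collections.singleton("TOPICB");
        // Heartbeats from the two consumers of GROUP_X interleave.
        for (int round = 1; round <= 2; round++) {
            broker.onHeartbeat("GROUP_X", consumerA);
            System.out.println("after A: TOPICA=" + broker.canPull("GROUP_X", "TOPICA")
                    + " TOPICB=" + broker.canPull("GROUP_X", "TOPICB")); // TOPICA=true TOPICB=false
            broker.onHeartbeat("GROUP_X", consumerB);
            System.out.println("after B: TOPICA=" + broker.canPull("GROUP_X", "TOPICA")
                    + " TOPICB=" + broker.canPull("GROUP_X", "TOPICB")); // TOPICA=false TOPICB=true
        }
    }
}
```

Every alternating heartbeat returns `true` (a change), which lines up with the repeated `subscription changed, group: ...` logs. Only the topic from the most recent heartbeat can be pulled. If the two consumers use different group names in this model, each group keeps its own entry and neither overwrites the other.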
We now know what causes the symptoms described above.