rocketmq源码解析consumer、producer处理过程④

说在前面java

DefaultMQProducer、DefaultMQPullConsumer、DefaultMQPushConsumer 处理过程算法

 

源码解析apache

PushConsumer缓存

public static void main(String[] args) throws InterruptedException, MQClientException {DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("CID_JODIE_1");consumer.subscribe("Jodie_topic_1023", "*");consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);//wrong time format 2017_0422_221800consumer.setConsumeTimestamp("20170422221800");consumer.registerMessageListener(new MessageListenerConcurrently() {@Overridepublic ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {System.out.printf("%s Receive New Messages: %s %n", Thread.currentThread().getName(), msgs);return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;}});consumer.start();System.out.printf("Consumer Started.%n");    }

进入方法,建立DefaultMQPushConsumer,org.apache.rocketmq.client.consumer.DefaultMQPushConsumer#DefaultMQPushConsumer(java.lang.String)微信

public DefaultMQPushConsumer(final String consumerGroup) {//        这里采用平均散列队列算法this(consumerGroup, null, new AllocateMessageQueueAveragely());    }

进入方法,org.apache.rocketmq.client.consumer.DefaultMQPushConsumer#subscribe(java.lang.String, java.lang.String)并发

@Overridepublic void subscribe(String topic, String subExpression) throws MQClientException {this.defaultMQPushConsumerImpl.subscribe(topic, subExpression);    }

进入方法,org.apache.rocketmq.client.impl.consumer.DefaultMQPushConsumerImpl#subscribe(java.lang.String, java.lang.String)app

public void subscribe(String topic, String subExpression) throws MQClientException {try {//            构建topic订阅数据,默认集群消费=》SubscriptionData subscriptionData = FilterAPI.buildSubscriptionData(this.defaultMQPushConsumer.getConsumerGroup(),topic, subExpression);//            存储topic订阅数据=》this.rebalanceImpl.getSubscriptionInner().put(topic, subscriptionData);if (this.mQClientFactory != null) {//                同步向全部的broker发送心跳监测=》this.mQClientFactory.sendHeartbeatToAllBrokerWithLock();}} catch (Exception e) {throw new MQClientException("subscription exception", e);}    }

进入方法,构建topic订阅数据,默认集群消费,org.apache.rocketmq.common.filter.FilterAPI#buildSubscriptionData介绍过了。负载均衡

进入方法,同步向全部的broker发送心跳监测,org.apache.rocketmq.client.impl.factory.MQClientInstance#sendHeartbeatToAllBrokerWithLock介绍过了。ide

进入方法,org.apache.rocketmq.client.consumer.DefaultMQPushConsumer#startoop












public synchronized void start() throws MQClientException {switch (this.serviceState) {case CREATE_JUST:log.info("the consumer [{}] start beginning. messageModel={}, isUnitMode={}", this.defaultMQPushConsumer.getConsumerGroup(),this.defaultMQPushConsumer.getMessageModel(), this.defaultMQPushConsumer.isUnitMode());this.serviceState = ServiceState.START_FAILED;//                检查配置=》this.checkConfig();//                copy订阅配置=》this.copySubscription();if (this.defaultMQPushConsumer.getMessageModel() == MessageModel.CLUSTERING) {this.defaultMQPushConsumer.changeInstanceNameToPID();}//                建立消费者this.mQClientFactory = MQClientManager.getInstance().getAndCreateMQClientInstance(this.defaultMQPushConsumer, this.rpcHook);this.rebalanceImpl.setConsumerGroup(this.defaultMQPushConsumer.getConsumerGroup());this.rebalanceImpl.setMessageModel(this.defaultMQPushConsumer.getMessageModel());this.rebalanceImpl.setAllocateMessageQueueStrategy(this.defaultMQPushConsumer.getAllocateMessageQueueStrategy());this.rebalanceImpl.setmQClientFactory(this.mQClientFactory);this.pullAPIWrapper = new PullAPIWrapper(mQClientFactory,this.defaultMQPushConsumer.getConsumerGroup(), isUnitMode());//                注册过滤消息的钩子方法this.pullAPIWrapper.registerFilterMessageHook(filterMessageHookList);if (this.defaultMQPushConsumer.getOffsetStore() != null) {this.offsetStore = this.defaultMQPushConsumer.getOffsetStore();} else {switch (this.defaultMQPushConsumer.getMessageModel()) {case BROADCASTING://                            广播 本地存储this.offsetStore = new LocalFileOffsetStore(this.mQClientFactory, this.defaultMQPushConsumer.getConsumerGroup());break;case CLUSTERING://                            集群 远程存储this.offsetStore = new RemoteBrokerOffsetStore(this.mQClientFactory, this.defaultMQPushConsumer.getConsumerGroup());break;default:break;}this.defaultMQPushConsumer.setOffsetStore(this.offsetStore);}//                加载offset=》this.offsetStore.load();if (this.getMessageListenerInner() instanceof MessageListenerOrderly) {this.consumeOrderly = true;this.consumeMessageService =new ConsumeMessageOrderlyService(this, (MessageListenerOrderly) this.getMessageListenerInner());} else if (this.getMessageListenerInner() instanceof MessageListenerConcurrently) {this.consumeOrderly = false;this.consumeMessageService =new ConsumeMessageConcurrentlyService(this, (MessageListenerConcurrently) this.getMessageListenerInner());}//                消息消息服务启动=》this.consumeMessageService.start();boolean registerOK = mQClientFactory.registerConsumer(this.defaultMQPushConsumer.getConsumerGroup(), this);if (!registerOK) {this.serviceState = ServiceState.CREATE_JUST;this.consumeMessageService.shutdown();throw new MQClientException("The consumer group[" + this.defaultMQPushConsumer.getConsumerGroup()+ "] has been created before, specify another name please." + FAQUrl.suggestTodo(FAQUrl.GROUP_NAME_DUPLICATE_URL),null);}mQClientFactory.start();log.info("the consumer [{}] start OK.", this.defaultMQPushConsumer.getConsumerGroup());this.serviceState = ServiceState.RUNNING;break;case RUNNING:case START_FAILED:case SHUTDOWN_ALREADY:throw new MQClientException("The PushConsumer service state not OK, maybe started once, "+ this.serviceState+ FAQUrl.suggestTodo(FAQUrl.CLIENT_SERVICE_NOT_OK),null);default:break;}this.updateTopicSubscribeInfoWhenSubscriptionChanged();this.mQClientFactory.checkClientInBroker();this.mQClientFactory.sendHeartbeatToAllBrokerWithLock();this.mQClientFactory.rebalanceImmediately();    }

进入方法,检查配置,org.apache.rocketmq.client.impl.consumer.DefaultMQPushConsumerImpl#checkConfig




















private void checkConfig() throws MQClientException {Validators.checkGroup(this.defaultMQPushConsumer.getConsumerGroup());if (null == this.defaultMQPushConsumer.getConsumerGroup()) {throw new MQClientException("consumerGroup is null"+ FAQUrl.suggestTodo(FAQUrl.CLIENT_PARAMETER_CHECK_URL),null);}//        消费组名称不能和DEFAULT_CONSUMER同样if (this.defaultMQPushConsumer.getConsumerGroup().equals(MixAll.DEFAULT_CONSUMER_GROUP)) {throw new MQClientException("consumerGroup can not equal "+ MixAll.DEFAULT_CONSUMER_GROUP+ ", please specify another one."+ FAQUrl.suggestTodo(FAQUrl.CLIENT_PARAMETER_CHECK_URL),null);}//        必须指定消息消费模式,默认集群消费if (null == this.defaultMQPushConsumer.getMessageModel()) {throw new MQClientException("messageModel is null"+ FAQUrl.suggestTodo(FAQUrl.CLIENT_PARAMETER_CHECK_URL),null);}//        默认从最后的offset消费if (null == this.defaultMQPushConsumer.getConsumeFromWhere()) {throw new MQClientException("consumeFromWhere is null"+ FAQUrl.suggestTodo(FAQUrl.CLIENT_PARAMETER_CHECK_URL),null);}//        默认消费时间半小时以前Date dt = UtilAll.parseDate(this.defaultMQPushConsumer.getConsumeTimestamp(), UtilAll.YYYYMMDDHHMMSS);if (null == dt) {throw new MQClientException("consumeTimestamp is invalid, the valid format is yyyyMMddHHmmss,but received "+ this.defaultMQPushConsumer.getConsumeTimestamp()+ " " + FAQUrl.suggestTodo(FAQUrl.CLIENT_PARAMETER_CHECK_URL), null);}// allocateMessageQueueStrategy 能够执行消息分配算法if (null == this.defaultMQPushConsumer.getAllocateMessageQueueStrategy()) {throw new MQClientException("allocateMessageQueueStrategy is null"+ FAQUrl.suggestTodo(FAQUrl.CLIENT_PARAMETER_CHECK_URL),null);}// subscriptionif (null == this.defaultMQPushConsumer.getSubscription()) {throw new MQClientException("subscription is null"+ FAQUrl.suggestTodo(FAQUrl.CLIENT_PARAMETER_CHECK_URL),null);}// messageListenerif (null == this.defaultMQPushConsumer.getMessageListener()) {throw new MQClientException("messageListener is null"+ FAQUrl.suggestTodo(FAQUrl.CLIENT_PARAMETER_CHECK_URL),null);}//        书否顺序消费boolean orderly = this.defaultMQPushConsumer.getMessageListener() instanceof MessageListenerOrderly;//        是否集群消费boolean concurrently = this.defaultMQPushConsumer.getMessageListener() instanceof MessageListenerConcurrently;if (!orderly && !concurrently) {throw new MQClientException("messageListener must be instanceof MessageListenerOrderly or MessageListenerConcurrently"+ FAQUrl.suggestTodo(FAQUrl.CLIENT_PARAMETER_CHECK_URL),null);}// consumeThreadMin 消费最小线程数大于1小于1000,默认20if (this.defaultMQPushConsumer.getConsumeThreadMin() < 1|| this.defaultMQPushConsumer.getConsumeThreadMin() > 1000) {throw new MQClientException("consumeThreadMin Out of range [1, 1000]"+ FAQUrl.suggestTodo(FAQUrl.CLIENT_PARAMETER_CHECK_URL),null);}// consumeThreadMax 消费最大线程数大于1小于1000,默认64if (this.defaultMQPushConsumer.getConsumeThreadMax() < 1 || this.defaultMQPushConsumer.getConsumeThreadMax() > 1000) {throw new MQClientException("consumeThreadMax Out of range [1, 1000]"+ FAQUrl.suggestTodo(FAQUrl.CLIENT_PARAMETER_CHECK_URL),null);}// consumeThreadMin can't be larger than consumeThreadMax 消费最小线程数不能大于最大线程数if (this.defaultMQPushConsumer.getConsumeThreadMin() > this.defaultMQPushConsumer.getConsumeThreadMax()) {throw new MQClientException("consumeThreadMin (" + this.defaultMQPushConsumer.getConsumeThreadMin() + ") "+ "is larger than consumeThreadMax (" + this.defaultMQPushConsumer.getConsumeThreadMax() + ")",null);}// consumeConcurrentlyMaxSpan 并发消费最大 大于1小于65535,默认2000if (this.defaultMQPushConsumer.getConsumeConcurrentlyMaxSpan() < 1|| this.defaultMQPushConsumer.getConsumeConcurrentlyMaxSpan() > 65535) {throw new MQClientException("consumeConcurrentlyMaxSpan Out of range [1, 65535]"+ FAQUrl.suggestTodo(FAQUrl.CLIENT_PARAMETER_CHECK_URL),null);}// pullThresholdForQueue 拉取消息批量大小 大于1小于65536 默认1000if (this.defaultMQPushConsumer.getPullThresholdForQueue() < 1 || this.defaultMQPushConsumer.getPullThresholdForQueue() > 65535) {throw new MQClientException("pullThresholdForQueue Out of range [1, 65535]"+ FAQUrl.suggestTodo(FAQUrl.CLIENT_PARAMETER_CHECK_URL),null);}// pullThresholdForTopicif (this.defaultMQPushConsumer.getPullThresholdForTopic() != -1) {if (this.defaultMQPushConsumer.getPullThresholdForTopic() < 1 || this.defaultMQPushConsumer.getPullThresholdForTopic() > 6553500) {throw new MQClientException("pullThresholdForTopic Out of range [1, 6553500]"+ FAQUrl.suggestTodo(FAQUrl.CLIENT_PARAMETER_CHECK_URL),null);}}// pullThresholdSizeForQueue 拉取消息批量大小 大于1M小于1G,默认100if (this.defaultMQPushConsumer.getPullThresholdSizeForQueue() < 1 || this.defaultMQPushConsumer.getPullThresholdSizeForQueue() > 1024) {throw new MQClientException("pullThresholdSizeForQueue Out of range [1, 1024]"+ FAQUrl.suggestTodo(FAQUrl.CLIENT_PARAMETER_CHECK_URL),null);}if (this.defaultMQPushConsumer.getPullThresholdSizeForTopic() != -1) {// pullThresholdSizeForTopicif (this.defaultMQPushConsumer.getPullThresholdSizeForTopic() < 1 || this.defaultMQPushConsumer.getPullThresholdSizeForTopic() > 102400) {throw new MQClientException("pullThresholdSizeForTopic Out of range [1, 102400]"+ FAQUrl.suggestTodo(FAQUrl.CLIENT_PARAMETER_CHECK_URL),null);}}// pullInterval 拉取消息频次 大于0小于65535 默认0if (this.defaultMQPushConsumer.getPullInterval() < 0 || this.defaultMQPushConsumer.getPullInterval() > 65535) {throw new MQClientException("pullInterval Out of range [0, 65535]"+ FAQUrl.suggestTodo(FAQUrl.CLIENT_PARAMETER_CHECK_URL),null);}// consumeMessageBatchMaxSize 消费消息批量最大 大于1小于1024 默认1if (this.defaultMQPushConsumer.getConsumeMessageBatchMaxSize() < 1|| this.defaultMQPushConsumer.getConsumeMessageBatchMaxSize() > 1024) {throw new MQClientException("consumeMessageBatchMaxSize Out of range [1, 1024]"+ FAQUrl.suggestTodo(FAQUrl.CLIENT_PARAMETER_CHECK_URL),null);}// pullBatchSize 批量拉取大小 大于1小于1024,默认32if (this.defaultMQPushConsumer.getPullBatchSize() < 1 || this.defaultMQPushConsumer.getPullBatchSize() > 1024) {throw new MQClientException("pullBatchSize Out of range [1, 1024]"+ FAQUrl.suggestTodo(FAQUrl.CLIENT_PARAMETER_CHECK_URL),null);}    }

返回方法,copy订阅配置,org.apache.rocketmq.client.impl.consumer.DefaultMQPushConsumerImpl#copySubscription


private void copySubscription() throws MQClientException {try {Map<String, String> sub = this.defaultMQPushConsumer.getSubscription();if (sub != null) {for (final Map.Entry<String, String> entry : sub.entrySet()) {final String topic = entry.getKey();final String subString = entry.getValue();//                    构建订阅配置=》SubscriptionData subscriptionData = FilterAPI.buildSubscriptionData(this.defaultMQPushConsumer.getConsumerGroup(),topic, subString);//                    存储订阅配置=》this.rebalanceImpl.getSubscriptionInner().put(topic, subscriptionData);}}if (null == this.messageListenerInner) {this.messageListenerInner = this.defaultMQPushConsumer.getMessageListener();}switch (this.defaultMQPushConsumer.getMessageModel()) {//                广播case BROADCASTING:break;//                    集群消费case CLUSTERING://                    重试topicfinal String retryTopic = MixAll.getRetryTopic(this.defaultMQPushConsumer.getConsumerGroup());//                    构建重试topic订阅配置SubscriptionData subscriptionData = FilterAPI.buildSubscriptionData(this.defaultMQPushConsumer.getConsumerGroup(),retryTopic, SubscriptionData.SUB_ALL);//                    存储订阅配置=》this.rebalanceImpl.getSubscriptionInner().put(retryTopic, subscriptionData);break;default:break;}} catch (Exception e) {throw new MQClientException("subscription exception", e);}    }

返回方法,消息消息服务启动,org.apache.rocketmq.client.impl.consumer.ConsumeMessageConcurrentlyService#start

public void start() {this.cleanExpireMsgExecutors.scheduleAtFixedRate(new Runnable() {@Overridepublic void run() {//                清除过时的消息=》cleanExpireMsg();}//          15min 消费超时}, this.defaultMQPushConsumer.getConsumeTimeout(), this.defaultMQPushConsumer.getConsumeTimeout(), TimeUnit.MINUTES);    }

进入方法,清除过时的消息,org.apache.rocketmq.client.impl.consumer.ConsumeMessageConcurrentlyService#cleanExpireMsg

private void cleanExpireMsg() {Iterator<Map.Entry<MessageQueue, ProcessQueue>> it =this.defaultMQPushConsumerImpl.getRebalanceImpl().getProcessQueueTable().entrySet().iterator();while (it.hasNext()) {Map.Entry<MessageQueue, ProcessQueue> next = it.next();ProcessQueue pq = next.getValue();//            =》pq.cleanExpiredMsg(this.defaultMQPushConsumer);}    }

进入方法,org.apache.rocketmq.client.impl.consumer.ProcessQueue#cleanExpiredMsg




public void cleanExpiredMsg(DefaultMQPushConsumer pushConsumer) {if (pushConsumer.getDefaultMQPushConsumerImpl().isConsumeOrderly()) {return;}int loop = msgTreeMap.size() < 16 ? msgTreeMap.size() : 16;for (int i = 0; i < loop; i++) {MessageExt msg = null;try {this.lockTreeMap.readLock().lockInterruptibly();try {if (!msgTreeMap.isEmpty() && System.currentTimeMillis() - Long.parseLong(MessageAccessor.getConsumeStartTimeStamp(msgTreeMap.firstEntry().getValue())) > pushConsumer.getConsumeTimeout() * 60 * 1000) {msg = msgTreeMap.firstEntry().getValue();} else {break;}} finally {this.lockTreeMap.readLock().unlock();}} catch (InterruptedException e) {log.error("getExpiredMsg exception", e);}try {//                发送消息=》pushConsumer.sendMessageBack(msg, 3);log.info("send expire msg back. topic={}, msgId={}, storeHost={}, queueId={}, queueOffset={}", msg.getTopic(), msg.getMsgId(), msg.getStoreHost(), msg.getQueueId(), msg.getQueueOffset());try {this.lockTreeMap.writeLock().lockInterruptibly();try {if (!msgTreeMap.isEmpty() && msg.getQueueOffset() == msgTreeMap.firstKey()) {try {removeMessage(Collections.singletonList(msg));} catch (Exception e) {log.error("send expired msg exception", e);}}} finally {this.lockTreeMap.writeLock().unlock();}} catch (InterruptedException e) {log.error("getExpiredMsg exception", e);}} catch (Exception e) {log.error("send expired msg exception", e);}}    }

进入方法,发送消息,org.apache.rocketmq.client.consumer.DefaultMQPushConsumer#sendMessageBack(org.apache.rocketmq.common.message.MessageExt, int)

@Overridepublic void sendMessageBack(MessageExt msg, int delayLevel)throws RemotingException, MQBrokerException, InterruptedException, MQClientException {this.defaultMQPushConsumerImpl.sendMessageBack(msg, delayLevel, null);    }

进入方法,org.apache.rocketmq.client.impl.consumer.DefaultMQPushConsumerImpl#sendMessageBack




public void sendMessageBack(MessageExt msg, int delayLevel, final String brokerName)throws RemotingException, MQBrokerException, InterruptedException, MQClientException {try {//            按brokerName找到master broker地址String brokerAddr = (null != brokerName) ? this.mQClientFactory.findBrokerAddressInPublish(brokerName): RemotingHelper.parseSocketAddressAddr(msg.getStoreHost());//            =》this.mQClientFactory.getMQClientAPIImpl().consumerSendMessageBack(brokerAddr, msg,//                最大消费次数,默认16this.defaultMQPushConsumer.getConsumerGroup(), delayLevel, 5000, getMaxReconsumeTimes());} catch (Exception e) {log.error("sendMessageBack Exception, " + this.defaultMQPushConsumer.getConsumerGroup(), e);//            建立重试topic消息Message newMsg = new Message(MixAll.getRetryTopic(this.defaultMQPushConsumer.getConsumerGroup()), msg.getBody());String originMsgId = MessageAccessor.getOriginMessageId(msg);MessageAccessor.setOriginMessageId(newMsg, UtilAll.isBlank(originMsgId) ? msg.getMsgId() : originMsgId);newMsg.setFlag(msg.getFlag());MessageAccessor.setProperties(newMsg, msg.getProperties());MessageAccessor.putProperty(newMsg, MessageConst.PROPERTY_RETRY_TOPIC, msg.getTopic());MessageAccessor.setReconsumeTime(newMsg, String.valueOf(msg.getReconsumeTimes() + 1));MessageAccessor.setMaxReconsumeTimes(newMsg, String.valueOf(getMaxReconsumeTimes()));newMsg.setDelayTimeLevel(3 + msg.getReconsumeTimes());//            发送消息=》this.mQClientFactory.getDefaultMQProducer().send(newMsg);}    }

进入方法,org.apache.rocketmq.client.impl.MQClientAPIImpl#consumerSendMessageBack



public void consumerSendMessageBack(final String addr,final MessageExt msg,final String consumerGroup,final int delayLevel,final long timeoutMillis,final int maxConsumeRetryTimes) throws RemotingException, MQBrokerException, InterruptedException {ConsumerSendMsgBackRequestHeader requestHeader = new ConsumerSendMsgBackRequestHeader();RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.CONSUMER_SEND_MSG_BACK, requestHeader);requestHeader.setGroup(consumerGroup);requestHeader.setOriginTopic(msg.getTopic());requestHeader.setOffset(msg.getCommitLogOffset());requestHeader.setDelayLevel(delayLevel);requestHeader.setOriginMsgId(msg.getMsgId());requestHeader.setMaxReconsumeTimes(maxConsumeRetryTimes);//        同步执行RemotingCommand response = this.remotingClient.invokeSync(MixAll.brokerVIPChannel(this.clientConfig.isVipChannelEnabled(), addr),request, timeoutMillis);assert response != null;switch (response.getCode()) {case ResponseCode.SUCCESS: {return;}default:break;}throw new MQBrokerException(response.getCode(), response.getRemark());    }

返回方法,发送消息,org.apache.rocketmq.client.producer.DefaultMQProducer#send(org.apache.rocketmq.common.message.Message)

@Overridepublic SendResult send(Message msg) throws MQClientException, RemotingException, MQBrokerException, InterruptedException {//        =》return this.defaultMQProducerImpl.send(msg);    }

进入方法,org.apache.rocketmq.client.impl.producer.DefaultMQProducerImpl#send(org.apache.rocketmq.common.message.Message)介绍过了。

返回方法,org.apache.rocketmq.client.impl.consumer.ConsumeMessageOrderlyService#start

public void start() {if (MessageModel.CLUSTERING.equals(ConsumeMessageOrderlyService.this.defaultMQPushConsumerImpl.messageModel())) {this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {@Overridepublic void run() {//                    锁定消费队列=》ConsumeMessageOrderlyService.this.lockMQPeriodically();}}, 1000 * 1, ProcessQueue.REBALANCE_LOCK_INTERVAL, TimeUnit.MILLISECONDS);}    }

进入方法,锁定消费队列,org.apache.rocketmq.client.impl.consumer.ConsumeMessageOrderlyService#lockMQPeriodically

public synchronized void lockMQPeriodically() {if (!this.stopped) {//            =》this.defaultMQPushConsumerImpl.getRebalanceImpl().lockAll();}    }

进入方法,org.apache.rocketmq.client.impl.consumer.RebalanceImpl#lockAll






public void lockAll() {//        按brokerName构建处理队列=》HashMap<String, Set<MessageQueue>> brokerMqs = this.buildProcessQueueTableByBrokerName();Iterator<Entry<String, Set<MessageQueue>>> it = brokerMqs.entrySet().iterator();while (it.hasNext()) {Entry<String, Set<MessageQueue>> entry = it.next();final String brokerName = entry.getKey();final Set<MessageQueue> mqs = entry.getValue();if (mqs.isEmpty())continue;//            找到broker master地址=》FindBrokerResult findBrokerResult = this.mQClientFactory.findBrokerAddressInSubscribe(brokerName, MixAll.MASTER_ID, true);if (findBrokerResult != null) {LockBatchRequestBody requestBody = new LockBatchRequestBody();requestBody.setConsumerGroup(this.consumerGroup);requestBody.setClientId(this.mQClientFactory.getClientId());requestBody.setMqSet(mqs);try {//                    批量锁定消息队列=》Set<MessageQueue> lockOKMQSet =this.mQClientFactory.getMQClientAPIImpl().lockBatchMQ(findBrokerResult.getBrokerAddr(), requestBody, 1000);for (MessageQueue mq : lockOKMQSet) {ProcessQueue processQueue = this.processQueueTable.get(mq);if (processQueue != null) {if (!processQueue.isLocked()) {log.info("the message queue locked OK, Group: {} {}", this.consumerGroup, mq);}//                            锁定处理队列processQueue.setLocked(true);processQueue.setLastLockTimestamp(System.currentTimeMillis());}}//                    消息队列锁定失败=》for (MessageQueue mq : mqs) {if (!lockOKMQSet.contains(mq)) {ProcessQueue processQueue = this.processQueueTable.get(mq);if (processQueue != null) {processQueue.setLocked(false);log.warn("the message queue locked Failed, Group: {} {}", this.consumerGroup, mq);}}}} catch (Exception e) {log.error("lockBatchMQ exception, " + mqs, e);}}}    }

进入方法,按brokerName构建处理队列,org.apache.rocketmq.client.impl.consumer.RebalanceImpl#buildProcessQueueTableByBrokerName


private HashMap<String/* brokerName */, Set<MessageQueue>> buildProcessQueueTableByBrokerName() {HashMap<String, Set<MessageQueue>> result = new HashMap<String, Set<MessageQueue>>();for (MessageQueue mq : this.processQueueTable.keySet()) {Set<MessageQueue> mqs = result.get(mq.getBrokerName());if (null == mqs) {mqs = new HashSet<MessageQueue>();result.put(mq.getBrokerName(), mqs);}mqs.add(mq);}return result;    }

返回方法,找到broker master地址,org.apache.rocketmq.client.impl.factory.MQClientInstance#findBrokerAddressInSubscribe




public FindBrokerResult findBrokerAddressInSubscribe(final String brokerName,final long brokerId,final boolean onlyThisBroker) {String brokerAddr = null;boolean slave = false;boolean found = false;//        获取broker的缓存信息HashMap<Long/* brokerId */, String/* address */> map = this.brokerAddrTable.get(brokerName);if (map != null && !map.isEmpty()) {brokerAddr = map.get(brokerId);slave = brokerId != MixAll.MASTER_ID;found = brokerAddr != null;if (!found && !onlyThisBroker) {Entry<Long, String> entry = map.entrySet().iterator().next();brokerAddr = entry.getValue();slave = entry.getKey() != MixAll.MASTER_ID;found = true;}}if (found) {return new FindBrokerResult(brokerAddr, slave, findBrokerVersion(brokerName, brokerAddr));}return null;}//  获取broker的版本信息public int findBrokerVersion(String brokerName, String brokerAddr) {if (this.brokerVersionTable.containsKey(brokerName)) {if (this.brokerVersionTable.get(brokerName).containsKey(brokerAddr)) {return this.brokerVersionTable.get(brokerName).get(brokerAddr);}}return 0;    }

返回方法,批量锁定消息队列,org.apache.rocketmq.client.impl.MQClientAPIImpl#lockBatchMQ


public Set<MessageQueue> lockBatchMQ(final String addr,final LockBatchRequestBody requestBody,final long timeoutMillis) throws RemotingException, MQBrokerException, InterruptedException {RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.LOCK_BATCH_MQ, null);request.setBody(requestBody.encode());//        同步执行=》RemotingCommand response = this.remotingClient.invokeSync(MixAll.brokerVIPChannel(this.clientConfig.isVipChannelEnabled(), addr),request, timeoutMillis);switch (response.getCode()) {case ResponseCode.SUCCESS: {LockBatchResponseBody responseBody = LockBatchResponseBody.decode(response.getBody(), LockBatchResponseBody.class);Set<MessageQueue> messageQueues = responseBody.getLockOKMQSet();return messageQueues;}default:break;}throw new MQBrokerException(response.getCode(), response.getRemark());    }

进入方法,org.apache.rocketmq.client.impl.factory.MQClientInstance#start介绍过了。

进入方法,org.apache.rocketmq.client.impl.consumer.DefaultMQPushConsumerImpl#updateTopicSubscribeInfoWhenSubscriptionChanged

private void updateTopicSubscribeInfoWhenSubscriptionChanged() {Map<String, SubscriptionData> subTable = this.getSubscriptionInner();if (subTable != null) {for (final Map.Entry<String, SubscriptionData> entry : subTable.entrySet()) {final String topic = entry.getKey();this.mQClientFactory.updateTopicRouteInfoFromNameServer(topic);}}    }

进入方法,org.apache.rocketmq.client.impl.factory.MQClientInstance#updateTopicRouteInfoFromNameServer(java.lang.String, boolean, org.apache.rocketmq.client.producer.DefaultMQProducer)介绍过了。

进入方法,org.apache.rocketmq.client.impl.factory.MQClientInstance#sendHeartbeatToAllBrokerWithLock介绍过了。

进入方法,org.apache.rocketmq.client.impl.factory.MQClientInstance#rebalanceImmediately

public void rebalanceImmediately() {this.rebalanceService.wakeup();    }

进入方法,负载均衡处理,org.apache.rocketmq.client.impl.factory.MQClientInstance#doRebalance 前面介绍过了,请翻阅前面章节。

返回方法,

org.apache.rocketmq.client.impl.consumer.DefaultMQPushConsumerImpl#start结束。

 

说在最后

本次解析仅表明我的观点,仅供参考。

 

加入技术微信群

钉钉技术群

相关文章
相关标签/搜索