说在前面
DefaultMQProducer、DefaultMQPullConsumer、DefaultMQPushConsumer 处理过程
源码解析
PullConsumer
/**
 * Demo entry point: starts a pull consumer and drains every queue of {@code TopicTest1}
 * until the broker reports {@code NO_NEW_MSG} for that queue.
 *
 * <p>The consumer is shut down in a {@code finally} block so that it is released even
 * when fetching the queues fails.
 *
 * @param args ignored
 * @throws MQClientException if the consumer cannot be started or the queues cannot be fetched
 */
public static void main(String[] args) throws MQClientException {
    DefaultMQPullConsumer consumer = new DefaultMQPullConsumer("please_rename_unique_group_name_5");
    consumer.start();
    try {
        Set<MessageQueue> mqs = consumer.fetchSubscribeMessageQueues("TopicTest1");
        for (MessageQueue mq : mqs) {
            System.out.printf("Consume from the queue: %s%n", mq);
            SINGLE_MQ:
            while (true) {
                try {
                    PullResult pullResult =
                        consumer.pullBlockIfNotFound(mq, null, getMessageQueueOffset(mq), 32);
                    System.out.printf("%s%n", pullResult);
                    // Remember where the next pull on this queue has to start.
                    putMessageQueueOffset(mq, pullResult.getNextBeginOffset());
                    switch (pullResult.getPullStatus()) {
                        case FOUND:
                            break;
                        case NO_MATCHED_MSG:
                            break;
                        case NO_NEW_MSG:
                            // Nothing left on this queue: move on to the next one.
                            break SINGLE_MQ;
                        case OFFSET_ILLEGAL:
                            break;
                        default:
                            break;
                    }
                } catch (Exception e) {
                    e.printStackTrace();
                }
            }
        }
    } finally {
        // Always release the consumer, including when fetching the queues throws.
        consumer.shutdown();
    }
}
进入方法,pullConsumer启动,org.apache.rocketmq.client.consumer.DefaultMQPullConsumer#start
@Overridepublic void start() throws MQClientException {// 服务启动=》this.defaultMQPullConsumerImpl.start(); }
进入方法,服务启动,org.apache.rocketmq.client.impl.consumer.DefaultMQPullConsumerImpl#start
public synchronized void start() throws MQClientException {switch (this.serviceState) {case CREATE_JUST:this.serviceState = ServiceState.START_FAILED;// 检查配置=》this.checkConfig();// copy topic订阅配置=》this.copySubscription();if (this.defaultMQPullConsumer.getMessageModel() == MessageModel.CLUSTERING) {this.defaultMQPullConsumer.changeInstanceNameToPID();}// 建立mqclient对象=》this.mQClientFactory = MQClientManager.getInstance().getAndCreateMQClientInstance(this.defaultMQPullConsumer, this.rpcHook);this.rebalanceImpl.setConsumerGroup(this.defaultMQPullConsumer.getConsumerGroup());this.rebalanceImpl.setMessageModel(this.defaultMQPullConsumer.getMessageModel());this.rebalanceImpl.setAllocateMessageQueueStrategy(this.defaultMQPullConsumer.getAllocateMessageQueueStrategy());this.rebalanceImpl.setmQClientFactory(this.mQClientFactory);this.pullAPIWrapper = new PullAPIWrapper(mQClientFactory,this.defaultMQPullConsumer.getConsumerGroup(), isUnitMode());// 注册过滤消息钩子方法this.pullAPIWrapper.registerFilterMessageHook(filterMessageHookList);if (this.defaultMQPullConsumer.getOffsetStore() != null) {this.offsetStore = this.defaultMQPullConsumer.getOffsetStore();} else {switch (this.defaultMQPullConsumer.getMessageModel()) {// 广播 本地存储case BROADCASTING:this.offsetStore = new LocalFileOffsetStore(this.mQClientFactory, this.defaultMQPullConsumer.getConsumerGroup());break;// 集群 远程存储case CLUSTERING:this.offsetStore = new RemoteBrokerOffsetStore(this.mQClientFactory, this.defaultMQPullConsumer.getConsumerGroup());break;default:break;}this.defaultMQPullConsumer.setOffsetStore(this.offsetStore);}// offset加载=》this.offsetStore.load();// 注册消费者=》boolean registerOK = mQClientFactory.registerConsumer(this.defaultMQPullConsumer.getConsumerGroup(), this);if (!registerOK) {this.serviceState = ServiceState.CREATE_JUST;throw new MQClientException("The consumer group[" + this.defaultMQPullConsumer.getConsumerGroup()+ "] has been created before, specify another name please." 
+ FAQUrl.suggestTodo(FAQUrl.GROUP_NAME_DUPLICATE_URL),null);}// 服务启动=》mQClientFactory.start();log.info("the consumer [{}] start OK", this.defaultMQPullConsumer.getConsumerGroup());this.serviceState = ServiceState.RUNNING;break;case RUNNING:case START_FAILED:case SHUTDOWN_ALREADY:throw new MQClientException("The PullConsumer service state not OK, maybe started once, "+ this.serviceState+ FAQUrl.suggestTodo(FAQUrl.CLIENT_SERVICE_NOT_OK),null);default:break;}}
进入方法,检查配置,org.apache.rocketmq.client.impl.consumer.DefaultMQPullConsumerImpl#checkConfig
private void checkConfig() throws MQClientException {// check consumerGroup =》Validators.checkGroup(this.defaultMQPullConsumer.getConsumerGroup());// consumerGroupif (null == this.defaultMQPullConsumer.getConsumerGroup()) {throw new MQClientException("consumerGroup is null"+ FAQUrl.suggestTodo(FAQUrl.CLIENT_PARAMETER_CHECK_URL),null);}// consumerGroup 消费组名称不能和DEFAULT_CONSUMER同样if (this.defaultMQPullConsumer.getConsumerGroup().equals(MixAll.DEFAULT_CONSUMER_GROUP)) {throw new MQClientException("consumerGroup can not equal "+ MixAll.DEFAULT_CONSUMER_GROUP+ ", please specify another one."+ FAQUrl.suggestTodo(FAQUrl.CLIENT_PARAMETER_CHECK_URL),null);}// messageModel 消费模式不能为空,默认集群消费if (null == this.defaultMQPullConsumer.getMessageModel()) {throw new MQClientException("messageModel is null"+ FAQUrl.suggestTodo(FAQUrl.CLIENT_PARAMETER_CHECK_URL),null);}// allocateMessageQueueStrategy 默认平均散列队列算法if (null == this.defaultMQPullConsumer.getAllocateMessageQueueStrategy()) {throw new MQClientException("allocateMessageQueueStrategy is null"+ FAQUrl.suggestTodo(FAQUrl.CLIENT_PARAMETER_CHECK_URL),null);}// allocateMessageQueueStrategy 30s超时if (this.defaultMQPullConsumer.getConsumerTimeoutMillisWhenSuspend() < this.defaultMQPullConsumer.getBrokerSuspendMaxTimeMillis()) {throw new MQClientException("Long polling mode, the consumer consumerTimeoutMillisWhenSuspend must greater than brokerSuspendMaxTimeMillis"+ FAQUrl.suggestTodo(FAQUrl.CLIENT_PARAMETER_CHECK_URL),null);} }
进入方法, copy topic订阅配置,org.apache.rocketmq.client.impl.consumer.DefaultMQPullConsumerImpl#copySubscription
private void copySubscription() throws MQClientException {try {// 全部注册的topicSet<String> registerTopics = this.defaultMQPullConsumer.getRegisterTopics();if (registerTopics != null) {for (final String topic : registerTopics) {// 构建topic订阅配置=》SubscriptionData subscriptionData = FilterAPI.buildSubscriptionData(this.defaultMQPullConsumer.getConsumerGroup(),topic, SubscriptionData.SUB_ALL);// 存储topic订阅配置=》this.rebalanceImpl.getSubscriptionInner().put(topic, subscriptionData);}}} catch (Exception e) {throw new MQClientException("subscription exception", e);} }
进入方法,构建topic订阅配置,org.apache.rocketmq.common.filter.FilterAPI#buildSubscriptionData
public static SubscriptionData buildSubscriptionData(final String consumerGroup, String topic,String subString) throws Exception {SubscriptionData subscriptionData = new SubscriptionData();subscriptionData.setTopic(topic);subscriptionData.setSubString(subString);// 订阅全部的topicif (null == subString || subString.equals(SubscriptionData.SUB_ALL) || subString.length() == 0) {subscriptionData.setSubString(SubscriptionData.SUB_ALL);} else {// 解析tags,多个tag用|分开String[] tags = subString.split("\\|\\|");if (tags.length > 0) {for (String tag : tags) {if (tag.length() > 0) {String trimString = tag.trim();if (trimString.length() > 0) {// 解析订阅信息中的tagsubscriptionData.getTagsSet().add(trimString);subscriptionData.getCodeSet().add(trimString.hashCode());}}}} else {throw new Exception("subString split error");}}return subscriptionData; }
返回方法,建立mqclient对象,org.apache.rocketmq.client.impl.MQClientManager#getAndCreateMQClientInstance(org.apache.rocketmq.client.ClientConfig, org.apache.rocketmq.remoting.RPCHook)
public MQClientInstance getAndCreateMQClientInstance(final ClientConfig clientConfig, RPCHook rpcHook) {String clientId = clientConfig.buildMQClientId();// 从本地缓存中获取client对象,简单的通常会concurrentHashMap当本地缓存,性能很高MQClientInstance instance = this.factoryTable.get(clientId);if (null == instance) {instance =new MQClientInstance(clientConfig.cloneClientConfig(),this.factoryIndexGenerator.getAndIncrement(), clientId, rpcHook);MQClientInstance prev = this.factoryTable.putIfAbsent(clientId, instance);if (prev != null) {instance = prev;log.warn("Returned Previous MQClientInstance for clientId:[{}]", clientId);} else {log.info("Created new MQClientInstance for clientId:[{}]", clientId);}}return instance; }
返回方法,offset加载,org.apache.rocketmq.client.consumer.store.LocalFileOffsetStore#load
@Overridepublic void load() throws MQClientException {// 读取本地的offset=》OffsetSerializeWrapper offsetSerializeWrapper = this.readLocalOffset();if (offsetSerializeWrapper != null && offsetSerializeWrapper.getOffsetTable() != null) {offsetTable.putAll(offsetSerializeWrapper.getOffsetTable());for (MessageQueue mq : offsetSerializeWrapper.getOffsetTable().keySet()) {AtomicLong offset = offsetSerializeWrapper.getOffsetTable().get(mq);log.info("load consumer's offset, {} {} {}",this.groupName,mq,offset.get());}} }
进入方法,org.apache.rocketmq.client.consumer.store.LocalFileOffsetStore#readLocalOffset
private OffsetSerializeWrapper readLocalOffset() throws MQClientException {String content = null;try {content = MixAll.file2String(this.storePath);} catch (IOException e) {log.warn("Load local offset store file exception", e);}// 若是文件中读取不到,从备份文件中读取=》if (null == content || content.length() == 0) {return this.readLocalOffsetBak();} else {OffsetSerializeWrapper offsetSerializeWrapper = null;try {offsetSerializeWrapper =OffsetSerializeWrapper.fromJson(content, OffsetSerializeWrapper.class);} catch (Exception e) {log.warn("readLocalOffset Exception, and try to correct", e);return this.readLocalOffsetBak();}return offsetSerializeWrapper;} }
返回方法,注册消费者,org.apache.rocketmq.client.impl.factory.MQClientInstance#registerConsumer
/**
 * Registers a consumer under its group name.
 *
 * @param group consumer group name
 * @param consumer consumer to register
 * @return {@code true} if registered; {@code false} if either argument is {@code null}
 *     or a consumer is already registered for {@code group}
 */
public boolean registerConsumer(final String group, final MQConsumerInner consumer) {
    if (group == null || consumer == null) {
        return false;
    }

    final boolean registered = this.consumerTable.putIfAbsent(group, consumer) == null;
    if (!registered) {
        log.warn("the consumer group[" + group + "] exist already.");
    }
    return registered;
}
进入方法,服务启动,org.apache.rocketmq.client.impl.factory.MQClientInstance#start 前面介绍过了,可以翻阅前面的章节。
进入方法,按topic找到订阅的消费队列,org.apache.rocketmq.client.consumer.DefaultMQPullConsumer#fetchSubscribeMessageQueues
/**
 * Returns the message queues of {@code topic} by delegating to the implementation object.
 *
 * @param topic topic name
 * @return the message queues reported for the topic
 * @throws MQClientException propagated from the implementation
 */
@Override
public Set<MessageQueue> fetchSubscribeMessageQueues(String topic) throws MQClientException {
    final Set<MessageQueue> queues = this.defaultMQPullConsumerImpl.fetchSubscribeMessageQueues(topic);
    return queues;
}
进入方法,org.apache.rocketmq.client.impl.consumer.DefaultMQPullConsumerImpl#fetchSubscribeMessageQueues
/**
 * Checks the service state, then asks the admin component of the client instance for the
 * message queues of {@code topic}.
 *
 * @param topic topic name
 * @return the message queues reported for the topic
 * @throws MQClientException propagated from the state check or the lookup
 */
public Set<MessageQueue> fetchSubscribeMessageQueues(String topic) throws MQClientException {
    this.makeSureStateOK();
    final Set<MessageQueue> queues =
        this.mQClientFactory.getMQAdminImpl().fetchSubscribeMessageQueues(topic);
    return queues;
}
进入方法,org.apache.rocketmq.client.impl.MQAdminImpl#fetchSubscribeMessageQueues
public Set<MessageQueue> fetchSubscribeMessageQueues(String topic) throws MQClientException {try {// 找到topic的topic路由信息TopicRouteData topicRouteData = this.mQClientFactory.getMQClientAPIImpl().getTopicRouteInfoFromNameServer(topic, timeoutMillis);if (topicRouteData != null) {Set<MessageQueue> mqList = MQClientInstance.topicRouteData2TopicSubscribeInfo(topic, topicRouteData);if (!mqList.isEmpty()) {return mqList;} else {throw new MQClientException("Can not find Message Queue for this topic, " + topic + " Namesrv return empty", null);}}} catch (Exception e) {throw new MQClientException("Can not find Message Queue for this topic, " + topic + FAQUrl.suggestTodo(FAQUrl.MQLIST_NOT_EXIST),e);}throw new MQClientException("Unknow why, Can not find Message Queue for this topic, " + topic, null); }
进入方法,拉取消息,org.apache.rocketmq.client.consumer.DefaultMQPullConsumer#pullBlockIfNotFound(org.apache.rocketmq.common.message.MessageQueue, java.lang.String, long, int)
@Overridepublic PullResult pullBlockIfNotFound(MessageQueue mq, String subExpression, long offset, int maxNums)throws MQClientException, RemotingException, MQBrokerException, InterruptedException {// 拉取消息,若是没找到阻塞=》return this.defaultMQPullConsumerImpl.pullBlockIfNotFound(mq, subExpression, offset, maxNums); }
进入方法,拉取消息,若是没找到阻塞,org.apache.rocketmq.client.impl.consumer.DefaultMQPullConsumerImpl#pullBlockIfNotFound(org.apache.rocketmq.common.message.MessageQueue, java.lang.String, long, int)
public PullResult pullBlockIfNotFound(MessageQueue mq, String subExpression, long offset, int maxNums)throws MQClientException, RemotingException, MQBrokerException, InterruptedException {// =》return this.pullSyncImpl(mq, subExpression, offset, maxNums, true, this.getDefaultMQPullConsumer().getConsumerPullTimeoutMillis()); }
进入方法,org.apache.rocketmq.client.impl.consumer.DefaultMQPullConsumerImpl#pullSyncImpl
private PullResult pullSyncImpl(MessageQueue mq, String subExpression, long offset, int maxNums, boolean block,long timeout)throws MQClientException, RemotingException, MQBrokerException, InterruptedException {this.makeSureStateOK();if (null == mq) {throw new MQClientException("mq is null", null);}if (offset < 0) {throw new MQClientException("offset < 0", null);}if (maxNums <= 0) {throw new MQClientException("maxNums <= 0", null);}// 自动订阅=》this.subscriptionAutomatically(mq.getTopic());int sysFlag = PullSysFlag.buildSysFlag(false, block, true, false);SubscriptionData subscriptionData;try {// 构建订阅数据=》subscriptionData = FilterAPI.buildSubscriptionData(this.defaultMQPullConsumer.getConsumerGroup(),mq.getTopic(), subExpression);} catch (Exception e) {throw new MQClientException("parse subscription error", e);}// 30s超时long timeoutMillis = block ? this.defaultMQPullConsumer.getConsumerTimeoutMillisWhenSuspend() : timeout;// 拉取消息=》PullResult pullResult = this.pullAPIWrapper.pullKernelImpl(mq,subscriptionData.getSubString(),0L,offset,maxNums,sysFlag,0,this.defaultMQPullConsumer.getBrokerSuspendMaxTimeMillis(),timeoutMillis,CommunicationMode.SYNC,null);// 处理拉取消息请求=》this.pullAPIWrapper.processPullResult(mq, pullResult, subscriptionData);if (!this.consumeMessageHookList.isEmpty()) {ConsumeMessageContext consumeMessageContext = null;consumeMessageContext = new ConsumeMessageContext();consumeMessageContext.setConsumerGroup(this.groupName());consumeMessageContext.setMq(mq);consumeMessageContext.setMsgList(pullResult.getMsgFoundList());consumeMessageContext.setSuccess(false);this.executeHookBefore(consumeMessageContext);consumeMessageContext.setStatus(ConsumeConcurrentlyStatus.CONSUME_SUCCESS.toString());consumeMessageContext.setSuccess(true);// 执行消费消息后的钩子方法this.executeHookAfter(consumeMessageContext);}return pullResult; }
进入方法,自动订阅,org.apache.rocketmq.client.impl.consumer.DefaultMQPullConsumerImpl#subscriptionAutomatically
public void subscriptionAutomatically(final String topic) {if (!this.rebalanceImpl.getSubscriptionInner().containsKey(topic)) {try {// 构建订阅数据=》SubscriptionData subscriptionData = FilterAPI.buildSubscriptionData(this.defaultMQPullConsumer.getConsumerGroup(),topic, SubscriptionData.SUB_ALL);this.rebalanceImpl.subscriptionInner.putIfAbsent(topic, subscriptionData);} catch (Exception ignore) {}} }
进入方法,构建订阅数据,org.apache.rocketmq.common.filter.FilterAPI#buildSubscriptionData介绍过了。
进入方法,拉取消息,org.apache.rocketmq.client.impl.consumer.PullAPIWrapper#pullKernelImpl(org.apache.rocketmq.common.message.MessageQueue, java.lang.String, long, long, int, int, long, long, long, org.apache.rocketmq.client.impl.CommunicationMode, org.apache.rocketmq.client.consumer.PullCallback)
public PullResult pullKernelImpl(final MessageQueue mq,final String subExpression,final long subVersion,final long offset,final int maxNums,final int sysFlag,final long commitOffset,final long brokerSuspendMaxTimeMillis,final long timeoutMillis,final CommunicationMode communicationMode,final PullCallback pullCallback) throws MQClientException, RemotingException, MQBrokerException, InterruptedException {// =》return pullKernelImpl(mq,subExpression,ExpressionType.TAG,subVersion, offset,maxNums,sysFlag,commitOffset,brokerSuspendMaxTimeMillis,timeoutMillis,communicationMode,pullCallback); }
进入方法,org.apache.rocketmq.client.impl.consumer.PullAPIWrapper#pullKernelImpl(org.apache.rocketmq.common.message.MessageQueue, java.lang.String, java.lang.String, long, long, int, int, long, long, long, org.apache.rocketmq.client.impl.CommunicationMode, org.apache.rocketmq.client.consumer.PullCallback)
public PullResult pullKernelImpl(final MessageQueue mq,final String subExpression,final String expressionType,final long subVersion,final long offset,final int maxNums,final int sysFlag,final long commitOffset,final long brokerSuspendMaxTimeMillis,final long timeoutMillis,final CommunicationMode communicationMode,final PullCallback pullCallback) throws MQClientException, RemotingException, MQBrokerException, InterruptedException {// 查询broker=》FindBrokerResult findBrokerResult =this.mQClientFactory.findBrokerAddressInSubscribe(mq.getBrokerName(),this.recalculatePullFromWhichNode(mq), false);if (null == findBrokerResult) {// 从namesrv更新topic路由=》this.mQClientFactory.updateTopicRouteInfoFromNameServer(mq.getTopic());// 查询broker=》findBrokerResult =this.mQClientFactory.findBrokerAddressInSubscribe(mq.getBrokerName(),this.recalculatePullFromWhichNode(mq), false);}if (findBrokerResult != null) {{// check version 不是tag表达式类型if (!ExpressionType.isTagType(expressionType)&& findBrokerResult.getBrokerVersion() < MQVersion.Version.V4_1_0_SNAPSHOT.ordinal()) {throw new MQClientException("The broker[" + mq.getBrokerName() + ", "+ findBrokerResult.getBrokerVersion() + "] does not upgrade to support for filter message by " + expressionType, null);}}int sysFlagInner = sysFlag;if (findBrokerResult.isSlave()) {sysFlagInner = PullSysFlag.clearCommitOffsetFlag(sysFlagInner);}PullMessageRequestHeader requestHeader = new PullMessageRequestHeader();requestHeader.setConsumerGroup(this.consumerGroup);requestHeader.setTopic(mq.getTopic());requestHeader.setQueueId(mq.getQueueId());requestHeader.setQueueOffset(offset);requestHeader.setMaxMsgNums(maxNums);requestHeader.setSysFlag(sysFlagInner);requestHeader.setCommitOffset(commitOffset);requestHeader.setSuspendTimeoutMillis(brokerSuspendMaxTimeMillis);requestHeader.setSubscription(subExpression);requestHeader.setSubVersion(subVersion);requestHeader.setExpressionType(expressionType);String brokerAddr = findBrokerResult.getBrokerAddr();if 
(PullSysFlag.hasClassFilterFlag(sysFlagInner)) {// 找到拉取消息的broker=》brokerAddr = computPullFromWhichFilterServer(mq.getTopic(), brokerAddr);}// 拉取消息=》PullResult pullResult = this.mQClientFactory.getMQClientAPIImpl().pullMessage(brokerAddr,requestHeader,timeoutMillis,communicationMode,pullCallback);return pullResult;}throw new MQClientException("The broker[" + mq.getBrokerName() + "] not exist", null); }
进入方法,查询broker,org.apache.rocketmq.client.impl.factory.MQClientInstance#findBrokerAddressInSubscribe
public FindBrokerResult findBrokerAddressInSubscribe(final String brokerName,final long brokerId,final boolean onlyThisBroker) {String brokerAddr = null;boolean slave = false;boolean found = false;// 获取broker的缓存信息HashMap<Long/* brokerId */, String/* address */> map = this.brokerAddrTable.get(brokerName);if (map != null && !map.isEmpty()) {brokerAddr = map.get(brokerId);slave = brokerId != MixAll.MASTER_ID;found = brokerAddr != null;if (!found && !onlyThisBroker) {Entry<Long, String> entry = map.entrySet().iterator().next();brokerAddr = entry.getValue();slave = entry.getKey() != MixAll.MASTER_ID;found = true;}}if (found) {return new FindBrokerResult(brokerAddr, slave, findBrokerVersion(brokerName, brokerAddr));}return null;}// 获取broker的版本信息public int findBrokerVersion(String brokerName, String brokerAddr) {if (this.brokerVersionTable.containsKey(brokerName)) {if (this.brokerVersionTable.get(brokerName).containsKey(brokerAddr)) {return this.brokerVersionTable.get(brokerName).get(brokerAddr);}}return 0; }
返回方法,从namesrv更新topic路由,org.apache.rocketmq.client.impl.factory.MQClientInstance#updateTopicRouteInfoFromNameServer(java.lang.String, boolean, org.apache.rocketmq.client.producer.DefaultMQProducer)介绍过了。
返回方法,查询broker,org.apache.rocketmq.client.impl.factory.MQClientInstance#findBrokerAddressInSubscribe介绍过了。
返回方法,找到拉取消息的broker,org.apache.rocketmq.client.impl.consumer.PullAPIWrapper#computPullFromWhichFilterServer
private String computPullFromWhichFilterServer(final String topic, final String brokerAddr)throws MQClientException {// 获取topic路由=》ConcurrentMap<String, TopicRouteData> topicRouteTable = this.mQClientFactory.getTopicRouteTable();if (topicRouteTable != null) {TopicRouteData topicRouteData = topicRouteTable.get(topic);List<String> list = topicRouteData.getFilterServerTable().get(brokerAddr);// 随机策略if (list != null && !list.isEmpty()) {return list.get(randomNum() % list.size());}}throw new MQClientException("Find Filter Server Failed, Broker Addr: " + brokerAddr + " topic: "+ topic, null); }
返回方法,拉取消息,org.apache.rocketmq.client.impl.MQClientAPIImpl#pullMessage
public PullResult pullMessage(final String addr,final PullMessageRequestHeader requestHeader,final long timeoutMillis,final CommunicationMode communicationMode,final PullCallback pullCallback) throws RemotingException, MQBrokerException, InterruptedException {RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.PULL_MESSAGE, requestHeader);switch (communicationMode) {case ONEWAY:assert false;return null;case ASYNC:// =》this.pullMessageAsync(addr, request, timeoutMillis, pullCallback);return null;case SYNC:// =》return this.pullMessageSync(addr, request, timeoutMillis);default:assert false;break;}return null; }
进入方法,org.apache.rocketmq.client.impl.MQClientAPIImpl#pullMessageAsync
private void pullMessageAsync(final String addr,final RemotingCommand request,final long timeoutMillis,final PullCallback pullCallback) throws RemotingException, InterruptedException {// 异步拉取消息=》this.remotingClient.invokeAsync(addr, request, timeoutMillis, new InvokeCallback() {@Overridepublic void operationComplete(ResponseFuture responseFuture) {RemotingCommand response = responseFuture.getResponseCommand();if (response != null) {try {// 处理拉取响应=》PullResult pullResult = MQClientAPIImpl.this.processPullResponse(response);assert pullResult != null;pullCallback.onSuccess(pullResult);} catch (Exception e) {pullCallback.onException(e);}} else {if (!responseFuture.isSendRequestOK()) {pullCallback.onException(new MQClientException("send request failed to " + addr + ". Request: " + request, responseFuture.getCause()));} else if (responseFuture.isTimeout()) {pullCallback.onException(new MQClientException("wait response from " + addr + " timeout :" + responseFuture.getTimeoutMillis() + "ms" + ". Request: " + request,responseFuture.getCause()));} else {pullCallback.onException(new MQClientException("unknown reason. addr: " + addr + ", timeoutMillis: " + timeoutMillis + ". Request: " + request, responseFuture.getCause()));}}}}); }
进入方法,异步拉取消息,org.apache.rocketmq.remoting.netty.NettyRemotingClient#invokeAsync介绍过了。
返回方法,处理拉取响应,org.apache.rocketmq.client.impl.MQClientAPIImpl#processPullResponse
/**
 * Converts a pull response into a {@code PullResultExt}.
 *
 * <p>Response codes map to pull statuses as follows: {@code SUCCESS} to {@code FOUND},
 * {@code PULL_NOT_FOUND} to {@code NO_NEW_MSG}, {@code PULL_RETRY_IMMEDIATELY} to
 * {@code NO_MATCHED_MSG}, {@code PULL_OFFSET_MOVED} to {@code OFFSET_ILLEGAL}.
 *
 * @param response response received for the pull request
 * @return the pull result carrying status, offsets, suggested broker id and the raw body
 * @throws MQBrokerException for any other response code
 * @throws RemotingCommandException if the response header cannot be decoded
 */
private PullResult processPullResponse(
    final RemotingCommand response) throws MQBrokerException, RemotingCommandException {
    final PullStatus status;
    switch (response.getCode()) {
        case ResponseCode.SUCCESS:
            status = PullStatus.FOUND;
            break;
        case ResponseCode.PULL_NOT_FOUND:
            status = PullStatus.NO_NEW_MSG;
            break;
        case ResponseCode.PULL_RETRY_IMMEDIATELY:
            status = PullStatus.NO_MATCHED_MSG;
            break;
        case ResponseCode.PULL_OFFSET_MOVED:
            status = PullStatus.OFFSET_ILLEGAL;
            break;
        default:
            throw new MQBrokerException(response.getCode(), response.getRemark());
    }

    final PullMessageResponseHeader header =
        (PullMessageResponseHeader) response.decodeCommandCustomHeader(PullMessageResponseHeader.class);

    return new PullResultExt(
        status,
        header.getNextBeginOffset(),
        header.getMinOffset(),
        header.getMaxOffset(),
        null,
        header.getSuggestWhichBrokerId(),
        response.getBody());
}
返回方法,处理拉取消息请求,org.apache.rocketmq.client.impl.consumer.PullAPIWrapper#processPullResult
public PullResult processPullResult(final MessageQueue mq, final PullResult pullResult,final SubscriptionData subscriptionData) {PullResultExt pullResultExt = (PullResultExt) pullResult;// 根据brokerId肯定从哪一个节点拉去消息=》this.updatePullFromWhichNode(mq, pullResultExt.getSuggestWhichBrokerId());// 若是消息没找到if (PullStatus.FOUND == pullResult.getPullStatus()) {ByteBuffer byteBuffer = ByteBuffer.wrap(pullResultExt.getMessageBinary());// 消息解码List<MessageExt> msgList = MessageDecoder.decodes(byteBuffer);List<MessageExt> msgListFilterAgain = msgList;if (!subscriptionData.getTagsSet().isEmpty() && !subscriptionData.isClassFilterMode()) {msgListFilterAgain = new ArrayList<MessageExt>(msgList.size());for (MessageExt msg : msgList) {if (msg.getTags() != null) {if (subscriptionData.getTagsSet().contains(msg.getTags())) {msgListFilterAgain.add(msg);}}}}// 是否有消息过滤钩子方法if (this.hasHook()) {FilterMessageContext filterMessageContext = new FilterMessageContext();filterMessageContext.setUnitMode(unitMode);filterMessageContext.setMsgList(msgListFilterAgain);// 执行钩子方法=》this.executeHook(filterMessageContext);}for (MessageExt msg : msgListFilterAgain) {String traFlag = msg.getProperty(MessageConst.PROPERTY_TRANSACTION_PREPARED);if (traFlag != null && Boolean.parseBoolean(traFlag)) {msg.setTransactionId(msg.getProperty(MessageConst.PROPERTY_UNIQ_CLIENT_MESSAGE_ID_KEYIDX));}MessageAccessor.putProperty(msg, MessageConst.PROPERTY_MIN_OFFSET,Long.toString(pullResult.getMinOffset()));MessageAccessor.putProperty(msg, MessageConst.PROPERTY_MAX_OFFSET,Long.toString(pullResult.getMaxOffset()));}pullResultExt.setMsgFoundList(msgListFilterAgain);}pullResultExt.setMessageBinary(null);return pullResult; }
pullConsumer启动、拉取消息解析完毕。
说在最后
本次解析仅代表个人观点,仅供参考。
加入技术微信群
钉钉技术群