rocketmq源码解析consumer、producer处理过程②

说在前面

DefaultMQProducer、DefaultMQPullConsumer、DefaultMQPushConsumer 处理过程

 

源码解析

进入方法,获取默认的topic路由信息,org.apache.rocketmq.client.impl.MQClientAPIImpl#getDefaultTopicRouteInfoFromNameServer

/**
 * Requests the route data of {@code topic} from the name server.
 * <p>
 * Thin wrapper: forwards to
 * {@link #getTopicRouteInfoFromNameServer(String, long, boolean)} with
 * {@code allowTopicNotExist} fixed to {@code false}.
 *
 * @param topic         topic whose route is requested
 * @param timeoutMillis timeout handed to the underlying call, in milliseconds
 * @return the value returned by the three-argument overload
 * @throws RemotingException    propagated from the overload
 * @throws MQClientException    propagated from the overload
 * @throws InterruptedException propagated from the overload
 */
public TopicRouteData getDefaultTopicRouteInfoFromNameServer(final String topic, final long timeoutMillis)
    throws RemotingException, MQClientException, InterruptedException {
    final boolean allowTopicNotExist = false;
    return this.getTopicRouteInfoFromNameServer(topic, timeoutMillis, allowTopicNotExist);
}

进入方法,org.apache.rocketmq.client.impl.MQClientAPIImpl#getTopicRouteInfoFromNameServer(java.lang.String, long, boolean)




public TopicRouteData getTopicRouteInfoFromNameServer(final String topic, final long timeoutMillis,boolean allowTopicNotExist) throws MQClientException, InterruptedException, RemotingTimeoutException, RemotingSendRequestException, RemotingConnectException {GetRouteInfoRequestHeader requestHeader = new GetRouteInfoRequestHeader();requestHeader.setTopic(topic);RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.GET_ROUTEINTO_BY_TOPIC, requestHeader);//        同步获取topic的路由信息=》RemotingCommand response = this.remotingClient.invokeSync(null, request, timeoutMillis);assert response != null;switch (response.getCode()) {case ResponseCode.TOPIC_NOT_EXIST: {if (allowTopicNotExist && !topic.equals(MixAll.AUTO_CREATE_TOPIC_KEY_TOPIC)) {log.warn("get Topic [{}] RouteInfoFromNameServer is not exist value", topic);}break;}case ResponseCode.SUCCESS: {byte[] body = response.getBody();if (body != null) {return TopicRouteData.decode(body, TopicRouteData.class);}}default:break;}throw new MQClientException(response.getCode(), response.getRemark());    }

进入方法,同步获取topic的路由信息,org.apache.rocketmq.remoting.netty.NettyRemotingClient#invokeSync介绍过了。

返回方法,获取topic路由信息,org.apache.rocketmq.client.impl.MQClientAPIImpl#getTopicRouteInfoFromNameServer(java.lang.String, long, boolean)介绍过了。

返回方法,判断topic路由是否改变,org.apache.rocketmq.client.impl.factory.MQClientInstance#topicRouteDataIsChange

/**
 * Tells whether two route snapshots differ.
 * <p>
 * A {@code null} on either side counts as "changed". Otherwise both inputs are cloned, the
 * queue and broker lists of the clones are sorted, and the clones are compared with
 * {@code equals}, so the inputs themselves are not reordered by this method.
 *
 * @param olddata previously known route data, may be {@code null}
 * @param nowdata freshly fetched route data, may be {@code null}
 * @return {@code true} if either argument is {@code null} or the sorted clones are not equal
 */
private boolean topicRouteDataIsChange(TopicRouteData olddata, TopicRouteData nowdata) {
    if (null == olddata || null == nowdata) {
        return true;
    }

    final TopicRouteData previous = olddata.cloneTopicRouteData();
    final TopicRouteData current = nowdata.cloneTopicRouteData();

    // Sort so that list order does not influence the comparison.
    Collections.sort(previous.getQueueDatas());
    Collections.sort(previous.getBrokerDatas());
    Collections.sort(current.getQueueDatas());
    Collections.sort(current.getBrokerDatas());

    final boolean identical = previous.equals(current);
    return !identical;
}

返回方法,按brokerName选择一个消息队列,org.apache.rocketmq.client.impl.producer.DefaultMQProducerImpl#selectOneMessageQueue

public MessageQueue selectOneMessageQueue(final TopicPublishInfo tpInfo, final String lastBrokerName) {//        =》return this.mqFaultStrategy.selectOneMessageQueue(tpInfo, lastBrokerName);    }

进入方法,org.apache.rocketmq.client.latency.MQFaultStrategy#selectOneMessageQueue



public MessageQueue selectOneMessageQueue(final TopicPublishInfo tpInfo, final String lastBrokerName) {if (this.sendLatencyFaultEnable) {try {int index = tpInfo.getSendWhichQueue().getAndIncrement();for (int i = 0; i < tpInfo.getMessageQueueList().size(); i++) {int pos = Math.abs(index++) % tpInfo.getMessageQueueList().size();if (pos < 0)pos = 0;MessageQueue mq = tpInfo.getMessageQueueList().get(pos);if (latencyFaultTolerance.isAvailable(mq.getBrokerName())) {if (null == lastBrokerName || mq.getBrokerName().equals(lastBrokerName))return mq;}}//                找到一个brokerfinal String notBestBroker = latencyFaultTolerance.pickOneAtLeast();//                获取写队列数量int writeQueueNums = tpInfo.getQueueIdByBroker(notBestBroker);if (writeQueueNums > 0) {//                    选择一个消息队列=》final MessageQueue mq = tpInfo.selectOneMessageQueue();if (notBestBroker != null) {mq.setBrokerName(notBestBroker);mq.setQueueId(tpInfo.getSendWhichQueue().getAndIncrement() % writeQueueNums);}return mq;} else {latencyFaultTolerance.remove(notBestBroker);}} catch (Exception e) {log.error("Error occurred when selecting message queue", e);}//            选择一个消息队列=》return tpInfo.selectOneMessageQueue();}//        按brokerName选择一个消息队列=》return tpInfo.selectOneMessageQueue(lastBrokerName);    }

进入方法,选择一个消息队列,org.apache.rocketmq.client.impl.producer.TopicPublishInfo#selectOneMessageQueue()

/**
 * Round-robin selection over {@code messageQueueList}.
 * <p>
 * Takes the next value of {@code sendWhichQueue}, maps it onto the list by absolute value
 * modulo the list size, and returns that element. The returned object is the list element
 * itself, not a copy.
 *
 * @return the queue at the computed position
 */
public MessageQueue selectOneMessageQueue() {
    final int ticket = this.sendWhichQueue.getAndIncrement();
    final int slot = Math.abs(ticket) % this.messageQueueList.size();
    // Math.abs(Integer.MIN_VALUE) is still negative, so a negative slot is clamped to 0.
    return this.messageQueueList.get(Math.max(slot, 0));
}

返回方法,发送,org.apache.rocketmq.client.impl.producer.DefaultMQProducerImpl#sendKernelImpl













private SendResult sendKernelImpl(final Message msg,final MessageQueue mq,final CommunicationMode communicationMode,final SendCallback sendCallback,final TopicPublishInfo topicPublishInfo,final long timeout) throws MQClientException, RemotingException, MQBrokerException, InterruptedException {long beginStartTime = System.currentTimeMillis();//        按brokerName找到master地址=》String brokerAddr = this.mQClientFactory.findBrokerAddressInPublish(mq.getBrokerName());if (null == brokerAddr) {//            找到topic的发布信息=》tryToFindTopicPublishInfo(mq.getTopic());//            找到broker的master地址=》brokerAddr = this.mQClientFactory.findBrokerAddressInPublish(mq.getBrokerName());}SendMessageContext context = null;if (brokerAddr != null) {brokerAddr = MixAll.brokerVIPChannel(this.defaultMQProducer.isSendMessageWithVIPChannel(), brokerAddr);byte[] prevBody = msg.getBody();try {//for MessageBatch,ID has been set in the generating processif (!(msg instanceof MessageBatch)) {MessageClientIDSetter.setUniqID(msg);}int sysFlag = 0;boolean msgBodyCompressed = false;//                默认对4k的消息压缩=》if (this.tryToCompressMessage(msg)) {sysFlag |= MessageSysFlag.COMPRESSED_FLAG;msgBodyCompressed = true;}//                是不是事务消息final String tranMsg = msg.getProperty(MessageConst.PROPERTY_TRANSACTION_PREPARED);if (tranMsg != null && Boolean.parseBoolean(tranMsg)) {sysFlag |= MessageSysFlag.TRANSACTION_PREPARED_TYPE;}if (hasCheckForbiddenHook()) {CheckForbiddenContext checkForbiddenContext = new CheckForbiddenContext();checkForbiddenContext.setNameSrvAddr(this.defaultMQProducer.getNamesrvAddr());checkForbiddenContext.setGroup(this.defaultMQProducer.getProducerGroup());checkForbiddenContext.setCommunicationMode(communicationMode);checkForbiddenContext.setBrokerAddr(brokerAddr);checkForbiddenContext.setMessage(msg);checkForbiddenContext.setMq(mq);checkForbiddenContext.setUnitMode(this.isUnitMode());this.executeCheckForbiddenHook(checkForbiddenContext);}if (this.hasSendMessageHook()) {context = new 
SendMessageContext();context.setProducer(this);context.setProducerGroup(this.defaultMQProducer.getProducerGroup());context.setCommunicationMode(communicationMode);context.setBornHost(this.defaultMQProducer.getClientIP());context.setBrokerAddr(brokerAddr);context.setMessage(msg);context.setMq(mq);String isTrans = msg.getProperty(MessageConst.PROPERTY_TRANSACTION_PREPARED);if (isTrans != null && isTrans.equals("true")) {context.setMsgType(MessageType.Trans_Msg_Half);}if (msg.getProperty("__STARTDELIVERTIME") != null || msg.getProperty(MessageConst.PROPERTY_DELAY_TIME_LEVEL) != null) {context.setMsgType(MessageType.Delay_Msg);}//                    执行发送消息以前的钩子方法实现=》this.executeSendMessageHookBefore(context);}SendMessageRequestHeader requestHeader = new SendMessageRequestHeader();requestHeader.setProducerGroup(this.defaultMQProducer.getProducerGroup());requestHeader.setTopic(msg.getTopic());requestHeader.setDefaultTopic(this.defaultMQProducer.getCreateTopicKey());requestHeader.setDefaultTopicQueueNums(this.defaultMQProducer.getDefaultTopicQueueNums());requestHeader.setQueueId(mq.getQueueId());requestHeader.setSysFlag(sysFlag);requestHeader.setBornTimestamp(System.currentTimeMillis());requestHeader.setFlag(msg.getFlag());requestHeader.setProperties(MessageDecoder.messageProperties2String(msg.getProperties()));requestHeader.setReconsumeTimes(0);requestHeader.setUnitMode(this.isUnitMode());requestHeader.setBatch(msg instanceof MessageBatch);//                若是是重试的topicif (requestHeader.getTopic().startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX)) {//                    重试消费次数String reconsumeTimes = MessageAccessor.getReconsumeTime(msg);if (reconsumeTimes != null) {requestHeader.setReconsumeTimes(Integer.valueOf(reconsumeTimes));MessageAccessor.clearProperty(msg, MessageConst.PROPERTY_RECONSUME_TIME);}//                    最大重试消费次数String maxReconsumeTimes = MessageAccessor.getMaxReconsumeTimes(msg);if (maxReconsumeTimes != null) 
{requestHeader.setMaxReconsumeTimes(Integer.valueOf(maxReconsumeTimes));MessageAccessor.clearProperty(msg, MessageConst.PROPERTY_MAX_RECONSUME_TIMES);}}SendResult sendResult = null;switch (communicationMode) {case ASYNC:Message tmpMessage = msg;if (msgBodyCompressed) {//If msg body was compressed, msgbody should be reset using prevBody.//Clone new message using commpressed message body and recover origin massage.//Fix bug:https://github.com/apache/rocketmq-externals/issues/66tmpMessage = MessageAccessor.cloneMessage(msg);msg.setBody(prevBody);}long costTimeAsync = System.currentTimeMillis() - beginStartTime;if (timeout < costTimeAsync) {throw new RemotingTooMuchRequestException("sendKernelImpl call timeout");}//                        发送消息=》sendResult = this.mQClientFactory.getMQClientAPIImpl().sendMessage(brokerAddr,mq.getBrokerName(),tmpMessage,requestHeader,timeout - costTimeAsync,communicationMode,sendCallback,topicPublishInfo,this.mQClientFactory,//                            重试2次this.defaultMQProducer.getRetryTimesWhenSendAsyncFailed(),context,this);break;case ONEWAY:case SYNC:long costTimeSync = System.currentTimeMillis() - beginStartTime;if (timeout < costTimeSync) {throw new RemotingTooMuchRequestException("sendKernelImpl call timeout");}//                        同步发送=》sendResult = this.mQClientFactory.getMQClientAPIImpl().sendMessage(brokerAddr,mq.getBrokerName(),msg,requestHeader,timeout - costTimeSync,communicationMode,context,this);break;default:assert false;break;}if (this.hasSendMessageHook()) {context.setSendResult(sendResult);//                    执行发送消息后的钩子方法this.executeSendMessageHookAfter(context);}return sendResult;} catch (RemotingException e) {if (this.hasSendMessageHook()) {context.setException(e);this.executeSendMessageHookAfter(context);}throw e;} catch (MQBrokerException e) {if (this.hasSendMessageHook()) {context.setException(e);this.executeSendMessageHookAfter(context);}throw e;} catch (InterruptedException e) {if 
(this.hasSendMessageHook()) {context.setException(e);this.executeSendMessageHookAfter(context);}throw e;} finally {msg.setBody(prevBody);}}throw new MQClientException("The broker[" + mq.getBrokerName() + "] not exist", null);    }

进入方法,按brokerName找到master地址,org.apache.rocketmq.client.impl.factory.MQClientInstance#findBrokerAddressInPublish

/**
 * Returns the address registered under {@code MixAll.MASTER_ID} for the given broker name.
 *
 * @param brokerName broker name used as key into {@code brokerAddrTable}
 * @return the master address, or {@code null} when the broker is unknown, has no addresses,
 *         or has no entry for {@code MixAll.MASTER_ID}
 */
public String findBrokerAddressInPublish(final String brokerName) {
    final HashMap<Long/* brokerId */, String/* address */> addrsById = this.brokerAddrTable.get(brokerName);
    if (addrsById == null || addrsById.isEmpty()) {
        return null;
    }
    return addrsById.get(MixAll.MASTER_ID);
}

进入方法,找到topic的发布信息,org.apache.rocketmq.client.impl.producer.DefaultMQProducerImpl#tryToFindTopicPublishInfo

private TopicPublishInfo tryToFindTopicPublishInfo(final String topic) {TopicPublishInfo topicPublishInfo = this.topicPublishInfoTable.get(topic);if (null == topicPublishInfo || !topicPublishInfo.ok()) {this.topicPublishInfoTable.putIfAbsent(topic, new TopicPublishInfo());//           从namesrv更新topic路由信息=》this.mQClientFactory.updateTopicRouteInfoFromNameServer(topic);topicPublishInfo = this.topicPublishInfoTable.get(topic);}if (topicPublishInfo.isHaveTopicRouterInfo() || topicPublishInfo.ok()) {return topicPublishInfo;} else {//            从namesrv更新topic路由信息this.mQClientFactory.updateTopicRouteInfoFromNameServer(topic, true, this.defaultMQProducer);topicPublishInfo = this.topicPublishInfoTable.get(topic);return topicPublishInfo;}    }

进入方法,从namesrv更新topic路由信息,org.apache.rocketmq.client.impl.factory.MQClientInstance#updateTopicRouteInfoFromNameServer(java.lang.String)

public boolean updateTopicRouteInfoFromNameServer(final String topic) {//        =》return updateTopicRouteInfoFromNameServer(topic, false, null);    }

进入方法,org.apache.rocketmq.client.impl.factory.MQClientInstance#updateTopicRouteInfoFromNameServer(java.lang.String, boolean, org.apache.rocketmq.client.producer.DefaultMQProducer)





public boolean updateTopicRouteInfoFromNameServer(final String topic, boolean isDefault,DefaultMQProducer defaultMQProducer) {try {if (this.lockNamesrv.tryLock(LOCK_TIMEOUT_MILLIS, TimeUnit.MILLISECONDS)) {try {TopicRouteData topicRouteData;if (isDefault && defaultMQProducer != null) {//                        获取默认的topic路由信息 =》topicRouteData = this.mQClientAPIImpl.getDefaultTopicRouteInfoFromNameServer(defaultMQProducer.getCreateTopicKey(),1000 * 3);if (topicRouteData != null) {//                            获取队列信息for (QueueData data : topicRouteData.getQueueDatas()) {//                                读写队列最大数量4int queueNums = Math.min(defaultMQProducer.getDefaultTopicQueueNums(), data.getReadQueueNums());data.setReadQueueNums(queueNums);data.setWriteQueueNums(queueNums);}}} else {//                        获取topic路由信息=》topicRouteData = this.mQClientAPIImpl.getTopicRouteInfoFromNameServer(topic, 1000 * 3);}if (topicRouteData != null) {TopicRouteData old = this.topicRouteTable.get(topic);//                        判断topic路由是否改变=》boolean changed = topicRouteDataIsChange(old, topicRouteData);if (!changed) {//                            须要更新路由信息changed = this.isNeedUpdateTopicRouteInfo(topic);} else {log.info("the topic[{}] route info changed, old[{}] ,new[{}]", topic, old, topicRouteData);}if (changed) {TopicRouteData cloneTopicRouteData = topicRouteData.cloneTopicRouteData();for (BrokerData bd : topicRouteData.getBrokerDatas()) {//                                更新broker的地址列表this.brokerAddrTable.put(bd.getBrokerName(), bd.getBrokerAddrs());}// Update Pub info{//                                topic路由信息转换成topic发布信息=》TopicPublishInfo publishInfo = topicRouteData2TopicPublishInfo(topic, topicRouteData);publishInfo.setHaveTopicRouterInfo(true);//                                遍历生产者信息Iterator<Entry<String, MQProducerInner>> it = this.producerTable.entrySet().iterator();while (it.hasNext()) {Entry<String, MQProducerInner> entry = it.next();MQProducerInner impl = 
entry.getValue();if (impl != null) {//                                        更新topic发布信息=》impl.updateTopicPublishInfo(topic, publishInfo);}}}// Update sub info{//                                获取消息队列订阅信息Set<MessageQueue> subscribeInfo = topicRouteData2TopicSubscribeInfo(topic, topicRouteData);//                                遍历消费者Iterator<Entry<String, MQConsumerInner>> it = this.consumerTable.entrySet().iterator();while (it.hasNext()) {Entry<String, MQConsumerInner> entry = it.next();MQConsumerInner impl = entry.getValue();if (impl != null) {//                                        更新topic的订阅信息=》impl.updateTopicSubscribeInfo(topic, subscribeInfo);}}}log.info("topicRouteTable.put. Topic = {}, TopicRouteData[{}]", topic, cloneTopicRouteData);this.topicRouteTable.put(topic, cloneTopicRouteData);return true;}} else {log.warn("updateTopicRouteInfoFromNameServer, getTopicRouteInfoFromNameServer return null, Topic: {}", topic);}} catch (Exception e) {if (!topic.startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX) && !topic.equals(MixAll.AUTO_CREATE_TOPIC_KEY_TOPIC)) {log.warn("updateTopicRouteInfoFromNameServer Exception", e);}} finally {this.lockNamesrv.unlock();}} else {log.warn("updateTopicRouteInfoFromNameServer tryLock timeout {}ms", LOCK_TIMEOUT_MILLIS);}} catch (InterruptedException e) {log.warn("updateTopicRouteInfoFromNameServer Exception", e);}return false;    }

进入方法,获取默认的topic路由信息,org.apache.rocketmq.client.impl.MQClientAPIImpl#getTopicRouteInfoFromNameServer(java.lang.String, long, boolean)




public TopicRouteData getTopicRouteInfoFromNameServer(final String topic, final long timeoutMillis,boolean allowTopicNotExist) throws MQClientException, InterruptedException, RemotingTimeoutException, RemotingSendRequestException, RemotingConnectException {GetRouteInfoRequestHeader requestHeader = new GetRouteInfoRequestHeader();requestHeader.setTopic(topic);RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.GET_ROUTEINTO_BY_TOPIC, requestHeader);//        同步获取topic的路由信息=》RemotingCommand response = this.remotingClient.invokeSync(null, request, timeoutMillis);assert response != null;switch (response.getCode()) {case ResponseCode.TOPIC_NOT_EXIST: {if (allowTopicNotExist && !topic.equals(MixAll.AUTO_CREATE_TOPIC_KEY_TOPIC)) {log.warn("get Topic [{}] RouteInfoFromNameServer is not exist value", topic);}break;}case ResponseCode.SUCCESS: {byte[] body = response.getBody();if (body != null) {return TopicRouteData.decode(body, TopicRouteData.class);}}default:break;}throw new MQClientException(response.getCode(), response.getRemark());    }

返回方法,获取topic路由信息,org.apache.rocketmq.client.impl.MQClientAPIImpl#getTopicRouteInfoFromNameServer(java.lang.String, long, boolean)介绍过了。

返回方法,判断topic路由是否改变,org.apache.rocketmq.client.impl.factory.MQClientInstance#topicRouteDataIsChange

/**
 * Tells whether two route snapshots differ.
 * <p>
 * A {@code null} on either side counts as "changed". Otherwise both inputs are cloned, the
 * queue and broker lists of the clones are sorted, and the clones are compared with
 * {@code equals}, so the inputs themselves are not reordered by this method.
 *
 * @param olddata previously known route data, may be {@code null}
 * @param nowdata freshly fetched route data, may be {@code null}
 * @return {@code true} if either argument is {@code null} or the sorted clones are not equal
 */
private boolean topicRouteDataIsChange(TopicRouteData olddata, TopicRouteData nowdata) {
    if (null == olddata || null == nowdata) {
        return true;
    }

    final TopicRouteData previous = olddata.cloneTopicRouteData();
    final TopicRouteData current = nowdata.cloneTopicRouteData();

    // Sort so that list order does not influence the comparison.
    Collections.sort(previous.getQueueDatas());
    Collections.sort(previous.getBrokerDatas());
    Collections.sort(current.getQueueDatas());
    Collections.sort(current.getBrokerDatas());

    final boolean identical = previous.equals(current);
    return !identical;
}

进入方法,判断是否需要更新路由信息,org.apache.rocketmq.client.impl.factory.MQClientInstance#isNeedUpdateTopicRouteInfo


private boolean isNeedUpdateTopicRouteInfo(final String topic) {boolean result = false;{//            遍历生产者Iterator<Entry<String, MQProducerInner>> it = this.producerTable.entrySet().iterator();while (it.hasNext() && !result) {Entry<String, MQProducerInner> entry = it.next();MQProducerInner impl = entry.getValue();if (impl != null) {//                    获取topic的发布信息是否须要更新=》result = impl.isPublishTopicNeedUpdate(topic);}}}{//            遍历消费者Iterator<Entry<String, MQConsumerInner>> it = this.consumerTable.entrySet().iterator();while (it.hasNext() && !result) {Entry<String, MQConsumerInner> entry = it.next();MQConsumerInner impl = entry.getValue();if (impl != null) {//                    订阅的信息是否须要更新=》result = impl.isSubscribeTopicNeedUpdate(topic);}}}return result;    }

返回方法,找到broker的master地址,org.apache.rocketmq.client.impl.factory.MQClientInstance#findBrokerAddressInPublish

进入方法,发送消息,org.apache.rocketmq.client.impl.MQClientAPIImpl#sendMessage(java.lang.String, java.lang.String, org.apache.rocketmq.common.message.Message, org.apache.rocketmq.common.protocol.header.SendMessageRequestHeader, long, org.apache.rocketmq.client.impl.CommunicationMode, org.apache.rocketmq.client.producer.SendCallback, org.apache.rocketmq.client.impl.producer.TopicPublishInfo, org.apache.rocketmq.client.impl.factory.MQClientInstance, int, org.apache.rocketmq.client.hook.SendMessageContext, org.apache.rocketmq.client.impl.producer.DefaultMQProducerImpl)

public String findBrokerAddressInPublish(final String brokerName) {HashMap<Long/* brokerId */, String/* address */> map = this.brokerAddrTable.get(brokerName);if (map != null && !map.isEmpty()) {return map.get(MixAll.MASTER_ID);}return null;    }


public SendResult sendMessage(final String addr,final String brokerName,final Message msg,final SendMessageRequestHeader requestHeader,final long timeoutMillis,final CommunicationMode communicationMode,final SendCallback sendCallback,final TopicPublishInfo topicPublishInfo,final MQClientInstance instance,final int retryTimesWhenSendFailed,final SendMessageContext context,final DefaultMQProducerImpl producer) throws RemotingException, MQBrokerException, InterruptedException {long beginStartTime = System.currentTimeMillis();RemotingCommand request = null;if (sendSmartMsg || msg instanceof MessageBatch) {SendMessageRequestHeaderV2 requestHeaderV2 = SendMessageRequestHeaderV2.createSendMessageRequestHeaderV2(requestHeader);request = RemotingCommand.createRequestCommand(msg instanceof MessageBatch ? RequestCode.SEND_BATCH_MESSAGE : RequestCode.SEND_MESSAGE_V2, requestHeaderV2);} else {request = RemotingCommand.createRequestCommand(RequestCode.SEND_MESSAGE, requestHeader);}request.setBody(msg.getBody());switch (communicationMode) {case ONEWAY://                单途请求=》this.remotingClient.invokeOneway(addr, request, timeoutMillis);return null;case ASYNC:final AtomicInteger times = new AtomicInteger();long costTimeAsync = System.currentTimeMillis() - beginStartTime;//                发送超时if (timeoutMillis < costTimeAsync) {throw new RemotingTooMuchRequestException("sendMessage call timeout");}//                异步发送=》this.sendMessageAsync(addr, brokerName, msg, timeoutMillis - costTimeAsync, request, sendCallback, topicPublishInfo, instance,retryTimesWhenSendFailed, times, context, producer);return null;case SYNC:long costTimeSync = System.currentTimeMillis() - beginStartTime;if (timeoutMillis < costTimeSync) {throw new RemotingTooMuchRequestException("sendMessage call timeout");}//                同步发送=》return this.sendMessageSync(addr, brokerName, msg, timeoutMillis - costTimeSync, request);default:assert false;break;}return null;    }

进入方法,org.apache.rocketmq.remoting.netty.NettyRemotingClient#invokeOneway介绍过了。

返回方法,异步发送,org.apache.rocketmq.client.impl.MQClientAPIImpl#sendMessageAsync





private void sendMessageAsync(final String addr,final String brokerName,final Message msg,final long timeoutMillis,final RemotingCommand request,final SendCallback sendCallback,final TopicPublishInfo topicPublishInfo,final MQClientInstance instance,final int retryTimesWhenSendFailed,final AtomicInteger times,final SendMessageContext context,final DefaultMQProducerImpl producer) throws InterruptedException, RemotingException {//        异步发送this.remotingClient.invokeAsync(addr, request, timeoutMillis, new InvokeCallback() {@Overridepublic void operationComplete(ResponseFuture responseFuture) {RemotingCommand response = responseFuture.getResponseCommand();if (null == sendCallback && response != null) {try {//                       处理发送响应=》SendResult sendResult = MQClientAPIImpl.this.processSendResponse(brokerName, msg, response);if (context != null && sendResult != null) {context.setSendResult(sendResult);//                            执行发送消息后的钩子方法context.getProducer().executeSendMessageHookAfter(context);}} catch (Throwable e) {}producer.updateFaultItem(brokerName, System.currentTimeMillis() - responseFuture.getBeginTimestamp(), false);return;}if (response != null) {try {//                        处理发送响应=》SendResult sendResult = MQClientAPIImpl.this.processSendResponse(brokerName, msg, response);assert sendResult != null;if (context != null) {context.setSendResult(sendResult);//                            执行发送消息后的钩子方法context.getProducer().executeSendMessageHookAfter(context);}try {//                            执行回调=》sendCallback.onSuccess(sendResult);} catch (Throwable e) {}producer.updateFaultItem(brokerName, System.currentTimeMillis() - responseFuture.getBeginTimestamp(), false);} catch (Exception e) {producer.updateFaultItem(brokerName, System.currentTimeMillis() - responseFuture.getBeginTimestamp(), true);//                        异常处理=》onExceptionImpl(brokerName, msg, 0L, request, sendCallback, topicPublishInfo, instance,retryTimesWhenSendFailed, times, e, 
context, false, producer);}} else {producer.updateFaultItem(brokerName, System.currentTimeMillis() - responseFuture.getBeginTimestamp(), true);if (!responseFuture.isSendRequestOK()) {MQClientException ex = new MQClientException("send request failed", responseFuture.getCause());onExceptionImpl(brokerName, msg, 0L, request, sendCallback, topicPublishInfo, instance,retryTimesWhenSendFailed, times, ex, context, true, producer);} else if (responseFuture.isTimeout()) {MQClientException ex = new MQClientException("wait response timeout " + responseFuture.getTimeoutMillis() + "ms",responseFuture.getCause());onExceptionImpl(brokerName, msg, 0L, request, sendCallback, topicPublishInfo, instance,retryTimesWhenSendFailed, times, ex, context, true, producer);} else {MQClientException ex = new MQClientException("unknow reseaon", responseFuture.getCause());onExceptionImpl(brokerName, msg, 0L, request, sendCallback, topicPublishInfo, instance,retryTimesWhenSendFailed, times, ex, context, true, producer);}}}});    }

进入方法,异步发送,org.apache.rocketmq.remoting.netty.NettyRemotingClient#invokeAsync介绍过了。

返回方法,处理发送响应,org.apache.rocketmq.client.impl.MQClientAPIImpl#processSendResponse




/**
 * Converts a broker response to a send request into a {@link SendResult}.
 * <p>
 * The response codes {@code SUCCESS}, {@code FLUSH_DISK_TIMEOUT}, {@code FLUSH_SLAVE_TIMEOUT}
 * and {@code SLAVE_NOT_AVAILABLE} each map to the {@code SendStatus} of the same meaning; any
 * other code raises {@link MQBrokerException} with the response code and remark.
 * <p>
 * For a {@code MessageBatch} the unique message id of the result is the comma-joined list of
 * the ids of the contained messages. The region id defaults to
 * {@code MixAll.DEFAULT_TRACE_REGION_ID} when absent or empty, and tracing is on unless the
 * trace-switch field equals {@code "false"}.
 *
 * @param brokerName broker the message was sent to
 * @param msg        the message that was sent
 * @param response   the broker's response
 * @return the populated send result
 * @throws MQBrokerException        for any response code not listed above
 * @throws RemotingCommandException declared by this method; presumably raised by
 *                                  {@code decodeCommandCustomHeader} (not visible here)
 */
private SendResult processSendResponse(final String brokerName,
    final Message msg,
    final RemotingCommand response) throws MQBrokerException, RemotingCommandException {
    final SendStatus sendStatus;
    switch (response.getCode()) {
        case ResponseCode.FLUSH_DISK_TIMEOUT:
            sendStatus = SendStatus.FLUSH_DISK_TIMEOUT;
            break;
        case ResponseCode.FLUSH_SLAVE_TIMEOUT:
            sendStatus = SendStatus.FLUSH_SLAVE_TIMEOUT;
            break;
        case ResponseCode.SLAVE_NOT_AVAILABLE:
            sendStatus = SendStatus.SLAVE_NOT_AVAILABLE;
            break;
        case ResponseCode.SUCCESS:
            sendStatus = SendStatus.SEND_OK;
            break;
        default:
            throw new MQBrokerException(response.getCode(), response.getRemark());
    }

    final SendMessageResponseHeader responseHeader =
        (SendMessageResponseHeader) response.decodeCommandCustomHeader(SendMessageResponseHeader.class);

    final MessageQueue messageQueue = new MessageQueue(msg.getTopic(), brokerName, responseHeader.getQueueId());

    String uniqMsgId = MessageClientIDSetter.getUniqID(msg);
    if (msg instanceof MessageBatch) {
        final StringBuilder joinedIds = new StringBuilder();
        for (Message inner : (MessageBatch) msg) {
            // Separator is added only once something has been written.
            if (joinedIds.length() != 0) {
                joinedIds.append(",");
            }
            joinedIds.append(MessageClientIDSetter.getUniqID(inner));
        }
        uniqMsgId = joinedIds.toString();
    }

    final SendResult sendResult = new SendResult(sendStatus,
        uniqMsgId,
        responseHeader.getMsgId(), messageQueue, responseHeader.getQueueOffset());
    sendResult.setTransactionId(responseHeader.getTransactionId());

    String regionId = response.getExtFields().get(MessageConst.PROPERTY_MSG_REGION);
    final String traceOn = response.getExtFields().get(MessageConst.PROPERTY_TRACE_SWITCH);
    if (regionId == null || regionId.isEmpty()) {
        regionId = MixAll.DEFAULT_TRACE_REGION_ID;
    }

    // Tracing stays enabled unless the broker explicitly answered "false".
    sendResult.setTraceOn(!"false".equals(traceOn));
    sendResult.setRegionId(regionId);
    return sendResult;
}

返回方法,异常处理,org.apache.rocketmq.client.impl.MQClientAPIImpl#onExceptionImpl


private void onExceptionImpl(final String brokerName,final Message msg,final long timeoutMillis,final RemotingCommand request,final SendCallback sendCallback,final TopicPublishInfo topicPublishInfo,final MQClientInstance instance,final int timesTotal,final AtomicInteger curTimes,final Exception e,final SendMessageContext context,final boolean needRetry,final DefaultMQProducerImpl producer) {int tmp = curTimes.incrementAndGet();if (needRetry && tmp <= timesTotal) {String retryBrokerName = brokerName;//by default, it will send to the same brokerif (topicPublishInfo != null) { //select one message queue accordingly, in order to determine which broker to send//                按brokerName查找消息队列=》MessageQueue mqChosen = producer.selectOneMessageQueue(topicPublishInfo, brokerName);retryBrokerName = mqChosen.getBrokerName();}//            按brokerName查找master地址=》String addr = instance.findBrokerAddressInPublish(retryBrokerName);log.info("async send msg by retry {} times. topic={}, brokerAddr={}, brokerName={}", tmp, msg.getTopic(), addr,retryBrokerName);try {request.setOpaque(RemotingCommand.createNewRequestId());//                异步发送消息=》sendMessageAsync(addr, retryBrokerName, msg, timeoutMillis, request, sendCallback, topicPublishInfo, instance,timesTotal, curTimes, context, producer);} catch (InterruptedException e1) {onExceptionImpl(retryBrokerName, msg, timeoutMillis, request, sendCallback, topicPublishInfo, instance, timesTotal, curTimes, e1,context, false, producer);} catch (RemotingConnectException e1) {producer.updateFaultItem(brokerName, 3000, true);onExceptionImpl(retryBrokerName, msg, timeoutMillis, request, sendCallback, topicPublishInfo, instance, timesTotal, curTimes, e1,context, true, producer);} catch (RemotingTooMuchRequestException e1) {onExceptionImpl(retryBrokerName, msg, timeoutMillis, request, sendCallback, topicPublishInfo, instance, timesTotal, curTimes, e1,context, false, producer);} catch (RemotingException e1) 
{producer.updateFaultItem(brokerName, 3000, true);onExceptionImpl(retryBrokerName, msg, timeoutMillis, request, sendCallback, topicPublishInfo, instance, timesTotal, curTimes, e1,context, true, producer);}} else {if (context != null) {context.setException(e);//                执行发送消息后的钩子方法context.getProducer().executeSendMessageHookAfter(context);}try {//                执行回调sendCallback.onException(e);} catch (Exception ignored) {}}    }

进入方法,按brokerName查找消息队列,org.apache.rocketmq.client.latency.MQFaultStrategy#selectOneMessageQueue介绍过了。

返回方法,按brokerName查找master地址,org.apache.rocketmq.client.impl.factory.MQClientInstance#findBrokerAddressInPublish

/**
 * Returns the address registered under {@code MixAll.MASTER_ID} for the given broker name.
 *
 * @param brokerName broker name used as key into {@code brokerAddrTable}
 * @return the master address, or {@code null} when the broker is unknown, has no addresses,
 *         or has no entry for {@code MixAll.MASTER_ID}
 */
public String findBrokerAddressInPublish(final String brokerName) {
    final HashMap<Long/* brokerId */, String/* address */> addrsById = this.brokerAddrTable.get(brokerName);
    if (addrsById == null || addrsById.isEmpty()) {
        return null;
    }
    return addrsById.get(MixAll.MASTER_ID);
}

返回方法,异步发送消息,org.apache.rocketmq.client.impl.MQClientAPIImpl#sendMessageAsync介绍过了。

返回方法,同步发送,org.apache.rocketmq.client.impl.MQClientAPIImpl#sendMessageSync

private SendResult sendMessageSync(final String addr,final String brokerName,final Message msg,final long timeoutMillis,final RemotingCommand request) throws RemotingException, MQBrokerException, InterruptedException {//        =》RemotingCommand response = this.remotingClient.invokeSync(addr, request, timeoutMillis);assert response != null;//       处理请求响应=》return this.processSendResponse(brokerName, msg, response);    }

进入方法,org.apache.rocketmq.remoting.netty.NettyRemotingClient#invokeSync介绍过了。

返回方法,处理请求响应,org.apache.rocketmq.client.impl.MQClientAPIImpl#processSendResponse介绍过了。

返回方法,org.apache.rocketmq.client.producer.DefaultMQProducer#send(org.apache.rocketmq.common.message.Message) 生产者启动、发送消息解析完毕。

 

说在最后

本次解析仅代表个人观点,仅供参考。

 

加入技术微信群

钉钉技术群

相关文章
相关标签/搜索