rocketmq源码解析发送消息处理器②

说在前面apache

发小消息处理器微信

 

源码解析app

返回方法,在发送消息以后建立topic配置,org.apache.rocketmq.broker.topic.TopicConfigManager#createTopicInSendMessageBackMethod介绍过了。this

返回方法,存储批量消息,org.apache.rocketmq.store.DefaultMessageStore#putMessages介绍过了。code

返回方法,解析存储结果,org.apache.rocketmq.broker.processor.SendMessageProcessor#handlePutMessageResultserver











private RemotingCommand handlePutMessageResult(PutMessageResult putMessageResult, RemotingCommand response,RemotingCommand request, MessageExt msg,SendMessageResponseHeader responseHeader, SendMessageContext sendMessageContext, ChannelHandlerContext ctx,int queueIdInt) {if (putMessageResult == null) {response.setCode(ResponseCode.SYSTEM_ERROR);response.setRemark("store putMessage return null");return response;}boolean sendOK = false;switch (putMessageResult.getPutMessageStatus()) {// Successcase PUT_OK:sendOK = true;response.setCode(ResponseCode.SUCCESS);break;case FLUSH_DISK_TIMEOUT:response.setCode(ResponseCode.FLUSH_DISK_TIMEOUT);sendOK = true;break;case FLUSH_SLAVE_TIMEOUT:response.setCode(ResponseCode.FLUSH_SLAVE_TIMEOUT);sendOK = true;break;case SLAVE_NOT_AVAILABLE:response.setCode(ResponseCode.SLAVE_NOT_AVAILABLE);sendOK = true;break;// Failedcase CREATE_MAPEDFILE_FAILED:response.setCode(ResponseCode.SYSTEM_ERROR);response.setRemark("create mapped file failed, server is busy or broken.");break;case MESSAGE_ILLEGAL:case PROPERTIES_SIZE_EXCEEDED:response.setCode(ResponseCode.MESSAGE_ILLEGAL);response.setRemark("the message is illegal, maybe msg body or properties length not matched. msg body length limit 128k, msg properties length limit 32k.");break;case SERVICE_NOT_AVAILABLE:response.setCode(ResponseCode.SERVICE_NOT_AVAILABLE);response.setRemark("service not available now, maybe disk full, " + diskUtil() + ", maybe your broker machine memory too small.");break;case OS_PAGECACHE_BUSY:response.setCode(ResponseCode.SYSTEM_ERROR);response.setRemark("[PC_SYNCHRONIZED]broker busy, start flow control for a while");break;case UNKNOWN_ERROR:response.setCode(ResponseCode.SYSTEM_ERROR);response.setRemark("UNKNOWN_ERROR");break;default:response.setCode(ResponseCode.SYSTEM_ERROR);response.setRemark("UNKNOWN_ERROR DEFAULT");break;}String owner = request.getExtFields().get(BrokerStatsManager.COMMERCIAL_OWNER);if (sendOK) {this.brokerController.getBrokerStatsManager().incTopicPutNums(msg.getTopic(), putMessageResult.getAppendMessageResult().getMsgNum(), 1);this.brokerController.getBrokerStatsManager().incTopicPutSize(msg.getTopic(),putMessageResult.getAppendMessageResult().getWroteBytes());this.brokerController.getBrokerStatsManager().incBrokerPutNums(putMessageResult.getAppendMessageResult().getMsgNum());response.setRemark(null);responseHeader.setMsgId(putMessageResult.getAppendMessageResult().getMsgId());responseHeader.setQueueId(queueIdInt);responseHeader.setQueueOffset(putMessageResult.getAppendMessageResult().getLogicsOffset());//            响应=》doResponse(ctx, request, response);if (hasSendMessageHook()) {sendMessageContext.setMsgId(responseHeader.getMsgId());sendMessageContext.setQueueId(responseHeader.getQueueId());sendMessageContext.setQueueOffset(responseHeader.getQueueOffset());int commercialBaseCount = brokerController.getBrokerConfig().getCommercialBaseCount();int wroteSize = putMessageResult.getAppendMessageResult().getWroteBytes();int incValue = (int)Math.ceil(wroteSize / BrokerStatsManager.SIZE_PER_COUNT) * commercialBaseCount;sendMessageContext.setCommercialSendStats(BrokerStatsManager.StatsType.SEND_SUCCESS);sendMessageContext.setCommercialSendTimes(incValue);sendMessageContext.setCommercialSendSize(wroteSize);sendMessageContext.setCommercialOwner(owner);}return null;} else {if (hasSendMessageHook()) {int wroteSize = request.getBody().length;int incValue = (int)Math.ceil(wroteSize / BrokerStatsManager.SIZE_PER_COUNT);sendMessageContext.setCommercialSendStats(BrokerStatsManager.StatsType.SEND_FAILURE);sendMessageContext.setCommercialSendTimes(incValue);sendMessageContext.setCommercialSendSize(wroteSize);sendMessageContext.setCommercialOwner(owner);}}return response;    }

进入方法,响应,org.apache.rocketmq.broker.processor.AbstractSendMessageProcessor#doResponseblog

protected void doResponse(ChannelHandlerContext ctx, RemotingCommand request,final RemotingCommand response) {if (!request.isOnewayRPC()) {try {ctx.writeAndFlush(response);} catch (Throwable e) {log.error("SendMessageProcessor process request over, but response failed", e);log.error(request.toString());log.error(response.toString());}}    }

返回方法,发送消息,org.apache.rocketmq.broker.processor.SendMessageProcessor#sendMessage介绍过了。ci

返回方法,执行发送消息以后的钩子方法,org.apache.rocketmq.broker.processor.AbstractSendMessageProcessor#executeSendMessageHookAfterget

public void executeSendMessageHookAfter(final RemotingCommand response, final SendMessageContext context) {if (hasSendMessageHook()) {for (SendMessageHook hook : this.sendMessageHookList) {try {if (response != null) {final SendMessageResponseHeader responseHeader =(SendMessageResponseHeader) response.readCustomHeader();context.setMsgId(responseHeader.getMsgId());context.setQueueId(responseHeader.getQueueId());context.setQueueOffset(responseHeader.getQueueOffset());context.setCode(response.getCode());context.setErrorMsg(response.getRemark());}hook.sendMessageAfter(context);} catch (Throwable e) {// Ignore}}}    }

返回方法,org.apache.rocketmq.broker.processor.SendMessageProcessor#processRequest结束。源码

 

说在最后

本次解析仅表明我的观点,仅供参考。

 

加入技术微信群

钉钉技术群

相关文章
相关标签/搜索