说在前面
发送消息处理器
源码解析
进入这个方法,org.apache.rocketmq.broker.processor.SendMessageProcessor#processRequest
@Overridepublic RemotingCommand processRequest(ChannelHandlerContext ctx,RemotingCommand request) throws RemotingCommandException {SendMessageContext mqtraceContext;switch (request.getCode()) {case RequestCode.CONSUMER_SEND_MSG_BACK:// 消费者发送消息返回=》return this.consumerSendMsgBack(ctx, request);default:// 解析请求消息头,队序列化作了些优化,消息头中字段过多,字段过长并发状况下会对序列化效率产生影响=》SendMessageRequestHeader requestHeader = parseRequestHeader(request);if (requestHeader == null) {return null;}// 构建消息上下文=》mqtraceContext = buildMsgContext(ctx, requestHeader);// 发送消息以前执行钩子方法=》this.executeSendMessageHookBefore(ctx, request, mqtraceContext);RemotingCommand response;if (requestHeader.isBatch()) {// 批量消息发送=》response = this.sendBatchMessage(ctx, request, mqtraceContext, requestHeader);} else {// 发送消息=》response = this.sendMessage(ctx, request, mqtraceContext, requestHeader);}// 执行发送消息以后的钩子方法=》this.executeSendMessageHookAfter(response, mqtraceContext);return response;} }
进入这个方法,消费者回发消息(消费失败的消息发回 broker 重试),org.apache.rocketmq.broker.processor.SendMessageProcessor#consumerSendMsgBack
private RemotingCommand consumerSendMsgBack(final ChannelHandlerContext ctx, final RemotingCommand request)throws RemotingCommandException {final RemotingCommand response = RemotingCommand.createResponseCommand(null);final ConsumerSendMsgBackRequestHeader requestHeader =(ConsumerSendMsgBackRequestHeader)request.decodeCommandCustomHeader(ConsumerSendMsgBackRequestHeader.class);if (this.hasConsumeMessageHook() && !UtilAll.isBlank(requestHeader.getOriginMsgId())) {ConsumeMessageContext context = new ConsumeMessageContext();context.setConsumerGroup(requestHeader.getGroup());context.setTopic(requestHeader.getOriginTopic());context.setCommercialRcvStats(BrokerStatsManager.StatsType.SEND_BACK);context.setCommercialRcvTimes(1);context.setCommercialOwner(request.getExtFields().get(BrokerStatsManager.COMMERCIAL_OWNER));// 执行消费消息的钩子方法this.executeConsumeMessageHookAfter(context);}// 获取消费组的订阅配置信息=》SubscriptionGroupConfig subscriptionGroupConfig =this.brokerController.getSubscriptionGroupManager().findSubscriptionGroupConfig(requestHeader.getGroup());if (null == subscriptionGroupConfig) {response.setCode(ResponseCode.SUBSCRIPTION_GROUP_NOT_EXIST);response.setRemark("subscription group not exist, " + requestHeader.getGroup() + " "+ FAQUrl.suggestTodo(FAQUrl.SUBSCRIPTION_GROUP_NOT_EXIST));return response;}// 无写权限if (!PermName.isWriteable(this.brokerController.getBrokerConfig().getBrokerPermission())) {response.setCode(ResponseCode.NO_PERMISSION);response.setRemark("the broker[" + this.brokerController.getBrokerConfig().getBrokerIP1() + "] sending message is forbidden");return response;}if (subscriptionGroupConfig.getRetryQueueNums() <= 0) {response.setCode(ResponseCode.SUCCESS);response.setRemark(null);return response;}// 重试topic=%RETRY%+消费组String newTopic = MixAll.getRetryTopic(requestHeader.getGroup());int queueIdInt = Math.abs(this.random.nextInt() % 99999999) % subscriptionGroupConfig.getRetryQueueNums();int topicSysFlag = 0;if (requestHeader.isUnitMode()) {topicSysFlag = 
TopicSysFlag.buildSysFlag(false, true);}// 在发送消息返回后建立topic配置信息=》TopicConfig topicConfig = this.brokerController.getTopicConfigManager().createTopicInSendMessageBackMethod(newTopic,subscriptionGroupConfig.getRetryQueueNums(),PermName.PERM_WRITE | PermName.PERM_READ, topicSysFlag);if (null == topicConfig) {response.setCode(ResponseCode.SYSTEM_ERROR);response.setRemark("topic[" + newTopic + "] not exist");return response;}// 无写权限if (!PermName.isWriteable(topicConfig.getPerm())) {response.setCode(ResponseCode.NO_PERMISSION);response.setRemark(String.format("the topic[%s] sending message is forbidden", newTopic));return response;}// 按offset查询消息=》MessageExt msgExt = this.brokerController.getMessageStore().lookMessageByOffset(requestHeader.getOffset());if (null == msgExt) {response.setCode(ResponseCode.SYSTEM_ERROR);response.setRemark("look message by offset failed, " + requestHeader.getOffset());return response;}final String retryTopic = msgExt.getProperty(MessageConst.PROPERTY_RETRY_TOPIC);if (null == retryTopic) {MessageAccessor.putProperty(msgExt, MessageConst.PROPERTY_RETRY_TOPIC, msgExt.getTopic());}msgExt.setWaitStoreMsgOK(false);int delayLevel = requestHeader.getDelayLevel();// 最大重试次数,默认16次int maxReconsumeTimes = subscriptionGroupConfig.getRetryMaxTimes();if (request.getVersion() >= MQVersion.Version.V3_4_9.ordinal()) {maxReconsumeTimes = requestHeader.getMaxReconsumeTimes();}// 重试16次后if (msgExt.getReconsumeTimes() >= maxReconsumeTimes|| delayLevel < 0) {// 建立%DLQ%+消费组 topicnewTopic = MixAll.getDLQTopic(requestHeader.getGroup());queueIdInt = Math.abs(this.random.nextInt() % 99999999) % DLQ_NUMS_PER_GROUP;// 建立topic配置信息在发送消息返回=》topicConfig = this.brokerController.getTopicConfigManager().createTopicInSendMessageBackMethod(newTopic,DLQ_NUMS_PER_GROUP,PermName.PERM_WRITE, 0);if (null == topicConfig) {response.setCode(ResponseCode.SYSTEM_ERROR);response.setRemark("topic[" + newTopic + "] not exist");return response;}} else {if (0 == delayLevel) {delayLevel = 3 + 
msgExt.getReconsumeTimes();}msgExt.setDelayTimeLevel(delayLevel);}MessageExtBrokerInner msgInner = new MessageExtBrokerInner();msgInner.setTopic(newTopic);msgInner.setBody(msgExt.getBody());msgInner.setFlag(msgExt.getFlag());MessageAccessor.setProperties(msgInner, msgExt.getProperties());msgInner.setPropertiesString(MessageDecoder.messageProperties2String(msgExt.getProperties()));msgInner.setTagsCode(MessageExtBrokerInner.tagsString2tagsCode(null, msgExt.getTags()));msgInner.setQueueId(queueIdInt);msgInner.setSysFlag(msgExt.getSysFlag());msgInner.setBornTimestamp(msgExt.getBornTimestamp());msgInner.setBornHost(msgExt.getBornHost());msgInner.setStoreHost(this.getStoreHost());msgInner.setReconsumeTimes(msgExt.getReconsumeTimes() + 1);String originMsgId = MessageAccessor.getOriginMessageId(msgExt);MessageAccessor.setOriginMessageId(msgInner, UtilAll.isBlank(originMsgId) ? msgExt.getMsgId() : originMsgId);// 存储消息=》PutMessageResult putMessageResult = this.brokerController.getMessageStore().putMessage(msgInner);if (putMessageResult != null) {switch (putMessageResult.getPutMessageStatus()) {case PUT_OK:String backTopic = msgExt.getTopic();String correctTopic = msgExt.getProperty(MessageConst.PROPERTY_RETRY_TOPIC);if (correctTopic != null) {backTopic = correctTopic;}this.brokerController.getBrokerStatsManager().incSendBackNums(requestHeader.getGroup(), backTopic);response.setCode(ResponseCode.SUCCESS);response.setRemark(null);return response;default:break;}response.setCode(ResponseCode.SYSTEM_ERROR);response.setRemark(putMessageResult.getPutMessageStatus().name());return response;}response.setCode(ResponseCode.SYSTEM_ERROR);response.setRemark("putMessageResult is null");return response; }
进入这个方法,执行消费消息的钩子方法,org.apache.rocketmq.broker.processor.SendMessageProcessor#executeConsumeMessageHookAfter
public void executeConsumeMessageHookAfter(final ConsumeMessageContext context) {if (hasConsumeMessageHook()) {for (ConsumeMessageHook hook : this.consumeMessageHookList) {try {// 客户能够实现本身的消费消息后的钩子方法hook.consumeMessageAfter(context);} catch (Throwable e) {// Ignore}}} }
往上返回到这个方法,获取消费组的订阅配置信息,org.apache.rocketmq.broker.subscription.SubscriptionGroupManager#findSubscriptionGroupConfig,前面介绍过了。
往上返回到这个方法,在消息回发处理中创建 topic 配置信息,org.apache.rocketmq.broker.topic.TopicConfigManager#createTopicInSendMessageBackMethod
public TopicConfig createTopicInSendMessageBackMethod(final String topic,final int clientDefaultTopicQueueNums,final int perm,final int topicSysFlag) {// 获取topic配置信息TopicConfig topicConfig = this.topicConfigTable.get(topic);if (topicConfig != null)return topicConfig;boolean createNew = false;try {if (this.lockTopicConfigTable.tryLock(LOCK_TIMEOUT_MILLIS, TimeUnit.MILLISECONDS)) {try {topicConfig = this.topicConfigTable.get(topic);if (topicConfig != null)return topicConfig;topicConfig = new TopicConfig(topic);topicConfig.setReadQueueNums(clientDefaultTopicQueueNums);topicConfig.setWriteQueueNums(clientDefaultTopicQueueNums);topicConfig.setPerm(perm);topicConfig.setTopicSysFlag(topicSysFlag);log.info("create new topic {}", topicConfig);// 存储重试的tpic配置信息this.topicConfigTable.put(topic, topicConfig);createNew = true;// 修饰数据的版本号this.dataVersion.nextVersion();// 持久化=》this.persist();} finally {this.lockTopicConfigTable.unlock();}}} catch (InterruptedException e) {log.error("createTopicInSendMessageBackMethod exception", e);}// 若是topic配置信息是从新建立的,注册到broker集群中=》if (createNew) {this.brokerController.registerBrokerAll(false, true,true);}return topicConfig; }
进入这个方法,如果 topic 配置信息是新创建的,则注册到 broker 集群中,org.apache.rocketmq.broker.BrokerController#registerBrokerAll,前面介绍过了。
往上返回到这个方法,按 offset 查询消息,org.apache.rocketmq.store.DefaultMessageStore#lookMessageByOffset(long),前面介绍过了。
往上返回到这个方法,在消息回发处理中创建 topic 配置信息(此处用于死信队列 topic),org.apache.rocketmq.broker.topic.TopicConfigManager#createTopicInSendMessageBackMethod,前面介绍过了。
往上返回到这个方法,存储消息,org.apache.rocketmq.store.DefaultMessageStore#putMessage前面介绍过了。
往上返回到这个方法,解析请求消息头。这里对序列化做了些优化:消息头中字段过多、字段过长,在高并发情况下会对序列化效率产生影响,org.apache.rocketmq.broker.processor.AbstractSendMessageProcessor#parseRequestHeader
/**
 * Decodes the send-message request header according to the request code.
 *
 * <ul>
 *   <li>{@code SEND_BATCH_MESSAGE} / {@code SEND_MESSAGE_V2}: the header is decoded as a
 *       {@code SendMessageRequestHeaderV2} and converted to the V1 form; if the V2 decode
 *       yields {@code null}, the header is decoded directly as V1 instead.</li>
 *   <li>{@code SEND_MESSAGE}: the header is decoded directly as V1.</li>
 *   <li>Any other code: no header is decoded.</li>
 * </ul>
 *
 * @param request the incoming remoting command
 * @return the V1 request header, or {@code null} for request codes not listed above
 * @throws RemotingCommandException if decoding the custom header fails
 */
protected SendMessageRequestHeader parseRequestHeader(RemotingCommand request)
    throws RemotingCommandException {
    final int code = request.getCode();

    SendMessageRequestHeaderV2 headerV2 = null;
    if (code == RequestCode.SEND_BATCH_MESSAGE || code == RequestCode.SEND_MESSAGE_V2) {
        headerV2 = (SendMessageRequestHeaderV2) request.decodeCommandCustomHeader(SendMessageRequestHeaderV2.class);
    } else if (code != RequestCode.SEND_MESSAGE) {
        // Not a send request this method knows how to parse.
        return null;
    }

    if (headerV2 != null) {
        return SendMessageRequestHeaderV2.createSendMessageRequestHeaderV1(headerV2);
    }
    return (SendMessageRequestHeader) request.decodeCommandCustomHeader(SendMessageRequestHeader.class);
}
返回到这个方法,发送消息之前执行钩子方法,org.apache.rocketmq.broker.processor.AbstractSendMessageProcessor#executeSendMessageHookBefore
public void executeSendMessageHookBefore(final ChannelHandlerContext ctx, final RemotingCommand request,SendMessageContext context) {if (hasSendMessageHook()) {for (SendMessageHook hook : this.sendMessageHookList) {try {// 解析消息头,对序列化的优化final SendMessageRequestHeader requestHeader = parseRequestHeader(request);if (null != requestHeader) {context.setProducerGroup(requestHeader.getProducerGroup());context.setTopic(requestHeader.getTopic());context.setBodyLength(request.getBody().length);context.setMsgProps(requestHeader.getProperties());context.setBornHost(RemotingHelper.parseChannelRemoteAddr(ctx.channel()));context.setBrokerAddr(this.brokerController.getBrokerAddr());context.setQueueId(requestHeader.getQueueId());}// 执行发送消息以前的钩子方法hook.sendMessageBefore(context);if (requestHeader != null) {requestHeader.setProperties(context.getMsgProps());}} catch (Throwable e) {// Ignore}}} }
返回到这个方法,批量消息发送,org.apache.rocketmq.broker.processor.SendMessageProcessor#sendBatchMessage
private RemotingCommand sendBatchMessage(final ChannelHandlerContext ctx,final RemotingCommand request,final SendMessageContext sendMessageContext,final SendMessageRequestHeader requestHeader) throws RemotingCommandException {final RemotingCommand response = RemotingCommand.createResponseCommand(SendMessageResponseHeader.class);final SendMessageResponseHeader responseHeader = (SendMessageResponseHeader)response.readCustomHeader();response.setOpaque(request.getOpaque());response.addExtField(MessageConst.PROPERTY_MSG_REGION, this.brokerController.getBrokerConfig().getRegionId());response.addExtField(MessageConst.PROPERTY_TRACE_SWITCH, String.valueOf(this.brokerController.getBrokerConfig().isTraceOn()));log.debug("Receive SendMessage request command {}", request);// 开始接受发送请求的时间,默认是当即发送final long startTimstamp = this.brokerController.getBrokerConfig().getStartAcceptSendRequestTimeStamp();if (this.brokerController.getMessageStore().now() < startTimstamp) {response.setCode(ResponseCode.SYSTEM_ERROR);response.setRemark(String.format("broker unable to service, until %s", UtilAll.timeMillisToHumanString2(startTimstamp)));return response;}response.setCode(-1);// 消息检查=》super.msgCheck(ctx, requestHeader, response);if (response.getCode() != -1) {return response;}int queueIdInt = requestHeader.getQueueId();// 查询topic配置TopicConfig topicConfig = this.brokerController.getTopicConfigManager().selectTopicConfig(requestHeader.getTopic());if (queueIdInt < 0) {queueIdInt = Math.abs(this.random.nextInt() % 99999999) % topicConfig.getWriteQueueNums();}// topic长度大于127,非法if (requestHeader.getTopic().length() > Byte.MAX_VALUE) {response.setCode(ResponseCode.MESSAGE_ILLEGAL);response.setRemark("message topic length too long " + requestHeader.getTopic().length());return response;}// 若是是重试topic,非法if (requestHeader.getTopic() != null && requestHeader.getTopic().startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX)) {response.setCode(ResponseCode.MESSAGE_ILLEGAL);response.setRemark("batch request does not 
support retry group " + requestHeader.getTopic());return response;}MessageExtBatch messageExtBatch = new MessageExtBatch();messageExtBatch.setTopic(requestHeader.getTopic());messageExtBatch.setQueueId(queueIdInt);int sysFlag = requestHeader.getSysFlag();if (TopicFilterType.MULTI_TAG == topicConfig.getTopicFilterType()) {sysFlag |= MessageSysFlag.MULTI_TAGS_FLAG;}messageExtBatch.setSysFlag(sysFlag);messageExtBatch.setFlag(requestHeader.getFlag());MessageAccessor.setProperties(messageExtBatch, MessageDecoder.string2messageProperties(requestHeader.getProperties()));messageExtBatch.setBody(request.getBody());messageExtBatch.setBornTimestamp(requestHeader.getBornTimestamp());messageExtBatch.setBornHost(ctx.channel().remoteAddress());messageExtBatch.setStoreHost(this.getStoreHost());messageExtBatch.setReconsumeTimes(requestHeader.getReconsumeTimes() == null ? 0 : requestHeader.getReconsumeTimes());// 存储批量消息=》PutMessageResult putMessageResult = this.brokerController.getMessageStore().putMessages(messageExtBatch);// 解析存储结果=》return handlePutMessageResult(putMessageResult, response, request, messageExtBatch, responseHeader, sendMessageContext, ctx, queueIdInt); }
进入这个方法,消息检查,org.apache.rocketmq.broker.processor.AbstractSendMessageProcessor#msgCheck
protected RemotingCommand msgCheck(final ChannelHandlerContext ctx,final SendMessageRequestHeader requestHeader, final RemotingCommand response) {if (!PermName.isWriteable(this.brokerController.getBrokerConfig().getBrokerPermission())&& this.brokerController.getTopicConfigManager().isOrderTopic(requestHeader.getTopic())) {response.setCode(ResponseCode.NO_PERMISSION);response.setRemark("the broker[" + this.brokerController.getBrokerConfig().getBrokerIP1()+ "] sending message is forbidden");return response;}// 不是自动建立的topic不能自动发送消息=》if (!this.brokerController.getTopicConfigManager().isTopicCanSendMessage(requestHeader.getTopic())) {String errorMsg = "the topic[" + requestHeader.getTopic() + "] is conflict with system reserved words.";log.warn(errorMsg);response.setCode(ResponseCode.SYSTEM_ERROR);response.setRemark(errorMsg);return response;}// 查询topic配置TopicConfig topicConfig =this.brokerController.getTopicConfigManager().selectTopicConfig(requestHeader.getTopic());if (null == topicConfig) {int topicSysFlag = 0;if (requestHeader.isUnitMode()) {// 根据是否是重试topic设置topicSysFlagif (requestHeader.getTopic().startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX)) {topicSysFlag = TopicSysFlag.buildSysFlag(false, true);} else {topicSysFlag = TopicSysFlag.buildSysFlag(true, false);}}log.warn("the topic {} not exist, producer: {}", requestHeader.getTopic(), ctx.channel().remoteAddress());// 建立topic配置在发送消息以后=》topicConfig = this.brokerController.getTopicConfigManager().createTopicInSendMessageMethod(requestHeader.getTopic(),requestHeader.getDefaultTopic(),RemotingHelper.parseChannelRemoteAddr(ctx.channel()),requestHeader.getDefaultTopicQueueNums(), topicSysFlag);if (null == topicConfig) {// 若是是重试topicif (requestHeader.getTopic().startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX)) {// 在发送消息以后建立topic配置=》topicConfig =this.brokerController.getTopicConfigManager().createTopicInSendMessageBackMethod(requestHeader.getTopic(), 1, PermName.PERM_WRITE | PermName.PERM_READ,topicSysFlag);}}if (null == 
topicConfig) {response.setCode(ResponseCode.TOPIC_NOT_EXIST);response.setRemark("topic[" + requestHeader.getTopic() + "] not exist, apply first please!"+ FAQUrl.suggestTodo(FAQUrl.APPLY_TOPIC_URL));return response;}}int queueIdInt = requestHeader.getQueueId();int idValid = Math.max(topicConfig.getWriteQueueNums(), topicConfig.getReadQueueNums());if (queueIdInt >= idValid) {String errorInfo = String.format("request queueId[%d] is illegal, %s Producer: %s",queueIdInt,topicConfig.toString(),RemotingHelper.parseChannelRemoteAddr(ctx.channel()));log.warn(errorInfo);response.setCode(ResponseCode.SYSTEM_ERROR);response.setRemark(errorInfo);return response;}return response; }
进入这个方法,在发送消息处理中创建 topic 配置,org.apache.rocketmq.broker.topic.TopicConfigManager#createTopicInSendMessageMethod,前面介绍过了。
接下篇。
说在最后
本次解析仅代表个人观点,仅供参考。
加入技术微信群
钉钉技术群