rocketmq源码解析之管理请求直接消费消息

说在前面

管理请求 CONSUME_MESSAGE_DIRECTLY 直接消费消息

 

源码解析

进入这个方法 org.apache.rocketmq.broker.processor.AdminBrokerProcessor#consumeMessageDirectly



private RemotingCommand consumeMessageDirectly(ChannelHandlerContext ctx,RemotingCommand request) throws RemotingCommandException {final ConsumeMessageDirectlyResultRequestHeader requestHeader = (ConsumeMessageDirectlyResultRequestHeader) request.decodeCommandCustomHeader(ConsumeMessageDirectlyResultRequestHeader.class);request.getExtFields().put("brokerName", this.brokerController.getBrokerConfig().getBrokerName());SelectMappedBufferResult selectMappedBufferResult = null;try {MessageId messageId = MessageDecoder.decodeMessageId(requestHeader.getMsgId());//            从commit的offset位置获取一条消息=》selectMappedBufferResult = this.brokerController.getMessageStore().selectOneMessageByOffset(messageId.getOffset());byte[] body = new byte[selectMappedBufferResult.getSize()];selectMappedBufferResult.getByteBuffer().get(body);request.setBody(body);} catch (UnknownHostException e) {} finally {if (selectMappedBufferResult != null) {selectMappedBufferResult.release();}}//        =》return this.callConsumer(RequestCode.CONSUME_MESSAGE_DIRECTLY, request, requestHeader.getConsumerGroup(),requestHeader.getClientId());}

进入这个方法 org.apache.rocketmq.store.DefaultMessageStore#selectOneMessageByOffset(long),根据offset从commitLog中读取消息

@Overridepublic SelectMappedBufferResult selectOneMessageByOffset(long commitLogOffset) {//        SelectMappedBufferResult中存储的是消息offset、建立时间=》SelectMappedBufferResult sbr = this.commitLog.getMessage(commitLogOffset, 4);if (null != sbr) {try {// 1 TOTALSIZEint size = sbr.getByteBuffer().getInt();//                =》return this.commitLog.getMessage(commitLogOffset, size);} finally {sbr.release();}}return null;}

进入这个方法 org.apache.rocketmq.store.CommitLog#getMessage,查询消息

 

public SelectMappedBufferResult getMessage(final long offset, final int size) {int mappedFileSize = this.defaultMessageStore.getMessageStoreConfig().getMapedFileSizeCommitLog();//        根据offset找到映射文件 =》MappedFile mappedFile = this.mappedFileQueue.findMappedFileByOffset(offset, offset == 0);if (mappedFile != null) {int pos = (int) (offset % mappedFileSize);return mappedFile.selectMappedBuffer(pos, size);}return null;}

进入这个方法 org.apache.rocketmq.store.MappedFileQueue#findMappedFileByOffset(long, boolean),根据offset查询映射文件

 




public MappedFile findMappedFileByOffset(final long offset, final boolean returnFirstOnNotFound) {try {//            获取队列中第一个映射文件MappedFile firstMappedFile = this.getFirstMappedFile();//            获取队列中最后一个映射文件MappedFile lastMappedFile = this.getLastMappedFile();if (firstMappedFile != null && lastMappedFile != null) {//                若是offset不在索引文件的offset范围内if (offset < firstMappedFile.getFileFromOffset() || offset >= lastMappedFile.getFileFromOffset() + this.mappedFileSize) {LOG_ERROR.warn("Offset not matched. Request offset: {}, firstOffset: {}, lastOffset: {}, mappedFileSize: {}, mappedFiles count: {}",offset,firstMappedFile.getFileFromOffset(),lastMappedFile.getFileFromOffset() + this.mappedFileSize,this.mappedFileSize,this.mappedFiles.size());} else {//                   找到映射文件在队列中的索引位置int index = (int) ((offset / this.mappedFileSize) - (firstMappedFile.getFileFromOffset() / this.mappedFileSize));MappedFile targetFile = null;try {//                        获取索引文件targetFile = this.mappedFiles.get(index);} catch (Exception ignored) {}//                    offset在目标文件的起始offset和结束offset范围内if (targetFile != null && offset >= targetFile.getFileFromOffset()&& offset < targetFile.getFileFromOffset() + this.mappedFileSize) {return targetFile;}//                    若是按索引在队列中找不到映射文件就遍历队列查找映射文件for (MappedFile tmpMappedFile : this.mappedFiles) {if (offset >= tmpMappedFile.getFileFromOffset()&& offset < tmpMappedFile.getFileFromOffset() + this.mappedFileSize) {return tmpMappedFile;}}}//                若是offset=0获取队列中第一个映射文件,我的感受这个逻辑是否放在前面判断更为合理,仍是放在这里另有深意if (returnFirstOnNotFound) {return firstMappedFile;}}} catch (Exception e) {log.error("findMappedFileByOffset Exception", e);}return null;}

 

进入这个方法 org.apache.rocketmq.broker.processor.AdminBrokerProcessor#callConsumer,调用消费者

 



private RemotingCommand callConsumer(final int requestCode,final RemotingCommand request,final String consumerGroup,final String clientId) throws RemotingCommandException {final RemotingCommand response = RemotingCommand.createResponseCommand(null);//        按消费组和clentId查询client的渠道信息 =》ClientChannelInfo clientChannelInfo = this.brokerController.getConsumerManager().findChannel(consumerGroup, clientId);if (null == clientChannelInfo) {response.setCode(ResponseCode.SYSTEM_ERROR);response.setRemark(String.format("The Consumer <%s> <%s> not online", consumerGroup, clientId));return response;}if (clientChannelInfo.getVersion() < MQVersion.Version.V3_1_8_SNAPSHOT.ordinal()) {response.setCode(ResponseCode.SYSTEM_ERROR);response.setRemark(String.format("The Consumer <%s> Version <%s> too low to finish, please upgrade it to V3_1_8_SNAPSHOT",clientId,MQVersion.getVersionDesc(clientChannelInfo.getVersion())));return response;}try {RemotingCommand newRequest = RemotingCommand.createRequestCommand(requestCode, null);newRequest.setExtFields(request.getExtFields());newRequest.setBody(request.getBody());//            调用client=》return this.brokerController.getBroker2Client().callClient(clientChannelInfo.getChannel(), newRequest);} catch (RemotingTimeoutException e) {response.setCode(ResponseCode.CONSUME_MSG_TIMEOUT);response.setRemark(String.format("consumer <%s> <%s> Timeout: %s", consumerGroup, clientId, RemotingHelper.exceptionSimpleDesc(e)));return response;} catch (Exception e) {response.setCode(ResponseCode.SYSTEM_ERROR);response.setRemark(String.format("invoke consumer <%s> <%s> Exception: %s", consumerGroup, clientId, RemotingHelper.exceptionSimpleDesc(e)));return response;}}

 

进入这个方法 org.apache.rocketmq.broker.client.ConsumerManager#findChannel,根据clientId找到channel

 

public ClientChannelInfo findChannel(final String group, final String clientId) {//        获取组的消费组信息ConsumerGroupInfo consumerGroupInfo = this.consumerTable.get(group);if (consumerGroupInfo != null) {//            按clineId从消费组信息中获取client渠道信息return consumerGroupInfo.findChannel(clientId);}return null;}

 

进入这个方法 org.apache.rocketmq.broker.client.net.Broker2Client#callClient,调用client

 

public RemotingCommand callClient(final Channel channel,final RemotingCommand request) throws RemotingSendRequestException, RemotingTimeoutException, InterruptedException {//        =》return this.brokerController.getRemotingServer().invokeSync(channel, request, 10000);}

 

进入这个方法

org.apache.rocketmq.remoting.netty.NettyRemotingServer#invokeSync同步执行请求

/**
 * Delegates an asynchronous invocation to {@code invokeAsyncImpl} with the arguments unchanged.
 *
 * <p>NOTE(review): the surrounding walkthrough names {@code invokeSync} as the method being
 * entered, but the code shown here is {@code invokeAsync} — confirm which one was intended.
 *
 * @param channel        channel to send the request on
 * @param request        request command to send
 * @param timeoutMillis  timeout passed through to {@code invokeAsyncImpl}
 * @param invokeCallback callback passed through to {@code invokeAsyncImpl}
 * @throws InterruptedException            propagated from {@code invokeAsyncImpl}
 * @throws RemotingTooMuchRequestException propagated from {@code invokeAsyncImpl}
 * @throws RemotingTimeoutException        propagated from {@code invokeAsyncImpl}
 * @throws RemotingSendRequestException    propagated from {@code invokeAsyncImpl}
 */
@Override
public void invokeAsync(
    Channel channel,
    RemotingCommand request,
    long timeoutMillis,
    InvokeCallback invokeCallback)
    throws InterruptedException,
        RemotingTooMuchRequestException,
        RemotingTimeoutException,
        RemotingSendRequestException {
    this.invokeAsyncImpl(channel, request, timeoutMillis, invokeCallback);
}

 

这里前面介绍过了

往上返回到这个方法

org.apache.rocketmq.broker.processor.AdminBrokerProcessor#consumeMessageDirectly结束

 

说在最后

本次解析仅代表个人观点,仅供参考。

 

加入技术微信群

钉钉技术群

相关文章
相关标签/搜索