说在前面
管理请求 CONSUME_MESSAGE_DIRECTLY：直接消费消息
源码解析
进入这个方法 org.apache.rocketmq.broker.processor.AdminBrokerProcessor#consumeMessageDirectly
private RemotingCommand consumeMessageDirectly(ChannelHandlerContext ctx,RemotingCommand request) throws RemotingCommandException {final ConsumeMessageDirectlyResultRequestHeader requestHeader = (ConsumeMessageDirectlyResultRequestHeader) request.decodeCommandCustomHeader(ConsumeMessageDirectlyResultRequestHeader.class);request.getExtFields().put("brokerName", this.brokerController.getBrokerConfig().getBrokerName());SelectMappedBufferResult selectMappedBufferResult = null;try {MessageId messageId = MessageDecoder.decodeMessageId(requestHeader.getMsgId());// 从commit的offset位置获取一条消息=》selectMappedBufferResult = this.brokerController.getMessageStore().selectOneMessageByOffset(messageId.getOffset());byte[] body = new byte[selectMappedBufferResult.getSize()];selectMappedBufferResult.getByteBuffer().get(body);request.setBody(body);} catch (UnknownHostException e) {} finally {if (selectMappedBufferResult != null) {selectMappedBufferResult.release();}}// =》return this.callConsumer(RequestCode.CONSUME_MESSAGE_DIRECTLY, request, requestHeader.getConsumerGroup(),requestHeader.getClientId());}
进入这个方法 org.apache.rocketmq.store.DefaultMessageStore#selectOneMessageByOffset(long)，根据 offset 从 commitLog 中读取消息
@Overridepublic SelectMappedBufferResult selectOneMessageByOffset(long commitLogOffset) {// SelectMappedBufferResult中存储的是消息offset、建立时间=》SelectMappedBufferResult sbr = this.commitLog.getMessage(commitLogOffset, 4);if (null != sbr) {try {// 1 TOTALSIZEint size = sbr.getByteBuffer().getInt();// =》return this.commitLog.getMessage(commitLogOffset, size);} finally {sbr.release();}}return null;}
进入这个方法 org.apache.rocketmq.store.CommitLog#getMessage，查询消息
public SelectMappedBufferResult getMessage(final long offset, final int size) {int mappedFileSize = this.defaultMessageStore.getMessageStoreConfig().getMapedFileSizeCommitLog();// 根据offset找到映射文件 =》MappedFile mappedFile = this.mappedFileQueue.findMappedFileByOffset(offset, offset == 0);if (mappedFile != null) {int pos = (int) (offset % mappedFileSize);return mappedFile.selectMappedBuffer(pos, size);}return null;}
进入这个方法 org.apache.rocketmq.store.MappedFileQueue#findMappedFileByOffset(long, boolean)，根据 offset 查询映射文件
public MappedFile findMappedFileByOffset(final long offset, final boolean returnFirstOnNotFound) {try {// 获取队列中第一个映射文件MappedFile firstMappedFile = this.getFirstMappedFile();// 获取队列中最后一个映射文件MappedFile lastMappedFile = this.getLastMappedFile();if (firstMappedFile != null && lastMappedFile != null) {// 若是offset不在索引文件的offset范围内if (offset < firstMappedFile.getFileFromOffset() || offset >= lastMappedFile.getFileFromOffset() + this.mappedFileSize) {LOG_ERROR.warn("Offset not matched. Request offset: {}, firstOffset: {}, lastOffset: {}, mappedFileSize: {}, mappedFiles count: {}",offset,firstMappedFile.getFileFromOffset(),lastMappedFile.getFileFromOffset() + this.mappedFileSize,this.mappedFileSize,this.mappedFiles.size());} else {// 找到映射文件在队列中的索引位置int index = (int) ((offset / this.mappedFileSize) - (firstMappedFile.getFileFromOffset() / this.mappedFileSize));MappedFile targetFile = null;try {// 获取索引文件targetFile = this.mappedFiles.get(index);} catch (Exception ignored) {}// offset在目标文件的起始offset和结束offset范围内if (targetFile != null && offset >= targetFile.getFileFromOffset()&& offset < targetFile.getFileFromOffset() + this.mappedFileSize) {return targetFile;}// 若是按索引在队列中找不到映射文件就遍历队列查找映射文件for (MappedFile tmpMappedFile : this.mappedFiles) {if (offset >= tmpMappedFile.getFileFromOffset()&& offset < tmpMappedFile.getFileFromOffset() + this.mappedFileSize) {return tmpMappedFile;}}}// 若是offset=0获取队列中第一个映射文件,我的感受这个逻辑是否放在前面判断更为合理,仍是放在这里另有深意if (returnFirstOnNotFound) {return firstMappedFile;}}} catch (Exception e) {log.error("findMappedFileByOffset Exception", e);}return null;}
进入这个方法 org.apache.rocketmq.broker.processor.AdminBrokerProcessor#callConsumer，调用消费者
private RemotingCommand callConsumer(final int requestCode,final RemotingCommand request,final String consumerGroup,final String clientId) throws RemotingCommandException {final RemotingCommand response = RemotingCommand.createResponseCommand(null);// 按消费组和clentId查询client的渠道信息 =》ClientChannelInfo clientChannelInfo = this.brokerController.getConsumerManager().findChannel(consumerGroup, clientId);if (null == clientChannelInfo) {response.setCode(ResponseCode.SYSTEM_ERROR);response.setRemark(String.format("The Consumer <%s> <%s> not online", consumerGroup, clientId));return response;}if (clientChannelInfo.getVersion() < MQVersion.Version.V3_1_8_SNAPSHOT.ordinal()) {response.setCode(ResponseCode.SYSTEM_ERROR);response.setRemark(String.format("The Consumer <%s> Version <%s> too low to finish, please upgrade it to V3_1_8_SNAPSHOT",clientId,MQVersion.getVersionDesc(clientChannelInfo.getVersion())));return response;}try {RemotingCommand newRequest = RemotingCommand.createRequestCommand(requestCode, null);newRequest.setExtFields(request.getExtFields());newRequest.setBody(request.getBody());// 调用client=》return this.brokerController.getBroker2Client().callClient(clientChannelInfo.getChannel(), newRequest);} catch (RemotingTimeoutException e) {response.setCode(ResponseCode.CONSUME_MSG_TIMEOUT);response.setRemark(String.format("consumer <%s> <%s> Timeout: %s", consumerGroup, clientId, RemotingHelper.exceptionSimpleDesc(e)));return response;} catch (Exception e) {response.setCode(ResponseCode.SYSTEM_ERROR);response.setRemark(String.format("invoke consumer <%s> <%s> Exception: %s", consumerGroup, clientId, RemotingHelper.exceptionSimpleDesc(e)));return response;}}
进入这个方法 org.apache.rocketmq.broker.client.ConsumerManager#findChannel，根据 clientId 找到 channel
public ClientChannelInfo findChannel(final String group, final String clientId) {// 获取组的消费组信息ConsumerGroupInfo consumerGroupInfo = this.consumerTable.get(group);if (consumerGroupInfo != null) {// 按clineId从消费组信息中获取client渠道信息return consumerGroupInfo.findChannel(clientId);}return null;}
进入这个方法 org.apache.rocketmq.broker.client.net.Broker2Client#callClient，调用 client
public RemotingCommand callClient(final Channel channel,final RemotingCommand request) throws RemotingSendRequestException, RemotingTimeoutException, InterruptedException {// =》return this.brokerController.getRemotingServer().invokeSync(channel, request, 10000);}
进入这个方法
org.apache.rocketmq.remoting.netty.NettyRemotingServer#invokeSync，同步执行请求（注意：下面贴出的代码片段是 invokeAsync 方法，并非这里所说的 invokeSync）
/**
 * Sends a request asynchronously by delegating to {@code invokeAsyncImpl} with the same
 * arguments.
 *
 * @param channel        channel to send the request on
 * @param request        command to send
 * @param timeoutMillis  timeout handed to the implementation, in milliseconds
 * @param invokeCallback callback handed to the implementation
 * @throws InterruptedException            declared by the implementation
 * @throws RemotingTooMuchRequestException declared by the implementation
 * @throws RemotingTimeoutException        declared by the implementation
 * @throws RemotingSendRequestException    declared by the implementation
 */
@Override
public void invokeAsync(
    Channel channel,
    RemotingCommand request,
    long timeoutMillis,
    InvokeCallback invokeCallback)
    throws InterruptedException,
        RemotingTooMuchRequestException,
        RemotingTimeoutException,
        RemotingSendRequestException {
    this.invokeAsyncImpl(channel, request, timeoutMillis, invokeCallback);
}
这里前面介绍过了
往上返回到这个方法
org.apache.rocketmq.broker.processor.AdminBrokerProcessor#consumeMessageDirectly，结束。
说在最后
本次解析仅代表个人观点，仅供参考。
加入技术微信群
钉钉技术群