说在前面apache
管理请求 GET_ALL_DELAY_OFFSET 延迟的offset缓存
源码解析微信
进入这个方法org.apache.rocketmq.broker.processor.AdminBrokerProcessor#getAllDelayOffset获取延迟的offsetapp
private RemotingCommand getAllDelayOffset(ChannelHandlerContext ctx, RemotingCommand request) { final RemotingCommand response = RemotingCommand.createResponseCommand(null); // 消息编码=》 String content = ((DefaultMessageStore) this.brokerController.getMessageStore()).getScheduleMessageService().encode(); if (content != null && content.length() > 0) { try { response.setBody(content.getBytes(MixAll.DEFAULT_CHARSET)); } catch (UnsupportedEncodingException e) { log.error("get all delay offset from master error.", e); response.setCode(ResponseCode.SYSTEM_ERROR); response.setRemark("UnsupportedEncodingException " + e); return response; } } else { log.error("No delay offset in this broker, client: {} ", ctx.channel().remoteAddress()); response.setCode(ResponseCode.SYSTEM_ERROR); response.setRemark("No delay offset in this broker"); return response; } response.setCode(ResponseCode.SUCCESS); response.setRemark(null); return response; }
进入这个方法org.apache.rocketmq.store.schedule.ScheduleMessageService#encode(boolean) 消息编码this
public String encode(final boolean prettyFormat) { DelayOffsetSerializeWrapper delayOffsetSerializeWrapper = new DelayOffsetSerializeWrapper(); delayOffsetSerializeWrapper.setOffsetTable(this.offsetTable); return delayOffsetSerializeWrapper.toJson(prettyFormat); }
延迟消息offset内存存储位置编码
// delay的offset缓存 private final ConcurrentMap<Integer /* level */, Long/* offset */> offsetTable = new ConcurrentHashMap<Integer, Long>(32);
往上返回到这个方法org.apache.rocketmq.broker.processor.AdminBrokerProcessor#getAllDelayOffset结束code
说在最后orm
本次解析仅表明我的观点,仅供参考。blog
加入技术微信群内存
钉钉技术群