说在前面apache
管理请求 GET_ALL_CONSUMER_OFFSET 获取全部消费者的offsetjson
源码解析微信
进入这个方法org.apache.rocketmq.broker.processor.AdminBrokerProcessor#getAllConsumerOffset获取全部消费者的offsetthis
private RemotingCommand getAllConsumerOffset(ChannelHandlerContext ctx, RemotingCommand request) { final RemotingCommand response = RemotingCommand.createResponseCommand(null); // 消费者offset json编码 String content = this.brokerController.getConsumerOffsetManager().encode(); if (content != null && content.length() > 0) { try { response.setBody(content.getBytes(MixAll.DEFAULT_CHARSET)); } catch (UnsupportedEncodingException e) { log.error("get all consumer offset from master error.", e); response.setCode(ResponseCode.SYSTEM_ERROR); response.setRemark("UnsupportedEncodingException " + e); return response; } } else { log.error("No consumer offset in this broker, client: {} ", ctx.channel().remoteAddress()); response.setCode(ResponseCode.SYSTEM_ERROR); response.setRemark("No consumer offset in this broker"); return response; } response.setCode(ResponseCode.SUCCESS); response.setRemark(null); return response; }
进入这个方法org.apache.rocketmq.broker.offset.ConsumerOffsetManager#encode(boolean)编码
public String encode(final boolean prettyFormat) { return RemotingSerializable.toJson(this, prettyFormat); }
消费者offset内存存储位置3d
// 消费者offset private ConcurrentMap<String/* topic@group */, ConcurrentMap<Integer/*queueId*/, Long/*offset*/>> offsetTable = new ConcurrentHashMap<String, ConcurrentMap<Integer, Long>>(512);
往上返回到这个方法org.apache.rocketmq.broker.processor.AdminBrokerProcessor#getAllConsumerOffset结束code
说在最后orm
本次解析仅表明我的观点,仅供参考。blog
加入技术微信群内存
钉钉技术群