说在前面apache
管理请求 GET_PRODUCER_CONNECTION_LIST 获取生产者链接信息缓存
源码解析微信
进入这个方法org.apache.rocketmq.broker.processor.AdminBrokerProcessor#getProducerConnectionList 获取生产者链接信息this
private RemotingCommand getProducerConnectionList(ChannelHandlerContext ctx, RemotingCommand request) throws RemotingCommandException { final RemotingCommand response = RemotingCommand.createResponseCommand(null); final GetProducerConnectionListRequestHeader requestHeader = (GetProducerConnectionListRequestHeader) request.decodeCommandCustomHeader(GetProducerConnectionListRequestHeader.class); // 构建生产者链接信息 ProducerConnection bodydata = new ProducerConnection(); HashMap<Channel, ClientChannelInfo> channelInfoHashMap = // 按生产组从缓存中获取生产者链接列表信息=》 this.brokerController.getProducerManager().getGroupChannelTable().get(requestHeader.getProducerGroup()); if (channelInfoHashMap != null) { // 遍历channelInfo信息 Iterator<Map.Entry<Channel, ClientChannelInfo>> it = channelInfoHashMap.entrySet().iterator(); while (it.hasNext()) { ClientChannelInfo info = it.next().getValue(); // 构建生产者链接 Connection connection = new Connection(); connection.setClientId(info.getClientId()); connection.setLanguage(info.getLanguage()); connection.setVersion(info.getVersion()); connection.setClientAddr(RemotingHelper.parseChannelRemoteAddr(info.getChannel())); bodydata.getConnectionSet().add(connection); } byte[] body = bodydata.encode(); response.setBody(body); response.setCode(ResponseCode.SUCCESS); response.setRemark(null); return response; } response.setCode(ResponseCode.SYSTEM_ERROR); response.setRemark("the producer group[" + requestHeader.getProducerGroup() + "] not exist"); return response; }
进入这个方法org.apache.rocketmq.broker.client.ProducerManager#getGroupChannelTable 获取生产组的channel信息集合,再从生产组信息集合中按生产组名称获取该生产组的channel信息code
public HashMap<String, HashMap<Channel, ClientChannelInfo>> getGroupChannelTable() { HashMap<String /* group name */, HashMap<Channel, ClientChannelInfo>> newGroupChannelTable = new HashMap<String, HashMap<Channel, ClientChannelInfo>>(); try { if (this.groupChannelLock.tryLock(LOCK_TIMEOUT_MILLIS, TimeUnit.MILLISECONDS)) { try { newGroupChannelTable.putAll(groupChannelTable); } finally { groupChannelLock.unlock(); } } } catch (InterruptedException e) { log.error("", e); } return newGroupChannelTable; }
结束blog
说在最后get
本次解析仅表明我的观点,仅供参考。源码
加入技术微信群it
钉钉技术群io