rocketmq源码解析消费者管理处理器①

说在前面apache

消费者管理处理器,按消费组获取全部的消费者微信

 

源码解析this

进入这个方法,按消费组获取全部的消费者,org.apache.rocketmq.broker.processor.ConsumerManageProcessor#getConsumerListByGroupcode


public RemotingCommand getConsumerListByGroup(ChannelHandlerContext ctx, RemotingCommand request)throws RemotingCommandException {final RemotingCommand response =RemotingCommand.createResponseCommand(GetConsumerListByGroupResponseHeader.class);final GetConsumerListByGroupRequestHeader requestHeader =(GetConsumerListByGroupRequestHeader) request.decodeCommandCustomHeader(GetConsumerListByGroupRequestHeader.class);//        获取消费组信息ConsumerGroupInfo consumerGroupInfo =this.brokerController.getConsumerManager().getConsumerGroupInfo(requestHeader.getConsumerGroup());if (consumerGroupInfo != null) {//            获取全部的client=》List<String> clientIds = consumerGroupInfo.getAllClientId();if (!clientIds.isEmpty()) {GetConsumerListByGroupResponseBody body = new GetConsumerListByGroupResponseBody();body.setConsumerIdList(clientIds);response.setBody(body.encode());response.setCode(ResponseCode.SUCCESS);response.setRemark(null);return response;} else {log.warn("getAllClientId failed, {} {}", requestHeader.getConsumerGroup(),RemotingHelper.parseChannelRemoteAddr(ctx.channel()));}} else {log.warn("getConsumerGroupInfo failed, {} {}", requestHeader.getConsumerGroup(),RemotingHelper.parseChannelRemoteAddr(ctx.channel()));}response.setCode(ResponseCode.SYSTEM_ERROR);response.setRemark("no consumer for this group, " + requestHeader.getConsumerGroup());return response;    }

进入这个方法,获取全部的client,org.apache.rocketmq.broker.client.ConsumerGroupInfo#getAllClientIdblog



//public List<String> getAllClientId() {List<String> result = new ArrayList<>();Iterator<Entry<Channel, ClientChannelInfo>> it = this.channelInfoTable.entrySet().iterator();while (it.hasNext()) {Entry<Channel, ClientChannelInfo> entry = it.next();ClientChannelInfo clientChannelInfo = entry.getValue();result.add(clientChannelInfo.getClientId());}return result;    }

往上返回到这个方法,org.apache.rocketmq.broker.processor.ConsumerManageProcessor#getConsumerListByGroup结束。get

 

说在最后源码

本次解析仅表明我的观点,仅供参考。it

 

加入技术微信群io

钉钉技术群class

相关文章
相关标签/搜索