说在前面apache
管理请求 GET_CONSUMER_RUNNING_INFO 查询消费者运行时信息缓存
源码解析微信
进入这个方法org.apache.rocketmq.broker.processor.AdminBrokerProcessor#getConsumerRunningInfoide
private RemotingCommand getConsumerRunningInfo(ChannelHandlerContext ctx, RemotingCommand request) throws RemotingCommandException { final GetConsumerRunningInfoRequestHeader requestHeader = (GetConsumerRunningInfoRequestHeader) request.decodeCommandCustomHeader(GetConsumerRunningInfoRequestHeader.class); // =》 return this.callConsumer(RequestCode.GET_CONSUMER_RUNNING_INFO, request, requestHeader.getConsumerGroup(), requestHeader.getClientId()); }
进入这个方法org.apache.rocketmq.broker.processor.AdminBrokerProcessor#callConsumerthis
private RemotingCommand callConsumer( final int requestCode, final RemotingCommand request, final String consumerGroup, final String clientId) throws RemotingCommandException { final RemotingCommand response = RemotingCommand.createResponseCommand(null); // 按消费组和clentId查询client的渠道信息 =》 ClientChannelInfo clientChannelInfo = this.brokerController.getConsumerManager().findChannel(consumerGroup, clientId); if (null == clientChannelInfo) { response.setCode(ResponseCode.SYSTEM_ERROR); response.setRemark(String.format("The Consumer <%s> <%s> not online", consumerGroup, clientId)); return response; } if (clientChannelInfo.getVersion() < MQVersion.Version.V3_1_8_SNAPSHOT.ordinal()) { response.setCode(ResponseCode.SYSTEM_ERROR); response.setRemark(String.format("The Consumer <%s> Version <%s> too low to finish, please upgrade it to V3_1_8_SNAPSHOT", clientId, MQVersion.getVersionDesc(clientChannelInfo.getVersion()))); return response; } try { RemotingCommand newRequest = RemotingCommand.createRequestCommand(requestCode, null); newRequest.setExtFields(request.getExtFields()); newRequest.setBody(request.getBody()); // 调用client=》 return this.brokerController.getBroker2Client().callClient(clientChannelInfo.getChannel(), newRequest); } catch (RemotingTimeoutException e) { response.setCode(ResponseCode.CONSUME_MSG_TIMEOUT); response .setRemark(String.format("consumer <%s> <%s> Timeout: %s", consumerGroup, clientId, RemotingHelper.exceptionSimpleDesc(e))); return response; } catch (Exception e) { response.setCode(ResponseCode.SYSTEM_ERROR); response.setRemark( String.format("invoke consumer <%s> <%s> Exception: %s", consumerGroup, clientId, RemotingHelper.exceptionSimpleDesc(e))); return response; } }
进入这个方法org.apache.rocketmq.broker.client.ConsumerManager#findChannel根据consumerGroup、clientId查询client渠道信息.net
public ClientChannelInfo findChannel(final String group, final String clientId) { // 获取组的消费组信息 ConsumerGroupInfo consumerGroupInfo = this.consumerTable.get(group); if (consumerGroupInfo != null) { // 按clineId从消费组信息中获取client渠道信息 return consumerGroupInfo.findChannel(clientId); } return null; }
进入这个方法org.apache.rocketmq.broker.client.net.Broker2Client#callClient rpc调用netty
public RemotingCommand callClient(final Channel channel, final RemotingCommand request ) throws RemotingSendRequestException, RemotingTimeoutException, InterruptedException { // =》 return this.brokerController.getRemotingServer().invokeSync(channel, request, 10000); }
同步请求,进入这个方法org.apache.rocketmq.remoting.netty.NettyRemotingAbstract#invokeSyncImpl 以前介绍过了code
public RemotingCommand invokeSyncImpl(final Channel channel, final RemotingCommand request, final long timeoutMillis) throws InterruptedException, RemotingSendRequestException, RemotingTimeoutException { final int opaque = request.getOpaque(); try { final ResponseFuture responseFuture = new ResponseFuture(channel, opaque, timeoutMillis, null, null); // 缓存正在进行的响应 this.responseTable.put(opaque, responseFuture); final SocketAddress addr = channel.remoteAddress(); channel.writeAndFlush(request).addListener(new ChannelFutureListener() { @Override public void operationComplete(ChannelFuture f) throws Exception { if (f.isSuccess()) { responseFuture.setSendRequestOK(true); return; } else { responseFuture.setSendRequestOK(false); } responseTable.remove(opaque); responseFuture.setCause(f.cause()); // 响应解析完毕会解除countDownLatch的阻塞 =》 responseFuture.putResponse(null); log.warn("send a request command to channel <" + addr + "> failed."); } }); // 这里用countDownLatch实现的阻塞 =》 RemotingCommand responseCommand = responseFuture.waitResponse(timeoutMillis); if (null == responseCommand) { if (responseFuture.isSendRequestOK()) { throw new RemotingTimeoutException(RemotingHelper.parseSocketAddressAddr(addr), timeoutMillis, responseFuture.getCause()); } else { throw new RemotingSendRequestException(RemotingHelper.parseSocketAddressAddr(addr), responseFuture.getCause()); } } return responseCommand; } finally { this.responseTable.remove(opaque); } }
往上返回到这个方法org.apache.rocketmq.broker.processor.AdminBrokerProcessor#getConsumerRunningInfo结束orm
说在最后blog
本次解析仅表明我的观点,仅供参考。
加入技术微信群
钉钉技术群