RocketMQ Source Code Analysis: Client Management, Unregistering a Client

Preface

This post covers client management on the broker: unregistering a client.

 

Source Code Analysis

We start in the method that unregisters a client: org.apache.rocketmq.broker.processor.ClientManageProcessor.unregisterClient(ChannelHandlerContext, RemotingCommand)



public RemotingCommand unregisterClient(ChannelHandlerContext ctx, RemotingCommand request)
    throws RemotingCommandException {
    final RemotingCommand response =
        RemotingCommand.createResponseCommand(UnregisterClientResponseHeader.class);
    final UnregisterClientRequestHeader requestHeader =
        (UnregisterClientRequestHeader) request.decodeCommandCustomHeader(UnregisterClientRequestHeader.class);

    ClientChannelInfo clientChannelInfo = new ClientChannelInfo(
        ctx.channel(),
        requestHeader.getClientID(),
        request.getLanguage(),
        request.getVersion());

    {
        final String group = requestHeader.getProducerGroup();
        if (group != null) {
            // Unregister the producer =>
            this.brokerController.getProducerManager().unregisterProducer(group, clientChannelInfo);
        }
    }

    {
        final String group = requestHeader.getConsumerGroup();
        if (group != null) {
            // Look up the subscription config of the consumer group =>
            SubscriptionGroupConfig subscriptionGroupConfig =
                this.brokerController.getSubscriptionGroupManager().findSubscriptionGroupConfig(group);
            // Whether consumers should be notified when the group's consumer IDs change; defaults to true
            boolean isNotifyConsumerIdsChangedEnable = true;
            if (null != subscriptionGroupConfig) {
                isNotifyConsumerIdsChangedEnable = subscriptionGroupConfig.isNotifyConsumerIdsChangedEnable();
            }
            // Unregister the consumer =>
            this.brokerController.getConsumerManager().unregisterConsumer(group, clientChannelInfo, isNotifyConsumerIdsChangedEnable);
        }
    }

    response.setCode(ResponseCode.SUCCESS);
    response.setRemark(null);
    return response;
}

Next, the method that unregisters a producer: org.apache.rocketmq.broker.client.ProducerManager.unregisterProducer(String, ClientChannelInfo)

public void unregisterProducer(final String group, final ClientChannelInfo clientChannelInfo) {
    try {
        if (this.groupChannelLock.tryLock(LOCK_TIMEOUT_MILLIS, TimeUnit.MILLISECONDS)) {
            try {
                // Get the channel table of the clients in this producer group
                HashMap<Channel, ClientChannelInfo> channelTable = this.groupChannelTable.get(group);
                if (null != channelTable && !channelTable.isEmpty()) {
                    // Remove this client's channel entry
                    ClientChannelInfo old = channelTable.remove(clientChannelInfo.getChannel());
                    if (old != null) {
                        log.info("unregister a producer[{}] from groupChannelTable {}", group,
                            clientChannelInfo.toString());
                    }

                    if (channelTable.isEmpty()) {
                        // Remove the producer group's entry once it has no channels left
                        this.groupChannelTable.remove(group);
                        log.info("unregister a producer group[{}] from groupChannelTable", group);
                    }
                }
            } finally {
                this.groupChannelLock.unlock();
            }
        } else {
            log.warn("ProducerManager unregisterProducer lock timeout");
        }
    } catch (InterruptedException e) {
        log.error("", e);
    }
}
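The timed-lock pattern in this method can be tried in isolation. What follows is a minimal sketch, not RocketMQ's code: the class ProducerRegistrySketch, its register and hasGroup helpers, the boolean return values, and the use of plain String values in place of Netty's Channel and ClientChannelInfo are all assumptions made to keep the example self-contained. The 3000 ms timeout is likewise an illustrative value.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

// Hypothetical stand-in for ProducerManager; channels and client info are plain Strings.
class ProducerRegistrySketch {
    private static final long LOCK_TIMEOUT_MILLIS = 3000; // assumed value for the sketch

    private final Lock groupChannelLock = new ReentrantLock();
    // group name -> (channel id -> client id)
    private final Map<String, Map<String, String>> groupChannelTable = new HashMap<>();

    // Hypothetical helper so the sketch has something to unregister.
    boolean register(String group, String channelId, String clientId) {
        try {
            if (!groupChannelLock.tryLock(LOCK_TIMEOUT_MILLIS, TimeUnit.MILLISECONDS)) {
                return false; // lock timeout: give up instead of blocking forever
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // preserve the interrupt for the caller
            return false;
        }
        try {
            groupChannelTable.computeIfAbsent(group, g -> new HashMap<>()).put(channelId, clientId);
            return true;
        } finally {
            groupChannelLock.unlock();
        }
    }

    // Returns true only if the channel was present and has been removed.
    boolean unregister(String group, String channelId) {
        try {
            if (!groupChannelLock.tryLock(LOCK_TIMEOUT_MILLIS, TimeUnit.MILLISECONDS)) {
                return false;
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
        try {
            Map<String, String> channelTable = groupChannelTable.get(group);
            if (channelTable == null || channelTable.isEmpty()) {
                return false;
            }
            boolean removed = channelTable.remove(channelId) != null;
            if (channelTable.isEmpty()) {
                // Drop the group entry once its last channel is gone.
                groupChannelTable.remove(group);
            }
            return removed;
        } finally {
            groupChannelLock.unlock();
        }
    }

    // Hypothetical inspection helper used only to observe the table's state.
    boolean hasGroup(String group) {
        groupChannelLock.lock();
        try {
            return groupChannelTable.containsKey(group);
        } finally {
            groupChannelLock.unlock();
        }
    }
}
```

Using tryLock with a timeout means a stuck lock holder causes a skipped unregistration rather than a blocked request thread, which is the trade-off the warn log in the listing points at.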

Next, the method that looks up a consumer group's subscription config: org.apache.rocketmq.broker.subscription.SubscriptionGroupManager.findSubscriptionGroupConfig(String)

public SubscriptionGroupConfig findSubscriptionGroupConfig(final String group) {
    // Look up the group's subscription config in the in-memory table
    SubscriptionGroupConfig subscriptionGroupConfig = this.subscriptionGroupTable.get(group);
    if (null == subscriptionGroupConfig) {
        // Create the config if auto-creation is enabled or the group is a system consumer group
        if (brokerController.getBrokerConfig().isAutoCreateSubscriptionGroup() || MixAll.isSysConsumerGroup(group)) {
            subscriptionGroupConfig = new SubscriptionGroupConfig();
            subscriptionGroupConfig.setGroupName(group);
            SubscriptionGroupConfig preConfig = this.subscriptionGroupTable.putIfAbsent(group, subscriptionGroupConfig);
            if (null == preConfig) {
                log.info("auto create a subscription group, {}", subscriptionGroupConfig.toString());
            }
            // Bump the data version
            this.dataVersion.nextVersion();
            // Persist =>
            this.persist();
        }
    }

    return subscriptionGroupConfig;
}
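The lookup-then-create step relies on putIfAbsent so that concurrent callers do not overwrite each other's entry. Here is a minimal sketch of that idea under stated assumptions: SubscriptionLookupSketch, GroupConfig, and the AtomicLong standing in for dataVersion are hypothetical names, the system-consumer-group check and persistence are left out, and, unlike the listing, the sketch returns the already-stored instance when putIfAbsent loses a race and bumps the version only on a real insert. Those last two points are choices made for the sketch, not a description of RocketMQ.

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.atomic.AtomicLong;

// Hypothetical stand-in for SubscriptionGroupManager.
class SubscriptionLookupSketch {
    // Hypothetical, trimmed-down config that only carries the group name.
    static final class GroupConfig {
        final String groupName;

        GroupConfig(String groupName) {
            this.groupName = groupName;
        }
    }

    private final ConcurrentMap<String, GroupConfig> table = new ConcurrentHashMap<>();
    private final AtomicLong version = new AtomicLong(); // stands in for dataVersion
    private final boolean autoCreate;

    SubscriptionLookupSketch(boolean autoCreate) {
        this.autoCreate = autoCreate;
    }

    // Returns the stored config, creating it when auto-creation is enabled; null otherwise.
    GroupConfig find(String group) {
        GroupConfig config = table.get(group);
        if (config == null && autoCreate) {
            GroupConfig created = new GroupConfig(group);
            GroupConfig previous = table.putIfAbsent(group, created);
            if (previous == null) {
                // This call inserted the entry, so record the change.
                version.incrementAndGet();
                config = created;
            } else {
                // Another caller inserted first; use its instance.
                config = previous;
            }
        }
        return config;
    }

    long version() {
        return version.get();
    }
}
```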

Back in the caller, we enter the method that unregisters a consumer: org.apache.rocketmq.broker.client.ConsumerManager.unregisterConsumer(String, ClientChannelInfo, boolean)

public void unregisterConsumer(final String group, final ClientChannelInfo clientChannelInfo,
    boolean isNotifyConsumerIdsChangedEnable) {
    ConsumerGroupInfo consumerGroupInfo = this.consumerTable.get(group);
    if (null != consumerGroupInfo) {
        // Unregister the client's channel =>
        consumerGroupInfo.unregisterChannel(clientChannelInfo);
        if (consumerGroupInfo.getChannelInfoTable().isEmpty()) {
            // Remove the consumer group's entry once it has no channels left
            ConsumerGroupInfo remove = this.consumerTable.remove(group);
            if (remove != null) {
                log.info("unregister consumer ok, no any connection, and remove consumer group, {}", group);
                // The consumer-IDs change listener handles the UNREGISTER event =>
                this.consumerIdsChangeListener.handle(ConsumerGroupEvent.UNREGISTER, group);
            }
        }
        // Whether to notify consumers that the group's consumer IDs changed (enabled by default)
        if (isNotifyConsumerIdsChangedEnable) {
            // Fire the consumer group CHANGE event =>
            this.consumerIdsChangeListener.handle(ConsumerGroupEvent.CHANGE, group, consumerGroupInfo.getAllChannel());
        }
    }
}
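The order in which events fire is easy to miss when reading the method: UNREGISTER fires only when the group's last channel goes away, and CHANGE fires afterwards whenever notification is enabled, even for a group that was just removed. The sketch below illustrates that ordering under stated assumptions: ConsumerUnregisterSketch, its register helper, and the firedEvents list (which records events as strings instead of calling a listener) are hypothetical, channels are plain String ids, and the code is single-threaded for brevity.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

// Hypothetical, single-threaded stand-in for ConsumerManager.
class ConsumerUnregisterSketch {
    enum Event { CHANGE, UNREGISTER }

    // group name -> channel ids
    private final Map<String, Set<String>> consumerTable = new HashMap<>();
    // Records fired events in order; a real broker would call a listener instead.
    final List<String> firedEvents = new ArrayList<>();

    // Hypothetical helper so the sketch has something to unregister.
    void register(String group, String channelId) {
        consumerTable.computeIfAbsent(group, g -> new HashSet<>()).add(channelId);
    }

    void unregister(String group, String channelId, boolean notifyOnChange) {
        Set<String> channels = consumerTable.get(group);
        if (channels == null) {
            return; // unknown group: nothing to do and no events
        }
        channels.remove(channelId);
        if (channels.isEmpty() && consumerTable.remove(group) != null) {
            // Last channel gone: the group itself is unregistered.
            firedEvents.add(Event.UNREGISTER + ":" + group);
        }
        if (notifyOnChange) {
            // Remaining members (possibly zero) are told that membership changed.
            firedEvents.add(Event.CHANGE + ":" + group + ":" + channels.size());
        }
    }
}
```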

Next, the method that unregisters the channel: org.apache.rocketmq.broker.client.ConsumerGroupInfo.unregisterChannel(ClientChannelInfo)

public void unregisterChannel(final ClientChannelInfo clientChannelInfo) {
    ClientChannelInfo old = this.channelInfoTable.remove(clientChannelInfo.getChannel());
    if (old != null) {
        log.info("unregister a consumer[{}] from consumerGroupInfo {}", this.groupName, old.toString());
    }
}

Next, the method in which the consumer-IDs change listener handles the UNREGISTER event: org.apache.rocketmq.broker.client.DefaultConsumerIdsChangeListener.handle(ConsumerGroupEvent, String, Object...). This method was covered in an earlier post.

 

The consumer group CHANGE event goes through the same method, org.apache.rocketmq.broker.client.DefaultConsumerIdsChangeListener.handle(ConsumerGroupEvent, String, Object...), which was covered in an earlier post.

 

Control then returns to org.apache.rocketmq.broker.processor.ClientManageProcessor.unregisterClient(ChannelHandlerContext, RemotingCommand), which ends here.

 

Closing Remarks

This analysis reflects only my own reading of the code and is for reference only.

 
