说在前面apache
client管理 取消注册client缓存
源码解析微信
进入这个方法,取消注册client,org.apache.rocketmq.broker.processor.ClientManageProcessor.unregisterClient(ChannelHandlerContext, RemotingCommand)this
public RemotingCommand unregisterClient(ChannelHandlerContext ctx, RemotingCommand request)throws RemotingCommandException {final RemotingCommand response =RemotingCommand.createResponseCommand(UnregisterClientResponseHeader.class);final UnregisterClientRequestHeader requestHeader =(UnregisterClientRequestHeader) request.decodeCommandCustomHeader(UnregisterClientRequestHeader.class);ClientChannelInfo clientChannelInfo = new ClientChannelInfo(ctx.channel(),requestHeader.getClientID(),request.getLanguage(),request.getVersion());{final String group = requestHeader.getProducerGroup();if (group != null) {// 取消注册生产者=》this.brokerController.getProducerManager().unregisterProducer(group, clientChannelInfo);}}{final String group = requestHeader.getConsumerGroup();if (group != null) {// 获取消费组的订阅配置信息=》SubscriptionGroupConfig subscriptionGroupConfig =this.brokerController.getSubscriptionGroupManager().findSubscriptionGroupConfig(group);// 消费组订阅信息发生改变了是否须要通知消费者,默认是须要通知boolean isNotifyConsumerIdsChangedEnable = true;if (null != subscriptionGroupConfig) {isNotifyConsumerIdsChangedEnable = subscriptionGroupConfig.isNotifyConsumerIdsChangedEnable();}// 取消注册消费者=》this.brokerController.getConsumerManager().unregisterConsumer(group, clientChannelInfo, isNotifyConsumerIdsChangedEnable);}}response.setCode(ResponseCode.SUCCESS);response.setRemark(null);return response; }
进入这个方法,取消注册生产者,org.apache.rocketmq.broker.client.ProducerManager.unregisterProducer(String, ClientChannelInfo)code
public void unregisterProducer(final String group, final ClientChannelInfo clientChannelInfo) {try {if (this.groupChannelLock.tryLock(LOCK_TIMEOUT_MILLIS, TimeUnit.MILLISECONDS)) {try {// 获取消费组的client的channel信息HashMap<Channel, ClientChannelInfo> channelTable = this.groupChannelTable.get(group);if (null != channelTable && !channelTable.isEmpty()) {// 删除channel信息ClientChannelInfo old = channelTable.remove(clientChannelInfo.getChannel());if (old != null) {log.info("unregister a producer[{}] from groupChannelTable {}", group,clientChannelInfo.toString());}if (channelTable.isEmpty()) {// 删除消费组的channel信息this.groupChannelTable.remove(group);log.info("unregister a producer group[{}] from groupChannelTable", group);}}} finally {this.groupChannelLock.unlock();}} else {log.warn("ProducerManager unregisterProducer lock timeout");}} catch (InterruptedException e) {log.error("", e);} }
进入这个方法,获取消费组的订阅配置信息,org.apache.rocketmq.broker.subscription.SubscriptionGroupManager.findSubscriptionGroupConfig(String)blog
public SubscriptionGroupConfig findSubscriptionGroupConfig(final String group) {// 从缓存中获取组的订阅信息SubscriptionGroupConfig subscriptionGroupConfig = this.subscriptionGroupTable.get(group);if (null == subscriptionGroupConfig) {// 自动建立消费组或者是系统自用的消费组if (brokerController.getBrokerConfig().isAutoCreateSubscriptionGroup() || MixAll.isSysConsumerGroup(group)) {subscriptionGroupConfig = new SubscriptionGroupConfig();subscriptionGroupConfig.setGroupName(group);SubscriptionGroupConfig preConfig = this.subscriptionGroupTable.putIfAbsent(group, subscriptionGroupConfig);if (null == preConfig) {log.info("auto create a subscription group, {}", subscriptionGroupConfig.toString());}// 更新数据的版本号this.dataVersion.nextVersion();// 持久化=》this.persist();}}return subscriptionGroupConfig; }
往上返回进入到这个方法,取消注册消费者,org.apache.rocketmq.broker.client.ConsumerManager.unregisterConsumer(String, ClientChannelInfo, boolean)事件
public void unregisterConsumer(final String group, final ClientChannelInfo clientChannelInfo,boolean isNotifyConsumerIdsChangedEnable) {ConsumerGroupInfo consumerGroupInfo = this.consumerTable.get(group);if (null != consumerGroupInfo) {// 取消注册channel信息=》consumerGroupInfo.unregisterChannel(clientChannelInfo);if (consumerGroupInfo.getChannelInfoTable().isEmpty()) {// 删除组的消费组信息ConsumerGroupInfo remove = this.consumerTable.remove(group);if (remove != null) {log.info("unregister consumer ok, no any connection, and remove consumer group, {}", group);// 消费者改变监听器执行取消注册事件=》this.consumerIdsChangeListener.handle(ConsumerGroupEvent.UNREGISTER, group);}}// 消费组的订阅信息发生改变了是否须要通知消费者,默认是if (isNotifyConsumerIdsChangedEnable) {// 执行消费组改变事件=》this.consumerIdsChangeListener.handle(ConsumerGroupEvent.CHANGE, group, consumerGroupInfo.getAllChannel());}} }
进入这个方法,取消注册channel信息,org.apache.rocketmq.broker.client.ConsumerGroupInfo.unregisterChannel(ClientChannelInfo)ip
public void unregisterChannel(final ClientChannelInfo clientChannelInfo) {ClientChannelInfo old = this.channelInfoTable.remove(clientChannelInfo.getChannel());if (old != null) {log.info("unregister a consumer[{}] from consumerGroupInfo {}", this.groupName, old.toString());} }
进入这个方法,消费者改变监听器执行取消注册事件,org.apache.rocketmq.broker.client.DefaultConsumerIdsChangeListener.handle(ConsumerGroupEvent, String, Object...) 这个方法前面介绍过了。rem
进入这个方法,执行消费组改变事件,org.apache.rocketmq.broker.client.DefaultConsumerIdsChangeListener.handle(ConsumerGroupEvent, String, Object...) 这个方法前面介绍你过了。get
往上返回到这个方法,org.apache.rocketmq.broker.processor.ClientManageProcessor.unregisterClient(ChannelHandlerContext, RemotingCommand)结束。
说在最后
本次解析仅表明我的观点,仅供参考。
加入技术微信群
钉钉技术群