Preface
The admin request UNLOCK_BATCH_MQ unlocks message queues in batch.
Source code analysis
Step into the method org.apache.rocketmq.broker.processor.AdminBrokerProcessor#unlockBatchMQ, which unlocks message queues in batch.
private RemotingCommand unlockBatchMQ(ChannelHandlerContext ctx,
    RemotingCommand request) throws RemotingCommandException {
    final RemotingCommand response = RemotingCommand.createResponseCommand(null);
    // Decode the request body: consumer group, client ID, and the set of queues to unlock.
    UnlockBatchRequestBody requestBody =
        UnlockBatchRequestBody.decode(request.getBody(), UnlockBatchRequestBody.class);

    // => step into this call
    this.brokerController.getRebalanceLockManager().unlockBatch(
        requestBody.getConsumerGroup(),
        requestBody.getMqSet(),
        requestBody.getClientId());

    // unlockBatch returns void, so the response is SUCCESS regardless of what was unlocked.
    response.setCode(ResponseCode.SUCCESS);
    response.setRemark(null);
    return response;
}
Step into the method org.apache.rocketmq.broker.client.rebalance.RebalanceLockManager#unlockBatch, which unlocks message queues by consumer group, message queue set, and clientId.
public void unlockBatch(final String group, final Set<MessageQueue> mqs, final String clientId) {
    try {
        this.lock.lockInterruptibly();
        try {
            // Look up this group's locked message queues in the cache.
            ConcurrentHashMap<MessageQueue, LockEntry> groupValue = this.mqLockTable.get(group);
            if (null != groupValue) {
                for (MessageQueue mq : mqs) {
                    LockEntry lockEntry = groupValue.get(mq);
                    if (null != lockEntry) {
                        // Only the client that holds the lock may release it.
                        if (lockEntry.getClientId().equals(clientId)) {
                            groupValue.remove(mq);
                            log.info("unlockBatch, Group: {} {} {}", group, mq, clientId);
                        } else {
                            log.warn("unlockBatch, but mq locked by other client: {}, Group: {} {} {}",
                                lockEntry.getClientId(), group, mq, clientId);
                        }
                    } else {
                        log.warn("unlockBatch, but mq not locked, Group: {} {} {}", group, mq, clientId);
                    }
                }
            } else {
                log.warn("unlockBatch, group not exist, Group: {} {}", group, clientId);
            }
        } finally {
            this.lock.unlock();
        }
    } catch (InterruptedException e) {
        // The message text says "putMessage" in the quoted source, although this is the unlock path.
        log.error("putMessage exception", e);
    }
}
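The branching in unlockBatch can be easier to follow in a stripped-down model. The sketch below is not RocketMQ code: LockTableSketch, Outcome, lockQueue and ownerOf are hypothetical names, queues are plain strings instead of MessageQueue, the owner is a bare clientId instead of a LockEntry, and lock() stands in for lockInterruptibly(). It also returns an outcome per queue so the result can be inspected, whereas the real method returns void and only logs.

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

// Hypothetical, simplified model of a broker-side queue lock table.
class LockTableSketch {
    enum Outcome { UNLOCKED, LOCKED_BY_OTHER, NOT_LOCKED, GROUP_NOT_FOUND }

    private final Lock lock = new ReentrantLock();
    // consumer group -> (queue name -> clientId that holds the lock)
    private final Map<String, Map<String, String>> table = new ConcurrentHashMap<>();

    // Records clientId as the owner of the queue unless it already has an owner.
    void lockQueue(String group, String queue, String clientId) {
        lock.lock();
        try {
            table.computeIfAbsent(group, g -> new ConcurrentHashMap<>())
                 .putIfAbsent(queue, clientId);
        } finally {
            lock.unlock();
        }
    }

    // Removes each queue's entry only if clientId is its current owner.
    Map<String, Outcome> unlockBatch(String group, Set<String> queues, String clientId) {
        Map<String, Outcome> outcomes = new HashMap<>();
        lock.lock();
        try {
            Map<String, String> owners = table.get(group);
            for (String queue : queues) {
                if (owners == null) {
                    outcomes.put(queue, Outcome.GROUP_NOT_FOUND);
                    continue;
                }
                String owner = owners.get(queue);
                if (owner == null) {
                    outcomes.put(queue, Outcome.NOT_LOCKED);
                } else if (owner.equals(clientId)) {
                    owners.remove(queue);
                    outcomes.put(queue, Outcome.UNLOCKED);
                } else {
                    outcomes.put(queue, Outcome.LOCKED_BY_OTHER);
                }
            }
        } finally {
            lock.unlock();
        }
        return outcomes;
    }

    // Returns the owning clientId, or null if the queue is not locked.
    String ownerOf(String group, String queue) {
        Map<String, String> owners = table.get(group);
        return owners == null ? null : owners.get(queue);
    }

    public static void main(String[] args) {
        LockTableSketch t = new LockTableSketch();
        t.lockQueue("groupA", "q0", "client-1");
        t.lockQueue("groupA", "q1", "client-2");
        Set<String> queues = new HashSet<>(Arrays.asList("q0", "q1", "q2"));
        Map<String, Outcome> r = t.unlockBatch("groupA", queues, "client-1");
        System.out.println("q0: " + r.get("q0")); // q0: UNLOCKED
        System.out.println("q1: " + r.get("q1")); // q1: LOCKED_BY_OTHER
        System.out.println("q2: " + r.get("q2")); // q2: NOT_LOCKED
    }
}
```

The point of the ownership check is that a client cannot release a queue that another client of the same group currently holds: q1 stays owned by client-2 after client-1's batch unlock.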
Return up to the method org.apache.rocketmq.broker.processor.AdminBrokerProcessor#unlockBatchMQ, where processing ends.
Closing remarks
This analysis reflects only my own views and is for reference only.