Preface
Admin request LOCK_BATCH_MQ: lock message queues in batch
Source code analysis
Enter the method org.apache.rocketmq.broker.processor.AdminBrokerProcessor#lockBatchMQ, which locks message queues in batch.
    private RemotingCommand lockBatchMQ(ChannelHandlerContext ctx,
        RemotingCommand request) throws RemotingCommandException {
        final RemotingCommand response = RemotingCommand.createResponseCommand(null);
        LockBatchRequestBody requestBody =
            LockBatchRequestBody.decode(request.getBody(), LockBatchRequestBody.class);

        // Try to lock the requested queues and collect the ones that were locked =>
        Set<MessageQueue> lockOKMQSet = this.brokerController.getRebalanceLockManager().tryLockBatch(
            requestBody.getConsumerGroup(),
            requestBody.getMqSet(),
            requestBody.getClientId());

        LockBatchResponseBody responseBody = new LockBatchResponseBody();
        responseBody.setLockOKMQSet(lockOKMQSet);
        response.setBody(responseBody.encode());
        response.setCode(ResponseCode.SUCCESS);
        response.setRemark(null);
        return response;
    }
Enter the method org.apache.rocketmq.broker.client.rebalance.RebalanceLockManager#tryLockBatch, which locks message queues by consumer group, message queue set, and clientId.
    public Set<MessageQueue> tryLockBatch(final String group, final Set<MessageQueue> mqs,
        final String clientId) {
        Set<MessageQueue> lockedMqs = new HashSet<MessageQueue>(mqs.size());
        Set<MessageQueue> notLockedMqs = new HashSet<MessageQueue>(mqs.size());

        for (MessageQueue mq : mqs) {
            // Is this queue already locked by this client? =>
            if (this.isLocked(group, mq, clientId)) {
                lockedMqs.add(mq);
            } else {
                notLockedMqs.add(mq);
            }
        }

        if (!notLockedMqs.isEmpty()) {
            try {
                this.lock.lockInterruptibly();
                try {
                    ConcurrentHashMap<MessageQueue, LockEntry> groupValue = this.mqLockTable.get(group);
                    // No lock table exists for this group (its queue locks have been released)
                    if (null == groupValue) {
                        groupValue = new ConcurrentHashMap<>(32);
                        this.mqLockTable.put(group, groupValue);
                    }

                    for (MessageQueue mq : notLockedMqs) {
                        LockEntry lockEntry = groupValue.get(mq);
                        // The queue has no lock entry yet: create one owned by this client
                        if (null == lockEntry) {
                            lockEntry = new LockEntry();
                            lockEntry.setClientId(clientId);
                            groupValue.put(mq, lockEntry);
                            log.info(
                                "tryLockBatch, message queue not locked, I got it. Group: {} NewClientId: {} {}",
                                group,
                                clientId,
                                mq);
                        }

                        // This client holds the lock: refresh the timestamp
                        if (lockEntry.isLocked(clientId)) {
                            lockEntry.setLastUpdateTimestamp(System.currentTimeMillis());
                            lockedMqs.add(mq);
                            continue;
                        }

                        String oldClientId = lockEntry.getClientId();

                        // The lock has expired: take it over for this client
                        if (lockEntry.isExpired()) {
                            lockEntry.setClientId(clientId);
                            lockEntry.setLastUpdateTimestamp(System.currentTimeMillis());
                            log.warn(
                                "tryLockBatch, message queue lock expired, I got it. Group: {} OldClientId: {} NewClientId: {} {}",
                                group,
                                oldClientId,
                                clientId,
                                mq);
                            lockedMqs.add(mq);
                            continue;
                        }

                        // The lock is still held by another client: this queue is skipped
                        log.warn(
                            "tryLockBatch, message queue locked by other client. Group: {} OtherClientId: {} NewClientId: {} {}",
                            group,
                            oldClientId,
                            clientId,
                            mq);
                    }
                } finally {
                    this.lock.unlock();
                }
            } catch (InterruptedException e) {
                log.error("putMessage exception", e);
            }
        }

        return lockedMqs;
    }
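The three outcomes for each queue in the loop above (free, held by the same client or expired, held by another client) can be illustrated with a much smaller model. The following is only a sketch under assumptions: `SimpleBatchLocker`, its `Entry` class, the use of plain strings for queues, the explicit `now` parameter, and the 60-second lease are all hypothetical simplifications for illustration, not the broker's actual classes or configuration.

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.locks.ReentrantLock;

// Hypothetical, simplified stand-in for RebalanceLockManager; queues are plain strings.
class SimpleBatchLocker {
    // Assumed lease length for this sketch only.
    static final long MAX_LIVE_MILLIS = 60_000L;

    // Minimal lock record: who owns the queue and when the lock was last refreshed.
    static final class Entry {
        String clientId;
        long lastUpdate;
    }

    private final ReentrantLock lock = new ReentrantLock();
    // group -> (queue -> lock entry)
    private final Map<String, Map<String, Entry>> table = new HashMap<>();

    // Returns the subset of queues that clientId holds after this call.
    // The current time is passed in so the behaviour is easy to test.
    Set<String> tryLockBatch(String group, Set<String> queues, String clientId, long now) {
        Set<String> locked = new HashSet<>();
        lock.lock();
        try {
            Map<String, Entry> groupTable = table.computeIfAbsent(group, g -> new HashMap<>());
            for (String queue : queues) {
                Entry entry = groupTable.get(queue);
                if (entry == null) {
                    // Case 1: nobody holds the queue, so create an entry for this client.
                    entry = new Entry();
                    groupTable.put(queue, entry);
                } else {
                    boolean expired = now - entry.lastUpdate > MAX_LIVE_MILLIS;
                    boolean sameOwner = entry.clientId.equals(clientId);
                    if (!sameOwner && !expired) {
                        // Case 3: another client holds a live lock, skip this queue.
                        continue;
                    }
                }
                // Case 1, or case 2 (same owner renewing, or expired lock taken over).
                entry.clientId = clientId;
                entry.lastUpdate = now;
                locked.add(queue);
            }
        } finally {
            lock.unlock();
        }
        return locked;
    }
}
```

With this model, client A locking `q0` and `q1` at time 0 gets both queues, client B asking one second later gets none, and client B asking again after A's lease has lapsed without renewal takes both over.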
Enter the method org.apache.rocketmq.broker.client.rebalance.RebalanceLockManager#isLocked, which checks by consumer group, message queue, and clientId whether the message queue is already locked.
    private boolean isLocked(final String group, final MessageQueue mq, final String clientId) {
        // Look up the group's queue lock table in the cache
        ConcurrentHashMap<MessageQueue, LockEntry> groupValue = this.mqLockTable.get(group);
        if (groupValue != null) {
            // Find the lock entry for this message queue
            LockEntry lockEntry = groupValue.get(mq);
            if (lockEntry != null) {
                // If this client still holds the lock, refresh its timestamp
                boolean locked = lockEntry.isLocked(clientId);
                if (locked) {
                    lockEntry.setLastUpdateTimestamp(System.currentTimeMillis());
                }

                return locked;
            }
        }

        return false;
    }
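The article does not show LockEntry itself, so the meaning of `isLocked(clientId)` and `isExpired()` has to be inferred from how they are called. A plausible reading is that a lock counts as held only when the client ID matches and the lease has not run out, which is why a successful check doubles as a renewal. The class below is a hypothetical sketch of that reading: the name `LockEntrySketch`, the explicit `now` parameters, and the 60-second lease constant are assumptions made for illustration.

```java
// Hypothetical sketch of a lock entry; not the broker's actual LockEntry class.
class LockEntrySketch {
    // Assumed lease length for this sketch only.
    static final long MAX_LIVE_MILLIS = 60_000L;

    private final String clientId;
    // volatile so that a renewal by one thread is visible to readers on other threads
    private volatile long lastUpdateTimestamp;

    LockEntrySketch(String clientId, long now) {
        this.clientId = clientId;
        this.lastUpdateTimestamp = now;
    }

    // The lease has run out if it was not refreshed within MAX_LIVE_MILLIS.
    boolean isExpired(long now) {
        return now - lastUpdateTimestamp > MAX_LIVE_MILLIS;
    }

    // Held by the given client only if the owner matches and the lease is still live.
    boolean isLocked(String candidateClientId, long now) {
        return clientId.equals(candidateClientId) && !isExpired(now);
    }

    // Mirrors what isLocked in the manager does after a successful check.
    void renew(long now) {
        this.lastUpdateTimestamp = now;
    }
}
```

Under this reading, a consumer that keeps sending lock requests before the lease runs out never loses its queues, while a consumer that stops doing so lets the lock lapse and become available to other clients.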
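Return up to the method org.apache.rocketmq.broker.processor.AdminBrokerProcessor#lockBatchMQ, where the request handling ends.
Closing remarks
This analysis reflects only my own view and is for reference only.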