说在前面apache
管理请求 CLEAN_UNUSED_TOPIC 清除无用的topic缓存
源码解析微信
进入这个方法org.apache.rocketmq.broker.processor.AdminBrokerProcessor#cleanUnusedTopic并发
public RemotingCommand cleanUnusedTopic() { log.warn("invoke cleanUnusedTopic start."); final RemotingCommand response = RemotingCommand.createResponseCommand(null); // =》 brokerController.getMessageStore().cleanUnusedTopic(brokerController.getTopicConfigManager().getTopicConfigTable().keySet()); log.warn("invoke cleanUnusedTopic end."); response.setCode(ResponseCode.SUCCESS); response.setRemark(null); return response; }
进入这个方法org.apache.rocketmq.store.DefaultMessageStore#cleanUnusedTopic清除无用的topicapp
@Override public int cleanUnusedTopic(Set<String> topics) { // 遍历缓存的消息队列 Iterator<Entry<String, ConcurrentMap<Integer, ConsumeQueue>>> it = this.consumeQueueTable.entrySet().iterator(); while (it.hasNext()) { Entry<String, ConcurrentMap<Integer, ConsumeQueue>> next = it.next(); String topic = next.getKey(); if (!topics.contains(topic) && !topic.equals(ScheduleMessageService.SCHEDULE_TOPIC)) { ConcurrentMap<Integer, ConsumeQueue> queueTable = next.getValue(); for (ConsumeQueue cq : queueTable.values()) { // 消费队列销毁=》 cq.destroy(); log.info("cleanUnusedTopic: {} {} ConsumeQueue cleaned", cq.getTopic(), cq.getQueueId() ); // 删除消息队列的offset=》 this.commitLog.removeQueueFromTopicQueueTable(cq.getTopic(), cq.getQueueId()); } // topic所在的消息队列删除完毕后消费队列所在的集合元素删除 it.remove(); log.info("cleanUnusedTopic: {},topic destroyed", topic); } } return 0; }
进入这个方法org.apache.rocketmq.store.ConsumeQueue#destroy销毁消费队列ide
public void destroy() { this.maxPhysicOffset = -1; this.minLogicOffset = 0; // 映射文件队列销毁=》 this.mappedFileQueue.destroy(); if (isExtReadEnable()) { // 消费队列销毁=》 this.consumeQueueExt.destroy(); } }
进入这个方法org.apache.rocketmq.store.MappedFileQueue#destroy映射队列文件销毁性能
public void destroy() { for (MappedFile mf : this.mappedFiles) { // 映射文件销毁=》 mf.destroy(1000 * 3); } // 同步删除映射文件队列 this.mappedFiles.clear(); this.flushedWhere = 0; // delete parent directory 删除父级文件夹 File file = new File(storePath); if (file.isDirectory()) { file.delete(); } }
进入这个方法org.apache.rocketmq.store.MappedFile#destroy映射文件销毁this
public boolean destroy(final long intervalForcibly) { // =》 this.shutdown(intervalForcibly); if (this.isCleanupOver()) { try { // 关闭文件channel this.fileChannel.close(); log.info("close file channel " + this.fileName + " OK"); long beginTime = System.currentTimeMillis(); // 删除文件 boolean result = this.file.delete(); log.info("delete file[REF:" + this.getRefCount() + "] " + this.fileName + (result ? " OK, " : " Failed, ") + "W:" + this.getWrotePosition() + " M:" + this.getFlushedPosition() + ", " + UtilAll.computeEclipseTimeMilliseconds(beginTime)); } catch (Exception e) { log.warn("close file channel " + this.fileName + " Failed. ", e); } return true; } else { log.warn("destroy mapped file[REF:" + this.getRefCount() + "] " + this.fileName + " Failed. cleanupOver: " + this.cleanupOver); } return false; }
往上返回到这个方法org.apache.rocketmq.store.ConsumeQueueExt#destroy消费队列销毁blog
public void destroy() { this.mappedFileQueue.destroy(); }
往上返回到这个方法org.apache.rocketmq.store.CommitLog#removeQueueFromTopicQueueTable按topic、queueId删除消费队列队列
public void removeQueueFromTopicQueueTable(final String topic, final int queueId) { String key = topic + "-" + queueId; // 这个地方同步用synchronized有点疑问,若是并发量比较小,synchronized性能也能够的,可是并发量达到必定量级lock或者其余无锁实现 // 应该会好一点,难道消息队列过时这种状况出现过时未消费的几率较低 synchronized (this) { this.topicQueueTable.remove(key); } log.info("removeQueueFromTopicQueueTable OK Topic: {} QueueId: {}", topic, queueId); }
往上返回到这个方法org.apache.rocketmq.broker.processor.AdminBrokerProcessor#cleanUnusedTopic结束
说在最后
本次解析仅表明我的观点,仅供参考。
加入技术微信群
钉钉技术群