rocketmq源码解析broker启动②

说在前面

broker启动

 

源码解析

进入方法,加载消费队列,org.apache.rocketmq.store.DefaultMessageStore#loadConsumeQueue




private boolean loadConsumeQueue() {//        System.getProperty("user.home") + File.separator + "store" File.separator + "consumequeue"File dirLogic = new File(StorePathConfigHelper.getStorePathConsumeQueue(this.messageStoreConfig.getStorePathRootDir()));File[] fileTopicList = dirLogic.listFiles();if (fileTopicList != null) {for (File fileTopic : fileTopicList) {//                文件名是topic名String topic = fileTopic.getName();File[] fileQueueIdList = fileTopic.listFiles();if (fileQueueIdList != null) {for (File fileQueueId : fileQueueIdList) {int queueId;try {queueId = Integer.parseInt(fileQueueId.getName());} catch (NumberFormatException e) {continue;}ConsumeQueue logic = new ConsumeQueue(topic,queueId,StorePathConfigHelper.getStorePathConsumeQueue(this.messageStoreConfig.getStorePathRootDir()),//                            消费队列文件默认大小30wthis.getMessageStoreConfig().getMapedFileSizeConsumeQueue(),//                            存储消息队列=》this);this.putConsumeQueue(topic, queueId, logic);//                        消息队列加载=》if (!logic.load()) {return false;}}}}}log.info("load logics queue all over, OK");return true;    }

进入方法,存储消息队列,org.apache.rocketmq.store.DefaultMessageStore#putConsumeQueue

/**
 * Registers a consume queue in {@code consumeQueueTable} under its topic and queue id.
 *
 * <p>The per-topic map is created on first use and installed with {@code putIfAbsent}, so a
 * map published concurrently by another thread is reused instead of being overwritten (the
 * earlier get-then-put sequence could drop the other thread's map).
 *
 * @param topic        topic the queue belongs to
 * @param queueId      id of the queue inside the topic
 * @param consumeQueue queue instance to register; replaces any queue already stored for the id
 */
private void putConsumeQueue(final String topic, final int queueId, final ConsumeQueue consumeQueue) {
    ConcurrentMap<Integer/* queueId */, ConsumeQueue> map = this.consumeQueueTable.get(topic);
    if (null == map) {
        ConcurrentMap<Integer/* queueId */, ConsumeQueue> created = new ConcurrentHashMap<Integer/* queueId */, ConsumeQueue>();
        ConcurrentMap<Integer/* queueId */, ConsumeQueue> existing = this.consumeQueueTable.putIfAbsent(topic, created);
        // putIfAbsent returns the previously mapped value, or null when ours was installed.
        map = existing != null ? existing : created;
    }
    map.put(queueId, consumeQueue);
}

返回方法,消息队列加载,org.apache.rocketmq.store.ConsumeQueue#load

public boolean load() {//        =》boolean result = this.mappedFileQueue.load();log.info("load consume queue " + this.topic + "-" + this.queueId + " " + (result ? "OK" : "Failed"));if (isExtReadEnable()) {//            消息队列扩展加载=》result &= this.consumeQueueExt.load();}return result;    }

进入方法,org.apache.rocketmq.store.MappedFileQueue#load介绍过了。

返回方法,消息队列扩展加载,org.apache.rocketmq.store.ConsumeQueueExt#load

/**
 * Loads the mapped files backing the consume-queue extension and logs the outcome.
 *
 * @return the result reported by {@code mappedFileQueue.load()}
 */
public boolean load() {
    boolean result = this.mappedFileQueue.load();
    // A space now separates "extend" from the topic; previously the two ran together in the log.
    log.info("load consume queue extend " + this.topic + "-" + this.queueId + " " + (result ? "OK" : "Failed"));
    return result;
}

进入方法,org.apache.rocketmq.store.MappedFileQueue#load介绍过了。

进入方法,恢复,org.apache.rocketmq.store.DefaultMessageStore#recover


private void recover(final boolean lastExitOK) {//        恢复消息队列=》this.recoverConsumeQueue();if (lastExitOK) {//            正常恢复commitLog=》this.commitLog.recoverNormally();} else {//            异常恢复commitLog=》this.commitLog.recoverAbnormally();}//        恢复topicQueue信息=》this.recoverTopicQueueTable();    }

进入方法,恢复消息队列,org.apache.rocketmq.store.DefaultMessageStore#recoverConsumeQueue

private void recoverConsumeQueue() {for (ConcurrentMap<Integer, ConsumeQueue> maps : this.consumeQueueTable.values()) {for (ConsumeQueue logic : maps.values()) {//                恢复单个消息队列=》logic.recover();}}    }

进入方法,恢复单个消息队列,org.apache.rocketmq.store.ConsumeQueue#recover







/**
 * Recovers this consume queue by re-scanning its most recent mapped files.
 *
 * <p>Scanning starts at the third-from-last mapped file (or the first file when fewer than
 * three exist). Units are read as (long offset, int size, long tagsCode); a unit with
 * {@code offset >= 0 && size > 0} counts as valid. The scan stops at the first invalid unit
 * or after the last file. The resulting offset becomes the flushed and committed position
 * of the mapped-file queue, and files beyond it are truncated. When the extension is
 * enabled it is recovered too and truncated by the largest extension address seen.
 */
public void recover() {
    final List<MappedFile> mappedFiles = this.mappedFileQueue.getMappedFiles();
    if (!mappedFiles.isEmpty()) {

        // Start from the third-from-last file, clamped to the first one.
        int index = mappedFiles.size() - 3;
        if (index < 0)
            index = 0;

        int mappedFileSizeLogics = this.mappedFileSize;
        MappedFile mappedFile = mappedFiles.get(index);
        ByteBuffer byteBuffer = mappedFile.sliceByteBuffer();
        // Start offset of the file currently being processed.
        long processOffset = mappedFile.getFileFromOffset();
        // Number of valid bytes found so far inside the current file.
        long mappedFileOffset = 0;
        long maxExtAddr = 1;
        while (true) {
            for (int i = 0; i < mappedFileSizeLogics; i += CQ_STORE_UNIT_SIZE) {
                long offset = byteBuffer.getLong();
                int size = byteBuffer.getInt();
                long tagsCode = byteBuffer.getLong();

                if (offset >= 0 && size > 0) {
                    mappedFileOffset = i + CQ_STORE_UNIT_SIZE;
                    this.maxPhysicOffset = offset;
                    // Remember the latest tagsCode that is an extension address.
                    if (isExtAddr(tagsCode)) {
                        maxExtAddr = tagsCode;
                    }
                } else {
                    log.info("recover current consume queue file over,  " + mappedFile.getFileName() + " "
                        + offset + " " + size + " " + tagsCode);
                    break;
                }
            }

            // The file was valid up to its very end: move on to the next file, if any.
            if (mappedFileOffset == mappedFileSizeLogics) {
                index++;
                if (index >= mappedFiles.size()) {

                    log.info("recover last consume queue file over, last mapped file "
                        + mappedFile.getFileName());
                    break;
                } else {
                    mappedFile = mappedFiles.get(index);
                    byteBuffer = mappedFile.sliceByteBuffer();
                    processOffset = mappedFile.getFileFromOffset();
                    mappedFileOffset = 0;
                    log.info("recover next consume queue file, " + mappedFile.getFileName());
                }
            } else {
                log.info("recover current consume queue queue over " + mappedFile.getFileName() + " "
                    + (processOffset + mappedFileOffset));
                break;
            }
        }

        processOffset += mappedFileOffset;
        // Set the flushed offset.
        this.mappedFileQueue.setFlushedWhere(processOffset);
        // Set the committed offset.
        this.mappedFileQueue.setCommittedWhere(processOffset);
        // Truncate dirty files beyond the processed offset.
        this.mappedFileQueue.truncateDirtyFiles(processOffset);

        if (isExtReadEnable()) {
            // Recover the extension, then truncate it by the largest extension address seen.
            this.consumeQueueExt.recover();
            log.info("Truncate consume queue extend file by max {}", maxExtAddr);
            this.consumeQueueExt.truncateByMaxAddress(maxExtAddr);
        }
    }
}

进入方法,按处理offset删除脏数据文件,org.apache.rocketmq.store.MappedFileQueue#truncateDirtyFiles


public void truncateDirtyFiles(long offset) {List<MappedFile> willRemoveFiles = new ArrayList<MappedFile>();for (MappedFile file : this.mappedFiles) {//            文件尾offsetlong fileTailOffset = file.getFileFromOffset() + this.mappedFileSize;//            文件尾offset大于处理的offsetif (fileTailOffset > offset) {//                处理offset大于文件开始的offsetif (offset >= file.getFileFromOffset()) {//                    设置映射文件写的位置file.setWrotePosition((int) (offset % this.mappedFileSize));//                    设置文件commit的位置file.setCommittedPosition((int) (offset % this.mappedFileSize));//                    设置文件刷新的位置file.setFlushedPosition((int) (offset % this.mappedFileSize));} else {//                    文件销毁=》file.destroy(1000);willRemoveFiles.add(file);}}}//        删除映射文件队列中的文件=》this.deleteExpiredFile(willRemoveFiles);    }

进入方法,删除映射文件队列中的文件,org.apache.rocketmq.store.MappedFileQueue#deleteExpiredFile



/**
 * Removes the given files from {@code mappedFiles}.
 *
 * <p>Entries of {@code files} that are not contained in {@code mappedFiles} are first removed
 * from {@code files} itself (the argument is modified). A failed or throwing bulk removal is
 * only logged.
 *
 * @param files files to drop from the queue; nothing happens when the list is empty
 */
void deleteExpiredFile(List<MappedFile> files) {
    if (files.isEmpty()) {
        return;
    }

    for (Iterator<MappedFile> it = files.iterator(); it.hasNext();) {
        MappedFile candidate = it.next();
        if (this.mappedFiles.contains(candidate)) {
            continue;
        }
        it.remove();
        log.info("This mappedFile {} is not contained by mappedFiles, so skip it.", candidate.getFileName());
    }

    try {
        boolean changed = this.mappedFiles.removeAll(files);
        if (!changed) {
            log.error("deleteExpiredFile remove failed.");
        }
    } catch (Exception e) {
        log.error("deleteExpiredFile has exception.", e);
    }
}

返回方法,恢复扩展消息,org.apache.rocketmq.store.ConsumeQueueExt#recover






/**
 * Recovers the consume-queue extension by scanning all of its mapped files from the first.
 *
 * <p>Units are read with {@code CqExtUnit.readBySkip}; a unit whose size is positive advances
 * the in-file offset by that size. A non-positive size moves the scan to the next file, or
 * ends it when no file is left. The final offset becomes the flushed and committed position
 * of the mapped-file queue and anything beyond it is truncated. Does nothing when there are
 * no mapped files.
 */
public void recover() {
    final List<MappedFile> mappedFiles = this.mappedFileQueue.getMappedFiles();
    if (mappedFiles == null || mappedFiles.isEmpty()) {
        return;
    }

    // load all files, consume queue will truncate extend files.
    int index = 0;

    MappedFile mappedFile = mappedFiles.get(index);
    ByteBuffer byteBuffer = mappedFile.sliceByteBuffer();
    long processOffset = mappedFile.getFileFromOffset();
    long mappedFileOffset = 0;
    CqExtUnit extUnit = new CqExtUnit();
    while (true) {
        extUnit.readBySkip(byteBuffer);

        // check whether write sth.
        if (extUnit.getSize() > 0) {
            mappedFileOffset += extUnit.getSize();
            continue;
        }

        // Nothing more in this file: switch to the next one, resetting the in-file offset.
        index++;
        if (index < mappedFiles.size()) {
            mappedFile = mappedFiles.get(index);
            byteBuffer = mappedFile.sliceByteBuffer();
            processOffset = mappedFile.getFileFromOffset();
            mappedFileOffset = 0;
            log.info("Recover next consume queue extend file, " + mappedFile.getFileName());
            continue;
        }

        log.info("All files of consume queue extend has been recovered over, last mapped file "
            + mappedFile.getFileName());
        break;
    }

    processOffset += mappedFileOffset;
    // Set the flushed offset of the mapped-file queue.
    this.mappedFileQueue.setFlushedWhere(processOffset);
    // Set the committed offset of the mapped-file queue.
    this.mappedFileQueue.setCommittedWhere(processOffset);
    // Truncate dirty files beyond the processed offset.
    this.mappedFileQueue.truncateDirtyFiles(processOffset);
}

进入方法,映射文件队列按处理offset删除脏数据文件,org.apache.rocketmq.store.MappedFileQueue#truncateDirtyFiles介绍过了。

返回方法,映射文件队列删除最大offset的脏数据文件,org.apache.rocketmq.store.ConsumeQueueExt#truncateByMaxAddress




public void truncateByMaxAddress(final long maxAddress) {if (!isExtAddr(maxAddress)) {return;}log.info("Truncate consume queue ext by max {}.", maxAddress);CqExtUnit cqExtUnit = get(maxAddress);if (cqExtUnit == null) {log.error("[BUG] address {} of consume queue extend not found!", maxAddress);return;}final long realOffset = unDecorate(maxAddress);//        删除脏数据文件=》this.mappedFileQueue.truncateDirtyFiles(realOffset + cqExtUnit.getSize());    }

进入方法,删除脏数据文件,org.apache.rocketmq.store.MappedFileQueue#truncateDirtyFiles介绍过了。

返回方法,正常恢复commitLog,org.apache.rocketmq.store.CommitLog#recoverNormally


/**
 * Recovers the commit log after a clean shutdown.
 *
 * <p>Scanning starts at the third-from-last mapped file (or the first when fewer than three
 * exist). Each message is checked with {@code checkMessageAndReturnSize}; a successful result
 * with a positive size advances the in-file offset, a successful result of size 0 switches to
 * the next file, and an unsuccessful result ends the scan. The resulting offset becomes the
 * flushed and committed position and files beyond it are truncated. Does nothing when there
 * are no mapped files.
 */
public void recoverNormally() {
    boolean checkCRCOnRecover = this.defaultMessageStore.getMessageStoreConfig().isCheckCRCOnRecover();
    final List<MappedFile> mappedFiles = this.mappedFileQueue.getMappedFiles();
    if (!mappedFiles.isEmpty()) {
        // Began to recover from the last third file
        int index = mappedFiles.size() - 3;
        if (index < 0)
            index = 0;

        MappedFile mappedFile = mappedFiles.get(index);
        ByteBuffer byteBuffer = mappedFile.sliceByteBuffer();
        long processOffset = mappedFile.getFileFromOffset();
        long mappedFileOffset = 0;
        while (true) {
            // Build the dispatch request describing the next message in the buffer.
            DispatchRequest dispatchRequest = this.checkMessageAndReturnSize(byteBuffer, checkCRCOnRecover);
            int size = dispatchRequest.getMsgSize();
            // Normal data
            if (dispatchRequest.isSuccess() && size > 0) {
                mappedFileOffset += size;
            }
            // Come the end of the file, switch to the next file Since the
            // return 0 representatives met last hole,
            // this can not be included in truncate offset
            else if (dispatchRequest.isSuccess() && size == 0) {
                index++;
                if (index >= mappedFiles.size()) {
                    // Current branch can not happen
                    log.info("recover last 3 physics file over, last mapped file " + mappedFile.getFileName());
                    break;
                } else {
                    mappedFile = mappedFiles.get(index);
                    byteBuffer = mappedFile.sliceByteBuffer();
                    processOffset = mappedFile.getFileFromOffset();
                    mappedFileOffset = 0;
                    log.info("recover next physics file, " + mappedFile.getFileName());
                }
            }
            // Intermediate file read error
            else if (!dispatchRequest.isSuccess()) {
                log.info("recover physics file end, " + mappedFile.getFileName());
                break;
            }
        }

        processOffset += mappedFileOffset;
        // Set the flushed offset.
        this.mappedFileQueue.setFlushedWhere(processOffset);
        // Set the committed offset.
        this.mappedFileQueue.setCommittedWhere(processOffset);
        // Truncate dirty files beyond the processed offset.
        this.mappedFileQueue.truncateDirtyFiles(processOffset);
    }
}

进入方法,删除脏数据文件,org.apache.rocketmq.store.MappedFileQueue#truncateDirtyFiles介绍过了。

返回方法,异常恢复commitLog,org.apache.rocketmq.store.CommitLog#recoverAbnormally






/**
 * Recovers the commit log after an abnormal exit.
 *
 * <p>Mapped files are examined from the last one backwards until
 * {@code isMappedFileMatchedRecover} accepts one; if none is accepted, recovery starts at the
 * first file. From there every message with a positive size is re-dispatched through
 * {@code defaultMessageStore.doDispatch} (when duplication is enabled, only messages whose
 * commit-log offset is below the confirm offset). A size of -1 ends the scan and a size of 0
 * switches to the next file. Afterwards the flushed/committed offsets are set, dirty
 * commit-log files are truncated and the consume queues are truncated to the same physical
 * offset. When there are no mapped files at all, the offsets are reset to 0 and all consume
 * queues are destroyed.
 *
 * <p>NOTE(review): unlike {@code recoverNormally}, this loop never consults
 * {@code dispatchRequest.isSuccess()}, and a size other than -1, 0 or a positive value matches
 * no branch — confirm {@code checkMessageAndReturnSize} cannot produce such a result.
 */
public void recoverAbnormally() {
    // recover by the minimum time stamp
    boolean checkCRCOnRecover = this.defaultMessageStore.getMessageStoreConfig().isCheckCRCOnRecover();
    final List<MappedFile> mappedFiles = this.mappedFileQueue.getMappedFiles();
    if (!mappedFiles.isEmpty()) {
        // Looking beginning to recover from which file (scan backwards from the last file)
        int index = mappedFiles.size() - 1;
        MappedFile mappedFile = null;
        for (; index >= 0; index--) {
            mappedFile = mappedFiles.get(index);
            // Stop at the first file (from the end) that qualifies as a recovery start point.
            if (this.isMappedFileMatchedRecover(mappedFile)) {
                log.info("recover from this mapped file " + mappedFile.getFileName());
                break;
            }
        }

        // No file qualified: fall back to the first file.
        if (index < 0) {
            index = 0;
            mappedFile = mappedFiles.get(index);
        }

        ByteBuffer byteBuffer = mappedFile.sliceByteBuffer();
        long processOffset = mappedFile.getFileFromOffset();
        long mappedFileOffset = 0;
        while (true) {
            DispatchRequest dispatchRequest = this.checkMessageAndReturnSize(byteBuffer, checkCRCOnRecover);
            int size = dispatchRequest.getMsgSize();

            // Normal data
            if (size > 0) {
                mappedFileOffset += size;

                if (this.defaultMessageStore.getMessageStoreConfig().isDuplicationEnable()) {
                    if (dispatchRequest.getCommitLogOffset() < this.defaultMessageStore.getConfirmOffset()) {
                        // Re-dispatch the message through the message store.
                        this.defaultMessageStore.doDispatch(dispatchRequest);
                    }
                } else {
                    this.defaultMessageStore.doDispatch(dispatchRequest);
                }
            }
            // Intermediate file read error
            else if (size == -1) {
                log.info("recover physics file end, " + mappedFile.getFileName());
                break;
            }
            // Come the end of the file, switch to the next file
            // Since the return 0 representatives met last hole, this can
            // not be included in truncate offset
            else if (size == 0) {
                index++;
                if (index >= mappedFiles.size()) {
                    // The current branch under normal circumstances should
                    // not happen
                    log.info("recover physics file over, last mapped file " + mappedFile.getFileName());
                    break;
                } else {
                    mappedFile = mappedFiles.get(index);
                    byteBuffer = mappedFile.sliceByteBuffer();
                    processOffset = mappedFile.getFileFromOffset();
                    mappedFileOffset = 0;
                    log.info("recover next physics file, " + mappedFile.getFileName());
                }
            }
        }

        processOffset += mappedFileOffset;
        // Set the flushed offset.
        this.mappedFileQueue.setFlushedWhere(processOffset);
        // Set the committed offset.
        this.mappedFileQueue.setCommittedWhere(processOffset);
        // Truncate dirty commit-log files beyond the processed offset.
        this.mappedFileQueue.truncateDirtyFiles(processOffset);

        // Clear ConsumeQueue redundant data
        this.defaultMessageStore.truncateDirtyLogicFiles(processOffset);
    }
    // Commitlog case files are deleted
    else {
        this.mappedFileQueue.setFlushedWhere(0);
        this.mappedFileQueue.setCommittedWhere(0);
        // Destroy all consume queues.
        this.defaultMessageStore.destroyLogics();
    }
}

进入方法,是否从最后一个文件恢复,org.apache.rocketmq.store.CommitLog#isMappedFileMatchedRecover




private boolean isMappedFileMatchedRecover(final MappedFile mappedFile) {ByteBuffer byteBuffer = mappedFile.sliceByteBuffer();int magicCode = byteBuffer.getInt(MessageDecoder.MESSAGE_MAGIC_CODE_POSTION);if (magicCode != MESSAGE_MAGIC_CODE) {return false;}//        消息存储时间long storeTimestamp = byteBuffer.getLong(MessageDecoder.MESSAGE_STORE_TIMESTAMP_POSTION);if (0 == storeTimestamp) {return false;}if (this.defaultMessageStore.getMessageStoreConfig().isMessageIndexEnable()&& this.defaultMessageStore.getMessageStoreConfig().isMessageIndexSafe()) {if (storeTimestamp <= this.defaultMessageStore.getStoreCheckpoint().getMinTimestampIndex()) {log.info("find check timestamp, {} {}",storeTimestamp,UtilAll.timeMillisToHumanString(storeTimestamp));return true;}} else {if (storeTimestamp <= this.defaultMessageStore.getStoreCheckpoint().getMinTimestamp()) {log.info("find check timestamp, {} {}",storeTimestamp,UtilAll.timeMillisToHumanString(storeTimestamp));return true;}}return false;    }

进入方法,删除脏数据文件,org.apache.rocketmq.store.MappedFileQueue#truncateDirtyFiles介绍过了。

返回方法,清除消息队列冗余数据,org.apache.rocketmq.store.DefaultMessageStore#truncateDirtyLogicFiles

public void truncateDirtyLogicFiles(long phyOffset) {ConcurrentMap<String, ConcurrentMap<Integer, ConsumeQueue>> tables = DefaultMessageStore.this.consumeQueueTable;for (ConcurrentMap<Integer, ConsumeQueue> maps : tables.values()) {for (ConsumeQueue logic : maps.values()) {//                =》logic.truncateDirtyLogicFiles(phyOffset);}}    }

进入方法,org.apache.rocketmq.store.ConsumeQueue#truncateDirtyLogicFiles

















public void truncateDirtyLogicFiles(long phyOffet) {int logicFileSize = this.mappedFileSize;this.maxPhysicOffset = phyOffet - 1;long maxExtAddr = 1;while (true) {//            获取映射队列中最后的映射文件=》MappedFile mappedFile = this.mappedFileQueue.getLastMappedFile();if (mappedFile != null) {ByteBuffer byteBuffer = mappedFile.sliceByteBuffer();mappedFile.setWrotePosition(0);mappedFile.setCommittedPosition(0);mappedFile.setFlushedPosition(0);for (int i = 0; i < logicFileSize; i += CQ_STORE_UNIT_SIZE) {long offset = byteBuffer.getLong();int size = byteBuffer.getInt();long tagsCode = byteBuffer.getLong();if (0 == i) {if (offset >= phyOffet) {this.mappedFileQueue.deleteLastMappedFile();break;} else {int pos = i + CQ_STORE_UNIT_SIZE;mappedFile.setWrotePosition(pos);mappedFile.setCommittedPosition(pos);mappedFile.setFlushedPosition(pos);this.maxPhysicOffset = offset;// This maybe not take effect, when not every consume queue has extend file.if (isExtAddr(tagsCode)) {maxExtAddr = tagsCode;}}} else {if (offset >= 0 && size > 0) {if (offset >= phyOffet) {return;}int pos = i + CQ_STORE_UNIT_SIZE;mappedFile.setWrotePosition(pos);mappedFile.setCommittedPosition(pos);mappedFile.setFlushedPosition(pos);this.maxPhysicOffset = offset;if (isExtAddr(tagsCode)) {maxExtAddr = tagsCode;}if (pos == logicFileSize) {return;}} else {return;}}}} else {break;}}if (isExtReadEnable()) {//            删除最大位置的消息队列=》this.consumeQueueExt.truncateByMaxAddress(maxExtAddr);}}public long getLastOffset() {long lastOffset = -1;int logicFileSize = this.mappedFileSize;//        获取映射文件队列中最后一个映射文件=》MappedFile mappedFile = this.mappedFileQueue.getLastMappedFile();if (mappedFile != null) {//            有点疑问,这里是获取的commitOffset吗int position = mappedFile.getWrotePosition() - CQ_STORE_UNIT_SIZE;if (position < 0)position = 0;ByteBuffer byteBuffer = mappedFile.sliceByteBuffer();byteBuffer.position(position);for (int i = 0; i < logicFileSize; i += CQ_STORE_UNIT_SIZE) {long offset = byteBuffer.getLong();int size = 
byteBuffer.getInt();byteBuffer.getLong();if (offset >= 0 && size > 0) {lastOffset = offset + size;} else {break;}}}return lastOffset;    }

返回方法,销毁消息队列,org.apache.rocketmq.store.DefaultMessageStore#destroyLogics

public void destroyLogics() {for (ConcurrentMap<Integer, ConsumeQueue> maps : this.consumeQueueTable.values()) {for (ConsumeQueue logic : maps.values()) {//                =》logic.destroy();}}    }

进入方法,org.apache.rocketmq.store.ConsumeQueue#destroy

public void destroy() {this.maxPhysicOffset = -1;this.minLogicOffset = 0;//        映射文件队列销毁=》this.mappedFileQueue.destroy();if (isExtReadEnable()) {//            消费队列销毁=》this.consumeQueueExt.destroy();}    }

进入方法,映射文件队列销毁,org.apache.rocketmq.store.MappedFileQueue#destroy

public void destroy() {for (MappedFile mf : this.mappedFiles) {//        映射文件销毁=》mf.destroy(1000 * 3);}//        同步删除映射文件队列this.mappedFiles.clear();this.flushedWhere = 0;// delete parent directory 删除父级文件夹File file = new File(storePath);if (file.isDirectory()) {file.delete();}    }

返回方法,恢复topicQueue信息,org.apache.rocketmq.store.DefaultMessageStore#recoverTopicQueueTable

private void recoverTopicQueueTable() {HashMap<String/* topic-queueid */, Long/* offset */> table = new HashMap<String, Long>(1024);//        获取最小的物理offset=》long minPhyOffset = this.commitLog.getMinOffset();for (ConcurrentMap<Integer, ConsumeQueue> maps : this.consumeQueueTable.values()) {for (ConsumeQueue logic : maps.values()) {String key = logic.getTopic() + "-" + logic.getQueueId();//                设置消息队列的最大offset=》table.put(key, logic.getMaxOffsetInQueue());//               设置正确的最小的物理offset=》logic.correctMinOffset(minPhyOffset);}}this.commitLog.setTopicQueueTable(table);    }

进入方法,获取最小的物理offset,org.apache.rocketmq.store.CommitLog#getMinOffset

public long getMinOffset() {//        获取第一个映射文件=》MappedFile mappedFile = this.mappedFileQueue.getFirstMappedFile();if (mappedFile != null) {if (mappedFile.isAvailable()) {//                获取映射文件的起始偏移量return mappedFile.getFileFromOffset();} else {//                获取下个文件的起始偏移量=》return this.rollNextFile(mappedFile.getFileFromOffset());}}return -1;    }

进入方法,获取下个文件的起始偏移量,org.apache.rocketmq.store.CommitLog#rollNextFile

/**
 * Computes the start offset of the commit-log file that follows the one containing
 * {@code offset}.
 *
 * @param offset any offset inside the current file
 * @return {@code offset} rounded up to the next multiple of the configured commit-log file size
 */
public long rollNextFile(final long offset) {
    int mappedFileSize = this.defaultMessageStore.getMessageStoreConfig().getMapedFileSizeCommitLog();
    final long positionInFile = offset % mappedFileSize;
    return offset + mappedFileSize - positionInFile;
}

进入方法,设置消息队列的最大offset,org.apache.rocketmq.store.ConsumeQueue#getMaxOffsetInQueue

public long getMaxOffsetInQueue() {//        =》return this.mappedFileQueue.getMaxOffset() / CQ_STORE_UNIT_SIZE;    }

进入方法,org.apache.rocketmq.store.MappedFileQueue#getMaxOffset

public long getMaxOffset() {//        获取存储映射文件队列中索引位置最大的映射文件=》MappedFile mappedFile = getLastMappedFile();if (mappedFile != null) {//            映射文件的起始offset+映射文件的可读取的索引位置return mappedFile.getFileFromOffset() + mappedFile.getReadPosition();}//        若是队列中没有存储映射文件直接返回0return 0;    }

进入方法,获取存储映射文件队列中索引位置最大的映射文件,org.apache.rocketmq.store.MappedFileQueue#getLastMappedFile()


public MappedFile getLastMappedFile() {MappedFile mappedFileLast = null;while (!this.mappedFiles.isEmpty()) {try {mappedFileLast = this.mappedFiles.get(this.mappedFiles.size() - 1);break;} catch (IndexOutOfBoundsException e) {//continue;} catch (Exception e) {log.error("getLastMappedFile has exception.", e);break;}}return mappedFileLast;    }

返回方法,设置正确的最小的物理offset,org.apache.rocketmq.store.ConsumeQueue#correctMinOffset


/**
 * Recomputes {@code minLogicOffset} from the first mapped file so that it points at the
 * first unit whose physical offset is not below {@code phyMinOffset}.
 *
 * <p>Units are read sequentially as (long offset, int size, long tagsCode). If the matching
 * unit's tagsCode is an extension address it is used to truncate the extension from the low
 * end when the extension is enabled. Exceptions during the scan are logged; the selected
 * buffer is always released.
 *
 * @param phyMinOffset smallest physical offset that is still valid
 */
public void correctMinOffset(long phyMinOffset) {
    // Take the first mapped file of the queue.
    MappedFile mappedFile = this.mappedFileQueue.getFirstMappedFile();
    long minExtAddr = 1;
    if (mappedFile != null) {
        // Select the file's buffer starting at position 0.
        SelectMappedBufferResult result = mappedFile.selectMappedBuffer(0);
        if (result != null) {
            try {
                for (int i = 0; i < result.getSize(); i += ConsumeQueue.CQ_STORE_UNIT_SIZE) {
                    long offsetPy = result.getByteBuffer().getLong();
                    // The size field is read only to advance the buffer.
                    result.getByteBuffer().getInt();
                    long tagsCode = result.getByteBuffer().getLong();

                    if (offsetPy >= phyMinOffset) {
                        this.minLogicOffset = result.getMappedFile().getFileFromOffset() + i;
                        log.info("Compute logical min offset: {}, topic: {}, queueId: {}",
                            this.getMinOffsetInQueue(), this.topic, this.queueId);
                        // This maybe not take effect, when not every consume queue has extend file.
                        if (isExtAddr(tagsCode)) {
                            minExtAddr = tagsCode;
                        }
                        break;
                    }
                }
            } catch (Exception e) {
                log.error("Exception thrown when correctMinOffset", e);
            } finally {
                result.release();
            }
        }
    }

    if (isExtReadEnable()) {
        // Drop extension data below the smallest extension address still referenced.
        this.consumeQueueExt.truncateByMinAddress(minExtAddr);
    }
}

进入方法,删除最小位置的扩展消息,org.apache.rocketmq.store.ConsumeQueueExt#truncateByMinAddress






public void truncateByMinAddress(final long minAddress) {if (!isExtAddr(minAddress)) {return;}log.info("Truncate consume queue ext by min {}.", minAddress);List<MappedFile> willRemoveFiles = new ArrayList<MappedFile>();List<MappedFile> mappedFiles = this.mappedFileQueue.getMappedFiles();final long realOffset = unDecorate(minAddress);for (MappedFile file : mappedFiles) {//            文件尾offsetlong fileTailOffset = file.getFileFromOffset() + this.mappedFileSize;//            文件尾offset小于真实的offsetif (fileTailOffset < realOffset) {log.info("Destroy consume queue ext by min: file={}, fileTailOffset={}, minOffset={}", file.getFileName(),fileTailOffset, realOffset);//                文件销毁=》if (file.destroy(1000)) {willRemoveFiles.add(file);}}}//        删除指望的文件=》this.mappedFileQueue.deleteExpiredFile(willRemoveFiles);    }

返回方法,注册处理器,org.apache.rocketmq.broker.BrokerController#registerProcessor









/**
 * Creates the broker's request processors and registers them, together with their executors,
 * on {@code remotingServer} and {@code fastRemotingServer}.
 *
 * <p>Every request code is registered on both servers except {@code PULL_MESSAGE}, which is
 * registered on {@code remotingServer} only. An {@code AdminBrokerProcessor} is installed as
 * the default processor on both servers.
 */
public void registerProcessor() {
    /**
     * SendMessageProcessor
     */
    // Processor for send-message requests.
    SendMessageProcessor sendProcessor = new SendMessageProcessor(this);
    // Attach the send-message hooks.
    sendProcessor.registerSendMessageHook(sendMessageHookList);
    // Attach the consume-message hooks.
    sendProcessor.registerConsumeMessageHook(consumeMessageHookList);

    // Register the processor for each send-related request code.
    // (Original note: registered processors are kept in a hash map — not visible here.)
    this.remotingServer.registerProcessor(RequestCode.SEND_MESSAGE, sendProcessor, this.sendMessageExecutor);
    this.remotingServer.registerProcessor(RequestCode.SEND_MESSAGE_V2, sendProcessor, this.sendMessageExecutor);
    this.remotingServer.registerProcessor(RequestCode.SEND_BATCH_MESSAGE, sendProcessor, this.sendMessageExecutor);
    this.remotingServer.registerProcessor(RequestCode.CONSUMER_SEND_MSG_BACK, sendProcessor, this.sendMessageExecutor);
    this.fastRemotingServer.registerProcessor(RequestCode.SEND_MESSAGE, sendProcessor, this.sendMessageExecutor);
    this.fastRemotingServer.registerProcessor(RequestCode.SEND_MESSAGE_V2, sendProcessor, this.sendMessageExecutor);
    this.fastRemotingServer.registerProcessor(RequestCode.SEND_BATCH_MESSAGE, sendProcessor, this.sendMessageExecutor);
    this.fastRemotingServer.registerProcessor(RequestCode.CONSUMER_SEND_MSG_BACK, sendProcessor, this.sendMessageExecutor);
    /**
     * PullMessageProcessor
     */
    // Pull-message processor; registered on remotingServer only.
    this.remotingServer.registerProcessor(RequestCode.PULL_MESSAGE, this.pullMessageProcessor, this.pullMessageExecutor);
    // Attach the consume-message hooks to the pull processor.
    this.pullMessageProcessor.registerConsumeMessageHook(consumeMessageHookList);

    /**
     * QueryMessageProcessor
     */
    // Processor for message queries.
    NettyRequestProcessor queryProcessor = new QueryMessageProcessor(this);
    this.remotingServer.registerProcessor(RequestCode.QUERY_MESSAGE, queryProcessor, this.queryMessageExecutor);
    this.remotingServer.registerProcessor(RequestCode.VIEW_MESSAGE_BY_ID, queryProcessor, this.queryMessageExecutor);

    this.fastRemotingServer.registerProcessor(RequestCode.QUERY_MESSAGE, queryProcessor, this.queryMessageExecutor);
    this.fastRemotingServer.registerProcessor(RequestCode.VIEW_MESSAGE_BY_ID, queryProcessor, this.queryMessageExecutor);

    /**
     * ClientManageProcessor
     */
    // Processor for client management; heartbeats use their own executor.
    ClientManageProcessor clientProcessor = new ClientManageProcessor(this);
    this.remotingServer.registerProcessor(RequestCode.HEART_BEAT, clientProcessor, this.heartbeatExecutor);
    this.remotingServer.registerProcessor(RequestCode.UNREGISTER_CLIENT, clientProcessor, this.clientManageExecutor);
    this.remotingServer.registerProcessor(RequestCode.CHECK_CLIENT_CONFIG, clientProcessor, this.clientManageExecutor);

    this.fastRemotingServer.registerProcessor(RequestCode.HEART_BEAT, clientProcessor, this.heartbeatExecutor);
    this.fastRemotingServer.registerProcessor(RequestCode.UNREGISTER_CLIENT, clientProcessor, this.clientManageExecutor);
    this.fastRemotingServer.registerProcessor(RequestCode.CHECK_CLIENT_CONFIG, clientProcessor, this.clientManageExecutor);

    /**
     * ConsumerManageProcessor
     */
    // Processor for consumer management.
    ConsumerManageProcessor consumerManageProcessor = new ConsumerManageProcessor(this);
    this.remotingServer.registerProcessor(RequestCode.GET_CONSUMER_LIST_BY_GROUP, consumerManageProcessor, this.consumerManageExecutor);
    this.remotingServer.registerProcessor(RequestCode.UPDATE_CONSUMER_OFFSET, consumerManageProcessor, this.consumerManageExecutor);
    this.remotingServer.registerProcessor(RequestCode.QUERY_CONSUMER_OFFSET, consumerManageProcessor, this.consumerManageExecutor);

    this.fastRemotingServer.registerProcessor(RequestCode.GET_CONSUMER_LIST_BY_GROUP, consumerManageProcessor, this.consumerManageExecutor);
    this.fastRemotingServer.registerProcessor(RequestCode.UPDATE_CONSUMER_OFFSET, consumerManageProcessor, this.consumerManageExecutor);
    this.fastRemotingServer.registerProcessor(RequestCode.QUERY_CONSUMER_OFFSET, consumerManageProcessor, this.consumerManageExecutor);

    /**
     * EndTransactionProcessor
     */
    // End-transaction processor; note that each server receives its own new instance.
    this.remotingServer.registerProcessor(RequestCode.END_TRANSACTION, new EndTransactionProcessor(this), this.sendMessageExecutor);
    this.fastRemotingServer.registerProcessor(RequestCode.END_TRANSACTION, new EndTransactionProcessor(this), this.sendMessageExecutor);

    /**
     * Default
     */
    // Broker admin processor, installed as the default processor on both servers.
    AdminBrokerProcessor adminProcessor = new AdminBrokerProcessor(this);
    this.remotingServer.registerDefaultProcessor(adminProcessor, this.adminBrokerExecutor);
    this.fastRemotingServer.registerDefaultProcessor(adminProcessor, this.adminBrokerExecutor);
}

各个处理器前面都介绍过了。

返回方法,broker保护机制,org.apache.rocketmq.broker.BrokerController#protectBroker

public void protectBroker() {//        消费缓慢时是否禁止消费if (this.brokerConfig.isDisableConsumeIfConsumerReadSlowly()) {final Iterator<Map.Entry<String, MomentStatsItem>> it = this.brokerStatsManager.getMomentStatsItemSetFallSize().getStatsItemTable().entrySet().iterator();while (it.hasNext()) {final Map.Entry<String, MomentStatsItem> next = it.next();final long fallBehindBytes = next.getValue().getValue().get();//                若是大于消费阈值if (fallBehindBytes > this.brokerConfig.getConsumerFallbehindThreshold()) {final String[] split = next.getValue().getStatsKey().split("@");final String group = split[2];LOG_PROTECTION.info("[PROTECT_BROKER] the consumer[{}] consume slowly, {} bytes, disable it", group, fallBehindBytes);//                    禁止消费=》this.subscriptionGroupManager.disableConsume(group);}}}    }

进入方法,禁止消费,org.apache.rocketmq.broker.subscription.SubscriptionGroupManager#disableConsume

public void disableConsume(final String groupName) {//        消费组订阅信息SubscriptionGroupConfig old = this.subscriptionGroupTable.get(groupName);if (old != null) {//            设置禁止消费old.setConsumeEnable(false);this.dataVersion.nextVersion();}    }

返回方法,初始化分布式消息事务,org.apache.rocketmq.broker.BrokerController#initialTransaction

private void initialTransaction() {//        加载分布式消息事务服务this.transactionalMessageService = ServiceProvider.loadClass(ServiceProvider.TRANSACTION_SERVICE_ID, TransactionalMessageService.class);if (null == this.transactionalMessageService) {this.transactionalMessageService = new TransactionalMessageServiceImpl(new TransactionalMessageBridge(this, this.getMessageStore()));log.warn("Load default transaction message hook service: {}", TransactionalMessageServiceImpl.class.getSimpleName());}//        加载分布式消息事务检查监听器this.transactionalMessageCheckListener = ServiceProvider.loadClass(ServiceProvider.TRANSACTION_LISTENER_ID, AbstractTransactionalMessageCheckListener.class);if (null == this.transactionalMessageCheckListener) {this.transactionalMessageCheckListener = new DefaultTransactionalMessageCheckListener();log.warn("Load default discard message hook service: {}", DefaultTransactionalMessageCheckListener.class.getSimpleName());}this.transactionalMessageCheckListener.setBrokerController(this);this.transactionalMessageCheckService = new TransactionalMessageCheckService(this);    }

返回方法,初始化失败,关闭控制器,org.apache.rocketmq.broker.BrokerController#shutdown




















public void shutdown() {if (this.brokerStatsManager != null) {this.brokerStatsManager.shutdown();}if (this.clientHousekeepingService != null) {this.clientHousekeepingService.shutdown();}if (this.pullRequestHoldService != null) {this.pullRequestHoldService.shutdown();}if (this.remotingServer != null) {this.remotingServer.shutdown();}if (this.fastRemotingServer != null) {this.fastRemotingServer.shutdown();}if (this.fileWatchService != null) {this.fileWatchService.shutdown();}if (this.messageStore != null) {//            消息存储服务关闭=》this.messageStore.shutdown();}this.scheduledExecutorService.shutdown();try {this.scheduledExecutorService.awaitTermination(5000, TimeUnit.MILLISECONDS);} catch (InterruptedException e) {}//        取消注册broker=》this.unregisterBrokerAll();if (this.sendMessageExecutor != null) {this.sendMessageExecutor.shutdown();}if (this.pullMessageExecutor != null) {this.pullMessageExecutor.shutdown();}if (this.adminBrokerExecutor != null) {this.adminBrokerExecutor.shutdown();}if (this.brokerOuterAPI != null) {this.brokerOuterAPI.shutdown();}//        消费者offset持久化=》this.consumerOffsetManager.persist();if (this.filterServerManager != null) {this.filterServerManager.shutdown();}if (this.brokerFastFailure != null) {this.brokerFastFailure.shutdown();}if (this.consumerFilterManager != null) {//            消费者过滤信息持久化=》this.consumerFilterManager.persist();}if (this.clientManageExecutor != null) {this.clientManageExecutor.shutdown();}if (this.queryMessageExecutor != null) {this.queryMessageExecutor.shutdown();}if (this.consumerManageExecutor != null) {this.consumerManageExecutor.shutdown();}if (this.fileWatchService != null) {this.fileWatchService.shutdown();}    }

接下篇。

 

说在最后

本次解析仅代表个人观点,仅供参考。

 

加入技术微信群

钉钉技术群

相关文章
相关标签/搜索