说在前面apache
broker启动微信
源码解析app
进入方法,加载消费队列,org.apache.rocketmq.store.DefaultMessageStore#loadConsumeQueue分布式
private boolean loadConsumeQueue() {// System.getProperty("user.home") + File.separator + "store" File.separator + "consumequeue"File dirLogic = new File(StorePathConfigHelper.getStorePathConsumeQueue(this.messageStoreConfig.getStorePathRootDir()));File[] fileTopicList = dirLogic.listFiles();if (fileTopicList != null) {for (File fileTopic : fileTopicList) {// 文件名是topic名String topic = fileTopic.getName();File[] fileQueueIdList = fileTopic.listFiles();if (fileQueueIdList != null) {for (File fileQueueId : fileQueueIdList) {int queueId;try {queueId = Integer.parseInt(fileQueueId.getName());} catch (NumberFormatException e) {continue;}ConsumeQueue logic = new ConsumeQueue(topic,queueId,StorePathConfigHelper.getStorePathConsumeQueue(this.messageStoreConfig.getStorePathRootDir()),// 消费队列文件默认大小30wthis.getMessageStoreConfig().getMapedFileSizeConsumeQueue(),// 存储消息队列=》this);this.putConsumeQueue(topic, queueId, logic);// 消息队列加载=》if (!logic.load()) {return false;}}}}}log.info("load logics queue all over, OK");return true; }
进入方法,存储消息队列,org.apache.rocketmq.store.DefaultMessageStore#putConsumeQueueide
private void putConsumeQueue(final String topic, final int queueId, final ConsumeQueue consumeQueue) {ConcurrentMap<Integer/* queueId */, ConsumeQueue> map = this.consumeQueueTable.get(topic);if (null == map) {map = new ConcurrentHashMap<Integer/* queueId */, ConsumeQueue>();map.put(queueId, consumeQueue);this.consumeQueueTable.put(topic, map);} else {map.put(queueId, consumeQueue);} }
返回方法,消息队列加载,org.apache.rocketmq.store.ConsumeQueue#loadthis
public boolean load() {// =》boolean result = this.mappedFileQueue.load();log.info("load consume queue " + this.topic + "-" + this.queueId + " " + (result ? "OK" : "Failed"));if (isExtReadEnable()) {// 消息队列扩展加载=》result &= this.consumeQueueExt.load();}return result; }
进入方法,org.apache.rocketmq.store.MappedFileQueue#load介绍过了。spa
返回方法,消息队列扩展加载,org.apache.rocketmq.store.ConsumeQueueExt#loadcode
public boolean load() {boolean result = this.mappedFileQueue.load();log.info("load consume queue extend" + this.topic + "-" + this.queueId + " " + (result ? "OK" : "Failed"));return result; }
进入方法,org.apache.rocketmq.store.MappedFileQueue#load介绍过了。orm
进入方法,恢复,org.apache.rocketmq.store.DefaultMessageStore#recoverblog
private void recover(final boolean lastExitOK) {// 恢复消息队列=》this.recoverConsumeQueue();if (lastExitOK) {// 正常恢复commitLog=》this.commitLog.recoverNormally();} else {// 异常恢复commitLog=》this.commitLog.recoverAbnormally();}// 恢复topicQueue信息=》this.recoverTopicQueueTable(); }
进入方法,恢复消息队列,org.apache.rocketmq.store.DefaultMessageStore#recoverConsumeQueue
private void recoverConsumeQueue() {for (ConcurrentMap<Integer, ConsumeQueue> maps : this.consumeQueueTable.values()) {for (ConsumeQueue logic : maps.values()) {// 恢复单个消息队列=》logic.recover();}} }
进入方法,恢复单个消息队列,org.apache.rocketmq.store.ConsumeQueue#recover
public void recover() {final List<MappedFile> mappedFiles = this.mappedFileQueue.getMappedFiles();if (!mappedFiles.isEmpty()) {int index = mappedFiles.size() - 3;if (index < 0)index = 0;int mappedFileSizeLogics = this.mappedFileSize;MappedFile mappedFile = mappedFiles.get(index);ByteBuffer byteBuffer = mappedFile.sliceByteBuffer();// 映射文件处理的offsetlong processOffset = mappedFile.getFileFromOffset();long mappedFileOffset = 0;long maxExtAddr = 1;while (true) {for (int i = 0; i < mappedFileSizeLogics; i += CQ_STORE_UNIT_SIZE) {long offset = byteBuffer.getLong();int size = byteBuffer.getInt();long tagsCode = byteBuffer.getLong();if (offset >= 0 && size > 0) {mappedFileOffset = i + CQ_STORE_UNIT_SIZE;this.maxPhysicOffset = offset;if (isExtAddr(tagsCode)) {maxExtAddr = tagsCode;}} else {log.info("recover current consume queue file over, " + mappedFile.getFileName() + " "+ offset + " " + size + " " + tagsCode);break;}}if (mappedFileOffset == mappedFileSizeLogics) {index++;if (index >= mappedFiles.size()) {log.info("recover last consume queue file over, last mapped file "+ mappedFile.getFileName());break;} else {mappedFile = mappedFiles.get(index);byteBuffer = mappedFile.sliceByteBuffer();processOffset = mappedFile.getFileFromOffset();mappedFileOffset = 0;log.info("recover next consume queue file, " + mappedFile.getFileName());}} else {log.info("recover current consume queue queue over " + mappedFile.getFileName() + " "+ (processOffset + mappedFileOffset));break;}}processOffset += mappedFileOffset;// 设置刷新的offset位置this.mappedFileQueue.setFlushedWhere(processOffset);// 设置commit的offset位置this.mappedFileQueue.setCommittedWhere(processOffset);// 按处理offset删除脏数据文件=》this.mappedFileQueue.truncateDirtyFiles(processOffset);if (isExtReadEnable()) {// 恢复扩展消息=》this.consumeQueueExt.recover();log.info("Truncate consume queue extend file by max {}", maxExtAddr);// 映射文件队列删除最大offset的脏数据文件=》this.consumeQueueExt.truncateByMaxAddress(maxExtAddr);}} }
进入方法,按处理offset删除脏数据文件,org.apache.rocketmq.store.MappedFileQueue#truncateDirtyFiles
public void truncateDirtyFiles(long offset) {List<MappedFile> willRemoveFiles = new ArrayList<MappedFile>();for (MappedFile file : this.mappedFiles) {// 文件尾offsetlong fileTailOffset = file.getFileFromOffset() + this.mappedFileSize;// 文件尾offset大于处理的offsetif (fileTailOffset > offset) {// 处理offset大于文件开始的offsetif (offset >= file.getFileFromOffset()) {// 设置映射文件写的位置file.setWrotePosition((int) (offset % this.mappedFileSize));// 设置文件commit的位置file.setCommittedPosition((int) (offset % this.mappedFileSize));// 设置文件刷新的位置file.setFlushedPosition((int) (offset % this.mappedFileSize));} else {// 文件销毁=》file.destroy(1000);willRemoveFiles.add(file);}}}// 删除映射文件队列中的文件=》this.deleteExpiredFile(willRemoveFiles); }
进入方法,删除映射文件队列中的文件,org.apache.rocketmq.store.MappedFileQueue#deleteExpiredFile
void deleteExpiredFile(List<MappedFile> files) {if (!files.isEmpty()) {Iterator<MappedFile> iterator = files.iterator();while (iterator.hasNext()) {MappedFile cur = iterator.next();if (!this.mappedFiles.contains(cur)) {iterator.remove();log.info("This mappedFile {} is not contained by mappedFiles, so skip it.", cur.getFileName());}}try {if (!this.mappedFiles.removeAll(files)) {log.error("deleteExpiredFile remove failed.");}} catch (Exception e) {log.error("deleteExpiredFile has exception.", e);}} }
返回方法,恢复扩展消息,org.apache.rocketmq.store.ConsumeQueueExt#recover
public void recover() {final List<MappedFile> mappedFiles = this.mappedFileQueue.getMappedFiles();if (mappedFiles == null || mappedFiles.isEmpty()) {return;}// load all files, consume queue will truncate extend files.int index = 0;MappedFile mappedFile = mappedFiles.get(index);ByteBuffer byteBuffer = mappedFile.sliceByteBuffer();long processOffset = mappedFile.getFileFromOffset();long mappedFileOffset = 0;CqExtUnit extUnit = new CqExtUnit();while (true) {extUnit.readBySkip(byteBuffer);// check whether write sth.if (extUnit.getSize() > 0) {mappedFileOffset += extUnit.getSize();continue;}index++;if (index < mappedFiles.size()) {mappedFile = mappedFiles.get(index);byteBuffer = mappedFile.sliceByteBuffer();processOffset = mappedFile.getFileFromOffset();mappedFileOffset = 0;log.info("Recover next consume queue extend file, " + mappedFile.getFileName());continue;}log.info("All files of consume queue extend has been recovered over, last mapped file "+ mappedFile.getFileName());break;}processOffset += mappedFileOffset;// 映射文件队列设置刷新的offset位置this.mappedFileQueue.setFlushedWhere(processOffset);// 映射文件队列设置offset commit的位置this.mappedFileQueue.setCommittedWhere(processOffset);// 映射文件队列按处理offset删除脏数据文件=》this.mappedFileQueue.truncateDirtyFiles(processOffset); }
进入方法,映射文件队列按处理offset删除脏数据文件,org.apache.rocketmq.store.MappedFileQueue#truncateDirtyFiles介绍过了。
返回方法,映射文件队列删除最大offset的脏数据文件,org.apache.rocketmq.store.ConsumeQueueExt#truncateByMaxAddress
public void truncateByMaxAddress(final long maxAddress) {if (!isExtAddr(maxAddress)) {return;}log.info("Truncate consume queue ext by max {}.", maxAddress);CqExtUnit cqExtUnit = get(maxAddress);if (cqExtUnit == null) {log.error("[BUG] address {} of consume queue extend not found!", maxAddress);return;}final long realOffset = unDecorate(maxAddress);// 删除脏数据文件=》this.mappedFileQueue.truncateDirtyFiles(realOffset + cqExtUnit.getSize()); }
进入方法,删除脏数据文件,org.apache.rocketmq.store.MappedFileQueue#truncateDirtyFiles介绍过了。
返回方法,正常恢复commitLog,org.apache.rocketmq.store.CommitLog#recoverNormally
public void recoverNormally() {boolean checkCRCOnRecover = this.defaultMessageStore.getMessageStoreConfig().isCheckCRCOnRecover();final List<MappedFile> mappedFiles = this.mappedFileQueue.getMappedFiles();if (!mappedFiles.isEmpty()) {// Began to recover from the last third fileint index = mappedFiles.size() - 3;if (index < 0)index = 0;MappedFile mappedFile = mappedFiles.get(index);ByteBuffer byteBuffer = mappedFile.sliceByteBuffer();long processOffset = mappedFile.getFileFromOffset();long mappedFileOffset = 0;while (true) {// 建立转发请求DispatchRequest dispatchRequest = this.checkMessageAndReturnSize(byteBuffer, checkCRCOnRecover);int size = dispatchRequest.getMsgSize();// Normal dataif (dispatchRequest.isSuccess() && size > 0) {mappedFileOffset += size;}// Come the end of the file, switch to the next file Since the// return 0 representatives met last hole,// this can not be included in truncate offsetelse if (dispatchRequest.isSuccess() && size == 0) {index++;if (index >= mappedFiles.size()) {// Current branch can not happenlog.info("recover last 3 physics file over, last mapped file " + mappedFile.getFileName());break;} else {mappedFile = mappedFiles.get(index);byteBuffer = mappedFile.sliceByteBuffer();processOffset = mappedFile.getFileFromOffset();mappedFileOffset = 0;log.info("recover next physics file, " + mappedFile.getFileName());}}// Intermediate file read errorelse if (!dispatchRequest.isSuccess()) {log.info("recover physics file end, " + mappedFile.getFileName());break;}}processOffset += mappedFileOffset;// 设置刷新offsetthis.mappedFileQueue.setFlushedWhere(processOffset);// 设置commit offsetthis.mappedFileQueue.setCommittedWhere(processOffset);// 删除脏数据文件=》this.mappedFileQueue.truncateDirtyFiles(processOffset);} }
进入方法,删除脏数据文件,org.apache.rocketmq.store.MappedFileQueue#truncateDirtyFiles介绍过了。
返回方法,异常恢复commitLog,org.apache.rocketmq.store.CommitLog#recoverAbnormally
public void recoverAbnormally() {// recover by the minimum time stampboolean checkCRCOnRecover = this.defaultMessageStore.getMessageStoreConfig().isCheckCRCOnRecover();final List<MappedFile> mappedFiles = this.mappedFileQueue.getMappedFiles();if (!mappedFiles.isEmpty()) {// Looking beginning to recover from which file 从最后一个文件开始恢复int index = mappedFiles.size() - 1;MappedFile mappedFile = null;for (; index >= 0; index--) {mappedFile = mappedFiles.get(index);// 是否从最后一个文件恢复=》if (this.isMappedFileMatchedRecover(mappedFile)) {log.info("recover from this mapped file " + mappedFile.getFileName());break;}}if (index < 0) {index = 0;mappedFile = mappedFiles.get(index);}ByteBuffer byteBuffer = mappedFile.sliceByteBuffer();long processOffset = mappedFile.getFileFromOffset();long mappedFileOffset = 0;while (true) {DispatchRequest dispatchRequest = this.checkMessageAndReturnSize(byteBuffer, checkCRCOnRecover);int size = dispatchRequest.getMsgSize();// Normal dataif (size > 0) {mappedFileOffset += size;if (this.defaultMessageStore.getMessageStoreConfig().isDuplicationEnable()) {if (dispatchRequest.getCommitLogOffset() < this.defaultMessageStore.getConfirmOffset()) {// 消息存储转发=》this.defaultMessageStore.doDispatch(dispatchRequest);}} else {// =》this.defaultMessageStore.doDispatch(dispatchRequest);}}// Intermediate file read errorelse if (size == -1) {log.info("recover physics file end, " + mappedFile.getFileName());break;}// Come the end of the file, switch to the next file// Since the return 0 representatives met last hole, this can// not be included in truncate offsetelse if (size == 0) {index++;if (index >= mappedFiles.size()) {// The current branch under normal circumstances should// not happenlog.info("recover physics file over, last mapped file " + mappedFile.getFileName());break;} else {mappedFile = mappedFiles.get(index);byteBuffer = mappedFile.sliceByteBuffer();processOffset = mappedFile.getFileFromOffset();mappedFileOffset = 0;log.info("recover next physics file, " + mappedFile.getFileName());}}}processOffset += mappedFileOffset;// 设置刷新offset位置this.mappedFileQueue.setFlushedWhere(processOffset);// 设置commitOffsetthis.mappedFileQueue.setCommittedWhere(processOffset);// 删除脏数据文件=》this.mappedFileQueue.truncateDirtyFiles(processOffset);// Clear ConsumeQueue redundant data 清除消息队列冗余数据=》this.defaultMessageStore.truncateDirtyLogicFiles(processOffset);}// Commitlog case files are deletedelse {this.mappedFileQueue.setFlushedWhere(0);this.mappedFileQueue.setCommittedWhere(0);// 销毁消息队列=》this.defaultMessageStore.destroyLogics();} }
进入方法,是否从最后一个文件恢复,org.apache.rocketmq.store.CommitLog#isMappedFileMatchedRecover
private boolean isMappedFileMatchedRecover(final MappedFile mappedFile) {ByteBuffer byteBuffer = mappedFile.sliceByteBuffer();int magicCode = byteBuffer.getInt(MessageDecoder.MESSAGE_MAGIC_CODE_POSTION);if (magicCode != MESSAGE_MAGIC_CODE) {return false;}// 消息存储时间long storeTimestamp = byteBuffer.getLong(MessageDecoder.MESSAGE_STORE_TIMESTAMP_POSTION);if (0 == storeTimestamp) {return false;}if (this.defaultMessageStore.getMessageStoreConfig().isMessageIndexEnable()&& this.defaultMessageStore.getMessageStoreConfig().isMessageIndexSafe()) {if (storeTimestamp <= this.defaultMessageStore.getStoreCheckpoint().getMinTimestampIndex()) {log.info("find check timestamp, {} {}",storeTimestamp,UtilAll.timeMillisToHumanString(storeTimestamp));return true;}} else {if (storeTimestamp <= this.defaultMessageStore.getStoreCheckpoint().getMinTimestamp()) {log.info("find check timestamp, {} {}",storeTimestamp,UtilAll.timeMillisToHumanString(storeTimestamp));return true;}}return false; }
进入方法,删除脏数据文件,org.apache.rocketmq.store.MappedFileQueue#truncateDirtyFiles介绍过了。
返回方法,清除消息队列冗余数据,org.apache.rocketmq.store.DefaultMessageStore#truncateDirtyLogicFiles
public void truncateDirtyLogicFiles(long phyOffset) {ConcurrentMap<String, ConcurrentMap<Integer, ConsumeQueue>> tables = DefaultMessageStore.this.consumeQueueTable;for (ConcurrentMap<Integer, ConsumeQueue> maps : tables.values()) {for (ConsumeQueue logic : maps.values()) {// =》logic.truncateDirtyLogicFiles(phyOffset);}} }
进入方法,org.apache.rocketmq.store.ConsumeQueue#truncateDirtyLogicFiles
public void truncateDirtyLogicFiles(long phyOffet) {int logicFileSize = this.mappedFileSize;this.maxPhysicOffset = phyOffet - 1;long maxExtAddr = 1;while (true) {// 获取映射队列中最后的映射文件=》MappedFile mappedFile = this.mappedFileQueue.getLastMappedFile();if (mappedFile != null) {ByteBuffer byteBuffer = mappedFile.sliceByteBuffer();mappedFile.setWrotePosition(0);mappedFile.setCommittedPosition(0);mappedFile.setFlushedPosition(0);for (int i = 0; i < logicFileSize; i += CQ_STORE_UNIT_SIZE) {long offset = byteBuffer.getLong();int size = byteBuffer.getInt();long tagsCode = byteBuffer.getLong();if (0 == i) {if (offset >= phyOffet) {this.mappedFileQueue.deleteLastMappedFile();break;} else {int pos = i + CQ_STORE_UNIT_SIZE;mappedFile.setWrotePosition(pos);mappedFile.setCommittedPosition(pos);mappedFile.setFlushedPosition(pos);this.maxPhysicOffset = offset;// This maybe not take effect, when not every consume queue has extend file.if (isExtAddr(tagsCode)) {maxExtAddr = tagsCode;}}} else {if (offset >= 0 && size > 0) {if (offset >= phyOffet) {return;}int pos = i + CQ_STORE_UNIT_SIZE;mappedFile.setWrotePosition(pos);mappedFile.setCommittedPosition(pos);mappedFile.setFlushedPosition(pos);this.maxPhysicOffset = offset;if (isExtAddr(tagsCode)) {maxExtAddr = tagsCode;}if (pos == logicFileSize) {return;}} else {return;}}}} else {break;}}if (isExtReadEnable()) {// 删除最大位置的消息队列=》this.consumeQueueExt.truncateByMaxAddress(maxExtAddr);}}public long getLastOffset() {long lastOffset = -1;int logicFileSize = this.mappedFileSize;// 获取映射文件队列中最后一个映射文件=》MappedFile mappedFile = this.mappedFileQueue.getLastMappedFile();if (mappedFile != null) {// 有点疑问,这里是获取的commitOffset吗int position = mappedFile.getWrotePosition() - CQ_STORE_UNIT_SIZE;if (position < 0)position = 0;ByteBuffer byteBuffer = mappedFile.sliceByteBuffer();byteBuffer.position(position);for (int i = 0; i < logicFileSize; i += CQ_STORE_UNIT_SIZE) {long offset = byteBuffer.getLong();int size = byteBuffer.getInt();byteBuffer.getLong();if (offset >= 0 && size > 0) {lastOffset = offset + size;} else {break;}}}return lastOffset; }
返回方法,销毁消息队列,org.apache.rocketmq.store.DefaultMessageStore#destroyLogics
public void destroyLogics() {for (ConcurrentMap<Integer, ConsumeQueue> maps : this.consumeQueueTable.values()) {for (ConsumeQueue logic : maps.values()) {// =》logic.destroy();}} }
进入方法,org.apache.rocketmq.store.ConsumeQueue#destroy
public void destroy() {this.maxPhysicOffset = -1;this.minLogicOffset = 0;// 映射文件队列销毁=》this.mappedFileQueue.destroy();if (isExtReadEnable()) {// 消费队列销毁=》this.consumeQueueExt.destroy();} }
进入方法,映射文件队列销毁,org.apache.rocketmq.store.MappedFileQueue#destroy
public void destroy() {for (MappedFile mf : this.mappedFiles) {// 映射文件销毁=》mf.destroy(1000 * 3);}// 同步删除映射文件队列this.mappedFiles.clear();this.flushedWhere = 0;// delete parent directory 删除父级文件夹File file = new File(storePath);if (file.isDirectory()) {file.delete();} }
返回方法,恢复topicQueue信息,org.apache.rocketmq.store.DefaultMessageStore#recoverTopicQueueTable
private void recoverTopicQueueTable() {HashMap<String/* topic-queueid */, Long/* offset */> table = new HashMap<String, Long>(1024);// 获取最小的物理offset=》long minPhyOffset = this.commitLog.getMinOffset();for (ConcurrentMap<Integer, ConsumeQueue> maps : this.consumeQueueTable.values()) {for (ConsumeQueue logic : maps.values()) {String key = logic.getTopic() + "-" + logic.getQueueId();// 设置消息队列的最大offset=》table.put(key, logic.getMaxOffsetInQueue());// 设置正确的最小的物理offset=》logic.correctMinOffset(minPhyOffset);}}this.commitLog.setTopicQueueTable(table); }
进入方法,获取最小的物理offset,org.apache.rocketmq.store.CommitLog#getMinOffset
public long getMinOffset() {// 获取第一个映射文件=》MappedFile mappedFile = this.mappedFileQueue.getFirstMappedFile();if (mappedFile != null) {if (mappedFile.isAvailable()) {// 获取映射文件的起始偏移量return mappedFile.getFileFromOffset();} else {// 获取下个文件的起始偏移量=》return this.rollNextFile(mappedFile.getFileFromOffset());}}return -1; }
进入方法,获取下个文件的起始偏移量,org.apache.rocketmq.store.CommitLog#rollNextFile
public long rollNextFile(final long offset) {int mappedFileSize = this.defaultMessageStore.getMessageStoreConfig().getMapedFileSizeCommitLog();return offset + mappedFileSize - offset % mappedFileSize; }
进入方法,设置消息队列的最大offset,org.apache.rocketmq.store.ConsumeQueue#getMaxOffsetInQueue
public long getMaxOffsetInQueue() {// =》return this.mappedFileQueue.getMaxOffset() / CQ_STORE_UNIT_SIZE; }
进入方法,org.apache.rocketmq.store.MappedFileQueue#getMaxOffset
public long getMaxOffset() {// 获取存储映射文件队列中索引位置最大的映射文件=》MappedFile mappedFile = getLastMappedFile();if (mappedFile != null) {// 映射文件的起始offset+映射文件的可读取的索引位置return mappedFile.getFileFromOffset() + mappedFile.getReadPosition();}// 若是队列中没有存储映射文件直接返回0return 0; }
进入方法,获取存储映射文件队列中索引位置最大的映射文件,org.apache.rocketmq.store.MappedFileQueue#getLastMappedFile()
public MappedFile getLastMappedFile() {MappedFile mappedFileLast = null;while (!this.mappedFiles.isEmpty()) {try {mappedFileLast = this.mappedFiles.get(this.mappedFiles.size() - 1);break;} catch (IndexOutOfBoundsException e) {//continue;} catch (Exception e) {log.error("getLastMappedFile has exception.", e);break;}}return mappedFileLast; }
返回方法,设置正确的最小的物理offset,org.apache.rocketmq.store.ConsumeQueue#correctMinOffset
public void correctMinOffset(long phyMinOffset) {// 找到队列中一个映射文件=》MappedFile mappedFile = this.mappedFileQueue.getFirstMappedFile();long minExtAddr = 1;if (mappedFile != null) {// 获取映射文件的bufferSelectMappedBufferResult result = mappedFile.selectMappedBuffer(0);if (result != null) {try {for (int i = 0; i < result.getSize(); i += ConsumeQueue.CQ_STORE_UNIT_SIZE) {long offsetPy = result.getByteBuffer().getLong();result.getByteBuffer().getInt();long tagsCode = result.getByteBuffer().getLong();if (offsetPy >= phyMinOffset) {this.minLogicOffset = result.getMappedFile().getFileFromOffset() + i;log.info("Compute logical min offset: {}, topic: {}, queueId: {}",this.getMinOffsetInQueue(), this.topic, this.queueId);// This maybe not take effect, when not every consume queue has extend file.if (isExtAddr(tagsCode)) {minExtAddr = tagsCode;}break;}}} catch (Exception e) {log.error("Exception thrown when correctMinOffset", e);} finally {result.release();}}}if (isExtReadEnable()) {// 删除最小位置的扩展消息=》this.consumeQueueExt.truncateByMinAddress(minExtAddr);} }
进入方法,删除最小位置的扩展消息,org.apache.rocketmq.store.ConsumeQueueExt#truncateByMinAddress
public void truncateByMinAddress(final long minAddress) {if (!isExtAddr(minAddress)) {return;}log.info("Truncate consume queue ext by min {}.", minAddress);List<MappedFile> willRemoveFiles = new ArrayList<MappedFile>();List<MappedFile> mappedFiles = this.mappedFileQueue.getMappedFiles();final long realOffset = unDecorate(minAddress);for (MappedFile file : mappedFiles) {// 文件尾offsetlong fileTailOffset = file.getFileFromOffset() + this.mappedFileSize;// 文件尾offset小于真实的offsetif (fileTailOffset < realOffset) {log.info("Destroy consume queue ext by min: file={}, fileTailOffset={}, minOffset={}", file.getFileName(),fileTailOffset, realOffset);// 文件销毁=》if (file.destroy(1000)) {willRemoveFiles.add(file);}}}// 删除指望的文件=》this.mappedFileQueue.deleteExpiredFile(willRemoveFiles); }
返回方法,注册处理器,org.apache.rocketmq.broker.BrokerController#registerProcessor
public void registerProcessor() {/*** SendMessageProcessor*/// 发送消息处理器=》SendMessageProcessor sendProcessor = new SendMessageProcessor(this);// 发送消息钩子方法实现sendProcessor.registerSendMessageHook(sendMessageHookList);// 消费消息钩子方法实现sendProcessor.registerConsumeMessageHook(consumeMessageHookList);// 注册处理器,这些处理器都是保存在hashMap中=》this.remotingServer.registerProcessor(RequestCode.SEND_MESSAGE, sendProcessor, this.sendMessageExecutor);this.remotingServer.registerProcessor(RequestCode.SEND_MESSAGE_V2, sendProcessor, this.sendMessageExecutor);this.remotingServer.registerProcessor(RequestCode.SEND_BATCH_MESSAGE, sendProcessor, this.sendMessageExecutor);this.remotingServer.registerProcessor(RequestCode.CONSUMER_SEND_MSG_BACK, sendProcessor, this.sendMessageExecutor);this.fastRemotingServer.registerProcessor(RequestCode.SEND_MESSAGE, sendProcessor, this.sendMessageExecutor);this.fastRemotingServer.registerProcessor(RequestCode.SEND_MESSAGE_V2, sendProcessor, this.sendMessageExecutor);this.fastRemotingServer.registerProcessor(RequestCode.SEND_BATCH_MESSAGE, sendProcessor, this.sendMessageExecutor);this.fastRemotingServer.registerProcessor(RequestCode.CONSUMER_SEND_MSG_BACK, sendProcessor, this.sendMessageExecutor);/*** PullMessageProcessor*/// 拉取消息处理器=》this.remotingServer.registerProcessor(RequestCode.PULL_MESSAGE, this.pullMessageProcessor, this.pullMessageExecutor);// 注册客户端消息钩子方法this.pullMessageProcessor.registerConsumeMessageHook(consumeMessageHookList);/*** QueryMessageProcessor*/// 查询消息处理器NettyRequestProcessor queryProcessor = new QueryMessageProcessor(this);this.remotingServer.registerProcessor(RequestCode.QUERY_MESSAGE, queryProcessor, this.queryMessageExecutor);this.remotingServer.registerProcessor(RequestCode.VIEW_MESSAGE_BY_ID, queryProcessor, this.queryMessageExecutor);this.fastRemotingServer.registerProcessor(RequestCode.QUERY_MESSAGE, queryProcessor, this.queryMessageExecutor);this.fastRemotingServer.registerProcessor(RequestCode.VIEW_MESSAGE_BY_ID, queryProcessor, this.queryMessageExecutor);/*** ClientManageProcessor*/// client管理处理器ClientManageProcessor clientProcessor = new ClientManageProcessor(this);this.remotingServer.registerProcessor(RequestCode.HEART_BEAT, clientProcessor, this.heartbeatExecutor);this.remotingServer.registerProcessor(RequestCode.UNREGISTER_CLIENT, clientProcessor, this.clientManageExecutor);this.remotingServer.registerProcessor(RequestCode.CHECK_CLIENT_CONFIG, clientProcessor, this.clientManageExecutor);this.fastRemotingServer.registerProcessor(RequestCode.HEART_BEAT, clientProcessor, this.heartbeatExecutor);this.fastRemotingServer.registerProcessor(RequestCode.UNREGISTER_CLIENT, clientProcessor, this.clientManageExecutor);this.fastRemotingServer.registerProcessor(RequestCode.CHECK_CLIENT_CONFIG, clientProcessor, this.clientManageExecutor);/*** ConsumerManageProcessor*/// 消费者管理处理器ConsumerManageProcessor consumerManageProcessor = new ConsumerManageProcessor(this);this.remotingServer.registerProcessor(RequestCode.GET_CONSUMER_LIST_BY_GROUP, consumerManageProcessor, this.consumerManageExecutor);this.remotingServer.registerProcessor(RequestCode.UPDATE_CONSUMER_OFFSET, consumerManageProcessor, this.consumerManageExecutor);this.remotingServer.registerProcessor(RequestCode.QUERY_CONSUMER_OFFSET, consumerManageProcessor, this.consumerManageExecutor);this.fastRemotingServer.registerProcessor(RequestCode.GET_CONSUMER_LIST_BY_GROUP, consumerManageProcessor, this.consumerManageExecutor);this.fastRemotingServer.registerProcessor(RequestCode.UPDATE_CONSUMER_OFFSET, consumerManageProcessor, this.consumerManageExecutor);this.fastRemotingServer.registerProcessor(RequestCode.QUERY_CONSUMER_OFFSET, consumerManageProcessor, this.consumerManageExecutor);/*** EndTransactionProcessor*/// 事务管理处理器this.remotingServer.registerProcessor(RequestCode.END_TRANSACTION, new EndTransactionProcessor(this), this.sendMessageExecutor);this.fastRemotingServer.registerProcessor(RequestCode.END_TRANSACTION, new EndTransactionProcessor(this), this.sendMessageExecutor);/*** Default*/// broker管理服务处理器AdminBrokerProcessor adminProcessor = new AdminBrokerProcessor(this);// 注册默认的处理器this.remotingServer.registerDefaultProcessor(adminProcessor, this.adminBrokerExecutor);this.fastRemotingServer.registerDefaultProcessor(adminProcessor, this.adminBrokerExecutor); }
各个处理器前面都介绍过了。
返回方法,broker保护机制,org.apache.rocketmq.broker.BrokerController#protectBroker
public void protectBroker() {// 消费缓慢时是否禁止消费if (this.brokerConfig.isDisableConsumeIfConsumerReadSlowly()) {final Iterator<Map.Entry<String, MomentStatsItem>> it = this.brokerStatsManager.getMomentStatsItemSetFallSize().getStatsItemTable().entrySet().iterator();while (it.hasNext()) {final Map.Entry<String, MomentStatsItem> next = it.next();final long fallBehindBytes = next.getValue().getValue().get();// 若是大于消费阈值if (fallBehindBytes > this.brokerConfig.getConsumerFallbehindThreshold()) {final String[] split = next.getValue().getStatsKey().split("@");final String group = split[2];LOG_PROTECTION.info("[PROTECT_BROKER] the consumer[{}] consume slowly, {} bytes, disable it", group, fallBehindBytes);// 禁止消费=》this.subscriptionGroupManager.disableConsume(group);}}} }
进入方法,禁止消费,org.apache.rocketmq.broker.subscription.SubscriptionGroupManager#disableConsume
public void disableConsume(final String groupName) {// 消费组订阅信息SubscriptionGroupConfig old = this.subscriptionGroupTable.get(groupName);if (old != null) {// 设置禁止消费old.setConsumeEnable(false);this.dataVersion.nextVersion();} }
返回方法,初始化分布式消息事务,org.apache.rocketmq.broker.BrokerController#initialTransaction
private void initialTransaction() {// 加载分布式消息事务服务this.transactionalMessageService = ServiceProvider.loadClass(ServiceProvider.TRANSACTION_SERVICE_ID, TransactionalMessageService.class);if (null == this.transactionalMessageService) {this.transactionalMessageService = new TransactionalMessageServiceImpl(new TransactionalMessageBridge(this, this.getMessageStore()));log.warn("Load default transaction message hook service: {}", TransactionalMessageServiceImpl.class.getSimpleName());}// 加载分布式消息事务检查监听器this.transactionalMessageCheckListener = ServiceProvider.loadClass(ServiceProvider.TRANSACTION_LISTENER_ID, AbstractTransactionalMessageCheckListener.class);if (null == this.transactionalMessageCheckListener) {this.transactionalMessageCheckListener = new DefaultTransactionalMessageCheckListener();log.warn("Load default discard message hook service: {}", DefaultTransactionalMessageCheckListener.class.getSimpleName());}this.transactionalMessageCheckListener.setBrokerController(this);this.transactionalMessageCheckService = new TransactionalMessageCheckService(this); }
返回方法,初始化失败,关闭控制器,org.apache.rocketmq.broker.BrokerController#shutdown
public void shutdown() {if (this.brokerStatsManager != null) {this.brokerStatsManager.shutdown();}if (this.clientHousekeepingService != null) {this.clientHousekeepingService.shutdown();}if (this.pullRequestHoldService != null) {this.pullRequestHoldService.shutdown();}if (this.remotingServer != null) {this.remotingServer.shutdown();}if (this.fastRemotingServer != null) {this.fastRemotingServer.shutdown();}if (this.fileWatchService != null) {this.fileWatchService.shutdown();}if (this.messageStore != null) {// 消息存储服务关闭=》this.messageStore.shutdown();}this.scheduledExecutorService.shutdown();try {this.scheduledExecutorService.awaitTermination(5000, TimeUnit.MILLISECONDS);} catch (InterruptedException e) {}// 取消注册broker=》this.unregisterBrokerAll();if (this.sendMessageExecutor != null) {this.sendMessageExecutor.shutdown();}if (this.pullMessageExecutor != null) {this.pullMessageExecutor.shutdown();}if (this.adminBrokerExecutor != null) {this.adminBrokerExecutor.shutdown();}if (this.brokerOuterAPI != null) {this.brokerOuterAPI.shutdown();}// 消费者offset持久化=》this.consumerOffsetManager.persist();if (this.filterServerManager != null) {this.filterServerManager.shutdown();}if (this.brokerFastFailure != null) {this.brokerFastFailure.shutdown();}if (this.consumerFilterManager != null) {// 消费者过滤信息持久化=》this.consumerFilterManager.persist();}if (this.clientManageExecutor != null) {this.clientManageExecutor.shutdown();}if (this.queryMessageExecutor != null) {this.queryMessageExecutor.shutdown();}if (this.consumerManageExecutor != null) {this.consumerManageExecutor.shutdown();}if (this.fileWatchService != null) {this.fileWatchService.shutdown();} }
接下篇。
说在最后
本次解析仅表明我的观点,仅供参考。
加入技术微信群
钉钉技术群