说在前面
broker启动
源码解析
返回方法,取消注册broker,org.apache.rocketmq.broker.BrokerController#unregisterBrokerAll
private void unregisterBrokerAll() {// =》this.brokerOuterAPI.unregisterBrokerAll(this.brokerConfig.getBrokerClusterName(),this.getBrokerAddr(),this.brokerConfig.getBrokerName(),this.brokerConfig.getBrokerId()); }
进入方法,org.apache.rocketmq.broker.out.BrokerOuterAPI#unregisterBrokerAll
public void unregisterBrokerAll(final String clusterName,final String brokerAddr,final String brokerName,final long brokerId) {// 获取namesrv地址列表List<String> nameServerAddressList = this.remotingClient.getNameServerAddressList();if (nameServerAddressList != null) {for (String namesrvAddr : nameServerAddressList) {try {// =》this.unregisterBroker(namesrvAddr, clusterName, brokerAddr, brokerName, brokerId);log.info("unregisterBroker OK, NamesrvAddr: {}", namesrvAddr);} catch (Exception e) {log.warn("unregisterBroker Exception, {}", namesrvAddr, e);}}} }
进入方法,org.apache.rocketmq.broker.out.BrokerOuterAPI#unregisterBroker
public void unregisterBroker(final String namesrvAddr,final String clusterName,final String brokerAddr,final String brokerName,final long brokerId) throws RemotingConnectException, RemotingSendRequestException, RemotingTimeoutException, InterruptedException, MQBrokerException {UnRegisterBrokerRequestHeader requestHeader = new UnRegisterBrokerRequestHeader();requestHeader.setBrokerAddr(brokerAddr);requestHeader.setBrokerId(brokerId);requestHeader.setBrokerName(brokerName);requestHeader.setClusterName(clusterName);RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.UNREGISTER_BROKER, requestHeader);// 同步执行RemotingCommand response = this.remotingClient.invokeSync(namesrvAddr, request, 3000);assert response != null;switch (response.getCode()) {case ResponseCode.SUCCESS: {return;}default:break;}throw new MQBrokerException(response.getCode(), response.getRemark()); }
进入方法,org.apache.rocketmq.remoting.netty.NettyRemotingClient#invokeSync介绍过了。
返回方法,org.apache.rocketmq.broker.BrokerController#start
public void start() throws Exception {if (this.messageStore != null) {// 消息存储服务启动=》this.messageStore.start();}if (this.remotingServer != null) {this.remotingServer.start();}if (this.fastRemotingServer != null) {this.fastRemotingServer.start();}if (this.fileWatchService != null) {this.fileWatchService.start();}if (this.brokerOuterAPI != null) {this.brokerOuterAPI.start();}if (this.pullRequestHoldService != null) {this.pullRequestHoldService.start();}if (this.clientHousekeepingService != null) {this.clientHousekeepingService.start();}if (this.filterServerManager != null) {this.filterServerManager.start();}// 注册broker=》this.registerBrokerAll(true, false, true);this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {@Overridepublic void run() {try {// =》BrokerController.this.registerBrokerAll(true, false, brokerConfig.isForceRegister());} catch (Throwable e) {log.error("registerBrokerAll Exception", e);}}// 10s注册一次}, 1000 * 10, Math.max(10000, Math.min(brokerConfig.getRegisterNameServerPeriod(), 60000)), TimeUnit.MILLISECONDS);if (this.brokerStatsManager != null) {this.brokerStatsManager.start();}if (this.brokerFastFailure != null) {// broker快速失败服务=》this.brokerFastFailure.start();}if (BrokerRole.SLAVE != messageStoreConfig.getBrokerRole()) {if (this.transactionalMessageCheckService != null) {log.info("Start transaction service!");// 分布式消息事务服务启动=》this.transactionalMessageCheckService.start();}} }
进入方法,消息存储服务启动,org.apache.rocketmq.store.DefaultMessageStore#start
public void start() throws Exception {lock = lockFile.getChannel().tryLock(0, 1, false);if (lock == null || lock.isShared() || !lock.isValid()) {throw new RuntimeException("Lock failed,MQ already started");}lockFile.getChannel().write(ByteBuffer.wrap("lock".getBytes()));lockFile.getChannel().force(true);// 刷新消费队列启动=》this.flushConsumeQueueService.start();// commitLog服务启动=》this.commitLog.start();this.storeStatsService.start();if (this.scheduleMessageService != null && SLAVE != messageStoreConfig.getBrokerRole()) {// 调度消息服务启动=》this.scheduleMessageService.start();}if (this.getMessageStoreConfig().isDuplicationEnable()) {this.reputMessageService.setReputFromOffset(this.commitLog.getConfirmOffset());} else {// 从commitLog中获取消息队列最大offset=》this.reputMessageService.setReputFromOffset(this.commitLog.getMaxOffset());}// 存储消息服务启动this.reputMessageService.start();// ha服务启动=》this.haService.start();this.createTempFile();// 添加调度服务=》this.addScheduleTask();this.shutdown = false; }
进入方法,刷新消费队列启动,org.apache.rocketmq.store.DefaultMessageStore.FlushConsumeQueueService#run
public void run() {DefaultMessageStore.log.info(this.getServiceName() + " service started");while (!this.isStopped()) {try {// 刷新消息队列的频次1sint interval = DefaultMessageStore.this.getMessageStoreConfig().getFlushIntervalConsumeQueue();this.waitForRunning(interval);// =》this.doFlush(1);} catch (Exception e) {DefaultMessageStore.log.warn(this.getServiceName() + " service has exception. ", e);}}this.doFlush(RETRY_TIMES_OVER);DefaultMessageStore.log.info(this.getServiceName() + " service end"); }
进入方法,org.apache.rocketmq.store.DefaultMessageStore.FlushConsumeQueueService#doFlush
private void doFlush(int retryTimes) {// 刷新消息队列页数2int flushConsumeQueueLeastPages = DefaultMessageStore.this.getMessageStoreConfig().getFlushConsumeQueueLeastPages();if (retryTimes == RETRY_TIMES_OVER) {flushConsumeQueueLeastPages = 0;}long logicsMsgTimestamp = 0;// 刷新消息队列最大频次60sint flushConsumeQueueThoroughInterval = DefaultMessageStore.this.getMessageStoreConfig().getFlushConsumeQueueThoroughInterval();long currentTimeMillis = System.currentTimeMillis();if (currentTimeMillis >= (this.lastFlushTimestamp + flushConsumeQueueThoroughInterval)) {this.lastFlushTimestamp = currentTimeMillis;flushConsumeQueueLeastPages = 0;logicsMsgTimestamp = DefaultMessageStore.this.getStoreCheckpoint().getLogicsMsgTimestamp();}ConcurrentMap<String, ConcurrentMap<Integer, ConsumeQueue>> tables = DefaultMessageStore.this.consumeQueueTable;for (ConcurrentMap<Integer, ConsumeQueue> maps : tables.values()) {for (ConsumeQueue cq : maps.values()) {boolean result = false;for (int i = 0; i < retryTimes && !result; i++) {// =》result = cq.flush(flushConsumeQueueLeastPages);}}}if (0 == flushConsumeQueueLeastPages) {if (logicsMsgTimestamp > 0) {DefaultMessageStore.this.getStoreCheckpoint().setLogicsMsgTimestamp(logicsMsgTimestamp);}DefaultMessageStore.this.getStoreCheckpoint().flush();} }
进入方法,org.apache.rocketmq.store.ConsumeQueue#flush
public boolean flush(final int flushLeastPages) {// 队列映射文件刷新=》boolean result = this.mappedFileQueue.flush(flushLeastPages);if (isExtReadEnable()) {result = result & this.consumeQueueExt.flush(flushLeastPages);}return result; }
进入方法,队列映射文件刷新,org.apache.rocketmq.store.MappedFileQueue#flush
public boolean flush(final int flushLeastPages) {boolean result = true;// 根据offset找到映射文件=》MappedFile mappedFile = this.findMappedFileByOffset(this.flushedWhere, this.flushedWhere == 0);if (mappedFile != null) {long tmpTimeStamp = mappedFile.getStoreTimestamp();// =》int offset = mappedFile.flush(flushLeastPages);long where = mappedFile.getFileFromOffset() + offset;result = where == this.flushedWhere;this.flushedWhere = where;if (0 == flushLeastPages) {this.storeTimestamp = tmpTimeStamp;}}return result; }
进入方法,根据offset找到映射文件,org.apache.rocketmq.store.MappedFileQueue#findMappedFileByOffset(long, boolean)
public MappedFile findMappedFileByOffset(final long offset, final boolean returnFirstOnNotFound) {try {// 获取队列中第一个映射文件MappedFile firstMappedFile = this.getFirstMappedFile();// 获取队列中最后一个映射文件MappedFile lastMappedFile = this.getLastMappedFile();if (firstMappedFile != null && lastMappedFile != null) {// 若是offset不在索引文件的offset范围内if (offset < firstMappedFile.getFileFromOffset() || offset >= lastMappedFile.getFileFromOffset() + this.mappedFileSize) {LOG_ERROR.warn("Offset not matched. Request offset: {}, firstOffset: {}, lastOffset: {}, mappedFileSize: {}, mappedFiles count: {}",offset,firstMappedFile.getFileFromOffset(),lastMappedFile.getFileFromOffset() + this.mappedFileSize,this.mappedFileSize,this.mappedFiles.size());} else {// 找到映射文件在队列中的索引位置int index = (int) ((offset / this.mappedFileSize) - (firstMappedFile.getFileFromOffset() / this.mappedFileSize));MappedFile targetFile = null;try {// 获取索引文件targetFile = this.mappedFiles.get(index);} catch (Exception ignored) {}// offset在目标文件的起始offset和结束offset范围内if (targetFile != null && offset >= targetFile.getFileFromOffset()&& offset < targetFile.getFileFromOffset() + this.mappedFileSize) {return targetFile;}// 若是按索引在队列中找不到映射文件就遍历队列查找映射文件for (MappedFile tmpMappedFile : this.mappedFiles) {if (offset >= tmpMappedFile.getFileFromOffset()&& offset < tmpMappedFile.getFileFromOffset() + this.mappedFileSize) {return tmpMappedFile;}}}// 若是offset=0获取队列中第一个映射文件,我的感受这个逻辑是否放在前面判断更为合理,仍是放在这里另有深意if (returnFirstOnNotFound) {return firstMappedFile;}}} catch (Exception e) {log.error("findMappedFileByOffset Exception", e);}return null; }
返回方法,消息队列刷新,org.apache.rocketmq.store.MappedFileQueue#flush
public boolean flush(final int flushLeastPages) {boolean result = true;// 根据offset找到映射文件=》MappedFile mappedFile = this.findMappedFileByOffset(this.flushedWhere, this.flushedWhere == 0);if (mappedFile != null) {long tmpTimeStamp = mappedFile.getStoreTimestamp();// =》int offset = mappedFile.flush(flushLeastPages);long where = mappedFile.getFileFromOffset() + offset;result = where == this.flushedWhere;this.flushedWhere = where;if (0 == flushLeastPages) {this.storeTimestamp = tmpTimeStamp;}}return result; }
进入方法,根据offset找到映射文件,org.apache.rocketmq.store.MappedFileQueue#findMappedFileByOffset(long, boolean)介绍过了。
返回方法,commitLog服务启动,org.apache.rocketmq.store.CommitLog.CommitRealTimeService#run
@Overridepublic void run() {CommitLog.log.info(this.getServiceName() + " service started");while (!this.isStopped()) {// 刷新commitLog频次200msint interval = CommitLog.this.defaultMessageStore.getMessageStoreConfig().getCommitIntervalCommitLog();// 提交数据页数4int commitDataLeastPages = CommitLog.this.defaultMessageStore.getMessageStoreConfig().getCommitCommitLogLeastPages();// 提交commitLog最大频次200msint commitDataThoroughInterval =CommitLog.this.defaultMessageStore.getMessageStoreConfig().getCommitCommitLogThoroughInterval();long begin = System.currentTimeMillis();if (begin >= (this.lastCommitTimestamp + commitDataThoroughInterval)) {this.lastCommitTimestamp = begin;commitDataLeastPages = 0;}try {// 提交数据=》boolean result = CommitLog.this.mappedFileQueue.commit(commitDataLeastPages);long end = System.currentTimeMillis();if (!result) {this.lastCommitTimestamp = end; // result = false means some data committed.//now wake up flush thread.flushCommitLogService.wakeup();}if (end - begin > 500) {log.info("Commit data to file costs {} ms", end - begin);}this.waitForRunning(interval);} catch (Throwable e) {CommitLog.log.error(this.getServiceName() + " service has exception. ", e);}}boolean result = false;for (int i = 0; i < RETRY_TIMES_OVER && !result; i++) {result = CommitLog.this.mappedFileQueue.commit(0);CommitLog.log.info(this.getServiceName() + " service shutdown, retry " + (i + 1) + " times " + (result ? "OK" : "Not OK"));}CommitLog.log.info(this.getServiceName() + " service end");} }
进入方法,提交数据,org.apache.rocketmq.store.MappedFileQueue#commit
public boolean commit(final int commitLeastPages) {boolean result = true;// commitOffset找到mappedFile=》MappedFile mappedFile = this.findMappedFileByOffset(this.committedWhere, this.committedWhere == 0);if (mappedFile != null) {int offset = mappedFile.commit(commitLeastPages);long where = mappedFile.getFileFromOffset() + offset;result = where == this.committedWhere;this.committedWhere = where;}return result; }
进入方法,commitOffset找到mappedFile,org.apache.rocketmq.store.MappedFileQueue#findMappedFileByOffset(long, boolean)介绍过了。
返回方法,调度消息服务启动,org.apache.rocketmq.store.schedule.ScheduleMessageService#start
public void start() {for (Map.Entry<Integer, Long> entry : this.delayLevelTable.entrySet()) {Integer level = entry.getKey();Long timeDelay = entry.getValue();Long offset = this.offsetTable.get(level);if (null == offset) {offset = 0L;}if (timeDelay != null) {// 这里用timer,为何不用调度线程池呢this.timer.schedule(new DeliverDelayedMessageTimerTask(level, offset), FIRST_DELAY_TIME);}}this.timer.scheduleAtFixedRate(new TimerTask() {@Overridepublic void run() {try {// 消息持久化=》ScheduleMessageService.this.persist();} catch (Throwable e) {log.error("scheduleAtFixedRate flush exception", e);}}}, 10000, this.defaultMessageStore.getMessageStoreConfig().getFlushDelayOffsetInterval()); }
返回方法,存储消息服务启动,org.apache.rocketmq.store.DefaultMessageStore.ReputMessageService#run
@Overridepublic void run() {DefaultMessageStore.log.info(this.getServiceName() + " service started");while (!this.isStopped()) {try {Thread.sleep(1);// =》this.doReput();} catch (Exception e) {DefaultMessageStore.log.warn(this.getServiceName() + " service has exception. ", e);}}DefaultMessageStore.log.info(this.getServiceName() + " service end"); }
进入方法,org.apache.rocketmq.store.DefaultMessageStore.ReputMessageService#doReput
private void doReput() {for (boolean doNext = true; this.isCommitLogAvailable() && doNext; ) {if (DefaultMessageStore.this.getMessageStoreConfig().isDuplicationEnable()&& this.reputFromOffset >= DefaultMessageStore.this.getConfirmOffset()) {break;}// 获取消息数据=》SelectMappedBufferResult result = DefaultMessageStore.this.commitLog.getData(reputFromOffset);if (result != null) {try {this.reputFromOffset = result.getStartOffset();for (int readSize = 0; readSize < result.getSize() && doNext; ) {DispatchRequest dispatchRequest =DefaultMessageStore.this.commitLog.checkMessageAndReturnSize(result.getByteBuffer(), false, false);int size = dispatchRequest.getMsgSize();if (dispatchRequest.isSuccess()) {if (size > 0) {// 存储服务转发请求=》DefaultMessageStore.this.doDispatch(dispatchRequest);// broker角色是master长轮询模式if (BrokerRole.SLAVE != DefaultMessageStore.this.getMessageStoreConfig().getBrokerRole()&& DefaultMessageStore.this.brokerConfig.isLongPollingEnable()) {// 消息到达通知监听器=》DefaultMessageStore.this.messageArrivingListener.arriving(dispatchRequest.getTopic(),dispatchRequest.getQueueId(), dispatchRequest.getConsumeQueueOffset() + 1,dispatchRequest.getTagsCode(), dispatchRequest.getStoreTimestamp(),dispatchRequest.getBitMap(), dispatchRequest.getPropertiesMap());}this.reputFromOffset += size;readSize += size;// 若是broker是slaveif (DefaultMessageStore.this.getMessageStoreConfig().getBrokerRole() == BrokerRole.SLAVE) {DefaultMessageStore.this.storeStatsService.getSinglePutMessageTopicTimesTotal(dispatchRequest.getTopic()).incrementAndGet();DefaultMessageStore.this.storeStatsService.getSinglePutMessageTopicSizeTotal(dispatchRequest.getTopic()).addAndGet(dispatchRequest.getMsgSize());}} else if (size == 0) {this.reputFromOffset = DefaultMessageStore.this.commitLog.rollNextFile(this.reputFromOffset);readSize = result.getSize();}} else if (!dispatchRequest.isSuccess()) {if (size > 0) {log.error("[BUG]read total count not equals msg total size. 
reputFromOffset={}", reputFromOffset);this.reputFromOffset += size;} else {doNext = false;if (DefaultMessageStore.this.brokerConfig.getBrokerId() == MixAll.MASTER_ID) {log.error("[BUG]the master dispatch message to consume queue error, COMMITLOG OFFSET: {}",this.reputFromOffset);this.reputFromOffset += result.getSize() - readSize;}}}}} finally {result.release();}} else {doNext = false;}} }
进入方法,获取消息数据,org.apache.rocketmq.store.CommitLog#getData(long, boolean)
public SelectMappedBufferResult getData(final long offset, final boolean returnFirstOnNotFound) {int mappedFileSize = this.defaultMessageStore.getMessageStoreConfig().getMapedFileSizeCommitLog();// 按offset查询映射文件=》MappedFile mappedFile = this.mappedFileQueue.findMappedFileByOffset(offset, returnFirstOnNotFound);if (mappedFile != null) {int pos = (int) (offset % mappedFileSize);// 查询消息所在映射bufferSelectMappedBufferResult result = mappedFile.selectMappedBuffer(pos);return result;}return null; }
进入方法,按offset查询映射文件,org.apache.rocketmq.store.MappedFileQueue#findMappedFileByOffset(long, boolean)介绍过了。
进入方法,存储服务转发请求,org.apache.rocketmq.store.DefaultMessageStore#doDispatch
public void doDispatch(DispatchRequest req) {for (CommitLogDispatcher dispatcher : this.dispatcherList) {// =》dispatcher.dispatch(req);} }
进入方法,org.apache.rocketmq.store.DefaultMessageStore.CommitLogDispatcherBuildConsumeQueue#dispatch
@Overridepublic void dispatch(DispatchRequest request) {// 获取事务类型=》final int tranType = MessageSysFlag.getTransactionValue(request.getSysFlag());switch (tranType) {case MessageSysFlag.TRANSACTION_NOT_TYPE:case MessageSysFlag.TRANSACTION_COMMIT_TYPE:// 构建存放消息位置信息=》DefaultMessageStore.this.putMessagePositionInfo(request);break;case MessageSysFlag.TRANSACTION_PREPARED_TYPE:case MessageSysFlag.TRANSACTION_ROLLBACK_TYPE:break;} }
进入方法,构建存放消息位置信息,org.apache.rocketmq.store.DefaultMessageStore#putMessagePositionInfo
public void putMessagePositionInfo(DispatchRequest dispatchRequest) {// 按topic、queueId查询到消息队列=》ConsumeQueue cq = this.findConsumeQueue(dispatchRequest.getTopic(), dispatchRequest.getQueueId());// 包装消息存储信息=》cq.putMessagePositionInfoWrapper(dispatchRequest); }
进入方法,按topic、queueId查询到消息队列,org.apache.rocketmq.store.DefaultMessageStore#findConsumeQueue
public ConsumeQueue findConsumeQueue(String topic, int queueId) {// 找到topic的全部消息队列ConcurrentMap<Integer, ConsumeQueue> map = consumeQueueTable.get(topic);if (null == map) {ConcurrentMap<Integer, ConsumeQueue> newMap = new ConcurrentHashMap<Integer, ConsumeQueue>(128);ConcurrentMap<Integer, ConsumeQueue> oldMap = consumeQueueTable.putIfAbsent(topic, newMap);if (oldMap != null) {map = oldMap;} else {map = newMap;}}// 按queue id查找消费者队列ConsumeQueue logic = map.get(queueId);if (null == logic) {ConsumeQueue newLogic = new ConsumeQueue(topic,queueId,// 消费者队列存储地址 user.home/store/consumequeueStorePathConfigHelper.getStorePathConsumeQueue(this.messageStoreConfig.getStorePathRootDir()),// 每一个文件存储默认30Wthis.getMessageStoreConfig().getMapedFileSizeConsumeQueue(),this);ConsumeQueue oldLogic = map.putIfAbsent(queueId, newLogic);if (oldLogic != null) {logic = oldLogic;} else {logic = newLogic;}}return logic; }
返回方法,包装消息存储信息,org.apache.rocketmq.store.ConsumeQueue#putMessagePositionInfoWrapper
public void putMessagePositionInfoWrapper(DispatchRequest request) {final int maxRetries = 30;boolean canWrite = this.defaultMessageStore.getRunningFlags().isCQWriteable();for (int i = 0; i < maxRetries && canWrite; i++) {long tagsCode = request.getTagsCode();// 消息扩展服务是否开启=》if (isExtWriteEnable()) {ConsumeQueueExt.CqExtUnit cqExtUnit = new ConsumeQueueExt.CqExtUnit();cqExtUnit.setFilterBitMap(request.getBitMap());cqExtUnit.setMsgStoreTime(request.getStoreTimestamp());cqExtUnit.setTagsCode(request.getTagsCode());long extAddr = this.consumeQueueExt.put(cqExtUnit);if (isExtAddr(extAddr)) {tagsCode = extAddr;} else {log.warn("Save consume queue extend fail, So just save tagsCode! {}, topic:{}, queueId:{}, offset:{}", cqExtUnit,topic, queueId, request.getCommitLogOffset());}}// 组装消息存储位置信息=》boolean result = this.putMessagePositionInfo(request.getCommitLogOffset(),request.getMsgSize(), tagsCode, request.getConsumeQueueOffset());if (result) {this.defaultMessageStore.getStoreCheckpoint().setLogicsMsgTimestamp(request.getStoreTimestamp());return;} else {// XXX: warn and notify melog.warn("[BUG]put commit log position info to " + topic + ":" + queueId + " " + request.getCommitLogOffset()+ " failed, retry " + i + " times");try {Thread.sleep(1000);} catch (InterruptedException e) {log.warn("", e);}}}// XXX: warn and notify melog.error("[BUG]consume queue can not write, {} {}", this.topic, this.queueId);this.defaultMessageStore.getRunningFlags().makeLogicsQueueError(); }
进入方法,组装消息存储位置信息,org.apache.rocketmq.store.ConsumeQueue#putMessagePositionInfo
private boolean putMessagePositionInfo(final long offset, final int size, final long tagsCode,final long cqOffset) {if (offset <= this.maxPhysicOffset) {return true;}this.byteBufferIndex.flip();this.byteBufferIndex.limit(CQ_STORE_UNIT_SIZE);this.byteBufferIndex.putLong(offset);this.byteBufferIndex.putInt(size);this.byteBufferIndex.putLong(tagsCode);final long expectLogicOffset = cqOffset * CQ_STORE_UNIT_SIZE;// 从映射文件队列中获取最后一个映射文件=》MappedFile mappedFile = this.mappedFileQueue.getLastMappedFile(expectLogicOffset);if (mappedFile != null) {// 映射文是第一个建立、consumerOffset不是0,映射文件写位置是0if (mappedFile.isFirstCreateInQueue() && cqOffset != 0 && mappedFile.getWrotePosition() == 0) {this.minLogicOffset = expectLogicOffset;this.mappedFileQueue.setFlushedWhere(expectLogicOffset);this.mappedFileQueue.setCommittedWhere(expectLogicOffset);// 填充文件=》this.fillPreBlank(mappedFile, expectLogicOffset);log.info("fill pre blank space " + mappedFile.getFileName() + " " + expectLogicOffset + " "+ mappedFile.getWrotePosition());}// consumerOffset不是0if (cqOffset != 0) {long currentLogicOffset = mappedFile.getWrotePosition() + mappedFile.getFileFromOffset();if (expectLogicOffset < currentLogicOffset) {log.warn("Build consume queue repeatedly, expectLogicOffset: {} currentLogicOffset: {} Topic: {} QID: {} Diff: {}",expectLogicOffset, currentLogicOffset, this.topic, this.queueId, expectLogicOffset - currentLogicOffset);return true;}if (expectLogicOffset != currentLogicOffset) {LOG_ERROR.warn("[BUG]logic queue order maybe wrong, expectLogicOffset: {} currentLogicOffset: {} Topic: {} QID: {} Diff: {}",expectLogicOffset,currentLogicOffset,this.topic,this.queueId,expectLogicOffset - currentLogicOffset);}}this.maxPhysicOffset = offset;// 消息写入映射文件=》return mappedFile.appendMessage(this.byteBufferIndex.array());}return false; }
进入方法,从映射文件队列中获取最后一个映射文件,org.apache.rocketmq.store.MappedFileQueue#getLastMappedFile(long, boolean)
public MappedFile getLastMappedFile(final long startOffset, boolean needCreate) {long createOffset = -1;// 获取映射文件队列中最后一个映射文件MappedFile mappedFileLast = getLastMappedFile();if (mappedFileLast == null) {createOffset = startOffset - (startOffset % this.mappedFileSize);}if (mappedFileLast != null && mappedFileLast.isFull()) {// 建立的offset=最后映射文件的开始offset+映射文件的大小createOffset = mappedFileLast.getFileFromOffset() + this.mappedFileSize;}// 建立文件的offset不是-1且须要建立映射文件if (createOffset != -1 && needCreate) {// 下个文件存储路径 System.getProperty("user.home") + File.separator + "store"// + File.separator + "commitlog",根据offset建立文件名String nextFilePath = this.storePath + File.separator + UtilAll.offset2FileName(createOffset);// 下下个文件存储路经String nextNextFilePath = this.storePath + File.separator+ UtilAll.offset2FileName(createOffset + this.mappedFileSize);MappedFile mappedFile = null;if (this.allocateMappedFileService != null) {// 处理请求返回映射文件=》mappedFile = this.allocateMappedFileService.putRequestAndReturnMappedFile(nextFilePath,nextNextFilePath, this.mappedFileSize);} else {try {// 建立映射文件=》mappedFile = new MappedFile(nextFilePath, this.mappedFileSize);} catch (IOException e) {log.error("create mappedFile exception", e);}}if (mappedFile != null) {if (this.mappedFiles.isEmpty()) {mappedFile.setFirstCreateInQueue(true);}this.mappedFiles.add(mappedFile);}return mappedFile;}return mappedFileLast; }
进入方法,处理请求返回映射文件,org.apache.rocketmq.store.AllocateMappedFileService#putRequestAndReturnMappedFile
public MappedFile putRequestAndReturnMappedFile(String nextFilePath, String nextNextFilePath, int fileSize) {int canSubmitRequests = 2;// 是否瞬间持久化if (this.messageStore.getMessageStoreConfig().isTransientStorePoolEnable()) {// 若是broker是master,buffer不够用瞬间失败if (this.messageStore.getMessageStoreConfig().isFastFailIfNoBufferInStorePool()&& BrokerRole.SLAVE != this.messageStore.getMessageStoreConfig().getBrokerRole()) { //if broker is slave, don't fast fail even no buffer in poolcanSubmitRequests = this.messageStore.getTransientStorePool().remainBufferNumbs() - this.requestQueue.size();}}AllocateRequest nextReq = new AllocateRequest(nextFilePath, fileSize);// 缓存存储请求boolean nextPutOK = this.requestTable.putIfAbsent(nextFilePath, nextReq) == null;if (nextPutOK) {if (canSubmitRequests <= 0) {log.warn("[NOTIFYME]TransientStorePool is not enough, so create mapped file error, " +"RequestQueueSize : {}, StorePoolSize: {}", this.requestQueue.size(), this.messageStore.getTransientStorePool().remainBufferNumbs());this.requestTable.remove(nextFilePath);return null;}// 下一个请求添加到优先级阻塞队列中boolean offerOK = this.requestQueue.offer(nextReq);if (!offerOK) {log.warn("never expected here, add a request to preallocate queue failed");}canSubmitRequests--;}AllocateRequest nextNextReq = new AllocateRequest(nextNextFilePath, fileSize);// 缓存下下个请求boolean nextNextPutOK = this.requestTable.putIfAbsent(nextNextFilePath, nextNextReq) == null;if (nextNextPutOK) {if (canSubmitRequests <= 0) {log.warn("[NOTIFYME]TransientStorePool is not enough, so skip preallocate mapped file, " +"RequestQueueSize : {}, StorePoolSize: {}", this.requestQueue.size(), this.messageStore.getTransientStorePool().remainBufferNumbs());this.requestTable.remove(nextNextFilePath);} else {// 下下个请求加入优先级阻塞队列boolean offerOK = this.requestQueue.offer(nextNextReq);if (!offerOK) {log.warn("never expected here, add a request to preallocate queue failed");}}}if (hasException) {log.warn(this.getServiceName() + " service has exception. 
so return null");return null;}AllocateRequest result = this.requestTable.get(nextFilePath);try {if (result != null) {// 同步等待boolean waitOK = result.getCountDownLatch().await(waitTimeOut, TimeUnit.MILLISECONDS);if (!waitOK) {log.warn("create mmap timeout " + result.getFilePath() + " " + result.getFileSize());return null;} else {this.requestTable.remove(nextFilePath);return result.getMappedFile();}} else {log.error("find preallocate mmap failed, this never happen");}} catch (InterruptedException e) {log.warn(this.getServiceName() + " service has exception. ", e);}return null; }
返回方法,org.apache.rocketmq.store.DefaultMessageStore.CommitLogDispatcherBuildIndex#dispatch
@Overridepublic void dispatch(DispatchRequest request) {if (DefaultMessageStore.this.messageStoreConfig.isMessageIndexEnable()) {// 构建索引=》DefaultMessageStore.this.indexService.buildIndex(request);} }
接下篇。
说在最后
本次解析仅代表个人观点,仅供参考。
加入技术微信群
钉钉技术群