本文主要研究一下rocketmq的FlushConsumeQueueService
rocketmq-all-4.6.0-source-release/store/src/main/java/org/apache/rocketmq/store/DefaultMessageStore.java
class FlushConsumeQueueService extends ServiceThread { private static final int RETRY_TIMES_OVER = 3; private long lastFlushTimestamp = 0; private void doFlush(int retryTimes) { int flushConsumeQueueLeastPages = DefaultMessageStore.this.getMessageStoreConfig().getFlushConsumeQueueLeastPages(); if (retryTimes == RETRY_TIMES_OVER) { flushConsumeQueueLeastPages = 0; } long logicsMsgTimestamp = 0; int flushConsumeQueueThoroughInterval = DefaultMessageStore.this.getMessageStoreConfig().getFlushConsumeQueueThoroughInterval(); long currentTimeMillis = System.currentTimeMillis(); if (currentTimeMillis >= (this.lastFlushTimestamp + flushConsumeQueueThoroughInterval)) { this.lastFlushTimestamp = currentTimeMillis; flushConsumeQueueLeastPages = 0; logicsMsgTimestamp = DefaultMessageStore.this.getStoreCheckpoint().getLogicsMsgTimestamp(); } ConcurrentMap<String, ConcurrentMap<Integer, ConsumeQueue>> tables = DefaultMessageStore.this.consumeQueueTable; for (ConcurrentMap<Integer, ConsumeQueue> maps : tables.values()) { for (ConsumeQueue cq : maps.values()) { boolean result = false; for (int i = 0; i < retryTimes && !result; i++) { result = cq.flush(flushConsumeQueueLeastPages); } } } if (0 == flushConsumeQueueLeastPages) { if (logicsMsgTimestamp > 0) { DefaultMessageStore.this.getStoreCheckpoint().setLogicsMsgTimestamp(logicsMsgTimestamp); } DefaultMessageStore.this.getStoreCheckpoint().flush(); } } public void run() { DefaultMessageStore.log.info(this.getServiceName() + " service started"); while (!this.isStopped()) { try { int interval = DefaultMessageStore.this.getMessageStoreConfig().getFlushIntervalConsumeQueue(); this.waitForRunning(interval); this.doFlush(1); } catch (Exception e) { DefaultMessageStore.log.warn(this.getServiceName() + " service has exception. 
", e); } } this.doFlush(RETRY_TIMES_OVER); DefaultMessageStore.log.info(this.getServiceName() + " service end"); } @Override public String getServiceName() { return FlushConsumeQueueService.class.getSimpleName(); } @Override public long getJointime() { return 1000 * 60; } }
rocketmq-all-4.6.0-source-release/store/src/main/java/org/apache/rocketmq/store/ConsumeQueue.java
public class ConsumeQueue { //...... public boolean flush(final int flushLeastPages) { boolean result = this.mappedFileQueue.flush(flushLeastPages); if (isExtReadEnable()) { result = result & this.consumeQueueExt.flush(flushLeastPages); } return result; } //...... }
rocketmq-all-4.6.0-source-release/store/src/main/java/org/apache/rocketmq/store/MappedFileQueue.java
public class MappedFileQueue { //...... public boolean flush(final int flushLeastPages) { boolean result = true; MappedFile mappedFile = this.findMappedFileByOffset(this.flushedWhere, this.flushedWhere == 0); if (mappedFile != null) { long tmpTimeStamp = mappedFile.getStoreTimestamp(); int offset = mappedFile.flush(flushLeastPages); long where = mappedFile.getFileFromOffset() + offset; result = where == this.flushedWhere; this.flushedWhere = where; if (0 == flushLeastPages) { this.storeTimestamp = tmpTimeStamp; } } return result; } //...... }
rocketmq-all-4.6.0-source-release/store/src/main/java/org/apache/rocketmq/store/MappedFile.java
public class MappedFile extends ReferenceResource { //...... public int flush(final int flushLeastPages) { if (this.isAbleToFlush(flushLeastPages)) { if (this.hold()) { int value = getReadPosition(); try { //We only append data to fileChannel or mappedByteBuffer, never both. if (writeBuffer != null || this.fileChannel.position() != 0) { this.fileChannel.force(false); } else { this.mappedByteBuffer.force(); } } catch (Throwable e) { log.error("Error occurred when force data to disk.", e); } this.flushedPosition.set(value); this.release(); } else { log.warn("in flush, hold failed, flush offset = " + this.flushedPosition.get()); this.flushedPosition.set(getReadPosition()); } } return this.getFlushedPosition(); } //...... }
FlushConsumeQueueService继承了ServiceThread,其run方法首先通过DefaultMessageStore.this.getMessageStoreConfig().getFlushIntervalConsumeQueue()获取interval,然后执行waitForRunning(interval),最后执行doFlush(1);doFlush方法首先通过getMessageStoreConfig().getFlushConsumeQueueLeastPages()获取flushConsumeQueueLeastPages,之后会遍历consumeQueueTable,执行ConsumeQueue的flush(flushConsumeQueueLeastPages);最后在flushConsumeQueueLeastPages为0时执行getStoreCheckpoint().flush()