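Interviewer: Do you know how RocketMQ stores messages?
Me: Uh... hold on, let me read this article first. (runs away)
There is quite a lot of material here, so grab a cup of tea before you start. Comments are welcome!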
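RocketMQ's storage design is what underpins its high availability and high performance; it uses disk storage to support massive message backlogs. On a single Kafka machine, performance drops noticeably once the number of topics passes 100 or so, whereas RocketMQ keeps its performance even with many topics.
This article covers three areas: the storage structure, the storage flow, and the techniques used to optimize storage.
It is based on RocketMQ 4.5.2.
The storage files fall into three main categories, covered in turn below: commitlog, consumeQueue, and index.
File path: ${user.home}\store\${commitlog}\${fileName}
commitlog characteristics:
As the table below shows, commitlog entries are not fixed-length. Each entry stores the fields listed here, including the physical offset, the consumeQueue offset, and the message body.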
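Order | Field | Description |
---|---|---|
1 | totalSize(4Byte) | Total size of the message entry |
2 | magicCode(4) | Set to daa320a7 (the author was unsure of its purpose; it appears to mark a normal message entry, as opposed to the BLANK_MAGIC_CODE written as end-of-file padding) |
3 | bodyCRC(4) | Checked when the broker restarts and recovers |
4 | queueId(4) | The consumeQueue id the message belongs to |
5 | flag(4) | Not processed by RocketMQ; stored and passed through as-is |
6 | queueOffset(8) | Offset of the message in the consumeQueue |
7 | physicalOffset(8) | Offset of the message in the commitlog |
8 | sysFlag(4) | Transaction flag: NOT_TYPE/PREPARED_TYPE/COMMIT_TYPE/ROLLBACK_TYPE |
9 | bornTimestamp(8) | Timestamp set by the producer when the message was created |
10 | bornHost(8) | Address of the producer (address:port) |
11 | storeTimestamp(8) | Time the message was stored on the broker |
12 | storeHostAddress(8) | Address of the broker that stored the message (address:port) |
13 | reconsumeTimes(4) | Number of times the message has been retried |
14 | preparedTransactionOffset(8) | Physical offset of the transactional (prepared) message |
15 | bodyLength(4) | Length of the message body, at most 4 MB |
16 | body(body length Bytes) | Message body |
17 | topicLength(1) | Length of the topic, at most 255 bytes |
18 | topic(topic length Bytes) | Topic |
19 | propertiesLength(2) | Length of the message properties, at most 65535 bytes |
20 | properties(properties length Bytes) | Message properties |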
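Because every entry mixes fixed-width fields with three variable-length ones, the total entry size has to be computed per message. The following is a minimal sketch of that arithmetic, derived only from the field widths in the table above; the class name `CommitLogEntrySize` and its method are hypothetical and are not RocketMQ APIs.

```java
import java.nio.charset.StandardCharsets;

// Hypothetical helper: adds up the field widths listed in the commitlog table.
final class CommitLogEntrySize {
    // Sum of all fixed-width fields (rows 1-15, 17 and 19 of the table).
    static final int FIXED_BYTES =
            4   // totalSize
          + 4   // magicCode
          + 4   // bodyCRC
          + 4   // queueId
          + 4   // flag
          + 8   // queueOffset
          + 8   // physicalOffset
          + 4   // sysFlag
          + 8   // bornTimestamp
          + 8   // bornHost
          + 8   // storeTimestamp
          + 8   // storeHostAddress
          + 4   // reconsumeTimes
          + 8   // preparedTransactionOffset
          + 4   // bodyLength
          + 1   // topicLength
          + 2;  // propertiesLength

    // Total entry size = fixed part + the three variable-length payloads.
    static int totalSize(int bodyLength, int topicLength, int propertiesLength) {
        return FIXED_BYTES + bodyLength + topicLength + propertiesLength;
    }

    public static void main(String[] args) {
        byte[] body = "hello".getBytes(StandardCharsets.UTF_8);      // 5 bytes
        byte[] topic = "TopicTest".getBytes(StandardCharsets.UTF_8); // 9 bytes
        System.out.println(totalSize(body.length, topic.length, 0));
    }
}
```

With the widths in the table, the fixed part comes to 91 bytes, so even an empty message costs that much space in the commitlog.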
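File path: ${user.home}\store\consumeQueue\${topic}\${queueId}\${fileName}
consumequeue file characteristics:
Every MessageQueue under every topic has its own ConsumeQueue file.
This structure corresponds to the consumer's logical queue. Why split one topic into many queues? It works better in cluster mode: multiple consumers can consume in parallel without taking locks.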
Order | Field | Description |
---|---|---|
1 | offset(8) | Offset in the commitlog |
2 | size(4) | Size of the message in the commitlog |
3 | tagHashCode | Hash of the tag |
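Since every ConsumeQueue entry has the same width, the N-th entry can be located by plain multiplication. The sketch below illustrates that idea with a `ByteBuffer`. It assumes `tagHashCode` occupies 8 bytes (the table does not state its width), which gives a 20-byte entry; the class `ConsumeQueueEntry` and its methods are hypothetical names for illustration only.

```java
import java.nio.ByteBuffer;

// Hypothetical model of one fixed-width ConsumeQueue entry.
final class ConsumeQueueEntry {
    // Assumption: 8 (offset) + 4 (size) + 8 (tagHashCode) = 20 bytes per entry.
    static final int UNIT_SIZE = 20;

    final long commitLogOffset;
    final int size;
    final long tagHashCode;

    ConsumeQueueEntry(long commitLogOffset, int size, long tagHashCode) {
        this.commitLogOffset = commitLogOffset;
        this.size = size;
        this.tagHashCode = tagHashCode;
    }

    // Writes the entry at logical position `index` using absolute puts.
    static void write(ByteBuffer file, int index, ConsumeQueueEntry e) {
        int pos = index * UNIT_SIZE;
        file.putLong(pos, e.commitLogOffset);
        file.putInt(pos + 8, e.size);
        file.putLong(pos + 12, e.tagHashCode);
    }

    // Reads the entry at logical position `index`.
    static ConsumeQueueEntry read(ByteBuffer file, int index) {
        int pos = index * UNIT_SIZE;
        return new ConsumeQueueEntry(file.getLong(pos), file.getInt(pos + 8), file.getLong(pos + 12));
    }

    public static void main(String[] args) {
        ByteBuffer file = ByteBuffer.allocate(UNIT_SIZE * 4); // room for 4 entries
        write(file, 2, new ConsumeQueueEntry(1024L, 105, "TagA".hashCode()));
        ConsumeQueueEntry e = read(file, 2);
        System.out.println(e.commitLogOffset + " " + e.size + " " + e.tagHashCode);
    }
}
```

A consumer that knows its logical offset therefore needs one small read to find where the full message lives in the commitlog, and it can filter on the tag hash before touching the message body.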
File path: ${user.home}\store\index\${fileName}
index file characteristics:
The index file makes messages searchable. It is used mainly for troubleshooting and for data statistics.
Order | Field | Description |
---|---|---|
1 | keyHash(4) | Hash of the key; the key's structure is … |
2 | phyOffset(8) | Actual physical offset in the commitLog |
3 | timeOffset(4) | Time offset |
4 | slotValue(4) | Slot value of the next record |
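To make the entry layout concrete, here is a rough sketch that packs the four fields from the table into a 20-byte record and maps a key to a hash slot. Everything named here is hypothetical: the class `IndexEntrySketch`, the slot count passed in, and the use of `String.hashCode()` as the hash are illustrative assumptions, not a statement of how RocketMQ's IndexFile is implemented.

```java
import java.nio.ByteBuffer;

// Hypothetical sketch of an index entry as described by the table above.
final class IndexEntrySketch {
    static final int ENTRY_SIZE = 4 + 8 + 4 + 4; // keyHash + phyOffset + timeOffset + slotValue

    // Non-negative hash of the key (assumption: String.hashCode() is the hash).
    static int keyHash(String key) {
        int abs = Math.abs(key.hashCode());
        return abs < 0 ? 0 : abs; // Math.abs(Integer.MIN_VALUE) is still negative
    }

    // Maps a key to one of `slotCount` hash slots.
    static int slotOf(String key, int slotCount) {
        return keyHash(key) % slotCount;
    }

    // Packs one entry in the field order given by the table.
    static ByteBuffer encode(int keyHash, long phyOffset, int timeOffset, int slotValue) {
        ByteBuffer buf = ByteBuffer.allocate(ENTRY_SIZE);
        buf.putInt(keyHash).putLong(phyOffset).putInt(timeOffset).putInt(slotValue);
        buf.flip(); // make the buffer readable from position 0
        return buf;
    }

    public static void main(String[] args) {
        String key = "order-1001"; // hypothetical message key
        ByteBuffer entry = encode(keyHash(key), 4096L, 12, 0);
        System.out.println("slot=" + slotOf(key, 1000) + " bytes=" + entry.remaining());
    }
}
```

Storing only the hash keeps entries fixed-width; a lookup would then follow the phyOffset into the commitlog and compare the real key, since different keys can share a hash.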
From top to bottom, the layers are:

Layer | Components |
---|---|
Business layer | QueryMessageProcessor, PullMessageProcessor, SendMessageProcessor |
 | DefaultMessageStore |
Storage logic layer | IndexService, ConsumeQueue, CommitLog |
 | IndexFile, MappedFileQueue |
Disk I/O layer | MappedFile |
 | MappedByteBuffer |
 | Disk |
RocketMQ's core storage class is DefaultMessageStore, and its entry point is the putMessage method.

    // DefaultMessageStore#putMessage
    public PutMessageResult putMessage(MessageExtBrokerInner msg) {
        // Return immediately if the store has been shut down [code omitted]
        // Return immediately if this broker is a slave [code omitted]
        // Return immediately if runningFlags is not writable; if writable, reset printTimes to 0 [code omitted]
        // Return immediately if the topic name is longer than Byte.MAX_VALUE (127) [code omitted]
        // Return immediately if the msg properties are longer than Short.MAX_VALUE (32767) [code omitted]

        if (this.isOSPageCacheBusy()) { // is the OS page cache busy with writes?
            return new PutMessageResult(PutMessageStatus.OS_PAGECACHE_BUSY, null);
        }

        long beginTime = this.getSystemClock().now();
        PutMessageResult result = this.commitLog.putMessage(msg); // $2 the core write path, see the code below

        long elapsedTime = this.getSystemClock().now() - beginTime;
        if (elapsedTime > 500) {
            log.warn("putMessage not in lock elapsed time(ms)={}, bodyLength={}", elapsedTime, msg.getBody().length);
        }
        // Record the commitlog write time; it replaces the stored maximum if it is larger
        this.storeStatsService.setPutMessageEntireTimeMax(elapsedTime);

        if (null == result || !result.isOk()) {
            // Count failed commitlog writes
            this.storeStatsService.getPutMessageFailedTimes().incrementAndGet();
        }

        return result;
    }


    // CommitLog#putMessage
    public PutMessageResult putMessage(final MessageExtBrokerInner msg) {
        // Set the storage time
        msg.setStoreTimestamp(System.currentTimeMillis());
        // Set the message body BODY CRC (consider the most appropriate setting
        // on the client)
        msg.setBodyCRC(UtilAll.crc32(msg.getBody()));
        // Back to Results
        AppendMessageResult result = null;

        StoreStatsService storeStatsService = this.defaultMessageStore.getStoreStatsService();

        String topic = msg.getTopic();
        int queueId = msg.getQueueId();

        final int tranType = MessageSysFlag.getTransactionValue(msg.getSysFlag()); // $1
        if (tranType == MessageSysFlag.TRANSACTION_NOT_TYPE
            || tranType == MessageSysFlag.TRANSACTION_COMMIT_TYPE) { // $2
            // Delay Delivery
            if (msg.getDelayTimeLevel() > 0) {
                if (msg.getDelayTimeLevel() > this.defaultMessageStore.getScheduleMessageService().getMaxDelayLevel()) {
                    msg.setDelayTimeLevel(this.defaultMessageStore.getScheduleMessageService().getMaxDelayLevel());
                }

                topic = ScheduleMessageService.SCHEDULE_TOPIC;
                queueId = ScheduleMessageService.delayLevel2QueueId(msg.getDelayTimeLevel());

                // Backup real topic, queueId
                MessageAccessor.putProperty(msg, MessageConst.PROPERTY_REAL_TOPIC, msg.getTopic());
                MessageAccessor.putProperty(msg, MessageConst.PROPERTY_REAL_QUEUE_ID, String.valueOf(msg.getQueueId()));
                msg.setPropertiesString(MessageDecoder.messageProperties2String(msg.getProperties()));

                msg.setTopic(topic);
                msg.setQueueId(queueId);
            }
        }

        long elapsedTimeInLock = 0;
        MappedFile unlockMappedFile = null;
        MappedFile mappedFile = this.mappedFileQueue.getLastMappedFile(); // $3

        putMessageLock.lock(); // spin or ReentrantLock, depending on store config // $4
        try {
            long beginLockTimestamp = this.defaultMessageStore.getSystemClock().now();
            this.beginTimeInLock = beginLockTimestamp;

            // Here settings are stored timestamp, in order to ensure an orderly
            // global
            msg.setStoreTimestamp(beginLockTimestamp);

            if (null == mappedFile || mappedFile.isFull()) { // $5
                mappedFile = this.mappedFileQueue.getLastMappedFile(0); // Mark: NewFile may be cause noise
            }
            if (null == mappedFile) {
                log.error("create mapped file1 error, topic: " + msg.getTopic() + " clientAddr: " + msg.getBornHostString());
                beginTimeInLock = 0;
                return new PutMessageResult(PutMessageStatus.CREATE_MAPEDFILE_FAILED, null);
            }

            result = mappedFile.appendMessage(msg, this.appendMessageCallback); // $6
            switch (result.getStatus()) { // $7
                case PUT_OK:
                    break;
                case END_OF_FILE:
                    unlockMappedFile = mappedFile;
                    // Create a new file, re-write the message
                    mappedFile = this.mappedFileQueue.getLastMappedFile(0);
                    if (null == mappedFile) {
                        // XXX: warn and notify me
                        log.error("create mapped file2 error, topic: " + msg.getTopic() + " clientAddr: " + msg.getBornHostString());
                        beginTimeInLock = 0;
                        return new PutMessageResult(PutMessageStatus.CREATE_MAPEDFILE_FAILED, result);
                    }
                    result = mappedFile.appendMessage(msg, this.appendMessageCallback);
                    break;
                case MESSAGE_SIZE_EXCEEDED:
                case PROPERTIES_SIZE_EXCEEDED:
                    beginTimeInLock = 0;
                    return new PutMessageResult(PutMessageStatus.MESSAGE_ILLEGAL, result);
                case UNKNOWN_ERROR:
                    beginTimeInLock = 0;
                    return new PutMessageResult(PutMessageStatus.UNKNOWN_ERROR, result);
                default:
                    beginTimeInLock = 0;
                    return new PutMessageResult(PutMessageStatus.UNKNOWN_ERROR, result);
            }

            elapsedTimeInLock = this.defaultMessageStore.getSystemClock().now() - beginLockTimestamp;
            beginTimeInLock = 0;
        } finally {
            putMessageLock.unlock();
        }

        if (elapsedTimeInLock > 500) {
            log.warn("[NOTIFYME]putMessage in lock cost time(ms)={}, bodyLength={} AppendMessageResult={}", elapsedTimeInLock, msg.getBody().length, result);
        }

        if (null != unlockMappedFile && this.defaultMessageStore.getMessageStoreConfig().isWarmMapedFileEnable()) {
            this.defaultMessageStore.unlockMappedFile(unlockMappedFile);
        }

        PutMessageResult putMessageResult = new PutMessageResult(PutMessageStatus.PUT_OK, result);

        // Statistics
        storeStatsService.getSinglePutMessageTopicTimesTotal(msg.getTopic()).incrementAndGet();
        storeStatsService.getSinglePutMessageTopicSizeTotal(topic).addAndGet(result.getWroteBytes());

        handleDiskFlush(result, putMessageResult, msg); // $8
        handleHA(result, putMessageResult, msg); // $9

        return putMessageResult;
    }

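The lock, append, and retry-on-END_OF_FILE pattern in the method above can be shown in isolation. The following is a simplified in-memory sketch under stated assumptions: segments are heap `ByteBuffer`s rather than mapped files, a `ReentrantLock` stands in for putMessageLock, and the class `RollingLog` with all of its members is hypothetical.

```java
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.locks.ReentrantLock;

// Hypothetical in-memory model of an append-only log split into fixed-size segments.
final class RollingLog {
    enum Status { PUT_OK, END_OF_FILE }

    private final int segmentSize;
    private final List<ByteBuffer> segments = new ArrayList<>();
    private final ReentrantLock lock = new ReentrantLock();

    RollingLog(int segmentSize) {
        this.segmentSize = segmentSize;
    }

    // Tries to append to one segment; if the record does not fit, the tail is
    // treated as padding and the caller is told to roll to a new segment.
    private static Status appendTo(ByteBuffer segment, byte[] record) {
        if (record.length > segment.remaining()) {
            segment.position(segment.limit());
            return Status.END_OF_FILE;
        }
        segment.put(record);
        return Status.PUT_OK;
    }

    private ByteBuffer newSegment() {
        ByteBuffer s = ByteBuffer.allocate(segmentSize);
        segments.add(s);
        return s;
    }

    // Appends a record and returns the global offset at which it was written.
    long put(byte[] record) {
        if (record.length > segmentSize) {
            throw new IllegalArgumentException("record larger than a segment");
        }
        lock.lock(); // all appends are serialized, as in the method above
        try {
            ByteBuffer last = segments.isEmpty() ? null : segments.get(segments.size() - 1);
            if (last == null || !last.hasRemaining()) {
                last = newSegment();
            }
            int pos = last.position();
            if (appendTo(last, record) == Status.END_OF_FILE) {
                last = newSegment(); // roll over and write the same record again
                pos = 0;
                appendTo(last, record);
            }
            return (long) (segments.size() - 1) * segmentSize + pos;
        } finally {
            lock.unlock();
        }
    }

    int segmentCount() {
        return segments.size();
    }

    public static void main(String[] args) {
        RollingLog log = new RollingLog(16);
        System.out.println(log.put(new byte[10])); // fits in the first segment
        System.out.println(log.put(new byte[10])); // does not fit, rolls to the second
        System.out.println(log.segmentCount());
    }
}
```

Because a record is never split across segments, the unused tail of a segment is wasted, but every record can be read from a single contiguous region.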
The mappedFile.appendMessage method calls this.appendMessagesInner:

    // MappedFile#appendMessagesInner
    public AppendMessageResult appendMessagesInner(final MessageExt messageExt, final AppendMessageCallback cb) {
        assert messageExt != null;
        assert cb != null;

        int currentPos = this.wrotePosition.get(); // $1

        if (currentPos < this.fileSize) {
            ByteBuffer byteBuffer = writeBuffer != null ? writeBuffer.slice() : this.mappedByteBuffer.slice(); // $2
            byteBuffer.position(currentPos);
            AppendMessageResult result;
            if (messageExt instanceof MessageExtBrokerInner) { // $3
                result = cb.doAppend(this.getFileFromOffset(), byteBuffer, this.fileSize - currentPos, (MessageExtBrokerInner) messageExt); // $4
            } else if (messageExt instanceof MessageExtBatch) {
                result = cb.doAppend(this.getFileFromOffset(), byteBuffer, this.fileSize - currentPos, (MessageExtBatch) messageExt);
            } else {
                return new AppendMessageResult(AppendMessageStatus.UNKNOWN_ERROR);
            }
            this.wrotePosition.addAndGet(result.getWroteBytes()); // $5
            this.storeTimestamp = result.getStoreTimestamp();
            return result;
        }
        log.error("MappedFile.appendMessage return null, wrotePosition: {} fileSize: {}", currentPos, this.fileSize);
        return new AppendMessageResult(AppendMessageStatus.UNKNOWN_ERROR);
    }

The code is in DefaultAppendMessageCallback, an inner class of CommitLog.

    // CommitLog$DefaultAppendMessageCallback#doAppend
    public AppendMessageResult doAppend(final long fileFromOffset, final ByteBuffer byteBuffer, final int maxBlank,
        final MessageExtBrokerInner msgInner) {
        // STORETIMESTAMP + STOREHOSTADDRESS + OFFSET <br>
        long wroteOffset = fileFromOffset + byteBuffer.position(); // $1

        this.resetByteBuffer(hostHolder, 8); // $2
        String msgId = MessageDecoder.createMessageId(this.msgIdMemory, msgInner.getStoreHostBytes(hostHolder), wroteOffset);

        // Record ConsumeQueue information
        keyBuilder.setLength(0);
        keyBuilder.append(msgInner.getTopic());
        keyBuilder.append('-');
        keyBuilder.append(msgInner.getQueueId());
        String key = keyBuilder.toString();
        Long queueOffset = CommitLog.this.topicQueueTable.get(key); // $3
        if (null == queueOffset) {
            queueOffset = 0L;
            CommitLog.this.topicQueueTable.put(key, queueOffset);
        }

        // Transaction messages that require special handling
        final int tranType = MessageSysFlag.getTransactionValue(msgInner.getSysFlag());
        switch (tranType) {
            // Prepared and Rollback message is not consumed, will not enter the
            // consumer queuec
            case MessageSysFlag.TRANSACTION_PREPARED_TYPE:
            case MessageSysFlag.TRANSACTION_ROLLBACK_TYPE: // $4
                queueOffset = 0L;
                break;
            case MessageSysFlag.TRANSACTION_NOT_TYPE:
            case MessageSysFlag.TRANSACTION_COMMIT_TYPE:
            default:
                break;
        }

        // Serialize message // $5
        final byte[] propertiesData =
            msgInner.getPropertiesString() == null ? null : msgInner.getPropertiesString().getBytes(MessageDecoder.CHARSET_UTF8);

        final int propertiesLength = propertiesData == null ? 0 : propertiesData.length;

        if (propertiesLength > Short.MAX_VALUE) {
            log.warn("putMessage message properties length too long. length={}", propertiesData.length);
            return new AppendMessageResult(AppendMessageStatus.PROPERTIES_SIZE_EXCEEDED);
        }

        final byte[] topicData = msgInner.getTopic().getBytes(MessageDecoder.CHARSET_UTF8);
        final int topicLength = topicData.length;

        final int bodyLength = msgInner.getBody() == null ? 0 : msgInner.getBody().length;

        final int msgLen = calMsgLength(bodyLength, topicLength, propertiesLength);

        // Exceeds the maximum message
        if (msgLen > this.maxMessageSize) {
            CommitLog.log.warn("message size exceeded, msg total size: " + msgLen + ", msg body size: " + bodyLength
                + ", maxMessageSize: " + this.maxMessageSize);
            return new AppendMessageResult(AppendMessageStatus.MESSAGE_SIZE_EXCEEDED);
        }

        // Determines whether there is sufficient free space
        if ((msgLen + END_FILE_MIN_BLANK_LENGTH) > maxBlank) { // $6
            this.resetByteBuffer(this.msgStoreItemMemory, maxBlank);
            this.msgStoreItemMemory.putInt(maxBlank); // 1 TOTALSIZE
            this.msgStoreItemMemory.putInt(CommitLog.BLANK_MAGIC_CODE); // 2 MAGICCODE
            // 3 The remaining space may be any value
            // Here the length of the specially set maxBlank
            final long beginTimeMills = CommitLog.this.defaultMessageStore.now();
            byteBuffer.put(this.msgStoreItemMemory.array(), 0, maxBlank);
            return new AppendMessageResult(AppendMessageStatus.END_OF_FILE, wroteOffset, maxBlank, msgId, msgInner.getStoreTimestamp(),
                queueOffset, CommitLog.this.defaultMessageStore.now() - beginTimeMills);
        }

        // $7 [code omitted]
        if (propertiesLength > 0)
            this.msgStoreItemMemory.put(propertiesData);

        final long beginTimeMills = CommitLog.this.defaultMessageStore.now();
        // Write messages to the queue buffer
        byteBuffer.put(this.msgStoreItemMemory.array(), 0, msgLen); // $8

        AppendMessageResult result = new AppendMessageResult(AppendMessageStatus.PUT_OK, wroteOffset, msgLen, msgId, // $9
            msgInner.getStoreTimestamp(), queueOffset, CommitLog.this.defaultMessageStore.now() - beginTimeMills);

        switch (tranType) {
            case MessageSysFlag.TRANSACTION_PREPARED_TYPE:
            case MessageSysFlag.TRANSACTION_ROLLBACK_TYPE:
                break;
            case MessageSysFlag.TRANSACTION_NOT_TYPE:
            case MessageSysFlag.TRANSACTION_COMMIT_TYPE:
                // The next update ConsumeQueue information
                CommitLog.this.topicQueueTable.put(key, ++queueOffset);
                break;
            default:
                break;
        }
        return result;
    }

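One detail of doAppend that is easy to miss is how the logical queue offset is handed out: it is looked up under a "topic-queueId" key and only advances for messages that will actually become consumable. Below is a small standalone sketch of that bookkeeping. The class `QueueOffsetTable` and its enum are hypothetical simplifications, and the sketch assumes the caller already holds the append lock, as the real method does.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical model of the per-queue logical offset counter used during append.
final class QueueOffsetTable {
    enum TranType { NOT, PREPARED, COMMIT, ROLLBACK }

    // Key: "topic-queueId"; value: next logical offset for that queue.
    private final Map<String, Long> table = new HashMap<>();

    // Returns the queueOffset to record in the stored message.
    // Assumption: called while holding the append lock, so no synchronization here.
    long assign(String topic, int queueId, TranType type) {
        String key = topic + "-" + queueId;
        long current = table.getOrDefault(key, 0L);
        switch (type) {
            case PREPARED:
            case ROLLBACK:
                // Never enters the consume queue: stored with offset 0, counter untouched.
                return 0L;
            case NOT:
            case COMMIT:
            default:
                table.put(key, current + 1);
                return current;
        }
    }

    public static void main(String[] args) {
        QueueOffsetTable t = new QueueOffsetTable();
        System.out.println(t.assign("TopicTest", 0, TranType.NOT));
        System.out.println(t.assign("TopicTest", 0, TranType.PREPARED));
        System.out.println(t.assign("TopicTest", 0, TranType.COMMIT));
    }
}
```

Skipping the increment for prepared and rollback messages keeps the logical offsets in each ConsumeQueue dense, so consumers never see holes left by uncommitted transactions.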

    // DefaultMessageStore#start (excerpt)
    long maxPhysicalPosInLogicQueue = commitLog.getMinOffset();
    for (ConcurrentMap<Integer, ConsumeQueue> maps : this.consumeQueueTable.values()) {
        for (ConsumeQueue logic : maps.values()) {
            if (logic.getMaxPhysicOffset() > maxPhysicalPosInLogicQueue) {
                maxPhysicalPosInLogicQueue = logic.getMaxPhysicOffset();
            }
        }
    }
    if (maxPhysicalPosInLogicQueue < 0) {
        maxPhysicalPosInLogicQueue = 0;
    }
    if (maxPhysicalPosInLogicQueue < this.commitLog.getMinOffset()) {
        maxPhysicalPosInLogicQueue = this.commitLog.getMinOffset();
        log.warn("[TooSmallCqOffset] maxPhysicalPosInLogicQueue={} clMinOffset={}", maxPhysicalPosInLogicQueue, this.commitLog.getMinOffset());
    }
    this.reputMessageService.start();


    // DefaultMessageStore$ReputMessageService#doReput
    private void doReput() {
        if (this.reputFromOffset < DefaultMessageStore.this.commitLog.getMinOffset()) { // $1
            log.warn("The reputFromOffset={} is smaller than minPyOffset={}, this usually indicate that the dispatch behind too much and the commitlog has expired.",
                this.reputFromOffset, DefaultMessageStore.this.commitLog.getMinOffset());
            this.reputFromOffset = DefaultMessageStore.this.commitLog.getMinOffset();
        }
        for (boolean doNext = true; this.isCommitLogAvailable() && doNext; ) { // $2

            if (DefaultMessageStore.this.getMessageStoreConfig().isDuplicationEnable()
                && this.reputFromOffset >= DefaultMessageStore.this.getConfirmOffset()) {
                break;
            }

            SelectMappedBufferResult result = DefaultMessageStore.this.commitLog.getData(reputFromOffset); // $3
            if (result != null) {
                try {
                    this.reputFromOffset = result.getStartOffset(); // $4

                    for (int readSize = 0; readSize < result.getSize() && doNext; ) {
                        DispatchRequest dispatchRequest =
                            DefaultMessageStore.this.commitLog.checkMessageAndReturnSize(result.getByteBuffer(), false, false); // $5 build the dispatchRequest
                        int size = dispatchRequest.getBufferSize() == -1 ? dispatchRequest.getMsgSize() : dispatchRequest.getBufferSize();

                        if (dispatchRequest.isSuccess()) {
                            if (size > 0) {
                                DefaultMessageStore.this.doDispatch(dispatchRequest); // $6

                                // If this broker is a master, it can notify that a message has arrived in the
                                // ConsumeQueue; users can also plug in their own arrival listener here
                                if (BrokerRole.SLAVE != DefaultMessageStore.this.getMessageStoreConfig().getBrokerRole()
                                    && DefaultMessageStore.this.brokerConfig.isLongPollingEnable()) {
                                    DefaultMessageStore.this.messageArrivingListener.arriving(dispatchRequest.getTopic(),
                                        dispatchRequest.getQueueId(), dispatchRequest.getConsumeQueueOffset() + 1,
                                        dispatchRequest.getTagsCode(), dispatchRequest.getStoreTimestamp(),
                                        dispatchRequest.getBitMap(), dispatchRequest.getPropertiesMap());
                                }

                                this.reputFromOffset += size; // $7
                                readSize += size;
                                if (DefaultMessageStore.this.getMessageStoreConfig().getBrokerRole() == BrokerRole.SLAVE) {
                                    DefaultMessageStore.this.storeStatsService
                                        .getSinglePutMessageTopicTimesTotal(dispatchRequest.getTopic()).incrementAndGet();
                                    DefaultMessageStore.this.storeStatsService
                                        .getSinglePutMessageTopicSizeTotal(dispatchRequest.getTopic())
                                        .addAndGet(dispatchRequest.getMsgSize());
                                }
                            } else if (size == 0) {
                                this.reputFromOffset = DefaultMessageStore.this.commitLog.rollNextFile(this.reputFromOffset);
                                readSize = result.getSize();
                            }
                        } else if (!dispatchRequest.isSuccess()) {

                            if (size > 0) { // $8
                                log.error("[BUG]read total count not equals msg total size. reputFromOffset={}", reputFromOffset);
                                this.reputFromOffset += size;
                            } else {
                                doNext = false;
                                // If user open the dledger pattern or the broker is master node,
                                // it will not ignore the exception and fix the reputFromOffset variable
                                if (DefaultMessageStore.this.getMessageStoreConfig().isEnableDLegerCommitLog()
                                    || DefaultMessageStore.this.brokerConfig.getBrokerId() == MixAll.MASTER_ID) {
                                    log.error("[BUG]dispatch message to consume queue error, COMMITLOG OFFSET: {}",
                                        this.reputFromOffset);
                                    this.reputFromOffset += result.getSize() - readSize;
                                }
                            }
                        }
                    }
                } finally {
                    result.release();
                }
            } else {
                doNext = false;
            }
        }
    }

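Stripped of error handling and role checks, doReput is a scan over length-prefixed records: read the size at the current offset, dispatch the record, advance by that size, and stop when no complete record is available. The sketch below shows just that loop over an in-memory buffer. `ReputScanner`, its `scan` method, and the `BiConsumer` dispatcher are hypothetical stand-ins, not RocketMQ types.

```java
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.List;
import java.util.function.BiConsumer;

// Hypothetical sketch of the "read size, dispatch, advance" loop.
final class ReputScanner {
    // Scans records starting at reputFromOffset, passing each (offset, size) pair
    // to the dispatcher. Returns the offset at which the next scan should resume.
    static long scan(ByteBuffer log, long reputFromOffset, BiConsumer<Long, Integer> dispatcher) {
        long offset = reputFromOffset;
        while (offset + 4 <= log.limit()) {
            int size = log.getInt((int) offset); // first field of every record is its total size
            if (size <= 0 || offset + size > log.limit()) {
                break; // nothing complete has been written here yet
            }
            dispatcher.accept(offset, size);
            offset += size;
        }
        return offset;
    }

    public static void main(String[] args) {
        ByteBuffer log = ByteBuffer.allocate(64);
        log.putInt(0, 12);  // record A: 12 bytes at offset 0
        log.putInt(12, 20); // record B: 20 bytes at offset 12
        List<String> seen = new ArrayList<>();
        long next = scan(log, 0, (off, size) -> seen.add(off + ":" + size));
        System.out.println(seen + " next=" + next);
    }
}
```

Because the returned offset is where the scan stopped, calling `scan` again later with that value picks up exactly the records appended in the meantime, which is how a background thread can keep the ConsumeQueue and index files trailing the commitlog.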
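Step 1: Look up the ConsumeQueue by topic and queueId.
Step 2: Append the message's commitlog offset, size, and tagHashCode (see the ConsumeQueue data structure above) to the ConsumeQueue's memory-mapped file (mappedFile) without flushing. ConsumeQueue is flushed to disk asynchronously by default.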

    return mappedFile.appendMessage(this.byteBufferIndex.array());

If messageIndexEnable is set to true, this task is dispatched; otherwise it is not.
Step 1: Get the indexFile. If the in-memory indexFileList holds no indexFile, rebuild one from the file path.
Step 2: If the message's unique key is not yet present, add it to the indexFile.
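As noted above, DefaultMessageStore is the business layer of storage, and putMessage is its entry method.
Looking at its properties, they fall into several categories: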
This will be covered in a separate article.
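The page cache, also called the page buffer or file buffer, is made up of several disk blocks. A page is typically 4 KB; some 64-bit architectures use 8 KB. The disk blocks that make up a page are not necessarily contiguous on the physical disk. Files are organized in units of one page, that is, the size of one page cache entry. A file read pulls several possibly non-contiguous disk blocks from external storage into the buffer cache, assembles them into the page cache, and then hands the data to the application.
When the operating system performs I/O, it first looks in the page cache. On a miss, it starts disk I/O, loads the data from the file on disk into a free block of the page cache, and then copies it to the user buffer.
For the first read request on each file, the system reads the requested page and also reads ahead sequentially through the next few pages.
Message reads in the MQ rely on the system page cache: the higher the page cache hit rate, the better the read performance.
ConsumeQueue, the logical consume queue, is read sequentially. With page cache read-ahead, its read performance is close to that of memory, and a message backlog does not hurt it.
In addition, RocketMQ reads and writes files mainly through MappedByteBuffer. It uses NIO's FileChannel to map the physical file on disk directly into a user-space memory address. This mmap approach avoids the overhead of traditional I/O, which copies file data back and forth between a buffer in the kernel address space and a buffer in the application's address space. File operations become direct operations on memory addresses, which greatly improves read and write efficiency.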
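For readers who have not used memory-mapped files in Java, the following is a minimal sketch of the mechanism using only the standard `FileChannel.map` API. It is not RocketMQ's MappedFile; the class `MmapDemo`, the temp-file name, and the 4096-byte mapping size are arbitrary choices for illustration.

```java
import java.io.IOException;
import java.nio.MappedByteBuffer;
import java.nio.channels.FileChannel;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;

// Minimal demonstration of writing to and reading from a memory-mapped file.
final class MmapDemo {
    static String writeThenRead(Path file, String text, int fileSize) throws IOException {
        byte[] data = text.getBytes(StandardCharsets.UTF_8);
        try (FileChannel channel = FileChannel.open(file,
                StandardOpenOption.CREATE, StandardOpenOption.READ, StandardOpenOption.WRITE)) {
            // Map the first fileSize bytes of the file into memory (the file grows if needed).
            MappedByteBuffer mapped = channel.map(FileChannel.MapMode.READ_WRITE, 0, fileSize);

            mapped.put(data); // an ordinary memory write; no explicit write() call on the channel
            mapped.force();   // ask the OS to flush the dirty pages to disk

            byte[] back = new byte[data.length];
            mapped.position(0);
            mapped.get(back); // an ordinary memory read
            return new String(back, StandardCharsets.UTF_8);
        }
    }

    public static void main(String[] args) throws IOException {
        Path file = Files.createTempFile("mmap-demo", ".log");
        try {
            System.out.println(writeThenRead(file, "hello rocketmq", 4096));
        } finally {
            Files.deleteIfExists(file); // note: may fail on Windows while the mapping is alive
        }
    }
}
```

The call to `force()` is what a synchronous flush amounts to; skipping it and letting the OS write dirty pages back later corresponds to the asynchronous case.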