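Message middleware can store messages in three ways. The first keeps them in memory: fast, but messages are lost if the system crashes. The second keeps them in memory and periodically writes them to a database: messages are persisted, but DB reads and writes become the MQ's bottleneck. The third combines memory and disk, periodically saving messages to disk. How well this storage mechanism is designed determines whether the MQ can deliver high concurrency and high availability.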
Reading through the RocketMQ source shows how it solves these storage problems.
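Messages are stored in files, so a dedicated role is needed to manage each file: this is what MappedFile is for. The MappedFiles themselves are managed by MappedFileQueue. Think of it as a directory that maintains a CopyOnWriteArrayList<MappedFile> mappedFiles.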
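```java
public class MappedFile {
    // Position after the latest write of messages into memory
    protected final AtomicInteger wrotePosition = new AtomicInteger(0);
    // Position after the latest commit to the FileChannel
    protected final AtomicInteger committedPosition = new AtomicInteger(0);
    // Position after the latest flush to the physical file
    private final AtomicInteger flushedPosition = new AtomicInteger(0);
    // File size (1 GB by default for CommitLog files)
    protected int fileSize;
    // NIO channel of the file
    protected FileChannel fileChannel;
    // The underlying file
    private File file;
    // Start offset of this file, parsed from its name
    private long fileFromOffset;
    // Memory buffer that temporarily holds written messages (null unless a transient store pool is used)
    protected ByteBuffer writeBuffer = null;
    // Memory-mapped view of the whole file
    protected MappedByteBuffer mappedByteBuffer = null;

    private void init(final String fileName, final int fileSize) throws IOException {
        this.fileSize = fileSize;
        this.file = new File(fileName);
        this.fileFromOffset = Long.parseLong(this.file.getName());
        ensureDirOK(this.file.getParent());
        this.fileChannel = new RandomAccessFile(this.file, "rw").getChannel();
        this.mappedByteBuffer = this.fileChannel.map(MapMode.READ_WRITE, 0, fileSize);
    }
}
```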
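A MappedFile's name (file.getName()) is its start offset, zero-padded to 20 digits: 00000000000000000000, 00000000001073741824, 00000000002147483648, and so on, with fileName[n] = fileName[n - 1] + mappedFileSize. The name 00000000001073741824 corresponds to 1073741824 bytes = 1 GB. In other words, each file's name is that file's startOffset.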
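The naming rule can be checked with a minimal sketch. It assumes a 1 GB file size and uses a hypothetical FileNameSketch class. It formats start offsets with String.format("%020d", ...), whereas RocketMQ uses its own helper for this. It also computes the start offset of the file that a given physical offset falls into.

```java
class FileNameSketch {
    static final long MAPPED_FILE_SIZE = 1024L * 1024 * 1024; // 1 GB, assumed CommitLog file size

    // Formats a start offset as a 20-digit, zero-padded file name
    static String fileName(long startOffset) {
        return String.format("%020d", startOffset);
    }

    // Start offset of the file that holds a given global (physical) offset
    static long fileFromOffsetOf(long physicalOffset) {
        return physicalOffset - physicalOffset % MAPPED_FILE_SIZE;
    }

    public static void main(String[] args) {
        for (int n = 0; n < 3; n++) {
            System.out.println(fileName(n * MAPPED_FILE_SIZE));
        }
        // prints 00000000000000000000, 00000000001073741824, 00000000002147483648 (one per line)
        System.out.println(fileName(fileFromOffsetOf(1_500_000_000L))); // 00000000001073741824
    }
}
```

Because every name is a multiple of the file size, finding "which file holds offset X" is plain integer arithmetic. No index structure is needed.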
MappedFile provides three capabilities: writing messages, committing messages to the FileChannel, and flushing to disk. The corresponding methods are:
1. AppendMessageResult appendMessagesInner(MessageExt messageExt, final AppendMessageCallback cb)
2. boolean appendMessage(final byte[] data, final int offset, final int length)
3. int commit(final int commitLeastPages)
4. int flush(final int flushLeastPages)
First, the append operation:
MappedFile#appendMessagesInner:

```java
public AppendMessageResult appendMessagesInner(final MessageExt messageExt, final AppendMessageCallback cb) {
    // Position where the previous write ended
    int currentPos = this.wrotePosition.get();
    if (currentPos < this.fileSize) {
        // Use writeBuffer if it is in use, otherwise the mapped buffer;
        // slice() shares the content but has its own position/limit
        ByteBuffer byteBuffer = writeBuffer != null ? writeBuffer.slice() : this.mappedByteBuffer.slice();
        byteBuffer.position(currentPos);
        // CommitLog's callback serializes the message into the remaining space
        AppendMessageResult result = cb.doAppend(this.getFileFromOffset(), byteBuffer, this.fileSize - currentPos, messageExt);
        this.wrotePosition.addAndGet(result.getWroteBytes());
        this.storeTimestamp = result.getStoreTimestamp();
        return result;
    }
    // ...
}
```
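1. Read the position where the previous write ended (wrotePosition) and take a slice of the buffer.
2. Set the slice's position to where this write starts, i.e. right after the previous write.
3. The callback AppendMessageCallback, provided by CommitLog, performs the actual write. It adds extra information to the message, such as the total message length and timestamps, using the following layout: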
# | Field | Description | Data type | Bytes |
---|---|---|---|---|
1 | MsgLen | Total message length | Int | 4 |
2 | MagicCode | MESSAGE_MAGIC_CODE | Int | 4 |
3 | BodyCRC | CRC of the message body | Int | 4 |
4 | QueueId | Message queue ID | Int | 4 |
5 | Flag | Message flag | Int | 4 |
6 | QueueOffset | Position in the message queue | Long | 8 |
7 | PhysicalOffset | Physical position: the message's sequential storage position in the CommitLog | Long | 8 |
8 | SysFlag | MessageSysFlag | Int | 4 |
9 | BornTimestamp | Timestamp when the message was produced | Long | 8 |
10 | BornHost | Address + port of the producing host | Long | 8 |
11 | StoreTimestamp | Timestamp when the message was stored | Long | 8 |
12 | StoreHost | Address + port of the storing host | Long | 8 |
13 | ReconsumeTimes | Number of times the message has been reconsumed | Int | 4 |
14 | PreparedTransactionOffset | Prepared transaction offset | Long | 8 |
15 | BodyLength + Body | Body length + body | Int + Bytes | 4 + bodyLength |
16 | TopicLength + Topic | Topic length + topic | Byte + Bytes | 1 + topicLength |
17 | PropertiesLength + Properties | Extension properties length + properties | Short + Bytes | 2 + PropertiesLength |
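The table can be checked with a small encoder. This is a simplified sketch, not CommitLog's real callback. The class MessageLayoutSketch and its methods are hypothetical, the magic code is a placeholder, the host fields are zeroed, and the body checksum uses java.util.zip.CRC32. The encoder writes the 17 fields in table order. Fields 1 to 14 total 84 bytes, and the three length prefixes add 4 + 1 + 2, so every message costs 91 bytes plus its body, topic and properties.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.zip.CRC32;

class MessageLayoutSketch {
    // Placeholder only; RocketMQ defines its own MESSAGE_MAGIC_CODE constant
    static final int MAGIC_CODE_PLACEHOLDER = 0x12345678;

    // Sum of the "Bytes" column: 84 bytes of fixed fields + 4 + 1 + 2 bytes of length prefixes
    static int msgLength(int bodyLength, int topicLength, int propertiesLength) {
        return 4 + 4 + 4 + 4 + 4 + 8 + 8 + 4 + 8 + 8 + 8 + 8 + 4 + 8
                + 4 + bodyLength + 1 + topicLength + 2 + propertiesLength;
    }

    static ByteBuffer encode(String topic, byte[] body, String properties,
                             int queueId, long queueOffset, long physicalOffset) {
        byte[] topicBytes = topic.getBytes(StandardCharsets.UTF_8);
        byte[] propBytes = properties.getBytes(StandardCharsets.UTF_8);
        int len = msgLength(body.length, topicBytes.length, propBytes.length);
        CRC32 crc = new CRC32();
        crc.update(body);
        long now = System.currentTimeMillis();

        ByteBuffer buf = ByteBuffer.allocate(len);
        buf.putInt(len);                                       // 1 MsgLen
        buf.putInt(MAGIC_CODE_PLACEHOLDER);                    // 2 MagicCode
        buf.putInt((int) crc.getValue());                      // 3 BodyCRC
        buf.putInt(queueId);                                   // 4 QueueId
        buf.putInt(0);                                         // 5 Flag
        buf.putLong(queueOffset);                              // 6 QueueOffset
        buf.putLong(physicalOffset);                           // 7 PhysicalOffset
        buf.putInt(0);                                         // 8 SysFlag
        buf.putLong(now);                                      // 9 BornTimestamp
        buf.putLong(0L);                                       // 10 BornHost (zeroed here)
        buf.putLong(now);                                      // 11 StoreTimestamp
        buf.putLong(0L);                                       // 12 StoreHost (zeroed here)
        buf.putInt(0);                                         // 13 ReconsumeTimes
        buf.putLong(0L);                                       // 14 PreparedTransactionOffset
        buf.putInt(body.length).put(body);                     // 15 BodyLength + Body
        buf.put((byte) topicBytes.length).put(topicBytes);     // 16 TopicLength + Topic
        buf.putShort((short) propBytes.length).put(propBytes); // 17 PropertiesLength + Properties
        buf.flip();                                            // ready to be copied into the file buffer
        return buf;
    }
}
```

For example, a 5-byte body on topic "TopicTest" with no properties encodes to 91 + 5 + 9 = 105 bytes.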
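After encoding, the message is converted to a byte array and written into the Buffer. The callback returns the number of bytes written, and wrotePosition advances by that amount (WroteBytes). So the ByteBuffer-level write works at the granularity of individual messages.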
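The wrotePosition bookkeeping from the steps above can be sketched in isolation. This is a minimal sketch that assumes a plain heap ByteBuffer in place of writeBuffer/mappedByteBuffer and a hypothetical AppendSketch class. It also assumes a single writer, because RocketMQ serializes CommitLog appends under a lock. When a message does not fit, the sketch simply rejects it. The real callback handles the end of a file differently.

```java
import java.nio.ByteBuffer;
import java.util.concurrent.atomic.AtomicInteger;

class AppendSketch {
    private final int fileSize;
    private final ByteBuffer buffer; // stands in for writeBuffer / mappedByteBuffer
    private final AtomicInteger wrotePosition = new AtomicInteger(0);

    AppendSketch(int fileSize) {
        this.fileSize = fileSize;
        this.buffer = ByteBuffer.allocate(fileSize);
    }

    // Returns the number of bytes written, or -1 if the message does not fit
    int append(byte[] encodedMessage) {
        int currentPos = wrotePosition.get();                 // step 1: where the last write ended
        if (currentPos + encodedMessage.length > fileSize) {
            return -1;
        }
        ByteBuffer slice = buffer.slice();                    // shared content, independent position
        slice.position(currentPos);                           // step 2: start right after the last write
        slice.put(encodedMessage);                            // step 3: write the encoded bytes
        wrotePosition.addAndGet(encodedMessage.length);       // advance by WroteBytes
        return encodedMessage.length;
    }

    int getWrotePosition() {
        return wrotePosition.get();
    }

    byte byteAt(int index) {
        return buffer.get(index);
    }
}
```

Slicing leaves the shared buffer's own position untouched, so each append depends only on wrotePosition.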
The commit operation
```java
public int commit(final int commitLeastPages) {
    if (writeBuffer == null) {
        // No writeBuffer: data is written straight into mappedByteBuffer, so there is
        // nothing to copy into the FileChannel; treat wrotePosition as committed
        return this.wrotePosition.get();
    }
    if (this.isAbleToCommit(commitLeastPages)) {
        if (this.hold()) {
            commit0(commitLeastPages);
            this.release();
        }
    }
    // All dirty data has been committed to FileChannel: return the buffer to the pool.
    if (writeBuffer != null && this.transientStorePool != null && this.fileSize == this.committedPosition.get()) {
        this.transientStorePool.returnBuffer(writeBuffer);
        this.writeBuffer = null;
    }
    return this.committedPosition.get();
}

protected void commit0(final int commitLeastPages) {
    int writePos = this.wrotePosition.get();
    int lastCommittedPosition = this.committedPosition.get();
    if (writePos - this.committedPosition.get() > 0) {
        try {
            // Copy writeBuffer[lastCommittedPosition, writePos) into the FileChannel at the same offset
            ByteBuffer byteBuffer = writeBuffer.slice();
            byteBuffer.position(lastCommittedPosition);
            byteBuffer.limit(writePos);
            this.fileChannel.position(lastCommittedPosition);
            this.fileChannel.write(byteBuffer);
            this.committedPosition.set(writePos);
        } catch (Throwable e) {
            log.error("Error occurred when commit data to FileChannel.", e);
        }
    }
}

/**
 * Whether a commit may proceed. Any one of the following is sufficient:
 * 1. the mapped file is full;
 * 2. commitLeastPages > 0 and the uncommitted part spans at least commitLeastPages OS pages;
 * 3. commitLeastPages == 0 and there is newly written data.
 * @param commitLeastPages minimum number of pages per commit
 * @return whether a commit may proceed
 */
protected boolean isAbleToCommit(final int commitLeastPages) {
    int flush = this.committedPosition.get();
    int write = this.wrotePosition.get();
    if (this.isFull()) { // this.fileSize == this.wrotePosition.get()
        return true;
    }
    if (commitLeastPages > 0) {
        return ((write / OS_PAGE_SIZE) - (flush / OS_PAGE_SIZE)) >= commitLeastPages;
    }
    return write > flush;
}
```
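The commit operation consists of the three methods above. isAbleToCommit decides whether a commit may proceed. A commit goes ahead if any of these holds:
- the file is full;
- commitLeastPages > 0 and the uncommitted data spans at least commitLeastPages OS pages (4 KB each);
- commitLeastPages = 0 and there is any new data.

commit0 then writes the part of the buffer between the last committed position and the latest write position into the FileChannel, and updates committedPosition. At this point the data is in the OS page cache but not yet guaranteed to be on disk. Commit therefore works at the FileChannel level.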
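The page-threshold rule and the range copy can be exercised outside RocketMQ. This is a minimal sketch with hypothetical static helpers in a CommitSketch class. Positions are passed as parameters instead of living in AtomicIntegers, and there is no hold/release reference counting. The write loop around FileChannel.write is an addition for robustness.

```java
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.FileChannel;

class CommitSketch {
    static final int OS_PAGE_SIZE = 4096;

    // Mirrors isAbleToCommit: full file, enough whole pages, or (least pages == 0) any new data
    static boolean isAbleToCommit(int committed, int wrote, int fileSize, int commitLeastPages) {
        if (wrote == fileSize) {
            return true;
        }
        if (commitLeastPages > 0) {
            return (wrote / OS_PAGE_SIZE) - (committed / OS_PAGE_SIZE) >= commitLeastPages;
        }
        return wrote > committed;
    }

    // Mirrors commit0: copies writeBuffer[committed, wrote) to the same offset in the channel
    // and returns the new committed position
    static int commit0(ByteBuffer writeBuffer, FileChannel channel, int committed, int wrote) throws IOException {
        if (wrote - committed <= 0) {
            return committed;
        }
        ByteBuffer range = writeBuffer.slice();
        range.position(committed);
        range.limit(wrote);
        channel.position(committed);
        while (range.hasRemaining()) {
            channel.write(range);
        }
        return wrote;
    }
}
```

The threshold counts page boundaries crossed, not bytes. With commitLeastPages = 4, committing from 0 becomes possible once wrotePosition reaches 16384.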
The flush operation
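```java
public int flush(final int flushLeastPages) {
    if (this.isAbleToFlush(flushLeastPages)) {
        if (this.hold()) {
            // Readable position: committedPosition if writeBuffer is used, otherwise wrotePosition
            int value = getReadPosition();
            try {
                // Data reaches the file either through fileChannel or through mappedByteBuffer, never both
                if (writeBuffer != null || this.fileChannel.position() != 0) {
                    this.fileChannel.force(false);
                } else {
                    this.mappedByteBuffer.force();
                }
            } catch (Throwable e) {
                log.error("Error occurred when force data to disk.", e);
            }
            this.flushedPosition.set(value);
            this.release();
        } else {
            log.warn("in flush, hold failed, flush offset = " + this.flushedPosition.get());
            this.flushedPosition.set(getReadPosition());
        }
    }
    return this.getFlushedPosition();
}
```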
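isAbleToFlush follows the same logic as isAbleToCommit, including the page threshold. It compares flushedPosition with the readable position instead of committedPosition with wrotePosition. The flush forces fileChannel when data arrived through the channel, and mappedByteBuffer otherwise. After the data reaches disk, flushedPosition is updated to record the last position persisted in the physical file. Flush therefore works at the level of the physical file.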
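The flush path can be tried against a real memory-mapped file. This is a minimal, single-threaded sketch built around a hypothetical FlushSketch class. It only covers the mappedByteBuffer branch (writes go into the mapping, then MappedByteBuffer.force()). It applies the same page rule as isAbleToFlush and uses plain ints instead of AtomicIntegers.

```java
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.MappedByteBuffer;
import java.nio.channels.FileChannel;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;

class FlushSketch implements AutoCloseable {
    static final int OS_PAGE_SIZE = 4096;

    private final FileChannel channel;
    private final MappedByteBuffer mapped;
    private final int fileSize;
    private int wrotePosition = 0;   // single-threaded sketch: plain ints
    private int flushedPosition = 0;

    FlushSketch(Path file, int fileSize) throws IOException {
        this.fileSize = fileSize;
        this.channel = FileChannel.open(file,
                StandardOpenOption.CREATE, StandardOpenOption.READ, StandardOpenOption.WRITE);
        this.mapped = channel.map(FileChannel.MapMode.READ_WRITE, 0, fileSize);
    }

    // Writes into the mapping (page cache); nothing is guaranteed to be on disk yet
    void write(byte[] data) {
        ByteBuffer view = mapped.duplicate();
        view.position(wrotePosition);
        view.put(data);
        wrotePosition += data.length;
    }

    // Same rules as isAbleToCommit, applied to flushedPosition vs. wrotePosition
    boolean isAbleToFlush(int flushLeastPages) {
        if (wrotePosition == fileSize) {
            return true;
        }
        if (flushLeastPages > 0) {
            return (wrotePosition / OS_PAGE_SIZE) - (flushedPosition / OS_PAGE_SIZE) >= flushLeastPages;
        }
        return wrotePosition > flushedPosition;
    }

    int flush(int flushLeastPages) {
        if (isAbleToFlush(flushLeastPages)) {
            int value = wrotePosition;
            mapped.force();            // write dirty mapped pages to the storage device
            flushedPosition = value;
        }
        return flushedPosition;
    }

    @Override
    public void close() throws IOException {
        channel.close();
    }
}
```

Writing 100 bytes and calling flush(1) leaves flushedPosition at 0, because no page boundary was crossed. flush(0) pushes the 100 bytes out.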
Next, let's look at how CommitLog drives commit and flush.
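FlushCommitLogService extends ServiceThread, which runs its loop on a dedicated background thread. Each of the services below therefore runs asynchronously on its own thread.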
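Service thread | Scenario | Message-insertion performance |
---|---|---|
CommitRealTimeService | Asynchronous flush, in-memory write buffer (transient store pool) enabled | 1st (fastest) |
FlushRealTimeService | Asynchronous flush, in-memory write buffer disabled | 2nd |
GroupCommitService | Synchronous flush | 3rd (slowest) |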
CommitRealTimeService periodically calls mappedFileQueue.commit(commitDataLeastPages) to commit data. After committing, it wakes flushCommitLogService to flush the data to disk.
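The "commit periodically, then wake the flush thread" pattern can be sketched with plain JDK primitives. The ServiceSketch class and its methods are hypothetical. RocketMQ's ServiceThread exposes similarly named waitForRunning/wakeup methods built on its own latch, but this sketch substitutes Object.wait/notifyAll. Each service sleeps for up to its interval, or less if another thread wakes it, and then runs its task.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

class ServiceSketch implements Runnable {
    private final Object monitor = new Object();
    private boolean notified = false;          // guarded by monitor
    private volatile boolean stopped = false;
    private final long intervalMillis;
    private final Runnable task;

    ServiceSketch(long intervalMillis, Runnable task) {
        this.intervalMillis = intervalMillis;
        this.task = task;
    }

    @Override
    public void run() {
        while (!stopped) {
            waitForRunning();
            if (!stopped) {
                task.run();
            }
        }
    }

    // Sleeps up to intervalMillis, returning early if wakeup() was called
    private void waitForRunning() {
        synchronized (monitor) {
            if (!notified) {
                try {
                    monitor.wait(intervalMillis);
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                    stopped = true;
                }
            }
            notified = false;
        }
    }

    void wakeup() {
        synchronized (monitor) {
            notified = true;
            monitor.notifyAll();
        }
    }

    void shutdown() {
        stopped = true;
        wakeup();
    }

    public static void main(String[] args) throws InterruptedException {
        CountDownLatch flushed = new CountDownLatch(1);
        // Hypothetical flush service: long interval, so it only runs early when woken
        ServiceSketch flushService = new ServiceSketch(10_000, flushed::countDown);
        // Hypothetical commit service: the actual commit is elided; it just wakes the flusher
        ServiceSketch commitService = new ServiceSketch(200, flushService::wakeup);
        Thread f = new Thread(flushService);
        Thread c = new Thread(commitService);
        f.start();
        c.start();
        System.out.println("flush triggered by commit: " + flushed.await(5, TimeUnit.SECONDS));
        commitService.shutdown();
        flushService.shutdown();
        c.join();
        f.join();
    }
}
```

The notified flag means a wakeup() that arrives before the flush thread starts waiting is not lost. The next waitForRunning() returns immediately.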
MappedFileQueue#commit:

```java
public boolean commit(final int commitLeastPages) {
    boolean result = true;
    MappedFile mappedFile = this.findMappedFileByOffset(this.committedWhere, this.committedWhere == 0);
    if (mappedFile != null) {
        int offset = mappedFile.commit(commitLeastPages);
        // Global position after this commit, i.e. where the next commit starts
        long where = mappedFile.getFileFromOffset() + offset;
        // true if where did not move (nothing new was committed); false if data was committed
        result = where == this.committedWhere;
        this.committedWhere = where;
    }
    return result;
}
```
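First, findMappedFileByOffset locates the file to commit:

index (the file's position in the list) = (committedWhere - startOffset) / fileSize

Here committedWhere is the global position to commit from, and startOffset is the start offset of the first file still in the queue. For example, with committedWhere = 4000, startOffset = 0 and fileSize = 1024, index = 4000 / 1024 = 3 (integer division). The 4th MappedFile is fetched from the queue, and it commits its own buffer to its FileChannel.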
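The index formula can be sketched with a list of start offsets standing in for the MappedFile list. FileQueueSketch and its methods are hypothetical. Subtracting the first file's start offset matters because old files are deleted from the head of the queue, which shifts every index down by one.

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;

class FileQueueSketch {
    private final int fileSize;
    // Start offsets of the files still present, oldest first (stands in for List<MappedFile>)
    private final List<Long> fileFromOffsets = new CopyOnWriteArrayList<>();

    FileQueueSketch(int fileSize) {
        this.fileSize = fileSize;
    }

    void addFile(long fileFromOffset) {
        fileFromOffsets.add(fileFromOffset);
    }

    // Simulates deleting the oldest file, e.g. during cleanup of expired files
    void removeOldest() {
        fileFromOffsets.remove(0);
    }

    // index = (offset - startOffset) / fileSize, or -1 if offset is outside the queue
    int findIndex(long offset) {
        if (fileFromOffsets.isEmpty()) {
            return -1;
        }
        long startOffset = fileFromOffsets.get(0);
        if (offset < startOffset) {
            return -1;
        }
        long index = (offset - startOffset) / fileSize;
        return index < fileFromOffsets.size() ? (int) index : -1;
    }

    long fileFromOffsetAt(int index) {
        return fileFromOffsets.get(index);
    }
}
```

With four 1024-byte files starting at 0, offset 4000 maps to index 3. After the oldest file is removed, the same offset maps to index 2.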
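FlushRealTimeService likewise flushes content to the physical files on a timer, and updates flushedWhere after a successful flush. The main steps mirror those of commit: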
```java
public boolean flush(final int flushLeastPages) {
    boolean result = true;
    MappedFile mappedFile = this.findMappedFileByOffset(this.flushedWhere, this.flushedWhere == 0);
    if (mappedFile != null) {
        long tmpTimeStamp = mappedFile.getStoreTimestamp();
        int offset = mappedFile.flush(flushLeastPages);
        // Global position after this flush
        long where = mappedFile.getFileFromOffset() + offset;
        // true if nothing new was flushed
        result = where == this.flushedWhere;
        this.flushedWhere = where;
        if (0 == flushLeastPages) {
            this.storeTimestamp = tmpTimeStamp;
        }
    }
    return result;
}
```
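Both MappedFileQueue.commit and MappedFileQueue.flush use the same bookkeeping. The global position is the file's start offset plus its in-file position. The boolean result is true when that position did not move, i.e. nothing new was processed. In the RocketMQ 4.x source, CommitRealTimeService appears to wake the flush service only when commit returns false. The hypothetical WhereSketch class below isolates this rule. It also shows how the position rolls over to the next file.

```java
class WhereSketch {
    private long where = 0; // plays the role of committedWhere / flushedWhere

    // fileFromOffset: start offset of the file just processed;
    // positionInFile: the committed/flushed position that file returned.
    // Returns true when the global position did not move (nothing new processed).
    boolean advance(long fileFromOffset, int positionInFile) {
        long newWhere = fileFromOffset + positionInFile;
        boolean unchanged = newWhere == where;
        where = newWhere;
        return unchanged;
    }

    long where() {
        return where;
    }
}
```

When a file of fileSize bytes is fully processed, where equals fileFromOffset + fileSize, which is exactly the next file's start offset. The next findMappedFileByOffset call therefore lands on the next file without any special handling.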