目录java
源码分析 RocketMQ DLedger 多副本系列已经进行到第 8 篇了,前面的章节主要是介绍了基于 raft 协议的选主与日志复制,从本篇开始将开始关注如何将 DLedger 应用到 RocketMQ中。数据结构
摘要:详细分析了RocketMQ DLedger 多副本(主从切换) 是如何整合到 RocketMQ中,本文的行文思路首先结合已掌握的DLedger 多副本相关的知识初步思考其实现思路,而后从 Broker启动流程、DLedgerCommitlog 核心类的讲解,再从消息发送(追加)与消息查找来进一步探讨 DLedger 是如何支持平滑升级的。架构
@(本节目录)并发
RocketMQ 的消息存储文件主要包括 commitlog 文件、consumequeue 文件与 Index 文件。commitlog 文件存储全量的消息,consumequeue、index 文件都是基于 commitlog 文件构建的。要使用 DLedger 来实现消息存储的一致性,应该关键是要实现 commitlog 文件的一致性,即 DLedger 要整合的对象应该是 commitlog 文件,即只需保证 raft 协议的复制组内各个节点的 commitlog 文件一致便可。app
咱们知道使用文件存储消息都会基于必定的存储格式,rocketmq 的 commitlog 一个条目就包含魔数、消息长度,消息属性、消息体等,而咱们再来回顾一下 DLedger 日志的存储格式:
DLedger 要整合 commitlog 文件,是否是能够把 rocketmq 消息,即一个个 commitlog 条目总体当成 DLedger 的 body 字段便可。eclipse
还等什么,跟我一块儿来看源码吧!!!别急,再抛一个问题,DLedger 整合 RocketMQ commitlog,能不能作到平滑升级?分布式
带着这些思考和问题,一块儿来探究 DLedger 是如何整合 RocketMQ 的。ide
舒适提示:本文不会详细介绍 Broker 端的启动流程,只会点出在启动过程当中与 DLedger 相关的代码,如想详细了解 Broker 的启动流程,建议关注笔者的《RocketMQ技术内幕》一书。函数
Broker 涉及到 DLedger 相关关键点以下:
高并发
DefaultMessageStore 构造方法
if(messageStoreConfig.isEnableDLegerCommitLog()) { // @1 this.commitLog = new DLedgerCommitLog(this); else { this.commitLog = new CommitLog(this); // @2 }
代码@1:若是开启 DLedger ,commitlog 的实现类为 DLedgerCommitLog,也是本文须要关注的关键所在。
代码@2:若是未开启 DLedger,则使用旧版的 Commitlog实现类。
BrokerController#initialize
if (messageStoreConfig.isEnableDLegerCommitLog()) { DLedgerRoleChangeHandler roleChangeHandler = new DLedgerRoleChangeHandler(this, (DefaultMessageStore) messageStore); ((DLedgerCommitLog)((DefaultMessageStore) messageStore).getCommitLog()).getdLedgerServer().getdLedgerLeaderElector().addRoleChangeHandler(roleChangeHandler); }
主要调用 LedgerLeaderElector 的 addRoleChanneHandler 方法增长 节点角色变动事件监听器,DLedgerRoleChangeHandler 是实现主从切换的另一个关键点。
DefaultMessageStore#load
// load Commit Log result = result && this.commitLog.load(); // @1 // load Consume Queue result = result && this.loadConsumeQueue(); if (result) { this.storeCheckpoint = new StoreCheckpoint(StorePathConfigHelper.getStoreCheckpoint(this.messageStoreConfig.getStorePathRootDir())); this.indexService.load(lastExitOK); this.recover(lastExitOK); // @2 log.info("load over, and the max phy offset = {}", this.getMaxPhyOffset()); }
代码@一、@2 最终都是委托 commitlog 对象来执行,这里的关键又是若是开启了 DLedger,则最终调用的是 DLedgerCommitLog。
通过上面的铺垫,主角 DLedgerCommitLog “闪亮登场“了。
舒适提示:因为 Commitlog 的绝大部分方法都已经在《RocketMQ技术内幕》一书中详细介绍了,而且 DLedgerCommitLog 的实现原理与 Commitlog 文件的实现原理类同,本文会一笔带过关于存储部分的实现细节。
DLedgerCommitlog 继承自 Commitlog。让咱们一一来看一下它的核心属性。
接下来咱们将详细介绍 DLedgerCommitlog 各个核心方法及其实现要点。
public DLedgerCommitLog(final DefaultMessageStore defaultMessageStore) { super(defaultMessageStore); // @1 dLedgerConfig = new DLedgerConfig(); dLedgerConfig.setEnableDiskForceClean(defaultMessageStore.getMessageStoreConfig().isCleanFileForciblyEnable()); dLedgerConfig.setStoreType(DLedgerConfig.FILE); dLedgerConfig.setSelfId(defaultMessageStore.getMessageStoreConfig().getdLegerSelfId()); dLedgerConfig.setGroup(defaultMessageStore.getMessageStoreConfig().getdLegerGroup()); dLedgerConfig.setPeers(defaultMessageStore.getMessageStoreConfig().getdLegerPeers()); dLedgerConfig.setStoreBaseDir(defaultMessageStore.getMessageStoreConfig().getStorePathRootDir()); dLedgerConfig.setMappedFileSizeForEntryData(defaultMessageStore.getMessageStoreConfig().getMapedFileSizeCommitLog()); dLedgerConfig.setDeleteWhen(defaultMessageStore.getMessageStoreConfig().getDeleteWhen()); dLedgerConfig.setFileReservedHours(defaultMessageStore.getMessageStoreConfig().getFileReservedTime() + 1); id = Integer.valueOf(dLedgerConfig.getSelfId().substring(1)) + 1; // @2 dLedgerServer = new DLedgerServer(dLedgerConfig); // @3 dLedgerFileStore = (DLedgerMmapFileStore) dLedgerServer.getdLedgerStore(); DLedgerMmapFileStore.AppendHook appendHook = (entry, buffer, bodyOffset) -> { assert bodyOffset == DLedgerEntry.BODY_OFFSET; buffer.position(buffer.position() + bodyOffset + MessageDecoder.PHY_POS_POSITION); buffer.putLong(entry.getPos() + bodyOffset); }; dLedgerFileStore.addAppendHook(appendHook); // @4 dLedgerFileList = dLedgerFileStore.getDataFileList(); this.messageSerializer = new MessageSerializer(defaultMessageStore.getMessageStoreConfig().getMaxMessageSize()); // @5 }
代码@1:调用父类 即 CommitLog 的构造函数,加载 ${ROCKETMQ_HOME}/store/ comitlog 下的 commitlog 文件,以便兼容升级 DLedger 的消息。咱们稍微看一下 CommitLog 的构造函数:
代码@2:构建 DLedgerConfig 相关配置属性,其主要属性以下:
代码@3:根据 DLedger 配置信息建立 DLedgerServer,即建立 DLedger 集群节点,集群内各个节点启动后,就会触发选主。
代码@4:构建 appendHook 追加钩子函数,这是兼容 Commitlog 文件很关键的一步,后面会详细介绍其做用。
代码@5:构建消息序列化。
根据上述的流程图,构建好 DefaultMessageStore 实现后,就是调用其 load 方法,在启用 DLedger 机制后,会依次调用 DLedgerCommitlog 的 load、recover 方法。
public boolean load() { boolean result = super.load(); if (!result) { return false; } return true; }
DLedgerCommitLog 的 laod 方法实现比较简单,就是调用 其父类 Commitlog 的 load 方法,即这里也是为了启用 DLedger 时可以兼容之前的消息。
在 Broker 启动时会加载 commitlog、consumequeue等文件,须要恢复其相关是数据结构,特别是与写入、刷盘、提交等指针,其具体调用 recover 方法。
DLedgerCommitLog#recover
public void recoverNormally(long maxPhyOffsetOfConsumeQueue) { // @1 recover(maxPhyOffsetOfConsumeQueue); }
首先会先恢复 consumequeue,得出 consumequeue 中记录的最大有效物理偏移量,而后根据该物理偏移量进行恢复。
接下来看一下该方法的处理流程与关键点。
DLedgerCommitLog#recover
dLedgerFileStore.load();
Step1:加载 DLedger 相关的存储文件,并一一构建对应的 MmapFile,其初始化三个重要的指针 wrotePosition、flushedPosition、committedPosition 三个指针为文件的大小。
DLedgerCommitLog#recover
if (dLedgerFileList.getMappedFiles().size() > 0) { dLedgerFileStore.recover(); // @1 dividedCommitlogOffset = dLedgerFileList.getFirstMappedFile().getFileFromOffset(); // @2 MappedFile mappedFile = this.mappedFileQueue.getLastMappedFile(); if (mappedFile != null) { // @3 disableDeleteDledger(); } long maxPhyOffset = dLedgerFileList.getMaxWrotePosition(); // Clear ConsumeQueue redundant data if (maxPhyOffsetOfConsumeQueue >= maxPhyOffset) { // @4 log.warn("[TruncateCQ]maxPhyOffsetOfConsumeQueue({}) >= processOffset({}), truncate dirty logic files", maxPhyOffsetOfConsumeQueue, maxPhyOffset); this.defaultMessageStore.truncateDirtyLogicFiles(maxPhyOffset); } return; }
Step2:若是已存在 DLedger 的数据文件,则只须要恢复 DLedger 相关数据文建,由于在加载旧的 commitlog 文件时已经将其重要的数据指针设置为最大值。其关键实现点以下:
舒适提示:为何当存在 commitlog 文件的状况下,不能删除 DLedger 相关的日志文件呢?
由于在此种状况下,若是 DLedger 中的物理文件有删除,则物理偏移量会断层。
正常状况下, maxCommitlogPhyOffset 与 dividedCommitlogOffset 是连续的,这样很是方即是访问 commitlog 仍是 访问 DLedger ,但若是DLedger 部分文件删除后,这两个值就变的不连续,就会形成中间的文件空洞,没法被连续访问。
DLedgerCommitLog#recover
isInrecoveringOldCommitlog = true; super.recoverNormally(maxPhyOffsetOfConsumeQueue); isInrecoveringOldCommitlog = false;
Step3:若是启用了 DLedger 而且是初次启动(还未生成 DLedger 相关的日志文件),则须要恢复 旧的 commitlog 文件。
DLedgerCommitLog#recover
MappedFile mappedFile = this.mappedFileQueue.getLastMappedFile(); if (mappedFile == null) { // @1 return; } ByteBuffer byteBuffer = mappedFile.sliceByteBuffer(); byteBuffer.position(mappedFile.getWrotePosition()); boolean needWriteMagicCode = true; // 1 TOTAL SIZE byteBuffer.getInt(); //size int magicCode = byteBuffer.getInt(); if (magicCode == CommitLog.BLANK_MAGIC_CODE) { // @2 needWriteMagicCode = false; } else { log.info("Recover old commitlog found a illegal magic code={}", magicCode); } dLedgerConfig.setEnableDiskForceClean(false); dividedCommitlogOffset = mappedFile.getFileFromOffset() + mappedFile.getFileSize(); // @3 log.info("Recover old commitlog needWriteMagicCode={} pos={} file={} dividedCommitlogOffset={}", needWriteMagicCode, mappedFile.getFileFromOffset() + mappedFile.getWrotePosition(), mappedFile.getFileName(), dividedCommitlogOffset); if (needWriteMagicCode) { // @4 byteBuffer.position(mappedFile.getWrotePosition()); byteBuffer.putInt(mappedFile.getFileSize() - mappedFile.getWrotePosition()); byteBuffer.putInt(BLANK_MAGIC_CODE); mappedFile.flush(0); } mappedFile.setWrotePosition(mappedFile.getFileSize()); // @5 mappedFile.setCommittedPosition(mappedFile.getFileSize()); mappedFile.setFlushedPosition(mappedFile.getFileSize()); dLedgerFileList.getLastMappedFile(dividedCommitlogOffset); log.info("Will set the initial commitlog offset={} for dledger", dividedCommitlogOffset); }
Step4:若是存在旧的 commitlog 文件,须要将最后的文件剩余部分所有填充,即再也不接受新的数据写入,新的数据所有写入到 DLedger 的数据文件中。其关键实现点以下:
在启用 DLedger 机制时 Broker 的启动流程就介绍到这里了,相信你们已经了解 DLedger 在整合 RocketMQ 上作的努力,接下来咱们从消息追加、消息读取两个方面再来探讨 DLedger 是如何无缝整合 RocketMQ 的,实现平滑升级的。
舒适提示:本节一样也不会详细介绍整个消息追加(存储流程),只是要点出与 DLedger(多副本、主从切换)相关的核心关键点。若是想详细了解消息追加的流程,能够阅读笔者所著的《RocketMQ技术内幕》一书。
DLedgerCommitLog#putMessage
AppendEntryRequest request = new AppendEntryRequest(); request.setGroup(dLedgerConfig.getGroup()); request.setRemoteId(dLedgerServer.getMemberState().getSelfId()); request.setBody(encodeResult.data); dledgerFuture = (AppendFuture<AppendEntryResponse>) dLedgerServer.handleAppend(request); if (dledgerFuture.getPos() == -1) { return new PutMessageResult(PutMessageStatus.OS_PAGECACHE_BUSY, new AppendMessageResult(AppendMessageStatus.UNKNOWN_ERROR)); }
关键点一:消息追加时,则再也不写入到原先的 commitlog 文件中,而是调用 DLedgerServer 的 handleAppend 进行消息追加,该方法会有集群内的 Leader 节点负责消息追加以及在消息复制,只有超过集群内的半数节点成功写入消息后,才会返回写入成功。若是追加成功,将会返回本次追加成功后的起始偏移量,即 pos 属性,即相似于 rocketmq 中 commitlog 的偏移量,即物理偏移量。
DLedgerCommitLog#putMessage
long wroteOffset = dledgerFuture.getPos() + DLedgerEntry.BODY_OFFSET; ByteBuffer buffer = ByteBuffer.allocate(MessageDecoder.MSG_ID_LENGTH); String msgId = MessageDecoder.createMessageId(buffer, msg.getStoreHostBytes(), wroteOffset); eclipseTimeInLock = this.defaultMessageStore.getSystemClock().now() - beginTimeInDledgerLock; appendResult = new AppendMessageResult(AppendMessageStatus.PUT_OK, wroteOffset, encodeResult.data.length, msgId, System.currentTimeMillis(), queueOffset, eclipseTimeInLock);
关键点二:根据 DLedger 的起始偏移量计算真正的消息的物理偏移量,从开头部分得知,DLedger 自身有其存储协议,其 body 字段存储真实的消息,即 commitlog 条目的存储结构,返回给客户端的消息偏移量为 body 字段的开始偏移量,即经过 putMessage 返回的物理偏移量与不使用Dledger 方式返回的物理偏移量的含义是同样的,即从开偏移量开始,能够正确读取消息,这样 DLedger 完美的兼容了 RocketMQ Commitlog。关于 pos 以及 wroteOffset 的图解以下:
DLedgerCommitLog#getMessage
public SelectMappedBufferResult getMessage(final long offset, final int size) { if (offset < dividedCommitlogOffset) { // @1 return super.getMessage(offset, size); } int mappedFileSize = this.dLedgerServer.getdLedgerConfig().getMappedFileSizeForEntryData(); MmapFile mappedFile = this.dLedgerFileList.findMappedFileByOffset(offset, offset == 0); // @2 if (mappedFile != null) { int pos = (int) (offset % mappedFileSize); return convertSbr(mappedFile.selectMappedBuffer(pos, size)); // @3 } return null; }
消息查找比较简单,由于返回给客户端消息,转发给 consumequeue 的消息物理偏移量并非 DLedger 条目的偏移量,而是真实消息的起始偏移量。其实现关键点以下:
根据上面详细的介绍,我想读者朋友们应该不可贵出以下结论:
RocketMQ 整合 DLedger(多副本)实现平滑升级的设计技巧就介绍到这里了。
若是本文对您有必定的帮助话,麻烦帮忙点个赞,很是感谢。
推荐阅读:源码分析 RocketMQ DLedger 多副本即主从切换系列文章:
一、RocketMQ 多副本前置篇:初探raft协议
二、源码分析 RocketMQ DLedger 多副本之 Leader 选主
三、源码分析 RocketMQ DLedger 多副本存储实现
四、源码分析 RocketMQ DLedger(多副本) 之日志追加流程
五、源码分析 RocketMQ DLedger(多副本) 之日志复制(传播)
六、基于 raft 协议的 RocketMQ DLedger 多副本日志复制设计原理
做者介绍:丁威,《RocketMQ技术内幕》做者,RocketMQ 社区布道师,公众号:中间件兴趣圈 维护者,目前已陆续发表源码分析Java集合、Java 并发包(JUC)、Netty、Mycat、Dubbo、RocketMQ、Mybatis等源码专栏。能够点击连接加入中间件知识星球 ,一块儿探讨高并发、分布式服务架构,交流源码。