2.1.1 CommitLog文件(物理队列)
CommitLog是用于存储真实的物理消息的结构,保存消息元数据,全部消息到达Broker后都会保存到commitLog文件,这里须要强调的是全部topic的消息都会统一保存在commitLog中。
举个例子:当前集群有TopicA, TopicB,
这两个Toipc的消息会按照消息到达的前后顺序保存到同一个commitLog中,而不是每一个Topic有本身独立的commitLog
onsumeQueue是逻辑队列,仅仅存储了CommitLog的位移而已,真实的存储都在本结构中。
首先这里会使用CommitLog.this.topicQueueTable.put(key, queueOffset),其中的key是 topic-queueId, queueOffset是当前这个key中的消息数,每增长一个消息增长一(不会自减);
这里queueOffset的用途以下:每次用户请求putMessage的时候,将queueOffset返回给客户端使用,这里的queueoffset表示逻辑上的队列偏移。
消息存放物理文件,每台broker上的commitLog被本机器全部queue共享不作区分
- commitlog文件的存储地址:$HOME\store\commitlog\${fileName}
- 一个消息存储单元长度是不定的,顺序写可是随机读
- 每一个commitLog文件的默认大小为 1G =1024*1024*1024,满1G以后会自动新建CommitLog文件作保存数据用
- commitlog的文件名fileName,名字长度为20位,左边补零,剩余为起始偏移量;好比00000000000000000000表明了第一个文件,起始偏移量为0,文件大小为1G=1073741824;当这个文件满了,
第二个文件名字为00000000001073741824,起始偏移量为1073741824,以此类推,第三个文件名字为00000000002147483648,起始偏移量为2147483648消息存储的时候会顺序写入文件,
当文件满了,写入下一个文件。
CommitLog的清理机制:
- 按时间清理,rocketmq默认会清理3天前的commitLog文件;
- 按磁盘水位清理:当磁盘使用量到达磁盘容量75%,开始清理最老的commitLog文件。
1)、CommitLog 文件生成规则
偏移量:每一个 CommitLog 文件的大小为 1G,通常状况下第一个 CommitLog 的起始偏移量为 0,第二个 CommitLog 的起始偏移量为 1073741824 (1G = 1073741824byte)。
2)、怎么知道消息存储在哪一个 CommitLog 文件上?
假设 1073742827 为物理偏移量(物理偏移量也即全局偏移量),则其对应的相对偏移量为 1003(1003 = 1073742827 - 1073741824),而且该偏移量位于第二个 CommitLog。
index 和 ComsumerQueue 中都有消息对应的物理偏移量,经过物理偏移量就能够计算出该消息位于哪一个 CommitLog 文件上。
文件地址:${user.home} \store\${commitlog}\${fileName} 消息存储结构: flag 这个标志值rocketmq不作处理,只存储后透传 QUEUEOFFSET这个值是个自增值不是真正的consume queue的偏移量,能够表明这个队列中消息的个数,要经过这个值查找到consume queue中数据,QUEUEOFFSET * 20才是偏移地址 PHYSICALOFFSET 表明消息在commitLog中的物理起始地址偏移量 SYSFLAG消息标志,指明消息是事物事物状态等等消息特征 BORNTIMESTAMP 消息产生端(producer)的时间戳 BORNHOST 消息产生端(producer)地址(address:port) STORETIMESTAMP 消息在broker存储时间 STOREHOSTADDRESS 消息存储到broker的地址(address:port) RECONSUMETIMES消息被某个订阅组从新消费了几回(订阅组之间独立计数),由于重试消息发送到了topic名字为%retry%groupName的队列queueId=0的队列中去了 Prepared Transaction Offset 表示是prepared状态的事物消息
2.1.2 ConsumeQueue文件组织:
ConsumerQueue至关于CommitLog的索引文件,消费者消费时会先从ConsumerQueue中查找消息的在commitLog中的offset,再去CommitLog中找元数据。
若是某个消息只在CommitLog中有数据,没在ConsumerQueue中, 则消费者没法消费,Rocktet的事务消息就是这个原理
Consumequeue类对应的是每一个topic和queuId下面的全部文件,至关于字典的目录用来指定消息在消息的真正的物理文件commitLog上的位置
每条数据的结构以下图所示:
消息的起始物理偏移量physical offset(long 8字节)+消息大小size(int 4字节)+tagsCode(long 8字节)。
- 每一个topic下的每一个queue都有一个对应的consumequeue文件。
- 文件默认存储路径:${user.home} \store\consumequeue\${topicName}\${queueId}\${fileName}
- 每一个文件由30W条数据组成,每条数据的大小为20个字节,从而每一个文件的默认大小为600万个字节(consume queue中存储单元是一个20字节定长的数据)是顺序写顺序读
- commitLogOffset是指这条消息在commitLog文件实际偏移量
- size就是指消息大小
- 消息tag的哈希值
ConsumeQueue几个重要的字段
private final String topic; private final int queueId;//队列id private final ByteBuffer byteBufferIndex;// 写索引时用到的ByteBuffer private long maxPhysicOffset = -1;// 最后一个消息对应的物理Offset
每一个cosumequeue文件的名称fileName,名字长度为20位,左边补零,剩余为起始偏移量;
好比00000000000000000000表明了第一个文件,起始偏移量为0,文件大小为600W,
当第一个文件满以后建立的第二个文件的名字为00000000000006000000,起始偏移量为6000000,以此类推,
第三个文件名字为00000000000012000000,起始偏移量为12000000,消息存储的时候会顺序写入文件,当文件满了,写入下一个文件。
- topic queueId来组织的:好比TopicA配了读写队列0、1,那么TopicA和Queue=0组成一个ConsumeQueue, TopicA和Queue=1组成一个另外一个ConsumeQueue.
- 按消费端group分组重试队列,若是消费端消费失败,发送到retry消费队列中
- 按消费端group分组死信队列,若是消费端重试超过指定次数,发送死信队列
- 每一个ConsumeQueue能够由多个文件组成无限队列被MapedFileQueue对象管理
2.1.3 MapedFile 是PageCache文件封装
操做物理文件在内存中的映射以及将内存数据持久化到物理文件中,代码中写死了要求os系统的页大小为4k, 消息刷盘根据参数(commitLog默认至少刷4页, consumeQueue默认至少刷2页)才刷
如下io对象构建了物理文件映射内存的对象 FileChannel fileChannel = new RandomAccessFile(file,“rw”).getChannel(); MappedByteBuffer mappedByteBuffer=fileChannel.map(READE_WRITE,0,fileSize); 构建mapedFile对象须要两个参数 fileSize: 映射的物理文件的大小 commitLog每一个文件的大小默认1G =1024*1024*1024 ConsumeQueue每一个文件默认存30W条 = 300000 *CQStoreUnitSize(每条大小) filename: filename文件名称但不只仅是名称还表示文件记录的初始偏移量, 文件名实际上是个long类型的值
2.1.4 MapedFileQueue 存储队列,数据定时删除,无限增加。
队列有多个文件(MapedFile)组成,由集合对象List表示升序排列,前面讲到文件名便是消息在此文件的中初始偏移量,排好序后组成了一个连续的消息队
当消息到达broker时,须要获取最新的MapedFile写入数据,调用MapedFileQueue的getLastMapedFile获取,此函数若是集合中一个也没有建立一个,若是最后一个写满了也建立一个新的。 MapedFileQueue在获取getLastMapedFile时,若是须要建立新的MapedFile会计算出下一个MapedFile文件地址,经过预分配服务AllocateMapedFileService异步预建立下一个MapedFile文件,这样下次建立新文件请求就不要等待,由于建立文件特别是一个1G的文件仍是有点耗时的, getMinOffset获取队列消息最少偏移量,即第一个文件的文件起始偏移量 getMaxOffset获取队列目前写到位置偏移量 getCommitWhere刷盘刷到哪里了
2.1.5 消息存储及消费过程
1)消息发送流程:
- Broker启动时,向NameServer注册信息
- 客户端调用producer发送消息时,会先从NameServer获取该topic的路由信息。消息头code为GET_ROUTEINFO_BY_TOPIC
- 从NameServer返回的路由信息,包括topic包含的队列列表和broker列表
- Producer端根据查询策略,选出其中一个队列,用于后续存储消息
- 每条消息会生成一个惟一id,添加到消息的属性中。属性的key为UNIQ_KEY
- 对消息作一些特殊处理,好比:超过4M会对消息进行压缩
- producer向Broker发送rpc请求,将消息保存到broker端。消息头的code为SEND_MESSAGE或SEND_MESSAGE_V2(配置文件设置了特殊标志)
消息存储流程
- Broker端收到消息后,将消息原始信息保存在CommitLog文件对应的MappedFile中,而后异步刷新到磁盘
- ReputMessageServie线程异步的将CommitLog中MappedFile中的消息保存到ConsumerQueue和IndexFile中
- ConsumerQueue和IndexFile只是原始文件的索引信息
1)消息消费过程:
如今咱们再来看 Broker 服务器端。首先咱们应该知道,消息往 Broker 存储就是在向 CommitLog 消息文件中写入数据的一个过程。
在 Broker 启动过程当中,其会启动一个叫作 ReputMessageService
的服务,这个服务每隔 1 秒会检查一下这个 CommitLog 是否有新的数据写入。
ReputMessageService
自身维护了一个偏移量 reputFromOffset
,用以对比和 CommitLog 文件中的消息总偏移量的差距。
当这两个偏移量不一样的时候,就表明有新的消息到来了,在有新的消息到来以后,doReput()
函数会取出新到来的全部消息,每一条消息都会封装为一个 DispatchRequest
请求,
进而将这条请求分发给不一样的请求消费者,咱们在这篇文章中只会关注利用消息建立消费队列的服务 CommitLogDispatcherBuildConsumeQueue,
CommitLogDispatcherBuildConsumeQueue
服务会根据这条请求按照不一样的队列 ID 建立不一样的消费队列文件,并在内存中维护一份消费队列列表。
而后将 DispatchRequest
请求中这条消息的消息偏移量、消息大小以及消息在发送时候附带的标签的 Hash 值写入到相应的消费队列文件中去。
3)客户端如何记录本身所消费的队列消费到哪里了呢?
答案就是:消费队列偏移量。
集群模式:因为每一个客户端所消费的消息队列不一样,因此每一个消息队列已经消费到哪里的消费偏移量是记录在 Broker 服务器端的。
广播模式:因为每一个客户端分配消费这个话题的全部消息队列,因此每一个消息队列已经消费到哪里的消费偏移量是记录在客户端本地的。
(1) 集群模式
在集群模式下,消费者客户端在内存中维护了一个
offsetTable
表,一样在 Broker 服务器端也维护了一个偏移量表,在消费者客户端,RebalanceService
服务会定时地 (默认 20 秒) 从 Broker 服务器获取当前客户端所须要消费的消息队列,并与当前消费者客户端的消费队列进行对比,看是否有变化。对于每一个消费队列,会从 Broker 服务器查询这个队列当前的消费偏移量。而后根据这几个消费队列,建立对应的拉取请求PullRequest
准备从 Broker 服务器拉取消息,当从 Broker 服务器拉取下来消息之后,只有当用户成功消费的时候,才会更新本地的偏移量表。本地的偏移量表再经过定时服务每隔 5 秒同步到 Broker 服务器端,而维护在 Broker 服务器端的偏移量表也会每隔 5 秒钟序列化到磁盘中(文件地址:${user.home} /store/config/consume/consumerOffset.json)保存的格式以下所示:
(2) 广播模式
对于广播模式而言,每一个消费队列的偏移量确定不能存储在 Broker 服务器端,由于多个消费者对于同一个队列的消费可能不一致,偏移量会互相覆盖掉。所以,在广播模式下,每一个客户端的消费偏移量是存储在本地的,而后每隔 5 秒将内存中的
offsetTable
持久化到磁盘中。当首次从服务器获取可消费队列的时候,偏移量不像集群模式下是从 Broker 服务器读取的,而是直接从本地文件中读取这里提一下,在广播模式下,消息队列的偏移量默认放在用户目录下的
.rocketmq_offsets
目录下存储格式以下:
3. load、recover
Broker启动的时候须要加载一系列的配置,启动一系列的任务,主要分布在BrokerController 的initialize()和start()方法中
1.加载topic配置 2.加载消费进度consumer offset 3.加载消费者订阅关系consumer subscription 4.加载本地消息messageStore.load() Load 定时进度,Load commit log,commitLog其实调用存储消费队列mapedFileQueue.load()方法来加载的。 遍历出${user.home} \store\${commitlog}目录下全部commitLog文件,按文件名(文件名就是文件的初始偏移量)升序排一下, 每一个文件构建一个MapedFile对象, 在MapedFileQueue中用集合list把这些MapedFile文件组成一个逻辑上连续的队列 Load consume Queue 遍历${user.home} \store\consumequeue下的全部文件夹(每一个topic就是一个文件夹) 遍历${user.home} \store\consumequeue\${topic}下的全部文件夹(每一个queueId就是一个文件夹) 遍历${user.home} \store\consumequeue\${topic}\${queueId}下全部文件,根据topic, queueId, 文件来构建ConsueQueue对象 DefaultMessageStore中存储结构Map<topic,Map<queueId, CosnueQueue>> 每一个Consumequeue利用MapedFileQueue把mapedFile组成一个逻辑上连续的队列 加载事物模块 加载存储检查点 加载${user.home} \store\checkpoint 这个文件存储了3个long类型的值来记录存储模型最终一致的时间点,这个3个long的值为 physicMsgTimestamp为commitLog最后刷盘的时间 logicMsgTimestamp为consumeQueue最终刷盘的时间 indexMsgTimestamp为索引最终刷盘时间 checkpoint做用是当异常恢复时须要根据checkpoint点来恢复消息 加载索引服务indexService recover尝试数据恢复 判断是不是正常恢复,系统启动的启动存储服务(DefaultMessageStore)的时候会建立一个临时文件abort, 当系统正常关闭的时候会把这个文件删掉,这个相似在Linux下打开vi编辑器生成那个临时文件,全部当这个abort文件存在,系统认为是异常恢复
1) 先按照正常流程恢复ConsumeQueue 什么是恢复ConsumeQueue, 前面不是有步骤load了ConsumeQueue吗,为何还要恢复? 前面load步骤建立了MapedFile对象创建了文件的内存映射,可是数据是否正确,如今文件写到哪了(wrotePosition),
Flush到了什么位置(committedPosition)?恢复数据来帮我解决这些问题。 每一个ConsumeQueue的mapedFiles集合中,从倒数第三个文件开始恢复(为何只恢复倒数三个文件,看消息消费程度),
由于consumequeue的存储单元是20字节的定长数据,因此是依次分别取了 Offset long类型存储了commitLog的数据偏移量 Size int类型存储了在commitLog上消息大小 tagcode tag的哈希值 目前rocketmq判断存储的consumequeue数据是否有效的方式为判断offset>= 0 && size > 0 若是数据有效读取下20个字节判断是否有效 若是数据无效跳出循环,记录此时有效数据的偏移量processOffset 若是读到文件尾,读取下一个文件 proccessOffset是有效数据的偏移量,获取这个值的做用什么? (1)proccessOffset后面的数据属于脏数据,后面的文件要删除掉 (2)设置proccessOffset所在文件MapedFile的wrotePosition和commitedPosition值,值为 proccessOffset%mapedFileSize 2正常恢复commitLog文件 步骤跟流程恢复Consume Queue 判断消息有效, 根据消息的存储格式读取消息到DispatchRequest对象,获取消息大小值msgSize 大于 0 正常数据 等于-1 文件读取错误 恢复结束 等于0 读到文件末尾 3) 异常数据恢复,OSCRASH或者JVM CRASH或者机器掉电 当${user.home}\store\abort文件存在,表明异常恢复 读取${user.home} \store\checkpoint获取最终一致的时间点 判断最终一致的点所在的文件是哪一个 从最新的mapedFile开始,获取存储的一条消息在broker的生成时间,大于checkpoint时间点的放弃找前一个文件,小于等于checkpoint时间点的说明checkpoint
在此mapedfile文件中 从checkpoint所在mapedFile开始恢复数据,它的总体过程跟正常恢复commitlog相似,最重要的区别在于 (1)读取消息后派送到分发消息服务DispatchMessageService中,来重建ConsumeQueue以及索引 (2)根据恢复的物理offset,清除ConsumeQueue多余的数据 4)恢复TopicQueueTable=Map<topic-queueid,offset> (1)恢复写入消息时,消费记录队列的offset (2)恢复每一个队列的最小offset 初始化通讯层 初始化线程池 注册broker端处理器用来接收client请求后选择处理器处理 启动天天凌晨00:00:00统计消费量任务 启动定时刷消费进度任务 启动扫描数据被删除了的topic,offset记录也对应删除任务 若是namesrv地址不是指定的,而是从静态服务器取的,启动定时向静态服务器获取namesrv地址的任务 若是broker是master,启动任务打印slave落后master没有同步的bytes 若是broker是slave,启动任务定时到mastser同步配置信息
3. master slave
在broker启动的时候BrokerController若是是slave,配置了master地址更新,没有配置全部broker会想namesrv注册,从namesrv获取haServerAddr,而后更新到HAClient
当HAClient的MasterAddress不为空的时候(由于broker master和slave都构建了HAClient)会主动链接master获取SocketChannel Master监听Slave请求的端口,默认为服务端口+1
接收slave上传的offset long类型 int pos = this.byteBufferRead.position() -(this.byteBufferRead.position() % 8)
//没有理解意图
long readOffset =this.byteBufferRead.getLong(pos - 8); this.processPostion = pos;
主从复制从哪里开始复制:若是请求时0 ,从最后一个文件开始复制
Slave启动的时候brokerController开启定时任务定时拷贝master的配置信息
SlaveSynchronize类表明slave从master同步信息(非消息) syncTopicConfig 同步topic的配置信息 syncConsumerOffset 同步消费进度 syncDelayOffset 同步定时进度 syncSubcriptionGroupConfig 同步订阅组配7F6E
HaService类实现了HA服务,负责同步双写,异步复制功能, 这个类master和slave的broker都会实例化,
Master经过AcceptSocketService监听slave的链接,每一个masterslave链接都会构建一个HAConnection对象搭建他们之间的桥梁,对于一个master多slave部署结构的会有多个HAConnection实例,
Master构建HAConnection时会构建向slave写入数据服务线程对象WriteSocketService对象和读取Slave反馈服务线程对象ReadSocketService
WriteSocketService
向slave同步commitLog数据线程, slaveRequestOffset是每次slave同步完数据都会向master发送一个ack表示下次同步的数据的offset。 若是slave是第一次启动的话slaveRequestOffset=0, master会从最近那个commitLog文件开始同步。(若是要把master上的全部commitLog文件同步到slave的话, 把masterOffset值赋为minOffset)
向socket写入同步数据: 传输数据协议<Phy Offset> <Body Size> <Body Data>
ReadSocketService:
4 ReadSocketService
读取slave经过HAClient向master返回同步commitLog的物理偏移量phyOffset值 通知前端线程,若是是同步复制的话通知是否复制成功 Slave 经过HAClient创建与master的链接, 来定时汇报slave最大物理offset,默认5秒汇报一次也表明了跟master之间的心跳检测 读取master向slave写入commitlog的数据, master向slave写入数据的格式是
Slave初始化DefaultMessageStore时候会构建ReputMessageService服务线程并在启动存储服务的start方法中被启动 ReputMessageService的做用是slave从物理队列(由commitlog文件构成的MapedFileQueue)加载数据,并分发到各个逻辑队列 HA同步复制, 当msg写入master的commitlog文件后,判断maser的角色若是是同步双写SYNC_MASTER, 等待master同步到slave在返回结果
5 HA异步复制

6.索引服务
1索引结构
IndexFile 存储具体消息索引的文件,文件的内容结构如图:
索引文件由索引文件头IndexHeader, 槽位Slot和消息的索引内容三部分构成
IndexHeader:索引文件头信息40个字节的数据组成
beginTimestamp 8位long类型,索引文件构建第一个索引的消息落在broker的时间 endTimestamp 8位long类型,索引文件构建最后一个索引消息落broker时间 beginPhyOffset 8位long类型,索引文件构建第一个索引的消息commitLog偏移量 endPhyOffset 8位long类型,索引文件构建最后一个索引消息commitLog偏移量 hashSlotCount 4位int类型,构建索引占用的槽位数(这个值貌似没有具体做用) indexCount 4位int类型,索引文件中构建的索引个数
槽位slot, 默认每一个文件配置的slot个数为500万个,每一个slot是4位的int类型数据
计算消息的对应的slotPos=Math.abs(keyHash)%hashSlotNum
消息在IndexFile中的偏移量absSlotPos = IndexHeader.INDEX_HEADER_SIZE + slotPos *HASH_SLOT_SIZE
Slot存储的值为消息个数索引
消息的索引内容是20位定长内容的数据
4位int值, 存储的是key的hash值 8位long值 存储的是消息在commitlog的物理偏移量phyOffset 4位int值 存储了当前消息跟索引文件中第一个消息在broker落地的时间差 4位int值 若是存在hash冲突,存储的是上一个消息的索引地址
7. 索引服务IndexService线程
1. 索引配置:hashSlotNum哈希槽位个数、indexNum存储索引的最大个数、storePath索引文件indexFile存储的路径 2. Load broker启动的时候加载本地IndexFile, 若是是异常启动删除以后storeCheckPoint文件,由于commitLog根据storeCheckPoint会重建以后的索引文件, 3. Run方法,任务从阻塞队列中获取请求构建索引 4. queryOffset 根据topic key 时间跨度来查询消息 倒叙遍历全部索引文件 每个indexfile存储了第一个消息和最后一个消息的存储时间,根据传入时间范围来判断索引是否落在此索引文件
8. 构建索引服务
分发消息索引服务将消息位置分发到ConsumeQueue中后,加入IndexService的LinkedBlockingQueue队列中,IndexService经过任务向队列中获取请求来构建索引 剔除commitType或者rollbackType消息,由于这两种消息都有对应的preparedType的消息 构建索引key(topic + "#" + key) 根据key的hashcode计算槽位,即跟槽位最大值取余数 计算槽位在indexfile的具体偏移量位置 根据槽位偏移量获取存储的上一个索引 计算消息跟文件头存储开始时间的时间差 根据消息头记录的存储消息个数计算消息索引存储的集体偏移量位置 写入真正的索引,内容参考上面索引内容格式 将槽位中的更新为此消息索引 更新索引头文件信息
9. Broker与client(comsumer ,producer)之间的心跳,
一:Broker接收client心跳ClientManageProcessor处理client的心跳请求 1. 构建ClientChannelInfo对象 1) 持有channel对象,表示与客户端的链接通道 2) ClientID表示客户端 ….. 2. 每次心跳会更新ClientChannelInfo的时间戳,来表示client还活着 3. 注册或者更新consumer的订阅关系(是以group为单位来组织的, group下可能有多个订阅关系) 4. 注册producer,其实就是发送producer的group(这个在事物消息中才有点做用) 二:ClientHouseKeepingService线程定时清除不活动的链接 1) ProducerManager.scanNotActiveChannel 默认两分钟producer没有发送心跳清除 2) ConsumerManager.scanNotActiveChannel 默认两份中Consumer没有发送心跳清除
10. Broker与namesrv之间的心跳
1) namesrv接收borker心跳DefaultRequestProcessor的REGISTER_BROKE事件处理, (1) 注册broker的topic信息 (2) 构建或者更新BrokerLiveInfo的时间戳 NamesrvController初始化时启动线程定时调用RouteInfoManger的scanNotActiveBroker方法来定时不活动的broker(默认两分钟没有向namesrv发送心跳更新时间戳的)