此次天池中间件性能大赛初赛和复赛的成绩都正好是第五名
,本次整理了复赛《单机百万消息队列的存储设计》的思路方案分享给你们,实现方案上也是决赛队伍中相对比较特别的。linux
单机可支持100万队列以上
。数据发送、索引校检、数据消费
三个阶段评测。对于单机几百的大队列来讲业务已有成熟的方案,Kafka和RocketMQ。算法
方案 | 几百个大队列 |
---|---|
Kafka | 每一个队列一个文件(独立存储) |
RocketMQ | 全部队列共用一个文件(混合存储) |
若直接采用现有的方案,在百万量级的小队列场景都有极大的弊端。数组
方案 | 百万队列场景弊端 |
---|---|
Kafka独立存储 | 单个小队列数据量少,批量化程度彻底取决于内存大小,落盘时间长,写数据容易触发IOPS瓶颈 |
RocketMQ混合存储 | 随机读严重,一个块中连续数据很低,读速度很慢,消费速度彻底受限于IOPS |
为了兼顾读写速度,咱们最终采用了折中的设计方案:多个队列merge,共享一个块存储。
缓存
设计上要支持边写边读
多个队列须要合并处理
单个队列的数据存储部分连续
索引稀疏,尽量常驻内存
架构图中Bucket Manager和Group Manager分别对百万队列进行分桶以及合并管理,而后左右两边是分别是写模块和读模块,数据写入包括队列merge处理,消息块落盘。读模块包括索引管理和读缓存。(见左图)bash
bucket、group、queue的关系:对消息队列进行bucket处理,每一个bucket包含多个group,group是咱们进行队列merge的最小单元,每一个group管理固定数量的队列。(见右图)数据结构
接下来对整个存储每一个阶段的细节进行展开分析,包括队列合并、索引管理和数据落盘。架构
当Group达到M个后便造成一个固定分组。相同队列会在Group内进行合并,新的队列数据将继续分配Group接收。
当Block达到16k(可配置)时以队列为单位进行数据排序,保证单个队列数据连续。
L2二级索引与数据存储的位置息息相关,见下图。为每一个排序后的Block块创建一个L2索引,L2索引的结构分为文件偏移(file offset),数据压缩大小(size),原始大小(raw size),由于咱们是多个队列merge,而后接下来是每一个队列相对于起始位置的delta offset以及消息数量。
并发
为了加快查询速度,在L2基础上创建L1一级索引,每16个L2创建一个L1,L1按照时间前后顺序存放。
L1和L2的组织关系以下:异步
L1索引的结构很是简单,file id对应消息存储的文件id,以及16个Block块中每一个队列消息的起始序列号seq num。例如MQ1从序列号1000开始,MQ2从序列号2000开始等等。高并发
如何根据索引定位须要查找的数据?
对L1先进行二分查找,定位到上下界范围,而后对范围内的全部L2进行顺序遍历。
当blcok超过指定大小后,根据桶的hashcode再进行一次mask操做将group中的队列数据同步写入到m个文件中。
同步刷盘主要尝试了两种方案:Nio和Dio。Dio大约性能比Nio提高约5%
。CPP使用DIO是很是方便的,然而做为Java Coder你也许是第一次据说DIO,在Java中并无提供直接使用DIO的接口,能够经过JNA的方式调用。
DIO(DIRECT IO,直接IO),出于对系统cache和调度策略的不满,用户本身在应用层定制本身的文件读写。DIO最大的优势就是可以减小OS内核缓冲区和应用程序地址空间的数据拷贝次数,下降文件读写时的CPU开销以及内存的占用。然而DIO的缺陷也很明显,DIO在数据读取时会形成磁盘大量的IO,它并无缓冲IO从PageCache获取数据的优点。
这里就遇到一个问题,一样配置的阿里云机器测试随机数据同步写入性能是很是高的,可是线上的评测数据都是58字节,数据过于规整致使同一时间落盘的几率很大,出现了大量的锁竞争。因此这里作了一个小的改进:按几率随机4K、8K、16K进行落盘,写性能虽有必定提高,可是效果也是不太理想,因而采用了第二种思路异步刷盘。
采用RingBuffer接收block块,使用AIO对多个block块进行Batch刷盘,减小IO Copy的次数。异步刷盘写性能有了显著的提高。
如下是异步Flush的核心代码:
while (gWriterThread) {
if (taskQueue->pop(task)) {
writer->mWriting.store(true);
do {
// 使用异步IO
aiocb *pAiocb = aiocb_list[aio_size++];
memset(pAiocb, 0, sizeof(aiocb));
pAiocb->aio_lio_opcode = LIO_WRITE;
pAiocb->aio_buf = task.mWriteCache.mCache;
pAiocb->aio_nbytes = task.mWriteCache.mSize;
pAiocb->aio_offset = task.mWriteCache.mStartOffset;
pAiocb->aio_fildes = task.mBlockFile->mFd;
pAiocb->aio_sigevent.sigev_value.sival_ptr = task.mBlockFile;
task.mBlockFile->mWriting = true;
if (aio_size >= MAX_AIO_TASK_COUNT) {
break;
}
} while (taskQueue->pop(task));
if (aio_size > 0) {
if (0 != lio_listio(LIO_WAIT, aiocb_list, aio_size, NULL)) {
aos_fatal_log("aio error %d %s.", errno, strerror(errno));
}
for (int i = 0; i < aio_size; ++i) {
((BlockFile *) aiocb_list[i]->aio_sigevent.sigev_value.sival_ptr)->mWriting = false;
free((void *) aiocb_list[i]->aio_buf);
}
aio_size = 0;
}
} else {
++waitCount;
sched_yield();
if (waitCount > 100000) {
usleep(10000);
}
}
}
复制代码
整个流程主要有两个优化点:预读取和读缓存。
主要有两个做用:
顺序消费且已经消费到当前block尾,则进行预读取操做。如何判断顺序消费?判断上次消费的结束位置是否与此次消费的起始位置相等。
if (msgCount >= destCount) {
if (mLastGetSequeneNum == offsetCount &&
beginIndex + 1 < mL2IndexCount &&
beginOffsetCount + blockIndex.mMsgDeltaIndexCount <= offsetCount + msgCount + msgCount) {
MessageBlockIndex &nextIndex = mL2IndexArray[beginIndex + 1];
// 预读取
#ifdef __linux__
readahead(pManager->GetFd(hash), nextIndex.mFileOffset, PER_BLOCK_SIZE);
#endif
}
mLastGetSequeneNum = offsetCount + msgCount;
return msgCount;
}
复制代码
关于read cache作了一些精巧的小设计,保证足够简单高效。
分桶
(部分隔离),必定程度缓解缓存饿死现象。数组 + 自旋锁 + 原子变量
实现了一个循环分配缓存块的方案。双向指针绑定
高效定位缓存节点。Read Cache一共分为N=64(可配)个Bucket,每一个Bucket中包含M=3200(可配)个缓存块,大概总计20w左右的缓存块,每一个是4k,大约占用800M的内存空间。
关于缓存的核心数据结构,咱们并无从队列的角度出发,而是针对L2索引和缓存块进行了绑定,这里设计了一个双向指针。判断缓存是否有效的核心思路:check双向指针是否相等。
CacheItem cachedItem = (CacheItem *) index->mCache;
cachedItem->mIndexPtr == (void *) index;
复制代码
3.1 Bucket分桶
3.2 Alloc Cache Block
count.fetch_add(1) % M = index
3.3 Cache Hit
3.4 Cache Page Replace
count.fetch_add(1) % M = M-1
,找到新的缓存块进行从新绑定。说明:整个分配的逻辑是一个循环使用的过程,当全部的缓存桶都被使用,那么会从数组首地址开始从新分配、替换。
开始咱们尝试了两种读缓存方案:最简单的LRU缓存和直接使用PageCache读取。PageCache所实现的实际上是高级版的LRU缓存。在顺序读的场景下,咱们本身实现的读缓存(Cycle Cache Allocate,暂简称为CCA)与LRU、PageCache的优劣分析对好比下:
多队列Merge,保证队列局部连续的存储方式。
资源利用率低
,约300~400MB索引便可支撑百万队列、100GB数据量高并发读写。转载请注明出处,欢迎关注个人公众号:亚普的技术轮子