InfluxDB是一个由InfluxData开发的开源时序数据库,专一于海量时序数据的高性能读、写、高效存储与实时分析等,在DB-Engines Ranking时序型数据库排行榜上常年排名第一。数据库
InfluxDB能够说是当之无愧的佼佼者,但 InfluxDB CTO Paul 在 2020/12/10 号在博客中发表一篇名为:Announcing InfluxDB IOx – The Future Core of InfluxDB Built with Rust and Arrow的文章,介绍了一个新项目 InfluxDB IOx,InfluxDB 的下一代时序引擎。缓存
接下来,我将连载对于InfluxDB IOx的源码解析过程,欢迎各位批评指正,联系方式见文章末尾。服务器
上一章介绍了数据从客户端写入到服务器端的内存中的整个过程。详情见: http://www.javashuo.com/article/p-dxwpkohf-vk.html微信
这一章记录一下数据库中数据管理单元Chunk的生命周期。异步
在开篇,先介绍一下一个Chunk
拥有的生命周期:async
//这里须要注意,这些变体里的Chunk结构都是不相同的 //也就是有内存数据拷贝的工做 pub enum ChunkState { //内部移动数据时候用的 Invalid, //能够写入 Open(MBChunk), //还能继续写入,但很快会被关闭 Closing(MBChunk), //已经不能写入了,准备移动到readbuffer Moving(Arc<MBChunk>), //已经被移动到了read buffer Moved(Arc<ReadBufferChunk>), //准备写入持久化存储 WritingToObjectStore(Arc<ReadBufferChunk>), //写入持久化存储完成 WrittenToObjectStore(Arc<ReadBufferChunk>, Arc<ParquetChunk>), }
在第五章中有提到,在Create Database以后,会启动一个后台线程。性能
该后台线程完成了部分对Chunk
的管理功能,经过理解这个后台线程,可以基本理解Chunk
的全部生命周期。fetch
//后台线程的方法入口,在建立完成数据库后,就会调用到这个方法 pub async fn background_worker( self: &Arc<Self>, shutdown: tokio_util::sync::CancellationToken, ) { //建立一个定时器,周期性的执行 let mut interval = tokio::time::interval(tokio::time::Duration::from_secs(1)); let mut lifecycle_manager = LifecycleManager::new(Arc::clone(&self)); //没有收到中止服务器时候的信号就一直执行,1秒一次 while !shutdown.is_cancelled() { //记录执行的次数,每次加1,Ordering::Relaxed表明的单线程里的原子操做 self.worker_iterations.fetch_add(1, Ordering::Relaxed); //进入生命周期的管理 lifecycle_manager.check_for_work(); //收到不一样信号以后的处理方法 tokio::select! { _ = interval.tick() => {}, _ = shutdown.cancelled() => break } } info!("finished background worker"); }
前方高能,请注意:ui
fn check_for_work(&mut self, now: DateTime<Utc>) { //获取建立数据库的时候,对于Chunk的相关配置 let rules = self.rules(); //根据配置的排序规则,获取出内存里全部的chunk let chunks = self.chunks(&rules.sort_order); let mut buffer_size = 0; //判断是否是有其余的任务正在执行,move我理解针对于read buffer,write对于持久化 let mut move_active = self.is_move_active(); let mut write_active = self.is_write_active(); //遍历全部块,检查哪些块能够被持久化 for chunk in &chunks { //获取当前chunk的锁 let chunk_guard = chunk.upgradable_read(); //获取chunk占用的内存大小 buffer_size += Self::chunk_size(&*chunk_guard); //没有移动任务而且Chunk里最后的写入时间比较老 let would_move = !move_active && can_move(&rules, &*chunk_guard, now); //没有写出任务,而且开启了持久化 let would_write = !write_active && rules.persist; //判断chunk的生命周期 match chunk_guard.state() { //属于open状态,而且是须要移动的(上面的逻辑里有展现什么是须要移动的) //这里我理解就是至关于实时写入时候的一个补充方案 //试想,若是一个chunk一直不写入数据,可能有一年了,查询都再也不用这些数据了,内存却被一直占用 ChunkState::Open(_) if would_move => { let mut chunk_guard = RwLockUpgradableReadGuard::upgrade(chunk_guard); //切换状态到closing chunk_guard.set_closing().expect("cannot close open chunk"); let partition_key = chunk_guard.key().to_string(); let chunk_id = chunk_guard.id(); std::mem::drop(chunk_guard); move_active = true; //移动到read_buffer,变为不可写入状态(启动了一个异步的线程,后面看) self.move_to_read_buffer(partition_key, chunk_id); } //这里有几种状况,一样会在别处触发为closing //例如:chunk大小超过了设置的可变内存大小的时候 ChunkState::Closing(_) if would_move => { let partition_key = chunk_guard.key().to_string(); let chunk_id = chunk_guard.id(); std::mem::drop(chunk_guard); move_active = true; //移动到read_buffer self.move_to_read_buffer(partition_key, chunk_id); } //已经被挪动到readbuffer中的 ChunkState::Moved(_) if would_write => { let partition_key = chunk_guard.key().to_string(); let chunk_id = chunk_guard.id(); std::mem::drop(chunk_guard); write_active = true; //写入到对象存储 self.write_to_object_store(partition_key, chunk_id); } _ => {} } } //这里是主要检查内存限制的逻辑,当全部chunk的大小超过限制的时候就要清理Chunk if let Some(soft_limit) = rules.buffer_size_soft { let mut chunks = chunks.iter(); while buffer_size > soft_limit.get() { match chunks.next() { Some(chunk) => { //获取读锁 let chunk_guard = chunk.read(); //若是配置了能够清理未持久化数据,那么处在read_buffer里的数据也会被清理 //必定会清理已经被持久化到对象存储上的数据 if (rules.drop_non_persisted && matches!(chunk_guard.state(), ChunkState::Moved(_))) || matches!(chunk_guard.state(), ChunkState::WrittenToObjectStore(_, _)) { let partition_key = chunk_guard.key().to_string(); let chunk_id = chunk_guard.id(); buffer_size = buffer_size.saturating_sub(Self::chunk_size(&*chunk_guard)); std::mem::drop(chunk_guard); //真真正正的删除逻辑后面看 self.drop_chunk(partition_key, chunk_id) } } //没有什么能够释放的了 None => { warn!(db_name=self.db_name(), soft_limit, buffer_size, "soft limited exceeded, but no chunks found that can be evicted. Check lifecycle rules"); break; } } } } }
这里基本看清楚了Chunk
的周期:.net
- 在写入时候,若是没有
Chunk
就会open
一个,并处在open
状态。 - 若是写入超过了一些限制,就会被标记为
closing
;若是数据时间超过了配置的时间,也会被标记为closing
。标记为closing
的会添加一个后台进程,准备将Chunk
移动到read_buffer
中。 - 后台任务启动后,会标记为
moving
状态,此时禁止Chunk
再写入任何数据。 - 一旦移动完成,会被标记为
moved
。 - 程序会对
moved
状态下的Chunk
开始进行持久化。 - 扫描任务会不断判断内存使用是否超过了限制,若是超过限制,会清理已经持久化的
Chunk
。若是配置了drop_non_persisted
,会把read_buffer
中未持久化的也删除掉。
而后继续看程序是怎样将一个chunk
移动到read_buffer
的,由于篇幅的影响,将会在下一篇介绍数据是怎样真正写入到持久化存储当中的。
pub async fn load_chunk_to_read_buffer( &self, partition_key: &str, chunk_id: u32, ) -> Result<Arc<DbChunk>> { //根据partition_key及chunk_id获取内存中存储的Chunk let chunk = { let partition = self .catalog .valid_partition(partition_key) .context(LoadingChunk { partition_key, chunk_id, })?; let partition = partition.read(); partition.chunk(chunk_id).context(LoadingChunk { partition_key, chunk_id, })? }; //设置当前的Chunk为Moving状态 let mb_chunk = { let mut chunk = chunk.write(); chunk.set_moving().context(LoadingChunk { partition_key, chunk_id, })? }; info!(%partition_key, %chunk_id, "chunk marked MOVING, loading tables into read buffer"); let mut batches = Vec::new(); //这里是拿到Chunk中每一个Cloumn的统计信息,分别是min,max,count let table_stats = mb_chunk.table_summaries(); //重新建立一个ReadBufferChunk,后面准备把全部数据都拷贝到这里 //还须要告诉内存管理这里新申请了多少空间 let rb_chunk = ReadBufferChunk::new_with_memory_tracker(chunk_id, &self.memory_registries.read_buffer); for stats in table_stats { //把内存中的数据,所有从新拷贝一次,转换为arrow格式 mb_chunk .table_to_arrow(&mut batches, &stats.name, Selection::All) //这里应该是尚未写完,若是出现错误,这个Chunk该怎么处理? .expect("Loading chunk to mutable buffer"); //循环拷贝 for batch in batches.drain(..) { rb_chunk.upsert_table(&stats.name, batch) } } let mut chunk = chunk.write(); //更新写入缓存里的Chunk为Moved状态,同时Chunk内容修改成了ReadBuffer的Chunk //对于Chunk的结构后面看 chunk.set_moved(Arc::new(rb_chunk)).context(LoadingChunk { partition_key, chunk_id, })?; //工做所有都完成了,调用作快照的方法,方法里什么都没作,返回新Chunk的一个Arc指针 Ok(DbChunk::snapshot(&chunk)) }
到这里基本清楚了整个Chunk
的工做方式,由于Chunk
这个名字被代码中重复使用到了,因此特地在文章末尾说一下都有什么Chunk
。
//主要是存储一个数据块的描述信息,名字、最后写入时间等 Server::db::catalog::chunk //数据从客户端直接写入的内存块 mutable_buffer::chunk //在moving时候拷贝的新数据块,arrow结构 read_buffer::chunk //parquet对应的chunk parquet_file::chunk //query模块下对PartitionChunk从新命名了一下 //对于相同的partition key的数据抽象的行为 query -> type Chunk: PartitionChunk; //实现PartitionChunk定义的方法,对不一样位置下的chunk的操做 //如ParquetFile、MutableBuffer等 server::db::chunk
好了就到这里,但愿你也学到了不少
祝玩儿的开心
欢迎关注微信公众号:
或添加微信好友: liutaohua001