InfluxDB is an open-source time series database developed by InfluxData, focused on high-performance reads and writes, efficient storage, and real-time analysis of massive time series data. It has held the number-one spot in the time series category of the DB-Engines Ranking for years.
InfluxDB is arguably the leader in this space, but on December 10, 2020, InfluxDB CTO Paul Dix published a blog post titled "Announcing InfluxDB IOx – The Future Core of InfluxDB Built with Rust and Arrow", introducing a new project: InfluxDB IOx, the next-generation time series engine for InfluxDB.
In this series I will walk through the InfluxDB IOx source code. Corrections and feedback are welcome; contact information is at the end of the article.
The previous chapter covered how Chunks are managed and the operations performed at each stage. See: https://my.oschina.net/u/3374539/blog/5029926
This chapter looks at how a Chunk is persisted.
```rust
ChunkState::Moved(_) if would_write => {
    let partition_key = chunk_guard.key().to_string();
    let table_name = chunk_guard.table_name().to_string();
    let chunk_id = chunk_guard.id();
    std::mem::drop(chunk_guard);
    write_active = true;
    // A Chunk in the Moved state is persisted via write_to_object_store
    self.write_to_object_store(partition_key, table_name, chunk_id);
}

// write_to_object_store in turn calls
// write_chunk_to_object_store_in_background to do the persistence
pub fn write_chunk_to_object_store_in_background(
    self: &Arc<Self>,
    partition_key: String,
    table_name: String,
    chunk_id: u32,
) -> TaskTracker<Job> {
    // Get the database name
    let name = self.rules.read().name.clone();

    // Register a new background-job tracker, which records which tasks
    // are running in the db and what state they are in
    let (tracker, registration) = self.jobs.register(Job::WriteChunk {
        db_name: name.to_string(),
        partition_key: partition_key.clone(),
        table_name: table_name.clone(),
        chunk_id,
    });

    let captured = Arc::clone(&self);
    // Write asynchronously
    let task = async move {
        let result = captured
            // The method that does the actual writing
            .write_chunk_to_object_store(&partition_key, &table_name, chunk_id)
            .await;
        if let Err(e) = result {
            info!(?e, %name, %partition_key, %chunk_id, "background task error loading object store chunk");
            return Err(e);
        }
        Ok(())
    };

    tokio::spawn(task.track(registration));

    tracker
}
```
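To make the register-then-spawn pattern concrete, here is a minimal sketch of the same idea using a `tokio` watch channel in place of IOx's `TaskTracker`. `persist_chunk`, `JobStatus`, and `spawn_tracked_write` are hypothetical names invented for this illustration, not the IOx API:

```rust
use tokio::sync::watch;

#[derive(Clone, Debug, PartialEq)]
enum JobStatus {
    Running,
    Done,
    Failed(String),
}

// Hypothetical stand-in for write_chunk_to_object_store
async fn persist_chunk(chunk_id: u32) -> Result<(), String> {
    println!("persisting chunk {chunk_id}");
    Ok(())
}

fn spawn_tracked_write(chunk_id: u32) -> watch::Receiver<JobStatus> {
    let (tx, rx) = watch::channel(JobStatus::Running);
    tokio::spawn(async move {
        let status = match persist_chunk(chunk_id).await {
            Ok(()) => JobStatus::Done,
            Err(e) => JobStatus::Failed(e),
        };
        // Receivers observe the final state, much like polling a tracker
        let _ = tx.send(status);
    });
    rx
}

#[tokio::main]
async fn main() {
    let mut rx = spawn_tracked_write(42);
    // Wait until the background job leaves the Running state
    while *rx.borrow() == JobStatus::Running {
        rx.changed().await.expect("sender dropped");
    }
    println!("final status: {:?}", *rx.borrow());
}
```

The caller gets back a cheap handle immediately and the write proceeds in the background, which is exactly why `write_chunk_to_object_store_in_background` returns the `tracker` before the task completes.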
The next method is a bit long; please bear with me.
```rust
pub async fn write_chunk_to_object_store(
    &self,
    partition_key: &str,
    table_name: &str,
    chunk_id: u32,
) -> Result<Arc<DbChunk>> {
    // Fetch the chunk back from the catalog
    let chunk = {
        // Look up the partition first
        let partition = self
            .catalog
            .valid_partition(partition_key)
            .context(LoadingChunkToParquet {
                partition_key,
                table_name,
                chunk_id,
            })?;
        let partition = partition.read();

        // Get the chunk from the partition by table name and chunk_id
        partition
            .chunk(table_name, chunk_id)
            .context(LoadingChunkToParquet {
                partition_key,
                table_name,
                chunk_id,
            })?
    };

    let rb_chunk = {
        // Take the write lock first
        let mut chunk = chunk.write();

        // Transition the Chunk's state to WritingToObjectStore
        chunk
            .set_writing_to_object_store()
            .context(LoadingChunkToParquet {
                partition_key,
                table_name,
                chunk_id,
            })?
    };

    // Get the Statistics for every table in the Chunk
    let table_stats = rb_chunk.table_summaries();

    // Create a parquet Chunk; the various Chunk types were covered
    // in the previous chapter
    let mut parquet_chunk = Chunk::new(
        partition_key.to_string(),
        chunk_id,
        // Used to account for the memory occupied by parquet
        self.memory_registries.parquet.as_ref(),
    );

    // Create a Storage struct using the storage type configured at
    // database startup (covered in chapter 3)
    let storage = Storage::new(
        Arc::clone(&self.store),
        self.server_id,
        self.rules.read().name.to_string(),
    );

    // Iterate over the statistics of every table
    for stats in table_stats {
        // Build an empty predicate, i.e. `select * from table` with no where clause
        let predicate = read_buffer::Predicate::default();

        // Filter the data out of rb_chunk. Selection::All means all columns,
        // and the empty predicate means no where condition; in other words,
        // all the data of the single table that `stats` points to
        let read_results = rb_chunk
            .read_filter(stats.name.as_str(), predicate, Selection::All)
            .context(ReadBufferChunkError {
                table_name,
                chunk_id,
            })?;

        // Fetch the schema as well; arrow stores it separately from the
        // data, so two calls are needed
        let arrow_schema: ArrowSchemaRef = rb_chunk
            .read_filter_table_schema(stats.name.as_str(), Selection::All)
            .context(ReadBufferChunkSchemaError {
                table_name,
                chunk_id,
            })?
            .into();

        // Fetch the min and max timestamps of this table. These statistics
        // are computed in readBuffer::Column::from, i.e. when data moves
        // from the mutable buffer to the read buffer
        let time_range = rb_chunk.table_time_range(stats.name.as_str()).context(
            ReadBufferChunkTimestampError {
                table_name,
                chunk_id,
            },
        )?;

        // Create a ReadFilterResultsStream; the docs describe it as an
        // adapter that converts ReadFilterResults into an async stream
        let stream: SendableRecordBatchStream = Box::pin(
            streams::ReadFilterResultsStream::new(read_results, Arc::clone(&arrow_schema)),
        );

        // Write to the persistent store
        let path = storage
            .write_to_object_store(
                partition_key.to_string(),
                chunk_id,
                stats.name.to_string(),
                stream,
            )
            .await
            .context(WritingToObjectStore)?;

        // Keep an in-memory summary of what was written to parquet
        let schema = Arc::clone(&arrow_schema)
            .try_into()
            .context(SchemaConversion)?;
        let table_time_range = time_range.map(|(start, end)| TimestampRange::new(start, end));
        parquet_chunk.add_table(stats, path, schema, table_time_range);
    }

    // Take the write lock on `catalog::chunk`, then transition the
    // chunk's state to WrittenToObjectStore
    let mut chunk = chunk.write();
    let parquet_chunk = Arc::clone(&Arc::new(parquet_chunk));
    chunk
        .set_written_to_object_store(parquet_chunk)
        .context(LoadingChunkToParquet {
            partition_key,
            table_name,
            chunk_id,
        })?;

    // Return a read-only `DbChunk` snapshot of the `catalog::chunk`
    Ok(DbChunk::snapshot(&chunk))
}
```
The part that is a bit convoluted and hard to follow here is the chunk.set_written_to_object_store method.

Because Rust enum variants can carry data, the chunk is still a Chunk throughout, but the payload it stores changes with each state:
```rust
pub enum ChunkState {
    // ...omitted

    // Here the chunk is still the mutable buffer's chunk
    Moving(Arc<MBChunk>),
    // Now it holds the read buffer's chunk structure
    Moved(Arc<ReadBufferChunk>),
    // And here it additionally holds the ParquetChunk structure
    WrittenToObjectStore(Arc<ReadBufferChunk>, Arc<ParquetChunk>),
}
```
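To see how such a transition swaps the payload in practice, here is a hand-rolled sketch of a `set_written_to_object_store`-style method. The stub types and the `Invalid` placeholder state are simplifications invented for this example, not the actual IOx code:

```rust
use std::sync::Arc;

// Hypothetical stand-ins for IOx's ReadBufferChunk / ParquetChunk
struct ReadBufferChunk;
struct ParquetChunk;

enum ChunkState {
    Invalid, // placeholder only used mid-transition
    WritingToObjectStore(Arc<ReadBufferChunk>),
    WrittenToObjectStore(Arc<ReadBufferChunk>, Arc<ParquetChunk>),
}

struct Chunk {
    state: ChunkState,
}

impl Chunk {
    fn set_written_to_object_store(&mut self, parquet: Arc<ParquetChunk>) -> Result<(), String> {
        // Take ownership of the current state, leaving a placeholder behind
        match std::mem::replace(&mut self.state, ChunkState::Invalid) {
            ChunkState::WritingToObjectStore(rb) => {
                // Same chunk, but the payload now also carries the parquet data
                self.state = ChunkState::WrittenToObjectStore(rb, parquet);
                Ok(())
            }
            other => {
                // Illegal transition: restore the old state and report an error
                self.state = other;
                Err("chunk is not in the WritingToObjectStore state".to_string())
            }
        }
    }
}

fn main() {
    let mut chunk = Chunk {
        state: ChunkState::WritingToObjectStore(Arc::new(ReadBufferChunk)),
    };
    assert!(chunk
        .set_written_to_object_store(Arc::new(ParquetChunk))
        .is_ok());
}
```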
We still need to look at the storage.write_to_object_store logic. It involves converting the in-memory arrow structures into the Parquet format, which is done directly with arrow's ArrowWriter, so I won't walk through it in the article.
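For reference, here is a minimal, self-contained sketch of that conversion using the `arrow` and `parquet` crates directly. The schema, file name, and data are made up for illustration; this is not the IOx `Storage` code:

```rust
use std::{fs::File, sync::Arc};

use arrow::array::{ArrayRef, Int64Array, StringArray};
use arrow::datatypes::{DataType, Field, Schema};
use arrow::record_batch::RecordBatch;
use parquet::arrow::ArrowWriter;

fn main() -> Result<(), Box<dyn std::error::Error>> {
    // A schema roughly shaped like a time series table: a tag and a timestamp
    let schema = Arc::new(Schema::new(vec![
        Field::new("host", DataType::Utf8, false),
        Field::new("time", DataType::Int64, false),
    ]));

    let batch = RecordBatch::try_new(
        Arc::clone(&schema),
        vec![
            Arc::new(StringArray::from(vec!["server-a", "server-b"])) as ArrayRef,
            Arc::new(Int64Array::from(vec![1_000_i64, 2_000])) as ArrayRef,
        ],
    )?;

    // ArrowWriter handles the arrow -> parquet encoding
    let file = File::create("chunk.parquet")?;
    let mut writer = ArrowWriter::try_new(file, schema, None)?;
    writer.write(&batch)?;
    writer.close()?; // finalizes the parquet footer

    Ok(())
}
```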
```rust
// Jump straight to ObjectStore's put method to see how the write is organized
async fn put<S>(&self, location: &Self::Path, bytes: S, length: Option<usize>) -> Result<()>
where
    S: Stream<Item = io::Result<Bytes>> + Send + Sync + 'static,
{
    use ObjectStoreIntegration::*;

    // Match on the storage backend configured at startup and dispatch to
    // the real implementation; here we only look at the file backend
    match (&self.0, location) {
        // ...omitted

        // File storage
        (File(file), path::Path::File(location)) => file
            .put(location, bytes, length)
            .await
            .context(FileObjectStoreError)?,
        _ => unreachable!(),
    }

    Ok(())
}

// The ObjectStoreApi trait implemented for File, i.e. the actual
// implementation when storing to the filesystem
async fn put<S>(&self, location: &Self::Path, bytes: S, length: Option<usize>) -> Result<()>
where
    S: Stream<Item = io::Result<Bytes>> + Send + Sync + 'static,
{
    // Read all the data from the earlier ReadFilterResultsStream into `content`
    let content = bytes
        .map_ok(|b| bytes::BytesMut::from(&b[..]))
        .try_concat()
        .await
        .context(UnableToStreamDataIntoMemory)?;

    // Validate the length, failing with DataDoesNotMatchLength otherwise.
    // This is just macro-based error handling; nothing to dwell on
    if let Some(length) = length {
        ensure!(
            content.len() == length,
            DataDoesNotMatchLength {
                actual: content.len(),
                expected: length,
            }
        );
    }

    // Build the file path: the root path configured at startup plus the data path
    let path = self.path(location);

    // Create the file
    let mut file = match fs::File::create(&path).await {
        Ok(f) => f,
        // If the parent directory was not found, create it and try again
        Err(err) if err.kind() == std::io::ErrorKind::NotFound => {
            let parent = path
                .parent()
                .context(UnableToCreateFile { path: &path, err })?;
            fs::create_dir_all(&parent)
                .await
                .context(UnableToCreateDir { path: parent })?;

            match fs::File::create(&path).await {
                Ok(f) => f,
                Err(err) => return UnableToCreateFile { path, err }.fail(),
            }
        }
        // Otherwise the write fails
        Err(err) => return UnableToCreateFile { path, err }.fail(),
    };

    // Copy all the data into the file
    tokio::io::copy(&mut &content[..], &mut file)
        .await
        .context(UnableToCopyDataToFile)?;

    // Done
    Ok(())
}
```
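The `map_ok`/`try_concat` combination above is worth isolating. Here is a small standalone example of the same pattern, with a fabricated input stream standing in for `ReadFilterResultsStream`; `BytesMut` is the accumulator because `try_concat` requires its `Ok` type to implement `Extend` and `Default`:

```rust
use bytes::{Bytes, BytesMut};
use futures::{stream, TryStreamExt};

#[tokio::main]
async fn main() -> std::io::Result<()> {
    // A fake async stream of byte chunks, standing in for ReadFilterResultsStream
    let chunks = stream::iter(vec![
        Ok::<_, std::io::Error>(Bytes::from_static(b"hello ")),
        Ok(Bytes::from_static(b"world")),
    ]);

    // Same pattern as File::put above: concatenate every chunk of the
    // stream into one contiguous buffer in memory
    let content: BytesMut = chunks
        .map_ok(|b| BytesMut::from(&b[..]))
        .try_concat()
        .await?;

    assert_eq!(&content[..], b"hello world");
    Ok(())
}
```

Note that this buffers the entire chunk in memory before touching the filesystem, which is a simple but memory-hungry way to drain the stream.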
The write path is fairly large, but the overall flow can still be laid out clearly (a compressed sketch follows the list):
- Writes land in the mutable buffer first; once it reaches a certain size it is closed
- An async task monitors whether the mutable buffer should be closed
- The lifecycle transitions, and the data is written into the read buffer
- Afterwards the data is asynchronously written to persistent storage
- Memory usage is checked to decide whether the read buffer needs to be cleaned up
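As promised above, here is that lifecycle compressed into a single state machine. The state names and the size threshold are hypothetical, and the real IOx lifecycle has more states and is driven by the background worker:

```rust
#[derive(Debug)]
enum Lifecycle {
    OpenMutableBuffer { bytes_written: usize },
    ClosedMutableBuffer,
    InReadBuffer,
    Persisted,
}

// Hypothetical close threshold for the mutable buffer
const MUTABLE_BUFFER_LIMIT: usize = 10 * 1024 * 1024;

impl Lifecycle {
    // One tick of the background check: decide whether to advance the chunk
    fn step(self) -> Lifecycle {
        match self {
            // Close the mutable buffer once it grows past the limit
            Lifecycle::OpenMutableBuffer { bytes_written }
                if bytes_written >= MUTABLE_BUFFER_LIMIT =>
            {
                Lifecycle::ClosedMutableBuffer
            }
            // A closed mutable buffer is moved into the read buffer
            Lifecycle::ClosedMutableBuffer => Lifecycle::InReadBuffer,
            // The read buffer chunk is written to object storage
            Lifecycle::InReadBuffer => Lifecycle::Persisted,
            // Nothing to do (still filling, or already persisted)
            other => other,
        }
    }
}

fn main() {
    let mut state = Lifecycle::OpenMutableBuffer {
        bytes_written: MUTABLE_BUFFER_LIMIT,
    };
    for _ in 0..3 {
        state = state.step();
        println!("{state:?}");
    }
}
```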
That's roughly it. A lot of logic in the source is still unfinished, the WAL for example. I'll finish walking through the overall flow first and come back to what's missing later, which also gives the Influx team time to flesh out more of the logic.
Have fun.
Feel free to follow my WeChat official account, or add me on WeChat: liutaohua001