InfluxDB是一个由InfluxData开发的开源时序数据库,专一于海量时序数据的高性能读、写、高效存储与实时分析等,在DB-Engines Ranking时序型数据库排行榜上常年排名第一。sql
InfluxDB能够说是当之无愧的佼佼者,但 InfluxDB CTO Paul 在 2020/12/10 号在博客中发表一篇名为:Announcing InfluxDB IOx – The Future Core of InfluxDB Built with Rust and Arrow的文章,介绍了一个新项目 InfluxDB IOx,InfluxDB 的下一代时序引擎。数据库
接下来,我将连载对于InfluxDB IOx的源码解析过程,欢迎各位批评指正,联系方式见文章末尾。json
上一篇粗略的总结了写入的基本流程,详情见:服务器
https://my.oschina.net/u/3374539/blog/5033469微信
这一篇记录一下查询的主要流程。数据结构
在第六章中,写了一个查询示例,以下:app
let mut query = flight::Client::new(connection) .perform_query("databaseName", "select * from myMeasurement") .await .expect("query request should work");
其中connection,表明的创建了一个Grpc的链接。perform_query表明执行查询,其中第一个参数是数据库名字,第二个参数是要执行查询的sql语句。这个perform_query是封装了一下调用协议,而后调用了服务器端的do_get方法,do_get方法在服务器的src/influxdb_ioxd/rpc/flight.rs:139行
能够找到,以下:异步
async fn do_get( &self, //这个Ticket里就是保存的perform_query方法中封装的json数据 request: Request<Ticket>, ) -> Result<Response<Self::DoGetStream>, tonic::Status> { //这里就是把json还原回来 let ticket = request.into_inner(); let json_str = String::from_utf8(ticket.ticket.to_vec()).context(InvalidTicket { ticket: ticket.ticket, })?; //反序列化成了ReadInfo结构 let read_info: ReadInfo = serde_json::from_str(&json_str).context(InvalidQuery { query: &json_str })?; //拿到客户端设置的数据库名字 let database = DatabaseName::new(&read_info.database_name).context(InvalidDatabaseName)?; //从内存中查找是否存在这个database名字,若是不存在就会报DatabaseNotFound错误回去 //这里就是建立数据库的时候写入到内存里的 //同时还应该记得iox的数据库必须一个节点建立一次。。hhhhha let db = self.server.db(&database).context(DatabaseNotFound { database_name: &read_info.database_name, })?; //这个是拿到以前建立数据库时候设置的线程池,能够回去参考第五章 let executor = db.executor(); //这里是建立出sql语句对应的physical_plan,后面再看 let physical_plan = Planner::new(Arc::clone(&executor)) .sql(db, &read_info.sql_query) .await .context(Planning)?; //使用线程异步的执行查询 let results = executor //复制一下执行时候须要用到的信息 .new_context() //真正的去执行 .collect(Arc::clone(&physical_plan)) .await .map_err(|e| Box::new(e) as _) .context(Query { database_name: &read_info.database_name, })?; //在写入的章节里应该知道了在RBChunk里面存储的是Arrow格式的。 //在这个方法中就是调用arrow_flight工具包的方法,先把schema序列化到flight_buffer中 let options = arrow::ipc::writer::IpcWriteOptions::default(); let schema = physical_plan.schema(); let schema_flight_data = arrow_flight::utils::flight_data_from_arrow_schema(schema.as_ref(), &options); let mut flights: Vec<Result<FlightData, tonic::Status>> = vec![Ok(schema_flight_data)]; //上面获得的结果集,这里进行遍历,封装为要返回的数据结构 let mut batches: Vec<Result<FlightData, tonic::Status>> = results .iter() //这个是为了给下面flight_data_from_arrow_batch这个方法打补丁用的 //由于这个方法即使对于切片类型的batch也是盲目的序列化全部数据 .map(optimize_record_batch) .collect::<Result<Vec<_>, Error>>()? .iter() //这里就是一条一条的把数据序列化到缓冲区里 .flat_map(|batch| { let (flight_dictionaries, flight_batch) = arrow_flight::utils::flight_data_from_arrow_batch(&batch, &options); //把数据包装在Result中 flight_dictionaries .into_iter() .chain(std::iter::once(flight_batch)) .map(Ok) }) .collect(); //前面是schema,后面是数据 flights.append(&mut batches); //返回一个数据的异步stream,有可能调用一次next就会释放一次cpu? let output = futures::stream::iter(flights); //数据以flight形式发送到了客户端,客户端先读取schema再读取数据。 Ok(Response::new(Box::pin(output) as Self::DoGetStream)) }
这里基本上是整个查询的主逻辑:async
- 异步的将sql转换为plan。
- 异步的去执行plan并返回结果和结果所对应的schema信息。
- 将返回的arrow数据封装到flights格式中。
- 经过Grpc返回
这一篇就到这里吧,下几章准备记录一下:工具
- sql是怎么被执行的
- 查询中都经历了什么
- 等等。。。
祝玩儿的开心。
欢迎关注微信公众号:
或添加微信好友: liutaohua001