InfluxDB是一个由InfluxData开发的开源时序数据库,专一于海量时序数据的高性能读、写、高效存储与实时分析等,在DB-Engines Ranking时序型数据库排行榜上常年排名第一。正则表达式
InfluxDB能够说是当之无愧的佼佼者,但 InfluxDB CTO Paul 在 2020/12/10 号在博客中发表一篇名为:Announcing InfluxDB IOx – The Future Core of InfluxDB Built with Rust and Arrow的文章,介绍了一个新项目 InfluxDB IOx,InfluxDB 的下一代时序引擎。算法
接下来,我将连载对于InfluxDB IOx的源码解析过程,欢迎各位批评指正,联系方式见文章末尾。数据库
上一章说到如何建立一个数据库,而且数据库的描述信息是如何保存的。详情见:https://my.oschina.net/u/3374539/blog/5025128服务器
这一章记录一下,数据是如何写入并保存的,具体会分为两篇来写:微信
- 一篇介绍分区是如何完成的
- 一篇介绍具体的写入
说到数据写入,必然是须要可以链接到服务器。IOx
项目为提供了多种方式能够于服务器进行交互,分别是Grpc
和Http
基于这两种通讯方式,又扩展支持了influxdb2_client
以及influxdb_iox_client
。async
基于influxdb_iox_client
我写了一个数据写入及查询的示例来观测接口是如何组织的,代码以下:性能
#[tokio::main] async fn main() { { let connection = Builder::default() .build("http://127.0.0.1:8081") .await .unwrap(); write::Client::new(connection) .write("a", r#"myMeasurement,tag1=value1,tag2=value2 fieldKey="123" 1556813561098000000"#) .await .expect("failed to write data"); } let connection = Builder::default() .build("http://127.0.0.1:8081") .await .unwrap(); let mut query = flight::Client::new(connection) .perform_query("a", "select * from myMeasurement") .await .expect("query request should work"); let mut batches = vec![]; while let Some(data) = query.next().await.expect("valid batches") { batches.push(data); } let format1 = format::QueryOutputFormat::Pretty; println!("{}", format1.format(&batches).unwrap()); } +------------+--------+--------+-------------------------+ | fieldKey | tag1 | tag2 | time | +------------+--------+--------+-------------------------+ | 123 | value1 | value2 | 2019-05-02 16:12:41.098 | | 123 | value1 | value2 | 2019-05-02 16:12:41.098 | | fieldValue | value1 | value2 | 2019-05-02 16:12:41.098 | | fieldValue | value1 | value2 | 2019-05-02 16:12:41.098 | | 123 | value1 | value2 | 2019-05-02 16:12:41.098 | +------------+--------+--------+-------------------------+
由于我多运行了几回,因此能看到数据被重复插入了。ui
这里还须要说一下的是写入的语句格式能够参见:url
[LineProtocol] https://docs.influxdata.com/influxdb/v2.0/reference/syntax/line-protocol/#data-types-and-format.net
write::Client
中的write
方法生成了一个WriteRequest
结构,并使用RPC
调用远程的write
方法。打开src/influxdb_ioxd/rpc/write.rs : 22行
能够看到方法的具体实现。
async fn write( &self, request: tonic::Request<WriteRequest>, ) -> Result<tonic::Response<WriteResponse>, tonic::Status> { let request = request.into_inner(); //获得上面在客户端中写入的数据库名字,在上面的例子中传入的"a" let db_name = request.db_name; //这里获得了写入的LineProtocol let lp_data = request.lp_data; let lp_chars = lp_data.len(); //解析LineProtocol的内容 //示例中的lp会被解析为: //measurement: "myMeasurement" //tag_set: [("tag1", "value1"), ("tag2", "value2")] //field_set: [("fieldKey", "123")] //timestamp: 1556813561098000000 let lines = parse_lines(&lp_data) .collect::<Result<Vec<_>, influxdb_line_protocol::Error>>() .map_err(|e| FieldViolation { field: "lp_data".into(), description: format!("Invalid Line Protocol: {}", e), })?; let lp_line_count = lines.len(); debug!(%db_name, %lp_chars, lp_line_count, "Writing lines into database"); //对数据进行保存 self.server .write_lines(&db_name, &lines) .await .map_err(default_server_error_handler)?; //返回成功 let lines_written = lp_line_count as u64; Ok(Response::new(WriteResponse { lines_written })) }
继续看self.server.write_lines
的执行:
pub async fn write_lines(&self, db_name: &str, lines: &[ParsedLine<'_>]) -> Result<()> { self.require_id()?; //验证一下名字,而后拿到以前建立数据库时候在内存中存储的相关信息 let db_name = DatabaseName::new(db_name).context(InvalidDatabaseName)?; let db = self .config .db(&db_name) .context(DatabaseNotFound { db_name: &*db_name })?; //这里就开始执行分片相关的策略 let (sharded_entries, shards) = { //读取建立数据库时候配置的分片策略 let rules = db.rules.read(); let shard_config = &rules.shard_config; //根据数据和shard策略,把逐个数据对应的分区找到 //写入到一个List<分区标识,List<数据>>这样的结构中 //具体的结构信息后面看 let sharded_entries = lines_to_sharded_entries(lines, shard_config.as_ref(), &*rules) .context(LineConversion)?; //再把全部分区的配置返回给调用者 let shards = shard_config .as_ref() .map(|cfg| Arc::clone(&cfg.shards)) .unwrap_or_default(); (sharded_entries, shards) }; //根据上面返回的集合进行map方法遍历,写到每一个分区中 futures_util::future::try_join_all( sharded_entries .into_iter() .map(|e| self.write_sharded_entry(&db_name, &db, Arc::clone(&shards), e)), ) .await?; Ok(()) }
这里描述了写入一条数据的主逻辑:数据写入的时候,先把数据划分到具体的分区里(使用List结构存储下全部的分区对应的数据),而后并行的进行数据写入
接下来看,数据是如何进行分区的:
pub fn lines_to_sharded_entries( lines: &[ParsedLine<'_>], sharder: Option<&impl Sharder>, partitioner: &impl Partitioner, ) -> Result<Vec<ShardedEntry>> { let default_time = Utc::now(); let mut sharded_lines = BTreeMap::new(); //对全部要插入的数据进行遍历 for line in lines { //先找到符合哪一个shard let shard_id = match &sharder { Some(s) => Some(s.shard(line).context(GeneratingShardId)?), None => None, }; //再判断属于哪一个分区 let partition_key = partitioner .partition_key(line, &default_time) .context(GeneratingPartitionKey)?; let table = line.series.measurement.as_str(); //最后存储到一个map中 //shard-> partition -> table -> List<data> 的映射关系 sharded_lines .entry(shard_id) .or_insert_with(BTreeMap::new) .entry(partition_key) .or_insert_with(BTreeMap::new) .entry(table) .or_insert_with(Vec::new) .push(line); } let default_time = Utc::now(); //最后遍历这个map 转换到以前提到的List结构中 let sharded_entries = sharded_lines .into_iter() .map(|(shard_id, partitions)| build_sharded_entry(shard_id, partitions, &default_time)) .collect::<Result<Vec<_>>>()?; Ok(sharded_entries) }
这里理解shard
的概念就是一个或者一组机器,称为一个shard
,他们负责真正的存储数据。
partition
理解为一个个文件夹,在shard
上具体的存储路径。
这里看一下是怎样完成shard
的划分的:
impl Sharder for ShardConfig { fn shard(&self, line: &ParsedLine<'_>) -> Result<ShardId, Error> { if let Some(specific_targets) = &self.specific_targets { //若是对数据进行匹配,若是符合规则就返回,能够采用当前的shard //官方的代码中只实现了根据表名进行shard的策略 //这个配置彷佛只能经过grpc来进行设置,这样好处多是未来有个什么管理界面能动态修改 if specific_targets.matcher.match_line(line) { return Ok(specific_targets.shard); } } //若是没有配置就使用hash的方式 //对整条数据进行hash,而后比较机器的hash,找到合适的节点 //若是没找到,就放在hashring的第一个节点 //hash算法见后面 if let Some(hash_ring) = &self.hash_ring { return hash_ring .shards .find(LineHasher { line, hash_ring }) .context(NoShardsDefined); } NoShardingRuleMatches { line: line.to_string(), } .fail() } } //具体的Hash算法,若是全配置的话分的就会特别散,几乎不一样测点都放到了不一样的地方 impl<'a, 'b, 'c> Hash for LineHasher<'a, 'b, 'c> { fn hash<H: Hasher>(&self, state: &mut H) { //若是配置了使用table名字就在hash中加入tablename if self.hash_ring.table_name { self.line.series.measurement.hash(state); } //而后按照配置的列的值进行hash for column in &self.hash_ring.columns { if let Some(tag_value) = self.line.tag_value(column) { tag_value.hash(state); } else if let Some(field_value) = self.line.field_value(column) { field_value.to_string().hash(state);t } state.write_u8(0); // column separator } } }
接下来看默认的partition
分区方式:
impl Partitioner for PartitionTemplate { fn partition_key(&self, line: &ParsedLine<'_>, default_time: &DateTime<Utc>) -> Result<String> { let parts: Vec<_> = self .parts .iter() //匹配分区策略,或者是单一的,或者是复合的 //目前支持基于表、值、时间 //其他还会支持正则表达式和strftime模式 .map(|p| match p { TemplatePart::Table => line.series.measurement.to_string(), TemplatePart::Column(column) => match line.tag_value(&column) { Some(v) => format!("{}_{}", column, v), None => match line.field_value(&column) { Some(v) => format!("{}_{}", column, v), None => "".to_string(), }, }, TemplatePart::TimeFormat(format) => match line.timestamp { Some(t) => Utc.timestamp_nanos(t).format(&format).to_string(), None => default_time.format(&format).to_string(), }, _ => unimplemented!(), }) .collect(); //最后返回一个组合文件名,或者是 a-b-c 或者是一个单一的值 Ok(parts.join("-")) } }
到这里分区的工做就完成了,下一篇继续分析是怎样写入的。
祝玩儿的开心
欢迎关注微信公众号:
或添加微信好友: liutaohua001