如何使用ClickHouse实现时序数据管理和挖掘?

ClickHouse是一个高效的开源联机分析列式数据库管理系统,由俄罗斯IT公司Yandex开发的,并于2016年6月宣布开源。本次文章将详细解读京东城市时空数据引擎JUST(https://just.urban-computing.cn/)是如何使用ClickHouse实现时序数据管理和挖掘的。html

1,时序数据简介

时序数据全称是时间序列(TimeSeries)数据,是按照时间顺序索引的一系列数据点。最多见的是在连续的等时间间隔时间点上获取的序列,所以,它是一系列离散数据[1]。git

时序数据几乎无处不在,在目前单向的时间流中,人的脉搏、空气的湿度、股票的价格等都随着时间的流逝不断变化。时序数据是数据的一种,由于它显著而有价值的特色,成为咱们特别分析的对象。github

将时序数据能够建模为以下部分组成:算法

  • **Metric:**度量的数据集,相似于关系型数据库中的 table,是固定属性,通常不随时间而变化
  • **Timestamp:**时间戳,表征采集到数据的时间点
  • **Tags:**维度列,用于描述Metric,表明数据的归属、属性,代表是哪一个设备/模块产生的,通常不随着时间变化
  • **Field/Value:**指标列,表明数据的测量值,能够是单值也能够是多值

一个具体的多值模型时序数据案例如表1所示:docker

表1 时序数据案例数据库

2,时序数据管理概述

1. 时序数据管理的流程

一切数据的本质都是为价值服务的,获取价值的这个过程就是数据管理与分析。从技术上来讲,任何数据从产生到灭亡都会经历如图1所示的过程。缓存

图1 数据生命周期多线程

时序数据也不例外,只是每一个部分的处理不一样。并发

(1)数据采集。同一个场景下时序数据产生的频率通常恒定,但在不一样场景下采集数据的频率是变化的,每秒一千条和每秒一千万条数据使用的技术是彻底不一样的。因此,数据采集要考虑的主要是频率和并发。运维

(2)数据存储。数据存储是为了查询和分析服务的。以什么格式存储、建什么索引、存储数据量大小、存储时长是时序数据存储要考虑的,通常时序数据写多读少,数据具备时效性,因此存储时能够考虑冷热存储分离。

(3)数据查询和分析。时序数据的查询也具备显著特色,通常会按照时间范围读取,最近的数据读取频率高,而且按照不一样的时间粒度作聚合查询,好比统计最近一周天天的数据量。

分析是依赖于查询的,时序数据的分析一般是多维的,好比网页点击流量、从哪一个网站、来自哪一个IP、点击频率等维度众多,取决于具体场景。而时序数据也很是适合数据挖掘,利用历史预测将来。

(4)数据删除。这里的删除并非针对单条数据的,而是对特定时间范围内的批量数据进行过时处理。由于时序数据具备时效性,历史数据一般再也不具备价值,无论是定时删除仍是手动删除,都表明着其短暂的生命周期的结束。

2. 时序数据管理系统目标

根据时序数据的特色和场景,咱们须要一个能知足如下目标的时序数据管理平台:

  • 高吞吐写入:千万、上亿数据的秒级实时写入 & 持续高并发写入;
  • 无更新操做:数据大多表征设备状态,写入后无需更新;
  • 海量数据存储:从TB到PB级;
  • 高效实时的查询:按不一样维度对指标进行统计分析,存在明显的冷热数据,通常只会频繁查询近期数据;
  • 高可用;
  • 可扩展性;
  • 易于使用;
  • 易于维护;

3. 技术选型

说到数据库,你们第一个想到的确定是MySQL、Oracle等传统的已经存在不少年的关系型数据库。固然关系模型依然有效且实用。对于小数据量(几百万到几千万),MySQL是能够搞定的,再大一些就须要分库分表解决了。对时序数据通常按照时间分表,可是这对外部额外设计和运维的工做提出了高要求。显然,这不能知足大数据场景,因此几乎没有人选择这种方案。

纵观db-engine上排名前十的时序数据库[2],排除商用的,剩下开源的选择并很少。接下来介绍几款比较流行的时序数据库。

图2 db-engine时序数据库排名

**(1)OpenTSDB。**OpenTSDB开源快10年了,属于早期的解决方案。由于其基于Hadoop和HBase开发的索引,因此具备海量数据的存储能力,也号称每秒百万级别的写入速度。但一样由于其依赖的Hadoop生态过重, 运维成本很高,不够简洁与轻量;另外一个缺点就是它基于HBase的key-value存储方式,对于聚合查询并不友好高效,HBase存在的问题也会体现出来。

图3 OpenTSDB用户界面

**(2)InfluxDB。**InfluxDB能够说是时序行业的典范了,其已经发展成为一个平台,包括了时序数据应有的一切:从数据存储到界面展现。然而,InfluxDB虽然开源了其核心代码,但重要的集群功能只有企业版才提供[3], 而企业版并非免费的。不少大公司要么直接使用,要么本身开发集群功能。

图4 InfluxDB各版本支持的功能

**(3)TDengine。**TDengine是涛思团队开发的一个高效存储、查询和分析时序大数据的平台,其创始人陶建辉年近5旬,依然开发出了这个数据库。

TDengine的定位是物联网、车联网、运维监测等时序数据,其设计也是专门针对每一个设备。每一个采集点一张表,好比空气监测站有1000万个,那么就建1000万个表,为了对多个采集点聚合查询,又提出了超表的概念,将同类型的采集点表经过标签区分,结构同样。这种设计确实很是有针对性,虽然限制了范围,但极大提升了效率,根据其官方的测试报告[4], 其聚合查询速度是InfluxDB的上百倍,CPU、内存和硬盘消耗却更少。

图5 涛思团队给出的不一样时序数据库性能对比

TDengine无疑是时序数据库的一朵奇葩,加上在不久前开源了其集群功能[5],受到了更多用户青睐。当咱们选型时其尚未开源集群功能,后续也会归入观察之中。**(4)ClickHouse。**ClickHouse(以后简称CK)是一个开源的大数据分析数据库,也是一个完整的DBMS。CK无疑是OLAP数据库的一匹黑马,开源不到4

年,GitHub上的star数已经超过12k(InfluxDB也不过19k+),而它们的fork数却相差不大。

CK是俄罗斯的搜索引擎公司yandex开源的,最初是为了分析网页点击的流量,因此叫Click,迭代速度很快,每月一版,开发者500+,不少都是开源共享者,社区很是活跃。

CK是一个通用的分析数据库,并非为时序数据设计的,但只要使用得当,依然能发挥出其强大的性能。

3,CK原理介绍

要利用CK的优点,首先得知道它有哪些优点,而后理解其核心原理。根据咱们的测试结果,对于27个字段的表,单个实例每秒写入速度接近200MB,超过400万条数据/s。由于数据是随机生成的,对压缩并不友好。

而对于查询,在可以利用索引的状况下,不一样量级下(百万、千万、亿级)都能在毫秒级返回。对于极限状况:对多个没有索引的字段作聚合查询,也就是全表扫描时,也能达到400万条/s的聚合速度。

1. CK为何快?

能够归结为选择和细节,选择决定方向,细节决定成败。

CK选择最优的算法,好比列式压缩的LZ4[6];选择着眼硬件,充分利用CPU和分级缓存;针对不一样场景不一样处理,好比SIMD应用于文本和数据过滤;CK的持续迭代很是快,不只能够迅速修复bug,也能很快归入新的优秀算法。

2. CK基础

(1)CK是一个纯列式存储的数据库,一个列就是硬盘上的一个或多个文件(多个分区有多个文件),关于列式存储这里就不展开了,总之列存对于分析来说好处更大,由于每一个列单独存储,因此每一列数据能够压缩,不只节省了硬盘,还能够下降磁盘IO。

(2)CK是多核并行处理的,为了充分利用CPU资源,多线程和多核必不可少,同时向量化执行也会大幅提升速度。

(3)提供SQL查询接口,CK的客户端链接方式分为HTTP和TCP,TCP更加底层和高效,HTTP更容易使用和扩展,通常来讲HTTP足矣,社区已经有不少各类语言的链接客户端。

(4)CK不支持事务,大数据场景下对事务的要求没这么高。

(5)不建议按行更新和删除,CK的删除操做也会转化为增长操做,粒度过低严重影响效率。

3. CK集群

生产环境中一般是使用集群部署,CK的集群与Hadoop等集群稍微有些不同。如图6所示,CK集群共包含如下几个关键概念。

图6 CK集群示例

(1)CK实例。能够一台主机上起多个CK实例,端口不一样便可,也能够一台主机一个CK实例。

(2)分片。数据的水平划分,例如随机划分时,图5中每一个分片各有大约一半数据。

(3)副本。数据的冗余备份,同时也可做为查询节点。多个副本同时提供数据查询服务,可以加快数据的查询效率,提升并发度。图5中CK实例1和示例3存储了相同数据。

(4)多主集群模式。CK的每一个实例均可以叫作副本,每一个实体均可以提供查询,不区分主从,只是在写入数据时会在每一个分片里临时选一个主副本,来提供数据同步服务,具体见下文中的写入过程。

4. CK分布式引擎

要实现分片的功能,须要分布式引擎。在集群状况下,CK里的表分为本地表和分布式表,下面的两条语句可以建立一个分布式表。注意,分布式表是一个逻辑表,映射到多个本地表。

create table t_local on cluster shard2_replica2_cluster(t Datetime, id UInt64)  
ENGINE=ReplicatedMergeTree('/clickhouse/tables/{shard}/t_local','{replica}')
PARTITION BY toYYYYMM(t)
ORDER BY id
create table t on cluster shard2_replica2_cluster  (t Datetime, id UInt64) 
ENGINE=Distributed(shard2_replica2_cluster,default,t_local,id)

这里的t_local就是本地表,t就是分布式表。ReplicatedMergeTree是实现副本同步的引擎,参数能够先忽略。Distributed引擎就是分布式引擎,参数分别为:集群名,数据库名,本地表名,分片键(能够指定为rand()随机数)。

分布式引擎在写入和查询过程当中都充当着重要的角色,具体过程见下面。

5. CK写入过程

根据使用的表引擎不一样,写入过程是不一样的,上文的建表方式是比较常规的作法,按照上面的建表语句,须要同时开启内部复制项。

<shard2_replica2_cluster>
       <shard>
               <weight>1</weight>
               <internal_replication>true</internal_replication>
               <replica>
                        …
               </replica>
               <replica>
                        …
                </replica>
       </shard>

写入2条数据:insert into t values(now(), 1), (now(),2),如图7所示,写入过程分为2步:分布式写入和副本同步。

图7 CK写入过程

(1)分布式写入

1)客户端会选择集群里一个副本创建链接,这里是实例1。写入的全部数据先在实例1完成写入,根据分片规则,属于01分片的写入实例1本地,属于02分片的先写入一个临时目录,而后向实例2(shard02的主副本)创建链接,发送数据到实例2。

2)实例2接收到数据,写入本地分区。

3)实例1返回写入成功给客户端(每一个分片写入一个副本便可返回,能够配置)。

(2)副本同步

同步的过程是须要用到ZK的,上面建表语句的ReplicatedMergeTree第一个参数就是ZK上的路径。建立表的时候会有一个副本选举过程,通常先起的会成为主副本,副本的节点信息会注册到ZK,ZK的做用只是用来维护副本和任务元数据以及分布式通讯,并不传输数据。副本一旦注册成功,就开始监听/log下的日志了,当副本上线,执行插入时会通过如下过程:

1)实例1在写入本地分区数据后,会发送操做日志到ZK的/log下,带上分区名称和源主机(实例1的主机)。

2)01分区的其余副本,这里就实例3,监听到日志的变化,拉取日志,建立任务,放入ZK上的执行队列/queue(这里都是异步进行),而后再根据队列执行任务。

3)执行任务的过程为:选择一个副本(数据量最全且队列任务最少的副本),创建到该副本(实例1)的链接,拉取数据。

注意,使用副本引擎却不开启内部复制是不明智的作法,由于数据会重复写,虽然数据校验能够保证数据不重复,但增长了无畏的开销。

6. CK查询过程

查询的是分布式表,但要定位到实际的本地表,也就是副本的选择,这里有几种选择算法,默认采用随机选择。响应客户端查询请求的只会有一个副本,可是执行过程可能涉及多个副本。好比:select count(*) from t。由于数据是分布在2个分片的,只查一个副本不能获得所有结果。

图8 CK多实例查询过程

7. CK中重要的索引引擎

CK核心的引擎就是MergeTree,在此之上产生了不少附加引擎,下面介绍几种比较经常使用的。

(1)ReplacingMergeTree。为了解决MergeTree主键能够重复的特色,可使用ReplacingMergeTree,但也只是必定程度上不重复:仅仅在一个分区内不重复。使用方式参考:https://clickhouse.tech/docs/en/engines/table-engines/mergetree-family/replacingmergetree/

(2)SummingMergeTree。对于肯定的group by + sum查询,若比较耗时,那么能够建SummingMergeTree, 按照order by的字段进行聚合或自定义聚合字段,其他字段求和。

(3)AggregatingMergeTree。聚合显然是分析查询的重点,通常使用聚合MergeTree都会结合物化视图,在插入数据时自动同步到物化视图里,这样直接查询物化视图中聚合的结果便可。

(4)ReplicatedXXXMergeTree。在全部引擎前加一个Replicated前缀,将引擎升级为支持副本功能。

(5)物化视图。物化视图就是将视图SQL查询的结果存在一张表里,CK里特殊的一点是:只有insert的数据才能进入触发视图查询,进入视图表,分布式状况下同步过去的数据是不会被触发的,为了在分布式下使用物化视图,能够将物化视图所依赖的表指定为分布式表。

4,CK与时序的结合

在了解了CK的基本原理后,咱们看看其在时序数据方面的处理能力。

(1)时间:时间是必不可少的,按照时间分区可以大幅下降数据扫描范围;

(2)过滤:对条件的过滤通常基于某些列,对于列式存储来讲优点明显;

(3)降采样:对于时序来讲很是重要的功能,能够经过聚合实现,CK自带时间各个粒度的时间转换函数以及强大的聚合能力,能够知足要求;

(4)分析挖掘:能够开发扩展的函数来支持。

另外CK做为一个大数据系统,也知足如下基础要求:

(1)高吞吐写入;

(2)海量数据存储:冷热备份,TTL;

(3)高效实时的查询;

(4)高可用;

(5)可扩展性:能够实现自定义开发;

(6)易于使用:提供了JDBC和HTTP接口;

(7)易于维护:数据迁移方便,恢复容易,后续可能会将依赖的ZK去掉,内置分布式功能。

所以,CK能够很方便的实现一个高性能、高可用的时序数据管理和分析系统。下面是关键点的详细介绍。

1. 时序索引与分区

时序查询场景会有不少聚合查询,对于特定场景,若是使用的很是频繁且数据量很是大,咱们能够采用物化视图进行预聚合,而后查询物化视图。可是,对于一个通用的分析平台,查询条件能够随意改变的状况下,使用物化视图的开销就太大了,所以咱们目前并无采用物化视图的方式,而是采用原始的表。物化视图的方案后续将会进一步验证。

下面给出的是JUST建时序表的语法格式:第一个括号为TAG字段,第二个括号为VALUE字段(必须是数值型),大括号是对底层存储的特殊配置,这里主要是CK的索引和参数。除了用户指定的字段外,还有一个隐含的time字段,专为时序保留。

create table my_ts_table as ts (
    tag1 string,
    tag2 String [:primarykey][:comment=’描述’]
)
(
    value1 double,
    value2 double
)

在JUST底层,对应了CK的2张表(一张本地表,一张分布式表),默认会根据time分区和排序,以下面的一个例子:

create table my_ts_table as ts (
    tag1 string,
    tag2 String [:primarykey][:comment=’描述’]
)
(
    value1 double,
    value2 double
)

实际对应的CK建表语句为:

CREATE TABLE just.username_dbname_airquality_local
(
    `id` Int32,
    `oid`Int32,
    `name`String,
    `city`String,
    `time`DateTime,
    `PM10`Float64,
    `PM25`Float64
)
ENGINE =ReplicatedMergeTree('/clickhouse/tables/{shard}/24518511-2939-489b-94a8-0567384d927d','{replica}')
ORDER BY (time)
SETTINGS index_granularity = 8192
PARTITION BY toYYYYMM(time)
​
CREATE TABLE just.wangpeng417_test_airquality
(
    `id` Int32,
    `oid`Int32,
    `name`String,
    `city`String,
    `time`DateTime,
    `PM10`Float64,
    `PM25`Float64
)
ENGINE = Distributed('just_default', 'just', ' username_dbname_airquality_local',rand())

这样保证在使用时间范围查询时能够利用到索引,假如还有其余按照TAG的查询条件,还能够自定义索引和排序字段[LL1] (CK规定索引字段必定是排序字段的前缀)。

在不一样场景下,仍是须要根据数据量和数据特色来选择索引分区和索引粒度。根据实验测试,假如在咱们环境里CK每秒能够扫描1GB数据量,再乘以1-10倍的压缩比,那么一个分区的数据量应该大于千万到亿级别能够保证较优的速度,CK自己是多线程查询的,能够保证同时对每一个分区查询的隔离性。可是根据查询的场景,好比最多查到一个月,但大部分状况都是查一周,那么分区精确到周可能更好,这是个综合权衡的过程。

2. 部署与高可用

在JUST中,高可扩展性和高可用性是咱们的追求。为实现高可扩展性,咱们对数据进行水平分片;为了实现高可用性,咱们对每一个分片存储至少两个副本。

关于集群部署,最小化的状况是2台机器,这会产生2种状况1)交叉副本;2)一主一备;如图9所示:

图9 两种副本的情形

这两种方案对查询和写入影响的实验结果如图10所示:

图10 两种副本的写入和查询结果对比

实验结果代表:写入速度(横坐标为写入示例数,纵坐标为速度MB/s)在达到极限时是差很少的,而查询性能(横坐标为SQL编号,SQL语句见附录1,纵坐标为耗时,单位为秒)对于简单查询差异不大,可是对于占用大量资源的复杂查询,一主一备更加高效。由于CK的强悍性能是创建在充分利用CPU的基础上,在咱们的测试中,裸机状况下CPU达到90%以上很是频繁,若是有单独的机器部署CK,那么无可厚非可以充分利用机器资源。但在咱们的环境中,与其余大数据平台共有机器,就须要避免CK占用过多资源,从而影响其余服务,因而咱们选择docker部署。docker容器部署也有开源的基于k8s的实现:clickhouse-operator,对于小型环境,能够选择手动配置,经过简单的脚本便可实现自动化部署。

基于以上测试结论,为了保证服务高可用,CK集群和数据冗余是必不可少的,咱们的方案是保证至少2个副本的状况下,分片数尽可能多,充分利用机器,且每一个机器有且仅有一个CK实例。因而就有了如下分片数与副本数的公式:

其中_f_(n)表明当有_n_台机器时,部署的分布状况,n>=2。f(2) = (1, 2)表示2台机器采用1个分片2个副本部署的策略,f(3)=(1, 3)表示3台机器时1个分片3个副本部署策略,f(4)=(2, 2)表示4台机器使用2个分片,每一个分片2个副本,以此类推。

3. 动态扩容

随着数据量增长,须要扩展节点时,能够在不停机的状况下动态扩容,主要利用的是分片之间的权重关系。

这里扩容分为两种状况:

(1)增长副本:只须要修改配置文件,增长副本实例,数据会自动同步,由于CK的多主特性,副本也能够看成查询节点,因此能够分担查询压力;

(2)增长分片:增长分片要麻烦点,须要根据当前数据量、增长数据量计算出权重,而后在数据量达到均衡时将权重修改回去

假如开始时咱们只有1个分片,已经有100条数据了。

<test_extend>
       <shard>
              <weight>1</weight>
              <internal_replication>true</internal_replication>
              <replica>
                     <host>10.220.48.106</host>
                     <port>9000</port>
              </replica>
              <replica>
                     <host>10.220.48.105</host>
                     <port>9000</port>
              </replica>
       </shard>
</test_extend>

如今要再加入一个分片,那么权重的计算过程以下(为了简化忽略这个期间插入的数据):

假如咱们打算再插n条数据时,集群数据可以均衡,那么每一个shard有(n+100)/2 条,如今shard01有100条,设权重为 w一、w2,那知足公式:n * (w2/(w1+w2)) = (n+100)/2 ,其中n>100, 因此,假如 w1=1,n=200,那么 w2=3。

因此,将配置修改以下:

<test_extend>
       <shard>
              <weight>1</weight>
              <internal_replication>true</internal_replication>
              <replica>
                     <host>10.220.48.106</host>
                     <port>9000</port>
              </replica>
              <replica>
                     <host>10.220.48.105</host>
                     <port>9000</port>
              </replica>
       </shard>
       <shard>
              <weight>3</weight>
              <internal_replication>true</internal_replication>
              <replica>
                     <host>10.220.48.103</host>
                     <port>9000</port>
              </replica>
       </shard>
</test_extend>

等到数据同步均匀后再改回1:1。

4. 系统介绍与不足

JUST时序分析底层使用了CK做为存储查询引擎,并开发了可复用的可视化分析界面,欢迎访问https://just.urban-computing.cn/进行体验。

图11 JUST时序分析模块示意图

用户可使用统一的查询界面创建时序表,而后导入数据,切换到时序分析模块进行可视化查询。

图12 JUST创建时序表示意图

目前提供的查询功能主要有:按时间查询、按TAG过滤,在数据量不少的状况下,能够按照大一些的时间粒度进行降采样,查看整个数据的趋势,同时提供了线性、拉格朗日等缺失值填补功能。

分析挖掘部分主要是按找特定值和百分比过滤,以及一些简单的函数转换。

目前时序模块的功能还比较简陋,对于时序数据的SQL查询支持还不够完备。将来还有集成如下功能:

(1)接入实时数据;

(2)针对复杂查询,面板功能能够采用聚合引擎预先聚合;

(3)更完善的分析和挖掘功能;

(4)对数据的容错与校验处理;

(5)与JUST一致的SQL查询支持。

参考连接:

[1]https://en.wikipedia.org/wiki/Time_series

[2]https://db-engines.com/en/ranking/time+series+dbms

[3]https://www.influxdata.com/blog/influxdb-clustering/

[4]https://www.taosdata.com/downloads/TDengine_Testing_Report_cn.pdf

[5]https://www.taosdata.com/blog/2020/08/03/1703.html

[6]lz4.LZ4[EB/OL].https://lz4.github.io/lz4/,2014-08-10.

[7]https://clickhouse.tech/docs/en/engines/table-engines/mergetree-family/mergetree/

推荐阅读:

欢迎点击京东智联云,了解开发者社区

更多精彩技术实践与独家干货解析

欢迎关注【京东智联云开发者】公众号

相关文章
相关标签/搜索