本文参考 Druid 官方文档。
Apache Druid 是一个集时间序列数据库、数据仓库和全文检索系统特色于一体的分析性数据平台(OLAP)。Druid 做为一个高可用、高性能和多特性的 OLAP 平台,使用场景丰富。mysql
许多互联网公司基于 Druid 搭建 OLAP 数据分析和 BI 平台。如:sql
此前 Druid 系列文章已经详解过 Druid 的特性、使用场景、架构和实现原理。能够参考:shell
本文将指导读者完整定义一个完整 Spec
,并指出关键注意事项。Spec
是 Druid 数据摄入的配置信息,使用 json
格式,使用 Druid 时能够经过界面配置,最后生成 Spec
文件,也能够直接编写 Spec
文件,而后上传配置。不管使用哪一种方式,深刻了解 Spec
的编写既是开始使用 Druid 的第一步,也是深刻了解 Druid 各类概念,继而深刻了解 Druid 原理的必经之路。数据库
假设咱们有如下网络流量数据:json
srcIP
: 发送端 IP 地址srcPort
: 发送端端口号dstIP
: 接收端 IP 地址dstPort
: 接收端端口号protocol
: IP 协议号packets
: 传输的包的数量bytes
: 传输字节数cost
: 传输耗费的时间{"ts":"2018-01-01T01:01:35Z","srcIP":"1.1.1.1", "dstIP":"2.2.2.2", "srcPort":2000, "dstPort":3000, "protocol": 6, "packets":10, "bytes":1000, "cost": 1.4} {"ts":"2018-01-01T01:01:51Z","srcIP":"1.1.1.1", "dstIP":"2.2.2.2", "srcPort":2000, "dstPort":3000, "protocol": 6, "packets":20, "bytes":2000, "cost": 3.1} {"ts":"2018-01-01T01:01:59Z","srcIP":"1.1.1.1", "dstIP":"2.2.2.2", "srcPort":2000, "dstPort":3000, "protocol": 6, "packets":30, "bytes":3000, "cost": 0.4} {"ts":"2018-01-01T01:02:14Z","srcIP":"1.1.1.1", "dstIP":"2.2.2.2", "srcPort":5000, "dstPort":7000, "protocol": 6, "packets":40, "bytes":4000, "cost": 7.9} {"ts":"2018-01-01T01:02:29Z","srcIP":"1.1.1.1", "dstIP":"2.2.2.2", "srcPort":5000, "dstPort":7000, "protocol": 6, "packets":50, "bytes":5000, "cost": 10.2} {"ts":"2018-01-01T01:03:29Z","srcIP":"1.1.1.1", "dstIP":"2.2.2.2", "srcPort":5000, "dstPort":7000, "protocol": 6, "packets":60, "bytes":6000, "cost": 4.3} {"ts":"2018-01-01T02:33:14Z","srcIP":"7.7.7.7", "dstIP":"8.8.8.8", "srcPort":4000, "dstPort":5000, "protocol": 17, "packets":100, "bytes":10000, "cost": 22.4} {"ts":"2018-01-01T02:33:45Z","srcIP":"7.7.7.7", "dstIP":"8.8.8.8", "srcPort":4000, "dstPort":5000, "protocol": 17, "packets":200, "bytes":20000, "cost": 34.5} {"ts":"2018-01-01T02:35:45Z","srcIP":"7.7.7.7", "dstIP":"8.8.8.8", "srcPort":4000, "dstPort":5000, "protocol": 17, "packets":300, "bytes":30000, "cost": 46.3}
将上面 JSON 内容保存到 $druid_root\quickstart\
目录下的 ingestion-tutorial-data.json
文件中。网络
下面咱们开始编写一个 Spect
将上面的数据写入 Druid。架构
在本教程中,咱们将使用本地批处理indexing
任务。若是使用其余任务类型,摄入规范的某些地方将会不同,咱们将在教程中指出。并发
下面将详细讲解 Spec
配置,你将了解如下内容:高并发
Druid 摄入 spec 的核心元素是 dataSchema
。dataSchema
定义如何将输入的数据解析成 Druid 可以存储的列集合。post
咱们从一个空的dataSchema
开始,并按教程一步步添加字段。
在quickstart/
目录下建立 ingestion-tutorial-index.json
文件,将如下内容写入文件:
"dataSchema" : {}
随着教程的进行,咱们将不断的修改此 spec 文件。
数据源名称经过dataSchema
下的 dataSource
参数指定。dataSource
相似于 RDBMS 的 Table Name,写入的数据经过此名称查询,如:select * from $dataSource
。
"dataSchema" : { "dataSource" : "ingestion-tutorial", }
让咱们将教程中的数据源命名为 ingestion-tutorial
。
dataSchema
须要知道如何从输入的数据中提取主时间字段。Druid 的数据必须有时间字段,Druid 底层按时间分 segment 来存储数据,详情能够参考《Apache Druid 的集群设计与工做流程》。
咱们数据中的时间戳列是"ts",它是一个 ISO 8601
规范的时间戳,咱们将配置此字段的 timestampSpec
信息加到 dataSchema
下:
"dataSchema" : { "dataSource" : "ingestion-tutorial", "timestampSpec" : { "format" : "iso", "column" : "ts" } }
如今,咱们已经定义了时间列,让咱们看一下其余列的定义。
Druid 支持如下列类型:String,Long,Float,Double。下面章节中咱们将看到这些类型如何被使用。
在咱们讲如何定义其余非时间列以前,先讨论一下 rollup
。
在摄入数据时,咱们须要考虑是否须要 rollup。
在此教程中,咱们开启 rollup。在 dataSchema
的 granularitySpec
中指定:
"dataSchema" : { "dataSource" : "ingestion-tutorial", "timestampSpec" : { "format" : "iso", "column" : "ts" }, "granularitySpec" : { "rollup" : true } }
在此教程中,咱们按如下方式划分维度列和指标列:
这些维度是一组属性,用以标识一组网络流量数据,而指标表明按此维度组合的网络流量的实际状况。
让咱们看看如何在 spec 中定义维度和指标吧。
维度由 dataSchema
中的 dimensionsSpec
参数指定。
"dataSchema" : { "dataSource" : "ingestion-tutorial", "timestampSpec" : { "format" : "iso", "column" : "ts" }, "dimensionsSpec" : { "dimensions": [ "srcIP", { "name" : "srcPort", "type" : "long" }, { "name" : "dstIP", "type" : "string" }, { "name" : "dstPort", "type" : "long" }, { "name" : "protocol", "type" : "string" } ] }, "granularitySpec" : { "rollup" : true } }
每一个维度都有一个 name
和 一个 type
,其中 type
能够是"long", "float", "double", or "string"。
注意,srcIP
是一个 "string" 维度。对于字符串维度,只须要指定维度的名称就能够了,由于它的类型默认为"string"。
也请注意, protocol
在输入数据中是数字类型,但咱们以 "string" 列类型提取它,因此 Druid 在摄入数据时会将其强制由 long 类型转换成 string 类型。
数字类型的数据应该做为数字维度仍是字符串维度?
数字维度相对于字符串维度有如下优点和劣势:
指标经过 dataSchema
中的 metricsSpec
参数指定:
"dataSchema" : { "dataSource" : "ingestion-tutorial", "timestampSpec" : { "format" : "iso", "column" : "ts" }, "dimensionsSpec" : { "dimensions": [ "srcIP", { "name" : "srcPort", "type" : "long" }, { "name" : "dstIP", "type" : "string" }, { "name" : "dstPort", "type" : "long" }, { "name" : "protocol", "type" : "string" } ] }, "metricsSpec" : [ { "type" : "count", "name" : "count" }, { "type" : "longSum", "name" : "packets", "fieldName" : "packets" }, { "type" : "longSum", "name" : "bytes", "fieldName" : "bytes" }, { "type" : "doubleSum", "name" : "cost", "fieldName" : "cost" } ], "granularitySpec" : { "rollup" : true } }
在定义指标时,必须指定当前列在 rollup 时应该执行的聚合类型。
这里,咱们在 packets
和 bytes
两个指标列上定义了 long 类型的 sum 聚合,在 cost
列上定义了一个 double 的 sum 聚合。
注意 metricsSpec
与 dimensionSpec
和 parseSpec
的嵌套层级不同。它和 dataSchema
中的 parser
在同一嵌套层级。
注意,咱们也定义了一个 count
聚合器。这个计数聚合器将统计原始数据摄入的行数。
若是咱们不使用 rollup,将在 dimensionsSpec
中指定全部列,如:
"dimensionsSpec" : { "dimensions": [ "srcIP", { "name" : "srcPort", "type" : "long" }, { "name" : "dstIP", "type" : "string" }, { "name" : "dstPort", "type" : "long" }, { "name" : "protocol", "type" : "string" }, { "name" : "packets", "type" : "long" }, { "name" : "bytes", "type" : "long" }, { "name" : "srcPort", "type" : "double" } ] },
此时,咱们已经完成了 parser
和 metricSpec
的定义,并几乎要完成 Spec 的 dataSchema
。
咱们还须要在 granularitySpec
中设置一些额外的参数:
uniform
和 arbitrary
。在本教程中,咱们将使用 uniform
,这样全部的 segment 将有统一的时间范围大小(本示例中,全部 segment 覆盖一个小时的数据量)。DAY
,WEEK
。queryGranularity
)。segment 粒度经过 granularitySpec
中的 segmentGranularity
属性配置。此文档中,咱们将建立 hourly 粒度的 segment:
"dataSchema" : { "dataSource" : "ingestion-tutorial", "timestampSpec" : { "format" : "iso", "column" : "ts" }, "dimensionsSpec" : { "dimensions": [ "srcIP", { "name" : "srcPort", "type" : "long" }, { "name" : "dstIP", "type" : "string" }, { "name" : "dstPort", "type" : "long" }, { "name" : "protocol", "type" : "string" } ] }, "metricsSpec" : [ { "type" : "count", "name" : "count" }, { "type" : "longSum", "name" : "packets", "fieldName" : "packets" }, { "type" : "longSum", "name" : "bytes", "fieldName" : "bytes" }, { "type" : "doubleSum", "name" : "cost", "fieldName" : "cost" } ], "granularitySpec" : { "type" : "uniform", "segmentGranularity" : "HOUR", "rollup" : true } }
咱们的输入数据包含两个小时的事件,因此此任务将生成两个 segment。
查询粒度经过 granularitySpec
中的 queryGranularity
属性配置。此教程中,咱们使用 minute 级粒度:
"dataSchema" : { "dataSource" : "ingestion-tutorial", "timestampSpec" : { "format" : "iso", "column" : "ts" }, "dimensionsSpec" : { "dimensions": [ "srcIP", { "name" : "srcPort", "type" : "long" }, { "name" : "dstIP", "type" : "string" }, { "name" : "dstPort", "type" : "long" }, { "name" : "protocol", "type" : "string" } ] }, "metricsSpec" : [ { "type" : "count", "name" : "count" }, { "type" : "longSum", "name" : "packets", "fieldName" : "packets" }, { "type" : "longSum", "name" : "bytes", "fieldName" : "bytes" }, { "type" : "doubleSum", "name" : "cost", "fieldName" : "cost" } ], "granularitySpec" : { "type" : "uniform", "segmentGranularity" : "HOUR", "queryGranularity" : "MINUTE", "rollup" : true } }
为了查看查询粒度配置的效果,让咱们从原始输入数据中查看这一行:
{"ts":"2018-01-01T01:03:29Z","srcIP":"1.1.1.1", "dstIP":"2.2.2.2", "srcPort":5000, "dstPort":7000, "protocol": 6, "packets":60, "bytes":6000, "cost": 4.3}
当使用 minute 粒度摄入这行数据时,Druid 将把这行数据的时间戳 floor 成 minute 桶的时间:
{"ts":"2018-01-01T01:03:00Z","srcIP":"1.1.1.1", "dstIP":"2.2.2.2", "srcPort":5000, "dstPort":7000, "protocol": 6, "packets":60, "bytes":6000, "cost": 4.3}
对于批量任务,必须定义时间范围。在时间范围以外的输入数据将不被摄入。
这个时间范围也在 granularitySpec
中指定:
"dataSchema" : { "dataSource" : "ingestion-tutorial", "timestampSpec" : { "format" : "iso", "column" : "ts" }, "dimensionsSpec" : { "dimensions": [ "srcIP", { "name" : "srcPort", "type" : "long" }, { "name" : "dstIP", "type" : "string" }, { "name" : "dstPort", "type" : "long" }, { "name" : "protocol", "type" : "string" } ] }, "metricsSpec" : [ { "type" : "count", "name" : "count" }, { "type" : "longSum", "name" : "packets", "fieldName" : "packets" }, { "type" : "longSum", "name" : "bytes", "fieldName" : "bytes" }, { "type" : "doubleSum", "name" : "cost", "fieldName" : "cost" } ], "granularitySpec" : { "type" : "uniform", "segmentGranularity" : "HOUR", "queryGranularity" : "MINUTE", "intervals" : ["2018-01-01/2018-01-02"], "rollup" : true } }
如今咱们已经完成了 dataSchema
的定义。接下来要作的就是将咱们建立的 dataSchema
放入一个数据摄入任务中,并指定输入的源。
dataSchema
适用于全部类型的任务,但每种任务类型都有自生的规范格式。在本教程中,咱们使用本地批量数据摄入任务类型(the native ingestion task):
{ "type" : "index_parallel", "spec" : { "dataSchema" : { "dataSource" : "ingestion-tutorial", "timestampSpec" : { "format" : "iso", "column" : "ts" }, "dimensionsSpec" : { "dimensions": [ "srcIP", { "name" : "srcPort", "type" : "long" }, { "name" : "dstIP", "type" : "string" }, { "name" : "dstPort", "type" : "long" }, { "name" : "protocol", "type" : "string" } ] }, "metricsSpec" : [ { "type" : "count", "name" : "count" }, { "type" : "longSum", "name" : "packets", "fieldName" : "packets" }, { "type" : "longSum", "name" : "bytes", "fieldName" : "bytes" }, { "type" : "doubleSum", "name" : "cost", "fieldName" : "cost" } ], "granularitySpec" : { "type" : "uniform", "segmentGranularity" : "HOUR", "queryGranularity" : "MINUTE", "intervals" : ["2018-01-01/2018-01-02"], "rollup" : true } } } }
如今,让咱们来定义咱们本身的输入源,它在 ioConfig
对象中指定。每种任务类型都有本身的 ioConfig
类型。为了读取输入数据,咱们须要指定一个 inputSource
。咱们以前保存的网络流量数据须要从一个本地文件读取,其配置以下:
"ioConfig" : { "type" : "index_parallel", "inputSource" : { "type" : "local", "baseDir" : "quickstart/", "filter" : "ingestion-tutorial-data.json" } }
由于咱们的数据是 JSON 字符串形式的,咱们使用 inputFormat
json
格式化数据(还支持 csv、protobuf 等数据类型):
"ioConfig" : { "type" : "index_parallel", "inputSource" : { "type" : "local", "baseDir" : "quickstart/", "filter" : "ingestion-tutorial-data.json" }, "inputFormat" : { "type" : "json" } }
{ "type" : "index_parallel", "spec" : { "dataSchema" : { "dataSource" : "ingestion-tutorial", "timestampSpec" : { "format" : "iso", "column" : "ts" }, "dimensionsSpec" : { "dimensions": [ "srcIP", { "name" : "srcPort", "type" : "long" }, { "name" : "dstIP", "type" : "string" }, { "name" : "dstPort", "type" : "long" }, { "name" : "protocol", "type" : "string" } ] }, "metricsSpec" : [ { "type" : "count", "name" : "count" }, { "type" : "longSum", "name" : "packets", "fieldName" : "packets" }, { "type" : "longSum", "name" : "bytes", "fieldName" : "bytes" }, { "type" : "doubleSum", "name" : "cost", "fieldName" : "cost" } ], "granularitySpec" : { "type" : "uniform", "segmentGranularity" : "HOUR", "queryGranularity" : "MINUTE", "intervals" : ["2018-01-01/2018-01-02"], "rollup" : true } }, "ioConfig" : { "type" : "index_parallel", "inputSource" : { "type" : "local", "baseDir" : "quickstart/", "filter" : "ingestion-tutorial-data.json" }, "inputFormat" : { "type" : "json" } } } }
每个摄取任务都有 tuningConfig
配置项,它容许用户调整各类摄取参数。
举例来讲,让咱们添加一个tuningConfig
,以设置本次批量摄取任务的目标 segment 大小:
"tuningConfig" : { "type" : "index_parallel", "maxRowsPerSegment" : 5000000 }
如今咱们已经定义完成了一个摄取规范,它如今看起来以下所示:
{ "type" : "index_parallel", "spec" : { "dataSchema" : { "dataSource" : "ingestion-tutorial", "timestampSpec" : { "format" : "iso", "column" : "ts" }, "dimensionsSpec" : { "dimensions": [ "srcIP", { "name" : "srcPort", "type" : "long" }, { "name" : "dstIP", "type" : "string" }, { "name" : "dstPort", "type" : "long" }, { "name" : "protocol", "type" : "string" } ] }, "metricsSpec" : [ { "type" : "count", "name" : "count" }, { "type" : "longSum", "name" : "packets", "fieldName" : "packets" }, { "type" : "longSum", "name" : "bytes", "fieldName" : "bytes" }, { "type" : "doubleSum", "name" : "cost", "fieldName" : "cost" } ], "granularitySpec" : { "type" : "uniform", "segmentGranularity" : "HOUR", "queryGranularity" : "MINUTE", "intervals" : ["2018-01-01/2018-01-02"], "rollup" : true } }, "ioConfig" : { "type" : "index_parallel", "inputSource" : { "type" : "local", "baseDir" : "quickstart/", "filter" : "ingestion-tutorial-data.json" }, "inputFormat" : { "type" : "json" } }, "tuningConfig" : { "type" : "index_parallel", "maxRowsPerSegment" : 5000000 } } }
在包根目录下,运行下面命令:
bin/post-index-task --file quickstart/ingestion-tutorial-index.json --url http://localhost:8081
脚本执行完成后,咱们来查询数据。
让咱们运行 bin/dsql
并发送一个 select * from "ingestion-tutorial";
语句,查询已经被写入的数据:
$ bin/dsql Welcome to dsql, the command-line client for Druid SQL. Type "\h" for help. dsql> select * from "ingestion-tutorial"; ┌──────────────────────────┬───────┬──────┬───────┬─────────┬─────────┬─────────┬──────────┬─────────┬─────────┐ │ __time │ bytes │ cost │ count │ dstIP │ dstPort │ packets │ protocol │ srcIP │ srcPort │ ├──────────────────────────┼───────┼──────┼───────┼─────────┼─────────┼─────────┼──────────┼─────────┼─────────┤ │ 2018-01-01T01:01:00.000Z │ 6000 │ 4.9 │ 3 │ 2.2.2.2 │ 3000 │ 60 │ 6 │ 1.1.1.1 │ 2000 │ │ 2018-01-01T01:02:00.000Z │ 9000 │ 18.1 │ 2 │ 2.2.2.2 │ 7000 │ 90 │ 6 │ 1.1.1.1 │ 5000 │ │ 2018-01-01T01:03:00.000Z │ 6000 │ 4.3 │ 1 │ 2.2.2.2 │ 7000 │ 60 │ 6 │ 1.1.1.1 │ 5000 │ │ 2018-01-01T02:33:00.000Z │ 30000 │ 56.9 │ 2 │ 8.8.8.8 │ 5000 │ 300 │ 17 │ 7.7.7.7 │ 4000 │ │ 2018-01-01T02:35:00.000Z │ 30000 │ 46.3 │ 1 │ 8.8.8.8 │ 5000 │ 300 │ 17 │ 7.7.7.7 │ 4000 │ └──────────────────────────┴───────┴──────┴───────┴─────────┴─────────┴─────────┴──────────┴─────────┴─────────┘ Retrieved 5 rows in 0.12s. dsql>
若是以为阅读后对你有帮助,但愿分享、点赞、在看三连哦。
关注 【码哥字节】解锁更多硬核。
推荐阅读
如下几篇文章阅读量与读者反馈都很好,推荐你们阅读:
公众号后台回复 ”加群“,加入读者技术群,里面有阿里、腾讯的小伙伴一块儿探讨技术。