从Spec编写深刻了解 Druid

本文参考 Druid 官方文档。

Apache Druid 是一个集时间序列数据库、数据仓库和全文检索系统特色于一体的分析性数据平台(OLAP)。Druid 做为一个高可用、高性能和多特性的 OLAP 平台,使用场景丰富。mysql

image

许多互联网公司基于 Druid 搭建 OLAP 数据分析和 BI 平台。如:sql

此前 Druid 系列文章已经详解过 Druid 的特性、使用场景、架构和实现原理。能够参考:shell

本文将指导读者完整定义一个完整 Spec,并指出关键注意事项。Spec 是 Druid 数据摄入的配置信息,使用 json 格式,使用 Druid 时能够经过界面配置,最后生成 Spec 文件,也能够直接编写 Spec 文件,而后上传配置。不管使用哪一种方式,深刻了解 Spec 的编写既是开始使用 Druid 的第一步,也是深刻了解 Druid 各类概念,继而深刻了解 Druid 原理的必经之路。数据库

示例数据

假设咱们有如下网络流量数据:json

  • srcIP: 发送端 IP 地址
  • srcPort: 发送端端口号
  • dstIP: 接收端 IP 地址
  • dstPort: 接收端端口号
  • protocol: IP 协议号
  • packets: 传输的包的数量
  • bytes: 传输字节数
  • cost: 传输耗费的时间
{"ts":"2018-01-01T01:01:35Z","srcIP":"1.1.1.1", "dstIP":"2.2.2.2", "srcPort":2000, "dstPort":3000, "protocol": 6, "packets":10, "bytes":1000, "cost": 1.4}
{"ts":"2018-01-01T01:01:51Z","srcIP":"1.1.1.1", "dstIP":"2.2.2.2", "srcPort":2000, "dstPort":3000, "protocol": 6, "packets":20, "bytes":2000, "cost": 3.1}
{"ts":"2018-01-01T01:01:59Z","srcIP":"1.1.1.1", "dstIP":"2.2.2.2", "srcPort":2000, "dstPort":3000, "protocol": 6, "packets":30, "bytes":3000, "cost": 0.4}
{"ts":"2018-01-01T01:02:14Z","srcIP":"1.1.1.1", "dstIP":"2.2.2.2", "srcPort":5000, "dstPort":7000, "protocol": 6, "packets":40, "bytes":4000, "cost": 7.9}
{"ts":"2018-01-01T01:02:29Z","srcIP":"1.1.1.1", "dstIP":"2.2.2.2", "srcPort":5000, "dstPort":7000, "protocol": 6, "packets":50, "bytes":5000, "cost": 10.2}
{"ts":"2018-01-01T01:03:29Z","srcIP":"1.1.1.1", "dstIP":"2.2.2.2", "srcPort":5000, "dstPort":7000, "protocol": 6, "packets":60, "bytes":6000, "cost": 4.3}
{"ts":"2018-01-01T02:33:14Z","srcIP":"7.7.7.7", "dstIP":"8.8.8.8", "srcPort":4000, "dstPort":5000, "protocol": 17, "packets":100, "bytes":10000, "cost": 22.4}
{"ts":"2018-01-01T02:33:45Z","srcIP":"7.7.7.7", "dstIP":"8.8.8.8", "srcPort":4000, "dstPort":5000, "protocol": 17, "packets":200, "bytes":20000, "cost": 34.5}
{"ts":"2018-01-01T02:35:45Z","srcIP":"7.7.7.7", "dstIP":"8.8.8.8", "srcPort":4000, "dstPort":5000, "protocol": 17, "packets":300, "bytes":30000, "cost": 46.3}

将上面 JSON 内容保存到 $druid_root\quickstart\目录下的 ingestion-tutorial-data.json 文件中。网络

下面咱们开始编写一个 Spect 将上面的数据写入 Druid。架构

在本教程中,咱们将使用本地批处理indexing任务。若是使用其余任务类型,摄入规范的某些地方将会不同,咱们将在教程中指出。并发

下面将详细讲解 Spec 配置,你将了解如下内容:高并发

image

定义 schema

Druid 摄入 spec 的核心元素是 dataSchemadataSchema 定义如何将输入的数据解析成 Druid 可以存储的列集合。post

咱们从一个空的dataSchema 开始,并按教程一步步添加字段。

quickstart/ 目录下建立 ingestion-tutorial-index.json 文件,将如下内容写入文件:

"dataSchema" : {}

随着教程的进行,咱们将不断的修改此 spec 文件。

数据源名称

数据源名称经过dataSchema 下的 dataSource 参数指定。dataSource 相似于 RDBMS 的 Table Name,写入的数据经过此名称查询,如:select * from $dataSource

"dataSchema" : {
  "dataSource" : "ingestion-tutorial",
}

让咱们将教程中的数据源命名为 ingestion-tutorial

时间列

dataSchema 须要知道如何从输入的数据中提取主时间字段。Druid 的数据必须有时间字段,Druid 底层按时间分 segment 来存储数据,详情能够参考《Apache Druid 的集群设计与工做流程》

咱们数据中的时间戳列是"ts",它是一个 ISO 8601 规范的时间戳,咱们将配置此字段的 timestampSpec信息加到 dataSchema 下:

"dataSchema" : {
  "dataSource" : "ingestion-tutorial",
  "timestampSpec" : {
    "format" : "iso",
    "column" : "ts"
  }
}

列类型

如今,咱们已经定义了时间列,让咱们看一下其余列的定义。

Druid 支持如下列类型:String,Long,Float,Double。下面章节中咱们将看到这些类型如何被使用。

在咱们讲如何定义其余非时间列以前,先讨论一下 rollup

Rollup

在摄入数据时,咱们须要考虑是否须要 rollup。

  • 若是开启 rollup,须要将输入数据列分红两种类型,维度(dimension)和指标(metric)。维度是 rollup 的 grouping 列(用于 group by,filtering),指标是被聚合计算的列。
  • 若是不开启 rollup,全部列都被视为维度,将不会进行预聚合。

在此教程中,咱们开启 rollup。在 dataSchemagranularitySpec中指定:

"dataSchema" : {
 "dataSource" : "ingestion-tutorial",
 "timestampSpec" : {
   "format" : "iso",
   "column" : "ts"
 },
 "granularitySpec" : {
   "rollup" : true
 }
}

选择维度和指标

在此教程中,咱们按如下方式划分维度列和指标列:

  • Dimensions: srcIP, srcPort, dstIP, dstPort, protocol
  • Metrics: packets, bytes, cost

这些维度是一组属性,用以标识一组网络流量数据,而指标表明按此维度组合的网络流量的实际状况。

让咱们看看如何在 spec 中定义维度和指标吧。

维度

维度由 dataSchema 中的 dimensionsSpec 参数指定。

"dataSchema" : {
  "dataSource" : "ingestion-tutorial",
  "timestampSpec" : {
    "format" : "iso",
    "column" : "ts"
  },
  "dimensionsSpec" : {
    "dimensions": [
      "srcIP",
      { "name" : "srcPort", "type" : "long" },
      { "name" : "dstIP", "type" : "string" },
      { "name" : "dstPort", "type" : "long" },
      { "name" : "protocol", "type" : "string" }
    ]
  },
  "granularitySpec" : {
    "rollup" : true
  }
}

每一个维度都有一个 name 和 一个 type ,其中 type 能够是"long", "float", "double", or "string"。

注意,srcIP 是一个 "string" 维度。对于字符串维度,只须要指定维度的名称就能够了,由于它的类型默认为"string"。

也请注意, protocol 在输入数据中是数字类型,但咱们以 "string" 列类型提取它,因此 Druid 在摄入数据时会将其强制由 long 类型转换成 string 类型。

Strings vs Numbers

数字类型的数据应该做为数字维度仍是字符串维度?

数字维度相对于字符串维度有如下优点和劣势:

  • 优点:数字须要更小的存储空间,而且在读取该列时须要更小的开销。
  • 劣势:数字维度没有索引,因此按此列 filter 的操做会比字符串类型的维度(这种维度有索引)更慢。

指标

指标经过 dataSchema 中的 metricsSpec 参数指定:

"dataSchema" : {
  "dataSource" : "ingestion-tutorial",
  "timestampSpec" : {
    "format" : "iso",
    "column" : "ts"
  },
  "dimensionsSpec" : {
    "dimensions": [
      "srcIP",
      { "name" : "srcPort", "type" : "long" },
      { "name" : "dstIP", "type" : "string" },
      { "name" : "dstPort", "type" : "long" },
      { "name" : "protocol", "type" : "string" }
    ]
  },
  "metricsSpec" : [
    { "type" : "count", "name" : "count" },
    { "type" : "longSum", "name" : "packets", "fieldName" : "packets" },
    { "type" : "longSum", "name" : "bytes", "fieldName" : "bytes" },
    { "type" : "doubleSum", "name" : "cost", "fieldName" : "cost" }
  ],
  "granularitySpec" : {
    "rollup" : true
  }
}

在定义指标时,必须指定当前列在 rollup 时应该执行的聚合类型。

这里,咱们在 packetsbytes 两个指标列上定义了 long 类型的 sum 聚合,在 cost 列上定义了一个 double 的 sum 聚合。

注意 metricsSpecdimensionSpecparseSpec 的嵌套层级不同。它和 dataSchema 中的 parser在同一嵌套层级。

注意,咱们也定义了一个 count 聚合器。这个计数聚合器将统计原始数据摄入的行数。

No rollup

若是咱们不使用 rollup,将在 dimensionsSpec 中指定全部列,如:

"dimensionsSpec" : {
        "dimensions": [
          "srcIP",
          { "name" : "srcPort", "type" : "long" },
          { "name" : "dstIP", "type" : "string" },
          { "name" : "dstPort", "type" : "long" },
          { "name" : "protocol", "type" : "string" },
          { "name" : "packets", "type" : "long" },
          { "name" : "bytes", "type" : "long" },
          { "name" : "srcPort", "type" : "double" }
        ]
      },

定义粒度

此时,咱们已经完成了 parsermetricSpec 的定义,并几乎要完成 Spec 的 dataSchema

咱们还须要在 granularitySpec 中设置一些额外的参数:

  • granularitySpec 类型:支持两种类型——uniformarbitrary 。在本教程中,咱们将使用 uniform,这样全部的 segment 将有统一的时间范围大小(本示例中,全部 segment 覆盖一个小时的数据量)。
  • segment 粒度:设置单个 segment 应该包含多大时间范围的数据,如:DAYWEEK
  • 时间列中时间戳的 buckting 粒度(称为查询粒度 queryGranularity )。

Segment 粒度

segment 粒度经过 granularitySpec 中的 segmentGranularity 属性配置。此文档中,咱们将建立 hourly 粒度的 segment:

"dataSchema" : {
  "dataSource" : "ingestion-tutorial",
  "timestampSpec" : {
    "format" : "iso",
    "column" : "ts"
  },
  "dimensionsSpec" : {
    "dimensions": [
      "srcIP",
      { "name" : "srcPort", "type" : "long" },
      { "name" : "dstIP", "type" : "string" },
      { "name" : "dstPort", "type" : "long" },
      { "name" : "protocol", "type" : "string" }
    ]
  },
  "metricsSpec" : [
    { "type" : "count", "name" : "count" },
    { "type" : "longSum", "name" : "packets", "fieldName" : "packets" },
    { "type" : "longSum", "name" : "bytes", "fieldName" : "bytes" },
    { "type" : "doubleSum", "name" : "cost", "fieldName" : "cost" }
  ],
  "granularitySpec" : {
    "type" : "uniform",
    "segmentGranularity" : "HOUR",
    "rollup" : true
  }
}

咱们的输入数据包含两个小时的事件,因此此任务将生成两个 segment。

查询粒度

查询粒度经过 granularitySpec 中的 queryGranularity 属性配置。此教程中,咱们使用 minute 级粒度:

"dataSchema" : {
  "dataSource" : "ingestion-tutorial",
  "timestampSpec" : {
    "format" : "iso",
    "column" : "ts"
  },
  "dimensionsSpec" : {
    "dimensions": [
      "srcIP",
      { "name" : "srcPort", "type" : "long" },
      { "name" : "dstIP", "type" : "string" },
      { "name" : "dstPort", "type" : "long" },
      { "name" : "protocol", "type" : "string" }
    ]
  },
  "metricsSpec" : [
    { "type" : "count", "name" : "count" },
    { "type" : "longSum", "name" : "packets", "fieldName" : "packets" },
    { "type" : "longSum", "name" : "bytes", "fieldName" : "bytes" },
    { "type" : "doubleSum", "name" : "cost", "fieldName" : "cost" }
  ],
  "granularitySpec" : {
    "type" : "uniform",
    "segmentGranularity" : "HOUR",
    "queryGranularity" : "MINUTE",
    "rollup" : true
  }
}

为了查看查询粒度配置的效果,让咱们从原始输入数据中查看这一行:

{"ts":"2018-01-01T01:03:29Z","srcIP":"1.1.1.1", "dstIP":"2.2.2.2", "srcPort":5000, "dstPort":7000, "protocol": 6, "packets":60, "bytes":6000, "cost": 4.3}

当使用 minute 粒度摄入这行数据时,Druid 将把这行数据的时间戳 floor 成 minute 桶的时间:

{"ts":"2018-01-01T01:03:00Z","srcIP":"1.1.1.1", "dstIP":"2.2.2.2", "srcPort":5000, "dstPort":7000, "protocol": 6, "packets":60, "bytes":6000, "cost": 4.3}

定义时间范围

对于批量任务,必须定义时间范围。在时间范围以外的输入数据将不被摄入。

这个时间范围也在 granularitySpec 中指定:

"dataSchema" : {
  "dataSource" : "ingestion-tutorial",
  "timestampSpec" : {
    "format" : "iso",
    "column" : "ts"
  },
  "dimensionsSpec" : {
    "dimensions": [
      "srcIP",
      { "name" : "srcPort", "type" : "long" },
      { "name" : "dstIP", "type" : "string" },
      { "name" : "dstPort", "type" : "long" },
      { "name" : "protocol", "type" : "string" }
    ]
  },
  "metricsSpec" : [
    { "type" : "count", "name" : "count" },
    { "type" : "longSum", "name" : "packets", "fieldName" : "packets" },
    { "type" : "longSum", "name" : "bytes", "fieldName" : "bytes" },
    { "type" : "doubleSum", "name" : "cost", "fieldName" : "cost" }
  ],
  "granularitySpec" : {
    "type" : "uniform",
    "segmentGranularity" : "HOUR",
    "queryGranularity" : "MINUTE",
    "intervals" : ["2018-01-01/2018-01-02"],
    "rollup" : true
  }
}

定义任务类型

如今咱们已经完成了 dataSchema 的定义。接下来要作的就是将咱们建立的 dataSchema 放入一个数据摄入任务中,并指定输入的源。

dataSchema 适用于全部类型的任务,但每种任务类型都有自生的规范格式。在本教程中,咱们使用本地批量数据摄入任务类型(the native ingestion task):

{
  "type" : "index_parallel",
  "spec" : {
    "dataSchema" : {
      "dataSource" : "ingestion-tutorial",
      "timestampSpec" : {
        "format" : "iso",
        "column" : "ts"
      },
      "dimensionsSpec" : {
        "dimensions": [
          "srcIP",
          { "name" : "srcPort", "type" : "long" },
          { "name" : "dstIP", "type" : "string" },
          { "name" : "dstPort", "type" : "long" },
          { "name" : "protocol", "type" : "string" }
        ]
      },
      "metricsSpec" : [
        { "type" : "count", "name" : "count" },
        { "type" : "longSum", "name" : "packets", "fieldName" : "packets" },
        { "type" : "longSum", "name" : "bytes", "fieldName" : "bytes" },
        { "type" : "doubleSum", "name" : "cost", "fieldName" : "cost" }
      ],
      "granularitySpec" : {
        "type" : "uniform",
        "segmentGranularity" : "HOUR",
        "queryGranularity" : "MINUTE",
        "intervals" : ["2018-01-01/2018-01-02"],
        "rollup" : true
      }
    }
  }
}

定义输入源

如今,让咱们来定义咱们本身的输入源,它在 ioConfig 对象中指定。每种任务类型都有本身的 ioConfig 类型。为了读取输入数据,咱们须要指定一个 inputSource。咱们以前保存的网络流量数据须要从一个本地文件读取,其配置以下:

"ioConfig" : {
      "type" : "index_parallel",
      "inputSource" : {
        "type" : "local",
        "baseDir" : "quickstart/",
        "filter" : "ingestion-tutorial-data.json"
      }
    }

定义数据格式

由于咱们的数据是 JSON 字符串形式的,咱们使用 inputFormat json 格式化数据(还支持 csv、protobuf 等数据类型):

"ioConfig" : {
      "type" : "index_parallel",
      "inputSource" : {
        "type" : "local",
        "baseDir" : "quickstart/",
        "filter" : "ingestion-tutorial-data.json"
      },
      "inputFormat" : {
        "type" : "json"
      }
    }
{
  "type" : "index_parallel",
  "spec" : {
    "dataSchema" : {
      "dataSource" : "ingestion-tutorial",
      "timestampSpec" : {
        "format" : "iso",
        "column" : "ts"
      },
      "dimensionsSpec" : {
        "dimensions": [
          "srcIP",
          { "name" : "srcPort", "type" : "long" },
          { "name" : "dstIP", "type" : "string" },
          { "name" : "dstPort", "type" : "long" },
          { "name" : "protocol", "type" : "string" }
        ]
      },
      "metricsSpec" : [
        { "type" : "count", "name" : "count" },
        { "type" : "longSum", "name" : "packets", "fieldName" : "packets" },
        { "type" : "longSum", "name" : "bytes", "fieldName" : "bytes" },
        { "type" : "doubleSum", "name" : "cost", "fieldName" : "cost" }
      ],
      "granularitySpec" : {
        "type" : "uniform",
        "segmentGranularity" : "HOUR",
        "queryGranularity" : "MINUTE",
        "intervals" : ["2018-01-01/2018-01-02"],
        "rollup" : true
      }
    },
    "ioConfig" : {
      "type" : "index_parallel",
      "inputSource" : {
        "type" : "local",
        "baseDir" : "quickstart/",
        "filter" : "ingestion-tutorial-data.json"
      },
      "inputFormat" : {
        "type" : "json"
      }
    }
  }
}

其余调整

每个摄取任务都有 tuningConfig 配置项,它容许用户调整各类摄取参数。

举例来讲,让咱们添加一个tuningConfig,以设置本次批量摄取任务的目标 segment 大小:

"tuningConfig" : {
      "type" : "index_parallel",
      "maxRowsPerSegment" : 5000000
    }

最终 spec

如今咱们已经定义完成了一个摄取规范,它如今看起来以下所示:

{
  "type" : "index_parallel",
  "spec" : {
    "dataSchema" : {
      "dataSource" : "ingestion-tutorial",
      "timestampSpec" : {
        "format" : "iso",
        "column" : "ts"
      },
      "dimensionsSpec" : {
        "dimensions": [
          "srcIP",
          { "name" : "srcPort", "type" : "long" },
          { "name" : "dstIP", "type" : "string" },
          { "name" : "dstPort", "type" : "long" },
          { "name" : "protocol", "type" : "string" }
        ]
      },
      "metricsSpec" : [
        { "type" : "count", "name" : "count" },
        { "type" : "longSum", "name" : "packets", "fieldName" : "packets" },
        { "type" : "longSum", "name" : "bytes", "fieldName" : "bytes" },
        { "type" : "doubleSum", "name" : "cost", "fieldName" : "cost" }
      ],
      "granularitySpec" : {
        "type" : "uniform",
        "segmentGranularity" : "HOUR",
        "queryGranularity" : "MINUTE",
        "intervals" : ["2018-01-01/2018-01-02"],
        "rollup" : true
      }
    },
    "ioConfig" : {
      "type" : "index_parallel",
      "inputSource" : {
        "type" : "local",
        "baseDir" : "quickstart/",
        "filter" : "ingestion-tutorial-data.json"
      },
      "inputFormat" : {
        "type" : "json"
      }
    },
    "tuningConfig" : {
      "type" : "index_parallel",
      "maxRowsPerSegment" : 5000000
    }
  }
}

提交任务并查询数据

在包根目录下,运行下面命令:

bin/post-index-task --file quickstart/ingestion-tutorial-index.json --url http://localhost:8081

脚本执行完成后,咱们来查询数据。

让咱们运行 bin/dsql 并发送一个 select * from "ingestion-tutorial"; 语句,查询已经被写入的数据:

$ bin/dsql
Welcome to dsql, the command-line client for Druid SQL.
Type "\h" for help.
dsql> select * from "ingestion-tutorial";

┌──────────────────────────┬───────┬──────┬───────┬─────────┬─────────┬─────────┬──────────┬─────────┬─────────┐
│ __time                   │ bytes │ cost │ count │ dstIP   │ dstPort │ packets │ protocol │ srcIP   │ srcPort │
├──────────────────────────┼───────┼──────┼───────┼─────────┼─────────┼─────────┼──────────┼─────────┼─────────┤
│ 2018-01-01T01:01:00.000Z │  6000 │  4.9 │     3 │ 2.2.2.2 │    3000 │      60 │ 6        │ 1.1.1.1 │    2000 │
│ 2018-01-01T01:02:00.000Z │  9000 │ 18.1 │     2 │ 2.2.2.2 │    7000 │      90 │ 6        │ 1.1.1.1 │    5000 │
│ 2018-01-01T01:03:00.000Z │  6000 │  4.3 │     1 │ 2.2.2.2 │    7000 │      60 │ 6        │ 1.1.1.1 │    5000 │
│ 2018-01-01T02:33:00.000Z │ 30000 │ 56.9 │     2 │ 8.8.8.8 │    5000 │     300 │ 17       │ 7.7.7.7 │    4000 │
│ 2018-01-01T02:35:00.000Z │ 30000 │ 46.3 │     1 │ 8.8.8.8 │    5000 │     300 │ 17       │ 7.7.7.7 │    4000 │
└──────────────────────────┴───────┴──────┴───────┴─────────┴─────────┴─────────┴──────────┴─────────┴─────────┘
Retrieved 5 rows in 0.12s.

dsql>

若是以为阅读后对你有帮助,但愿分享、点赞、在看三连哦。

关注 【码哥字节】解锁更多硬核。

推荐阅读

如下几篇文章阅读量与读者反馈都很好,推荐你们阅读:

公众号后台回复 ”加群“,加入读者技术群,里面有阿里、腾讯的小伙伴一块儿探讨技术。

MageByte