Druid:经过 Kafka 加载流数据

开始

本教程演示了如何使用 Druid 的 Kafka indexing 服务从 Kafka 流中加载数据至 Druid。html

在本教程中,咱们假设你已经按照 quickstart 文档中使用micro-quickstart单机配置所描述的下载了 Druid,并在本机运行了 Druid。你不须要加载任何数据。shell

下载并启动 Kafka

Apache Kafka是一种高吞吐量消息总线,可与 Druid 很好地配合使用。在本教程中,咱们将使用 Kafka 2.1.0。在终端运行下面命令下载 Kafka:apache

curl -O https://archive.apache.org/dist/kafka/2.1.0/kafka_2.12-2.1.0.tgz
tar -xzf kafka_2.12-2.1.0.tgz
cd kafka_2.12-2.1.0

在终端运行下面命令启动 kafka broker:json

./bin/kafka-server-start.sh config/server.properties

运行下面命令建立名为wikipedia的 topic,咱们将向其发送数据:bootstrap

./bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic wikipedia

向 Kafka 加载数据

wikipedia topic 启动一个 kafka producer,并发送数据。服务器

在 Druid 目录下,运行下面命令:并发

cd quickstart/tutorial
gunzip -c wikiticker-2015-09-12-sampled.json.gz > wikiticker-2015-09-12-sampled.json

在 Kafka 目录下运行下面命令,将{PATH_TO_DRUID}替换成你的 Kafka 路径:app

export KAFKA_OPTS="-Dfile.encoding=UTF-8"
./bin/kafka-console-producer.sh --broker-list localhost:9092 --topic wikipedia < {PATH_TO_DRUID}/quickstart/tutorial/wikiticker-2015-09-12-sampled.json

上面命令会向 kakfa 的wikiapedia topic 发送 events。以后,咱们将使用 Druid 的 Kafka indexing 服务从 Kafka topic 中提取数据。curl

经过 data loader 加载数据

导航至 localhost:8080 并单击控制台顶部的Load dataide

Druid:经过 Kafka 加载流数据

选择 Apache Kafka 并单击 Connect data.

Druid:经过 Kafka 加载流数据

输入 bootstrap:localhost:9092和 topic:wikipedia

单击Preview并肯定你看到的数据正确。

找到数据后,能够单击"Next: Parse data"进入下一步。

Druid:经过 Kafka 加载流数据

data loader 会尝试自动选择正确的数据解析器。在本示例中,将选择json解析器。你能够尝试选择其余解析器,看看 Druid 是如何解析数据的。

选择json解析器,点击Next: Parse time进入下一步,来肯定 timestamp 列。

Druid:经过 Kafka 加载流数据

Druid 须要一个主 timestamp 列(内部将存储在__time 列)。若是你的数据中没有 timestamp 列,选择Constant value。在咱们的示例中,将选择time列,由于它是数据之中惟一能够做为主时间列的候选者。

单击Next: ...两次以跳过TransformFilter步骤。

您无需在这些步骤中输入任何内容,由于应用提取数据的时间变换和过滤器不在本教程范围内。

Druid:经过 Kafka 加载流数据

Configure schema步骤中,你能够配置哪些维度和指标能够摄入 Druid。这是数据被摄入 Druid 后呈现的样子。因为咱们的数据集比较小,点击Rollup开关关闭 rollup 功能。

对 schema 配置满意后,单击Next进入Partition步骤,以调整数据至 segment 的分区。

Druid:经过 Kafka 加载流数据

在这里,您能够调整如何在 Druid 中将数据拆分为多个段。因为这是一个很小的数据集,所以在此步骤中无需进行任何调整。

单击Tune步骤后,进入发布步骤。

Druid:经过 Kafka 加载流数据

Publish步骤中,咱们能够指定 Druid 中的数据源名称。咱们将此数据源命名为wikipedia。最后,单击Next以查看 spec。

Druid:经过 Kafka 加载流数据

这是你构建的 spec。尝试随意返回并在以前的步骤中进行更改,以查看变更将如何更新 spec。一样,你也能够直接编辑 spec,并在前面的步骤中看到它。

对 spec 满意后,点击Submit建立摄取任务。

Druid:经过 Kafka 加载流数据

你将进入任务视图,重点关注新建立的任务。任务视图设置为自动刷新,等待任务成功。

当一项任务成功完成时,意味着它创建了一个或多个 segment,这些 segment 将由数据服务器接收。

Datasources从标题导航到视图。

Druid:经过 Kafka 加载流数据

等待直到你的数据源(wikipedia)出现。加载 segment 时可能须要几秒钟。

一旦看到绿色(彻底可用)圆圈,就能够查询数据源。此时,你能够转到Query视图以对数据源运行 SQL 查询。

Druid:经过 Kafka 加载流数据

运行SELECT * FROM "wikipedia"查询以查看结果。

Druid:经过 Kafka 加载流数据

经过控制台提交 supervisor

在控制台中,单击Submit supervisor打开提交 supervisor 窗口。

Druid:经过 Kafka 加载流数据

粘贴如下 spec 并点击提交:

{
  "type": "kafka",
  "spec" : {
    "dataSchema": {
      "dataSource": "wikipedia",
      "timestampSpec": {
        "column": "time",
        "format": "auto"
      },
      "dimensionsSpec": {
        "dimensions": [
          "channel",
          "cityName",
          "comment",
          "countryIsoCode",
          "countryName",
          "isAnonymous",
          "isMinor",
          "isNew",
          "isRobot",
          "isUnpatrolled",
          "metroCode",
          "namespace",
          "page",
          "regionIsoCode",
          "regionName",
          "user",
          { "name": "added", "type": "long" },
          { "name": "deleted", "type": "long" },
          { "name": "delta", "type": "long" }
        ]
      },
      "metricsSpec" : [],
      "granularitySpec": {
        "type": "uniform",
        "segmentGranularity": "DAY",
        "queryGranularity": "NONE",
        "rollup": false
      }
    },
    "tuningConfig": {
      "type": "kafka",
      "reportParseExceptions": false
    },
    "ioConfig": {
      "topic": "wikipedia",
      "inputFormat": {
        "type": "json"
      },
      "replicas": 2,
      "taskDuration": "PT10M",
      "completionTimeout": "PT20M",
      "consumerProperties": {
        "bootstrap.servers": "localhost:9092"
      }
    }
  }
}

这将启动 supervisor,并分化出 task 监听数据流入。

直接提交 supervisor

为了直接启动服务,咱们须要在 Druid 包根目录下运行下面命令提交一个 supervisor spec 给 Druid overlord:

curl -XPOST -H'Content-Type: application/json' -d @quickstart/tutorial/wikipedia-kafka-supervisor.json http://localhost:8081/druid/indexer/v1/supervisor

若是 supervisor 成功建立,你将获得一个包含 supervisor ID 的响应。在咱们的示例中,将返回{"id":"wikipedia"}

你能够在控制台中查看当前 supervisor 和 tasks: http://localhost:8888/unified-console.html#tasks.

查询数据

当数据发送给 Kafka stream 后,马上就能够查询数据。

本文翻译自 Druid 官方文档

请关注咱们。一块儿学习 Druid 知识。

码哥字节

相关文章
相关标签/搜索