EMQ X 规则引擎系列(十)—— 存储消息到 OpenTSDB 数据库

OpenTSDB 介绍

OpenTSDB 是可扩展的分布式时序数据库,底层依赖 HBase 并充分发挥了HBase的分布式列存储特性,支持数百万每秒的读写。git

面对大规模快速增加的物联网传感器采集、交易记录等数据,时间序列数据累计速度很是快,时序数据库经过提升效率来处理这种大规模数据,并带来性能的提高,包括:更高的容纳率(Ingest Rates)、更快的大规模查询以及更好的数据压缩。github

安装与验证 OpenTSDB 服务器

读者能够参考 OpenTSDB 官方文档 (http://opentsdb.net) 或 Docker (https://hub.docker.com/r/petergrace/opentsdb-docker/) 来下载安装 OpenTSDB 服务器,本文使用 OpenTSDB 2.4.0 版本。sql

场景介绍

该场景须要将 EMQ X 指定主题下且知足条件的消息存储到 OpenTSDB 数据库。为了便于后续分析检索,消息内容须要进行拆分存储。docker

该场景下客户端上报数据以下:shell

  • Topic:stat/cpu数据库

  • Payload:json

    {
      "metric": "cpu",
      "tags": {
        "host": "serverA"
      },
      "value":12
    }

准备工做

启动 OpenTSDB Server

启动 OpenTSDB Server 并开放 4242 端口。性能优化

$ docker pull petergrace/opentsdb-docker

$ docker run -d --name opentsdb -p 4242:4242 petergrace/opentsdb-docker

配置说明

建立资源

打开 EMQ X Dashboard,进入左侧菜单的 资源 页面,点击 新建 按钮,选择 OpenTSDB 资源类型并完成相关配置进行资源建立。服务器

建立规则

进入左侧菜单的 规则 页面,点击 新建 按钮,进行规则建立。这里选择触发事件 message.publish,即在 EMQ X 收到 PUBLISH 消息时触发该规则进行数据处理。socket

选定触发事件后,咱们可在界面上看到可选字段及示例 SQL:

筛选所需字段

规则引擎使用 SQL 语句过滤和处理数据。例如前文提到的场景中咱们须要将 payload 中的字段提取出来使用,则能够经过 payload.<fieldName> 实现。同时咱们仅仅指望处理 stat/cpu 主题,那么能够在 WHERE 子句中使用主题通配符 =~topic 进行筛选:topic =~ 'stat/cpu', 最终咱们获得 SQL 以下:

SELECT
  payload.metric as metric, payload.tags as tags, payload.value as value
FROM
  "message.publish"
WHERE
	topic =~ 'stat/cpu'

SQL 测试

借助 SQL 测试功能,咱们能够快速确认刚刚填写的 SQL 语句可否达成咱们的目的。首先填写用于测试的 payload 等数据以下:

而后点击 测试 按钮,咱们获得如下数据输出:

{
  "metric": "cpu",
  "tags": {
    "host": "serverA"
  },
  "value": 12
}

测试输出与预期相符,咱们能够进行后续步骤。

添加响应动做,存储消息到 OpenTSDB

SQL 条件输入输出无误后,咱们继续添加相应动做,配置写入 SQL 语句,将筛选结果存储到 OpenTSDB。

点击响应动做中的 添加 按钮,选择 保存数据到 OpenTSDB 动做,选取刚刚建立的 OpenTSDB 资源并完成剩余参数设置。OpenTSDB 动做用到的几个参数分别为:

  1. 详细信息。是否须要 OpenTSDB Server 返回存储失败的 Data points 及失败缘由,默认为 false。
  2. 摘要信息。是否须要 OpenTSDB Server 返回 data point 存储成功与失败的数量,默认为 true。
  3. 最大批处理数量。消息请求频繁时容许驱动从队列中一次读取多少个 Data Points 合并为一个 HTTP 请求,为性能优化参数,默认为 20。
  4. 是否同步调用。配置 OpenTSDB Server 是否等待全部数据都被写入后才返回结果,默认为 false。
  5. 同步调用超时时间。OpenTSDB Server 等待数据写入的最大时间,默认为 0,即永不超时。

这里咱们所有使用默认配置,点击 新建 按钮完成规则建立。

测试

预期结果

咱们成功建立了一条规则,包含一个处理动做,动做指望效果以下:

  1. 客户端向 stat/cpu 主题上报消息时,该消息将命中 SQL,规则列表中 已命中 数字增长 1;
  2. OpenTSDB Server 中将增长一条数据,数据内容与消息内容一致。

使用 Dashboard 中的 Websocket 工具测试

切换到 工具 --> Websocket 页面,使用任意信息客户端链接到 EMQ X,链接成功后在 消息 卡片中发送以下消息:

  • Topic:stat/cpu

  • Payload:

    {
      "metric": "cpu",
      "tags": {
        "host": "serverA"
      },
      "value":12
    }

点击 发送 按钮,发送成功后能够看到当前规则已命中次数已经变为了 1。

而后经过 Postman 向 OpenTSDB 发送查询请求,当咱们获得以下应答时说明新的 data point 已经添加成功:

至此,咱们经过规则引擎实现了使用规则引擎存储消息到 OpenTSDB 数据库的业务开发。


更多信息请访问咱们的官网 emqx.io,或关注咱们的开源项目 github.com/emqx/emqx ,详细文档请访问 官方文档

相关文章
相关标签/搜索