OpenTSDB 是可扩展的分布式时序数据库,底层依赖 HBase 并充分发挥了HBase的分布式列存储特性,支持数百万每秒的读写。git
面对大规模快速增加的物联网传感器采集、交易记录等数据,时间序列数据累计速度很是快,时序数据库经过提升效率来处理这种大规模数据,并带来性能的提高,包括:更高的容纳率(Ingest Rates)、更快的大规模查询以及更好的数据压缩。github
读者能够参考 OpenTSDB 官方文档 (http://opentsdb.net) 或 Docker (https://hub.docker.com/r/pete... 来下载安装 OpenTSDB 服务器,本文使用 OpenTSDB 2.4.0 版本。sql
该场景须要将 EMQ X 指定主题下且知足条件的消息存储到 OpenTSDB 数据库。为了便于后续分析检索,消息内容须要进行拆分存储。docker
该场景下客户端上报数据以下:shell
Payload:数据库
{ "metric": "cpu", "tags": { "host": "serverA" }, "value":12 }
启动 OpenTSDB Server 并开放 4242 端口。json
$ docker pull petergrace/opentsdb-docker $ docker run -d --name opentsdb -p 4242:4242 petergrace/opentsdb-docker
打开 EMQ X Dashboard,进入左侧菜单的 资源 页面,点击 新建 按钮,选择 OpenTSDB 资源类型并完成相关配置进行资源建立。性能优化
进入左侧菜单的 规则 页面,点击 新建 按钮,进行规则建立。这里选择触发事件 message.publish,即在 EMQ X 收到 PUBLISH 消息时触发该规则进行数据处理。服务器
选定触发事件后,咱们可在界面上看到可选字段及示例 SQL:socket
规则引擎使用 SQL 语句过滤和处理数据。例如前文提到的场景中咱们须要将 payload
中的字段提取出来使用,则能够经过 payload.<fieldName>
实现。同时咱们仅仅指望处理 stat/cpu
主题,那么能够在 WHERE 子句中使用主题通配符 =~
对 topic
进行筛选:topic =~ 'stat/cpu'
, 最终咱们获得 SQL 以下:
SELECT payload.metric as metric, payload.tags as tags, payload.value as value FROM "message.publish" WHERE topic =~ 'stat/cpu'
借助 SQL 测试功能,咱们能够快速确认刚刚填写的 SQL 语句可否达成咱们的目的。首先填写用于测试的 payload 等数据以下:
而后点击 测试 按钮,咱们获得如下数据输出:
{ "metric": "cpu", "tags": { "host": "serverA" }, "value": 12 }
测试输出与预期相符,咱们能够进行后续步骤。
SQL 条件输入输出无误后,咱们继续添加相应动做,配置写入 SQL 语句,将筛选结果存储到 OpenTSDB。
点击响应动做中的 添加 按钮,选择 保存数据到 OpenTSDB 动做,选取刚刚建立的 OpenTSDB
资源并完成剩余参数设置。OpenTSDB 动做用到的几个参数分别为:
这里咱们所有使用默认配置,点击 新建 按钮完成规则建立。
咱们成功建立了一条规则,包含一个处理动做,动做指望效果以下:
stat/cpu
主题上报消息时,该消息将命中 SQL,规则列表中 已命中 数字增长 1;切换到 工具 --> Websocket 页面,使用任意信息客户端链接到 EMQ X,链接成功后在 消息 卡片中发送以下消息:
Payload:
{ "metric": "cpu", "tags": { "host": "serverA" }, "value":12 }
点击 发送 按钮,发送成功后能够看到当前规则已命中次数已经变为了 1。
而后经过 Postman 向 OpenTSDB 发送查询请求,当咱们获得以下应答时说明新的 data point 已经添加成功:
至此,咱们经过规则引擎实现了使用规则引擎存储消息到 OpenTSDB 数据库的业务开发。
更多信息请访问咱们的官网 emqx.io,或关注咱们的开源项目 github.com/emqx/emqx ,详细文档请访问 官方文档。