InfluxDB 是一个用于存储和分析时间序列数据的开源数据库,内置 HTTP API,类 SQL 语句的支持和无结构的特性对使用者而言都很是友好。它强大的数据吞吐能力以及稳定的性能表现使其很是适合 IoT 领域。html
经过 EMQ X 消息引擎,咱们能够自定义 Template 文件,而后将 Json 格式的 MQTT 消息转换为 Measurement 写入 InfluxDB:git
该场景须要将 EMQ X 指定主题下且知足条件的消息存储到 InfluxDB 时序数据库。为了便于后续分析检索,消息内容须要进行拆分存储。github
该场景下客户端上报数据以下:sql
Topic:data/sensordocker
Payload:shell
{
"location": "bedroom",
"data": {
"temperature": 25,
"humidity": 46.4,
"pm2_5": 0.5
}
}
复制代码
建立 db
数据库并开放 8089 UDP 端口。数据库
$ docker pull influxdb
$ git clone -b v1.0.0 https://github.com/palkan/influx_udp.git
$ cd influx_udp
$ docker run --name=influxdb --rm -d -p 8086:8086 -p 8089:8089/udp -v ${PWD}/files/influxdb.conf:/etc/influxdb/influxdb.conf:ro -e INFLUXDB_DB=db influxdb:latest
复制代码
打开 EMQ X Dashboard,进入左侧菜单的 资源 页面,点击 新建 按钮,选择 InfluxDB 资源类型并完成相关配置进行资源建立。json
进入左侧菜单的 规则 页面,点击 新建 按钮,进行规则建立。触发事件 选择 message.publish,即在 EMQ X 收到 PUBLISH 消息时触发该规则进行数据处理。bash
选定触发事件后,咱们可在界面上看到可选字段及示例 SQL:socket
规则引擎使用 SQL 语句过滤和处理数据。例如前文提到的场景中咱们须要将 payload
中的字段提取出来使用,则能够经过 payload.<fieldName>
实现。同时咱们仅仅指望处理 data/sensor
主题,那么能够在 WHERE 子句中使用主题通配符 =~
对 topic
进行筛选:topic =~ 'data/sensor'
, 最终咱们获得 SQL 以下:
SELECT
payload.location as location,
payload.data.temperature as temperature,
payload.data.humidity as humidity,
payload.data.pm2_5 as pm2_5
FROM
"message.publish"
WHERE
topic =~ 'data/sensor'
复制代码
借助 SQL 测试功能,咱们能够快速确认刚刚填写的 SQL 语句是否能达到咱们的目的。首先填写用于测试的 payload 等数据以下:
而后点击 测试 按钮,获得如下输出结果,与预期相符。
{
"humidity": 46.4,
"location": "bedroom",
"pm2_5": 0.5,
"temperature": 25
}
复制代码
SQL 条件输入输出无误后,咱们继续添加响应动做,配置写入 SQL 语句,将筛选结果存储到 InfluxDB。
点击响应动做中的 添加 按钮,选择动做 保存数据到 InfluxDB,选取刚刚建立的 InfluxDB
资源,再按照实际需求将 ${fieldName}
填写到 Field Keys
, Tag Keys
和 Timestamp Key
中,Measurement
表示将数据写入 InfluxDB
时使用的 Measurement
,最后点击 新建 按钮完成规则建立。
咱们成功建立了一条规则,包含一个处理动做,动做指望效果以下:
data/sensor
主题上报消息时,该消息将命中规则,规则列表中 已命中 数字将会增长 1;db
数据库中将会增长一条数据,数据内容与处理后的消息内容一致。切换到 工具 --> Websocket 页面,使用任意 Client ID 链接到 EMQ X,链接成功后在 消息 卡片中发送以下消息:
Topic:data/sensor
Payload:
{
"location": "bedroom",
"data": {
"temperature": 25,
"humidity": 46.4,
"pm2_5": 0.5
}
}
复制代码
点击 发送 按钮,发送成功后能够看到当前规则已命中次数已经变为 1。
而后检查 InfluxDB,新的 data point 是否添加成功:
$ docker exec -it influxdb influx
> use db
Using database db
> select * from "sensor_data"
name: sensor_data
time humidity location pm2_5 temperature
---- -------- -------- ----- -----------
1561535778444457348 46.4 bedroom 0.5 25
复制代码
至此,咱们经过规则引擎实现了存储消息到 InfluxDB 数据库的业务开发。
在阅读该教程以前,假定你已经了解 MQTT、EMQ X 的简单知识。
更多信息请访问咱们的官网 emqx.io,或关注咱们的开源项目 github.com/emqx/emqx ,详细文档请访问 官方文档。