物联网数据采集涉及到大量设备接入、海量的时序数据传输,EMQ X MQTT 服务器 与 TDengine 大数据平台的组合技术栈彻底可以胜任场景中的海量时间序列监测数据的传输、存储和计算。javascript
数据入库后,每每须要其余方式如数据可视化系统将数据按照规则统计、展示出来,实现数据的监控、指标统计等业务需求,以便充分发挥数据的价值,TDengine 搭配开源软件 Grafana 能够快速搭建物联网数据可视化平台。java
上述整套方案无需代码开发,涉及的产品均能提供开源软件、企业服务、云端 SaaS 服务不一样层次的交付模式,可以根据项目需求实现免费版或企业版私有化落地以及云端部署。node
EMQ X 是基于高并发的 Erlang/OTP 语言平台开发,支持百万级链接和分布式集群架构,发布订阅模式的开源 MQTT 消息服务器。EMQ X 内置了大量开箱即用的功能,其 开源版 EMQ X Broker 及 企业版 EMQ X Enterprise 均支持经过规则引擎将设备消息存储到 TDengine。sql
TDengine 是涛思数据专为物联网、车联网、工业互联网、IT 运维等设计和优化的大数据平台。除核心的快 10 倍以上的时序数据库功能外,还提供缓存、数据订阅、流式计算等功能,最大程度减小研发和运维的复杂度,且核心代码,包括集群功能所有开源。docker
TDengine 提供社区版、企业版和云服务版,安装/使用教程详见 TDengine 使用文档。数据库
Grafana 是一个跨平台、开源的度量分析和可视化工具,能够查询处理各种数据源中的数据,进行可视化的展现。它能够快速灵活建立的客户端图表,面板插件有许多不一样方式的可视化指标和日志,官方库中具备丰富的仪表盘插件,好比热图、折线图、图表等多种展现方式;支持 Graphite,TDengine、InfluxDB,OpenTSDB,Prometheus,Elasticsearch,CloudWatch和 KairosDB 等数据源,支持数据项独立/混合查询展现;能够建立自定义告警规则并通知到其余消息处理服务或组件中。macos
本文模拟物联网环境数据采集场景,假设现有必定数据的环境数据采集点,全部采集点数据均经过 MQTT 协议 传输至采集平台(MQTT Publish),主题设计以下:npm
sensor/data
传感器发送的数据格式为 JSON,数据包括传感器采集的温度、湿度、噪声音量、PM十、PM2.五、二氧化硫、二氧化氮、一氧化碳、传感器 ID、区域、采集时间等数据。json
{ "temperature": 30, "humidity" : 20, "volume": 44.5, "PM10": 23, "pm25": 61, "SO2": 14, "NO2": 4, "CO": 5, "id": "10-c6-1f-1a-1f-47", "area": 1, "ts": 1596157444170 }
如今须要实时存储以便在后续任意时间查看数据,提出如下的需求:浏览器
本文所用各个组件均有 Docker 镜像,除 EMQ X 须要修改少数配置为了便于操做使用下载安装外,TDengine 与 Grafana 均使用 Docker 搭建。
安装包资源与使用教程参照各自官网:
若是您是 EMQ X 新手用户,推荐经过 EMQ X 文档 快速上手
访问 EMQ X 下载 页面下载适合您操做系统的安装包,本文截稿时 EMQ X 开源版最新版本为 v4.1.2,下载 zip 包的启动步骤以下 :
## 解压下载好的安装包 unzip emqx-macosx-v4.1.1.zip cd emqx ## 以 console 模式启动 EMQ X 方便调试 ./bin/emqx console
启动成功后浏览器访问 http://127.0.0.1:18083 访问 EMQ X 管理控制台 Dashboard,使用 admin
public
默认用户名密码完成初次登陆。
为了方便测试使用经过 Docker 进行安装(需映射网络端口),也可使用安装包的方式进行安装:
## 拉取并启动容器 docker run -d --name tdengine -p 6030-6041:6030-6041 tdengine/tdengine:latest ## 启动后检查容器运行状态 docker ps -a
使用如下命令经过 Docker 安装并启动 Grafana:
docker run -d --name=grafana -p 3000:3000 grafana/grafana
启动成功后浏览器访问 http://127.0.0.1:3000 访问 Grafana 可视化面板,使用 admin
admin
默认用户名密码完成初次登陆,登陆后按照提示修改密码使用新密码登陆进入主界面:
进入TDengine Docker 容器:
docker exec -it tdengine bash
建立 test
数据库:
taos create database test;
建立 sensor_data 表,关于 TDengine 数据结构以及 SQL 命令参见 TAOS SQL:
use test; CREATE TABLE sensor_data ( ts timestamp, temperature float, humidity float, volume float, PM10 float, pm25 float, SO2 float, NO2 float, CO float, sensor_id NCHAR(255), area TINYINT, coll_time timestamp );
打开 EMQ X Dashboared,进入 规则引擎 -> 规则 页面,点击 建立 按钮进入建立页面。
规则 SQL 用于 EMQ X 消息以及事件筛选,如下 SQL 表示从 sensor/data
主题筛选出 payload 数据:
SELECT payload FROM "sensor/data"
使用 SQL 测试功能 ,输入测试数据进行筛选结果测试,测试有结果且输出内容以下,标明 SQL 编写正确:
{ "payload": "{\"temperature\":30,\"humidity\":20,\"volume\":44.5,\"PM10\":23,\"pm2.5\":61,\"SO2\":14,\"NO2\":4,\"CO\":5,\"id\":\"10-c6-1f-1a-1f-47\",\"area\":1,\"ts\":1596157444170}" }
为支持各类不一样类型平台的开发,TDengine 提供符合 REST 设计标准的 API。经过 RESTful Connector 提供了最简单的链接方式,即便用 HTTP 请求携带认证信息与要执行的 SQL 操做 TDengine。
使用 EMQ X 开源版中的 发送到 Web 服务 便可经过 RESTful Connector 写入数据到 TDengine。即将到来的 EMQ X 企业版 4.1.1 版本将提供原生更高性能的写入 Connector。
发送到 Web 服务须要两个数据,一个是关联资源,另外一个是消息内容模板。
INSERT INTO test.sensor_data VALUES( now, ${payload.temperature}, ${payload.humidity}, ${payload.volume}, ${payload.PM10}, ${payload.pm25}, ${payload.SO2}, ${payload.NO2}, ${payload.CO}, '${payload.id}', ${payload.area}, ${payload.ts} )
点击响应动做下的 添加 按钮,在弹出框内选择 发送数据到 Web 服务,点击 新建资源 新建一个 WebHook 资源。
资源类型选择 Webhook,请求 URL 填写 http://127.0.0.1:6041/rest/sql,请求方法选择 POST, 还需添加 Authorization 请求头做为认证信息 。
Authorization 的值为 Basic + TDengine 的 {username}:{password}
通过 Base64 编码以后的字符串, 例如 root:taosdata
编码后为 cm9vdDp0YW9zZGF0YQ==
,实际填入的值为:Basic cm9vdDp0YW9zZGF0YQ==
在响应动做建立页面选择新建的资源,并填入消息模板内容便可。
如下脚本模拟了 10000 个设备在过去 24 小时内、每隔 5 秒钟上报一条模拟数据并发送到 EMQ X 的场景。
读者安装 Node.js ,按需修改配置参数后能够经过如下命令启动:
npm install mqtt mockjs --save --registry=https://registry.npm.taobao.org node mock.js
附:模拟生成数据并发送到 EMQ X 代码,请根据集群性能调整相关参数
// mock.js const mqtt = require('mqtt') const Mock = require('mockjs') const EMQX_SERVER = 'mqtt://localhost:1883' const CLIENT_NUM = 10000 const STEP = 5000 // 模拟采集时间间隔 ms const AWAIT = 5000 // 每次发送完后休眠时间,防止消息速率过快 ms const CLIENT_POOL = [] startMock() function sleep(timer = 100) { return new Promise(resolve => { setTimeout(resolve, timer) }) } async function startMock() { const now = Date.now() for (let i = 0; i < CLIENT_NUM; i++) { const client = await createClient(`mock_client_${i}`) CLIENT_POOL.push(client) } // last 24h every 5s const last = 24 * 3600 * 1000 for (let ts = now - last; ts <= now; ts += STEP) { for (const client of CLIENT_POOL) { const mockData = generateMockData() const data = { ...mockData, id: client.clientId, area: 0, ts, } client.publish('sensor/data', JSON.stringify(data)) } const dateStr = new Date(ts).toLocaleTimeString() console.log(`${dateStr} send success.`) await sleep(AWAIT) } console.log(`Done, use ${(Date.now() - now) / 1000}s`) } /** * Init a virtual mqtt client * @param {string} clientId ClientID */ function createClient(clientId) { return new Promise((resolve, reject) => { const client = mqtt.connect(EMQX_SERVER, { clientId, }) client.on('connect', () => { console.log(`client ${clientId} connected`) resolve(client) }) client.on('reconnect', () => { console.log('reconnect') }) client.on('error', (e) => { console.error(e) reject(e) }) }) } /** * Generate mock data */ function generateMockData() { return { "temperature": parseFloat(Mock.Random.float(22, 100).toFixed(2)), "humidity": parseFloat(Mock.Random.float(12, 86).toFixed(2)), "volume": parseFloat(Mock.Random.float(20, 200).toFixed(2)), "PM10": parseFloat(Mock.Random.float(0, 300).toFixed(2)), "pm25": parseFloat(Mock.Random.float(0, 300).toFixed(2)), "SO2": parseFloat(Mock.Random.float(0, 50).toFixed(2)), "NO2": parseFloat(Mock.Random.float(0, 50).toFixed(2)), "CO": parseFloat(Mock.Random.float(0, 50).toFixed(2)), "area": Mock.Random.integer(0, 20), "ts": 1596157444170, } }
组件安装完成,模拟数据写入成功后,按照 Grafana 可视化界面的操做指引,完成业务所需数据可视化配置。
添加数据源,即显示的数据源信息。选取 TDengine 类型数据源,输入链接参数进行配置,默认状况下,关键配置信息以下:
添加好数据源后,添加须要显示的数据仪表盘信息。仪表盘为多个可视化面板的集合,点击 New Dashboard 后,选择 + Query 经过查询来添加数据面板。
建立面板须要四个步骤,分别是 Queries(查询) 、 Visualization(可视化) 、 General(图表配置) 、 Alert(告警) ,建立时间
使用 Grafana 的可视化查询构建工具,查询出全部设备的平均值。
如下 SQL 按照指定时间段($form $to)、指定时间间隔($interval),查询出数据中关键指标的平均值:
select avg(temperature), avg(humidity), avg(volume), avg(PM10), avg(pm25), avg(SO2), avg(NO2), avg(CO) from test.sensor_data where coll_time >= $from and coll_time < $to interval($interval)
Visualization 默认不作更改, General 里面修改面板名称为 历史平均值,若是须要对业务进行监控告警,能够在 Alert 里编排告警规则,此处仅作可视化展现,不使用此功能。
完成建立后,点击左上角返回按钮,该 Dashboard 里成功添加一个数据面板。点击顶部导航栏 保存 图标,输入 Dashboard 名称完成 Dashboard 的建立。
继续点击 Dashboard 的 Add panel 按钮,添加最大值、最小值图表。操做步骤同添加平均值,仅对查询中 SELECT 统计方法字段作出调整,调整为 AVG 函数为 MAX 与 MIN:
select max(temperature), max(humidity), max(volume), max(PM10), max(pm25), max(SO2), max(NO2), max(CO), min(temperature), min(humidity), min(volume), min(PM10), min(pm25), min(SO2), min(NO2), min(CO) from test.sensor_data where coll_time >= $from and coll_time < $to interval($interval)
保存仪表盘,拖拽调整每一个数据面板大小、位置,最终获得一个视觉效果较好的数据仪表盘。仪表盘右上角能够选择时间区间、自动刷新时间,此时设备持续发送数据采集数据,仪表盘数据值会有所变更,实现了比较好的可视化效果。
至此咱们借助 EMQ X + TDengine 完成了物联网数据传输、存储、展示整个流程的系统搭建,读者能够了解到 EMQ X 丰富的拓展能力与 TDengine 完备的大数据平台特性在物联网数据采集中的应用。深刻学习掌握 Grafana 的其余功能后,用户能够定制出更完善的数据可视化乃至监控告警系统。
版权声明: 本文为 EMQ 原创,转载请注明出处。