本文以一个常见的物联网使用场景为案例,介绍了如何利用边缘计算来实现对业务的快速、低成本和有效地处理。linux
在各种物联网项目中,好比智能楼宇项目,须要将楼宇的数据(好比电梯、燃气、水电等)进行采集和分析。一种解决方案是将全部的设备直接接入在云端的物联网平台,相似于像 Azure IoT Hub 或者 AWS IoT Hub。这种解决方案的问题在于,git
为了解决以上的问题,业界提出了边缘计算的方案,边缘计算的核心就在于把数据进行就近处理,避免没必要要的时延、成本和安全问题。github
假设现有一组设备,组中的每一个设备有一个 id,经过 MQTT 协议往 MQTT 消息服务器上相应的主题发送数据。主题的设计以下,其中 {device_id} 为设备的 id。sql
devices/{device_id}/messages
每一个设备发送的数据格式为 JSON,发送的经过该传感器采集的温度与湿度数据。docker
{ "temperature": 30, "humidity" : 20 }
如今须要实时分析数据,并提出如下的需求:对每一个设备的温度数据按照每 10 秒钟计算平均值(t_av
),而且记下 10 秒钟内的最大值 (t_max
)、最小值(t_min
) 和数据条数(t_count
),计算完毕后将这 4 个结果进行保存,如下为样例结果数据:shell
[ { "device_id" : "1", "t_av" : 25, "t_max" : 45, "t_min" : 5, "t_count" : 2 }, { "device_id" : "2", "t_av" : 25, "t_max" : 45, "t_min" : 5, "t_count" : 2 }, ... ]
以下图所示,采用边缘分析/流式数据处理的方式,在边缘端咱们采用了 EMQ X 的方案,最后将计算结果输出到 Azure 的 IoT Hub 中。数据库
写本文的时候,EMQ X Edge 的最新版本是4.0,用户能够经过 Docker 来安装和启动 EMQ X Edgejson
# docker pull emqx/emqx-edge # docker run -d --name emqx -p 1883:1883 emqx/emqx-edge:latest # docker ps CONTAINER ID IMAGE COMMAND CREATED STATUS PORTS NAMES a348e3ac150c emqx/emqx-edge:latest "/usr/bin/docker-entr" 3 seconds ago Up 2 seconds 4369/tcp, 5369/tcp, 6369/tcp, 8080/tcp, 8083-8084/tcp, 8883/tcp, 11883/tcp, 0.0.0.0:1883->1883/tcp, 18083/tcp emqx
用户能够经过 telnet
命令来判断是否启动成功,以下所示。api
# telnet localhost 1883 Trying 127.0.0.1... Connected to localhost. Escape character is '^]'.
安装、启动 Kuiper安全
点击这里下载最新版 Kuiper,并解压。在写本文的时候,Kuiper 最新版本为 0.0.3。
# unzip kuiper-linux-amd64-0.0.3.zip # cd kuiper # bin/server Serving Kuiper server on port 20498
若是没法启动,请查看日志文件 log/stream.log
。
Kuiper 提供了一个命令用于管理流和规则,用户能够经过在命令行窗口中敲入 bin/cli
查看有哪些子命令及其帮助。cli
命令缺省链接的是本地的 Kuiper 服务器,cli
命令也能够链接到别的 Kuiper 服务器,用户能够在 etc/client.yaml
配置文件中修改链接的 Kuiper 服务器。用户若是想了解更多关于命令行的信息,能够参考这里。
建立流定义:建立流的目的是为了定义发送到该流上的数据格式,相似于在关系数据库中定义表的结构。 Kuiper 中全部支持的数据类型,能够参考这里。
# cd kuiper # bin/cli create stream demo '(temperature float, humidity bigint) WITH (FORMAT="JSON", DATASOURCE="devices/+/messages")'
上述语句在 Kuiper 中建立了一个名为 demo 的流定义,包含了两个字段,分别为 temperature 和 humidity,数据源为订阅 MQTT 的主题 devices/+/messages
,这里请注意采用了通配符 +
,用于订阅不一样设备的消息。该数据源所对应的 MQTT 服务器地址在配置文件 etc/mqtt_source.yaml
中,能够根据所在的服务器地址进行配置。以下图所示,配置 servers
项目。
#Global MQTT configurations default: qos: 1 sharedsubscription: true servers: [tcp://127.0.0.1:1883]
用户能够在命令行中敲入 describe
子命令来查看刚建立好的流定义。
# bin/cli describe stream demo Connecting to 127.0.0.1:20498 Fields -------------------------------------------------------------------------------- temperature float humidity bigint FORMAT: JSON DATASOURCE: devices/+/messages
Kuiper 采用 SQL 实现业务逻辑,每10秒钟统计温度的平均值、最大值、最小值和次数,并根据设备 ID 进行分组,实现的 SQL 以下所示。
SELECT avg(temperature) AS t_av, max(temperature) AS t_max, min(temperature) AS t_min, COUNT(*) As t_count, split_value(mqtt(topic), "/", 1) AS device_id FROM demo GROUP BY device_id, TUMBLINGWINDOW(ss, 10)
这里的 SQL 用了四个聚合函数,用于统计在10秒钟窗口期内的相关值。
avg
:平均值max
:最大值min
:最小值count
:计数另外还使用了两个基本的函数
mqtt
:消息中取出 MQTT 协议的信息,mqtt(topic)
就是取得当前取得消息的主题名称split_value
:该函数将第一个参数使用第二个参数进行分割,而后第三个参数指定下标,取得分割后的值。因此函数 split_value("devices/001/messages", "/", 1)
调用就返回001
GROUP BY
跟的是分组的字段,分别为计算字段 device_id
;时间窗口 TUMBLINGWINDOW(ss, 10)
,该时间窗口的含义为每10秒钟生成一批统计数据。
在正式写规则以前,咱们须要对规则进行调试,Kuiper 提供了 SQL 的调试工具,可让用户很是方便地对 SQL 进行调试。
bin/cli query
在出现的命令行提示符中输入前面准备好的 SQL 语句。
# bin/cli query Connecting to 127.0.0.1:20498 kuiper > SELECT avg(temperature) AS t_av, max(temperature) AS t_max, min(temperature) AS t_min, COUNT(*) As t_count, split_value(mqtt(topic), "/", 1) AS device_id FROM demo GROUP BY device_id, TUMBLINGWINDOW(ss, 10) query is submit successfully. kuiper >
在日志文件 log/stream.log
中,能够看到建立了一个名为 internal-kuiper_query_rule
的临时规则。
... time="2019-11-12T11:56:10+08:00" level=info msg="The connection to server tcp://10.211.55.6:1883 was established successfully" rule=internal-kuiper_query_rule time="2019-11-12T11:56:10+08:00" level=info msg="Successfully subscribe to topic devices/+/messages" rule=internal-kuiper_query_rule
值得注意的是,这个名为 internal-kuiper_query_rule
的规则是经过 query
建立的,服务器端每5秒钟会检测一下 query
客户端是否在线,若是query
客户端发现有超过10秒钟没有反应(好比被关闭),那么这个内部建立的 internal-kuiper_query_rule
规则会被自动删除,被删除的时候在日志文件中会打印以下的信息。
... time="2019-11-12T12:04:08+08:00" level=info msg="The client seems no longer fetch the query result, stop the query now." time="2019-11-12T12:04:08+08:00" level=info msg="stop the query." time="2019-11-12T12:04:08+08:00" level=info msg="unary operator project cancelling...." rule=internal-kuiper_query_rule ...
发送测试数据
经过任何的测试工具,向 EMQ X Edge 发送如下的测试数据。笔者在测试过程当中用的是 JMeter 的 MQTT 插件,由于基于 JMeter 能够作一些比较灵活的自动数据生成,业务逻辑控制,以及大量设备的模拟等。用户也能够直接使用 mosquitto
等其它客户端进行模拟。
devices/$device_id/messages
,其中$device_id
为下面数据中的第一列{"temperature": $temperature, "humidity" : $humidity}
, 其中$temperature
和 $humidity
分别为下面数据中的第二列和第三列#device_id, temperature, humidity 1,20,30 2,31,40 1,35,50 2,20,30 1,80,90 2,45,20 1,10,90 2,12,30 1,65,35 2,55,32
咱们能够发现发送了模拟数据后,在 query
客户端命令行里在两个10秒的时间窗口里打印了两组数据。这里输出的结果条数跟用户发送数据的频率有关系,若是 Kuiper 在一个时间窗口内接受到全部的数据,那么只打印一条结果。
kuiper > [{"device_id":"1","t_av":45,"t_count":3,"t_max":80,"t_min":20},{"device_id":"2","t_av":25.5,"t_count":2,"t_max":31,"t_min":20}] [{"device_id":"2","t_av":37.333333333333336,"t_count":3,"t_max":55,"t_min":12},{"device_id":"1","t_av":37.5,"t_count":2,"t_max":65,"t_min":10}]
完成了 SQL 的调试以后,开始配置规则文件,将结果数据经过 Kuiper 的 MQTT Sink 发送到远程的 Azure IoT Hub 中。 在 Azure IoT Hub 中,用户须要先建立好如下内容,
rockydemo
,用于接入设备以下图所示,在 Azure IoT Hub 中建立完成的相关设备。
编写 Kuiper 规则文件
规则文件是一个文本文件,描述了业务处理的逻辑(前面已经调试好的 SQL 语句),以及 sink 的配置(消息处理结果的发送目的地)。链接 Azure IoT Hub 的大部分信息都已经在前文中描述,须要注意是必须设置 protocol_version
的值为 3.1.1
,而不能为 3.1
。
{ "sql": "SELECT avg(temperature) AS t_av, max(temperature) AS t_max, min(temperature) AS t_min, COUNT(*) As t_count, split_value(mqtt(topic), \"/\", 1) AS device_id FROM demo GROUP BY device_id, TUMBLINGWINDOW(ss, 10)", "actions": [ { "log": {} }, { "mqtt": { "server": "ssl://rockydemo.azure-devices.net:8883", "topic": "devices/demo_001/messages/events/", "protocol_version": "3.1.1", "qos": 1, "clientId": "demo_001", "username": "rockydemo.azure-devices.net/demo_001/?api-version=2018-06-30", "password": "SharedAccessSignature sr=*******************" } } ] }
经过 Kuiper 命令行建立规则
# bin/cli create rule rule1 -f rule1.txt Connecting to 127.0.0.1:20498 Creating a new rule from file rule1.txt. Rule rule1 was created.
在日志文件中能够查看规则的运行链接状况,若是配置项都正确的话,应该能够看到到 Azure IoT Hub 的链接创建成功。
...... time="2019-11-12T14:30:34+08:00" level=info msg="The connection to server tcp://10.211.55.6:1883 was established successfully" rule=rule1 time="2019-11-12T14:30:34+08:00" level=info msg="Successfully subscribe to topic devices/+/messages" rule=rule1 time="2019-11-12T14:30:35+08:00" level=info msg="The connection to server ssl://rockydemo.azure-devices.net:8883 was established successfully" rule=rule1 ......
经过命令 az iot hub monitor-events -n rockydemo
启动 Azure IoT Hub 监控,并往本地的 EMQ X Edge 上发送跟调试 SQL 语句同样的模拟数据。通过 Kuiper 处理后,相应的处理结果被发送到了 Azure IoT Hub 中。
#az iot hub monitor-events -n rockydemo Starting event monitor, use ctrl-c to stop... { "event": { "origin": "demo_001", "payload": "[{\"device_id\":\"2\",\"t_av\":32,\"t_count\":3,\"t_max\":45,\"t_min\":20},{\"device_id\":\"1\",\"t_av\":45,\"t_count\":3,\"t_max\":80,\"t_min\":20}]" } } { "event": { "origin": "demo_001", "payload": "[{\"device_id\":\"2\",\"t_av\":33.5,\"t_count\":2,\"t_max\":55,\"t_min\":12},{\"device_id\":\"1\",\"t_av\":37.5,\"t_count\":2,\"t_max\":65,\"t_min\":10}]" } }
经过本文,读者能够了解到利用 EMQ X 在边缘端的解决方案能够很是快速、灵活地开发出基于边缘数据分析的系统,实现数据低时延、低成本和安全的处理。Azure IoT 也提供了 IoT Edge 方案,与 Azure 的方案相比,
若是有兴趣了解更多关于边缘流式数据分析的内容,请参考 Kuiper 开源项目。
更多信息请访问咱们的官网 emqx.io,或关注咱们的开源项目 github.com/emqx/emqx ,详细文档请访问 官方文档。