本文以一个常见的物联网使用场景为案例,介绍了如何利用边缘计算来实现对业务的快速、低成本和有效地处理。html
在各种物联网项目中,好比智能楼宇项目,须要将楼宇的数据(好比电梯、燃气、水电等)进行采集和分析。一种解决方案是将全部的设备直接接入在云端的物联网平台,相似于像 AWS IoT 或者 Azure IoT Hub。这种解决方案的问题在于,前端
为了解决以上的问题,业界提出了边缘计算的方案,边缘计算的核心就在于把数据进行就近处理,避免没必要要的时延、成本和安全问题。linux
假设现有一组设备,组中的每一个设备有一个 id,经过 MQTT 协议往 MQTT 消息服务器上相应的主题发送数据。主题的设计以下,其中 {device_id} 为设备的 id。git
devices/{device_id}/messages
每一个设备发送的数据格式为 JSON,发送的经过该传感器采集的温度与湿度数据。github
{ "temperature": 30, "humidity" : 20 }
如今须要实时分析数据,并提出如下的需求:对每一个设备的温度数据按照每 10 秒钟计算平均值(t_av
),而且记下 10 秒钟内的最大值 (t_max
)、最小值(t_min
) 和数据条数(t_count
),计算完毕后将这 4 个结果进行保存,如下为样例结果数据:sql
[ { "device_id" : "1", "t_av" : 25, "t_max" : 45, "t_min" : 5, "t_count" : 2 }, { "device_id" : "2", "t_av" : 25, "t_max" : 45, "t_min" : 5, "t_count" : 2 }, ... ]
以下图所示,采用边缘分析/流式数据处理的方式,在边缘端咱们采用了 EMQ X 的方案,最后将计算结果输出到 AWS IoT 中。docker
写本文的时候,EMQ X Edge 的最新版本是4.0,用户能够经过 Docker 来安装和启动 EMQ X Edgeshell
# docker pull emqx/emqx-edge # docker run -d --name emqx -p 1883:1883 emqx/emqx-edge:latest # docker ps CONTAINER ID IMAGE COMMAND CREATED STATUS PORTS NAMES a348e3ac150c emqx/emqx-edge:latest "/usr/bin/docker-entr" 3 seconds ago Up 2 seconds 4369/tcp, 5369/tcp, 6369/tcp, 8080/tcp, 8083-8084/tcp, 8883/tcp, 11883/tcp, 0.0.0.0:1883->1883/tcp, 18083/tcp emqx
用户能够经过 telnet
命令来判断是否启动成功,以下所示。数据库
# telnet localhost 1883 Trying 127.0.0.1... Connected to localhost. Escape character is '^]'.
安装、启动 Kuiper编程
点击这里下载最新版 Kuiper,并解压。在写本文的时候,Kuiper 最新版本为 0.0.3。
# unzip kuiper-linux-amd64-0.0.3.zip # cd kuiper # bin/server Serving Kuiper server on port 20498
若是没法启动,请查看日志文件 log/stream.log
。
Kuiper 提供了一个命令用于管理流和规则,用户能够经过在命令行窗口中敲入 bin/cli
查看有哪些子命令及其帮助。cli
命令缺省链接的是本地的 Kuiper 服务器,cli
命令也能够链接到别的 Kuiper 服务器,用户能够在 etc/client.yaml
配置文件中修改链接的 Kuiper 服务器。用户若是想了解更多关于命令行的信息,能够参考这里。
建立流定义:建立流的目的是为了定义发送到该流上的数据格式,相似于在关系数据库中定义表的结构。 Kuiper 中全部支持的数据类型,能够参考这里。
# cd kuiper # bin/cli create stream demo '(temperature float, humidity bigint) WITH (FORMAT="JSON", DATASOURCE="devices/+/messages")'
上述语句在 Kuiper 中建立了一个名为 demo 的流定义,包含了两个字段,分别为 temperature 和 humidity,数据源为订阅 MQTT 的主题 devices/+/messages
,这里请注意采用了通配符 +
,用于订阅不一样设备的消息。该数据源所对应的 MQTT 服务器地址在配置文件 etc/mqtt_source.yaml
中,能够根据所在的服务器地址进行配置。以下图所示,配置 servers
项目。
#Global MQTT configurations default: qos: 1 sharedsubscription: true servers: [tcp://127.0.0.1:1883]
用户能够在命令行中敲入 describe
子命令来查看刚建立好的流定义。
# bin/cli describe stream demo Connecting to 127.0.0.1:20498 Fields -------------------------------------------------------------------------------- temperature float humidity bigint FORMAT: JSON DATASOURCE: devices/+/messages
Kuiper 采用 SQL 实现业务逻辑,每10秒钟统计温度的平均值、最大值、最小值和次数,并根据设备 ID 进行分组,实现的 SQL 以下所示。
SELECT avg(temperature) AS t_av, max(temperature) AS t_max, min(temperature) AS t_min, COUNT(*) As t_count, split_value(mqtt(topic), "/", 1) AS device_id FROM demo GROUP BY device_id, TUMBLINGWINDOW(ss, 10)
这里的 SQL 用了四个聚合函数,用于统计在10秒钟窗口期内的相关值。
avg
:平均值max
:最大值min
:最小值count
:计数另外还使用了两个基本的函数
mqtt
:消息中取出 MQTT 协议的信息,mqtt(topic)
就是取得当前取得消息的主题名称split_value
:该函数将第一个参数使用第二个参数进行分割,而后第三个参数指定下标,取得分割后的值。因此函数 split_value("devices/001/messages", "/", 1)
调用就返回001
GROUP BY
跟的是分组的字段,分别为计算字段 device_id
;时间窗口 TUMBLINGWINDOW(ss, 10)
,该时间窗口的含义为每10秒钟生成一批统计数据。
在正式写规则以前,咱们须要对规则进行调试,Kuiper 提供了 SQL 的调试工具,可让用户很是方便地对 SQL 进行调试。
bin/cli query
在出现的命令行提示符中输入前面准备好的 SQL 语句。
# bin/cli query Connecting to 127.0.0.1:20498 kuiper > SELECT avg(temperature) AS t_av, max(temperature) AS t_max, min(temperature) AS t_min, COUNT(*) As t_count, split_value(mqtt(topic), "/", 1) AS device_id FROM demo GROUP BY device_id, TUMBLINGWINDOW(ss, 10) query is submit successfully. kuiper >
在日志文件 log/stream.log
中,能够看到建立了一个名为 internal-kuiper_query_rule
的临时规则。
... time="2019-11-12T11:56:10+08:00" level=info msg="The connection to server tcp://10.211.55.6:1883 was established successfully" rule=internal-kuiper_query_rule time="2019-11-12T11:56:10+08:00" level=info msg="Successfully subscribe to topic devices/+/messages" rule=internal-kuiper_query_rule
值得注意的是,这个名为 internal-kuiper_query_rule
的规则是经过 query
建立的,服务器端每5秒钟会检测一下 query
客户端是否在线,若是query
客户端发现有超过10秒钟没有反应(好比被关闭),那么这个内部建立的 internal-kuiper_query_rule
规则会被自动删除,被删除的时候在日志文件中会打印以下的信息。
... time="2019-11-12T12:04:08+08:00" level=info msg="The client seems no longer fetch the query result, stop the query now." time="2019-11-12T12:04:08+08:00" level=info msg="stop the query." time="2019-11-12T12:04:08+08:00" level=info msg="unary operator project cancelling...." rule=internal-kuiper_query_rule ...
发送测试数据
经过任何的测试工具,向 EMQ X Edge 发送如下的测试数据。笔者在测试过程当中用的是 JMeter 的 MQTT 插件,由于基于 JMeter 能够作一些比较灵活的自动数据生成,业务逻辑控制,以及大量设备的模拟等。用户也能够直接使用 mosquitto
等其它客户端进行模拟。
devices/$device_id/messages
,其中$device_id
为下面数据中的第一列{"temperature": $temperature, "humidity" : $humidity}
, 其中$temperature
和 $humidity
分别为下面数据中的第二列和第三列#device_id, temperature, humidity 1,20,30 2,31,40 1,35,50 2,20,30 1,80,90 2,45,20 1,10,90 2,12,30 1,65,35 2,55,32
咱们能够发现发送了模拟数据后,在 query
客户端命令行里在两个10秒的时间窗口里打印了两组数据。这里输出的结果条数跟用户发送数据的频率有关系,若是 Kuiper 在一个时间窗口内接受到全部的数据,那么只打印一条结果。
kuiper > [{"device_id":"1","t_av":45,"t_count":3,"t_max":80,"t_min":20},{"device_id":"2","t_av":25.5,"t_count":2,"t_max":31,"t_min":20}] [{"device_id":"2","t_av":37.333333333333336,"t_count":3,"t_max":55,"t_min":12},{"device_id":"1","t_av":37.5,"t_count":2,"t_max":65,"t_min":10}]
完成了 SQL 的调试以后,开始配置规则文件,将结果数据经过 Kuiper 的 MQTT Sink 发送到远程的 AWS IoT 中。 在 AWS IoT 中,用户须要先建立好如下内容,
设备链接证书与密钥:AWS 的物联网设备经过证书来链接,保证其安全性。在建立设备的过程当中,AWS会生成的如下三个文件。这里会用到的是证书与私钥。
d3807d9fa5-certificate.pem
d3807d9fa5-private.pem.key
d3807d9fa5-public.pem.key
关于这方面更多的信息,请参考 AWS 建立设备的文档。
编写 Kuiper 规则文件
规则文件是一个文本文件,描述了业务处理的逻辑(前面已经调试好的 SQL 语句),以及 sink 的配置(消息处理结果的发送目的地)。链接 AWS IoT 的大部分信息都已经在前文中描述。 Kuiper 的测试结果将被发送到 AWS IoT设备的 devices/result
主题下。
{ "sql": "SELECT avg(temperature) AS t_av, max(temperature) AS t_max, min(temperature) AS t_min, COUNT(*) As t_count, split_value(mqtt(topic), \"/\", 1) AS device_id FROM demo GROUP BY device_id, TUMBLINGWINDOW(ss, 10)", "actions": [ { "log": {} }, { "mqtt": { "server": "ssl://xyz-ats.iot.us-east-1.amazonaws.com:8883", "topic": "devices/result", "qos": 1, "clientId": "demo_001", "certificationPath": "/var/aws/d3807d9fa5-certificate.pem", "privateKeyPath": "/var/aws/d3807d9fa5-private.pem.key" } } ] }
经过 Kuiper 命令行建立规则
# bin/cli create rule rule1 -f rule1.txt Connecting to 127.0.0.1:20498 Creating a new rule from file rule1.txt. Rule rule1 was created.
在日志文件中能够查看规则的运行链接状况,若是配置项都正确的话,应该能够看到到 AWS IoT 的链接创建成功。
...... time="2019-11-13T17:41:19+08:00" level=info msg="The connection to server tcp://10.211.55.6:1883 was established successfully" rule=rule1 time="2019-11-13T17:41:19+08:00" level=info msg="Successfully subscribe to topic devices/+/messages" rule=rule1 time="2019-11-13T17:41:20+08:00" level=info msg="The connection to server ssl://xyz-ats.iot.us-east-1.amazonaws.com:8883 was established successfully" rule=rule1 ......
devices/result
主题。并往本地的 EMQ X Edge 上发送模拟数据。通过 Kuiper 处理后,相应的处理结果被发送到了 AWS IoT 中。以下图所示,收到了两次测试结果(第一次结果被折叠)。用户能够经过 AWS IoT Rule 将分析结果存储到 Amazon DynamoDB 数据库或者其它服务中,前端的应用程序能够经过读取 DynamoDB 中的数据来呈现给终端用户,具体请参考 Amazon DynamoDB 文档。
经过本文,读者能够了解到利用 EMQ X 在边缘端的解决方案能够很是快速、灵活地开发出基于边缘数据分析的系统,实现数据低时延、低成本和安全的处理。
AWS IoT 也提供了 Greengrass 的边缘解决方案,与 AWS Greengrass 相比,Kuiper 方案更加轻量级,业务逻辑的实现方式(基于 SQL)在编程上也相对更加简单。AWS Greengrass 提供了基于 Lambada 的编程模型,经过提供不一样编程语言的 SDK 来实现边缘端的数据分析,以及往 AWS IoT 的分析结果上传,所以在业务逻辑实现的灵活性上会更好。最后,与 Greengrass 只能链接 AWS IoT 不一样,Kuiper 在与不一样的第三方 IoT 平台的集成的灵活性上也更好。
若是有兴趣了解更多关于边缘流式数据分析的内容,请参考 Kuiper 开源项目。
更多信息请访问咱们的官网 emqx.io,或关注咱们的开源项目 github.com/emqx/emqx ,详细文档请访问 官方文档。