轻量级边缘计算 EMQ X Kuiper 与 AWS IoT 集成方案

背景

本文以一个常见的物联网使用场景为案例,介绍了如何利用边缘计算来实现对业务的快速、低成本和有效地处理。html

在各种物联网项目中,好比智能楼宇项目,须要将楼宇的数据(好比电梯、燃气、水电等)进行采集和分析。一种解决方案是将全部的设备直接接入在云端的物联网平台,相似于像 AWS IoT 或者 Azure IoT Hub。这种解决方案的问题在于,前端

  • 数据处理时延较长:经过 Internet 传输和云端的处理后返回给设备,所需时间较长
  • 数据传输和存储成本:经过 Internet 传输须要带宽,对于大规模链接的物联网项目来讲,耗费的带宽会至关可观
  • 数据的安全性:有些物联网的数据会至关敏感,所有经过物联网传输的话会有风险

为了解决以上的问题,业界提出了边缘计算的方案,边缘计算的核心就在于把数据进行就近处理,避免没必要要的时延、成本和安全问题。linux

业务场景

假设现有一组设备,组中的每一个设备有一个 id,经过 MQTT 协议往 MQTT 消息服务器上相应的主题发送数据。主题的设计以下,其中 {device_id} 为设备的 id。git

devices/{device_id}/messages

每一个设备发送的数据格式为 JSON,发送的经过该传感器采集的温度与湿度数据。github

{
    "temperature": 30, 
    "humidity" : 20
}

如今须要实时分析数据,并提出如下的需求:对每一个设备的温度数据按照每 10 秒钟计算平均值(t_av),而且记下 10 秒钟内的最大值 (t_max)、最小值(t_min) 和数据条数(t_count),计算完毕后将这 4 个结果进行保存,如下为样例结果数据:sql

[
    {
        "device_id" : "1", "t_av" : 25,  "t_max" : 45, "t_min" : 5, "t_count" : 2
    },
    {
        "device_id" : "2", "t_av" : 25,  "t_max" : 45, "t_min" : 5, "t_count" : 2
    },
    ...
]

方案介绍

以下图所示,采用边缘分析/流式数据处理的方式,在边缘端咱们采用了 EMQ X 的方案,最后将计算结果输出到 AWS IoT 中。docker

emqx_aws.png

  • EMQ X Edge 能够接入各类协议类型的设备,好比 MQTT、CoAP、LwM2M 等,这样用户能够不须要关心协议适配方面的问题;另外它自己也比较轻量级,适合部署在边缘设备上
  • EMQ X Kuiper 是 EMQ 发布的基于 SQL 的轻量级边缘流式数据分析引擎,安装包只有约 7MB,很是适合于运行在边缘设备端
  • AWS IoT 提供了比较全的设备接入和数据分析的方案,此处用于云端的结果数据接入,以及应用所需的结果数据分析

实现步骤

安装 EMQ X Edge & Kuiper

  • 写本文的时候,EMQ X Edge 的最新版本是4.0,用户能够经过 Docker 来安装和启动 EMQ X Edgeshell

    # docker pull emqx/emqx-edge
    # docker run -d --name emqx -p 1883:1883  emqx/emqx-edge:latest
    # docker ps
    CONTAINER ID        IMAGE                   COMMAND                  CREATED             STATUS              PORTS                                                                                                           NAMES
    a348e3ac150c        emqx/emqx-edge:latest   "/usr/bin/docker-entr"   3 seconds ago       Up 2 seconds        4369/tcp, 5369/tcp, 6369/tcp, 8080/tcp, 8083-8084/tcp, 8883/tcp, 11883/tcp, 0.0.0.0:1883->1883/tcp, 18083/tcp   emqx

    用户能够经过 telnet 命令来判断是否启动成功,以下所示。数据库

    # telnet localhost 1883
    Trying 127.0.0.1...
    Connected to localhost.
    Escape character is '^]'.
  • 安装、启动 Kuiper编程

    点击这里下载最新版 Kuiper,并解压。在写本文的时候,Kuiper 最新版本为 0.0.3。

    # unzip kuiper-linux-amd64-0.0.3.zip
    # cd kuiper
    # bin/server
    Serving Kuiper server on port 20498

    若是没法启动,请查看日志文件 log/stream.log

建立流

Kuiper 提供了一个命令用于管理流和规则,用户能够经过在命令行窗口中敲入 bin/cli 查看有哪些子命令及其帮助。cli 命令缺省链接的是本地的 Kuiper 服务器,cli 命令也能够链接到别的 Kuiper 服务器,用户能够在 etc/client.yaml配置文件中修改链接的 Kuiper 服务器。用户若是想了解更多关于命令行的信息,能够参考这里

建立流定义:建立流的目的是为了定义发送到该流上的数据格式,相似于在关系数据库中定义表的结构。 Kuiper 中全部支持的数据类型,能够参考这里

# cd kuiper
# bin/cli create stream demo '(temperature float, humidity bigint) WITH (FORMAT="JSON", DATASOURCE="devices/+/messages")'

上述语句在 Kuiper 中建立了一个名为 demo 的流定义,包含了两个字段,分别为 temperature 和 humidity,数据源为订阅 MQTT 的主题 devices/+/messages,这里请注意采用了通配符 +,用于订阅不一样设备的消息。该数据源所对应的 MQTT 服务器地址在配置文件 etc/mqtt_source.yaml中,能够根据所在的服务器地址进行配置。以下图所示,配置 servers 项目。

#Global MQTT configurations
default:
  qos: 1
  sharedsubscription: true
  servers: [tcp://127.0.0.1:1883]

用户能够在命令行中敲入 describe 子命令来查看刚建立好的流定义。

# bin/cli describe stream demo
Connecting to 127.0.0.1:20498
Fields
--------------------------------------------------------------------------------
temperature    float
humidity    bigint

FORMAT: JSON
DATASOURCE: devices/+/messages

数据业务逻辑处理

Kuiper 采用 SQL 实现业务逻辑,每10秒钟统计温度的平均值、最大值、最小值和次数,并根据设备 ID 进行分组,实现的 SQL 以下所示。

SELECT avg(temperature) AS t_av, max(temperature) AS t_max, min(temperature) AS t_min, COUNT(*) As t_count, split_value(mqtt(topic), "/", 1) AS device_id FROM demo GROUP BY device_id, TUMBLINGWINDOW(ss, 10)

这里的 SQL 用了四个聚合函数,用于统计在10秒钟窗口期内的相关值。

  • avg:平均值
  • max:最大值
  • min:最小值
  • count:计数

另外还使用了两个基本的函数

  • mqtt:消息中取出 MQTT 协议的信息,mqtt(topic) 就是取得当前取得消息的主题名称
  • split_value:该函数将第一个参数使用第二个参数进行分割,而后第三个参数指定下标,取得分割后的值。因此函数 split_value("devices/001/messages", "/", 1) 调用就返回001

GROUP BY 跟的是分组的字段,分别为计算字段 device_id;时间窗口 TUMBLINGWINDOW(ss, 10),该时间窗口的含义为每10秒钟生成一批统计数据。

调试 SQL

在正式写规则以前,咱们须要对规则进行调试,Kuiper 提供了 SQL 的调试工具,可让用户很是方便地对 SQL 进行调试。

  • 进入 kuiper 安装目录,并运行 bin/cli query
  • 在出现的命令行提示符中输入前面准备好的 SQL 语句。

    # bin/cli query
    Connecting to 127.0.0.1:20498
    kuiper > SELECT avg(temperature) AS t_av, max(temperature) AS t_max, min(temperature) AS t_min, COUNT(*) As t_count, split_value(mqtt(topic), "/", 1) AS device_id FROM demo GROUP BY device_id, TUMBLINGWINDOW(ss, 10)
    query is submit successfully.
    kuiper >

    在日志文件 log/stream.log 中,能够看到建立了一个名为 internal-kuiper_query_rule 的临时规则。

    ...
    time="2019-11-12T11:56:10+08:00" level=info msg="The connection to server tcp://10.211.55.6:1883 was established successfully" rule=internal-kuiper_query_rule
    time="2019-11-12T11:56:10+08:00" level=info msg="Successfully subscribe to topic devices/+/messages" rule=internal-kuiper_query_rule

    值得注意的是,这个名为 internal-kuiper_query_rule 的规则是经过 query 建立的,服务器端每5秒钟会检测一下 query 客户端是否在线,若是query 客户端发现有超过10秒钟没有反应(好比被关闭),那么这个内部建立的 internal-kuiper_query_rule 规则会被自动删除,被删除的时候在日志文件中会打印以下的信息。

    ...
    time="2019-11-12T12:04:08+08:00" level=info msg="The client seems no longer fetch the query result, stop the query now."
    time="2019-11-12T12:04:08+08:00" level=info msg="stop the query."
    time="2019-11-12T12:04:08+08:00" level=info msg="unary operator project cancelling...." rule=internal-kuiper_query_rule
    ...
  • 发送测试数据

    经过任何的测试工具,向 EMQ X Edge 发送如下的测试数据。笔者在测试过程当中用的是 JMeter 的 MQTT 插件,由于基于 JMeter 能够作一些比较灵活的自动数据生成,业务逻辑控制,以及大量设备的模拟等。用户也能够直接使用 mosquitto 等其它客户端进行模拟。

    • 主题:devices/$device_id/messages,其中$device_id 为下面数据中的第一列
    • 消息:{"temperature": $temperature, "humidity" : $humidity}, 其中$temperature$humidity 分别为下面数据中的第二列和第三列
    #device_id, temperature, humidity
    1,20,30
    2,31,40
    1,35,50
    2,20,30
    1,80,90
    2,45,20
    1,10,90
    2,12,30
    1,65,35
    2,55,32

咱们能够发现发送了模拟数据后,在 query 客户端命令行里在两个10秒的时间窗口里打印了两组数据。这里输出的结果条数跟用户发送数据的频率有关系,若是 Kuiper 在一个时间窗口内接受到全部的数据,那么只打印一条结果。

kuiper > [{"device_id":"1","t_av":45,"t_count":3,"t_max":80,"t_min":20},{"device_id":"2","t_av":25.5,"t_count":2,"t_max":31,"t_min":20}]

[{"device_id":"2","t_av":37.333333333333336,"t_count":3,"t_max":55,"t_min":12},{"device_id":"1","t_av":37.5,"t_count":2,"t_max":65,"t_min":10}]

建立、提交规则

完成了 SQL 的调试以后,开始配置规则文件,将结果数据经过 Kuiper 的 MQTT Sink 发送到远程的 AWS IoT 中。 在 AWS IoT 中,用户须要先建立好如下内容,

  • 设备:表明处理设备数据的网关,该网关安装了 Kuiper,网关在把相关相关数据处理完毕后,将结果发送到 AWS 云端。此处建立的名称为 demo,以下图所示。

aws_device.png

  • 设备链接证书与密钥:AWS 的物联网设备经过证书来链接,保证其安全性。在建立设备的过程当中,AWS会生成的如下三个文件。这里会用到的是证书与私钥。

    • 证书:相似于 d3807d9fa5-certificate.pem
    • 私钥:相似于 d3807d9fa5-private.pem.key
    • 公钥:相似于 d3807d9fa5-public.pem.key

关于这方面更多的信息,请参考 AWS 建立设备的文档

编写 Kuiper 规则文件

规则文件是一个文本文件,描述了业务处理的逻辑(前面已经调试好的 SQL 语句),以及 sink 的配置(消息处理结果的发送目的地)。链接 AWS IoT 的大部分信息都已经在前文中描述。 Kuiper 的测试结果将被发送到 AWS IoT设备的 devices/result 主题下。

{
  "sql": "SELECT avg(temperature) AS t_av, max(temperature) AS t_max, min(temperature) AS t_min, COUNT(*) As t_count, split_value(mqtt(topic), \"/\", 1) AS device_id FROM demo GROUP BY device_id, TUMBLINGWINDOW(ss, 10)",
  "actions": [
    {
      "log": {}
    },
    {
      "mqtt": {
        "server": "ssl://xyz-ats.iot.us-east-1.amazonaws.com:8883",
        "topic": "devices/result",
        "qos": 1,
        "clientId": "demo_001",
        "certificationPath": "/var/aws/d3807d9fa5-certificate.pem",
        "privateKeyPath": "/var/aws/d3807d9fa5-private.pem.key"
      }
    }
  ]
}

经过 Kuiper 命令行建立规则

# bin/cli create rule rule1 -f rule1.txt
Connecting to 127.0.0.1:20498
Creating a new rule from file rule1.txt. 
Rule rule1 was created.

在日志文件中能够查看规则的运行链接状况,若是配置项都正确的话,应该能够看到到 AWS IoT 的链接创建成功。

......
time="2019-11-13T17:41:19+08:00" level=info msg="The connection to server tcp://10.211.55.6:1883 was established successfully" rule=rule1
time="2019-11-13T17:41:19+08:00" level=info msg="Successfully subscribe to topic devices/+/messages" rule=rule1
time="2019-11-13T17:41:20+08:00" level=info msg="The connection to server ssl://xyz-ats.iot.us-east-1.amazonaws.com:8883 was established successfully" rule=rule1
......
  • 经过 AWS IoT 提供的 MQTT Client 工具,订阅设备的devices/result主题。并往本地的 EMQ X Edge 上发送模拟数据。通过 Kuiper 处理后,相应的处理结果被发送到了 AWS IoT 中。以下图所示,收到了两次测试结果(第一次结果被折叠)。

aws_iot_result.png

用户能够经过 AWS IoT Rule 将分析结果存储到 Amazon DynamoDB 数据库或者其它服务中,前端的应用程序能够经过读取 DynamoDB 中的数据来呈现给终端用户,具体请参考 Amazon DynamoDB 文档

总结

经过本文,读者能够了解到利用 EMQ X 在边缘端的解决方案能够很是快速、灵活地开发出基于边缘数据分析的系统,实现数据低时延、低成本和安全的处理。

AWS IoT 也提供了 Greengrass 的边缘解决方案,与 AWS Greengrass 相比,Kuiper 方案更加轻量级,业务逻辑的实现方式(基于 SQL)在编程上也相对更加简单。AWS Greengrass 提供了基于 Lambada 的编程模型,经过提供不一样编程语言的 SDK 来实现边缘端的数据分析,以及往 AWS IoT 的分析结果上传,所以在业务逻辑实现的灵活性上会更好。最后,与 Greengrass 只能链接 AWS IoT 不一样,Kuiper 在与不一样的第三方 IoT 平台的集成的灵活性上也更好。

若是有兴趣了解更多关于边缘流式数据分析的内容,请参考 Kuiper 开源项目


更多信息请访问咱们的官网 emqx.io,或关注咱们的开源项目 github.com/emqx/emqx ,详细文档请访问 官方文档

相关文章
相关标签/搜索