EMQ X 规则引擎系列(十一)存储消息到 PostgreSQL 数据库

画板 17@2x.png

PostgreSQL 数据库介绍

做为开源关系数据库重要一员,PostgreSQL 标榜本身是世界上最早进的开源数据库,相比于其余开源关系数据库如 MySQL,PostgreSQL 是彻底由社区驱动的开源项目,由全世界超过 1000 名贡献者所维护。PostgreSQL 提供了单个完整功能的版本,而不像 MySQL 那样提供了多个不一样的社区版、商业版与企业版。PostgreSQL 基于自由的 BSD/MIT 许可,组织可使用、复制、修改和从新分发代码,只须要提供一个版权声明便可。node

PostgreSQL 具备诸多特性,在 GIS 领域有较多支持,其“无锁定”特性很是突出,支持函数和条件索引,有成熟的集群方案。PostgreSQL 还具有及其强悍的 SQL 编程能力如统计函数和统计语法支持,经过 Timescaledb 插件,PostgreSQL 能够转变为功能完备的时序数据库 Timescaledb 。sql

场景介绍

该场景须要将 EMQ X 指定主题下且知足条件的消息存储到 PostgreSQL 数据库。为了便于后续分析检索,消息内容须要进行拆分存储。docker

该场景下客户端上报数据以下:shell

  • Topic:testtopic
  • Payload:数据库

    {"msg":"Hello, World!"}

准备工做

建立数据库

建立 tutorial 数据库,用户名为 postgres,密码为 password:编程

$ docker pull postgres

$ docker run --rm --name postgres -p 5432:5432 -e POSTGRES_PASSWORD=password -d postgres:latest

$ docker exec -it postgres psql -U postgres

> CREATE database tutorial;

> \c tutorial

建立数据表

建立 t_mqtt_msg 表:json

CREATE TABLE t_mqtt_msg (
  id SERIAL primary key,
  msgid character varying(64),
  sender character varying(64),
  topic character varying(255),
  qos integer,
  retain integer,
  payload text,
  arrived timestamp without time zone
);

配置说明

建立资源

打开 EMQ X Dashboard,进入左侧菜单的 资源 页面,点击 新建 按钮,选择 PostgreSQL 资源类型并完成相关配置进行资源建立。socket

image-20190725142933513.png

建立规则

进入左侧菜单的 规则 页面,点击 新建 按钮,进行规则建立。这里选择触发事件 message.publish,即在 EMQ X 收到 PUBLISH 消息时触发该规则进行数据处理。函数

选定触发事件后,咱们可在界面上看到可选字段及示例 SQL:工具

image-20190719112141128.png

筛选所需字段

规则引擎使用 SQL 语句过滤和处理数据。这里咱们须要 msgid, topic, payload 等数据,同时但愿匹配全部主题的消息,所以仅须要在默认 SQL 基础上删除 WHERE 子句便可,最终咱们获得 SQL 以下:

SELECT
  *
FROM
  "message.publish"

SQL 测试

借助 SQL 测试功能,咱们能够快速确认刚刚填写的 SQL 语句可否达成咱们的目的。首先填写用于测试的 payload 等数据以下:

image-20190725145617081.png

而后点击 测试 按钮,咱们获得如下数据输出:

{
  "client_id": "c_emqx",
  "event": "message.publish",
  "id": "589A429E9572FB44B0000057C0001",
  "node": "emqx@127.0.0.1",
  "payload": "{\"msg\":\"Hello, World!\"}",
  "peername": "127.0.0.1:50891",
  "qos": 1,
  "retain": 0,
  "timestamp": 1564037750692,
  "topic": "testtopic",
  "username": "u_emqx"
}

测试输出包含了全部须要的数据,咱们能够进行后续步骤。

添加响应动做,存储消息到 PostgreSQL

SQL 条件输入输出无误后,咱们继续添加相应动做,配置写入 SQL 语句,将筛选结果存储到 PostgreSQL。

点击响应动做中的 添加 按钮,选择 保存数据到 PostgreSQL 动做,选取刚刚建立的 PostgreSQL 资源并填写 SQL 模板以下:

insert into t_mqtt_msg(msgid, topic, qos, retain, payload, arrived) values (${id}, ${topic}, ${qos}, ${retain}, ${payload}, to_timestamp(${timestamp}::double precision /1000)) returning id

最后点击 新建 按钮完成规则建立。

image-20190725144256942.png

测试

预期结果

咱们成功建立了一条规则,包含一个处理动做,动做指望效果以下:

  1. 客户端上报消息时,该消息将命中 SQL,规则列表中 已命中 数字增长 1;
  2. PostgreSQL tutorial 数据库的 t_mqtt_msg 表中将增长一条数据,数据内容与消息内容一致。

使用 Dashboard 中的 Websocket 工具测试

切换到 工具 --> Websocket 页面,使用任意信息客户端链接到 EMQ X,链接成功后在 消息 卡片中发送以下消息:

  • Topic:testtopic
  • Payload:

    {"msg":"Hello, World!"}

image-20190725145805279.png

点击 发送 按钮,发送成功后能够看到当前规则已命中次数已经变为了 1。

而后检查 PostgreSQL,新的 data point 是否添加成功:

image-20190725145107685.png

至此,咱们经过规则引擎实现了使用规则引擎存储消息到 PostgreSQL 数据库的业务开发。

版权声明: 本文为 EMQ 原创,转载请注明出处。

原文连接:https://www.emqx.io/cn/blog/e...

相关文章
相关标签/搜索