前面文章分享了如何安装kuiper和kuiper-manager,本篇文章经过一个完整的例子来演示kuiper的一个比较完整的流式计算。html
下图仍旧使用了kuiper官网文档中的图,我在里面稍微加了一些注释:sql
kuiper的流式计算建立于操做分为以下几个步骤:docker
1)docker命令行方式json
# 进入kuiper容器
docker exec -it kuiper /bin/bash
# 建立一个流,定义了temperature和humidity这两个字段,后面会对应mqtt消息(payload)的两个字段,其中DataSource能够理解为订阅的topic
bin/kuiper create stream demo2 '(temperature float, humidity bigint) WITH (FORMAT="JSON", DATASOURCE="demo2")'
2) rest方式bash
#-d指定了stream的具体数据 curl -H "Content-Type: application/json" -X POST -d '{"sql":"create stream demo2 (temperature float, humidity bigint) WITH ( datasource = \"demo2\",FORMAT = \"json\")"}' http://localhost:9081/streams
1)docker命令行app
#编写rule规则文件myRule,过滤temperature>30的数据,并输出到log里面(使用了kuiper的log插件),内容以下 { "sql": "SELECT temperature from demo2 where temperature > 30", "actions": [{ "log": {} }] } #命令行建立规则,指定ruleid为ruleDemo bin/kuiper create rule ruleDemo -f myRule
2) rest方式curl
curl -H "Content-Type: application/json" -X POST -d '{"id":"ruleDemo","sql":"SELECT temperature from demo2 where temperature > 30","actions":[{"log":{}}]}' http://localhost:9081/rules
3)发送mqtt消息给emqxide
#给emqx(192.168.200.2)发送{"temperature": 40, "humidity" : 20},指定topic为demo2 mosquitto_pub -h 192.168.200.2 -m '{"temperature": 40, "humidity" : 20}' -t demo2
4)能够在kuiper的日志中看到规则过滤后的数据工具
以上就是kuiper流式计算的例子。总结一下:kuiper运行在某个边缘设备(这里是一台虚机)上,订阅了emqx的topic:demo2,当有{"temperature": 40, "humidity" : 20}这样的数据上报的时候,kuiper的规则引擎会经过sql将数据输出到log中。测试
博主:测试生财
座右铭:专一测试与自动化,致力提升研发效能;经过测试精进完成原始积累,经过读书理财奔向财务自由。
csdn:https://blog.csdn.net/ccgshigao