IoT Simulator(publisher)----> MQTT broker---->Telegraf(subscriber)---->InfluxDB---->Hosted Grafana(Cloud)
数据来源在IoT Case下通常来自各个传感设备。 由于身边没有可用的传感器设备,因而在github上搜了个小工具来模拟数据发射器。该工具可输出自定义的json格式数据,而且支持MQTT,HTTP(s),Azure IoT hub, Kafka等主流协议/工具,应用范围和场景普遍是我选择该工具的主要缘由。
惟一的缺点是输出的json默认为object, 不支持对array of object json的扩展,在API config的时候可能会遇到一些工具只能识别array of json object 的状况(好比power bi的rest api)。
若是你的case中有使用真实的iot设备和通信协议可自动忽略此part。
安装和配置相关请参照readme
在mySimConfigjson中对MQTT进行配置,语法参见[配置中使用的MQTT]html
{ "type": "mqtt", "broker.server": "tcp://localhost", "broker.port": 1883, "topic": "sensors/iot_simulator", "clientId": "iot_simulator", "qos": 2 }
使用如下命令开始模拟数据 java -jar json-data-generator-1.3.1-SNAPSHOT.jar mySimConfig.json
前端
MQTT为MQ Telemetry Transport的缩写,该协议定义了在机器对机器或物联网环境下的通讯规则。它采用发布/订阅的模式传输数据,设计思想是在尽力保证必定程度的数据可达性及稳定性的同时,减小对网络带宽和设备资源的依赖。MQTT协议简洁且轻量,适用于低带宽,高延迟或不稳定的网络环境中的设备,同时也适用于带宽和电源受限的移动应用.
ref:http://mqtt.org/faqjava
在使用Power BI做为可视化方案时,曾成功使用REST API直接将数据源经过HTTP推送到POWER BI所提供的endpoint。所以在架构这套服务之初,我并无打算使用MQTT做为数据传输协议。但通过一番研究后,在个人案例当中,发现使用HTTP协议有如下局限性:node
ref:https://docs.influxdata.com/t...git
MQTT client1(sub)--->MQTT broker<----MQTT Client2(pub) (message center) ^ | MQTT client2(pub)
two wildcards: # and +
"+" for a single level of hierarchy,+/+/+/HARDDRIVE_NAME表示了包含上述例子的一个父集
"#" for all remaining levels of hierarchy,sensors/COMPUTER_NAME/temperature/# 表示了包含上述例子的一个父集程序员
0: The broker/client will deliver the message once, with no confirmation.
1: The broker/client will deliver the message at least once, with confirmation required.
2: The broker/client will deliver the message exactly once by using a four-step handshake. github
消息订阅者容许在消息发布者制定的QoS级别上进行降级;例如mqtt_pub规定qos=2, 则mqtt_sub可使qos=2 or 1 or 0;
ref: https://mosquitto.org/man/mqt...数据库
step 1: 安装并解压telegraf (若是没有wget请自行下载(恕在下直言,windows简直就是个辣鸡))
step 2: 修改配置文件telegraf.conf(主要配置input,output&processor plugins)
配置主要参见:[InfluxDB HTTP API和Hosted Grafana HTTPS 通信的冲突问题]及[配置中使用的MQTT]
processor plugin的功能主要是打印从mqtt broker订阅的数据并显示在console中json
[[outputs.influxdb]] urls = ["https://localhost:8086"] # required # The target database for metrics (telegraf will create it if not exists) database = "telegraf" # required precision = "s" ## Name of existing retention policy to write to. Empty string writes to ## the default retention policy. retention_policy = "" ## Write consistency (clusters only), can be: "any", "one", "quorum", "all" write_consistency = "any" ## Write timeout (for the InfluxDB client), formatted as a string. ## If not provided, will default to 5s. 0s means no timeout (not recommended). timeout = "5s" [[inputs.mqtt_consumer]] ## MQTT broker URLs to be used. The format should be scheme://host:port, ## schema can be tcp, ssl, or ws. servers = ["tcp://localhost:1883"] ## MQTT QoS, must be 0, 1, or 2 qos = 2 ## Connection timeout for initial connection in seconds connection_timeout = "30s" ## Topics to subscribe to topics = [ "sensors/iot_simulator" ] # if true, messages that can't be delivered while the subscriber is offline # will be delivered when it comes back (such as on service restart). # NOTE: if true, client_id MUST be set persistent_session = false # If empty, a random client ID will be generated. client_id = "" ## Optional SSL Config # ssl_ca = "/etc/telegraf/ca.pem" # ssl_cert = "/etc/telegraf/cert.pem" # ssl_key = "/etc/telegraf/key.pem" ## Use SSL but skip chain & host verification insecure_skip_verify = true ## Data format to consume. ## Each data format has its own unique set of configuration options, read ## more about them here: ## https://github.com/influxdata/telegraf/blob/master/docs/DATA_FORMATS_INPUT.md data_format = "json" ## List of tag names to extract from top-level of JSON server response tag_keys = [ "equippment name", "timestamp" ] [[inputs.exec]] ## Commands array commands = [] #"/tmp/test.sh", "/usr/bin/mycollector --foo=bar" ## measurement name suffix (for separating different commands) name_suffix = "mqtt_consumer" ## Data format to consume. ## Each data format has its own unique set of configuration options, read ## more about them here: ## https://github.com/influxdata/telegraf/blob/master/docs/DATA_FORMATS_INPUT.md data_format = "json" [[processors.printer]]
step 3: 运行telegraf,运行前先开启数据模拟发射器和MQTT broker确保influxdb能订阅到稳定的数据流,不然influxdb有可能会报错监听不到数据写入。 to\your\dir: telegraf --config telegraf.conf
因为配置了printer plugin,在telegraf正常运行的状况下能够看到数据流打印在console中
step 4: 检查数据是否已写入数据库
ref: https://docs.influxdata.com/t...windows
influxDB做为数据和终端可视化工具之间的桥梁,角色尤其重要。influxDB做为一个time-series database很是适合实时IoT数据的存储。 配置influxdb的过程较为简单,主要解决的问题集中在从http到https协议转换问题。
step 1: 按照官网文档下载并解压influxdb
step 2: 运行influxdb(若是不须要修改任何influxdb的config文件) to\your\dir:influxd
Influx DB默认采用HTTP协议进行Client和Server端的通讯,而云端的Grafana服务则强制采用HTTPS确保数据传输的安全性。 众所周知,HTTPS协议是HTTP协议的安全版本,其安全性能的实现主要依靠在Transport Layer之上增长的TLS/SSL层实现文本及数据的加密。HTTPS与HTTP一个重要的区别在于HTTPS增长了对身份的验证功能,所以第三方没法伪造服务端或客户端身份,引入的证书认证机制就是用来确保这一功能的实现。
为了确保网络间通信的安全,我将InfluxDB的接口也进行了相关配置,让其利用TLS层使用HTTPS协议进行数据的传输。
在配置过程当中我使用的证书是self-signed-certificate,使用windows系统的配置过程稍有不一样(windows真是伤不起,连个配置说明都没有)
step 1:使用openssl(没有的朋友要安装一下)生成证书和密钥;sudo openssl req -x509 -nodes -newkey rsa:2048 -keyout /route/to/your/dir/influxdb-selfsigned.key -out /route/to/your/route/influxdb-selfsigned.crt -days <NUMBER_OF_DAYS>
进入文件所在目录,双击证书文件并安装证书(注意!要安装在受信任的根目录下)
step 2:配置influxDB;
和官方文档一致,点[这里](https://docs.influxdata.com/i...
)
step 3:重启influxdb /influxdb/folder: influxd --config influxdb.conf
step 4:测试
和官方文档一致,点这里
step 5:若是有使用telegraf,记得要将telegraf中output plugin的相关API也改为https!
这一步也是卡了好久,grafana的错误提示基本形同虚设,最好inpect一下页面看看dev tool的错误提示。(这点是否是太不程序员友好了,疯狂diss )
在没有使用https以前grafana报错(谁能知道这个undefined是什么鬼意思!!
inspect后终于在console看到了错误详细状况:
使用以后!Bang!
注意此处不要选择proxy模式,让grafana的后端服务代理请求,这样处于本地服务器的influxdb没法接受到grafana的请求;
最后一个折腾了我好久才获得的一个很粗略的demo。