最近,本实验室大量上马云测量,云监控方面的项目,大概是属于物联网应用的一个分支。老板也有将旧有仪器改造的想法,因此要实现仪器设备的云控制。本文是其中的一个解决方案。javascript
MQTT
,服务器使用centos
,安装mosquitto
C#
,窗体框架使用WPF
,MQTT的客户端使用MQTTNet
spring-cloud
微服务框架Vue
,使用Element-admin-ui
后台框架,使用MQTTJS
组件(MQTTJS
采用websocket
链接方式)TPL Dataflow
这个解决方案加入如图(请原谅个人懒惰):html
以上只是本demo的架构,很简单,可是也有很大的问题。前端
客户端跟随仪器,原则上一台仪器一个控制程序,固然也能够有多个。客户端实现了对仪器全部硬件设备的控制,对全部数据的采集。客户端能够有界面,也能够没有界面,通常来讲,咱们是须要一个界面的,客户端能够独立完成测量任务。java
客户端集成了网络通讯功能,能够彻底替代用户对客户端的操做使用。具体架构以下(请原谅个人懒惰):node
其中,客户端界面和网络接口是等效的,能够单独控制,也能够共同控制。git
服务端基于spring-cloud
微服务框架,主要提供服务发现,用户管理,权限管理,设备管理,MQTT节点管理等管理功能github
前端网页是用户经过网络操做仪器设备的交互接口。采用日前流行的Vue
框架,因为是后台管理模式,就使用Element-admin-ui
这个框架。web
客户端: https://github.com/spartajet/IotWpfClient
服务端:https://github.com/spartajet/iot-demo-server
前端网页:https://github.com/spartajet/iot-demo-web算法
MQTT(Message Queuing Telemetry Transport,消息队列遥测传输)是IBM开发的一个即时通信协议,有可能成为物联网的重要组成部分。该协议支持全部平台,几乎能够把全部联网物品和外部链接起来,被用来当作传感器和制动器(好比经过Twitter让房屋联网)的通讯协议。spring
MQTT协议是为大量计算能力有限,且工做在低带宽、不可靠的网络的远程传感器和控制设备通信而设计的协议,它具备如下主要的几项特性:
一、使用发布/订阅消息模式,提供一对多的消息发布,解除应用程序耦合;
二、对负载内容屏蔽的消息传输;
三、使用 TCP/IP 提供网络链接;
四、有三种消息发布服务质量:
“至多一次”,消息发布彻底依赖底层 TCP/IP 网络。会发生消息丢失或重复。这一级别可用于以下状况,环境传感器数据,丢失一次读记录无所谓,由于不久后还会有第二次发送。
“至少一次”,确保消息到达,但消息重复可能会发生。
“只有一次”,确保消息到达一次。这一级别可用于以下状况,在计费系统中,消息重复或丢失会致使不正确的结果。
五、小型传输,开销很小(固定长度的头部是 2 字节),协议交换最小化,以下降网络流量;
六、使用 Last Will 和 Testament 特性通知有关各方客户端异常中断的机制;
详细的MQTT协议内容请参考:https://mcxiaoke.gitbooks.io/mqtt-cn/content/mqtt/01-Introduction.html
本文使用开源的MQTT
服务器mosquitto
,介绍以下:
Eclipse Mosquitto is an open source (EPL/EDL licensed) message broker that implements the MQTT protocol versions 3.1 and 3.1.1. Mosquitto is lightweight and is suitable for use on all devices from low power single board computers to full servers.The MQTT protocol provides a lightweight method of carrying out messaging using a publish/subscribe model. This makes it suitable for Internet of Things messaging such as with low power sensors or mobile devices such as phones, embedded computers or microcontrollers.
CentOS 7没有mosquitto包。要安装它,咱们将首先安装一个名为Extra Packages for Enterprise Linux或EPEL的额外软件存储库。
sudo yum -y install epel-release
而后安装
sudo yum -y install mosquitto
直接brew开路:
brew install mosquitto
配置文件mosquitto.conf
路径:
mac :/usr/local/etc/mosquitto/mosquitto.conf
centos:/etc/mosquitto/mosquitto.conf
bind_address 127.0.0.1 port 1883 protocol mqtt
bind_address能够不配置,
websocket
支持:listener 1884 protocol websockets
首先,要禁止匿名登陆
allow_anonymous false
配置密码文件:
password_file usr/local/etc/mosquitto/pwfile
配置密码的方法:
mosquitto
提供了mosquitto_passwd
工具设置密码,使用方法以下:
➜ ~ mosquitto_passwd help mosquitto_passwd is a tool for managing password files for mosquitto. Usage: mosquitto_passwd [-c | -D] passwordfile username mosquitto_passwd -b passwordfile username password mosquitto_passwd -U passwordfile -b : run in batch mode to allow passing passwords on the command line. -c : create a new password file. This will overwrite existing files. -D : delete the username rather than adding/updating its password. -U : update a plain text password file to use hashed passwords. See http://mosquitto.org/ for more information.
个人作法是:先在usr/local/etc/mosquitto/pwfile
中把用的密码写下来,以下:
user1:passwd1 user2:passwd2 user3:passwd3 user4:passwd4 user5:passwd5 user6:passwd6
而后使用mosquitto_passwd -U usr/local/etc/mosquitto/pwfile
指令生成密码
更多配置请参考 https://mosquitto.org
# Config file for mosquitto # # See mosquitto.conf(5) for more information. # # Default values are shown, uncomment to change. # # Use the # character to indicate a comment, but only if it is the # very first character on the line. # ================================================================= # General configuration # ================================================================= # 从新发送已经发出去的Qos 为1或者2的消息的等待时间 retry_interval 20 # 系统状态的刷新时间,设置为0表示不刷新 sys_interval 10 #清除在内部消息存储里面的未引用的消息的时间。 #较低的值将占用较少的内存,但处理器时间较长, #越高的值将产生相反的效果。 #设置值为0意味着未引用的消息将以尽量快的速度处理。 store_clean_interval 10 #pid_file # 以什么用户启动 mosquitto,此配置在 windows 下无效,以非 root 运行无效 #user mosquitto #当前每一个客户端正在传输的Qo1和2消息的最大数量。 #这包括经过握手信息,以及那些正在重试的信息。 #默认为20。设置为0表示无上限。 #设置为1将保证QoS 1 和2的消息按顺序传递 max_inflight_messages 20 #当前正在进行的队列中Qos 1和2条消息的最大数量。默认为100。 #设置到0表示没有上限(不推荐)。一样可参见queue_qos0_messages max_queued_messages 100 #设置为true,当一个持久客户端被断开链接时,以Qos为0将消息放到队列中。 #这些消息受max_queued_messages限制 queue_qos0_messages false #此选项设置被代理容许发布的消息的大小。 #超过这个尺寸的消息将不会被代理接受。 #默认值为0,这意味着全部有效的MQTT消息都被接受。 #MQTT的最大有效大小为268435455字节 message_size_limit 0 # 用于设置客户端长链接的过时时间,默认永不过时,必须以h d w m y为单位 #分别表明 小时,天,星期,月,念 #persistent_client_expiration # 若是客户端订阅了多个重叠的订阅,例如foo/ #和foo/+/baz,而后MQTT指望当代理接 #收到一个与两个订阅相匹配的主题的消息时,例如foo/bar/baz,那么客户端应该只接 #收一次消息。为了知足这一要求,mosquitto不断跟踪发送给客户的消息。容许重复的 #消息选项容许禁用此行为,若是您有大量的客户端订阅相同的主题集合,而且很是关注 #最小化内存使用的状况,那么这个选项多是有用的。若是你事先知道你的客户端永不 #会有重叠的订阅,那么你的客户必须可以正确处理重复的信息,即便在Qo = 2的时候, #你的客户端也必须可以正确地处理重复的信息 #allow_duplicate_messages false # ================================================================= # Default listener # ================================================================= # 服务绑定的IP地址 #bind_address # 服务绑定的端口号 #port 1883 # 容许的最大链接数,-1表示没有限制 #max_connections -1 # cafile:CA证书文件 # capath:CA证书目录 # certfile:PEM证书文件 # keyfile:PEM密钥文件 #cafile #capath #certfile #keyfile # 必须提供证书以保证数据安全性 #require_certificate false # 若require_certificate值为true,use_identity_as_username也必须为true #use_identity_as_username false # 启用PSK(Pre-shared-key)支持 #psk_hint # SSL/TSL加密算法,可使用“openssl ciphers”命令获取 # as the output of that command. #ciphers # ================================================================= # Persistence # ================================================================= # 消息自动保存的间隔时间 #autosave_interval 1800 # 消息自动保存功能的开关 #autosave_on_changes false # 持久化功能的开关 persistence true # 持久化DB文件 #persistence_file mosquitto.db # 持久化DB文件目录 #persistence_location /var/lib/mosquitto/ # ================================================================= # Logging # ================================================================= # 4种日志模式:stdout、stderr、syslog、topic # none 则表示不记日志,此配置能够提高些许性能 log_dest none # 选择日志的级别(可设置多项) #log_type error #log_type warning #log_type notice #log_type information # 是否记录客户端链接信息 #connection_messages true # 是否记录日志时间 #log_timestamp true # ================================================================= # Security # ================================================================= # 客户端ID的前缀限制,可用于保证安全性 #clientid_prefixes # 容许匿名用户 #allow_anonymous true # 用户/密码文件,默认格式:username:password #password_file # PSK格式密码文件,默认格式:identity:key #psk_file # pattern write sensor/%u/data # ACL权限配置,经常使用语法以下: # 用户限制:user <username> # 话题限制:topic [read|write] <topic> # 正则限制:pattern write sensor/%u/data #acl_file # ================================================================= # Bridges # ================================================================= # 容许服务之间使用“桥接”模式(可用于分布式部署) #connection <name> #address <host>[:<port>] #topic <topic> [[[out | in | both] qos-level] local-prefix remote-prefix] # 设置桥接的客户端ID #clientid # 桥接断开时,是否清除远程服务器中的消息 #cleansession false # 是否发布桥接的状态信息 #notifications true # 设置桥接模式下,消息将会发布到的话题地址 # $SYS/broker/connection/<clientid>/state #notification_topic # 设置桥接的keepalive数值 #keepalive_interval 60 # 桥接模式,目前有三种:automatic、lazy、once #start_type automatic # 桥接模式automatic的超时时间 #restart_timeout 30 # 桥接模式lazy的超时时间 #idle_timeout 60 # 桥接客户端的用户名 #username # 桥接客户端的密码 #password # bridge_cafile:桥接客户端的CA证书文件 # bridge_capath:桥接客户端的CA证书目录 # bridge_certfile:桥接客户端的PEM证书文件 # bridge_keyfile:桥接客户端的PEM密钥文件 #bridge_cafile #bridge_capath #bridge_certfile #bridge_keyfile
参考文献: https://www.imooc.com/article/19459
brew services start/stop mosquitto
systemctl start/stop/restart mosquitto
Topic
与消息队列相比,主题很是轻量级。 客户端不须要在发布或订阅以前建立所需的主题,由于代理接受每一个有效主题时不须要进行任何预初始化。
主题使用/
来分层次,如下是几个主题的示例:
sensors/COMPUTER_NAME/temperature/HARDDRIVE_NAME
更重要的是,MQTT提供了通配符
+号仅仅匹配一个主题层次。例如,finance/stock/+匹配finance/stock/ibm与finance/stock/xyz,可是不匹配finance/stock/ibm/closingprice。由于单层次通配符仅仅匹配一个层次,finance/+不匹配finance。
单层次通配符可用于主题树内任何层次,并与多层次通配符一块儿使用。必须用于在顶层分隔符以后,除了当本身指定时。所以,+和finance/+ 都是有效的,可是finance+无效。单层通配符可用于主题树最后或者在主题树内,例如,finance/+与finance/+/ibm都是有效的。
示例以下:
* a/b/c/d * +/b/c/d * a/+/c/d * a/+/+/d * +/+/+/+
#号能够匹配主题内任何层次
单层次通配符可用于主题树内任何层次,并与多层次通配符一块儿使用。必须用于在顶层分隔符以后,除了当本身指定时。所以,+和finance/+ 都是有效的,可是finance+无效。单层通配符可用于主题树最后或者在主题树内,例如,finance/+与finance/+/ibm都是有效的。
示例以下:
* a/b/c/d * # * a/# * a/b/# * a/b/c/# * +/b/c/#
目前,我手上尚未一个趁手的采样设备,因此只能模拟生成数据,可是,预计下周,我就能够解决这个问题了
关于如何生成数据以及使用TPL Dataflow
数据采集和处理,请参考个人另外一篇博客:
这里重点介绍MQTTnet
的使用
MQTTnet
介绍MQTTnet是一个高性能的MQTT基础链接.NET
库。提供了MQTT服务端和客户端支持。
MQTTnet is a high performance .NET library for MQTT based communication. It provides a MQTT client and a MQTT server (broker). The implementation is based on the documentation from http://mqtt.org/.
特性以下:
目前支持的版本以下:
重点是:支持异步
MQTTnet
客户端使用链接MQTT
/// <summary> /// 初始化初始化MQTT /// </summary> private void InitialMqtt() { this._mqttClient = new MqttFactory().CreateMqttClient(); this._mqttClient.ConnectAsync(new MqttClientOptionsBuilder() .WithClientId(Guid.NewGuid().ToString("N")) .WithTcpServer("*****",1883) .WithCredentials("admin", "admin") .WithCleanSession() .Build()); }
能够看到,咱们使用的ConnectAsync()
方法,是异步链接。
发布消息
首先要构建消息
var message = new MqttApplicationMessageBuilder() .WithTopic("measure/force") .WithPayload(t.ToString(CultureInfo.InvariantCulture)) .WithExactlyOnceQoS() .WithRetainFlag() .Build();
而后异步发送
this._mqttClient.PublishAsync(message);
详细代码请参考
客户端: https://github.com/spartajet/IotWpfClient
服务端主要是提供用户管理,参考源代码便可,涉及到CORS
跨域问题,请参考个人另外一篇博客:
MQTTJS
使用mqttjs
介绍mqttjs
是支持MQTT协议的客户端javascript库,注意只是客户端,而且通讯方式是websockt
,因此要在mosquitto
服务器开启websocket
支持。
MQTT.js is a client library for the MQTT protocol, written in JavaScript for node.js and the browser.
mqttjs
安装npm install mqtt
mqttjs
的APImqtt.connect() mqtt.Client() mqtt.Client#publish() mqtt.Client#subscribe() mqtt.Client#unsubscribe() mqtt.Client#end() mqtt.Client#removeOutgoingMessage() mqtt.Client#reconnect() mqtt.Client#handleMessage() mqtt.Client#connected mqtt.Client#reconnecting mqtt.Client#getLastMessageId() mqtt.Store() mqtt.Store#put() mqtt.Store#del() mqtt.Store#createStream() mqtt.Store#close()
mqttjs
的使用链接服务器:
const client = mqtt.connect('ws://ip:1884', { clientid: 'fdafdafas', username: 'admin', password: 'admin' })
设置链接后的事件,要订阅相关主题的消息
client.on('connect', function() { client.subscribe('measure/force', function(err) { if (!err) { client.publish('measure/force', 'Hello mqtt') } }) })
消息推送通知事件
client.on('message', function (topic, message) { // message is Buffer console.log(message.toString()) client.end() })
其余内容请参考项目源码:
前端网页:https://github.com/spartajet/iot-demo-web