MQTT(MQ Telemetry Transport) 消息队列遥测传输协议是IBM开发的一种网络应用层的协议,提供轻量级的,支持可发布/可订阅的的消息推送模式,使设备对设备之间的短消息通讯变得简单,好比如今应用普遍的低功耗传感器,手机、嵌入式计算机、微型控制器,卫星等移动设备。javascript
MQTT 的独特之处在于,它的每消息标题能够短至 2 个byte。MQ 和 HTTP 都拥有高得多的每消息开销。对于 HTTP,为每一个新请求消息从新创建 HTTP 链接会致使重大的开销。MQ 和 MQTT 所使用的永久链接显著减小了这一开销。html
您须要可以及时地将通知传递给客户。为此,必须采用某种按期轮询或推送方法;从电池、系统负载和带宽角度讲,推送是最佳解决方案。MQTT 是专门针对低功耗目标而设计的。HTTP 的设计没有考虑此因素,所以增长了功耗。java
在 HTTP 堆栈上,维护数百万个并发链接,须要作许多的工做来提供支持。尽管能够实现此支持,但大多数商业产品都为处理这一数量级的永久链接而进行了优化。IBM 提供了 IBM MessageSight,这是一个单机架装载服务器,通过测试能处理多达 100 万个经过 MQTT 并发链接的设备。相反,MQ 不是为大量并发客户端而设计的。c++
MQTT提供三种不一样消息传递等级,让消息能按需到达目的地,适应在不稳定工做的网络传输需求。MQTT 和 MQ 可以从断开等故障中恢复,并且没有进一步的代码需求。可是,HTTP 没法原生地实现此目的,须要客户端重试编码,这可能增长幂等性问题。spring
支持各类流行编程语言(包括C,Java,Ruby,Python 等等)且易于使用的客户端。macos
支持发布 / 订阅模型,简化应用程序的开发。编程
企业可能须要在没有第三方中介的状况下发送敏感的信息。这下降了特定于操做系统的解决方案(好比 Apple iOS、Google Play 通知)做为主要传输机制的价值。安全
HTTP 只容许使用一种称为COMET 的方法,使用持久的 HTTP 请求来执行推送。从客户端和服务器的角度讲,此方法都很昂贵。MQ 和 MQTT 都支持推送,这是它们的一个基本特性。服务器
一些企业防火墙将出站链接限制到一些已定义的端口。这些端口一般被限制为 HTTP(80 端口)、HTTPS(443 端口)等。HTTP 显然能够在这些状况下运行。MQTT 可封装在一个 WebSockets 链接中,显示为一个 HTTP 升级请求,从而容许在这些状况下运行。MQ 不容许采用这种模式。cookie
因为MQTT自己的各项技术优点,愈来愈多的企业倾向于选用MQTT做为物联网产品通信的标准协议,也所以,工程师们渐渐发现MQTT协议要想大规模商用,也有一些有待完善的功能。好比:
不一样的异构终端,须要有对应的与MQTT服务器通讯的软件SDK包,好比MCU、Linux、Android、IOS、WEB等之间要实现互联互通必然须要不一样的SDK包。
有些应用场景,须要传输的信息可能不只仅限于指令,好比声音信号和视频信号,这些须要经过File和AV来实现通讯。
虽然MQTT协议优于普通的HTTP协议,可是基于传统的HTTP协议的WEB服务器仍然占主流市场,那么这些服务器要实现与MQTT协议的互联互通,以下降升级成本也尤其关键。
用户在进行设备的行为数据分析的时候,显得尤其重要,这又是工业4.0、大数据时代的必然需求。
消息弥补设备离线之后,MQTT服务器对设备的控制信息丢失的问题。
(解决方案:https://helpcdn.aliyun.com/document_detail/59914.html)
采用标准的MQTT协议,理论上能够经过相互订阅的方式实现点对点通讯,可是逻辑相对复杂,而且对设备的安全性方面存在担心。当设备B和设备C在同一主题的状况下,设备A没法知道是设备B仍是设备C发送的消息,也有可能消息被设备D窃听。
实现了对群组成员的管理,群组成员之间能互通消息,这在一个设备被多人控制,或者多个设备被一人控制的这种场景下,尤其有用。
怎么样,是否是一目了然,很是简单
一款实现了消息推送协议 MQTT v3.1 的开源消息代理软件,提供轻量级的,支持可发布/可订阅的的消息推送模式,使设备对设备之间的短消息通讯变得简单,好比如今应用普遍的低功耗传感器,手机、嵌入式计算机、微型控制器等移动设备。一个典型的应用案例就是 Andy Stanford-ClarkMosquitto(MQTT协议创始人之一)在家中实现的远程监控和自动化。并在 OggCamp 的演讲上,对MQTT协议进行详细阐述。
https://mosquitto.org/
2.3.1使用简单
2.3.2 网络资料丰富
2.4.1 没有可视化管理后台
2.4.2 商用实例很少
1.源码包下载:http://mosquitto.org/files/source/
或者wget http://mosquitto.org/files/source/mosquitto-1.4.9.tar.gz
解压:tar -zxvf mosquitto-1.4.tar.gz
进入目录:cd mosquitto-1.4
2.编译安装
打开配置文件,去掉暂且不须要的功能:
vi config.mk
如:WITH_TLS,WITH_TLS_PSK, WITH_SRV, WITH_WEBSOCKETS, WITH_SOCKS, WITH_UUID等
保存退出:wq
安装mosquitto
make
make install
踩过的坑:
a】编译找不到openssl/ssl.h
安装openssl sudo apt-get install libssl-dev
【b】编译过程找不到ares.h
sudo apt-get install libc-ares-dev
【c】编译过程找不到uuid/uuid.h
sudo apt-get install uuid-dev
【d】使用过程当中找不到libmosquitto.so.1
error while loading shared libraries: libmosquitto.so.1: cannot open shared object file: No such file or directory
【解决方法】——修改libmosquitto.so位置
# 建立连接
sudo ln -s /usr/local/lib/libmosquitto.so.1 /usr/lib/libmosquitto.so.1
# 更新动态连接库
sudo ldconfig
【e】make: g++:命令未找到
【解决方法】
安装g++编译器
sudo apt-get install g++
启动 mosquitto broker
mosquitto -c /etc/mosquitto/mosquitto.conf.example &
-c : specify the broker config file.
-d : put the broker into the background after starting.
-h : display this help.
-p : start the broker listening on the specified port.
Not recommended in conjunction with the -c option.
-v : verbose mode - enable all logging types. This overrides
any logging options given in the config file.
订阅消息:
./mosquitto_sub -h 127.0.0.1 -p 1883 -t "/sports/wordcup"
发布消息:
./mosquitto_pub -h 127.0.0.1 -p 1883 -t "/sports/wordcup " -m "this is carter hello"
或者
./mosquitto_sub -h 10.129.4.12 -p 1883 -t "/sports/wordcup"
./mosquitto_pub -h 10.129.4.12 -p 1883 -t "/sports/wordcup" -m "this is carter hello 666"
外网地址不行,因此我在本机用paho代码一直报timeOut异常,缘由是服务器防火墙未放开端口
【解决办法】
放开防火墙端口:firewall-cmd --add-port=1883/tcp –permanent
重启防火墙:systemctl restart firewalld
Java客户端实现
采用eclipse.paho框架
新建maven工程,加入依赖
<!-- spring整合mqtt 开始--> <dependency> <groupId>org.springframework.integration</groupId> <artifactId>spring-integration-mqtt</artifactId> <version>4.1.0.RELEASE</version> <exclusions> <exclusion> <groupId>org.eclipse.paho</groupId> <artifactId>mqtt-client</artifactId> </exclusion> </exclusions> </dependency> <!-- spring整合mqtt 结束--> <!-- mqtt依赖 开始 --> <dependency> <groupId>org.eclipse.paho</groupId> <artifactId>org.eclipse.paho.client.mqttv3</artifactId> <version>1.2.0</version> </dependency> <dependency> <groupId>org.eclipse.paho</groupId> <artifactId>mqtt-client</artifactId> </dependency> <!-- mqtt依赖 结束 -->
public class ServerMQTT { //tcp://MQTT安装的服务器地址:MQTT定义的端口号 public static final String HOST = "tcp://111.9.116.136:1883"; //定义一个主题 public static final String TOPIC = "pos_message_all"; //定义MQTT的ID,能够在MQTT服务配置中指定 private static final String clientid = "server11"; private MqttClient client; private MqttTopic topic11; private String userName = "mosquitto"; //非必须 private String passWord = ""; //非必须 private MqttMessage message; /** * 构造函数 * @throws MqttException */ public ServerMQTT() throws MqttException { // MemoryPersistence设置clientid的保存形式,默认为之内存保存 client = new MqttClient(HOST, clientid, new MemoryPersistence()); connect(); } /** * 用来链接服务器 */ private void connect() { MqttConnectOptions options = new MqttConnectOptions(); options.setCleanSession(false); options.setUserName(userName); options.setPassword(passWord.toCharArray()); // 设置超时时间 options.setConnectionTimeout(10); // 设置会话心跳时间 options.setKeepAliveInterval(20); try { client.setCallback(new PushCallBack()); client.connect(options); topic11 = client.getTopic(TOPIC); } catch (Exception e) { e.printStackTrace(); } } /** * @param topic * @param message * @throws MqttPersistenceException * @throws MqttException */ public void publish(MqttTopic topic , MqttMessage message) throws MqttPersistenceException, MqttException { MqttDeliveryToken token = topic.publish(message); token.waitForCompletion(); System.out.println("message is published completely! " + token.isComplete()); } /** * 启动入口 * @param args * @throws MqttException */ public static void main(String[] args) throws MqttException { ServerMQTT server = new ServerMQTT(); server.message = new MqttMessage(); server.message.setQos(1); //保证消息能到达一次 server.message.setRetained(true); server.message.setPayload("I love this Summer 8888".getBytes()); server.publish(server.topic11 , server.message); System.out.println(server.message.isRetained() + "------ratained状态"); } }
/** * 模拟一个客户端接收消息 */ public class ClientMQTT { public static final String HOST = "tcp://111.9.116.136:1883"; public static final String TOPIC1 = "pos_message_all"; private static final String clientid = "client11"; private MqttClient client; private MqttConnectOptions options; private String userName = "admin"; //非必须 private String passWord = "password"; //非必须 @SuppressWarnings("unused") private ScheduledExecutorService scheduler; /** * $SYS中各主题说明以下: $SYS/broker/load/connections/+ 不一样时间段内服务器接收到的connections包的平均数。最后的“+”但是1min,5min,15min。分别表示1分钟,5分钟,15分钟的平均数。 $SYS/broker/load/bytes/received/+ 不一样时间段内服务器接收数据的平均字节数。最后的“+”但是1min,5min,15min。 $SYS/broker/load/bytes/sent/+ 不一样时间段内服务器发送数据的平均字节数。最后的“+”但是1min,5min,15min。 $SYS/broker/load/messages/received/+ 不一样时间段内服务器接收到的全部类型消息的平均数。最后的“+”但是1min,5min,15min。 $SYS/broker/load/messages/sent/+ 不一样时间段内服务器发送的全部类型的消息的平均数。最后的“+”但是1min,5min,15min。 $SYS/broker/load/publish/dropped/+ 不一样时间段内服务器丢弃的消息的平均数,这代表了那些持久链接但与服务器断开的客户端失去消息的速率。最后的“+”但是1min,5min,15min。 $SYS/broker/load/publish/received/+ 不一样时间段内服务器接收的发布消息的平均数。最后的“+”但是1min,5min,15min。 $SYS/broker/load/publish/sent/+ 不一样时间段内服务器发送的发布消息的平均数。最后的“+”但是1min,5min,15min。 $SYS/broker/load/sockets/+ 不一样时间段内服务器打开的socket链接的平均数。最后的“+”但是1min,5min,15min。 $SYS/broker/messages/inflight 等待确认的Qos>0的消息的数量。 $SYS/broker/messages/received 自服务器启动以来接收的全部类型的消息总数。 $SYS/broker/messages/sent 自服务器启动以来发送的全部类型的消息总数。 $SYS/broker/messages/stored 服务器存储的消息的总数,包括保留消息和持久链接客户端的消息队列中的消息数。 $SYS/broker/publish/messages/dropped 因为inflight/queuing限制而直接丢弃的消息的总数,相关设置请查看mosquitto.conf中max_inflight_messages 和max_queued_messages参数。 $SYS/broker/publish/messages/received 自服务器启动以来接收的发布消息的总数。 $SYS/broker/publish/messages/sent 自服务器启动以来发送的发布消息的总数。 $SYS/broker/retained messages/count 服务器保留的消息总数。 $SYS/broker/subscriptions/count 服务器订阅主题总数。 $SYS/broker/timestamp Mosquitto软件build的详细时间(Static)。 $SYS/broker/uptime Mosquitto启动时长(单位:秒)。 $SYS/broker/version Mosquitto软件版本号(Static)。 */ public static final String TOPIC2 = "$SYS/broker/bytes/received"; //自服务器启动以来共接收的字节数 public static final String TOPIC3 = "$SYS/broker/bytes/sent"; //自服务器启动以来共发送的字节数 public static final String TOPIC4 = "$SYS/broker/clients/expired"; //超过有效期被断开链接的客户端数量,有效期经过persistent_client_expiration参数设置。 public static final String TOPIC5 = "$SYS/broker/clients/disconnected"; //自服务器启动以来断开的链接数 public static final String TOPIC6 = "$SYS/broker/clients/maximum"; //服务器同一时间链接的最大客户端数量 public static final String TOPIC7 = "$SYS/broker/clients/total"; //有效和无效链接、注册到服务器上的总数。 public static final String TOPIC8 = "$SYS/broker/connection/#"; //若是服务器设置了桥接,系统会提供一个主题来标识链接状态,默认使用$SYS/broker/connection/,若是主题值为1表示链接激活,若是为0表示链接没有激活。 public static final String TOPIC9 = "$SYS/broker/heap/current size"; //Mosquitto正在使用的堆内存大小。注意这个主题是否可使用取决于系统编译时的相关参数设置。 public static final String TOPIC10 = "$SYS/broker/heap/maximum size"; //Mosquitto使用的最大堆内存。这个参数是否有效也取决于系统编译时的相关参数设置。 private void start() { try { // host为主机名,clientid即链接MQTT的客户端ID,通常以惟一标识符表示,MemoryPersistence设置clientid的保存形式,默认为之内存保存 client = new MqttClient(HOST, clientid, new MemoryPersistence()); // MQTT的链接设置 options = new MqttConnectOptions(); // 设置是否清空session,这里若是设置为false表示服务器会保留客户端的链接记录,设置为true表示每次链接到服务器都以新的身份链接 options.setCleanSession(false); // 设置链接的用户名 options.setUserName(userName); // 设置链接的密码 options.setPassword(passWord.toCharArray()); // 设置超时时间 单位为秒 options.setConnectionTimeout(10); // 设置会话心跳时间 单位为秒 服务器会每隔1.5*20秒的时间向客户端发送个消息判断客户端是否在线,但这个方法并无重连的机制 options.setKeepAliveInterval(20); // 设置回调 client.setCallback(new PushCallBack()); MqttTopic topic = client.getTopic(TOPIC1); //setWill方法,若是项目中须要知道客户端是否掉线能够调用该方法。设置最终端口的通知消息 options.setWill(topic, "close".getBytes(), 2, true);//遗嘱 client.connect(options); //订阅消息 int[] Qos = {1,0,2,1,0,2,1,0,2}; String[] topic1 = {TOPIC2,TOPIC3,TOPIC4,TOPIC5,TOPIC6,TOPIC7,TOPIC8,TOPIC9,TOPIC10}; // int[] Qos = {1}; // String[] topic1 = {TOPIC1}; client.subscribe(topic1, Qos); } catch (Exception e) { e.printStackTrace(); } } public static void main(String[] args) throws MqttException { ClientMQTT client = new ClientMQTT(); client.start(); } }
PushCallBack 必须实现 MqttCallback 接口
有三个方法:
public void connectionLost(Throwable cause) { // 链接丢失后,通常在这里面进行重连 System.out.println("链接断开,能够作重连"); } public void deliveryComplete(IMqttDeliveryToken token) { System.out.println("deliveryComplete---------" + token.isComplete() +token.getMessageId()); } public void messageArrived(String topic, MqttMessage message) throws Exception { // subscribe后获得的消息会执行到这里面 System.out.println("接收消息主题 : " + topic); System.out.println("接收消息Qos : " + message.getQos()); System.out.println("接收消息内容 : " + new String(message.getPayload())); }
MQTT提供三种Qos的消息传递质量:
最多一次(Atmost once delivery):QoS=0,协议对此等级应用信息不要求回应确认,也没有重发机制,这类信息可能会发生消息丢失或重复,取决于TCP/IP提供的尽最大努力交互的数据包服务。
(0:消息最多被传递一次,好比通常类广告,通知)
最少一次(Atleast once delivery):QoS=1,确保信息到达,但消息重复可能发生,发送者若是在指定时间内没有收到PUBACK控制报文,应用信息会被从新发送,且控制报文中DUP标志位置1。
(1 :消息会被传递但可能会重复传递,好比帐户余额通知)
仅仅一次(Exactlyonce delivery):QoS=2,最高级别的服务质量,消息丢失和重复都是不可接受的。
(2 :消息保证传递且仅有一次传递,好比交易支付批复通知)
EMQ 2.0 (Erlang/Enterprise/Elastic MQTT Broker) 是基于 Erlang/OTP 语言平台开发,支持大规模链接和分布式集群,发布订阅模式的开源 MQTT 消息服务器。
http://www.emqtt.com/docs/v2/index.html
开放18083端口访问管理后台
官方的回复是8核心32G的配置可以承载160W台设备的连接
nzip emqttd-macosx-v2.0.zip && cd emqttd # 启动emqttd ./bin/emqttd start # 检查运行状态./bin/emqttd_ctl status # 中止emqttd ./bin/emqttd stop 默认配置文件 /bin/emqenv [ "x" = "x$EMQ_NODE_NAME" ] && EMQ_NODE_NAME=emqttd@127.0.0.1 [ "x" = "x$EMQ_NODE_COOKIE" ] && EMQ_NODE_COOKIE=emqsecretcookie [ "x" = "x$EMQ_MAX_PACKET_SIZE" ] && EMQ_MAX_PACKET_SIZE=64KB [ "x" = "x$EMQ_MAX_PORTS" ] && EMQ_MAX_PORTS=65536 [ "x" = "x$EMQ_TCP_PORT" ] && EMQ_TCP_PORT=1883 [ "x" = "x$EMQ_SSL_PORT" ] && EMQ_SSL_PORT=8883 [ "x" = "x$EMQ_WS_PORT" ] && EMQ_WS_PORT=8083 [ "x" = "x$EMQ_WSS_PORT" ] && EMQ_WSS_PORT=8084 对外暴露的tcp端口依然是1883 和mosquitto同样
在客户端分别订阅和发布消息,在管理后台列表能够看到消息的状态,管理后台默认端口为18083