MQTT,是:php
- 轻量级的消息订阅和发布(publish/subscribe)协议
- 创建在TCP/IP协议之上
IoT,internet of things,物联网,MQTT在这方面应用较多。html
官方网站:http://mqtt.org/java
MQTT协议是针对以下状况设计的:node
- M2M(Machine to Machine) communication,机器端到端通讯,好比传感器之间的数据通信
- 由于是Machine to Machine,须要考虑:
- Machine,或者叫设备,好比温度传感器,硬件能力很弱,协议要考虑尽可能小的资源消耗,好比计算能力和存储等
- M2M多是无线链接,网络不稳定,带宽也比较小
MQTT协议的架构,用一个示例说明。好比有1个温度传感器(1个Machine),2个小的显示屏(2个Machine),显示屏要显示温度传感器的温度值。git
可经过MQTT V3.1 Protocol Specification查阅详细规范的细节。github
显示器须要先经过MQTT协议subscribe(订阅)一个好比叫temperature
的topic(主题):web
当温度传感器publish(发布)温度数据,显示器就能够收到了:redis
注:以上两张图,取自MQTT and CoAP, IoT Protocolsmongodb
协议里还有2个主要的角色:npm
- client,客户端
- broker,服务器端
它们是经过TCP/IP协议链接的。
由于MQTT是协议,因此不能拿来直接用的,就比如HTTP协议同样。须要找实现这个协议的库或者服务器来运行。
这里是官方的Server support。
我服务器端使用nodejs开发,所以选择了:
MQTT.js最基本使用
安装是很简单的:
npm install mqtt
MQTT.js实现的服务器端
代码以下:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
|
var mqtt = require('mqtt');
//{'topicName':[clientObj,clientObj ..]}
var subscribeTopics={};
//建立服务器对象
var server = mqtt.createServer(function(client) {
//创建链接时触发
client.on(
'connect', function(packet) {
client.connack({returnCode:
0});
});
//客户端发布主题时触发
client.on(
'publish', function(packet) {
var topic=packet.topic;
var payload=packet.payload;
//若是没有建立空的主题对应的client数组
if(subscribeTopics[topic]==null){
subscribeTopics[topic]=[];
}
else{
//遍历该主题下所有client,并逐一发送消息
for(var i in subscribeTopics[topic]){
var client=subscribeTopics[topic][i];
client.publish({
topic: topic,
payload: payload
});
}
}
});
//当客户端订阅时触发
client.on(
'subscribe', function(packet) {
var topic=packet.subscriptions[0].topic;
//如没有,建立空的主题对应的client数组
if(subscribeTopics[topic]==null){
subscribeTopics[topic]=[];
}
//若是client数组中没有当前client,加入
if(subscribeTopics[topic].indexOf(client)==-1){
subscribeTopics[topic].push(client);
}
});
client.on(
'pingreq', function(packet) {
client.pingresp();
});
client.on(
'disconnect', function(packet) {
//遍历全部主题,检查对应的数组中是否有当前client,从数组中删除
for (var topic in subscribeTopics){
var index=subscribeTopics[topic].indexOf(client);
if(index>-1){
subscribeTopics[topic].splice(index,
1);
}
}
});
});
//监听端口
server.listen(
1883);
|
这是一个最基本的服务器端,消息的存储和查询都须要本身编程处理。
好比你若是须要用redis保存和触发数据,可参考这篇中文文章:node mqtt server (redis pub/sub)。
MQTT.js实现的客户端
代码:
1
2
3
4
5
6
7
8
9
10
11
12
|
var mqtt = require('mqtt');
client = mqtt.createClient(
1883, 'localhost');
client.subscribe(
'testMessage');
client.publish(
'testMessage', '发布测试信息');
client.on(
'message', function (topic, message) {
console.log(message);
client.end();
});
|
写的很简易,订阅了主题,而后向相同主题发布消息,接收到消息后client中止。
使用Mosca
MQTT.js只是实现了最基础的MQTT协议部分,对于服务器端的处理须要本身完成。
有关MQTT.js是否实现了MQTT server,详细的说明,可参见MQTT Server: MQTT.js or Mosca?
正好,Mosca在MQTT基础上实现了这些,它能够:
- 做为独立运行的MQTT服务器运行
- 集成到nodejs程序里使用
安装很简单:
npm install mosca bunyan -g
做为独立服务器运行
运行:
mosca -v | bunyan
而后,还能够用我上文的客户端代码运行测试。
集成在本身程序中使用
我考虑的后端持久化,是用MongoDB。Mosca另外几个选项:
- Redis,缺点是更注重做为缓存,而不适合可靠持久化
- LevelUp,头一次据说,不打算作技术准备了,是用nodejs的包装起来的LevelDB
- Memory,使用内存,估计默认的就是这个,不适合我使用的状况
首先要安装mosca的库:
npm install mosca
而后,在本机将mongodb运行起来,应该就能够执行下面的代码了:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
|
var mosca = require('mosca')
var settings = {
port:
1883,
backend:{
type:
'mongo',
url:
'mongodb://localhost:27017/mqtt',
pubsubCollection:
'ascoltatori',
mongo: {}
},
persistence:{
factory: mosca.persistence.Mongo,
url:
"mongodb://localhost:27017/mosca"
}
};
var server = new mosca.Server(settings);
server.on(
'ready', function(){
console.log('Mosca server is up and running');
});
server.on(
'published', function(packet, client) {
console.log('Published', packet.payload);
});
|
直接运行做者文档中的代码会在屡次运行客户端后出现错误,我是参考了他2天前加上的示例代码。
做者Matteo Collina生活在乎大利的博洛尼亚,写代码很勤奋,这个项目更新很快,是否是说明这个方向(mqtt)很活跃呢?
做者也写了个幻灯片,MQTT and Node.js
MQTT高级问题
keepalive和PING
从这篇文章MQTT协议笔记之链接和心跳:
心跳时间(Keep Alive timer)
以秒为单位,定义服务器端从客户端接收消息的最大时间间隔。通常应用服务会在业务层次检测客户端网络是否链接,不是TCP/IP协议层面的 心跳机制(好比开启SOCKET的SO_KEEPALIVE选项)。 通常来说,在一个心跳间隔内,客户端发送一个PINGREQ消息到服务器,服务器返回PINGRESP消息,完成一次心跳交互,继而等待下一轮。若客户端 没有收到心跳反馈,会关闭掉TCP/IP端口链接,离线。 16位两个字节,可看作一个无符号的short类型值。最大值,2^16-1 = 65535秒 = 18小时。最小值能够为0,表示客户端不断开。通常设为几分钟,好比微信心跳周期为300秒。
下面的代码中我设置的是10秒:
1
2
3
4
5
6
7
8
9
10
11
|
var mqtt = require('mqtt');
var settings = {
keepalive:
10,
protocolId:
'MQIsdp',
protocolVersion:
3,
clientId:
'client-b',
clean:
false
}
client = mqtt.createClient(
1883, 'localhost',settings);
|
可使用MQTT.js编写简单的服务器代码,观察到服务器端接收到PING请求,并发回PING响应:
1
2
3
4
|
client.
on('pingreq', function(packet) {
client.pingresp();
console.
log('pingreq & resp');
});
|
完整代码上面已经贴过,另见Gist
QoS
QoS在MQTT中有(摘自MQ 遥测传输 (MQTT) V3.1 协议规范):
- “至多一次”,消息发布彻底依赖底层 TCP/IP 网络。会发生消息丢失或重复。这一级别可用于以下状况,环境传感器数据,丢失一次读记录无所谓,由于不久后还会有第二次发送。
- “至少一次”,确保消息到达,但消息重复可能会发生。
- “只有一次”,确保消息到达一次。这一级别可用于以下状况,在计费系统中,消息重复或丢失会致使不正确的结果。
MQTT.js只是支持了MQTT协议,并无支持QoS,也就是说,只支持最低级别的“至多一次”(QoS0)。
Mosca支持QoS0和1,但不支持2,见Add support QOS 2
接收离线消息
我在应用中的一个主要场景是,使用MQTT.js+Mosca作聊天服务器。
默认Mosca是不支持离线消息的,表现的现象是,若是是有人(client-a)先在主题上发布了消息:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
|
var mqtt = require('mqtt');
var settings = {
keepalive:
10,
protocolId:
'MQIsdp',
protocolVersion:
3,
clientId:
'client-a'
}
client = mqtt.createClient(
1883, 'localhost',settings);
client.publish(
'testMessage', '发布new测试信息0',{qos:1,retain: true});
client.publish(
'testMessage', '发布new测试信息1',{qos:1,retain: true});
client.publish(
'testMessage', '发布new测试信息2',{qos:1,retain: true});
client.publish(
'testMessage', '发布new测试信息3',{qos:1,retain: true});
setTimeout(
function(){
client.end();
},
1000);
|
那么另一我的(client-b),随后订阅,仅能看到最后一条消息:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
|
var mqtt = require('mqtt');
var settings = {
keepalive:
10,
protocolId:
'MQIsdp',
protocolVersion:
3,
clientId:
'client-b'
}
client = mqtt.createClient(
1883, 'localhost',settings);
client.subscribe(
'testMessage',{qos:1},function(){
console.log('subscribe ok.');
});
client.on(
"message", function(topic, payload) {
console.log('message: '+payload);
});
|
运行结果相似这样:
subscribe ok.
message: 发布new测试信息3
离线消息,须要如下几点:
- 客户端订阅设置QoS=1
- 客户端链接属性
clean: false
,做用是断开链接重连的时候服务器端帮助恢复session,不须要再次订阅
用代码说明如下,先运行这段代码:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
|
var mqtt = require('mqtt');
var settings = {
keepalive:
10,
protocolId:
'MQIsdp',
protocolVersion:
3,
clientId:
'client-b',
clean:
false
}
client = mqtt.createClient(
1883, 'localhost',settings);
client.subscribe(
'testMessage',{qos:1},function(){
console.log('subscribe ok.');
client.end();
});
|
而后执行刚才发布多条消息的代码。再执行下面的代码:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
|
var mqtt = require('mqtt');
var settings = {
keepalive:
10,
protocolId:
'MQIsdp',
protocolVersion:
3,
clientId:
'client-b',
clean:
false
}
client = mqtt.createClient(
1883, 'localhost',settings);
client.on(
"message", function(topic, payload) {
console.log('message: '+payload);
});
|
运行结果相似这样:
message: 发布new测试信息1 message: 发布new测试信息3 message: 发布new测试信息2 message: 发布new测试信息0
收到消息的顺序是乱的,为何会这样,其实很好理解,为了小型受限设备以及网络不稳定的状况,消息是很差保证顺序的。
解决办法是发送的消息带时间戳,接收后再作排序。
另外,担忧客户端没有作client.end()
而非正常退出,那么再次链接是否能恢复session,测试了一下,注释client.end()
,没有问题,正常收到多条离线消息。
SSL链接
Mosca支持SSL链接,可根据Nodejs TLS建立公钥私钥。
而后相似这样启动:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
|
var mosca = require('mosca')
var SECURE_KEY = __dirname + '/../../test/secure/tls-key.pem';
var SECURE_CERT = __dirname + '/../../test/secure/tls-cert.pem';
var settings = {
port:
8443,
logger: {
name:
"secureExample",
level:
40,
},
secure : {
keyPath: SECURE_KEY,
certPath: SECURE_CERT,
}
};
var server = new mosca.Server(settings);
server.on(
'ready', setup);
// fired when the mqtt server is ready
function setup() {
console.log('Mosca server is up and running')
}
|
这部分我没有测试,直接转自Mosca Encryption Support。
认证和受权
在Mosca Authentication提供了个简易的命令行,可建立帐号用于认证并受权。
可是它不适合个人需求场景,我须要本身编写认证和受权的逻辑。
虽然在做者官方网站上未找到,但在问题管理记录中提交了这方面的支持:Authentication & Authorization。
有下面两条支持,应该能够写出本身的回调,并集成到Mosca中:
- add a callback to authorize a publish.
- add a callback to authorize a subscribe.
不过这块没有写代码,只是大体能肯定。
性能问题
MQTT.js并非完整解决方案,不须要考虑它的性能问题。
说一下Mosca,有一个这方面问题做者的答复,what about mosca’s performance,问问题的仍是个中国人,我前面还引用了他的文章。做者基本意思是:
It basically depends on the RAM. On an AWS large instance it can reach 10k concurrent connections, with roughly 10k messages/second.