常见的mq有Kafka,RocketMQ和RabbitMQ,你们也很常见。 前者很常见,MQTT是什么呢?MQTT属于IoT也就是物联网的概念。
快来和使用mqtt.js开发IM功能2年的做者一探究竟吧~
常见的mq有Kafka,RocketMQ和RabbitMQ,你们也很常见。MQTT是什么呢?
Kafka,RocketMQ和RabbitMQ属于微服务间的mq,而MQTT则属于IoT也就是物联网的概念。html
mqtt.js是MQTT在nodejs端的实现。vue技术栈下的前端也可用。
mqtt.js官方为微信小程序和支付宝小程序也作了支持。微信小程序的MQTT协议名为wxs
,支付宝小程序则是alis
。前端
若是仍是一脸懵逼,那么就跟随我经过mqtt.js去认识一下这个物联网领域的宠儿吧。vue
消息队列通常分为两种:java
目前我实践过的,也就是咱们本篇博文深刻分析的,是物联网消息队列的mqtt.js。node
传统的微服务间(多个子系统服务端间)消息队列是一种很是常见的服务端间消息传递的方式。python
典型表明有:RabbitMQ,Kafka,RocketMQ。
阿里云官网拥有AMQP(兼容RabbitMQ),Kafka,和RocketMQ这三种微服务消息队列,对于咱们从此在实际项目中落地提供了很大的帮助。ios
使用场景多种多样:git
MQTT是一个物联网MQTT协议,主要解决的是物联网IoT网络状况复杂的问题。github
阿里云有MQTT消息队列服务。通讯协议支持MQTT,STOMP,GB-808等。数据传输层支持TCP长链接、SSL加密、Websocket等。web
使用场景主要为数据传输:
目前我手上负责的运行了2年的聊天系统就是使用的这个服务,咱们主要按照设备<->server<->PC
的方式,MQTT协议,Websocket传输协议进行设备与PC间的数据通讯。
每一个MQTT实例都对应一个全局惟一的服务接入点。
肉眼可见的区别就是在经过mqtt.connect(url)
与server(broker)创建链接时,broker的url都是一致的。
假设有saleman1,salesman2···他们本地的前端与服务端间创建链接的url都是统一的,只是在clientId进行区分便可。
MQTT的Client ID是每一个客户端的惟一标识,要求全局都是惟一的,使用同一个Client ID链接会被拒绝。
阿里云的ClientID由两部分组成 <GroupID>@@@<DeviceID>
。
一般状况下Group ID是多前端统一的,好比PC端,安卓移动端,ios移动端,DeviceID也是多前端统一的。
那么如何区分多端呢?能够对Client ID中间的@@@作修改。
好比:
let CID_PC = `<GroupID>@@@-PC<DeviceID>` let CID_Android = `<GroupID>@@@-Android<DeviceID>` let CID_IOS = `<GroupID>@@@-IOS<DeviceID>`
用于指定一组逻辑功能彻底一致的节点公用的组名,表明的是一类相同功能的设备。
每一个设备独一无二的标识。这个须要保证全局惟一,能够是每一个传感器设备的序列号,能够是登陆PC的userId。
MQTT协议基于Pub/Sub模型,任何消息都属于一个Topic。
Topic能够存在多级,第一级为父级Topic。
须要控制台单首创建。
MQTT能够有二级Topic,也能够有三级Topic。
无需建立,代码中直接写便可。
Pub/Sub消息就是订阅和发布的模式,相似事件监听和广播。
若是对发布订阅不理解,能够去看Webhook究竟是个啥?
MQTT除了支持Pub/Sub的模式,还支持P2P的模式。
发送消息时
接收消息时
const p2pTopic =topic+"/p2p/GID_xxxx@@@DEVICEID_001"; mqtt.client.publish(p2pTopic);
import mqtt from 'mqtt'; import config from '@/config'; export default class MQTT { constructor(options) { this.name = options.name; this.connecting = false; } /** * 客户端链接 */ initClient(config) { const { url, groupId, key, password, topic: { publish: publishTopic }} = config; return new Promise((resolve) => { this.client = mqtt.connect( { url, clientId: `${groupId}@@@${deviceId}`, username: key, password, } ); this.client.on('connect', () => { this.connecting = true; resolve(this); }); }); } /** * 订阅topic */ subscribeTopic(topic, config) { if (this.connecting) { this.client.subscribe(topic, config); } return this; } /** * 发送消息 */ publishMessage(message) { this.client.publish(publishTopic, message, { qos: 1 }); } /** * 接收消息 */ handleMessage(callback) { if (!this.client._events.message) { this.client.on('message', callback); } } }
var mqtt = require('mqtt-packet') var object = { cmd: 'publish', retain: false, qos: 0, dup: false, length: 10, topic: 'test', payload: 'test' // Can also be a Buffer } var opts = { protocolVersion: 4 } // default is 4. Usually, opts is a connect packet console.log(mqtt.generate(object)) // Prints: // // <Buffer 30 0a 00 04 74 65 73 74 74 65 73 74> // // Which is the same as: // // new Buffer([ // 48, 10, // Header (publish) // 0, 4, // Topic length // 116, 101, 115, 116, // Topic (test) // 116, 101, 115, 116 // Payload (test) // ])
发出packetsend事件而且经过mqtt.writeToStream将packet写入client的stream中。
var mqttPacket = require('mqtt-packet') function sendPacket (client, packet) { client.emit('packetsend', packet) mqttPacket.writeToStream(packet, client.stream, client.options) }
MqttClient.prototype._sendPacket = function (packet) { sendPacket(this, packet); }
mqtt client创建与mqtt server(broker)的链接,一般是经过给定一个'mqtt', 'mqtts', 'tcp', 'tls', 'ws', 'wss', 'wxs' , 'alis'为协议的url进行链接。
mqtt.connect([url], options)
官方说明:
再来看一下我手上项目的链接配置,链接结果。
敏感信息已经过foo,bar,baz或者xxxx的组合进行数据脱敏处理。
{ key: 'xxxxxxxx', secret: 'xxxxxxxx', url: 'wss://foo-bar.mqtt.baz.com/mqtt', groupId: 'FOO_BAR_BAZ_GID', topic: { publish: 'PUBLISH_TOPIC', subscribe: ['PUBLISH_TOPIC/noticePC/', 'PUBLISH_TOPIC/p2p'], unsubscribe: 'PUBLISH_TOPIC/noticeMobile/', }, }
包括总览,响应头和请求头。
Request URL: wss://foo-bar.mqtt.baz.com Request Method: GET Status Code: 101 Switching Protocols
HTTP/1.1 101 Switching Protocols upgrade: websocket connection: upgrade sec-websocket-accept: xxxxxxx sec-websocket-protocol: mqtt
GET wss://foo-bar.mqtt.baz.com/ HTTP/1.1 Host: foo-bar.mqtt.baz.com Connection: Upgrade Pragma: no-cache Cache-Control: no-cache User-Agent: Mozilla/5.0 (Macintosh; Intel Mac OS X 10_14_6) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/80.0.3987.149 Safari/537.36 Upgrade: websocket Origin: https://xxx.xxx.com Sec-WebSocket-Version: 13 Accept-Encoding: gzip, deflate, br Accept-Language: zh-CN,zh;q=0.9,en-US;q=0.8,en;q=0.7,zh-TW;q=0.6 Sec-WebSocket-Key: xxxxxxxxx Sec-WebSocket-Extensions: permessage-deflate; client_max_window_bits Sec-WebSocket-Protocol: mqtt
下面来看这段mqtt链接的代码。
this.client = mqtt.connect( { url, clientId: `${groupId}@@@${deviceId}`, username: key, password, } );
function parseAuthOptions (opts) { var matches if (opts.auth) { matches = opts.auth.match(/^(.+):(.+)$/) if (matches) { opts.username = matches[1] opts.password = matches[2] } else { opts.username = opts.auth } } } /** * connect - connect to an MQTT broker. * * @param {String} [brokerUrl] - url of the broker, optional * @param {Object} opts - see MqttClient#constructor */ function connect (brokerUrl, opts) { if ((typeof brokerUrl === 'object') && !opts) { // 能够传入一个单对象,既包含url又包含选项 opts = brokerUrl brokerUrl = null } opts = opts || {} // 设置username和password parseAuthOptions(opts) if (opts.query && typeof opts.query.clientId === 'string') { // 设置Client Id opts.clientId = opts.query.clientId } function wrapper (client) { ... return protocols[opts.protocol](client, opts) } // 最终返回一个mqtt client实例 return new MqttClient(wrapper, opts) }
const topic = { subscribe: ['PUBLISH_TOPIC/noticePC/', 'PUBLISH_TOPIC/p2p'], unsubscribe: 'PUBLISH_TOPIC/noticeMobile/', }; const config = { qos:1 }; this.client.subscribe(topic.subscribe, config)
MqttClient.prototype.subscribe = function () { var packet var args = new Array(arguments.length) for (var i = 0; i < arguments.length; i++) { args[i] = arguments[i] } var subs = [] // obj为订阅的topic列表 var obj = args.shift() // qos等配置 var opts = args.pop() var defaultOpts = { qos: 0 } opts = xtend(defaultOpts, opts) // 数组类型的订阅的topic列表 if (Array.isArray(obj)) { obj.forEach(function (topic) { if (!that._resubscribeTopics.hasOwnProperty(topic) || that._resubscribeTopics[topic].qos < opts.qos || resubscribe) { var currentOpts = { topic: topic, qos: opts.qos } // subs是最终的订阅的topic列表 subs.push(currentOpts) } }) } // 这个packet很重要 packet = { // 发出订阅命令 cmd: 'subscribe', subscriptions: subs, qos: 1, retain: false, dup: false, messageId: this._nextId() } // 发出订阅包 this._sendPacket(packet) return this }
const topic = { publish: 'PUBLISH_TOPIC', }; const messge = { foo: '', bar: '', baz: '', ... } const msgStr = JSON.stringify(message); this.client.publish(topic.publish, msgStr);
注意publish的消息须要使用JSON.stringify进行序列化,而后再发到指定的topic。
MqttClient.prototype.publish = function (topic, message, opts, callback) { var packet var options = this.options var defaultOpts = {qos: 0, retain: false, dup: false} opts = xtend(defaultOpts, opts) // 将消息传入packet的payload packet = { cmd: 'publish', topic: topic, payload: message, qos: opts.qos, retain: opts.retain, messageId: this._nextId(), dup: opts.dup } // 处理不一样qos switch (opts.qos) { case 1: case 2: // 发出publish packet this._sendPacketI(packet); ... default: this._sendPacket(packet); ... } return this }
this.client.on('message', callback);
数据以callback的方式接收。
function (topic, message, packet) {}
topic表明接收到的topic,buffer则是具体的数据。
message是接收到的数据,谨记经过JSON.parse()对buffer作解析。
handleMessage(callback) { this.client.on('message', callback); } this.client.handleMessage((topic, buffer) => { let receiveMsg = null; try { receiveMsg = JSON.parse(buffer.toString()); } catch (e) { receiveMsg = null; } if (!receiveMsg) { return; } ...do something with receiveMsg... });
MqttClient继承了EventEmitter。
从而进行可使用on监听“message”事件。
inherits(MqttClient, EventEmitter)
那么究竟是在哪里间发出message事件的呢?>emit the message event
this.stream = this.streamBuilder(this) function streamBuilder (client, opts) { return createWebSocket(client, opts) } var websocket = require('websocket-stream') function createWebSocket (client, opts) { var websocketSubProtocol = (opts.protocolId === 'MQIsdp') && (opts.protocolVersion === 3) ? 'mqttv3.1' : 'mqtt' setDefaultOpts(opts) var url = buildUrl(opts, client) return websocket(url, [websocketSubProtocol], opts.wsOptions) }
var Writable = require('readable-stream').Writable var writable = new Writable(); this.stream.pipe(writable);
writable._write = function (buf, enc, done) { completeParse = done parser.parse(buf) work() } function work () { var packet = packets.shift() if (packet) { that._handlePacket(packet, nextTickWork) } } function nextTickWork () { if (packets.length) { process.nextTick(work) } else { var done = completeParse completeParse = null done() } }
MqttClient.prototype._handlePacket = function (packet, done) { switch (packet.cmd) { case 'publish': this._handlePublish(packet, done) break ... } // emit the message event MqttClient.prototype._handlePublish = function (packet, done) { switch (qos) { case 1: { // emit the message event if (!code) { that.emit('message', topic, message, packet) } } }
参考资料:
期待和你们交流,共同进步,欢迎你们加入我建立的与前端开发密切相关的技术讨论小组:
- SegmentFault技术圈:ES新规范语法糖
- SegmentFault专栏:趁你还年轻,作个优秀的前端工程师
- 知乎专栏:趁你还年轻,作个优秀的前端工程师
- Github博客: 趁你还年轻233的我的博客
- 前端开发QQ群:660634678
- 微信公众号: 生活在浏览器里的咱们 / excellent_developers
努力成为优秀前端工程师!