基于WebSocket协议实现Broker

写在前面:
前两篇文字<<基于MQTT协议谈谈物联网开发-华佗写代码>>,<<基于MQTT协议实现Broker-华佗写代码>>主要叙述了MQTT协议的编解码以及基于MQTT协议的一些常见应用场景,并以一个简单的消息推送系统做为例子具体阐述了Mqtt Broker部分的实现,以前主要以原生android或者iOS或者服务端代理做为例子,考虑到在移动端开发时,选择的技术栈有所不一样,有的选择web前端开发.做为例子,这里以以前的消息推送系统为例基于web前端开发,继续叙述基于WebSocket协议实现Broker.html

 

1.WebSocket协议主要特色:前端

(1)基于http协议握手创建tcp长链接;android

(2)相比http,WebSocket协议交换最小化,下降网络流量;nginx

(3)双向通讯,服务器能够主动推送数据给客户端;web

 

2.Mqtt Broker具体实现(WebSocket部分):json

2.1Mqtt Broker架构草图:服务器

 

2.2Mqtt Broker实现细节:websocket

(1)新增实现websocket server,监听不一样端口;网络

(2)每一个websocket链接,实例化一个mqttclient负责其协议解析,消息发布和订阅等;架构

(3)复用以前Mqtt Broker与RabbitMQ通讯部分,具体参考上一篇文字;

(4)其余...

 

2.3Mqtt Broker代码实现(WebSocket部分):

type tcpKeepAliveListener struct {
    *net.TCPListener
}

var upgrader = websocket.Upgrader{
    ReadBufferSize: 1024,
    WriteBufferSize: 1024,
}
 
//监听websocket server地址,注册websocket handler
func (mb *MqttBroker) ListenAndServeWeb() {
 defer mb.wg.Done() http.HandleFunc("/", mb.webHandler) webserver := &http.Server{Addr: mb.webaddr, Handler: nil} var listener net.Listener var err error listener, err = net.Listen("tcp", mb.webaddr) if err != nil { return } U.GetLog().Printf("listen and serve web broker on %s", mb.webaddr) err = webserver.Serve(tcpKeepAliveListener{listener.(*net.TCPListener)})
}

//每个Websocket链接,实例化一个MqttClient负责其协议解析,以及与rabbitmq的通讯
func (mb *MqttBroker) webHandler(w http.ResponseWriter, r *http.Request) {
    upgrader.CheckOrigin = checkSameOrigin
    conn, err := upgrader.Upgrade(w, r, nil)
    if err != nil {
        U.GetLog().Printf("upgrade error:%v", err)
        return
    }
    mqttclient, err := NewMqttClient(mb.wg, mb, nil, conn, "web")
    if err != nil {
        return
    }
    mb.clientMap[mqttclient.GetClientID()] = mqttclient
    mb.wg.Add(1)
    go mqttclient.ServeWeb()
}

 

2.4Mqtt Client代码实现(WebSocket部分):

//定义WebSocket通讯消息格式
//Action选项有publish,subscribe,unsubscribe
type WebMessage struct { Action
string Topic string Payload string } type MqttClient struct { wg *sync.WaitGroup broker *MqttBroker tcpconn net.Conn wconn *websocket.Conn ... needDisConn bool } //经过WebMessage.Action区分消息指令类型 func (mc *MqttClient) ServeWeb() { defer mc.wg.Done() defer mc.commonDefer() if mc.wconn == nil { return } for { if mc.needDisConn { break } _, message, err := mc.wconn.ReadMessage() if err != nil { U.GetLog().Printf("handle message error:%v", err) mc.needDisConn = true continue } wm := WebMessage{} err = json.Unmarshal(message, &wm) if err != nil { U.GetLog().Printf("json.Unmarshal(message, &wm) error:%v", err) continue } switch wm.Action { case "subscribe": err = mc.handleWebSubscibe(wm.Topic) case "publish": err = mc.handleWebPublish(wm.Topic, wm.Payload) case "unsubscribe": err = mc.handleWebUnSubscribe(wm.Topic) case "ping": mc.lastheartbeat = 0 default: U.GetLog().Printf("unexpected WebMessage Action:%s", wm.Action) continue } if err != nil { U.GetLog().Printf("handle message error:%v", err) } mc.lastheartbeat = 0 } }

 

3.WebSocket Client端实现:

3.1实现细节:

(1)创建与WebSocket Server的链接;

(2)初始化WebSocket,注册相关回调函数;

(3)实现WebSocket断线重连机制;

(4)封装相似mqtt基于topic的发布订阅等接口;

(5)Nodejs端须要browserify相关js文件,Javascript端能够直接调用WebSocket;

(6)其余...

 

3.2具体代码实现:

var WebSocket = require('ws'); var WEBSOCKET_MQTT_BROKER = 'ws://your_server_ip/ws/'; var ping = { Action: "ping" }; var _listeners = {}; var _websocket = null; var _connected = false; _access = function () { console.log('try mqtt.connect'); _connect_websocket(); setInterval(function () { _reconnect_websocket(); if (_websocket != null && _connected) { _websocket.send(JSON.stringify(ping)); } }, 3000); }; //websocket初始化,并实现相关回调函数 _init_websocket = function () { if (_websocket == null) { return; } _websocket.onopen = function () { _connected = true; console.log("Connected to WebSocket server."); for (var topic in _listeners) { var sub = { Action: "subscribe", Topic: topic, Payload: "" }; _websocket.send(JSON.stringify(sub)); } }; _websocket.onclose = function () { _connected = false; _websocket = null; console.log("Disconnected"); }; _websocket.onmessage = function (evt) { console.log('recv data from server: ' + evt.data); var dataObj = JSON.parse(evt.data); _listeners[dataObj.Topic] && _listeners[dataObj.Topic](dataObj.Payload); }; _websocket.onerror = function (evt) { _connected = false; _websocket = null; console.log('Error occured: ' + evt); }; }; _connect_websocket = function () { if (_connected) { return; } _websocket = new WebSocket(WEBSOCKET_MQTT_BROKER); _init_websocket(); }; //断线重连,经过定时器实现每三秒断线重连 _reconnect_websocket = function () { if (_connected) { return; } _websocket = new WebSocket(WEBSOCKET_MQTT_BROKER); _init_websocket(); }; //模拟mqtt发布消息 sendMessage = function (topic, data) { if (!_websocket || !_connected) { var err = new Error('iot client not ready.'); console.warn(err); return; } var send_data = JSON.stringify(data); var pub = { Action: "publish", Topic: topic, Payload: send_data }; _websocket.send(JSON.stringify(pub)); }; //模拟mqtt订阅消息,并根据topic注册回调函数 onMessage = function (topic, callback) { _listeners[topic] = callback; if (!_websocket || !_connected) { console.warn('onMessage, but iot client not ready.'); return; } var sub = { Action: "subscribe", Topic: topic, Payload: "" }; _websocket.send(JSON.stringify(sub)); }; //模拟mqtt取消订阅,并根据topic删除对应回调函数 stopReceiveMessage = function (topic) { delete _listeners[topic]; if (!_websocket || !_connected) { console.warn('stopReceiveMessage, but iot client not ready.'); return; } var unsub = { Action: "unsubscribe", Topic: topic, Payload: "" }; _websocket.send(JSON.stringify(unsub)); }; _access();

 

4.WebSocket相关nginx配置:

server { listen 80; server_name your_server_name;  ... location /ws/ { proxy_redirect off; add_header Access-Control-Allow-Origin *; add_header Access-Control-Allow-Methods 'GET, POST, OPTIONS'; add_header Access-Control-Allow-Headers 'DNT,X-Mx-ReqToken,Keep-Alive,User-Agent,X-Requested-With,If-Modified-Since,Cache-Control,Content-Type,Authorization'; proxy_pass http://127.0.0.1:2884/;
            proxy_http_version 1.1; proxy_set_header Upgrade $http_upgrade; proxy_set_header Connection "upgrade"; }
}

 

出于篇幅考虑,以前两篇文字叙述过的内容,好比Mqtt Broker其余实现部分以及与RabbitMQ通讯部分,都是复用以前的代码逻辑,这里再也不赘述,Mqtt Broker中WebSocket部分至关于使用WebSocket协议作了MQTT协议的翻译转换,也有一些成员变量,用到了也不一一具体注释了,主要经过代码关键路径叙述实现的一些细节,若有错误,恳请指出,转载也请注明出处!!!

 

未完待续...

相关文章
相关标签/搜索