消息实时推送,指的是将消息实时地推送到浏览器,用户不须要刷新浏览器就能够实时获取最新的消息,实时聊天室的技术原理也是如此。传统的Web站点为了实现推送技术,所用的技术都是轮询,这种传统的模式带来很明显的缺点,即浏览器须要不断的向服务器发出请求。
短轮询(Polling)
客户端须要定时往浏览器轮询发送请求,且只有当服务有数据更新后,客户端的下一次轮询请求才能拿到更新后的数据,在数据更新前的屡次请求至关于无效。这对带宽资源形成了极大的浪费,若提升轮询定时器时间,又会有数据更新不及时的烦恼。
commet
为了解决短轮询的弊端,一种基于http长链接的"服务器推"方式被hack出来。其与短轮询的区别主要是,采用commet时,客户端与服务端保持一个长链接,当数据发生改变时,服务端主动将数据推送到客户端。Comet 又能够被细分为两种实现方式,一种是长轮询机制,一种是流技术。html
长轮询html5
长轮询跟短轮询不一样的地方是,客户端往服务端发送请求后,服务端判断是否有数据更新,若没有,则将请求hold住,等待数据更新时,才返回响应。这样则避免了大量无效的http请求,但即便采用长轮询方式,接受数据更新的最小时间间隔仍是为2*RTT(往返时间)。node
流技术nginx
流技术(http stream)基于iframe实现。经过HTML标签iframe src指向服务端,创建一个长链接。当有数据推送,则往客户端返回,无须再请求。但流技术有个缺点就是,在浏览器顶部会一直出现页面未加载完成的loading标示。git
websocket
为了解决服务端如何更快地实时推送数据到客户端以及以上推送方式技术的不足,HTML5中定义了Websocket协议,它是一种在单个TCP链接上进行全双工通信的协议。与http协议不一样的请求/响应模式不一样,Websocket在创建链接以前有一个Handshake(Opening Handshake)过程,创建链接以后,双方便可双向通讯。固然,因为websocket是html5新特性,在部分浏览器(IE10如下)是不支持的。
咱们来看下websocket的握手报文:github
请求报文:web
GET /chat HTTP/1.1 Host: server.example.com Upgrade: websocket Connection: Upgrade Sec-WebSocket-Key: dGhlIHNhbXBsZSBub25jZQ== Sec-WebSocket-Protocol: chat Sec-WebSocket-Version: 13 Origin: http://example.com
"Upgrade "、"Connection": 告诉服务器这个请求是一个websocket协议,须要区别处理redis
"Upgrade: websocket": 代表这是一个 WebSocket 类型请求,意在告诉 server 须要将通讯协议切换到 WebSocket浏览器
"Sec-WebSocket-Key": 是 client 发送的一个 base64 编码的密文,要求 server 必须返回一个对应加密的 "Sec-WebSocket-Accept" 应答,不然 client 会抛出 "Error during WebSocket handshake" 错误,并关闭链接服务器
"Sec-WebSocket-Protocol":一个用户定义的字符串,用来区分同URL下,不一样的服务所须要的协议
"Sec-WebSocket-Version":Websocket Draft (协议版本)
响应报文:
HTTP/1.1 101 Switching Protocols Upgrade: websocket Connection: Upgrade Sec-WebSocket-Accept: s3pPLMBiTxaQ9kYGzzhZRbK+xOo= Sec-WebSocket-Protocol: chat
"Sec-WebSocket-Accept": 这个则是通过服务器确认,而且加密事后的 Sec-WebSocket-Key。加密方式为将Sec-WebSocket-Key与一段固定的 GUID 字符串进行链接,而后进行SHA-1 hash,接着base64编码获得。
socket.io(http://socket.io)
是一个彻底由JavaScript实现,基于Node.js、支持WebSocket的协议用于实时通讯、跨平台的开源框架。Socket.IO除了支持WebSocket通信协议外,还支持许多种轮询机制以及其它实时通讯方式,并封装成了通用的接口,并可以根据浏览器对通信机制的支持状况自动地选择最佳的方式来实现网络实时应用。
首先,咱们建立一个socket.io server对象,指定监听80端口。而且指定收到message消息,以及socket端口的监听方法。接着,当socket创建链接后,经过socket.emit方法,能够往客户端发送消息。
var io = require('socket.io')(); io.on('connection', function(socket) { //接受消息 socket.on('message', function (msg) { console.log('receive messge : ' + msg ); }); //发送消息 socket.emit('message', 'hello'); //断开链接回调 socket.on('disconnect', function () { console.log('socket disconnect'); }); }); io.listen(80);
客户端的代码也很是简单,只要引入socket.io对应的客户端库(https://github.com/socketio/s...。
在socket创建链接的回调中,使用socket.emit以及socket.on就能够分别作消息的发送以及监听了。
<script> var socket = io('http://localhost/'); socket.on('connect', function () { socket.emit('message', 'hi, i am client!'); socket.on('message', function (msg) { console.log('msg received from server'); }); }); </script>
若只是单机部署应用,单纯使用socket.io的消息事件监听处理便可知足咱们的需求。但随着业务的扩大,咱们须要考虑多机集群部署,客户端能够链接到任一节点,并发送消息。如何作到多节点的同时推送,咱们须要创建一套多节点之间的消息分发/订阅架构。这时咱们引入redis的pub/sub功能。
redis
redis是一个key-value存储系统,在该项目中主要起到一个消息分发中心(publish/subscribe)的做用。用户经过socket.io namespace 订阅房间号后,socket.io server则往redis订阅(subscribe)该房间号channel。当在该房间中的某一用户发送消息时,则经过redis的publish功能往redis该房间号channel publish消息。这样全部订阅该房间号channel的websocket链接则会收到消息回调,而后推送给客户端。
nginx
因为采用了集群架构,则须要nginx来作反向代理。须要注意的是,websocket的支持须要nginx1.3以上版本。而且咱们须要经过配置ip_hash作粘性会话(ip_hash)处理,避免在低版本浏览器socket.io使用兼容方案轮询请求,请求到不一样机器,形成session异常。
####3、架构设计图
客户端经过socket.io namespace 指定对应roomid,请求到nginx。nginx根据ip_hash反向代理到对应机器的某一端口的socket.io server 进程。创建websocket链接,并往redis订阅对应到房间(roomid)channel。到这个时候,一个订阅了某一房间的websocket通道创建完成。
当用户发送消息时,socket.io server捕获到该房间到消息后,即往redis对应房间id的channel publish消息。这时全部订阅了该房间id channel的socket.io server就会收到订阅响应,接着找到对应房间id的webscoket通道,并将消息推送到客户端。
nginx配置(nginx版本须>1.3):
在http{}里配置定义upstream,并设置ip_hash。使同一个ip的请求可以落在同一个机器同一个进程中。 若是改节点挂了,则自动重连到另一个节点,该方案对于后期扩容也很是方便。
upstream io_nodes { ip_hash; server 127.0.0.1:6001; server 127.0.0.1:6002; server 127.0.0.1:6003; server 127.0.0.1:6004; server 127.0.0.1:6005; server 127.0.0.1:6006; server 127.0.0.1:6007; server 127.0.0.1:6008; server 10.x.x.x:6001; server 10.x.x.x:6002; server 10.x.x.x:6003; server 10.x.x.x:6004; server 10.x.x.x:6005; server 10.x.x.x:6006; server 10.x.x.x:6007; server 10.x.x.x:6008; }
在server中,配置location:
location / { proxy_set_header Upgrade $http_upgrade; proxy_set_header Connection "upgrade"; proxy_set_header X-Forwarded-For $proxy_add_x_forwarded_for; proxy_set_header Host $host; proxy_http_version 1.1; proxy_pass http://io_nodes; proxy_redirect off; }
cluster.js
咱们采用了多进程的设计,充分利用cpu多核优点。经过主进程统一管理维护子进程,每一个进程监听一个端口。
var cupNum = require('os').cpus().length, workerArr = [], roomInfo = []; var connectNum = 0; for (var i = 0; i < cupNum; i++) { workerArr.push(fork('./fork_server.js', [6001 + i])); workerArr[i].on('message', function(msg) { if (msg.cmd && msg.cmd === 'client connect') { connectNum++; console.log('socket server connectnum:' + connectNum); } if (msg.cmd && msg.cmd === 'client disconnect') { connectNum--; console.log('socket server connectnum:' + connectNum); } });
fork_server.js
var process = require('process'); var io = require('socket.io')(); var num = 0; var redis = require('redis'); var redisClient = redis.createClient; //创建redis pub、sub链接 var pub = redisClient({port:13800, host: '127.0.0.1', password:'xxxx'}); var sub = redisClient({port: 13800, host:'127.0.0.1', password:'xxxx'}); var roomSet = {}; //获取父进程传递端口 var port = parseInt(process.argv[2]); //当websocket链接时 io.on('connection', function(socket) { //客户端请求ws URL: http://127.0.0.1:6001?roomid=k12_webcourse_room_1 var roomid = socket.handshake.query.roomid; console.log('worker pid: ' + process.pid + ' join roomid: '+ roomid); socket.on('join', function (data) { socket.join(roomid); //加入房间 // 往redis订阅房间id if(!roomSet[roomid]){ roomSet[roomid] = {}; console.log('sub channel ' + roomid); sub.subscribe(roomid); } roomSet[roomid][socket.id] = {}; reportConnect(); console.log(data.username + ' join, IP: ' + socket.client.conn.remoteAddress); roomSet[roomid][socket.id].username = data.username; // 往该房间id的reids channel publish用户进入房间消息 pub.publish(roomid, JSON.stringify({"event":'join',"data": data})); }); //用户发言 推送消息到redis socket.on('say', function (data) { console.log("Received Message: " + data.text); pub.publish(roomid, JSON.stringify({"event":'broadcast_say',"data": { username: roomSet[roomid][socket.id].username, text: data.text }})); }); socket.on('disconnect', function() { num--; console.log('worker pid: ' + process.pid + ' clien disconnection num:' + num); process.send({ cmd: 'client disconnect' }); if (roomSet[roomid] && roomSet[roomid][socket.id] && roomSet[roomid][socket.id].username) { console.log(roomSet[roomid][socket.id].username + ' quit'); pub.publish(roomid, JSON.stringify({"event":'broadcast_quit',"data": { username: roomSet[roomid][socket.id].username }})); } roomSet[roomid] && roomSet[roomid][socket.id] && (delete roomSet[roomid][socket.id]); }); }); /** * 订阅redis 回调 * @param {[type]} channel [频道] * @param {[type]} count [数量] * @return {[type]} [description] */ sub.on("subscribe", function (channel, count) { console.log('worker pid: ' + process.pid + ' subscribe: ' + channel); }); /** * 收到redis publish 对应channel的消息 * @param {[type]} channel [description] * @param {[type]} message * @return {[type]} [description] */ sub.on("message", function (channel, message) { console.log("message channel " + channel + ": " + message); //往对应房间广播消息 io.to(channel).emit('message', JSON.parse(message)); }); /** * 上报链接到master进程 * @return {[type]} [description] */ var reportConnect = function(){ num++; console.log('worker pid: ' + process.pid + ' client connect connection num:' + num); process.send({ cmd: 'client connect' }); }; io.listen(port); console.log('worker pid: ' + process.pid + ' listen port:' + port);
客户端:
<script src="static/socket.io.js"></script> <script> var roomid = (function () { return prompt('请输入房间号','') })(); var userInfo = { username: (function () { return prompt('请输入rtx昵称', ''); })() }; if(roomid != null && roomid != "") { var socket = io.connect('http://10.244.146.2?roomid='+ roomid); socket.emit('join', { username: userInfo.username }); socket.on('message', function(msg){ switch (msg.event) { case 'join': if (msg.data.username) { console.log(msg.data.username + '加入了聊天室'); var data = { text: msg.data.username + '加入了聊天室' }; showNotice(data); } break; /*收到消息广播后,显示消息*/ case 'broadcast_say': if(msg.data.username!==userInfo.username) { console.log(msg.data.username + '说: ' + msg.data.text); showMessage(msg.data); } break; /*离开聊天室广播后,显示消息*/ case 'broadcast_quit': if (msg.data.username) { console.log(msg.data.username + '离开了聊天室'); var data = { text: msg.data.username + '离开了聊天室' }; showNotice(data); } break; } }) } /*点击发送按钮*/ document.getElementById('send').onclick = function () { var keywords = document.getElementById('keywords'); if (keywords.value === '') { keywords.focus(); return false; } var data = { text: keywords.value, type: 0, username: userInfo.username }; /*向服务器提交一个say事件,发送消息*/ socket.emit('say', data); showMessage(data); keywords.value = ""; keywords.focus(); }; /*展现消息*/ function showMessage(data) { var itemArr = []; itemArr.push('<dd class="'+(data.type === 0 ? "me" : "other")+'">'); itemArr.push('<ul>'); itemArr.push('<li class="nick-name">' + data.username + '</li>'); itemArr.push('<li class="detail">'); itemArr.push('<div class="head-icon"></div>'); itemArr.push('<div class="text">' + data.text + '</div>'); itemArr.push('</li>'); itemArr.push('</ul>'); itemArr.push('</dd>'); document.getElementById('list').innerHTML += itemArr.join(''); } /*展现通知*/ function showNotice(data) { var item = '<dd class="tc"><span>' + data.text + '</span><dd>'; document.getElementById('list').innerHTML += item; } /*回车事件*/ document.onkeyup = function (e) { if (!e) e = window.event; if ((e.keyCode || e.which) == 13) { document.getElementById('send').click(); } } </script>
gihub源码地址:https://github.com/493326889/...