在线上系统中,须要使用node的多进程模型,咱们能够本身实现简易的基于cluster模式的socket分发模型,也可使用比较稳定的pm2这样进程管理工具。在常规的http服务中,这套模式一切正常,但是一旦server中集成了socket.io服务就会致使ws通道创建失败,即便经过backup的polling方式仍会出现时断时连的现象,所以咱们须要解决这种问题,让socket.io充分利用多核。html
在这里之因此提到socket.io而未说websocket服务,是由于socket.io在封装websocket基础上又保证了可用性。在客户端未提供websocket功能的基础上使用xhr polling、jsonp或forever iframe的方式进行兼容,同时在创建ws链接前每每经过几回http轮训确保ws服务可用,所以socket.io并不等于websocket。再往底层深刻研究,socket.io其实并无作真正的websocket兼容,而是提供了上层的接口以及namespace服务,真正的逻辑则是在“engine.io”模块。该模块实现握手的http代理、链接升级、心跳、传输方式等,所以研究engine.io模块才能清楚的了解socket.io实现机制。前端
服务端采用express+socket.io的组合方案,搭配pm2的cluster模式,实现一个简易的b/s通讯demo:node
app.jsnginx
var path = require('path'); var app = require('express')(), server = require('http').createServer(app), io = require('socket.io')(server); io .on('connection', function(socket) { socket.on('disconnect', function() { console.log('/: disconnect-------->') }); socket.on('b:message', function() { socket.emit('s:message', '/: '+port); console.log('/: '+port) }); }); io.of('/ws') .on('connection', function(socket) { socket.on('disconnect', function() { console.log('/ws: disconnect-------->') }); socket.on('b:message', function() { socket.emit('/ws: message', port); }); }); app.get('/page',function(req,res){ res.sendFile(path.join(process.cwd(),'./index.html')); }); server.listen(8080);
index.htmlweb
<script> var btn = document.getElementById('btn1'); btn.addEventListener('click',function(){ var socket = io.connect('http://127.0.0.1:8080/ws',{ reconnection: false }); socket.on('connect',function(){ // 发起“脚手架安装”请求 socket.emit('b:message',{}); socket.on('s:message',function(d){ console.log(d); }); }); socket.on('error',function(err){ console.log(err); }) }); </script>
pm2.jsonredis
{ "apps": [ { "name": "ws", "script": "./app.js", "env": { "NODE_ENV": "development" }, "env_production": { "NODE_ENV": "production" }, "instances": 4, "exec_mode": "cluster", "max_restarts" : 3, "restart_delay" : 5000, "log_date_format" : "YYYY-MM-DD HH:mm Z", "combine_logs" : true } ] }
这样,执行命令pm2 start pm2.json
便可开启服务,访问127.0.0.1:8080/page
,点击按钮发起ws链接,观察控制台便可。算法
下图清晰显示了socket.io握手的错误:
express
可见在websocket链接创建以前多出了3个xhr请求,而websocket链接创建失败后又多出了几个xhr请求,同时最后两个xhr请求失败了。json
socket.io没有采用直接创建websocket链接的粗暴方式,而是首先经过http请求(xhr)访问服务端的相关轮训配置信息以及sid。此处sid相似sessionID,可是它惟一标识链接,可理解为socketId,之后每次http请求cookie中都必须携带sid(httponly);后端
第2、三个请求用于确认链接,在socket.io中,post请求是客户端发送消息给服务端的惟一形式,并且post响应必定是“ok”,它的“content-length”必定为2;而get请求主要用于轮训,同时获取服务端的相关消息,这会在下文中有体现;
第四个websocket链接请求失败,这主要是因为与后端http握手失败形成的;
第五个请求为xhr方式的post请求,它是做为websocket通道创建失败后的一种兼容性处理,上文讲述了socket.io的post请求只在客户端须要发送消息给服务端时才会使用,所以,为了证明咱们查看消息体:
可见,它携带了客户端发出的消息类型b:message,同时包含消息体{}空对象。对应的,服务端返回“OK”;
第六个请求为xhr方式的get请求,用来获取服务端对第五个请求的响应。
至此,大体分析了socket.io创建链接的大体过程以及链接创建失败后如何兜底的方案,下面分析为什么出现握手失败的问题。
实例中pm2主进程开启了4个工做进程,由主进程侦听8080端口并分发请求给工做进程。pm2进程在分发请求的阶段采用了某种算法的均衡,如round-robin或者其余hash方式(但不是iphash),所以在socket.io客户端链接创建阶段发送的多个xhr请求,会被pm2定位到不一样的worker进程中。前文中提到每一个xhr请求都会携带sid字段标识当前链接,所以当一个携带sid字段的请求被pm2定位到另外一个与该链接无关的worker时,就会形成请求失败,返回{"code":1,"message":"Session ID unknown"}错误;即便前三次xhr握手成功,进入websocket链接升级阶段,负责侦听update事件的worker也每每不是以前的那个worder,所以致使websocket链接创建失败。
一言以蔽之,客户端屡次请求的服务端进程不是同一个进程才致使的ws链接没法成功创建。
那么如何才能解决呢?最简单的方案就是确保客户端的每次请求均可以定位到同一个服务进程便可。固然,分布式session一样能够解决问题,依托第三方缓存相似redis并配合一致性hash算法,确保全部服务进程均可以获取到链接信息,相互配合完成链接创建。但这也仅仅是做者在理论上分析的一种实现方式,并无测试经过,由于这种分布式架构不只实现繁杂并且引入了相关依赖redis,不太可取。
那么下文主要针对确保客户端的每次请求均可以定位到同一个服务进程这一点实现解决方案。
官方提供了一种比较轻便的架构:nginx反向代理+iphash
咱们的示例demo中的http服务器只侦听8080端口,所以必须由pm2分发请求,不然会出现端口占用的错误发生。可是,官方的解决方案是每一个进程的socket.io服务器建立不一样端口的http服务器,专一用于http握手和升级,由nginx作握手请求的代理。并且针对nginx必须设置iphash,保证同一个客户端的屡次请求定位到后端同一个服务进程。
这样,示例demo中会占用5个端口,其中8080端口为公用的http服务器使用,其余四个端口则只用于ws链接握手。可是这四个端口却如何选取呢?为了保证扩展性以及顺序性,采用与pm2相兼容的方案。pm2会为每一个worker进程分配一个id,而且将该id绑定到进程的环境变量中,那么咱们就能够利用该worker id生成4个不一样的端口号。
app.js
var path = require('path'); var app = require('express')(), server = require('http').createServer(app), port = 3131 + parseInt(process.env.NODE_APP_INSTANCE), io = require('socket.io')(port); io .on('connection', function(socket) { socket.on('disconnect', function() { console.log('/: disconnect-------->') }); socket.on('b:message', function() { socket.emit('s:message', '/: '+port); console.log('/: '+port) }); }); io.of('/ws') .on('connection', function(socket) { socket.on('disconnect', function() { console.log('disconnect-------->') }); socket.on('b:message', function() { socket.emit('s:message', port); }); }); app.get('/abc',function(req,res){ res.sendFile(path.join(process.cwd(),'./index.html')); }); server.listen(8080);
index.html
<script> var btn = document.getElementById('btn1'); btn.addEventListener('click',function(){ var socket = io.connect('http://ws.vd.net/ws',{ reconnection: false }); socket.on('connect',function(){ // 发起“脚手架安装”请求 socket.emit('b:message',{a:1}); socket.on('s:message',function(d){ console.log(d); }); }); socket.on('error',function(err){ console.log(err); }) }); </script>
nginx.conf
upstream io_nodes { ip_hash; server 127.0.0.1:3131; server 127.0.0.1:3132; server 127.0.0.1:3133; server 127.0.0.1:3134; } server { listen 80; server_name ws.vd.net; location / { proxy_set_header Upgrade $http_upgrade; proxy_set_header Connection "upgrade"; proxy_set_header X-Forwarded-For $proxy_add_x_forwarded_for; proxy_set_header Host $host; proxy_http_version 1.1; proxy_pass http://io_nodes; } }
在本机绑定hosts地址后开启nginx服务,同时开启服务器,点击按钮创建ws链接成功。
服务端路由,意义在于“服务端作worker的负载均衡,并将选择的worker ip和端口渲染在页面,以后浏览器的全部ws链接默认链接到对应 ip:port的服务器中”。这样只要是服务端渲染的页面均可以采用这种方式实现。
若是页面采用前端异步渲染,仍能够采用这种方式,不过首先经过xhr请求向服务端获取须要握手的http服务器的ip和端口,而后在进行ws链接。
服务端路由的前提仍然是须要针对每一个ws服务器分配一个端口,只不过去掉nginx由服务端作ip hash。采用服务端路由架构清晰,并且实现容易,兼容性好。
此处的上帝进程即为主进程,相似pm2进程。上帝进程路由则是在上帝进程层面上作请求的定向分发,保证请求主机和进程的一致性。在上帝进程中,针对每一个请求的ip作hash,并对每个ws服务器建立单独的http服务器用于握手升级。
简易代码:
var express = require('express'), cluster = require('cluster'), net = require('net'), sio = require('socket.io'); var port = 3000, num_processes = require('os').cpus().length; if (cluster.isMaster) { var workers = []; var spawn = function(i) { workers[i] = cluster.fork(); workers[i].on('exit', function(code, signal) { console.log('respawning worker', i); spawn(i); }); }; for (var i = 0; i < num_processes; i++) { spawn(i); } // ip hash var worker_index = function(ip, len) { var s = ''; for (var i = 0, _len = ip.length; i < _len; i++) { if (!isNaN(ip[i])) { s += ip[i]; } } return Number(s) % len; }; var server = net.createServer({ pauseOnConnect: true }, function(connection) { var worker = workers[worker_index(connection.remoteAddress, num_processes)]; worker.send('sticky-session:connection', connection); }).listen(port); } else { // worker var app = new express(); // handshake server. var server = app.listen(0, 'localhost'), io = sio(server); process.on('message', function(message, connection) { if (message !== 'sticky-session:connection') { return; } server.emit('connection', connection); connection.resume(); }); }
本文实现了三种解决方案,归根到底就是“ip hash”,不一样点在于在请求处理的不一样阶段作ip hash。
能够在请求处理最前端作iphash,即nginx方式,这也就是第一种方案; 能够在请求处理的第二层分发处作iphash,即上帝进程路由的方式,即第三种; 也能够在请求处理的终端作iphash,即服务端路由的方式,也就是第二种; 同时共享session也一样能够实现,借助socket.io-redis模块也能够实现。