最近在构建两个系统的实时通讯部分,总结一下所学。前端
这是一个系列文章,暂时主要构思四个部分git
这个是我在造的玩具的一个简单架构图。将实时通讯部分给抽离出来做为一个Websocket节点,造成了一个简单的分布式系统,而后经过Redis的Pub/Sub作Websocket集群之间的通讯以及Websocket节点与Restful API节点的通讯(好比用户调用Restful API发表文章以后通知Websocket推送新消息小红点给前端)。 github
分布式系统的坑 左耳朵耗子:从亚马逊的实践,谈分布式系统的难点web
本文主要介绍下分布式Websocket集群解决方案,最后会有个可运行的Demo。redis
在这篇博客里,咱们最终但愿构建一个Websocket集群来实现与客户端的实时通讯,好比聊天室。咱们固然能够经过简单的demo构建一个Websocket服务器并让全部客户端链接这台机器,但当这个聊天室的交互量很是庞大呢?好比斗鱼的直播弹幕,我去斗鱼看了下请求,从命名也能够看到其创建了一个ws链接,叫作danmuproxy.douyu.com
,以下图。数据库
那么问题来了,若是我只使用一台服务器,如何去支持可能有10万人同时加入的这个聊天室呢?显然咱们须要一个解决方案,好比将流量负载均衡到不一样的服务器上并提供一种通讯机制让各个服务器能进行消息同步(否则用户A连上服务器A,用户B脸上服务器B,它们发消息的时候对方都无法收到)。安全
其实从上图的名字来看就知道斗鱼链接的这个danmuproxy.douyu.com
中的proxy
就大体能推断出他们也是把流量作了一个分发。服务器
因为和普通的HTTP服务器的负载均衡不一样,上一节也说到了这些Websocket服务器须要共享信息(固然,须要作Session共享的服务器也同样)。这意味着客户端与Websocket服务器的交互是有状态(stateful)的,咱们须要把每一个客户端的链接数据保存在内存中。而当咱们要实现分布式的时候,咱们则须要在各个机器上共享这些信息,因此咱们须要一个Publish/Subscribe broker(其实broker之前上学讲软件设计体系结构的时候学过,但当时太萌新了没理解)。接下来举个例子。websocket
假设咱们如今使用Redis做为咱们的解决方案,而后咱们如今有三台Websocket服务器WS1
,WS2
和WS3
。而后每台服务器上连了三个用户。WS1
机器上的其中一个用户发送了某个消息到聊天室,在你的Websocket服务器的逻辑中,你首先会把这个消息存入数据库作一个持久化(好比作历史消息),而后将这个消息根据channelId之类的东西推送至这个聊天室的channel(Websocket的channel的实现会在下一篇中详细讲),咱们假设这个channelId叫“The☆World”。架构
如今你把数据安全的存入了DB里,而且你发布了一个事件给你的Pub/Sub broker(Redis channel)来通知其余对此感兴趣的部分(其余Websocket或者API服务器等)。因此以前的另外两个服务器WS2
和WS3
由于对这部分感兴趣因此他们也经过脚本监听了这一个Redis channel,它们就会获得通知,而后每一个服务器就会对DB请求query获取更新而后emit消息给Websocket上对应channel。
这就是大家能够看到的,使用Pub/Sub brooker来实现了一个横向扩展的Websocket集群。
从这里也能够看到集群具备的有点,高扩展性以及高可用性。
此次实现使用了个人一台高配阿里云国内服务器和一台比较low的阿里云9元学生服务器以及高配服务器上的redis。
首先配置Nginx作负载均衡,下图是个人配置,只是个Demo没作wss相关的。
代码都在github上。
Demo的代码也很短
const WebSocket = require('ws');
const publicIp = require('public-ip');
const uuidv1 = require('uuid/v1');
const redis = require("redis");
const config = require('./config');
const sub = redis.createClient(config.DB.REDIS_PORT, config.DB.REDIS_HOST);
const pub = redis.createClient(config.DB.REDIS_PORT, config.DB.REDIS_HOST);
if (config.DB.REDIS_PASSWORD) {
sub.auth(config.DB.REDIS_PASSWORD);
pub.auth(config.DB.REDIS_PASSWORD);
}
const wss = new WebSocket.Server({ port: 2333 });
const ip2name = {
'47.94.233.234': '梁王的高配据点',
'115.28.68.89': '梁王的9块服务器',
}
let sockets = {};
wss.on('connection', function connection(ws) {
const uuid = uuidv1();
ws.uuid = uuid;
sockets[uuid] = ws;
ws.on('message', function incoming(message) {
// publish消息给其余服务器
pub.publish('channel', `${ws.uuid}>${message}`);
console.log(`publish to channel: ${ws.uuid}>${message}`)
// 向本服务器的socket广播
wss.clients.forEach(function each(client) {
if (client !== ws && client.readyState === WebSocket.OPEN) {
client.send(`来自${ws.from || '???'}的用户${ws.uuid}发送了: ${message}`);
}
});
});
publicIp.v4().then(ip => {
console.log(ip);
ws.from = ip2name[ip] ? ip2name[ip] : '未知';
ws.send(`你链接的服务器为${ws.from}`);
});
});
// 监听其余服务器发送的消息
sub.on('message', function(channel, message) {
console.log(`channel ${channel}, ${message}`)
if (channel == 'channel')
{
var messageArr = message.split('>');
var uuid = messageArr[0]
var wsFrom = sockets[uuid];
var content = messageArr[1];
// 若是socket是非本服务器的
if(!wsFrom) {
wss.clients.forEach(function each(client) {
client.send(`来自其余服务器的用户${uuid}发送了: ${content}`);
});
}
}
});
sub.subscribe('channel');
复制代码
能够用如下代码在控制台中尝试,服务器后期可能会关。
var socket = new WebSocket('ws://websocket-demo.lwio.me');
// Listen for messages
socket.addEventListener('message', function (event) {
console.log('收到了', event.data);
});
// socket.send('keke')
复制代码
4月1号更新,妈耶今天阿里云一直报警,大家就看我redis直接暴露到公网就给我来了一波是吧。学习了学习了,向信安大佬低头。
参考资料: