咱们的项目是基于 ThinkJS + Vue 开发的,最近实现了一个多端实时同步数据的功能,因此想写一篇文章来介绍下如何在 ThinkJS 的项目中利用 WebSocket 实现多端的实时通讯。ThinkJS 是基于 Koa 2 开发的企业级 Node.js 服务端框架,文章中会从零开始实现一个简单的聊天室,但愿读者们能有所收获。javascript
WebSocket 是 HTML5 中提出的一种协议。它的出现是为了解决客户端和服务端的实时通讯问题。在 WebSocket 出现以前,若是想实现实时消息传递通常有两种方式:html
能够看到,这两种实现方式的本质仍是客户端向服务端“Pull”的过程,并无一个服务端主动“Push”到客户端的方式,全部的方式都是依赖客户端先发起请求。为了知足两方的实时通讯, WebSocket 应运而生。java
首先,WebSocket 是基于 HTTP 协议的,或者说借用了 HTTP 协议来完成链接的握手部分。其次,WebSocket 是一个持久化协议,相对于 HTTP 这种非持久的协议来讲,一个 HTTP 请求在收到服务端回复后会直接断开链接,下次获取消息须要从新发送 HTTP 请求,而 WebSocket 在链接成功后能够保持链接状态。下图应该能体现二者的关系:git
在发起 WebSocket 请求时须要先经过 HTTP 请求告诉服务端需求将协议升级为 WebSocket。github
浏览器先发送请求:web
GET / HTTP/1.1
Host: localhost:8080
Origin: [url=http://127.0.0.1:3000]http://127.0.0.1:3000[/url]
Connection: Upgrade
Upgrade: WebSocket
Sec-WebSocket-Version: 13
Sec-WebSocket-Key: w4v7O6xFTi36lq3RNcgctw==
复制代码
服务端回应请求:redis
HTTP/1.1 101 Switching Protocols
Connection:Upgrade
Upgrade: WebSocket
Sec-WebSocket-Accept: Oy4NRAQ13jhfONC7bP8dTKb4PTU=
复制代码
在请求头中核心的部分是 Connection 和 Upgrade ,经过这两个字段服务端会将 HTTP 升级为 WebSocket 协议。服务端返回对应信息后链接成功,客户端和服务端就能够正常通讯了。json
随着新标准的推动,WebSocket 已经比较成熟了,而且各个主流浏览器对 WebSocket 的支持状况比较好(不兼容低版本 IE,IE 10 如下)bootstrap
Socket.io 是一个彻底由 JavaScript 实现、基于 Node.js、支持 WebSocket 协议的用于实时通讯、跨平台的开源框架。它包括了客户端的 JavaScript 和服务器端的 Node.js,而且有着很好的兼容性,会根据浏览器的支持状况选择不一样的方式进行通信,如上面介绍的轮询和 HTTP 长链接。api
对于 WebSocket 目前 ThinkJS 支持了 Socket.io 并对其进行了一些简单的包装,只须要进行一些简单的配置就可
以使用 WebSocket 了。
ThinkJS 默认采用了多进程模型,每次请求会根据策略输送到不一样的进程中执行,关于其多进程模型能够参考《细谈 ThinkJS 多进程模型》。而 WebSocket 链接前须要使用 HTTP 请求来完成握手升级,多个请求须要保证命中相同的进程,才能保证握手成功。这个时候就须要开启 StickyCluster 功能,使客户端全部的请求命中同一进程。修改配置文件 src/config/config.js
便可。
module.exports = {
stickyCluster: true,
// ...
}
复制代码
在 src/config/extend.js
引入 WebSocket:
const websocket = require('think-websocket');
module.exports = [
// ...
websocket(think.app),
];
复制代码
在 src/config/adapter.js
文件中配置 WebSocket
const socketio = require('think-websocket-socket.io');
exports.websocket = {
type: 'socketio',
common: {
// common config
},
socketio: {
handle: socketio,
messages: {
open: '/websocket/open', //创建链接时处理对应到 websocket Controller 下的 open Action
close: '/websocket/close', // 关闭链接时处理的 Action
room: '/websocket/room' // room 事件处理的 Action
}
}
}
复制代码
配置中的 message
对应着事件的映射关系。好比上述的例子,客户端触发 room 事件,服务端须要在 websocket controller 下的 roomAction
中处理消息。
建立处理消息的 controller 文件。上面的配置是 /websocket/xxx
,因此直接在项目根目录 src/controller
下建立 websocket.js 文件。
module.exports = class extends think.Controller {
// this.socket 为发送消息的客户端对应的 socket 实例, this.io 为Socket.io 的一个实例
constructor(...arg) {
super(...arg);
this.io = this.ctx.req.io;
this.socket = this.ctx.req.websocket;
}
async openAction() {
this.socket.emit('open', 'websocket success')
}
closeAction() {
this.socket.disconnect(true);
}
};
复制代码
这时候服务端代码就已经配置完了。
客户端代码使用比较简单,只须要引入 socket.io.js 就能够直接使用了。
<script src="https://lib.baomitu.com/socket.io/2.0.1/socket.io.js"></script>
复制代码
引入后在初始化代码建立 WebSocket 链接:
this.socket = io();
this.socket.on('open', data => {
console.log('open', data)
})
复制代码
这样一个最简单的 WebSocket 的 demo 就完成了,打开页面的时候会自动建立一个 WebSocket 链接,建立成功后服务端会触发 open 事件,客户端在监听的 open 事件中会接收到服务端返回的 websocket success 字符串。
接下来咱们开始实现一个简单的聊天室。
从刚才的内容中咱们知道每一个 WebSocket 链接的建立会有一个 Socket 句柄建立,对应到代码中的 this.socket
变量。因此本质上聊天室人与人的通讯能够转换成每一个人对应的 Socket 句柄的通讯。我只须要找到这我的对应的 Socket 句柄,就能实现给对方发送消息了。
简单来实现咱们能够设置一个全局变量来存储链接到服务端的 WebSocket 的一些信息。在 src/bootstrap/global.js 中设置全局变量:
global.$socketChat = {};
复制代码
而后在 src/bootstrap/worker.js 中引入global.js,使全局变量生效。
require('./global');
复制代码
而后在服务端 controller 增长 roomAction
和 messageAction
, messageAction
用来接收客户端用户的聊天信息,并将信息发送给全部的客户端成员。 roomAction
用来接收客户端进入/离开聊天室的信息。这两个的区别是聊天消息是须要同步到全部的成员因此使用 this.io.emit
,聊天室消息是同步到全部除当前客户端外的全部成员因此使用this.socket.broadcast.emit
module.exports = class extends think.Controller {
constructor(...arg) {
super(...arg);
this.io = this.ctx.req.io;
this.socket = this.ctx.req.websocket;
global.$socketChat.io = this.io;
}
async messageAction() {
this.io.emit('message', {
nickname: this.wsData.nickname,
type: 'message',
message: this.wsData.message,
id: this.socket.id
})
}
async roomAction() {
global.$socketChat[this.socket.id] = {
nickname: this.wsData.nickname,
socket: this.socket
}
this.socket.broadcast.emit('room', {
nickname: this.wsData.nickname,
type: 'in',
id: this.socket.id
})
}
async closeAction() {
const closeSocket = global.$socketChat[this.socket.id];
const nickname = closeSocket && closeSocket.nickname;
this.socket.disconnect(true);
this.socket.removeAllListeners();
this.socket.broadcast.emit('room', {
nickname,
type: 'out',
id: this.socket.id
})
delete global.$socketChat[this.socket.id]
}
}
复制代码
客户端经过监听服务端 emit 的事件来处理信息
this.socket.on('message', data => {
// 经过socket的id的对比,判断消息的发送方
data.isMe = (data.id === this.socket.id);
this.chatData.push(data);
})
this.socket.on('room', (data) => {
this.chatData.push(data);
})
复制代码
经过 emit 服务端对应的 action 来发送消息
this.socket.emit('room', {
nickname: this.nickname
})
this.socket.emit('message', {
message: this.chatMsg,
nickname: this.nickname
})
复制代码
根据发送/接收消息的type判断消息类型
<div class="chat-box">
<div v-for="(item, index) in chatData" :key="index">
<p v-if="item.type == 'in'" class="enter-tip">{{item.nickname}}进入聊天室</p>
<p v-if="item.type == 'out'" class="enter-tip">{{item.nickname}}离开聊天室</p>
<p v-else-if="item.type == 'message'" :class="['message',{'me':item.isMe}]">
{{item.nickname}}:{{item.message}}
</p>
</div>
</div>
复制代码
至此一个简单的聊天室就完成了。
刚才咱们说了通讯的本质实际上是 Socket 句柄查询使用的过程,本质上咱们是利用全局变量存储全部的 WebSocket 句柄的方式解决了 WebSocket 链接查找的问题。可是当咱们的服务端扩容后,会出现多个服务器都有 WebSocket 链接,这个时候跨节点的 WebSocket 链接查找使用全局变量的方式就无效了。此时咱们就就须要换一种方式来实现跨服务器的通讯同步,通常有如下几种方式:
发送消息不直接执行 emit
事件,而是将消息发送到消息队列中,而后全部的节点对这条消息进行消费。拿到数据后查看接收方的 WebSocket 链接是否在当前节点上,不在的话就忽略这条数据,在的话则执行发送的动做。
经过外部存储服务例如 Redis 充当以前的“全局变量”的角色,全部的节点建立 WebSocket 链接后都向 Redis 中注册一下,告诉你们有个叫 “A” 家伙的链接在 “192.168.1.1” 这。当 B 要向 A 发送消息的时候它去 Redis 中查找到 A 的链接所处的节点后,通知 192.168.1.1 这个节点 B 要向 A 发送消息,而后节点会执行发送的动做。
Redis 的 pub/sub 是一种消息通讯模式:发送者(pub)发送消息,订阅者(sub)接收消息。WebSocket 的一个节点接收到消息后,经过 Redis 发布(pub),其余节点做为订阅者(sub)接收消息再进行后续处理。
此次咱们将在聊天室的 demo 上实现节点通讯的功能。
首先,在 websocket controller 文件中增长接口调用
const ip = require('ip');
const host = ip.address();
module.exports = class extends think.Controller {
async openAction() {
// 记录当前 WebSocket 链接到的服务器ip
await global.rediser.hset('-socket-chat', host, 1);
}
emit(action, data) {
if (action === 'message') {
this.io.emit(action, data)
} else {
this.socket.broadcast.emit(action, data);
}
this.crossSync(action, data)
}
async messageAction() {
const data = {
nickname: this.wsData.nickname,
type: 'message',
message: this.wsData.message,
id: this.socket.id
};
this.emit('message', data);
}
async closeAction() {
const connectSocketCount = Object.keys(this.io.sockets.connected).length;
this.crossSync(action, data);
if (connectSocketCount <= 0) {
await global.rediser.hdel('-socket-chat', host);
}
}
async crossSync(action, params) {
const ips = await global.rediser.hkeys('-socket-chat').filter(ip => ip !== host);
ips.forEach(ip => request({
method: 'POST',
uri: `http://${ip}/api/websocket/sync`,
form: {
action,
data: JSON.stringify(params)
},
json: true
});
);
}
}
复制代码
而后在 src/controller/api/websocket
实现通讯接口
const Base = require('../base');
module.exports = class extends Base {
async syncAction() {
const {action, data} = this.post();
const blackApi = ['room', 'message', 'close', 'open'];
if (!blackApi.includes(action)) return this.fail();
// 因为是跨服务器接口,因此直接使用io.emit发送给当前全部客户端
const io = global.$socketChat.io;
io && io.emit(action, JSON.parse(data));
}
};
复制代码
这样就实现了跨服务的通讯功能,固然这只是一个简单的 demo ,可是基本原理是相同的。
第二种 Redis (sub/pub) 的方式,socket.io 提供了一种官方的库 socket.io-redis 来实现。它在 Redis 的 pub/sub 功能上进行了封装,让开发者能够忽略 Redis 相关的部分,方便了开发者使用。使用时只须要传入 Redis 的配置便可。
// Thinkjs socket.io-redis 配置
const redis = require('socket.io-redis');
exports.websocket = {
...
socketio: {
adapter: redis({ host: 'localhost', port: 6379 }),
message: {
...
}
}
}
// then controller websocket.js
this.io.emit('hi', 'all sockets');
复制代码
若是想经过非 socket.io 进程向 socket.io 服务通讯,例如:HTTP,可使用官方的 socket.io-emitter库。使用方式以下:
var io = require('socket.io-emitter')({ host: '127.0.0.1', port: 6379 });
setInterval(function(){
io.emit('time', new Date);
}, 5000);
复制代码
整个聊天室的代码已经上传到github,你们能够直接下载体验聊天室示例。