WebSocket+Netty构建web聊天程序

WebSocket

传统的浏览器和服务器之间的交互模式是基于请求/响应的模式,虽然可使用js发送定时任务让浏览器在服务器中拉取可是弊端很明显,首先就是不能避免的延迟,其次就是频繁的请求,让服务器的压力骤然提高html

WebSocket是H5新增的协议,用于构建浏览器和服务器之间的不受限的长链接的通讯模式,再也不局限于请求/响应式的模型,服务端能够主动推送消息给客户端,(游戏有某个玩家得奖了的弹幕)基于这个特性咱们能够构建咱们的实时的通讯程序前端

协议详情:

websocket创建链接时,是经过浏览器发送的HTTP请求,报文以下:java

GET ws://localhost:3000/ws/chat HTTP/1.1
Host: localhost
Upgrade: websocket
Connection: Upgrade
Origin: http://localhost:3000
Sec-WebSocket-Key: client-random-string
Sec-WebSocket-Version: 13
  • 首先GET请求是以 ws开头的
  • 其中请求头中的Upgrade: websocket Connection: Upgrade表示尝试创建WebSocket链接

对于服务端的相应数据web

HTTP/1.1 101 Switching Protocols
Upgrade: websocket
Connection: Upgrade
Sec-WebSocket-Accept: server-random-string

其中的101,表示服务端支持WebSocket协议, 双方基于Http请求,成功创建起WebSocket链接,双方之间的通讯也再也不经过HTTPajax

JS对WebSocket的封装对象

对于JS的WebSocket对象来讲,它经常使用 4个回调方法,以及两个主动方法数据库

方法名 做用
onopen() 和服务端成功创建链接后回调
onmessage(e) 收到服务端的的消息后回调,e为消息对象
onerror() 连接出现异常回调,如服务端关闭
onclose() 客户端单方面断开链接时回调
send(e) 主动向服务端推送消息
close() 主动关闭通道

再次对WebSocket进行封装

知道了回调函数回调时机,咱们接下来要作的就是在他的整个生命周期的不一样回调函数中,添加咱们指定的动做就ok了,下面是经过Window定义一个全局的聊天对象CHATjson

window.CHAT={
var socket = null;
// 初始化socket
init:function(){
// 判断当前的浏览器是否支持WebSocket
if(window.WebSocket){
    // 检验当前的webSocket是否存在,以及链接的状态,如已经链接,直接返回
    if(CHAT.socket!=null&&CHAT.socket!=undefined&&CHAT.socket.readyState==WebSocket.OPEN){
        return false;
    }else{// 实例化 , 第二个ws是咱们能够自定义的, 根据后端的路由来
        CHAT.socket=new WebSocket("ws://192.168.43.10:9999/ws");
        // 初始化WebSocket原生的方法
        CHAT.socket.onopen=CHAT.myopen();
        CHAT.socket.onmessage=CHAT.mymessage();
        CHAT.socket.onerror=CHAT.myerror();
        CHAT.socket.onclose=CHAT.myclose(); 
    
    }
}else{
    alert("当前设备不支持WebSocket");
}
}
// 发送聊天消息
chat:function(msg){
    // 若是的当前的WebSocket是链接的状态,直接发送 不然重新链接
    if(CHAT.socket.readyState==WebSocket.OPEN&&CHAT.socket!=null&&CHAT.socket!=undefined){
        socket.send(msg);
    }else{
        // 从新链接
        CHAT.init();
        // 延迟一会,重新发送
        setTimeout(1000);
        CHAT.send(msg);
    }
}
// 当链接创建完成后对调
myopen:function(){
    // 拉取链接创建以前的未签收的消息记录
    // 发送心跳包
}
mymessage:function(msg){
    // 由于服务端能够主动的推送消息,咱们提早定义和后端统一msg的类型, 如,拉取好友信息的消息,或 聊天的消息
    if(msg==聊天内容){
    // 发送请求签收消息,改变请求的状态
    // 将消息缓存到本地
    // 将msg 转换成消息对象, 植入html进行渲染
    }else if(msg==拉取好友列表){
    // 发送请求更新好友列表
    }
    
}
myerror:function(){
    console.log("链接出现异常...");
}
myclose:function(){
    console.log("链接关闭...");
}
keepalive: function() {
    // 构建对象
    var dataContent = new app.DataContent(app.KEEPALIVE, null, null);
    // 发送心跳
    CHAT.chat(JSON.stringify(dataContent));
    
    // 定时执行函数, 其余操做
    // 拉取未读消息
    // 拉取好友信息
}

}

对消息类型的约定

WebSocket对象经过send(msg);方法向后端提交数据,常见的数据以下:后端

  • 客户端发送聊天消息
  • 客户端签收消息
  • 客户端发送心跳包
  • 客户端请求创建链接

为了使后端接收到不一样的类型的数据作出不一样的动做, 因而咱们约定发送的msg的类型;浏览器

// 消息action的枚举,这个枚举和后端约定好,统一值
CONNECT: 1,     // 第一次(或重连)初始化链接
CHAT: 2,        // 聊天消息
SIGNED: 3,      // 消息签收
KEEPALIVE: 4,   // 客户端保持心跳
PULL_FRIEND:5,  // 从新拉取好友

// 消息模型的构造函数
ChatMsg: function(senderId, receiverId, msg, msgId){
    this.senderId = senderId;
    this.receiverId = receiverId;
    this.msg = msg;
    this.msgId = msgId;
}

//  进一步封装两个获得最终版消息模型的构造函数
DataContent: function(action, chatMsg, extand){
    this.action = action;
    this.chatMsg = chatMsg;
    this.extand = extand;
}

如何发送数据?

咱们使用js,给发送按钮绑定点击事件,一经触发,从缓存中获取出咱们须要的参数,调用缓存

CHAT.chat(Json.stringify(dataContent));

后端netty会解析dataContent的类型,进一步处理

如何签收未与服务器链接时好友发送的消息?

  • 消息的签收时机:
    之因此会有未签收的信息,是由于客户端未与服务端创建WebSocket链接, 当服务端判断他维护的channel组中没有接受者的channel时,不会发送数据,而是把数据持久化到数据库,而且标记flag=未读, 因此咱们签收信息天然放在客户端和服务端创建起链接时的回调函数中执行

  • 步骤:
    • 客户端经过js请求,拉取所有的和本身相关的flag=未读的消息实体列表
    • 从回调函数数中,把列表中的数据取出,缓存在本地
    • 将列表中的数据回显在html页面中
    • 和后端约定,将该列表中全部的实例的id取出,用逗号分隔拼接成字符串, 以action=SIGNED的方式发送给后端,让其进行签收

Netty对WebSocket的支持

首先每个Netty服务端的程序都是神似的,想建立不一样的服务端,就得给Netty装配的pipeline不一样的Handler

针对聊天程序,处理String类型的Json信息,咱们选取SimpleChannelInboundHandler, 他是个典型的入站处理器,而且若是咱们没有出来数据,她会帮咱们回收 重写它里面未实现抽象方法,这些抽象方法一样是回调方法, 当一个新的Channel进来, 它注册进Selector上的过程当中,会回调不一样的抽象方法

方法名 回调时机
handlerAdded(ChannelHandlerContext ctx) Pepiline中的Handler添加完成回调
channelRegistered(ChannelHandlerContext ctx) channel注册进Selector后回调
channelActive(ChannelHandlerContext ctx) channel处于活动状态回调
channelReadComplete(ChannelHandlerContext ctx) channel, read结束后回调
userEventTriggered(ChannelHandlerContext ctx, Object evt) 当出现用户事件时回调,如 读/写
channelInactive(ChannelHandlerContext ctx) 客户端断开链接时回调
channelUnregistered(ChannelHandlerContext ctx) 客户端断开链接后,取消channel的注册时回调
handlerRemoved(ChannelHandlerContext ctx) 取消channel的注册后,将channel移除ChannelGroup后回调
exceptionCaught(ChannelHandlerContext ctx, Throwable cause) 出现异常时回调

handler的设计编码

要作到点对点的聊天,前提是服务端拥有所有的channel由于全部数据的读写都依赖于它,而 netty为咱们提供了ChannelGroup 用来保存全部新添加进来的channel, 此外点对点的聊天,咱们须要将用户信息和它所属的channel进行一对一的绑定,才能够精准的匹配出两个channel进而数据交互, 所以添加UserChannel映射类

public class UserChanelRelationship {
    private static HashMap<String, Channel> manager = new HashMap<>();
    public static  void put(String sendId,Channel channel){
        manager.put(sendId,channel);
    }
    public static Channel get(String sendId){
        return  manager.get(sendId);
    }
    public static void outPut(){
        for (HashMap.Entry<String,Channel> entry:manager.entrySet()){
            System.out.println("UserId: "+entry.getKey() + "channelId: "+entry.getValue().id().asLongText());
        }
    }
}

咱们把User和Channel之间的关系以键值对的形式存放进Map中,服务端启动后,程序就会维护这个map, 那么问题来了? 何时添加二者之间的映射关系呢? 看上handler的回调函数,咱们选择 channelRead0() 当咱们判断出 客户端发送过来的信息是 CONNECT类型时,添加映射关系

下面是handler的处理编码

public class MyHandler extends SimpleChannelInboundHandler<TextWebSocketFrame> {
// 用于管理整个客户端的 组
public static ChannelGroup users = new DefaultChannelGroup(GlobalEventExecutor.INSTANCE);

@Override
protected void channelRead0(ChannelHandlerContext channelHandlerContext, TextWebSocketFrame frame) throws Exception {
Channel currentChanenl = channelHandlerContext.channel();

// 1. 获取客户端发送的消息
String content = frame.text();
System.out.println("  content:  "+content);

// 2. 判断不一样的消息的类型, 根据不一样的类型进行不一样的处理
    // 当创建链接时, 第一次open , 初始化channel,将channel和数据库中的用户作一个惟一的关联
DataContent dataContent = JsonUtils.jsonToPojo(content,DataContent.class);
Integer action = dataContent.getAction();

if (action == MsgActionEnum.CHAT.type) {

    // 3. 把聊天记录保存到数据库
    // 4. 同时标记消息的签收状态 [未签收]
    // 5. 从咱们的映射中获取接受方的chanel  发送消息
    // 6. 从 chanelGroup中查找 当前的channel是否存在于 group, 只有存在,咱们才进行下一步发送
    //  6.1 若是没有接受者用户channel就不writeAndFlush, 等着用户上线后,经过js发起请求拉取未接受的信息
    //  6.2 若是没有接受者用户channel就不writeAndFlush, 能够选择推送

}else if (action == MsgActionEnum.CONNECT.type){
    // 当创建链接时, 第一次open , 初始化channel,将channel和数据库中的用户作一个惟一的关联
    String sendId = dataContent.getChatMsg().getSenderId();
    UserChanelRelationship.put(sendId,currentChanenl);
    
}else if(action == MsgActionEnum.SINGNED.type){
    // 7. 当用户没有上线时,发送消息的人把要发送的消息持久化在数据库,可是却没有把信息写回到接受者的channel, 把这种消息称为未签收的消息
    
    // 8. 签收消息, 就是修改数据库中消息的签收状态, 咱们和前端约定,前端如何签收消息在上面有提到
    String extend = dataContent.getExtand();
    // 扩展字段在 signed类型表明 须要被签收的消息的id, 用逗号分隔
    String[] msgIdList = extend.split(",");
    List<String> msgIds = new ArrayList<>();
    Arrays.asList(msgIdList).forEach(s->{
        if (null!=s){
            msgIds.add(s);
        }
    });
    if (!msgIds.isEmpty()&&null!=msgIds&&msgIds.size()>0){
        // 批量签收
    }

}else if (action == MsgActionEnum.KEEPALIVE.type){
    // 6. 心跳类型
    System.out.println("收到来自channel 为" +currentChanenl+" 的心跳包... ");
}

}

// handler 添加完成后回调
@Override
public void handlerAdded(ChannelHandlerContext ctx) throws Exception {
// 获取连接, 而且若想要群发的话,就得往每个channel中写数据, 所以咱们得在建立链接时, 把channel保存起来
System.err.println("handlerAdded");
users .add(ctx.channel());
}

// 用户关闭了浏览器回调
@Override
public void handlerRemoved(ChannelHandlerContext ctx) throws Exception {
// 断开链接后, channel会自动移除group
// 咱们主动的关闭进行, channel会被移除, 可是咱们若是是开启的飞行模式,不会被移除
System.err.println("客户端channel被移出: "+ctx.channel().id().asShortText());
users.remove(ctx.channel());
}

@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
// 发生异常关闭channel, 并从ChannelGroup中移除Channel
ctx.channel().close();
users.remove(ctx.channel());
}

... 其余方法

先后端的心跳维持

双方创建起WebSocket链接后,服务端须要明确的知道,本身维护的诸多channel中,谁已经挂掉了, 为了提升性能,须要及早把废弃的channel移除ChanelGroup

客户端杀掉了进程,或者开启了飞行模式, 这时服务端是感知不到它维护的channel中已经有一个不能使用了,首先来讲,维护一个不能使用的channel会影响性能,并且当这个channel的好友给他发送消息时,服务端认为用户在线,因而向一个不存在的channel写入刷新数据,会带来额外的麻烦

这时咱们就须要添加心跳机制,客户端设置定时任务,每一个一段时间就往服务端发送心跳包,心跳包的内容是什么不是重点,它的做用就是告诉服务端本身还active, N多个客户端都要向服务端发送心跳,这并不会增长服务端的请求,由于这个请求是经过WebSocket的send方法发送过去的,只不过dataContent的类型是 KEEPALIVE , 一样这是咱们提早约定好的(此外,服务端向客户端发送心跳看起来是没有必要的)

因而对于后端来讲,咱们发送的心跳包,会使得当前客户端对应的channel的channelRead0()方法回调, netty为咱们提供了心跳相关的handler, 每一次的chanelRead0()的回调,都是read/write事件, 下面是netty对心跳的支持的实现

/**
 * @Author: Changwu
 * @Date: 2019/7/2 9:33
 * 咱们的心跳handler不须要实现handler0方法,咱们选择,直接继承SimpleInboundHandler的父类
*/
public class HeartHandler extends ChannelInboundHandlerAdapter {
// 咱们重写  EventTrigger 方法
@Override
public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
// 当出现read/write  读写写空闲时触发
if(evt instanceof IdleStateEvent){
    IdleStateEvent event = (IdleStateEvent) evt;

    if (event.state()== IdleState.READER_IDLE){ // 读空闲
        System.out.println(ctx.channel().id().asShortText()+" 读空闲... ");
    }else if (event.state()==IdleState.WRITER_IDLE){
        System.out.println(ctx.channel().id().asShortText()+" 写空闲... ");
    }else if (event.state()==IdleState.ALL_IDLE){
        System.out.println("channel 读写空闲, 准备关闭当前channel  , 当前UsersChanel的数量: "+MyHandler.users.size());
        Channel channel = ctx.channel();
        channel.close();
        System.out.println("channel 关闭后, UsersChanel的数量: "+MyHandler.users.size());
    }
}
}

Handler咱们再也不使用SimpleChannelInboundHandler了,由于它当中的方法都是抽象方法,而咱们须要回调的函数时机是,每次当有用户事件时回调, 好比read,write事件, 这些事件能够证实channel还活着,对应的方法是userEventTriggered()

此外, ChannelInboundHandlerAdapter是netty中,适配器模式的体现, 它实现了全都抽象方法,而后他的实现方法中并非在干活,而是把这个事件往下传播下去了,如今咱们重写userEventTriggered() 执行的就是咱们的逻辑

另外,咱们须要在pipeline中添加handler

... 
/ 添加netty为咱们提供的 检测空闲的处理器,  每 20 40 60 秒, 会触发userEventTriggered事件的回调
pipeline.addLast(new IdleStateHandler(10,20,30));
// todo 添加心跳的支持
pipeline.addLast("heartHandler",new HeartHandler());

服务端主动向客户端推送数据

如, 添加好友的操做中, A向B发送添加好友请求的过程,会通过以下几步

  • A向服务端发送ajax请求,将本身的id, 目标朋友的id持久化到 数据库,请求friend_request表
  • 用户B上线,经过js,向后端拉取friend_request表中有没有关于本身的信息,因而服务端把A的请求给B推送过去
  • 在B的前端回显A的请求, B进一步处理这个信息, 此时两种状况
    • B拒绝了A的请求: 后端把friend_request表关于AB的信息清除
    • B赞成了A的请求: 后端在firend_List表中,将AB双方的信息都持久化进去, 这时咱们能够顺势在后端的方法中,给B推送最新的联系人信息, 可是这不属于主动推送,由于此次会话是客户端主动发起的

可是A殊不知道,B已经赞成了,因而须要给A主动的推送数据, 怎么推送呢? 咱们须要在上面的UserChannel的关系中,拿出发送者的channel, 而后往回writeAndFlush内容,这时A就得知B已经赞成了,从新加载好友列表

相关文章
相关标签/搜索