传统的浏览器和服务器之间的交互模式是基于请求/响应的模式,虽然可使用js发送定时任务让浏览器在服务器中拉取可是弊端很明显,首先就是不能避免的延迟,其次就是频繁的请求,让服务器的压力骤然提高html
WebSocket是H5新增的协议,用于构建浏览器和服务器之间的不受限的长链接的通讯模式,再也不局限于请求/响应式的模型,服务端能够主动推送消息给客户端,(游戏有某个玩家得奖了的弹幕)基于这个特性咱们能够构建咱们的实时的通讯程序前端
websocket创建链接时,是经过浏览器发送的HTTP请求,报文以下:java
GET ws://localhost:3000/ws/chat HTTP/1.1 Host: localhost Upgrade: websocket Connection: Upgrade Origin: http://localhost:3000 Sec-WebSocket-Key: client-random-string Sec-WebSocket-Version: 13
ws
开头的Upgrade: websocket Connection: Upgrade
表示尝试创建WebSocket链接对于服务端的相应数据web
HTTP/1.1 101 Switching Protocols Upgrade: websocket Connection: Upgrade Sec-WebSocket-Accept: server-random-string
其中的101
,表示服务端支持WebSocket协议, 双方基于Http请求,成功创建起WebSocket链接,双方之间的通讯也再也不经过HTTPajax
对于JS的WebSocket对象来讲,它经常使用 4个回调方法,以及两个主动方法数据库
方法名 | 做用 |
---|---|
onopen() | 和服务端成功创建链接后回调 |
onmessage(e) | 收到服务端的的消息后回调,e为消息对象 |
onerror() | 连接出现异常回调,如服务端关闭 |
onclose() | 客户端单方面断开链接时回调 |
send(e) | 主动向服务端推送消息 |
close() | 主动关闭通道 |
知道了回调函数回调时机,咱们接下来要作的就是在他的整个生命周期的不一样回调函数中,添加咱们指定的动做就ok了,下面是经过Window定义一个全局的聊天对象CHATjson
window.CHAT={ var socket = null; // 初始化socket init:function(){ // 判断当前的浏览器是否支持WebSocket if(window.WebSocket){ // 检验当前的webSocket是否存在,以及链接的状态,如已经链接,直接返回 if(CHAT.socket!=null&&CHAT.socket!=undefined&&CHAT.socket.readyState==WebSocket.OPEN){ return false; }else{// 实例化 , 第二个ws是咱们能够自定义的, 根据后端的路由来 CHAT.socket=new WebSocket("ws://192.168.43.10:9999/ws"); // 初始化WebSocket原生的方法 CHAT.socket.onopen=CHAT.myopen(); CHAT.socket.onmessage=CHAT.mymessage(); CHAT.socket.onerror=CHAT.myerror(); CHAT.socket.onclose=CHAT.myclose(); } }else{ alert("当前设备不支持WebSocket"); } } // 发送聊天消息 chat:function(msg){ // 若是的当前的WebSocket是链接的状态,直接发送 不然重新链接 if(CHAT.socket.readyState==WebSocket.OPEN&&CHAT.socket!=null&&CHAT.socket!=undefined){ socket.send(msg); }else{ // 从新链接 CHAT.init(); // 延迟一会,重新发送 setTimeout(1000); CHAT.send(msg); } } // 当链接创建完成后对调 myopen:function(){ // 拉取链接创建以前的未签收的消息记录 // 发送心跳包 } mymessage:function(msg){ // 由于服务端能够主动的推送消息,咱们提早定义和后端统一msg的类型, 如,拉取好友信息的消息,或 聊天的消息 if(msg==聊天内容){ // 发送请求签收消息,改变请求的状态 // 将消息缓存到本地 // 将msg 转换成消息对象, 植入html进行渲染 }else if(msg==拉取好友列表){ // 发送请求更新好友列表 } } myerror:function(){ console.log("链接出现异常..."); } myclose:function(){ console.log("链接关闭..."); } keepalive: function() { // 构建对象 var dataContent = new app.DataContent(app.KEEPALIVE, null, null); // 发送心跳 CHAT.chat(JSON.stringify(dataContent)); // 定时执行函数, 其余操做 // 拉取未读消息 // 拉取好友信息 } }
WebSocket对象经过send(msg)
;方法向后端提交数据,常见的数据以下:后端
为了使后端接收到不一样的类型的数据作出不一样的动做, 因而咱们约定发送的msg的类型;浏览器
// 消息action的枚举,这个枚举和后端约定好,统一值 CONNECT: 1, // 第一次(或重连)初始化链接 CHAT: 2, // 聊天消息 SIGNED: 3, // 消息签收 KEEPALIVE: 4, // 客户端保持心跳 PULL_FRIEND:5, // 从新拉取好友 // 消息模型的构造函数 ChatMsg: function(senderId, receiverId, msg, msgId){ this.senderId = senderId; this.receiverId = receiverId; this.msg = msg; this.msgId = msgId; } // 进一步封装两个获得最终版消息模型的构造函数 DataContent: function(action, chatMsg, extand){ this.action = action; this.chatMsg = chatMsg; this.extand = extand; }
咱们使用js,给发送按钮绑定点击事件,一经触发,从缓存中获取出咱们须要的参数,调用缓存
CHAT.chat(Json.stringify(dataContent));
后端netty会解析dataContent的类型,进一步处理
消息的签收时机:
之因此会有未签收的信息,是由于客户端未与服务端创建WebSocket链接, 当服务端判断他维护的channel组中没有接受者的channel时,不会发送数据,而是把数据持久化到数据库,而且标记flag=未读, 因此咱们签收信息天然放在客户端和服务端创建起链接时的回调函数中执行
action=SIGNED
的方式发送给后端,让其进行签收首先每个Netty服务端的程序都是神似的,想建立不一样的服务端,就得给Netty装配的pipeline不一样的Handler
针对聊天程序,处理String类型的Json信息,咱们选取SimpleChannelInboundHandler
, 他是个典型的入站处理器,而且若是咱们没有出来数据,她会帮咱们回收 重写它里面未实现抽象方法,这些抽象方法一样是回调方法, 当一个新的Channel进来, 它注册进Selector上的过程当中,会回调不一样的抽象方法
方法名 | 回调时机 |
---|---|
handlerAdded(ChannelHandlerContext ctx) | Pepiline中的Handler添加完成回调 |
channelRegistered(ChannelHandlerContext ctx) | channel注册进Selector后回调 |
channelActive(ChannelHandlerContext ctx) | channel处于活动状态回调 |
channelReadComplete(ChannelHandlerContext ctx) | channel, read结束后回调 |
userEventTriggered(ChannelHandlerContext ctx, Object evt) | 当出现用户事件时回调,如 读/写 |
channelInactive(ChannelHandlerContext ctx) | 客户端断开链接时回调 |
channelUnregistered(ChannelHandlerContext ctx) | 客户端断开链接后,取消channel的注册时回调 |
handlerRemoved(ChannelHandlerContext ctx) | 取消channel的注册后,将channel移除ChannelGroup后回调 |
exceptionCaught(ChannelHandlerContext ctx, Throwable cause) | 出现异常时回调 |
要作到点对点的聊天,前提是服务端拥有所有的channel由于全部数据的读写都依赖于它,而 netty为咱们提供了ChannelGroup
用来保存全部新添加进来的channel, 此外点对点的聊天,咱们须要将用户信息和它所属的channel进行一对一的绑定,才能够精准的匹配出两个channel进而数据交互, 所以添加UserChannel映射类
public class UserChanelRelationship { private static HashMap<String, Channel> manager = new HashMap<>(); public static void put(String sendId,Channel channel){ manager.put(sendId,channel); } public static Channel get(String sendId){ return manager.get(sendId); } public static void outPut(){ for (HashMap.Entry<String,Channel> entry:manager.entrySet()){ System.out.println("UserId: "+entry.getKey() + "channelId: "+entry.getValue().id().asLongText()); } } }
咱们把User和Channel之间的关系以键值对的形式存放进Map中,服务端启动后,程序就会维护这个map, 那么问题来了? 何时添加二者之间的映射关系呢? 看上handler的回调函数,咱们选择 channelRead0()
当咱们判断出 客户端发送过来的信息是 CONNECT
类型时,添加映射关系
下面是handler的处理编码
public class MyHandler extends SimpleChannelInboundHandler<TextWebSocketFrame> { // 用于管理整个客户端的 组 public static ChannelGroup users = new DefaultChannelGroup(GlobalEventExecutor.INSTANCE); @Override protected void channelRead0(ChannelHandlerContext channelHandlerContext, TextWebSocketFrame frame) throws Exception { Channel currentChanenl = channelHandlerContext.channel(); // 1. 获取客户端发送的消息 String content = frame.text(); System.out.println(" content: "+content); // 2. 判断不一样的消息的类型, 根据不一样的类型进行不一样的处理 // 当创建链接时, 第一次open , 初始化channel,将channel和数据库中的用户作一个惟一的关联 DataContent dataContent = JsonUtils.jsonToPojo(content,DataContent.class); Integer action = dataContent.getAction(); if (action == MsgActionEnum.CHAT.type) { // 3. 把聊天记录保存到数据库 // 4. 同时标记消息的签收状态 [未签收] // 5. 从咱们的映射中获取接受方的chanel 发送消息 // 6. 从 chanelGroup中查找 当前的channel是否存在于 group, 只有存在,咱们才进行下一步发送 // 6.1 若是没有接受者用户channel就不writeAndFlush, 等着用户上线后,经过js发起请求拉取未接受的信息 // 6.2 若是没有接受者用户channel就不writeAndFlush, 能够选择推送 }else if (action == MsgActionEnum.CONNECT.type){ // 当创建链接时, 第一次open , 初始化channel,将channel和数据库中的用户作一个惟一的关联 String sendId = dataContent.getChatMsg().getSenderId(); UserChanelRelationship.put(sendId,currentChanenl); }else if(action == MsgActionEnum.SINGNED.type){ // 7. 当用户没有上线时,发送消息的人把要发送的消息持久化在数据库,可是却没有把信息写回到接受者的channel, 把这种消息称为未签收的消息 // 8. 签收消息, 就是修改数据库中消息的签收状态, 咱们和前端约定,前端如何签收消息在上面有提到 String extend = dataContent.getExtand(); // 扩展字段在 signed类型表明 须要被签收的消息的id, 用逗号分隔 String[] msgIdList = extend.split(","); List<String> msgIds = new ArrayList<>(); Arrays.asList(msgIdList).forEach(s->{ if (null!=s){ msgIds.add(s); } }); if (!msgIds.isEmpty()&&null!=msgIds&&msgIds.size()>0){ // 批量签收 } }else if (action == MsgActionEnum.KEEPALIVE.type){ // 6. 心跳类型 System.out.println("收到来自channel 为" +currentChanenl+" 的心跳包... "); } } // handler 添加完成后回调 @Override public void handlerAdded(ChannelHandlerContext ctx) throws Exception { // 获取连接, 而且若想要群发的话,就得往每个channel中写数据, 所以咱们得在建立链接时, 把channel保存起来 System.err.println("handlerAdded"); users .add(ctx.channel()); } // 用户关闭了浏览器回调 @Override public void handlerRemoved(ChannelHandlerContext ctx) throws Exception { // 断开链接后, channel会自动移除group // 咱们主动的关闭进行, channel会被移除, 可是咱们若是是开启的飞行模式,不会被移除 System.err.println("客户端channel被移出: "+ctx.channel().id().asShortText()); users.remove(ctx.channel()); } @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { // 发生异常关闭channel, 并从ChannelGroup中移除Channel ctx.channel().close(); users.remove(ctx.channel()); } ... 其余方法
双方创建起WebSocket链接后,服务端须要明确的知道,本身维护的诸多channel中,谁已经挂掉了, 为了提升性能,须要及早把废弃的channel移除ChanelGroup
客户端杀掉了进程,或者开启了飞行模式, 这时服务端是感知不到它维护的channel中已经有一个不能使用了,首先来讲,维护一个不能使用的channel会影响性能,并且当这个channel的好友给他发送消息时,服务端认为用户在线,因而向一个不存在的channel写入刷新数据,会带来额外的麻烦
这时咱们就须要添加心跳机制,客户端设置定时任务,每一个一段时间就往服务端发送心跳包,心跳包的内容是什么不是重点,它的做用就是告诉服务端本身还active, N多个客户端都要向服务端发送心跳,这并不会增长服务端的请求,由于这个请求是经过WebSocket的send方法发送过去的,只不过dataContent的类型是 KEEPALIVE , 一样这是咱们提早约定好的(此外,服务端向客户端发送心跳看起来是没有必要的)
因而对于后端来讲,咱们发送的心跳包,会使得当前客户端对应的channel的channelRead0()方法回调, netty为咱们提供了心跳相关的handler, 每一次的chanelRead0()的回调,都是read/write事件, 下面是netty对心跳的支持的实现
/** * @Author: Changwu * @Date: 2019/7/2 9:33 * 咱们的心跳handler不须要实现handler0方法,咱们选择,直接继承SimpleInboundHandler的父类 */ public class HeartHandler extends ChannelInboundHandlerAdapter { // 咱们重写 EventTrigger 方法 @Override public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception { // 当出现read/write 读写写空闲时触发 if(evt instanceof IdleStateEvent){ IdleStateEvent event = (IdleStateEvent) evt; if (event.state()== IdleState.READER_IDLE){ // 读空闲 System.out.println(ctx.channel().id().asShortText()+" 读空闲... "); }else if (event.state()==IdleState.WRITER_IDLE){ System.out.println(ctx.channel().id().asShortText()+" 写空闲... "); }else if (event.state()==IdleState.ALL_IDLE){ System.out.println("channel 读写空闲, 准备关闭当前channel , 当前UsersChanel的数量: "+MyHandler.users.size()); Channel channel = ctx.channel(); channel.close(); System.out.println("channel 关闭后, UsersChanel的数量: "+MyHandler.users.size()); } } }
Handler咱们再也不使用SimpleChannelInboundHandler了,由于它当中的方法都是抽象方法,而咱们须要回调的函数时机是,每次当有用户事件时回调, 好比read,write事件, 这些事件能够证实channel还活着,对应的方法是userEventTriggered()
此外, ChannelInboundHandlerAdapter是netty中,适配器模式的体现, 它实现了全都抽象方法,而后他的实现方法中并非在干活,而是把这个事件往下传播下去了,如今咱们重写userEventTriggered()
执行的就是咱们的逻辑
另外,咱们须要在pipeline中添加handler
... / 添加netty为咱们提供的 检测空闲的处理器, 每 20 40 60 秒, 会触发userEventTriggered事件的回调 pipeline.addLast(new IdleStateHandler(10,20,30)); // todo 添加心跳的支持 pipeline.addLast("heartHandler",new HeartHandler());
如, 添加好友的操做中, A向B发送添加好友请求的过程,会通过以下几步
可是A殊不知道,B已经赞成了,因而须要给A主动的推送数据, 怎么推送呢? 咱们须要在上面的UserChannel的关系中,拿出发送者的channel, 而后往回writeAndFlush
内容,这时A就得知B已经赞成了,从新加载好友列表