WebSocket实现Web端即时通讯

BST

前言

WebSocket 是HTML5开始提供的一种在浏览器和服务器间进行全双工通讯的协议。目前不少没有使用WebSocket进行客户端服务端实时通讯的web应用,大多使用设置规则时间的轮询,或者使用长轮询较多来处理消息的实时推送。这样势必会较大程度浪费服务器和带宽资源,而咱们如今要讲的WebSocket正是来解决该问题而出现,使得B/S架构的应用拥有C/S架构同样的实时通讯能力。css

HTTP和WebSocket比较

HTTP

HTTP协议是半双工协议,也就是说在同一时间点只能处理一个方向的数据传输,同时HTTP消息也是过于庞大,里面包含大量消息头数据,真正在消息处理中不少数据不是必须的,这也是对资源的浪费。html

  • 定时轮询:定时轮询就是客户端定时去向服务器发送HTTP请求,看是否有数据,服务器接受到请求后,返回数据给客户端,本次链接也会随着关闭。该实现方案最简单,可是会存在消息延迟和大量浪费服务器和带宽资源。
  • 长轮询:长轮询与定时轮询同样,也是经过HTTP请求实现,但这里不是定时发送请求。客户端发送请求给服务端,这时服务端会hold住该请求,当有数据过来或者超时时返回给请求的客户端并开始下一轮的请求。

WebSocket

WebSocket在客户端和服务端只需一次请求,就会在客户端和服务端创建一条通讯通道,能够实时相互传输数据,而且不会像HTTP那样携带大量请求头等信息。由于WebSocket是基于TCP双向全双工通讯的协议,因此支持在同一时间点处理发送和接收消息,作到实时的消息处理。java

  • 创建WebSocket链接:创建WebSocket链接,首先客户端先要向服务端发送一个特殊的HTTP请求,使用的协议不是httphttps,而是使用过wswss(一个非安全的,一个安全的,相似前二者之间的差异),请求头里面要附加一个申请协议升级的信息Upgrade: websocket,还有随机生成一个Sec-WebSocket-Key的值,及版本信息Sec-WebSocket-Version等等。服务端收到客户端的请求后,会解析该请求的信息,包括请求协议升级,版本校验,以及将Sec-WebSocket-Key的加密后以sec-websocket-accept的值返回给客户端,这样客户端和服务端的链接就创建了。
  • 关闭WebSocket链接:客户端和服务端均可发送一个close控制帧,另外一端主动关闭链接。

HTTP轮询和WebSocket生命周期示意图jquery

HTTP轮询和WebSocket生命周期示意图

服务端

这里服务端利用Netty的WebSocket开发。这里首先实现服务端启动类,而后自定义处理器来处理WebSocket的消息。ios

package com.ytao.websocket;

import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.*;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.handler.codec.http.HttpObjectAggregator;
import io.netty.handler.codec.http.HttpServerCodec;
import io.netty.handler.logging.LogLevel;
import io.netty.handler.logging.LoggingHandler;
import io.netty.handler.stream.ChunkedWriteHandler;

/**
 * Created by YANGTAO on 2019/11/17 0017.
 */
public class WebSocketServer {

    public static String HOST = "127.0.0.1";
    public static int PORT = 8806;

    public static void startUp() throws Exception {
        // 监听端口的线程组
        EventLoopGroup bossGroup = new NioEventLoopGroup();
        // 处理每一条链接的数据读写的线程组
        EventLoopGroup workerGroup = new NioEventLoopGroup();
        // 启动的引导类
        ServerBootstrap serverBootstrap = new ServerBootstrap();
        try {
            serverBootstrap.group(bossGroup, workerGroup)
                    .channel(NioServerSocketChannel.class)
                    .childHandler(new ChannelInitializer<socketchannel>() {
                        @Override
                        protected void initChannel(SocketChannel ch) throws Exception{
                            ChannelPipeline pipeline = ch.pipeline();
                            pipeline.addLast("logger", new LoggingHandler(LogLevel.INFO));
                            // 将请求和返回消息编码或解码成http
                            pipeline.addLast("http-codec", new HttpServerCodec());
                            // 使http的多个部分组合成一条完整的http
                            pipeline.addLast("aggregator", new HttpObjectAggregator(65536));
                            // 向客户端发送h5文件,主要是来支持websocket通讯
                            pipeline.addLast("http-chunked", new ChunkedWriteHandler());
                            // 服务端自定义处理器
                            pipeline.addLast("handler", new WebSocketServerHandler());
                        }
                    })
                    // 开启心跳机制
                    .childOption(ChannelOption.SO_KEEPALIVE, true)
                    .handler(new ChannelInitializer<nioserversocketchannel>() {
                        protected void initChannel(NioServerSocketChannel ch) {
                            System.out.println("WebSocket服务端启动中...");
                        }
                    });

            Channel ch = serverBootstrap.bind(HOST, PORT).sync().channel();
            System.out.println("WebSocket host: "+ch.localAddress().toString().replace("/",""));
            ch.closeFuture().sync();
        }catch (Exception e){
            e.printStackTrace();
        }finally {
            bossGroup.shutdownGracefully();
            workerGroup.shutdownGracefully();
        }

    }

    public static void main(String[] args) throws Exception {
        startUp();
    }
}

上面启动类和HTTP协议的相似,因此较好理解。启动类启动后,咱们须要处理WebSocket请求,这里自定义WebSocketServerHandler。 咱们在处理中设计的业务逻辑有,若是只有一个链接来发送信息聊天,那么咱们就以服务器自动回复,若是存在一个以上,咱们就将信息发送给其余人。web

package com.ytao.websocket;

import io.netty.channel.Channel;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelPromise;
import io.netty.channel.SimpleChannelInboundHandler;
import io.netty.handler.codec.http.FullHttpRequest;
import io.netty.handler.codec.http.websocketx.*;
import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;
import java.util.Date;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

/**
 * Created by YANGTAO on 2019/11/17 0017.
 */
public class WebSocketServerHandler extends SimpleChannelInboundHandler<object> {

    private WebSocketServerHandshaker handshaker;

    private static Map<string, channelhandlercontext> channelHandlerContextConcurrentHashMap = new ConcurrentHashMap&lt;&gt;();

    private static final Map<string, string> replyMap = new ConcurrentHashMap&lt;&gt;();
    static {
        replyMap.put("博客", "https://ytao.top");
        replyMap.put("公众号", "ytao公众号");
        replyMap.put("在吗", "在");
        replyMap.put("吃饭了吗", "吃了");
        replyMap.put("你好", "你好");
        replyMap.put("谁", "ytao");
        replyMap.put("几点", "如今本地时间:"+LocalDateTime.now().format(DateTimeFormatter.ofPattern("HH:mm:ss")));
    }

    @Override
    public void messageReceived(ChannelHandlerContext channelHandlerContext, Object msg) throws Exception{
        channelHandlerContextConcurrentHashMap.put(channelHandlerContext.channel().toString(), channelHandlerContext);
        // http
        if (msg instanceof FullHttpRequest){
            handleHttpRequest(channelHandlerContext, (FullHttpRequest) msg);
        }else if (msg instanceof WebSocketFrame){ // WebSocket
            handleWebSocketFrame(channelHandlerContext, (WebSocketFrame) msg);
        }
    }

    @Override
    public void channelReadComplete(ChannelHandlerContext channelHandlerContext) throws Exception{
        if (channelHandlerContextConcurrentHashMap.size() &gt; 1){
            for (String key : channelHandlerContextConcurrentHashMap.keySet()) {
                ChannelHandlerContext current = channelHandlerContextConcurrentHashMap.get(key);
                if (channelHandlerContext == current)
                    continue;
                current.flush();
            }
        }else {
            // 单条处理
            channelHandlerContext.flush();
        }
    }

    private void handleHttpRequest(ChannelHandlerContext channelHandlerContext, FullHttpRequest request) throws Exception{
        // 验证解码是否异常
        if (!"websocket".equals(request.headers().get("Upgrade")) || request.decoderResult().isFailure()){
            // todo send response bad
            System.err.println("解析http信息异常");
            return;
        }

        // 建立握手工厂类
        WebSocketServerHandshakerFactory factory = new WebSocketServerHandshakerFactory(
          "ws:/".concat(channelHandlerContext.channel().localAddress().toString()),
                null,
                false
        );
        handshaker = factory.newHandshaker(request);

        if (handshaker == null)
            WebSocketServerHandshakerFactory.sendUnsupportedVersionResponse(channelHandlerContext.channel());
        else
            // 响应握手消息给客户端
            handshaker.handshake(channelHandlerContext.channel(), request);

    }

    private void handleWebSocketFrame(ChannelHandlerContext channelHandlerContext, WebSocketFrame webSocketFrame){
        // 关闭链路
        if (webSocketFrame instanceof CloseWebSocketFrame){
            handshaker.close(channelHandlerContext.channel(), (CloseWebSocketFrame) webSocketFrame.retain());
            return;
        }

        // Ping消息
        if (webSocketFrame instanceof PingWebSocketFrame){
            channelHandlerContext.channel().write(
              new PongWebSocketFrame(webSocketFrame.content().retain())
            );
            return;
        }

        // Pong消息
        if (webSocketFrame instanceof PongWebSocketFrame){
            // todo Pong消息处理
        }

        // 二进制消息
        if (webSocketFrame instanceof BinaryWebSocketFrame){
            // todo 二进制消息处理
        }

        // 拆分数据
        if (webSocketFrame instanceof ContinuationWebSocketFrame){
            // todo 数据被拆分为多个websocketframe处理
        }

        // 文本信息处理
        if (webSocketFrame instanceof TextWebSocketFrame){
            // 推送过来的消息
            String  msg = ((TextWebSocketFrame) webSocketFrame).text();
            System.out.println(String.format("%s 收到消息 : %s", new Date(), msg));

            String responseMsg = "";
            if (channelHandlerContextConcurrentHashMap.size() &gt; 1){
                responseMsg = msg;
                for (String key : channelHandlerContextConcurrentHashMap.keySet()) {
                    ChannelHandlerContext current = channelHandlerContextConcurrentHashMap.get(key);
                    if (channelHandlerContext == current)
                        continue;
                    Channel channel = current.channel();
                    channel.write(
                            new TextWebSocketFrame(responseMsg)
                    );
                }
            }else {
                // 自动回复
                responseMsg = this.answer(msg);
                if(responseMsg == null)
                    responseMsg = "暂时没法回答你的问题 -&gt;_-&gt;";
                System.out.println("回复消息:"+responseMsg);
                Channel channel = channelHandlerContext.channel();
                channel.write(
                        new TextWebSocketFrame("【服务端】" + responseMsg)
                );
            }
        }

    }

    private String answer(String msg){
        for (String key : replyMap.keySet()) {
            if (msg.contains(key))
                return replyMap.get(key);
        }
        return null;
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext channelHandlerContext, Throwable throwable){
        throwable.printStackTrace();
        channelHandlerContext.close();
    }

    @Override
    public void close(ChannelHandlerContext channelHandlerContext, ChannelPromise promise) throws Exception {
        channelHandlerContextConcurrentHashMap.remove(channelHandlerContext.channel().toString());
        channelHandlerContext.close(promise);
    }

}

刚创建链接时,第一次握手有HTTP协议处理,因此WebSocketServerHandler#messageReceived会判断是HTTP仍是WebSocket,若是是HTTP时,交由WebSocketServerHandler#handleHttpRequest处理,里面会去验证请求,而且处理握手后将消息返回给客户端。 若是不是HTTP协议,而是WebSocket协议时,处理交给WebSocketServerHandler#handleWebSocketFrame处理,进入WebSocket处理后,这里面有判断消息属于哪一种类型,里面包括CloseWebSocketFramePingWebSocketFramePongWebSocketFrameBinaryWebSocketFrameContinuationWebSocketFrameTextWebSocketFrame,他们都是WebSocketFrame的子类,而且WebSocketFrame又继承自DefaultByteBufHolderbootstrap

channelHandlerContextConcurrentHashMap是缓存WebSocket已链接的信息,由于咱们实现的需求要记录链接数量,当有链接关闭时咱们要删除以缓存的链接,因此在WebSocketServerHandler#close中要移除缓存。promise

最后的发送文本到客户端,根据链接数量判断。若是链接数量不大于1,那么,咱们"价值一个亿的AI核心代码"WebSocketServerHandler#answer来回复客户端消息。不然除了本次接收的链接,消息会发送给其余全部链接的客户端。浏览器

客户端

客户端使用JS实现WebSocket的操做,目前主流的浏览器基本都支持WebSocket。支持状况如图:缓存

支持WebSocket的浏览器

客户端H5的代码实现:

<meta charset="UTF-8">
    <meta name="viewport" content="width=device-width, initial-scale=1">
    <title>ytao-websocket</title>
    <script src="http://libs.baidu.com/jquery/2.0.0/jquery.min.js"></script>
    <style type="text/css">
        #msgContent{
            line-height:200%;
            width: 500px;
            height: 300px;
            resize: none;
            border-color: #FF9900;
        }
        .clean{
            background-color: white;
        }
        .send{
            border-radius: 10%;
            background-color: #2BD56F;
        }
        @media screen and (max-width: 600px) {
            #msgContent{
                line-height:200%;
                width: 100%;
                height: 300px;
            }
        }
    </style>

<script>
    var socket;
    var URL = "ws://127.0.0.1:8806/ytao";

    connect();

    function connect() {
        $("#status").html("<span>链接中.....</span>");
        window.WebSocket = !window.WebSocket == true? window.MozWebSocket : window.WebSocket;
        if(window.WebSocket){
            socket = new WebSocket(URL);
            socket.onmessage = function(event){
                var msg = event.data + "\n";
                addMsgContent(msg);
            };

            socket.onopen = function(){
                $("#status").html("<span style='background-color: #44b549'>WebSocket已链接</span>");
            };

            socket.onclose = function(){
                $("#status").html("<span style='background-color: red'>WebSocket已断开链接</span>");
                setTimeout("connect()", 3000);
            };
        }else{
            $("#status").html("<span style='background-color: red'>该浏览器不支持WebSocket协议!</span>");
        }
    }

    function addMsgContent(msg) {
        var contet = $("#msgContent").val() + msg;
        $("#msgContent").val(contet)
    }

    function clean() {
        $("#msgContent").val("");
    }

    function getUserName() {
        var n = $("input[name=userName]").val();
        if (n == "")
            n = "匿名";
        return n;
    }

    function send(){
        var message = $("input[name=message]").val();
        if(!window.WebSocket) return;
        if ($.trim(message) == ""){
            alert("不能发送空消息!");
            return;
        }
        if(socket.readyState == WebSocket.OPEN){
            var msg = "【我】" + message + "\n";
            this.addMsgContent(msg);
            socket.send("【"+getUserName()+"】"+message);
            $("input[name=message]").val("");
        }else{
            alert("没法创建WebSocket链接!");
        }
    }

    $(document).keyup(function(){
        if(event.keyCode ==13){
            send()
        }
    });
</script>

    <div style="text-align: center;">
        <div id="status">
            <span>链接中.....</span>
        </div>
        <div>
            <h2>信息面板</h2>
            <textarea id="msgContent" readonly></textarea>
        </div>
        <div>
            <input class="clean" type="button" value="清除聊天纪录" onclick="clean()">
            <input type="text" name="userName" value="" placeholder="用户名">
        </div>
        <hr>
        <div>
            <form onsubmit="return false">
                <input type="text" name="message" value="" placeholder="请输入消息">
                <input class="send" type="button" name="msgBtn" value="send" onclick="send()">
            </form>
        </div>
        <div>
            <br><br>
            <img src="https://oscimg.oschina.net/oscnet/ytao%E5%85%AC%E4%BC%97%E5%8F%B7.jpg">
        </div>
    </div>

JS这里实现相对较简单,主要用到:

  • new WebSocket(URL)建立WebSocket对象
  • onopen()打开链接
  • onclose()关闭链接
  • onmessage接收消息
  • send()发送消息

当断开链接后,客户端这边从新发起链接,直到链接成功为止。

启动

客户端和服务端链接后,咱们从日志和请求中能够看到上面所提到的验证信息。 客户端:

客户端链接信息

服务端:

服务端链接信息

启动服务端后,先实验咱们"价值一个亿的AI",只有一个链接用户时,发送信息结果如图:

多个用户链接,这里使用三个链接用户群聊。

用户一:

用户二:

用户三:

到目前为止,WebSocket已帮助咱们实现即时通讯的需求,相信你们也基本入门了WebSocket的基本使用。

总结

经过本文了解,能够帮助你们入门WebSocket而且解决当前可能存在的一些Web端的通讯问题。我曾经在两个项目中也有看到该类解决方案都是经过定时轮询去作的,也或多或少对服务器资源形成必定的浪费。由于WebSocket自己是较复杂的,它提供的API也是比较多,因此在使用过程,要去真正使用好或去优化它,并非一件很简单的事,也是须要根据现实场景针对性的去作。

我的博客: https://ytao.top

个人公众号 ytao

个人公众号

相关文章
相关标签/搜索