Netty 实现 WebSocket 聊天功能

上一次咱们用Netty快速实现了一个 Java 聊天程序(见http://www.waylau.com/netty-chat/)。如今,咱们要作下修改,加入 WebSocket 的支持,使它能够在浏览器里进行文本聊天。javascript

准备

  • JDK 7+
  • Maven 3.2.x
  • Netty 4.x
  • Eclipse 4.x

WebSocket

WebSocket 经过“Upgrade handshake(升级握手)”从标准的 HTTP 或HTTPS 协议转为 WebSocket。所以,使用 WebSocket 的应用程序将始终以 HTTP/S 开始,而后进行升级。在何时发生这种状况取决于具体的应用;它能够是在启动时,或当一个特定的 URL 被请求时。html

在咱们的应用中,当 URL 请求以“/ws”结束时,咱们才升级协议为WebSocket。不然,服务器将使用基本的 HTTP/S。一旦升级链接将使用的WebSocket 传输全部数据。java

整个服务器逻辑以下:git

1.客户端/用户链接到服务器并加入聊天github

2.HTTP 请求页面或 WebSocket 升级握手web

3.服务器处理全部客户端/用户bootstrap

4.响应 URI “/”的请求,转到默认 html 页面api

5.若是访问的是 URI“/ws” ,处理 WebSocket 升级握手浏览器

6.升级握手完成后 ,经过 WebSocket 发送聊天消息服务器

服务端

让咱们从处理 HTTP 请求的实现开始。

处理 HTTP 请求

HttpRequestHandler.java

public class HttpRequestHandler extends SimpleChannelInboundHandler<FullHttpRequest> { //1
    private final String wsUri;
    private static final File INDEX;

    static {
        URL location = HttpRequestHandler.class.getProtectionDomain().getCodeSource().getLocation();
        try {
            String path = location.toURI() + "WebsocketChatClient.html";
            path = !path.contains("file:") ? path : path.substring(5);
            INDEX = new File(path);
        } catch (URISyntaxException e) {
            throw new IllegalStateException("Unable to locate WebsocketChatClient.html", e);
        }
    }

    public HttpRequestHandler(String wsUri) {
        this.wsUri = wsUri;
    }

    @Override
    public void channelRead0(ChannelHandlerContext ctx, FullHttpRequest request) throws Exception {
        if (wsUri.equalsIgnoreCase(request.getUri())) {
            ctx.fireChannelRead(request.retain());                  //2
        } else {
            if (HttpHeaders.is100ContinueExpected(request)) {
                send100Continue(ctx);                               //3
            }

            RandomAccessFile file = new RandomAccessFile(INDEX, "r");//4

            HttpResponse response = new DefaultHttpResponse(request.getProtocolVersion(), HttpResponseStatus.OK);
            response.headers().set(HttpHeaders.Names.CONTENT_TYPE, "text/html; charset=UTF-8");

            boolean keepAlive = HttpHeaders.isKeepAlive(request);

            if (keepAlive) {                                        //5
                response.headers().set(HttpHeaders.Names.CONTENT_LENGTH, file.length());
                response.headers().set(HttpHeaders.Names.CONNECTION, HttpHeaders.Values.KEEP_ALIVE);
            }
            ctx.write(response);                    //6

            if (ctx.pipeline().get(SslHandler.class) == null) {     //7
                ctx.write(new DefaultFileRegion(file.getChannel(), 0, file.length()));
            } else {
                ctx.write(new ChunkedNioFile(file.getChannel()));
            }
            ChannelFuture future = ctx.writeAndFlush(LastHttpContent.EMPTY_LAST_CONTENT);           //8
            if (!keepAlive) {
                future.addListener(ChannelFutureListener.CLOSE);        //9
            }

            file.close();
        }
    }

    private static void send100Continue(ChannelHandlerContext ctx) {
        FullHttpResponse response = new DefaultFullHttpResponse(HttpVersion.HTTP_1_1, HttpResponseStatus.CONTINUE);
        ctx.writeAndFlush(response);
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause)
            throws Exception {
        Channel incoming = ctx.channel();
        System.out.println("Client:"+incoming.remoteAddress()+"异常");
        // 当出现异常就关闭链接
        cause.printStackTrace();
        ctx.close();
    }
}

1.扩展 SimpleChannelInboundHandler 用于处理 FullHttpRequest信息

2.若是请求是 WebSocket 升级,递增引用计数器(保留)而且将它传递给在 ChannelPipeline 中的下个 ChannelInboundHandler

3.处理符合 HTTP 1.1的 "100 Continue" 请求

4.读取默认的 WebsocketChatClient.html 页面

5.判断 keepalive 是否在请求头里面

6.写 HttpResponse 到客户端

7.写 index.html 到客户端,判断 SslHandler 是否在 ChannelPipeline 来决定是使用 DefaultFileRegion 仍是 ChunkedNioFile

8.写并刷新 LastHttpContent 到客户端,标记响应完成

9.若是 keepalive 没有要求,当写完成时,关闭 ChannelHttpRequestHandler 作了下面几件事;

  • 若是该 HTTP 请求被发送到URI “/ws”,调用 FullHttpRequest 上的 retain(),并经过调用 fireChannelRead(msg) 转发到下一个 ChannelInboundHandler。retain() 是必要的,由于 channelRead() 完成后,它会调用 FullHttpRequest 上的 release() 来释放其资源。 (请参考咱们先前的 SimpleChannelInboundHandler 在第6章中讨论)
  • 若是客户端发送的 HTTP 1.1 头是“Expect: 100-continue” ,将发送“100 Continue”的响应。
  • 在 头被设置后,写一个 HttpResponse 返回给客户端。注意,这是否是 FullHttpResponse,惟一的反应的第一部分。此外,咱们不使用 writeAndFlush() 在这里 - 这个是在最后完成。
  • 若是没有加密也不压缩,要达到最大的效率能够是经过存储 index.html 的内容在一个 DefaultFileRegion 实现。这将利用零拷贝来执行传输。出于这个缘由,咱们检查,看看是否有一个 SslHandler 在 ChannelPipeline 中。另外,咱们使用 ChunkedNioFile。
  • 写 LastHttpContent 来标记响应的结束,并终止它
  • 若是不要求 keepalive ,添加 ChannelFutureListener 到 ChannelFuture 对象的最后写入,并关闭链接。注意,这里咱们调用 writeAndFlush() 来刷新全部之前写的信息。

处理 WebSocket frame

WebSockets 在“帧”里面来发送数据,其中每个都表明了一个消息的一部分。一个完整的消息能够利用了多个帧。 WebSocket "Request for Comments" (RFC) 定义了六中不一样的 frame; Netty 给他们每一个都提供了一个 POJO 实现 ,而咱们的程序只须要使用下面4个帧类型:

  • CloseWebSocketFrame
  • PingWebSocketFrame
  • PongWebSocketFrame
  • TextWebSocketFrame

在这里咱们只须要显示处理 TextWebSocketFrame,其余的会由 WebSocketServerProtocolHandler 自动处理。

下面代码展现了 ChannelInboundHandler 处理 TextWebSocketFrame,同时也将跟踪在 ChannelGroup 中全部活动的 WebSocket 链接

TextWebSocketFrameHandler.java

public class TextWebSocketFrameHandler extends
        SimpleChannelInboundHandler<TextWebSocketFrame> {

    public static ChannelGroup channels = new DefaultChannelGroup(GlobalEventExecutor.INSTANCE);

    @Override
    protected void channelRead0(ChannelHandlerContext ctx,
            TextWebSocketFrame msg) throws Exception { // (1)
        Channel incoming = ctx.channel();
        for (Channel channel : channels) {
            if (channel != incoming){
                channel.writeAndFlush(new TextWebSocketFrame("[" + incoming.remoteAddress() + "]" + msg.text()));
            } else {
                channel.writeAndFlush(new TextWebSocketFrame("[you]" + msg.text() ));
            }
        }
    }

    @Override
    public void handlerAdded(ChannelHandlerContext ctx) throws Exception {  // (2)
        Channel incoming = ctx.channel();
        for (Channel channel : channels) {
            channel.writeAndFlush(new TextWebSocketFrame("[SERVER] - " + incoming.remoteAddress() + " 加入"));
        }
        channels.add(ctx.channel());
        System.out.println("Client:"+incoming.remoteAddress() +"加入");
    }

    @Override
    public void handlerRemoved(ChannelHandlerContext ctx) throws Exception {  // (3)
        Channel incoming = ctx.channel();
        for (Channel channel : channels) {
            channel.writeAndFlush(new TextWebSocketFrame("[SERVER] - " + incoming.remoteAddress() + " 离开"));
        }
        System.out.println("Client:"+incoming.remoteAddress() +"离开");
        channels.remove(ctx.channel());
    }

    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception { // (5)
        Channel incoming = ctx.channel();
        System.out.println("Client:"+incoming.remoteAddress()+"在线");
    }

    @Override
    public void channelInactive(ChannelHandlerContext ctx) throws Exception { // (6)
        Channel incoming = ctx.channel();
        System.out.println("Client:"+incoming.remoteAddress()+"掉线");
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause)
            throws Exception {
        Channel incoming = ctx.channel();
        System.out.println("Client:"+incoming.remoteAddress()+"异常");
        // 当出现异常就关闭链接
        cause.printStackTrace();
        ctx.close();
    }

}

1.TextWebSocketFrameHandler 继承自 SimpleChannelInboundHandler,这个类实现了ChannelInboundHandler接口,ChannelInboundHandler 提供了许多事件处理的接口方法,而后你能够覆盖这些方法。如今仅仅只须要继承 SimpleChannelInboundHandler 类而不是你本身去实现接口方法。

2.覆盖了 handlerAdded() 事件处理方法。每当从服务端收到新的客户端链接时,客户端的 Channel 存入ChannelGroup列表中,并通知列表中的其余客户端 Channel

3.覆盖了 handlerRemoved() 事件处理方法。每当从服务端收到客户端断开时,客户端的 Channel 移除 ChannelGroup 列表中,并通知列表中的其余客户端 Channel

4.覆盖了 channelRead0() 事件处理方法。每当从服务端读到客户端写入信息时,将信息转发给其余客户端的 Channel。其中若是你使用的是 Netty 5.x 版本时,须要把 channelRead0() 重命名为messageReceived()

5.覆盖了 channelActive() 事件处理方法。服务端监听到客户端活动

6.覆盖了 channelInactive() 事件处理方法。服务端监听到客户端不活动

7.exceptionCaught() 事件处理方法是当出现 Throwable 对象才会被调用,即当 Netty 因为 IO 错误或者处理器在处理事件时抛出的异常时。在大部分状况下,捕获的异常应该被记录下来而且把关联的 channel 给关闭掉。然而这个方法的处理方式会在遇到不一样异常的状况下有不一样的实现,好比你可能想在关闭链接以前发送一个错误码的响应消息。

上面显示了 TextWebSocketFrameHandler 仅做了几件事:

  • 当WebSocket 与新客户端已成功握手完成,经过写入信息到 ChannelGroup 中的 Channel 来通知全部链接的客户端,而后添加新 Channel 到 ChannelGroup
  • 若是接收到 TextWebSocketFrame,调用 retain() ,并将其写、刷新到 ChannelGroup,使全部链接的 WebSocket Channel 都能接收到它。和之前同样,retain() 是必需的,由于当 channelRead0()返回时,TextWebSocketFrame 的引用计数将递减。因为全部操做都是异步的,writeAndFlush() 可能会在之后完成,咱们不但愿它来访问无效的引用。

因为 Netty 处理了其他大部分功能,惟一剩下的咱们如今要作的是初始化 ChannelPipeline 给每个建立的新的 Channel 。作到这一点,咱们须要一个ChannelInitializer

WebsocketChatServerInitializer.java

public class WebsocketChatServerInitializer extends
        ChannelInitializer<SocketChannel> { //1

    @Override
    public void initChannel(SocketChannel ch) throws Exception {//2
         ChannelPipeline pipeline = ch.pipeline();

        pipeline.addLast(new HttpServerCodec());
        pipeline.addLast(new HttpObjectAggregator(64*1024));
        pipeline.addLast(new ChunkedWriteHandler());
        pipeline.addLast(new HttpRequestHandler("/ws"));
        pipeline.addLast(new WebSocketServerProtocolHandler("/ws"));
        pipeline.addLast(new TextWebSocketFrameHandler());

    }
}

1.扩展 ChannelInitializer

2.添加 ChannelHandler 到 ChannelPipeline

initChannel() 方法设置 ChannelPipeline 中全部新注册的 Channel,安装全部须要的  ChannelHandler。

WebsocketChatServer.java

编写一个 main() 方法来启动服务端。

public class WebsocketChatServer {

    private int port;

    public WebsocketChatServer(int port) {
        this.port = port;
    }

    public void run() throws Exception {

        EventLoopGroup bossGroup = new NioEventLoopGroup(); // (1)
        EventLoopGroup workerGroup = new NioEventLoopGroup();
        try {
            ServerBootstrap b = new ServerBootstrap(); // (2)
            b.group(bossGroup, workerGroup)
             .channel(NioServerSocketChannel.class) // (3)
             .childHandler(new WebsocketChatServerInitializer())  //(4)
             .option(ChannelOption.SO_BACKLOG, 128)          // (5)
             .childOption(ChannelOption.SO_KEEPALIVE, true); // (6)

            System.out.println("WebsocketChatServer 启动了");

            // 绑定端口,开始接收进来的链接
            ChannelFuture f = b.bind(port).sync(); // (7)

            // 等待服务器  socket 关闭 。
            // 在这个例子中,这不会发生,但你能够优雅地关闭你的服务器。
            f.channel().closeFuture().sync();

        } finally {
            workerGroup.shutdownGracefully();
            bossGroup.shutdownGracefully();

            System.out.println("WebsocketChatServer 关闭了");
        }
    }

    public static void main(String[] args) throws Exception {
        int port;
        if (args.length > 0) {
            port = Integer.parseInt(args[0]);
        } else {
            port = 8080;
        }
        new WebsocketChatServer(port).run();

    }
}

1.NioEventLoopGroup是用来处理I/O操做的多线程事件循环器,Netty 提供了许多不一样的EventLoopGroup的实现用来处理不一样的传输。在这个例子中咱们实现了一个服务端的应用,所以会有2个 NioEventLoopGroup 会被使用。第一个常常被叫作‘boss’,用来接收进来的链接。第二个常常被叫作‘worker’,用来处理已经被接收的链接,一旦‘boss’接收到链接,就会把链接信息注册到‘worker’上。如何知道多少个线程已经被使用,如何映射到已经建立的 Channel上都须要依赖于 EventLoopGroup 的实现,而且能够经过构造函数来配置他们的关系。

2.ServerBootstrap是一个启动 NIO 服务的辅助启动类。你能够在这个服务中直接使用 Channel,可是这会是一个复杂的处理过程,在不少状况下你并不须要这样作。

3.这里咱们指定使用NioServerSocketChannel类来举例说明一个新的 Channel 如何接收进来的链接。

4.这里的事件处理类常常会被用来处理一个最近的已经接收的 Channel。SimpleChatServerInitializer 继承自ChannelInitializer是一个特殊的处理类,他的目的是帮助使用者配置一个新的 Channel。也许你想经过增长一些处理类好比 SimpleChatServerHandler 来配置一个新的 Channel 或者其对应的ChannelPipeline来实现你的网络程序。当你的程序变的复杂时,可能你会增长更多的处理类到 pipline 上,而后提取这些匿名类到最顶层的类上。

5.你能够设置这里指定的 Channel 实现的配置参数。咱们正在写一个TCP/IP 的服务端,所以咱们被容许设置 socket 的参数选项好比tcpNoDelay 和 keepAlive。请参考ChannelOption和详细的ChannelConfig实现的接口文档以此能够对ChannelOption 的有一个大概的认识。

6.option() 是提供给NioServerSocketChannel用来接收进来的链接。childOption() 是提供给由父管道ServerChannel接收到的链接,在这个例子中也是 NioServerSocketChannel。

7.咱们继续,剩下的就是绑定端口而后启动服务。这里咱们在机器上绑定了机器全部网卡上的 8080 端口。固然如今你能够屡次调用 bind() 方法(基于不一样绑定地址)。

恭喜!你已经完成了基于 Netty 聊天服务端程序。

客户端

在程序的 resources 目录下,咱们建立一个 WebsocketChatClient.html 页面来做为客户端

WebsocketChatClient.html

<!DOCTYPE html>
<html>
<head>
<meta charset="UTF-8">
<title>WebSocket Chat</title>
</head>
<body>
    <script type="text/javascript">
        var socket;
        if (!window.WebSocket) {
            window.WebSocket = window.MozWebSocket;
        }
        if (window.WebSocket) {
            socket = new WebSocket("ws://localhost:8080/ws");
            socket.onmessage = function(event) {
                var ta = document.getElementById('responseText');
                ta.value = ta.value + '\n' + event.data
            };
            socket.onopen = function(event) {
                var ta = document.getElementById('responseText');
                ta.value = "链接开启!";
            };
            socket.onclose = function(event) {
                var ta = document.getElementById('responseText');
                ta.value = ta.value + "链接被关闭";
            };
        } else {
            alert("你的浏览器不支持 WebSocket!");
        }

        function send(message) {
            if (!window.WebSocket) {
                return;
            }
            if (socket.readyState == WebSocket.OPEN) {
                socket.send(message);
            } else {
                alert("链接没有开启.");
            }
        }
    </script>
    <form onsubmit="return false;">
        <h3>WebSocket 聊天室:</h3>
        <textarea id="responseText" style="width: 500px; height: 300px;"></textarea>
        <br> 
        <input type="text" name="message"  style="width: 300px" value="Welcome to www.waylau.com">
        <input type="button" value="发送消息" onclick="send(this.form.message.value)">
        <input type="button" onclick="javascript:document.getElementById('responseText').value=''" value="清空聊天记录">
    </form>
    <br> 
    <br> 
    <a href="http://www.waylau.com/" >更多例子请访问 www.waylau.com</a>
</body>
</html>

逻辑比较简单,不累述。

运行效果

先运行 WebsocketChatServer,再打开多个浏览器页面实现多个 客户端访问 http://localhost:8080

源码

https://github.com/waylau/netty-4-user-guide-demos中 websocketchat

参考 Netty 4.x 用户指南https://github.com/waylau/netty-4-user-guide Netty 实战(精髓)https://github.com/waylau/essential-netty-in-action

相关文章
相关标签/搜索