第十一章:WebSocket

本章介绍javascript

WebSockethtml

ChannelHandler,Decoder and Encoderjava

引导一个Netty基础程序web

测试WebSocketbootstrap

使用Netty附带的WebSocket,咱们不须要关注协议内部实现,只须要使用Netty提供的一些简单的方法就能够实现浏览器

11.1 WebSockets some background安全

        关于WebSocket的一些概念和背景,能够查询网上相关介绍。这里不赘述。服务器

11.2 面临的挑战websocket

要显示“real-time”支持的WebSocket,应用程序将显示如何使用Netty中的WebSocket实现一个在浏览器中进行聊天的IRC应用程序。网络

在这个应用程序中,不一样的用户能够同时交谈,很是像IRC(Internet Relay Chat,互联网中继聊天)。

上图显示的逻辑很简单:

一个客户端发送一条消息

消息被广播到其余已链接的客户端

它的工做原理就像聊天室同样,在这里例子中,咱们将编写服务器,而后使用浏览器做为客户端。带着这样的思路,咱们将会很简单的实现它。

11.3 实现

WebSocket使用HTTP升级机制从一个普通的HTTP链接WebSocket,由于这个应用程序使用WebSocket老是开始于HTTP(s),而后再升级。

在这里,若是url的结尾以/ws结束,咱们将只会升级到WebSocket,不然服务器将发送一个网页给客户端。升级后的链接将经过WebSocket传输全部数据。逻辑图以下:

11.3.1 处理http请求

服务器将做为一种混合式以容许同时处理http和websocket,因此服务器还须要html页面,html用来充当客户端角色,链接服务器并交互消息。所以,若是客户端不发送/ws的uri,咱们须要写一个ChannelInboundHandler用来处理FullHttpRequest。看下面代码:

package netty.in.action;  
import io.netty.channel.ChannelFuture;  
import io.netty.channel.ChannelFutureListener;  
import io.netty.channel.ChannelHandlerContext;  
import io.netty.channel.DefaultFileRegion;  
import io.netty.channel.SimpleChannelInboundHandler;  
import io.netty.handler.codec.http.DefaultFullHttpResponse;  
import io.netty.handler.codec.http.DefaultHttpResponse;  
import io.netty.handler.codec.http.FullHttpRequest;  
import io.netty.handler.codec.http.FullHttpResponse;  
import io.netty.handler.codec.http.HttpHeaders;  
import io.netty.handler.codec.http.HttpResponse;  
import io.netty.handler.codec.http.HttpResponseStatus;  
import io.netty.handler.codec.http.HttpVersion;  
import io.netty.handler.codec.http.LastHttpContent;  
import io.netty.handler.ssl.SslHandler;  
import io.netty.handler.stream.ChunkedNioFile;  
import java.io.RandomAccessFile;  
/** 
 * WebSocket,处理http请求 
 */  
public class HttpRequestHandler extends  
        SimpleChannelInboundHandler<FullHttpRequest> {  
    //websocket标识  
    private final String wsUri;  
  
    public HttpRequestHandler(String wsUri) {  
        this.wsUri = wsUri;  
    }  
    @Override  
    protected void channelRead0(ChannelHandlerContext ctx, FullHttpRequest msg)  
            throws Exception {  
        //若是是websocket请求,请求地址uri等于wsuri  
        if (wsUri.equalsIgnoreCase(msg.getUri())) {  
            //将消息转发到下一个ChannelHandler  
            ctx.fireChannelRead(msg.retain());  
        } else {//若是不是websocket请求  
            if (HttpHeaders.is100ContinueExpected(msg)) {  
                //若是HTTP请求头部包含Expect: 100-continue,  
                //则响应请求  
                FullHttpResponse response = new DefaultFullHttpResponse(  
                        HttpVersion.HTTP_1_1, HttpResponseStatus.CONTINUE);  
                ctx.writeAndFlush(response);  
            }  
            //获取index.html的内容响应给客户端  
            RandomAccessFile file = new RandomAccessFile(  
                    System.getProperty("user.dir") + "/index.html", "r");  
            HttpResponse response = new DefaultHttpResponse(  
                    msg.getProtocolVersion(), HttpResponseStatus.OK);  
            response.headers().set(HttpHeaders.Names.CONTENT_TYPE,  
                    "text/html; charset=UTF-8");  
            boolean keepAlive = HttpHeaders.isKeepAlive(msg);  
            //若是http请求保持活跃,设置http请求头部信息  
            //并响应请求  
            if (keepAlive) {  
                response.headers().set(HttpHeaders.Names.CONTENT_LENGTH,  
                        file.length());  
                response.headers().set(HttpHeaders.Names.CONNECTION,  
                        HttpHeaders.Values.KEEP_ALIVE);  
            }  
            ctx.write(response);  
            //若是不是https请求,将index.html内容写入通道  
            if (ctx.pipeline().get(SslHandler.class) == null) {  
                ctx.write(new DefaultFileRegion(file.getChannel(), 0, file  
                        .length()));  
            } else {  
                ctx.write(new ChunkedNioFile(file.getChannel()));  
            }  
            //标识响应内容结束并刷新通道  
            ChannelFuture future = ctx  
                    .writeAndFlush(LastHttpContent.EMPTY_LAST_CONTENT);  
            if (!keepAlive) {  
                //若是http请求不活跃,关闭http链接  
                future.addListener(ChannelFutureListener.CLOSE);  
            }  
            file.close();  
        }  
    }  
    @Override  
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause)  
            throws Exception {  
        cause.printStackTrace();  
        ctx.close();  
    }  
}

11.3.2 处理WebSocket框架

WebSocket支持6种不一样框架,以下图:

咱们的程序只须要使用下面4个框架:

CloseWebSocketFrame

PingWebSocketFrame

PongWebSocketFrame

TextWebSocketFrame

咱们只须要显示处理TextWebSocketFrame,其余的会自动由WebSocketServerProtocolHandler处理,看下面代码:

package netty.in.action;  
import io.netty.channel.ChannelHandlerContext;  
import io.netty.channel.SimpleChannelInboundHandler;  
import io.netty.channel.group.ChannelGroup;  
import io.netty.handler.codec.http.websocketx.TextWebSocketFrame;  
import io.netty.handler.codec.http.websocketx.WebSocketServerProtocolHandler;  
/** 
 * WebSocket,处理消息 
 */  
public class TextWebSocketFrameHandler extends  
        SimpleChannelInboundHandler<TextWebSocketFrame> {  
    private final ChannelGroup group;  
  
    public TextWebSocketFrameHandler(ChannelGroup group) {  
        this.group = group;  
    }  
  
    @Override  
    public void userEventTriggered(ChannelHandlerContext ctx, Object evt)  
            throws Exception {  
        //若是WebSocket握手完成  
        if (evt == WebSocketServerProtocolHandler.ServerHandshakeStateEvent.HANDSHAKE_COMPLETE) {  
            //删除ChannelPipeline中的HttpRequestHandler  
            ctx.pipeline().remove(HttpRequestHandler.class);  
            //写一个消息到ChannelGroup  
            group.writeAndFlush(new TextWebSocketFrame("Client " + ctx.channel()  
                    + " joined"));  
            //将Channel添加到ChannelGroup  
            group.add(ctx.channel());  
        }else {  
            super.userEventTriggered(ctx, evt);  
        }  
    }  
  
    @Override  
    protected void channelRead0(ChannelHandlerContext ctx, TextWebSocketFrame msg)  
            throws Exception {  
        //将接收的消息经过ChannelGroup转发到因此已链接的客户端  
        group.writeAndFlush(msg.retain());  
    }  
}

11.3.3 初始化ChannelPipeline

 看下面代码:

package netty.in.action;  
import io.netty.channel.Channel;  
import io.netty.channel.ChannelInitializer;  
import io.netty.channel.ChannelPipeline;  
import io.netty.channel.group.ChannelGroup;  
import io.netty.handler.codec.http.HttpObjectAggregator;  
import io.netty.handler.codec.http.HttpServerCodec;  
import io.netty.handler.codec.http.websocketx.WebSocketServerProtocolHandler;  
import io.netty.handler.stream.ChunkedWriteHandler;  
/** 
 * WebSocket,初始化ChannelHandler 
 */  
public class ChatServerInitializer extends ChannelInitializer<Channel> {  
    private final ChannelGroup group;  
      
    public ChatServerInitializer(ChannelGroup group){  
        this.group = group;  
    }  
    @Override  
    protected void initChannel(Channel ch) throws Exception {  
        ChannelPipeline pipeline = ch.pipeline();  
        //编解码http请求  
        pipeline.addLast(new HttpServerCodec());  
        //写文件内容  
        pipeline.addLast(new ChunkedWriteHandler());  
        //聚合解码HttpRequest/HttpContent/LastHttpContent到FullHttpRequest  
        //保证接收的Http请求的完整性  
        pipeline.addLast(new HttpObjectAggregator(64 * 1024));  
        //处理FullHttpRequest  
        pipeline.addLast(new HttpRequestHandler("/ws"));  
        //处理其余的WebSocketFrame  
        pipeline.addLast(new WebSocketServerProtocolHandler("/ws"));  
        //处理TextWebSocketFrame  
        pipeline.addLast(new TextWebSocketFrameHandler(group));  
    }  
}

WebSocketServerProtcolHandler不只处理Ping/Pong/CloseWebSocketFrame,还和它本身握手并帮助升级WebSocket。这是执行完成握手和成功修改ChannelPipeline,而且添加须要的编码器/解码器和删除不须要的ChannelHandler。

 看下图:

ChannelPipeline经过ChannelInitializer的initChannel(...)方法完成初始化,完成握手后就会更改事情。一旦这样作了,WebSocketServerProtocolHandler将取代HttpRequestDecoder、WebSocketFrameDecoder13和HttpResponseEncoder、WebSocketFrameEncoder13。另外也要删除全部不须要的ChannelHandler已得到最佳性能。这些都是HttpObjectAggregator和HttpRequestHandler。下图显示ChannelPipeline握手完成:

咱们甚至没注意到它,由于它是在底层执行的。以很是灵活的方式动态更新ChannelPipeline让单独的任务在不一样的ChannelHandler中实现。

11.4 结合在一块儿使用

一如既往,咱们要将它们结合在一块儿使用。使用Bootstrap引导服务器和设置正确的ChannelInitializer。看下面代码:

package netty.in.action;  
  
import io.netty.bootstrap.ServerBootstrap;  
import io.netty.channel.Channel;  
import io.netty.channel.ChannelFuture;  
import io.netty.channel.ChannelInitializer;  
import io.netty.channel.EventLoopGroup;  
import io.netty.channel.group.ChannelGroup;  
import io.netty.channel.group.DefaultChannelGroup;  
import io.netty.channel.nio.NioEventLoopGroup;  
import io.netty.channel.socket.nio.NioServerSocketChannel;  
import io.netty.util.concurrent.ImmediateEventExecutor;  
  
import java.net.InetSocketAddress;  
/** 
 * 访问地址:http://localhost:2048 
 */  
public class ChatServer {  
  
    private final ChannelGroup group = new DefaultChannelGroup(  
            ImmediateEventExecutor.INSTANCE);  
    private final EventLoopGroup workerGroup = new NioEventLoopGroup();  
    private Channel channel;  
  
    public ChannelFuture start(InetSocketAddress address) {  
        ServerBootstrap b = new ServerBootstrap();  
        b.group(workerGroup).channel(NioServerSocketChannel.class)  
                .childHandler(createInitializer(group));  
        ChannelFuture f = b.bind(address).syncUninterruptibly();  
        channel = f.channel();  
        return f;  
    }  
  
    public void destroy() {  
        if (channel != null)  
            channel.close();  
        group.close();  
        workerGroup.shutdownGracefully();  
    }  
  
    protected ChannelInitializer<Channel> createInitializer(ChannelGroup group) {  
        return new ChatServerInitializer(group);  
    }  
  
    public static void main(String[] args) {  
        final ChatServer server = new ChatServer();  
        ChannelFuture f = server.start(new InetSocketAddress(2048));  
        Runtime.getRuntime().addShutdownHook(new Thread() {  
            @Override  
            public void run() {  
                server.destroy();  
            }  
        });  
        f.channel().closeFuture().syncUninterruptibly();  
    }  
}

另外,须要将index.html文件放在项目根目录,index.html内容以下:

<html>  
<head>  
<title>Web Socket Test</title>  
</head>  
<body>  
<script type="text/javascript">  
var socket;  
if (!window.WebSocket) {  
  window.WebSocket = window.MozWebSocket;  
}  
if (window.WebSocket) {  
  socket = new WebSocket("ws://localhost:2048/ws");  
  socket.onmessage = function(event) {  
    var ta = document.getElementById('responseText');  
    ta.value = ta.value + '\n' + event.data  
  };  
  socket.onopen = function(event) {  
    var ta = document.getElementById('responseText');  
    ta.value = "Web Socket opened!";  
  };  
  socket.onclose = function(event) {  
    var ta = document.getElementById('responseText');  
    ta.value = ta.value + "Web Socket closed";   
  };  
} else {  
  alert("Your browser does not support Web Socket.");  
}  
  
function send(message) {  
  if (!window.WebSocket) { return; }  
  if (socket.readyState == WebSocket.OPEN) {  
    socket.send(message);  
  } else {  
    alert("The socket is not open.");  
  }  
}  
</script>  
    <form onsubmit="return false;">  
        <input type="text" name="message" value="Hello, World!"><input  
            type="button" value="Send Web Socket Data"  
            onclick="send(this.form.message.value)">  
        <h3>Output</h3>  
        <textarea id="responseText" style="width: 500px; height: 300px;"></textarea>  
    </form>  
</body>  
</html>

最后在浏览器中输入:http://localhost:2048,多开几个窗口就能够聊天了。

11.5 给WebSocket加密

上面的应用程序虽然工做的很好,可是在网络上收发消息存在很大的安全隐患,因此有必要对消息进行加密。添加这样一个加密的功能通常比较复杂,须要对代码有较大的改动。可是使用Netty就能够很容易的添加这样的功能,只须要将SslHandler加入到ChannelPipeline中就能够了。实际上还须要添加SslContext,但这不在本例子范围内。

首先咱们建立一个用于添加加密Handler的handler初始化类,看下面代码:

package netty.in.action;  
import io.netty.channel.Channel;  
import io.netty.channel.group.ChannelGroup;  
import io.netty.handler.ssl.SslHandler;  
import javax.net.ssl.SSLContext;  
import javax.net.ssl.SSLEngine;  
public class SecureChatServerIntializer extends ChatServerInitializer {  
    private final SSLContext context;  
    public SecureChatServerIntializer(ChannelGroup group,SSLContext context) {  
        super(group);  
        this.context = context;  
    }  
  
    @Override  
    protected void initChannel(Channel ch) throws Exception {  
        super.initChannel(ch);  
        SSLEngine engine = context.createSSLEngine();  
        engine.setUseClientMode(false);  
        ch.pipeline().addFirst(new SslHandler(engine));  
    }  
}

最后咱们建立一个用于引导配置的类,看下面代码:

package netty.in.action;  
import io.netty.channel.Channel;  
import io.netty.channel.ChannelFuture;  
import io.netty.channel.ChannelInitializer;  
import io.netty.channel.group.ChannelGroup;  
import java.net.InetSocketAddress;  
import javax.net.ssl.SSLContext;  
/** 
 * 访问地址:https://localhost:4096 
 */  
public class SecureChatServer extends ChatServer {  
    private final SSLContext context;  
  
    public SecureChatServer(SSLContext context) {  
        this.context = context;  
    }  
  
    @Override  
    protected ChannelInitializer<Channel> createInitializer(ChannelGroup group) {  
        return new SecureChatServerIntializer(group, context);  
    }  
  
    /** 
     * 获取SSLContext须要相关的keystore文件,这里没有 关于HTTPS能够查阅相关资料,这里只介绍在Netty中如何使用 
     *  
     * @return 
     */  
    private static SSLContext getSslContext() {  
        return null;  
    }  
  
    public static void main(String[] args) {  
        SSLContext context = getSslContext();  
        final SecureChatServer server = new SecureChatServer(context);  
        ChannelFuture future = server.start(new InetSocketAddress(4096));  
        Runtime.getRuntime().addShutdownHook(new Thread() {  
            @Override  
            public void run() {  
                server.destroy();  
            }  
        });  
        future.channel().closeFuture().syncUninterruptibly();  
    }  
}
相关文章
相关标签/搜索