若是你有跟进Web技术的最新进展,你极可能就遇到过“实时Web”这个短语,这里并非指所谓的硬实时服务质量(QoS),硬实时服务质量是保证计算结果将在指定的时间间隔内被递交。仅HTTP的请求/响应模式设计就使得其很难被支持。html
实时Web利用技术和实践,使用户在信息的做者发布信息以后就可以当即收到信息,而不须要他们或者他们的软件周期性地检查信息源以及获取更新。bootstrap
一、WebSocket简介浏览器
WebSocket协议是彻底从新设计的协议,旨在为Web上的双向数据传输问题提供一个切实可行的解决方案,使得客户端和服务器之间能够在任意时刻传输信息,所以,这也就要求他们异步地处理消息回执(做为HTML5客户端API的一部分,大部分最新的浏览器都已经支持了WebSocket)服务器
Netty对于WebSocket的支持包含了全部正在使用中的主要实现,所以在你的下一个应用程序中采用它将是简单直接的。和往常使用Netty同样,你能够彻底使用该协议,而无需关心它内部的实现细节,咱们将经过建立一个基于WbeSocket的实时聊天应用程序来演示。dom
二、WebSocket示例应用程序异步
为了让示例应用程序展现它的实时功能,咱们将经过使用WebSocket协议来实现一个基于浏览器的聊天应用程序,就像你可能在FaceBook的文本消息功能中见到过的那样。咱们将经过使用多个用户之间能够同时进行相互通讯,从而更进一步。ide
下图说明应用逻辑:oop
——客户端发送一个消息性能
——该消息将被广播到全部其余连接的客户端this
这正如你可能会预期的一个聊天室应当的工做方式:全部的人均可以和其余的人聊天。在示例中,咱们将只实现服务器端,而客户端则是经过Web页面访问该聊天室的浏览器。正如同你将在接下来的几页中所看到的,WebSocket简化了编写这样的服务器的过程。
三、添加WebSocket支持
在从标准的HTTP或者HTTPS协议切换到WebSocket时,将会使用一种称为升级握手的机制。所以,使用WebSocket的应用程序将始终以HTTP/S做为开始,而后再执行升级。这个升级动做发生的确切时刻特定于应用程序;他可能会发生在启动时,也可能会发生在请求了某个特定的URL以后。
咱们的应用程序将采用下面的约定:若是被请求的URL以/ws结尾,那么咱们将会把该协议升级为WebSocket;不然,服务器将使用基本的HTTP/S。在链接已经升级完成以后,全部数据都将会使用WebSocket进行传输。下图说明了该服务器逻辑,一如在Netty中同样,它由一组ChannelHandler实现。
四、处理HTTP请求
首先,咱们将实现该处理HTTP请求的组件。这个组件将提供用于访问聊天室并显示由链接的客户端发送的消息的网页。以下代码给出了这个HttpRequestHandler对应的代码,其扩展了SimpleChannelInboundHandler以处理FullHttpRequest消息。须要注意是,channelRead0()方法的实现是如何转发任何目标URI为/ws的请求的。
//扩展SimpleChannelInboundHandler以处理FullHttpReuqest消息public class HttpRequestHandler extends SimpleChannelInboundHandler<FullHttpRequest>{ private final String wsUri; private static final File INDEX; static { URL location = HttpRequestHandler.class.getProtectionDomain() .getCodeSource().getLocation(); try { String path = location.toURI() + "index.html"; path = !path.contains("file:") ? path : path.substring(5); INDEX = new File(path); }catch (URISyntaxException e){ throw new IllegalStateException("Unable to locate index.html",e); } } public HttpRequestHandler(String wsUri){ this.wsUri = wsUri; } @Override protected void channelRead0(ChannelHandlerContext ctx, FullHttpRequest request) throws Exception { //若是请求了WebSocket协议升级,则增长引用技术,并将它传递给下一个ChannelInboundHandler if (wsUri.equalsIgnoreCase(request.getUri())){ ctx.fireChannelRead(request.retain()); } else { //处理100Continue请求以符合HTTP1.1规范 if (HttpHeaders.is100ContinueExpected(request)){ send100Continue(ctx); } //读取“index.html” RandomAccessFile file = new RandomAccessFile(INDEX,"r"); HttpResponse response = new DefaultHttpResponse(request.getProtocolVersion(),HttpResponseStatus.OK); response.headers().set(HttpHeaders.Names.CONTENT_TYPE,"text/html;charset=UTF-8"); boolean keepAlive = HttpHeaders.isKeepAlive(request); //若是请求了keep-alive,则添加所须要的HTTP头信息 if (keepAlive){ response.headers().set(HttpHeaders.Names.CONTENT_LENGTH,file.length()); response.headers().set(HttpHeaders.Names.CONNECTION,HttpHeaders.Values.KEEP_ALIVE); } //将HttpResponse写到客户端 ctx.write(response); //将index.html写到客户端 if (ctx.pipeline().get(SslHandler.class) == null){ ctx.write(new DefaultFileRegion(file.getChannel(),0,file.length())); } else { ctx.write(new ChunkedNioFile(file.getChannel())); } //写LastHttpContent并冲刷至客户端 ChannelFuture future = ctx.writeAndFlush(LastHttpContent.EMPTY_LAST_CONTENT); if (!keepAlive){ //若是没有请求keep-alive,则在写操做完成后关闭Channel future.addListener(ChannelFutureListener.CLOSE); } } } private static void send100Continue(ChannelHandlerContext ctx){ FullHttpResponse response = new DefaultFullHttpResponse( HttpVersion.HTTP_1_1, HttpResponseStatus.CONTINUE); ctx.writeAndFlush(response); } @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { cause.printStackTrace(); ctx.close(); } }
若是该HTTP请求指向了地址为/ws的URI,那么HttpRequestHandler将调用FullHttpRequest对象上的retain()方法。并经过调用fireChannelRead(msg)方法将它转发给下一个ChannelInboundHandler。之因此须要调用retain()方法,是由于调用channelRead()方法完成以后,它将调用FullHttpRequest对象上的release()方法以释放它的资源。
若是客户端发送了HTTP1.1的HTTP头信息Expect:100-continue,那么HttpRequestHandler将会发送一个100Continue响应。在该HTTP头信息被设置以后,HttpRequestHandler将会写回一个HttpResponse给客户端。这不是一个FullHttpResponse,由于它只是响应的第一部分。此外,这里也不会调用writeAndFlush()方法,在结束的时候才会调用。
若是不须要加密和压缩,那么能够经过将index.html的内容存储到DefaultFileRegion中来达到最佳效率。这将会利用零拷贝特性来进行内容的传输。为此,你能够检查一下,是否有SslHandler存在于在ChannelPipeline中。不然,你可使用ChunkedNioFile。
HttpRequestHandler将写一个LastHttpContent来标记响应的结束。若是没有请求keep-alive,那么HttpRequestHandler将会添加一个ChannelFutureListener到最后一次写出动做的ChannelFuture,并关闭该链接。在这里,你将调用writeAndFlush()方法以冲刷全部以前写入的消息。
这部分代码表明了聊天服务器的第一个部分,它管理纯粹的HTTP请求和响应。接下来,咱们将处理传输实际聊天消息的WebSocket帧。
WEBSOCKET帧:WebSocket以帧的方式传输数据,每一帧表明消息的一部分。一个完整的消息可能会包含许多帧。
五、处理WebSocket帧
有IETF发布的WebSocket RFC,定义了6种帧,Netty为它们都提供了一个POJO实现。
BinaryWebSocketFrame——包含了二进制数据
TextWebSocketFrame——包含了文本数据
ContinuationWebSocketFrame——包含属于上一个BinaryWebSocketFrame或TextWebSocketFrame的文本数据或者二进制数据
CloseWebSocketFrame——表示一个CLOSE请求,包含一个关闭的状态码和关闭的缘由
PingWebSocketFrame——请求传输一个PongWebSocketFrame
PongWebSocketFrame——做为一个对于PingWebSocketFrame的响应被发送
TextWebSocketFrame是咱们惟一真正须要处理的帧类型。为了符合WebSocket RFC,Netty提供了WebSocketServerProtocolHandler来处理其余类型的帧。
如下代码展现了咱们用于处理TextWebSocketFrame的ChannelInboundHandler,其还将在它的ChannelGroup中跟踪全部活动的WebSocket链接。
public class TextWebSocketFrameHandler extends SimpleChannelInboundHandler<TextWebSocketFrame>{ private final ChannelGroup group; public TextWebSocketFrameHandler(ChannelGroup group){ this.group = group; } //重写userEventTriggered方法以处理自定义事件 @Override public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception { if (evt == WebSocketServerProtocolHandler.ServerHandshakeStateEvent.HANDSHAKE_COMPLETE){ //若是该事件表示握手成功,则从该ChannelPipeline中移除HttpRequestHandler,由于将不会接收到任何HTTP消息了 ctx.pipeline().remove(HttpRequestHandler.class); //通知全部已经链接的WebSocket客户端新的客户端链接上了 group.writeAndFlush(new TextWebSocketFrame("Client " + ctx.channel() + " joined")); //将新的WebSocket Channel添加到ChannelGroup中,以便它能够接收到全部的消息 group.add(ctx.channel()); } else { super.userEventTriggered(ctx,evt); } } @Override protected void channelRead0(ChannelHandlerContext channelHandlerContext, TextWebSocketFrame msg) throws Exception { //增长消息的引用计数,并将它写到ChannelGroup中全部已经链接的客户端 group.writeAndFlush(msg.retain()); } }
TextWebSocketFrameHandler只有一组很是少许的责任。当和新客户端的WebSocket握手成功完成以后,它将经过把通知消息写到ChannelGroup中的全部Channel来通知全部已经链接的客户端,而后它将把这个新Channel加入到该ChannelGroup中。
若是接收到了TextWebSocketFrame消息,TextWebSocketFrameHandler将调用TextWebSocketFrame消息上的retain()方法,并使用writeAndFlush()方法来将它传输给ChannelGroup,以便全部已经链接的WebSocket Channel都将接收到它。
和以前同样,对于retain()方法的调用时必需的。由于当ChannelRead0()方法返回时,TextWebSocketFrame的引用技术将会被减小。因为全部的操做都是异步的,所以,writeAndFlush()方法可能会在channelRead0()方法返回以后完成,并且它绝对不能访问一个已经失效的引用。
由于Netty在内部处理了大部分剩下的功能,全部如今剩下惟一须要作的事情就是为每一个新建立的Channel初始化其ChannelPipeline。为此,咱们须要一个ChannelInitializer。
六、初始化ChannelPipeline
如下代码展现了生成的ChatServerInitializer。
public class ChatServerInitializer extends ChannelInitializer<Channel>{ private final ChannelGroup group; public ChatServerInitializer(ChannelGroup group) { this.group = group; } @Override protected void initChannel(Channel channel) throws Exception { ChannelPipeline pipeline = channel.pipeline(); pipeline.addLast(new HttpServerCodec()); pipeline.addLast(new ChunkedWriteHandler()); pipeline.addLast(new HttpObjectAggregator(64 * 1024)); pipeline.addLast(new HttpRequestHandler("/ws")); pipeline.addLast(new WebSocketServerProtocolHandler("/ws")); pipeline.addLast(new TextWebSocketFrameHandler(group)); }
对于initChannel()方法调用,经过安装全部必须的ChannelHandler来设置该新注册的Channel的ChannelPipeline。
Netty的WebSocketServerProtocolHandler处理了全部委托管理的WebSocket帧类型以及升级握手自己。若是握手成功,那么所需的ChannelHandler将会被添加到ChannelPipeline中,而那些再也不须要的ChannelHandler则将会被移除。
WebSocket协议升级以前的ChannelPipeline的状态以下图,这表明了刚刚被ChatServerInitializer初始化以后的ChannelPipeline。 图片当WebSocket协议升级完成以后,WebSocketServerProtocolHandler将会把HttpRequestDecoder替换为WebSocketFrameDecoder,把HttpResponseEncoder替换为WebSocketFrameEncoder。为了性能最大化,它将移除任何再也不被WebSocket链接所须要的ChannelHandler。这也包括上图所示的HttpObjectAggregator和HttpRequestHandler。
下图展现了这些操做完成以后的ChannelPipeline。须要注意的是,Netty目前支持4个版本的WebSocket协议,他们每一个都具备本身的实现类。Netty将会根据客户端(这里指浏览器)所支持的版本,自动地选择正确版本的WebSocketFrameDecoder和WebSocketFrameEncoder。 图片
七、引导
这幅拼图最后的一部分是引导该服务器,并安装ChatSererInitializer的代码。这将有ChatServer类处理,以下代码所示。
public class ChatServer { //建立DefaultChannelGroup,其将保存全部已经链接的WebSocket Channel private final ChannelGroup channelGroup = new DefaultChannelGroup(ImmediateEventExecutor.INSTANCE); private final EventLoopGroup group = new NioEventLoopGroup(); private Channel channel; public ChannelFuture start(InetSocketAddress address){ //引导服务器 ServerBootstrap bootstrap = new ServerBootstrap(); bootstrap.group(group) .channel(NioServerSocketChannel.class) .childHandler(createInitializer(channelGroup)); ChannelFuture future = bootstrap.bind(address); future.syncUninterruptibly(); channel = future.channel(); return future; } //建立ChatServerInitializer protected ChannelInitializer<Channel> createInitializer(ChannelGroup group){ return new ChatServerInitializer(group); } //处理服务器关闭,并释放全部的资源 public void destroy(){ if (channel != null){ channel.close(); } channelGroup.close(); group.shutdownGracefully(); } public static void main(String[] args) throws Exception{ if (args.length != 1){ System.out.println("Please give port as argument"); System.exit(1); } int port = Integer.parseInt(args[0]); final ChatServer endpoint = new ChatServer(); ChannelFuture future = endpoint.start( new InetSocketAddress(port)); Runtime.getRuntime().addShutdownHook(new Thread(){ @Override public void run() { endpoint.destroy(); } }); future.channel().closeFuture().syncUninterruptibly(); } }
八、如何进行加密
在真实世界的场景中,你将很快就会被要求向该服务器添加加密。使用Netty,这不过是将一个SslHandler添加到ChannelPipeline中,并配置它的问题。如下代码展现了如何经过扩展咱们的ChatServerInitializer来建立一个SecureChatServerInitializer以完成需求。
//扩展ChatServerInitializer以添加加密public class SecureChatServerInitializer extends ChatServerInitializer{ private final SslContext context; public SecureChatServerInitializer(ChannelGroup group,SslContext context) { super(group); this.context = context; } @Override protected void initChannel(Channel channel) throws Exception { //调用父类的initChannel()方法 super.initChannel(channel); SSLEngine engine = context.newEngine(channel.alloc()); engine.setUseClientMode(false); //将SslHandler添加到ChannelPipeline中 channel.pipeline().addFirst(new SslHandler(engine)); } }
最后一步是调整ChatServer以使用SecureChatServerInitializer,以便在ChannelPipeline中安装SslHandler。
public class SecureChatServer extends ChatServer{ private final SslContext context; public SecureChatServer(SslContext context) { this.context = context; } @Override protected ChannelInitializer<Channel> createInitializer(ChannelGroup group) { //返回以前建立的SecureChatServerInitializer以启用加密 return new SecureChatServerInitializer(group,context); } public static void main(String[] args) throws Exception{ if (args.length != 1){ System.out.println("Please give port as argument"); System.exit(1); } int port = Integer.parseInt(args[0]); SelfSignedCertificate cert = new SelfSignedCertificate(); SslContext context = SslContext.newServerContext(cert.certificate(),cert.privateKey()); final SecureChatServer endpoint = new SecureChatServer(context); ChannelFuture future = endpoint.start( new InetSocketAddress(port)); Runtime.getRuntime().addShutdownHook(new Thread(){ @Override public void run() { endpoint.destroy(); } }); future.channel().closeFuture().syncUninterruptibly(); } }
这就是为全部的通讯启用SSL/TLS加密须要作的所有。