使用JDK NIO类库时开发NIO的异步服务端时,须要用到:多路复用器Selector,ServerSocketChannel,SocketChanel,ByteBuffer,SelectionKey等。若是用源生的JAVA NIO搭建服务端,无疑是十分复杂的,这不只拖慢了项目的进度,并且开发出来的项目可能还不稳定。那么Netty是如何下降其复杂度的呢?Netty开发简单异步服务端不超过9步,下面分步骤来说解用Netty来开发异步服务端:java
第一步:建立EventLoopGroupbootstrap
//第一步 建立EventLoopGroup EventLoopGroup bossGroup = new NioEventLoopGroup(); EventLoopGroup workerGroup = new NioEventLoopGroup();
讲解:EventLoopGroup 是EventLoop的数组,EventLoop的职责是处理全部注册到本线程多路复用器Selector上的Channel,Selector的轮询操做由EventLoop线程的run方法驱动。数组
第二步:建立ServerBootstrap安全
ServerBootstrap b = new ServerBootstrap();
讲解:ServerBootstrap是Netty服务端启动的辅助类,从代码能够看出,咱们在建立ServerBootstrap时,用的是一个无参构造器建立的,你也会惊讶的发现,ServerBootstrap只有一个无参构造器,由于这个辅助类须要的参数太多了,并且有些参数是用户能够选择性添加的,因此ServerBootstrap引入了Builder模式 让用户动态添加参数:服务器
ServerBootstrap b = new ServerBootstrap(); //动态添加参数,支持链式添加 b.group(bossGroup, workerGroup) .channel(NioServerSocketChannel.class) .handler(new LoggingHandler()) .childHandler(new HttpServerHandler());
第三步:设置并绑定服务端Channel(开始绑定ServerSocketChannel)网络
b.channel(NioServerSocketChannel.class)
讲解:这里只须要传入对应的.class类便可,ServerBootstrap会经过工厂类,利用反射帮咱们建立对应的Class对象。异步
第四步:建立并初始化处理网络时间的职责链 ChannelPipelinesocket
ch.pipeline().addLast(new LoggingHandler()); //请求 http解码 ch.pipeline().addLast("http-decoder",new HttpRequestDecoder()); //将多个消息转换为单一的FullHttpRequest ch.pipeline().addLast("http-aggregator",new HttpObjectAggregator(65536)); //应答http编码 ch.pipeline().addLast("http-encoder",new HttpResponseEncoder()); //链接异步发送请求,防止内存溢出 ch.pipeline().addLast("http-chunked",new ChunkedWriteHandler()); ch.pipeline().addLast("httpServerHandlerAdapter",new HttpServerHandlerAdapter());
讲解:ChannelPipeline并非NIO服务端必须的,它本质就是一个处理网络事件的职责链,负责管理和执行ChannelHandler,咱们能够在其中添加服务端接收请求的解码和发送请求的编码,以及粘包\拆包的处理等。ide
第五步:添加并设置ChannelHandleroop
ChannelHandler是Netty提供给咱们定制和扩展的关键接口,利用ChannelHandler咱们能够完成大多数功能的定制,例如消息编解码,心跳安全认证等,下面是我定义的Handler:
ServerBootstrap b = new ServerBootstrap(); b.group(bossGroup, workerGroup) .channel(NioServerSocketChannel.class) .handler(new LoggingHandler()) //加入自定义的handler .childHandler(new HttpServerHandler()); //我定义的handler public class HttpServerHandler extends ChannelInitializer<SocketChannel>{ @Override protected void initChannel(SocketChannel ch) throws Exception { ch.pipeline().addLast(new LoggingHandler()); //请求 http解码 ch.pipeline().addLast("http-decoder",new HttpRequestDecoder()); //将多个消息转换为单一的FullHttpRequest ch.pipeline().addLast("http-aggregator",new HttpObjectAggregator(65536)); //应答http编码 ch.pipeline().addLast("http-encoder",new HttpResponseEncoder()); //链接异步发送请求,防止内存溢出 ch.pipeline().addLast("http-chunked",new ChunkedWriteHandler()); ch.pipeline().addLast("httpServerHandlerAdapter",new HttpServerHandlerAdapter()); } //第二层对消息处理 public class HttpServerHandlerAdapter extends SimpleChannelInboundHandler<FullHttpRequest>{ @Override public boolean acceptInboundMessage(Object msg) throws Exception { System.out.println(msg); return super.acceptInboundMessage(msg); } @Override public void channelReadComplete(ChannelHandlerContext ctx) throws Exception { ctx.flush(); } @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { cause.printStackTrace(); ctx.close(); } @Override protected void channelRead0(ChannelHandlerContext ctx, FullHttpRequest msg) throws Exception { System.out.println(msg); } }
第六步:绑定并启动监听端口:
ChannelFuture f = b.bind(8080).sync(); System.out.println("Http服务器启动完成,监听端口为:"+port); f.channel().closeFuture().sync();
讲解:在监听端口以前,Netty会作一系列的初始化和检测工做,而后启动监听端口,同时,会将ServerSocketChannel注册到多路复用器Selector上,监听客户端的链接。
第七步:Selector轮询:
讲解:Selector的轮询是由Reactor线程NioEventLoop负责调度的,而后将准备就绪的Channel放到一个集合中;
第八步:Reactor线程NioEventLoop将准备就绪的Channel去执行咱们定义好的ChannelPipeline方法,并执行系统定义的ChannelHandler;
第九步:最终,执行咱们根据具体业务逻辑定义好的的ChannelHandler。
完整源码以下:
HttpServer:
package kaoqin.server; import io.netty.bootstrap.ServerBootstrap; import io.netty.channel.ChannelFuture; import io.netty.channel.EventLoopGroup; import io.netty.channel.nio.NioEventLoopGroup; import io.netty.channel.socket.nio.NioServerSocketChannel; import io.netty.handler.logging.LoggingHandler; public class HttpServer { private void run(int port) throws InterruptedException{ //第一步 建立EventLoopGroup EventLoopGroup bossGroup = new NioEventLoopGroup(); EventLoopGroup workerGroup = new NioEventLoopGroup(); try{ ServerBootstrap b = new ServerBootstrap(); b.group(bossGroup, workerGroup) .channel(NioServerSocketChannel.class) .handler(new LoggingHandler()) //加入自定义的handler .childHandler(new HttpServerHandler()); //.option(ChannelOption.AUTO_READ,true); ChannelFuture f = b.bind(port).sync(); System.out.println("Http服务器启动完成,监听端口为:"+port); f.channel().closeFuture().sync(); }finally{ bossGroup.shutdownGracefully(); workerGroup.shutdownGracefully(); } } public static void main(String[] args) throws InterruptedException { new HttpServer().run(9999); } }
HttpServerHandler:
package kaoqin.server; import io.netty.channel.ChannelInitializer; import io.netty.channel.socket.SocketChannel; import io.netty.handler.codec.http.HttpObjectAggregator; import io.netty.handler.codec.http.HttpRequestDecoder; import io.netty.handler.codec.http.HttpResponseEncoder; import io.netty.handler.logging.LoggingHandler; import io.netty.handler.stream.ChunkedWriteHandler; public class HttpServerHandler extends ChannelInitializer<SocketChannel>{ @Override protected void initChannel(SocketChannel ch) throws Exception { ch.pipeline().addLast(new LoggingHandler()); //请求 http解码 ch.pipeline().addLast("http-decoder",new HttpRequestDecoder()); //将多个消息转换为单一的FullHttpRequest ch.pipeline().addLast("http-aggregator",new HttpObjectAggregator(65536)); //应答http编码 ch.pipeline().addLast("http-encoder",new HttpResponseEncoder()); //链接异步发送请求,防止内存溢出 ch.pipeline().addLast("http-chunked",new ChunkedWriteHandler()); ch.pipeline().addLast("httpServerHandlerAdapter",new HttpServerHandlerAdapter()); } }
HttpServerHandlerAdapter:
package kaoqin.server; import io.netty.channel.ChannelHandlerContext; import io.netty.channel.SimpleChannelInboundHandler; import io.netty.handler.codec.http.FullHttpRequest; public class HttpServerHandlerAdapter extends SimpleChannelInboundHandler<FullHttpRequest>{ @Override public boolean acceptInboundMessage(Object msg) throws Exception { System.out.println(msg); return super.acceptInboundMessage(msg); } @Override public void channelReadComplete(ChannelHandlerContext ctx) throws Exception { ctx.flush(); } @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { cause.printStackTrace(); ctx.close(); } @Override protected void channelRead0(ChannelHandlerContext ctx, FullHttpRequest msg) throws Exception { System.out.println(msg); } }