The previous posts introduced the basic network models and the difference between IO and NIO. But once you can build a non-blocking server with raw NIO, is that really enough? Once the technology is there, people chase productivity, and so a number of frameworks emerged to wrap NIO, the most famous of which is Netty.
For the earlier posts in this series, see:
The answer is almost self-evident: a framework naturally wraps more functionality than the raw API, and reinventing the wheel is not a wise move when productivity is the goal. So let's start with the drawbacks of raw NIO:
Given these problems, general-purpose frameworks from the experts were badly needed. The two best-known NIO frameworks are MINA and Netty, and there is a small anecdote worth telling here:
First, look at MINA's main contributors:
Then look at Netty's main contributors:
To sum up, there are a few points:
So if you had to choose, you should now know which one to pick. In addition, MINA demands a deeper grasp of the underlying system, while Netty enjoys a much stronger community in China, actively promoted by people such as Li Linfeng, the author of 《Netty权威指南》 (Netty: The Definitive Guide).
After all that preamble, the takeaway is simple: Netty has a future, and learning it is a safe bet.
By definition, Netty is an asynchronous, event-driven framework for building high-performance, highly reliable network applications. Its main strengths are:
The main features it supports include:
In short, it provides a lot of ready-made functionality that developers can use directly.
Server programming with Netty can be viewed as an instance of the Reactor model:
That is, there is a thread pool for accepting connections (possibly a single thread, the boss group) and a thread pool for handling them (the worker group). The boss accepts connections and registers them for IO events; the workers do the subsequent processing. To make Netty easier to understand, go straight to the code:
package cn.xingoo.book.netty.chap04;

import io.netty.bootstrap.ServerBootstrap;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.*;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;

import java.net.InetSocketAddress;
import java.nio.charset.Charset;

public class NettyNioServer {

    public void serve(int port) throws InterruptedException {
        final ByteBuf buffer = Unpooled.unreleasableBuffer(
                Unpooled.copiedBuffer("Hi\r\n", Charset.forName("UTF-8")));
        // Step 1: create the event loop groups
        EventLoopGroup bossGroup = new NioEventLoopGroup(1);
        EventLoopGroup workerGroup = new NioEventLoopGroup();
        try {
            // Step 2: create the bootstrap
            ServerBootstrap b = new ServerBootstrap();
            // Step 3: configure the components
            b.group(bossGroup, workerGroup)
                    .channel(NioServerSocketChannel.class)
                    .localAddress(new InetSocketAddress(port))
                    .childHandler(new ChannelInitializer<SocketChannel>() {
                        @Override
                        protected void initChannel(SocketChannel socketChannel) throws Exception {
                            socketChannel.pipeline().addLast(new ChannelInboundHandlerAdapter() {
                                @Override
                                public void channelActive(ChannelHandlerContext ctx) throws Exception {
                                    ctx.writeAndFlush(buffer.duplicate())
                                       .addListener(ChannelFutureListener.CLOSE);
                                }
                            });
                        }
                    });
            // Step 4: bind and start listening
            ChannelFuture f = b.bind().sync();
            f.channel().closeFuture().sync();
        } finally {
            bossGroup.shutdownGracefully().sync();
            workerGroup.shutdownGracefully().sync();
        }
    }

    public static void main(String[] args) throws InterruptedException {
        NettyNioServer server = new NettyNioServer();
        server.serve(5555);
    }
}
The code is remarkably short, and to switch to blocking IO you only need to swap in the corresponding channel implementation:
public class NettyOioServer {

    public void serve(int port) throws InterruptedException {
        final ByteBuf buf = Unpooled.unreleasableBuffer(
                Unpooled.copiedBuffer("Hi\r\n", Charset.forName("UTF-8")));
        EventLoopGroup bossGroup = new OioEventLoopGroup(1);
        EventLoopGroup workerGroup = new OioEventLoopGroup();
        try {
            ServerBootstrap b = new ServerBootstrap();
            b.group(bossGroup, workerGroup)           // configure the boss and worker groups
                    .channel(OioServerSocketChannel.class) // use the blocking SocketChannel
                    ....
To summarize, Netty contains the following main components:
With these basic components in mind, let's look at a few important topics.
On Unix systems, the kernel can map memory between kernel space and user space via mmap. In Netty, however, "zero-copy" does not refer to that; it mainly comes from a few features: CompositeByteBuf, which presents several buffers as one logical buffer without copying their contents; the wrap and slice operations, which create views over existing byte arrays and buffers instead of copying them; and FileRegion, which transfers file content to a channel via FileChannel.transferTo without routing the bytes through user space.
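The last point can be illustrated with plain Java NIO, no Netty required. Below is a minimal sketch of FileChannel.transferTo, the mechanism FileRegion builds on; the class and file names are made up for illustration. When the target is a socket channel the JDK can hand the copy to the kernel (e.g. sendfile); with an arbitrary channel, as here, it may fall back to an internal copy, but the API shape is the same.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.nio.channels.Channels;
import java.nio.channels.FileChannel;
import java.nio.channels.WritableByteChannel;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;

public class TransferToDemo {

    // Copy a file's bytes straight to a channel via transferTo,
    // without reading them into a user-space buffer ourselves.
    public static byte[] copy(Path src) throws IOException {
        ByteArrayOutputStream sink = new ByteArrayOutputStream();
        try (FileChannel in = FileChannel.open(src, StandardOpenOption.READ);
             WritableByteChannel out = Channels.newChannel(sink)) {
            long pos = 0, size = in.size();
            while (pos < size) {
                // transferTo may move fewer bytes than requested, so loop
                pos += in.transferTo(pos, size - pos, out);
            }
        }
        return sink.toByteArray();
    }

    public static void main(String[] args) throws IOException {
        Path tmp = Files.createTempFile("demo", ".txt");
        Files.write(tmp, "hello zero-copy".getBytes(StandardCharsets.UTF_8));
        System.out.println(new String(copy(tmp), StandardCharsets.UTF_8)); // prints: hello zero-copy
        Files.delete(tmp);
    }
}
```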
In addition, Netty implements its own ByteBuf. Compared with NIO's native ByteBuffer, the API is easier to use, the capacity can grow dynamically, and buffers can be pooled for efficient reuse.
public class ByteBufTest {
    public static void main(String[] args) {
        // create a ByteBuf
        ByteBuf buf = Unpooled.copiedBuffer("hello".getBytes());
        System.out.println(buf);
        // read one byte
        buf.readByte();
        System.out.println(buf);
        // read one byte
        buf.readByte();
        System.out.println(buf);
        // discard the bytes already read
        buf.discardReadBytes();
        System.out.println(buf);
        // clear the buffer
        buf.clear();
        System.out.println(buf);
        // write some bytes
        buf.writeBytes("123".getBytes());
        System.out.println(buf);

        buf.markReaderIndex();
        System.out.println("mark:" + buf);
        buf.readByte();
        buf.readByte();
        System.out.println("read:" + buf);
        buf.resetReaderIndex();
        System.out.println("reset:" + buf);
    }
}
The output is:
UnpooledHeapByteBuf(ridx: 0, widx: 5, cap: 5/5)
UnpooledHeapByteBuf(ridx: 1, widx: 5, cap: 5/5)
UnpooledHeapByteBuf(ridx: 2, widx: 5, cap: 5/5)
UnpooledHeapByteBuf(ridx: 0, widx: 3, cap: 5/5)
UnpooledHeapByteBuf(ridx: 0, widx: 0, cap: 5/5)
UnpooledHeapByteBuf(ridx: 0, widx: 3, cap: 5/5)
mark:UnpooledHeapByteBuf(ridx: 0, widx: 3, cap: 5/5)
read:UnpooledHeapByteBuf(ridx: 2, widx: 3, cap: 5/5)
reset:UnpooledHeapByteBuf(ridx: 0, widx: 3, cap: 5/5)
If you are interested, compare this with the ByteBuffer walkthrough in the previous post: because Netty maintains independent read and write indexes, there is no need to switch between read and write modes, which is much more convenient.
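To see the contrast concretely, here is what the native ByteBuffer forces you to do. With only a single position/limit pair, you must call flip() to switch from writing to reading; forgetting the flip is a classic bug. The class and method names below are invented for illustration.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

public class FlipDemo {

    // NIO's ByteBuffer has one position shared by reads and writes,
    // so flip() is required between write mode and read mode.
    // Netty's ByteBuf avoids this with separate readerIndex/writerIndex.
    public static String writeThenRead() {
        ByteBuffer buf = ByteBuffer.allocate(16);
        buf.put("abc".getBytes(StandardCharsets.UTF_8)); // write mode: position advances to 3
        buf.flip();                                      // switch to read mode: limit=3, position=0
        byte[] out = new byte[buf.remaining()];
        buf.get(out);
        return new String(out, StandardCharsets.UTF_8);
    }

    public static void main(String[] args) {
        System.out.println(writeThenRead()); // prints: abc
    }
}
```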
As mentioned earlier, handlers come in two kinds, inbound and outbound, and they are all kept together in a single doubly linked list:
When a message is received, the list is traversed from the head, invoking each inbound handler along the way; when a message is sent, traversal starts from the tail. So for the example in the figure above, receiving a message prints:
InboundA --> InboundB --> InboundC
and sending a message prints:
OutboundC --> OutboundB --> OutboundA
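The traversal rule can be captured in a few lines of plain Java. This is a hypothetical miniature of the pipeline, not Netty's actual implementation: all handlers sit in one list, an inbound event walks head to tail visiting only inbound handlers, and an outbound event walks tail to head visiting only outbound handlers.

```java
import java.util.ArrayList;
import java.util.List;

public class PipelineOrderDemo {

    static class Handler {
        final String name;
        final boolean inbound;
        Handler(String name, boolean inbound) { this.name = name; this.inbound = inbound; }
    }

    // The same ordering as the Netty example below: handlers 1..6.
    static List<Handler> samplePipeline() {
        List<Handler> p = new ArrayList<>();
        p.add(new Handler("InboundA", true));
        p.add(new Handler("OutboundA", false));
        p.add(new Handler("InboundB", true));
        p.add(new Handler("OutboundB", false));
        p.add(new Handler("OutboundC", false));
        p.add(new Handler("InboundC", true));
        return p;
    }

    static List<String> fire(List<Handler> pipeline, boolean inboundEvent) {
        List<String> visited = new ArrayList<>();
        if (inboundEvent) {
            // inbound: head -> tail, inbound handlers only
            for (Handler h : pipeline) {
                if (h.inbound) visited.add(h.name);
            }
        } else {
            // outbound: tail -> head, outbound handlers only
            for (int i = pipeline.size() - 1; i >= 0; i--) {
                if (!pipeline.get(i).inbound) visited.add(pipeline.get(i).name);
            }
        }
        return visited;
    }

    public static void main(String[] args) {
        System.out.println(fire(samplePipeline(), true));  // [InboundA, InboundB, InboundC]
        System.out.println(fire(samplePipeline(), false)); // [OutboundC, OutboundB, OutboundA]
    }
}
```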
Here is a piece of code you can copy and try yourself:
package cn.xingoo.book.netty.pipeline;

import io.netty.bootstrap.ServerBootstrap;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.*;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;

import java.net.InetSocketAddress;
import java.net.SocketAddress;
import java.nio.charset.Charset;

/**
 * Note:
 *
 * 1. A ChannelOutboundHandler must be added before the last inbound handler.
 */
public class NettyNioServerHandlerTest {

    final static ByteBuf buffer = Unpooled.unreleasableBuffer(
            Unpooled.copiedBuffer("Hi\r\n", Charset.forName("UTF-8")));

    public void serve(int port) throws InterruptedException {
        EventLoopGroup bossGroup = new NioEventLoopGroup(1);
        EventLoopGroup workerGroup = new NioEventLoopGroup();
        try {
            ServerBootstrap b = new ServerBootstrap();
            b.group(bossGroup, workerGroup)
                    .channel(NioServerSocketChannel.class)
                    .localAddress(new InetSocketAddress(port))
                    .childHandler(new ChannelInitializer<SocketChannel>() {
                        @Override
                        protected void initChannel(SocketChannel socketChannel) throws Exception {
                            ChannelPipeline pipeline = socketChannel.pipeline();
                            pipeline.addLast("1", new InboundA());
                            pipeline.addLast("2", new OutboundA());
                            pipeline.addLast("3", new InboundB());
                            pipeline.addLast("4", new OutboundB());
                            pipeline.addLast("5", new OutboundC());
                            pipeline.addLast("6", new InboundC());
                        }
                    });
            ChannelFuture f = b.bind().sync();
            f.channel().closeFuture().sync();
        } finally {
            bossGroup.shutdownGracefully().sync();
            workerGroup.shutdownGracefully().sync();
        }
    }

    public static void main(String[] args) throws InterruptedException {
        NettyNioServerHandlerTest server = new NettyNioServerHandlerTest();
        server.serve(5555);
    }

    private static class InboundA extends ChannelInboundHandlerAdapter {
        @Override
        public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
            ByteBuf buf = (ByteBuf) msg;
            System.out.println("InboundA read " + buf.toString(Charset.forName("UTF-8")));
            super.channelRead(ctx, msg);
        }
    }

    private static class InboundB extends ChannelInboundHandlerAdapter {
        @Override
        public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
            ByteBuf buf = (ByteBuf) msg;
            System.out.println("InboundB read " + buf.toString(Charset.forName("UTF-8")));
            super.channelRead(ctx, msg);
            // searches for outbound handlers starting from the tail of the pipeline
            ctx.channel().writeAndFlush(buffer);
        }
    }

    private static class InboundC extends ChannelInboundHandlerAdapter {
        @Override
        public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
            ByteBuf buf = (ByteBuf) msg;
            System.out.println("InboundC read " + buf.toString(Charset.forName("UTF-8")));
            super.channelRead(ctx, msg);
            // this would search for outbound handlers from the current handler toward the head
            //ctx.writeAndFlush(buffer);
        }
    }

    private static class OutboundA extends ChannelOutboundHandlerAdapter {
        @Override
        public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {
            System.out.println("OutboundA write");
            super.write(ctx, msg, promise);
        }
    }

    private static class OutboundB extends ChannelOutboundHandlerAdapter {
        @Override
        public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {
            System.out.println("OutboundB write");
            super.write(ctx, msg, promise);
        }
    }

    private static class OutboundC extends ChannelOutboundHandlerAdapter {
        @Override
        public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {
            System.out.println("OutboundC write");
            super.write(ctx, msg, promise);
        }
    }
}
Finally, there is a TCP sticky-packet example that you can try yourself; the code is not pasted here, see the GitHub link at the end.
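To give a feel for the problem: TCP is a byte stream, so two application-level writes can arrive glued together in one read (sticky packets) or split across reads. The standard fix, which is what Netty's LengthFieldBasedFrameDecoder automates, is to prefix each message with its length and cut frames back out on the receiving side. The sketch below is plain Java, with invented names, and is not Netty's actual implementation.

```java
import java.io.ByteArrayOutputStream;
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;

public class LengthFieldFraming {

    // Prefix the payload with a 4-byte big-endian length field.
    static byte[] encode(String msg) {
        byte[] body = msg.getBytes(StandardCharsets.UTF_8);
        return ByteBuffer.allocate(4 + body.length).putInt(body.length).put(body).array();
    }

    // Cut complete frames back out of a contiguous byte stream.
    static List<String> decode(byte[] stream) {
        List<String> frames = new ArrayList<>();
        ByteBuffer buf = ByteBuffer.wrap(stream);
        while (buf.remaining() >= 4) {
            int len = buf.getInt();
            byte[] body = new byte[len];
            buf.get(body);
            frames.add(new String(body, StandardCharsets.UTF_8));
        }
        return frames;
    }

    public static void main(String[] args) throws Exception {
        // Simulate two messages arriving glued together in one TCP read:
        ByteArrayOutputStream wire = new ByteArrayOutputStream();
        wire.write(encode("hello"));
        wire.write(encode("netty"));
        System.out.println(decode(wire.toByteArray())); // [hello, netty]
    }
}
```

A real decoder must also buffer a partially received frame until the rest arrives; that bookkeeping is exactly what the Netty decoder handles for you.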