Netty is a well-known open-source networking library in the Java world, notable for its high performance and extensibility, which is why many popular frameworks are built on top of it, such as Dubbo, RocketMQ, and Hadoop. High-performance RPC frameworks are also typically built on Netty, for example SOFABolt. In short, Java developers should learn to use Netty and understand how it works.
Netty aims to provide an asynchronous, event-driven network application framework and tooling for the rapid development of maintainable, high-performance, highly scalable protocol servers and clients. In other words, Netty is an NIO client/server framework with which network applications such as protocol servers and clients can be developed quickly and easily. It greatly simplifies network programming such as TCP and UDP socket server development.
To learn the details of how Netty works, reading its source code is essential, so first let's see how to build it:
- Download the Netty 4.x source from GitHub
- If the XxxObjectHashMap classes are missing, note that they are generated automatically at build time; running mvn clean install, or cd common && mvn clean install, will produce them.
- Open the project in IDEA and start the source-reading journey :)
Besides reading the source, pairing it with a book works even better. For Netty, I recommend Li Linfeng's 《Netty权威指南》 (Netty: The Definitive Guide). Its treatment of Netty fundamentals and the NIO portions is quite good, though in places it feels padded with pasted code; overall it is still worth reading.
Netty is an event-driven, high-performance Java network library: a client/server framework that hides the underlying complexity behind an easy-to-use API. With its performance and scalability, Netty lets developers focus on what they are actually interested in. One of its main goals is to promote "separation of concerns": decoupling business logic from the network infrastructure.
This is not unique to Netty; most frameworks are designed to decouple business code from the underlying plumbing so that programmers can concentrate on business logic, improving development quality and efficiency. Netty's high performance comes chiefly from its internal Reactor model.
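The Reactor pattern at the heart of Netty can be illustrated with plain java.nio: one thread blocks on a Selector and dispatches ready events to handlers. The sketch below is a deliberately minimal illustration using a Pipe as the event source (it is not Netty code; the class name and structure are invented for this example):

```java
import java.nio.ByteBuffer;
import java.nio.channels.Pipe;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.util.Iterator;

public class MiniReactor {
    // Runs one iteration of a reactor loop and returns the message it dispatched.
    public static String run() throws Exception {
        Selector selector = Selector.open();
        Pipe pipe = Pipe.open();
        pipe.source().configureBlocking(false);
        // Register interest in read events on the pipe's source end
        pipe.source().register(selector, SelectionKey.OP_READ);

        // Another party produces data, which makes the source end readable
        pipe.sink().write(ByteBuffer.wrap("ping".getBytes()));

        String result = "";
        selector.select(); // the reactor blocks here until some event is ready
        Iterator<SelectionKey> it = selector.selectedKeys().iterator();
        while (it.hasNext()) {
            SelectionKey key = it.next();
            it.remove();
            if (key.isReadable()) { // dispatch: a read event on this channel
                ByteBuffer buf = ByteBuffer.allocate(16);
                pipe.source().read(buf);
                buf.flip();
                result = new String(buf.array(), 0, buf.limit());
            }
        }
        pipe.sink().close();
        pipe.source().close();
        selector.close();
        return result;
    }

    public static void main(String[] args) throws Exception {
        System.out.println(run()); // prints "ping"
    }
}
```

Netty's NioEventLoop runs essentially this select-then-dispatch loop, with the dispatch step feeding events into each channel's pipeline of handlers.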
Netty is a non-blocking, event-driven network framework. It does use threads to handle I/O events, so readers familiar with multithreading might expect to see synchronization in handler code. That would be undesirable, since synchronization hurts performance, and Netty's design guarantees that event processing needs none: a Channel is registered with exactly one EventLoop, all subsequent events for that Channel are handled by that same EventLoop, and an EventLoop is backed by a single thread. Netty therefore needs no synchronization around I/O operations. The relationship between EventLoop and EventLoopGroup is analogous to that between a thread and a thread pool.
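This lock-free design can be approximated with the JDK's executors: if every event for a given channel is always handed to the same single-threaded executor, handlers never race, so shared handler state needs no locking. A simplified stdlib sketch (not Netty code; the confinement idea is the point):

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

public class SerializedEvents {
    // Submits 1000 "events" for one channel to its single-threaded loop.
    public static int run() throws InterruptedException {
        // One single-threaded "event loop" per channel: events for the same
        // channel execute one at a time, in submission order, on one thread.
        ExecutorService eventLoop = Executors.newSingleThreadExecutor();
        int[] counter = {0}; // plain, unsynchronized state -- safe via confinement

        for (int i = 0; i < 1000; i++) {
            eventLoop.submit(() -> counter[0]++);
        }
        eventLoop.shutdown();
        eventLoop.awaitTermination(5, TimeUnit.SECONDS);
        return counter[0];
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println(run()); // 1000: no lost updates, no locks
    }
}
```

Had the events been spread across a multi-threaded pool, the unsynchronized increment would lose updates; pinning a channel to one loop is what lets Netty handlers stay lock-free.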
ByteBuf is a container for byte data. All network communication ultimately transfers byte streams, and ByteBuf is a well-optimized container into which we can efficiently write byte data and from which we can efficiently read it back. For convenience, ByteBuf maintains two indices: one for reading and one for writing. We can read data sequentially, or read it repeatedly by adjusting the reader index or by passing an absolute position directly to a get method.
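The dual-index idea can be modeled in a few lines: writes advance the writer index, sequential reads advance the reader index, and indexed get calls move neither, which is what makes repeated reads possible. A toy illustration (ToyBuf is a hypothetical class invented here, not Netty's ByteBuf):

```java
public class ToyBuf {
    private final byte[] data = new byte[64];
    private int readerIndex = 0;
    private int writerIndex = 0;

    public void writeByte(byte b) { data[writerIndex++] = b; }   // advances writerIndex
    public byte readByte()       { return data[readerIndex++]; } // advances readerIndex
    public byte getByte(int i)   { return data[i]; }             // moves no index
    public int readableBytes()   { return writerIndex - readerIndex; }

    public static void main(String[] args) {
        ToyBuf buf = new ToyBuf();
        buf.writeByte((byte) 'a');
        buf.writeByte((byte) 'b');
        System.out.println(buf.readableBytes());   // 2
        System.out.println((char) buf.readByte()); // a -- sequential read consumes it
        System.out.println((char) buf.getByte(0)); // a -- get() re-reads without consuming
        System.out.println(buf.readableBytes());   // 1
    }
}
```

The real ByteBuf follows the same contract: readerIndex <= writerIndex <= capacity, with read/write methods moving their respective index and get/set methods leaving both alone.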
ByteBuf usage patterns
A heap buffer ByteBuf stores its data in the JVM heap, implemented by keeping the data in an array. Heap buffers can be allocated quickly and released quickly when no longer used. They also provide direct access to the backing array: ByteBuf.array() returns the underlying byte[].
Heap buffer ByteBuf usage example:
```java
ByteBuf heapBuf = ...;
if (heapBuf.hasArray()) {
    byte[] array = heapBuf.array();
    int offset = heapBuf.arrayOffset() + heapBuf.readerIndex();
    int length = heapBuf.readableBytes();
    handleArray(array, offset, length);
}
```
A direct buffer ByteBuf builds on the NIO ByteBuffer introduced in JDK 1.4, which allows the JVM to allocate memory via native calls. The purpose is to skip an intermediate in-heap copy and speed up I/O: the contents of a direct buffer can reside outside the heap that the garbage collector scans. DirectBuffers use memory outside the heap, bounded by -XX:MaxDirectMemorySize=xxM; the GC "cannot touch" this memory, which means that under high load, frequent GC pauses will not interrupt application threads on account of these buffers.
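The underlying JDK facility can be tried directly: ByteBuffer.allocateDirect reserves off-heap memory, while ByteBuffer.allocate uses an on-heap array. A small stdlib example showing the observable difference:

```java
import java.nio.ByteBuffer;

public class DirectBufferDemo {
    public static void main(String[] args) {
        ByteBuffer heap = ByteBuffer.allocate(256);
        ByteBuffer direct = ByteBuffer.allocateDirect(256);

        System.out.println(heap.isDirect());   // false: backed by a byte[] on the heap
        System.out.println(heap.hasArray());   // true: array() hands out the backing array
        System.out.println(direct.isDirect()); // true: memory lives outside the heap
        System.out.println(direct.hasArray()); // false: no accessible backing byte[]
    }
}
```

Netty's direct ByteBuf wraps this mechanism; the trade-off is that allocation and release of direct memory are more expensive than for heap buffers, which is one reason Netty pools them.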
With the basic concepts covered, let's look at Netty in use, taking a TCP server, a TCP client, and an HTTP server as examples. The example code is straightforward, so it is presented without further commentary.
TCP server example:

```java
public static void main(String[] args) {
    EventLoopGroup bossGroup = new NioEventLoopGroup(1);
    EventLoopGroup workerGroup = new NioEventLoopGroup();
    try {
        ServerBootstrap boot = new ServerBootstrap();
        boot.group(bossGroup, workerGroup)
            .channel(NioServerSocketChannel.class)
            .localAddress(8080)
            .childHandler(new ChannelInitializer<SocketChannel>() {
                @Override
                protected void initChannel(SocketChannel ch) throws Exception {
                    ch.pipeline().addLast(new EchoHandler());
                }
            });
        // start
        ChannelFuture future = boot.bind().sync();
        future.channel().closeFuture().sync();
    } catch (Exception e) {
        e.printStackTrace();
    } finally {
        // shutdown
        bossGroup.shutdownGracefully();
        workerGroup.shutdownGracefully();
    }
}

public class EchoHandler extends ChannelInboundHandlerAdapter {
    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) {
        ByteBuf in = (ByteBuf) msg;
        System.out.println(in.toString(CharsetUtil.UTF_8));
        ctx.write(msg);
    }

    @Override
    public void channelReadComplete(ChannelHandlerContext ctx) {
        ctx.flush();
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
        cause.printStackTrace();
        ctx.close();
    }
}
```
TCP client example (connecting to the echo server above on port 8080):

```java
public static void main(String[] args) {
    EventLoopGroup group = new NioEventLoopGroup();
    try {
        Bootstrap b = new Bootstrap();
        b.group(group)
         .channel(NioSocketChannel.class)
         .option(ChannelOption.TCP_NODELAY, true)
         .handler(new ChannelInitializer<SocketChannel>() {
             @Override
             public void initChannel(SocketChannel ch) throws Exception {
                 ChannelPipeline p = ch.pipeline();
                 //p.addLast(new LoggingHandler(LogLevel.INFO));
                 p.addLast(new EchoClientHandler());
             }
         });
        // Start the client.
        ChannelFuture f = b.connect("localhost", 8080).sync();
        f.channel().closeFuture().sync();
    } catch (Exception e) {
        e.printStackTrace();
    } finally {
        group.shutdownGracefully();
    }
}

public class EchoClientHandler extends ChannelInboundHandlerAdapter {
    private final ByteBuf message;

    public EchoClientHandler() {
        message = Unpooled.buffer(256);
        message.writeBytes("hello netty".getBytes(CharsetUtil.UTF_8));
    }

    @Override
    public void channelActive(ChannelHandlerContext ctx) {
        ctx.writeAndFlush(message);
    }

    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) {
        System.out.println(((ByteBuf) msg).toString(CharsetUtil.UTF_8));
        ctx.write(msg);
        try {
            // Note: blocking the event loop like this is for demo purposes only
            Thread.sleep(1000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }

    @Override
    public void channelReadComplete(ChannelHandlerContext ctx) {
        ctx.flush();
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
        // Close the connection when an exception is raised.
        cause.printStackTrace();
        ctx.close();
    }
}
```
When does the Netty client register its channel with the selector? Right after the channel is created. The relevant code is in the initAndRegister method:
```java
final ChannelFuture initAndRegister() {
    Channel channel = null;
    try {
        // Create the (Netty-defined) Channel instance and initialize it.
        // For a client Bootstrap this is a NioSocketChannel; its parent class
        // AbstractNioChannel holds the underlying java.nio channel.
        channel = channelFactory.newChannel();
        init(channel);
    } catch (Throwable t) {
        if (channel != null) {
            // channel can be null if newChannel crashed (eg SocketException("too many open files"))
            channel.unsafe().closeForcibly();
            // as the Channel is not registered yet we need to force the usage of the GlobalEventExecutor
            return new DefaultChannelPromise(channel, GlobalEventExecutor.INSTANCE).setFailure(t);
        }
        // as the Channel is not registered yet we need to force the usage of the GlobalEventExecutor
        return new DefaultChannelPromise(new FailedChannel(), GlobalEventExecutor.INSTANCE).setFailure(t);
    }

    // Register the channel with the Selector
    ChannelFuture regFuture = config().group().register(channel);
    if (regFuture.cause() != null) {
        if (channel.isRegistered()) {
            channel.close();
        } else {
            channel.unsafe().closeForcibly();
        }
    }

    // If we are here and the promise is not failed, it's one of the following cases:
    // 1) If we attempted registration from the event loop, the registration has been completed at this point.
    //    i.e. It's safe to attempt bind() or connect() now because the channel has been registered.
    // 2) If we attempted registration from the other thread, the registration request has been successfully
    //    added to the event loop's task queue for later execution.
    //    i.e. It's safe to attempt bind() or connect() now:
    //         because bind() or connect() will be executed *after* the scheduled registration task is executed
    //         because register(), bind(), and connect() are all bound to the same thread.
    return regFuture;
}
```
After initAndRegister, the connect step runs. Note that the actual channel.connect is performed by the NioEventLoop thread; once the TCP three-way handshake completes, the channel's CONNECT event fires, which is then handled in the NioEventLoop event-processing flow.
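The same register-then-connect flow can be reproduced with raw java.nio, which is roughly what happens beneath Bootstrap.connect: register the channel with the Selector first, initiate a non-blocking connect, then finish the handshake when OP_CONNECT fires. A simplified single-threaded sketch (the class and method names here are invented for illustration; a throwaway local listener stands in for the remote server):

```java
import java.net.InetSocketAddress;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;

public class NioConnectDemo {
    public static boolean run() throws Exception {
        // Throwaway listener on an ephemeral port so the connect can succeed
        ServerSocketChannel server = ServerSocketChannel.open();
        server.bind(new InetSocketAddress("127.0.0.1", 0));
        int port = ((InetSocketAddress) server.getLocalAddress()).getPort();

        Selector selector = Selector.open();
        SocketChannel client = SocketChannel.open();
        client.configureBlocking(false);
        // Step 1: register with the selector (the initAndRegister step)
        client.register(selector, SelectionKey.OP_CONNECT);
        // Step 2: start the non-blocking connect; false means the TCP
        // handshake is still in progress
        boolean connected = client.connect(new InetSocketAddress("127.0.0.1", port));

        // Step 3: the event loop finishes the handshake when OP_CONNECT fires
        while (!connected) {
            selector.select();
            for (SelectionKey key : selector.selectedKeys()) {
                if (key.isConnectable()) {
                    connected = client.finishConnect();
                }
            }
            selector.selectedKeys().clear();
        }
        boolean result = client.isConnected();
        client.close();
        server.close();
        selector.close();
        return result;
    }

    public static void main(String[] args) throws Exception {
        System.out.println(run());
    }
}
```

In Netty all three steps run on the channel's NioEventLoop thread, which is why bind() and connect() are safe to call immediately after register() returns, as the comment in initAndRegister explains.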
HTTP server example:

```java
public static void main(String[] args) {
    EventLoopGroup bossGroup = new NioEventLoopGroup(1);
    EventLoopGroup workerGroup = new NioEventLoopGroup();
    try {
        ServerBootstrap boot = new ServerBootstrap();
        boot.group(bossGroup, workerGroup)
            .channel(NioServerSocketChannel.class)
            .localAddress(8080)
            .childHandler(new ChannelInitializer<SocketChannel>() {
                @Override
                protected void initChannel(SocketChannel ch) throws Exception {
                    ch.pipeline()
                      .addLast("decoder", new HttpRequestDecoder())
                      .addLast("encoder", new HttpResponseEncoder())
                      .addLast("aggregator", new HttpObjectAggregator(512 * 1024))
                      .addLast("handler", new HttpHandler());
                }
            });
        // start
        ChannelFuture future = boot.bind().sync();
        future.channel().closeFuture().sync();
    } catch (Exception e) {
        e.printStackTrace();
    } finally {
        // shutdown
        bossGroup.shutdownGracefully();
        workerGroup.shutdownGracefully();
    }
}

public class HttpHandler extends SimpleChannelInboundHandler<FullHttpRequest> {
    @Override
    protected void channelRead0(ChannelHandlerContext ctx, FullHttpRequest msg) throws Exception {
        DefaultFullHttpResponse response = new DefaultFullHttpResponse(HttpVersion.HTTP_1_1,
                HttpResponseStatus.OK, Unpooled.wrappedBuffer("hello netty".getBytes()));
        HttpHeaders heads = response.headers();
        heads.add(HttpHeaderNames.CONTENT_TYPE, HttpHeaderValues.TEXT_PLAIN + "; charset=UTF-8");
        heads.add(HttpHeaderNames.CONTENT_LENGTH, response.content().readableBytes());
        heads.add(HttpHeaderNames.CONNECTION, HttpHeaderValues.KEEP_ALIVE);
        ctx.writeAndFlush(response);
    }
}
```