在不少开源项目中都有netty的影子,好比阿里系的rocketmq和dubbo,大数据领elasticsearch,hbase,hadoop都是使用了netty做为底层传输的。到底netty是个啥东东呢,其实 Netty是一个NIO client-server(客户端服务器)框架,使用Netty能够快速开发网络应用,例如服务器和客户端协议。Netty是业界最流行的NIO框架之一,它的健壮性、功能、性能、可定制性和可扩展性在同类框架中都是数一数二的,它已经获得成百上千的商用项目验证,例如Hadoop的RPC框架(Remote Procedure Call Protocol ,远程过程调用协议,它是一种经过网络从远程计算机程序上请求服务,而不须要了解底层网络技术的协议)avro使用Netty做为底层通讯框架。不少其它业界主流的RPC框架,也使用Netty来构建高性能的异步通讯能力。经过对Netty的分析,咱们将它的优势总结以下:java
说到netty不得不说IO,如今通讯方式无非就是三种通讯模式,BIO,NIO,AIO。git
BIO: 同步并阻塞,服务器实现模式为一个链接一个线程,即客户端有链接请求时服务器端就须要启动一个线程进行处理(一客户端一线程)。该模型最大的问题就是缺少弹性伸缩能力,当客户端并发访问量增长后,服务端的线程数与客户端并发访问数呈1:1的关系,系统性能将急剧降低,随着并发访问量的继续增长,系统会发生线程堆栈溢出、建立新线程失败等问题,并最终致使宕机或僵死。github
NIO:同步非阻塞,服务器实现模式为一个请求一个线程,客户端发送的链接请求都会注册到多路复用器上,多路复用器轮询到链接有I/O请求时才启动一个线程进行处理。 对于NIO,有两点须要强调的: (1)关于概念有两种理解,New I/O(相对于以前的I/O库是新增的)和Non-block I/O(非阻塞的)。因为NIO的目标就是让java支持非阻塞I/O,全部更多人喜欢用Non-block I/O。 (2)不少人喜欢将NIO称为异步非阻塞I/O,可是,若是按照严格的NUIX网络编程模型和JDK的实现进行区分,实际上它只是非阻塞I/O,不能称之为异步非阻塞I/O。但因为NIO库支持非阻塞读和写,相对于以前的同步阻塞读和写,它是异步的,所以不少人习惯称NIO为异步非阻塞I/O。编程
AIO:JDK1.7升级了NIO库,升级后的NIO库被称为NIO2.0,正式引入了异步通道的概念。NIO2.0的异步套接字通道是真正的异步非阻塞I/O,此即AIO。其服务器实现模式为一个有效请求一个线程,客户端的I/O请求都是由OS先完成了再通知服务器应用去启动线程进行处理。bootstrap
NIO 就是New IO , NIO和IO有相同的做用和目的,但实现方式不一样,NIO主要用到的是块,因此NIO的效率要比IO高不少。NIO主要是经过buffer和channel传递数据,而原始的io 主要是经过字节流输入输出数据。浏览器
Netty就是采用了NIO模型,说了这么多,仍是先来个栗子看看吧:安全
首先构建服务端,SslContext主要是来构建安全套接字,不过这个不是重点, EventLoopGroup bossGroup = new NioEventLoopGroup(1)开始构建boss线程组, ServerBootstrap负责初始化netty服务器,而且开始监听端口的socket请求。ServerBootstrap用一个ServerSocketChannelFactory 来实例化。ServerSocketChannelFactory 有两种选择,一种是NioServerSocketChannelFactory,一种是OioServerSocketChannelFactory。 前者使用NIO,后则使用普通的阻塞式IO。它们都须要两个线程池实例做为参数来初始化,一个是boss线程池,一个是worker线程池。服务器
public final class EchoServer { static final boolean SSL = System.getProperty("ssl") != null; static final int PORT = Integer.parseInt(System.getProperty("port", "8007")); public static void main(String[] args) throws Exception { // Configure SSL.构建安全套接字 final SslContext sslCtx; if (SSL) { SelfSignedCertificate ssc = new SelfSignedCertificate(); sslCtx = SslContextBuilder.forServer(ssc.certificate(), ssc.privateKey()).build(); } else { sslCtx = null; } // Configure the server. EventLoopGroup bossGroup = new NioEventLoopGroup(1); EventLoopGroup workerGroup = new NioEventLoopGroup(); try { ServerBootstrap b = new ServerBootstrap(); b.group(bossGroup, workerGroup) .channel(NioServerSocketChannel.class) .option(ChannelOption.SO_BACKLOG, 100) .handler(new LoggingHandler(LogLevel.INFO)) .childHandler(new ChannelInitializer<SocketChannel>() { [@Override](https://my.oschina.net/u/1162528) public void initChannel(SocketChannel ch) throws Exception { ChannelPipeline p = ch.pipeline(); if (sslCtx != null) { p.addLast(sslCtx.newHandler(ch.alloc())); } p.addLast(new EchoServerHandler()); } }); // Start the server. ChannelFuture f = b.bind(PORT).sync(); // Wait until the server socket is closed. f.channel().closeFuture().sync(); } finally { // Shut down all event loops to terminate all threads. bossGroup.shutdownGracefully(); workerGroup.shutdownGracefully(); } }
ServerBootstrap.bind(int)负责绑定端口,当这个方法执行后,ServerBootstrap就能够接受指定端口上的socket链接了。一个ServerBootstrap能够绑定多个端口。也就是说ServerBootstrap监听的一个端口对应一个boss线程,它们一一对应,在boss线程接收了socket链接请求后,会产生一个channel(一个打开的socket对应一个打开的channel),并把这个channel交给ServerBootstrap初始化时指定的ServerSocketChannelFactory来处理,boss线程则继续处理socket的请求。ServerSocketChannelFactory则会从worker线程池中找出一个worker线程来继续处理这个请求。若是是OioServerSocketChannelFactory的话,那个这个channel上全部的socket消息,从开始到channel(socket)关闭,都只由这个特定的worker来处理,也就是说一个打开的socket对应一个指定的worker线程,这个worker线程在socket没有关闭的状况下,也只能为这个socket处理消息,没法服务其余socket。网络
若是是NioServerSocketChannelFactory的话则否则,每一个worker能够服务不一样的socket或者说channel,worker线程和channel再也不有一一对应的关系。 显然,NioServerSocketChannelFactory只须要少许活动的worker线程及能很好的处理众多的channel,而OioServerSocketChannelFactory则须要与打开channel等量的worker线程来服务。并发
线 程是一种资源,因此当netty服务器须要处理长链接的时候,最好选择NioServerSocketChannelFactory,这样能够避免建立大 量的worker线程。在用做http服务器的时候,也最好选择NioServerSocketChannelFactory,由于现代浏览器都会使用 http keepalive功能(可让浏览器的不一样http请求共享一个信道),这也是一种长链接。
worker线程池中的线程生命周期是怎么样的?当某个channel有消息到达或者有消息须要写入socket的时候,worker线程就会从线程池中取出一个。在worker线程中,消息会通过设定好 的ChannelPipeline处理。ChannelPipeline就是一堆有顺序的filter,它分为两部分:UpstreamHandler和 DownStreamHandler, 客户端送入的消息会首先由许多UpstreamHandler依次处理,处理获得的数据送入应用的业务逻辑handler,对 于Nio当messageReceived()方法执行后,若是没有产生异常,worker线程就执行完毕了,它会被线程池回收。业务逻辑hanlder 会经过一些方法,把返回的数据交给指定好顺序的DownStreamHandler处理,处理后的数据若是须要,会被写入channel,进而经过绑定的 socket发送给客户端。这个过程是由另一个线程池中的worker线程来完成的。 对于Oio来讲,从始到终,都是由一个指定的worker来处理。
减小worker线程的处理占用时间
worker 线程是由netty内部管理,统一调配的一种资源,因此最好应该尽快的把让worker线程执行完毕,返回给线程池回收利用。worker线程的大部分时 间消耗在在ChannelPipeline的各类handler中,而在这些handler中,通常是负责应用程序业务逻辑掺入的那个handler最占 时间,它一般是排在最后的UpstreamHandler。因此经过把这部分处理内容交给另一个线程来处理,能够有效的减小worker线程的周期循环 时间。messageReceived()方法中开启一个新的线程来处理业务逻辑,固然也可使用利用netty框架自带的ExecutionHandler,这个之后在说。。
下面要说明下载本栗子中用于进行业务逻辑处理的EchoServerHandler,实际的开发中更能够拥有多个handler作消息的处理,下面咱们来看下这个handler都作了那些工做。 EchoServerHandler主要是继承ChannelInboundHandlerAdapter,而且复写其中的某几个方法就能够实现本身定义的逻辑,例如咱们能够复写channelRead方法能够获取客户端传过来的消息。
public class EchoServerHandler extends ChannelInboundHandlerAdapter { [@Override] public void channelRead(ChannelHandlerContext ctx, Object msg) { try { ByteBuf buf = (ByteBuf)msg; byte[] req = new byte[buf.readableBytes()]; buf.readBytes(req); String response = new String(req,"utf-8"); System.out.println("收到客户端服务器的请求消息"+response); } catch (UnsupportedEncodingException e) { e.printStackTrace(); } ctx.writeAndFlush(Unpooled.copiedBuffer("服务端返回的消息hello".getBytes())) .addListener(ChannelFutureListener.CLOSE); } [@Override] public void channelReadComplete(ChannelHandlerContext ctx) { ctx.flush(); } [@Override](https://my.oschina.net/u/1162528) public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) { // Close the connection when an exception is raised. cause.printStackTrace(); ctx.close(); }
下面来看看客户端是如何编写的,首先构建客户端线程组,只须要构建一个线程组便可,而且绑定服务器主机的ip和port,而且向pipeline中绑定本身的handler这个handler主要是客户端中须要自定义的业务处理逻辑。
public final class EchoClient { static final boolean SSL = System.getProperty("ssl") != null; static final String HOST = System.getProperty("host", "127.0.0.1"); static final int PORT = Integer.parseInt(System.getProperty("port", "8007")); static final int SIZE = Integer.parseInt(System.getProperty("size", "256")); public static void main(String[] args) throws Exception { // Configure SSL.git final SslContext sslCtx; if (SSL) { sslCtx = SslContextBuilder.forClient() .trustManager(InsecureTrustManagerFactory.INSTANCE).build(); } else { sslCtx = null; } // Configure the client. EventLoopGroup group = new NioEventLoopGroup(); try { Bootstrap b = new Bootstrap(); b.group(group) .channel(NioSocketChannel.class) .option(ChannelOption.TCP_NODELAY, true) .handler(new ChannelInitializer<SocketChannel>() { [@Override](https://my.oschina.net/u/1162528) public void initChannel(SocketChannel ch) throws Exception { ChannelPipeline p = ch.pipeline(); if (sslCtx != null) { p.addLast(sslCtx.newHandler(ch.alloc(), HOST, PORT)); } //p.addLast(new LoggingHandler(LogLevel.INFO)); p.addLast(new EchoClientHandler()); } }); // Start the client. ChannelFuture f = b.connect(HOST, PORT).sync(); // Wait until the connection is closed. f.channel().closeFuture().sync(); } finally { // Shut down the event loop to terminate all threads. group.shutdownGracefully(); } }
下面看看handler的处理逻辑,只是在链接服务器的时候向服务器注册完毕时候及channelActive发送条消息,而且获得服务器返回消息的时候调用channelRead方法,获得消息而且将消息打印。
public class EchoClientHandler extends ChannelInboundHandlerAdapter { private final ByteBuf firstMessage; /** * Creates a client-side handler. */ public EchoClientHandler() { firstMessage = Unpooled.buffer(EchoClient.SIZE); for (int i = 0; i < firstMessage.capacity(); i ++) { firstMessage.writeByte((byte) i); } } @Override public void channelActive(ChannelHandlerContext ctx) { ctx.writeAndFlush(firstMessage); } @Override public void channelRead(ChannelHandlerContext ctx, Object msg) { try { ByteBuf buf = (ByteBuf)msg; byte[] req = new byte[buf.readableBytes()]; buf.readBytes(req); String response = new String(req,"utf-8"); System.out.println("收到服务端返回的消息"+response); } catch (UnsupportedEncodingException e) { e.printStackTrace(); } } @Override public void channelReadComplete(ChannelHandlerContext ctx) { ctx.flush(); } @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) { // Close the connection when an exception is raised. cause.printStackTrace(); ctx.close(); } }
总结一下Netty的核心包含如下几个部分:
bootstrap:应用启动入口
buffer:数据交换的载体
handler:协议与事件的处理工具
channel下包含的核心概念有:
Channel:通讯通道,对应一个物理链接 ChannelPipeline:事件处理管道 ChannelHandler:事件处理器 ChannelHandlerContext:上下文环境,包含了handler的引用