Netty 是一款异步的事件驱动的网络应用程序框架,支持快速地开发可维护的高性能的面向协议的服务器和客户端。咱们能够很简单的使用Netty 构建应用程序,你没必要是一名网络编程专家;并且Netty 比直接使用底层的Java API 容易得多,它推崇良好的设计实践,能够将你的应用程序逻辑和网络层解耦。java
在咱们开始首次深刻地了解Netty 以前,请仔细审视表1-1 中所总结的关键特性。有些是技术性的,而其余的更多的则是关于架构或设计哲学的。在本书的学习过程当中,咱们将不止一次地从新审视它们。编程
在深刻netty以前,咱们先来简单说说NIO;咱们都知道,它和之前的普通I/O相比的最大优点在于它是非阻塞的。bootstrap
由于普通I/O的阻塞,之前咱们设计并发只能像下图这样,为每一个I/O分配一个线程:安全
这显然带来了一些问题:服务器
Java 对于非阻塞I/O 的支持是在2002 年引入的,位于JDK 1.4 的java.nio 包中。下图展现了一个非阻塞设计,其实际上消除了普通I/O的那些弊端。选择器使得咱们可以经过较少的线程即可监视许多链接上的事件。
class java.nio.channels.Selector 是Java 的非阻塞I/O 实现的关键。它使用了事件通知API以肯定在一组非阻塞套接字中有哪些已经就绪可以进行I/O 相关的操做。由于能够在任何的时间检查任意的读操做或者写操做的完成状态,因此如上图 所示,一个单一的线程即可以处理多个并发的链接。网络
整体来看,与阻塞I/O 模型相比,这种模型提供了更好的资源管理:多线程
在本节中我将要讨论Netty 的主要构件块:架构
这些构建块表明了不一样类型的构造:资源、逻辑以及通知。你的应用程序将使用它们来访问网络以及流经网络的数据。并发
对于每一个组件来讲,咱们都将提供一个基本的定义,而且在适当的状况下,还会提供一个简单的示例代码来讲明它的用法。框架
基本的I/O 操做(bind()、connect()、read()和write())依赖于底层网络传输所提供的原语。在基于Java 的网络编程中,其基本的构造是class Socket。Netty 的Channel 接口所提供的API,大大地下降了直接使用Socket 类的复杂性。
Channel 是Java NIO 的一个基本构造。它表明一个到实体(如一个硬件设备、一个文件、一个网络套接字或者一个可以执行一个或者多个不一样的I/O操做的程序组件)的开放链接,如读操做和写操做。
目前,能够把Channel 看做是传入(入站)或者传出(出站)数据的载体。所以,它能够被打开或者被关闭,链接或者断开链接。
一个回调其实就是一个方法,一个指向已经被提供给另一个方法的方法的引用。这使得后者能够在适当的时候调用前者。回调在普遍的编程场景中都有应用,并且也是在操做完成后通知相关方最多见的方式之一。
Future 提供了另外一种在操做完成时通知应用程序的方式。这个对象能够看做是一个异步操做的结果的占位符;它将在将来的某个时刻完成,并提供对其结果的访问。
JDK 预置了interface java.util.concurrent.Future,可是其所提供的实现,只容许手动检查对应的操做是否已经完成,或者一直阻塞直到它完成。这是很是繁琐的,因此Netty提供了它本身的实现——ChannelFuture,用于在执行异步操做的时候使用。
ChannelFuture提供了几种额外的方法,这些方法使得咱们可以注册一个或者多个ChannelFutureListener实例。监听器的回调方法operationComplete(),将会在对应的操做完成时被调用。而后监听器能够判断该操做是成功地完成了仍是出错了。若是是后者,咱们能够检索产生的Throwable。简而言之,由ChannelFutureListener提供的通知机制消除了手动检查对应的操做是否完成的必要。
下面展现了一个异步地链接到远程节点,ChannelFuture 做为一个I/O 操做的一部分返回的例子。这里,connect()方法将会直接返回,而不会阻塞。
Channel channel = ...; ChannelFuture future = channel.connect( new InetSocketAddress("192.168.0.1", 25));
下面的代码显示了如何利用ChannelFutureListener。首先,要链接到远程节点上。而后,要注册一个新的ChannelFutureListener 到对connect()方法的调用所返回的ChannelFuture 上。当该监听器被通知链接已经创建的时候,要检查对应的状态。若是该操做是成功的,那么将数据写到该Channel。不然,要从ChannelFuture 中检索对应的Throwable。
Channel channel = ...; // 链接远程节点 ChannelFuture future = channel.connect( new InetSocketAddress("192.168.0.1", 25)); //注册一个ChannelFutureListener,以便在操做完成时得到通知 future.addListener(new ChannelFutureListener() { @Override public void operationComplete(ChannelFuture future) { //状态判断 if (future.isSuccess()){ //若是操做是成功的,则建立一个ByteBuf 以持有数据 ByteBuf buffer = Unpooled.copiedBuffer( "Hello",Charset.defaultCharset()); //将数据异步地发送到远程节点。返回一个ChannelFuture ChannelFuture wf = future.channel() .writeAndFlush(buffer); .... } else { //若是发生错误,则访问描述缘由的Throwable Throwable cause = future.cause(); cause.printStackTrace(); } } });
若是你把ChannelFutureListener 看做是回调的一个更加精细的版本,那么你是对的。事实上,回调和Future 是相互补充的机制;它们相互结合,构成了Netty 自己的关键构件块之一。
Netty 使用不一样的事件来通知咱们状态的改变或者是操做的状态。这使得咱们可以基于已经发生的事件来触发适当的动做。这些动做多是:
Netty 是一个网络编程框架,因此事件是按照它们与入站或出站数据流的相关性进行分类的。可能由入站数据或者相关的状态更改而触发的事件包括:
出站事件是将来将会触发的某个动做的操做结果,这些动做包括:
从应用程序开发人员的角度来看,Netty 的主要组件是ChannelHandler,它充当了全部处理入站和出站数据的应用程序逻辑的容器。该组件实现了服务器对从客户端接收的数据的处理。每一个事件均可以被分发给ChannelHandler 类中的某个用户实现的方法。这是一个很好的将事件驱动范式直接转换为应用程序构件块的例子。图1-3 展现了一个事件是如何被一个这样的ChannelHandler 链处理的。
Netty 的ChannelHandler 为处理器提供了基本的抽象,如图1-3 所示的那些。你能够认为每一个ChannelHandler 的实例都相似于一种为了响应特定事件而被执行的回调。
下列代码就是一个handler的示例:
@ChannelHandler.Sharable public class EchoClientHandler extends SimpleChannelInboundHandler<ByteBuf> { //重写了channelActive()方法,其将在一个链接创建时被调用 @Override public void channelActive(ChannelHandlerContext ctx) { ctx.writeAndFlush(Unpooled.copiedBuffer("Netty rocks!", CharsetUtil.UTF_8)); } //重写了channelRead0()方法。每当接收数据时,都会调用这个方法。 //须要注意的是,由服务器发送的消息可能会被分块接收。 // 也就是说,若是服务器发送了5 字节,那么不能保证这5 字节会被一次性接收。 //即便是对于这么少许的数据,channelRead0()方法也可能 // 会被调用两次,第一次使用一个持有3 字节的ByteBuf(Netty 的字节容器) // 第二次使用一个持有2 字节的ByteBuf。 @Override public void channelRead0(ChannelHandlerContext ctx, ByteBuf in) { System.out.println( "Client received: " + in.toString(CharsetUtil.UTF_8)); } //发生异常时被调用 @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) { cause.printStackTrace(); ctx.close(); } }
channelHandler的主要抽象方法都定义于ChannelHandlerAdapter类中,咱们经过重写适当的方法,来控制整个生命周期的重要节点的逻辑。
Netty 提供了大量预约义的能够开箱即用的ChannelHandler 实现,包括用于各类协议(如HTTP 和SSL/TLS)的ChannelHandler。在内部,ChannelHandler 本身也使用了事件和Future,使得它们也成为了你的应用程序将使用的相同抽象的消费者。
ChannelPipeline 提供了ChannelHandler 链的容器,并定义了用于在该链上传播入站和出站事件流的API。当Channel 被建立时,它会被自动地分配到它专属的ChannelPipeline。
ChannelHandler 安装到ChannelPipeline 中的过程以下所示:
ServerBootstrap b = new ServerBootstrap(); //一个ChannelInitializer的实现被注册到了ServerBootstrap中①; b.group(group) .channel(NioServerSocketChannel.class) .localAddress(new InetSocketAddress(port)) .childHandler(new ChannelInitializer<SocketChannel>() { //当ChannelInitializer.initChannel()方法被调用时ChannelInitializer将在 //ChannelPipeline 中安装一组自定义的ChannelHandler serverHandler; @Override public void initChannel(SocketChannel ch) throws Exception { ch.pipeline().addLast(serverHandler); } });
EventLoop 定义了Netty 的核心抽象,用于处理链接的生命周期中所发生的事件。图3-1
下图在高层次上说明了Channel、EventLoop、Thread 以及EventLoopGroup 之间的关系。
这些关系是:
注意,在这种设计中,一个给定Channel 的I/O 操做都是由相同的Thread 执行的,实际
上消除了不一样线程间对于同步的须要。
Netty 的引导类为应用程序的网络层配置提供了容器,这涉及将一个进程绑定到某个指定的端口(ServerBootstrap),或者将一个进程链接到另外一个运行在某个指定主机的指定端口上的进程(Bootstrap)。Netty提供两种类型的引导,一种用于客户端(简单地称为Bootstrap),而另外一种(ServerBootstrap)用于服务器。不管你的应用程序使用哪一种协议或者处理哪一种类型的数据,惟一决定它使用哪一种引导类的是它是做为一个客户端仍是做为一个服务器。表3-1 比较了这两种类型的引导类。
这两种类型的引导类之间的第一个区别已经讨论过了:ServerBootstrap 将绑定到一个端口,由于服务器必需要监听链接,而Bootstrap 则是由想要链接到远程节点的客户端应用程序所使用的。
第二个区别可能更加明显。引导一个客户端只须要一个EventLoopGroup,可是一个ServerBootstrap 则须要两个(也能够是同一个实例)。为何呢?
由于服务器须要两组不一样的Channel。第一组将只包含一个ServerChannel,表明服务器自身的已绑定到某个本地端口的正在监听的套接字。而第二组将包含全部已建立的用来处理传入客户端链接(对于每一个服务器已经接受的链接都有一个)的Channel。图3-4 说明了这个模型,而且展现了为什么须要两个不一样的EventLoopGroup。
与ServerChannel 相关联的EventLoopGroup 将分配一个负责为 传入链接请求 建立Channel 的EventLoop。一旦链接被接受,第二个EventLoopGroup 就会给它的Channel分配一个EventLoop。
import io.netty.buffer.ByteBuf; import io.netty.buffer.Unpooled; import io.netty.channel.ChannelHandler; import io.netty.channel.ChannelHandlerContext; import io.netty.channel.SimpleChannelInboundHandler; import io.netty.util.CharsetUtil; @ChannelHandler.Sharable public class EchoClientHandler extends SimpleChannelInboundHandler<ByteBuf> { //重写了channelActive()方法,其将在一个链接创建时被调用 @Override public void channelActive(ChannelHandlerContext ctx) { ctx.writeAndFlush(Unpooled.copiedBuffer("Netty rocks!", CharsetUtil.UTF_8)); } //重写了channelRead0()方法。每当接收数据时,都会调用这个方法。 //须要注意的是,由服务器发送的消息可能会被分块接收。 // 也就是说,若是服务器发送了5 字节,那么不能保证这5 字节会被一次性接收。 //即便是对于这么少许的数据,channelRead0()方法也可能 // 会被调用两次,第一次使用一个持有3 字节的ByteBuf(Netty 的字节容器) // 第二次使用一个持有2 字节的ByteBuf。 @Override public void channelRead0(ChannelHandlerContext ctx, ByteBuf in) { System.out.println( "Client received: " + in.toString(CharsetUtil.UTF_8)); } //发生异常时被调用 @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) { cause.printStackTrace(); ctx.close(); } }
import io.netty.bootstrap.Bootstrap; import io.netty.channel.ChannelFuture; import io.netty.channel.ChannelInitializer; import io.netty.channel.EventLoopGroup; import io.netty.channel.nio.NioEventLoopGroup; import io.netty.channel.socket.SocketChannel; import io.netty.channel.socket.nio.NioSocketChannel; import java.net.InetSocketAddress; public class EchoClient { private final String host; private final int port; public EchoClient(String host, int port) { this.host = host; this.port = port; } public void start() throws Exception { //定义EventLoop EventLoopGroup group = new NioEventLoopGroup(); try { //Bootstrap类包提供包含丰富API的帮助类,可以很是方便的实现典型的服务器端和客户端通道初始化功能。 Bootstrap b = new Bootstrap(); //绑定EventLoop b.group(group) //使用默认的channelFactory建立一个channel .channel(NioSocketChannel.class) //定义远程地址 .remoteAddress(new InetSocketAddress(host, port)) //绑定自定义的EchoClientHandler到ChannelPipeline上 .handler(new ChannelInitializer<SocketChannel>() { @Override public void initChannel(SocketChannel ch) throws Exception { ch.pipeline().addLast( new EchoClientHandler()); } }); //同步式的连接 ChannelFuture f = b.connect().sync(); f.channel().closeFuture().sync(); } finally { group.shutdownGracefully().sync(); } } public static void main(String[] args) throws Exception { new EchoClient("localhost", 8155).start(); } }
import io.netty.buffer.ByteBuf; import io.netty.buffer.Unpooled; import io.netty.channel.*; import io.netty.util.CharsetUtil; /** * 由于你的Echo 服务器会响应传入的消息,因此它须要实现ChannelInboundHandler 接口,用 * 来定义响应入站事件的方法。 */ //标示一个ChannelHandler 能够被多个Channel 安全地共享 @ChannelHandler.Sharable public class EchoServerHandler extends ChannelInboundHandlerAdapter { //对于每一个传入的消息都会被调用; @Override public void channelRead(ChannelHandlerContext ctx, Object msg) { //将消息记录到控制台 ByteBuf in = (ByteBuf) msg; System.out.println( "Server received: " + in.toString(CharsetUtil.UTF_8)); ctx.write(in); } //通知ChannelInboundHandler最后一次对channelRead() //的调用是当前批量读取中的最后一条消息; @Override public void channelReadComplete(ChannelHandlerContext ctx) { ctx.writeAndFlush(Unpooled.EMPTY_BUFFER) .addListener(ChannelFutureListener.CLOSE); } //在读取操做期间,有异常抛出时会调用。 @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) { cause.printStackTrace(); ctx.close(); } }
import io.netty.bootstrap.ServerBootstrap; import io.netty.channel.*; import io.netty.channel.nio.*; import io.netty.channel.socket.SocketChannel; import io.netty.channel.socket.nio.NioServerSocketChannel; import java.net.InetSocketAddress; public class EchoServer { private final int port; public EchoServer(int port) { this.port = port; } public static void main(String[] args) throws Exception { //设置端口值(若是端口参数的格式不正确,则抛出一个NumberFormatException) int port = 8155; new EchoServer(port).start(); } public void start() throws Exception { //定义EventLoop EventLoopGroup group = new NioEventLoopGroup(); try { //与Bootstrap类包包含丰富的客户端API同样,ServerBootstrap可以很是方便的实现典型的服务端。 ServerBootstrap b = new ServerBootstrap(); b.group(group) //指定所使用的NIO传输Channel .channel(NioServerSocketChannel.class) //使用指定的端口设置套接字地址 .localAddress(new InetSocketAddress(port)) //添加一个EchoServerHandler 到子Channel的ChannelPipeline .childHandler(new ChannelInitializer<SocketChannel>() { @Override public void initChannel(SocketChannel ch) throws Exception { ch.pipeline().addLast(new EchoServerHandler()); } }); //新建一个future实例,异步地绑定服务器;调用sync()方法阻塞等待直到绑定完成 ChannelFuture f = b.bind().sync(); //获取Channel 的CloseFuture,而且阻塞当前线程直到它完成 //该应用程序将会阻塞等待直到服务器的Channel关闭(由于你在Channel 的CloseFuture 上调用了sync()方法)。 f.channel().closeFuture().sync(); } finally { //关闭EventLoopGroup,释放全部的资源 group.shutdownGracefully().sync(); } } }
在本章中,咱们从技术和体系结构这两个角度探讨了理解Netty 的重要性。咱们也更加详细地从新审视了以前引入的一些概念和组件,特别是ChannelHandler、ChannelPipeline和引导。
特别地,咱们讨论了ChannelHandler 类的层次结构,并介绍了编码器和解码器,描述了它们在数据和网络字节格式之间来回转换的互补功能。下面的许多章节都将致力于深刻研究这些组件,而这里所呈现的概览应该有助于你对总体的把控。