Netty 介绍

本指南对Netty 进行了介绍并指出其意义所在。

1. 问题

如今,咱们使用适合通常用途的应用或组件来和彼此通讯。例如,咱们经常使用一个HTTP客户端从远程服务器获取信息或者经过web services进行远程方法的调用。
然而,一个适合普通目的的协议或其实现并不具有其规模上的扩展性。例如,咱们没法使用一个普通的HTTP服务器进行大型文件,电邮信息的交互,或者处理金融信息和多人游戏数据那种要求准实时消息传递的应用场景。所以,这些都要求使用一个适用于特殊目的并通过高度优化的协议实现。例如,你可能想要实现一个对基于AJAX的聊天应用,媒体流或大文件传输进行过特殊优化的HTTP服务器。你甚至可能想去设计和实现一个全新的,特定于你的需求的通讯协议。
另外一种没法避免的场景是你可能不得不使用一种专有的协议和原有系统交互。在这种状况下,你须要考虑的是如何可以快速的开发出这个协议的实现而且同时尚未牺牲最终应用的性能和稳定性。

2. 方案

Netty 是一个异步的,事件驱动的网络编程框架和工具,使用Netty 能够快速开发出可维护的,高性能、高扩展能力的协议服务及其客户端应用。
也就是说,Netty 是一个基于NIO的客户,服务器端编程框架,使用Netty 能够确保你快速和简单的开发出一个网络应用,例如实现了某种协议的客户,服务端应用。Netty至关简化和流线化了网络应用的编程开发过程,例如,TCP和UDP的socket服务开发。
“快速”和“简单”并不意味着会让你的最终应用产生维护性或性能上的问题。Netty 是一个吸取了多种协议的实现经验,这些协议包括FTP,SMPT,HTTP,各类二进制,文本协议,并通过至关精心设计的项目,最终,Netty 成功的找到了一种方式,在保证易于开发的同时还保证了其应用的性能,稳定性和伸缩性。
一些用户可能找到了某些一样声称具备这些特性的编程框架,所以大家可能想问Netty 又有什么不同的地方。这个问题的答案是Netty 项目的设计哲学。从创立之初,不管是在API仍是在其实现上Netty 都致力于为你提供最为温馨的使用体验。虽然这并非显而易见的,但你终将会认识到这种设计哲学将令你在阅读本指南和使用Netty 时变得更加得轻松和容易。
第一章. 开始

这一章节将围绕Netty的核心结构展开,同时经过一些简单的例子可让你更快的了解Netty的使用。当你读完本章,你将有能力使用Netty完成客户端和服务端的开发。
若是你更喜欢自上而下式的学习方式,你能够首先完成 第二章:架构总览 的学习,而后再回到这里。
1.1. 开始以前

运行本章示例程序的两个最低要求是:最新版本的Netty程序以及JDK 1.5或更高版本。最新版本的Netty程序可在项目下载页 下载。下载正确版本的JDK,请到你偏好的JDK站点下载。
这就已经足够了吗?实际上你会发现,这两个条件已经足够你完成任何协议的开发了。若是不是这样,请联系Netty项目社区 ,让咱们知道还缺乏了什么。
最终但不是至少,当你想了解本章所介绍的类的更多信息时请参考API手册。为方便你的使用,这篇文档中全部的类名均链接至在线API手册。此外,若是本篇文档中有任何错误信息,不管是语法错误,仍是打印排版错误或者你有更好的建议,请不要顾虑,当即联系Netty项目社区 。
1.2. 抛弃协议服务

在这个世界上最简化的协议不是“Hello,world!”而是抛弃协议 。这是一种丢弃接收到的任何数据并不作任何回应的协议。
实现抛弃协议(DISCARD protocol),你仅须要忽略接受到的任何数据便可。让咱们直接从处理器(handler)实现开始,这个处理器处理Netty的全部I/O事件。
 
package org.jboss.netty.example.discard;  
@ChannelPipelineCoverage("all")1 
public class DiscardServerHandler extends SimpleChannelHandler {2 
 
    @Override 
    public void messageReceived(ChannelHandlerContext ctx, MessageEvent e) {3 
    }  
 
 @Override 
 public void exceptionCaught(ChannelHandlerContext ctx, ExceptionEvent e) {4 
     e.getCause().printStackTrace();  
      
     Channel ch = e.getChannel();  
     ch.close();  
 }  
 
 
 
    @Override 
    public void exceptionCaught(ChannelHandlerContext ctx, ExceptionEvent e) {4 
        e.getCause().printStackTrace();  
         
        Channel ch = e.getChannel();  
        ch.close();  
    }  
     代码说明
1)ChannelPipelineCoverage注解了一种处理器类型,这个注解标示了一个处理器是否可被多个Channel通道共享(同时关联着ChannelPipeline)。DiscardServerHandler没有处理任何有状态的信息,所以这里的注解是“all”。
2)DiscardServerHandler继承了SimpleChannelHandler,这也是一个ChannelHandler 的实现。SimpleChannelHandler提供了多种你能够重写的事件处理方法。目前直接继承SimpleChannelHandler已经足够了,并不须要你完成一个本身的处理器接口。
3)咱们这里重写了messageReceived事件处理方法。这个方法由一个接收了客户端传送数据的MessageEvent事件调用。在这个例子中,咱们忽略接收到的任何数据,并以此来实现一个抛弃协议(DISCARD protocol)。
4)exceptionCaught 事件处理方法由一个ExceptionEvent异常事件调用,这个异常事件原由于Netty的I/O异常或一个处理器实现的内部异常。多数状况下,捕捉到的异常应当被记录下来,并在这个方法中关闭这个channel通道。固然处理这种异常状况的方法实现可能因你的实际需求而有所不一样,例如,在关闭这个链接以前你可能会发送一个包含了错误码的响应消息。
 
目前进展不错,咱们已经完成了抛弃协议服务器的一半开发工做。下面要作的是完成一个能够启动这个包含DiscardServerHandler处理器服务的主方法。
 
 
package org.jboss.netty.example.discard;  
 
import java.net.InetSocketAddress;  
import java.util.concurrent.Executors;  
 
public class DiscardServer {  
 
    public static void main(String[] args) throws Exception {  
        ChannelFactory factory =  
            new NioServerSocketChannelFactory (  
                    Executors.newCachedThreadPool(),  
                    Executors.newCachedThreadPool());  
 
        ServerBootstrap bootstrap = new ServerBootstrap (factory);  
 
        DiscardServerHandler handler = new DiscardServerHandler();  
        ChannelPipeline pipeline = bootstrap.getPipeline();  
        pipeline.addLast("handler", handler);  
 
        bootstrap.setOption("child.tcpNoDelay", true);  
        bootstrap.setOption("child.keepAlive", true);  
 
        bootstrap.bind(new InetSocketAddress(8080));  
    }  
     代码说明

1)ChannelFactory 是一个建立和管理Channel通道及其相关资源的工厂接口,它处理全部的I/O请求并产生相应的I/O ChannelEvent通道事件。Netty 提供了多种 ChannelFactory 实现。这里咱们须要实现一个服务端的例子,所以咱们使用NioServerSocketChannelFactory实现。另外一件须要注意的事情是这个工厂并本身不负责建立I/O线程。你应当在其构造器中指定该工厂使用的线程池,这样作的好处是你得到了更高的控制力来管理你的应用环境中使用的线程,例如一个包含了安全管理的应用服务。
2)ServerBootstrap 是一个设置服务的帮助类。你甚至能够在这个服务中直接设置一个Channel通道。然而请注意,这是一个繁琐的过程,大多数状况下并不须要这样作。
3)这里,咱们将DiscardServerHandler处理器添加至默认的ChannelPipeline通道。任什么时候候当服务器接收到一个新的链接,一个新的ChannelPipeline管道对象将被建立,而且全部在这里添加的ChannelHandler对象将被添加至这个新的 ChannelPipeline管道对象。这很像是一种浅拷贝操做(a shallow-copy operation);全部的Channel通道以及其对应的ChannelPipeline实例将分享相同的DiscardServerHandler 实例。
4)你也能够设置咱们在这里指定的这个通道实现的配置参数。咱们正在写的是一个TCP/IP服务,所以咱们运行设定一些socket选项,例如 tcpNoDelay和keepAlive。请注意咱们在配置选项里添加的"child."前缀。这意味着这个配置项仅适用于咱们接收到的通道实例,而不是ServerSocketChannel实例。所以,你能够这样给一个ServerSocketChannel设定参数:
bootstrap.setOption("reuseAddress", true);
5)咱们继续。剩下要作的是绑定这个服务使用的端口而且启动这个服务。这里,咱们绑定本机全部网卡(NICs,network interface cards)上的8080端口。固然,你如今也能够对应不一样的绑定地址屡次调用绑定操做。
大功告成!如今你已经完成你的第一个基于Netty的服务端程序。
1.3. 查看接收到的数据

如今你已经完成了你的第一个服务端程序,咱们须要测试它是否能够真正的工做。最简单的方法是使用telnet 命令。例如,你能够在命令行中输入“telnet localhost 8080 ”或其余类型参数。
然而,咱们能够认为服务器在正常工做吗?因为这是一个丢球协议服务,因此实际上咱们没法真正的知道。你最终将收不到任何回应。为了证实它在真正的工做,让咱们修改代码打印其接收到的数据。
咱们已经知道当完成数据的接收后将产生MessageEvent消息事件,而且也会触发messageReceived处理方法。因此让我在DiscardServerHandler处理器的messageReceived方法内增长一些代码。
@Override 
public void messageReceived(ChannelHandlerContext ctx, MessageEvent e) {  
    ChannelBuffer  buf = (ChannelBuffer) e.getMessage();  
    while(buf.readable()) {  
        System.out.println((char) buf.readByte());  
    }  
     代码说明
1) 基本上咱们能够假定在socket的传输中消息类型老是ChannelBuffer。ChannelBuffer是Netty的一个基本数据结构,这个数据结构存储了一个字节序列。ChannelBuffer相似于NIO的ByteBuffer,可是前者却更加的灵活和易于使用。例如,Netty容许你建立一个由多个ChannelBuffer构建的复合ChannelBuffer类型,这样就能够减小没必要要的内存拷贝次数。
2) 虽然ChannelBuffer有些相似于NIO的ByteBuffer,但强烈建议你参考Netty的API手册。学会如何正确的使用ChannelBuffer是无障碍使用Netty的关键一步。
 
若是你再次运行telnet命令,你将会看到你所接收到的数据。
抛弃协议服务的全部源代码均存放在在分发版的org.jboss.netty.example.discard包下。

1.4. 响应协议服务

目前,咱们虽然使用了数据,但最终却未做任何回应。然而通常状况下,一个服务都须要回应一个请求。让咱们实现ECHO协议 来学习如何完成一个客户请求的回应消息,ECHO协议规定要返回任何接收到的数据。
与咱们上一节实现的抛弃协议服务惟一不一样的地方是,这里须要返回全部的接收数据而不是仅仅打印在控制台之上。所以咱们再次修改messageReceived方法就足够了。
 
@Override 
public void messageReceived(ChannelHandlerContext ctx, MessageEvent e) {  
    Channel  ch = e.getChannel();  
    ch.write(e.getMessage());  
代码说明
1) 一个ChannelEvent通道事件对象自身存有一个和其关联的Channel对象引用。这个返回的Channel通道对象表明了这个接收 MessageEvent消息事件的链接(connection)。所以,咱们能够经过调用这个Channel通道对象的write方法向远程节点写入返回数据。
如今若是你再次运行telnet命令,你将会看到服务器返回的你所发送的任何数据。
相应服务的全部源代码存放在分发版的org.jboss.netty.example.echo包下。
1.5. 时间协议服务

这一节须要实现的协议是TIME协议 。这是一个与先前所介绍的不一样的例子。这个例子里,服务端返回一个32位的整数消息,咱们不接受请求中包含的任何数据而且当消息返回完毕后当即关闭链接。经过这个例子你将学会如何构建和发送消息,以及当完成处理后如何主动关闭链接。
由于咱们会忽略接收到的任何数据而只是返回消息,这应当在创建链接后就当即开始。所以此次咱们再也不使用messageReceived方法,取而代之的是使用channelConnected方法。下面是具体的实现:
 
 
package org.jboss.netty.example.time;  
 
@ChannelPipelineCoverage("all")  
public class TimeServerHandler extends SimpleChannelHandler {  
 
    @Override 
    public void channelConnected(ChannelHandlerContext ctx, ChannelStateEvent e) {  
        Channel ch = e.getChannel();  
         
        ChannelBuffer time = ChannelBuffers.buffer(4);  
        time.writeInt(System.currentTimeMillis() / 1000);  
         
        ChannelFuture f = ch.write(time);  
         
        f.addListener(new ChannelFutureListener() {  
            public void operationComplete(ChannelFuture future) {  
                Channel ch = future.getChannel();  
                ch.close();  
            }  
        });  
    }  
 
    @Override 
    public void exceptionCaught(ChannelHandlerContext ctx, ExceptionEvent e) {  
        e.getCause().printStackTrace();  
        e.getChannel().close();  
    }  
代码说明
1) 正如咱们解释过的,channelConnected方法将在一个链接创建后当即触发。所以让咱们在这个方法里完成一个表明当前时间(秒)的32位整数消息的构建工做。
2) 为了发送一个消息,咱们须要分配一个包含了这个消息的buffer缓冲。由于咱们将要写入一个32位的整数,所以咱们须要一个4字节的 ChannelBuffer。ChannelBuffers是一个能够建立buffer缓冲的帮助类。除了这个buffer方法,ChannelBuffers还提供了不少和ChannelBuffer相关的实用方法。更多信息请参考API手册。
另外,一个很不错的方法是使用静态的导入方式:
import static org.jboss.netty.buffer.ChannelBuffers.*;
...
ChannelBuffer dynamicBuf = dynamicBuffer(256);
ChannelBuffer ordinaryBuf = buffer(1024);
3) 像一般同样,咱们须要本身构造消息。
可是打住,flip在哪?过去咱们在使用NIO发送消息时不是经常须要调用 ByteBuffer.flip()方法吗?实际上ChannelBuffer之因此不须要这个方法是由于 ChannelBuffer有两个指针;一个对应读操做,一个对应写操做。当你向一个 ChannelBuffer写入数据的时候写指针的索引值便会增长,但与此同时读指针的索引值不会有任何变化。读写指针的索引值分别表明了这个消息的开始、结束位置。
与之相应的是,NIO的buffer缓冲没有为咱们提供如此简洁的一种方法,除非你调用它的flip方法。所以,当你忘记调用flip方法而引发发送错误时,你便会陷入困境。这样的错误不会再Netty中发生,由于咱们对应不一样的操做类型有不一样的指针。你会发现就像你已习惯的这样过程变得更加容易— 一种没有flippling的体验!
另外一点须要注意的是这个写方法返回了一个ChannelFuture对象。一个ChannelFuture 对象表明了一个还没有发生的I/O操做。这意味着,任何已请求的操做均可能是没有被当即执行的,由于在Netty内部全部的操做都是异步的。例如,下面的代码可能会关闭一 个链接,这个操做甚至会发生在消息发送以前:
Channel ch = ...;
ch.write(message);
ch.close();
所以,你须要这个write方法返回的ChannelFuture对象,close方法须要等待写操做异步完成以后的ChannelFuture通知/监听触发。须要注意的是,关闭方法仍旧不是当即关闭一个链接,它一样也是返回了一个ChannelFuture对象。
4) 在写操做完成以后咱们又如何获得通知?这个只须要简单的为这个返回的ChannelFuture对象增长一个ChannelFutureListener 便可。在这里咱们建立了一个匿名ChannelFutureListener对象,在这个ChannelFutureListener对象内部咱们处理了异步操做完成以后的关闭操做。
另外,你也能够经过使用一个预约义的监听类来简化代码。
f.addListener(ChannelFutureListener.CLOSE);

1.6. 时间协议服务客户端

不一样于DISCARD和ECHO协议服务,咱们须要一个时间协议服务的客户端,由于人们没法直接将一个32位的二进制数据转换一个日历时间。在这一节咱们将学习如何确保服务器端工做正常,以及如何使用Netty完成客户端的开发。
使用Netty开发服务器端和客户端代码最大的不一样是要求使用不一样的Bootstrap及ChannelFactory。请参照如下的代码:
 
package org.jboss.netty.example.time;  
 
import java.net.InetSocketAddress;  
import java.util.concurrent.Executors;  
 
public class TimeClient {  
 
    public static void main(String[] args) throws Exception {  
        String host = args[0];  
        int port = Integer.parseInt(args[1]);  
 
        ChannelFactory factory =  
            new NioClientSocketChannelFactory (  
                    Executors.newCachedThreadPool(),  
                    Executors.newCachedThreadPool());  
 
        ClientBootstrap bootstrap = new ClientBootstrap (factory);  
 
        TimeClientHandler handler = new TimeClientHandler();  
        bootstrap.getPipeline().addLast("handler", handler);  
         
        bootstrap.setOption("tcpNoDelay" , true);  
        bootstrap.setOption("keepAlive", true);  
 
        bootstrap.connect (new InetSocketAddress(host, port));  
    }  
 
代码说明
1) 使用NioClientSocketChannelFactory而不是NioServerSocketChannelFactory来建立客户端的Channel通道对象。
2) 客户端的ClientBootstrap对应ServerBootstrap。
3) 请注意,这里不存在使用“child.”前缀的配置项,客户端的SocketChannel实例不存在父级Channel对象。
4) 咱们应当调用connect链接方法,而不是以前的bind绑定方法。
 
正如你所看到的,这与服务端的启动过程是彻底不同的。ChannelHandler又该如何实现呢?它应当负责接收一个32位的整数,将其转换为可读的格式后,打印输出时间,并关闭这个链接。
 
 
 
 
 package org.jboss.netty.example.time;  
 
  
 
 import java.util.Date;  
 
  
 
 @ChannelPipelineCoverage("all")  
 
 public class TimeClientHandler extends SimpleChannelHandler {  
 
  
 
     @Override 
 
     public void messageReceived(ChannelHandlerContext ctx, MessageEvent e) {  
 
         ChannelBuffer buf = (ChannelBuffer) e.getMessage();  
 
         long currentTimeMillis = buf.readInt() * 1000L;  
 
         System.out.println(new Date(currentTimeMillis));  
 
         e.getChannel().close();  
 
     }  
 
  
 
     @Override 
 
     public void exceptionCaught(ChannelHandlerContext ctx, ExceptionEvent e) {  
 
         e.getCause().printStackTrace();  
 
         e.getChannel().close();  
 
     }  
 
 
 
这看起来非常简单,与服务端的实现也并未有什么不一样。然而,这个处理器却时常会由于抛出IndexOutOfBoundsException异常而拒绝工做。咱们将在下一节讨论这个问题产生的缘由。
1.7. 流数据的传输处理
 
1.7.1. Socket Buffer的缺陷

对于例如TCP/IP这种基于流的传输协议实现,接收到的数据会被存储在socket的接受缓冲区内。不幸的是,这种基于流的传输缓冲区并非一个包队列,而是一个字节队列。这意味着,即便你以两个数据包的形式发送了两条消息,操做系统却不会把它们当作是两条消息,而仅仅是一个批次的字节序列。所以,在这种状况下咱们就没法保证收到的数据刚好就是远程节点所发送的数据。例如,让咱们假设一个操做系统的TCP/IP堆栈收到了三个数据包:
 
+-----+-----+-----+
| ABC | DEF | GHI |
+-----+-----+-----+
 
因为这种流传输协议的广泛性质,在你的应用中有较高的可能会把这些数据读取为另一种形式:
 
+----+-------+---+---+
| AB | CDEFG | H | I |
+----+-------+---+---+
 
所以对于数据的接收方,无论是服务端仍是客户端,应当重构这些接收到的数据,让其变成一种可以让你的应用逻辑易于理解的更有意义的数据结构。在上面所述的这个例子中,接收到的数据应当重构为下面的形式:
 
+-----+-----+-----+
| ABC | DEF | GHI |
+-----+-----+-----+
 
1.7.2. 第一种方案

如今让咱们回到时间协议服务客户端的例子中。咱们在这里遇到了一样的问题。一个32位的整数是一个很是小的数据量,所以它经常不会被切分在不一样的数据段内。然而,问题是它确实能够被切分在不一样的数据段内,而且这种可能性随着流量的增长而提升。
最简单的方案是在程序内部建立一个可准确接收4字节数据的累积性缓冲。下面的代码是修复了这个问题后的TimeClientHandler实现。
 
 
package org.jboss.netty.example.time;  
 
import static org.jboss.netty.buffer.ChannelBuffers.*;  
 
import java.util.Date;  
 
@ChannelPipelineCoverage("one")  
public class TimeClientHandler extends SimpleChannelHandler {  
 
    private final ChannelBuffer buf = dynamicBuffer();  
 
    @Override 
    public void messageReceived(ChannelHandlerContext ctx, MessageEvent e) {  
        ChannelBuffer m = (ChannelBuffer) e.getMessage();  
        buf.writeBytes(m);  
         
        if (buf.readableBytes() >= 4) {  
            long currentTimeMillis = buf.readInt() * 1000L;  
            System.out.println(new Date(currentTimeMillis));  
            e.getChannel().close();  
        }  
    }  
 
    @Override 
    public void exceptionCaught(ChannelHandlerContext ctx, ExceptionEvent e) {  
        e.getCause().printStackTrace();  
        e.getChannel().close();  
    }  
 
 
     代码说明 
1) 这一次咱们使用“one”作为ChannelPipelineCoverage的注解值。这是因为这个修改后的TimeClientHandler不在不在内部保持一个buffer缓冲,所以这个TimeClientHandler实例不能够再被多个Channel通道或ChannelPipeline共享。不然这个内部的buffer缓冲将没法缓冲正确的数据内容。
2) 动态的buffer缓冲也是ChannelBuffer的一种实现,其拥有动态增长缓冲容量的能力。当你没法预估消息的数据长度时,动态的buffer缓冲是一种颇有用的缓冲结构。
3) 首先,全部的数据将会被累积的缓冲至buf容器。
4) 以后,这个处理器将会检查是否收到了足够的数据而后再进行真实的业务逻辑处理,在这个例子中须要接收4字节数据。不然,Netty将重复调用messageReceived方法,直至4字节数据接收完成。
这里还有另外一个地方须要进行修改。你是否还记得咱们把TimeClientHandler实例添加到了这个ClientBootstrap实例的默认ChannelPipeline管道里?这意味着同一个TimeClientHandler实例将被多个Channel通道共享,所以接受的数据也将受到破坏。为了给每个Channel通道建立一个新的TimeClientHandler实例,咱们须要实现一个 ChannelPipelineFactory管道工厂:
 
package org.jboss.netty.example.time;  
 
public class TimeClientPipelineFactory implements ChannelPipelineFactory {  
 
    public ChannelPipeline getPipeline() {  
        ChannelPipeline pipeline = Channels.pipeline();  
        pipeline.addLast("handler", new TimeClientHandler());  
        return pipeline;  
    }  
}
如今,咱们须要把TimeClient下面的代码片断:
 
TimeClientHandler handler = new TimeClientHandler();  
bootstrap.getPipeline().addLast("handler", handler); 
 
替换为:
 
bootstrap.setPipelineFactory(new TimeClientPipelineFactory()); 
 
虽然这看上去有些复杂,而且因为在TimeClient内部咱们只建立了一个链接(connection),所以咱们在这里确实不必引入TimeClientPipelineFactory实例。
然而,当你的应用变得愈来愈复杂,你就总会须要实现本身的ChannelPipelineFactory,这个管道工厂将会令你的管道配置变得更加具备灵活性。
1.7.3. 第二种方案
 
虽然第二种方案解决了时间协议客户端遇到的问题,可是这个修改后的处理器实现看上去却再也不那么简洁。设想一种更为复杂的,由多个可变长度字段组成的协议。你的ChannelHandler实现将变得愈来愈难以维护。
正如你已注意到的,你能够为一个ChannelPipeline添加多个ChannelHandler,所以,为了减少应用的复杂性,你能够把这个臃肿的ChannelHandler切分为多个独立的模块单元。例如,你能够把TimeClientHandler切分为两个独立的处理器:
 TimeDecoder,解决数据分段的问题。
 TimeClientHandler,原始版本的实现。
幸运的是,Netty提供了一个可扩展的类,这个类能够直接拿过来使用帮你完成TimeDecoder的开发:
 
package org.jboss.netty.example.time;  
 
 
public class TimeDecoder extends FrameDecoder {  
 
    @Override 
    protected Object decode(  
            ChannelHandlerContext ctx, Channel channel, ChannelBuffer buffer)  {  
             
        if (buffer.readableBytes() < 4) {  
            return null;   
        }  
         
        return buffer.readBytes(4);  
    }  
 
代码说明
1) 这里再也不须要使用ChannelPipelineCoverage的注解,由于FrameDecoder老是被注解为“one”。
2) 当接收到新的数据后,FrameDecoder会调用decode方法,同时传入一个FrameDecoder内部持有的累积型buffer缓冲。
3) 若是decode返回null值,这意味着尚未接收到足够的数据。当有足够数量的数据后FrameDecoder会再次调用decode方法。
4) 若是decode方法返回一个非空值,这意味着decode方法已经成功完成一条信息的解码。FrameDecoder将丢弃这个内部的累计型缓冲。请注意你不须要对多条消息进行解码,FrameDecoder将保持对decode方法的调用,直到decode方法返回非空对象。
若是你是一个敢于尝试的人,你或许应当使用ReplayingDecoder,ReplayingDecoder更加简化了解码的过程。为此你须要查看API手册得到更多的帮助信息。
 
package org.jboss.netty.example.time;  
 
public class TimeDecoder extends ReplayingDecoder<VoidEnum> {  
 
    @Override 
    protected Object decode(  
            ChannelHandlerContext ctx, Channel channel,  
            ChannelBuffer buffer, VoidEnum state) {  
             
        return buffer.readBytes(4);  
    }  
此外,Netty还为你提供了一些能够直接使用的decoder实现,这些decoder实现不只可让你很是容易的实现大多数协议,而且还会帮你避免某些臃肿、难以维护的处理器实现。请参考下面的代码包得到更加详细的实例:
org.jboss.netty.example.factorial for a binary protocol, and
 org.jboss.netty.example.telnet for a text line-based protocol
1.8. 使用POJO代替ChannelBuffer

目前为止全部的实例程序都是使用ChannelBuffer作为协议消息的原始数据结构。在这一节,咱们将改进时间协议服务的客户/服务端实现,使用POJO 而不是ChannelBuffer作为协议消息的原始数据结构。
在你的ChannelHandler实现中使用POJO的优点是很明显的;从你的ChannelHandler实现中分离从 ChannelBuffer获取数据的代码,将有助于提升你的ChannelHandler实现的可维护性和可重用性。在时间协议服务的客户/服务端代码中,直接使用ChannelBuffer读取一个32位的整数并非一个主要的问题。然而,你会发现,当你试图实现一个真实的协议的时候,这种代码上的分离是颇有必要的。
首先,让咱们定义一个称之为UnixTime的新类型。
 
package org.jboss.netty.example.time;  
 
import java.util.Date;  
 
public class UnixTime {  
    private final int value;  
     
    public UnixTime(int value) {  
        this.value = value;  
    }  
     
    public int getValue() {  
        return value;  
    }  
     
    @Override 
    public String toString() {  
        return new Date(value * 1000L).toString();  
    }  
如今让咱们从新修改TimeDecoder实现,让其返回一个UnixTime,而不是一个ChannelBuffer。
 
@Override 
protected Object decode(  
        ChannelHandlerContext ctx, Channel channel, ChannelBuffer buffer) {  
    if (buffer.readableBytes() < 4) {  
        return null;  
    }  
 
    return new UnixTime(buffer.readInt());  
}
FrameDecoder和ReplayingDecoder容许你返回一个任何类型的对象。若是它们仅容许返回一个ChannelBuffer类型的对象,咱们将不得不插入另外一个能够从ChannelBuffer对象转换 为UnixTime对象的ChannelHandler实现。

有了这个修改后的decoder实现,这个TimeClientHandler便不会再依赖ChannelBuffer。
 
@Override 
public void messageReceived(ChannelHandlerContext ctx, MessageEvent e) {  
    UnixTime m = (UnixTime) e.getMessage();  
    System.out.println(m);  
    e.getChannel().close();  
}
更加简单优雅了,不是吗?一样的技巧也能够应用在服务端,让咱们如今更新TimeServerHandler的实现:
 
@Override 
public void channelConnected(ChannelHandlerContext ctx, ChannelStateEvent e) {  
    UnixTime time = new UnixTime(System.currentTimeMillis() / 1000);  
    ChannelFuture f = e.getChannel().write(time);  
    f.addListener(ChannelFutureListener.CLOSE);  
如今剩下的惟一须要修改的部分是这个ChannelHandler实现,这个ChannelHandler实现须要把一个UnixTime对象从新转换为一个ChannelBuffer。但这却已经是至关简单了,由于当你对消息进行编码的时候你再也不须要处理数据包的拆分及组装。
 
package org.jboss.netty.example.time;  
     
import static org.jboss.netty.buffer.ChannelBuffers.*;  
 
@ChannelPipelineCoverage("all")  
public class TimeEncoder extends SimpleChannelHandler {  
 
    public void writeRequested(ChannelHandlerContext ctx, MessageEvent  e) {  
        UnixTime time = (UnixTime) e.getMessage();  
         
        ChannelBuffer buf = buffer(4);  
        buf.writeInt(time.getValue());  
         
        Channels.write(ctx, e.getFuture(), buf);  
    }  
     代码说明
1) 由于这个encoder是无状态的,因此其使用的ChannelPipelineCoverage注解值是“all”。实际上,大多数encoder实现都是无状态的。
2) 一个encoder经过重写writeRequested方法来实现对写操做请求的拦截。不过请注意虽然这个writeRequested方法使用了和 messageReceived方法同样的MessageEvent参数,可是它们却分别对应了不一样的解释。一个ChannelEvent事件能够既是一个上升流事件(upstream event)也能够是一个降低流事件(downstream event),这取决于事件流的方向。例如:一个MessageEvent消息事件能够做为一个上升流事件(upstream event)被messageReceived方法调用,也能够做为一个降低流事件(downstream event)被writeRequested方法调用。请参考API手册得到上升流事件(upstream event)和降低流事件(downstream event)的更多信息。
3) 一旦完成了POJO和ChannelBuffer转换,你应当确保把这个新的buffer缓冲转发至先前的 ChannelDownstreamHandler处理,这个降低通道的处理器由某个ChannelPipeline管理。Channels提供了多个能够建立和发送ChannelEvent事件的帮助方法。在这个例子中,Channels.write(...)方法建立了一个新的 MessageEvent事件,并把这个事件发送给了先前的处于某个ChannelPipeline内的 ChannelDownstreamHandler处理器。
另外,一个很不错的方法是使用静态的方式导入Channels类:
import static org.jboss.netty.channel.Channels.*;
...
ChannelPipeline pipeline = pipeline();
write(ctx, e.getFuture(), buf);
fireChannelDisconnected(ctx);
 
 
最后的任务是把这个TimeEncoder插入服务端的ChannelPipeline,这是一个很简单的步骤。
1.9. 关闭你的应用

若是你运行了TimeClient,你确定能够注意到,这个应用并无自动退出而只是在那里保持着无心义的运行。跟踪堆栈记录你能够发现,这里有一些运行状态的I/O线程。为了关闭这些I/O线程并让应用优雅的退出,你须要释放这些由ChannelFactory分配的资源。
一个典型的网络应用的关闭过程由如下三步组成:
关闭负责接收全部请求的server socket。
关闭全部客户端socket或服务端为响应某个请求而建立的socket。
释放ChannelFactory使用的全部资源。
为了让TimeClient执行这三步,你须要在TimeClient.main()方法内关闭惟一的客户链接以及ChannelFactory使用的全部资源,这样作即可以优雅的关闭这个应用。
 
package org.jboss.netty.example.time;  
 
public class TimeClient {  
    public static void main(String[] args) throws Exception {  
        ...  
        ChannelFactory factory = ...;  
        ClientBootstrap bootstrap = ...;  
        ...  
        ChannelFuture future  = bootstrap.connect(...);  
        future.awaitUninterruptible();  
        if (!future.isSuccess()) {  
            future.getCause().printStackTrace();  
        }  
        future.getChannel().getCloseFuture().awaitUninterruptibly();  
        factory.releaseExternalResources();  
    }  
 
代码说明
1) ClientBootstrap对象的connect方法返回一个ChannelFuture对象,这个ChannelFuture对象将告知这个链接操做的成功或失败状态。同时这个ChannelFuture对象也保存了一个表明这个链接操做的Channel对象引用。
2) 阻塞式的等待,直到ChannelFuture对象返回这个链接操做的成功或失败状态。
3) 若是链接失败,咱们将打印链接失败的缘由。若是链接操做没有成功或者被取消,ChannelFuture对象的getCause()方法将返回链接失败的缘由。
4) 如今,链接操做结束,咱们须要等待而且一直到这个Channel通道返回的closeFuture关闭这个链接。每个Channel均可得到本身的closeFuture对象,所以咱们能够收到通知并在这个关闭时间点执行某种操做。
而且即便这个链接操做失败,这个closeFuture仍旧会收到通知,由于这个表明链接的 Channel对象将会在链接操做失败后自动关闭。
5) 在这个时间点,全部的链接已被关闭。剩下的惟一工做是释放ChannelFactory通道工厂使用的资源。这一步仅须要调用 releaseExternalResources()方法便可。包括NIO Secector和线程池在内的全部资源将被自动的关闭和终止。
 
关闭一个客户端应用是很简单的,但又该如何关闭一个服务端应用呢?你须要释放其绑定的端口并关闭全部接受和打开的链接。为了作到这一点,你须要使用一种数据结构记录全部的活动链接,但这却并非一件容易的事。幸运的是,这里有一种解决方案,ChannelGroup。
ChannelGroup是Java 集合 API的一个特有扩展,ChannelGroup内部持有全部打开状态的Channel通道。若是一个Channel通道对象被加入到 ChannelGroup,若是这个Channel通道被关闭,ChannelGroup将自动移除这个关闭的Channel通道对象。此外,你还能够对一个ChannelGroup对象内部的全部Channel通道对象执行相同的操做。例如,当你关闭服务端应用时你能够关闭一个ChannelGroup 内部的全部Channel通道对象。
为了记录全部打开的socket,你须要修改你的TimeServerHandler实现,将一个打开的Channel通道加入全局的ChannelGroup对象,TimeServer.allChannels:
 
@Override 
public void channelOpen(ChannelHandlerContext ctx, ChannelStateEvent e) {  
    TimeServer.allChannels.add(e.getChannel());  
 
代码说明
是的,ChannelGroup是线程安全的。
 
如今,全部活动的Channel通道将被自动的维护,关闭一个服务端应用有如关闭一个客户端应用同样简单。
 
package org.jboss.netty.example.time;  
 
public class TimeServer {  
 
    static final ChannelGroup allChannels = new DefaultChannelGroup("time-server" );  
 
    public static void main(String[] args) throws Exception {  
        ...  
        ChannelFactory factory = ...;  
        ServerBootstrap bootstrap = ...;  
        ...  
        Channel channel  = bootstrap.bind(...);  
        allChannels.add(channel);  
        waitForShutdownCommand();  
        ChannelGroupFuture future = allChannels.close();  
        future.awaitUninterruptibly();  
        factory.releaseExternalResources();  
    }  
 
代码说明
1) DefaultChannelGroup须要一个组名做为其构造器参数。这个组名仅是区分每一个ChannelGroup的一个标示。
2) ServerBootstrap对象的bind方法返回了一个绑定了本地地址的服务端Channel通道对象。调用这个Channel通道的close()方法将释放这个Channel通道绑定的本地地址。
3) 无论这个Channel对象属于服务端,客户端,仍是为响应某一个请求建立,任何一种类型的Channel对象都会被加入ChannelGroup。所以,你尽可在关闭服务时关闭全部的Channel对象。
4) waitForShutdownCommand()是一个想象中等待关闭信号的方法。你能够在这里等待某个客户端的关闭信号或者JVM的关闭回调命令。
5) 你能够对ChannelGroup管理的全部Channel对象执行相同的操做。在这个例子里,咱们将关闭全部的通道,这意味着绑定在服务端特定地址的 Channel通道将解除绑定,全部已创建的链接也将异步关闭。为了得到成功关闭全部链接的通知,close()方法将返回一个 ChannelGroupFuture对象,这是一个相似ChannelFuture的对象。
 
1.10. 总述

在这一章节,咱们快速浏览并示范了如何使用Netty开发网络应用。下一章节将涉及更多的问题。同时请记住,为了帮助你以及可以让Netty基于你的回馈获得持续的改进和提升,Netty社区 将永远欢迎你的问题及建议。
第二章. 架构总览
 
在这个章节,咱们将阐述Netty提供的核心功能以及在此基础之上如何构建一个完备的网络应用。
2.1. 丰富的缓冲实现

Netty使用自建的buffer API,而不是使用NIO的ByteBuffer来表明一个连续的字节序列。与ByteBuffer相比这种方式拥有明显的优点。Netty使用新的 buffer类型ChannelBuffer,ChannelBuffer被设计为一个可从底层解决ByteBuffer问题,并可知足平常网络应用开发须要的缓冲类型。这些很酷的特性包括:
 
若是须要,容许使用自定义的缓冲类型。
复合缓冲类型中内置的透明的零拷贝实现。
开箱即用的动态缓冲类型,具备像StringBuffer同样的动态缓冲能力。
再也不须要调用的flip()方法。
正常状况下具备比ByteBuffer更快的响应速度。
更多信息请参考:org.jboss.netty.buffer package description
2.2. 统一的异步 I/O API

传统的Java I/O API在应对不一样的传输协议时须要使用不一样的类型和方法。例如:java.net.Socket 和 java.net.DatagramSocket它们并不具备相同的超类型,所以,这就须要使用不一样的调用方式执行socket操做。
这种模式上的不匹配使得在更换一个网络应用的传输协议时变得繁杂和困难。因为(Java I/O API)缺少协议间的移植性,当你试图在不修改网络传输层的前提下增长多种协议的支持,这时便会产生问题。而且理论上讲,多种应用层协议可运行在多种传输层协议之上例如TCP/IP,UDP/IP,SCTP和串口通讯。
让这种状况变得更糟的是,Java新的I/O(NIO)API与原有的阻塞式的I/O(OIO)API并不兼容,NIO.2(AIO)也是如此。因为全部的API不管是在其设计上仍是性能上的特性都与彼此不一样,在进入开发阶段,你经常会被迫的选择一种你须要的API。
例如,在用户数较小的时候你可能会选择使用传统的OIO(Old I/O) API,毕竟与NIO相比使用OIO将更加容易一些。然而,当你的业务呈指数增加而且服务器须要同时处理成千上万的客户链接时你便会遇到问题。这种状况下你可能会尝试使用NIO,可是复杂的NIO Selector编程接口又会耗费你大量时间并最终会阻碍你的快速开发。
Netty有一个叫作Channel的统一的异步I/O编程接口,这个编程接口抽象了全部点对点的通讯操做。也就是说,若是你的应用是基于 Netty的某一种传输实现,那么一样的,你的应用也能够运行在Netty的另外一种传输实现上。Netty提供了几种拥有相同编程接口的基本传输实现:
 
NIO-based TCP/IP transport (See org.jboss.netty.channel.socket.nio),
OIO-based TCP/IP transport (See org.jboss.netty.channel.socket.oio),
OIO-based UDP/IP transport, and
Local transport (See org.jboss.netty.channel.local).
切换不一样的传输实现一般只需对代码进行几行的修改调整,例如选择一个不一样的ChannelFactory实现。
此外,你甚至能够利用新的传输实现没有写入的优点,只需替换一些构造器的调用方法便可,例如串口通讯。并且因为核心API具备高度的可扩展性,你还能够完成本身的传输实现。
2.3. 基于拦截链模式的事件模型

一个定义良好并具备扩展能力的事件模型是事件驱动开发的必要条件。Netty具备定义良好的I/O事件模型。因为严格的层次结构区分了不一样的事件类型,所以Netty也容许你在不破坏现有代码的状况下实现本身的事件类型。这是与其余框架相比另外一个不一样的地方。不少NIO框架没有或者仅有有限的事件模型概念;在你试图添加一个新的事件类型的时候经常须要修改已有的代码,或者根本就不容许你进行这种扩展。
在一个ChannelPipeline内部一个ChannelEvent被一组ChannelHandler处理。这个管道是拦截过滤器 模式的一种高级形式的实现,所以对于一个事件如何被处理以及管道内部处理器间的交互过程,你都将拥有绝对的控制力。例如,你能够定义一个从socket读取到数据后的操做:
 
public class MyReadHandler implements SimpleChannelHandler {  
    public void messageReceived(ChannelHandlerContext ctx, MessageEvent evt) {  
        Object message = evt.getMessage();  
        // Do something with the received message. 
        ...  
 
        // And forward the event to the next handler. 
        ctx.sendUpstream(evt);  
    }  
同时你也能够定义一种操做响应其余处理器的写操做请求:
 
public class MyWriteHandler implements SimpleChannelHandler {  
    public void writeRequested(ChannelHandlerContext ctx, MessageEvent evt) {  
        Object message = evt.getMessage();  
        // Do something with the message to be written. 
        ...  
 
        // And forward the event to the next handler. 
        ctx.sendDownstream(evt);  
    }  
有关事件模型的更多信息,请参考API文档ChannelEvent和ChannelPipeline部分。
2.4. 适用快速开发的高级组件

上述所说起的核心组件已经足够实现各类类型的网络应用,除此以外,Netty也提供了一系列的高级组件来加速你的开发过程。
2.4.1. Codec框架

就像“1.8. 使用POJO代替ChannelBuffer”一节所展现的那样,从业务逻辑代码中分离协议处理部分老是一个很不错的想法。然而若是一切从零开始便会遭遇到实现上的复杂性。你不得不处理分段的消息。一些协议是多层的(例如构建在其余低层协议之上的协议)。一些协议过于复杂以至难以在一台主机(single state machine)上实现。
所以,一个好的网络应用框架应该提供一种可扩展,可重用,可单元测试而且是多层的codec框架,为用户提供易维护的codec代码。
Netty提供了一组构建在其核心模块之上的codec实现,这些简单的或者高级的codec实现帮你解决了大部分在你进行协议处理开发过程会遇到的问题,不管这些协议是简单的仍是复杂的,二进制的或是简单文本的。
2.4.2. SSL / TLS 支持

不一样于传统阻塞式的I/O实现,在NIO模式下支持SSL功能是一个艰难的工做。你不能只是简单的包装一下流数据并进行加密或解密工做,你不得不借助于javax.net.ssl.SSLEngine,SSLEngine是一个有状态的实现,其复杂性不亚于SSL自身。你必须管理全部可能的状态,例如密码套件,密钥协商(或从新协商),证书交换以及认证等。此外,与一般指望状况相反的是SSLEngine甚至不是一个绝对的线程安全实现。
在Netty内部,SslHandler封装了全部艰难的细节以及使用SSLEngine可能带来的陷阱。你所作的仅是配置并将该 SslHandler插入到你的ChannelPipeline中。一样Netty也容许你实现像StartTlS 那样所拥有的高级特性,这很容易。
2.4.3. HTTP实现

HTTP无疑是互联网上最受欢迎的协议,而且已经有了一些例如Servlet容器这样的HTTP实现。所以,为何Netty还要在其核心模块之上构建一套HTTP实现?
与现有的HTTP实现相比Netty的HTTP实现是至关不同凡响的。在HTTP消息的低层交互过程当中你将拥有绝对的控制力。这是由于Netty的 HTTP实现只是一些HTTP codec和HTTP消息类的简单组合,这里不存在任何限制——例如那种被迫选择的线程模型。你能够为所欲为的编写那种能够彻底按照你指望的工做方式工做的客户端或服务器端代码。这包括线程模型,链接生命期,快编码,以及全部HTTP协议容许你作的,全部的一切,你都将拥有绝对的控制力。
因为这种高度可定制化的特性,你能够开发一个很是高效的HTTP服务器,例如:
要求持久化连接以及服务器端推送技术的聊天服务(e.g. Comet )
须要保持连接直至整个文件下载完成的媒体流服务(e.g. 2小时长的电影)
须要上传大文件而且没有内存压力的文件服务(e.g. 上传1GB文件的请求)
支持大规模mash-up应用以及数以万计链接的第三方web services异步处理平台
2.4.4. Google Protocol Buffer 整合

Google Protocol Buffers 是快速实现一个高效的二进制协议的理想方案。经过使用ProtobufEncoder和ProtobufDecoder,你能够把Google Protocol Buffers 编译器 (protoc)生成的消息类放入到Netty的codec实现中。请参考“LocalTime ”实例,这个例子也同时显示出开发一个由简单协议定义 的客户及服务端是多么的容易。
2.5. 总述
在这一章节,咱们从功能特性的角度回顾了Netty的总体架构。Netty有一个简单却不失强大的架构。这个架构由三部分组成——缓冲(buffer),通道(channel),事件模型(event model)——全部的高级特性都构建在这三个核心组件之上。一旦你理解了它们之间的工做原理,你便不难理解在本章简要说起的更多高级特性。
你可能对Netty的总体架构以及每一部分的工做原理仍旧存有疑问。若是是这样,最好的方式是告诉咱们 应该如何改进这份指南。
相关文章
相关标签/搜索