Netty3文档翻译(一)

简单找了下发现网上没有关于Netty3比较完整的源码解析的文章,因而我就去读官方文档,为了增强记忆,翻译成了中文,有适当的简化。bootstrap

原文档地址:Netty3文档缓存

Chapter 1 开始

一、开始以前

运行demo的前提有两个:最新版本的Netty3和JDK1.5以上安全

二、写一个Discard Server

最简单的协议就是Discard协议——忽略全部接收到的数据而且不做任何响应。咱们从Netty处理I/O事件的handler实现开始:服务器

public class DiscardServerHandler extends SimpleChannelHandler {

    @Override
    public void messageReceived(ChannelHandlerContext ctx, MessageEvent e) {
        
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, ExceptionEvent e) {

        e.getCause().printStackTrace();

        Channel ch = e.getChannel();
        ch.close();
    }
}
  • DiscardServerHandler 继承SimpleChannelHandler——ChannelHandler的一个实现;
  • messageReceived方法接收MessageEvent类型的参数,它包含接收的客户端数据;
  • exceptionCaught方法在出现I/O错误或者处理事件时抛出错误时被调用,一般包含记录错误信息和关闭通道的动做;

接下来写一个main方法来开启使用DiscardServerHandler的服务:网络

public class DiscardServer {
    
    public static void main(String[] args) throws Exception {
        ChannelFactory factory =
                new NioServerSocketChannelFactory(
                        Executors.newCachedThreadPool(),
                        Executors.newCachedThreadPool());

        ServerBootstrap bootstrap = new ServerBootstrap(factory);

        bootstrap.setPipelineFactory(new ChannelPipelineFactory() {
            public ChannelPipeline getPipeline() {
                return Channels.pipeline(new DiscardServerHandler());
            }
        });
        
        bootstrap.setOption("child.tcpNoDelay", true);
        bootstrap.setOption("child.keepAlive", true);

        bootstrap.bind(new InetSocketAddress(8080));
    }
}
  • ChannelFactory是建立和管理Channel及其关联资源的工厂,它负责处理全部I/O请求而且执行I/O生成ChannelEvent。可是它不是本身建立I/O线程,而是从调用构造方法时指定的线程池中获取线程。服务端应用使用NioServerSocketChannelFactory;
  • ServerBootstrap是一个设置服务端的帮助类;
  • 当服务端接收到一个新的链接,指定的ChannelPipelineFactory就会建立一个新的ChannelPipeline,这个新的Pipeline包含一个DiscardServerHandler对象;
  • 你能够给Channel实现设置具体的参数,选项带"child."前缀表明应用在接收到的Channel上而不是服务端自己的ServerSocketChannel;
  • 剩下的就是绑定端口启动服务,能够绑定多个不一样的端口。

三、处理接收到的数据

咱们能够经过"telnet localhost 8080"命令去测试服务,但由于是Discard服务,咱们都不知道服务是否正常工做。因此咱们修改下服务,让它打印出接收到的数据。数据结构

@Override
public void messageReceived(ChannelHandlerContext ctx, MessageEvent e) {
    ChannelBuffer buf = (ChannelBuffer) e.getMessage();
    while(buf.readable()) {
        System.out.println((char) buf.readByte());
        System.out.flush();
    }
}
  • ChannelBuffer是Netty基本的存储字节的数据结构,跟NIO的ByteBuffer相似,可是更容易使用更灵活。好比Netty容许你在尽可能少的内存复制次数的状况下把多个ChannelBuffer组合成一个。

四、写一个Echo服务

一个服务一般对请求是有响应的。接下来咱们尝试写一个实现Echo协议——将接收的数据原路返回给客户端的服务:异步

@Override
public void messageReceived(ChannelHandlerContext ctx, MessageEvent e) {
    Channel ch = e.getChannel();
    ch.write(e.getMessage());
}
  • MessageEvent继承了ChannnelEvent,一个ChannnelEvent持有它相关的Channel的引用。咱们能够获取这个Channel而后调用写方法写入数据返回给客户端。

五、写一个时间服务

此次咱们实现一个时间协议——在不须要任何请求数据的状况下返回一个32位整型数字而且在发送以后关闭链接。由于咱们忽略请求数据,只须要在链接创建的发送消息,因此此次不能使用messageReceived方法而是重写channelConnected方法:socket

@Override
public void channelConnected(ChannelHandlerContext ctx, ChannelStateEvent e) {
    Channel ch = e.getChannel();

    ChannelBuffer time = ChannelBuffers.buffer(4);
    time.writeInt((int) (System.currentTimeMillis() / 1000L + 2208988800L));

    ChannelFuture f = ch.write(time);

    f.addListener(new ChannelFutureListener() {
        public void operationComplete(ChannelFuture future) {
            Channel ch = future.getChannel();
            ch.close();
        }
    });
}
  • channelConnected方法在链接创建的时间被调用,而后咱们写入一个32位整型数字表明以秒为单位的当前时间;
  • 咱们使用ChannelBuffers工具类分配了一个容量为4字节的ChannelBuffer来存放这个32位整型数字;
  • 而后咱们把ChannelBuffer写入Channel...等一下,flip方法哪里去了?在NIO中咱们不是要在写入通道前调用ByteBuffer的flip方法的吗?ChannelBuffer没有这个方法,由于它有两个指针,一个用于读操做一个用于写操做。当数据写入ChannelBuffer时写索引增长而读索引不变。读索引和写索引相互独立。对比之下,Netty的ChannelBuffer比NIO的buffer更容易使用。
  • 另外须要注意的一点是ChannelBuffer的write方法返回的是一个ChannelFuture对象。它表示一个还未发生的I/O操做,由于Netty中全部操做都是异步的。因此咱们必须在ChannelFuture收到操做完成的通知以后才能关闭Channel。哦,对了,close方法也是返回ChannelFuture...
  • 那么问题来了,咱们如何获得操做完成的通知呢?只须要简单得向返回的ChannelFuture对象中添加一个ChannelFutureListener,这里咱们建立了一个ChannelFutureListener的匿名内部类,它在操做完成的时候会关闭Channel。

六、写一个时间客户端

咱们还须要一个遵照时间协议,即能把整型数字翻译成日期的客户端。Netty服务端和客户端惟一的区别就是要求不一样的Bootstrap和ChannelFactory:tcp

public static void main(String[] args) throws Exception {
    String host = args[0];
    int port = Integer.parseInt(args[1]);

    ChannelFactory factory =
            new NioClientSocketChannelFactory(
                    Executors.newCachedThreadPool(),
                    Executors.newCachedThreadPool());

    ClientBootstrap bootstrap = new ClientBootstrap(factory);

    bootstrap.setPipelineFactory(new ChannelPipelineFactory() {
        public ChannelPipeline getPipeline() {
            return Channels.pipeline(new TimeClientHandler());
        }
    });

    bootstrap.setOption("tcpNoDelay", true);
    bootstrap.setOption("keepAlive", true);

    bootstrap.connect(new InetSocketAddress(host, port));
}
  • NioClientSocketChannelFactory,用来建立一个客户端Channel;
  • ClientBootstrap是ServerBootStrap在客户端的对应部分;
  • 须要注意的是设置参数时不须要"child."前缀,客户端SocketChannel没有父Channel;
  • 对应服务端的bind方法,这里咱们须要调用connect方法。

另外咱们须要一个ChannelHandler实现,负责把接收到服务端返回的32位整型数字翻译成日期并打印出来,而后断开链接:ide

public class TimeClientHandler extends SimpleChannelHandler {

    @Override
    public void messageReceived(ChannelHandlerContext ctx, MessageEvent e) {
        ChannelBuffer buf = (ChannelBuffer) e.getMessage();
        long currentTimeMillis = buf.readInt() * 1000L;
        System.out.println(new Date(currentTimeMillis));
        e.getChannel().close();
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, ExceptionEvent e) {
        e.getCause().printStackTrace();
        e.getChannel().close();
    }
}

看上去很简单是吧?可是实际运行过程当中这个handler有时会抛出一个IndexOutOfBoundsException。下一节咱们会讨论为何会这样。

七、处理基于流的传输

7.一、一个关于Socket Buffer的小警告

在像TCP/IP那样基于流的传输中,接收数据保存在一个socket接收缓存中。可是这个缓存不是一个以包为单位的队列,而是一个以字节为单位的队列。这就意味着,即便发送两个独立的消息,操做系统会把他们视为一个字节串。所以,不能保证你读到的和另外一端写入的同样。因此,不论是客户端仍是服务端,对于接收到的数据都须要整理成符合应用程序逻辑的结构。

7.二、第一种解决方式

回到前面的时间客户端的问题,32位整型数字很小,可是它也是能够拆分的,特别是当流量上升的时候,被拆分的可能性也随之上升。
一个简单的处理方式就是内部建立一个累计的缓存,直到接收满4个字节才进行处理。

private final ChannelBuffer buf = dynamicBuffer();

@Override
public void messageReceived(ChannelHandlerContext ctx, MessageEvent e) {
    ChannelBuffer m = (ChannelBuffer) e.getMessage();
    buf.writeBytes(m);
    if (buf.readableBytes() >= 4) {
        long currentTimeMillis = buf.readInt() * 1000L;
        System.out.println(new Date(currentTimeMillis));
        e.getChannel().close();
    }
}
  • ChannelBuffers.dynamicBuffer()返回一个自动扩容的ChannelBuffer;
  • 全部接收的数据都累积到这个动态缓存中;
  • handler须要检查缓存是否满4个字节,是的话才能继续业务逻辑;不然,Netty会在数据继续到达以后持续调用messageReceive。

7.三、第二种解决方案

第一种方案有不少问题,好比一个复杂的协议,由多个可变长度的域组成,这种状况下第一种方案的handler就没法支持了。
你会发现你能够添加多个ChannelHandler到ChannelPipeline中,利用这个特性,你能够把一个臃肿的ChannelHandler拆分到多个模块化的ChannelHandler中,这样能够下降应用程序的复杂度。好比,你能够把TimeClientHandler拆分红两个handler:

  • TimeDecoder,负责分段问题;
  • 最初那个简版的TimeClientHandler.

Netty提供了可扩展的类帮助你实现TimeDecoder:

public class TimeDecoder extends FrameDecoder {

    @Override
    protected Object decode(ChannelHandlerContext ctx, Channel channel, ChannelBuffer buffer) {

        if (buffer.readableBytes() < 4) {
            return null;
        }

        return buffer.readBytes(4);
    }
}
  • FrameDecoder是ChannelHandler的一种实现,专门用来处理分段问题;
  • FrameDecoder在每次接收到新的数据时调用decode方法,携带一个内部维持的累积缓存;
  • 若是返回null,说明目前数据接收的还不够,当数据量足够时FrameDecoder会再次调用方法;
  • 若是返回非null对象,表明解码成功,FrameDecoder会丢弃累积缓存中剩余的数据。你无需提供批量解码,FrameDecoder会继续调用decode方法直到返回null。

拆分以后,咱们须要修改TimeClient的ChannelPipelineFactory实现:

bootstrap.setPipelineFactory(new ChannelPipelineFactory() {
    public ChannelPipeline getPipeline() {
        return Channels.pipeline(
                new TimeDecoder(),
                new TimeClientHandler());
    }
});

Netty还提供了进一步简化解码的ReplayingDecoder:

public class TimeDecoder extends ReplayingDecoder<VoidEnum> {
    @Override
    protected Object decode(
            ChannelHandlerContext ctx, Channel channel,
            ChannelBuffer buffer, VoidEnum state) {
        return buffer.readBytes(4);
    }
 }

此外,Netty提供了一批开箱即用的解码器,让你能够简单得实现大多数协议:

  • org.jboss.netty.example.factorial 用于二进制协议;
  • org.jboss.netty.example.telnet 用于基于行的文本协议.

八、用POJO替代ChannelBuffer

上面的demo咱们都是用ChannelBuffer做为协议化消息的基本数据结构,这一节咱们用POJO替代ChannelBuffer。将从ChannelBuffer提取信息的代码跟handler分离开,会使handler变得更加可维护的和可重用的。从上面的demo里不容易看出这个优点,可是实际应用中分离颇有必要。
首先,咱们定义一个类型UnixTime:

public class UnixTime {

    private final int value;

    public UnixTime(int value) {
        this.value = value;
    }

    public int getValue() {
        return value;
    }

    @Override
    public String toString() {
        return new Date(value * 1000L).toString();
    }
}

如今咱们能够修改TimeDecoder让它返回一个UnixTime而不是ChannelBuffer:

@Override
protected Object decode(ChannelHandlerContext ctx, Channel channel, ChannelBuffer buffer) {

    if (buffer.readableBytes() < 4) {
        return null;
    }

    return new UnixTime(buffer.readInt());
}

编码器改了,那么相应的TimeClientHandler就不会继续使用ChannelBuffer:

@Override
public void messageReceived(ChannelHandlerContext ctx, MessageEvent e) {
    UnixTime m = (UnixTime) e.getMessage();
    System.out.println(m);
    e.getChannel().close();
}

一样的技术也能够应用到服务端的TimeServerHandler上:

@Override
public void channelConnected(ChannelHandlerContext ctx, ChannelStateEvent e) {
    UnixTime time = new UnixTime((int)(System.currentTimeMillis() / 1000));
    ChannelFuture f = e.getChannel().write(time);
    f.addListener(ChannelFutureListener.CLOSE);
}

能这样运用的前提是有一个编码器,能够把UnixTime对象翻译成ChannelBuffer:

public class TimeEncoder extends SimpleChannelHandler {

    public void writeRequested(ChannelHandlerContext ctx, MessageEvent e) {
        UnixTime time = (UnixTime) e.getMessage();
        ChannelBuffer buf = buffer(4);
        buf.writeInt(time.getValue());
        Channels.write(ctx, e.getFuture(), buf);
    }
}
  • 一个编码器重写writeRequested方法拦截一个写请求。这里须要注意的一点是,尽管这里的writeRequested方法参数里也有一个MessageEvent对象,客户端TimeClientHandler的messageReceived的参数里也有一个,可是它们的解读是彻底不一样的。一个ChannelEvent能够是upstream也能够是downstream事件,这取决于事件的流向。messageReceived方法里的MessageEvent是一个upstream事件,而writeRequested方法里的是downstream事件。
  • 当把POJO类转化为ChannelBuffer后,你须要把ChannelBuffer转发到以前在ChannelPipeline内的ChannelDownstreamHandler,也就是TimeServerHandler。Channels提供了多个帮助方法建立和发送ChanenlEvent。

一样,TimeEncoder也须要加入到服务端的ChannelPipeline中:

bootstrap.setPipelineFactory(new ChannelPipelineFactory() {
    public ChannelPipeline getPipeline() {
        return Channels.pipeline(
                new TimeServerHandler(),
                new TimeEncoder());
    }
});

九、关闭你的应用程序

为了关闭I/O线程让应用程序优雅得退出,咱们须要释放ChannelFactory分配的资源。
一个典型网络应用程序的关闭过程分为三步:

  1. 关闭全部服务端socket链接;
  2. 关闭全部非服务端socket链接(包括客户端socket和服务端接收到的socket);
  3. 释放ChannelFactory使用的全部资源。

应用到TimeClient上:

ChannelFuture future = bootstrap.connect(...);
future.awaitUninterruptibly();
if (!future.isSuccess()) {
    future.getCause().printStackTrace();
}
future.getChannel().getCloseFuture().awaitUninterruptibly();
factory.releaseExternalResources();
  • CilentBootStrap的connect方法返回一个ChannelFuture,当链接尝试成功或者失败时会通知到ChannelFuture。它还持有链接尝试关联的Channel的引用;
  • ChannelFuture.awaitUninterruptibly()等待ChannelFuture肯定链接是否尝试成功;
  • 若是链接失败,咱们打印出失败的缘由。ChannelFuture.getCause()会在链接即没有成功也没有取消的状况下返回失败的缘由;
  • 链接尝试的状况处理以后,咱们还须要等待链接关闭。每一个Channel有它本身的closeFuture,用来通知你链接关闭而后你能够针对关闭作一些动做。即便链接尝试失败了,closeFuture仍然会被通知,由于Channel会在链接失败后自动关闭;
  • 全部链接关闭以后,剩下的就是释放ChannelFactory使用的资源了。释放过程很简单,调用它的releaseExternalResources方法,全部相关的NIO Selector和线程池将会自动关闭。

关闭一个客户端很简单,那服务端呢?你须要从端口解绑并关闭全部接收到的链接。前提是你须要一个保持跟踪活跃链接的数据结构,Netty提供了ChannelGroup。

ChannelGroup是Java集合API一个特殊的的扩展,它表明一组打开的Channel。若是一个Channel被添加到ChannelGroup,而后这个Channel被关闭了,它会从ChannelGroup中自动移除。你能够对同一ChannelGroup中的Channel作批量操做,好比在关闭服务的时候关闭全部Channel。

要跟踪打开的socket,你须要修改TimeServerHandler,把新打开的Channel添加到全局的ChannelGroup变量中。ChannelGroup是线程安全的。

@Override
public void channelOpen(ChannelHandlerContext ctx, ChannelStateEvent e) {
    TimeServer.allChannels.add(e.getChannel());
}

如今咱们自动维持了一个包含全部活跃Channel的列表,关闭服务端就像关闭客户端同样容易了。

public class TimeServer {
    static final ChannelGroup allChannels = new DefaultChannelGroup("time-server");

    public static void main(String[] args) throws Exception {
        ...
        ChannelFactory factory = ...;
        ...
        ServerBootstrap bootstrap = ...;
        ...
        Channel channel = bootstrap.bind(new InetSocketAddress(8080));
        allChannels.add(channel);
        waitForShutdownCommand();
        ChannelGroupFuture future = allChannels.close();
        future.awaitUninterruptibly();
        factory.releaseExternalResources();
    }
}
  • DefaultChannelGroup构造方法接收组名为参数,组名是它的惟一标识;
  • ServerBootstrap的bind方法返回一个服务端的绑定指定本地地址的Channel,调用Channel的close方法将会使它与本地地址解绑;
  • 全部Channel类型均可以被添加到ChannelGroup中,不论是客户端、服务端或是服务端接收的。由于你能够在服务器关闭时同时关闭绑定的Channel和接收到的Channel;
  • waitForShutdownCommand()是一个等待关闭信号的虚构方法。
  • 咱们能够对ChannelGroup中的Channel进行统一操做,这里咱们调用close方法,至关于解绑服务端Channel而且异步关闭全部接收到的Channel。close方法返回一个功能和ChannelFuture相近的ChannelGroupFuture,在全部链接都成功关闭通知咱们。

十、总结

这一节咱们快速浏览了Netty,示范了如何用Netty写一个能正常工做的网络应用。下一节将介绍Netty的更多细节。

相关文章
相关标签/搜索