Netty 4.x 官方入门指南 [译]

前言

本篇翻译自netty官方Get Start教程,一方面能把好的文章分享给各位,另外一方面能巩固所学的知识。如有错误和遗漏,欢迎各位指出。html

https://netty.io/wiki/user-gu...java

面临的问题

咱们通常使用专用软件或者是开源库和其余系统通讯。举个例子,咱们一般使用 http 客户端从 web 服务器获取信息,或者经过 web service 执行一个 remote procedure call (远程调用)。然而,一个通用的协议时常不具有很好的扩展性,例如咱们不会使用一个通用 http 服务器去作以下类型的数据交换——大文件,电子邮件,近实时的金融数据或者是游戏数据。所以,一个高度优化的致力于解决某些问题的通信协议是颇有必要的,例如你但愿实现一台优化过的 http 服务器,致力于聊天应用,流媒体传输,大文件传输等。你甚至能够为已有需求量身定作一个全新的通讯协议。另外一个不可避免的状况是,你必须处理一个古老的专用协议,使用他去跟遗留系统通讯,问题是咱们该如何快速实现协议,同时不牺牲应用的稳定性和性能。web

解决方法

The Netty project is an effort to provide an asynchronous event-driven network application framework and tooling for the rapid development of maintainable high-performance · high-scalability protocol servers and clients.编程

In other words, Netty is an NIO client server framework that enables quick and easy development of network applications such as protocol servers and clients. It greatly simplifies and streamlines network programming such as TCP and UDP socket server development.bootstrap

'Quick and easy' does not mean that a resulting application will suffer from a maintainability or a performance issue. Netty has been designed carefully with the experiences learned from the implementation of a lot of protocols such as FTP, SMTP, HTTP, and various binary and text-based legacy protocols. As a result, Netty has succeeded to find a way to achieve ease of development, performance, stability, and flexibility without a compromise.api

Some users might already have found other network application framework that claims to have the same advantage, and you might want to ask what makes Netty so different from them. The answer is the philosophy it is built on. Netty is designed to give you the most comfortable experience both in terms of the API and the implementation from the day one. It is not something tangible but you will realize that this philosophy will make your life much easier as you read this guide and play with Netty.promise

Get Started

本章使用简单的例子带你浏览 netty 的核心构造,你快速上手。在本章事后,你就能够写出一个基于 netty 的客户端和服务器。若是你但愿有更好的学习体验,你能够先浏览 Chapter 2, Architectural Overview 后再回来本章学习 (先看这里也是OK的)。缓存

开始以前

能跑通本章例子的最低要求:最新版本的 netty(4.x) 和 JDK 1.6 或以上的版本。
在阅读时,当你对本章中出现的 class 感到疑惑,请查阅他们的 api 文档。而本章几乎全部的 class 都会连接到他们的 api 文档。若是你发现本章中有什么错误的信息、代码语法错误、或者有什么好的想法,也请联系 netty 社区通知咱们。服务器

写一个Discard Server(抛弃服务器)

世界上最简单的协议并非 hello world,而是Discard。这种协议会抛弃掉全部接收到的数据,不会给客户端任何响应,因此实现Discard协议惟一要作的是忽略全部接收到的数据。接下来让咱们着手写一个 handler,用来处理I/O events(I/O事件)。网络

package io.netty.example.discard;
import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;

/**
 * Handles a server-side channel.
 */
public class DiscardServerHandler extends ChannelInboundHandlerAdapter { // (1)

    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) { // (2)
        // Discard the received data silently.
        ((ByteBuf) msg).release(); // (3)
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) { // (4)
        // Close the connection when an exception is raised.
        cause.printStackTrace();
        ctx.close();
    }
}

1.DiscardServerHandler 继承ChannelInboundHandlerAdapter,并间接实现了ChannelInboundHandlerChannelInboundHandler接口提供了多种 event handler method (事件处理方法),你可能要逐个实现接口中的方法,但直接继承ChannelInboundHandlerAdapter会是更好的选择。
2.这里咱们重写了channelRead(),当有新数据到达时该方法就会被调用,并附带接收到的数据做为方法参数。在本例中,接收到的数据类型是ByteBuf
3.要实现 Discard 协议,这里 handler 会忽略接收到的数据。ByteBuf做为 reference-counted (引用计数) 对象,经过调用方法release()释放资源,请记住这个 release 动做在 handler 中完成 (原文:是handler的职责)。一般,咱们会像下面那样实现channelRead()

@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) {
    try {
        // Do something with msg
    } finally {
        ReferenceCountUtil.release(msg);
    }
}

4.当 netty 发生 I/O 错误,或者 handler 在处理 event (事件) 抛出异常时,exceptionCaught()就会被调用。大多数状况下咱们应该记录下被捕获的异常,并关闭与之关联的channel(通道),但同时你也能够作一些额外的异常处理,例如在关闭链接以前,你可能会发送一条带有错误代码的响应消息。

目前为止,咱们已经完成了一半的工做,剩下的就是在main()方法中启动Discard服务器。

package io.netty.example.discard;

import io.netty.bootstrap.ServerBootstrap;

import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
    
/**
 * Discards any incoming data.
 */
public class DiscardServer {
    
    private int port;
    
    public DiscardServer(int port) {
        this.port = port;
    }
    
    public void run() throws Exception {
        EventLoopGroup bossGroup = new NioEventLoopGroup(); // (1)
        EventLoopGroup workerGroup = new NioEventLoopGroup();
        try {
            ServerBootstrap b = new ServerBootstrap(); // (2)
            b.group(bossGroup, workerGroup)
             .channel(NioServerSocketChannel.class) // (3)
             .childHandler(new ChannelInitializer<SocketChannel>() { // (4)
                 @Override
                 public void initChannel(SocketChannel ch) throws Exception {
                     ch.pipeline().addLast(new DiscardServerHandler());
                 }
             })
             .option(ChannelOption.SO_BACKLOG, 128)          // (5)
             .childOption(ChannelOption.SO_KEEPALIVE, true); // (6)
    
            // Bind and start to accept incoming connections.
            ChannelFuture f = b.bind(port).sync(); // (7)
    
            // Wait until the server socket is closed.
            // In this example, this does not happen, but you can do that to gracefully
            // shut down your server.
            f.channel().closeFuture().sync();
        } finally {
            workerGroup.shutdownGracefully();
            bossGroup.shutdownGracefully();
        }
    }
    
    public static void main(String[] args) throws Exception {
        int port;
        if (args.length > 0) {
            port = Integer.parseInt(args[0]);
        } else {
            port = 8080;
        }
        new DiscardServer(port).run();
    }
}
  1. NioEventLoopGroup是一个处理I/O操做的事件循环器 (实际上是个线程池)。netty为不一样类型的传输协议提供了多种NioEventLoopGroup的实现。在本例中咱们要实现一个服务端应用,并使用了两个NioEventLoopGroup。第一个一般被称为boss,负责接收已到达的 connection。第二个被称做 worker,当 boss 接收到 connection 并把它注册到 worker 后,worker 就能够处理 connection 上的数据通讯。要建立多少个线程,这些线程如何匹配到Channel上会随着EventLoopGroup实现的不一样而改变,或者你能够经过构造器去配置他们。
  2. ServerBootstrap是用来搭建 server 的协助类。你也能够直接使用Channel搭建 server,然而这样作步骤冗长,不是一个好的实践,大多数状况下建议使用ServerBootstrap
  3. 这里咱们指定NioServerSocketChannel类,用来初始化一个新的Channel去接收到达的connection。
  4. 这里的 handler 会被用来处理新接收的ChannelChannelInitializer是一个特殊的 handler,帮助开发者配置Channel,而多数状况下你会配置Channel下的ChannelPipeline,往 pipeline 添加一些 handler (例如DiscardServerHandler) 从而实现你的应用逻辑。当你的应用变得复杂,你可能会向 pipeline 添加更多的 handler,并把这里的匿名类抽取出来做为一个单独的类。
  5. 你能够给Channel配置特有的参数。这里咱们写的是 TCP/IP 服务器,因此能够配置一些 socket 选项,例如 tcpNoDeply 和 keepAlive。请参考ChannelOptionChannelConfig文档来获取更多可用的 Channel 配置选项,并对此有个大概的了解。
  6. 注意到option()childOption()了吗?option()用来配置NioServerSocketChannel(负责接收到来的connection),而childOption()是用来配置被ServerChannel(这里是NioServerSocketChannel) 所接收的Channel
  7. 剩下的事情就是绑定端口并启动服务器,这里咱们绑定到机器的8080端口。你能够屡次调用bind()(基于不一样的地址)。

刚刚,你使用 netty 完成了第一个服务端程序,可喜可贺!

处理接收到的数据

既然咱们完成了第一个服务端程序,接下来要就要对它进行测试。最简单的方法是使用命令行 telnet,例如在命令行输入telnet localhost 8080,而后再输入点别的东西。
然而咱们并不知道服务端是否真的在工做,由于他是 Discard Server,咱们得不到任何响应。为了证实他真的在工做,咱们让服务端打印接收到的数据。
咱们知道当接收到数据时,channelRead()会被调用。因此让咱们加点代码到 DiscardServerHandler 的channelRead()中:

@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) {
    ByteBuf in = (ByteBuf) msg;
    try {
        while (in.isReadable()) { // (1)
            System.out.print((char) in.readByte());
            System.out.flush();
        }
    } finally {
        ReferenceCountUtil.release(msg); // (2)
    }
}
  1. 这步低效的循环能够替换成System.out.println(in.toString(io.netty.util.CharsetUtil.US_ASCII))
  2. 你能够用in.release()替换这里的代码。

若是你再运行 telnet 命令,服务端会打印出接收到的数据。
Discard Server 完整的源码放在io.netty.example.discard这个包中。

写一个 Echo Server (打印服务器)

目前为止,咱们写的服务程序消费了数据但没有给出任何响应,而做为一台服务器理应要对每个请求做出响应。接下来让咱们实现 ECHO 协议,学习如何响应消息并把接收到的数据发回客户端。
Echo Server 跟 Discard Server 惟一不一样的地方在于,他把接收到的数据返回给客户端,而不是把他们打印到控制台。因此这里咱们只须要修改channelRead()就好了:

@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) {
    ctx.write(msg); // (1)
    ctx.flush(); // (2)
}
  1. ChannelHandlerContext能触发多种 I/O 事件和操做,这里咱们调用write()方法逐字写回接收到的数据。请注意咱们并无释放接收到的消息Object msg,由于在写数据时ctx.write(msg),netty 已经帮你释放它了。
  2. ctx.write()关没有把消息写到网络上,他在内部被缓存起来,你须要调用ctx.flush()把他刷新到网络上。ctx.writeAndFlush(msg)是个更简洁的方法。

若是再次使用命令行 telnet,你会看到服务端返回了你输入过的东西。完整的 Echo Server 源码放在io.netty.example.echo包下面。

写一个 Time Server (时间服务器)

咱们这一小节要实现 TIME 协议。跟前面的例子不一样,Timer Server 在链接创建时 (收到请求前) 就返回一个32位 (4字节) 整数,并在发送成功后关闭链接。在本例中,将会学习到如何构造和发送一个消息,在发送完成时关闭链接。
由于要在刚创建链接时发送消息而无论后来接收到的数据,此次咱们不能使用channelRead(),取而代之的是channelActive方法,如下是具体实现:

package io.netty.example.time;

public class TimeServerHandler extends ChannelInboundHandlerAdapter {

    @Override
    public void channelActive(final ChannelHandlerContext ctx) { // (1)
        final ByteBuf time = ctx.alloc().buffer(4); // (2)
        time.writeInt((int) (System.currentTimeMillis() / 1000L + 2208988800L));
        
        final ChannelFuture f = ctx.writeAndFlush(time); // (3)
        f.addListener(new ChannelFutureListener() {
            @Override
            public void operationComplete(ChannelFuture future) {
                assert f == future;
                ctx.close();
            }
        }); // (4)
    }
    
    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
        cause.printStackTrace();
        ctx.close();
    }
}
  1. 当链接被创建后channelActive()方法会被调用,咱们在方法体中发送一个32位的表明当前时间的整数。
  2. 要发送一个新的消息,须要分配一个新的buffer(缓冲区) 去包含这个消息。咱们要写一个32位的整数,所以缓冲区ByteBuf的容量至少是4个字节。经过ChannelHandlerContext.alloc()获取ByteBufAllocator(字节缓冲区分配器),用他来分配一个新的buffer
  3. 像往常同样把消息写到网络上。
    等一下Σ( ° △ °|||),flip()方法哪去了?还记不记得在NIO中曾经使用过的java.nio.ByteBuffer.flip()(简单总结就是把ByteBuffer从写模式变成读模式)?ByteBuf并无这个方法,由于他包含了两个指针——读指针和写指针 (读写标记,不要理解成C里的指针)。当你往ByteBuf写数据时,写指针会移动而读指针不变。这两个指针刚好标记着数据的起始、终止位置。
    与之相反,原生 NIO 并无提供一个简洁的方式去标记数据的起始和终止位置,你必需要调用flip方法。有 时候你极可能忘记调用flip方法,致使发送不出数据或发送了错误数据。这样的错误并不会发生在 netty,由于 netty 有不一样的指针去应对不一样的操做 (读写操做),这使得编程更加简单,由于你再也不须要 flipping out (疯狂输出原生 NIO)

    其余须要注意的是ChannelHandlerContext.write()/writeAndFlush()方法返回了ChannelFutureChannelFuture表示一个还没发生的 I/O 操做。这意味着你请求的一些 I/O 操做可能还没被处理,由于 netty 中全部的操做都是异步的。举个例子,下面的代码可能在消息发送以前就关闭了链接:

    Channel ch = ...;
       ch.writeAndFlush(message);
       ch.close();

    因此,你要在 (write()返回的)ChannelFuture完成以后再调用close()。当write操做完成后,ChannelFuture会通知到他的listeners(监听器)。需加注意,close()方法可能不会当即关闭连接,一样close()也会返回一个ChannelFuture

  4. 那么咱们如何知道写操做完成了?很简单,只要向ChannelFuture注册监听器 (ChannelFutureListener) 就行。这一步,咱们建立了ChannelFutureListener的匿名类,在写操做完成时关闭连接。
    你也可使用已经定义好的监听器,例如这样:

    f.addListener(ChannelFutureListener.CLOSE);

为了测试 Time server 是否如期工做,你可使用 unix 的命令行:

$ rdate -o <port> -p <host>

写一个 Time Client (时间客户端)

跟 DISCARD 和 ECHO 服务器不一样,咱们要写一个客户端程序应对 TIME 协议,由于你没法把一个32位整数翻译成日期。本节中,咱们确保服务端正常工做,并学习如何使用 netty 写一个客户端程序。
netty 客户端和服务器最大的不一样在于,客户端使用了不一样的BootstrapChannel实现类。请看下面的例子:

package io.netty.example.time;

public class TimeClient {
    public static void main(String[] args) throws Exception {
        String host = args[0];
        int port = Integer.parseInt(args[1]);
        EventLoopGroup workerGroup = new NioEventLoopGroup();
        
        try {
            Bootstrap b = new Bootstrap(); // (1)
            b.group(workerGroup); // (2)
            b.channel(NioSocketChannel.class); // (3)
            b.option(ChannelOption.SO_KEEPALIVE, true); // (4)
            b.handler(new ChannelInitializer<SocketChannel>() {
                @Override
                public void initChannel(SocketChannel ch) throws Exception {
                    ch.pipeline().addLast(new TimeClientHandler());
                }
            });
            
            // Start the client.
            ChannelFuture f = b.connect(host, port).sync(); // (5)

            // Wait until the connection is closed.
            f.channel().closeFuture().sync();
        } finally {
            workerGroup.shutdownGracefully();
        }
    }
}
  1. BootstrapServerBootstrap类似,但他是做用在客户端或无链接模式的 Channel (通道)。
  2. 若是你只定义了一个EventLoopGroup,他会同时做为 boss group 和 worker group,虽然客户端并无 boss worker 这个概念。
  3. 这里使用NioSocketChannel而不是NioServerSocketChannelNioSocketChannel会被用来建立客户端Channel
  4. ServerBootstrap不一样,这里咱们没有使用childOption(),由于客户端的SocketChannel没有父Channel
  5. 咱们使用connect()代替bind()方法。

正如你所见,客户端代码跟服务端代码没有很大的区别。那么接下来就是实现ChannelHandler,他会从服务端接收一个32位整数,翻译成可读的日期格式并打印出来,最后关闭链接:

package io.netty.example.time;

import java.util.Date;

public class TimeClientHandler extends ChannelInboundHandlerAdapter {
    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) {
        ByteBuf m = (ByteBuf) msg; // (1)
        try {
            long currentTimeMillis = (m.readUnsignedInt() - 2208988800L) * 1000L;
            System.out.println(new Date(currentTimeMillis));
            ctx.close();
        } finally {
            m.release();
        }
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
        cause.printStackTrace();
        ctx.close();
    }
}
  1. 处理TCP/IP时, netty 把读取的数据放到ByteBuf

看起来很是简单,跟服务端没有多大区别。然而有时候 handler 会发生错误,例如抛出异常IndexOutOfBoundsException,在下一章节咱们会做具体讨论。

基于流的传输协议

关于Socket缓冲区的小警告

TCP/IP这种基于流的传输协议,接收的数据会存储到socket缓冲区。不幸的是,这类缓冲区不是数据包队列,而是字节流队列。这意味着,即便你想发送两个消息并打包成两个数据包,操做系统只会把他们看成一连串字节。所以,这不能保证你读到的数据刚好是远程发送端写出的数据。举个例子,假设操做系统TCP/IP栈收到三个数据包:
图片描述

由于这种流式协议的特性,应用程序颇有可能像下图的方式那样读取数据碎片:
图片描述

因此,做为接收端 (无论是服务端仍是客户端),应把接收到的数据 (字节流) 整理成一个或多个易于理解的数据贞。对于上述的例子,整理以下:
图片描述

解决方案一

让咱们回到 TIME Client 这个例子。一32位整数的数据量很是小,在本例中不该用被分割。然而,问题在于他确实有可能被分割,可能性随着通讯数据量的增大而增大。
一个简单的方法是建立一个内部的cumulative buffer(累积缓冲区),等待数据直到接收到4个字节为止。下面是修改过的TimeClientHandler

package io.netty.example.time;

import java.util.Date;

public class TimeClientHandler extends ChannelInboundHandlerAdapter {
    private ByteBuf buf;
    
    @Override
    public void handlerAdded(ChannelHandlerContext ctx) {
        buf = ctx.alloc().buffer(4); // (1)
    }
    
    @Override
    public void handlerRemoved(ChannelHandlerContext ctx) {
        buf.release(); // (1)
        buf = null;
    }
    
    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) {
        ByteBuf m = (ByteBuf) msg;
        buf.writeBytes(m); // (2)
        m.release();
        
        if (buf.readableBytes() >= 4) { // (3)
            long currentTimeMillis = (buf.readUnsignedInt() - 2208988800L) * 1000L;
            System.out.println(new Date(currentTimeMillis));
            ctx.close();
        }
    }
    
    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
        cause.printStackTrace();
        ctx.close();
    }
}
  1. ChannelHandler有两个与生命周期有关的监听方法:handlerAdded()handlerRemove()。你能够在里面执行任意的初始化或析构任务,只要他们不会阻塞程序很长时间。
  2. 首先,全部接收的数据被累积到缓冲区。
  3. 其次,handler 检查缓冲区buf是否接收到足够的数据 (4个字节),如果,则进行实际业务处理。不然当有更多数据到达时,netty 会再次调用channelRead(),直到缓冲区累积到4个字节。

解决方案二

虽然方案一解决了问题,但修改过的 handler 看上去不是那么简洁。想像一下协议变得更为复杂,例如包含多个可变长字段,你的ChannelInboundHandler很快会变得不可维护。
你可能会注意到,能够向ChannelPipeline添加多个ChannelHandler。因此,你能够把一个庞大复杂的ChannelHandler分割成多个小模块,从而减少应用的复杂性。举个例子,你能够把TimeClientHandler分割成两个handler:

  • 处理数据碎片的TimeDecoder
  • 最初始的简单版本的TimeClientHandler

幸运的是,netty 提供了一个可扩展的父类,帮助你书写TimeDecoder

package io.netty.example.time;

public class TimeDecoder extends ByteToMessageDecoder { // (1)
    @Override
    protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) { // (2)
        if (in.readableBytes() < 4) {
            return; // (3)
        }
        
        out.add(in.readBytes(4)); // (4)
    }
}
  1. ByteToMessageDecoder实现了ChannelInboundHandler,使你更容易去处理数据碎片。
  2. 当有新数据到达时,ByteToMessageDecoder调用decode()方法并维护了一个内部的cumulative buffer(累积缓冲区)
  3. 累积缓冲区数据不足时,decode()方法不会添加任何东西到 out 列表。当有更多数据到达时,ByteToMessageDecoder会再次调用decode()方法。
  4. 若是decode()方法向 out 列表添加了一个对象,这表示decoder(解码器) 成功解析了一个消息。ByteToMessageDecoder会抛弃掉cumulative buffer(累积缓冲区)中已读数据。请记住,你不须要去解析多个消息,由于ByteToMessageDecoder会持续调用decode(),直到他没有往 out 列表添加对象。

既然但愿往ChannelPipeline添加其余 handler (上面的TimeDecoder),咱们要修改TimeClient中的ChannelInitializer

b.handler(new ChannelInitializer<SocketChannel>() {
    @Override
    public void initChannel(SocketChannel ch) throws Exception {
        ch.pipeline().addLast(new TimeDecoder(), new TimeClientHandler());
    }
});

若是你充满冒险精神,你能够尝试使用ReplayingDecoder,他会使代码更加简洁:

public class TimeDecoder extends ReplayingDecoder<Void> {
    @Override
    protected void decode(
            ChannelHandlerContext ctx, ByteBuf in, List<Object> out) {
        out.add(in.readBytes(4));
    }
}

此外,netty 提供了不少开箱即用的decoder,他们已经实现了大多数的网络协议,避免你本身去实现一个庞大的难以维护的handler。请参考下面的包获取更多详细例子:

  • io.netty.example.factorial 二进制协议
  • io.netty.example.telnet 文本协议

使用 POJO 代替 ByteBuf

上面全部例子都使用了ByteBuf做为协议中基本的数据结构。在本小节,咱们将要升级 TIME 协议中的客户端和服务端,使用 POJO 代替ByteBuf
使用 POJO 的优点是显而易见的:你的 handler 变得易于维护和可重用,经过把 (从ByteBuf中抽取信息的) 代码分离出来。在 TIME 协议的例子里,咱们仅仅读取一个32位的整数,直接使用ByteBuf并不会有太大问题。然而,当实现一个真实的网络协议时,你会发现作代码分离颇有必要。

首先,让咱们定义一个新的类型UnixTime

package io.netty.example.time;

import java.util.Date;

public class UnixTime {

    private final long value;
    
    public UnixTime() {
        this(System.currentTimeMillis() / 1000L + 2208988800L);
    }
    
    public UnixTime(long value) {
        this.value = value;
    }
        
    public long value() {
        return value;
    }
        
    @Override
    public String toString() {
        return new Date((value() - 2208988800L) * 1000L).toString();
    }
}

如今修改TimeDecoder,让他向out列表添加一个UnixTime而不是ByteBuf

@Override
protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) {
    if (in.readableBytes() < 4) {
        return;
    }

    out.add(new UnixTime(in.readUnsignedInt()));
}

既然修改了TimeDecoderTimeClientHandler也不能再使用ByteBuf了:

@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) {
    UnixTime m = (UnixTime) msg;
    System.out.println(m);
    ctx.close();
}

是否是更加简单、优雅?相同的窍门一样能使用在服务端。此次让咱们先修改TimeServerHandler

@Override
public void channelActive(ChannelHandlerContext ctx) {
    ChannelFuture f = ctx.writeAndFlush(new UnixTime());
    f.addListener(ChannelFutureListener.CLOSE);
}

如今只剩下encoder(编码器),他须要实现ChannelOutboundHandler,把UnixTime翻译回ByteBuf。这里比书写decoder更加简单,由于咱们再也不须要处理数据包碎片并把他们组装起来了。

package io.netty.example.time;

public class TimeEncoder extends ChannelOutboundHandlerAdapter {
    @Override
    public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) {
        UnixTime m = (UnixTime) msg;
        ByteBuf encoded = ctx.alloc().buffer(4);
        encoded.writeInt((int)m.value());
        ctx.write(encoded, promise); // (1)
    }
}
  1. 这一行代码有几个重要的点:
    首先,咱们把参数中ChannelPromise传递到write(),以便 netty 把他标记为成功或失败 (当数据真正写到网络时)。
    其次,咱们没有调用ctx.flush(),由于ChannelOutboundHandlerAdapter中有一个单独的方法void flush(ChannelHandlerContext ctx)专门用来处理flush操做。

你可使用MessageToByteEncoder更加地简化代码:

public class TimeEncoder extends MessageToByteEncoder<UnixTime> {
    @Override
    protected void encode(ChannelHandlerContext ctx, UnixTime msg, ByteBuf out) {
        out.writeInt((int)msg.value());
    }
}

最后一步就是把TimeEncoder添加到服务端ChannelPipeline,留做练习。

关闭你的应用

关闭netty应用很简单——经过shutdownGracefully()去关闭全部建立的EventLoopGroup。他返回一个Future去通知你何时EventLoopGroup和他从属的 Channel 已经彻底关闭了。

总结

本章,咱们快速浏览了 netty,使用他书写了一个可用的网络应用。接下来的章节中会介绍更多关于 netty 的详细资料,咱们也但愿你去重温io.netty.example package包中的例子。netty 社区的大门会向你敞开,你能够向社区提出问题和意见,您的的反馈会帮助netty项目变得更加完善。

相关文章
相关标签/搜索