[网络通讯] Netty 入门实战

[网络通讯] Netty 入门实战

简介html

什么是 Netty?让咱们带着问题来跟着官网的 Demo 教程先入个门。java

  • 依赖git

  • 实战web

    • 丢弃服务器面试

    • 响应服务器shell

    • 时间服务器编程

  • 流数据传输bootstrap

  • 对象序列化传输windows

  • 关闭promise

  • 小结

  • REFERENCES


Netty 是异步事件驱动的Java开源网络应用程序框架,用于快速开发可维护的高性能协议服务器和客户端。

  • Netty 项目旨在为可维护的高性能和高可伸缩性协议服务器和客户端的快速开发提供一个异步事件驱动的网络应用框架和工具。
  • Netty 是一个 NIO 客户机服务器框架,能够快速简单地开发网络应用程序,如协议服务器和客户机。它极大地简化了网络编程,如 TCP 和 UDP 套接字服务器的开发。
  • “快速和简单”并不意味着产生的应用程序会受到可维护性或性能问题的影响。Netty 是根据实现许多协议(如 FTP、 SMTP、 HTTP 以及各类二进制和基于文本的遗留协议)的经验而精心设计的。所以,Netty 成功地找到了一种方法来实现简单的开发、性能、稳定性和灵活性。
  • 一些用户可能已经发现了其余声称具备一样优点的网络应用程序框架,您可能想要问是什么使 Netty 与他们如此不一样。答案就是它所创建的哲学。Netty 的目的是从第一天开始就在 API 和实现方面为您提供最温馨的体验。它不是什么实实在在的东西,可是当你阅读本指南和玩 Netty 的时候,你会意识到这种哲学会让你的生活变得更加轻松。

依赖

dependencies {
    implementation "io.netty:netty-all:4.1.56.Final"
}

实战

世界上最简单的协议实现不是发送Hello World消息,被服务器接受到返回相应的响应结果。而是服务器接收到消息后直接丢弃,不作任何响应。

丢弃服务器

要实现 DISCARD 协议,您须要作的惟一一件事就是忽略全部接收到的数据。让咱们直接从处理程序实现开始,它处理 Netty 生成的 I/O 事件。

// [1]
public class DiscardServerHandler extends ChannelInboundHandlerAdapter {
   
    // [2]
    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        // super.channelRead(ctx, msg);
        // 不处理消息,直接释放
        // [3]
        ((ByteBuf) msg).release();
    }

    // [4]
    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        // super.exceptionCaught(ctx, cause);
        // 当异常发生的时候关闭链接
        cause.printStackTrace();
        ctx.close();
    }
}
  1. ChannelInboundHandlerAdapter 实现了接口 ChannelInboundHandler。充当适配器的角色提供了各类能够重写的事件处理程序方法,经过适配器的标准实现方式,能够避免咱们本身实现处理程序接口。
  2. 咱们能够覆盖 channelRead()的事件处理器方法。只要从客户机接收到新数据,就会使用接收到的消息调用此方法。
  3. 为了实现 DISCARD 协议,处理程序必须忽略接收到的消息。 ByteBuf是一个引用计数的对象,必须经过 release()方法来进行释放。一般的事项方式是这样的
// [2]
    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        // super.channelRead(ctx, msg);
        // 不处理消息,直接释放
        // [3]
        //((ByteBuf) msg).release();
        try {
            // 针对消息 msg 进行处理
        } finally {
         // 释放引用
            ReferenceCountUtil.release(msg);
        }
    }
  1. exceptionCaught()做为异常处理,当 Netty 因为 I/O 错误或处理程序实现因为处理事件时抛出的异常而引起异常时,使用 Throwable 调用事件处理程序方法。在大多数状况下,被捕获的异常应该被记录,其相关的通道应该在这里关闭,尽管这个方法的实现能够根据您想要处理的异常状况而有所不一样。例如,您可能但愿在关闭链接以前发送带有错误代码的响应消息。

启动服务器

public class DiscardServer {
    /**
     * 端口
     */

    private int port;

    public DiscardServer(int port) {
        this.port = port;
    }

    public void run() throws InterruptedException {
        // [1]
        EventLoopGroup bossGroup = new NioEventLoopGroup();
        EventLoopGroup workerGroup = new NioEventLoopGroup();
        try {
            // [2]
            ServerBootstrap bootstrap = new ServerBootstrap();
            bootstrap.group(bossGroup, workerGroup)
                 // [3]
                    .channel(NioServerSocketChannel.class)
                 // [4]
                    .childHandler(new ChannelInitializer<SocketChannel>() 
{
                        @Override
                        protected void initChannel(SocketChannel ch) throws Exception {
                            ch.pipeline().addLast(new DiscardServerHandler());
                        }
                    })
                 // [5]
                    .option(ChannelOption.SO_BACKLOG, 128)
                 // [6]
                    .childOption(ChannelOption.SO_KEEPALIVE, true);
   // [7]
            ChannelFuture cf = bootstrap.bind(port).sync();
            cf.channel().closeFuture().sync();
        } finally {
            workerGroup.shutdownGracefully();
            bossGroup.shutdownGracefully();
        }
    }


    public static void main(String[] args) throws InterruptedException {
        int port = 8080;
        if (args.length > 0) {
            port = Integer.parseInt(args[0]);
        }
        new DiscardServer(port).run();
    }
}

  1. NioEventLoopGroup 是一个处理I/O操做的多线程事件循环的处理器定义。例子中定义了2个处理器:

    • 第一个一般被称为“ boss” ,接受一个传入的链接。

    • 第二个一般被称为“工人” ,一旦老板接受了链接并注册了与工人接受的链接,就处理接受链接的通讯。

  2. ServerBootstrap 是服务器构造的辅助类,通常不推荐此方式进行服务器的建立。

  3. 此处指定NioServerSocketChannel类,用于实例化一个新的Channel来接受传入的链接。

  4. 此处指定的处理程序将始终由新接受的ChannelChannelInitializer做为特殊的处理程序,用于帮助用户配置新的Channel。每每适用于为新的Channel添加一些处理程序来实现更为复杂的应用程序。

  5. option参数设置,支持设置特定的套接字选项。来知足特定的协议需求,如.option(ChannelOption.TCP_NODELAY, true)来编写TCP/IP 服务协议。

  6. childOptionoption不一样之处在于:

    • option 适用于 NioServerSocketChannel来接受传入的链接。
    • childOption适用于被父级的 ServerChannel接受的 Channels
  7. 绑定到指定端口。

模拟通讯

  • 调整代码,打印接受的消息
    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        ByteBuf in = (ByteBuf) msg;
        try {
            // 针对消息 msg 进行处理
            while (in.isReadable()) { // [4]
                System.out.print((char) in.readByte());
                System.out.flush();
            }        
        } finally {
            ReferenceCountUtil.release(msg);
        }
    }
  • windows 环境下使用 powershell 输入命令 telnet localhost 8080,进行通讯。
  • powershell 终端输入的字符会同步在控制台打印出来。

响应服务器

目前为止,咱们只接受可是没有任何响应。一台服务器,一般应该响应该请求。让咱们学习如何经过实现ECHO协议向客户端写入响应消息,其中任何接收到的数据都被发送回来。

与前面部分实现的丢弃服务器的惟一区别在于它将接收到的数据发回,而不是将接收的数据输出到控制台。所以,再次修改channelRead()方法是足够的:

参考地址:https://netty.io/4.1/xref/io/netty/example/echo/package-summary.html

public class EchoServerHandler extends ChannelInboundHandlerAdapter {

    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        ctx.write(msg);

    }

    @Override
    public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
        ctx.flush();
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        // super.exceptionCaught(ctx, cause);
        // 当异常发生的时候关闭链接
        cause.printStackTrace();
        ctx.close();
    }
}

public class EchoServer {
    /**
     * 端口
     */

    private int port;

    public EchoServer(int port) {
        this.port = port;
    }

    public void run() throws InterruptedException {
        EventLoopGroup bossGroup = new NioEventLoopGroup();
        EventLoopGroup workerGroup = new NioEventLoopGroup();
        try {
            ServerBootstrap bootstrap = new ServerBootstrap();
            bootstrap.group(bossGroup, workerGroup)
                    .channel(NioServerSocketChannel.class)
                    .childHandler(new ChannelInitializer<SocketChannel>() 
{
                        @Override
                        protected void initChannel(SocketChannel ch) throws Exception {
                            ch.pipeline().addLast(new EchoServerHandler());
                        }
                    })
                    .option(ChannelOption.SO_BACKLOG, 128)
                    .childOption(ChannelOption.SO_KEEPALIVE, true);

            ChannelFuture cf = bootstrap.bind(port).sync();
            cf.channel().closeFuture().sync();
        } finally {
            workerGroup.shutdownGracefully();
            bossGroup.shutdownGracefully();
        }
    }


    public static void main(String[] args) throws InterruptedException {
        int port = 8080;
        if (args.length > 0) {
            port = Integer.parseInt(args[0]);
        }
        new EchoServer(port).run();
    }
}

经过终端输入telnet localhost 8080后输入英文字符会获得响应,原字符返回。如依次输入abc,终端打印结果:

aabbcc
  • ChannelHandlerConetxt 提供了不少方法让你去触发 IO 事件或操做。这里咱们调用 write(object)来逐字的写入接受到的消息。注意,咱们不像 DISCARD 例子里的那样,咱们没有释放咱们收到的消息。这是由于当它被写回到 wire 时,Netty 替咱们释放它。
  • ctx.write(Object) 不会让消息发送,它存在于内部缓冲区,经过调用 ctx.flush() 来把消息发送出去,或者,您能够简洁的调用 ctx.writeAndFlush(msg)。

时间服务器

接下来要实现的协议是 TIME 协议。它不一样于前面的示例,由于它发送包含32位整数的消息,而不接收任何请求,并在消息发送后关闭链接。在本例中,您将学习如何构造和发送消息,以及如何在完成时关闭链接。

由于咱们将忽略任何接收到的数据,可是一旦创建链接就发送消息,因此此次不能使用 channelRead() 方法。相反,咱们应该重写 channelActive()方法。代码以下:

服务端

public class TimeServerHandler extends ChannelInboundHandlerAdapter {

    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception // [1]
        final ByteBuf time = ctx.alloc().buffer(4); // [2]
        time.writeInt(89); //ASCII 10进制,对应 Y
        time.writeInt(105); //ASCII 10进制,对应 i
        final ChannelFuture f = ctx.writeAndFlush(time); // [3]
        f.addListener(new ChannelFutureListener() {
            @Override
            public void operationComplete(ChannelFuture future) {
                assert f == future;
                ctx.close();
            }
        }); // [4]
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        // super.exceptionCaught(ctx, cause);
        // 当异常发生的时候关闭链接
        cause.printStackTrace();
        ctx.close();
    }
}
  1. 如前所述,当创建链接并准备生成通讯量时,将调用 channelActive ()方法。让咱们编写一个32位的整数,它表示此方法中的当前时间。

  2. 要发送一个新消息,咱们须要分配一个新的缓冲区,其中将包含消息。咱们要写一个32位的整数,所以咱们须要一个容量至少为4字节的 ByteBuf。经过 ChannelHandlerContext.alloc ()获取当前 ByteBufAllocator 并分配一个新缓冲区。

  3. 像往常同样,咱们写入一条构造好的消息。可是,等等,哪里冒险了?咱们之前不是叫 java.nio 吗。在 NIO 中发送消息以前使用 ByteBuffer.flip () ?ByteBuf 没有这样的方法,由于它有两个指针: 一个用于读操做,另外一个用于写操做。当您将某些内容写入 ByteBuf 而读取器索引不变时,写入器索引会增长。读者索引和写者索引分别表示消息的开始和结束位置。

    • 相比之下,NIO 缓冲区并不提供一种清晰的方法来肯定消息内容的开始和结束位置而不调用 flip 方法。当您忘记翻转缓冲区时,您将遇到麻烦,由于不会发送任何内容或错误的数据。这种错误在 Netty 不会发生,由于咱们对不一样的操做类型有不一样的指针。当你习惯了它,你会发现它会让你的生活变得更加轻松。

    • 另外一点须要注意的是 ChannelHandlerContext.write () 和 writeAndFlush () 方法返回 ChannelFuture。ChannelFuture 表示还没有发生的 I/O操做。这意味着,任何请求的操做可能还没有执行,由于全部操做在 Netty 都是异步的。例如,下面的代码可能会在发送消息以前关闭链接:

      Channel ch = ...;
      ch.writeAndFlush(message);
      ch.close();
    • 所以,您须要在 ChannelFuture 完成以后调用 close ()方法,该方法由 write ()方法返回,并在完成写操做后通知其侦听器。请注意,close () 也可能不会当即关闭链接,而是返回一个 ChannelFuture。

  4. 那么,当写请求完成时,咱们如何获得通知呢?这很简单,能够添加一个ChannelFutureListener来监听返回的结果ChannelFuture。在这里,咱们建立了一个新的匿名通道 ChannelFutureListener,当操做完成时它会关闭通道。

  • 或者,您可使用预约义的侦听器简化代码:
f.addListener(ChannelFutureListener.CLOSE);
  • 要测试咱们的时间服务器是否正常工做,可使用 telnet localhost 8080 命令,终端在链接上后,打印消息后直接失去链接:
Yi

遗失对主机的链接。

客户端

与 DISCARD 和 ECHO 服务器不一样,咱们须要 TIME 协议的客户端,由于人不能将32位二进制数据转换为日历上的日期。在本节中,咱们将讨论如何确保服务器正常工做,并学习如何使用 Netty 编写客户机。

  • 调整服务端接受请求并返回时间戳
 @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception // [1]
        final ByteBuf time = ctx.alloc().buffer(4); // (2)
        // 2208988800为1900年1月1日00:00:00~1970年1月1日00:00:00的总秒数
        time.writeInt((int) (System.currentTimeMillis() / 1000L + 2208988800L));

        final ChannelFuture f = ctx.writeAndFlush(time); // (3)
        f.addListener(new ChannelFutureListener() {
            @Override
            public void operationComplete(ChannelFuture future) {
                assert f == future;
                ctx.close();
            }
        }); // (4)
  • 客户端接收服务端的响应并转换为时间格式输出
public class TimeClientHandler extends ChannelInboundHandlerAdapter {

    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        ByteBuf m = (ByteBuf) msg; // (1)
        try {
            long currentTimeMillis = (m.readUnsignedInt() - 2208988800L) * 1000L;
            System.out.println(new Date(currentTimeMillis));
            ctx.close();
        } finally {
            m.release();
        }
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        // super.exceptionCaught(ctx, cause);
        // 当异常发生的时候关闭链接
        cause.printStackTrace();
        ctx.close();
    }
}

public class TimeServer {
    /**
     * 端口
     */

    private int port;

    public TimeServer(int port) {
        this.port = port;
    }

    public void run() throws InterruptedException {
        EventLoopGroup bossGroup = new NioEventLoopGroup();
        EventLoopGroup workerGroup = new NioEventLoopGroup();
        try {
            // 在 Netty,服务器和客户机之间最大也是惟一的区别是使用了不一样的 Bootstrap 和 Channel 实现。请看下面的代码:
            ServerBootstrap bootstrap = new ServerBootstrap();
            bootstrap.group(bossGroup, workerGroup)
                    .channel(NioServerSocketChannel.class)
                    .childHandler(new ChannelInitializer<SocketChannel>() 
{
                        @Override
                        protected void initChannel(SocketChannel ch) throws Exception {
                            ch.pipeline().addLast(new TimeServerHandler());
                        }
                    })
                    .option(ChannelOption.SO_BACKLOG, 128)
                    .childOption(ChannelOption.SO_KEEPALIVE, true);

            ChannelFuture cf = bootstrap.bind(port).sync();
            cf.channel().closeFuture().sync();
        } finally {
            workerGroup.shutdownGracefully();
            bossGroup.shutdownGracefully();
        }
    }


    public static void main(String[] args) throws InterruptedException {
        int port = 8080;
        if (args.length > 0) {
            port = Integer.parseInt(args[0]);
        }
        new TimeServer(port).run();
    }
}

客户端接收到响应打印结果:

Tue Dec 29 12:01:58 CST 2020

流数据传输

在基于流的传输(如 TCP/IP)中,接收到的数据被存储到套接字接收缓冲区中。不幸的是,基于流的传输的缓冲区不是一个包队列,而是一个字节队列。这意味着,即便您将两条消息做为两个独立的数据包发送,操做系统也不会将其视为两条消息,而只是将其视为一堆字节。所以,不能保证您所读到的内容与远程对等方所写的内容彻底一致。

例如,假设一个操做系统的 TCP/IP 协议栈已经接收了三个数据包:

1

因为基于流的协议的这个通常属性,在应用程序中颇有可能如下面的碎片形式读取它们:

2

所以,接收部分,不管是服务器端仍是客户端,都应该将接收到的数据碎片整理成一个或多个有意义的帧,应用程序逻辑能够很容易地理解这些帧。在上面的例子中,接收到的数据应该以下所示:

3

第一个解决方案

如今让咱们回到 TIME 客户端示例。咱们这里也有一样的问题。32位整数是一个很是小的数据量,它不太可能常常被分段。然而,问题在于它多是支离破碎的,而且随着流量的增长,支离破碎的可能性也会增长。

最简单的解决方案是建立一个内部累积缓冲区,并等待全部4个字节都被接收到内部缓冲区。如下是修改后的 TimeClientHandler 实现,它解决了这个问题:

public class TimeClientWithBufferHandler extends ChannelInboundHandlerAdapter {

    private ByteBuf buff;

    @Override
    public void handlerAdded(ChannelHandlerContext ctx) throws Exception {
        buff = ctx.alloc().buffer(4);
    }

    @Override
    public void handlerRemoved(ChannelHandlerContext ctx) throws Exception {
        buff.release();
        buff = null;
    }

    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        ByteBuf m = (ByteBuf) msg; // (1)
        buff.writeBytes(m);
        m.release();
        if (buff.readableBytes() >= 4) {
            long currentTimeMillis = (buff.readUnsignedInt() - 2208988800L) * 1000L;
            System.out.println(new Date(currentTimeMillis));
            ctx.close();
        }
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        // super.exceptionCaught(ctx, cause);
        // 当异常发生的时候关闭链接
        cause.printStackTrace();
        ctx.close();
    }
}

  • 有两种生命周期监听方法: handlerAdded() and 及 handlerRemoved()
  • 您能够执行任意初始化任务,只要它不长时间阻塞;
  • 首先,全部接收到的数据应该累积成 buff
  • 而后,处理程序必须检查 buff有足够的数据,在这个例子中是4个字节,而后继续进行实际的业务逻辑,当更多的数据到达时,这个函数会从新调用一个方法,最终全部的4个字节都会被累积;
4
  • 非 4 字节的数据会直接被丢弃掉。

第二种解决方案

尽管第一个解决方案已经解决了 TIME 客户机的问题,可是修改后的处理程序看起来并不那么干净。想象一个更复杂的协议,它由多个字段组成,好比一个可变长度的字段。您的 ChannelInboundHandler 实现将很快变得不可维护。

正如您可能已经注意到的,您能够向 ChannelPipeline 添加多个 ChannelHandler,所以,您能够将一个单片 ChannelHandler 分割为多个模块化的 ChannelHandler,以下降应用程序的复杂性。例如,你能够将 TimeClientHandler 分红两个处理器:

  • TimeDecoder  处理碎片化问题
  • 最初的简单版本 TimeClientHandler

幸运的是,Netty 提供了一个可扩展的类,能够帮助你写出第一个开箱即用的类:

public class TimeDecoder extends ByteToMessageDecoder // (1)

    @Override
    protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception // (2)
        if (in.readableBytes() < 4) {
            return// (3)
        }

        out.add(in.readBytes(4)); // (4)
    }
}
public class TimeClientWithDecoder {

    public static void main(String[] args) throws Exception {
        String host = "localhost";
        int port = 8080;
        EventLoopGroup workerGroup = new NioEventLoopGroup();

        try {
            Bootstrap b = new Bootstrap(); 
            b.group(workerGroup);
            b.channel(NioSocketChannel.class)
            b.option(ChannelOption.SO_KEEPALIVE, true);
            b.handler(new ChannelInitializer<SocketChannel>() {
                @Override
                public void initChannel(SocketChannel ch) throws Exception {
                    ch.pipeline()
                            .addLast(new TimeDecoder()) // (5)
                            .addLast(new TimeClientHandler());
                }
            });

            // Start the client.
            ChannelFuture f = b.connect(host, port).sync();

            // Wait until the connection is closed.
            f.channel().closeFuture().sync();
        } finally {
            workerGroup.shutdownGracefully();
        }
    }
}
  • ByteToMessageDecoder 使得处理分裂问题变得容易;
  • 每当接收到新数据时, ByteToMessageDecoder利用内部维护的累积缓冲区,调用decode方法来处理新数据;
  • 当累积缓冲区中没有足够的数据时 ByteToMessageDecoder什么都不会添加到 out缓冲区中。当收到更多的数据时会再次调用 decode()
  • 若是 decode()将一个数据添加到 out, 这意味着解码器成功解码了一条信息,将丢弃累积缓冲区的读取部分。请记住,您不须要解码多个消息, ByteToMessageDecoder将继续调用方法,直到它没什么数据能够放入 out了;
  • ChannelPipeline添加处理程序 TimeDecoder来实现数据的分解。

还能够经过如下方式进一步简化解码器:

public class TimeWithReplayingDecoder extends ReplayingDecoder<Void{
    @Override
    protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception {
        out.add(in.readBytes(4));
    }
}
// 一样的,别忘了在 ChannelPipeline 中添加相应的处理程序
 b.handler(new ChannelInitializer<SocketChannel>() {
                @Override
                public void initChannel(SocketChannel ch) throws Exception {
                    ch.pipeline()
                            //.addLast(new TimeDecoder())
                            .addLast(new TimeWithReplayingDecoder())
                            .addLast(new TimeClientHandler());
                }
            });

此外,Netty 提供了开箱即用的解码器,使您可以很是容易地实现大多数协议,并帮助您避免最终获得一个不可维护的总体处理程序实现。更详细的例子请参考如下软件包:

  • io.netty.example.factorial 二进制协议
  • io.netty.example.telnet 基于文本行的协议

对象序列化传输

到目前为止,咱们讨论的全部示例都使用 ByteBuf 做为协议消息的主要数据结构。实际的网络通讯过程远比上面的时间协议实现的要更复杂,功能也要更增强大,好比咱们经常使用的 Json 序列化传输,若是用 Netty,可否直接传输对象呢?

在 ChannelHandlers 中使用 POJO 的优点是显而易见的;经过分离从处理程序中提取 ByteBuf 信息的代码,您的处理程序变得更加可维护和可重用。在 TIME 协议的客户端和服务器示例中,咱们只读取一个32位整数,直接使用 Bytebuf 并非一个主要问题。可是,您会发如今实现现实世界的协议时有必要进行分离。

首先,咱们将咱们要传输的时间戳封装成一个简单对象:

public class UnixTime {

    private final long value;

    public UnixTime() {
        this(System.currentTimeMillis() / 1000L + 2208988800L);
    }

    public UnixTime(long value) {
        this.value = value;
    }

    public long value() {
        return value;
    }

    @Override
    public String toString() {
        return new Date((value() - 2208988800L) * 1000L).toString();
    }
}

增长解码器:

public class TimeDecoderWithPojo extends ByteToMessageDecoder // (1)

    @Override
    protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception // (2)
        if (in.readableBytes() < 4) {
            return// (3)
        }

        //out.add(in.readBytes(4)); // (4)
        out.add(new UnixTime(in.readUnsignedInt())); // (4)
    }
}

增长处理器:

public class TimeClientHandlerWithPojo extends ChannelInboundHandlerAdapter {

    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        UnixTime m = (UnixTime) msg;
        System.out.println(m);
        ctx.close();
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        // super.exceptionCaught(ctx, cause);
        // 当异常发生的时候关闭链接
        cause.printStackTrace();
        ctx.close();
    }
}

和前面同样,设置客户端的处理器:

            b.handler(new ChannelInitializer<SocketChannel>() {
                @Override
                public void initChannel(SocketChannel ch) throws Exception {
                    ch.pipeline()
                            //.addLast(new TimeDecoder())
                            .addLast(new TimeDecoderWithPojo())
                            .addLast(new TimeClientHandlerWithPojo());
                }
            });

响应结果以下:

经过更新的解码器,``TimeClientHandler 再也不使用 ByteBuf。

更简单和优雅,对不对?一样的技术也能够应用于服务器端。

首先是消息处理器,负责发送一个时间戳数据做为响应结果:

public class TimeServerHandlerWithPojo extends ChannelInboundHandlerAdapter {

    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        UnixTime unixTime = new UnixTime();
        System.out.println("准备发送:"+ unixTime);
        final ChannelFuture f = ctx.writeAndFlush(unixTime);
        f.addListener(ChannelFutureListener.CLOSE);
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        // super.exceptionCaught(ctx, cause);
        // 当异常发生的时候关闭链接
        cause.printStackTrace();
        ctx.close();
    }
}

而后是编码处理器,将Pojo转换为ByteBuf进行传输:


public class TimeServerEncoderHandlerWithPojo extends ChannelOutboundHandlerAdapter {
    // 它是 ChannelOutboundHandler 的一个实现,它将 UnixTime 转换回 ByteBuf。这比编写解码器要简单得多,由于在对消息进行编码时不须要处理数据包碎片和汇编。
    @Override
    public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) {
        UnixTime m = (UnixTime) msg;
        ByteBuf encoded = ctx.alloc().buffer(4);
        encoded.writeInt((int) m.value());
        ctx.write(encoded, promise); // (1)
        // 首先,咱们传递原始的 ChannelPromise as-is,这样当编码的数据实际写入到连线时,Netty 将其标记为成功或失败。
        // 其次,咱们没有调用 ctx.flush ()。有一个单独的处理程序方法 void flush (ChannelHandlerContext ctx) ,它旨在重写 flush ()操做。
    }
}

最后是服务端ChannelPipeline程序设置:

  bootstrap.group(bossGroup, workerGroup)
                    .channel(NioServerSocketChannel.class)
                    .childHandler(new ChannelInitializer<SocketChannel>() 
{
                        @Override
                        protected void initChannel(SocketChannel ch) throws Exception {
                            ch.pipeline()
                                    .addLast(new TimeServerEncoderHandlerWithPojo())
                                    .addLast(new TimeServerHandlerWithPojo());
                        }
                    })
                    .option(ChannelOption.SO_BACKLOG, 128)
                    .childOption(ChannelOption.SO_KEEPALIVE, true);

服务端接收到请求发送数据:

客户端接收到请求的响应数据:

7

一样的,Netty 也为服务端的消息编码定义了不少拆箱即用的工具类:

public class TimeServerMessageToByteEncoderHandler extends MessageToByteEncoder<UnixTime{
    @Override
    public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) {
        UnixTime m = (UnixTime) msg;
        ByteBuf encoded = ctx.alloc().buffer(4);
        encoded.writeInt((int) m.value());
        ctx.write(encoded, promise); // (1)
    }

    @Override
    protected void encode(ChannelHandlerContext ctx, UnixTime msg, ByteBuf out) throws Exception {
        out.writeInt((int) msg.value());
    }
}
// ChannelPipeline 设置
bootstrap.group(bossGroup, workerGroup)
                    .channel(NioServerSocketChannel.class)
                    .childHandler(new ChannelInitializer<SocketChannel>() 
{
                        @Override
                        protected void initChannel(SocketChannel ch) throws Exception {
                            ch.pipeline()
                                    //.addLast(new TimeServerEncoderHandlerWithPojo())
                                    .addLast(new TimeServerMessageToByteEncoderHandler())
                                    .addLast(new TimeServerHandlerWithPojo());
                        }
                    })
                    .option(ChannelOption.SO_BACKLOG, 128)
                    .childOption(ChannelOption.SO_KEEPALIVE, true);

关闭

关闭 Netty 应用程序一般很是简单,只需关闭经过 shutdownly() 建立的全部 EventLoopGroups 便可。它返回一个 Future,当 EventLoopGroup 彻底终止而且属于该组的全部通道都已关闭时,它会通知您。(前文示例已演示屡次,此处再也不赘述。)

源码:https://gitee.com/zacsnz/architectrue-adventure/tree/master/netty-examples/netty-chapter-1

小结

Netty 做为高性能的异步通讯框架,提供了不少不少好用的 API。

  • Channel: Channel 接口是 Netty 对网络操做抽象类,它除了包括基本的 I/O 操做,如 bind()、connect()、read()、write() 等。比较经常使用的Channel接口实现类是NioServerSocketChannel(服务端)和NioSocketChannel(客户端),这两个 Channel 能够和 BIO 编程模型中的ServerSocket以及Socket两个概念对应上。Netty 的 Channel 接口所提供的 API,大大地下降了直接使用 Socket 类的复杂性。

  • EventLoop: 定义了 Netty 的核心抽象,用于处理链接的生命周期中所发生的事件。主要做用实际就是负责监听网络事件并调用事件处理器进行相关 I/O 操做的处理。那 Channel 和 EventLoop 直接有啥联系呢?Channel 为 Netty 网络操做(读写等操做)抽象类,EventLoop 负责处理注册到其上的Channel 处理 I/O 操做,二者配合参与 I/O 操做。

  • ChannelFuture: Netty 是异步非阻塞的,全部的 I/O 操做都为异步的。所以,咱们不能马上获得操做是否执行成功,可是,你能够经过 ChannelFuture 接口的 addListener() 方法注册一个 ChannelFutureListener,当操做执行成功或者失败时,监听就会自动触发返回结果。而且,你还能够经过ChannelFuture 的 channel() 方法获取关联的Channel。

  • ChannelHandler: 息的具体处理器。他负责处理读写操做、客户端链接等事情。

  • ChannelPipeline: ChannelHandler 的链,提供了一个容器并定义了用于沿着链传播入站和出站事件流的 API 。当 Channel 被建立时,它会被自动地分配到它专属的ChannelPipeline。咱们能够在 ChannelPipeline 上经过 addLast() 方法添加一个或者多个ChannelHandler ,由于一个数据或者事件可能会被多个 Handler 处理。当一个 ChannelHandler 处理完以后就将数据交给下一个 ChannelHandler 。

  • EventLoopGroup 包含多个 EventLoop(每个 EventLoop 一般内部包含一个线程),上面咱们已经说了 EventLoop 的主要做用实际就是负责监听网络事件并调用事件处理器进行相关 I/O 操做的处理。

  • Bootstrap 是客户端的启动引导类/辅助类。

  • ServerBootstrap 客户端的启动引导类/辅助类。

REFERENCES

  • 新手入门:目前为止最透彻的的Netty高性能原理和框架架构解析
  • 线上API和源码
  • 官网入门使用说明
  • Socket和ServerSocket的简单介绍及例子
  • Netty 4.1 Getting Start (翻译) + Demo
  • Netty实战入门详解——让你完全记住什么是Netty
  • 阿里大牛总结的Netty最全常见面试题,面试不再怕被问Netty了 - JavaAOE的文章 - 知乎
8


本文分享自微信公众号 - 架构探险之道(zacsnz1314)。
若有侵权,请联系 support@oschina.cn 删除。
本文参与“OSC源创计划”,欢迎正在阅读的你也加入,一块儿分享。

相关文章
相关标签/搜索