Netty 系列笔记之开篇

1、引言

❀ 众所周知html

Netty 是一款基于 NIO 客户、服务器端的 Java 开源编程框架,提供异步的、事件驱动的网络应用程序框架和工具,用以快速开发高性能、高可靠性的网络服务器和客户端程序。

❀ 通俗来说java

Netty 一个很是好用的处理 Socket 的 Jar 包,能够用它来开发服务器和客户端。编程

2、为何要学习 Netty

Netty 做为一个优秀的网络通讯框架,许多开源项目都使用它来构建通讯层。好比 Hadoop、Cassandra、Spark、Dubbo、gRPC、RocketMQ、Zookeeper甚至咱们经常使用的 Spring 等等。bootstrap

更重要的是,Netty 是开发高性能 Java 服务器的必学框架。api

能够说做为一个 Java 工程师,要了解 Java 服务器的高阶知识,Netty 是一个必需要学习的东西。promise

3、Netty 的特性

一、设计
  • 为不一样的传输类型(阻塞和非阻塞)提供统一的 API
  • 基于灵活且可扩展的事件模型,可将关注点明确分离
  • 高度可定制的线程模型:单线程、一个或多个线程池
  • 可靠的无链接数据 Socket 支持(UDP)
二、易用
  • 完善的 JavaDoc ,用户指南和样例
  • 无需额外依赖,JDK 5 (Netty 3.x) 、JDK 6 (Netty 4.x)
三、性能
  • 更高的吞吐量,更低的延迟
  • 更省资源
  • 减小没必要要的内存拷贝
四、安全
  • 完整的 SSL/TLS 和 STARTTLS 的支持
五、社区
  • 活跃的社区和众多的开源贡献者

4、初识 Netty

Talk is cheap, show me the code!
一、丢弃服务器

接下来从代码中感觉一下 Netty,首先实现一个 discard(丢弃)服务器,即对收到的数据不作任何处理。安全

  • 实现 ChannelInBoundHandlerAdapter 首先咱们从 handler 的实现开始, Netty 使用 handler 来处理 I/O 事件。服务器

    public class DiscardServerHandler extends ChannelInboundHandlerAdapter { 
    
      @Override
      public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { 
          // 丢弃收到的数据
          ((ByteBuf) msg).release();
      }
    
      @Override
      public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { 
          cause.printStackTrace();
          ctx.close();
        }
    }
    • 1 行,DiscardServerHandler 继承自 ChannelInboundHandlerAdapter,这个类实现了 ChannelInboundHandler接口,ChannelInboundHandler 提供了许多事件处理的接口方法。
    • 4 行,当收到新的消息时,就会调用 chanelRead() 方法。
    • 6 行,ByteBuf 是一个引用计数对象,这个对象必须显式地调用 release() 方法来释放。处理器的职责是释放全部传递处处理器的引用计数对象,下面是比较常见的 chanelRead() 方法实现:网络

      @Override
      public void channelRead(ChannelHandlerContext ctx, Object msg) {
        try {
            // Do something with msg
        } finally {
            ReferenceCountUtil.release(msg);
        }
      }
    • 10 行,exceptionCaught() 方法是在处理事件时发生异常调用的方法。
  • 启动 Handler 实现 handler 后,咱们须要一个 main() 方法来启动它。数据结构

    public class DiscardServer {
    
      private int port;
    
      public DiscardServer(int port) {
          this.port = port;
      }
    
      public void run() throws Exception {
          // 接收进来的链接
          EventLoopGroup boss = new NioEventLoopGroup();
          // 处理已经接收的链接
          EventLoopGroup worker = new NioEventLoopGroup();
          try {
              ServerBootstrap bootstrap = new ServerBootstrap();
              bootstrap.group(boss, worker).channel(NioServerSocketChannel.class).childHandler(new ChannelInitializer<SocketChannel>() {
                  @Override
                  protected void initChannel(SocketChannel socketChannel) throws Exception {
                      // 添加自定义的 handler
                      socketChannel.pipeline().addLast(new DiscardServerHandler());
                  }
              }).option(ChannelOption.SO_BACKLOG, 128).childOption(ChannelOption.SO_KEEPALIVE, Boolean.TRUE);
              // 绑定端口,开始接收进来的链接
              ChannelFuture channelFuture = bootstrap.bind(port).sync();
              // 关闭
              channelFuture.channel().closeFuture().sync();
          } finally {
              boss.shutdownGracefully();
              worker.shutdownGracefully();
          }
      }
    
      public static void main(String[] args) throws Exception {
          int port = 8080;
          new DiscardServer(port).run();
      }
    }
    • 11 行,EventLoopGroup 是用来处理 I/O 操做的多线程事件循环器,Netty 提供了许多不一样的 EventLoopGroup 的实现用来处理不一样的传输。在本例咱们实现了一个服务端应用,所以须要两个 EventLoopGroup 。第一个用来接收进来的链接,常被称做 boss ;第二个用来处理已经接收的链接,成为 worker。一旦 boss 接收到一个新进来的链接,就会把链接的信息注册到 worker 上面。
    • 15 行,ServerBootstrap 是一个启动 NIO 服务的辅助启动类。
    • 16 行,指定 NioServerSocketChannel 用来讲明一个新的 Channel 如何接收进来的链接。
    • 20 行, ChannelInitializer 用来帮助使用者建立一个新的 channel ,同时可使用 pipline 指定一些特定的处理器。
    • 22 行,经过这两个方法能够指定新配置的 channel 的一些参数配置。
  • 查看接收到的数据 如此,一个基于 Netty 的服务端程序就完成了,可是如今启动起来咱们看不到任何交互,因此咱们稍微修改一下 DiscardServerHandler 类的 channelRead() 方法,能够查看到客户端发来的消息。

    @Override
      public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
          ByteBuf byteBuf = (ByteBuf) msg;
          try {
              while (byteBuf.isReadable()) {
                  System.out.print((char) byteBuf.readByte());
                  System.out.flush();
              }
          } finally {
              ReferenceCountUtil.release(msg);
          }
      }
  • 测试 接下来咱们启动 DiscardServer ,使用 telnet 来测试一下。

    image.png

    控制台接收到了命令行发来的消息:

    image.png

    • *
二、应答服务器

咱们已经实现了服务器能够接收客户端发来的消息,一般服务器会对客户端发来的请求做出回应,下面就经过 ECHO 协议来实现对客户端的消息响应。

ECHO 协议即会把客户端发来的数据原样返回,因此也戏称“乒乓球”协议。

在上述代码的基础上面,咱们只需对 DiscardServerHandler 类的 channelRead() 方法稍加修改:

@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        ctx.write(msg);
        ctx.flush();
}
  • ChannelHandlerContext 对象提供了许多操做,使你可以触发各类各样的 I/O 事件和操做。这里咱们调用了 write(Object) 方法来逐字地把接受到的消息写入。请注意不一样于 DISCARD 的例子咱们并无释放接受到的消息,这是由于当写入的时候 Netty 已经帮咱们释放了。
  • ctx.write(Object) 方法不会使消息写入到通道上,他被缓冲在了内部,你须要调用 ctx.flush() 方法来把缓冲区中数据强行输出。或者能够用更简洁的 cxt.writeAndFlush(msg) 以达到一样的目的。

再次运行 telnet 命令,就会接受到你发送的信息。


三、时间服务器

接下来咱们基于 TIME 协议,实现构建和发送一个消息,而后在完成时关闭链接。和以前的例子不一样的是在不接受任何请求时会发送一个含 32 位的整数的消息,而且一旦消息发送就会当即关闭链接。

TIME 协议能够提供机器可读的日期时间信息。

咱们会在链接建立时发送时间消息,因此须要覆盖 channelActive() 方法:

public class TimeServerHandler extends ChannelInboundHandlerAdapter {

    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        // 分配空间
        final ByteBuf time = ctx.alloc().buffer(4);
        // 获取 32 位时间戳并写入
        time.writeInt((int) (System.currentTimeMillis() / 1000L));
        final ChannelFuture future = ctx.writeAndFlush(time);
        // 添加监听器
        future.addListener(new ChannelFutureListener() {
            @Override
            public void operationComplete(ChannelFuture channelFuture) throws Exception {
                assert future == channelFuture;
                // 关闭链接
                ctx.close();
            }
        });
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        cause.printStackTrace();
        ctx.close();
    }
}
  • 4 行,channelActive() 方法将会在链接被创建而且准备进行通讯时被调用。
  • 6 行,同 Java 的 NIO 相似,为了构建一个消息,须要为缓冲区分配空间。由于要发送一个 32 位的时间戳,因此至少 4 字节。
  • 8 行,消息构建完毕后,执行写入。回想使用 Java NIO 的 Buffer 时,在读写操做之间,须要调用 buffer.flip( ) 方法设置指针位置。可是在在 Netty 中不须要这样操做,缘由是 Netty 提供了两个指针,一个读指针和一个写指针,在读写时二者不相互影响。不再用担忧忘记调用 flip( ) 方法时数据为空或者数据错误啦。
  • 11 行,在第 9 行执行完 ctx.writeAndFlush(time) 后会返回一个 ChannelFuture 对象,表明着尚未发生的一次 I/O 操做。这意味着任何一个请求操做都不会立刻被执行,由于在 Netty 里全部的操做都是异步的。这样来看,咱们想完成消息发送后关闭链接,直接在后边调用 ctx.close( ) 可能不能马上关闭链接。返回的 ChannelFuture 对象在操做完成后会通知它的监听器,继续执行操做完成后的动做。
    • *
四、时间客户端

对于时间服务端不能直接用 telnet 的方式测试,由于不能靠人工把一个 32 位的二进制数据翻译成时间,因此下面将实现一个时间客户端。

与服务端的实现惟一不一样的就是使用了不一样的 Bootstrap 和 Channel 实现:

public class TimeClient {

    private String host;

    private int port;

    public TimeClient(String host, int port) {
        this.host = host;
        this.port = port;
    }

    public void run() throws Exception{
        EventLoopGroup worker = new NioEventLoopGroup();
        try {
            Bootstrap bootstrap = new Bootstrap();
            bootstrap.group(worker).channel(NioSocketChannel.class).handler(new ChannelInitializer<NioSocketChannel>() {
                @Override
                protected void initChannel(NioSocketChannel ch) throws Exception {
                    ch.pipeline().addLast(new TimeClientHandler());
                }
            }).option(ChannelOption.SO_KEEPALIVE, Boolean.TRUE);
            // 启动
            ChannelFuture future = bootstrap.connect(host, port).sync();
            // 等待链接关闭
            future.channel().closeFuture().sync();
        } finally {
            worker.shutdownGracefully();
        }
    }

    public static void main(String[] args) throws Exception {
        TimeClient timeClient = new TimeClient("localhost", 8080);
        timeClient.run();
    }

}
  • 13 行,对比 server 端只指定了一个 EventLoopGroup ,它即会做为 boss group 也会做为 worker group,尽管客户端不须要使用到 boss group。
  • 15 行,Bootstrap 和 ServerBootstrap 相似,Bootstrap 面向于服务端的 channel ,好比客户端和无链接传输模式的 channel。

再稍微改动一下 handler :

public class TimeClientHandler extends ChannelInboundHandlerAdapter {

    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
    // 在 TCP/IP 中,Netty 会把读到的数据放入 ByteBuf 中
        ByteBuf byteBuf = (ByteBuf) msg;
        try {
            long time = byteBuf.readUnsignedInt() * 1000L;
            System.out.println(new Date(time));
            ctx.close();
        }finally {
            ReferenceCountUtil.release(msg);
        }
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        cause.printStackTrace();
        ctx.close();
    }
}

分别启动 TimeServer 和 TimeClient ,控制台打印出了当前时间:

image.png

然而,屡次运行后处理器有时候会由于抛出 IndexOutOfBoundsException 而拒绝工做。带着这个问题,继续往下面看。

五、处理基于流的传输

比较典型的基于流传输的 TCP/IP 协议,也就是说,应用层两个不一样的数据包,在 TCP/IP 协议传输时,可能会组合或者拆分应用层协议的数据。因为两个数据包之间并没有边界区分,可能致使消息的读取错误。

不少资料也称上述这种现象为 TCP 粘包,而值得注意的是:

一、TCP 协议自己设计就是面向流的,提供可靠传输。 二、正由于面向流,对于应用层的数据包而言,没有边界区分。这就须要应用层主动处理不一样数据包之间的组装。 三、发生粘包现象不是 TCP 的缺陷,只是应用层没有主动作数据包的处理。

回到上面程序,这也就是上述异常发生的缘由。一个 32 位整型是很是小的数据,它并不见得会被常常拆分到到不一样的数据段内。然而,问题是它确实可能会被拆分到不一样的数据段内。

比较常见的两种解决方案就是基于长度或者基于终结符,继续以上面的 TIME 协议程序为基础,着手解决这个问题。由于只发送一个 32 位的整形时间戳,咱们采用基于数据长度的方式:

❀ 解决方案一

最简单的方案是构造一个内部的可积累的缓冲,直到4个字节所有接收到了内部缓冲。修改一下 TimeClientHandler 的代码:

public class TimeClientHandler extends ChannelInboundHandlerAdapter {

    private ByteBuf buf;

    private static final int CAPACITY = 4;

    @Override
    public void handlerAdded(ChannelHandlerContext ctx) throws Exception {
        buf = ctx.alloc().buffer(CAPACITY);
    }

    @Override
    public void handlerRemoved(ChannelHandlerContext ctx) throws Exception {
        buf.release();
        buf = null;
    }

    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        ByteBuf byteBuf = (ByteBuf) msg;
        buf.writeBytes(byteBuf);
        byteBuf.release();
        // 数据大于或等于 4 字节
        if (buf.readableBytes() >= CAPACITY) {
            long time = buf.readUnsignedInt() * 1000L;
            System.out.println(new Date(time));
            ctx.close();
        }
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        cause.printStackTrace();
        ctx.close();
    }
}

其中覆盖了 handler 生命周期的两个方法:

  • 8 行,handlerAdded():当检测到新的链接以后,调用ch.pipeline().addLast(new LifeCycleTestHandler())以后的回调,表示当前的channel中已经成功添加了一个逻辑处理器
  • 13 行,handlerRemoved():在链接关闭后把这条链接上的全部逻辑处理器所有移除掉。
❀ 解决方案二

尽管上述方案已经解决了 TIME 客户端的问题了,可是在处理器中增长了逻辑,咱们能够把处理消息的部分抽取出来,成为一个单独的处理器,而且能够增长多个 ChannelHandler 到 ChannelPipline ,每一个处理器各司其职,减小模块的复杂度。

由此,拆分出一个 TimeDecoder 用于处理消息:

public class TimeDecoder extends ByteToMessageDecoder {

    @Override
    protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception {
        if (in.readableBytes() >= 4) {
            out.add(in.readBytes(4));
        }
    }
}
  • ByteToMessageDecoder 继承自 ChannelInboundHandlerAdapter ,每当有新数据接收的时候,ByteToMessageDecoder 都会调用 decode() 方法来处理内部的那个累积缓冲。
  • 若是在 decode() 方法里增长了一个对象到 out 对象里,这意味着解码器解码消息成功。ByteToMessageDecoder 将会丢弃在累积缓冲里已经被读过的数据。

最后,修改 TimeClient 的代码,将 TimeDecoder 加入 ChannelPipline :

bootstrap.group(worker).channel(NioSocketChannel.class).handler(new ChannelInitializer<NioSocketChannel>() {
                @Override
                protected void initChannel(NioSocketChannel ch) throws Exception {
                    ch.pipeline().addLast(new TimeDecoder(), new TimeClientHandler());
                }
            }).option(ChannelOption.SO_KEEPALIVE, Boolean.TRUE);

除此以外,Netty还提供了更多开箱即用的解码器使你能够更简单地实现更多的协议,帮助你避免开发一个难以维护的处理器实现,感兴趣的小伙伴能够自行了解。

六、将消息解码为自定义对象

上述的例子咱们一直在使用 ByteBuf 做为协议消息的主要数据结构,可是实际使用中,须要传输的消息更加复杂,抽象为对象来处理更加方便。继续以 TIME 客户端和服务器为基础,使用自定义的对象代替 ByteBuf 。

  • 定义保存时间的对象 OurTime :

    public class OurTime {
    
      private final long value;
    
      public OurTime() {
          this(System.currentTimeMillis() / 1000L);
      }
    
      public OurTime(long value) {
          this.value = value;
      }
    
      public long value() {
          return value;
      }
    
      @Override
      public String toString() {
          return new Date(value() * 1000L).toString();
      }
    }
  • 修改 TimeDecoder 类,返回 OurTime 类:

    public class TimeDecoder extends ByteToMessageDecoder {
    
      @Override
      protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception {
          if (in.readableBytes() >= 4) {
              out.add(new OurTime(in.readUnsignedInt()));
          }
      }
    }
  • 修改后的 TimeClientHandler 类,处理新消息更加简洁:

    public class TimeClientHandler extends ChannelInboundHandlerAdapter {
    
      @Override
      public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
          OurTime ourTime = (OurTime) msg;
          System.out.println(ourTime);
          ctx.close();
      }
    
      @Override
      public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
          cause.printStackTrace();
          ctx.close();
      }
    }
    • *

而对于服务端来讲,大同小异。

修改 TimeServerHandler 的代码:

@Override
public void channelActive(ChannelHandlerContext ctx) {
    ChannelFuture f = ctx.writeAndFlush(new UnixTime());
    f.addListener(ChannelFutureListener.CLOSE);
 }

如今,惟一缺乏的功能是一个编码器,是ChannelOutboundHandler的实现,用来将 OurTime 对象从新转化为一个 ByteBuf。这是比编写一个解码器简单得多,由于没有须要处理的数据包编码消息时拆分和组装。

public class TimeEncoder extends ChannelOutboundHandlerAdapter {
    @Override
    public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) {
        UnixTime m = (OurTime) msg;
        ByteBuf encoded = ctx.alloc().buffer(4);
        encoded.writeInt((int)m.value());
        ctx.write(encoded, promise); // (1)
    }
}

在这几行代码里还有几个重要的事情。第一,经过 ChannelPromise,当编码后的数据被写到了通道上 Netty 能够经过这个对象标记是成功仍是失败。第二, 咱们不须要调用 cxt.flush()。由于处理器已经单独分离出了一个方法 void flush(ChannelHandlerContext cxt),若是像本身实现 flush() 方法内容能够自行覆盖这个方法。

进一步简化操做,你可使用 MessageToByteEncode:

public class TimeEncoder extends MessageToByteEncoder<UnixTime> {
        @Override
        protected void encode(ChannelHandlerContext ctx, UnixTime msg, ByteBuf out) {
            out.writeInt((int)msg.value());
        }
    }

最后在 TimeServerHandler 以前把 TimeEncoder 插入到ChannelPipeline。

5、总结

相信读完这篇文章的从头到尾,小伙伴们对使用 Netty 编写一个客户端和服务端有了大概的了解。后面咱们将继续探究 Netty 的源码实现,并结合其涉及的基础知识进行了解、深刻。

❤ 转载请注明本文地址或来源,谢谢合做 ❤


BLe36I.png