❀ 众所周知:html
Netty 是一款基于 NIO 客户、服务器端的 Java 开源编程框架,提供异步的、事件驱动的网络应用程序框架和工具,用以快速开发高性能、高可靠性的网络服务器和客户端程序。
❀ 通俗来说:java
Netty 一个很是好用的处理 Socket 的 Jar 包,能够用它来开发服务器和客户端。编程
Netty 做为一个优秀的网络通讯框架,许多开源项目都使用它来构建通讯层。好比 Hadoop、Cassandra、Spark、Dubbo、gRPC、RocketMQ、Zookeeper甚至咱们经常使用的 Spring 等等。bootstrap
更重要的是,Netty 是开发高性能 Java 服务器的必学框架。api
能够说做为一个 Java 工程师,要了解 Java 服务器的高阶知识,Netty 是一个必需要学习的东西。promise
Talk is cheap, show me the code!
接下来从代码中感觉一下 Netty,首先实现一个 discard(丢弃)服务器,即对收到的数据不作任何处理。安全
实现 ChannelInBoundHandlerAdapter 首先咱们从 handler 的实现开始, Netty 使用 handler 来处理 I/O 事件。服务器
public class DiscardServerHandler extends ChannelInboundHandlerAdapter { @Override public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { // 丢弃收到的数据 ((ByteBuf) msg).release(); } @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { cause.printStackTrace(); ctx.close(); } }
第 6 行,ByteBuf 是一个引用计数对象,这个对象必须显式地调用 release() 方法来释放。处理器的职责是释放全部传递处处理器的引用计数对象,下面是比较常见的 chanelRead() 方法实现:网络
@Override public void channelRead(ChannelHandlerContext ctx, Object msg) { try { // Do something with msg } finally { ReferenceCountUtil.release(msg); } }
启动 Handler 实现 handler 后,咱们须要一个 main() 方法来启动它。数据结构
public class DiscardServer { private int port; public DiscardServer(int port) { this.port = port; } public void run() throws Exception { // 接收进来的链接 EventLoopGroup boss = new NioEventLoopGroup(); // 处理已经接收的链接 EventLoopGroup worker = new NioEventLoopGroup(); try { ServerBootstrap bootstrap = new ServerBootstrap(); bootstrap.group(boss, worker).channel(NioServerSocketChannel.class).childHandler(new ChannelInitializer<SocketChannel>() { @Override protected void initChannel(SocketChannel socketChannel) throws Exception { // 添加自定义的 handler socketChannel.pipeline().addLast(new DiscardServerHandler()); } }).option(ChannelOption.SO_BACKLOG, 128).childOption(ChannelOption.SO_KEEPALIVE, Boolean.TRUE); // 绑定端口,开始接收进来的链接 ChannelFuture channelFuture = bootstrap.bind(port).sync(); // 关闭 channelFuture.channel().closeFuture().sync(); } finally { boss.shutdownGracefully(); worker.shutdownGracefully(); } } public static void main(String[] args) throws Exception { int port = 8080; new DiscardServer(port).run(); } }
EventLoopGroup
是用来处理 I/O 操做的多线程事件循环器,Netty 提供了许多不一样的 EventLoopGroup 的实现用来处理不一样的传输。在本例咱们实现了一个服务端应用,所以须要两个 EventLoopGroup
。第一个用来接收进来的链接,常被称做 boss ;第二个用来处理已经接收的链接,成为 worker。一旦 boss 接收到一个新进来的链接,就会把链接的信息注册到 worker 上面。ServerBootstrap
是一个启动 NIO 服务的辅助启动类。NioServerSocketChannel
用来讲明一个新的 Channel 如何接收进来的链接。ChannelInitializer
用来帮助使用者建立一个新的 channel ,同时可使用 pipline 指定一些特定的处理器。查看接收到的数据 如此,一个基于 Netty 的服务端程序就完成了,可是如今启动起来咱们看不到任何交互,因此咱们稍微修改一下 DiscardServerHandler
类的 channelRead()
方法,能够查看到客户端发来的消息。
@Override public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { ByteBuf byteBuf = (ByteBuf) msg; try { while (byteBuf.isReadable()) { System.out.print((char) byteBuf.readByte()); System.out.flush(); } } finally { ReferenceCountUtil.release(msg); } }
DiscardServer
,使用 telnet
来测试一下。控制台接收到了命令行发来的消息:
咱们已经实现了服务器能够接收客户端发来的消息,一般服务器会对客户端发来的请求做出回应,下面就经过 ECHO 协议来实现对客户端的消息响应。
ECHO 协议即会把客户端发来的数据原样返回,因此也戏称“乒乓球”协议。
在上述代码的基础上面,咱们只需对 DiscardServerHandler
类的 channelRead()
方法稍加修改:
@Override public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { ctx.write(msg); ctx.flush(); }
cxt.writeAndFlush(msg)
以达到一样的目的。再次运行 telnet
命令,就会接受到你发送的信息。
接下来咱们基于 TIME 协议,实现构建和发送一个消息,而后在完成时关闭链接。和以前的例子不一样的是在不接受任何请求时会发送一个含 32 位的整数的消息,而且一旦消息发送就会当即关闭链接。
TIME 协议能够提供机器可读的日期时间信息。
咱们会在链接建立时发送时间消息,因此须要覆盖 channelActive()
方法:
public class TimeServerHandler extends ChannelInboundHandlerAdapter { @Override public void channelActive(ChannelHandlerContext ctx) throws Exception { // 分配空间 final ByteBuf time = ctx.alloc().buffer(4); // 获取 32 位时间戳并写入 time.writeInt((int) (System.currentTimeMillis() / 1000L)); final ChannelFuture future = ctx.writeAndFlush(time); // 添加监听器 future.addListener(new ChannelFutureListener() { @Override public void operationComplete(ChannelFuture channelFuture) throws Exception { assert future == channelFuture; // 关闭链接 ctx.close(); } }); } @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { cause.printStackTrace(); ctx.close(); } }
channelActive()
方法将会在链接被创建而且准备进行通讯时被调用。Buffer
时,在读写操做之间,须要调用 buffer.flip( )
方法设置指针位置。可是在在 Netty 中不须要这样操做,缘由是 Netty 提供了两个指针,一个读指针和一个写指针,在读写时二者不相互影响。不再用担忧忘记调用 flip( )
方法时数据为空或者数据错误啦。ctx.writeAndFlush(time)
后会返回一个 ChannelFuture
对象,表明着尚未发生的一次 I/O 操做。这意味着任何一个请求操做都不会立刻被执行,由于在 Netty 里全部的操做都是异步的。这样来看,咱们想完成消息发送后关闭链接,直接在后边调用 ctx.close( )
可能不能马上关闭链接。返回的 ChannelFuture
对象在操做完成后会通知它的监听器,继续执行操做完成后的动做。对于时间服务端不能直接用 telnet
的方式测试,由于不能靠人工把一个 32 位的二进制数据翻译成时间,因此下面将实现一个时间客户端。
与服务端的实现惟一不一样的就是使用了不一样的 Bootstrap 和 Channel 实现:
public class TimeClient { private String host; private int port; public TimeClient(String host, int port) { this.host = host; this.port = port; } public void run() throws Exception{ EventLoopGroup worker = new NioEventLoopGroup(); try { Bootstrap bootstrap = new Bootstrap(); bootstrap.group(worker).channel(NioSocketChannel.class).handler(new ChannelInitializer<NioSocketChannel>() { @Override protected void initChannel(NioSocketChannel ch) throws Exception { ch.pipeline().addLast(new TimeClientHandler()); } }).option(ChannelOption.SO_KEEPALIVE, Boolean.TRUE); // 启动 ChannelFuture future = bootstrap.connect(host, port).sync(); // 等待链接关闭 future.channel().closeFuture().sync(); } finally { worker.shutdownGracefully(); } } public static void main(String[] args) throws Exception { TimeClient timeClient = new TimeClient("localhost", 8080); timeClient.run(); } }
再稍微改动一下 handler :
public class TimeClientHandler extends ChannelInboundHandlerAdapter { @Override public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { // 在 TCP/IP 中,Netty 会把读到的数据放入 ByteBuf 中 ByteBuf byteBuf = (ByteBuf) msg; try { long time = byteBuf.readUnsignedInt() * 1000L; System.out.println(new Date(time)); ctx.close(); }finally { ReferenceCountUtil.release(msg); } } @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { cause.printStackTrace(); ctx.close(); } }
分别启动 TimeServer 和 TimeClient ,控制台打印出了当前时间:
然而,屡次运行后处理器有时候会由于抛出 IndexOutOfBoundsException 而拒绝工做。带着这个问题,继续往下面看。
比较典型的基于流传输的 TCP/IP 协议,也就是说,应用层两个不一样的数据包,在 TCP/IP 协议传输时,可能会组合或者拆分应用层协议的数据。因为两个数据包之间并没有边界区分,可能致使消息的读取错误。
不少资料也称上述这种现象为 TCP 粘包,而值得注意的是:
一、TCP 协议自己设计就是面向流的,提供可靠传输。 二、正由于面向流,对于应用层的数据包而言,没有边界区分。这就须要应用层主动处理不一样数据包之间的组装。 三、发生粘包现象不是 TCP 的缺陷,只是应用层没有主动作数据包的处理。
回到上面程序,这也就是上述异常发生的缘由。一个 32 位整型是很是小的数据,它并不见得会被常常拆分到到不一样的数据段内。然而,问题是它确实可能会被拆分到不一样的数据段内。
比较常见的两种解决方案就是基于长度或者基于终结符,继续以上面的 TIME 协议程序为基础,着手解决这个问题。由于只发送一个 32 位的整形时间戳,咱们采用基于数据长度的方式:
最简单的方案是构造一个内部的可积累的缓冲,直到4个字节所有接收到了内部缓冲。修改一下 TimeClientHandler
的代码:
public class TimeClientHandler extends ChannelInboundHandlerAdapter { private ByteBuf buf; private static final int CAPACITY = 4; @Override public void handlerAdded(ChannelHandlerContext ctx) throws Exception { buf = ctx.alloc().buffer(CAPACITY); } @Override public void handlerRemoved(ChannelHandlerContext ctx) throws Exception { buf.release(); buf = null; } @Override public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { ByteBuf byteBuf = (ByteBuf) msg; buf.writeBytes(byteBuf); byteBuf.release(); // 数据大于或等于 4 字节 if (buf.readableBytes() >= CAPACITY) { long time = buf.readUnsignedInt() * 1000L; System.out.println(new Date(time)); ctx.close(); } } @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { cause.printStackTrace(); ctx.close(); } }
其中覆盖了 handler 生命周期的两个方法:
handlerAdded()
:当检测到新的链接以后,调用ch.pipeline().addLast(new LifeCycleTestHandler())
以后的回调,表示当前的channel中已经成功添加了一个逻辑处理器handlerRemoved()
:在链接关闭后把这条链接上的全部逻辑处理器所有移除掉。尽管上述方案已经解决了 TIME 客户端的问题了,可是在处理器中增长了逻辑,咱们能够把处理消息的部分抽取出来,成为一个单独的处理器,而且能够增长多个 ChannelHandler 到 ChannelPipline ,每一个处理器各司其职,减小模块的复杂度。
由此,拆分出一个 TimeDecoder 用于处理消息:
public class TimeDecoder extends ByteToMessageDecoder { @Override protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception { if (in.readableBytes() >= 4) { out.add(in.readBytes(4)); } } }
ByteToMessageDecoder
继承自 ChannelInboundHandlerAdapter
,每当有新数据接收的时候,ByteToMessageDecoder
都会调用 decode()
方法来处理内部的那个累积缓冲。decode()
方法里增长了一个对象到 out 对象里,这意味着解码器解码消息成功。ByteToMessageDecoder
将会丢弃在累积缓冲里已经被读过的数据。最后,修改 TimeClient 的代码,将 TimeDecoder 加入 ChannelPipline :
bootstrap.group(worker).channel(NioSocketChannel.class).handler(new ChannelInitializer<NioSocketChannel>() { @Override protected void initChannel(NioSocketChannel ch) throws Exception { ch.pipeline().addLast(new TimeDecoder(), new TimeClientHandler()); } }).option(ChannelOption.SO_KEEPALIVE, Boolean.TRUE);
除此以外,Netty还提供了更多开箱即用的解码器使你能够更简单地实现更多的协议,帮助你避免开发一个难以维护的处理器实现,感兴趣的小伙伴能够自行了解。
上述的例子咱们一直在使用 ByteBuf 做为协议消息的主要数据结构,可是实际使用中,须要传输的消息更加复杂,抽象为对象来处理更加方便。继续以 TIME 客户端和服务器为基础,使用自定义的对象代替 ByteBuf 。
定义保存时间的对象 OurTime :
public class OurTime { private final long value; public OurTime() { this(System.currentTimeMillis() / 1000L); } public OurTime(long value) { this.value = value; } public long value() { return value; } @Override public String toString() { return new Date(value() * 1000L).toString(); } }
修改 TimeDecoder 类,返回 OurTime 类:
public class TimeDecoder extends ByteToMessageDecoder { @Override protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception { if (in.readableBytes() >= 4) { out.add(new OurTime(in.readUnsignedInt())); } } }
修改后的 TimeClientHandler 类,处理新消息更加简洁:
public class TimeClientHandler extends ChannelInboundHandlerAdapter { @Override public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { OurTime ourTime = (OurTime) msg; System.out.println(ourTime); ctx.close(); } @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { cause.printStackTrace(); ctx.close(); } }
而对于服务端来讲,大同小异。
修改 TimeServerHandler 的代码:
@Override public void channelActive(ChannelHandlerContext ctx) { ChannelFuture f = ctx.writeAndFlush(new UnixTime()); f.addListener(ChannelFutureListener.CLOSE); }
如今,惟一缺乏的功能是一个编码器,是ChannelOutboundHandler的实现,用来将 OurTime 对象从新转化为一个 ByteBuf。这是比编写一个解码器简单得多,由于没有须要处理的数据包编码消息时拆分和组装。
public class TimeEncoder extends ChannelOutboundHandlerAdapter { @Override public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) { UnixTime m = (OurTime) msg; ByteBuf encoded = ctx.alloc().buffer(4); encoded.writeInt((int)m.value()); ctx.write(encoded, promise); // (1) } }
在这几行代码里还有几个重要的事情。第一,经过 ChannelPromise,当编码后的数据被写到了通道上 Netty 能够经过这个对象标记是成功仍是失败。第二, 咱们不须要调用 cxt.flush()。由于处理器已经单独分离出了一个方法 void flush(ChannelHandlerContext cxt),若是像本身实现 flush() 方法内容能够自行覆盖这个方法。
进一步简化操做,你可使用 MessageToByteEncode:
public class TimeEncoder extends MessageToByteEncoder<UnixTime> { @Override protected void encode(ChannelHandlerContext ctx, UnixTime msg, ByteBuf out) { out.writeInt((int)msg.value()); } }
最后在 TimeServerHandler 以前把 TimeEncoder 插入到ChannelPipeline。
相信读完这篇文章的从头到尾,小伙伴们对使用 Netty 编写一个客户端和服务端有了大概的了解。后面咱们将继续探究 Netty 的源码实现,并结合其涉及的基础知识进行了解、深刻。
❤ 转载请注明本文地址或来源,谢谢合做 ❤