引言html
DISCARD
。它是一种协议,在没有任何响应的状况下丢弃任何接收到的数据。
package io.netty.example.discard;
import io.netty.buffer.ByteBuf; import io.netty.channel.ChannelHandlerContext; import io.netty.channel.ChannelInboundHandlerAdapter; /** * Handles a server-side channel. */ public class DiscardServerHandler extends ChannelInboundHandlerAdapter { // (1) @Override public void channelRead(ChannelHandlerContext ctx, Object msg) { // (2) // Discard the received data silently.丢弃接收到的数据 ((ByteBuf) msg).release(); // (3) } @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) { // (4) // Close the connection when an exception is raised. cause.printStackTrace(); ctx.close(); } }
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) { try { // Do something with msg } finally { ReferenceCountUtil.release(msg); } }
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
/**
* Discards any incoming data.丢掉全部进来的消息
*/
public class DiscardServer { private int port; public DiscardServer(int port) { this.port = port; } public void run() throws Exception { EventLoopGroup bossGroup = new NioEventLoopGroup(); // (1) 该对象至关于Socket中使用一个线程专门用户监听一个socket端口,而后将监听到的socket对象传入另外一对象 EventLoopGroup workerGroup = new NioEventLoopGroup();// 该对象至关于Socket中对于每一个socket链接都都单独开辟了一个线程进行数据解析出处理 try { ServerBootstrap b = new ServerBootstrap(); // (2) b.group(bossGroup, workerGroup) .channel(NioServerSocketChannel.class) // (3) .childHandler(new ChannelInitializer<SocketChannel>() { // (4) @Override public void initChannel(SocketChannel ch) throws Exception { ch.pipeline().addLast(new DiscardServerHandler()); } }) .option(ChannelOption.SO_BACKLOG, 128) // (5) .childOption(ChannelOption.SO_KEEPALIVE, true); // (6) // Bind and start to accept incoming connections. ChannelFuture f = b.bind(port).sync(); // (7) // Wait until the server socket is closed. // In this example, this does not happen, but you can do that to gracefully // shut down your server. f.channel().closeFuture().sync(); } finally { workerGroup.shutdownGracefully(); bossGroup.shutdownGracefully(); } } public static void main(String[] args) throws Exception { int port; if (args.length > 0) { port = Integer.parseInt(args[0]); } else { port = 8080; } new DiscardServer(port).run(); } }
注:一、NioEventLoopGroup是一个处理I/O操做的多线程事件循环。Netty为不一样类型的传输提供了各类EventLoopGroup实现。在本例中,咱们正在实现一个服务器端应用程序,所以将使用两个NioEventLoopGroup。第一个,一般被称为“老板”,接受进入的链接。第二个一般称为“worker”,在boss接受链接并将接受的链接注册给worker时,它将处理已接受链接的流量。使用多少线程以及如何将它们映射到建立的通道取决于EventLoopGroup实现,甚至能够经过构造函数进行配置。java
Channel
设置服务器。可是,请注意,这是一个冗长的过程,在大多数状况下不须要这样作。@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) { ByteBuf in = (ByteBuf) msg; try { while (in.isReadable()) { // (1) System.out.print((char) in.readByte()); System.out.flush(); } } finally { ReferenceCountUtil.release(msg); // (2) } }
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) { ctx.write(msg); // (1) ctx.flush(); // (2) }
若是您再次运行telnet命令,您将看到服务器返回您发送给它的任何内容。git
echo服务器的完整源代码位于发行版的io.net .example.echo包中。github
package io.netty.example.time;
public class TimeServerHandler extends ChannelInboundHandlerAdapter { @Override public void channelActive(final ChannelHandlerContext ctx) { // (1) final ByteBuf time = ctx.alloc().buffer(4); // (2) time.writeInt((int) (System.currentTimeMillis() / 1000L + 2208988800L)); final ChannelFuture f = ctx.writeAndFlush(time); // (3) f.addListener(new ChannelFutureListener() { @Override public void operationComplete(ChannelFuture future) { assert f == future; ctx.close(); } }); // (4) } @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) { cause.printStackTrace(); ctx.close(); } }
三、像往常同样,咱们编写构造好的消息。web
可是等等,抛硬币在哪里?在使用NIO发送消息以前,咱们不是曾经调用java.nio.ByteBuffer.flip()吗?ByteBuf没有这样的方法,由于它有两个指针;一个用于读操做,另外一个用于写操做。当您向ByteBuf写入内容时,写入器索引会增长,而读取器索引不会改变。阅读器索引和写入器索引分别表示消息开始和结束的位置。ajax
相反,NIO缓冲区没有提供一种干净的方法来肯定消息内容在哪里开始和结束,而不调用flip方法。当您忘记翻转缓冲区时,您将遇到麻烦,由于不会发送任何或不正确的数据。在Netty中不会发生这样的错误,由于对于不一样的操做类型,咱们有不一样的指针。当你习惯了它,你会发现它让你的生活变得更容易——一个没有翻转的生活!
编程
要注意的另外一点是ChannelHandlerContext.write()(和writeAndFlush())方法返回ChannelFuture。ChannelFuture表示还没有发生的I/O操做。这意味着,因为Netty中的全部操做都是异步的,所以可能尚未执行任何请求的操做。例如,如下代码可能会在发送消息以前关闭链接:bootstrap
Channel ch = ...;
ch.writeAndFlush(message);
ch.close();
f.addListener(ChannelFutureListener.CLOSE);
要测试咱们的时间服务器是否按预期工做,您可使用UNIX rdate命令:api
$ rdate -o <port> -p <host>
package io.netty.example.time;
public class TimeClient { public static void main(String[] args) throws Exception { String host = args[0]; int port = Integer.parseInt(args[1]); EventLoopGroup workerGroup = new NioEventLoopGroup(); try { Bootstrap b = new Bootstrap(); // (1) b.group(workerGroup); // (2) b.channel(NioSocketChannel.class); // (3) b.option(ChannelOption.SO_KEEPALIVE, true); // (4) b.handler(new ChannelInitializer<SocketChannel>() { @Override public void initChannel(SocketChannel ch) throws Exception { ch.pipeline().addLast(new TimeClientHandler()); } }); // Start the client. ChannelFuture f = b.connect(host, port).sync(); // (5) // Wait until the connection is closed. f.channel().closeFuture().sync(); } finally { workerGroup.shutdownGracefully(); } } }
package io.netty.example.time;
import java.util.Date; public class TimeClientHandler extends ChannelInboundHandlerAdapter { @Override public void channelRead(ChannelHandlerContext ctx, Object msg) { ByteBuf m = (ByteBuf) msg; // (1) try { long currentTimeMillis = (m.readUnsignedInt() - 2208988800L) * 1000L; System.out.println(new Date(currentTimeMillis)); ctx.close(); } finally { m.release(); } } @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) { cause.printStackTrace(); ctx.close(); } }
TIME
客户端示例。咱们在这里遇到一样的问题。32位整数是很是少许的数据,而且不太可能常常被分段。然而,问题在于它多是碎片化的,而且随着流量的增长,碎片化的可能性将增长。
TimeClientHandler
修复此问题的修改实现:
package io.netty.example.time;
import java.util.Date; public class TimeClientHandler extends ChannelInboundHandlerAdapter { private ByteBuf buf; @Override public void handlerAdded(ChannelHandlerContext ctx) { buf = ctx.alloc().buffer(4); // (1) } @Override public void handlerRemoved(ChannelHandlerContext ctx) { buf.release(); // (1) buf = null; } @Override public void channelRead(ChannelHandlerContext ctx, Object msg) { ByteBuf m = (ByteBuf) msg; buf.writeBytes(m); // (2) m.release(); if (buf.readableBytes() >= 4) { // (3) long currentTimeMillis = (buf.readUnsignedInt() - 2208988800L) * 1000L; System.out.println(new Date(currentTimeMillis)); ctx.close(); } } @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) { cause.printStackTrace(); ctx.close(); } }
TIME
客户端的问题,但修改后的处理程序看起来并不干净。想象一个更复杂的协议,它由多个字段组成,例如可变长度字段。您的
ChannelInboundHandler
实施将很快变得没法维护。
ChannelHandler
为a 添加多个
ChannelPipeline
,所以,您能够将一个单片拆分
ChannelHandler
为多个模块化,以下降应用程序的复杂性。例如,您能够拆分
TimeClientHandler
为两个处理程序:
TimeDecoder
它涉及碎片问题,以及TimeClientHandler
package io.netty.example.time;
public class TimeDecoder extends ByteToMessageDecoder { // (1)
@Override protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) { // (2) if (in.readableBytes() < 4) { return; // (3) } out.add(in.readBytes(4)); // (4) } }
b.handler(new ChannelInitializer<SocketChannel>() {
@Override
public void initChannel(SocketChannel ch) throws Exception { ch.pipeline().addLast(new TimeDecoder(), new TimeClientHandler()); } });
若是你是一个喜欢冒险的人,你可能想试试ReplayingDecoder,这将解码器变得更加简单。不过,您须要参考API参考以得到更多信息。promise
public class TimeDecoder extends ReplayingDecoder<Void> {
@Override
protected void decode( ChannelHandlerContext ctx, ByteBuf in, List<Object> out) { out.add(in.readBytes(4)); } }
io.netty.example.factorial
基于二进制协议io.netty.example.telnet
基于文本行的协议.ChannelHandler
中使用POJO的优点是显而易见的; 经过分离
ByteBuf
从处理程序中提取信息的代码,您的处理程序变得更易于维护和重用。在
TIME
客户端和服务器示例中,咱们只读取一个32位整数,这不是
ByteBuf
直接使用的主要问题。可是,您会发如今实现真实世界协议时必须进行分离。
UnixTime
。
package io.netty.example.time;
import java.util.Date; public class UnixTime { private final long value; public UnixTime() { this(System.currentTimeMillis() / 1000L + 2208988800L); } public UnixTime(long value) { this.value = value; } public long value() { return value; } @Override public String toString() { return new Date((value() - 2208988800L) * 1000L).toString(); } }
咱们如今能够修改它TimeDecoder
来产生一个UnixTime
而不是一个ByteBuf
。
@Override
protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) { if (in.readableBytes() < 4) { return; } out.add(new UnixTime(in.readUnsignedInt())); }
使用更新的解码器,TimeClientHandler
再也不使用ByteBuf
:
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) { UnixTime m = (UnixTime) msg; System.out.println(m); ctx.close(); }
更简单,更优雅,对吧?能够在服务器端应用相同的技术。咱们TimeServerHandler
此次更新第一次:
@Override
public void channelActive(ChannelHandlerContext ctx) { ChannelFuture f = ctx.writeAndFlush(new UnixTime()); f.addListener(ChannelFutureListener.CLOSE); }
如今,惟一缺乏的部分是一个编码器,它的实现ChannelOutboundHandler
将一个UnixTime
转换为一个ByteBuf
。它比编写解码器简单得多,由于编码消息时无需处理数据包碎片和汇编。
package io.netty.example.time;
public class TimeEncoder extends ChannelOutboundHandlerAdapter { @Override public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) { UnixTime m = (UnixTime) msg; ByteBuf encoded = ctx.alloc().buffer(4); encoded.writeInt((int)m.value()); ctx.write(encoded, promise); // (1) } }
MessageToByteEncoder
:
public class TimeEncoder extends MessageToByteEncoder<UnixTime> {
@Override
protected void encode(ChannelHandlerContext ctx, UnixTime msg, ByteBuf out) { out.writeInt((int)msg.value()); } }
Future
。
io.netty.example
包中的Netty示例。