简单找了下发现网上没有关于Netty3比较完整的源码解析的文章,因而我就去读官方文档,为了增强记忆,翻译成了中文,有适当的简化。bootstrap
原文档地址:Netty3文档缓存
运行demo的前提有两个:最新版本的Netty3和JDK1.5以上安全
最简单的协议就是Discard协议——忽略全部接收到的数据而且不做任何响应。咱们从Netty处理I/O事件的handler实现开始:服务器
public class DiscardServerHandler extends SimpleChannelHandler { @Override public void messageReceived(ChannelHandlerContext ctx, MessageEvent e) { } @Override public void exceptionCaught(ChannelHandlerContext ctx, ExceptionEvent e) { e.getCause().printStackTrace(); Channel ch = e.getChannel(); ch.close(); } }
接下来写一个main方法来开启使用DiscardServerHandler的服务:网络
public class DiscardServer { public static void main(String[] args) throws Exception { ChannelFactory factory = new NioServerSocketChannelFactory( Executors.newCachedThreadPool(), Executors.newCachedThreadPool()); ServerBootstrap bootstrap = new ServerBootstrap(factory); bootstrap.setPipelineFactory(new ChannelPipelineFactory() { public ChannelPipeline getPipeline() { return Channels.pipeline(new DiscardServerHandler()); } }); bootstrap.setOption("child.tcpNoDelay", true); bootstrap.setOption("child.keepAlive", true); bootstrap.bind(new InetSocketAddress(8080)); } }
咱们能够经过"telnet localhost 8080"命令去测试服务,但由于是Discard服务,咱们都不知道服务是否正常工做。因此咱们修改下服务,让它打印出接收到的数据。数据结构
@Override public void messageReceived(ChannelHandlerContext ctx, MessageEvent e) { ChannelBuffer buf = (ChannelBuffer) e.getMessage(); while(buf.readable()) { System.out.println((char) buf.readByte()); System.out.flush(); } }
一个服务一般对请求是有响应的。接下来咱们尝试写一个实现Echo协议——将接收的数据原路返回给客户端的服务:异步
@Override public void messageReceived(ChannelHandlerContext ctx, MessageEvent e) { Channel ch = e.getChannel(); ch.write(e.getMessage()); }
此次咱们实现一个时间协议——在不须要任何请求数据的状况下返回一个32位整型数字而且在发送以后关闭链接。由于咱们忽略请求数据,只须要在链接创建的发送消息,因此此次不能使用messageReceived方法而是重写channelConnected方法:socket
@Override public void channelConnected(ChannelHandlerContext ctx, ChannelStateEvent e) { Channel ch = e.getChannel(); ChannelBuffer time = ChannelBuffers.buffer(4); time.writeInt((int) (System.currentTimeMillis() / 1000L + 2208988800L)); ChannelFuture f = ch.write(time); f.addListener(new ChannelFutureListener() { public void operationComplete(ChannelFuture future) { Channel ch = future.getChannel(); ch.close(); } }); }
咱们还须要一个遵照时间协议,即能把整型数字翻译成日期的客户端。Netty服务端和客户端惟一的区别就是要求不一样的Bootstrap和ChannelFactory:tcp
public static void main(String[] args) throws Exception { String host = args[0]; int port = Integer.parseInt(args[1]); ChannelFactory factory = new NioClientSocketChannelFactory( Executors.newCachedThreadPool(), Executors.newCachedThreadPool()); ClientBootstrap bootstrap = new ClientBootstrap(factory); bootstrap.setPipelineFactory(new ChannelPipelineFactory() { public ChannelPipeline getPipeline() { return Channels.pipeline(new TimeClientHandler()); } }); bootstrap.setOption("tcpNoDelay", true); bootstrap.setOption("keepAlive", true); bootstrap.connect(new InetSocketAddress(host, port)); }
另外咱们须要一个ChannelHandler实现,负责把接收到服务端返回的32位整型数字翻译成日期并打印出来,而后断开链接:ide
public class TimeClientHandler extends SimpleChannelHandler { @Override public void messageReceived(ChannelHandlerContext ctx, MessageEvent e) { ChannelBuffer buf = (ChannelBuffer) e.getMessage(); long currentTimeMillis = buf.readInt() * 1000L; System.out.println(new Date(currentTimeMillis)); e.getChannel().close(); } @Override public void exceptionCaught(ChannelHandlerContext ctx, ExceptionEvent e) { e.getCause().printStackTrace(); e.getChannel().close(); } }
看上去很简单是吧?可是实际运行过程当中这个handler有时会抛出一个IndexOutOfBoundsException。下一节咱们会讨论为何会这样。
在像TCP/IP那样基于流的传输中,接收数据保存在一个socket接收缓存中。可是这个缓存不是一个以包为单位的队列,而是一个以字节为单位的队列。这就意味着,即便发送两个独立的消息,操做系统会把他们视为一个字节串。所以,不能保证你读到的和另外一端写入的同样。因此,不论是客户端仍是服务端,对于接收到的数据都须要整理成符合应用程序逻辑的结构。
回到前面的时间客户端的问题,32位整型数字很小,可是它也是能够拆分的,特别是当流量上升的时候,被拆分的可能性也随之上升。
一个简单的处理方式就是内部建立一个累计的缓存,直到接收满4个字节才进行处理。
private final ChannelBuffer buf = dynamicBuffer(); @Override public void messageReceived(ChannelHandlerContext ctx, MessageEvent e) { ChannelBuffer m = (ChannelBuffer) e.getMessage(); buf.writeBytes(m); if (buf.readableBytes() >= 4) { long currentTimeMillis = buf.readInt() * 1000L; System.out.println(new Date(currentTimeMillis)); e.getChannel().close(); } }
第一种方案有不少问题,好比一个复杂的协议,由多个可变长度的域组成,这种状况下第一种方案的handler就没法支持了。
你会发现你能够添加多个ChannelHandler到ChannelPipeline中,利用这个特性,你能够把一个臃肿的ChannelHandler拆分到多个模块化的ChannelHandler中,这样能够下降应用程序的复杂度。好比,你能够把TimeClientHandler拆分红两个handler:
Netty提供了可扩展的类帮助你实现TimeDecoder:
public class TimeDecoder extends FrameDecoder { @Override protected Object decode(ChannelHandlerContext ctx, Channel channel, ChannelBuffer buffer) { if (buffer.readableBytes() < 4) { return null; } return buffer.readBytes(4); } }
拆分以后,咱们须要修改TimeClient的ChannelPipelineFactory实现:
bootstrap.setPipelineFactory(new ChannelPipelineFactory() { public ChannelPipeline getPipeline() { return Channels.pipeline( new TimeDecoder(), new TimeClientHandler()); } });
Netty还提供了进一步简化解码的ReplayingDecoder:
public class TimeDecoder extends ReplayingDecoder<VoidEnum> { @Override protected Object decode( ChannelHandlerContext ctx, Channel channel, ChannelBuffer buffer, VoidEnum state) { return buffer.readBytes(4); } }
此外,Netty提供了一批开箱即用的解码器,让你能够简单得实现大多数协议:
上面的demo咱们都是用ChannelBuffer做为协议化消息的基本数据结构,这一节咱们用POJO替代ChannelBuffer。将从ChannelBuffer提取信息的代码跟handler分离开,会使handler变得更加可维护的和可重用的。从上面的demo里不容易看出这个优点,可是实际应用中分离颇有必要。
首先,咱们定义一个类型UnixTime:
public class UnixTime { private final int value; public UnixTime(int value) { this.value = value; } public int getValue() { return value; } @Override public String toString() { return new Date(value * 1000L).toString(); } }
如今咱们能够修改TimeDecoder让它返回一个UnixTime而不是ChannelBuffer:
@Override protected Object decode(ChannelHandlerContext ctx, Channel channel, ChannelBuffer buffer) { if (buffer.readableBytes() < 4) { return null; } return new UnixTime(buffer.readInt()); }
编码器改了,那么相应的TimeClientHandler就不会继续使用ChannelBuffer:
@Override public void messageReceived(ChannelHandlerContext ctx, MessageEvent e) { UnixTime m = (UnixTime) e.getMessage(); System.out.println(m); e.getChannel().close(); }
一样的技术也能够应用到服务端的TimeServerHandler上:
@Override public void channelConnected(ChannelHandlerContext ctx, ChannelStateEvent e) { UnixTime time = new UnixTime((int)(System.currentTimeMillis() / 1000)); ChannelFuture f = e.getChannel().write(time); f.addListener(ChannelFutureListener.CLOSE); }
能这样运用的前提是有一个编码器,能够把UnixTime对象翻译成ChannelBuffer:
public class TimeEncoder extends SimpleChannelHandler { public void writeRequested(ChannelHandlerContext ctx, MessageEvent e) { UnixTime time = (UnixTime) e.getMessage(); ChannelBuffer buf = buffer(4); buf.writeInt(time.getValue()); Channels.write(ctx, e.getFuture(), buf); } }
一样,TimeEncoder也须要加入到服务端的ChannelPipeline中:
bootstrap.setPipelineFactory(new ChannelPipelineFactory() { public ChannelPipeline getPipeline() { return Channels.pipeline( new TimeServerHandler(), new TimeEncoder()); } });
为了关闭I/O线程让应用程序优雅得退出,咱们须要释放ChannelFactory分配的资源。
一个典型网络应用程序的关闭过程分为三步:
应用到TimeClient上:
ChannelFuture future = bootstrap.connect(...); future.awaitUninterruptibly(); if (!future.isSuccess()) { future.getCause().printStackTrace(); } future.getChannel().getCloseFuture().awaitUninterruptibly(); factory.releaseExternalResources();
关闭一个客户端很简单,那服务端呢?你须要从端口解绑并关闭全部接收到的链接。前提是你须要一个保持跟踪活跃链接的数据结构,Netty提供了ChannelGroup。
ChannelGroup是Java集合API一个特殊的的扩展,它表明一组打开的Channel。若是一个Channel被添加到ChannelGroup,而后这个Channel被关闭了,它会从ChannelGroup中自动移除。你能够对同一ChannelGroup中的Channel作批量操做,好比在关闭服务的时候关闭全部Channel。
要跟踪打开的socket,你须要修改TimeServerHandler,把新打开的Channel添加到全局的ChannelGroup变量中。ChannelGroup是线程安全的。
@Override public void channelOpen(ChannelHandlerContext ctx, ChannelStateEvent e) { TimeServer.allChannels.add(e.getChannel()); }
如今咱们自动维持了一个包含全部活跃Channel的列表,关闭服务端就像关闭客户端同样容易了。
public class TimeServer { static final ChannelGroup allChannels = new DefaultChannelGroup("time-server"); public static void main(String[] args) throws Exception { ... ChannelFactory factory = ...; ... ServerBootstrap bootstrap = ...; ... Channel channel = bootstrap.bind(new InetSocketAddress(8080)); allChannels.add(channel); waitForShutdownCommand(); ChannelGroupFuture future = allChannels.close(); future.awaitUninterruptibly(); factory.releaseExternalResources(); } }
这一节咱们快速浏览了Netty,示范了如何用Netty写一个能正常工做的网络应用。下一节将介绍Netty的更多细节。