ChannelHandler是一个接口族的父接口,它的实现负责接受并响应事件通知,在Netty应用程序中,全部的数据处理逻辑都包含在这些核心抽象的实现中。
Echo服务器会响应传入的消息,所以须要实现ChannelInboundHandler接口,用来定义响应入站事件的方法。因为Echo服务器的应用程序只须要用到少许的方法,因此只须要继承ChannelInboundHandlerAdapter类,它提供了ChannelInboundHandler的默认实现。
在ChannelInboundHandler中,咱们感兴趣的方法有:java
package cn.sh.demo.echo; import io.netty.buffer.ByteBuf; import io.netty.buffer.Unpooled; import io.netty.channel.ChannelFutureListener; import io.netty.channel.ChannelHandler; import io.netty.channel.ChannelHandlerContext; import io.netty.channel.ChannelInboundHandlerAdapter; import io.netty.util.CharsetUtil; /** * @author sh * @ChannelHandler.Sharable 标示一个ChannelHandler能够被多个Channel安全地共享 */ @ChannelHandler.Sharable public class EchoServerHandler extends ChannelInboundHandlerAdapter { @Override public void channelRead(ChannelHandlerContext ctx, Object msg) { ByteBuf in = (ByteBuf) msg; //将接受到的消息输出到客户端 System.out.println("Server received:" + in.toString(CharsetUtil.UTF_8)); //将接收到的消息写给发送者,而不冲刷出站消息 ctx.write(in); } @Override public void channelReadComplete(ChannelHandlerContext ctx) { //将消息冲刷到客户端,而且关闭该Channel ctx.writeAndFlush(Unpooled.EMPTY_BUFFER).addListener(ChannelFutureListener.CLOSE); } @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) { //打印异常堆栈跟踪 cause.printStackTrace(); //关闭该Channel ctx.close(); } }
备注git
主要涉及的内容github
Echo服务引导示例代码bootstrap
package cn.sh.demo.echo; import io.netty.bootstrap.ServerBootstrap; import io.netty.channel.ChannelFuture; import io.netty.channel.ChannelInitializer; import io.netty.channel.EventLoopGroup; import io.netty.channel.nio.NioEventLoopGroup; import io.netty.channel.socket.SocketChannel; import io.netty.channel.socket.nio.NioServerSocketChannel; import io.netty.channel.socket.nio.NioSocketChannel; import java.net.InetSocketAddress; public class EchoServer { private final int port; public EchoServer(int port) { this.port = port; } public void startServer() throws InterruptedException { EchoServerHandler serverHandler = new EchoServerHandler(); //建立EventLoopGroup EventLoopGroup group = new NioEventLoopGroup(); //建立ServerBootstrap ServerBootstrap bootstrap = new ServerBootstrap(); bootstrap.group(group) //指定所使用的NIO传输Channel .channel(NioServerSocketChannel.class) //使用指定的端口套接字 .localAddress(new InetSocketAddress(port)) //添加一个EchoServerHandler到子Channel的ChannelPipeline .childHandler(new ChannelInitializer<SocketChannel>() { @Override protected void initChannel(SocketChannel channel) throws Exception { //此处因为EchoServerHandler被注解标注为@Shareble,因此咱们老是使用相同的实例 channel.pipeline().addLast(serverHandler); } }); try { //异步的绑定服务器,调用sync()方法阻塞等待直到绑定完成 ChannelFuture channelFuture = bootstrap.bind().sync(); //获取Channel的CloseFuture,而且阻塞当前线程直到它完成 channelFuture.channel().closeFuture().sync(); } catch (InterruptedException e) { e.printStackTrace(); } finally { //关闭EventLoopGroup,释放全部的资源 group.shutdownGracefully().sync(); } } public static void main(String[] args) throws InterruptedException { if (args.length != 1) { System.err.println("参数类型或者个数不正确"); return; } //设置端口值 int port = Integer.parseInt(args[0]); //启动Echo服务器 new EchoServer(port).startServer(); } }
备注数组
客户端主要包括的操做:安全
编写客户端主要包括业务逻辑和引导服务器
在该示例中,咱们使用SimpleChannelInboundHandler类来处理全部的事件,主要的方法有:异步
示例代码以下:socket
package cn.sh.demo.echo; import io.netty.buffer.ByteBuf; import io.netty.buffer.Unpooled; import io.netty.channel.ChannelHandler; import io.netty.channel.ChannelHandlerContext; import io.netty.channel.SimpleChannelInboundHandler; import io.netty.util.CharsetUtil; /** * @author sh * @ChannelHandler.Sharable 标记该类的示例能够被多个Channel共享 */ @ChannelHandler.Sharable public class EchoClientHandler extends SimpleChannelInboundHandler<ByteBuf> { @Override public void channelActive(ChannelHandlerContext ctx) { //当一个链接被服务器接受并创建后,发送一条消息 ctx.writeAndFlush(Unpooled.copiedBuffer("Hello, Netty", CharsetUtil.UTF_8)); } @Override protected void channelRead0(ChannelHandlerContext channelHandlerContext, ByteBuf byteBuf) { //记录客户端接收到服务器的消息 System.out.println("Client received:" + byteBuf.toString(CharsetUtil.UTF_8)); } /** * 在发生异常时,记录错误并关闭Channel * @param ctx * @param cause */ @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) { cause.printStackTrace(); ctx.close(); } }
备注
每次在接受数据时,都会调用channelRead0()方法。须要注意的是,由服务器发送的消息可能会被分块接受。也就是说,若是服务器发送了5字节,那么不能保证这5字节会被一次性接受。即便是对于这么少许的数据,channelRead0()方法也可能会被调用两次,第一次使用一个持有3字节的ByteBuf(Netty的字节容器),第二次使用一个持有2字节的ByteBuf。做为一个面向流的协议,TCP保证了字节数组会按照服务器发送它们的顺序被接受。ide
主要和业务逻辑如何处理消息以及Netty如何管理资源有关
客户端中,当channelRead0()方法完成时,已经接受了消息而且处理完毕,当该方法返回时,SimpleChannelInboundHandler负责释放指向保存该消息的ByteBuf的内存引用。
可是在服务器端,你须要将消息返回给客户端,write()操做是异步的,直到channelRead()方法返回后有可能仍然没有完成,ChannelInboundHandlerAdapter在这个时间点上不会释放消息。
服务端的消息是在channelComplete()方法中,经过writeAndFlush()方法调用时被释放。
客户端使用主机和端口参数来链接远程地址,也就是Echo服务器的地址,而不是绑定到一个一直被监听的端口。
示例代码以下:
package cn.sh.demo.echo; import io.netty.bootstrap.Bootstrap; import io.netty.channel.ChannelFuture; import io.netty.channel.ChannelInitializer; import io.netty.channel.EventLoopGroup; import io.netty.channel.nio.NioEventLoopGroup; import io.netty.channel.socket.SocketChannel; import io.netty.channel.socket.nio.NioSocketChannel; import java.net.InetSocketAddress; public class EchoClient { private final String host; private final int port; public EchoClient(String host, int port) { this.host = host; this.port = port; } public void start() throws InterruptedException { EventLoopGroup group = new NioEventLoopGroup(); //建立客户端引导器 Bootstrap bootstrap = new Bootstrap(); //指定使用NioEventLoopGroup来处理客户端事件 bootstrap.group(group) //指定使用NIO传输的Channel类型 .channel(NioSocketChannel.class) //设置服务器的InetSocketAddress .remoteAddress(new InetSocketAddress(host, port)) //在建立Channel时,向ChannelPipeline中添加一个EchoHandler实例 .handler(new ChannelInitializer<SocketChannel>() { @Override protected void initChannel(SocketChannel socketChannel) throws Exception { socketChannel.pipeline().addLast(new EchoClientHandler()); } }); try { //链接到远程节点,阻塞等待直到链接完成 ChannelFuture future = bootstrap.connect().sync(); //阻塞直到Channel关闭 future.channel().closeFuture().sync(); } catch (InterruptedException e) { e.printStackTrace(); } finally { //关闭线程池而且释放全部的资源 group.shutdownGracefully().sync(); } } public static void main(String[] args) throws InterruptedException { if (args.length != 2) { System.err.println("参数个数不正确"); return; } int port = Integer.parseInt(args[1]); new EchoClient(args[0], port).start(); } }
备注
服务器和客户端均使用了NIO传输,可是,客户端和服务端能够使用不一样的传输,例如,在服务器使用NIO传输,客户端能够使用OIO传输
服务端的输出以下:
客户端的输出以下:
该文章的示例代码位于cn.sh.demo.echo包下。