包括如下主要组件:java
这些组件表明了不一样的构造:资源、逻辑、通知。编程
Channel是Java NIO的一个基本构造。
在网络通信中,数据要从发送端应用程序->发送端硬件或则其余实体->服务端硬件或则其余实体->服务端应用,所以须要一个数据传输过程当中的载体,这个载体就是Channel,它能够被打开或者被关闭,链接或则断开链接。segmentfault
一个回调就是一个方法,好比A方法在某些特定的流程中,调用B方法,这个B方法,就是A方法的回调。安全
在java并发编程学习中,咱们也学到了这个对象,能够看作是对异步操做结果的一个占位符,在完成以前会一直阻塞,完成后提供对结果的访问。Netty也提供了本身的ChannelFuture的实现,用于在执行异步操做的时候使用。服务器
在Netty中,使用不一样的事件来通知咱们状态的改变或则状态的改变,以便咱们基于这些事件来触发不一样的动做。
这些事件包括出站入站等,咱们能够针对这些事件定义不一样的处理方式,好比记录日志、数据转换、流控制、业务逻辑处理等,这些处理方式,都是在ChannelHandler中实现的。网络
简单的说,他就是一个线程池,EventLoopGroup接口最终基础了ExecutorService接口。并发
服务端引导是ServerBootstrap,客户端是Bootstrap。
ServerBootstrap用于配置Channel、EventLoopGroup、ChannelHandler以及监听并接受传入链接请求的端口。
Bootstrap与ServerBootstrap不一样的是,使用服务端的地址和端口来链接服务端。异步
服务端流程:socket
EchoServer,经过ServerBootstrap引导类,配置不一样的信息,这部分的代码也是通用性的,咱们实际的业务处理逻辑在ChannelHandler上。ide
public class EchoServer { public static void main(String[] args) throws InterruptedException { final EchoServerHandler echoServerHandler = new EchoServerHandler(); // 建立NioEventLoopGroup类型的EventLoopGroup EventLoopGroup group = new NioEventLoopGroup(); try { // 建立ServerBootstrap ServerBootstrap sbs = new ServerBootstrap(); sbs.group(group) // 设置Channel为NIO的服务端Channel .channel(NioServerSocketChannel.class) // 绑定本地端口 .localAddress(new InetSocketAddress(Const.PORT)) // 把echoServerHandler加入到ChannelPipeline中 .childHandler(new ChannelInitializer<SocketChannel>() { @Override protected void initChannel(SocketChannel socketChannel) throws Exception { socketChannel.pipeline().addLast(echoServerHandler); } }); // 异步绑定服务器,阻塞到服务器绑定完成 ChannelFuture sync = sbs.bind().sync(); // 获取channel的closeFuture,阻塞到关闭 sync.channel().closeFuture().sync(); } finally { // 优雅的关掉group并释放全部的资源 group.shutdownGracefully().sync(); } } }
EchoServerHandler,实际的业务逻辑处理代码,这部分是根据咱们的业务不一样进行定制化开发的。
// 标记为ChannelHandler.Sharable时,标识能被多个channel安全的共享,是线程安全的 @ChannelHandler.Sharable public class EchoServerHandler extends ChannelInboundHandlerAdapter { /** * 服务端接收客户端信息的时候调用 * * @param ctx * @param msg * @throws Exception */ @Override public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { ByteBuf byteBuf = (ByteBuf) msg; String request = byteBuf.toString(CharsetUtil.UTF_8); String resp = "Hello " + request; System.out.println("服务端收到信息:" + byteBuf.toString(CharsetUtil.UTF_8)); // 把消息发送给客户端 ctx.write(Unpooled.copiedBuffer(resp, CharsetUtil.UTF_8)); } /** * 服务端处理客户端最后一条消息后调用 * * @param ctx * @throws Exception */ @Override public void channelReadComplete(ChannelHandlerContext ctx) throws Exception { ctx.writeAndFlush(Unpooled.EMPTY_BUFFER)// flush数据 .addListener(ChannelFutureListener.CLOSE);// 关闭Channel } /** * 服务端处理消息过程当中,对异常的处理 * * @param ctx * @param cause * @throws Exception */ @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { // 打印异常 cause.printStackTrace(); // 关闭Channel ctx.close(); } }
客户端流程:
EchoClient
public class EchoClient { public static void main(String[] args) throws InterruptedException { EventLoopGroup group = new NioEventLoopGroup(); try { // 建立Bootstrap Bootstrap bs = new Bootstrap(); // bs.group(group) // 设置Channel为NIO的客户端Channel .channel(NioSocketChannel.class) // 设置服务器的地址端口信息 .remoteAddress(new InetSocketAddress(Const.IP, Const.PORT)) .handler(new EchoClientHandler()); // 链接远程服务器,阻塞到链接完成 ChannelFuture cf = bs.connect().sync(); // 获取channel的closeFuture,阻塞到关闭 cf.channel().closeFuture().sync(); } finally { // 优雅的关掉group并释放全部的资源 group.shutdownGracefully().sync(); } } }
EchoClientHandler
public class EchoClientHandler extends SimpleChannelInboundHandler<ByteBuf> { /** * 客户端收到服务器消息后调用 * * @param channelHandlerContext * @param byteBuf * @throws Exception */ @Override protected void channelRead0(ChannelHandlerContext channelHandlerContext, ByteBuf byteBuf) throws Exception { // 处理收到的消息 System.out.println("客户端收到信息:" + byteBuf.toString(CharsetUtil.UTF_8)); } /** * 客户端与服务器链接后调用 * * @param ctx * @throws Exception */ @Override public void channelActive(ChannelHandlerContext ctx) throws Exception { // 当channel是活跃的时候,往服务端发送一条消息 ctx.writeAndFlush(Unpooled.copiedBuffer("Netty", CharsetUtil.UTF_8)); } /** * 客户端处理消息过程当中,对异常的处理 * * @param ctx * @param cause * @throws Exception */ @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { cause.printStackTrace(); ctx.close(); } }