Netty是一个高性能,高可扩展性的异步事件驱动的网络应用程序框架,它极大的简化了TCP和UDP客户端和服务器端网络开发。它是一个NIO框架,对Java NIO进行了良好的封装。做为一个异步NIO框架,Netty的全部IO操做都是异步非阻塞的,经过Future-Listener机制,用户能够方便的主动获取或者经过通知机制得到IO操做结果。vue
Netty中最核心的内容主要有如下四个方面:java
EventLoop:EventLoop维护了一个线程和任务队列,支持异步提交执行任务。EventLoop自身实现了Executor接口,当调用executor方法提交任务时,则判断是否启动,未启动则调用内置的executor建立新线程来触发run方法执行,其大体流程参考Netty源码SingleThreadEventExecutor以下:bootstrap
EventLoopGroup: EventLoopGroup主要是管理eventLoop的生命周期,能够将其看做是一个线程池,其内部维护了一组EventLoop,每一个eventLoop对应处理多个Channel,而一个Channel只能对应一个EventLoop设计模式
Bootstrap:BootStrap 是客户端的引导类,主要用于客户端链接远程主机,有1个EventLoopGroup。Bootstrap 在调用 bind()(链接UDP)和 connect()(链接TCP)方法时,会新建立一个单独的、没有父 Channel 的 Channel 来实现全部的网络交换。服务器
ServerBootstrap: ServerBootstrap 是服务端的引导类,主要用户服务端绑定本地端口,有2个EventLoopGroup。ServerBootstarp 在调用 bind() 方法时会建立一个 ServerChannel 来接受来自客户端的链接,而且该 ServerChannel 管理了多个子 Channel 用于同客户端之间的通讯。markdown
Channel:Netty中的Channel是一个抽象的概念,能够理解为对Java NIO Channel的加强和扩展,增长了许多新的属性和方法,如bing方法等。网络
ChannelFuture:ChannelFuture可以注册一个或者多个ChannelFutureListener 实例,当操做完成时,无论成功仍是失败,均会被通知。ChannelFuture存储了以后执行的操做的结果而且没法预测操做什么时候被执行,提交至Channel的操做按照被唤醒的顺序被执行。多线程
ChannelHandler:ChannelHandler用来处理业务逻辑,分别有入站和出站的实现。app
ChannelPipeline: ChannelPipeline 提供了 ChannelHandler链的容器,并定义了用于在该链上传播入站和出站事件流的API。框架
Netty的线程模型是基于Reactor模式的线程实现。关于Reactor模式能够参考 Reactor模式 ,Netty中依据用户的配置能够支持单线程的Reactor模型,多线程的Reactor模型以及主从多Reactor的模型。在Netty中其大体流程以下以下:
服务端代码示例:
import io.netty.bootstrap.ServerBootstrap; import io.netty.buffer.ByteBuf; import io.netty.buffer.Unpooled; import io.netty.channel.*; import io.netty.channel.nio.NioEventLoopGroup; import io.netty.channel.socket.SocketChannel; import io.netty.channel.socket.nio.NioServerSocketChannel; import io.netty.handler.logging.LogLevel; import io.netty.handler.logging.LoggingHandler; import java.nio.charset.Charset; public class EchoServer { public static void main(String[] args) { // accept线程组,用来接受链接 EventLoopGroup bossGroup = new NioEventLoopGroup(1); // I/O线程组, 用于处理业务逻辑 EventLoopGroup workerGroup = new NioEventLoopGroup(1); try { // 服务端启动引导 ServerBootstrap b = new ServerBootstrap(); b.group(bossGroup, workerGroup) // 绑定两个线程组 .channel(NioServerSocketChannel.class) // 指定通道类型 .option(ChannelOption.SO_BACKLOG, 100) // 设置TCP链接的缓冲区 .handler(new LoggingHandler(LogLevel.INFO)) // 设置日志级别 .childHandler( new ChannelInitializer<SocketChannel>() { @Override protected void initChannel(SocketChannel socketChannel) throws Exception { ChannelPipeline pipeline = socketChannel.pipeline(); // 获取处理器链 pipeline.addLast(new EchoServerHandler()); // 添加新的件处理器 } }); // 经过bind启动服务 ChannelFuture f = b.bind(8080).sync(); // 阻塞主线程,知道网络服务被关闭 f.channel().closeFuture().sync(); } catch (Exception e) { e.printStackTrace(); } finally { workerGroup.shutdownGracefully(); bossGroup.shutdownGracefully(); } } } class EchoServerHandler extends ChannelInboundHandlerAdapter { // 每当从客户端收到新的数据时,这个方法会在收到消息时被调用 @Override public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { System.out.println("收到数据:" + ((ByteBuf) msg).toString(Charset.defaultCharset())); ctx.write(Unpooled.wrappedBuffer("Server message".getBytes())); ctx.fireChannelRead(msg); } // 数据读取完后被调用 @Override public void channelReadComplete(ChannelHandlerContext ctx) throws Exception { ctx.flush(); } // 当Netty因为IO错误或者处理器在处理事件时抛出的异常时被调用 @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { cause.printStackTrace(); ctx.close(); } }
客户端代码示例:
import io.netty.bootstrap.Bootstrap; import io.netty.buffer.ByteBuf; import io.netty.buffer.Unpooled; import io.netty.channel.*; import io.netty.channel.nio.NioEventLoopGroup; import io.netty.channel.socket.SocketChannel; import io.netty.channel.socket.nio.NioSocketChannel; import java.nio.charset.Charset; public class EchoClient { public static void main(String[] args) { EventLoopGroup group = new NioEventLoopGroup(); try { Bootstrap b = new Bootstrap(); b.group(group) .channel(NioSocketChannel.class) .option(ChannelOption.TCP_NODELAY, true) .handler( new ChannelInitializer<SocketChannel>() { @Override public void initChannel(SocketChannel ch) throws Exception { ChannelPipeline p = ch.pipeline(); p.addLast(new EchoClientHandler()); } }); ChannelFuture f = b.connect("127.0.0.1", 8080).sync(); f.channel().closeFuture().sync(); } catch (Exception e) { e.printStackTrace(); } finally { group.shutdownGracefully(); } } } class EchoClientHandler extends ChannelInboundHandlerAdapter { private final ByteBuf firstMessage; public EchoClientHandler() { firstMessage = Unpooled.buffer(256); for (int i = 0; i < firstMessage.capacity(); i++) { firstMessage.writeByte((byte) i); } } @Override public void channelActive(ChannelHandlerContext ctx) { ctx.writeAndFlush(firstMessage); } @Override public void channelRead(ChannelHandlerContext ctx, Object msg) { System.out.println("收到数据:" + ((ByteBuf) msg).toString(Charset.defaultCharset())); ctx.write(Unpooled.wrappedBuffer("Client message".getBytes())); } @Override public void channelReadComplete(ChannelHandlerContext ctx) { ctx.flush(); } @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) { cause.printStackTrace(); ctx.close(); } }