深刻了解Netty【六】Netty工做原理


引言

前面学习了NIO与零拷贝、IO多路复用模型、Reactor主从模型。 服务器基于IO模型管理链接,获取输入数据,又基于线程模型,处理请求。 下面来学习Netty的具体应用。java

一、Netty线程模型

Netty线程模型是创建在Reactor主从模式的基础上,主从 Rreactor 多线程模型: 主从 Rreactor 多线程模型.jpgreact

可是在Netty中,bossGroup至关于mainReactor,workerGroup至关于SubReactor与Worker线程池的合体。如:git

EventLoopGroup bossGroup = new NioEventLoopGroup();
EventLoopGroup workerGroup = new NioEventLoopGroup();
ServerBootstrap server = new ServerBootstrap();
server.group(bossGroup, workerGroup)
      .channel(NioServerSocketChannel.class);
  • bossGroup bossGroup线程池负责监听端口,获取一个线程做为MainReactor,用于处理端口的Accept事件。
  • workerGroup workerGroup线程池负责处理Channel(通道)的I/O事件,并处理相应的业务。

在启动时,能够初始化多个线程。bootstrap

EventLoopGroup bossGroup = new NioEventLoopGroup(2);
EventLoopGroup workerGroup = new NioEventLoopGroup(3);

二、Netty示例(客户端、服务器)

下面的例子演示了Netty的简单使用。服务器

2.一、服务端

2.1.一、 EchoServerHandler
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelFutureListener;
import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelHandlerAdapter;
import io.netty.channel.ChannelHandlerContext;
import io.netty.util.CharsetUtil;

/**
 * EchoServerHandler
 */
// 标识这类的实例之间能够在 channel 里面共享
@ChannelHandler.Sharable
public class EchoServerHandler extends ChannelHandlerAdapter {
    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) {
        ByteBuf in = (ByteBuf) msg;
        System.out.println("Server received: "   in.toString(CharsetUtil.UTF_8));
        ctx.write(in);
    }

    @Override
    public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
        ctx.writeAndFlush(Unpooled.EMPTY_BUFFER)
                .addListener(ChannelFutureListener.CLOSE);
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
        cause.printStackTrace();
        ctx.close();
    }
}
2.1.二、 EchoServer
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;

import java.net.InetSocketAddress;

/**
 * Echo服务端
 */
public class EchoServer {
    private final int port;
    private EchoServer(int port) {
        this.port = port;
    }
    private void start() throws Exception {
        //建立 EventLoopGroup
        NioEventLoopGroup boss = new NioEventLoopGroup();
        NioEventLoopGroup work = new NioEventLoopGroup();
        try {
            //建立 ServerBootstrap
            ServerBootstrap b = new ServerBootstrap();
            b.group(boss, work)
                    //指定使用 NIO 的传输 Channel
                    .channel(NioServerSocketChannel.class)
                    //设置 socket 地址使用所选的端口
                    .localAddress(new InetSocketAddress(port))
                    //添加 EchoServerHandler 到 Channel 的 ChannelPipeline
                    .childHandler(new ChannelInitializer<SocketChannel>() {
                        @Override
                        public void initChannel(SocketChannel ch) {
                            ch.pipeline().addLast(new EchoServerHandler());
                        }
                    });
            //绑定的服务器;sync 等待服务器关闭
            ChannelFuture f = b.bind().sync();
            System.out.println(EchoServer.class.getName()   " started and listen on "   f.channel().localAddress());
            //关闭 channel 和 块,直到它被关闭
            f.channel().closeFuture().sync();
        } finally {
            //关机的 EventLoopGroup,释放全部资源。
            group.shutdownGracefully().sync();
        }
    }
    public static void main(String[] args) throws Exception {
        //设置端口值(抛出一个 NumberFormatException 若是该端口参数的格式不正确)
        int port = 9999;
        //服务器start()
        new EchoServer(port).start();
    }

}

2.二、客户端

2.2.一、EchoClientHandler
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;
import io.netty.util.CharsetUtil;

public class EchoClientHandler extends SimpleChannelInboundHandler<ByteBuf> {
    @Override
    public void channelActive(ChannelHandlerContext ctx) {
        ctx.writeAndFlush(Unpooled.copiedBuffer("Netty rocks!", CharsetUtil.UTF_8));
    }
    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
        cause.printStackTrace();
        ctx.close();
    }
    @Override
    protected void messageReceived(ChannelHandlerContext ctx, ByteBuf msg) {
        System.out.println("Client received: "   msg.toString(CharsetUtil.UTF_8));
    }
}
2.2.二、EchoClient
import io.netty.bootstrap.Bootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;

import java.net.InetSocketAddress;

public class EchoClient {
    private final String host;
    private final int port;
    private EchoClient(String host, int port) {
        this.host = host;
        this.port = port;
    }
    private void start() throws Exception {
        EventLoopGroup group = new NioEventLoopGroup();
        try {
            //建立 Bootstrap
            Bootstrap b = new Bootstrap();
            //指定 EventLoopGroup 来处理客户端事件。
            //因为使用 NIO 传输,因此用到了 NioEventLoopGroup 的实现
            b.group(group)
                    //使用的 channel 类型是一个用于 NIO 传输
                    .channel(NioSocketChannel.class)
                    //设置服务器的 InetSocketAddress
                    .remoteAddress(new InetSocketAddress(host, port))
                    //当创建一个链接和一个新的通道时,建立添加到 EchoClientHandler 实例 到 channel pipeline
                    .handler(new ChannelInitializer<SocketChannel>() {
                        @Override
                        public void initChannel(SocketChannel ch) {
                            ch.pipeline().addLast(new EchoClientHandler());
                        }
                    });
            //链接到远程;等待链接完成
            ChannelFuture f = b.connect().sync();
            //阻塞直到 Channel 关闭
            f.channel().closeFuture().sync();
        } finally {
            //调用 shutdownGracefully() 来关闭线程池和释放全部资源
            group.shutdownGracefully().sync();
        }
    }
    public static void main(String[] args) throws Exception {
        //服务器地址及端口
        String host = "localhost";
        int port = 9999;
        new EchoClient(host, port).start();
    }
}

三、Netty工做原理

服务端 Netty Reactor 工做架构图.jpg

服务端包含了1个boss NioEventLoopGroup和1个work NioEventLoopGroup。 NioEventLoopGroup至关于1个事件循环组,组内包含多个事件循环(NioEventLoop),每一个NioEventLoop包含1个Selector和1个事件循环线程。多线程

3.一、boss NioEventLoop循环任务

  • 轮询Accept事件。
  • 处理Accept IO事件,与Client创建链接,生成NioSocketChannel,并将NioSocketChannel注册到某个work NioEventLoop的Selector上。
  • 处理任务队列中的任务。

3.二、work NioEventLoop循环任务

  • 轮询Read、Write事件。
  • 处理IO事件,在NioSocketChannel可读、可写事件发生时进行处理。
  • 处理任务队列中的任务。

3.三、任务队列中的任务

  1. 用户程序自定义的普通任务
ctx.channel().eventLoop().execute(new Runnable() {
   @Override
   public void run() {
       //...
   }
});
  1. 非当前 Reactor 线程调用 Channel 的各类方法 例如在推送系统的业务线程里面,根据用户的标识,找到对应的 Channel 引用,而后调用 Write 类方法向该用户推送消息,就会进入到这种场景。最终的 Write 会提交到任务队列中后被异步消费。架构

  2. 用户自定义定时任务异步

ctx.channel().eventLoop().schedule(new Runnable() {
   @Override
   public void run() {
       //...
   }
}, 60, TimeUnit.SECONDS);

参考

这多是目前最透彻的Netty原理架构解析 Netty 实战精髓篇 Netty入门教程  Essential Netty in Actionsocket

tencent.jpg

相关文章
相关标签/搜索