Boot
BossGroup
和WorkerGroup
: 前者负责接收客户端的链接,后者负责网络的读写。类型都是NioEventLoopGroup
NioEventLoopGroup
至关于一个事件循环组, 这个组中含有多个事件循环 ,每个事件循环是 NioEventLoop
NioEventLoop
表示一个不断循环的执行处理任务的线程, 每一个 NioEventLoop 都有一个 selector
, 用于监听绑 定在其上的 socket 的网络通信。
- 每一个 Boss NioEventLoop 循环执行的步骤有 3 步
- 轮询 accept 事件
- 处理 accept 事件 , 与 client 创建链接 , 生成
NioScocketChannel
, 并将其注册到某个 worker NIOEventLoop
上 的 selector
- 处理任务队列的任务
- 每一个 Worker NIOEventLoop 循环执行的步骤:
- 轮询 read, write 事件
- 处理 i/o 事件, 即 read , write 事件,在对应 NioScocketChannel 处理
- 处理任务队列的任务 , 即 runAllTasks
核心对象
ChannelInitializer
通道初始化接口
ChannelHandler
通道处理程序,Netty默认提供一些开箱即用的的处理器
ChannelPipeline
NioEventLoop
不知道咋描述,能够视为Reactor
- execute 提交一个任务到同步任务队列中,会发生阻塞
- schedule 提交一个定时任务到
scheduledTaskQueue
异步队列中,
CODE
Server
//建立 BossGroup 和 WorkerGroup
//说明
//1. 建立两个线程组 bossGroup 和 workerGroup
//2. bossGroup 只是处理链接请求 , 真正的和客户端业务处理,会交给 workerGroup 完成
//3. 两个都是无限循环
//4. bossGroup 和 workerGroup 含有的子线程(NioEventLoop)的个数
// 默认实际 cpu 核数 * 2
EventLoopGroup bossGroup = new NioEventLoopGroup(1);
EventLoopGroup workerGroup = new NioEventLoopGroup();
//建立服务器端的启动对象,配置参数
ServerBootstrap bootstrap = new ServerBootstrap();
//使用链式编程来进行设置
bootstrap.group(bossGroup, workerGroup) //设置两个线程组
.channel(NioServerSocketChannel.class) //使用 NioSocketChannel 做为服务器的通道实现
.option(ChannelOption.SO_BACKLOG, 128) // 设置线程队列获得链接个数
.childOption(ChannelOption.SO_KEEPALIVE, true) //设置保持活动链接状态
.childHandler(new ChannelInitializer<socketchannel>() {//建立一个通道测试对象(匿名对象)
//给 pipeline 设置处理器
@Override
protected void initChannel(SocketChannel ch) throws Exception {
//咱们自定义一个 Handler 须要继续 netty 规定好的某个 HandlerAdapter(规范)
ch.pipeline().addLast(new ChannelInboundHandlerAdapter());
}
}); // 给咱们的 workerGroup 的 EventLoop 对应的管道设置处理器
System.out.println(".....服务器 is ready...");
//绑定一个端口而且同步, 生成了一个 ChannelFuture 对象
//启动服务器(并绑定端口)
ChannelFuture cf = bootstrap.bind(6668).sync();
//对关闭通道进行监听
cf.channel().closeFuture().sync();
Client
//客户端须要一个事件循环组
EventLoopGroup group = new NioEventLoopGroup();
//建立客户端启动对象
//注意客户端使用的不是 ServerBootstrap 而是 Bootstrap
Bootstrap bootstrap = new Bootstrap();
//设置相关参数
bootstrap.group(group) //设置线程组
.channel(NioSocketChannel.class) // 设置客户端通道的实现类(反射)
.handler(new ChannelInitializer<socketchannel>() {
@Override
protected void initChannel(SocketChannel ch) throws Exception {
//咱们自定义一个 Handler 须要继续 netty 规定好的某个 HandlerAdapter(规范)
ch.pipeline().addLast(new ChannelInboundHandlerAdapter()); //加入本身的处理器
}
});
System.out.println("客户端 ok..");
//启动客户端去链接服务器端
//关于 ChannelFuture 要分析,涉及到 netty 的异步模型
ChannelFuture channelFuture = bootstrap.connect("127.0.0.1", 6668).sync();
ChannelHandler
ChannelInboundHandler
public interface ChannelInboundHandler extends ChannelHandler {
// 当channel被注册到EventLoop时被调用
void channelRegistered(ChannelHandlerContext var1) throws Exception;
// 当channel已经被建立,但还未注册到EventLoop(或者从EventLoop中注销)被调用
void channelUnregistered(ChannelHandlerContext var1) throws Exception;
// 当channel处于活动状态(链接到远程节点)被调用
void channelActive(ChannelHandlerContext var1) throws Exception;
// 当channel处于非活动状态(没有链接到远程节点)被调用
void channelInactive(ChannelHandlerContext var1) throws Exception;
// 当从channel读取数据时被调用
void channelRead(ChannelHandlerContext var1, Object var2) throws Exception;
// 当channel的上一个读操做完成时被调用
void channelReadComplete(ChannelHandlerContext var1) throws Exception;
// 当ChannelInboundHandler.fireUserEventTriggered()方法被调用时被调用
void userEventTriggered(ChannelHandlerContext var1, Object var2) throws Exception;
// 当channel的可写状态发生改变时被调用
void channelWritabilityChanged(ChannelHandlerContext var1) throws Exception;
// 当处理过程当中发生异常时被调用
void exceptionCaught(ChannelHandlerContext var1, Throwable var2) throws Exception;
}
ChannelOutboundHandler
public interface ChannelOutboundHandler extends ChannelHandler {
// 当请求将Channel绑定到一个地址时被调用
// ChannelPromise是ChannelFuture的一个子接口,定义了如setSuccess(),setFailure()等方法
void bind(ChannelHandlerContext var1, SocketAddress var2, ChannelPromise var3) throws Exception;
// 当请求将Channel链接到远程节点时被调用
void connect(ChannelHandlerContext var1, SocketAddress var2, SocketAddress var3, ChannelPromise var4) throws Exception;
// 当请求将Channel从远程节点断开时被调用
void disconnect(ChannelHandlerContext var1, ChannelPromise var2) throws Exception;
// 当请求关闭Channel时被调用
void close(ChannelHandlerContext var1, ChannelPromise var2) throws Exception;
// 当请求将Channel从它的EventLoop中注销时被调用
void deregister(ChannelHandlerContext var1, ChannelPromise var2) throws Exception;
// 当请求从Channel读取数据时被调用
void read(ChannelHandlerContext var1) throws Exception;
// 当请求经过Channel将数据写到远程节点时被调用
void write(ChannelHandlerContext var1, Object var2, ChannelPromise var3) throws Exception;
// 当请求经过Channel将缓冲中的数据冲刷到远程节点时被调用
void flush(ChannelHandlerContext var1) throws Exception;
}