本文主要讲述Netty框架的一些特性以及重要组件,但愿看完以后能对Netty框架有一个比较直观的感觉,但愿能帮助读者快速入门Netty,减小一些弯路。html
官方的介绍:java
Netty is an asynchronous event-driven network application framework for rapid development of maintainable high performance protocol servers & clients.linux
Netty是 一个异步事件驱动的网络应用程序框架,用于快速开发可维护的高性能协议服务器和客户端。程序员
从官网上介绍,Netty是一个网络应用程序框架,开发服务器和客户端。也就是用于网络编程的一个框架。既然是网络编程,Socket就不谈了,为何不用NIO呢?web
对于这个问题,以前我写了一篇文章《NIO入门》对NIO有比较详细的介绍,NIO的主要问题是:算法
相对地,Netty的优势有不少:编程
上面这张图就是在官网首页的架构图,咱们从上到下分析一下。bootstrap
绿色的部分Core核心模块,包括零拷贝、API库、可扩展的事件模型。api
橙色部分Protocol Support协议支持,包括Http协议、webSocket、SSL(安全套接字协议)、谷歌Protobuf协议、zlib/gzip压缩与解压缩、Large File Transfer大文件传输等等。安全
红色的部分Transport Services传输服务,包括Socket、Datagram、Http Tunnel等等。
以上可看出Netty的功能、协议、传输方式都比较全,比较强大。
首先搭建一个HelloWord工程,先熟悉一下API,还有为后面的学习作铺垫。如下面这张图为依据:
使用的版本是4.1.20,相对比较稳定的一个版本。
<dependency>
<groupId>io.netty</groupId>
<artifactId>netty-all</artifactId>
<version>4.1.20.Final</version>
</dependency>
复制代码
public class MyServer {
public static void main(String[] args) throws Exception {
//建立两个线程组 boosGroup、workerGroup
EventLoopGroup bossGroup = new NioEventLoopGroup();
EventLoopGroup workerGroup = new NioEventLoopGroup();
try {
//建立服务端的启动对象,设置参数
ServerBootstrap bootstrap = new ServerBootstrap();
//设置两个线程组boosGroup和workerGroup
bootstrap.group(bossGroup, workerGroup)
//设置服务端通道实现类型
.channel(NioServerSocketChannel.class)
//设置线程队列获得链接个数
.option(ChannelOption.SO_BACKLOG, 128)
//设置保持活动链接状态
.childOption(ChannelOption.SO_KEEPALIVE, true)
//使用匿名内部类的形式初始化通道对象
.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel socketChannel) throws Exception {
//给pipeline管道设置处理器
socketChannel.pipeline().addLast(new MyServerHandler());
}
});//给workerGroup的EventLoop对应的管道设置处理器
System.out.println("java技术爱好者的服务端已经准备就绪...");
//绑定端口号,启动服务端
ChannelFuture channelFuture = bootstrap.bind(6666).sync();
//对关闭通道进行监听
channelFuture.channel().closeFuture().sync();
} finally {
bossGroup.shutdownGracefully();
workerGroup.shutdownGracefully();
}
}
}
复制代码
/** * 自定义的Handler须要继承Netty规定好的HandlerAdapter * 才能被Netty框架所关联,有点相似SpringMVC的适配器模式 **/
public class MyServerHandler extends ChannelInboundHandlerAdapter {
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
//获取客户端发送过来的消息
ByteBuf byteBuf = (ByteBuf) msg;
System.out.println("收到客户端" + ctx.channel().remoteAddress() + "发送的消息:" + byteBuf.toString(CharsetUtil.UTF_8));
}
@Override
public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
//发送消息给客户端
ctx.writeAndFlush(Unpooled.copiedBuffer("服务端已收到消息,并给你发送一个问号?", CharsetUtil.UTF_8));
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
//发生异常,关闭通道
ctx.close();
}
}
复制代码
public class MyClient {
public static void main(String[] args) throws Exception {
NioEventLoopGroup eventExecutors = new NioEventLoopGroup();
try {
//建立bootstrap对象,配置参数
Bootstrap bootstrap = new Bootstrap();
//设置线程组
bootstrap.group(eventExecutors)
//设置客户端的通道实现类型
.channel(NioSocketChannel.class)
//使用匿名内部类初始化通道
.handler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel ch) throws Exception {
//添加客户端通道的处理器
ch.pipeline().addLast(new MyClientHandler());
}
});
System.out.println("客户端准备就绪,随时能够起飞~");
//链接服务端
ChannelFuture channelFuture = bootstrap.connect("127.0.0.1", 6666).sync();
//对通道关闭进行监听
channelFuture.channel().closeFuture().sync();
} finally {
//关闭线程组
eventExecutors.shutdownGracefully();
}
}
}
复制代码
public class MyClientHandler extends ChannelInboundHandlerAdapter {
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
//发送消息到服务端
ctx.writeAndFlush(Unpooled.copiedBuffer("歪比巴卜~茉莉~Are you good~马来西亚~", CharsetUtil.UTF_8));
}
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
//接收服务端发送过来的消息
ByteBuf byteBuf = (ByteBuf) msg;
System.out.println("收到服务端" + ctx.channel().remoteAddress() + "的消息:" + byteBuf.toString(CharsetUtil.UTF_8));
}
}
复制代码
先启动服务端,再启动客户端,就能够看到结果:
MyServer打印结果:
MyClient打印结果:
若是Handler处理器有一些长时间的业务处理,能够交给taskQueue异步处理。怎么用呢,请看代码演示:
public class MyServerHandler extends ChannelInboundHandlerAdapter {
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
//获取到线程池eventLoop,添加线程,执行
ctx.channel().eventLoop().execute(new Runnable() {
@Override
public void run() {
try {
//长时间操做,不至于长时间的业务操做致使Handler阻塞
Thread.sleep(1000);
System.out.println("长时间的业务处理");
} catch (Exception e) {
e.printStackTrace();
}
}
});
}
}
复制代码
咱们打一个debug调试,是能够看到添加进去的taskQueue有一个任务。
延时任务队列和上面介绍的任务队列很是类似,只是多了一个可延迟必定时间再执行的设置,请看代码演示:
ctx.channel().eventLoop().schedule(new Runnable() {
@Override
public void run() {
try {
//长时间操做,不至于长时间的业务操做致使Handler阻塞
Thread.sleep(1000);
System.out.println("长时间的业务处理");
} catch (Exception e) {
e.printStackTrace();
}
}
},5, TimeUnit.SECONDS);//5秒后执行
复制代码
依然打开debug进行调试查看,咱们能够有一个scheduleTaskQueue任务待执行中
在搭建HelloWord工程的时候,咱们看到有一行这样的代码:
ChannelFuture channelFuture = bootstrap.connect("127.0.0.1", 6666);
复制代码
不少操做都返回这个ChannelFuture对象,究竟这个ChannelFuture对象是用来作什么的呢?
ChannelFuture提供操做完成时一种异步通知的方式。通常在Socket编程中,等待响应结果都是同步阻塞的,而Netty则不会形成阻塞,由于ChannelFuture是采起相似观察者模式的形式进行获取结果。请看一段代码演示:
//添加监听器
channelFuture.addListener(new ChannelFutureListener() {
//使用匿名内部类,ChannelFutureListener接口
//重写operationComplete方法
@Override
public void operationComplete(ChannelFuture future) throws Exception {
//判断是否操做成功
if (future.isSuccess()) {
System.out.println("链接成功");
} else {
System.out.println("链接失败");
}
}
});
复制代码
Bootstrap和ServerBootStrap是Netty提供的一个建立客户端和服务端启动器的工厂类,使用这个工厂类很是便利地建立启动类,根据上面的一些例子,其实也看得出来能大大地减小了开发的难度。首先看一个类图:
能够看出都是继承于AbstractBootStrap抽象类,因此大体上的配置方法都相同。
通常来讲,使用Bootstrap建立启动器的步骤可分为如下几步:
在上一篇文章《Reactor模式》中,咱们就讲过服务端要使用两个线程组:
通常建立线程组直接使用如下new就完事了:
EventLoopGroup bossGroup = new NioEventLoopGroup();
EventLoopGroup workerGroup = new NioEventLoopGroup();
复制代码
有点好奇的是,既然是线程组,那线程数默认是多少呢?深刻源码:
//使用一个常量保存
private static final int DEFAULT_EVENT_LOOP_THREADS;
static {
//NettyRuntime.availableProcessors() * 2,cpu核数的两倍赋值给常量
DEFAULT_EVENT_LOOP_THREADS = Math.max(1, SystemPropertyUtil.getInt(
"io.netty.eventLoopThreads", NettyRuntime.availableProcessors() * 2));
if (logger.isDebugEnabled()) {
logger.debug("-Dio.netty.eventLoopThreads: {}", DEFAULT_EVENT_LOOP_THREADS);
}
}
protected MultithreadEventLoopGroup(int nThreads, Executor executor, Object... args) {
//若是不传入,则使用常量的值,也就是cpu核数的两倍
super(nThreads == 0 ? DEFAULT_EVENT_LOOP_THREADS : nThreads, executor, args);
}
复制代码
经过源码能够看到,默认的线程数是cpu核数的两倍。假设想自定义线程数,可使用有参构造器:
//设置bossGroup线程数为1
EventLoopGroup bossGroup = new NioEventLoopGroup(1);
//设置workerGroup线程数为16
EventLoopGroup workerGroup = new NioEventLoopGroup(16);
复制代码
这个方法用于设置通道类型,当创建链接后,会根据这个设置建立对应的Channel实例。
使用debug模式能够看到
通道类型有如下:
NioSocketChannel: 异步非阻塞的客户端 TCP Socket 链接。
NioServerSocketChannel: 异步非阻塞的服务器端 TCP Socket 链接。
经常使用的就是这两个通道类型,由于是异步非阻塞的。因此是首选。
OioSocketChannel: 同步阻塞的客户端 TCP Socket 链接。
OioServerSocketChannel: 同步阻塞的服务器端 TCP Socket 链接。
稍微在本地调试过,用起来和Nio有一些不一样,是阻塞的,因此API调用也不同。由于是阻塞的IO,几乎没什么人会选择使用Oio,因此也很难找到例子。我稍微琢磨了一下,通过几回报错以后,总算调通了。代码以下:
//server端代码,跟上面几乎同样,只需改三个地方
//这个地方使用的是OioEventLoopGroup
EventLoopGroup bossGroup = new OioEventLoopGroup();
ServerBootstrap bootstrap = new ServerBootstrap();
bootstrap.group(bossGroup)//只须要设置一个线程组boosGroup
.channel(OioServerSocketChannel.class)//设置服务端通道实现类型
//client端代码,只需改两个地方
//使用的是OioEventLoopGroup
EventLoopGroup eventExecutors = new OioEventLoopGroup();
//通道类型设置为OioSocketChannel
bootstrap.group(eventExecutors)//设置线程组
.channel(OioSocketChannel.class)//设置客户端的通道实现类型
复制代码
NioSctpChannel: 异步的客户端 Sctp(Stream Control Transmission Protocol,流控制传输协议)链接。
NioSctpServerChannel: 异步的 Sctp 服务器端链接。
本地没启动成功,网上看了一些网友的评论,说是只能在linux环境下才能够启动。从报错信息看:SCTP not supported on this platform,不支持这个平台。由于我电脑是window系统,因此网友说的有点道理。
首先说一下这两个的区别。
option()设置的是服务端用于接收进来的链接,也就是boosGroup线程。
childOption()是提供给父管道接收到的链接,也就是workerGroup线程。
搞清楚了以后,咱们看一下经常使用的一些设置有哪些:
SocketChannel参数,也就是childOption()经常使用的参数:
SO_RCVBUF Socket参数,TCP数据接收缓冲区大小。 TCP_NODELAY TCP参数,当即发送数据,默认值为Ture。 SO_KEEPALIVE Socket参数,链接保活,默认值为False。启用该功能时,TCP会主动探测空闲链接的有效性。
ServerSocketChannel参数,也就是option()经常使用参数:
SO_BACKLOG Socket参数,服务端接受链接的队列长度,若是队列已满,客户端链接将被拒绝。默认值,Windows为200,其余为128。
因为篇幅限制,其余就不列举了,你们能够去网上找资料看看,了解一下。
ChannelPipeline是Netty处理请求的责任链,ChannelHandler则是具体处理请求的处理器。实际上每个channel都有一个处理器的流水线。
在Bootstrap中childHandler()方法须要初始化通道,实例化一个ChannelInitializer,这时候须要重写initChannel()初始化通道的方法,装配流水线就是在这个地方进行。代码演示以下:
//使用匿名内部类的形式初始化通道对象
bootstrap.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel socketChannel) throws Exception {
//给pipeline管道设置自定义的处理器
socketChannel.pipeline().addLast(new MyServerHandler());
}
});
复制代码
处理器Handler主要分为两种:
ChannelInboundHandlerAdapter(入站处理器)、ChannelOutboundHandler(出站处理器)
入站指的是数据从底层java NIO Channel到Netty的Channel。
出站指的是经过Netty的Channel来操做底层的java NIO Channel。
ChannelInboundHandlerAdapter处理器经常使用的事件有:
注册事件 fireChannelRegistered。
链接创建事件 fireChannelActive。
读事件和读完成事件 fireChannelRead、fireChannelReadComplete。
异常通知事件 fireExceptionCaught。
用户自定义事件 fireUserEventTriggered。
Channel 可写状态变化事件 fireChannelWritabilityChanged。
链接关闭事件 fireChannelInactive。
ChannelOutboundHandler处理器经常使用的事件有:
端口绑定 bind。
链接服务端 connect。
写事件 write。
刷新时间 flush。
读事件 read。
主动断开链接 disconnect。
关闭 channel 事件 close。
还有一个相似的handler(),主要用于装配parent通道,也就是bossGroup线程。通常状况下,都用不上这个方法。
提供用于服务端或者客户端绑定服务器地址和端口号,默认是异步启动。若是加上sync()方法则是同步。
有五个同名的重载方法,做用都是用于绑定地址端口号。不一一介绍了。
//释放掉全部的资源,包括建立的线程
bossGroup.shutdownGracefully();
workerGroup.shutdownGracefully();
复制代码
会关闭全部的child Channel。关闭以后,释放掉底层的资源。
Channel是什么?不妨看一下官方文档的说明:
A nexus to a network socket or a component which is capable of I/O operations such as read, write, connect, and bind
翻译大意:一种链接到网络套接字或能进行读、写、链接和绑定等I/O操做的组件。
若是上面这段说明比较抽象,下面还有一段说明:
A channel provides a user:
the current state of the channel (e.g. is it open? is it connected?), the configuration parameters of the channel (e.g. receive buffer size), the I/O operations that the channel supports (e.g. read, write, connect, and bind), and the ChannelPipeline which handles all I/O events and requests associated with the channel.
翻译大意:
channel为用户提供:
通道当前的状态(例如它是打开?仍是已链接?)
channel的配置参数(例如接收缓冲区的大小)
channel支持的IO操做(例如读、写、链接和绑定),以及处理与channel相关联的全部IO事件和请求的ChannelPipeline。
boolean isOpen(); //若是通道打开,则返回true
boolean isRegistered();//若是通道注册到EventLoop,则返回true
boolean isActive();//若是通道处于活动状态而且已链接,则返回true
boolean isWritable();//当且仅当I/O线程将当即执行请求的写入操做时,返回true。
复制代码
以上就是获取channel的四种状态的方法。
获取单条配置信息,使用getOption(),代码演示:
ChannelConfig config = channel.config();//获取配置参数
//获取ChannelOption.SO_BACKLOG参数,
Integer soBackLogConfig = config.getOption(ChannelOption.SO_BACKLOG);
//由于我启动器配置的是128,因此我这里获取的soBackLogConfig=128
复制代码
获取多条配置信息,使用getOptions(),代码演示:
ChannelConfig config = channel.config();
Map<ChannelOption<?>, Object> options = config.getOptions();
for (Map.Entry<ChannelOption<?>, Object> entry : options.entrySet()) {
System.out.println(entry.getKey() + " : " + entry.getValue());
}
/** SO_REUSEADDR : false WRITE_BUFFER_LOW_WATER_MARK : 32768 WRITE_BUFFER_WATER_MARK : WriteBufferWaterMark(low: 32768, high: 65536) SO_BACKLOG : 128 如下省略... */
复制代码
写操做,这里演示从服务端写消息发送到客户端:
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
ctx.channel().writeAndFlush(Unpooled.copiedBuffer("这波啊,这波是肉蛋葱鸡~", CharsetUtil.UTF_8));
}
复制代码
客户端控制台:
//收到服务端/127.0.0.1:6666的消息:这波啊,这波是肉蛋葱鸡~
复制代码
链接操做,代码演示:
ChannelFuture connect = channelFuture.channel().connect(new InetSocketAddress("127.0.0.1", 6666));//通常使用启动器,这种方式不经常使用
复制代码
经过channel获取ChannelPipeline,并作相关的处理:
//获取ChannelPipeline对象
ChannelPipeline pipeline = ctx.channel().pipeline();
//往pipeline中添加ChannelHandler处理器,装配流水线
pipeline.addLast(new MyServerHandler());
复制代码
在NioEventLoop中,有一个成员变量selector,这是nio包的Selector,在以前《NIO入门》中,我已经讲过Selector了。
Netty中的Selector也和NIO的Selector是同样的,就是用于监听事件,管理注册到Selector中的channel,实现多路复用器。
在前面介绍Channel时,咱们知道能够在channel中装配ChannelHandler流水线处理器,那一个channel不可能只有一个channelHandler处理器,确定是有不少的,既然是不少channelHandler在一个流水线工做,确定是有顺序的。
因而pipeline就出现了,pipeline至关于处理器的容器。初始化channel时,把channelHandler按顺序装在pipeline中,就能够实现按序执行channelHandler了。
在一个Channel中,只有一个ChannelPipeline。该pipeline在Channel被建立的时候建立。ChannelPipeline包含了一个ChannelHander造成的列表,且全部ChannelHandler都会注册到ChannelPipeline中。
在Netty中,Handler处理器是有咱们定义的,上面讲过经过集成入站处理器或者出站处理器实现。这时若是咱们想在Handler中获取pipeline对象,或者channel对象,怎么获取呢。
因而Netty设计了这个ChannelHandlerContext上下文对象,就能够拿到channel、pipeline等对象,就能够进行读写等操做。
经过类图,ChannelHandlerContext是一个接口,下面有三个实现类。
实际上ChannelHandlerContext在pipeline中是一个链表的形式。看一段源码就明白了:
//ChannelPipeline实现类DefaultChannelPipeline的构造器方法
protected DefaultChannelPipeline(Channel channel) {
this.channel = ObjectUtil.checkNotNull(channel, "channel");
succeededFuture = new SucceededChannelFuture(channel, null);
voidPromise = new VoidChannelPromise(channel, true);
//设置头结点head,尾结点tail
tail = new TailContext(this);
head = new HeadContext(this);
head.next = tail;
tail.prev = head;
}
复制代码
下面我用一张图来表示,会更加清晰一点:
咱们先看一下EventLoopGroup的类图:
其中包括了经常使用的实现类NioEventLoopGroup。OioEventLoopGroup在前面的例子中也有使用过。
从Netty的架构图中,能够知道服务器是须要两个线程组进行配合工做的,而这个线程组的接口就是EventLoopGroup。
每一个EventLoopGroup里包括一个或多个EventLoop,每一个EventLoop中维护一个Selector实例。
咱们不妨看一段DefaultEventExecutorChooserFactory的源码:
private final AtomicInteger idx = new AtomicInteger();
private final EventExecutor[] executors;
@Override
public EventExecutor next() {
//idx.getAndIncrement()至关于idx++,而后对任务长度取模
return executors[idx.getAndIncrement() & executors.length - 1];
}
复制代码
这段代码能够肯定执行的方式是轮询机制,接下来debug调试一下:
它这里还有一个判断,若是线程数不是2的N次方,则采用取模算法实现。
@Override
public EventExecutor next() {
return executors[Math.abs(idx.getAndIncrement() % executors.length)];
}
复制代码
参考Netty官网文档:API文档
创做不易,以为有用就点个赞吧。
我不要下次必定,但愿此次必定素质三连,感谢!
想第一时间看到我更新的文章,能够微信搜索公众号「java技术爱好者
」,拒绝作一条咸鱼,我是一个努力让你们记住的程序员。咱们下期再见!!!
能力有限,若是有什么错误或者不当之处,请你们批评指正,一块儿学习交流!