NIO
的那些事咱们在前段时间学习了IO
和NIO
的一些概念性的东西,而且写了一些简单的例子进行实践,虽然简单,但基本上覆盖了NIO
的一些最基本的概念了。
若是还没看过的,若是翻一下以前的文章了解一下,或者看一下网上的其余文章。html
JAVA
的NIO
的那些痛既然咱们学过NIO
,那咱们以JAVA
的NIO
来举个例子,说明一下咱们使用NIO
的一些基本流程:java
ServerSocketChannel
(Server
端)或SocketChannel
(Client
端),监听对应的端口或链接对应的端口configureBlocking(false)
为非阻塞register
注册要监听的描述符Selector.open
打开Selector
Selector.select
获得已就绪的SelectionKey
SelectionKey
进行相应的处理这里我把以前的某些步骤合并了,可能跟以前有前面的文章有点不一致,但整体步骤是同样的。程序员
其实,上面的步骤咱们大能够了解到,咱们真正须要关注的步骤只是第6步,或者说是咱们真正要处理IO
事件的一些逻辑,其余的都是一些通用流程而已。spring
既然如此,咱们真的有必要把时间花费在这些通用的地方吗?bootstrap
偷懒的程序员确定不想这样作,因此有人开发了mina
和netty
一类的NIO
框架,旨在把程序员从这些烦杂的通用流程中释放出来,而是只关注真正的业务逻辑,把这些交由框架去作处理。服务器
mina
和netty
的做者都是同一我的(Trustin Lee,牛人老是各类牛)。mybatis
但鉴于netty
基本上已是事实上的NIO
标准框架了,而且社区一直比较活跃,而mina
已经归档好久了,都已经没更新不少年了。为了不精力太过度散(实际上是我没学习过mina
,不懂-_- ),咱们这里不讨论mina
,直接学习netty
,里面有不少值得咱们学习的东西。多线程
在开始介绍netty
相关的知识前,咱们来了解一下线程模型相关的一些知识,这里参考了不少网上的一些文章,加以本身整理了一下,但愿可以给一些看其余文章不清楚的朋友一些不同的理解。架构
图片来自: https://www.jianshu.com/p/738...
这里的单线程指的是分派线程和工做线程都在同一个线程,能够看回咱们的JAVA
的NIO
示例代码,这里为了方便,咱们也贴在下面:框架
public class MyServer { public static void main(String[] args) throws IOException { ServerSocketChannel serverSocketChannel = ServerSocketChannel.open(); serverSocketChannel.configureBlocking(false); serverSocketChannel.bind(new InetSocketAddress("localhost", 8001)); Selector selector = Selector.open(); serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT); String str = ""; while(!Thread.currentThread().isInterrupted()) { //这里是一直阻塞,直到有描述符就绪 selector.select(); Set<SelectionKey> selectionKeys = selector.selectedKeys(); Iterator<SelectionKey> keyIterator = selectionKeys.iterator(); while(keyIterator.hasNext()) { SelectionKey key = keyIterator.next(); keyIterator.remove(); //链接创建 if (key.isAcceptable()) { try { SocketChannel clientChannel = serverSocketChannel.accept(); clientChannel.configureBlocking(false); clientChannel.register(selector, SelectionKey.OP_READ); } catch (ClosedChannelException e) { e.printStackTrace(); } } //链接可读,这时能够直接读 else if (key.isReadable()) { ByteBuffer readBuffer = ByteBuffer.allocate(1024); SocketChannel socketChannel = (SocketChannel) key.channel(); try { int num = socketChannel.read(readBuffer); str = new String(readBuffer.array(), 0, num); System.out.println("received message:" + str); } catch (IOException e) { e.printStackTrace(); } } } } } }
咱们能够看到在咱们nio
的例子中,咱们没有明确使用多线程,这里就是使用了单线程来处理的。
它有什么好处呢?
实现简单。这是固然的,全部不涉及到多线程的代码都是相对比较简单的,注意,是 相对
有优势的同时确定有缺点,那么这种单线程有什么缺点呢:
性能相对比较差。只有一个线程进行请求的处理,也就是只有一个线程处理
CPU的描述符,假设同一时间有不少信号都就绪了,而且咱们读到
IO
数据后的真正处理逻辑可能比较复杂,那么全部的请求都须要等待当前的请求处理完成后才能处理其余的。这也就致使了它的性能相对(这里的相对是对比其余多线程的处理方式)比较弱。
图片来自: https://www.jianshu.com/p/738...
这里的多线程指的是处理逻辑的多线程,对应到咱们的NIO
代码逻辑里面就是对SelectionKey
的处理是多线程的,咱们直接看代码会直观点:
public class MyServerMultipleThread { @SuppressWarnings("Duplicates") public static void main(String[] args) throws IOException { ServerSocketChannel serverSocketChannel = ServerSocketChannel.open(); serverSocketChannel.configureBlocking(false); serverSocketChannel.bind(new InetSocketAddress("localhost", 8001)); Selector selector = Selector.open(); serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT); while(!Thread.currentThread().isInterrupted()) { //这里是一直阻塞,直到有描述符就绪 selector.select(); Set<SelectionKey> selectionKeys = selector.selectedKeys(); Iterator<SelectionKey> keyIterator = selectionKeys.iterator(); while(keyIterator.hasNext()) { SelectionKey key = keyIterator.next(); keyIterator.remove(); //链接创建 if (key.isAcceptable()) { try { SocketChannel clientChannel = serverSocketChannel.accept(); clientChannel.configureBlocking(false); clientChannel.register(selector, SelectionKey.OP_READ); } catch (IOException e) { e.printStackTrace(); } } //链接可读,这时能够直接读 else if (key.isReadable()) { ByteBuffer readBuffer = ByteBuffer.allocate(1024); SocketChannel socketChannel = (SocketChannel) key.channel(); int num = socketChannel.read(readBuffer); new Thread(() -> { String str = new String(readBuffer.array(), 0, num); System.out.println("received message:" + str); }).start(); } } } } }
这里咱们能够看到,在进行SelectionKey
遍历读完数据后真正处理的时候,咱们新起了一个新的线程进行NIO
的相关处理。
固然,这里的只是一个示例,真正写代码的时候不该该这样无限制的新起线程,而是应该使用线程池,更合理的使用线程,避免线程数量太多,致使 CPU切换太频繁,这样反而起不到优化性能的做用。
图片来自: https://www.jianshu.com/p/738...
一看到这图,估计不少人头都大了,这都什么鬼,这么复杂啊。
实际能够简单一点理解:
注意,这里的多线程不包括accept
请求,accept
仍是由单个线程进行分发。
咱们直接看一下代码会比较容易理解
public class MyServerMultipleThread2 { @SuppressWarnings("Duplicates") public static void main(String[] args) throws IOException { ServerSocketChannel serverSocketChannel = ServerSocketChannel.open(); serverSocketChannel.configureBlocking(false); serverSocketChannel.bind(new InetSocketAddress("localhost", 8001)); Selector selector = Selector.open(); serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT); while(!Thread.currentThread().isInterrupted()) { selector.select(); //这里是一直阻塞,直到有描述符就绪 Set<SelectionKey> selectionKeys = selector.selectedKeys(); Iterator<SelectionKey> keyIterator = selectionKeys.iterator(); while(keyIterator.hasNext()) { SelectionKey key = keyIterator.next(); keyIterator.remove(); //链接创建 if (key.isAcceptable()) { try { SocketChannel clientChannel = serverSocketChannel.accept(); clientChannel.configureBlocking(false); clientChannel.register(selector, SelectionKey.OP_READ); } catch (IOException e) { e.printStackTrace(); } } //链接可读,这时能够直接读 else if (key.isReadable()) { new Thread(() -> { ByteBuffer readBuffer = ByteBuffer.allocate(1024); SocketChannel socketChannel = (SocketChannel) key.channel(); int[] num = new int[]{0}; try { num[0] = socketChannel.read(readBuffer); new Thread(() -> { String str = new String(readBuffer.array(), 0, num[0]); System.out.println("received message:" + str); }).start(); } catch (IOException e) { e.printStackTrace(); } }).start(); key.channel().register(key.selector(), SelectionKey.OP_WRITE); } } } } }
这里我没有参考一些网上比较复杂的作法,可能实现起来不大一致,但相对容易理解一点。
accept
请求时,仍是由单前线程单独处理READ
或WRITE
等请求时,咱们新起一个线程去作处理,而且在真正的处理逻辑时,仍是跟上面的多线程逻辑同样,是新起一个线程去作处理。READ
或WRITE
时,注意,须要从新注册相应的WRITE
或READ
事件——由于新起线程后,当前SelectionKey
的信号仍是READ
,若是咱们不作修改,会致使当前的线程会重复屡次处理。具体你们能够下来试试,把后面的register
去掉,看一下会出现什么状况。咱们看到,上面的线程模型,都以性能提高为目的,一步步去进行优化,但同时咱们也看到了,代码是愈来愈复杂,使得咱们在维护咱们真正的逻辑时,有点像是大海捞针,真正的代码逻辑就那么一点,而不少都是一些模板代码。
为了解决这些问题,就须要引出咱们的框架了,框架正是为了帮咱们去约定好一些通用的逻辑而出现的,好比spring
,帮我作好了IOC
和AOP
等的一些逻辑,这些不须要咱们去额外关注;而mybatis
帮咱们作好了ORM
相关的一些处理, DB映射等,这些流程化的东西都已经固化了;而咱们这里要说的netty
,它帮咱们把NIO
这些线程模型相关的东西帮咱们作了不少的优化和抽取,咱们再也不须要管这些流程化的东西,只须要写咱们本身的逻辑。
netty
出场netty
做为一个高性能的NIO
框架,基本上已是事实上的NIO
标准了,包括dubbo
,zookeeper
等内部都比较大量地使用了netty
。或者说具体点,这些框架可以有这么好的性能,大部分功劳要归结到netty
身上。
netty
基础知识看例子前咱们先来补充一些基础知识。netty
有几个重要概念:
ChannelHandler
channel
的事件处理器,里面封装了针对当前channel
的生命周期的方法
ChannelInBoundHandler
channel
的READ
请求处理器,里面封装了当前channel
的对于接收请求相关的生命周期方法
ChannelOutBoundHandler
channel
的WRITE
请求处理器,里面封装了当前channel
的对象发出请求的生命周期方法。
ChannelPipeline
此类是netty
架构中比较重要的一个类,它使用了 责任链模式,把请求从ChannelHandler
中一个个的日后传递,最终到达咱们的业务Handler
。关于Pipeline
的详细描述,咱们后面再详细看看。
ByteBuf
netty
封装了本身的ByteBuf
,与JDK
自带的ByteBuffer
的最主要的区别是它有两个指针,一个供读readerIndex
,一个供写writerIndex
。而至于该类的一些详细信息,你们能够看一下它的JavaDoc
,写得很是详细。
关于OutBound
和上面的InBound
的区别,你们能够简单地区分一下,In
就是请求进入,对应的就是READ
,Out
就是请求发出,对应的就是WRITE
。
基本的概念了解清楚了,那咱们来看一下简单的例子。
其实netty
最好的文档是它的官网文档。咱们就仍是以相似官方源码里面的一个example
来学习一下,实现的功能很简单:
Client
链接成功后传一句话给Server
,Server
回复收到。
server
端ServerHandler
public class MyNettyServerHandler extends ChannelInboundHandlerAdapter { @Override public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { System.out.println("from client:" + msg); ctx.writeAndFlush("I received your message:" + msg + System.lineSeparator()); } @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { cause.printStackTrace(); } }
咱们的代码比较简单,打印收到的文本,而且再发回一条语句。咱们能够看到咱们输出的时候加多了一个 换行符——System.lineSeparator()
,这是为何呢?
这里涉及到另一个TCP/IP
一个比较重要的问题, 拆包和粘包,这里咱们先不细说,后面我会有专门的文章来讲一下 拆包和粘包还有一系列TCP/IP
相关的知识,这是很是大的一块了。咱们如今就先简单的知道,加这个 换行符是为了让 Handler知道咱们的消息从哪里结束。
Server
public class MyNettyServer { public static void main(String[] args) { EventLoopGroup bossGroup = new NioEventLoopGroup(); EventLoopGroup workerGroup = new NioEventLoopGroup(); ServerBootstrap serverBootstrap = new ServerBootstrap(); serverBootstrap.group(bossGroup, workerGroup) .channel(NioServerSocketChannel.class) .childHandler(new ChannelInitializer<SocketChannel>() { @Override protected void initChannel(SocketChannel ch) throws Exception { ch.pipeline().addLast(new LineBasedFrameDecoder(4096)); ch.pipeline().addLast(new StringDecoder()); ch.pipeline().addLast(new StringEncoder()); ch.pipeline().addLast(new MyNettyServerHandler()); } }) .option(ChannelOption.SO_BACKLOG, 128) .childOption(ChannelOption.SO_KEEPALIVE, true); ChannelFuture channelFuture = null; try { channelFuture = serverBootstrap.bind("127.0.0.1", 8080).sync(); channelFuture.channel().closeFuture().sync(); } catch (InterruptedException e) { e.printStackTrace(); } finally { bossGroup.shutdownGracefully(); workerGroup.shutdownGracefully(); } } }
这里涉及到比较多的知识点,总体结构咱们先无论它,咱们重要先关注一下:
线程池
这里定义了两个线程组进行处理,BossGroup
和WorkerGroup
,对应咱们上面的 多线程模型,缘由是netty
并不使用 主从多线程模型——这个咱们之后的文章有机会再细说。
ServerBootStrap
netty
工具类,有助于编写服务器的相关代码,而Client
端对应的就是Bootstrap
了。
pipeline
的添加ch.pipeline().addLast(new LineBasedFrameDecoder(4096)); ch.pipeline().addLast(new StringDecoder()); ch.pipeline().addLast(new StringEncoder()); ch.pipeline().addLast(new MyNettyServerHandler());
这里把4个Handler
添加到Pipeline
的末尾,至于为何是末尾,相应看到后面的pipeline
的解析的时候你们就会知道了。
我这里大概描述一下几个Handler
的做用:
LineBasedFrameDecoder
根据换行符\n
或\r\n
进行内容的分割——即 拆包
StringDecoder
把接收到的内容解析为
String
字符串
StringEncoder
把发出的内容解析为
String
字符串
MyNettyServerHandler
咱们的真正逻辑处理类,这个应该是在前面的几个处理完成后再进行。咱们在后面的pipeline
执行顺序中能够看到为何这样添加。
后面的Client
中的Handler
也能够参考上面的。
Client
端ClientHandler
public class MyNettyClientHandler extends ChannelInboundHandlerAdapter { @Override public void channelActive(ChannelHandlerContext ctx) throws Exception { ctx.writeAndFlush("helloworld" + System.lineSeparator()); } @Override public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { System.out.println("from server:" + msg); } @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { cause.printStackTrace(); } }
这里咱们的代码也比较简单,就是链接成功的时候发条helloworld
过去服务端,而后再从服务端读到返回的内容。咱们就不细说了。
public class MyNettyClient { public static void main(String[] args) { EventLoopGroup workerGroup = new NioEventLoopGroup(); Bootstrap bootstrap = new Bootstrap(); bootstrap.group(workerGroup) .channel(NioSocketChannel.class) .handler(new ChannelInitializer<SocketChannel>() { @Override protected void initChannel(SocketChannel ch) throws Exception { ch.pipeline().addLast(new LineBasedFrameDecoder(4096)); ch.pipeline().addLast(new StringDecoder()); ch.pipeline().addLast(new StringEncoder()); ch.pipeline().addLast(new MyNettyClientHandler()); } }) .option(ChannelOption.SO_KEEPALIVE, true); try { ChannelFuture channelFuture = bootstrap.connect("127.0.0.1", 8080).sync(); channelFuture.channel().closeFuture().sync(); } catch (InterruptedException e) { e.printStackTrace(); } finally { workerGroup.shutdownGracefully(); } } }
对比上面的Server
代码,这里的区别,最大的就是咱们只有一个EventLoopGroup
,由于Client
端并不须要接收请求,因此并不须要所谓的BossGroup
。
一切就绪后,咱们能够跑一下看看运行状况:
先运行server
,再运行client
server
能够看到client
能够看到
这表示咱们已经使用netty
写了一个基本能够用的NIO
程序了。
ChannelPipeline
详解ChannelPipeline
做为netty
的一个底层重要组成部分,ChannelHandler
都须要依靠它进行调度,重要性不言而喻。那咱们如今就一块儿来看看ChannelPipeline
到底是怎么调度的。
查看ChannelPipeline
的JavaDoc
咱们能够看到这样一串描述(牛人写描述都是特别认真的)。
大概的意思就是这样的:
InBoundHandler
的添加顺序,从前日后执行。OutBoundHandler
的添加顺序,从后往前执行。另外,文档中又举了一个例子:
咱们套用一下咱们的Server
例子来分析一下:LineBasedFrameDecoder
,StringDecoder
,StringEncoder
,MyNettyServerHandler
当咱们收到消息时,须要执行的Handler
的顺序为:LineBasedFrameDecoder
,StringDecoder
,MyNettyServerHandler
当咱们发出消息时,须要执行的OutboundHandler
的顺序为:StringEncoder
.
基于上面的分析,咱们就能够分析为何咱们前面的例子能够获得那样的结果。
这篇文章,咱们从一开始的线程模型到后面的netty
的示例,这些种种都是为了性能的提升去作的一些优化。在当前大数据的趋势下,更多须要咱们把性能去作到极致。
后面,咱们会再根据netty
中的一些最佳实践来分析它是怎么解析粘包和拆分的。
https://www.jianshu.com/p/738095702b75
https://netty.io/wiki/user-guide-for-4.x.html