Jetty9源码剖析 - Connection组件 - SelectChannelEndPoint

转载自ph0ly:http://www.ph0ly.com

一、概念

EndPoint表示一个逻辑上的传输端点,数据的读取和写入都是从端点开始,配合回调机制(当读操作发生时可以通知到回调函数,而写操作也提供回调机制,当完成写入后,调用写回调函数),可以完成异步处理的。端点的实现通常具有超时的特性,在端点未产生任何读写的情况下,超时机制可以保证EndPoint关闭(包括关联的Channel或其他资源),避免一些资源的浪费。SelectChannelEndPoint是其中一种实现,能支持NIO任务生产逻辑的实现

二、继承体系

继承体系

我们看到整个继承体系还是非常简单的

AbstractEndPoint是抽象的EndPoint实现,绝大部分逻辑都放到了AbstractEndPoint,例如回调注册、端点超时等操作

ChannelEndPoint表示一个包装了SocketChannel的EndPoint,主要封装一些对SocketChannel的操作,包括对Channel的数据读写

 

SelectChannelEndPoint主要封装EPC执行任务需要的接口,其实就是实现了ManagedSelector.SelectableEndPoint接口,后续IO事件发生时会触发这个EndPoint的onSelected方法,获取到待执行任务

三、总体架构

架构

从图中可以看出SelectChannelEndPoint主要通过操纵SocketChannel完成IO的读写

当IO事件发生时,SelectChannelEndPoint在EPC的触发下首先执行onSelected,完成回调类型任务的选择,完成后返回一个任务,这个任务由EPC继续执行,调用到任务的run方法,这时候会触发之前EndPoint上关联的读或者写回调,从而调用到了HttpConnection.onFillable或者WriteFlusher的写操作,这样就能触发到后续的操作了

四、源码剖析

1. 构造函数

构造函数-1

之前在SelectorManager里面其实已经介绍过,EndPoint的创建是由ServerConnectorManager来创建,调用的是上图的方法,都是赋值,比较简单
重点看下super里面在做什么

构造函数-2

ChannelEndPoint也比较简单,把channel放进来,同时取出socket,再看上一层的super

构造函数-3

仍然很简单,再上一级就是IdleTimeout了

构造函数-4

都是赋值操作,比较简单不解释

2. 读数据

读操作

读数据核心就是这个方法了,EPC在执行生产任务的时候调用了SelectorProducer.produce,实际就是先调这个方法拿任务,拿到任务再在当前线程执行(不清楚的读者可以回去看下EPC和ManagedSelector相关的文章),下面我们来仔细分析下

首先看当前这个SocketChannel关联的SelectionKey的interestOps,拿到准备好的readyOps,然后将当前的_desiredInterestOps去掉当前准备好的操作,那如果之前注册的OP_READ,刚好来一个读事件,其实这里就会导致_desiredInterestOps变成0,而在updateKey里面会触发更改这个SelectionKey的interestOps,这样就会导致其实什么都不监听了,相信读者看到这里比较懵逼,这是什么操作?
其实这里是保证在当前这条连接上的数据能按照顺序处理,先让前面这个事件处理完成后,在再HttpConnection.onFillable里面会切回读模式,继续监听接下来的事件,本身HTTP/1.x协议就是顺序执行,正好适合这种模式。另外对于OP_WRITE来说,如果不取消,就会导致Selector一直检测到写事件,那就会导致无限write,这也是要去掉这个readyOps的原因

 

后面就检测当前是读还是写,然后根据读写事件类型拿到一个任务,可能是3种情况:只写、只读、读写,这个不难理解。但是有些读者可能会问,这里为啥要有写操作?其实对于NIO写操作来说,往Channel里面写数据不一定一次性能写完,在底层缓冲区满了的情况下,write操作可能会返回0,也就是一个都没写成功,如果while一直写入,CPU就打爆了,因此利用Selector来检测OP_WRITE操作,当可以写的时候尝试去写,这样保证CPU不会轻易飙升

再来看下这3个任务的实现

3个任务

_runFillable提供的是读操作,利用fillInterest注册的回调,调用它的fillable方法,通常就是调用到的就是HttpConnection.onFillable方法

_runCompleteWrite提供的是写操作,从名字上来看是完成写,其实就是在应用层拿到OutputStream往流里面写的时候,部分数据没写完,这个时候需要完成剩下的数据写入,这里会调用WriteFlusher来刷,WriteFlusher最终调用到EndPoint的flush,完成向Channel的写入,这块在下面的写入里面详细讲解

 

_runCompleteWriteFillable提供先写后读操作,优先完成之前未完成写入的数据,之后再执行读操作

最后附上FillInterest.fillable的实现

FillInterest.fillable

比较简单,提供了回调机制,调用succeeded,AbstractConnection里面的ReadCallback实现的succeeded就是自身的onFillable方法,自然就调到HttpConnection.onFillable了

接下来再来看下如何从Channel读到数据的,后面HttpConnection会触发这个方法,拿到真实的数据

EndPoint.fill

HttpConnection.onFillable里面会利用EndPoint提供的fill方法,完成向Buffer里面刷入数据,这样才能开始自己的解析
从上面可以看到,其实很简单,就是切换ByteBuffer的模式为读,如果读到了数据,那就把定时器置为非空闲,保证端点的活性,如果读到-1了,那就是EOF,连接的读方向已经跪了,所以关闭输入,最后返回读到了多个字节,出现IO异常也是关了输入方向,最后把Buffer切回到写模式

3. 写数据

3.1 直接写

EndPoint本身对外提供的其实是write方法作为写入方法

EndPoint.write

可以看到这里其实还是调用了WriteFlusher来完成写入

WriteFlusher.write

WriteFlusher的实现首先调自己的flush方法,完成刷缓冲(这个下面会解释),这个flush方法会返回未完成写入的Buffer,如果存在未完成的Buffer,这里会将这些Buffer作为PendingState存下来,并利用onIncompleteFlush来向Channel的SelectionKey修改OP_WRITE的操作,这样其实就能在之后仍然保证之前的数据能刷完。如果flush一次性刷完了,那其实也不用改OP_WRITE,因此直接就callback.succeeded了

接下来再来看下WriteFlusher.flush的实现

WriteFlusher.flush

可以看到这里会调用EndPoint.flush来刷数据(下面会讲解它的实现),刷完后,会给出标识是否完成了,如果完成了,那这里就返回了,如果没完成,这里就会去把这些未完成的Buffer靠背到新的数组里面去,并返回给上一级调用

再来看下EndPoint.flush是如何实现的,其实更具体说是ChannelEndPoint,当然也是我们的SelectChannelEndPoint

EndPoint.flush

可以看到就是直接操作了Channel.write,直接写到了SocketChannel,其实看起来Jetty这里是有个bug,else分支的buffers应该一直是0,那什么也不会做
因为Channel.write会操作ByteBuffer的postion,所以最后判断是否为空,那就能看出哪些Buffer未完成写入,只要有一个未完成,那其实整体就是未完成写入

3.2 完成写

上面其实提到了写操作可能并不能一次性写完,那就要利用WriteFlusher的completeWrite来完成写入,接下来我们看下它的实现

WriteFlusher.completeWrite

这里从之前存下来的PendingState拿出未完成的写,之前说到PendingState会记录未完成写入的Buffer数组,因此这里直接拿出来,调WriterFlusher.flush(上面已经讲解过了),如果仍然存在未完成的,那继续放到PendingState,等待下一次完成写入,否则就让PendingState.complete,其实就是调了callback.succeeded

4. 超时

之前我们看到继承体系中,EndPoint其实是继承了IdleTimeout,那也就是它具有超时处理的能力,超时处理在EndPoint发挥了什么作用呢?接下来我们一起来看下
IdleTimeout的逻辑比较简单,就是定时器定时检测当前是否已经到了超时时间,如果超时了,则执行一个抽象的onIdleExpired方法,让业务层自己处理,在AbstractEndPoint里面就实现了这部分逻辑

AbstractEndPoint.onIdleExpired

这里会触发_fillInterest.onFail,这个操作会导致整个连接进入读关闭状态,_writeFlusher.onFail有可能会触发写操作关闭状态,这样其实就导致在超时后,Http连接直接被关了,通常超时时间是30秒,这两个操作这里就不细讲了,感兴趣的读者可以下来看下源码

五、总结

EndPoint作为传输的端点(在实际用的通常就是SelectChannelEndPoint)它封装了对Channel的数据操作,同时对EPC暴露出任务获取接口,利用Callback调到下一级操作,可以说EndPoint是真实数据交互的桥梁,帮助上层的Connection读写数据,在Jetty的通信层具有非常核心的作用,实现相对还是有些复杂,希望读者们能有很多收获。接下来的文章我会带领大家一起来看下HttpConnection是如何完成后续的HTTP协议解析的,欢迎大家持续关注~