在上节咱们知道Netty启动后会动起一个selector线程监听IO事件,IO事件包括如下几个:java
读事件便可以发生在客户端也可会发生在服务端,当客户端或服务端注册读事件并接受到远端发送的数据就会触发读事件。编程
写事件便可以发生在客户端也可会发生在服务端,写事件能够由外部直接调用触发,当出现写半包时(出如今TCP缓存满的状况),Netty会注册写操做位,待TCP缓存消耗后也会触发写事件。数组
只发生在服务端,服务端启动的时候会注册接收操做位监听客户端的链接。promise
只发生在客户端,客户端启动时会尝试链接服务端,链接是异步的不必定立刻成功不成功则须要注册链接操做位监听客户端的链接成功。缓存
下面从服务端的角度介绍Netty启动后,接收客户端链接的流程,以及客户端链接上后服务端的读和写的流程。多线程
当Selector轮询到接收事件会在NioEventLoop类中的processSelectedKey方法中进行处理,源码以下:并发
private static void processSelectedKey(SelectionKey k, AbstractNioChannel ch) { ... int readyOps = k.readyOps(); if ((readyOps & (SelectionKey.OP_READ | SelectionKey.OP_ACCEPT)) != 0 || readyOps == 0) { unsafe.read(); if (!ch.isOpen()) { return; } } ... }
服务端的接收逻辑直接委托给unsafe.read()处理,unsafe中有2个实现类NioMessageUnsafe和NioByteUnsafe,因为服务端启动初始化的Channel用的是NioServerSocketChannel,因此unsafe的实现类是NioMessageUnsafe,下面看下unsafe.read()的实现:异步
public void read() { ... try { for (;;) { //获取接收结果 int localRead = doReadMessages(readBuf); //若是接收结果为空直接推出 if (localRead == 0) { break; } //异常状况下返回,tcp协议未用到 if (localRead < 0) { closed = true; break; } //非自动读,退出并去掉监听客户端的链接事件,变成手工注册,通常不用 if (!config.isAutoRead()) { break; } //每波的最大处理链接请求数默认为16 if (readBuf.size() >= maxMessagesPerRead) { break; } } } catch (Throwable t) { exception = t; } setReadPending(false); int size = readBuf.size(); //调用pipeline链处理客户端链接事件 for (int i = 0; i < size; i ++) { pipeline.fireChannelRead(readBuf.get(i)); } //清理接收对象 readBuf.clear(); //调用pipeline链处理接收完成事件 pipeline.fireChannelReadComplete(); ... }
接收客户端的处理逻辑主要流程以下:tcp
SocketChannel是服务端和客户端通信的核心操做类,pipeline.fireChannelRead方法在以前讲过是一个调用链,调用用户的配置ChannelHandler,这里系统会调用初始化channel时系统自动注册的ServerBootstrapAcceptor里的channelRead方法(初始化channel流程能够阅读上节内容)ide
channelRead是Netty的核心代码主要对SocketChannel进一步封装使剥离AcceptorSelector线程,独立出跟客户端通信IOSelector线程。咱们来看下它的实现:
public void channelRead(ChannelHandlerContext ctx, Object msg) { //获取与客户端通信的通道SocketChannel(下面叫childChannel) final Channel child = (Channel) msg; //将用户配置的处理器childChannel设置到childChannel child.pipeline().addLast(childHandler); //将用户配置的系统参数设置到childChannel for (Entry<ChannelOption<?>, Object> e: childOptions) { try { if (!child.config().setOption((ChannelOption<Object>) e.getKey(), e.getValue())) { logger.warn("Unknown channel option: " + e); } } catch (Throwable t) { logger.warn("Failed to set a channel option: " + child, t); } } //将用户配置的属性设置到childChannel for (Entry<AttributeKey<?>, Object> e: childAttrs) { child.attr((AttributeKey<Object>) e.getKey()).set(e.getValue()); } try { //在从线程池里注册childChannel childGroup.register(child).addListener(new ChannelFutureListener() { @Override public void operationComplete(ChannelFuture future) throws Exception { if (!future.isSuccess()) { forceClose(child, future.cause()); } } }); } catch (Throwable t) { forceClose(child, t); } }
channelRead流程比较简单就是在从线程池注册childChannel,而后从线程池起相应的selector线程处理服务端和客户端的读事件和写事件。
这里childChannel的注册流程和服务端启动时的channel的注册流程基本同样不过这里默认自动注册SelectionKey.OP_READ读事件而不是SelectionKey.OP_ACCEPT接收事件。须要注意的是从线程池的每一个线程会建立一个selector对象而一个selector可能注册多个childChannel。
完成上面的流程客户端就能够跟服务端通信了,若是客户端发送了数据,服务端的从selector线程就会了轮询到读事件,一样读事件会在NioEventLoop类中的processSelectedKey方法中进行处理,源码以下:
private static void processSelectedKey(SelectionKey k, AbstractNioChannel ch) { ... int readyOps = k.readyOps(); if ((readyOps & (SelectionKey.OP_READ | SelectionKey.OP_ACCEPT)) != 0 || readyOps == 0) { unsafe.read(); if (!ch.isOpen()) { return; } } ... }
能够看到读事件源码跟接收事件的源码是同一块,只是unsafe实现的类不同,因为childChannel的实现类是NioSocketChannel所以unsafe的实现类是NioByteUnsafe,下面重点分析下unsafe.read()的实现:
@Override public final void read() { final ChannelConfig config = config(); //readPending状态是非自动读状况下使用,readPending若是是false表示数据已读完移除读操做位 if (!config.isAutoRead() && !isReadPending()) { removeReadOp(); return; } final ChannelPipeline pipeline = pipeline(); //获取ByteBuf构造器 final ByteBufAllocator allocator = config.getAllocator(); //获取自动读模式下的一次性读取的最大的次数 final int maxMessagesPerRead = config.getMaxMessagesPerRead(); //获取ByteBuf容量分配器 RecvByteBufAllocator.Handle allocHandle = this.allocHandle; if (allocHandle == null) { this.allocHandle = allocHandle = config.getRecvByteBufAllocator().newHandle(); } ByteBuf byteBuf = null; int messages = 0; boolean close = false; try { int totalReadAmount = 0; boolean readPendingReset = false; do { //构造ByteBuf byteBuf = allocHandle.allocate(allocator); //获取byteBuf最大可写字节 int writable = byteBuf.writableBytes(); //将接收到的字段写入到byteBuf,并获取接收数据长度 int localReadAmount = doReadBytes(byteBuf); //未读到数据释放byteBuf,跳出读取逻辑 if (localReadAmount <= 0) { byteBuf.release(); byteBuf = null; close = localReadAmount < 0; break; } //数据读完设置readPending为false if (!readPendingReset) { readPendingReset = true; setReadPending(false); } //交给用户配置的数据解析器(ChannelHandler)处理读到的数据 pipeline.fireChannelRead(byteBuf); byteBuf = null; //避免溢出,若是读取的数据量大于Integer的最大值则直接退出 if (totalReadAmount >= Integer.MAX_VALUE - localReadAmount) { totalReadAmount = Integer.MAX_VALUE; break; } //加上此次读到的数据量 计算出总数据量 totalReadAmount += localReadAmount; //非自动读,退出 if (!config.isAutoRead()) { break; } //若是读到的数据量小于byteBuf最大可写字节 说明数据已经接受完,退出循环 if (localReadAmount < writable) { break; } //不然数据还未读完,继续读,直到读完或者读取次数大于最大次数 } while (++ messages < maxMessagesPerRead); //触发读取完成的处理器 pipeline.fireChannelReadComplete(); //记录此次读取数据的总量,以便后续动态建立byteBuf的大小 allocHandle.record(totalReadAmount); if (close) { closeOnRead(pipeline); close = false; } } catch (Throwable t) { handleReadException(pipeline, byteBuf, t, close); } finally { if (!config.isAutoRead() && !isReadPending()) { removeReadOp(); } } } }
unsafe.read()的实现比较复杂咱们按步骤来分析
allocator做用是构造怎么样类型的ByteBuf好比默认就是构造了UnpooledUnsafeDirectByteBuf,UnpooledUnsafeDirectByteBuf提供了非池化的堆外内存直接操做的支持。
allocHandle做用是分配ByteBuf的容量,allocHandle有2个实现类分别是FixedRecvByteBufAllocator和AdaptiveRecvByteBufAllocator,FixedRecvByteBufAllocator实现比较简单,根据用户的配置分配固定的容量,AdaptiveRecvByteBufAllocator是默认的实现,它会根据上次分配的容量动态调整大小。
static final int DEFAULT_MINIMUM = 64; static final int DEFAULT_INITIAL = 1024; static final int DEFAULT_MAXIMUM = 65536; private static final int INDEX_INCREMENT = 4; private static final int INDEX_DECREMENT = 1;
AdaptiveRecvByteBufAllocator的最小容量为64字节,默认初始容量为1024,最大容量为65536字节,其扩展步伐值为4,收缩步伐值为1。
static { List<Integer> sizeTable = new ArrayList<Integer>(); for (int i = 16; i < 512; i += 16) { sizeTable.add(i); } for (int i = 512; i > 0; i <<= 1) { sizeTable.add(i); } SIZE_TABLE = new int[sizeTable.size()]; for (int i = 0; i < SIZE_TABLE.length; i ++) { SIZE_TABLE[i] = sizeTable.get(i); } }
AdaptiveRecvByteBufAllocator初始化了53个容量选项当容量小于512字节时以16字节的步伐增加,当容量大于512字节时,容量以上一个容量的2倍增加。
咱们重点分析下扩容策略方法record()
public void record(int actualReadBytes) { //本次读取的总容量与上次容量收缩后的前一个位置的容量比较,若是比它还小或相等说明还在收缩 if (actualReadBytes <= SIZE_TABLE[Math.max(0, index - INDEX_DECREMENT - 1)]) { //是否持续在收缩,是的话上次容量收缩1个步伐 if (decreaseNow) { index = Math.max(index - INDEX_DECREMENT, minIndex); nextReceiveBufferSize = SIZE_TABLE[index]; decreaseNow = false; //不是持续收缩,标记下,不作收缩处理 } else { decreaseNow = true; } //若是当前容量大于上次的容量,则扩容4个步伐 } else if (actualReadBytes >= nextReceiveBufferSize) { index = Math.min(index + INDEX_INCREMENT, maxIndex); nextReceiveBufferSize = SIZE_TABLE[index]; decreaseNow = false; } }
record()的参数actualReadBytes表示本次读取的总容量。以上代码的总结以下:
容量的收缩如下2个条件 - 1.若是本次读取的总容量比上次容量收缩后的前一个位置的容量还小或相等。 - 2.容量至少持续2次在收缩。 容量的扩展如下1个条件 - 1.若是本次读取的总容量大于上次的容量。
以上分析了构造ByteBuf,下面继续分析unsafe.read()方法。
将接收到的字段写入到byteBuf,并获取接收数据长度。
若是未读到数据,说明数据已经读物,释放byteBuf缓存,跳出读取循环逻辑。
这里的doReadBytes可能返回0,大于0和-1,等于0说明已无数据可读,大于0表示读到数据,-1表示读取异常须要设置close标志为true用于关闭链接。
避免溢出,若是读取的数据量大于Integer的最大值则直接跳出读取循环逻辑。
交给用户配置的数据解析器(ChannelHandler)处理读到的数据。
这里交给数据解析器数据,不必定是你想要的完整的数据,可能出现半包和粘包的现象,这须要数据解析器处理才能得到完整数据。
累计数据总量totalReadAmount,以便后面扩容策略用。
非自动读,跳出读取循环逻辑。
非自动状况下须要编程人员本身注册读操做位,才能触发读事件
若是读到的数据量小于byteBuf最大可写字节 说明数据已经接受完,跳出读取循环逻辑。
若是读到的数据量等于byteBuf最大可写字节,说明TCP缓存区还能还有数据,须要再次循环去读。
这里最大的循环次数模式是16次(可配)若是超过这个次数不管TCP缓存区是否还要数据都会终止循环,等下个selector周期再去读。
触发读取完成的处理器。
这里的处理器须要编程人员本身配置。
记录此次读取数据的总量,以便后续动态建立byteBuf的大小,动态扩容上面已经讲过,这里不累述。
若是读取发生IO异常,则关闭链接。(上面讲过的读取状态返回-1而且close标志设置为true的状况)。
以上就是整个读事件的整个流程,下面来分析下写事件流程。
服务端通常在接受到数据处理结束后给客户端端返回一个响应数据,发送响应数据则须要调用ChannelHandlerContext#writeAndFlush方法。咱们之外部调用ChannelHandlerContext#writeAndFlush(如下称ctx.writeAndFlush)方法为例来分析写事件流程,ctx.writeAndFlush方法看字面意思就是写入和刷新,ctx.writeAndFlush的写入是不会真正的发送,而是存到缓存中,刷新后才从缓存拿出数据发送。ctx.writeAndFlush也是调用链,开发人员能够实现ChannelOutboundHandler里的
void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception;
方法捕获写数据事件或者实现
void flush(ChannelHandlerContext ctx) throws Exception;
方法来捕获刷新数据事件。
ctx.writeAndFlush的真正实现是其私有方法write,我看来看下write的源码实现:
private void write(Object msg, boolean flush, ChannelPromise promise) { //获取下个ChannelOutboundHandler的包装ChannelHandlerContext AbstractChannelHandlerContext next = findContextOutbound(); //获取ChannelHandlerContext里分配的线程 EventExecutor executor = next.executor(); //若是是跟当前线程同一个 if (executor.inEventLoop()) { //直接调用write next.invokeWrite(msg, promise); //若是须要刷新调用flush if (flush) { next.invokeFlush(); } //若是非同一个线程,须要异步处理 } else { //获取可读取数据量 int size = channel.estimatorHandle().size(msg); if (size > 0) { ChannelOutboundBuffer buffer = channel.unsafe().outboundBuffer(); //由于异步发送事先增长待发送缓存量,占用空间 if (buffer != null) { buffer.incrementPendingOutboundBytes(size); } } Runnable task; //若是须要刷新,建立带刷新的任务 if (flush) { task = WriteAndFlushTask.newInstance(next, msg, size, promise); //不然建立不带刷新的任务 } else { task = WriteTask.newInstance(next, msg, size, promise); } safeExecute(executor, task, promise, msg); } }
实现比较简单,取链中下个ChannelHandlerContext,这里的ChannelHandlerContext是ChannelHandler的包装类,维护了ChannelHandlerContext的next和prev节点。
若是ChannelHandlerContext中配置的线程跟当前是同线程则同步调用写和刷新的方法,这里若是ChannelHandlerContext中没有配置线程默认取的channel里的线程。
若是不是当前线程就要建立任务异步执行写和刷新的方法,这里若是是异步执行的话会事先增长待发送缓存量预占用空间,待要执行写的方法的时候会还原缓存占用空间。
增长待发送缓存量主要是为了反正发送的数据太大致使缓存消耗速度小于写入速度,若是超过用户配置的大小(默认64k),会给用户配置的处理器发报警,这里涉及Netty缓存设计,不过多介绍。
netty写的过程会调用用户配置的处理器,这里编程人员能够实现ChannelOutboundHandler的
void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception;
方法来捕获写事件(好比对数据进行编码)。固然写的最终流程是在channel的unsafe里执行:
public final void write(Object msg, ChannelPromise promise) { ... int size; try { //包装msg msg = filterOutboundMessage(msg); //获取数据大小 size = estimatorHandle().size(msg); if (size < 0) { size = 0; } } catch (Throwable t) { safeSetFailure(promise, t); ReferenceCountUtil.release(msg); return; } //写入缓存 outboundBuffer.addMessage(msg, size, promise); }
写入缓存前,会对msg进行包装,这里的包装主要将存储在Java堆内存的待数据写入到堆外内存,存在堆外内存好处就是真正的发送时减小一次堆内拷贝到堆外的过程,提高发送效率。
最后将转化好的msg写入到outboundBuffer缓存,这里的写入也会执行一次增长待发送缓存量操做,因此上面讲的在异步写入的写入操做真正执行前会会还原缓存占用空间,为的就是避免重复的增长待发送缓存量操做。
刷新的过程成也是同样,会先调用用户配置的处理器,这里编程人员能够实现ChannelOutboundHandler的
void flush(ChannelHandlerContext ctx) throws Exception;
方法来捕获刷新事件,固然写的最终流程也是在channel的unsafe里执行:
public final void flush() { //获取缓存容器 ChannelOutboundBuffer outboundBuffer = this.outboundBuffer; //若是缓存已经释放,则退出 if (outboundBuffer == null) { return; } //准备待发送数据 outboundBuffer.addFlush(); //刷新 flush0(); }
比较简单,先获取缓存而后准备待发送数据最后调用flush0()刷新,这里须要注意的是发送相关的处理不要配置成多线程,这里会出现并发准备待发送数据的问题。
下面来看下flush0()的实现:
protected void flush0() { //避免再次进入 if (inFlush0) { return; } //获取缓存容器 final ChannelOutboundBuffer outboundBuffer = this.outboundBuffer; if (outboundBuffer == null || outboundBuffer.isEmpty()) { return; } inFlush0 = true; //若是当前通道(channel)已经关闭,或断开链接,则执行删除当前待发送数据操做。 if (!isActive()) { try { if (isOpen()) { outboundBuffer.failFlushed(NOT_YET_CONNECTED_EXCEPTION, true); } else { outboundBuffer.failFlushed(CLOSED_CHANNEL_EXCEPTION, false); } } finally { inFlush0 = false; } return; } try { //发送数据操做 doWrite(outboundBuffer); } catch (Throwable t) { //出现IO异常而且配置自动关闭则关闭全部 if (t instanceof IOException && config().isAutoClose()) { close(voidPromise(), t, false); //不然执行删除当前待发送数据操做。 } else { outboundBuffer.failFlushed(t, true); } } finally { inFlush0 = false; } }
flush0()主要有2个操做
咱们重点分析下doWrite方法:
protected void doWrite(ChannelOutboundBuffer in) throws Exception { for (;;) { //获取须要待发送数据个数 int size = in.size(); //若是待发送数据为空,清理写操做位并退出 if (size == 0) { clearOpWrite(); break; } long writtenBytes = 0; boolean done = false; boolean setOpWrite = false; //获取待发送数据 ByteBuffer[] nioBuffers = in.nioBuffers(); //获取待发送数据总个数 int nioBufferCnt = in.nioBufferCount(); //获取待发送数据总字节数 long expectedWrittenBytes = in.nioBufferSize(); //获取JDK的SocketChannel SocketChannel ch = javaChannel(); switch (nioBufferCnt) { //若是为0可能除了ByteBuffers类型外还要其余类型要发送,交给父类处理 case 0: super.doWrite(in); return; //若是是单个待发送数据,调用JDK的SocketChannel单个发送方法 case 1: ByteBuffer nioBuffer = nioBuffers[0]; for (int i = config().getWriteSpinCount() - 1; i >= 0; i --) { final int localWrittenBytes = ch.write(nioBuffer); if (localWrittenBytes == 0) { setOpWrite = true; break; } expectedWrittenBytes -= localWrittenBytes; writtenBytes += localWrittenBytes; if (expectedWrittenBytes == 0) { done = true; break; } } break; //若是是多个待发送数据,调用JDK的SocketChannel多个发送方法 default: //发送尝试,默认尝试16次 for (int i = config().getWriteSpinCount() - 1; i >= 0; i --) { //调用JDK的SocketChannel的write发送数据 final long localWrittenBytes = ch.write(nioBuffers, 0, nioBufferCnt); //若是发送的量为0,通道不可写,TCP缓存队列已满,设置写操做位标记setOpWrite为true,退出发送 if (localWrittenBytes == 0) { setOpWrite = true; break; } //指望发送字节数减掉已经成功发送字节数 expectedWrittenBytes -= localWrittenBytes; //累加已发送字节数 writtenBytes += localWrittenBytes; //指望发送字节数为0,说明已经发送完毕,设置发送结束标志done为true,退出发送 if (expectedWrittenBytes == 0) { done = true; break; } } break; } // 释放彻底写入的缓冲区,并更新部分写入的缓冲区的索引。 in.removeBytes(writtenBytes); //若是数据未发送完,处理未发送完成逻辑 if (!done) { incompleteWrite(setOpWrite); break; } } }
doWrite方法是一个大循环,它每次处理完会再试,直到没有须要待发送数据,就是删除写操做位(若是有)。
doWrite发送数据前先获取3个局部变量:
待发送数据数组nioBuffers。
获取待发送数据总个数nioBufferCnt。
获取期待待发送数据总字节数expectedWrittenBytes。
根据待发送数据总个数nioBufferCnt,发送要分3种状况。
若是nioBufferCnt为0,说明待发送的ByteBuffers类型数据为0,但可能除了ByteBuffers类型外还要其余类型要发送,交给父类处理。
若是nioBufferCnt为1,说明待发送的ByteBuffer数据只有1个,调用JDK的SocketChannel的实现单个数据发送的write方法。
若是nioBufferCnt大于1,说明待发送的ByteBuffer数据有多个,调用JDK的SocketChannel的实现多个数据发送的write方法。
第1种通常不会出现咱们不作分析,重点分析下第2第3种状况,第2第3种方法除了调用JDK的SocketChannel的write方法实现不同,其余逻辑彻底相同。
发送的逻辑可能会出现一次发送不彻底的状况这里默认尝试16次发送(可配),最终会出现3种最终发送状况:
发送数据完成。
发送彻底失败,出现通道不可写状况。
尝试16次发送后只是部分发送成功,出现通道不可写或发送数据量太大状况。
第1和第3种状况是有数据发送成功的状况,因此发送完后会释放彻底发送成功的缓冲区,并更新部分发送成功的缓冲区的索引。
第2和第3状况是有产生数据未发送成功的状况,因此会调用incompleteWrite作相应的后续处理。
咱们来看下incompleteWrite方法实现:
protected final void incompleteWrite(boolean setOpWrite) { // 若是出现通道不可写状况,则注册写操做位由selector异步轮询到OP_WRITE事件的时候调用foreceFlush进行flush if (setOpWrite) { setOpWrite(); //若是出现数据量太大状况,放入channel线程中排队处理未发送数据,以便在此期间能够执行其余任务 } else { Runnable flushTask = this.flushTask; if (flushTask == null) { flushTask = this.flushTask = new Runnable() { @Override public void run() { flush(); } }; } eventLoop().execute(flushTask); } }
incompleteWrite方法有2种状况:
若是出现通道不可写状况,则注册写操做位由selector异步轮询到OP_WRITE事件的时候去刷新。
出现不可写状况一般都是TCP缓存空间满了,只有TCP缓存预留空间大于发送低潮限度时才会触发OP_WRITE事件,因此出现通道不可写状况必须注册写操做位交给系统来判断TCP缓存空间是否可写。
若是出现数据量太大状况,放入channel线程中排队去刷新未发送数据,这样以便在此期间能够执行其余任务。
以上就是对Netty的IO事件的分析