Netty源码分析第5章(ByteBuf)---->第10节: SocketChannel读取数据过程

 

Netty源码分析第五章: ByteBufhtml

 

第十节: SocketChannel读取数据过程java

咱们第三章分析过客户端接入的流程, 这一小节带你们剖析客户端发送数据, Server读取数据的流程:redis

首先舒适提示, 这一小节高度耦合第三章的第1, 2节的内容, 不少知识这里并不会重复讲解, 若是对以前的知识印象不深入建议恶补第三章的第1, 2节的内容以后再学习这一小节api

咱们首先看NioEventLoop的processSelectedKey方法:oop

private void processSelectedKey(SelectionKey k, AbstractNioChannel ch) { //获取到channel中的unsafe
    final AbstractNioChannel.NioUnsafe unsafe = ch.unsafe(); //若是这个key不是合法的, 说明这个channel可能有问题
    if (!k.isValid()) { //代码省略
 } try { //若是是合法的, 拿到key的io事件
        int readyOps = k.readyOps(); //连接事件
        if ((readyOps & SelectionKey.OP_CONNECT) != 0) { int ops = k.interestOps(); ops &= ~SelectionKey.OP_CONNECT; k.interestOps(ops); unsafe.finishConnect(); } //写事件
        if ((readyOps & SelectionKey.OP_WRITE) != 0) { ch.unsafe().forceFlush(); } //读事件和接受连接事件 //若是当前NioEventLoop是work线程的话, 这里就是op_read事件 //若是是当前NioEventLoop是boss线程的话, 这里就是op_accept事件
        if ((readyOps & (SelectionKey.OP_READ | SelectionKey.OP_ACCEPT)) != 0 || readyOps == 0) { unsafe.read(); if (!ch.isOpen()) { return; } } } catch (CancelledKeyException ignored) { unsafe.close(unsafe.voidPromise()); } }

 if ((readyOps & (SelectionKey.OP_READ | SelectionKey.OP_ACCEPT)) != 0 || readyOps == 0) 这里的判断表示轮询到大事件是op_read或者op_accept事件源码分析

以前的章节分析过, 若是当前NioEventLoop是work线程的话, 那么这里就是op_read事件, 也就是读事件, 表示客户端发来了数据流学习

这里会调用unsafe的redis()方法进行读取spa

若是是work线程, 那么这里的channel是NioServerSocketChannel, 其绑定的unsafe是NioByteUnsafe, 这里会走进NioByteUnsafe的read()方法中:.net

public final void read() { final ChannelConfig config = config(); final ChannelPipeline pipeline = pipeline(); final ByteBufAllocator allocator = config.getAllocator(); final RecvByteBufAllocator.Handle allocHandle = recvBufAllocHandle(); allocHandle.reset(config); ByteBuf byteBuf = null; boolean close = false; try { do { byteBuf = allocHandle.allocate(allocator); allocHandle.lastBytesRead(doReadBytes(byteBuf)); if (allocHandle.lastBytesRead() <= 0) { byteBuf.release(); byteBuf = null; close = allocHandle.lastBytesRead() < 0; break; } allocHandle.incMessagesRead(1); readPending = false; pipeline.fireChannelRead(byteBuf); byteBuf = null; } while (allocHandle.continueReading()); allocHandle.readComplete(); pipeline.fireChannelReadComplete(); if (close) { closeOnRead(pipeline); } } catch (Throwable t) { handleReadException(pipeline, byteBuf, t, close, allocHandle); } finally { if (!readPending && !config.isAutoRead()) { removeReadOp(); } } } }

首先获取SocketChannel的config, pipeline等相关属性线程

 final ByteBufAllocator allocator = config.getAllocator(); 这一步是获取一个ByteBuf的内存分配器, 用于分配ByteBuf

这里会走到DefaultChannelConfig的getAllocator方法中:

public ByteBufAllocator getAllocator() { return allocator; }

这里返回的DefualtChannelConfig的成员变量, 咱们看这个成员变量:

private volatile ByteBufAllocator allocator = ByteBufAllocator.DEFAULT;

这里调用ByteBufAllocator的属性DEFAULT, 跟进去:

ByteBufAllocator DEFAULT = ByteBufUtil.DEFAULT_ALLOCATOR;

咱们看到这里又调用了ByteBufUtil的静态属性DEFAULT_ALLOCATOR, 再跟进去:

static final ByteBufAllocator DEFAULT_ALLOCATOR;

DEFAULT_ALLOCATOR这个属性是在static块中初始化的

咱们跟到static块中:

static { String allocType = SystemPropertyUtil.get( "io.netty.allocator.type", PlatformDependent.isAndroid() ? "unpooled" : "pooled"); allocType = allocType.toLowerCase(Locale.US).trim(); ByteBufAllocator alloc; if ("unpooled".equals(allocType)) { alloc = UnpooledByteBufAllocator.DEFAULT; logger.debug("-Dio.netty.allocator.type: {}", allocType); } else if ("pooled".equals(allocType)) { alloc = PooledByteBufAllocator.DEFAULT; logger.debug("-Dio.netty.allocator.type: {}", allocType); } else { alloc = PooledByteBufAllocator.DEFAULT; logger.debug("-Dio.netty.allocator.type: pooled (unknown: {})", allocType); } DEFAULT_ALLOCATOR = alloc; //代码省略
}

首先判断运行环境是否是安卓, 若是不是安卓, 在返回"pooled"字符串保存在allocType中

而后经过if判断, 最后局部变量alloc = PooledByteBufAllocator.DEFAULT, 最后将alloc赋值到成员变量DEFAULT_ALLOCATOR

咱们跟到PooledByteBufAllocator的DEFAULT属性中:

public static final PooledByteBufAllocator DEFAULT =
        new PooledByteBufAllocator(PlatformDependent.directBufferPreferred());

咱们看到这里直接经过new的方式, 建立了一个PooledByteBufAllocator对象, 也就是基于申请一块连续内存进行缓冲区分配的缓冲区分配器

缓冲区分配器的知识, 咱们以前小节进行了详细的剖析, 这里就再也不赘述

回到NioByteUnsafe的read()方法中:

public final void read() { final ChannelConfig config = config(); final ChannelPipeline pipeline = pipeline(); final ByteBufAllocator allocator = config.getAllocator(); final RecvByteBufAllocator.Handle allocHandle = recvBufAllocHandle(); allocHandle.reset(config); ByteBuf byteBuf = null; boolean close = false; try { do { byteBuf = allocHandle.allocate(allocator); allocHandle.lastBytesRead(doReadBytes(byteBuf)); if (allocHandle.lastBytesRead() <= 0) { byteBuf.release(); byteBuf = null; close = allocHandle.lastBytesRead() < 0; break; } allocHandle.incMessagesRead(1); readPending = false; pipeline.fireChannelRead(byteBuf); byteBuf = null; } while (allocHandle.continueReading()); allocHandle.readComplete(); pipeline.fireChannelReadComplete(); if (close) { closeOnRead(pipeline); } } catch (Throwable t) { handleReadException(pipeline, byteBuf, t, close, allocHandle); } finally { if (!readPending && !config.isAutoRead()) { removeReadOp(); } } } }

这里 ByteBufAllocator allocator = config.getAllocator()中的allocator , 就是PooledByteBufAllocator

 final RecvByteBufAllocator.Handle allocHandle = recvBufAllocHandle()  是建立一个handle, 咱们以前的章节讲过, handle是对RecvByteBufAllocator进行实际操做的对象

咱们跟进recvBufAllocHandle:

public RecvByteBufAllocator.Handle recvBufAllocHandle() { //若是不存在, 则建立一个handle的实例
    if (recvHandle == null) { recvHandle = config().getRecvByteBufAllocator().newHandle(); } return recvHandle; }

这里是咱们以前剖析过的逻辑, 若是不存在, 则建立handle的实例, 具体建立过程咱们能够回顾第三章的第二小节, 这里就再也不赘述

一样allocHandle.reset(config)是将配置重置, 第三章的第二小节也对其进行过剖析

重置完配置以后, 进行do-while循环, 有关循环终止条件allocHandle.continueReading(), 以前小节也有过详细剖析, 这里也再也不赘述

在do-while循环中, 首先看 byteBuf = allocHandle.allocate(allocator) 这一步, 这里传入了刚才建立的allocate对象, 也就是PooledByteBufAllocator:

这里会跑到DefaultMaxMessagesRecvByteBufAllocator类的allocate方法中:

public ByteBuf allocate(ByteBufAllocator alloc) { return alloc.ioBuffer(guess()); }

这里的guess方法, 会调用AdaptiveRecvByteBufAllocator的guess方法:

public int guess() { return nextReceiveBufferSize; }

这里会返回AdaptiveRecvByteBufAllocator的成员变量nextReceiveBufferSize, 也就是下次所分配缓冲区的大小, 根据咱们以前学习的内容, 第一次分配的时候会分配初始大小, 也就是1024字节

回到DefaultMaxMessagesRecvByteBufAllocator类的allocate方法中:

这样, alloc.ioBuffer(guess())就会分配一个PooledByteBuf

咱们跟到AbstractByteBufAllocator的ioBuffer方法中:

public ByteBuf ioBuffer(int initialCapacity) { if (PlatformDependent.hasUnsafe()) { return directBuffer(initialCapacity); } return heapBuffer(initialCapacity); }

这里首先判断是否能获取jdk的unsafe对象, 默认为true, 因此会走到directBuffer(initialCapacity)中, 这里最终会分配一个PooledUnsafeDirectByteBuf对象, 具体分配流程咱们再以前小节作过详细剖析

回到NioByteUnsafe的read()方法中:

分配完了ByteBuf以后, 再看这一步allocHandle.lastBytesRead(doReadBytes(byteBuf)):

首先看参数doReadBytes(byteBuf)方法, 这步是将channel中的数据读取到咱们刚分配的ByteBuf中, 并返回读取到的字节数

这里会调用到NioSocketChannel的doReadBytes方法:

protected int doReadBytes(ByteBuf byteBuf) throws Exception { final RecvByteBufAllocator.Handle allocHandle = unsafe().recvBufAllocHandle(); allocHandle.attemptedBytesRead(byteBuf.writableBytes()); return byteBuf.writeBytes(javaChannel(), allocHandle.attemptedBytesRead()); }

首先拿到绑定在channel中的handler, 由于咱们已经建立了handle, 因此这里会直接拿到

再看allocHandle.attemptedBytesRead(byteBuf.writableBytes())这步, byteBuf.writableBytes()返回byteBuf的可写字节数, 也就是最多能从channel中读取多少字节写到ByteBuf, allocate的attemptedBytesRead会把可写字节数设置到DefaultMaxMessagesRecvByteBufAllocator 类的attemptedBytesRead属性中

跟到DefaultMaxMessagesRecvByteBufAllocator中的attemptedBytesRead咱们会看到:

public void attemptedBytesRead(int bytes) { attemptedBytesRead = bytes; }

 

继续看doReadBytes方法:

最后, 经过byteBuf.writeBytes(javaChannel(), allocHandle.attemptedBytesRead())将jdk底层的channel中的数据写入到咱们建立的ByteBuf中, 并返回实际写入的字节数

回到NioByteUnsafe的read()方法中:

继续看allocHandle.lastBytesRead(doReadBytes(byteBuf))这步

刚才咱们剖析过doReadBytes(byteBuf)返回的是世界写入ByteBuf的字节数

再看lastBytesRead方法, 跟到DefaultMaxMessagesRecvByteBufAllocator的lastBytesRead方法中:

public final void lastBytesRead(int bytes) { lastBytesRead = bytes; totalBytesRead += bytes; if (totalBytesRead < 0) { totalBytesRead = Integer.MAX_VALUE; } }

这里会赋值两个属性, lastBytesRead表明最后读取的字节数, 这里赋值为咱们刚才写入ByteBuf的字节数, totalBytesRead表示总共读取的字节数, 这里将写入的字节数追加

继续看NioByteUnsafe的read()方法:

若是最后一次读取数据为0, 说明已经将channel中的数据所有读取完毕, 将新建立的ByteBuf释放循环利用, 并跳出循环

allocHandle.incMessagesRead(1)这步是增长消息的读取次数, 由于咱们循环最多16次, 因此当增长消息次数增长到16会结束循环

读取完毕以后, 会经过pipeline.fireChannelRead(byteBuf)将传递channelRead事件, 有关channelRead事件, 咱们在第四章也进行了详细的剖析

这里读者会有疑问, 若是一次读取不完, 就传递channelRead事件, 那么server接收到的数据有可能就是不完整的, 其实关于这点, netty也作了相应的处理, 咱们会在以后的章节详细剖析netty的半包处理机制

 

循环结束后, 会执行到allocHandle.readComplete()这一步

咱们知道第一次分配ByteBuf的初始容量是1024, 可是初始容量不必定必定知足全部的业务场景, netty中, 将每次读取数据的字节数进行记录, 而后以后次分配ByteBuf的时候, 容量会尽量的符合业务场景所须要大小, 具体实现方式, 就是在readComplete()这一步体现的

咱们跟到AdaptiveRecvByteBufAllocator的readComplete()方法中:

public void readComplete() { record(totalBytesRead()); }

这里调用了record方法, 而且传入了这一次所读取的字节总数

跟到record方法中:

private void record(int actualReadBytes) { if (actualReadBytes <= SIZE_TABLE[Math.max(0, index - INDEX_DECREMENT - 1)]) { if (decreaseNow) { index = Math.max(index - INDEX_DECREMENT, minIndex); nextReceiveBufferSize = SIZE_TABLE[index]; decreaseNow = false; } else { decreaseNow = true; } } else if (actualReadBytes >= nextReceiveBufferSize) { index = Math.min(index + INDEX_INCREMENT, maxIndex); nextReceiveBufferSize = SIZE_TABLE[index]; decreaseNow = false; } }

首先看判断条件 if (actualReadBytes <= SIZE_TABLE[Math.max(0, index - INDEX_DECREMENT - 1)]) 

这里index是当前分配的缓冲区大小所在的SIZE_TABLE中的索引, 将这个索引进行缩进, 而后根据缩进后的因此找出SIZE_TABLE中所存储的内存值, 再判断是否大于等于此次读取的最大字节数, 若是条件成立, 说明分配的内存过大, 须要缩容操做, 咱们看if块中缩容相关的逻辑

首先 if (decreaseNow) 会判断是否马上进行收缩操做, 一般第一次不会进行收缩操做, 而后会将decreaseNow设置为true, 表明下一次直接进行收缩操做

假设须要马上进行收缩操做, 咱们看收缩操做的相关逻辑:

 index = Math.max(index - INDEX_DECREMENT, minIndex) 这一步将索引缩进一步, 但不能小于最小索引值

而后经过 nextReceiveBufferSize = SIZE_TABLE[index] 获取设置索引以后的内存, 赋值在nextReceiveBufferSize, 也就是下次须要分配的大小, 下次就会根据这个大小分配ByteBuf了, 这样就实现了缩容操做

再看 else if (actualReadBytes >= nextReceiveBufferSize) 

这里判断此次读取字节的总量比上次分配的大小还要大, 则进行扩容操做

扩容操做也很简单, 索引步进, 而后拿到步进后的索引所对应的内存值, 做为下次所须要分配的大小

NioByteUnsafe的read()方法中:

通过了缩容或者扩容操做以后, 经过pipeline.fireChannelReadComplete()传播ChannelReadComplete()事件

以上就是读取客户端消息的相关流程

 

 

第五章总结

 

        本章主要剖析了ByteBuf的基本操做以及缓冲区分配等相关知识.

 

        缓冲区分配, 分为经过调用jdk的api的方式和分配一块连续内存的方式

 

        其中, 经过分配连续内存的方式分配缓冲区中, 又介绍了在page级别分配的逻辑和在subpage级别分配的逻辑

 

        page级别分配时经过操做内存二叉树的方式记录分配状况

 

        subpage级别分配是经过位图的方式记录分配状况

 

        最后介绍了NioSocketChannel处理读事件的相关逻辑

 

        整体来讲, 这一章的内容难度是比较大的, 但愿同窗课后经过多调试的方式进行熟练掌握

 

 上一节: ByteBuf的回收

 下一节: ByteToMessageDecoder

相关文章
相关标签/搜索