Netty源码分析第3章(客户端接入流程)---->第2节: 处理接入事件之handle的建立

 

Netty源码分析第三章: 客户端接入流程html

 

第二节: 处理接入事件之handle的建立数组

 

上一小节咱们剖析完成了与channel绑定的ChannelConfig初始化相关的流程, 这一小节继续剖析客户端链接事件的处理oop

回到上一章NioEventLoop的processSelectedKey ()方法:源码分析

private void processSelectedKey(SelectionKey k, AbstractNioChannel ch) { //获取到channel中的unsafe
    final AbstractNioChannel.NioUnsafe unsafe = ch.unsafe(); //若是这个key不是合法的, 说明这个channel可能有问题
    if (!k.isValid()) { //代码省略
 } try { //若是是合法的, 拿到key的io事件
        int readyOps = k.readyOps(); //连接事件
        if ((readyOps & SelectionKey.OP_CONNECT) != 0) { int ops = k.interestOps(); ops &= ~SelectionKey.OP_CONNECT; k.interestOps(ops); unsafe.finishConnect(); } //写事件
        if ((readyOps & SelectionKey.OP_WRITE) != 0) { ch.unsafe().forceFlush(); } //读事件和接受连接事件 //若是当前NioEventLoop是work线程的话, 这里就是op_read事件 //若是是当前NioEventLoop是boss线程的话, 这里就是op_accept事件
        if ((readyOps & (SelectionKey.OP_READ | SelectionKey.OP_ACCEPT)) != 0 || readyOps == 0) { unsafe.read(); if (!ch.isOpen()) { return; } } } catch (CancelledKeyException ignored) { unsafe.close(unsafe.voidPromise()); } }

咱们看其中的if判断:this

if ((readyOps & (SelectionKey.OP_READ | SelectionKey.OP_ACCEPT)) != 0 || readyOps == 0)

上一小节咱们分析过, 若是当前NioEventLoop是work线程的话, 这里就是op_read事件, 若是是当前NioEventLoop是boss线程的话, 这里就是op_accept事件, 这里咱们以boss线程为例进行分析spa

 

以前咱们讲过, 不管处理op_read事件仍是op_accept事件, 都走的unsafe的read()方法, 这里unsafe是经过channel拿到, 咱们知道若是是处理accept事件, 这里的channel是NioServerSocketChannel, 这里与之绑定的unsafe是NioMessageUnsafe线程

咱们跟到NioMessageUnsafe的read()方法:rest

public void read() { //必须是NioEventLoop方法调用的, 不能经过外部线程调用
    assert eventLoop().inEventLoop(); //服务端channel的config
    final ChannelConfig config = config(); //服务端channel的pipeline
    final ChannelPipeline pipeline = pipeline(); //处理服务端接入的速率
    final RecvByteBufAllocator.Handle allocHandle = unsafe().recvBufAllocHandle(); //设置配置
 allocHandle.reset(config); boolean closed = false; Throwable exception = null; try { try { do { //建立jdk底层的channel //readBuf用于临时承载读到连接
                int localRead = doReadMessages(readBuf); if (localRead == 0) { break; } if (localRead < 0) { closed = true; break; } //分配器将读到的连接进行计数
 allocHandle.incMessagesRead(localRead); //链接数是否超过最大值
            } while (allocHandle.continueReading()); } catch (Throwable t) { exception = t; } int size = readBuf.size(); //遍历每一条客户端链接
        for (int i = 0; i < size; i ++) { readPending = false; //传递事件, 将建立NioSokectChannel进行传递 //最终会调用ServerBootstrap的内部类ServerBootstrapAcceptor的channelRead()方法
 pipeline.fireChannelRead(readBuf.get(i)); } readBuf.clear(); allocHandle.readComplete(); pipeline.fireChannelReadComplete(); //代码省略
    } finally { //代码省略
 } }

首先获取与NioServerSocketChannel绑定config和pipeline, config咱们上一小节进行分析过, pipeline咱们将在下一章进行剖析code

咱们看这一句:htm

final RecvByteBufAllocator.Handle allocHandle = unsafe().recvBufAllocHandle();

这里经过RecvByteBufAllocator接口调用了其内部接口Handler

咱们看其RecvByteBufAllocator接口:

public interface RecvByteBufAllocator { Handle newHandle(); interface Handle { int guess(); void reset(ChannelConfig config); void incMessagesRead(int numMessages); void lastBytesRead(int bytes); int lastBytesRead(); void attemptedBytesRead(int bytes); int attemptedBytesRead(); boolean continueReading(); void readComplete(); } }

咱们看到RecvByteBufAllocator接口只有一个方法newHandle(), 顾名思义就是用于建立Handle对象的方法, 而Handle中的方法, 才是实际用于操做的方法

在RecvByteBufAllocator实现类中包含Handle的子类, 具体实现关系以下:

3-2-1

回到read()方法中再看这段代码:

final RecvByteBufAllocator.Handle allocHandle = unsafe().recvBufAllocHandle();

unsafe()返回当前channel绑定的unsafe对象, recvBufAllocHandle()最终会调用AbstractChannel内部类AbstractUnsafe的recvBufAllocHandle()方法

跟进AbstractUnsafe的recvBufAllocHandle()方法:

public RecvByteBufAllocator.Handle recvBufAllocHandle() { //若是不存在, 则建立一个recvHandle的实例
    if (recvHandle == null) { recvHandle = config().getRecvByteBufAllocator().newHandle(); } return recvHandle; }

若是若是是第一次执行到这里, 自身属性recvHandle为空, 会建立一个recvHandle实例, config()返回NioServerSocketChannel绑定的ChannelConfig, getRecvByteBufAllocator()获取其RecvByteBufAllocator对象, 这两部分上一小节剖析过了, 这里经过newHandle()建立一个Handle, 这里会走到AdaptiveRecvByteBufAllocator类中的newHandle()方法中

跟进newHandle()方法中:

public Handle newHandle() { return new HandleImpl(minIndex, maxIndex, initial); }

这里建立HandleImpl传入了三个参数, 这三个参数咱们上一小节剖析过, minIndex为最小内存在SIZE_TABLE中的下标, maxIndex为最大内存在SEIZE_TABEL中的下标, initial是初始内存, 咱们跟到HandleImpl的构造方法中:

public HandleImpl(int minIndex, int maxIndex, int initial) { this.minIndex = minIndex; this.maxIndex = maxIndex; index = getSizeTableIndex(initial); nextReceiveBufferSize = SIZE_TABLE[index]; }

初始化minIndex和maxIndex, 根据initial找到当前的下标, nextReceiveBufferSize是根据当前的下标找到对应的内存

这样, 咱们就建立了个Handle对象

在这里咱们须要知道, 这个handle, 是和channel惟一绑定的属性, 而AdaptiveRecvByteBufAllocator对象是和ChannelConfig对象惟一绑定的, 间接也是和channel进行惟一绑定

 

继续回到read()方法:

public void read() { //必须是NioEventLoop方法调用的, 不能经过外部线程调用
    assert eventLoop().inEventLoop(); //服务端channel的config
    final ChannelConfig config = config(); //服务端channel的pipeline
    final ChannelPipeline pipeline = pipeline(); //处理服务端接入的速率
    final RecvByteBufAllocator.Handle allocHandle = unsafe().recvBufAllocHandle(); //设置配置
 allocHandle.reset(config); boolean closed = false; Throwable exception = null; try { try { do { //建立jdk底层的channel //readBuf用于临时承载读到连接
                int localRead = doReadMessages(readBuf); if (localRead == 0) { break; } if (localRead < 0) { closed = true; break; } //分配器将读到的连接进行计数
 allocHandle.incMessagesRead(localRead); //链接数是否超过最大值
            } while (allocHandle.continueReading()); } catch (Throwable t) { exception = t; } int size = readBuf.size(); //遍历每一条客户端链接
        for (int i = 0; i < size; i ++) { readPending = false; //传递事件, 将建立NioSokectChannel进行传递 //最终会调用ServerBootstrap的内部类ServerBootstrapAcceptor的channelRead()方法
 pipeline.fireChannelRead(readBuf.get(i)); } readBuf.clear(); allocHandle.readComplete(); pipeline.fireChannelReadComplete(); //代码省略
    } finally { //代码省略
 } }

继续往下跟:

allocHandle.reset(config);

这个段代码是从新设置配置, 也就是将以前的配置信息进行初始化, 最终会走到, DefaultMaxMessagesRecvByteBufAllocator中的内部类MaxMessageHandle的reet中

咱们跟进reset中:

public void reset(ChannelConfig config) { this.config = config; maxMessagePerRead = maxMessagesPerRead(); totalMessages = totalBytesRead = 0; }

这里仅仅对几个属性作了赋值, 简单介绍下这几个属性:

config:当前channelConfig对象

maxMessagePerRead:表示读取消息的时候能够读取几回(循环次数), maxMessagesPerRead()返回的是RecvByteBufAllocator的maxMessagesPerRead属性, 上一小节已经作过剖析

totalMessages:表明目前读循环已经读取的消息个数, 在NIO传输模式下也就是已经执行的循环次数, 这里初始化为0

totalBytesRead:表明目前已经读取到的消息字节总数, 这里一样也初始化为0

咱们继续往下走, 这里首先是一个do-while循环, 循环体里经过int localRead = doReadMessages(readBuf)这种方式将读取到的链接数放入到一个List集合中, 这一步咱们下一小节再分析, 咱们继续往下走:

 

咱们首先看allocHandle.incMessagesRead(localRead)这一步, 这里的localRead表示此次循环往readBuf中放入的链接数, 在Nio模式下这, 若是读取到一条链接会返回1

跟到中的MaxMessageHandle的incMessagesRead(int amt)方法中:

public final void incMessagesRead(int amt) { totalMessages += amt; }

这里将totalMessages增长amt, 也就是+1

这里totalMessage, 刚才已经剖析过, 在NIO传输模式下也就是已经执行的循环次数, 这里每次执行一次循环都会加一

再去看循环终止条件allocHandle.continueReading()

跟到MaxMessageHandle的continueReading()方法中:

public boolean continueReading() { //config.isAutoRead()默认返回true // totalMessages < maxMessagePerRead //totalMessages表明当前读到的连接, 默认是1 //maxMessagePerRead每一次最大读多少连接(默认16)
    return config.isAutoRead() && attemptedBytesRead == lastBytesRead && totalMessages < maxMessagePerRead && totalBytesRead < Integer.MAX_VALUE; }

咱们逐个分析判断条件:

config.isAutoRead(): 这里默认为true

attemptedBytesRead == lastBytesRead: 表示本次读取的字节数和最后一次读取的字节数相等, 由于到这里都没有进行字节数组的读取操做, 因此默认都为0, 这里也返回true

totalMessages < maxMessagePerRead: 表示当前读取的次数是否小于最大读取次数, 咱们知道totalMessages每次循环都会自增, 而maxMessagePerRead默认值为16, 因此这里会限制循环不能超过16次, 也就是最多一次只能读取16条链接

totalBytesRead < Integer.MAX_VALUE: 表示读取的字节数不能超过int类型的最大值

这里就剖析完了Handle的建立和初始化过程, 而且剖析了循环终止条件等相关的逻辑

 

上一节: 初始化NioSocketChannelConfig

下一节: NioSocketChannel的建立

相关文章
相关标签/搜索