netty源码分析2-3-server启动-动态分析

分享内容如下

  1. client请求连接后,sever端的处理
  2. 网络标识位的变化
  3. 子Reactor启动

1.client请求连接后,sever端的处理

 

 

 

client连接时创建NioSocketChannel,触发DefaultChannelPipeline.read()

AbstractNioMessageChannel$NioMessageUnsafe.read()

@Override

public void read() {

for (;;) {

//创建

int localRead = doReadMessages(readBuf);

}

 

//。。。。。。。。。。。。

int size = readBuf.size();

for (int i = 0; i < size; i ++) {

//触发ServerBootstrapAcceptor channelRead 为 新创建的NioSocketChannel注册read事件

pipeline.fireChannelRead(readBuf.get(i));

}

readBuf.clear();

//触发DefaultChannelPipeline.read() ,最终去修改网络标识位,还没有改?

pipeline.fireChannelReadComplete();

//。。。。。。。。。。。

pipeline.fireExceptionCaught(exception);

}

}

//使用childEventGroup创建NioSocketChannel,装入buf中

@Override

protected int doReadMessages(List<Object> buf) throws Exception {

SocketChannel ch = javaChannel().accept();

//。。。。。。

buf.add(new NioSocketChannel(this, childEventLoopGroup().next(), ch));

//。。。。。。。

return 0;

}

 

public ChannelPipeline fireChannelReadComplete() {

head.fireChannelReadComplete();

//又执行 NioSocketChannel(AbstractNioChannel).doBeginRead() 的原因?

if (channel.config().isAutoRead()) {

read();

}

return this;

}

 

 

ServerBootstrapAcceptor-channelRead

//添加childHandler到pipeline执行链中,设置参数,执行register

@Override

public void channelRead(ChannelHandlerContext ctx, Object msg) {

Channel child = (Channel) msg;

 

child.pipeline().addLast(childHandler);//

 

for (Entry<ChannelOption<?>, Object> e: childOptions) {

try {

if (!child.config().setOption((ChannelOption<Object>) e.getKey(), e.getValue())) {

logger.warn("Unknown channel option: " + e);

}

} catch (Throwable t) {

logger.warn("Failed to set a channel option: " + child, t);

}

}

 

for (Entry<AttributeKey<?>, Object> e: childAttrs) {

child.attr((AttributeKey<Object>) e.getKey()).set(e.getValue());

}

//执行register

child.unsafe().register(child.newPromise());

}

开篇图中的过程伴随着网络标志位interestOps的变化,搞懂变化的过程,对理解netty server,client启动的过程有很大帮助。

此部分中sever端 负责IO操作的selector是新的 与server端负责连接请求的selector不同, 此处的selector eventLoop().selector获取的。

 

2.网络标识位的变化

server端第一次执行register(eventLoop().selector, 0, this); 此处interestOps=0,什么时候 interestOps=SelectionKey.OP_ACCEPT?

NioServerSocketChannel 初始化时 传入 SelectionKey.OP_ACCEPT 。

ServerBootstrap 执行bind(),调用register()执行fireChannelActive ,更新了 interestOps=SelectionKey.OP_ACCEPT

 

NioServerSocketChannel更新interestOps调用链如下

NioServerSocketChannel(AbstractNioChannel).doBeginRead() line: 330

AbstractNioMessageChannel$NioMessageUnsafe(AbstractChannel$AbstractUnsafe).beginRead() line: 585

DefaultChannelPipeline$HeadHandler.read(ChannelHandlerContext) line: 1055

ChannelHandlerInvokerUtil.invokeReadNow(ChannelHandlerContext) line: 132

DefaultChannelHandlerInvoker.invokeRead(ChannelHandlerContext) line: 263

DefaultChannelHandlerContext.read() line: 403

DefaultChannelPipeline.read() line: 923

NioServerSocketChannel(AbstractChannel).read() line: 214

DefaultChannelPipeline.fireChannelActive() line: 819

AbstractChannel$AbstractUnsafe$2.run() line: 474

NioEventLoop(SingleThreadEventExecutor).runAllTasks(long) line: 318 ???

NioEventLoop.run() line: 353

SingleThreadEventExecutor$5.run() line: 794

Thread.run() line: 745

更新 interestOps=SelectionKey.OP_ACCEPT的代码如下

AbstractNioChannel

//更新interestOps(网络标识位)

protected void doBeginRead() throws Exception {

if (inputShutdown) {

return;

}

 

final SelectionKey selectionKey = this.selectionKey;

if (!selectionKey.isValid()) {

return;

}

 

final int interestOps = selectionKey.interestOps();

if ((interestOps & readInterestOp) == 0) {

selectionKey.interestOps(interestOps | readInterestOp);

}

}

 

chilldGroup执行register后 什么时候更改网络标志位为读即interestOps=SelectionKey.OP_READ?

客户端发起连接请求后更改的

调用链如下

NioSocketChannel(AbstractNioChannel).doBeginRead() line: 331

AbstractNioByteChannel$NioByteUnsafe(AbstractChannel$AbstractUnsafe).beginRead() line: 585

DefaultChannelPipeline$HeadHandler.read(ChannelHandlerContext) line: 1055

ChannelHandlerInvokerUtil.invokeReadNow(ChannelHandlerContext) line: 132

DefaultChannelHandlerInvoker.invokeRead(ChannelHandlerContext) line: 263

DefaultChannelHandlerContext.read() line: 403

DefaultChannelPipeline.read() line: 923

NioSocketChannel(AbstractChannel).read() line: 214

DefaultChannelPipeline.fireChannelActive() line: 819

AbstractNioByteChannel$NioByteUnsafe(AbstractChannel$AbstractUnsafe).register0(ChannelPromise) line: 429

AbstractChannel$AbstractUnsafe.access$100(AbstractChannel$AbstractUnsafe, ChannelPromise) line: 369

AbstractChannel$AbstractUnsafe$1.run() line: 403

NioEventLoop(SingleThreadEventExecutor).runAllTasks(long) line: 318

NioEventLoop.run() line: 353

SingleThreadEventExecutor$5.run() line: 794

Thread.run() line: 745

更新网络标准位的处理与server处理相同。

ServerBootstrapAcceptor-channelRead 执行了 child.unsafe().register(child.newPromise());

 

重点关注:AbstractNioByteChannel$NioByteUnsafe(AbstractChannel$AbstractUnsafe).register0(ChannelPromise) line: 429

private void register0(ChannelPromise promise) {

try {

// check if the channel is still open as it could be closed in the mean time when the register

// call was outside of the eventLoop

if (!ensureOpen(promise)) {

return;

}

doRegister();

registered = true;

promise.setSuccess();

pipeline.fireChannelRegistered();

// 此时isActive()==true,执行了ChannelActive事件,更新了interestOps=SelectionKey.OP_READ

//注意NioServerSocketChannel执行register执行到了这里,但是isActive()=fase

if (isActive()) {

pipeline.fireChannelActive();

}

} catch (Throwable t) {

//。。。。。

}

}

io.netty.channel.AbstractNioChannel

protected void doRegister() throws Exception {

boolean selected = false;

for (;;) {

//初始化注册 网络标识位为0

selectionKey = javaChannel().register(eventLoop().selector, 0, this);

}

}

isActive()的不同实现

//NioSocketChannel类的判断,即是否连接

public boolean isActive() {

SocketChannel ch = javaChannel();

return ch.isOpen() && ch.isConnected();

}

NioServerSocketChannel类的判断,即是否已经绑定,注意 执行accpet后,这个判断就等于true了

public boolean isActive() {

return javaChannel().socket().isBound();

}

创建NioSocketChannel时设置网络标识位为 SelectionKey.OP_READ

protected AbstractNioByteChannel(Channel parent, EventLoop eventLoop, SelectableChannel ch) {

super(parent, eventLoop, ch, SelectionKey.OP_READ);

}

 

只有 isActive()等于true才能执行 pipeline.fireChannelActive(),创建服务端channel(NioServerSocketChannel)时执行 bind后才能触发,创建客户端连接(NioSocketChannel)的时候会立执行accept , 然后isActive()等于true, pipeline.fireChannelActive() 立即被触发。

pipeline.fireChannelActive(); 触发了创建了 网络标识位的更新, 这个值是在 创建 SocketChannel创建的,NioServerSocketChannel的值是 SelectionKey.OP_ACCEPT,NioSocketChannel的值是 SelectionKey.OP_READ。

 

综上所述,服务端的在执行完bind后 才会将网络位更新成SelectionKey.OP_ACCEPT,而客户端连接会直接更新成SelectionKey.OP_READ(这里面跳过了执行注册,并设置网络标识位为0的步骤)

子Reactor启动

子Reactor也是在执行 register过程中启动的

第一次执行register

pipeline.fireChannelRegistered(); -- ChannelInitializer channelRegistered channel add ServerBootStrapAccpter

第二次执行register

pipeline.fireChannelRegistered(); child channel add childHandler

fireChannelRead

客户端连接触发,最后调用ServerBootStrapAccpter channelRead()

 

执行bind触发

pipeline.fireChannelActive(); 修改网络标志位 sever channel update SelectionKey.OP_ACCEPT

ServerBootStrapAccpter channelRead() 执行 register 触发

pipeline.fireChannelActive(); 修改网络标志位 sever channel update SelectionKey.OP_ACCEPT