Netty源码分析第2章(NioEventLoop)---->第4节: NioEventLoop线程的启动

 

Netty源码分析第二章: NioEventLoophtml

 

第四节: NioEventLoop线程的启动promise

 

以前的小节咱们学习了NioEventLoop的建立以及线程分配器的初始化, 那么NioEventLoop是如何开启的呢, 咱们这一小节继续学习服务器

NioEventLoop的开启方法在其父类SingleThreadEventExecutor中的execute(Runnable task)方法中, 咱们跟到这个方法:异步

@Override public void execute(Runnable task) { if (task == null) { throw new NullPointerException("task"); } //判断当前线程是否是eventLoop线程
    boolean inEventLoop = inEventLoop(); //若是是eventLoop线程
    if (inEventLoop) { addTask(task); } else { //不是eventLoop线程启动线程
 startThread(); //添加task
 addTask(task); if (isShutdown() && removeTask(task)) { reject(); } } if (!addTaskWakesUp && wakesUpForTask(task)) { wakeup(inEventLoop); } }

这个方法传入一个Runnble对象, 也就是一个任务ide

首先boolean inEventLoop = inEventLoop()方法会判断是否是NioEventLoop线程
oop

跟进 inEventLoop()方法:源码分析

@Override public boolean inEventLoop() { return inEventLoop(Thread.currentThread()); }

这里inEventLoop(Thread.currentThread())方法传入了当前线程对象, 这个方法会调用当前类的inEventLoop(Thread thread)方法
学习

跟进inEventLoop(Thread thread)方法:优化

@Override public boolean inEventLoop(Thread thread) { return thread == this.thread; }

咱们看到判断的依据是当前线程对象是否是NioEventLoop绑定的线程对象, 这里咱们会想到开启线程确定会为NioEventLoop绑定一个线程对象, 若是判断当前线程对象不是当前NioEventLoop绑定的线程对象, 说明执行此方法的线程不是当前NioEventLoop线程, 那么这个线程如何初始化的, 后面咱们会讲到, 咱们继续看execute(Runnable task)方法:this

若是是NioEventLoop线程, 则会经过addTask(task)添加任务, 经过NioEventLoop异步执行, 那么这个task是何时执行的, 一样后面会讲到

跟一下addTask(task):

protected void addTask(Runnable task) { if (task == null) { throw new NullPointerException("task"); } //若是添加不成功
    if (!offerTask(task)) { reject(task); } }

这里offerTask(task)表明添加一个task, 跟进去:

final boolean offerTask(Runnable task) { if (isShutdown()) { reject(); } //往taskQ中添加一个task
    return taskQueue.offer(task); }

咱们看到taskQueue.offer(task)将一个task添加到任务队列, 而这个任务队列taskQueue就是咱们NioEventLoop初始化的时候与NioEventLoop惟一绑定的任务队列

回顾一下初始构造方法:

protected SingleThreadEventExecutor(EventExecutorGroup parent, Executor executor, boolean addTaskWakesUp, int maxPendingTasks, RejectedExecutionHandler rejectedHandler) { super(parent); this.addTaskWakesUp = addTaskWakesUp; this.maxPendingTasks = Math.max(16, maxPendingTasks); this.executor = ObjectUtil.checkNotNull(executor, "executor"); taskQueue = newTaskQueue(this.maxPendingTasks); rejectedExecutionHandler = ObjectUtil.checkNotNull(rejectedHandler, "rejectedHandler"); }

在这里经过 taskQueue = newTaskQueue(this.maxPendingTasks) 建立了taskQueue

回到execute(Runnable task)方法中, 咱们继续往下看:

若是不是NioEventLoop线程咱们经过startThread()开启一个NioEventLoop线程

跟到startThread()以前, 咱们先继续往下走:

开启NioEventLoop线程以后, 又经过addTask(task)taskQueue添加任务

 

最后咱们注意有这么一段代码:

if (!addTaskWakesUp && wakesUpForTask(task)) { wakeup(inEventLoop); }

addTaskWakesUp表明添加task以后, NioEventLoopselect()操做是否是要唤醒, 这个属性是在初始化NioEventLoop的时候传入的, 你们能够回顾下, 默认是false, 这里!addTaskWakesUp就是须要唤醒, wakesUpForTask(task)addTaskWakesUp意义相同, 默认是true, 能够看代码:

protected boolean wakesUpForTask(Runnable task) { return true; }

这里恒为true, 因此这段代码就是添加task时须要经过wakeup(inEventLoop)唤醒, 这样NioEventLoop在作select()操做时若是正在阻塞则马上唤醒, 而后执行任务队列的task

回到execute(Runnable task)方法中咱们跟进开启线程的startThread()方法中:

private void startThread() { //判断线程是否启动, 未启动则启动
    if (STATE_UPDATER.get(this) == ST_NOT_STARTED) { if (STATE_UPDATER.compareAndSet(this, ST_NOT_STARTED, ST_STARTED)) { //当前线程未启动, 则启动
 doStartThread(); } } }

前面的判断是判断当前NioEventLoop线程是否启动, 若是未启动, 则经过doStartThread()方法启动, 咱们第一次执行execute(Runnable task)线程是未启动的, 因此会执行doStartThread(), 后续该线程则不会再执行doStartThread()方法

咱们跟进doStartThread()方法中:

private void doStartThread() { assert thread == null; //线程执行器执行线程(全部的eventLoop共用一个线程执行器)
    executor.execute(new Runnable() { @Override public void run() { //将当前线程复制给属性
            thread = Thread.currentThread(); if (interrupted) { thread.interrupt(); } boolean success = false; updateLastExecutionTime(); try { //开始轮询
                SingleThreadEventExecutor.this.run(); success = true; } catch (Throwable t) { logger.warn("Unexpected exception from an event executor: ", t); } finally { //代码省略
 } } }); }

咱们重点关注executor.execute()这个方法, 其中executor就是咱们建立NioEventLoop的线程器, execute()就是开启一个线程

回顾下execute()方法:

public void execute(Runnable command) { //起一个线程
 threadFactory.newThread(command).start(); }

咱们看到经过线程工厂开启一个线程, 因为前面的小节已经剖析, 这里再也不赘述

 

开启线程则执行Runnble类中的run()方法, 咱们看到在run()方法里经过 thread = Thread.currentThread() 将新开启的线程对象赋值NioEventLoopthread的属性, 这样就能够经过线程对象的判断, 来肯定是否是NioEventLoop线程了

 

后面咱们看到 SingleThreadEventExecutor.this.run() , 这里this, 就是当前NioEventLoop对象, 而这里的run()方法, 就是NioEventLoop中的run()方法, 在这个run()方法中, 真正开始了selector的轮询工做, 对于run()方法的详细剖析, 咱们会在以后的小节中进行

 

刚才咱们剖析了NioEventLoop的启动方法, 那么根据咱们的分析, 就是第一次调用NioEventLoopexecute(Runnable task)方法的时候, 则会开启NioEventLoop线程, 以后的调用只是往taskQueue中添加任务, 那么第一次是何时开启的呢?这里咱们要回顾上一章讲过的内容

 

上一章中咱们讲过在AbstractServerBootstrap中有个initAndRegister()方法, 这个方法主要用于channel的初始化和注册, 其中注册的代码为:

ChannelFuture regFuture = config().group().register(channel);

其中group()咱们剖析过是Boss线程的group, 咱们剖析过其中的register(channel)方法:

public ChannelFuture register(Channel channel) { return next().register(channel); }

首先跟到next()方法:

public EventLoop next() { return (EventLoop) super.next(); }

首先调用了其父类MultithreadEventExecutorGroupnext方法, 跟进去:

public EventExecutor next() { return chooser.next(); }

这里chooser, 就是初始化NioEventLoopGroup的线程选择器, 为此分配了不一样的策略, 这里再也不赘述, 经过这个方法, 返回一个NioEventLoop线程

 

回到MultithreadEventLoopGroup类的register()方法中, next().register(channel)表明分配后的NioEventLoopregister()方法, 这里会调用NioEventLoop的父类SingleThreadEventLoop类中的register()方法

跟到SingleThreadEventLoop类中的register()方法:

public ChannelFuture register(Channel channel) { return register(new DefaultChannelPromise(channel, this)); }

DefaultChannelPromise是一个监听器, 它会跟随channel的读写进行监听, 绑定传入的channelNioEventLoop, 有关Promise后面的章节会讲到

这里咱们继续跟进register(new DefaultChannelPromise(channel, this))

public ChannelFuture register(final ChannelPromise promise) { ObjectUtil.checkNotNull(promise, "promise"); promise.channel().unsafe().register(this, promise); return promise; }

unsafe()方法返回建立channel初始化的unsafe()对象, 若是是NioSeverSocketChannel, 则绑定NioMessageUnsafe对象, 上一小节进行剖析过这里再也不赘述

 

最终这个unsafe对象会调用到AbstractChannel的内部类AbstractUnsafe中的register()方法, 这里register(), 不管是客户端channel和服务器channel都会经过这个一个register注册, 在之后的客户端接入章节中咱们会看到

这里咱们继续看register方法:

public final void register(EventLoop eventLoop, final ChannelPromise promise) { //代码省略 //全部的复制操做, 都交给eventLoop处理(1)
    AbstractChannel.this.eventLoop = eventLoop; if (eventLoop.inEventLoop()) { register0(promise); } else { try { eventLoop.execute(new Runnable() { @Override public void run() { //作实际主注册(2)
 register0(promise); } }); } catch (Throwable t) { //代码省略
 } } }

这里咱们上一小节分析过, 再也不陌生, 这里只分析有关NioEventLoop相关的内容

咱们首先看到 AbstractChannel.this.eventLoop = eventLoop , 获取当前channelNioEventLoop, 经过上一章的学习, 咱们知道每一个channel建立的时候会绑定一个NioEventLoop

这里经过eventLoop.inEventLoop()判断当前线程是不是NioEventLoop线程, inEventLoop()方法在前面的小节剖析过, 这里再也不赘述

 

若是是NioEventLoop线程则经过register0(promise)方法作实际的注册, 可是咱们第一次执行注册方法的时候, 若是是服务器channel是则是由server的用户线程执行的, 若是是客户端channel, 则是由Boss线程执行的, 因此走到这里均不是当前channelNioEventLoop的线程, 因而会走到下面的eventLoop.execute()方法中

 

eventLoop.execute()上一小节剖析过, 就是将task添加到taskQueue中而且开启器NioEventLoop线程, 因此, 在这里就开启了NioEventLoop线程, 有关开启步骤, 能够经过上一小节内容进行回顾

 

这里注意一点, 有的资料会讲第一次开启NioEventLoop线程是在AbstractBootstrapdoBind0(regFuture, channel, localAddress, promise)方法中开启的, 我的通过debug和分析, 实际上并非那样的, 但愿你们不要被误导

 

 

简单看下doBind0(regFuture, channel, localAddress, promise)方法:

private static void doBind0(final ChannelFuture regFuture, final Channel channel, final SocketAddress localAddress, final ChannelPromise promise) { channel.eventLoop().execute(new Runnable() { @Override public void run() { if (regFuture.isSuccess()) { //绑定端口
 channel.bind(localAddress, promise).addListener(ChannelFutureListener.CLOSE_ON_FAILURE); } else { promise.setFailure(regFuture.cause()); } } }); }

这里虽然调用了eventLoopexecute()方法, 可是eventLoop线程在注册期间已经启动, 因此这里不会重复启动, 只会将任务添加到taskQueue

 

其实这里咱们也可以看出, 其实绑定端口的相关操做, 一样是也是eventLoop线程中执行的

 

上一节: 初始化线程选择器

下一节: 优化selector

相关文章
相关标签/搜索