NioEventLoopGroup初始化

本文是我对Netty的NioEventLoopGroup及NioEventLoop初始化工做的源码阅读笔记, 以下图,是Netty的Reactor线程模型图,本文描述NioEventLoopGroup等价于我在图中标红的MainReactor组件,全篇围绕它的初始化展开,不免地方理解的不正确,欢迎留言java

Reactor线程模型图

在Nio网络编程模型的图示是下面那张图, 单条Thread全职执行一个Selector,首先是服务端在启动的时候,会把表明服务端的ServerSockerChannel注册进Selector,且感兴趣的事件是Accept, 一旦有客户端请求创建链接,ServerSockerChannel的accept事件就会被Selector感知到,进行下一步处理算法

NIO模型图

对NioEventLoopGroup最感性的认识,是在必定程度上,它实际上是对上图组件的封装,那么封装了哪些部分呢?编程

  • 对Thread的封装

NioEventLoopGroup维护的是事件循环,EventLoop, 在Netty的主从Reactor线程模型中,两个事件循环组其实也是线程组,由于每个EventLoop在他的整个生命周期中都始终和一条线程惟一绑定,EventLoop的线程使用的是它本身封装的FastThreadLocalThread, 这条线程使得EventLoop有了处理事件的能力设计模式

  • 对Selector的封装

NioEventLoopGroup维护的是事件循环,EventLoop,一样维护着属于本身的Selector选择器,这个选择器使得EventLoop拥有了轮询绑定在本身身上的Channel的能力. 而且Netty对JDK原生的选择器作出了升级,使用自定义的数组替换了原生Selector的HashSet集合SelectedKeys,使得时间的复杂度在任什么时候刻都是O1api

如今看,每个EventLoop都是一个工做单元, 它的初始化过程是怎样的? 下面就开始正式的阅读源码数组

继承体系图

上图是一个简化了体系图,,慢慢撸出他们的关系安全

流程图

这个图使咱们下面源码的流程图, 完成NioEventLoopGroup的初始化网络

入口:数据结构

EventLoopGroup bossGroup = new NioEventLoopGroup(1);

通过几层this()构造方法的调用,咱们来到它的这个构造方法, 它调用了父类的构造方法,带上了默认的select策略,以及拒绝执行任务的handler多线程

public NioEventLoopGroup(int nThreads, Executor executor, final SelectorProvider selectorProvider,
                     final SelectStrategyFactory selectStrategyFactory) {
//todo  0   null        根据系统选出 nioXXXprovider      默认的选择策略
// todo 调用父类的构造方法   MultithreadEventLoopGroup  多线程的事件循环组
super(nThreads, executor, selectorProvider, selectStrategyFactory, RejectedExecutionHandlers.reject());
}

接着进入super() , 进入MultiThreadEventLoopGroup 多线程事件循环组, 它干了件大事, 初始化了NioEventLoopGroup线程的数量,注意,是数量, 源码在下面: 固然,咱们通常会把BossGroup的线程数量设置为1

// todo 当MultithreadEventLoopGroup被加载进 JVM就会执行, 对 DEFAULT_EVENT_LOOP_THREADS进行初始化
    static {
        // todo max方法取最大值,
        // todo  SystemPropertyUtil.getInt,这是个系统辅助类, 若是系统中有 io.netty.eventLoopThreads,就取它, 没有的话,去后面的值
        // todo NettyRuntime.availableProcessors() 是当前的 系统的核数*2  , 在个人电脑上 2*2*2=8条线程
        DEFAULT_EVENT_LOOP_THREADS = Math.max(1, SystemPropertyUtil.getInt(
                "io.netty.eventLoopThreads", NettyRuntime.availableProcessors() * 2));

        if (logger.isDebugEnabled()) {
            logger.debug("-Dio.netty.eventLoopThreads: {}", DEFAULT_EVENT_LOOP_THREADS);
        }
    }

    /**
     * @see MultithreadEventExecutorGroup#MultithreadEventExecutorGroup(int, Executor, Object...)
     */
    // todo  接着 使用父类的构造方法,   nThreads=  DEFAULT_EVENT_LOOP_THREADS
    // todo Object... args  是 selectorProvider, selectStrategyFactory, RejectedExecutionHandlers.reject()的简写
    protected MultithreadEventLoopGroup(int nThreads, Executor executor, Object... args) {
        super(nThreads == 0 ? DEFAULT_EVENT_LOOP_THREADS : nThreads, executor, args);
    }

接着进入它的Supper() 来到了它的父类的构造方法,MultiThreadEventLoopExcutorGroup的构造方法, 这个类是相当重要的类,它作了大量的初始化工做, 整体上看:

  • 首先: 根据上一步肯定的它能够拥有的线程数,循环建立并初始化一个EventExecutor[] 这个数组其实就是盛放EventLoop的数组, 当这个for循环结束后,实际上NioEventLoopGroup就添加完成了EventLoop
  • 初始化选择器工厂, 这个工厂的做用是, 当出现新的IO事件须要处理时,经过工厂的轮询算法,从NioEventLoopGroup中选取一个NioEventLoop处理

源码:

// todo 在这个构造方法中,完成了一些属性的赋值, 完全构造完成  事件循环组对象
            // todo  Object... args  是 selectorProvider, selectStrategyFactory, RejectedExecutionHandlers.reject()的简写
    protected MultithreadEventExecutorGroup(int nThreads, Executor executor,
                    EventExecutorChooserFactory chooserFactory, Object... args) {
                if (nThreads <= 0) {
                    throw new IllegalArgumentException(String.format("nThreads: %d (expected: > 0)", nThreads));
                }

                if (executor == null) {
            // todo 下面须要的参数,一开始使用无参的构造方法时, 传递进来的 就是null ,执行这一行代码, 建立默认的线程工厂
            /// todo ThreadPerTaskExecutor  意味为当前的事件循环组   建立Executor , 用于 针对每个任务的Executor   线程的执行器
            // todo  newDefaultThreadFactory根据它的特性,能够给线程加名字等,
            // todo 比传统的好处是 把建立线程和 定义线程须要作的任务分开, 咱们只关心任务,  二者解耦
            // todo 每次执行任务都会建立一个线程实体
            // todo NioEventLoop 线程命名规则  nioEventLoop-1-XX    1表明是第几个group   XX第几个eventLoop
            executor = new ThreadPerTaskExecutor(newDefaultThreadFactory());
        }

        children = new EventExecutor[nThreads];

        // todo  循环
        for (int i = 0; i < nThreads; i ++) {
            boolean success = false;
            try {
                // todo 建立EventLoop
                children[i] = newChild(executor, args);
                success = true;
            } catch (Exception e) {
                // TODO: Think about if this is a good exception type
                throw new IllegalStateException("failed to create a child event loop", e);
            } finally {
                if (!success) {
                    for (int j = 0; j < i; j ++) {
                        children[j].shutdownGracefully();
                    }

                    for (int j = 0; j < i; j ++) {
                        EventExecutor e = children[j];
                        try {
                            while (!e.isTerminated()) {
                                e.awaitTermination(Integer.MAX_VALUE, TimeUnit.SECONDS);
                            }
                        } catch (InterruptedException interrupted) {
                            // Let the caller handle the interruption.
                            Thread.currentThread().interrupt();
                            break;
                        }
                    }
                }
            }
        }
    // todo chooser 在这里 初始化了
        chooser = chooserFactory.newChooser(children);

        final FutureListener<Object> terminationListener = new FutureListener<Object>() {
            @Override
            public void operationComplete(Future<Object> future) throws Exception {
                if (terminatedChildren.incrementAndGet() == children.length) {
                    terminationFuture.setSuccess(null);
                }
            }
        };

        for (EventExecutor e: children) {
            e.terminationFuture().addListener(terminationListener);
        }

        Set<EventExecutor> childrenSet = new LinkedHashSet<EventExecutor>(children.length);
        Collections.addAll(childrenSet, children);
        readonlyChildren = Collections.unmodifiableSet(childrenSet);
    }

这个过程当中的细节:

  • Netty的executor = new ThreadPerTaskExecutor(newDefaultThreadFactory());

默认的线程工厂,建立出的Thread,并非JDK原生的Thread,而是Netty本身封装的

protected Thread newThread(Runnable r, String name) {
        System.out.println(threadGroup+"   threadGroup");
        return new FastThreadLocalThread(threadGroup, r, name);
    }
  • Netty的ThreadPerTaskExecutor源码以下, 能够看到,它的execute直接关联着Thread.start()方法, 一旦执行它就会开启新的线程, 固然源码看到这里时,它是没有没执行的,由于线程和NioEventLoop关联着,再往下就看NioEventLoop的实现
*  todo 这里实际上使用了设计模式
 *  todo 1. command是用户定义的任务, 命令模式; 直观的 我定义一种任务, 程序不须要知道我执行的命令是什么,可是当我把任务扔给你, 你帮我执行就行了
 *  todo 2. 代理设计模型, 代理了ThreadFactory  , 把原本给ThreadPerTaskExecutor执行的任务给了ThreadFactory
 */
public final class ThreadPerTaskExecutor implements Executor {
    private final ThreadFactory threadFactory;

    public ThreadPerTaskExecutor(ThreadFactory threadFactory) {
        if (threadFactory == null) {
            throw new NullPointerException("threadFactory");
        }
        this.threadFactory = threadFactory;
    }

    // todo  必须实现 Executor 里面惟一的抽象方法, execute , 执行性 任务
    @Override
    public void execute(Runnable command) {
        threadFactory.newThread(command).start();
    }
}

其次,上面的newChild(executor, args);方法实际上是抽象方法,真正运行时会执行子类NioEventLoopGroup的实现, 以下:

@Override
protected EventLoop newChild(Executor executor, Object... args) throws Exception {
    // todo !! 真正建立事件循环组的逻辑在这里!!!
    return new NioEventLoop(this, executor, (SelectorProvider) args[0],
        ((SelectStrategyFactory) args[1]).newSelectStrategy(), (RejectedExecutionHandler) args[2]);
}

 // todo 这里是 它的构造方法
    NioEventLoop(NioEventLoopGroup parent, Executor executor, SelectorProvider selectorProvider,
                 SelectStrategy strategy, RejectedExecutionHandler rejectedExecutionHandler) {
        // todo 进入到父类,  着重看他是如何建立出 TaskQueue的
        super(parent, executor, false, DEFAULT_MAX_PENDING_TASKS, rejectedExecutionHandler);
        if (selectorProvider == null) {
            throw new NullPointerException("selectorProvider");
        }
        if (strategy == null) {
            throw new NullPointerException("selectStrategy");
        }
        // todo 经常使用的 属性
        provider = selectorProvider;
        // todo 获取Selector选择器
        final SelectorTuple selectorTuple = openSelector();

        // todo SelectorTuple是netty维护 jdk 原生的Selector的包装类, 下面看,他有两个Selector, 一个是通过包装的,一个是未通过包装的
        selector = selectorTuple.selector;  //
        unwrappedSelector = selectorTuple.unwrappedSelector;  // todo Jdk 原生的Selector
        selectStrategy = strategy;
    }

流程图

继续跟进去,上面的NioEventLoopGroup的体系图也就分析到右半部分了,如上图,是这半部分初始化工做的主要流程, 下面是它的构造方法,能够看到主要完成了两件事

  • 进入父类的构造方法完成任务队列的建立
  • 打开选择器,而且进行了优化

细节:
能够看到,如今已经进入了它的三级父类SingleThreadEventExecutor

protected SingleThreadEventExecutor(EventExecutorGroup parent, Executor executor,
                                boolean addTaskWakesUp, int maxPendingTasks,
                                RejectedExecutionHandler rejectedHandler) {

super(parent);
this.addTaskWakesUp = addTaskWakesUp;
this.maxPendingTasks = Math.max(16, maxPendingTasks);
// todo 保存线程执行器
this.executor = ObjectUtil.checkNotNull(executor, "executor");
// todo 任务队列  , 进入查看
taskQueue = newTaskQueue(this.maxPendingTasks);
System.out.println(taskQueue.getClass());
rejectedExecutionHandler = ObjectUtil.checkNotNull(rejectedHandler, "rejectedHandler");
}

在这个类中进行了以下的工做:

  • 调用父类的构造方法,设置本身的父Group
  • 初始化本身的线程执行器
  • 建立任务队列

队列有啥用?

咱们知道,Netty中的线程可不止一个, 多个EventLoop意味着多个线程, 任务队列的做用就是当其余线程拿到CPU的执行权时,却获得了其余线程的IO请求,这时当前线程就把这个请求以任务的方式提交到对应线程的任务队列里面

建立的什么任务队列?

有个误区, 当我跟进newTaskQueue(this.maxPendingTasks);方法时, 进入的方法建立了一个LinkedBlockingQueue队列, 实际上建立的确是MpscQueue , 这并不奇怪,是由于NioEventLoop把这个方法重写了, 源码以下:

@Override
protected Queue<Runnable> newTaskQueue(int maxPendingTasks) {
    // This event loop never calls takeTask()
    return maxPendingTasks == Integer.MAX_VALUE ? PlatformDependent.<Runnable>newMpscQueue()
            : PlatformDependent.<Runnable>newMpscQueue(maxPendingTasks);
}

这个任务队列的特性是 非阻塞的,多生产者单消费者, 正好和Netty的线程模型对应

此外,这个SingleThreadEventExecutor还有不少重要的方法

  • excute执行任务
  • 尝试开启线程(初始化EventLoop的线程)
  • 开启线程
  • 执行全部任务
  • 聚合定时任务
  • 把任务丢进队列
  • 把任务从队列中取出

NioEventLoop打开本身的队列时,作了哪些优化?

经过反射,借助Java.Security.AccessController提供的线程安全的策略执行机制把原生JDK的selector的SelectedKeys这个HashSet替换成了数组,使得他的事件复杂度在任什么时候刻都是O1

// todo 这里进行了 优化, netty把hashSet转换成了数组, 由于在JDK的NIO模型中,获取Selector时, Selector里面内置的存放SelectionKey的容器是Set集合
// todo  而netty把它替换成了本身的数据结构, 数组, 从而使在任何状况下, 它的时间复杂度都是 O1
private SelectorTuple openSelector() {
final Selector unwrappedSelector;
try {
    // todo 使用jdk 的api建立新的 selector
    unwrappedSelector = provider.openSelector();
} catch (IOException e) {
    throw new ChannelException("failed to open a new selector", e);
}

if (DISABLE_KEYSET_OPTIMIZATION) { // todo 若是不须要优化,就返回原生的selector , 默认为false 即 使用优化
    return new SelectorTuple(unwrappedSelector);
}
// todo  接下来 netty会用下面这个SelectedSelectionKeySet数据结构 替换原来的 keySet , 进入查看
final SelectedSelectionKeySet selectedKeySet = new SelectedSelectionKeySet();

Object maybeSelectorImplClass = AccessController.doPrivileged(new PrivilegedAction<Object>() {
    @Override
    public Object run() {
        try {
            // todo 经过反射  sun.nio.ch.SelectorImpl 或者这个类
            return Class.forName(
                    "sun.nio.ch.SelectorImpl",
                    false,
                    PlatformDependent.getSystemClassLoader());
        } catch (Throwable cause) {
            return cause;
        }
    }
});
// todo 判断是否获取到了这个类
if (!(maybeSelectorImplClass instanceof Class) ||
        // ensure the current selector implementation is what we can instrument.
        !((Class<?>) maybeSelectorImplClass).isAssignableFrom(unwrappedSelector.getClass())) {
    if (maybeSelectorImplClass instanceof Throwable) {
        Throwable t = (Throwable) maybeSelectorImplClass;
        logger.trace("failed to instrument a special java.util.Set into: {}", unwrappedSelector, t);
    }
    return new SelectorTuple(unwrappedSelector);
}

// todo 肯定是Selector的实现类  换了个名字
final Class<?> selectorImplClass = (Class<?>) maybeSelectorImplClass;

/**
 * todo 类java.security.AccessController提供了一个默认的安全策略执行机制,它使用栈检查来决定潜在不安全的操做是否被容许。
 * todo 这个访问控制器不能被实例化,它不是一个对象,而是集合在单个类中的多个静态方法。
 */
Object maybeException = AccessController.doPrivileged(new PrivilegedAction<Object>() {
    @Override
    public Object run() {
        try {
            // todo 经过反射, 获取到 selectorImplClass的两个字段 selectedKeys   publicSelectedKeys
            // todo  selectedKeys   publicSelectedKeys底层都是 hashSet() 实现的, 如今获取出来了, 放入上面的数组数据结构中
            Field selectedKeysField = selectorImplClass.getDeclaredField("selectedKeys");
            Field publicSelectedKeysField = selectorImplClass.getDeclaredField("publicSelectedKeys");

            // todo trySetAccessible 能够强制访问私有的对象
            Throwable cause = ReflectionUtil.trySetAccessible(selectedKeysField);
            if (cause != null) {
                return cause;
            }
            // todo trySetAccessible 能够强制访问私有的对象
            cause = ReflectionUtil.trySetAccessible(publicSelectedKeysField);
            if (cause != null) {
                return cause;
            }
            // todo 真正的把经过反射获得的 那两个字段放入咱们本身的数据结构中
            // // todo  下面是把咱们的NioEventLoop中的 unwrappedSelector 的 selectedKeysField的属性 直接设置成 优化后的selectedKeySet
            selectedKeysField.set(unwrappedSelector, selectedKeySet);
            publicSelectedKeysField.set(unwrappedSelector, selectedKeySet);
            return null;
        } catch (NoSuchFieldException e) {
            return e;
        } catch (IllegalAccessException e) {
            return e;
        }
    }
});

if (maybeException instanceof Exception) {
    selectedKeys = null;
    Exception e = (Exception) maybeException;
    logger.trace("failed to instrument a special java.util.Set into: {}", unwrappedSelector, e);
    return new SelectorTuple(unwrappedSelector);
}

// todo 初始化本身维护被选中的key的集合  --> 数组类型的
selectedKeys = selectedKeySet;
logger.trace("instrumented a special java.util.Set into: {}", unwrappedSelector);
return new SelectorTuple(unwrappedSelector,
        new SelectedSelectionKeySetSelector(unwrappedSelector, selectedKeySet));
}

到如今为止, NioEventLoopGroup和NioEventLoop就都初始化完成了,固然这是初始化,程序运行到如今,依然只有一条主线程, EventLoop的Thrad还没start()干活,可是起码已经有能力准备启动了

总结一下:
就像下面的体系同样, 五脏俱全

  • NioEventLoopGroup
    • NIoEventLoop
      • excutor(线程执行器) , 执行IO任务/非IO任务
      • selector 选择器
    • Chooser
    • NIoEventLoop
      • excutor(线程执行器) , 执行IO任务/非IO任务
      • selector 选择器
    • Chooser
    • NIoEventLoop
      • excutor(线程执行器) , 执行IO任务/非IO任务
      • selector 选择器
    • Chooser
相关文章
相关标签/搜索