NIO 在Jetty中的应用

引子

做为纵横情场多年的老手,宪程在把到妹子后一般有如下策略 (假设宪程是影流之主的第1024代传人而且只剩下了分身的能力)java

  • 将妹子存到队列中,不时发微信去撩一下,若是有意向的话宪程会使用分身能力再建立一个宪程去把妹git

  • 宪程本身执行把妹的操做,若是期间又有新的妹子看上他咋办呢,那就将该妹子交给本身的分身宪程去轮询处理,而且宪程在把完妹子以后会尝试去把分身宪程的轮询任务给接过来,毕竟本体老是要掌握主动权的,若是没有接过来咋办?只能选择成为分身了,毕竟此时分身宪程已经接过了本体的工做,某种意义上他已经成为了本体github

Jetty NIO 模型

建议在阅读以前先了解如下Tomcat的NIO模型,没有对比就没有伤害,你会发现Jetty NIO模型的有趣之处web

概述

若是时间充足的话,我建议你直接阅读附录,了解如何Debug Jetty NIO功能算法

既然要了解Jetty的NIO模型,从线程的角度来讲能够分为如下几类spring

  • 空闲线程 此角色会根据提交到线程池中的任务,将本身转变为I/O线程或者轮询线程tomcat

  • Acceptor线程 该角色主要负责接收来自客户端的链接并对其进行封装以后,选择一个Selector来提交此任务微信

  • 轮询线程 此角色主要负责轮询事件,并处理其余角色提交给此角色的任务,另外此角色能够根据所设定的策略将轮询任务交给其余线程,在执行完I/O任务以后归还到线程池中成为空闲线程网络

主要参与的类有多线程

  • Connector 该角色主要负责JettyNIO模型中各个组件的启动和,协调工做

  • SelectorManager 此角色主要对ManagedSelector进行管理,想要和Selector进行交互可使用此类

  • ManagedSelector 封装了JDK原生的selector, 并对外提供对selector执行操做的内部类、接口以及方法

重点 全部线程共用一个线程池

Connector

关键类 org.eclipse.jetty.server.ServerConnector

Connector即链接器,是Jetty对于网络I/O模型的一个抽象,主要负责组装,启动Jetty NIO模型中所须要用到的组件。所以,咱们主要注意力集中到其实现上也就是ServerConnector上。

初始化Connector链接器,咱们须要向其提供如下关键参数(隐去了和本文无关的参数,有兴趣的可自行了解)

  • 用来执行接收新链接、处理I/O、轮询事件任务的线程池
  • ByteBuffer 对象池, 该对象池能够回收以及提供ByteBuffer给I/O线程使用
  • 负责执行accept操做线程的数量
  • 负责执行轮询任务的selector线程数量

可是,大部分的初始化工做并非在ServerConnector中执行的,而是在其父类中执行的操做,所以咱们将目光转移到 org.eclipse.jetty.server.AbstractConnector

该类的初始化代码以下,其主要作了如下工做

  • 检查是否指定线程池,若是没有则和Server共用一个线程池
  • 检查是否指定ByteBufferPool,若是没有则使用ArrayByteBuffer
  • 检查是否设置Acceptor数量,若是没有则按照max(1,min(4,CPU核心数÷8))进行计算,也就是说默认的Acceptor数量最少有一个,最多有4个

想象一下,若是ServerSocketChannel被设置为阻塞状态以便多个线程同时执行accept操做,那么大多数状况下多数线程将会陷入阻塞状态,而且线程从阻塞态恢复是有线程上下文切换的成本的所以Acceptor线程并非越多越好

public AbstractConnector( Server server, Executor executor, Scheduler scheduler, ByteBufferPool pool, int acceptors, ConnectionFactory... factories) {
        _server = server;
        //检查是否设置线程池,若是没有则使用Server的
        _executor = executor != null ? executor : _server.getThreadPool();
        if (scheduler == null)
            scheduler = _server.getBean(Scheduler.class);
        _scheduler = scheduler != null ? scheduler : new ScheduledExecutorScheduler(String.format("Connector-Scheduler-%x", hashCode()), false);
        
        // 检查是否指定ByteBufferPool,若是没有则本身建立一个
        if (pool == null)
            pool = _server.getBean(ByteBufferPool.class);
        _byteBufferPool = pool != null ? pool : new ArrayByteBufferPool();
        // 将这些对象交给Jetty统一管理(不在本文讨论范围内,不展开)
        addBean(_server, false);
        addBean(_executor);
        if (executor == null)
            unmanage(_executor); // inherited from server
        addBean(_scheduler);
        addBean(_byteBufferPool);
        // ConnectionFactory主要使用来处理对应的HTTP协议
        for (ConnectionFactory factory : factories)
        {
            addConnectionFactory(factory);
        }
        // 若是未指定Acceptor的数量则根据CPU核数执行计算
        int cores = ProcessorUtils.availableProcessors();
        if (acceptors < 0)
           //根据此式能够推出Acceptor数量最大是4最小是1
            acceptors = Math.max(1, Math.min(4, cores / 8));
        // Acceptor数量大于CPU核心数
        // 将会引发大量的线程陷入阻塞状态
        // 没有东西能够accept不就阻塞了吗
        // 而要激活阻塞的线程则须要切换线程上下文会引发性能的浪费
        if (acceptors > cores)
            LOG.warn("Acceptors should be <= availableProcessors: " + this);
        _acceptors = new Thread[acceptors];
    }

复制代码

以下图所示个人电脑为4核心的i5CPU,那么默认的Acceptor线程应该只有一个

4核心CPU
在启动你的Jetty以后咱们能够用JConsole来验证一下
正如你所看到的,以qtp开头的线程用于NIO的线程池,其中一个Acceptor线程阻塞在accept()方法上

Acceptor

Acceptor是一个定义在AbstractConnector中的内部类, 其主要工做不断调用在子类中实现accept方法,也就是接收链接的实现延迟到了子类中。

其代以下,能够学到很多小技巧, 若是你不想看代码,其总结以下

  • 获取执行当前代码线程,给他起个名字,见上一节JConsole的截图
  • 将Acceptor线程优先级调至最高(固然,不必定起做用,还得看人操做系统理不理你)
  • 在执行accept操做以前须要等待来自其余线程的放行信号
  • 不断循环执行accept操做
public void run() {
           // 给线程起给名字
            final Thread thread = Thread.currentThread();
            String name = thread.getName();
            _name = String.format("%s-acceptor-%d@%x-%s", name, _id, hashCode(), AbstractConnector.this.toString());
            thread.setName(_name);
            // 设置优先级
            int priority = thread.getPriority();
            if (_acceptorPriorityDelta != 0)
                thread.setPriority(Math.max(Thread.MIN_PRIORITY, Math.min(Thread.MAX_PRIORITY, priority + _acceptorPriorityDelta)));
            // 保存对此线程的引用
            _acceptors[_id] = thread;
            
            try
            {
                while (isRunning())
                {
                    // 加锁,等待来自其余线程的信号说能够开始干活了
                    try (Locker.Lock lock = _locker.lock())
                    {
                        if (!_accepting && isRunning())
                        {
                            _setAccepting.await();
                            continue;
                        }
                    }
                    catch (InterruptedException e)
                    {
                        continue;
                    }

                    try
                    {
                       //调用子类的accept方法
                        accept(_id);
                    }
                    catch (Throwable x)
                    {
                        if (!handleAcceptFailure(x))
                            break;
                    }
                }
            }
            finally
            {
               // 发生异常了,则将线程的名称以及优先级调回原来的值
                thread.setName(name);
                if (_acceptorPriorityDelta != 0)
                    thread.setPriority(priority);
                
                //释放引用
                synchronized (AbstractConnector.this)
                {
                    _acceptors[_id] = null;
                }
                CountDownLatch stopping = _stopping;
                if (stopping != null)
                    stopping.countDown();
            }
        }
复制代码

在子类ServerConnector中,accept主要执行如下操做

  • 阻塞的形式接收来自客户端的链接
  • 设置客户端SocketChannel非阻塞模式,并禁用nagle算法
  • 交给SelectorManager来处理, 该类会将客户端SocketChannel封装成一个Accept事件,交给轮询线程处理 ServerConnector中的代码
@Override
    public void accept(int acceptorID) throws IOException {
        ServerSocketChannel serverChannel = _acceptChannel;
        if (serverChannel != null && serverChannel.isOpen())
        {
            SocketChannel channel = serverChannel.accept();
            accepted(channel);
        }
    }

    private void accepted(SocketChannel channel) throws IOException {
        channel.configureBlocking(false);
        Socket socket = channel.socket();
        configure(socket); // socket.setTcpNoDelay(true);
        _manager.accept(channel);
    }
复制代码

SelectorManager中最终被调用的代码

public void accept(SelectableChannel channel, Object attachment) {
        final ManagedSelector selector = chooseSelector();
        selector.submit(selector.new Accept(channel, attachment));
    }
复制代码

轮询线程

轮询线程主要负责轮询I/O事件以及处理其余线程提交到本线程任务。而且咱们能够为轮询线程指定执行策略, 在后面咱们能够看到执行策略将如何影响轮询线程行为。

首先,咱们须要先明确哪些类会参与到轮询线程的工做中,也就是说咱们要先理清楚轮询线程的调用链。

如上图堆栈跟踪图红框所标注的部分所示,参与到轮询线程主要堆栈结构以下图所示。

  • ManagedSelector 此类主要封装了JDK的selector类,并对外暴露操做此Selector的方法和类
  • EatWhatYouKill 此类即轮询线程执行策略,该类会不断调用SelectorProducer.produce 方法产生封装好的I/O任务,并根据其策略来决定执行这个I/O任务的方式
  • SelectorProducer 此类为ManagedSelector的内部类,实现线程执行策略里面的ExecutionStrategy.Producer接口,该类专门用于生成供轮询线程处理的I/O任务

ManagedSelector

Jetty将JDK原生的Selector类封装成为ManagedSelector,该类主要功能是对外暴露对其封装的selector执行操做的接口和内部类. 其关键方法和内部类以下

SelectorUpdate接口 若是要对ManagedSelector所管理的selector进行更新(如执行注册感兴趣的I/O事件)能够实现此接口,该接口定义以下

public interface SelectorUpdate {
        void update(Selector selector);
    }
复制代码

submit方法 该方法主要用于外界将SelectorUpdate提交到轮询线程中以便执行对Selector的更新操做,简单来讲此方法会执行如下操做

  • 将update事件加入队列
  • 检查Selector是否正在执行select操做,若是是则将其唤醒,使其从阻塞状态返回以便咱们对其进行更新
public void submit(SelectorUpdate update) {
        if (LOG.isDebugEnabled())
            LOG.debug("Queued change {} on {}", update, this);

        Selector selector = null;
        synchronized (ManagedSelector.this)
        {
            //加事件加入处理队列
            _updates.offer(update);
            //检查是否正在轮询,若是正在轮询,则会执行唤醒操做
            //所以在此处须要将selecting置为false
            if (_selecting)
            {
                selector = _selector;
                // To avoid the extra select wakeup.
                _selecting = false;
            }
        }

        if (selector != null)
        {
           //执行唤醒操做,以便对selector执行更新操做
            if (LOG.isDebugEnabled())
                LOG.debug("Wakeup on submit {}", this);
            selector.wakeup();
        }
    }
复制代码

SelectorProducer

SelectorProducerManagedSelector的内部类,该类实现了轮询线程执行策略的ExecutionStrategy.Producer接口

interface Producer {
        // 返回一个Runnable任务供轮询线程执行
        Runnable produce();
    }
复制代码

所以SelectorProducer须要不断调用selector去轮询看有无新的I/O事件以供处理,除此以外它还须要处理外部类向ManagedSelector经过调用submit方法提交的SelectorUpdate任务

其向线程执行策略类所提供produce方法代以下所示,总的来讲主要完成如下几项工做

  • 执行一个循环,直到轮询到感兴趣的任务(一次只返回一个,被轮询到事件会被保存起来供下一次使用)
  • 处理外部类向其提交的任务(调用processUpdates)
  • 更新客户端SocketChannel感兴趣的事件
@Override
        public Runnable produce() {
            while (true)
            {
                //处理以前查询到事件
                Runnable task = processSelected();
                if (task != null)
                    return task;
                //处理外部类所提交的update任务
                //该方法最终会致使提交的SelectorUpdate.update被调用
                processUpdates();
                //此方法的调用可能会
                //致使客户端SocketChannel感兴趣的事件发生变动
                updateKeys();
                //执行select操做,并将查询到事件保存起来
                if (!select())
                    return null;
            }
        }
复制代码

processUpdates 此方法主要是处理外部类提交的SelectorUpdate任务,经过复制引用很是巧妙的避免了并发问题

private void processUpdates() {
            synchronized (ManagedSelector.this)
            {
                //倒腾数据,将要处理队列的引用保存
                //到另外一个变量上,原有的引用能够继续对外提供服务
                //整个数据倒腾过程很是短,性能影响较小
                Deque<SelectorUpdate> updates = _updates;
                _updates = _updateable;
                _updateable = updates;
            }

            if (LOG.isDebugEnabled())
                LOG.debug("updateable {}", _updateable.size());
            //遍历事件队列,处理update方法
            for (SelectorUpdate update : _updateable)
            {
                if (_selector == null)
                    break;
                try
                {
                    if (LOG.isDebugEnabled())
                        LOG.debug("update {}", update);
                    //调用事件的update方法,并传入selector
                    update.update(_selector);
                }
                catch (Throwable ex)
                {
                    LOG.warn(ex);
                }
            }
            _updateable.clear();

            Selector selector;
            int updates;
            //再次检查是否有新的事件被提交,若是有则执行唤醒操做
            synchronized (ManagedSelector.this)
            {
               //外部类提交的任务会保存到updates中
                updates = _updates.size();
                _selecting = updates == 0;
                selector = _selecting ? null : _selector;
            }

            if (LOG.isDebugEnabled())
                LOG.debug("updates {}", updates);

            if (selector != null)
            {
                if (LOG.isDebugEnabled())
                    LOG.debug("wakeup on updates {}", this);
                selector.wakeup();
            }
        }

复制代码

select() 该方法主要执行轮询操做,并将轮询到事件保存起来以供下一次循环的时候返回,在这个方法中展示jetty如何处理空轮询事件(空轮询是指selector在执行select操做时,没有查询到任何事件却返回了,这个BUG一般会形成CPU100%的使用率,从而使系统崩溃)

private boolean select() {
            try
            {
                Selector selector = _selector;
                if (selector != null && selector.isOpen())
                {
                    if (LOG.isDebugEnabled())
                        LOG.debug("Selector {} waiting with {} keys", selector, selector.keys().size());
                    int selected = selector.select();
                    //没查询到事件, 空轮询事件处理
                    if (selected == 0)
                    {
                        if (LOG.isDebugEnabled())
                            LOG.debug("Selector {} woken with none selected", selector);
                        //若是线程被中断,而且标志位被设置了不在运行则执行推出逻辑
                        if (Thread.interrupted() && !isRunning())
                            throw new ClosedSelectorException();
                        //开启了此参数则当即执行一次select操做
                        if (FORCE_SELECT_NOW)
                            selected = selector.selectNow();
                    }
                    if (LOG.isDebugEnabled())
                        LOG.debug("Selector {} woken up from select, {}/{}/{} selected", selector, selected, selector.selectedKeys().size(), selector.keys().size());

                    int updates;
                    synchronized (ManagedSelector.this)
                    {
                        // 完成了select操做则设置标志位
                        _selecting = false;
                        updates = _updates.size();
                    }

                    _keys = selector.selectedKeys();
                    _cursor = _keys.isEmpty() ? Collections.emptyIterator() : _keys.iterator();
                    if (LOG.isDebugEnabled())
                        LOG.debug("Selector {} processing {} keys, {} updates", selector, _keys.size(), updates);

                    return true;
                }
            }
            catch (Throwable x)
            {
                _selector = null;
                if (isRunning())
                    LOG.warn(x);
                else
                {
                    LOG.warn(x.toString());
                    LOG.debug(x);
                }
                closeNoExceptions(_selector);
            }
            return false;
        }
复制代码

与Netty的空轮询处理策略不一样,Jetty的处理策略是再select一次并当即返回,但这样彷佛并不能解决空轮询的BUG问题详情

EatWhatYouKill

EatWhatYouKill是线程执行策略的一种,也是Jetty默认的指策略,其思想来源于若是猎人杀死一只猎物,那么猎人就应该吃掉它(若是你吃过新鲜的虾你就会对这种哲学深有体会),换种说法就是轮询线程若是查询到一次I/O事件就应该直接处理它(想起引子了吗)

P.S. 关键代码org.eclipse.jetty.util.thread.strategy.EatWhatYouKill

之因此这样作的缘由是由于切换线程是一件比较费时操做(相对来讲),所以在这种策略下轮询线程A若是获取到一个事件会有如下策略

  • 若是此任务被标志为非阻塞任务,那么线程A会当即执行此任务

若是任务阻塞类型未知或者被标记为阻塞状态

  • 若是线程池中的线程都处于繁忙状态,则将其提交到线程池种等待执行

  • 若是线程池种有空闲线程B,则尝试将线程A负责轮询功能交给线程B,若是当即获取到线程B成功,则线程A会直接执行获取到的任务, 任务执行完成后,线程A会尝试夺回交给线程B的轮询任务,若是夺回失败则变为空闲线程等待分配任务。(想起引子了吗?)

  • 除此以外,线程A还会尝试直接执行任务而且不会交出轮询工做 (代码太长,只摘出关键代码)

case BLOCKING:
        synchronized (this)
        {
            if (_pending)
            {
                //轮询工做陷入了停滞,所以是IDLE状态
                _state = State.IDLE;
                mode = Mode.EXECUTE_PRODUCE_CONSUME;
            }
            //tryExecute 若是当即分配到了线程则返回true
            //this的run方法也就是实现轮询线程核心的方法
            //所以此行代码至关于将轮询的工做转移给了其余线程
            else if (_tryExecutor.tryExecute(this))
            {
                _pending = true;
                //因为轮询工做的转移
                //所以当前轮询工做至关于陷入空闲状态
                //因此须要将此对象的状态至为IDLE
                //(轮询线程和当前线程使用同一个对象)
                _state = State.IDLE;
                mode = Mode.EXECUTE_PRODUCE_CONSUME;
            }else
            {
               //前二者均不知足则将任务提交到线程池
                mode = Mode.PRODUCE_EXECUTE_CONSUME;
            }
        }
        break;
复制代码

任务的执行策略

case EXECUTE_PRODUCE_CONSUME:
                _epcMode.increment();
                //直接在当前线程调用
                runTask(task);

                // 尝试夺回轮询任务
                synchronized (this)
                {
                   // 若是State还处于空闲状态
                   // 说明
                   // 线程B还未开始执行轮询任务,能够直接夺回
                   // 若是线程B已经开始轮询
                   // 则选择离开
                    if (_state == State.IDLE)
                    {
                        // 返回true则继续轮询
                        return true;
                    }
                }
                //返回false则结束轮询任务,变为空闲线程
                return false;
复制代码

总结

相较于循规蹈矩的Tomcat,Jetty的设计更为激进,更富有冒险主义者的精神,从我的角度来讲更喜欢Jetty的设计,但从业务的角度来讲仍是选择Tomcat较为稳妥毕竟稳定是业务的基本需求,而且Tomcat的性能也不会太差。

以线程的类别来进行划分的话, Jetty的NIO模型以下图所示

  • Acceptor 线程负责接收来自客户端的新链接,并将其封装成一个事件提交给轮询线程处理
  • 轮询线程 轮询线程处理负责轮询I/O事件以外,还须要处理外部线程所提交的selector更新任务,而且根据设定的执行策略,轮询线程可能会在本线程直接执行I/O任务,并将轮询任务移交给其余空闲的线程,或者选择一个空闲的线程来执行I/O操做
  • I/O线程 主要负责处理I/O操做

从线程类别的角度来看Jetty的NIO模型相对简单,但其引入的轮询线程执行策略使得线程之间身份能够发生转变, 得益于此Jetty能够直接轮询线程直接执行I/O任务减小了线程上下文切换所带来的性能消耗,提高了性能。

思想迁移

切换线程是有成本的 Jetty经过直接在轮询线程执行I/O任务来提高性能,来减小线程上下文的切换,除此以外,咱们还能够实现协程的机制来减小线程上下文切换所带来的成本(参考Go语言)

Acceptor线程应适量 若是将ServerSocket设置为阻塞模式,那么accept操做将致使线程陷入阻塞,从accept方法返回时将引发线程上下的切换,所以并非越多越好

如何Debug Jetty

咱们使用SpringBoot来Debug Jetty,所以咱们须要在pom.xml中引入Jetty,因为SpringBoot默认使用Tomcat所以咱们须要将其替换掉,依赖以下所示.

<dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
            <exclusions>
                <exclusion>
                    <groupId>org.springframework.boot</groupId>
                    <artifactId>spring-boot-starter-tomcat</artifactId>
                </exclusion>
            </exclusions>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-jetty</artifactId>
        </dependency>
复制代码

使用的SpringBoot版本是2.2.0其所依赖的Jetty版本号是9.4.20

  • 若是你要了解Connector是如何工做的请关注如下类 org.eclipse.jetty.server.ServerConnector

  • 若是你想要了解Jetty NIO 如何轮询以及处理事件,那么请关注如下类 org.eclipse.jetty.io.ManagedSelector 并在其内部类 SelectorProducerproduce方法打上断点,以下图所示,你将了解到整个轮询过程当中都发生了什么

右键小红点,选择Thread,以免进入不了断点的状况,毕竟咱们调试的是多线程程序

  • 若是你想要了解线程执行的策略,那么请关注如下类(此类执行机制较为复杂,若是想Debug到全部的状况,最好结合必定的策略,如在Controller代码处阻塞住线程等) org.eclipse.jetty.util.thread.strategy.EatWhatYouKill
相关文章
相关标签/搜索