做为纵横情场多年的老手,宪程在把到妹子后一般有如下策略 (假设宪程是影流之主
的第1024代传人而且只剩下了分身的能力)java
将妹子存到队列中,不时发微信去撩一下,若是有意向的话宪程会使用分身能力再建立一个宪程
去把妹git
宪程本身执行把妹的操做,若是期间又有新的妹子看上他咋办呢,那就将该妹子交给本身的分身宪程
去轮询处理,而且宪程在把完妹子以后会尝试去把分身宪程
的轮询任务给接过来,毕竟本体老是要掌握主动权的,若是没有接过来咋办?只能选择成为分身了,毕竟此时分身宪程
已经接过了本体的工做,某种意义上他已经成为了本体
。github
建议在阅读以前先了解如下Tomcat的NIO模型,没有对比就没有伤害,你会发现Jetty NIO模型的有趣之处web
若是时间充足的话,我建议你直接阅读附录,了解如何Debug Jetty NIO功能算法
既然要了解Jetty的NIO模型,从线程的角度来讲能够分为如下几类spring
空闲线程
此角色会根据提交到线程池中的任务,将本身转变为I/O线程或者轮询线程tomcat
Acceptor线程
该角色主要负责接收来自客户端的链接并对其进行封装以后,选择一个Selector来提交此任务微信
轮询线程
此角色主要负责轮询事件,并处理其余角色提交给此角色的任务,另外此角色能够根据所设定的策略将轮询任务交给其余线程,在执行完I/O任务以后归还到线程池中成为空闲线程
网络
主要参与的类有多线程
Connector
该角色主要负责JettyNIO模型中各个组件的启动和,协调工做
SelectorManager
此角色主要对ManagedSelector
进行管理,想要和Selector进行交互可使用此类
ManagedSelector
封装了JDK原生的selector
, 并对外提供对selector
执行操做的内部类、接口以及方法
重点 全部线程共用一个线程池
关键类
org.eclipse.jetty.server.ServerConnector
Connector即链接器,是Jetty对于网络I/O模型的一个抽象,主要负责组装,启动Jetty NIO模型中所须要用到的组件。所以,咱们主要注意力集中到其实现上也就是ServerConnector
上。
初始化Connector链接器,咱们须要向其提供如下关键参数(隐去了和本文无关的参数,有兴趣的可自行了解)
回收
以及提供
ByteBuffer给I/O线程使用accept
操做线程的数量selector
线程数量可是,大部分的初始化工做并非在ServerConnector
中执行的,而是在其父类中执行的操做,所以咱们将目光转移到 org.eclipse.jetty.server.AbstractConnector
该类的初始化代码以下,其主要作了如下工做
max(1,min(4,CPU核心数÷8))
进行计算,也就是说默认的Acceptor数量最少有一个,最多有4个想象一下,若是ServerSocketChannel被设置为阻塞状态以便多个线程同时执行accept操做,那么大多数状况下多数线程将会陷入阻塞状态,而且线程从阻塞态恢复是有线程上下文切换的成本的所以Acceptor线程并非越多越好
public AbstractConnector( Server server, Executor executor, Scheduler scheduler, ByteBufferPool pool, int acceptors, ConnectionFactory... factories) {
_server = server;
//检查是否设置线程池,若是没有则使用Server的
_executor = executor != null ? executor : _server.getThreadPool();
if (scheduler == null)
scheduler = _server.getBean(Scheduler.class);
_scheduler = scheduler != null ? scheduler : new ScheduledExecutorScheduler(String.format("Connector-Scheduler-%x", hashCode()), false);
// 检查是否指定ByteBufferPool,若是没有则本身建立一个
if (pool == null)
pool = _server.getBean(ByteBufferPool.class);
_byteBufferPool = pool != null ? pool : new ArrayByteBufferPool();
// 将这些对象交给Jetty统一管理(不在本文讨论范围内,不展开)
addBean(_server, false);
addBean(_executor);
if (executor == null)
unmanage(_executor); // inherited from server
addBean(_scheduler);
addBean(_byteBufferPool);
// ConnectionFactory主要使用来处理对应的HTTP协议
for (ConnectionFactory factory : factories)
{
addConnectionFactory(factory);
}
// 若是未指定Acceptor的数量则根据CPU核数执行计算
int cores = ProcessorUtils.availableProcessors();
if (acceptors < 0)
//根据此式能够推出Acceptor数量最大是4最小是1
acceptors = Math.max(1, Math.min(4, cores / 8));
// Acceptor数量大于CPU核心数
// 将会引发大量的线程陷入阻塞状态
// 没有东西能够accept不就阻塞了吗
// 而要激活阻塞的线程则须要切换线程上下文会引发性能的浪费
if (acceptors > cores)
LOG.warn("Acceptors should be <= availableProcessors: " + this);
_acceptors = new Thread[acceptors];
}
复制代码
以下图所示个人电脑为4核心
的i5CPU,那么默认的Acceptor线程应该只有一个
Acceptor是一个定义在AbstractConnector
中的内部类, 其主要工做不断调用在子类中实现accept方法,也就是接收链接的实现延迟到了子类中。
其代以下,能够学到很多小技巧, 若是你不想看代码,其总结以下
public void run() {
// 给线程起给名字
final Thread thread = Thread.currentThread();
String name = thread.getName();
_name = String.format("%s-acceptor-%d@%x-%s", name, _id, hashCode(), AbstractConnector.this.toString());
thread.setName(_name);
// 设置优先级
int priority = thread.getPriority();
if (_acceptorPriorityDelta != 0)
thread.setPriority(Math.max(Thread.MIN_PRIORITY, Math.min(Thread.MAX_PRIORITY, priority + _acceptorPriorityDelta)));
// 保存对此线程的引用
_acceptors[_id] = thread;
try
{
while (isRunning())
{
// 加锁,等待来自其余线程的信号说能够开始干活了
try (Locker.Lock lock = _locker.lock())
{
if (!_accepting && isRunning())
{
_setAccepting.await();
continue;
}
}
catch (InterruptedException e)
{
continue;
}
try
{
//调用子类的accept方法
accept(_id);
}
catch (Throwable x)
{
if (!handleAcceptFailure(x))
break;
}
}
}
finally
{
// 发生异常了,则将线程的名称以及优先级调回原来的值
thread.setName(name);
if (_acceptorPriorityDelta != 0)
thread.setPriority(priority);
//释放引用
synchronized (AbstractConnector.this)
{
_acceptors[_id] = null;
}
CountDownLatch stopping = _stopping;
if (stopping != null)
stopping.countDown();
}
}
复制代码
在子类ServerConnector
中,accept
主要执行如下操做
阻塞
的形式接收来自客户端的链接SocketChannel
为非阻塞模式
,并禁用nagle算法
SocketChannel
封装成一个Accept
事件,交给轮询线程
处理 ServerConnector
中的代码@Override
public void accept(int acceptorID) throws IOException {
ServerSocketChannel serverChannel = _acceptChannel;
if (serverChannel != null && serverChannel.isOpen())
{
SocketChannel channel = serverChannel.accept();
accepted(channel);
}
}
private void accepted(SocketChannel channel) throws IOException {
channel.configureBlocking(false);
Socket socket = channel.socket();
configure(socket); // socket.setTcpNoDelay(true);
_manager.accept(channel);
}
复制代码
SelectorManager中最终被调用的代码
public void accept(SelectableChannel channel, Object attachment) {
final ManagedSelector selector = chooseSelector();
selector.submit(selector.new Accept(channel, attachment));
}
复制代码
轮询线程
主要负责轮询I/O事件以及处理其余线程提交到本线程任务。而且咱们能够为轮询线程
指定执行策略, 在后面咱们能够看到执行策略将如何影响轮询线程
行为。
首先,咱们须要先明确哪些类会参与到轮询线程的工做中,也就是说咱们要先理清楚轮询线程的调用链。
如上图堆栈跟踪图红框所标注的部分所示,参与到轮询线程主要堆栈结构以下图所示。
ManagedSelector
此类主要封装了JDK的selector
类,并对外暴露操做此Selector的方法和类EatWhatYouKill
此类即轮询线程执行策略,该类会不断调用SelectorProducer.produce 方法产生封装好的I/O任务,并根据其策略来决定执行这个I/O任务的方式SelectorProducer
此类为ManagedSelector
的内部类,实现线程执行策略里面的ExecutionStrategy.Producer
接口,该类专门用于生成供轮询线程处理的I/O任务Jetty将JDK原生的Selector
类封装成为ManagedSelector
,该类主要功能是对外暴露对其封装的selector
执行操做的接口和内部类. 其关键方法和内部类以下
SelectorUpdate接口 若是要对ManagedSelector
所管理的selector
进行更新(如执行注册感兴趣的I/O事件)能够实现此接口,该接口定义以下
public interface SelectorUpdate {
void update(Selector selector);
}
复制代码
submit方法 该方法主要用于外界将SelectorUpdate
提交到轮询线程中以便执行对Selector
的更新操做,简单来讲此方法会执行如下操做
public void submit(SelectorUpdate update) {
if (LOG.isDebugEnabled())
LOG.debug("Queued change {} on {}", update, this);
Selector selector = null;
synchronized (ManagedSelector.this)
{
//加事件加入处理队列
_updates.offer(update);
//检查是否正在轮询,若是正在轮询,则会执行唤醒操做
//所以在此处须要将selecting置为false
if (_selecting)
{
selector = _selector;
// To avoid the extra select wakeup.
_selecting = false;
}
}
if (selector != null)
{
//执行唤醒操做,以便对selector执行更新操做
if (LOG.isDebugEnabled())
LOG.debug("Wakeup on submit {}", this);
selector.wakeup();
}
}
复制代码
SelectorProducer
是ManagedSelector
的内部类,该类实现了轮询线程执行策略的ExecutionStrategy.Producer
接口
interface Producer {
// 返回一个Runnable任务供轮询线程执行
Runnable produce();
}
复制代码
所以SelectorProducer
须要不断调用selector
去轮询看有无新的I/O事件以供处理,除此以外它还须要处理外部类向ManagedSelector
经过调用submit
方法提交的SelectorUpdate
任务
其向线程执行策略类所提供produce
方法代以下所示,总的来讲主要完成如下几项工做
processUpdates
)@Override
public Runnable produce() {
while (true)
{
//处理以前查询到事件
Runnable task = processSelected();
if (task != null)
return task;
//处理外部类所提交的update任务
//该方法最终会致使提交的SelectorUpdate.update被调用
processUpdates();
//此方法的调用可能会
//致使客户端SocketChannel感兴趣的事件发生变动
updateKeys();
//执行select操做,并将查询到事件保存起来
if (!select())
return null;
}
}
复制代码
processUpdates 此方法主要是处理外部类提交的SelectorUpdate
任务,经过复制引用很是巧妙的避免了并发问题
private void processUpdates() {
synchronized (ManagedSelector.this)
{
//倒腾数据,将要处理队列的引用保存
//到另外一个变量上,原有的引用能够继续对外提供服务
//整个数据倒腾过程很是短,性能影响较小
Deque<SelectorUpdate> updates = _updates;
_updates = _updateable;
_updateable = updates;
}
if (LOG.isDebugEnabled())
LOG.debug("updateable {}", _updateable.size());
//遍历事件队列,处理update方法
for (SelectorUpdate update : _updateable)
{
if (_selector == null)
break;
try
{
if (LOG.isDebugEnabled())
LOG.debug("update {}", update);
//调用事件的update方法,并传入selector
update.update(_selector);
}
catch (Throwable ex)
{
LOG.warn(ex);
}
}
_updateable.clear();
Selector selector;
int updates;
//再次检查是否有新的事件被提交,若是有则执行唤醒操做
synchronized (ManagedSelector.this)
{
//外部类提交的任务会保存到updates中
updates = _updates.size();
_selecting = updates == 0;
selector = _selecting ? null : _selector;
}
if (LOG.isDebugEnabled())
LOG.debug("updates {}", updates);
if (selector != null)
{
if (LOG.isDebugEnabled())
LOG.debug("wakeup on updates {}", this);
selector.wakeup();
}
}
复制代码
select() 该方法主要执行轮询操做,并将轮询到事件保存起来以供下一次循环的时候返回,在这个方法中展示jetty如何处理空轮询
事件(空轮询
是指selector在执行select操做时,没有查询到任何事件却返回了,这个BUG一般会形成CPU100%
的使用率,从而使系统崩溃)
private boolean select() {
try
{
Selector selector = _selector;
if (selector != null && selector.isOpen())
{
if (LOG.isDebugEnabled())
LOG.debug("Selector {} waiting with {} keys", selector, selector.keys().size());
int selected = selector.select();
//没查询到事件, 空轮询事件处理
if (selected == 0)
{
if (LOG.isDebugEnabled())
LOG.debug("Selector {} woken with none selected", selector);
//若是线程被中断,而且标志位被设置了不在运行则执行推出逻辑
if (Thread.interrupted() && !isRunning())
throw new ClosedSelectorException();
//开启了此参数则当即执行一次select操做
if (FORCE_SELECT_NOW)
selected = selector.selectNow();
}
if (LOG.isDebugEnabled())
LOG.debug("Selector {} woken up from select, {}/{}/{} selected", selector, selected, selector.selectedKeys().size(), selector.keys().size());
int updates;
synchronized (ManagedSelector.this)
{
// 完成了select操做则设置标志位
_selecting = false;
updates = _updates.size();
}
_keys = selector.selectedKeys();
_cursor = _keys.isEmpty() ? Collections.emptyIterator() : _keys.iterator();
if (LOG.isDebugEnabled())
LOG.debug("Selector {} processing {} keys, {} updates", selector, _keys.size(), updates);
return true;
}
}
catch (Throwable x)
{
_selector = null;
if (isRunning())
LOG.warn(x);
else
{
LOG.warn(x.toString());
LOG.debug(x);
}
closeNoExceptions(_selector);
}
return false;
}
复制代码
与Netty的空轮询处理策略不一样,Jetty的处理策略是再select一次并当即返回,但这样彷佛并不能解决空轮询的BUG问题详情
EatWhatYouKill
是线程执行策略的一种,也是Jetty默认的指策略,其思想来源于若是猎人杀死一只猎物,那么猎人就应该吃掉它
(若是你吃过新鲜的虾你就会对这种哲学
深有体会),换种说法就是轮询线程若是查询到一次I/O事件就应该直接处理它
(想起引子了吗)
P.S. 关键代码
org.eclipse.jetty.util.thread.strategy.EatWhatYouKill
之因此这样作的缘由是由于切换线程是一件比较费时操做(相对来讲),所以在这种策略下轮询线程A若是获取到一个事件会有如下策略
标志为非阻塞任务
,那么线程A会当即执行
此任务若是任务阻塞类型未知或者被标记为阻塞状态
若是线程池中的线程都处于繁忙
状态,则将其提交到线程池种等待执行
若是线程池种有空闲线程B,则尝试将线程A负责轮询功
能交给线程B,若是当即获取到线程B
成功,则线程A会直接执行获取到的任务, 任务执行完成后,线程A会尝试夺回
交给线程B的轮询任务,若是夺回失败则变为空闲线程等待分配任务。(想起引子了吗?)
除此以外,线程A还会尝试直接执行任务而且不会交出轮询工做 (代码太长,只摘出关键代码)
case BLOCKING:
synchronized (this)
{
if (_pending)
{
//轮询工做陷入了停滞,所以是IDLE状态
_state = State.IDLE;
mode = Mode.EXECUTE_PRODUCE_CONSUME;
}
//tryExecute 若是当即分配到了线程则返回true
//this的run方法也就是实现轮询线程核心的方法
//所以此行代码至关于将轮询的工做转移给了其余线程
else if (_tryExecutor.tryExecute(this))
{
_pending = true;
//因为轮询工做的转移
//所以当前轮询工做至关于陷入空闲状态
//因此须要将此对象的状态至为IDLE
//(轮询线程和当前线程使用同一个对象)
_state = State.IDLE;
mode = Mode.EXECUTE_PRODUCE_CONSUME;
}else
{
//前二者均不知足则将任务提交到线程池
mode = Mode.PRODUCE_EXECUTE_CONSUME;
}
}
break;
复制代码
任务的执行策略
case EXECUTE_PRODUCE_CONSUME:
_epcMode.increment();
//直接在当前线程调用
runTask(task);
// 尝试夺回轮询任务
synchronized (this)
{
// 若是State还处于空闲状态
// 说明
// 线程B还未开始执行轮询任务,能够直接夺回
// 若是线程B已经开始轮询
// 则选择离开
if (_state == State.IDLE)
{
// 返回true则继续轮询
return true;
}
}
//返回false则结束轮询任务,变为空闲线程
return false;
复制代码
相较于循规蹈矩的Tomcat,Jetty的设计更为激进,更富有冒险主义者的精神,从我的角度来讲更喜欢Jetty的设计,但从业务的角度来讲仍是选择Tomcat较为稳妥毕竟稳定是业务的基本需求,而且Tomcat的性能也不会太差。
以线程的类别来进行划分的话, Jetty的NIO模型以下图所示
Acceptor
线程负责接收来自客户端的新链接,并将其封装成一个事件提交给轮询线程处理轮询线程
轮询线程处理负责轮询I/O事件以外,还须要处理外部线程所提交的selector
更新任务,而且根据设定的执行策略,轮询线程可能会在本线程直接执行I/O任务,并将轮询任务移交给其余空闲的线程,或者选择一个空闲的线程来执行I/O操做I/O线程
主要负责处理I/O操做从线程类别的角度来看Jetty的NIO模型相对简单,但其引入的轮询线程执行策略使得线程之间身份能够发生转变, 得益于此Jetty能够直接轮询线程直接执行I/O任务减小了线程上下文切换所带来的性能消耗,提高了性能。
切换线程是有成本的 Jetty经过直接在轮询线程执行I/O任务来提高性能,来减小线程上下文的切换,除此以外,咱们还能够实现协程的机制来减小线程上下文切换所带来的成本(参考Go语言)
Acceptor线程应适量 若是将ServerSocket设置为阻塞模式,那么accept操做将致使线程陷入阻塞,从accept方法返回时将引发线程上下的切换,所以并非越多越好
咱们使用SpringBoot来Debug Jetty,所以咱们须要在pom.xml
中引入Jetty,因为SpringBoot默认使用Tomcat所以咱们须要将其替换掉,依赖以下所示.
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
<exclusions>
<exclusion>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-tomcat</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-jetty</artifactId>
</dependency>
复制代码
使用的SpringBoot版本是2.2.0其所依赖的Jetty版本号是9.4.20
若是你要了解Connector是如何工做的请关注如下类 org.eclipse.jetty.server.ServerConnector
若是你想要了解Jetty NIO 如何轮询以及处理事件,那么请关注如下类 org.eclipse.jetty.io.ManagedSelector
并在其内部类 SelectorProducer
的produce
方法打上断点,以下图所示,你将了解到整个轮询过程当中都发生了什么
右键小红点,选择Thread,以免进入不了断点的状况,毕竟咱们调试的是多线程程序
org.eclipse.jetty.util.thread.strategy.EatWhatYouKill