在上一节中咱们描述了Tomcat的总体架构,咱们知道了Tomcat分为两个大组件,一个链接器和一个容器。而咱们此次要讲的EndPoint
的组件就是属于链接器里面的。它是一个通讯的端点,就是负责对外实现TCP/IP协议。EndPoint
是个接口,它的具体实现类就是AbstractEndpoint
,而AbstractEndpoint
具体的实现类就有AprEndpoint
、Nio2Endpoint
、NioEndpoint
。java
AprEndpoint
:对应的是APR模式,简单理解就是从操做系统级别解决异步IO的问题,大幅度提升服务器的处理和响应性能。可是启用这种模式须要安装一些其余的依赖库。Nio2Endpoint
:利用代码来实现异步IONioEndpoint
:利用了JAVA的NIO实现了非阻塞IO,Tomcat默认启动是以这个来启动的,而这个也是咱们的讲述重点。咱们知道NioEndpoint
的原理仍是对于Linux的多路复用器的使用,而在多路复用器中简单来讲就两个步骤。编程
而NioEndpoint
为了实现上面这两步,用了五个组件来。这五个组件是LimitLatch
、Acceptor
、Poller
、SocketProcessor
、Executor
bash
/**
* Threads used to accept new connections and pass them to worker threads.
*/
protected List<Acceptor<U>> acceptors;
/**
* counter for nr of connections handled by an endpoint
*/
private volatile LimitLatch connectionLimitLatch = null;
/**
* The socket pollers.
*/
private Poller[] pollers = null;
内部类
SocketProcessor
/**
* External Executor based thread pool.
*/
private Executor executor = null;
复制代码
咱们能够看到在代码中定义的这五个组件。具体这五个组件是干吗的呢?服务器
LimitLatch
:链接控制器,负责控制最大的链接数Acceptor
:负责接收新的链接,而后返回一个Channel
对象给Poller
Poller
:能够将其当作是NIO中Selector
,负责监控Channel
的状态SocketProcessor
:能够当作是一个被封装的任务类Executor
:Tomcat本身扩展的线程池,用来执行任务类用图简单表示就是如下的关系架构
接下来咱们就来分别的看一下每一个组件里面关键的代码app
咱们上面说了LimitLatch
主要是用来控制Tomcat所能接收的最大数量链接,若是超过了此链接,那么Tomcat就会将此链接线程阻塞等待,等里面有其余链接释放了再消费此链接。那么LimitLatch
是如何作到呢?咱们能够看LimitLatch
这个类框架
public class LimitLatch {
private static final Log log = LogFactory.getLog(LimitLatch.class);
private class Sync extends AbstractQueuedSynchronizer {
private static final long serialVersionUID = 1L;
public Sync() {
}
@Override
protected int tryAcquireShared(int ignored) {
long newCount = count.incrementAndGet();
if (!released && newCount > limit) {
// Limit exceeded
count.decrementAndGet();
return -1;
} else {
return 1;
}
}
@Override
protected boolean tryReleaseShared(int arg) {
count.decrementAndGet();
return true;
}
}
private final Sync sync;
//当前链接数
private final AtomicLong count;
//最大链接数
private volatile long limit;
private volatile boolean released = false;
}
复制代码
咱们能够看到它内部实现了AbstractQueuedSynchronizer
,AQS其实就是一个框架,实现它的类能够自定义控制线程何时挂起何时释放。limit
参数就是控制的最大链接数。咱们能够看到AbstractEndpoint
调用LimitLatch
的countUpOrAwait
方法来判断是否能获取链接。异步
public void countUpOrAwait() throws InterruptedException {
if (log.isDebugEnabled()) {
log.debug("Counting up["+Thread.currentThread().getName()+"] latch="+getCount());
}
sync.acquireSharedInterruptibly(1);
}
复制代码
AQS是如何知道何时阻塞线程呢?即不能获取链接呢?这些就靠用户本身实现AbstractQueuedSynchronizer
本身来定义何时获取链接,何时释放链接了。能够看到Sync类重写了tryAcquireShared
和tryReleaseShared
方法。在tryAcquireShared
方法中定义了一旦当前链接数大于了设置的最大链接数,那么就会返回-1
表示将此线程放入AQS队列中等待。socket
Acceptor
是接收链接的,咱们能够看到Acceptor
实现了Runnable
接口,那么在哪会新开启线程来执行Acceptor
的run方法呢?在AbstractEndpoint
的startAcceptorThreads
方法中。ide
protected void startAcceptorThreads() {
int count = getAcceptorThreadCount();
acceptors = new ArrayList<>(count);
for (int i = 0; i < count; i++) {
Acceptor<U> acceptor = new Acceptor<>(this);
String threadName = getName() + "-Acceptor-" + i;
acceptor.setThreadName(threadName);
acceptors.add(acceptor);
Thread t = new Thread(acceptor, threadName);
t.setPriority(getAcceptorThreadPriority());
t.setDaemon(getDaemon());
t.start();
}
}
复制代码
能够看到这里能够设置开启几个Acceptor
,默认是一个。而一个端口只能对应一个ServerSocketChannel
,那么这个ServerSocketChannel
在哪初始化呢?咱们能够看到在Acceptor<U> acceptor = new Acceptor<>(this);
这句话中传入了this进去,那么应该是由Endpoint
组件初始化的链接。在NioEndpoint
的initServerSocket
方法中初始化了链接。
// Separated out to make it easier for folks that extend NioEndpoint to
// implement custom [server]sockets
protected void initServerSocket() throws Exception {
if (!getUseInheritedChannel()) {
serverSock = ServerSocketChannel.open();
socketProperties.setProperties(serverSock.socket());
InetSocketAddress addr = new InetSocketAddress(getAddress(), getPortWithOffset());
serverSock.socket().bind(addr,getAcceptCount());
} else {
// Retrieve the channel provided by the OS
Channel ic = System.inheritedChannel();
if (ic instanceof ServerSocketChannel) {
serverSock = (ServerSocketChannel) ic;
}
if (serverSock == null) {
throw new IllegalArgumentException(sm.getString("endpoint.init.bind.inherited"));
}
}
serverSock.configureBlocking(true); //mimic APR behavior
}
复制代码
这里面咱们可以看到两点
ServerSocketChannel
被设置成了阻塞的模式,也就是说是以阻塞方式接受链接的。或许会有疑问。在平时的NIO编程中Channel不是都要设置成非阻塞模式吗?这里解释一下,若是是设置成非阻塞模式那么就必须设置一个Selector
不断的轮询,可是接受链接只须要阻塞一个通道便可。这里须要注意一点,每一个Acceptor
在生成PollerEvent
对象放入Poller
队列中时都是随机取出Poller
对象的,具体代码能够看以下,因此Poller
中的Queue
对象设置成了SynchronizedQueue<PollerEvent>
,由于可能有多个Acceptor
同时向此Poller
的队列中放入PollerEvent
对象。
public Poller getPoller0() {
if (pollerThreadCount == 1) {
return pollers[0];
} else {
int idx = Math.abs(pollerRotater.incrementAndGet()) % pollers.length;
return pollers[idx];
}
}
复制代码
什么是操做系统级别的链接呢?在TCP的三次握手中,系统一般会每个LISTEN状态的Socket维护两个队列,一个是半链接队列(SYN):这些链接已经收到客户端SYN;另外一个是全链接队列(ACCEPT):这些连接已经收到客户端的ACK,完成了三次握手,等待被应用调用accept方法取走使用。
全部的Acceptor
共用这一个链接,在Acceptor
的run
方法中,放一些重要的代码。
@Override
public void run() {
// Loop until we receive a shutdown command
while (endpoint.isRunning()) {
try {
//若是到了最大链接数,线程等待
endpoint.countUpOrAwaitConnection();
U socket = null;
try {
//调用accept方法得到一个链接
socket = endpoint.serverSocketAccept();
} catch (Exception ioe) {
// 出异常之后当前链接数减掉1
endpoint.countDownConnection();
}
// 配置Socket
if (endpoint.isRunning() && !endpoint.isPaused()) {
// setSocketOptions() will hand the socket off to
// an appropriate processor if successful
if (!endpoint.setSocketOptions(socket)) {
endpoint.closeSocket(socket);
}
} else {
endpoint.destroySocket(socket);
}
}
复制代码
里面咱们能够获得两点
LimitLatch
组件判断的。endpoint.setSocketOptions(socket)
这段代码protected boolean setSocketOptions(SocketChannel socket) {
// Process the connection
try {
// 设置Socket为非阻塞模式,供Poller调用
socket.configureBlocking(false);
Socket sock = socket.socket();
socketProperties.setProperties(sock);
NioChannel channel = null;
if (nioChannels != null) {
channel = nioChannels.pop();
}
if (channel == null) {
SocketBufferHandler bufhandler = new SocketBufferHandler(
socketProperties.getAppReadBufSize(),
socketProperties.getAppWriteBufSize(),
socketProperties.getDirectBuffer());
if (isSSLEnabled()) {
channel = new SecureNioChannel(socket, bufhandler, selectorPool, this);
} else {
channel = new NioChannel(socket, bufhandler);
}
} else {
channel.setIOChannel(socket);
channel.reset();
}
//注册ChannelEvent,实际上是将ChannelEvent放入到队列中,而后Poller从队列中取
getPoller0().register(channel);
} catch (Throwable t) {
ExceptionUtils.handleThrowable(t);
try {
log.error(sm.getString("endpoint.socketOptionsError"), t);
} catch (Throwable tt) {
ExceptionUtils.handleThrowable(tt);
}
// Tell to close the socket
return false;
}
return true;
}
复制代码
其实里面重要的就是将Acceptor
与一个Poller
绑定起来,而后两个组件经过队列通讯,每一个Poller都维护着一个SynchronizedQueue
队列,ChannelEvent
放入到队列中,而后Poller
从队列中取出事件进行消费。
咱们能够看到Poller
是NioEndpoint
的内部类,而它也是实现了Runnable
接口,能够看到在其类中维护了一个Quene和Selector,定义以下。因此本质上Poller
就是Selector
。
private Selector selector;
private final SynchronizedQueue<PollerEvent> events = new SynchronizedQueue<>();
复制代码
重点在其run方法中,这里删减了一些代码,只展现重要的。
@Override
public void run() {
// Loop until destroy() is called
while (true) {
boolean hasEvents = false;
try {
if (!close) {
//查看是否有链接进来,若是有就将Channel注册进Selector中
hasEvents = events();
}
if (close) {
events();
timeout(0, false);
try {
selector.close();
} catch (IOException ioe) {
log.error(sm.getString("endpoint.nio.selectorCloseFail"), ioe);
}
break;
}
} catch (Throwable x) {
ExceptionUtils.handleThrowable(x);
log.error(sm.getString("endpoint.nio.selectorLoopError"), x);
continue;
}
if (keyCount == 0) {
hasEvents = (hasEvents | events());
}
Iterator<SelectionKey> iterator =
keyCount > 0 ? selector.selectedKeys().iterator() : null;
// Walk through the collection of ready keys and dispatch
// any active event.
while (iterator != null && iterator.hasNext()) {
SelectionKey sk = iterator.next();
NioSocketWrapper socketWrapper = (NioSocketWrapper) sk.attachment();
// Attachment may be null if another thread has called
// cancelledKey()
if (socketWrapper == null) {
iterator.remove();
} else {
iterator.remove();
processKey(sk, socketWrapper);
}
}
// Process timeouts
timeout(keyCount,hasEvents);
}
getStopLatch().countDown();
}
复制代码
其中主要的就是调用了events()
方法,就是不断的查看队列中是否有Pollerevent
事件,若是有的话就将其取出而后把里面的Channel
取出来注册到该Selector
中,而后不断轮询全部注册过的Channel
查看是否有事件发生。
咱们知道Poller
在轮询Channel
有事件发生时,就会调用将此事件封装起来,而后交给线程池去执行。那么这个包装类就是SocketProcessor
。而咱们打开此类,可以看到它也实现了Runnable
接口,用来定义线程池Executor
中线程所执行的任务。那么这里是如何将Channel
中的字节流转换为Tomcat须要的ServletRequest
对象呢?其实就是调用了Http11Processor
来进行字节流与对象的转换的。
Executor
实际上是Tomcat定制版的线程池。咱们能够看它的类的定义,能够发现它实际上是扩展了Java的线程池。
public interface Executor extends java.util.concurrent.Executor, Lifecycle
复制代码
在线程池中最重要的两个参数就是核心线程数和最大线程数,正常的Java线程池的执行流程是这样的。
可是在Tomcat自定义的线程池中是不同的,经过重写了execute
方法实现了本身的任务处理逻辑。
差异就在于第四步的差异,原生线程池的处理策略是只要当前线程数大于最大线程数,那么就抛异常,而Tomcat的则是若是当前线程数大于最大线程数,就再尝试一次,若是仍是满的才会抛异常。下面是定制化线程池execute
的执行逻辑。
public void execute(Runnable command, long timeout, TimeUnit unit) {
submittedCount.incrementAndGet();
try {
super.execute(command);
} catch (RejectedExecutionException rx) {
if (super.getQueue() instanceof TaskQueue) {
//得到任务队列
final TaskQueue queue = (TaskQueue)super.getQueue();
try {
if (!queue.force(command, timeout, unit)) {
submittedCount.decrementAndGet();
throw new RejectedExecutionException(sm.getString("threadPoolExecutor.queueFull"));
}
} catch (InterruptedException x) {
submittedCount.decrementAndGet();
throw new RejectedExecutionException(x);
}
} else {
submittedCount.decrementAndGet();
throw rx;
}
}
}
复制代码
在代码中,咱们能够看到有这么一句submittedCount.incrementAndGet();
,为何会有这句呢?咱们能够看看这个参数的定义。简单来讲这个参数就是定义了任务已经提交到了线程池中,可是尚未执行的任务个数。
/**
* The number of tasks submitted but not yet finished. This includes tasks
* in the queue and tasks that have been handed to a worker thread but the
* latter did not start executing the task yet.
* This number is always greater or equal to {@link #getActiveCount()}.
*/
private final AtomicInteger submittedCount = new AtomicInteger(0);
复制代码
为何会有这么一个参数呢?咱们知道定制的队列是继承了LinkedBlockingQueue
,而LinkedBlockingQueue
队列默认是没有边界的。因而咱们就传入了一个参数,maxQueueSize
给构造的队列。可是在Tomcat的任务队列默认状况下是无限制的,那么这样就会出一个问题,若是当前线程达到了核心线程数,则开始向队列中添加任务,那么就会一直是添加成功的。那么就不会再建立新的线程。那么在什么状况下要新建线程呢?
线程池中建立新线程会有两个地方,一个是小于核心线程时,来一个任务建立一个线程。另外一个是超过核心线程而且任务队列已满,则会建立临时线程。
那么如何规定任务队列是否已满呢?若是设置了队列的最大长度固然好了,可是Tomcat默认状况下是没有设置,因此默认是无限的。因此Tomcat的TaskQueue
继承了LinkedBlockingQueue
,重写了offer
方法,在里面定义了何时返回false。
@Override
public boolean offer(Runnable o) {
if (parent==null) return super.offer(o);
//若是当前线程数等于最大线程数,此时不能建立新线程,只能添加进任务队列中
if (parent.getPoolSize() == parent.getMaximumPoolSize()) return super.offer(o);
//若是已提交可是未完成的任务数小于等于当前线程数,说明能处理过来,就放入队列中
if (parent.getSubmittedCount()<=(parent.getPoolSize())) return super.offer(o);
//到这一步说明,已提交可是未完成的任务数大于当前线程数,若是当前线程数小于最大线程数,就返回false新建线程
if (parent.getPoolSize()<parent.getMaximumPoolSize()) return false;
return super.offer(o);
}
复制代码
这就是submittedCount
的意义,目的就是为了在任务队列长度无限的状况下,让线程池有机会建立新的线程。
上面的知识有部分是看着李号双老师的深刻拆解Tomcat总结的,又结合着源码深刻了解了一下,当时刚看文章的时候以为本身都懂了,可是再深刻源码的时候又会发现本身不懂。因此知识若是只是看了而不运用,那么知识永远都不会是本身的。经过Tomcat链接器这一小块的源码学习,除了一些经常使用知识的实际运用,例如AQS、锁的应用、自定义线程池须要考虑的点、NIO的应用等等。还有整体上的设计思惟的学习,模块化设计,和现在的微服务感受很类似,将一个功能点内部分为多种模块,这样不管是在之后替换或者是升级时都能游刃有余。