前言
上篇文章讲到了 ProtocolHandler 及其默认实现类 Http11NioProtocol,在 Http11NioProtocol 的构造方法中建立了一个 NioEndpoint 对象,而且在 Http11NioProtocol 的 init 和 start 方法中最重要的步骤是调用这个 NioEndpoint 对象的 init 和 start 方法。NioEndpoint 继承自 AbstractJsseEndpoint,而 AbstractJsseEndpoint 继承自 AbstractEndpoint。tomcat
1. AbstractEndpoint#init
NioEndpoint 的 init 方法在起父类的父类 AbstractEndpoint 里。dom
private boolean bindOnInit = true; public final void init() throws Exception { if (bindOnInit) { bindWithCleanup(); bindState = BindState.BOUND_ON_INIT; } if (this.domain != null) { // Register endpoint (as ThreadPool - historical name) oname = new ObjectName(domain + ":type=ThreadPool,name=\"" + getName() + "\""); Registry.getRegistry(null, null).registerComponent(this, oname, null); ObjectName socketPropertiesOname = new ObjectName(domain + ":type=ThreadPool,name=\"" + getName() + "\",subType=SocketProperties"); socketProperties.setObjectName(socketPropertiesOname); Registry.getRegistry(null, null).registerComponent(socketProperties, socketPropertiesOname, null); for (SSLHostConfig sslHostConfig : findSslHostConfigs()) { registerJmx(sslHostConfig); } } }
private void bindWithCleanup() throws Exception { try { bind(); } catch (Throwable t) { // Ensure open sockets etc. are cleaned up if something goes // wrong during bind ExceptionUtils.handleThrowable(t); unbind(); throw t; } }
在 init 方法里,首先调用了 bindWithCleanup() 方法,而后根据须要的 SSLHostConfig 调用了 registerJmx(sslHostConfig) 方法,registerJmx 方法是将 sslHostConfig 注册到 MBeanServer 中。
bindWithCleanup() 方法里就只是调用了 bind() 方法,bind() 是一个 abstract 方法,其实如今 NioEndpoint 类socket
2. NioEndpoint#bindide
protected int acceptorThreadCount = 1; private int pollerThreadCount = Math.min(2,Runtime.getRuntime().availableProcessors()); /** * Initialize the endpoint. */ @Override public void bind() throws Exception { initServerSocket(); // Initialize thread count defaults for acceptor, poller if (acceptorThreadCount == 0) { // FIXME: Doesn't seem to work that well with multiple accept threads acceptorThreadCount = 1; } if (pollerThreadCount <= 0) { //minimum one poller thread pollerThreadCount = 1; } setStopLatch(new CountDownLatch(pollerThreadCount)); // Initialize SSL if needed initialiseSsl(); selectorPool.open(); } protected void setStopLatch(CountDownLatch stopLatch) { this.stopLatch = stopLatch; }
bind 方法中首先调用 initServerSocket 方法,而后初始化呢了 acceptorThreadCount 和 pollerThreadCount 两个属性,这两个属性一个是指 Accepter 线程的个数,另外一个是指 Poller 线程的个数,acceptorThreadCount 默认是 1,pollerThreadCount 默认是
Accepter 和 Poller 线程构成了 tomcat 的线程模型,
再而后建立一个 CountDownLatch 对象并赋值给 stopLatch 属性
接着调用 initialiseSsl() 方法来初始化 ssl 的实现类。
2.1. initialiseSslthis
private String sslImplementationName = null; private SSLImplementation sslImplementation = null; public String getSslImplementationName() { return sslImplementationName; } protected void initialiseSsl() throws Exception { if (isSSLEnabled()) { sslImplementation = SSLImplementation.getInstance(getSslImplementationName()); for (SSLHostConfig sslHostConfig : sslHostConfigs.values()) { sslHostConfig.setConfigType(getSslConfigType()); createSSLContext(sslHostConfig); } // Validate default SSLHostConfigName if (sslHostConfigs.get(getDefaultSSLHostConfigName()) == null) { throw new IllegalArgumentException(sm.getString("endpoint.noSslHostConfig", getDefaultSSLHostConfigName(), getName())); } } }
initialiseSsl() 方法就是建立一个 SSLImplementation 的实现类并赋值给 sslImplementation 属性。
SSLImplementation 是一个抽象类,tomcat 中它的实现类有 JSSEImplementation 和 OpenSSLImplementation。其中 JSSEImplementation 是默认的实现类。
看一看出 bind() 最重要的一步就是调用了 initServerSocket() 方法。线程
2.2 initServerSocketcode
/** * Server socket "pointer". */ private volatile ServerSocketChannel serverSock = null; /** * Allows the server developer to specify the acceptCount (backlog) that * should be used for server sockets. By default, this value * is 100. */ private int acceptCount = 100; public int getAcceptCount() { return acceptCount; } /** * Use System.inheritableChannel to obtain channel from stdin/stdout. */ private boolean useInheritedChannel = false; public boolean getUseInheritedChannel() { return useInheritedChannel; } // Separated out to make it easier for folks that extend NioEndpoint to // implement custom [server]sockets protected void initServerSocket() throws Exception { if (!getUseInheritedChannel()) { serverSock = ServerSocketChannel.open(); socketProperties.setProperties(serverSock.socket()); InetSocketAddress addr = new InetSocketAddress(getAddress(), getPortWithOffset()); serverSock.socket().bind(addr,getAcceptCount()); } else { // Retrieve the channel provided by the OS Channel ic = System.inheritedChannel(); if (ic instanceof ServerSocketChannel) { serverSock = (ServerSocketChannel) ic; } if (serverSock == null) { throw new IllegalArgumentException(sm.getString("endpoint.init.bind.inherited")); } } serverSock.configureBlocking(true); //mimic APR behavior }
默认状况下 useInheritedChannel 是 false,所以会走 if 块。if 块里先初始化 serverSock 这个 ServerSocketChannel 类型的属性,而后设置了一些 ServerSocketChannel 的属性。orm
public void setProperties(ServerSocket socket) throws SocketException{ if (rxBufSize != null) socket.setReceiveBufferSize(rxBufSize.intValue()); if (performanceConnectionTime != null && performanceLatency != null && performanceBandwidth != null) socket.setPerformancePreferences( performanceConnectionTime.intValue(), performanceLatency.intValue(), performanceBandwidth.intValue()); if (soReuseAddress != null) socket.setReuseAddress(soReuseAddress.booleanValue()); if (soTimeout != null && soTimeout.intValue() >= 0) socket.setSoTimeout(soTimeout.intValue()); }
socketProperties 在 AbstractEndpoint 里。server
2.3 selectorPool.open()
bind() 方法最后调用了 selectorPool.open() 方法。对象
private NioSelectorPool selectorPool = new NioSelectorPool();
selectorPool 是 NioEndpoint 里的一个属性。
protected NioBlockingSelector blockingSelector; protected volatile Selector SHARED_SELECTOR; public void open() throws IOException { enabled = true; getSharedSelector(); if (SHARED) { blockingSelector = new NioBlockingSelector(); blockingSelector.open(getSharedSelector()); } } protected Selector getSharedSelector() throws IOException { if (SHARED && SHARED_SELECTOR == null) { synchronized ( NioSelectorPool.class ) { if ( SHARED_SELECTOR == null ) { SHARED_SELECTOR = Selector.open(); } } } return SHARED_SELECTOR; }
NioSelectorPool#open 方法里,先初始化了 SHARED_SELECTOR,而后建立了一个 NioBlockingSelector 对象并赋值给 blockingSelector 属性,而后调用了这个对象的 open 方法。NioBlockingSelector 是 tomcat 里定义的类。
protected Selector sharedSelector; protected BlockPoller poller; public void open(Selector selector) { sharedSelector = selector; poller = new BlockPoller(); poller.selector = sharedSelector; poller.setDaemon(true); poller.setName("NioBlockingSelector.BlockPoller-" + threadCounter.incrementAndGet()); poller.start(); }
NioBlockingSelector#open 方法的入参是 NioSelectorPool#open 里的 SHARED_SELECTOR 对象,在 open 方法里把 SHARED_SELECTOR 对象赋值给 sharedSelector 属性,而后建立了一个 BlockPoller 对象,并调用了它的 start 方法,BlockPoller 的父类是 Thread,调用 BlockPoller 的 start 方法实际上是启动一个线程,BlockPoller 重载了 Thread 的 run 方法。
3. NioEndpoint#start
NioEndpoint 的 start 方法在父类 AbstractEndpoint 里,
public final void start() throws Exception { if (bindState == BindState.UNBOUND) { bindWithCleanup(); bindState = BindState.BOUND_ON_START; } startInternal(); }
AbstractEndpoint#start 里只是简单调用了一下 startInternal() 方法,而 NioEndpoint 重载了 startInternal 方法。
/** * Cache for SocketProcessor objects */ protected SynchronizedStack<SocketProcessorBase<S>> processorCache; /** * Cache for poller events */ private SynchronizedStack<PollerEvent> eventCache; /** * Bytebuffer cache, each channel holds a set of buffers (two, except for SSL holds four) */ private SynchronizedStack<NioChannel> nioChannels; /** * Start the NIO endpoint, creating acceptor, poller threads. */ @Override public void startInternal() throws Exception { if (!running) { running = true; paused = false; processorCache = new SynchronizedStack<>(SynchronizedStack.DEFAULT_SIZE, socketProperties.getProcessorCache()); eventCache = new SynchronizedStack<>(SynchronizedStack.DEFAULT_SIZE, socketProperties.getEventCache()); nioChannels = new SynchronizedStack<>(SynchronizedStack.DEFAULT_SIZE, socketProperties.getBufferPool()); // Create worker collection if ( getExecutor() == null ) { createExecutor(); } initializeConnectionLatch(); // Start poller threads pollers = new Poller[getPollerThreadCount()]; for (int i=0; i<pollers.length; i++) { pollers[i] = new Poller(); Thread pollerThread = new Thread(pollers[i], getName() + "-ClientPoller-"+i); pollerThread.setPriority(threadPriority); pollerThread.setDaemon(true); pollerThread.start(); } startAcceptorThreads(); } }
startInternal 方法里先建立了三个 SynchronizedStack 对象分别赋值给 processorCache,eventCache 和 nioChannels,这三个属性都使用来复用的,分别复用 SocketProcessorBase 对象,PollerEvent 对象 和 NioChannel 对象。其中 processorCache 在 AbstractEndpoint 里声明,其余两个在 NioEndpoint 里声明。
而后,调用 createExecutor()。createExecutor 在 AbstractEndpoint 里声明
private Executor executor = null; public Executor getExecutor() { return executor; } public void createExecutor() { internalExecutor = true; TaskQueue taskqueue = new TaskQueue(); TaskThreadFactory tf = new TaskThreadFactory(getName() + "-exec-", daemon, getThreadPriority()); executor = new ThreadPoolExecutor(getMinSpareThreads(), getMaxThreads(), 60, TimeUnit.SECONDS,taskqueue, tf); taskqueue.setParent( (ThreadPoolExecutor) executor); }
createExecutor 方法建立了一个线程池而且赋值给 executor 属性。
接着,startInternal 方法 调用了 initializeConnectionLatch 方法,
protected LimitLatch initializeConnectionLatch() { if (maxConnections==-1) return null; if (connectionLimitLatch==null) { connectionLimitLatch = new LimitLatch(getMaxConnections()); } return connectionLimitLatch; }
initializeConnectionLatch 方法初始换了 connectionLimitLatch 属性,这个属性是用来限制 tomcat 的最大链接数的。
再而后,startInternal 建立了 pollerThreadCount 个 Poller 对象和线程,并启动了这些线程,这些线程成为 Poller 线程。Poller 类实现了 Runnable 接口。
最后 startInternal 调用了 startAcceptorThreads() 方法。
protected void startAcceptorThreads() { int count = getAcceptorThreadCount(); acceptors = new ArrayList<>(count); for (int i = 0; i < count; i++) { Acceptor<U> acceptor = new Acceptor<>(this); String threadName = getName() + "-Acceptor-" + i; acceptor.setThreadName(threadName); acceptors.add(acceptor); Thread t = new Thread(acceptor, threadName); t.setPriority(getAcceptorThreadPriority()); t.setDaemon(getDaemon()); t.start(); } }
startAcceptorThreads 方法里建立了 acceptorThreadCount 个 Accepter 对象和线程,并启动了这些线程,这些线程被称为 Acceptor 线程。Acceptor 跟 Poller 同样,也实现了 Runnable 接口。
Acceptor 线程处理客户端链接,而 Poller 处理这些链接通道上的读写事件。Acceptor 和 Poller 构成了 tomcat 的线程模型,是很是重要的组件,后面的文章会单独讲解,这里先不作讨论。
小结本文介绍了 NioEndpoint 的启动方法 init 和 start。在 init 方法里建立了 ServerSocketChannel 对象(在 NioEndpoint#initServerSocket 方法里)和一个 Selector 对象(在 NioSelectorPool#open 方法里)。在 start 方法里,启动了 Acceptor 线程和 Poller 线程。