Http持久链接与HttpClient链接池

Tips:关注公众号:松花皮蛋的黑板报,领取程序员月薪25K+秘籍,进军BAT必备!程序员

1、背景

HTTP协议是无状态的协议,即每一次请求都是互相独立的。所以它的最初实现是,每个http请求都会打开一个tcp socket链接,当交互完毕后会关闭这个链接。浏览器

HTTP协议是全双工的协议,因此创建链接与断开链接是要通过三次握手与四次挥手的。显然在这种设计中,每次发送Http请求都会消耗不少的额外资源,即链接的创建与销毁。安全

因而,HTTP协议的也进行了发展,经过持久链接的方法来进行socket链接复用。bash


从图中能够看到:服务器

  1. 在串行链接中,每次交互都要打开关闭链接
  2. 在持久链接中,第一次交互会打开链接,交互结束后链接并不关闭,下次交互就省去了创建链接的过程。

持久链接的实现有两种:HTTP/1.0+的keep-alive与HTTP/1.1的持久链接。微信

2、HTTP/1.0+的Keep-Alive

从1996年开始,不少HTTP/1.0浏览器与服务器都对协议进行了扩展,那就是“keep-alive”扩展协议。app

注意,这个扩展协议是做为1.0的补充的“实验型持久链接”出现的。keep-alive已经再也不使用了,最新的HTTP/1.1规范中也没有对它进行说明,只是不少应用延续了下来。less

使用HTTP/1.0的客户端在首部中加上"Connection:Keep-Alive",请求服务端将一条链接保持在打开状态。服务端若是愿意将这条链接保持在打开状态,就会在响应中包含一样的首部。若是响应中没有包含"Connection:Keep-Alive"首部,则客户端会认为服务端不支持keep-alive,会在发送完响应报文以后关闭掉当前链接。异步

经过keep-alive补充协议,客户端与服务器之间完成了持久链接,然而仍然存在着一些问题:socket

  • 在HTTP/1.0中keep-alive不是标准协议,客户端必须发送Connection:Keep-Alive来激活keep-alive链接。
  • 代理服务器可能没法支持keep-alive,由于一些代理是"盲中继",没法理解首部的含义,只是将首部逐跳转发。因此可能形成客户端与服务端都保持了链接,可是代理不接受该链接上的数据。

3、HTTP/1.1的持久链接

HTTP/1.1采起持久链接的方式替代了Keep-Alive。

HTTP/1.1的链接默认状况下都是持久链接。若是要显式关闭,须要在报文中加上Connection:Close首部。即在HTTP/1.1中,全部的链接都进行了复用。

然而如同Keep-Alive同样,空闲的持久链接也能够随时被客户端与服务端关闭。不发送Connection:Close不意味着服务器承诺链接永远保持打开。

4、HttpClient如何生成持久链接

HttpClien中使用了链接池来管理持有链接,同一条TCP链路上,链接是能够复用的。HttpClient经过链接池的方式进行链接持久化。

其实“池”技术是一种通用的设计,其设计思想并不复杂:

  1. 当有链接第一次使用的时候创建链接
  2. 结束时对应链接不关闭,归还到池中
  3. 下次同个目的的链接可从池中获取一个可用链接
  4. 按期清理过时链接

全部的链接池都是这个思路,不过咱们看HttpClient源码主要关注两点:

  • 链接池的具体设计方案,以供之后自定义链接池参考
  • 如何与HTTP协议对应上,即理论抽象转为代码的实现

4.1 HttpClient链接池的实现

HttpClient关于持久链接的处理在下面的代码中能够集中体现,下面从MainClientExec摘取了和链接池相关的部分,去掉了其余部分:

public class MainClientExec implements ClientExecChain {

    @Override
    public CloseableHttpResponse execute(
            final HttpRoute route,
            final HttpRequestWrapper request,
            final HttpClientContext context,
            final HttpExecutionAware execAware) throws IOException, HttpException {
     //从链接管理器HttpClientConnectionManager中获取一个链接请求ConnectionRequest
        final ConnectionRequest connRequest = connManager.requestConnection(route, userToken);final HttpClientConnection managedConn;
        final int timeout = config.getConnectionRequestTimeout();        //从链接请求ConnectionRequest中获取一个被管理的链接HttpClientConnection
        managedConn = connRequest.get(timeout > 0 ? timeout : 0, TimeUnit.MILLISECONDS);
     //将链接管理器HttpClientConnectionManager与被管理的链接HttpClientConnection交给一个ConnectionHolder持有
        final ConnectionHolder connHolder = new ConnectionHolder(this.log, this.connManager, managedConn);
        try {
            HttpResponse response;
            if (!managedConn.isOpen()) {          //若是当前被管理的链接不是出于打开状态,须要从新创建链接
                establishRoute(proxyAuthState, managedConn, route, request, context);
            }
       //经过链接HttpClientConnection发送请求
            response = requestExecutor.execute(request, managedConn, context);
       //经过链接重用策略判断是否链接可重用         
            if (reuseStrategy.keepAlive(response, context)) {
                //得到链接有效期
                final long duration = keepAliveStrategy.getKeepAliveDuration(response, context);
                //设置链接有效期
                connHolder.setValidFor(duration, TimeUnit.MILLISECONDS);          //将当前链接标记为可重用状态
                connHolder.markReusable();
            } else {
                connHolder.markNonReusable();
            }
        }
        final HttpEntity entity = response.getEntity();
        if (entity == null || !entity.isStreaming()) {
            //将当前链接释放到池中,供下次调用
            connHolder.releaseConnection();
            return new HttpResponseProxy(response, null);
        } else {
            return new HttpResponseProxy(response, connHolder);
        }
}复制代码

这里看到了在Http请求过程当中对链接的处理是和协议规范是一致的,这里要展开讲一下具体实现。

PoolingHttpClientConnectionManager是HttpClient默认的链接管理器,首先经过requestConnection()得到一个链接的请求,注意这里不是链接。

public ConnectionRequest requestConnection(
            final HttpRoute route,
            final Object state) {final Future<CPoolEntry> future = this.pool.lease(route, state, null);
        return new ConnectionRequest() {
            @Override
            public boolean cancel() {
                return future.cancel(true);
            }
            @Override
            public HttpClientConnection get(
                    final long timeout,
                    final TimeUnit tunit) throws InterruptedException, ExecutionException, ConnectionPoolTimeoutException {
                final HttpClientConnection conn = leaseConnection(future, timeout, tunit);
                if (conn.isOpen()) {
                    final HttpHost host;
                    if (route.getProxyHost() != null) {
                        host = route.getProxyHost();
                    } else {
                        host = route.getTargetHost();
                    }
                    final SocketConfig socketConfig = resolveSocketConfig(host);
                    conn.setSocketTimeout(socketConfig.getSoTimeout());
                }
                return conn;
            }
        };
    }复制代码

能够看到返回的ConnectionRequest对象其实是一个持有了Future<CPoolEntry>,CPoolEntry是被链接池管理的真正链接实例。

从上面的代码咱们应该关注的是:

  • Future<CPoolEntry> future = this.pool.lease(route, state, null)
    •   如何从链接池CPool中得到一个异步的链接,Future<CPoolEntry>
  • HttpClientConnection conn = leaseConnection(future, timeout, tunit)
    •   如何经过异步链接Future<CPoolEntry>得到一个真正的链接HttpClientConnection

4.2 Future

看一下CPool是如何释放一个Future<CPoolEntry>的,AbstractConnPool核心代码以下:

private E getPoolEntryBlocking(
            final T route, final Object state,
            final long timeout, final TimeUnit tunit,
            final Future<E> future) throws IOException, InterruptedException, TimeoutException {
     //首先对当前链接池加锁,当前锁是可重入锁ReentrantLockthis.lock.lock();
        try {        //得到一个当前HttpRoute对应的链接池,对于HttpClient的链接池而言,总池有个大小,每一个route对应的链接也是个池,因此是“池中池”
            final RouteSpecificPool<T, C, E> pool = getPool(route);
            E entry;
            for (;;) {
                Asserts.check(!this.isShutDown, "Connection pool shut down");          //死循环得到链接
                for (;;) {            //从route对应的池中拿链接,多是null,也多是有效链接
                    entry = pool.getFree(state);            //若是拿到null,就退出循环
                    if (entry == null) {
                        break;
                    }            //若是拿到过时链接或者已关闭链接,就释放资源,继续循环获取
                    if (entry.isExpired(System.currentTimeMillis())) {
                        entry.close();
                    }
                    if (entry.isClosed()) {
                        this.available.remove(entry);
                        pool.free(entry, false);
                    } else {              //若是拿到有效链接就退出循环
                        break;
                    }
                }          //拿到有效链接就退出
                if (entry != null) {
                    this.available.remove(entry);
                    this.leased.add(entry);
                    onReuse(entry);
                    return entry;
                }
          //到这里证实没有拿到有效链接,须要本身生成一个                
                final int maxPerRoute = getMax(route);
                //每一个route对应的链接最大数量是可配置的,若是超过了,就须要经过LRU清理掉一些链接
                final int excess = Math.max(0, pool.getAllocatedCount() + 1 - maxPerRoute);
                if (excess > 0) {
                    for (int i = 0; i < excess; i++) {
                        final E lastUsed = pool.getLastUsed();
                        if (lastUsed == null) {
                            break;
                        }
                        lastUsed.close();
                        this.available.remove(lastUsed);
                        pool.remove(lastUsed);
                    }
                }
          //当前route池中的链接数,没有达到上线
                if (pool.getAllocatedCount() < maxPerRoute) {
                    final int totalUsed = this.leased.size();
                    final int freeCapacity = Math.max(this.maxTotal - totalUsed, 0);            //判断链接池是否超过上线,若是超过了,须要经过LRU清理掉一些链接
                    if (freeCapacity > 0) {
                        final int totalAvailable = this.available.size();               //若是空闲链接数已经大于剩余可用空间,则须要清理下空闲链接
                        if (totalAvailable > freeCapacity - 1) {
                            if (!this.available.isEmpty()) {
                                final E lastUsed = this.available.removeLast();
                                lastUsed.close();
                                final RouteSpecificPool<T, C, E> otherpool = getPool(lastUsed.getRoute());
                                otherpool.remove(lastUsed);
                            }
                        }              //根据route创建一个链接
                        final C conn = this.connFactory.create(route);              //将这个链接放入route对应的“小池”中
                        entry = pool.add(conn);              //将这个链接放入“大池”中
                        this.leased.add(entry);
                        return entry;
                    }
                }
         //到这里证实没有从得到route池中得到有效链接,而且想要本身创建链接时当前route链接池已经到达最大值,即已经有链接在使用,可是对当前线程不可用
                boolean success = false;
                try {
                    if (future.isCancelled()) {
                        throw new InterruptedException("Operation interrupted");
                    }            //将future放入route池中等待
                    pool.queue(future);            //将future放入大链接池中等待
                    this.pending.add(future);            //若是等待到了信号量的通知,success为true
                    if (deadline != null) {
                        success = this.condition.awaitUntil(deadline);
                    } else {
                        this.condition.await();
                        success = true;
                    }
                    if (future.isCancelled()) {
                        throw new InterruptedException("Operation interrupted");
                    }
                } finally {
                    //从等待队列中移除
                    pool.unqueue(future);
                    this.pending.remove(future);
                }
                //若是没有等到信号量通知而且当前时间已经超时,则退出循环
                if (!success && (deadline != null && deadline.getTime() <= System.currentTimeMillis())) {
                    break;
                }
            }       //最终也没有等到信号量通知,没有拿到可用链接,则抛异常
            throw new TimeoutException("Timeout waiting for connection");
        } finally {       //释放对大链接池的锁
            this.lock.unlock();
        }
    }复制代码

上面的代码逻辑有几个重要点:

  • 链接池有个最大链接数,每一个route对应一个小链接池,也有个最大链接数
  • 不管是大链接池仍是小链接池,当超过数量的时候,都要经过LRU释放一些链接
  • 若是拿到了可用链接,则返回给上层使用
  • 若是没有拿到可用链接,HttpClient会判断当前route链接池是否已经超过了最大数量,没有到上限就会新建一个链接,并放入池中
  • 若是到达了上限,就排队等待,等到了信号量,就从新得到一次,等待不到就抛超时异常
  • 经过线程池获取链接要经过ReetrantLock加锁,保证线程安全

到这里为止,程序已经拿到了一个可用的CPoolEntry实例,或者抛异常终止了程序。

4.3 HttpClientConnection

protected HttpClientConnection leaseConnection(
            final Future<CPoolEntry> future,
            final long timeout,
            final TimeUnit tunit) throws InterruptedException, ExecutionException, ConnectionPoolTimeoutException {
        final CPoolEntry entry;
        try {       //从异步操做Future<CPoolEntry>中得到CPoolEntry
            entry = future.get(timeout, tunit);
            if (entry == null || future.isCancelled()) {
                throw new InterruptedException();
            }
            Asserts.check(entry.getConnection() != null, "Pool entry with no connection");
            if (this.log.isDebugEnabled()) {
                this.log.debug("Connection leased: " + format(entry) + formatStats(entry.getRoute()));
            }       //得到一个CPoolEntry的代理对象,对其操做都是使用同一个底层的HttpClientConnection
            return CPoolProxy.newProxy(entry);
        } catch (final TimeoutException ex) {
            throw new ConnectionPoolTimeoutException("Timeout waiting for connection from pool");
        }
    }复制代码

5、HttpClient如何复用持久链接?

在上一章中,咱们看到了HttpClient经过链接池来得到链接,当须要使用链接的时候从池中得到。

对应着第三章的问题:

  1. 当有链接第一次使用的时候创建链接
  2. 结束时对应链接不关闭,归还到池中
  3. 下次同个目的的链接可从池中获取一个可用链接
  4. 按期清理过时链接

咱们在第四章中看到了HttpClient是如何处理一、3的问题的,那么第2个问题是怎么处理的呢?

即HttpClient如何判断一个链接在使用完毕后是要关闭,仍是要放入池中供他人复用?再看一下MainClientExec的代码


          //发送Http链接                response = requestExecutor.execute(request, managedConn, context);
                //根据重用策略判断当前链接是否要复用
                if (reuseStrategy.keepAlive(response, context)) {
                    //须要复用的链接,获取链接超时时间,以response中的timeout为准
                    final long duration = keepAliveStrategy.getKeepAliveDuration(response, context);
                    if (this.log.isDebugEnabled()) {
                        final String s;               //timeout的是毫秒数,若是没有设置则为-1,即没有超时时间
                        if (duration > 0) {
                            s = "for " + duration + " " + TimeUnit.MILLISECONDS;
                        } else {
                            s = "indefinitely";
                        }
                        this.log.debug("Connection can be kept alive " + s);
                    }            //设置超时时间,当请求结束时链接管理器会根据超时时间决定是关闭仍是放回到池中
                    connHolder.setValidFor(duration, TimeUnit.MILLISECONDS);
                    //将链接标记为可重用            connHolder.markReusable();
                } else {            //将链接标记为不可重用
                    connHolder.markNonReusable();
                }复制代码

能够看到,当使用链接发生过请求以后,有链接重试策略来决定该链接是否要重用,若是要重用就会在结束后交给HttpClientConnectionManager放入池中。

那么链接复用策略的逻辑是怎么样的呢?

public class DefaultClientConnectionReuseStrategy extends DefaultConnectionReuseStrategy {

    public static final DefaultClientConnectionReuseStrategy INSTANCE = new DefaultClientConnectionReuseStrategy();

    @Override
    public boolean keepAlive(final HttpResponse response, final HttpContext context) {
     //从上下文中拿到request
        final HttpRequest request = (HttpRequest) context.getAttribute(HttpCoreContext.HTTP_REQUEST);
        if (request != null) {       //得到Connection的Header
            final Header[] connHeaders = request.getHeaders(HttpHeaders.CONNECTION);
            if (connHeaders.length != 0) {
                final TokenIterator ti = new BasicTokenIterator(new BasicHeaderIterator(connHeaders, null));
                while (ti.hasNext()) {
                    final String token = ti.nextToken();            //若是包含Connection:Close首部,则表明请求不打算保持链接,会忽略response的意愿,该头部这是HTTP/1.1的规范
                    if (HTTP.CONN_CLOSE.equalsIgnoreCase(token)) {
                        return false;
                    }
                }
            }
        }     //使用父类的的复用策略
        return super.keepAlive(response, context);
    }

}复制代码

看一下父类的复用策略

if (canResponseHaveBody(request, response)) {
                final Header[] clhs = response.getHeaders(HTTP.CONTENT_LEN);
                //若是reponse的Content-Length没有正确设置,则不复用链接          //由于对于持久化链接,两次传输之间不须要从新创建链接,则须要根据Content-Length确认内容属于哪次请求,以正确处理“粘包”现象                //因此,没有正确设置Content-Length的response链接不能复用
                if (clhs.length == 1) {
                    final Header clh = clhs[0];
                    try {
                        final int contentLen = Integer.parseInt(clh.getValue());
                        if (contentLen < 0) {
                            return false;
                        }
                    } catch (final NumberFormatException ex) {
                        return false;
                    }
                } else {
                    return false;
                }
            }
        if (headerIterator.hasNext()) {
            try {
                final TokenIterator ti = new BasicTokenIterator(headerIterator);
                boolean keepalive = false;
                while (ti.hasNext()) {
                    final String token = ti.nextToken();            //若是response有Connection:Close首部,则明确表示要关闭,则不复用
                    if (HTTP.CONN_CLOSE.equalsIgnoreCase(token)) {
                        return false;            //若是response有Connection:Keep-Alive首部,则明确表示要持久化,则复用
                    } else if (HTTP.CONN_KEEP_ALIVE.equalsIgnoreCase(token)) {
                        keepalive = true;
                    }
                }
                if (keepalive) {
                    return true;
                }
            } catch (final ParseException px) {
                return false;
            }
        }
     //若是response中没有相关的Connection首部说明,则高于HTTP/1.0版本的都复用链接  
        return !ver.lessEquals(HttpVersion.HTTP_1_0);复制代码

总结一下:

  • 若是request首部中包含Connection:Close,不复用
  • 若是response中Content-Length长度设置不正确,不复用
  • 若是response首部包含Connection:Close,不复用
  • 若是reponse首部包含Connection:Keep-Alive,复用
  • 都没命中的状况下,若是HTTP版本高于1.0则复用

从代码中能够看到,其实现策略与咱们第2、三章协议层的约束是一致的。

6、HttpClient如何清理过时链接

在HttpClient4.4版本以前,在从链接池中获取重用链接的时候会检查下是否过时,过时则清理。

以后的版本则不一样,会有一个单独的线程来扫描链接池中的链接,发现有离最近一次使用超过设置的时间后,就会清理。默认的超时时间是2秒钟。

public CloseableHttpClient build() {            //若是指定了要清理过时链接与空闲链接,才会启动清理线程,默认是不启动的
            if (evictExpiredConnections || evictIdleConnections) {          //创造一个链接池的清理线程
                final IdleConnectionEvictor connectionEvictor = new IdleConnectionEvictor(cm,
                        maxIdleTime > 0 ? maxIdleTime : 10, maxIdleTimeUnit != null ? maxIdleTimeUnit : TimeUnit.SECONDS,
                        maxIdleTime, maxIdleTimeUnit);
                closeablesCopy.add(new Closeable() {
                    @Override
                    public void close() throws IOException {
                        connectionEvictor.shutdown();
                        try {
                            connectionEvictor.awaitTermination(1L, TimeUnit.SECONDS);
                        } catch (final InterruptedException interrupted) {
                            Thread.currentThread().interrupt();
                        }
                    }

                });          //执行该清理线程
                connectionEvictor.start();
}复制代码

能够看到在HttpClientBuilder进行build的时候,若是指定了开启清理功能,会建立一个链接池清理线程并运行它。

public IdleConnectionEvictor(
            final HttpClientConnectionManager connectionManager,
            final ThreadFactory threadFactory,
            final long sleepTime, final TimeUnit sleepTimeUnit,
            final long maxIdleTime, final TimeUnit maxIdleTimeUnit) {
        this.connectionManager = Args.notNull(connectionManager, "Connection manager");
        this.threadFactory = threadFactory != null ? threadFactory : new DefaultThreadFactory();
        this.sleepTimeMs = sleepTimeUnit != null ? sleepTimeUnit.toMillis(sleepTime) : sleepTime;
        this.maxIdleTimeMs = maxIdleTimeUnit != null ? maxIdleTimeUnit.toMillis(maxIdleTime) : maxIdleTime;
        this.thread = this.threadFactory.newThread(new Runnable() {
            @Override
            public void run() {
                try {            //死循环,线程一直执行
                    while (!Thread.currentThread().isInterrupted()) {              //休息若干秒后执行,默认10秒
                        Thread.sleep(sleepTimeMs);               //清理过时链接
                        connectionManager.closeExpiredConnections();               //若是指定了最大空闲时间,则清理空闲链接
                        if (maxIdleTimeMs > 0) {
                            connectionManager.closeIdleConnections(maxIdleTimeMs, TimeUnit.MILLISECONDS);
                        }
                    }
                } catch (final Exception ex) {
                    exception = ex;
                }

            }
        });
    }复制代码

总结一下:

  • 只有在HttpClientBuilder手动设置后,才会开启清理过时与空闲链接
  • 手动设置后,会启动一个线程死循环执行,每次执行sleep必定时间,调用HttpClientConnectionManager的清理方法清理过时与空闲链接。

7、本文总结

  • HTTP协议经过持久链接的方式,减轻了早期设计中的过多链接问题
  • 持久链接有两种方式:HTTP/1.0+的Keep-Avlive与HTTP/1.1的默认持久链接
  • HttpClient经过链接池来管理持久链接,链接池分为两个,一个是总链接池,一个是每一个route对应的链接池
  • HttpClient经过异步的Future<CPoolEntry>来获取一个池化的链接
  • 默认链接重用策略与HTTP协议约束一致,根据response先判断Connection:Close则关闭,在判断Connection:Keep-Alive则开启,最后版本大于1.0则开启
  • 只有在HttpClientBuilder中手动开启了清理过时与空闲链接的开关后,才会清理链接池中的链接
  • HttpClient4.4以后的版本经过一个死循环线程清理过时与空闲链接,该线程每次执行都sleep一会,以达到按期执行的效果

上面的研究是基于HttpClient源码的我的理解,若是有误,但愿你们积极留言讨论。


文章来源:www.liangsonghua.me

关注微信公众号:松花皮蛋的黑板报,获取更多精彩!

公众号介绍:分享在京东工做的技术感悟,还有JAVA技术和业内最佳实践,大部分都是务实的、能看懂的、可复现的