从源码的角度分析 OKHttp3 (二) 拦截器的魅力

前言

因为以前项目搭建的是 MVP 架构,由RxJava + Glide + OKHttp + Retrofit 等开源框架组合而成,以前也都是停留在使用层面上,没有深刻的研究,最近打算把它们所有攻下,尚未关注的同窗能够先关注一波,看完这个系列文章,(无论是面试仍是工做中处理问题)相信你都在知道原理的状况下,处理问题更加驾轻就熟。html

Android 图片加载框架 Glide 4.9.0 (一) 从源码的角度分析 Glide 执行流程java

Android 图片加载框架 Glide 4.9.0 (二) 从源码的角度分析 Glide 缓存策略git

从源码的角度分析 Rxjava2 的基本执行流程、线程切换原理github

从源码的角度分析 OKHttp3 (一) 同步、异步执行流程web

从源码的角度分析 OKHttp3 (二) 拦截器的魅力面试

从源码的角度分析 OKHttp3 (三) 缓存策略缓存

从源码的角度分析 Retrofit 网络请求,包含 RxJava + Retrofit + OKhttp 网络请求执行流程服务器

interceptor 拦截器

在上一篇 从源码的角度分析 OKHttp3 (一) 同步、异步执行流程 文章中,最后咱们知道是在 getResponseWithInterceptorChain() 函数中完成了最后的请求与响应,那么内部是怎么完成请求,并把服务端的响应数据回调给调用层,先来看一段代码:cookie

Response getResponseWithInterceptorChain() throws IOException {
    // 构建一个拦截器调用的容器栈
    List<Interceptor> interceptors = new ArrayList<>();
    //配置 OKHttpClient 的时候,以 addInterceptor 方式添加的全局拦截器
    interceptors.addAll(client.interceptors());
    //错误、重定向拦截器
    interceptors.add(new RetryAndFollowUpInterceptor(client));
    //桥接拦截器,桥接应用层与网络层,添加必要的头
    interceptors.add(new BridgeInterceptor(client.cookieJar()));
    //缓存处理,Last-Modified、ETag、DiskLruCache等
    interceptors.add(new CacheInterceptor(client.internalCache()));
    //链接拦截器
    interceptors.add(new ConnectInterceptor(client));
    //是不是 webSocket
    if (!forWebSocket) {
    //经过okHttpClient.Builder#addNetworkInterceptor()
    //传进来的拦截器只对非网页的请求生效
      interceptors.addAll(client.networkInterceptors());
    }
    //真正访问服务器的拦截器
    interceptors.add(new CallServerInterceptor(forWebSocket));
		
    //真正执行 拦截器的 调用者
    Interceptor.Chain chain = new RealInterceptorChain(interceptors, transmitter, null, 0,
        originalRequest, this, client.connectTimeoutMillis(),
        client.readTimeoutMillis(), client.writeTimeoutMillis());

    boolean calledNoMoreExchanges = false;
    try {
      //开始执行
      Response response = chain.proceed(originalRequest);
      //是否取消
      if (transmitter.isCanceled()) {
        //关闭
        closeQuietly(response);
        throw new IOException("Canceled");
      }
      return response;
    } catch (IOException e) {
      calledNoMoreExchanges = true;
      throw transmitter.noMoreExchanges(e);
    } finally {
      if (!calledNoMoreExchanges) {
        transmitter.noMoreExchanges(null);
      }
    }
  }
复制代码

函数中代码很少,但确实精髓所在。经过上面代码跟注释咱们知道网络

  1. 首先建立一个用来装拦截器的容器
  2. 添加全局拦截器跟应用拦截器
  3. 建立 RealInterceptorChain 对象拦截器,并把 拦截器容器、发射器、请求数据等一些配置传入进去
  4. 最后调用 RealInterceptorChainchain.proceed(originalRequest); 函数, 才是真正使 这些拦截器执行起来。

上一篇文章也简单的介绍了拦截器,提到了 责任链模式,可是这个拦截器它是经过 RealInterceptorChain 对象开启了责任链任务的下发,这里感受是否是有点像一个 CEO 在下发任务并一层一层的传递,也有点像 Android 源码中 触摸反馈事件传递,OKHttp 的核心其实就在于拦截器。下面咱们就开始一步一步分析 OKHttp 拦截器的精妙所在。

RealInterceptorChain

经过上一小节对拦截器的介绍,咱们知道最后是在 RealInterceptorChainchain.proceed(originalRequest) 开启执行的拦截任务,下面直接进入源码模式

public final class RealInterceptorChain implements Interceptor.Chain {
	...//省略成员变量属性

  public RealInterceptorChain( List<Interceptor> interceptors, //全部拦截器 Transmitter transmitter,//发射器 @Nullable Exchange exchange, //封装对 OKIO 的请求数据的操做 int index, Request request, Call call, int connectTimeout, int readTimeout, int writeTimeout ){
	...//省略赋值代码
  }
  
  //外部 getResponseWithInterceptorChain 函数中调用
  public Response proceed( Request request, Transmitter transmitter, @Nullable Exchange exchange )throws IOException {
    
    //index 不能超过拦截器容器大小
    if (index >= interceptors.size()) throw new AssertionError();

 		//若是已经存在了一个 request 的请求链接就抛一个异常
    if (this.exchange != null && !this.exchange.connection().supportsUrl(request.url())) {
      ...//抛异常代码省略
    }

    // 保证开启调用的惟一性,不然抛一个异常,我的认为这样判断只是使得代码更加健壮,其实这里的 Calls 只会是 1;
    if (this.exchange != null && calls > 1) {
       ...//抛异常代码省略
    }

    //1. 建立下一个拦截器执行的对象
    RealInterceptorChain next = new RealInterceptorChain(interceptors, transmitter, exchange,
        index + 1, request, call, connectTimeout, readTimeout, writeTimeout);
    //2. 取出当前的拦截器
    Interceptor interceptor = interceptors.get(index);
    //3. 调用下一个拦截器的 intercept(Chain) 方法,传入刚才新建的 RealInterceptorChain, //返回 Response
    Response response = interceptor.intercept(next);

		//限制作一些判断,保证程序健壮
    if (exchange != null && index + 1 < interceptors.size() && next.calls != 1) {
      ...//抛异常代码省略
    }

		//若是返回回来的 response 为空,那么就抛一个异常
    if (response == null) {
      ...//抛异常代码省略
    }

    //若是响应为空,也抛出一个异常
    if (response.body() == null) {
      ...//抛异常代码省略
    }
    
    //真正返回服务端的响应
    return response;
  }
  
}
复制代码

请看上面代码注释 1,2,3 处,这三处代码就是分发拦截器执行的核心代码,首先看注释一 在 RealInterceptorChain 内部又建立一个 RealInterceptorChain 并传入 index + 1 等参数, 这里就是开始递归执行拦截器了,每次执行 get(index + 1)拦截器。注释 2 是取出当前拦截器,注释三是执行拦截器。

这里咱们能够先小节总结下RealInterceptorChain 的做用, 能够把 RealInterceptorChain 这个类看作看一个递归函数 interceptor.intercept(next); 就是开始递归的入口,固然有入口确定有出口,其实出口没有在这个类里面,这里我先透露下吧,实际上是在 CallServerInterceptor 请求与响应处理的拦截器中,最后直接return response;至关于出口。因此 RealInterceptorChain 这个类我的理解就是负责 启动/中止 拦截器的做用,有点像拦截器的调用委托于 RealInterceptorChain 。

那么这里确定是 list.get(index = 0) RetryAndFollowUpInterceptor 拦截器第一个执行了,下面就开始分析 错误、重定向拦截器。

RetryAndFollowUpInterceptor

在上面介绍拦截器的时候讲过,它是错误重连、重定向的拦截器,下面咱们看它的核心代码

public final class RetryAndFollowUpInterceptor implements Interceptor {
  
 
  @Override public Response intercept(Chain chain) throws IOException {
    //拿到当前的请求
    Request request = chain.request();
    RealInterceptorChain realChain = (RealInterceptorChain) chain;
    //拿到 Transmitter 对象
    Transmitter transmitter = realChain.transmitter();
		
    int followUpCount = 0;
    Response priorResponse = null;
    while (true) {
      //准备链接工做
      transmitter.prepareToConnect(request);
			//判断是否取消
      if (transmitter.isCanceled()) {
        throw new IOException("Canceled");
      }

      Response response;
      boolean success = false;
      try {
        //将当前请求传递给下一个拦截器
        response = realChain.proceed(request, transmitter, null);
        success = true;
      } catch (RouteException e) {
        //检查是否能够继续使用
        if (!recover(e.getLastConnectException(), transmitter, false, request)) 				{
          throw e.getFirstConnectException();
        }
        continue;
      } catch (IOException e) {
        boolean requestSendStarted = !(e instanceof ConnectionShutdownException);
        //检查是否能够继续使用
        if (!recover(e, transmitter, requestSendStarted, request)) throw e;
        continue;
      } finally {
        //若是未成功 释放链接
        if (!success) {
          transmitter.exchangeDoneDueToException();
        }
      }

      //执行到这里说明没有出现异常
      if (priorResponse != null) {
        response = response.newBuilder()
            .priorResponse(priorResponse.newBuilder()
                    .body(null)
                    .build())
            .build();
      }
			...//省略代码
      //根据响应来处理请求头
      Request followUp = followUpRequest(response, route);
			//若是为空,不须要重定向,直接返回响应
      if (followUp == null) {
        if (exchange != null && exchange.isDuplex()) {
          transmitter.timeoutEarlyExit();
        }
        return response;
      }
			//不为空,须要重定向
      RequestBody followUpBody = followUp.body();
      if (followUpBody != null && followUpBody.isOneShot()) {
        return response;
      }

      closeQuietly(response.body());
      if (transmitter.hasExchange()) {
        exchange.detachWithViolence();
      }
			
      //重定向的次数不能大于 20
      if (++followUpCount > MAX_FOLLOW_UPS) {
        throw new ProtocolException("Too many follow-up requests: " + followUpCount);
      }
			//根据重定向以后的请求再次重试
      request = followUp;
      priorResponse = response;
    }
  } 
}
复制代码

根据上面代码分析能够知道,主要作了如下几点

  1. 拿到当前的请求对象,并拿到 Transmitter 对象
  2. 准备链接,其实真正链接是在ConnectInterceptor 拦截器中
  3. 调用下一个拦截器,也就是 BridgeInterceptor 将请求交于它在预处理。
  4. 在链接的过程当中是否出现异常,判断是否支持继续链接
  5. 若是没有成功就释放资源
  6. 根据响应码判断是否须要重连操做
  7. 若是重连次数大于 20 次则抛异常,不然就将重定向以后的请求重试。

当前 RetryAndFollowUpInterceptor 中的 realChain.proceed(request, transmitter, null); 调用走到了 BridgeInterceptor 应用与网络交互的拦截器。

BridgeInterceptor

当上一个拦截器调用了 proceed 函数以后就会走到当前 intercept 函数里面,里面具体操做咱们看下源码处理

public final class BridgeInterceptor implements Interceptor {
  private final CookieJar cookieJar;

	...//省略构造函数

  @Override public Response intercept(Chain chain) throws IOException {
    //拿到当前请求 Request
    Request userRequest = chain.request();
    //拿到 Request 配置参数的 Builder
    Request.Builder requestBuilder = userRequest.newBuilder();
		//获取到请求体 body
    RequestBody body = userRequest.body();
    //判断请求体是否为空
    if (body != null) {//不为空的状况下
      //获取请求体类型
      MediaType contentType = body.contentType();
      if (contentType != null) {
        //将请求体类型添加 header
        requestBuilder.header("Content-Type", contentType.toString());
      }
			
      //处理请求体长度
      long contentLength = body.contentLength();
      if (contentLength != -1) {
        requestBuilder.header("Content-Length", Long.toString(contentLength));
        requestBuilder.removeHeader("Transfer-Encoding");
      } else {
        requestBuilder.header("Transfer-Encoding", "chunked");
        requestBuilder.removeHeader("Content-Length");
      }
    }
		
    //添加header HOST 主机
    if (userRequest.header("Host") == null) {
      requestBuilder.header("Host", hostHeader(userRequest.url(), false));
    }

    //添加链接状态
    if (userRequest.header("Connection") == null) {
      requestBuilder.header("Connection", "Keep-Alive");
    }

		//对数据是否开启 压缩--默认添加 Gzip
    boolean transparentGzip = false;
    if (userRequest.header("Accept-Encoding") == null && userRequest.header("Range") == null) {
      transparentGzip = true;
      //添加 gzip 压缩
      requestBuilder.header("Accept-Encoding", "gzip");
    }

    List<Cookie> cookies = cookieJar.loadForRequest(userRequest.url());
    if (!cookies.isEmpty()) {
     	//header 中添加 cookie
      requestBuilder.header("Cookie", cookieHeader(cookies));
    }
		//添加 user-agent
    if (userRequest.header("User-Agent") == null) {
      requestBuilder.header("User-Agent", Version.userAgent());
    }
		
    //执行下一个拦截器 CacheInterceptor
    Response networkResponse = chain.proceed(requestBuilder.build());
		//对 url 和 cookie 保存
    HttpHeaders.receiveHeaders(cookieJar, userRequest.url(), networkResponse.headers());
		
    //拿到响应,添加一些属性
    Response.Builder responseBuilder = networkResponse.newBuilder()
        .request(userRequest);

    if (transparentGzip
        && "gzip".equalsIgnoreCase(networkResponse.header("Content-Encoding"))
        && HttpHeaders.hasBody(networkResponse)) {
      GzipSource responseBody = new GzipSource(networkResponse.body().source());
      Headers strippedHeaders = networkResponse.headers().newBuilder()
          .removeAll("Content-Encoding")
          .removeAll("Content-Length")
          .build();
      responseBuilder.headers(strippedHeaders);
      String contentType = networkResponse.header("Content-Type");
      responseBuilder.body(new RealResponseBody(contentType, -1L, Okio.buffer(responseBody)));
    }
		//返回响应
    return responseBuilder.build();
  }
  
  ...//省略部分代码
  
}
复制代码

从上面代码中,咱们知道 BridgeInterceptor 主要是对请求头作一些预处理,以后就调用下一个拦截器。

CacheInterceptor

根据上一个拦截器 BridgeInterceptor 调用最后会走到当前的 intercept , 根据上面的拦截器介绍知道,它是获取缓存和更新缓存的做用。下面咱们看下它具体实现

public final class CacheInterceptor implements Interceptor {
  final @Nullable InternalCache cache;

	...//构造函数省略

  @Override public Response intercept(Chain chain) throws IOException {
    //判断缓存若是不为空,就根据请求拿到缓存响应
    Response cacheCandidate = cache != null
        ? cache.get(chain.request())
        : null;

    long now = System.currentTimeMillis();
		//获取缓存策略
    CacheStrategy strategy = new CacheStrategy.Factory(now, chain.request(), cacheCandidate).get();
    //根据缓存策略拿到请求
    Request networkRequest = strategy.networkRequest;
    //拿到缓存响应
    Response cacheResponse = strategy.cacheResponse;
    
		...//省略部分代码
    //若是请求跟缓存响应为空的话,就强制使用缓存,返回错误码为 504
    if (networkRequest == null && cacheResponse == null) {
      return new Response.Builder()
          .request(chain.request())
          .protocol(Protocol.HTTP_1_1) //
          .code(504)
          .message("Unsatisfiable Request (only-if-cached)")
          .body(Util.EMPTY_RESPONSE)
          .sentRequestAtMillis(-1L)
          .receivedResponseAtMillis(System.currentTimeMillis())
          .build();
    }

    // 若是 networkRequest 为空的话,也强制获取缓存
    if (networkRequest == null) {
      return cacheResponse.newBuilder()
          .cacheResponse(stripBody(cacheResponse))
          .build();
    }

    
    Response networkResponse = null;
    try {
      //调用下一个拦截器
      networkResponse = chain.proceed(networkRequest);
    } finally {
     ...
    }

    // 若是缓存不为空
    if (cacheResponse != null) {
      //而且响应码 == 以前定义的 304
      if (networkResponse.code() == HTTP_NOT_MODIFIED) {
        //生成一个响应
        Response response = cacheResponse.newBuilder()
            .headers(combine(cacheResponse.headers(), networkResponse.headers()))
            .sentRequestAtMillis(networkResponse.sentRequestAtMillis())
            .receivedResponseAtMillis(networkResponse.receivedResponseAtMillis())
            .cacheResponse(stripBody(cacheResponse))
            .networkResponse(stripBody(networkResponse))
            .build();
        networkResponse.body().close();

        //更新响应
        cache.trackConditionalCacheHit();
        cache.update(cacheResponse, response);
        return response;
      } else {
        closeQuietly(cacheResponse.body());
      }
    }

    //没有缓存使用,读取网络响应
    Response response = networkResponse.newBuilder()
        .cacheResponse(stripBody(cacheResponse))
        .networkResponse(stripBody(networkResponse))
        .build();

    if (cache != null) {
      if (HttpHeaders.hasBody(response) && CacheStrategy.isCacheable(response, networkRequest)) {
        // 存入缓存
        CacheRequest cacheRequest = cache.put(response);
        return cacheWritingResponse(cacheRequest, response);
      }
			
      //检查缓存是否有效
      if (HttpMethod.invalidatesCache(networkRequest.method())) {
        try {
          //删除无效缓存
          cache.remove(networkRequest);
        } catch (IOException ignored) {
          
        }
      }
    }
    return response;
  }
复制代码

能够看到这里主要是对缓存作处理,因为这里只讲拦截器的调用和一些基本处理逻辑,OKHttp 缓存机制后面会单独用一篇文章来介绍,在这里只要知道,若是外部 OKHttpClient 配置了缓存的话(看下面代码块,否则缓存都是空的,也不会默认添加缓存),才会执行缓存 put、get、update,因为这里咱们没有配置缓存策略,因此直接调用下一个拦截器,也就是 ConnectInterceptor

File file = new File(Environment.getExternalStorageDirectory() + "/T01");
Cache cache = new Cache(file, 1024 * 1024 * 10);
OkHttpClient okHttpClient = new OkHttpClient.Builder().
                        addInterceptor(new LoggingInterceptor())
                        .cache(cache).
                        build();
复制代码

ConnectInterceptor

(ps: 拦截拦截器主要参考了:juejin.im/post/5d8364…

缓存拦截器执行完成以后, 下一个调用链就是链接拦截器了,看一下代码实现:

public final class ConnectInterceptor implements Interceptor {
  public final OkHttpClient client;

  public ConnectInterceptor(OkHttpClient client) {
    this.client = client;
  }

  @Override public Response intercept(Chain chain) throws IOException {
    RealInterceptorChain realChain = (RealInterceptorChain) chain;
    //拿到请求
    Request request = realChain.request();
    //拿到 Transmitter 
    Transmitter transmitter = realChain.transmitter();
    
    boolean doExtensiveHealthChecks = !request.method().equals("GET");
    //从新建立一个 Exchange
    Exchange exchange = transmitter.newExchange(chain, doExtensiveHealthChecks);
		//调用proceed方法,里面调用下一个拦截器CallServerInterceptor的intercept方法
    return realChain.proceed(request, transmitter, exchange);
  }
}
复制代码

经过上面代码能够看出 ConnectInterceptor 内部代码很简洁,首先拿到 Request 请求,获取 Transmitter 对象,其次是经过 transmitter 从新建立一个 Exchange , Exchange 是负责将数据写入到建立链接的 IO 流中的交互动做,最后在调用 CallServerInterceptor 拦截器。咱们看下 transmitter.newExchange(chain, doExtensiveHealthChecks) 内部代码实现

Exchange newExchange(Interceptor.Chain chain, boolean doExtensiveHealthChecks) {
    synchronized (connectionPool) {
      //若是没有 Exchanges 抛一个异常
      if (noMoreExchanges) {
        throw new IllegalStateException("released");
      }
      if (exchange != null) {
        ...//省略抛异常代码
    }
		
    //经过ExchangeFinder的find方法找到一个ExchangeCodec
    ExchangeCodec codec = exchangeFinder.find(client, chain, doExtensiveHealthChecks);
    //建立Exchange,并把ExchangeCodec实例codec传进去,因此Exchange内部持有ExchangeCodec实例
    Exchange result = new Exchange(this, call, eventListener, exchangeFinder, codec);

    synchronized (connectionPool) {
      this.exchange = result;
      this.exchangeRequestDone = false;
      this.exchangeResponseDone = false;
      return result;
    }
  }
复制代码

ExchangeFinder 对象早在RetryAndFollowUpInterceptor中经过TransmitterprepareToConnect方法建立,它的 find 方法是链接真正建立的地方,ExchangeFinder 是什么?ExchangeFinder 就是负责链接的建立,把建立好的链接放入链接池,若是链接池中已经有该链接,就直接取出复用,因此 ExchangeFinder 管理着两个重要的角色:RealConnectionRealConnectionPool,下面讲解一下 RealConnectionPoolRealConnection,有助于链接机制的理解。

RealConnection

链接的真正实现,实现了 Connection 接口,内部利用 Socket 创建链接,以下:

public interface Connection {
    //返回这个链接使用的Route
    Route route();

    //返回这个链接使用的Socket
    Socket socket();

    //若是是HTTPS,返回TLS握手信息用于创建链接,不然返回null
    @Nullable Handshake handshake();

    //返回应用层使用的协议,Protocol是一个枚举,如HTTP1.一、HTTP2
    Protocol protocol();
}

public final class RealConnection extends Http2Connection.Listener implements Connection {

    public final RealConnectionPool connectionPool;
    //路由
    private final Route route;
    //内部使用这个rawSocket在TCP层创建链接
    private Socket rawSocket;
    //若是没有使用HTTPS,那么socket == rawSocket,不然这个socket == SSLSocket
    private Socket socket;
    //TLS握手
    private Handshake handshake;
    //应用层协议
    private Protocol protocol;
    //HTTP2链接
    private Http2Connection http2Connection;
    //okio库的BufferedSource和BufferedSink,至关于javaIO的输入输出流
    private BufferedSource source;
    private BufferedSink sink;


    public RealConnection(RealConnectionPool connectionPool, Route route) {
        this.connectionPool = connectionPool;
        this.route = route;
    }


    public void connect(int connectTimeout, int readTimeout, int writeTimeout, int pingIntervalMillis, boolean connectionRetryEnabled, Call call, EventListener eventListener) {
        //...
    }

    //...
}

复制代码

RealConnection 中有一个 connect 方法,外部能够调用该方法创建链接,connect 方法以下:

//RealConnection.java
public void connect(int connectTimeout, int readTimeout, int writeTimeout, int pingIntervalMillis, boolean connectionRetryEnabled, Call call, EventListener eventListener) {
    if (protocol != null) throw new IllegalStateException("already connected");

    RouteException routeException = null;
    List<ConnectionSpec> connectionSpecs = route.address().connectionSpecs();
    ConnectionSpecSelector connectionSpecSelector = new ConnectionSpecSelector(connectionSpecs);

    //路由选择
    if (route.address().sslSocketFactory() == null) {
      if (!connectionSpecs.contains(ConnectionSpec.CLEARTEXT)) {
        throw new RouteException(new UnknownServiceException(
            "CLEARTEXT communication not enabled for client"));
      }
      String host = route.address().url().host();
      if (!Platform.get().isCleartextTrafficPermitted(host)) {
        throw new RouteException(new UnknownServiceException(
            "CLEARTEXT communication to " + host + " not permitted by network security policy"));
      }
    } else {
      if (route.address().protocols().contains(Protocol.H2_PRIOR_KNOWLEDGE)) {
        throw new RouteException(new UnknownServiceException(
            "H2_PRIOR_KNOWLEDGE cannot be used with HTTPS"));
      }
    }

    //开始链接
    while (true) {
      try {
        if (route.requiresTunnel()) {//若是是通道模式,则创建通道链接
          connectTunnel(connectTimeout, readTimeout, writeTimeout, call, eventListener);
          if (rawSocket == null) {
            // We were unable to connect the tunnel but properly closed down our resources.
            break;
          }
        } else {//一、不然进行Socket链接,大部分是这种状况
          connectSocket(connectTimeout, readTimeout, call, eventListener);
        }
        //创建HTTPS链接
        establishProtocol(connectionSpecSelector, pingIntervalMillis, call, eventListener);
        break;
      }
      //...省略异常处理

    if (http2Connection != null) {
      synchronized (connectionPool) {
        allocationLimit = http2Connection.maxConcurrentStreams();
      }
    }
  }

复制代码

咱们关注注释1,通常会调用 connectSocket 方法创建 Socket 链接,connectSocket 方法以下:

//RealConnection.java
private void connectSocket(int connectTimeout, int readTimeout, Call call, EventListener eventListener) throws IOException {
    Proxy proxy = route.proxy();
    Address address = route.address();

    //根据代理类型的不一样建立Socket
    rawSocket = proxy.type() == Proxy.Type.DIRECT || proxy.type() == Proxy.Type.HTTP
        ? address.socketFactory().createSocket()
        : new Socket(proxy);

    eventListener.connectStart(call, route.socketAddress(), proxy);
    rawSocket.setSoTimeout(readTimeout);
    try {
        //一、创建Socket链接
        Platform.get().connectSocket(rawSocket, route.socketAddress(), connectTimeout);
    }
    //...省略异常处理

    try {
        //得到Socket的输入输出流
        source = Okio.buffer(Okio.source(rawSocket));
        sink = Okio.buffer(Okio.sink(rawSocket));
    } 
     //...省略异常处理
}

复制代码

咱们关注注释1,Platform 是 okhttp 中根据不一样 Android 版本平台的差别实现的一个兼容类,这里就不细究,Platform 的 connectSocket 方法最终会调用 rawSocket 的 connect() 方法创建其Socket 链接,创建 Socket 链接后,就能够经过 Socket 链接得到输入输出流 source 和 sink,okhttp 就能够从 source 读取或往 sink 写入数据,source 和 sink 是 BufferedSource 和BufferedSink 类型,它们是来自于okio库,它是一个封装了 java.io 和 java.nio 的库,okhttp 底层依赖这个库读写数据,想要了解 okio 这个库能够看这篇文章拆轮子系列:拆 Okio

RealConnectionPool

链接池,用来管理链接对象 RealConnection ,以下:

public final class RealConnectionPool {

    //线程池
    private static final Executor executor = new ThreadPoolExecutor(
        0 /* corePoolSize */,
        Integer.MAX_VALUE /* maximumPoolSize */, 
        60L /* keepAliveTime */, 
        TimeUnit.SECONDS,
        new SynchronousQueue<>(), 
        Util.threadFactory("OkHttp ConnectionPool", true));
 
    boolean cleanupRunning;
    //清理链接任务,在executor中执行
    private final Runnable cleanupRunnable = () -> {
        while (true) {
            //调用cleanup方法执行清理逻辑
            long waitNanos = cleanup(System.nanoTime());
            if (waitNanos == -1) return;
            if (waitNanos > 0) {
                long waitMillis = waitNanos / 1000000L;
                waitNanos -= (waitMillis * 1000000L);
                synchronized (RealConnectionPool.this) {
                    try {
                        //调用wait方法进入等待
                        RealConnectionPool.this.wait(waitMillis, (int) waitNanos);
                    } catch (InterruptedException ignored) {
                    }
                }
            }
        }
    };

    //双端队列,保存链接
    private final Deque<RealConnection> connections = new ArrayDeque<>();

    void put(RealConnection connection) {
        if (!cleanupRunning) {
            cleanupRunning = true;
            //使用线程池执行清理任务
            executor.execute(cleanupRunnable);
        }
        //将新建链接插入队列
        connections.add(connection);
    }

    long cleanup(long now) {
        //...
    }

    //...
}

复制代码

RealConnectionPool 在内部维护了一个线程池,用来执行清理链接任务 cleanupRunnable ,还维护了一个双端队列 connections ,用来缓存已经建立的链接。要知道建立一次链接要经历 TCP握手,若是是 HTTPS 还要经历 TLS 握手,握手的过程都是耗时的,因此为了提升效率,就须要connections 来对链接进行缓存,从而能够复用;还有若是链接使用完毕,长时间不释放,也会形成资源的浪费,因此就须要 cleanupRunnable 定时清理无用的链接,okhttp 支持 5 个并发链接,默认每一个链接 keepAlive 为 5 分钟,keepAlive 就是链接空闲后,保持存活的时间。

当咱们第一次调用 RealConnectionPool 的 put 方法缓存新建链接时,若是 cleanupRunnable 还没执行,它首先会使用线程池执行 cleanupRunnable ,而后把新建链接放入双端队列,cleanupRunnable 中会调用 cleanup 方法进行链接的清理,该方法返回如今到下次清理的时间间隔,而后调用 wiat 方法进入等待状态,等时间到了后,再次调用 cleanup 方法进行清理,就这样往复循环。咱们来看一下 cleanup 方法的清理逻辑:

//RealConnectionPool.java
long cleanup(long now) {
    
    int inUseConnectionCount = 0;//正在使用链接数
    int idleConnectionCount = 0;//空闲链接数
    RealConnection longestIdleConnection = null;
    long longestIdleDurationNs = Long.MIN_VALUE;

    synchronized (this) {
        //遍历全部链接,记录空闲链接和正在使用链接各自的数量
        for (Iterator<RealConnection> i = connections.iterator(); i.hasNext(); ) {
            RealConnection connection = i.next();

            //若是该链接还在使用,pruneAndGetAllocationCount种经过引用计数的方式判断一个链接是否空闲
            if (pruneAndGetAllocationCount(connection, now) > 0) {
                //使用链接数加1
                inUseConnectionCount++;
                continue;
            }
            
            //该链接没有在使用

            //空闲链接数加1
            idleConnectionCount++;

            //记录keepalive时间最长的那个空闲链接
            long idleDurationNs = now - connection.idleAtNanos;
            if (idleDurationNs > longestIdleDurationNs) {
                longestIdleDurationNs = idleDurationNs;
                //这个链接极可能被移除,由于空闲时间太长
                longestIdleConnection = connection;
            }
        }
        
        //跳出循环后

        //默认keepalive时间keepAliveDurationNs最长为5分钟,空闲链接数idleConnectionCount最大为5个
        if (longestIdleDurationNs >= this.keepAliveDurationNs || idleConnectionCount > this.maxIdleConnections) {//若是longestIdleConnection的keepalive时间大于5分钟 或 空闲链接数超过5个
            //把longestIdleConnection链接从队列清理掉
            connections.remove(longestIdleConnection);
        } else if (idleConnectionCount > 0) {//若是空闲链接数小于5个 而且 longestIdleConnection链接还没到期清理
            //返回该链接的到期时间,下次再清理
            return keepAliveDurationNs - longestIdleDurationNs;
        } else if (inUseConnectionCount > 0) {//若是没有空闲链接 且 全部链接都还在使用
            //返回keepAliveDurationNs,5分钟后再清理
            return keepAliveDurationNs;
        } else {
            // 没有任何链接,把cleanupRunning复位
            cleanupRunning = false;
            return -1;
        }
    }

    //把longestIdleConnection链接从队列清理掉后,关闭该链接的socket,返回0,当即再次进行清理
    closeQuietly(longestIdleConnection.socket());

    return 0;
}

复制代码

从 cleanup 方法得知,okhttp 清理链接的逻辑以下:

一、首先遍历全部链接,记录空闲链接数 idleConnectionCount 和正在使用链接数inUseConnectionCount ,在记录空闲链接数时,还要找出空闲时间最长的空闲链接longestIdleConnection,这个链接是颇有可能被清理的;

二、遍历完后,根据最大空闲时长和最大空闲链接数来决定是否清理longestIdleConnection,

2.一、若是 longestIdleConnection 的空闲时间大于最大空闲时长 或 空闲链接数大于最大空闲链接数,那么该链接就会被从队列中移除,而后关闭该链接的 socket,返回 0,当即再次进行清理;

2.二、若是空闲链接数小于5个 而且 longestIdleConnection 的空闲时间小于最大空闲时长即还没到期清理,那么返回该链接的到期时间,下次再清理;

2.三、若是没有空闲链接 且 全部链接都还在使用,那么返回默认的 keepAlive 时间,5分钟后再清理;

2.四、没有任何链接,idleConnectionCount 和 inUseConnectionCount 都为0,把cleanupRunning 复位,等待下一次 put 链接时,再次使用线程池执行 cleanupRunnable。

了解了 RealConnectionPool 和 RealConnection 后,咱们再回到 ExchangeFinder 的 find 方法,这里是链接建立的地方。

链接机制

ExchangeFinder的fing方法以下:

//ExchangeFinder.java

  public ExchangeCodec find( OkHttpClient client, Interceptor.Chain chain, boolean doExtensiveHealthChecks) {
    int connectTimeout = chain.connectTimeoutMillis();
    int readTimeout = chain.readTimeoutMillis();
    int writeTimeout = chain.writeTimeoutMillis();
    int pingIntervalMillis = client.pingIntervalMillis();
    
    

    try {
      //1.内部调用 findHealthyConnection 函数返回 RealConnection 链接对象
      RealConnection resultConnection = findHealthyConnection(connectTimeout, readTimeout,
          writeTimeout, pingIntervalMillis, connectionRetryEnabled, doExtensiveHealthChecks);
      //2. 创建一个新的链接
      return resultConnection.newCodec(client, chain);
    } catch (RouteException e) {
      ...//省略异常处理
    }
  }
复制代码

根据注释 1 咱们知道建立一个 RealConnection ,咱们看下 findHealthyConnection函数

private RealConnection findHealthyConnection(int connectTimeout, int readTimeout, int writeTimeout, int pingIntervalMillis, boolean connectionRetryEnabled, boolean doExtensiveHealthChecks) throws IOException {
    while (true) {
      //找到一个链接
      RealConnection candidate = findConnection(connectTimeout, readTimeout, writeTimeout, pingIntervalMillis, connectionRetryEnabled);

      synchronized (connectionPool) {
        if (candidate.successCount == 0) {
          return candidate;
        }
      }

      //主要判断链接的可用性
      if (!candidate.isHealthy(doExtensiveHealthChecks)) {
        candidate.noNewExchanges();
        continue;
      }

      return candidate;
    }
  }
复制代码

接着看 findConnection

//ExchangeFinder.java
private RealConnection findConnection(int connectTimeout, int readTimeout, int writeTimeout, int pingIntervalMillis, boolean connectionRetryEnabled) throws IOException {
    boolean foundPooledConnection = false;
    RealConnection result = null;//返回结果,可用的链接
    Route selectedRoute = null;
    RealConnection releasedConnection;
    Socket toClose;
    synchronized (connectionPool) {
       if (transmitter.isCanceled()) throw new IOException("Canceled");
      hasStreamFailure = false; .

	 //一、尝试使用已经建立过的链接,已经建立过的链接可能已经被限制建立新的流
      releasedConnection = transmitter.connection;
      //1.一、若是已经建立过的链接已经被限制建立新的流,就释放该链接(releaseConnectionNoEvents中会把该链接置空),并返回该链接的Socket以关闭
      toClose = transmitter.connection != null && transmitter.connection.noNewExchanges
          ? transmitter.releaseConnectionNoEvents()
          : null;

        //1.二、已经建立过的链接还能使用,就直接使用它看成结果、
        if (transmitter.connection != null) {
            result = transmitter.connection;
            releasedConnection = null;
        }

        //二、已经建立过的链接不能使用
        if (result == null) {
            //2.一、尝试从链接池中找可用的链接,若是找到,这个链接会赋值先保存在Transmitter中
            if (connectionPool.transmitterAcquirePooledConnection(address, transmitter, null, false)) {
                //2.二、从链接池中找到可用的链接
                foundPooledConnection = true;
                result = transmitter.connection;
            } else if (nextRouteToTry != null) {
                selectedRoute = nextRouteToTry;
                nextRouteToTry = null;
            } else if (retryCurrentRoute()) {
                selectedRoute = transmitter.connection.route();
            }
        }
    }
	closeQuietly(toClose);
    
	//...
    
    if (result != null) {
        //三、若是在上面已经找到了可用链接,直接返回结果
        return result;
    }
    
    //走到这里没有找到可用链接

    //看看是否须要路由选择,多IP操做
    boolean newRouteSelection = false;
    if (selectedRoute == null && (routeSelection == null || !routeSelection.hasNext())) {
        newRouteSelection = true;
        routeSelection = routeSelector.next();
    }
    List<Route> routes = null;
    synchronized (connectionPool) {
        if (transmitter.isCanceled()) throw new IOException("Canceled");

        //若是有下一个路由
        if (newRouteSelection) {
            routes = routeSelection.getAll();
            //四、这里第二次尝试从链接池中找可用链接
            if (connectionPool.transmitterAcquirePooledConnection(address, transmitter, routes, false)) {
                //4.一、从链接池中找到可用的链接
                foundPooledConnection = true;
                result = transmitter.connection;
            }
        }

        //在链接池中没有找到可用链接
        if (!foundPooledConnection) {
            if (selectedRoute == null) {
                selectedRoute = routeSelection.next();
            }

           //五、因此这里新建立一个链接,后面会进行Socket链接
            result = new RealConnection(connectionPool, selectedRoute);
            connectingConnection = result;
        }
    }

    // 4.二、若是在链接池中找到可用的链接,直接返回该链接
    if (foundPooledConnection) {
        eventListener.connectionAcquired(call, result);
        return result;
    }

    //5.一、调用RealConnection的connect方法进行Socket链接,这个在RealConnection中讲过
    result.connect(connectTimeout, readTimeout, writeTimeout, pingIntervalMillis, connectionRetryEnabled, call, eventListener);
    
    connectionPool.routeDatabase.connected(result.route());

    Socket socket = null;
    synchronized (connectionPool) {
        connectingConnection = null;
        //若是咱们刚刚建立了同一地址的多路复用链接,释放这个链接并获取那个链接
        if (connectionPool.transmitterAcquirePooledConnection(address, transmitter, routes, true)) {
            result.noNewExchanges = true;
            socket = result.socket();
            result = transmitter.connection;
        } else {
            //5.二、把刚刚新建的链接放入链接池
            connectionPool.put(result);
            //5.三、把刚刚新建的链接保存到Transmitter的connection字段
            transmitter.acquireConnectionNoEvents(result);
        }
    }
    
    closeQuietly(socket);
    eventListener.connectionAcquired(call, result);
    
    //5.四、返回结果
    return result;
}

复制代码

这个findConnection方法就是整个ConnectInterceptor的核心,咱们忽略掉多IP操做和多路复用(HTTP2),假设如今咱们是第一次请求,链接池和Transmitter中没有该链接,因此跳过一、二、3,直接来到5,建立一个新的链接,而后把它放入链接池和Transmitter中;接着咱们用同一个Call进行了第二次请求,这时链接池和Transmitter中有该链接,因此就会走一、二、3,若是Transmitter中的链接还可用就返回,不然从链接池获取一个可用链接返回,因此整个链接机制的大概过程以下:

Transmitter 中的链接和链接池中的链接有什么区别?咱们知道每建立一个 Call,就会建立一个对应的 Transmitter ,一个 Call 能够发起屡次请求(同步、异步),不一样的 Call 有不一样的Transmitter ,链接池是在建立 OkhttpClient 时建立的,因此链接池是全部 Call 共享的,即链接池中的链接全部 Call 均可以复用,而 Transmitter 中的那个链接只是对应它相应的 Call,只能被本次 Call 的全部请求复用。

了解了 okhttp3 的链接机制后,咱们接着下一个拦截器 networkInterceptors 。

networkInterceptors

networkInterceptors 它是 OKHttp 拦截器中的第 6 个拦截器,属于 网络拦截器,那么它的做用是什么请看下面 拦截器实战 中介绍。

最后执行到了 OKHttp 最后一个拦截器 CallServerInterceptor

CallServerInterceptor

根据源码中的介绍:它是链中的最后一个拦截器。它与服务器进行网络请求、响应操做。

@Override public Response intercept(Chain chain) throws IOException {
    RealInterceptorChain realChain = (RealInterceptorChain) chain;
    //拿到 Exchange 与 网络交互
    Exchange exchange = realChain.exchange();
    //拿到请求数据
    Request request = realChain.request();
		//获取当前请求的时间
    long sentRequestMillis = System.currentTimeMillis();
		//写入请求头
    exchange.writeRequestHeaders(request);
		
    boolean responseHeadersStarted = false;
    Response.Builder responseBuilder = null;
    //若是能够写入请求体
    if (HttpMethod.permitsRequestBody(request.method()) && request.body() != null) {
      //若是请求头添加了 100-continue 
      if ("100-continue".equalsIgnoreCase(request.header("Expect"))) {
        exchange.flushRequest(); //关闭 IO 流资源
        responseHeadersStarted = true;
        exchange.responseHeadersStart();
        responseBuilder = exchange.readResponseHeaders(true); 
      }

      if (responseBuilder == null) { //若是为空
        if (request.body().isDuplex()) {
          exchange.flushRequest();
          BufferedSink bufferedRequestBody = Okio.buffer(
              exchange.createRequestBody(request, true));
          request.body().writeTo(bufferedRequestBody);
        } else { //通常走 else
          //写入请求体的操做
          BufferedSink bufferedRequestBody = Okio.buffer(
              exchange.createRequestBody(request, false));
          request.body().writeTo(bufferedRequestBody);
          bufferedRequestBody.close();
        }
      } else {
        exchange.noRequestBody();
        if (!exchange.connection().isMultiplexed()) {
          // 
          exchange.noNewExchangesOnConnection();
        }
      }
    } else { //若是没有请求体 执行 noRequestBody
      exchange.noRequestBody();
    }

    //若是请求体为空 而且不支持 isDuplex = false IO 流
    if (request.body() == null || !request.body().isDuplex()) {
      exchange.finishRequest();
    }

    if (!responseHeadersStarted) {
      exchange.responseHeadersStart();
    }

    //读取响应的 head
    if (responseBuilder == null) {
      responseBuilder = exchange.readResponseHeaders(false);
    }

    //构建响应数据
    Response response = responseBuilder
        .request(request)
        .handshake(exchange.connection().handshake())
        .sentRequestAtMillis(sentRequestMillis)
        .receivedResponseAtMillis(System.currentTimeMillis())
        .build();

    //拿到响应码
    int code = response.code();
    if (code == 100) {
      // 构建响应
      response = exchange.readResponseHeaders(false)
          .request(request)
          .handshake(exchange.connection().handshake())
          .sentRequestAtMillis(sentRequestMillis)
          .receivedResponseAtMillis(System.currentTimeMillis())
          .build();

      code = response.code();
    }
		
    exchange.responseHeadersEnd(response);

    if (forWebSocket && code == 101) {
      // 构建空响应体
      response = response.newBuilder()
          .body(Util.EMPTY_RESPONSE)
          .build();
    } else {
      // 经过响应的 body 构造 响应体
      response = response.newBuilder()
          .body(exchange.openResponseBody(response))
          .build();
    }

   ...//省略部分代码

    return response;
  }
复制代码

在当前拦截器中咱们把请求 head /body 经过 okio 写入了服务端,而后根据服务端的响应数据构建响应头、响应体等一些响应数据。

到这里咱们完成了拦截器全部操做,下面进入拦截器实战。

拦截器实战

OKHttp官网拦截器使用介绍

自定义 Log 打印拦截器

/** * 打印日志拦截器 */
class LoggingInterceptor implements Interceptor {
    private String TAG = "LoggingInterceptor";
    public static String requestBodyToString(RequestBody requestBody) throws IOException {
        if (requestBody == null)return "";
        Buffer buffer = new Buffer();
        requestBody.writeTo(buffer);
        return buffer.readUtf8();
    }

    @Override
    public Response intercept(Chain chain) throws IOException {
        //拿到请求数据
        Request request = chain.request();


        //能够在请求服务器以前添加请求头
        request =   request.newBuilder()
                .addHeader("head-1","1")
                .addHeader("head-2","2")
                .url("https://juejin.im/user/578259398ac2470061f3a3fb")
                .build();


        HttpUrl url = request.url();
        String scheme = url.scheme();// http https
        String host = url.host();// 127.0.0.1
        String path = url.encodedPath();// /test/upload/img
        String query = url.encodedQuery();// userName=DevYk&userPassword=12345
        RequestBody requestBody = request.body();
        String bodyToString = requestBodyToString(requestBody);

        Log.d(TAG,"scheme--》"+scheme);
        Log.d(TAG,"Host--->"+host);
        Log.d(TAG,"path--->"+path);
        Log.d(TAG,"query--->"+query);
        Log.d(TAG,"requestBody---->"+bodyToString+"");
        Log.d(TAG,"head---->"+request.headers().names());

        //调用下一个拦截器
        Response response = chain.proceed(request);
        
       //拿到响应
        ResponseBody responseBody = response.body();
        String body = responseBody.string();
        String type = responseBody.contentType().type();
        String subtype = responseBody.contentType().subtype();

        //打印响应
        Log.d(TAG,"contentType--->"+type+" "+subtype);
        Log.d(TAG,"responseBody--->"+body);

        return chain.proceed(request);
    }
}
复制代码

添加配置

OkHttpClient okHttpClient = new OkHttpClient.Builder().
        addInterceptor(new LoggingInterceptor())
        build();
复制代码

output:

LoggingInterceptor: scheme--》https
LoggingInterceptor: Host--->juejin.im
LoggingInterceptor: path--->/user/578259398ac2470061f3a3fb
LoggingInterceptor: query--->null
  
LoggingInterceptor: requestBody---->
  
LoggingInterceptor: head---->[head-1, head-2]
LoggingInterceptor: responseHeader--->text html
LoggingInterceptor: responseBody---><!DOCTYPE html><html ....
  

复制代码

自定义 全局禁止网络请求拦截器

public class NetworkInterceptor implements Interceptor {
    @Override
    public okhttp3.Response intercept(Chain chain) throws IOException {
        if (true) {
            Response response = new Response.Builder()
                    .code(404) // 其实code能够随便给
                    .protocol(Protocol.HTTP_1_1)
                    .message("根据规定,暂时不能进行网络请求。")
                    .body(ResponseBody.create(MediaType.get("text/html; charset=utf-8"), "")) // 返回空页面
                    .request(chain.request())
                    .build();
            return response;
        } else {
            return chain.proceed(chain.request());
        }
    }
}
复制代码

配置

OkHttpClient okHttpClient = new OkHttpClient.Builder().
        addInterceptor(new LoggingInterceptor()).
  			addInterceptor(new NetworkInterceptor()).
        build();
复制代码

Output:

LoggingInterceptor: responseCode--->404
LoggingInterceptor: responseMessage--->根据规定,暂时不能进行网络请求。
LoggingInterceptor: responseisSuccessful--->false
复制代码

小总结:拦截器分为 应用拦截器、网络拦截器 根据官网解释有这几点:

应用拦截器

  • 无需担忧中间响应,例如重定向和重试。
  • 即便从缓存提供 HTTP 响应,也老是被调用一次。
  • 遵照应用程序的原始意图。不关心 OkHttp 注入的标头,例如 If-None-Match
  • 容许短路而不是Chain.proceed()
  • 容许重试并屡次致电Chain.proceed()

网络拦截器

  • 可以对诸如重定向和重试之类的中间响应进行操做。
  • 不会为使网络短路的缓存响应调用。
  • 观察数据,就像经过网络传输数据同样。
  • 访问Connection带有请求的。

因此怎么选择看本身需求了。

拦截器总结

根据上面的拦截器讲解和实战,相信你们对 OKHttp 拦截器有了必定的认识,这里咱们根据分析来总结下:

其实每个拦截器都对应一个 RealInterceptorChain ,而后每个interceptor 再产生下一个RealInterceptorChain,直到 List 迭代完成。因此上面基本上就是递归,找了一些图片有助于你们理解以下图

uxSrhd.png

参考

OKHttp源码解析(四)--中阶之拦截器及调用链

OKHttp源码分析

相关文章
相关标签/搜索