OkHttp源码分析

大概按照设计模式分析流程web

先来一张图设计模式

image.png

咱们先从OkHttp的具体使用开始 :缓存

val okHttpClient = OkHttpClient()
        //val okHttpClient = OkHttpClient().newBuilder().build()
        val request = Request.Builder().build()

        val call = okHttpClient.newCall(request)
        //异步请求
        call.enqueue(object :Callback {
            override fun onFailure(call: Call, e: IOException) {

            }

            override fun onResponse(call: Call, response: Response) {

            }

        })
        
        //同步请求
        val response = call.execute()
复制代码

我们这里先分析异步请求:服务器

首先,分析OkhttpClient的建立markdown

//从这里能够看出来 他是经过建造者模式来建立对象
    public OkHttpClient() {
        this(new Builder());
    }
    
    //------------------Builder-------------------
    public Builder() {
      //建立分配器,对Call进行处理
      dispatcher = new Dispatcher();
      protocols = DEFAULT_PROTOCOLS;
      connectionSpecs = DEFAULT_CONNECTION_SPECS;
      eventListenerFactory = EventListener.factory(EventListener.NONE);
      proxySelector = ProxySelector.getDefault();
      if (proxySelector == null) {
        proxySelector = new NullProxySelector();
      }
      cookieJar = CookieJar.NO_COOKIES;
      socketFactory = SocketFactory.getDefault();
      hostnameVerifier = OkHostnameVerifier.INSTANCE;
      certificatePinner = CertificatePinner.DEFAULT;
      proxyAuthenticator = Authenticator.NONE;
      authenticator = Authenticator.NONE;
      connectionPool = new ConnectionPool();
      dns = Dns.SYSTEM;
      followSslRedirects = true;
      followRedirects = true;
      retryOnConnectionFailure = true;
      //对各类超时的处理
      callTimeout = 0;
      connectTimeout = 10_000;
      readTimeout = 10_000;
      writeTimeout = 10_000;
      pingInterval = 0;
    }
    
    
复制代码

而后咱们再来看Request的建立cookie

//建造者模式 
        Request.Builder().build()
        
        //只有一个构造函数,而且protected ,只能经过build
        Request(Builder builder) 
复制代码

咱们看看内部的build都干了什么网络

public Builder() {
      //设置默认请求为GET
      this.method = "GET";
      //初始化请求头集合
      this.headers = new Headers.Builder();
    }
    
    //调用Request的有参函数, 建立请求
    public Request build() {
      if (url == null) throw new IllegalStateException("url == null");
      return new Request(this);
    }
    
复制代码

上面的代码应该没什么难度 ,咱们继续进行下一步 。异步

okHttpClient.newCall(request)socket

//建立请求的Call
    @Override public Call newCall(Request request) {
        //工厂模式建立
        return RealCall.newRealCall(this, request, false /* for web socket */);
    }
    
复制代码

看到@Override应该想到这个方法不是继承就是实现async

public class OkHttpClient implements Cloneable, Call.Factory, WebSocket.Factory
    
    //实现了这个接口
    interface Factory {
        Call newCall(Request request);
    }
    
复制代码

经过这个能够发现 ,这里利用了工厂模式的思想 ,不关心具体的实现细节,提供一个方法负责拿到Call对象就能够了 ,具体的细节交给子类实现 。

再回到咱们的主线流程

static RealCall newRealCall(OkHttpClient client, Request originalRequest, boolean forWebSocket) {
        // Safely publish the Call instance to the EventListener.
        //构建了RealCall
        RealCall call = new RealCall(client, originalRequest, forWebSocket);
        //事件发射器 ,控制connection stream requests responses
        call.transmitter = new Transmitter(client, call);
        return call;
      }
复制代码

异步请求

拿到Call以后,接下来开始执行一步请求

call.enqueue(object :Callback {
        override fun onFailure(call: Call, e: IOException) {

        }

        override fun onResponse(call: Call, response: Response) {

        }

    })
复制代码

咱们直接看RealCallenqueue

@Override public void enqueue(Callback responseCallback) {
        synchronized (this) {
          if (executed) throw new IllegalStateException("Already Executed");
          executed = true;
        }
        //发射器执行 eventListener.callStart()
        transmitter.callStart();
        //这里是关键的代码
        client.dispatcher().enqueue(new AsyncCall(responseCallback));
   }
复制代码

咱们先看看Dispatcher

public Dispatcher dispatcher() {
        return dispatcher;
  }
  
  //貌似没什么 ,让咱们看看enqueue方法
  void enqueue(AsyncCall call) {
    synchronized (this) {
      //加入到异步的准备集合中    这里须要说一下 Okhttp的有 异步队列  同步队列  异步准备队列,是经过队列的形式存储请求信息的 
      readyAsyncCalls.add(call);

      // Mutate the AsyncCall so that it shares the AtomicInteger of an existing running call to
      // the same host.
      //默认是false
      if (!call.get().forWebSocket) {
        //从正在运行的队列中查找有没有这个call 有就返回 ,不然在等待队列中查找 
        AsyncCall existingCall = findExistingCallWithHost(call.host());
        //若是存在 将这个call 赋值到 callsPerHost 变量
        if (existingCall != null) call.reuseCallsPerHostFrom(existingCall);
      }
    }
    
    //这里会用到callsPerHost
    promoteAndExecute();
  }
  
  
  /**
   * Promotes eligible calls from {@link #readyAsyncCalls} to {@link #runningAsyncCalls} and runs
   * them on the executor service. Must not be called with synchronization because executing calls
   * can call into user code.
   *
   * @return true if the dispatcher is currently running calls.
   */
  private boolean promoteAndExecute() {
    assert (!Thread.holdsLock(this));

    List<AsyncCall> executableCalls = new ArrayList<>();
    boolean isRunning;
    synchronized (this) {
        //从准备队列读值
      for (Iterator<AsyncCall> i = readyAsyncCalls.iterator(); i.hasNext(); ) {
        AsyncCall asyncCall = i.next();
           
        //maxRequests 最大请求数(64)
        if (runningAsyncCalls.size() >= maxRequests) break; // Max capacity.
        //maxRequestsPerHost 最大请求同一个主机的数量(5)
        if (asyncCall.callsPerHost().get() >= maxRequestsPerHost) continue; // Host max capacity.

        i.remove();
        asyncCall.callsPerHost().incrementAndGet();
        //加入到执行集合中
        executableCalls.add(asyncCall);
        //加入到运行队列中
        runningAsyncCalls.add(asyncCall);
      }
      isRunning = runningCallsCount() > 0;
    }

    for (int i = 0, size = executableCalls.size(); i < size; i++) {
      AsyncCall asyncCall = executableCalls.get(i);
      //这里开始真正的执行 
      asyncCall.executeOn(executorService());
    }

    return isRunning;
  }
  
  
  ------------------------------AsyncCall---------------------------------

/**
     * Attempt to enqueue this async call on {@code executorService}. This will attempt to clean up
     * if the executor has been shut down by reporting the call as failed.
     */
    void executeOn(ExecutorService executorService) {
      assert (!Thread.holdsLock(client.dispatcher()));
      boolean success = false;
      try {
        //这里开始执行 内部handler来执行call 这个 runnable
        executorService.execute(this);
        success = true;
      } catch (RejectedExecutionException e) {
        InterruptedIOException ioException = new InterruptedIOException("executor rejected");
        ioException.initCause(e);
        transmitter.noMoreExchanges(ioException);
        //异常返回
        responseCallback.onFailure(RealCall.this, ioException);
      } finally {
        if (!success) {
          client.dispatcher().finished(this); // This call is no longer running!
        }
      }
    }
复制代码

让咱们来看看AsyncCall

final class AsyncCall extends NamedRunnable {
        ***
        AsyncCall(Callback responseCallback) {
          super("OkHttp %s", redactedUrl());
          this.responseCallback = responseCallback;
        }
        
        ***
    }
复制代码

咱们能够看到AsyncCall 继承的是 NamedRunnable

/**
 * Runnable implementation which always sets its thread name.
 */
public abstract class NamedRunnable implements Runnable {
  protected final String name;

  public NamedRunnable(String format, Object... args) {
    this.name = Util.format(format, args);
  }
   
  @Override public final void run() {
    String oldName = Thread.currentThread().getName();
    Thread.currentThread().setName(name);
    try {
      execute();
    } finally {
      Thread.currentThread().setName(oldName);
    }
  }

  protected abstract void execute();
}
复制代码

能够看到一个Runnable

run方法中设置了线程的名称 ,执行了 execute();

@Override protected void execute() {
      boolean signalledCallback = false;
      transmitter.timeoutEnter();
      try {
        //重要的方法来了getResponseWithInterceptorChain()
        Response response = getResponseWithInterceptorChain();
        signalledCallback = true;
        //返回网络响应信息
        responseCallback.onResponse(RealCall.this, response);
      } catch (IOException e) {
        if (signalledCallback) {
          // Do not signal the callback twice!
          Platform.get().log(INFO, "Callback failure for " + toLoggableString(), e);
        } else {
          responseCallback.onFailure(RealCall.this, e);
        }
      } catch (Throwable t) {
        cancel();
        if (!signalledCallback) {
          IOException canceledException = new IOException("canceled due to " + t);
          canceledException.addSuppressed(t);
          responseCallback.onFailure(RealCall.this, canceledException);
        }
        throw t;
      } finally {
        //从完成任务 释放call
        client.dispatcher().finished(this);
      }
    }
复制代码

终于找到了Response的身影 , 说明getResponseWithInterceptorChain这个方法中进行的网络请求 ,下面进行了一些异常判断的接口回调 ,释放call资源 ,就先不分析 ,咱们这里看看、getResponseWithInterceptorChain 这个方法 。

Response getResponseWithInterceptorChain() throws IOException {
    // Build a full stack of interceptors.
    List<Interceptor> interceptors = new ArrayList<>();
    //添加用户的自定义拦截器
    interceptors.addAll(client.interceptors());
    //添加剧定向的拦截器
    interceptors.add(new RetryAndFollowUpInterceptor(client));
    //封装request和response过滤器
    interceptors.add(new BridgeInterceptor(client.cookieJar()));
    //缓存相关的过滤器,负责读取缓存直接返回、更新缓存
    interceptors.add(new CacheInterceptor(client.internalCache()));
    //负责和服务器创建链接
    interceptors.add(new ConnectInterceptor(client));
    if (!forWebSocket) {
      //配置 OkHttpClient 时设置的 networkInterceptors
      interceptors.addAll(client.networkInterceptors());
    }
    //负责向服务器发送请求数据、从服务器读取响应数据(实际网络请求)
    interceptors.add(new CallServerInterceptor(forWebSocket));

    //originalRequest 就是真实的请求 ,这里注意一下这个 0
    Interceptor.Chain chain = new RealInterceptorChain(interceptors, transmitter, null, 0,
        originalRequest, this, client.connectTimeoutMillis(),
        client.readTimeoutMillis(), client.writeTimeoutMillis());

    

    boolean calledNoMoreExchanges = false;
    try {
      //这里开始请求
      Response response = chain.proceed(originalRequest);
      if (transmitter.isCanceled()) {
        closeQuietly(response);
        throw new IOException("Canceled");
      }
      return response;
    } catch (IOException e) {
      calledNoMoreExchanges = true;
      throw transmitter.noMoreExchanges(e);
    } finally {
      if (!calledNoMoreExchanges) {
        transmitter.noMoreExchanges(null);
      }
    }
  }
复制代码

能够看到上面调用了 Interceptor.Chainproceed方法获取 Response,咱们看看Interceptor.Chain 的实现类 RealInterceptorChainproceed方法

@Override public Response proceed(Request request) throws IOException {
    //是个多态
    return proceed(request, transmitter, exchange);
  }

  public Response proceed(Request request, Transmitter transmitter, @Nullable Exchange exchange)
      throws IOException {
    if (index >= interceptors.size()) throw new AssertionError();
    
    //这里其实为了确保每个拦截器只执行一次
    calls++;

    // If we already have a stream, confirm that the incoming request will use it.
    if (this.exchange != null && !this.exchange.connection().supportsUrl(request.url())) {
      throw new IllegalStateException("network interceptor " + interceptors.get(index - 1)
          + " must retain the same host and port");
    }

    // If we already have a stream, confirm that this is the only call to chain.proceed().
    if (this.exchange != null && calls > 1) {
      throw new IllegalStateException("network interceptor " + interceptors.get(index - 1)
          + " must call proceed() exactly once");
    }

    // Call the next interceptor in the chain.
    //注意这里的 index + 1
    RealInterceptorChain next = new RealInterceptorChain(interceptors, transmitter, exchange,
        index + 1, request, call, connectTimeout, readTimeout, writeTimeout);
    //拿出第index个拦截器 还记得我们的初始是0
    Interceptor interceptor = interceptors.get(index);
    //执行intercept
    Response response = interceptor.intercept(next);

    // Confirm that the next interceptor made its required call to chain.proceed().
    if (exchange != null && index + 1 < interceptors.size() && next.calls != 1) {
      throw new IllegalStateException("network interceptor " + interceptor
          + " must call proceed() exactly once");
    }

    // Confirm that the intercepted response isn't null.
    if (response == null) {
      throw new NullPointerException("interceptor " + interceptor + " returned null");
    }

    if (response.body() == null) {
      throw new IllegalStateException(
          "interceptor " + interceptor + " returned a response with no body");
    }

    return response;
  }
复制代码

上面能够看出来,执行intercept获取了Response,让咱们往里走,咱们先用一个系统拦截器进行分析 。

public final class RetryAndFollowUpInterceptor implements Interceptor {
  ***

  @Override public Response intercept(Chain chain) throws IOException {
    Request request = chain.request();
    RealInterceptorChain realChain = (RealInterceptorChain) chain;
    Transmitter transmitter = realChain.transmitter();

    int followUpCount = 0;
    Response priorResponse = null;
    while (true) {
      //准备链接请求
      transmitter.prepareToConnect(request);

      if (transmitter.isCanceled()) {
        throw new IOException("Canceled");
      }

      Response response;
      boolean success = false;
      try {
        // !!!  这里又一次调用了realChain.proceed() 开始下一个拦截器
        response = realChain.proceed(request, transmitter, null);
        success = true;
      } catch (RouteException e) {
        // The attempt to connect via a route failed. The request will not have been sent.
        if (!recover(e.getLastConnectException(), transmitter, false, request)) {
          throw e.getFirstConnectException();
        }
        continue;
      } catch (IOException e) {
        // An attempt to communicate with a server failed. The request may have been sent.
        boolean requestSendStarted = !(e instanceof ConnectionShutdownException);
        if (!recover(e, transmitter, requestSendStarted, request)) throw e;
        continue;
      } finally {
        // The network call threw an exception. Release any resources.
        if (!success) {
          transmitter.exchangeDoneDueToException();
        }
      }

      // Attach the prior response if it exists. Such responses never have a body.
      if (priorResponse != null) {
        response = response.newBuilder()
            .priorResponse(priorResponse.newBuilder()
                    .body(null)
                    .build())
            .build();
      }

      Exchange exchange = Internal.instance.exchange(response);
      Route route = exchange != null ? exchange.connection().route() : null;
      Request followUp = followUpRequest(response, route);

      if (followUp == null) {
        if (exchange != null && exchange.isDuplex()) {
          transmitter.timeoutEarlyExit();
        }
        return response;
      }

      RequestBody followUpBody = followUp.body();
      if (followUpBody != null && followUpBody.isOneShot()) {
        return response;
      }

      closeQuietly(response.body());
      if (transmitter.hasExchange()) {
        exchange.detachWithViolence();
      }

      if (++followUpCount > MAX_FOLLOW_UPS) {
        throw new ProtocolException("Too many follow-up requests: " + followUpCount);
      }

      request = followUp;
      priorResponse = response;
    }
  }
  
  ***
}
复制代码

看到上面的代码就开始OkHttp最有意思的环节了 ,也是它最经典的模式 ,责任链模式, 有一点递归的思想 ,一层一层 ,最后获得这个Response 。

同步请求

@Override public Response execute() throws IOException {
    synchronized (this) {
      if (executed) throw new IllegalStateException("Already Executed");
      executed = true;
    }
    transmitter.timeoutEnter();
    transmitter.callStart();
    try {
      //经过调度器将请求放到队列中
      client.dispatcher().executed(this);
      //真正开始请求
      return getResponseWithInterceptorChain();
    } finally {
      client.dispatcher().finished(this);
    }
  }
复制代码

看完了异步 ,其实同步很简单 ,一个流程 ,就到这里吧 。