Okhttp3源码解析(3)-Call分析(总体流程)

###前言 前面咱们讲了 Okhttp的基本用法 Okhttp3源码解析(1)-OkHttpClient分析 Okhttp3源码解析(2)-Request分析web

newCall分析

Call初始化

咱们首先看一下在哪用到了Call:缓存

final Call call = okHttpClient.newCall(request);
复制代码

想起来了吧?不管是get仍是post请求 都要生成call对象,在上面咱们发现call实例须要一个okHttpClientrequest实例 ,咱们先点进Call类去看看:bash

public interface Call extends Cloneable {
//请求
  Request request();
//同步
  Response execute() throws IOException;
  //异步
  void enqueue(Callback responseCallback);
  //取消请求
  void cancel();
  //是否在请求过程当中
  boolean isExecuted();
  //是否取消
  boolean isCanceled();
  Call clone();
  //工厂接口
  interface Factory {
    Call newCall(Request request);
  }
}
复制代码

咱们发现Call是个接口, 并定义了一些方方法(方法含义在注释上)。 咱们继续看newCal()方法服务器

@Override public Call newCall(Request request) {
    return RealCall.newRealCall(this, request, false /* for web socket */);
  }
复制代码

继续点击newRealCall()去:微信

private RealCall(OkHttpClient client, Request originalRequest, boolean forWebSocket) {
    this.client = client;
    this.originalRequest = originalRequest;
    this.forWebSocket = forWebSocket;
    this.retryAndFollowUpInterceptor = new RetryAndFollowUpInterceptor(client, forWebSocket);
  }

  static RealCall newRealCall(OkHttpClient client, Request originalRequest, boolean forWebSocket) {
    // Safely publish the Call instance to the EventListener.
    RealCall call = new RealCall(client, originalRequest, forWebSocket);
    call.eventListener = client.eventListenerFactory().create(call);
    return call;
  }
复制代码

从代码中咱们发如今newRealCall()中初始化了RealCallRealCall中初始化了retryAndFollowUpInterceptorwebsocket

  • client: OkHttpClient 实例
  • originalRequest : 最初的Request
  • forWebSocket :是否支持websocket通讯
  • retryAndFollowUpInterceptor 从字面意思来讲, 是重试和重定向拦截器 ,至于它有什么做用咱们继续往下看

同步请求分析

Response response = call.execute();
复制代码

咱们点进execute()中查看:cookie

@Override public Response execute() throws IOException {
    synchronized (this) {
      if (executed) throw new IllegalStateException("Already Executed");
      executed = true;
    }
    captureCallStackTrace();
    eventListener.callStart(this);
    try {
      client.dispatcher().executed(this);
      Response result = getResponseWithInterceptorChain();
      if (result == null) throw new IOException("Canceled");
      return result;
    } catch (IOException e) {
      eventListener.callFailed(this, e);
      throw e;
    } finally {
      client.dispatcher().finished(this);
    }
  }

复制代码

从上面代码得知步骤: (1).经过 synchronized 保证线程同步,判断是否已经执行过 ,若是是直接抛异常 (2). captureCallStackTrace(); 字面意思:捕获调用堆栈跟踪,咱们经过源码发现里面涉及到了retryAndFollowUpInterceptor (3). eventListener 回调CallStart() (4). client.dispatcher().executed(this); 看到了dispatcher是否是很熟悉?以前在分析okhttpClient初始化的时候遇到了,咱们点击executed()方法进去:网络

synchronized void executed(RealCall call) {
    runningSyncCalls.add(call);
  }
复制代码

发现把咱们传进来的realcall放到了runningSyncCalls队列中,从字面意思来讲就是正在运行的同步的调用队列中,为何说是队列呢? :数据结构

private final Deque<RealCall> runningSyncCalls = new ArrayDeque<>();
复制代码

Deque即双端队列。是一种具备队列和栈的性质的数据结构。双端队列中的元素能够从两端弹出,相比list增长[]运算符重载。异步

(5).咱们回到execute()继续往下分析,剩下的代码咱们提取出三行代码:

  • equesr result = getResponseWithInterceptorChain(); 生成一个Response 实例
  • eventListener.callFailed(this, e); :eventListener的callFailed回调
  • client.dispatcher().finished(this); :dispatcher实例的finished方法

不难看出,getResponseWithInterceptorChain()必定是此方法中的核心,字面意思是获取拦截器链的响应,这就明白了,就是经过拦截器链处理后返回Response

getResponseWithInterceptorChain() 分析
Response getResponseWithInterceptorChain() throws IOException {
    // Build a full stack of interceptors.
    List<Interceptor> interceptors = new ArrayList<>();
    interceptors.addAll(client.interceptors());    //自定义
    interceptors.add(retryAndFollowUpInterceptor); //错误与跟踪拦截器
    interceptors.add(new BridgeInterceptor(client.cookieJar()));   //桥拦截器
    interceptors.add(new CacheInterceptor(client.internalCache())); //缓存拦截器
    interceptors.add(new ConnectInterceptor(client));   //链接拦截器
    if (!forWebSocket) {
      interceptors.addAll(client.networkInterceptors());  //网络拦截器
    }
    interceptors.add(new CallServerInterceptor(forWebSocket));  //调用服务器拦截器

    Interceptor.Chain chain = new RealInterceptorChain(interceptors, null, null, null, 0,
        originalRequest, this, eventListener, client.connectTimeoutMillis(),
        client.readTimeoutMillis(), client.writeTimeoutMillis());   

    return chain.proceed(originalRequest);
  }
复制代码

从上面代码不难看出, 对最初的request作了层层拦截,每一个拦截器的原理咱们放在之后的章节去讲, 这里就不展开了!
这里须要强调的一下 interceptors.addAll(client.interceptors());client.interceptors() 是咱们自定义的拦截器 它是在哪定义的?如何添加?咱们去OkHttpClient类中发现:

能够经过初始化 okHttpClient实例 .addInterceptor的形式 添加。

异步请求分析

call.enqueue(new Callback() {
            @Override
            public void onFailure(Call call, IOException e) {
                Log.d("okhttp_error",e.getMessage());
            }

            @Override
            public void onResponse(Call call, Response response) throws IOException {
                Gson gson=new Gson();

                Log.d("okhttp_success",response.body().string());
            }
   });
复制代码

点击enqueue()查看:

@Override public void enqueue(Callback responseCallback) {
    synchronized (this) {
      if (executed) throw new IllegalStateException("Already Executed");
      executed = true;
    }
    captureCallStackTrace();
    eventListener.callStart(this);
    client.dispatcher().enqueue(new AsyncCall(responseCallback));
  }
复制代码

(1).经过 synchronized 保证线程同步,判断是否已经执行过 ,若是是直接抛异常 (2). captureCallStackTrace(); 字面意思:捕获调用堆栈跟踪,咱们经过源码发现里面涉及到了retryAndFollowUpInterceptor (3). eventListener 回调CallStart() (4). client.dispatcher().enqueue(new AsyncCall(responseCallback)); 调用了Dispatcher.enqueue()并传入了一个**new AsyncCall(responseCallback)实例,点击AsyncCall**查看: AsyncCall 是RealCall的内部类!

final class AsyncCall extends NamedRunnable {
    private final Callback responseCallback;

    AsyncCall(Callback responseCallback) {
      super("OkHttp %s", redactedUrl());
      this.responseCallback = responseCallback;
    }

    String host() {
      return originalRequest.url().host();
    }

    Request request() {
      return originalRequest;
    }

    RealCall get() {
      return RealCall.this;
    }

    @Override protected void execute() {
      boolean signalledCallback = false;
      try {
        Response response = getResponseWithInterceptorChain();
        if (retryAndFollowUpInterceptor.isCanceled()) {
          signalledCallback = true;
          responseCallback.onFailure(RealCall.this, new IOException("Canceled"));
        } else {
          signalledCallback = true;
          responseCallback.onResponse(RealCall.this, response);
        }
      } catch (IOException e) {
        if (signalledCallback) {
          // Do not signal the callback twice!
          Platform.get().log(INFO, "Callback failure for " + toLoggableString(), e);
        } else {
          eventListener.callFailed(RealCall.this, e);
          responseCallback.onFailure(RealCall.this, e);
        }
      } finally {
        client.dispatcher().finished(this);
      }
    }
  }
复制代码

AsyncCall继承了NamedRunnable ,咱们看下NamedRunnable是什么:

public abstract class NamedRunnable implements Runnable {
  protected final String name;

  public NamedRunnable(String format, Object... args) {
    this.name = Util.format(format, args);
  }

  @Override public final void run() {
    String oldName = Thread.currentThread().getName();
    Thread.currentThread().setName(name);
    try {
      execute();
    } finally {
      Thread.currentThread().setName(oldName);
    }
  }

  protected abstract void execute();
}
复制代码

原来NamedRunnable 实现了Runnable 接口 是个线程类,在run()中 添加了抽象的execute();方法,看到这里 咱们应该有一个反应,那就是AsyncCall中具体的execute()应该在子线程执行
咱们继续分析,client.dispatcher().enqueue(new AsyncCall(responseCallback)); 点击进入enqueue():

synchronized void enqueue(AsyncCall call) {
    if (runningAsyncCalls.size() < maxRequests && runningCallsForHost(call) < maxRequestsPerHost) {
      runningAsyncCalls.add(call);
      executorService().execute(call);
    } else {
      readyAsyncCalls.add(call);
    }
  }
复制代码
  • runningAsyncCalls 正在运行的异步请求的队列
  • maxRequests 最大的请求数 64
  • maxRequestsPerHost host最大请求数 5 (能够经过Get与Set方式自定义设置)

若是正在运行的异步请求的队列大小低于64而且 正在请求的host数量低于5,就会执行(知足条件)

runningAsyncCalls.add(call);
     executorService().execute(call);
复制代码

这里把 AsyncCall实例添加到 runningAsyncCalls中。 ExecutorService 表示线程池 继续看 executorService()

public synchronized ExecutorService executorService() {
    if (executorService == null) {
      executorService = new ThreadPoolExecutor(0, Integer.MAX_VALUE, 60, TimeUnit.SECONDS,
          new SynchronousQueue<Runnable>(), Util.threadFactory("OkHttp Dispatcher", false));
    }
    return executorService;
  }
复制代码

其实就是生成了executorService 实例,这就明白了,AsyncCall实例放入线程池中执行了!

若是不知足上面的请求数等条件:

readyAsyncCalls.add(call);
复制代码

就会被添加到一个等待就绪的异步请求队列中,目的是什么呢??? 固然是等待时机再次添加到runningAsyncCalls中并放入线程池中执行,这块逻辑在 AsyncCall类中的 execute() 至于缘由咱们继续往下看!

刚才咱们说了,若是条件知足, AsyncCall实例就会在线程池中执行(.start),那咱们直接去看run()中的 execute()

@Override protected void execute() {
      boolean signalledCallback = false;
      try {
        Response response = getResponseWithInterceptorChain();
        if (retryAndFollowUpInterceptor.isCanceled()) {
          signalledCallback = true;
          responseCallback.onFailure(RealCall.this, new IOException("Canceled"));
        } else {
          signalledCallback = true;
          responseCallback.onResponse(RealCall.this, response);
        }
      } catch (IOException e) {
        if (signalledCallback) {
          // Do not signal the callback twice!
          Platform.get().log(INFO, "Callback failure for " + toLoggableString(), e);
        } else {
          eventListener.callFailed(RealCall.this, e);
          responseCallback.onFailure(RealCall.this, e);
        }
      } finally {
        client.dispatcher().finished(this);
      }
    }
复制代码

上面代码中得知, 首先经过层层拦截器链处理生成了response;而后经过一系列的判断,responseCallback进行onResponseonFailure回调,最后调用的Dispatcher.finifshed() 这里须要注意的是 这里的Dispatcher.finifshed(this)与同步中的Dispatcher.finifshed(this)不同 参数不一样。

/** Used by {@code AsyncCall#run} to signal completion. */
  void finished(AsyncCall call) {
    finished(runningAsyncCalls, call, true);
  }
复制代码

咱们继续看具体的finifshed()方法:

private <T> void finished(Deque<T> calls, T call, boolean promoteCalls) {
    int runningCallsCount;
    Runnable idleCallback;
    synchronized (this) {
      if (!calls.remove(call)) throw new AssertionError("Call wasn't in-flight!");
      if (promoteCalls) promoteCalls();
      runningCallsCount = runningCallsCount();
      idleCallback = this.idleCallback;
    }

    if (runningCallsCount == 0 && idleCallback != null) {
      idleCallback.run();
    }
  }
复制代码

在线程同步的状况下 执行了promoteCalls();

private void promoteCalls() {
    if (runningAsyncCalls.size() >= maxRequests) return; // Already running max capacity.
    if (readyAsyncCalls.isEmpty()) return; // No ready calls to promote.

    for (Iterator<AsyncCall> i = readyAsyncCalls.iterator(); i.hasNext(); ) {
      AsyncCall call = i.next();

      if (runningCallsForHost(call) < maxRequestsPerHost) {
        i.remove();
        runningAsyncCalls.add(call);
        executorService().execute(call);
      }

      if (runningAsyncCalls.size() >= maxRequests) return; // Reached max capacity.
    }
  }
复制代码

通过一系列的判断, 对等待就绪的异步队列进行遍历,生成对应的AsyncCall实例,并添加到runningAsyncCalls中,最后放入到线程池中执行! 这里就是咱们上面说到的等待就绪的异步队列如何与runningAsyncCalls对接的逻辑。

总结

同步请求流程:
  • 生成call实例realcall
  • Dispatcher.executed()中的runningSyncCalls 添加realcall到此队列中
  • 经过 getResponseWithInterceptorChain() 对request层层拦截,生成Response
  • 经过Dispatcher.finished(),把call实例从队列中移除,返回最终的response
异步请求流程:
  • 生成一个AsyncCall(responseCallback)实例(实现了Runnable)
  • AsyncCall实例放入了Dispatcher.enqueue()中,并判断maxRequests (最大请求数)maxRequestsPerHost(最大host请求数)是否知足条件,若是知足就把AsyncCall添加到runningAsyncCalls中,并放入线程池中执行;若是条件不知足,就添加到等待就绪的异步队列,当那些知足的条件的执行时 ,在Dispatcher.finifshed(this)中的promoteCalls();方法中 对等待就绪的异步队列进行遍历,生成对应的AsyncCall实例,并添加到runningAsyncCalls中,最后放入到线程池中执行,一直到全部请求都结束。

至此OKhttp总体流程就分析完了, 下一篇会分块去分析,但愿对你们有所帮助...

你们能够关注个人微信公众号:「秦子帅」一个有质量、有态度的公众号!

公众号
相关文章
相关标签/搜索