OkHttp从使用到源代码分析(3)-使用三步走中的源码分析

上文讲到了OkHttp的使用方法,那么在同步异步中具体代码是怎样构建出来的呢,这边文章就是问说明这个问题而来。java

首先回顾一下是哪三步web

  1. 建立OkHttpClientRequest对象
  2. Request封装成Call对象
  3. 调用Callexecute()或者enqueue()方法

第一步:建立OkHttpClientRequest对象

在使用OkHttp以前,会建立OkHttpClientRequest对象,这是使用OkHttp的基础,那么这个对象是怎么构建出来的呢?
首先咱们来看OkHttpClient,查看源代码cookie

public OkHttpClient() {
  this(new Builder());
}

能够得知,OkHttp能够直接经过new的方式建立对象,默认建立使用的是默认参数,还有就是咱们所熟悉的Builder模式建立:网络

public Builder() {
  dispatcher = new Dispatcher();
  protocols = DEFAULT_PROTOCOLS;
  connectionSpecs = DEFAULT_CONNECTION_SPECS;
  eventListenerFactory = EventListener.factory(EventListener.NONE);
  proxySelector = ProxySelector.getDefault();
  if (proxySelector == null) {
    proxySelector = new NullProxySelector();
  }
  cookieJar = CookieJar.NO_COOKIES;
  socketFactory = SocketFactory.getDefault();
  hostnameVerifier = OkHostnameVerifier.INSTANCE;
  certificatePinner = CertificatePinner.DEFAULT;
  proxyAuthenticator = Authenticator.NONE;
  authenticator = Authenticator.NONE;
  connectionPool = new ConnectionPool();
  dns = Dns.SYSTEM;
  followSslRedirects = true;
  followRedirects = true;
  retryOnConnectionFailure = true;
  callTimeout = 0;
  connectTimeout = 10_000;
  readTimeout = 10_000;
  writeTimeout = 10_000;
  pingInterval = 0;
}

在Builder模式中,能够设置咱们想要设置的参数,例如以前例子中的设置超时时间app

OkHttpClient mClient = new OkHttpClient.Builder()
	.callTimeout(5, TimeUnit.SECONDS) //设置超时时间
	.build(); //构建OkHttpClient对象

在Builder中,有两个比较重要的参数异步

dispatcher = new Dispatcher();
···
connectionPool = new ConnectionPool();

前一个参数dispatcher,是OkHttp的核心之一,管理着OkHttp的调度,会根据请求是同步仍是异步,选择相应的任务调度,关于这点,后面会深刻分析,后一个参数是链接池,看一看其实现socket

/** * Create a new connection pool with tuning parameters appropriate for a single-user application. * The tuning parameters in this pool are subject to change in future OkHttp releases. Currently * this pool holds up to 5 idle connections which will be evicted after 5 minutes of inactivity. */
  public ConnectionPool() {
    this(5, 5, TimeUnit.MINUTES);
  }

  public ConnectionPool(int maxIdleConnections, long keepAliveDuration, TimeUnit timeUnit) {
    this.delegate = new RealConnectionPool(maxIdleConnections, keepAliveDuration, timeUnit);
  }

可见,设置了5个链接,其空闲销毁时间为5分钟,connectionPool简单看到这,后面也会详细说明。
接下来看下Request的构建过程,一样的,Request经过Builder构建参数,而且只能经过Builder模式来构建。参数会涉及到哪些呢?async

Request(Builder builder) {
  this.url = builder.url;
  this.method = builder.method;
  this.headers = builder.headers.build();
  this.body = builder.body;
  this.tags = Util.immutableMap(builder.tags);
}

那么默认的GET请求方式是哪里来的呢?ide

public Builder() {
  this.method = "GET";
  this.headers = new Headers.Builder();
}

能够看到,在Builder初始化的时候,就已经设置了GET请求方式和请求头信息。
还能够看到,get和post的区别在代码中的体现svg

public Builder get() {
  return method("GET", null);
}
···
public Builder post(RequestBody body) {
  return method("POST", body);
}

到此为止,第一步便已完成。接下来看第二步。

第二步:将Request封装成Call对象

什么是Call对象?Call对象表明了实际的Http请求。能够简单地理解为RequestResponse联系的桥梁。
查看Call源代码,发现其只是一个接口,其具体实现是由RealCall完成的

/** * Prepares the {@code request} to be executed at some point in the future. */
@Override public Call newCall(Request request) {
 return RealCall.newRealCall(this, request, false /* for web socket */);
}

而RealCall中使用了静态方法去建立RealCall,RealCall也私有化,只能经过newRealCall()方法建立。

private RealCall(OkHttpClient client, Request originalRequest, boolean forWebSocket) {
  this.client = client;
  this.originalRequest = originalRequest;
  this.forWebSocket = forWebSocket;
}

static RealCall newRealCall(OkHttpClient client, Request originalRequest, boolean forWebSocket) {
  // Safely publish the Call instance to the EventListener.
  RealCall call = new RealCall(client, originalRequest, forWebSocket);
  call.transmitter = new Transmitter(client, call);
  return call;
}

到这里,真正的Call对象已经建立完毕,那么接下来即是同步和异步请求的发送了。

第三步:调用Callexecute()或者enqueue()方法

同步和异步,最直接的体现就出如今这里,同步调用的是execute(),获得Response,而异步调用enqueue(),传入callback。
同步发送请求便会进入阻塞状态,直到获得响应。而异步会在工做线程中执行。

execute()

首先查看同步请求的执行,一样是在RealCall中具体实现

@Override public Response execute() throws IOException {
  synchronized (this) {
    if (executed) throw new IllegalStateException("Already Executed");
    executed = true;
  }
  transmitter.timeoutEnter();
  transmitter.callStart();
  try {
    client.dispatcher().executed(this);
    return getResponseWithInterceptorChain();
  } finally {
    client.dispatcher().finished(this);
  }
}

首先看同步代码块,这里有一个标志位executed,表明了每一个Call只能执行一次,执行过了则会抛出异常,为执行过,标志位设为true。而后是对Transmitter的设置,Transmitter类是OkHttp的应用层和网络层的一个桥梁类。这里的主要关注点是try里面的代码块,首先是返回了分发器,这个dispatcher正是在OkHttpClient初始化的时候赋值的:
client.dispatcher().executed(this);

public Dispatcher dispatcher() {
  return dispatcher;
}

而后到了最重要的代码

/** Used by {@code Call#execute} to signal it is in-flight. */
synchronized void executed(RealCall call) {
  runningSyncCalls.add(call);
}

这里将Call对象添加到了队列当中,那么runningSyncCalls是在那里定义的呢?

/** Ready async calls in the order they'll be run. */
private final Deque<AsyncCall> readyAsyncCalls = new ArrayDeque<>();

/** Running asynchronous calls. Includes canceled calls that haven't finished yet. */
private final Deque<AsyncCall> runningAsyncCalls = new ArrayDeque<>();

/** Running synchronous calls. Includes canceled calls that haven't finished yet. */
private final Deque<RealCall> runningSyncCalls = new ArrayDeque<>();

这里能够看到三个队列,那么这三个队列分别是什么做用呢,其实注释已经写得很明白了。第一个是异步就绪队列,还没有执行的Call,第二个是异步的执行队列,第三个是同步的执行队列,注意,后两个队列里面包含了取消但没完成的Call。

继续回到源代码return getResponseWithInterceptorChain();
这里直接返回了Response对象,这实际上是一个拦截器链,会依次调用拦截器操做,是OkHttp的一个核心,包含多种拦截器,后面会逐个详细说明。

最后在finally中的代码client.dispatcher().finished(this);

/** Used by {@code Call#execute} to signal completion. */
void finished(RealCall call) {
  finished(runningSyncCalls, call);
}

private <T> void finished(Deque<T> calls, T call) {
  Runnable idleCallback;
  synchronized (this) {
    if (!calls.remove(call)) throw new AssertionError("Call wasn't in-flight!");
    idleCallback = this.idleCallback;
  }

  boolean isRunning = promoteAndExecute();

  if (!isRunning && idleCallback != null) {
    idleCallback.run();
  }
}

会逐个回收同步请求,没法移除的会抛出异常。

enqueue()

接下来分析异步请求的执行流程,一样是在RealCall中具体实现

@Override public void enqueue(Callback responseCallback) {
  synchronized (this) {
    if (executed) throw new IllegalStateException("Already Executed");
    executed = true;
  }
  transmitter.callStart();
  client.dispatcher().enqueue(new AsyncCall(responseCallback));
}

前面和同步同样,只执行一次的判断和transmitter的start,不一样在于此处调用的是dispatcher的enqueue()方法。传入了AsyncCall对象,那么这又是啥玩意儿?

final class AsyncCall extends NamedRunnable {
···
}

而NamedRunnable就是一个Runnable。

那么接下来看一看dispatcher的enqueue()方法

void enqueue(AsyncCall call) {
  synchronized (this) {
    readyAsyncCalls.add(call);

    // Mutate the AsyncCall so that it shares the AtomicInteger of an existing running call to
    // the same host.
    if (!call.get().forWebSocket) {
      AsyncCall existingCall = findExistingCallWithHost(call.host());
      if (existingCall != null) call.reuseCallsPerHostFrom(existingCall);
    }
  }
  promoteAndExecute();
}

这里在同步代码块中,将call添加到了readyAsyncCalls队列,判断是否为forWebSocket,这个参数在以前设置RealCall的时候,传入了固定参数false。因此这里会执行findExistingCallWithHost(call.host())

@Nullable private AsyncCall findExistingCallWithHost(String host) {
  for (AsyncCall existingCall : runningAsyncCalls) {
    if (existingCall.host().equals(host)) return existingCall;
  }
  for (AsyncCall existingCall : readyAsyncCalls) {
    if (existingCall.host().equals(host)) return existingCall;
  }
  return null;
}

依次从runningAsyncCallsreadyAsyncCalls队列中查找匹配的队列,查找到则返回,若是existingCall不存在,则共享同一host。

最后执行promoteAndExecute();

/** * Promotes eligible calls from {@link #readyAsyncCalls} to {@link #runningAsyncCalls} and runs * them on the executor service. Must not be called with synchronization because executing calls * can call into user code. * * @return true if the dispatcher is currently running calls. */
private boolean promoteAndExecute() {
  assert (!Thread.holdsLock(this));

  List<AsyncCall> executableCalls = new ArrayList<>();
  boolean isRunning;
  synchronized (this) {
    for (Iterator<AsyncCall> i = readyAsyncCalls.iterator(); i.hasNext(); ) {
      AsyncCall asyncCall = i.next();

      if (runningAsyncCalls.size() >= maxRequests) break; // Max capacity.
      if (asyncCall.callsPerHost().get() >= maxRequestsPerHost) continue; // Host max capacity.

      i.remove();
      asyncCall.callsPerHost().incrementAndGet();
      executableCalls.add(asyncCall);
      runningAsyncCalls.add(asyncCall);
    }
    isRunning = runningCallsCount() > 0;
  }

  for (int i = 0, size = executableCalls.size(); i < size; i++) {
    AsyncCall asyncCall = executableCalls.get(i);
    asyncCall.executeOn(executorService());
  }

  return isRunning;
}

这里主要作的工做经过for循环,依次从就绪队列中取出AsyncCall实例,这个实例是在上一次的方法中添加的,接下来有两个大小判断

private int maxRequests = 64;
private int maxRequestsPerHost = 5;

正在执行的线程数不能大于64,Host不能大于5。同时知足条件,那么将这个实例移除,而且添加到executableCallsrunningAsyncCalls当中,将正在运行的同步异步Call数量大于零的结果赋值给isRunning,这样就能获得时候还有Call正在运行

public synchronized int runningCallsCount() {
  return runningAsyncCalls.size() + runningSyncCalls.size();
}

接下来循环从executableCalls取出Call实例执行,executorService作了非空判断,主要是保证线程池为一个单例。

public synchronized ExecutorService executorService() {
  if (executorService == null) {
    executorService = new ThreadPoolExecutor(0, Integer.MAX_VALUE, 60, TimeUnit.SECONDS,
        new SynchronousQueue<>(), Util.threadFactory("OkHttp Dispatcher", false));
  }
  return executorService;
}

能够看到,这里实例化了一个线程池服务,而后执行executeOn()

/** * Attempt to enqueue this async call on {@code executorService}. This will attempt to clean up * if the executor has been shut down by reporting the call as failed. */
void executeOn(ExecutorService executorService) {
  assert (!Thread.holdsLock(client.dispatcher()));
  boolean success = false;
  try {
    executorService.execute(this);
    success = true;
  } catch (RejectedExecutionException e) {
    InterruptedIOException ioException = new InterruptedIOException("executor rejected");
    ioException.initCause(e);
    transmitter.noMoreExchanges(ioException);
    responseCallback.onFailure(RealCall.this, ioException);
  } finally {
    if (!success) {
      client.dispatcher().finished(this); // This call is no longer running!
    }
  }
}

这里能够看到,执行了executorService.execute(this);,而且若是没有抛出异常,success赋值true,再看finally中,当success为false时,回收Call实例。若是抛出异常,调用responseCallback.onFailure(RealCall.this, ioException);回调。线程池调用execute()会调用每一个线程的run()方法,也就是NamedRunnablerun()方法

/** * Runnable implementation which always sets its thread name. */
public abstract class NamedRunnable implements Runnable {
  protected final String name;

  public NamedRunnable(String format, Object... args) {
    this.name = Util.format(format, args);
  }

  @Override public final void run() {
    String oldName = Thread.currentThread().getName();
    Thread.currentThread().setName(name);
    try {
      execute();
    } finally {
      Thread.currentThread().setName(oldName);
    }
  }

  protected abstract void execute();
}

而在NamedRunnable中的run()调用了execute()方法,接下来看调用的execute()方法。

@Override protected void execute() {
  boolean signalledCallback = false;
  transmitter.timeoutEnter();
  try {
    Response response = getResponseWithInterceptorChain();
    signalledCallback = true;
    responseCallback.onResponse(RealCall.this, response);
  } catch (IOException e) {
    if (signalledCallback) {
      // Do not signal the callback twice!
      Platform.get().log(INFO, "Callback failure for " + toLoggableString(), e);
    } else {
      responseCallback.onFailure(RealCall.this, e);
    }
  } finally {
    client.dispatcher().finished(this);
  }
}

这里同步同样,调用了getResponseWithInterceptorChain(),获得了Response对象,而且在未抛出异常的状况下,回调了responseCallback.onResponse(RealCall.this, response);,最后在finally中调用了client.dispatcher().finished(this);。到这里,再来看一看这个finish()方法,会调用promoteAndExecute();,也就是直到所有线程执行完毕。这也使得等待队列可以所有到达运行队列的一个主要缘由。

至此,异步的执行流程便已分析完毕。关于拦截链,会在后面进行详细分析。

总结

经过源代码分析,能够看到同步请求会阻塞线程,所以须要放到子线程中运行
异步请求回调方法位于子线程

经过同步和异步的分析,咱们知道了dispatcher其实就是维护了一个线程池,关于dispatcher更多内容会在后面详细解读。

热门开源项目源代码分析导航