在 OkHttp 知识梳理(1) - OkHttp 源码解析之入门 中,介绍了OkHttp
的简单使用及同步请求的实现流程,今天这篇文章,咱们来一块儿学习一下异步请求的内部实现原理及线程调度。java
首先,让咱们回顾一下异步请求的实现方式:面试
private void startAsyncRequest() {
//如下三步和同步请求的步骤相同。
OkHttpClient client = new OkHttpClient();
Request request = new Request.Builder().url(URL).build();
Call call = client.newCall(request);
//区别在于拿到 RealCall 对象以后的处理方式。
call.enqueue(new Callback() {
@Override
public void onFailure(Call call, IOException e) {
}
@Override
public void onResponse(Call call, Response response) throws IOException {
String result = response.body().string();
//返回结果给主线程。
Message message = mMainHandler.obtainMessage(MSG_UPDATE_UI, result);
mMainHandler.sendMessage(message);
}
});
}
复制代码
能够看到,对于异步请求而言,前面三步和同步请求是相同的,区别在于发起请求时,同步请求使用的是call.execute()
,只有当整个请求完成时才会从.execute()
函数返回。缓存
而对于异步请求来讲,.enqueue(Callback)
方法只要调用完就当即返回了,当网络请求返回以后会回调Callback
的onResponse/onFailure
方法,而且这两个回调方法是在子线程执行的,这也是异步请求和同步请求之间最主要的差异。服务器
下面咱们就来分析一下异步请求的内部实现逻辑。网络
对于前面三步的内部实现再也不重复说明了,你们能够查看 OkHttp 知识梳理(1) - OkHttp 源码解析之入门 中的分析。最终咱们会获得一个RealCall
实例,它表明了一个执行的任务。接下来看enqueue
内部作了什么。多线程
public void enqueue(Callback responseCallback) {
//首先判断该对象是否曾经被执行过。
synchronized(this) {
if(this.executed) {
throw new IllegalStateException("Already Executed");
}
this.executed = true;
}
//捕获堆栈信息。
this.captureCallStackTrace();
//通知监听者请求开始了。
this.eventListener.callStart(this);
//调用调度器的 enqueue 方法。
this.client.dispatcher().enqueue(new RealCall.AsyncCall(responseCallback));
}
复制代码
这里,咱们又见到了熟悉的dispatcher()
类,它enqueue
的实现为:异步
private int maxRequests = 64;
private int maxRequestsPerHost = 5;
//执行任务的线程池。
private ExecutorService executorService;
//等待被执行的异步请求任务队列。
private final Deque<AsyncCall> readyAsyncCalls = new ArrayDeque();
//正在被执行的异步请求任务队列。
private final Deque<AsyncCall> runningAsyncCalls = new ArrayDeque();
public synchronized ExecutorService executorService() {
if(this.executorService == null) {
this.executorService = new ThreadPoolExecutor(0, 2147483647, 60L, TimeUnit.SECONDS, new SynchronousQueue(), Util.threadFactory("OkHttp Dispatcher", false));
}
return this.executorService;
}
synchronized void enqueue(AsyncCall call) {
//若是当前正在请求的数量小于 64,而且对于同一 host 的请求小于 5,才发起请求。
if(this.runningAsyncCalls.size() < this.maxRequests && this.runningCallsForHost(call) < this.maxRequestsPerHost) {
//将该任务加入到正在请求的队列当中。
this.runningAsyncCalls.add(call);
//经过线程池执行任务。
this.executorService().execute(call);
//不然加入到等待队列当中。
} else {
this.readyAsyncCalls.add(call);
}
}
复制代码
Dispatcher
的enqueue
首先会判断:若是当前正在请求的数量小于64
,而且对于同一host
的请求小于5
,才发起请求。发起请求以前会将RealCall
加入到runningAsyncCalls
队列当中,并经过ThreadPoolExecutor
来执行该请求,ide
ThreadPoolExecutor
是Java
提供的线程池,在 多线程知识梳理(6) - 线程池四部曲之 ThreadPoolExecutor 中咱们已经介绍过它,这里根据它的参数配置能够看出,它对应于CachedThreadPool
,该线程池的特色是 线程池大小无界,适用于执行不少的短时间异步任务的程序或者是负载较轻的服务器。 函数
SynchonousQueue
,它的 每一个插入操做都必须等待另外一个线程的移除操做,对于线程池而言,也就是说:在添加任务到等待队列时,必需要有一个空闲线程正在尝试从等待队列获取任务,才有可能添加成功。60s
内都没法获取到新的任务将会被销毁。线程池的execute
函数接收Runnable
的接口实现类做为参数,在该任务被执行时将会调用它的run()
方法,这上面的AsyncCall
也是同样的道理,它继承了NamedRunnable
抽象类,而NamedRunnable
又实现了Runnable
接口,当NamedRunnable
的run()
方法被回调时,会调用AsyncCall
的execute()
方法。源码分析
final class AsyncCall extends NamedRunnable {
private final Callback responseCallback;
//responseCallback 就是调用 call.enqueue 方法时传入的回调。
AsyncCall(Callback responseCallback) {
super("OkHttp %s", new Object[]{RealCall.this.redactedUrl()});
this.responseCallback = responseCallback;
}
//该函数是在子线程当中执行的。
protected void execute() {
boolean signalledCallback = false;
try {
//和同步请求的逻辑相同。
Response response = RealCall.this.getResponseWithInterceptorChain();
if(RealCall.this.retryAndFollowUpInterceptor.isCanceled()) {
signalledCallback = true;
this.responseCallback.onFailure(RealCall.this, new IOException("Canceled"));
} else {
signalledCallback = true;
this.responseCallback.onResponse(RealCall.this, response);
}
} catch (IOException var6) {
if(signalledCallback) {
Platform.get().log(4, "Callback failure for " + RealCall.this.toLoggableString(), var6);
} else {
RealCall.this.eventListener.callFailed(RealCall.this, var6);
this.responseCallback.onFailure(RealCall.this, var6);
}
} finally {
//调用 Dispatcher 的 finished 方法
RealCall.this.client.dispatcher().finished(this);
}
}
}
public abstract class NamedRunnable implements Runnable {
protected final String name;
public NamedRunnable(String format, Object... args) {
this.name = Util.format(format, args);
}
public final void run() {
String oldName = Thread.currentThread().getName();
Thread.currentThread().setName(this.name);
try {
//调用子类的 execute() 方法。
this.execute();
} finally {
Thread.currentThread().setName(oldName);
}
}
protected abstract void execute();
}
复制代码
execute()
方法最终是在 子线程当中执行的,这里咱们看到了熟悉的一句话:
Response response = RealCall.this.getResponseWithInterceptorChain();
复制代码
这里面就是进行请求的核心逻辑,咱们在 OkHttp 知识梳理(1) - OkHttp 源码解析之入门 中的3.4
节中已经介绍过了,这里会经过一系列的拦截器进行处理,重试请求、缓存处理和网络请求都是在里面完成的,最终获得返回的Response
,并根据状况回调最开始传入的Callback
的onResponse/onFailure
方法。
当回调完以后,最终会调用Dispatcher
的finished
方法:
void finished(AsyncCall call) {
//若是是异步请求,那么最后一个参数为 true。
this.finished(this.runningAsyncCalls, call, true);
}
void finished(RealCall call) {
//若是是同步请求,那么最后一个参数为 false。
this.finished(this.runningSyncCalls, call, false);
}
private <T> void finished(Deque<T> calls, T call, boolean promoteCalls) {
int runningCallsCount;
Runnable idleCallback;
synchronized(this) {
//从当前正在执行的任务列表中将它移除。
if (!calls.remove(call)) {
throw new AssertionError("Call wasn't in-flight!");
}
//寻找等待队列中符合条件的任务去执行。
if (promoteCalls) {
this.promoteCalls();
}
runningCallsCount = this.runningCallsCount();
idleCallback = this.idleCallback;
}
if (runningCallsCount == 0 && idleCallback != null) {
idleCallback.run();
}
}
private void promoteCalls() {
if (this.runningAsyncCalls.size() < this.maxRequests) {
if (!this.readyAsyncCalls.isEmpty()) {
Iterator i = this.readyAsyncCalls.iterator();
do {
if(!i.hasNext()) {
return;
}
AsyncCall call = (AsyncCall)i.next();
if (this.runningCallsForHost(call) < this.maxRequestsPerHost) {
i.remove();
//找到了等待队列中符合执行的条件的任务,那么就执行它。
this.runningAsyncCalls.add(call);
this.executorService().execute(call);
}
} while(this.runningAsyncCalls.size() < this.maxRequests);
}
}
}
复制代码
这里和同步请求相同,都会走到finished
方法当中,区别在于最后一次参数是true
,而同步请求是false
,也就是说会调用到promoteCalls
方法中,promoteCalls
的做用为:在最开始时,若是不知足执行条件,那么任务将会被加入到等待队列readyAsyncCalls
中,那么当一个任务执行完以后,就须要去等待队列中寻找符合执行条件的任务,并将它加入到任务队列中执行,以后的逻辑和前面的相同。
promoteCalls
函数除了在一个异步请求执行完毕后会调用,当咱们改变最大请求数量和对于同一个host
的最大请求数量时,也会触发该查找过程。
//改变了最大请求数量。
public synchronized void setMaxRequests(int maxRequests) {
if(maxRequests < 1) {
throw new IllegalArgumentException("max < 1: " + maxRequests);
} else {
this.maxRequests = maxRequests;
this.promoteCalls();
}
}
//改变了同一个 Host 的最大请求数量。
public synchronized void setMaxRequestsPerHost(int maxRequestsPerHost) {
if(maxRequestsPerHost < 1) {
throw new IllegalArgumentException("max < 1: " + maxRequestsPerHost);
} else {
this.maxRequestsPerHost = maxRequestsPerHost;
this.promoteCalls();
}
}
复制代码
以上就是对于异步请求方式的源码分析,由此咱们能够总结出OkHttp
对于异步请求的调度方式:
runningAsyncCalls
中存放的是正在执行任务的列表,readyAsyncCalls
中则是等待被执行的任务。readyAsyncCalls
查找下一个能够被执行的任务。ThreadPoolExecutor
在子线程中来完成的,由它来负责正在执行任务的调度,内部的实现原理如 多线程知识梳理(6) - 线程池四部曲之 ThreadPoolExecutor 所分析。AsyncCall
的execute()
函数中,这里会经过一系列的拦截器进行处理,重试请求、缓存处理和网络请求都是在里面完成的,最终获得返回的Response
,并根据状况回调最开始传入的Callback
的onResponse/onFailure
方法。Callback
的onResponse/onFailed
是在子线程当中执行的,所以若是要在其中执行更新UI
的操做,那么须要通知主线程来更新。