OkHttp之因此可以高效处理任务的一个很重要缘由在于其内部维护了三个任务队列(readyAsyncCalls、runningAsyncCalls、runningSyncCalls)和一个线程池(ThreadPoolExecutor)。这四个东西由内部的任务分发器dispathcer来进行调控处理,从而达到高效处理多任务的效果。java
线程池的做用不言而喻,他的主要做用在于能够避免咱们在使用线程进行耗时任务的时候每次都开启线程,用完以后又销毁线程所带来的效率与性能问题。他能够对线程进行屡次的操做并复用空闲线程,从而达到不须要每次都开启以及销毁线程的目的。关于线程的知识,若是有不了解的能够去参考我写的这篇文章 Java中的线程详解,里面对线程池的各类类型还有内部操做有详尽的介绍。缓存
okHttp中的任务队列由两部分组成:并发
public final class Dispatcher {
private int maxRequests = 64;
private int maxRequestsPerHost = 5;
private Runnable idleCallback;
/** Executes calls. Created lazily. */
private ExecutorService executorService;
/** Ready async calls in the order they'll be run. */
private final Deque<AsyncCall> readyAsyncCalls = new ArrayDeque<>();
/** Running asynchronous calls. Includes canceled calls that haven't finished yet. */
private final Deque<AsyncCall> runningAsyncCalls = new ArrayDeque<>();
/** Running synchronous calls. Includes canceled calls that haven't finished yet. */
private final Deque<RealCall> runningSyncCalls = new ArrayDeque<>();
public Dispatcher(ExecutorService executorService) {
this.executorService = executorService;
}
public Dispatcher() {
}
public synchronized ExecutorService executorService() {
if (executorService == null) {
executorService = new ThreadPoolExecutor(0, Integer.MAX_VALUE, 60, TimeUnit.SECONDS,
new SynchronousQueue<Runnable>(), Util.threadFactory("OkHttp Dispatcher", false));
}
return executorService;
}
...
}
复制代码
能够看到,dispatcher里面实例化了三个任务队列readyAsyncCalls、runningAsyncCalls与runningSyncCalls还有一个线程池ThreadPoolExecutor。异步
readyAsyncCalls:等待执行异步任务队列。当有任务须要dispatcher将其添加进入线程池时,会先判断线程池是否还有能够执行的线程,若是发现没有执行的线程,此时先将任务放入到这个任务队列中等待,等到线程池有空闲线程能够执行任务的时候再从这个任务队列中取出任务交给线程池去处理。async
runningAsyncCalls:运行中异步任务队列。存储dispatcher将任务交给线程池去处理的任务。ide
runningSyncCalls:运行中的同步队列。同步队列和异步队列不一样,他是一个串行的,而不是并行的,因此这个表明在同步操做的状况下运行的队列。函数
咱们看到在executeService()方法中建立了一个线程池ThreadPoolExecutor,里面第一个参数核心线程数设置为了0,表明在空闲一段时间后线程将会被所有销毁。高并发
能够看出,在Okhttp中,构建了一个阀值为[0, Integer.MAX_VALUE]的线程池,它不保留任何最小线程数,随时建立更多的线程数,当线程空闲时只能活60秒,它使用了一个不存储元素的阻塞工做队列,一个叫作"OkHttp Dispatcher"的线程工厂。性能
也就是说,在实际运行中,当收到10个并发请求时,线程池会建立十个线程,当工做完成后,线程池会在60s后相继关闭全部线程。学习
dispatcher分发器相似于Ngnix中的反向代理,经过Dispatcher将任务分发到合适的空闲线程,实现非阻塞,高可用,高并发链接。
@Override public Response execute() throws IOException {
synchronized (this) {
if (executed) throw new IllegalStateException("Already Executed");
executed = true;
}
captureCallStackTrace();
try {
client.dispatcher().executed(this);
Response result = getResponseWithInterceptorChain();
if (result == null) throw new IOException("Canceled");
return result;
} finally {
client.dispatcher().finished(this);
}
}
复制代码
能够看到同步请求总共作了四件事
判断任务是否正在执行executed,若是正在执行则抛出异常。这表明同一个任务一次只能被执行一次,而并不能被执行屡次。
将任务交给任务调用器,dispatcher调用executed去执行这个任务。
经过getResponseWithInterceptorChain()链调用拦截器,以后将任务执行的结果返回Response。
4.以后在任务执行完成调用dispatcher将其finish掉。
至此一个同步请求任务就算完成了。这里关于getResponseWithInterceptorChain()中执行的一些拦截器的操做,之后我会专门写一篇文章来说解OkHttp的拦截器的原理。
synchronized void enqueue(AsyncCall call) {
if (runningAsyncCalls.size() < maxRequests && runningCallsForHost(call) < maxRequestsPerHost) {
//添加正在运行的请求
runningAsyncCalls.add(call);
//线程池执行请求
executorService().execute(call);
} else {
//添加到缓存队列排队等待
readyAsyncCalls.add(call);
}
}
复制代码
在异步操做中,会先去判断runningAsyncCalls队列中的任务数量是否会大于最大请求数量(maxRequest),这个的最大请求数量为64,而后在判断是否runningCallsForHost是否小于maxRequestsPerHost(单一host请求)。若是两个当中有一个不知足,则表明线程池中可执行的线程数不够,不能将任务添加到线程中去执行。此时则将任务直接添加到缓存队列排队等待(readyAsyncCalls),等到有可执行的线程的时候再将任务添加到正在运行的队列中,再调用线程池去执行call任务。
接下来看看execute里面的源码
@Override protected void execute() {
boolean signalledCallback = false;
try {
//执行耗时IO任务
Response response = getResponseWithInterceptorChain(forWebSocket);
if (canceled) {
signalledCallback = true;
//回调,注意这里回调是在线程池中,而不是想固然的主线程回调
responseCallback.onFailure(RealCall.this, new IOException("Canceled"));
} else {
signalledCallback = true;
//回调,同上
responseCallback.onResponse(RealCall.this, response);
}
} catch (IOException e) {
if (signalledCallback) {
// Do not signal the callback twice!
logger.log(Level.INFO, "Callback failure for " + toLoggableString(), e);
} else {
responseCallback.onFailure(RealCall.this, e);
}
} finally {
//最关键的代码
client.dispatcher().finished(this);
}
}
复制代码
能够看到里面有调用了拦截器链getResponseWithInterceptorChain(),并将任务的结果又一次返回Response。里面会根据任务是否被Cancled而去回调不一样的方法。被Canceled就去调用onFailure(0方法,在里面处理失败的逻辑,成功就去调用成功的方法Response(),并将返回值交给他去处理。最后不管成功仍是失败都会去调用dispatcher的finish方法来结束掉这个任务。
咱们在来深刻看下finish的方法里面作了哪些操做:
private <T> void finished(Deque<T> calls, T call, boolean promoteCalls) {
int runningCallsCount;
Runnable idleCallback;
synchronized (this) {
if (!calls.remove(call)) throw new AssertionError("Call wasn't in-flight!");
if (promoteCalls) promoteCalls();
runningCallsCount = runningCallsCount();
idleCallback = this.idleCallback;
}
if (runningCallsCount == 0 && idleCallback != null) {
idleCallback.run();
}
}
复制代码
接下来看看promoteCalls:
private void promoteCalls() {
if (runningAsyncCalls.size() >= maxRequests) return; // Already running max capacity.
if (readyAsyncCalls.isEmpty()) return; // No ready calls to promote.
for (Iterator<AsyncCall> i = readyAsyncCalls.iterator(); i.hasNext(); ) {
AsyncCall call = i.next();
if (runningCallsForHost(call) < maxRequestsPerHost) {
i.remove();
runningAsyncCalls.add(call);
executorService().execute(call);
}
if (runningAsyncCalls.size() >= maxRequests) return; // Reached max capacity.
}
}
复制代码
promoteCalls的逻辑也很简单:扫描待执行任务队列,将任务放入正在执行任务队列,并执行该任务。
以上就是整个任务队列的实现细节,总结起来有如下几个特色:
有兴趣能够关注个人小专栏,学习更多知识:小专栏