上文讲到了OkHttp的使用方法,那么在同步异步中具体代码是怎样构建出来的呢,这边文章就是问说明这个问题而来。java
首先回顾一下是哪三步web
OkHttpClient
和Request
对象Request
封装成Call
对象Call
的execute()
或者enqueue()
方法OkHttpClient
和Request
对象在使用OkHttp以前,会建立OkHttpClient
和Request
对象,这是使用OkHttp的基础,那么这个对象是怎么构建出来的呢?
首先咱们来看OkHttpClient
,查看源代码cookie
public OkHttpClient() { this(new Builder()); }
能够得知,OkHttp能够直接经过new的方式建立对象,默认建立使用的是默认参数,还有就是咱们所熟悉的Builder模式建立:网络
public Builder() { dispatcher = new Dispatcher(); protocols = DEFAULT_PROTOCOLS; connectionSpecs = DEFAULT_CONNECTION_SPECS; eventListenerFactory = EventListener.factory(EventListener.NONE); proxySelector = ProxySelector.getDefault(); if (proxySelector == null) { proxySelector = new NullProxySelector(); } cookieJar = CookieJar.NO_COOKIES; socketFactory = SocketFactory.getDefault(); hostnameVerifier = OkHostnameVerifier.INSTANCE; certificatePinner = CertificatePinner.DEFAULT; proxyAuthenticator = Authenticator.NONE; authenticator = Authenticator.NONE; connectionPool = new ConnectionPool(); dns = Dns.SYSTEM; followSslRedirects = true; followRedirects = true; retryOnConnectionFailure = true; callTimeout = 0; connectTimeout = 10_000; readTimeout = 10_000; writeTimeout = 10_000; pingInterval = 0; }
在Builder模式中,能够设置咱们想要设置的参数,例如以前例子中的设置超时时间app
OkHttpClient mClient = new OkHttpClient.Builder() .callTimeout(5, TimeUnit.SECONDS) //设置超时时间 .build(); //构建OkHttpClient对象
在Builder中,有两个比较重要的参数异步
dispatcher = new Dispatcher(); ··· connectionPool = new ConnectionPool();
前一个参数dispatcher,是OkHttp的核心之一,管理着OkHttp的调度,会根据请求是同步仍是异步,选择相应的任务调度,关于这点,后面会深刻分析,后一个参数是链接池,看一看其实现socket
/** * Create a new connection pool with tuning parameters appropriate for a single-user application. * The tuning parameters in this pool are subject to change in future OkHttp releases. Currently * this pool holds up to 5 idle connections which will be evicted after 5 minutes of inactivity. */ public ConnectionPool() { this(5, 5, TimeUnit.MINUTES); } public ConnectionPool(int maxIdleConnections, long keepAliveDuration, TimeUnit timeUnit) { this.delegate = new RealConnectionPool(maxIdleConnections, keepAliveDuration, timeUnit); }
可见,设置了5个链接,其空闲销毁时间为5分钟,connectionPool简单看到这,后面也会详细说明。
接下来看下Request
的构建过程,一样的,Request
经过Builder构建参数,而且只能经过Builder模式来构建。参数会涉及到哪些呢?async
Request(Builder builder) { this.url = builder.url; this.method = builder.method; this.headers = builder.headers.build(); this.body = builder.body; this.tags = Util.immutableMap(builder.tags); }
那么默认的GET
请求方式是哪里来的呢?ide
public Builder() { this.method = "GET"; this.headers = new Headers.Builder(); }
能够看到,在Builder初始化的时候,就已经设置了GET请求方式和请求头信息。
还能够看到,get和post的区别在代码中的体现svg
public Builder get() { return method("GET", null); } ··· public Builder post(RequestBody body) { return method("POST", body); }
到此为止,第一步便已完成。接下来看第二步。
Request
封装成Call
对象什么是Call对象?Call对象表明了实际的Http请求。能够简单地理解为Request
与Response
联系的桥梁。
查看Call源代码,发现其只是一个接口,其具体实现是由RealCall
完成的
/** * Prepares the {@code request} to be executed at some point in the future. */ @Override public Call newCall(Request request) { return RealCall.newRealCall(this, request, false /* for web socket */); }
而RealCall中使用了静态方法去建立RealCall,RealCall也私有化,只能经过newRealCall()
方法建立。
private RealCall(OkHttpClient client, Request originalRequest, boolean forWebSocket) { this.client = client; this.originalRequest = originalRequest; this.forWebSocket = forWebSocket; } static RealCall newRealCall(OkHttpClient client, Request originalRequest, boolean forWebSocket) { // Safely publish the Call instance to the EventListener. RealCall call = new RealCall(client, originalRequest, forWebSocket); call.transmitter = new Transmitter(client, call); return call; }
到这里,真正的Call对象已经建立完毕,那么接下来即是同步和异步请求的发送了。
Call
的execute()
或者enqueue()
方法同步和异步,最直接的体现就出如今这里,同步调用的是execute()
,获得Response
,而异步调用enqueue()
,传入callback。
同步发送请求便会进入阻塞状态,直到获得响应。而异步会在工做线程中执行。
execute()
首先查看同步请求的执行,一样是在RealCall中具体实现
@Override public Response execute() throws IOException { synchronized (this) { if (executed) throw new IllegalStateException("Already Executed"); executed = true; } transmitter.timeoutEnter(); transmitter.callStart(); try { client.dispatcher().executed(this); return getResponseWithInterceptorChain(); } finally { client.dispatcher().finished(this); } }
首先看同步代码块,这里有一个标志位executed
,表明了每一个Call只能执行一次,执行过了则会抛出异常,为执行过,标志位设为true。而后是对Transmitter
的设置,Transmitter类是OkHttp的应用层和网络层的一个桥梁类。这里的主要关注点是try里面的代码块,首先是返回了分发器,这个dispatcher正是在OkHttpClient初始化的时候赋值的:
client.dispatcher().executed(this);
public Dispatcher dispatcher() { return dispatcher; }
而后到了最重要的代码
/** Used by {@code Call#execute} to signal it is in-flight. */ synchronized void executed(RealCall call) { runningSyncCalls.add(call); }
这里将Call对象添加到了队列当中,那么runningSyncCalls
是在那里定义的呢?
/** Ready async calls in the order they'll be run. */ private final Deque<AsyncCall> readyAsyncCalls = new ArrayDeque<>(); /** Running asynchronous calls. Includes canceled calls that haven't finished yet. */ private final Deque<AsyncCall> runningAsyncCalls = new ArrayDeque<>(); /** Running synchronous calls. Includes canceled calls that haven't finished yet. */ private final Deque<RealCall> runningSyncCalls = new ArrayDeque<>();
这里能够看到三个队列,那么这三个队列分别是什么做用呢,其实注释已经写得很明白了。第一个是异步就绪队列,还没有执行的Call,第二个是异步的执行队列,第三个是同步的执行队列,注意,后两个队列里面包含了取消但没完成的Call。
继续回到源代码return getResponseWithInterceptorChain();
这里直接返回了Response
对象,这实际上是一个拦截器链,会依次调用拦截器操做,是OkHttp的一个核心,包含多种拦截器,后面会逐个详细说明。
最后在finally中的代码client.dispatcher().finished(this);
/** Used by {@code Call#execute} to signal completion. */ void finished(RealCall call) { finished(runningSyncCalls, call); } private <T> void finished(Deque<T> calls, T call) { Runnable idleCallback; synchronized (this) { if (!calls.remove(call)) throw new AssertionError("Call wasn't in-flight!"); idleCallback = this.idleCallback; } boolean isRunning = promoteAndExecute(); if (!isRunning && idleCallback != null) { idleCallback.run(); } }
会逐个回收同步请求,没法移除的会抛出异常。
enqueue()
接下来分析异步请求的执行流程,一样是在RealCall中具体实现
@Override public void enqueue(Callback responseCallback) { synchronized (this) { if (executed) throw new IllegalStateException("Already Executed"); executed = true; } transmitter.callStart(); client.dispatcher().enqueue(new AsyncCall(responseCallback)); }
前面和同步同样,只执行一次的判断和transmitter的start,不一样在于此处调用的是dispatcher的enqueue()
方法。传入了AsyncCall
对象,那么这又是啥玩意儿?
final class AsyncCall extends NamedRunnable { ··· }
而NamedRunnable就是一个Runnable。
那么接下来看一看dispatcher的enqueue()
方法
void enqueue(AsyncCall call) { synchronized (this) { readyAsyncCalls.add(call); // Mutate the AsyncCall so that it shares the AtomicInteger of an existing running call to // the same host. if (!call.get().forWebSocket) { AsyncCall existingCall = findExistingCallWithHost(call.host()); if (existingCall != null) call.reuseCallsPerHostFrom(existingCall); } } promoteAndExecute(); }
这里在同步代码块中,将call添加到了readyAsyncCalls
队列,判断是否为forWebSocket,这个参数在以前设置RealCall的时候,传入了固定参数false。因此这里会执行findExistingCallWithHost(call.host())
@Nullable private AsyncCall findExistingCallWithHost(String host) { for (AsyncCall existingCall : runningAsyncCalls) { if (existingCall.host().equals(host)) return existingCall; } for (AsyncCall existingCall : readyAsyncCalls) { if (existingCall.host().equals(host)) return existingCall; } return null; }
依次从runningAsyncCalls
和readyAsyncCalls
队列中查找匹配的队列,查找到则返回,若是existingCall
不存在,则共享同一host。
最后执行promoteAndExecute();
/** * Promotes eligible calls from {@link #readyAsyncCalls} to {@link #runningAsyncCalls} and runs * them on the executor service. Must not be called with synchronization because executing calls * can call into user code. * * @return true if the dispatcher is currently running calls. */ private boolean promoteAndExecute() { assert (!Thread.holdsLock(this)); List<AsyncCall> executableCalls = new ArrayList<>(); boolean isRunning; synchronized (this) { for (Iterator<AsyncCall> i = readyAsyncCalls.iterator(); i.hasNext(); ) { AsyncCall asyncCall = i.next(); if (runningAsyncCalls.size() >= maxRequests) break; // Max capacity. if (asyncCall.callsPerHost().get() >= maxRequestsPerHost) continue; // Host max capacity. i.remove(); asyncCall.callsPerHost().incrementAndGet(); executableCalls.add(asyncCall); runningAsyncCalls.add(asyncCall); } isRunning = runningCallsCount() > 0; } for (int i = 0, size = executableCalls.size(); i < size; i++) { AsyncCall asyncCall = executableCalls.get(i); asyncCall.executeOn(executorService()); } return isRunning; }
这里主要作的工做经过for循环,依次从就绪队列中取出AsyncCall实例,这个实例是在上一次的方法中添加的,接下来有两个大小判断
private int maxRequests = 64; private int maxRequestsPerHost = 5;
正在执行的线程数不能大于64,Host不能大于5。同时知足条件,那么将这个实例移除,而且添加到executableCalls
和runningAsyncCalls
当中,将正在运行的同步异步Call数量大于零的结果赋值给isRunning
,这样就能获得时候还有Call正在运行
public synchronized int runningCallsCount() { return runningAsyncCalls.size() + runningSyncCalls.size(); }
接下来循环从executableCalls
取出Call实例执行,executorService
作了非空判断,主要是保证线程池为一个单例。
public synchronized ExecutorService executorService() { if (executorService == null) { executorService = new ThreadPoolExecutor(0, Integer.MAX_VALUE, 60, TimeUnit.SECONDS, new SynchronousQueue<>(), Util.threadFactory("OkHttp Dispatcher", false)); } return executorService; }
能够看到,这里实例化了一个线程池服务,而后执行executeOn()
/** * Attempt to enqueue this async call on {@code executorService}. This will attempt to clean up * if the executor has been shut down by reporting the call as failed. */ void executeOn(ExecutorService executorService) { assert (!Thread.holdsLock(client.dispatcher())); boolean success = false; try { executorService.execute(this); success = true; } catch (RejectedExecutionException e) { InterruptedIOException ioException = new InterruptedIOException("executor rejected"); ioException.initCause(e); transmitter.noMoreExchanges(ioException); responseCallback.onFailure(RealCall.this, ioException); } finally { if (!success) { client.dispatcher().finished(this); // This call is no longer running! } } }
这里能够看到,执行了executorService.execute(this);
,而且若是没有抛出异常,success赋值true,再看finally中,当success为false时,回收Call实例。若是抛出异常,调用responseCallback.onFailure(RealCall.this, ioException);
回调。线程池调用execute()
会调用每一个线程的run()
方法,也就是NamedRunnable
的run()
方法
/** * Runnable implementation which always sets its thread name. */ public abstract class NamedRunnable implements Runnable { protected final String name; public NamedRunnable(String format, Object... args) { this.name = Util.format(format, args); } @Override public final void run() { String oldName = Thread.currentThread().getName(); Thread.currentThread().setName(name); try { execute(); } finally { Thread.currentThread().setName(oldName); } } protected abstract void execute(); }
而在NamedRunnable
中的run()
调用了execute()
方法,接下来看调用的execute()
方法。
@Override protected void execute() { boolean signalledCallback = false; transmitter.timeoutEnter(); try { Response response = getResponseWithInterceptorChain(); signalledCallback = true; responseCallback.onResponse(RealCall.this, response); } catch (IOException e) { if (signalledCallback) { // Do not signal the callback twice! Platform.get().log(INFO, "Callback failure for " + toLoggableString(), e); } else { responseCallback.onFailure(RealCall.this, e); } } finally { client.dispatcher().finished(this); } }
这里同步同样,调用了getResponseWithInterceptorChain()
,获得了Response
对象,而且在未抛出异常的状况下,回调了responseCallback.onResponse(RealCall.this, response);
,最后在finally中调用了client.dispatcher().finished(this);
。到这里,再来看一看这个finish()
方法,会调用promoteAndExecute();
,也就是直到所有线程执行完毕。这也使得等待队列可以所有到达运行队列的一个主要缘由。
至此,异步的执行流程便已分析完毕。关于拦截链,会在后面进行详细分析。
经过源代码分析,能够看到同步请求会阻塞线程,所以须要放到子线程中运行
异步请求回调方法位于子线程
经过同步和异步的分析,咱们知道了dispatcher
其实就是维护了一个线程池,关于dispatcher
更多内容会在后面详细解读。