在平常开发中网络请求是很常见的功能。OkHttp做为Android开发中最经常使用的网络请求框架,在Android开发中咱们常常结合retrofit一块儿使用,俗话说得好:“知其然知其因此然”,因此这篇文章咱们经过源码来深刻理解OKHttp3(基于3.12版本)java
//框架引入项目
implementation("com.squareup.okhttp3:okhttp:3.12.0")
//引用官方Demo的例子
@Override
protected void onCreate(Bundle savedInstanceState) {
super.onCreate(savedInstanceState);
setContentView(R.layout.activity_main);
//主线程不能进行耗时操做
new Thread(){
@Override
public void run() {
super.run();
/**
* 同步请求
*/
GetExample getexample = new GetExample();
String syncresponse = null;
try {
syncresponse = getexample.run("https://raw.github.com/square/okhttp/master/README.md");
Log.i("maoqitian","异步请求返回参数"+syncresponse);
} catch (IOException e) {
e.printStackTrace();
}
}
}.start();
/**
* 异步请求
*/
PostExample postexample = new PostExample();
String json = postexample.bowlingJson("Jesse", "Jake");
try {
postexample.post("http://www.roundsapp.com/post", json);
} catch (IOException e) {
e.printStackTrace();
}
}
/**
* 异步请求
*/
class PostExample {
final MediaType JSON = MediaType.get("application/json; charset=utf-8");
//获取 OkHttpClient 对象
OkHttpClient client = new OkHttpClient();
void post(String url, String json) throws IOException {
RequestBody body = RequestBody.create(JSON, json);
Request request = new Request.Builder()
.url(url)
.post(body)
.build();
client.newCall(request).enqueue(new Callback() {
@Override
public void onFailure(Call call, IOException e) {
Log.i("maoqitian","异步请求返回参数"+e.toString());
}
@Override
public void onResponse(Call call, Response response) throws IOException {
String asynresponse= response.body().string();
Log.i("maoqitian","异步请求返回参数"+asynresponse);
}
});
}
String bowlingJson(String player1, String player2) {
return "{'winCondition':'HIGH_SCORE',"
+ "'name':'Bowling',"
+ "'round':4,"
+ "'lastSaved':1367702411696,"
+ "'dateStarted':1367702378785,"
+ "'players':["
+ "{'name':'" + player1 + "','history':[10,8,6,7,8],'color':-13388315,'total':39},"
+ "{'name':'" + player2 + "','history':[6,10,5,10,10],'color':-48060,'total':41}"
+ "]}";
}
}
/**
* 同步请求
*/
class GetExample {
OkHttpClient client = new OkHttpClient();
String run(String url) throws IOException {
Request request = new Request.Builder()
.url(url)
.build();
try (Response response = client.newCall(request).execute()) {
return response.body().string();
}
}
}
复制代码
首先看一个流程图,对于接下来的源码分析有个大致印象android
经过上面的例子能够看到,不论是同步请求仍是异步请求,首先调用的OkHttpClient的newCall(request)方法,先来看看这个方法git
/**
* Prepares the {@code request} to be executed at some point in the future.
*/
@Override public Call newCall(Request request) {
return RealCall.newRealCall(this, request, false /* for web socket */);
}
复制代码
经过newCall方法的源码能够看到该方法返回值是Call,Call是一个接口,他的实现类是RealCall,因此咱们执行的同步execute()方法或者异步enqueue()方法都是RealCall的方法。newCall方法接收了的网络请求参数,接下来咱们看看execute()和enqueue()方法github
/**
* 同步请求
*/
@Override public Response execute() throws IOException {
synchronized (this) {
if (executed) throw new IllegalStateException("Already Executed");
executed = true;
}
captureCallStackTrace();
timeout.enter();
eventListener.callStart(this);
try {
client.dispatcher().executed(this);
Response result = getResponseWithInterceptorChain();
if (result == null) throw new IOException("Canceled");
return result;
} catch (IOException e) {
e = timeoutExit(e);
eventListener.callFailed(this, e);
throw e;
} finally {
client.dispatcher().finished(this);
}
}
/**
* 异步请求
*/
@Override public void enqueue(Callback responseCallback) {
synchronized (this) {
if (executed) throw new IllegalStateException("Already Executed");
executed = true;
}
captureCallStackTrace();
eventListener.callStart(this);
client.dispatcher().enqueue(new AsyncCall(responseCallback));
}
复制代码
这里先看异步的enqueue方法,很直观能够看到真正执行网络请求的是最后一句代码,而它是怎么作的呢,咱们还得先弄明白dispatcher,Dispatcher的本质是异步请求的调度器,它内部持有一个线程池,结合线程池调配并发请求。官方文档描述也说了这一点。web
/**最大并发请求数*/
private int maxRequests = 64;
/**每一个主机最大请求数*/
private int maxRequestsPerHost = 5;
/** Ready async calls in the order they'll be run. 准备要执行的异步请求队列*/ private final Deque<AsyncCall> readyAsyncCalls = new ArrayDeque<>(); /** Running asynchronous calls. Includes canceled calls that haven't finished yet.
正在执行的异步请求队列*/
private final Deque<AsyncCall> runningAsyncCalls = new ArrayDeque<>();
/** Running synchronous calls. Includes canceled calls that haven't finished yet. 正在执行的同步请求队列*/ private final Deque<RealCall> runningSyncCalls = new ArrayDeque<>(); /** Dispatcher 构造方法 */ public Dispatcher(ExecutorService executorService) { this.executorService = executorService; } public Dispatcher() { } public synchronized ExecutorService executorService() { if (executorService == null) { executorService = new ThreadPoolExecutor(0, Integer.MAX_VALUE, 60, TimeUnit.SECONDS, new SynchronousQueue<Runnable>(), Util.threadFactory("OkHttp Dispatcher", false)); } return executorService; } 复制代码
void enqueue(AsyncCall call) {
synchronized (this) {
readyAsyncCalls.add(call);
}
promoteAndExecute();
}
/**
* Promotes eligible calls from {@link #readyAsyncCalls} to {@link #runningAsyncCalls} and runs
* them on the executor service. Must not be called with synchronization because executing calls
* can call into user code.
*
* @return true if the dispatcher is currently running calls.
*/
private boolean promoteAndExecute() {
assert (!Thread.holdsLock(this));
List<AsyncCall> executableCalls = new ArrayList<>();
boolean isRunning;
synchronized (this) {
for (Iterator<AsyncCall> i = readyAsyncCalls.iterator(); i.hasNext(); ) {
AsyncCall asyncCall = i.next();
if (runningAsyncCalls.size() >= maxRequests) break; // Max capacity.
if (runningCallsForHost(asyncCall) >= maxRequestsPerHost) continue; // Host max capacity.
i.remove();
executableCalls.add(asyncCall);
runningAsyncCalls.add(asyncCall);
}
isRunning = runningCallsCount() > 0;
}
for (int i = 0, size = executableCalls.size(); i < size; i++) {
AsyncCall asyncCall = executableCalls.get(i);
asyncCall.executeOn(executorService());
}
return isRunning;
}
复制代码
/**
* Attempt to enqueue this async call on {@code executorService}. This will attempt to clean up
* if the executor has been shut down by reporting the call as failed.
*/
void executeOn(ExecutorService executorService) {
assert (!Thread.holdsLock(client.dispatcher()));
boolean success = false;
try {
executorService.execute(this);
success = true;
} catch (RejectedExecutionException e) {
InterruptedIOException ioException = new InterruptedIOException("executor rejected");
ioException.initCause(e);
eventListener.callFailed(RealCall.this, ioException);
responseCallback.onFailure(RealCall.this, ioException);
} finally {
if (!success) {
client.dispatcher().finished(this); // This call is no longer running!
}
}
}
复制代码
/**
* Runnable implementation which always sets its thread name.
*/
public abstract class NamedRunnable implements Runnable {
protected final String name;
public NamedRunnable(String format, Object... args) {
this.name = Util.format(format, args);
}
@Override public final void run() {
String oldName = Thread.currentThread().getName();
Thread.currentThread().setName(name);
try {
execute();
} finally {
Thread.currentThread().setName(oldName);
}
}
protected abstract void execute();
}
复制代码
@Override protected void execute() {
boolean signalledCallback = false;
timeout.enter();
try {
Response response = getResponseWithInterceptorChain();
if (retryAndFollowUpInterceptor.isCanceled()) {
signalledCallback = true;
responseCallback.onFailure(RealCall.this, new IOException("Canceled"));
} else {
signalledCallback = true;
responseCallback.onResponse(RealCall.this, response);
}
} catch (IOException e) {
e = timeoutExit(e);
if (signalledCallback) {
// Do not signal the callback twice!
Platform.get().log(INFO, "Callback failure for " + toLoggableString(), e);
} else {
eventListener.callFailed(RealCall.this, e);
responseCallback.onFailure(RealCall.this, e);
}
} finally {
client.dispatcher().finished(this);
}
}
/**Dispatcher的finished方法*/
/** Used by {@code AsyncCall#run} to signal completion. */
void finished(AsyncCall call) {
finished(runningAsyncCalls, call);
}
/** Used by {@code Call#execute} to signal completion. */
void finished(RealCall call) {
finished(runningSyncCalls, call);
}
private <T> void finished(Deque<T> calls, T call) {
Runnable idleCallback;
synchronized (this) {
if (!calls.remove(call)) throw new AssertionError("Call wasn't in-flight!");
idleCallback = this.idleCallback;
}
boolean isRunning = promoteAndExecute();
if (!isRunning && idleCallback != null) {
idleCallback.run();
}
}
复制代码
Response getResponseWithInterceptorChain() throws IOException {
// Build a full stack of interceptors.
List<Interceptor> interceptors = new ArrayList<>();
interceptors.addAll(client.interceptors());
interceptors.add(retryAndFollowUpInterceptor);
interceptors.add(new BridgeInterceptor(client.cookieJar()));
interceptors.add(new CacheInterceptor(client.internalCache()));
interceptors.add(new ConnectInterceptor(client));
if (!forWebSocket) {
interceptors.addAll(client.networkInterceptors());
}
interceptors.add(new CallServerInterceptor(forWebSocket));
Interceptor.Chain chain = new RealInterceptorChain(interceptors, null, null, null, 0,
originalRequest, this, eventListener, client.connectTimeoutMillis(),
client.readTimeoutMillis(), client.writeTimeoutMillis());
return chain.proceed(originalRequest);
}
复制代码
由getResponseWithInterceptorChain()方法咱们看到添加了不少Interceptor(拦截器),首先要了解每一个Interceptor的做用,也能大体了解OKHttp完成网络请求的过程。json
在阅读接下来源码以前,咱们先要了解责任链模式。通俗化的讲在责任链模式中有不少对象,这些对象能够理解为上面列出的拦截器,而每一个对象之间都经过一条链子链接,网络请求在这条链子上传递,直到某一个对象处理了这个网络请求,也就是完成了网络请求。使用这个模式的好处就是无论你用多少拦截器处理什么操做,最终都不会影响咱们的发出请求的目的,就是完成网络请求,拦截过程你能够任意添加分配责任。设计模式
接着继续看Interceptor.Chain,他是Interceptor的内部接口,前面添加的每个拦截器都实现了Interceptor接口,而RealInterceptorChain是Interceptor.Chain接口的实现类。先看RealInterceptorChain的proceed方法源码缓存
public Response proceed(Request request, StreamAllocation streamAllocation, HttpCodec httpCodec,
RealConnection connection) throws IOException {
if (index >= interceptors.size()) throw new AssertionError();
calls++;
......
// Call the next interceptor in the chain.
RealInterceptorChain next = new RealInterceptorChain(interceptors, streamAllocation, httpCodec,
connection, index + 1, request, call, eventListener, connectTimeout, readTimeout,
writeTimeout);
Interceptor interceptor = interceptors.get(index);
Response response = interceptor.intercept(next);
// Confirm that the next interceptor made its required call to chain.proceed().
if (httpCodec != null && index + 1 < interceptors.size() && next.calls != 1) {
throw new IllegalStateException("network interceptor " + interceptor
+ " must call proceed() exactly once");
}
.....
return response;
}
复制代码
经过源码能够注意到interceptor.intercept(next),RetryAndFollowUpInterceptor做为默认拦截器的第一个拦截器,也就是执行了它的intercept方法bash
前面说过RetryAndFollowUpInterceptor拦截器执行OKHttp网络重试,先看它的intercept方法服务器
/**RetryAndFollowUpInterceptor的intercept方法 **/
@Override public Response intercept(Chain chain) throws IOException {
Request request = chain.request();
RealInterceptorChain realChain = (RealInterceptorChain) chain;
Call call = realChain.call();
EventListener eventListener = realChain.eventListener();
StreamAllocation streamAllocation = new StreamAllocation(client.connectionPool(),
createAddress(request.url()), call, eventListener, callStackTrace);
this.streamAllocation = streamAllocation;
int followUpCount = 0;
Response priorResponse = null;
while (true) {
if (canceled) {
streamAllocation.release();
throw new IOException("Canceled");
}
//将请求经过链子chain传递到下一个拦截器
Response response;
boolean releaseConnection = true;
try {
response = realChain.proceed(request, streamAllocation, null, null);
releaseConnection = false;
} catch (RouteException e) {
// 线路异常,链接失败,检查是否能够从新链接
if (!recover(e.getLastConnectException(), streamAllocation, false, request)) {
throw e.getFirstConnectException();
}
releaseConnection = false;
continue;
} catch (IOException e) {
// An attempt to communicate with a server failed. The request may have been sent.
// IO异常,链接失败,检查是否能够从新链接
boolean requestSendStarted = !(e instanceof ConnectionShutdownException);
if (!recover(e, streamAllocation, requestSendStarted, request)) throw e;
releaseConnection = false;
continue;
} finally {
// We're throwing an unchecked exception. Release any resources. 释放资源 if (releaseConnection) { streamAllocation.streamFailed(null); streamAllocation.release(); } } // Attach the prior response if it exists. Such responses never have a body. if (priorResponse != null) { response = response.newBuilder() .priorResponse(priorResponse.newBuilder() .body(null) .build()) .build(); } Request followUp; try { //效验状态码、身份验证头信息、跟踪重定向或处理客户端请求超时 followUp = followUpRequest(response, streamAllocation.route()); } catch (IOException e) { streamAllocation.release(); throw e; } if (followUp == null) { streamAllocation.release(); // 不须要重定向,正常返回结果 return response; } closeQuietly(response.body()); //超太重试次数 if (++followUpCount > MAX_FOLLOW_UPS) { streamAllocation.release(); throw new ProtocolException("Too many follow-up requests: " + followUpCount); } if (followUp.body() instanceof UnrepeatableRequestBody) { streamAllocation.release(); throw new HttpRetryException("Cannot retry streamed HTTP body", response.code()); } if (!sameConnection(response, followUp.url())) { streamAllocation.release(); streamAllocation = new StreamAllocation(client.connectionPool(), createAddress(followUp.url()), call, eventListener, callStackTrace); this.streamAllocation = streamAllocation; } else if (streamAllocation.codec() != null) { throw new IllegalStateException("Closing the body of " + response + " didn't close its backing stream. Bad interceptor?"); } request = followUp; priorResponse = response; } } 复制代码
1.首先建立StreamAllocation对象(稍后分析),在一个死循环中将经过链子chain传递到下一个拦截器,若是捕获异常,则判断异常是否恢复链接,不能链接则抛出异常,跳出循环并是否建立的链接池资源
2.第一步没有异常,还要返回值效验状态码、头部信息、是否须要重定向、链接超时等信息,捕获异常则抛出并退出循环
3.若是若是重定向,循环超出RetryAndFollowUpInterceptor拦截器的最大重试次数,也抛出异常,退出循环
/**
* Bridges from application code to network code. First it builds a network request from a user
* request. Then it proceeds to call the network. Finally it builds a user response from the network
* response.
* BridgeInterceptor的intercept方法
*/
@Override public Response intercept(Chain chain) throws IOException {
Request userRequest = chain.request();
Request.Builder requestBuilder = userRequest.newBuilder();
RequestBody body = userRequest.body();
......
Response networkResponse = chain.proceed(requestBuilder.build());
......
}
复制代码
/**
* 拦截器CacheInterceptor的intercept方法
*/
@Override public Response intercept(Chain chain) throws IOException {
Response cacheCandidate = cache != null
? cache.get(chain.request())
: null;
long now = System.currentTimeMillis();
//获取策略,假设当前可使用网络
CacheStrategy strategy = new CacheStrategy.Factory(now, chain.request(), cacheCandidate).get();
Request networkRequest = strategy.networkRequest;
Response cacheResponse = strategy.cacheResponse;
if (cache != null) {
cache.trackResponse(strategy);
}
if (cacheCandidate != null && cacheResponse == null) {
closeQuietly(cacheCandidate.body()); // The cache candidate wasn't applicable. Close it. } // If we're forbidden from using the network and the cache is insufficient, fail. 若是网络被禁止使用而且没有缓存,则请求失败
if (networkRequest == null && cacheResponse == null) {
return new Response.Builder()
.request(chain.request())
.protocol(Protocol.HTTP_1_1)
.code(504)
.message("Unsatisfiable Request (only-if-cached)")
.body(Util.EMPTY_RESPONSE)
.sentRequestAtMillis(-1L)
.receivedResponseAtMillis(System.currentTimeMillis())
.build();
}
// If we don't need the network, we're done.若是有缓存,则返回响应缓存,请求完成
if (networkRequest == null) {
return cacheResponse.newBuilder()
.cacheResponse(stripBody(cacheResponse))
.build();
}
Response networkResponse = null;
try {
//没有缓存,则进行网络请求,执行下一个拦截器
networkResponse = chain.proceed(networkRequest);
} finally {
// If we're crashing on I/O or otherwise, don't leak the cache body.
if (networkResponse == null && cacheCandidate != null) {
closeQuietly(cacheCandidate.body());
}
}
// If we have a cache response too, then we're doing a conditional get. if (cacheResponse != null) { //状态码 304 if (networkResponse.code() == HTTP_NOT_MODIFIED) { Response response = cacheResponse.newBuilder() .headers(combine(cacheResponse.headers(), networkResponse.headers())) .sentRequestAtMillis(networkResponse.sentRequestAtMillis()) .receivedResponseAtMillis(networkResponse.receivedResponseAtMillis()) .cacheResponse(stripBody(cacheResponse)) .networkResponse(stripBody(networkResponse)) .build(); networkResponse.body().close(); // Update the cache after combining headers but before stripping the // Content-Encoding header (as performed by initContentStream()). cache.trackConditionalCacheHit(); cache.update(cacheResponse, response); return response; } else { closeQuietly(cacheResponse.body()); } } Response response = networkResponse.newBuilder() .cacheResponse(stripBody(cacheResponse)) .networkResponse(stripBody(networkResponse)) .build(); if (cache != null) { if (HttpHeaders.hasBody(response) && CacheStrategy.isCacheable(response, networkRequest)) { // Offer this request to the cache. //保存缓存 CacheRequest cacheRequest = cache.put(response); return cacheWritingResponse(cacheRequest, response); } if (HttpMethod.invalidatesCache(networkRequest.method())) { try { cache.remove(networkRequest); } catch (IOException ignored) { // The cache cannot be written. } } } return response; } 复制代码
先看看intercept方法的大体逻辑
缓存的场景也符合设计模式中的策略模式,须要CacheStrategy提供策略在不一样场景下读缓存仍是请求网络。
了解了缓存逻辑,继续深刻了解OKHttp的缓存是如何作的。首先咱们应该回到最初的缓存拦截器设置代码
/**RealCall 设置缓存拦截器*/
interceptors.add(new CacheInterceptor(client.internalCache()));
/**OkHttpClient 设置缓存*/
Cache cache;
@Override public void setCache(OkHttpClient.Builder builder, InternalCache internalCache) {
builder.setInternalCache(internalCache);
}
void setInternalCache(@Nullable InternalCache internalCache) {
this.internalCache = internalCache;
this.cache = null;
}
InternalCache internalCache() {
return cache != null ? cache.internalCache : internalCache;
}
/**Cache类中 内部持有 InternalCache */
final DiskLruCache cache;
final InternalCache internalCache = new InternalCache() {
@Override public Response get(Request request) throws IOException {
return Cache.this.get(request);
}
@Override public CacheRequest put(Response response) throws IOException {
return Cache.this.put(response);
}
@Override public void remove(Request request) throws IOException {
Cache.this.remove(request);
}
@Override public void update(Response cached, Response network) {
Cache.this.update(cached, network);
}
@Override public void trackConditionalCacheHit() {
Cache.this.trackConditionalCacheHit();
}
@Override public void trackResponse(CacheStrategy cacheStrategy) {
Cache.this.trackResponse(cacheStrategy);
}
};
复制代码
上面咱们分别截取了 RealCall类、OkHttpClient类和Cache类的源码,能够了解到拦截器使用的缓存类是DiskLruCache,设置缓存缓存只能经过OkHttpClient的builder来设置,缓存操做实现是在Cache类中,可是Cache没有实现InternalCache接口,而是持有InternalCache接口的内部类对象来实现缓存的操做方法,这样就使得缓存的操做实现只在Cache内部,外部用户是没法实现缓存操做的,方便框架内部使用,接口扩展也不影响外部。
根据前面的分析,缓存拦截器中也会调用chain.proceed方法,因此这时候执行到了第四个默认拦截器ConnectInterceptor,接着看它的intercept方法
/**
* 拦截器ConnectInterceptor的intercept方法
*/
@Override public Response intercept(Chain chain) throws IOException {
RealInterceptorChain realChain = (RealInterceptorChain) chain;
Request request = realChain.request();
StreamAllocation streamAllocation = realChain.streamAllocation();
// We need the network to satisfy this request. Possibly for validating a conditional GET.
boolean doExtensiveHealthChecks = !request.method().equals("GET");
//打开链接
HttpCodec httpCodec = streamAllocation.newStream(client, chain, doExtensiveHealthChecks);
RealConnection connection = streamAllocation.connection();
//交由下一个拦截器处理
return realChain.proceed(request, streamAllocation, httpCodec, connection);
}
复制代码
public Builder() {
.......
connectionPool = new ConnectionPool();
.......
}
复制代码
public HttpCodec newStream(
OkHttpClient client, Interceptor.Chain chain, boolean doExtensiveHealthChecks) {
........
try {
RealConnection resultConnection = findHealthyConnection(connectTimeout, readTimeout,
writeTimeout, pingIntervalMillis, connectionRetryEnabled, doExtensiveHealthChecks);
.......
}
} catch (IOException e) {
throw new RouteException(e);
}
}
/**
* Finds a connection and returns it if it is healthy. If it is unhealthy the process is repeated
* until a healthy connection is found.
*/
private RealConnection findHealthyConnection(int connectTimeout, int readTimeout,
int writeTimeout, int pingIntervalMillis, boolean connectionRetryEnabled,
boolean doExtensiveHealthChecks) throws IOException {
while (true) {
RealConnection candidate = findConnection(connectTimeout, readTimeout, writeTimeout,
pingIntervalMillis, connectionRetryEnabled);
........
return candidate;
}
}
/**
* Returns a connection to host a new stream. This prefers the existing connection if it exists,
* then the pool, finally building a new connection.
*/
private RealConnection findConnection(int connectTimeout, int readTimeout, int writeTimeout,
int pingIntervalMillis, boolean connectionRetryEnabled) throws IOException {
............
//
if (result == null) {
// Attempt to get a connection from the pool.
Internal.instance.get(connectionPool, address, this, null);
if (connection != null) {
//链接复用
foundPooledConnection = true;
result = connection;
} else {
selectedRoute = route;
}
}
..........
if (!foundPooledConnection) {
........
result = new RealConnection(connectionPool, selectedRoute);
//记录每一个链接的引用,每一个调用必须与同一链接上的调用配对。
acquire(result, false);
}
}
.........
synchronized (connectionPool) {
.......
// Pool the connection. 将链接放入链接池
Internal.instance.put(connectionPool, result);
......
}
}
.......
return result;
}
复制代码
根据上面的源码,咱们能够知道findHealthyConnection在循环找健康的链接,直到找到链接,说明findConnection方法是寻找链接的核心方法,该方法中存在能够复用的链接则复用,不然建立新的链接,而且记录链接引用,咱们能够明白StreamAllocation主要是为拦截器提供一个链接, 若是链接池中有复用的链接则复用链接, 若是没有则建立新的链接。
明白StreamAllocation是如何建立和复用链接池,咱们还要明白链接池(ConnectionPool)的是如何实现的。
理解ConnectionPool以前,咱们须要明白TCP链接的知识,Tcp创建链接三次握手和断开链接四次握手过程是须要消耗时间的,在http/1.0每一次请求只能打开一次链接,而在http/1.1是支持持续链接(persistent connection),使得一次链接打开以后会保持一段时间,若是仍是同一个请求而且使同一个服务器则在这段时间内继续请求链接是能够复用的。而ConnectionPool也实现了这个机制,在它内部持有一个线程池和一个缓存链接的双向列表,链接中最多只能存在5个空闲链接,空闲链接最多只能存活5分钟,空闲链接到期以后定时清理。
public final class ConnectionPool {
/**
* Background threads are used to cleanup expired connections. There will be at most a single
* thread running per connection pool. The thread pool executor permits the pool itself to be
* garbage collected.
*/
//线程池
private static final Executor executor = new ThreadPoolExecutor(0 /* corePoolSize */,
Integer.MAX_VALUE /* maximumPoolSize */, 60L /* keepAliveTime */, TimeUnit.SECONDS,
new SynchronousQueue<Runnable>(), Util.threadFactory("OkHttp ConnectionPool", true));
/** The maximum number of idle connections for each address. */
private final int maxIdleConnections;
private final long keepAliveDurationNs;
private final Runnable cleanupRunnable = new Runnable() {
@Override public void run() {
// 后台按期清理链接的线程
while (true) {
long waitNanos = cleanup(System.nanoTime());
if (waitNanos == -1) return;
if (waitNanos > 0) {
long waitMillis = waitNanos / 1000000L;
waitNanos -= (waitMillis * 1000000L);
synchronized (ConnectionPool.this) {
try {
ConnectionPool.this.wait(waitMillis, (int) waitNanos);
} catch (InterruptedException ignored) {
}
}
}
}
}
};
//缓存链接的双向队列
private final Deque<RealConnection> connections = new ArrayDeque<>();
............
/**
* Create a new connection pool with tuning parameters appropriate for a single-user application.
* The tuning parameters in this pool are subject to change in future OkHttp releases. Currently
* this pool holds up to 5 idle connections which will be evicted after 5 minutes of inactivity.
*/
public ConnectionPool() {
this(5, 5, TimeUnit.MINUTES);
}
............
}
复制代码
/**
RealConnection类newCodec方法
*/
public HttpCodec newCodec(OkHttpClient client, Interceptor.Chain chain,
StreamAllocation streamAllocation) throws SocketException {
if (http2Connection != null) {
return new Http2Codec(client, chain, streamAllocation, http2Connection);
} else {
socket.setSoTimeout(chain.readTimeoutMillis());
source.timeout().timeout(chain.readTimeoutMillis(), MILLISECONDS);
sink.timeout().timeout(chain.writeTimeoutMillis(), MILLISECONDS);
return new Http1Codec(client, streamAllocation, source, sink);
}
}
复制代码
/** Does all the work necessary to build a full HTTP or HTTPS connection on a raw socket.
RealConnection类connectSocket方法
*/
private void connectSocket(int connectTimeout, int readTimeout, Call call,
EventListener eventListener) throws IOException {
Proxy proxy = route.proxy();
Address address = route.address();
rawSocket = proxy.type() == Proxy.Type.DIRECT || proxy.type() == Proxy.Type.HTTP
? address.socketFactory().createSocket()
: new Socket(proxy);
eventListener.connectStart(call, route.socketAddress(), proxy);
rawSocket.setSoTimeout(readTimeout);
try {
//打开 socket 链接
Platform.get().connectSocket(rawSocket, route.socketAddress(), connectTimeout);
} catch (ConnectException e) {
ConnectException ce = new ConnectException("Failed to connect to " + route.socketAddress());
ce.initCause(e);
throw ce;
}
// The following try/catch block is a pseudo hacky way to get around a crash on Android 7.0
// More details:
// https://github.com/square/okhttp/issues/3245
// https://android-review.googlesource.com/#/c/271775/
try {
//使用OKio来对数据读写
source = Okio.buffer(Okio.source(rawSocket));
sink = Okio.buffer(Okio.sink(rawSocket));
} catch (NullPointerException npe) {
if (NPE_THROW_WITH_NULL.equals(npe.getMessage())) {
throw new IOException(npe);
}
}
}
复制代码
/**CallServerInterceptor的intercept方法*/
@Override public Response intercept(Chain chain) throws IOException {
RealInterceptorChain realChain = (RealInterceptorChain) chain;
HttpCodec httpCodec = realChain.httpStream();
StreamAllocation streamAllocation = realChain.streamAllocation();
RealConnection connection = (RealConnection) realChain.connection();
Request request = realChain.request();
long sentRequestMillis = System.currentTimeMillis();
realChain.eventListener().requestHeadersStart(realChain.call());
//按照HTTP协议,依次写入请求体
httpCodec.writeRequestHeaders(request);
.................
if (responseBuilder == null) {
realChain.eventListener().responseHeadersStart(realChain.call());
//
responseBuilder = httpCodec.readResponseHeaders(false);
}
Response response = responseBuilder
.request(request)
.handshake(streamAllocation.connection().handshake())
.sentRequestAtMillis(sentRequestMillis)
.receivedResponseAtMillis(System.currentTimeMillis())
.build();
...............
if (forWebSocket && code == 101) {
// Connection is upgrading, but we need to ensure interceptors see a non-null response body.
response = response.newBuilder()
.body(Util.EMPTY_RESPONSE)
.build();
} else {
//响应数据OKio写入
response = response.newBuilder()
.body(httpCodec.openResponseBody(response))
.build();
}
}
return response;
}
/**Http1Codec方法**/
//OKio 读写对象
final BufferedSource source;
final BufferedSink sink;
@Override public void writeRequestHeaders(Request request) throws IOException {
//构造好请求头
String requestLine = RequestLine.get(
request, streamAllocation.connection().route().proxy().type());
writeRequest(request.headers(), requestLine);
}
/** Returns bytes of a request header for sending on an HTTP transport.
将请求信息写入sink
*/
public void writeRequest(Headers headers, String requestLine) throws IOException {
if (state != STATE_IDLE) throw new IllegalStateException("state: " + state);
sink.writeUtf8(requestLine).writeUtf8("\r\n");
for (int i = 0, size = headers.size(); i < size; i++) {
sink.writeUtf8(headers.name(i))
.writeUtf8(": ")
.writeUtf8(headers.value(i))
.writeUtf8("\r\n");
}
sink.writeUtf8("\r\n");
state = STATE_OPEN_REQUEST_BODY;
}
复制代码
/**Http1Codec方法**/
/**
读取响应头信息
*/
@Override public Response.Builder readResponseHeaders(boolean expectContinue) throws IOException {
if (state != STATE_OPEN_REQUEST_BODY && state != STATE_READ_RESPONSE_HEADERS) {
throw new IllegalStateException("state: " + state);
}
try {
StatusLine statusLine = StatusLine.parse(readHeaderLine());
Response.Builder responseBuilder = new Response.Builder()
.protocol(statusLine.protocol)
.code(statusLine.code)
.message(statusLine.message)
.headers(readHeaders());
if (expectContinue && statusLine.code == HTTP_CONTINUE) {
return null;
} else if (statusLine.code == HTTP_CONTINUE) {
state = STATE_READ_RESPONSE_HEADERS;
return responseBuilder;
}
state = STATE_OPEN_RESPONSE_BODY;
return responseBuilder;
} catch (EOFException e) {
// Provide more context if the server ends the stream before sending a response.
IOException exception = new IOException("unexpected end of stream on " + streamAllocation);
exception.initCause(e);
throw exception;
}
}
/**
写入响应输入到ResponseBody
*/
@Override public ResponseBody openResponseBody(Response response) throws IOException {
streamAllocation.eventListener.responseBodyStart(streamAllocation.call);
String contentType = response.header("Content-Type");
if (!HttpHeaders.hasBody(response)) {
Source source = newFixedLengthSource(0);
return new RealResponseBody(contentType, 0, Okio.buffer(source));
}
if ("chunked".equalsIgnoreCase(response.header("Transfer-Encoding"))) {
Source source = newChunkedSource(response.request().url());
return new RealResponseBody(contentType, -1L, Okio.buffer(source));
}
long contentLength = HttpHeaders.contentLength(response);
if (contentLength != -1) {
Source source = newFixedLengthSource(contentLength);
return new RealResponseBody(contentType, contentLength, Okio.buffer(source));
}
return new RealResponseBody(contentType, -1L, Okio.buffer(newUnknownLengthSource()));
}
复制代码
最后,经过OKHttp这个框架源码阅读,也是对本身的一个提高,不只了解了框架原理,设计模式在适宜场景的运用,同时也是对本身耐心的一次考验,源码的阅读是枯燥的,可是只要静下心来,也能发现阅读源码的乐趣。因为本人水平有限,文章中若是有错误,请你们给我提出来,你们一块儿学习进步,若是以为个人文章给予你帮助,也请给我一个喜欢和关注。
参考连接:
参考书籍: