OkHttp3源码工作流程浅析
细节先放在一边,咱来把工作流程先理一下。
常规调用
- 异步
Request request = new Request.Builder()
.url(url)
.post(requestBody.build())
.build();
okHttpClient.newCall(request).enqueue(callback);
- 同步
Request request = new Request.Builder()
.url(url)
.post(requestBody.build())
.build();
okHttpClient.newCall(request).execute();
工作流程
先看下OkHttpClient的参数:
//OkHttpClient.java
public static final class Builder {
//重点,分发器,主管分发任务,所以内部肯定有队列之类的存在
Dispatcher dispatcher;
@Nullable Proxy proxy;
List<Protocol> protocols;
List<ConnectionSpec> connectionSpecs;
//重点,OkHttp核心工作流程就是顺序执行各种拦截器的
final List<Interceptor> interceptors = new ArrayList<>();
final List<Interceptor> networkInterceptors = new ArrayList<>();
EventListener.Factory eventListenerFactory;
ProxySelector proxySelector;
CookieJar cookieJar;
@Nullable Cache cache;
@Nullable InternalCache internalCache;
SocketFactory socketFactory;
@Nullable SSLSocketFactory sslSocketFactory;
@Nullable CertificateChainCleaner certificateChainCleaner;
HostnameVerifier hostnameVerifier;
CertificatePinner certificatePinner;
Authenticator proxyAuthenticator;
Authenticator authenticator;
ConnectionPool connectionPool;
Dns dns;
boolean followSslRedirects;
boolean followRedirects;
boolean retryOnConnectionFailure;
int connectTimeout;
int readTimeout;
int writeTimeout;
int pingInterval;
public Builder() {
dispatcher = new Dispatcher();
protocols = DEFAULT_PROTOCOLS;
connectionSpecs = DEFAULT_CONNECTION_SPECS;
eventListenerFactory = EventListener.factory(EventListener.NONE);
proxySelector = ProxySelector.getDefault();
cookieJar = CookieJar.NO_COOKIES;
socketFactory = SocketFactory.getDefault();
hostnameVerifier = OkHostnameVerifier.INSTANCE;
certificatePinner = CertificatePinner.DEFAULT;
proxyAuthenticator = Authenticator.NONE;
authenticator = Authenticator.NONE;
connectionPool = new ConnectionPool();
dns = Dns.SYSTEM;
followSslRedirects = true;
followRedirects = true;
retryOnConnectionFailure = true;
connectTimeout = 10_000;
readTimeout = 10_000;
writeTimeout = 10_000;
pingInterval = 0;
}
}
若只想了解工作流程的话,只需要关注Dispatcher与Interceptor就可以了。
再看newCall干了什么:
//OkHttpClient.java
@Override public Call newCall(Request request) {
return RealCall.newRealCall(this, request, false /* for web socket */);
}
//RealCall.java
static RealCall newRealCall(OkHttpClient client, Request originalRequest, boolean forWebSocket) {
//看来这就是请求主体了
RealCall call = new RealCall(client, originalRequest, forWebSocket);
call.eventListener = client.eventListenerFactory().create(call);
return call;
}
请求被封装成一个RealCall,然后调用了enqueue了扔进了队列。
再看enqueue:
//RealCall.java
@Override public void enqueue(Callback responseCallback) {
synchronized (this) {
if (executed) throw new IllegalStateException("Already Executed");
executed = true;
}
captureCallStackTrace();
eventListener.callStart(this);
//重点,就是这里了,出现了dispather
client.dispatcher().enqueue(new AsyncCall(responseCallback));
}
由Dispatcher接手,并且此处封装了一个带回调的AsyncCall,这个AsyncCall就是任务的实体。
转战Dispatcher:
//Dispatcher.java
//双向队列,已经准备就绪的异步任务
private final Deque<AsyncCall> readyAsyncCalls = new ArrayDeque<>();
//双向队列,正在执行的异步任务
private final Deque<AsyncCall> runningAsyncCalls = new ArrayDeque<>();
//双向队列,正在执行的同步任务
private final Deque<RealCall> runningSyncCalls = new ArrayDeque<>();
//将任务添进队列,线程池开始执行异步任务
synchronized void enqueue(AsyncCall call) {
if (runningAsyncCalls.size() < maxRequests && runningCallsForHost(call) < maxRequestsPerHost) {
runningAsyncCalls.add(call);
//开始执行
executorService().execute(call);
} else {
//超出最大数量,则放入就绪队列,后面finish时就是从此队列取出下一个任务
readyAsyncCalls.add(call);
}
}
/**
* 线程池由此而来,是用SynchronousQueue,一个阻塞队列,
* 必须等队列中的添加元素被消费后才能继续添加新的元素
*/
public synchronized ExecutorService executorService() {
if (executorService == null) {
executorService = new ThreadPoolExecutor(0, Integer.MAX_VALUE, 60, TimeUnit.SECONDS,new SynchronousQueue<Runnable>(), Util.threadFactory("OkHttp Dispatcher", false));
}
return executorService;
}
再来看看这个AsynCall怎样封装的:
//RealCall.java
final class AsyncCall extends NamedRunnable {
//应答的回调,enqueue传入的
private final Callback responseCallback;
AsyncCall(Callback responseCallback) {
//顾名思义,父类就是一个可以命名的线程
super("OkHttp %s", redactedUrl());
this.responseCallback = responseCallback;
}
String host() {
return originalRequest.url().host();
}
Request request() {
return originalRequest;
}
RealCall get() {
return RealCall.this;
}
//重点,真正的执行流程从此开始
@Override protected void execute() {
boolean signalledCallback = false;
try {
//重点,应答在此已经得到
Response response = getResponseWithInterceptorChain();
if (retryAndFollowUpInterceptor.isCanceled()) {
signalledCallback = true;
responseCallback.onFailure(RealCall.this, new IOException("Canceled"));
} else {
signalledCallback = true;
responseCallback.onResponse(RealCall.this, response);
}
} catch (IOException e) {
if (signalledCallback) {
// Do not signal the callback twice!
Platform.get().log(INFO, "Callback failure for " + toLoggableString(), e);
} else {
eventListener.callFailed(RealCall.this, e);
responseCallback.onFailure(RealCall.this, e);
}
} finally {
//重点,结束当前请求后的后续操作
client.dispatcher().finished(this);
}
}
}
AsyncCall就是一个Runnable,它自己的execute就是任务执行的具体操作了,而response应该就在getResponseWithInterceptorChain中拿到了:
//RealCall.java
Response getResponseWithInterceptorChain() throws IOException {
List<Interceptor> interceptors = new ArrayList<>();
//所有自定义的拦截器
interceptors.addAll(client.interceptors());
//重定向拦,主要负责失败重连工作
interceptors.add(retryAndFollowUpInterceptor);
//桥接,主要负责设置内容长度、编码方式、设置gzip压缩、添加请求头、cookie等相关功能
interceptors.add(new BridgeInterceptor(client.cookieJar()));
//缓存,处理相关缓存策略
interceptors.add(new CacheInterceptor(client.internalCache()));
//连接,创建真实的connection,打开socket链接
interceptors.add(new ConnectInterceptor(client));
//forWebSocket默认为false,传入一般也为false
if (!forWebSocket) {
//所有自定义的网络拦截器
interceptors.addAll(client.networkInterceptors());
}
//向服务器发送请求,返回Response
interceptors.add(new CallServerInterceptor(forWebSocket));
//组成了一个链
Interceptor.Chain chain = new RealInterceptorChain(interceptors, null, null, null, 0,originalRequest, this, eventListener, client.connectTimeoutMillis(), client.readTimeoutMillis(), client.writeTimeoutMillis());
//重点,最终在这里得到了response
return chain.proceed(originalRequest);
}
各种Interceptor就是网络请求所作的具体通讯操作,到这里,已经接近通讯层了,继续看RealInterceptorChain:
//RealInterceptorChain.java
public Response proceed(Request request, StreamAllocation streamAllocation, HttpCodec httpCodec,
RealConnection connection) throws IOException {
if (index >= interceptors.size()) throw new AssertionError();
calls++;
// If we already have a stream, confirm that the incoming request will use it.
if (this.httpCodec != null && !this.connection.supportsUrl(request.url())) {
throw new IllegalStateException("network interceptor " + interceptors.get(index - 1)
+ " must retain the same host and port");
}
// If we already have a stream, confirm that this is the only call to chain.proceed().
if (this.httpCodec != null && calls > 1) {
throw new IllegalStateException("network interceptor " + interceptors.get(index - 1)
+ " must call proceed() exactly once");
}
// Call the next interceptor in the chain.
RealInterceptorChain next = new RealInterceptorChain(interceptors, streamAllocation, httpCodec,
connection, index + 1, request, call, eventListener, connectTimeout, readTimeout,
writeTimeout);
Interceptor interceptor = interceptors.get(index);
Response response = interceptor.intercept(next);
// Confirm that the next interceptor made its required call to chain.proceed().
if (httpCodec != null && index + 1 < interceptors.size() && next.calls != 1) {
throw new IllegalStateException("network interceptor " + interceptor
+ " must call proceed() exactly once");
}
// Confirm that the intercepted response isn't null.
if (response == null) {
throw new NullPointerException("interceptor " + interceptor + " returned null");
}
if (response.body() == null) {
throw new IllegalStateException(
"interceptor " + interceptor + " returned a response with no body");
}
return response;
}
没错了,就在这里返回了最终的response,至于Interceptor究竟做了什么最终获取到了Response,这里就不讨论了。
应答找到了,看看应答后会发生什么事:
//Dispatcher.java
/** Used by {@code AsyncCall#run} to signal completion. */
void finished(AsyncCall call) {
finished(runningAsyncCalls, call, true);
}
private <T> void finished(Deque<T> calls, T call, boolean promoteCalls) {
int runningCallsCount;
Runnable idleCallback;
synchronized (this) {
if (!calls.remove(call)) throw new AssertionError("Call wasn't in-flight!");
//这里传入了true
if (promoteCalls) promoteCalls();
runningCallsCount = runningCallsCount();
idleCallback = this.idleCallback;
}
//当前没有任务运行且“空闲任务”不为空时,开始执行这个“空闲任务”
if (runningCallsCount == 0 && idleCallback != null) {
idleCallback.run();
}
}
//当一个网络请求任务完成后,默认是要到此方法来的
private void promoteCalls() {
//正在执行的任务超限,不再往下走了
if (runningAsyncCalls.size() >= maxRequests) return;
//就绪的任务队列空了,没有任务可执行了,也不再往下走了,看来这个方法就是为了接着执行下一个方法而写的了
if (readyAsyncCalls.isEmpty()) return;
//把就绪队列的任务取出来,可以是多个,取决于runningAsyncCalls的上限,缺多少补多少个,然后这N个任务中某个或多个任务执行完后,又来到此处取出M个就绪的任务,一直到将readyAsyncCalls队列中所有的任务取完
for (Iterator<AsyncCall> i = readyAsyncCalls.iterator(); i.hasNext(); ) {
AsyncCall call = i.next();
if (runningCallsForHost(call) < maxRequestsPerHost) {
i.remove();
runningAsyncCalls.add(call);
//走正常的异步执行流程
executorService().execute(call);
}
//当前正在执行的任务数量到上限了,只有等待下一次添加任务了
if (runningAsyncCalls.size() >= maxRequests) return;
}
}
可以看到,当一个AsynCall执行完毕后,会根据相关限制从readyAsyncCalls取出更多的AsynCall进行执行,直到所有AsynCall执行完毕。
回调就不必多说了,在AsynCall.responseCallback中传回了response。
这就是异步的工作流程。
同步有一点区别,在RealCall中调用的方法不一样:
//RealCall.java
@Override public Response execute() throws IOException {
synchronized (this) {
if (executed) throw new IllegalStateException("Already Executed");
executed = true;
}
captureCallStackTrace();
eventListener.callStart(this);
try {
//只做了添加正在执行的队列中
client.dispatcher().executed(this);
//直接执行了,进入了同样的方法
Response result = getResponseWithInterceptorChain();
if (result == null) throw new IOException("Canceled");
return result;
} catch (IOException e) {
eventListener.callFailed(this, e);
throw e;
} finally {
client.dispatcher().finished(this);
}
}
看得出来,同步是直接进行了获取response的操作,异步相比之下就多了线程池的调度操作,也是应有之意。
简而言之,OkHttp3的网络请求就是通过一个InterceptorChain获取response的过程,想深入了解okhttp3的原理,就要研究各个Interceptor的作用了。
以上。
下一篇: 一个并行限制Promise的例题