OKHttp源码解析(一)
一、OKHttp的综述
OkHttp是一个高效的Http客户端,有如下的特点:
- 支持HTTP2/SPDY黑科技
- socket自动选择最好路线,并支持自动重连
- 拥有自动维护的socket连接池,减少握手次数
- 拥有队列线程池,轻松写并发
- 拥有Interceptors轻松处理请求与响应(比如透明GZIP压缩,LOGGING)
-
基于Headers的缓存策略
Okhttp的整体流程图
二、网络请求
用OkHttpClient.newCall(request)进行execute/enenqueue时,实际是将请求Call放到了Dispatcher中,okhttp使用Dispatcher进行线程分发,它有两种方法,一个是普通的同步单线程;另一种是使用了队列进行并发任务的分发(Dispatch)与回调。接下来分析第二种并发任务的分发。
1. Dispatcher的结构
maxRequests = 64: 最大并发请求数为64
maxRequestsPerHost = 5: 每个主机最大请求数为5
Dispatcher: 分发者,也就是生产者(默认在主线程)
AsyncCall: 队列中需要处理的Runnable(包装了异步回调接口)
ExecutorService:消费者池(也就是线程池)
Deque:缓存(用数组实现,可自动扩容,无大小限制)
Deque:正在运行的任务,仅仅是用来引用正在运行的任务以判断并发量,注意它并不是消费者缓存
根据生产者消费者模型的模型理论,当入队(enqueue)请求时,如果满足(runningRequests<64 && runningRequestsPerHost<5),那么就直接把AsyncCall直接加到runningCalls的队列中,并在线程池中执行。如果消费者缓存满了,就放入readyAsyncCalls进行缓存等待。
当任务执行完成后,调用finished的promoteCalls()函数,手动移动缓存区(可以看出这里是主动清理的,因此不会发生死锁)
当我们希望使用OkHttp的异步请求时,一般进行如下构造
OkHttpClient client = new OkHttpClient.Builder().build();
Request request = new Request.Builder()
.url("http://qq.com").get().build();
client.newCall(request).enqueue(new Callback() {
@Override public void onFailure(Call call, IOException e) {
}
@Override public void onResponse(Call call, Response response) throws IOException {
}
});
根据代码,我们可以发现实际上是Dispatcher进行了入队操作
synchronized void enqueue(AsyncCall call) {
if (runningAsyncCalls.size() < maxRequests && runningCallsForHost(call) < maxRequestsPerHost) {
//添加正在运行的请求
runningAsyncCalls.add(call);
//线程池执行请求
executorService().execute(call);
} else {
//添加到缓存队列排队等待
readyAsyncCalls.add(call);
}
}
如果满足条件,那么就直接把AsyncCall直接加到runningCalls的队列中,并在线程池中执行(线程池会根据当前负载自动创建,销毁,缓存相应的线程)。反之就放入readyAsyncCalls进行缓存等待。
请求元素RealCall#execute(它实现了Runnable接口),它内部实现的execute方法如下
@Override protected void execute() {
boolean signalledCallback = false;
try {
Response response = getResponseWithInterceptorChain();
if (retryAndFollowUpInterceptor.isCanceled()) {
signalledCallback = true;
responseCallback.onFailure(RealCall.this, new IOException("Canceled"));
} else {
signalledCallback = true;
responseCallback.onResponse(RealCall.this, response);
}
} catch (IOException e) {
if (signalledCallback) {
// Do not signal the callback twice!
Platform.get().log(INFO, "Callback failure for " + toLoggableString(), e);
} else {
responseCallback.onFailure(RealCall.this, e);
}
} finally {
client.dispatcher().finished(this);
}
}
调用 getResponseWithInterceptorChain() 函数获取 HTTP 返回结果,从函数名可以看出,这一步还会进行一系列“拦截”操作。
Response getResponseWithInterceptorChain() throws IOException {
// Build a full stack of interceptors.
List<Interceptor> interceptors = new ArrayList<>();
interceptors.addAll(client.interceptors());
interceptors.add(retryAndFollowUpInterceptor);
interceptors.add(new BridgeInterceptor(client.cookieJar()));
interceptors.add(new CacheInterceptor(client.internalCache()));
interceptors.add(new ConnectInterceptor(client));
if (!forWebSocket) {
interceptors.addAll(client.networkInterceptors());
}
interceptors.add(new CallServerInterceptor(forWebSocket));
Interceptor.Chain chain = new RealInterceptorChain(
interceptors, null, null, null, 0, originalRequest);
return chain.proceed(originalRequest);
}
Interceptor 是 OkHttp 最核心的一个东西,不要误以为它只负责拦截请求进行一些额外的处理(例如 cookie),实际上它把实际的网络请求、缓存、透明压缩等功能都统一了起来,每一个功能都只是一个 Interceptor,它们再连接成一个 Interceptor.Chain,环环相扣,最终圆满完成一次网络请求。
三、Interceptor责任链
以下是主要的拦截器的种类:
RetryAndFollowUpInterceptor 负责失败重试以及重定向的
BridgeInterceptor 负责把用户构造的请求转换为发送到服务器的请求、把服务器返回的响应转换为用户友好的响应的
CacheInterceptor 负责读取缓存直接返回、更新缓存的
ConnectInterceptor 负责和服务器建立连接的
networkInterceptors 配置 OkHttpClient 时设置的
CallServerInterceptor 负责向服务器发送请求数据、从服务器读取响应数据的
1、ConnectInterceptor(建立连接)
核心代码拦截如下:
@Override public Response intercept(Chain chain) throws IOException {
RealInterceptorChain realChain = (RealInterceptorChain) chain;
Request request = realChain.request();
StreamAllocation streamAllocation = realChain.streamAllocation();
// We need the network to satisfy this request. Possibly for validating a conditional GET.
boolean doExtensiveHealthChecks = !request.method().equals("GET");
HttpCodec httpCodec = streamAllocation.newStream(client, doExtensiveHealthChecks);
RealConnection connection = streamAllocation.connection();
return realChain.proceed(request, streamAllocation, httpCodec, connection);
}
创建了一个 HttpCodec 对象,它是对 HTTP 协议操作的抽象,有两个实现:Http1Codec 和 Http2Codec,顾名思义,它们分别对应 HTTP/1.1 和 HTTP/2 版本的实现。
2、CallServerInterceptor(发送和接收数据)
核心代码拦截如下:
@Override public Response intercept(Chain chain) throws IOException {
HttpCodec httpCodec = ((RealInterceptorChain) chain).httpStream();
StreamAllocation streamAllocation = ((RealInterceptorChain) chain).streamAllocation();
Request request = chain.request();
long sentRequestMillis = System.currentTimeMillis();
httpCodec.writeRequestHeaders(request);
if (HttpMethod.permitsRequestBody(request.method()) && request.body() != null) {
Sink requestBodyOut = httpCodec.createRequestBody(request, request.body().contentLength());
BufferedSink bufferedRequestBody = Okio.buffer(requestBodyOut);
request.body().writeTo(bufferedRequestBody);
bufferedRequestBody.close();
}
httpCodec.finishRequest();
Response response = httpCodec.readResponseHeaders()
.request(request)
.handshake(streamAllocation.connection().handshake())
.sentRequestAtMillis(sentRequestMillis)
.receivedResponseAtMillis(System.currentTimeMillis())
.build();
if (!forWebSocket || response.code() != 101) {
response = response.newBuilder()
.body(httpCodec.openResponseBody(response))
.build();
}
if ("close".equalsIgnoreCase(response.request().header("Connection"))
|| "close".equalsIgnoreCase(response.header("Connection"))) {
streamAllocation.noNewStreams();
}
// 省略部分检查代码
return response;
}
向服务器发送 request header;
如果有 request body,就向服务器发送;
读取 response header,先构造一个 Response 对象;
如果有 response body,就在 3 的基础上加上 body 构造一个新的 Response 对象;
四、连接池
HTTP中的keepalive连接在网络性能优化中,对于延迟降低与速度提升的有非常重要的作用。
Okhttp支持5个并发KeepAlive连接,默认链路生命为5分钟(链路空闲后,保持存活的时间)
1、连接池的的关键对象:
Connection: 对jdk的socket物理连接的包装,它内部有List
//Socket清理的Runnable,每当put操作时,就会被主动调用
//注意put操作是在网络线程
//而Socket清理是在`OkHttp ConnectionPool`线程池中调用
while (true) {
//执行清理并返回下场需要清理的时间
long waitNanos = cleanup(System.nanoTime());
if (waitNanos == -1) return;
if (waitNanos > 0) {
synchronized (ConnectionPool.this) {
try {
//在timeout内释放锁与时间片
ConnectionPool.this.wait(TimeUnit.NANOSECONDS.toMillis(waitNanos));
} catch (InterruptedException ignored) {
}
}
}
}
这段死循环实际上是一个阻塞的清理任务,首先进行清理(clean),并返回下次需要清理的间隔时间,然后调用wait(timeout)进行等待以释放锁与时间片,当等待时间到了后,再次进行清理,并返回下次要清理的间隔时间
long cleanup(long now) {
int inUseConnectionCount = 0;
int idleConnectionCount = 0;
RealConnection longestIdleConnection = null;
long longestIdleDurationNs = Long.MIN_VALUE;
//遍历`Deque`中所有的`RealConnection`,标记泄漏的连接
synchronized (this) {
for (RealConnection connection : connections) {
// 查询此连接内部StreamAllocation的引用数量
if (pruneAndGetAllocationCount(connection, now) > 0) {
inUseConnectionCount++;
continue;
}
idleConnectionCount++;
//选择排序法,标记出空闲连接
long idleDurationNs = now - connection.idleAtNanos;
if (idleDurationNs > longestIdleDurationNs) {
longestIdleDurationNs = idleDurationNs;
longestIdleConnection = connection;
}
}
if (longestIdleDurationNs >= this.keepAliveDurationNs
|| idleConnectionCount > this.maxIdleConnections) {
//如果(`空闲socket连接超过5个`
//且`keepalive时间大于5分钟`)
//就将此泄漏连接从`Deque`中移除
connections.remove(longestIdleConnection);
} else if (idleConnectionCount > 0) {
//返回此连接即将到期的时间,供下次清理
//这里依据是在上文`connectionBecameIdle`中设定的计时
return keepAliveDurationNs - longestIdleDurationNs;
} else if (inUseConnectionCount > 0) {
//全部都是活跃的连接,5分钟后再次清理
return keepAliveDurationNs;
} else {
//没有任何连接,跳出循环
cleanupRunning = false;
return -1;
}
}
//关闭连接,返回`0`,也就是立刻再次清理
closeQuietly(longestIdleConnection.socket());
return 0;
}
遍历Deque中所有的RealConnection,标记泄漏的连接
如果被标记的连接满足(空闲socket连接超过5个&&keepalive时间大于5分钟),就将此连接从Deque中移除,并关闭连接,返回0,也就是将要执行wait(0),提醒立刻再次扫描
如果(目前还可以塞得下5个连接,但是有可能泄漏的连接(即空闲时间即将达到5分钟)),就返回此连接即将到期的剩余时间,供下次清理
如果(全部都是活跃的连接),就返回默认的keep-alive时间,也就是5分钟后再执行清理
如果(没有任何连接),就返回-1,跳出清理的死循环
这里的“并发”==(“空闲”+“活跃”)==5
标记并找到最不活跃的连接:pruneAndGetAllocationCount()方法中依据弱引用是否为null而判断这个连接是否泄漏
上一篇: OkHttp 源码分析
下一篇: 02.OkHttp重要类说明
推荐阅读
-
仅用500行Python代码实现一个英文解析器的教程
-
压缩卷只能压缩一半现象的原因解析及解决方法介绍
-
基于Spring注解的上下文初始化过程源码解析(一)
-
kubernetes垃圾回收器GarbageCollector源码分析(一)
-
Nginx服务器中location配置的一些基本要点解析
-
Linux系统下怎么用CheckInstall从源码创建一个RPM或DEB包
-
得到影视源码分享(有演示),带一键采集,亲测能用,适合懒人做电影站!
-
axios 源码解析(下) 拦截器的详解
-
C# 实现PPT 每一页转成图片过程解析
-
Spring源码解析之ConfigurableApplicationContext