OkHttp Source Code Analysis: ConnectInterceptor and CallServerInterceptor
While working through the OkHttp source in the previous post, I found that the last two interceptors, ConnectInterceptor and CallServerInterceptor, contain quite a lot of material and are closely related, so they get their own analysis here.
Please credit the original source when reposting:
https://blog.csdn.net/dreamsever/article/details/80141224
Let's start with ConnectInterceptor. The connect interceptor essentially prepares everything the later CallServerInterceptor needs in order to talk to the server. Its intercept method is shown below:
@Override
public Response intercept(Chain chain) throws IOException {
  RealInterceptorChain realChain = (RealInterceptorChain) chain;
  Request request = realChain.request();
  StreamAllocation streamAllocation = realChain.streamAllocation();

  // We need the network to satisfy this request. Possibly for validating a conditional GET.
  boolean doExtensiveHealthChecks = !request.method().equals("GET");
  HttpCodec httpCodec = streamAllocation.newStream(client, doExtensiveHealthChecks);
  RealConnection connection = streamAllocation.connection();

  return realChain.proceed(request, streamAllocation, httpCodec, connection);
}
A word on what each of these does:
Request: a single request, carrying the request information.
StreamAllocation: coordinates the relationship between three other families of classes: Connections, Streams, and Calls.
HttpCodec: encodes HTTP requests and decodes HTTP responses; the actual Okio I/O for the request happens here.
RealConnection: implements the Connection interface; the connection built on a socket for HTTP streams. Think of it as the pipe laid down in advance so that data can flow through it later.
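For orientation, here is a minimal caller-side sketch, with an illustrative class name and URL: all four of the objects above are created and wired together internally once execute() drives the interceptor chain; none of them surface in the public API.

import java.io.IOException;
import okhttp3.OkHttpClient;
import okhttp3.Request;
import okhttp3.Response;

public class SimpleGet {
  public static void main(String[] args) throws IOException {
    // Everything discussed in this post (StreamAllocation, HttpCodec, RealConnection)
    // is built and used inside this single execute() call.
    OkHttpClient client = new OkHttpClient();
    Request request = new Request.Builder()
        .url("https://example.com/")   // illustrative URL
        .build();
    try (Response response = client.newCall(request).execute()) {
      System.out.println(response.code());
    }
  }
}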
The StreamAllocation is created back in the very first interceptor, RetryAndFollowUpInterceptor, inside its intercept method; it is handed to each new RealInterceptorChain and passed along step by step until it is finally used here.
streamAllocation = new StreamAllocation(
    client.connectionPool(), createAddress(request.url()), callStackTrace);
First, an HttpCodec is obtained via streamAllocation.newStream:
public HttpCodec newStream(OkHttpClient client, boolean doExtensiveHealthChecks) {
  int connectTimeout = client.connectTimeoutMillis();
  int readTimeout = client.readTimeoutMillis();
  int writeTimeout = client.writeTimeoutMillis();
  boolean connectionRetryEnabled = client.retryOnConnectionFailure();

  try {
    // Obtain a healthy, usable RealConnection.
    RealConnection resultConnection = findHealthyConnection(connectTimeout, readTimeout,
        writeTimeout, connectionRetryEnabled, doExtensiveHealthChecks);
    HttpCodec resultCodec = resultConnection.newCodec(client, this);

    synchronized (connectionPool) {
      codec = resultCodec;
      return resultCodec;
    }
  } catch (IOException e) {
    throw new RouteException(e);
  }
}
// Keep calling findConnection until a healthy RealConnection is obtained.
private RealConnection findHealthyConnection(int connectTimeout, int readTimeout,
    int writeTimeout, boolean connectionRetryEnabled, boolean doExtensiveHealthChecks)
    throws IOException {
  while (true) {
    RealConnection candidate = findConnection(connectTimeout, readTimeout, writeTimeout,
        connectionRetryEnabled);

    // A brand-new connection can be used right away and skips the health check.
    synchronized (connectionPool) {
      if (candidate.successCount == 0) {
        return candidate;
      }
    }

    // Check whether this connection is healthy and usable; if not, mark it so it
    // gets evicted from the pool and try again.
    if (!candidate.isHealthy(doExtensiveHealthChecks)) {
      noNewStreams();
      continue;
    }

    return candidate;
  }
}
Next we look at the findConnection method. Here connectionPool shows up: a connection pool that maintains and manages all connections, both HTTP/1.1 and HTTP/2, so that existing connections can be reused and waiting time reduced. By default the pool keeps at most 5 idle connections, each with a maximum idle keep-alive of 5 minutes. Every time a connection is added to the pool, the pool checks whether its cleanup task (the Runnable cleanupRunnable) is already running; if it is not, it is started and submitted to the pool's executor. If you are interested in connection reuse, have a look at the post "OkHttp3源码分析复用连接池".
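As an aside, those pool defaults can be spelled out explicitly when building a client; a minimal sketch under that assumption, using OkHttp's public ConnectionPool constructor (the class and method names here are illustrative):

import java.util.concurrent.TimeUnit;
import okhttp3.ConnectionPool;
import okhttp3.OkHttpClient;

public class PoolConfig {
  // Makes the defaults described above explicit: at most 5 idle connections,
  // each kept alive for up to 5 minutes before the cleanup task evicts it.
  static OkHttpClient buildPooledClient() {
    ConnectionPool pool = new ConnectionPool(5, 5, TimeUnit.MINUTES);
    return new OkHttpClient.Builder()
        .connectionPool(pool)
        .build();
  }
}

With that aside, here is findConnection: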
private RealConnection findConnection(int connectTimeout, int readTimeout, int writeTimeout,
    boolean connectionRetryEnabled) throws IOException {
  Route selectedRoute;
  synchronized (connectionPool) {
    if (released) throw new IllegalStateException("released");
    if (codec != null) throw new IllegalStateException("codec != null");
    if (canceled) throw new IOException("Canceled");

    // Prefer the already-allocated connection, if there is one and it can still
    // host new streams.
    RealConnection allocatedConnection = this.connection;
    if (allocatedConnection != null && !allocatedConnection.noNewStreams) {
      return allocatedConnection;
    }

    // That failed, so try to get a connection from the pool.
    Internal.instance.get(connectionPool, address, this, null);
    if (connection != null) {
      return connection;
    }

    // Still nothing; fall back to the current route.
    selectedRoute = route;
  }

  // If we need a route, make one. This is a blocking operation.
  if (selectedRoute == null) {
    selectedRoute = routeSelector.next();
  }

  RealConnection result;
  synchronized (connectionPool) {
    if (canceled) throw new IOException("Canceled");

    // Try the pool once more with the selected route; connection coalescing may
    // now produce a match.
    Internal.instance.get(connectionPool, address, this, selectedRoute);
    if (connection != null) return connection;

    // No usable RealConnection was found anywhere, so create a brand-new one.
    route = selectedRoute;
    refusedStreamCount = 0;
    result = new RealConnection(connectionPool, selectedRoute);
    acquire(result);
  }

  // Do the TCP + TLS handshakes.
  result.connect(connectTimeout, readTimeout, writeTimeout, connectionRetryEnabled);
  routeDatabase().connected(result.route());

  Socket socket = null;
  synchronized (connectionPool) {
    // Put the newly created connection into the pool.
    Internal.instance.put(connectionPool, result);

    // If another multiplexed connection to the same address was created concurrently,
    // release this connection and acquire that one instead.
    // This branch only runs for HTTP/2.
    if (result.isMultiplexed()) {
      socket = Internal.instance.deduplicate(connectionPool, address, this);
      result = connection;
    }
  }
  closeQuietly(socket);

  return result;
}
So the flow above is: first look for an existing, reusable RealConnection; if none is found, create a new one. Creating a connection splits into two cases, HTTP/2 and HTTP/1.1, so let's first recap how they differ.
Compared with HTTP/1.1, the main differences in HTTP/2 are:
- HTTP/2 uses a binary framing format rather than a text format.
- HTTP/2 is fully multiplexed instead of ordered and blocking, so a single connection can carry requests in parallel.
- HTTP/2 uses header compression to reduce overhead.
- HTTP/2 lets the server proactively "push" responses into the client's cache.
So under HTTP/2 a single RealConnection can be shared by multiple streams, with the allocation limit taken from the peer's MAX_CONCURRENT_STREAMS setting. HTTP/1.1 has no multiplexing, and RealConnection's `public int allocationLimit = 1;` means only one stream may be allocated per connection.
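Whether a RealConnection ends up multiplexed depends on the protocol negotiated (via ALPN) during the TLS handshake. Here is a hedged sketch of constraining that negotiation from the caller side, with an illustrative class name; note that OkHttp requires HTTP_1_1 to stay in the list as a fallback.

import java.util.Arrays;
import okhttp3.OkHttpClient;
import okhttp3.Protocol;

public class ProtocolConfig {
  // Prefer HTTP/2 when the server supports it, falling back to HTTP/1.1 otherwise.
  static OkHttpClient buildHttp2PreferredClient() {
    return new OkHttpClient.Builder()
        .protocols(Arrays.asList(Protocol.HTTP_2, Protocol.HTTP_1_1))
        .build();
  }
}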
Back in findConnection, Internal.instance.deduplicate simply forwards to the ConnectionPool:
@Nullable
Socket deduplicate(Address address, StreamAllocation streamAllocation) {
  assert (Thread.holdsLock(this));
  for (RealConnection connection : connections) {
    if (connection.isEligible(address, null)
        && connection.isMultiplexed()
        && connection != streamAllocation.connection()) {
      return streamAllocation.releaseAndAcquire(connection);
    }
  }
  return null;
}
It loops through the pool to find the RealConnection that was created concurrently, then passes it to releaseAndAcquire.
StreamAllocation.java
public Socket releaseAndAcquire(RealConnection newConnection) {
  assert (Thread.holdsLock(connectionPool));
  if (codec != null || connection.allocations.size() != 1) throw new IllegalStateException();

  // Release the old connection.
  Reference<StreamAllocation> onlyAllocation = connection.allocations.get(0);
  Socket socket = deallocate(true, false, false);

  // Acquire the new connection.
  this.connection = newConnection;
  newConnection.allocations.add(onlyAllocation);

  return socket;
}
Here the StreamAllocation's current this.connection, i.e. the connection we just created, is discarded via deallocate, and the newConnection found in the pool is assigned to this.connection. Finally, back in findConnection, the socket of the just-created connection is closed.
After crossing all those mountains and rivers, we have finally finished the single line `RealConnection resultConnection = findHealthyConnection(connectTimeout, readTimeout, writeTimeout, connectionRetryEnabled, doExtensiveHealthChecks);` and obtained a healthy, usable RealConnection. If we need to deliver water somewhere, the first thing to do is lay the pipe; our pipe is now built and ready. Next, look at `HttpCodec resultCodec = resultConnection.newCodec(client, this);`. The resultCodec is like a water plant: it does not produce the water itself, it only treats and inspects it, turning river water into tap water that can be delivered through the pipe to users. The only unusual thing about this plant is that it also accepts requests from those users.
public HttpCodec newCodec(
    OkHttpClient client, StreamAllocation streamAllocation) throws SocketException {
  if (http2Connection != null) {
    return new Http2Codec(client, streamAllocation, http2Connection);
  } else {
    socket.setSoTimeout(client.readTimeoutMillis());
    source.timeout().timeout(client.readTimeoutMillis(), MILLISECONDS);
    sink.timeout().timeout(client.writeTimeoutMillis(), MILLISECONDS);
    return new Http1Codec(client, streamAllocation, source, sink);
  }
}
The resulting HttpCodec comes in two flavors: Http1Codec when the protocol is HTTP/1.1, and Http2Codec when it is HTTP/2. Http1Codec is constructed with (client, streamAllocation, source, sink); BufferedSource and BufferedSink are Okio's types for reading and writing data streams, which we won't dig into here (a brief standalone Okio sketch follows the constructor below).
final BufferedSource source;
final BufferedSink sink;
int state = STATE_IDLE;

public Http1Codec(OkHttpClient client, StreamAllocation streamAllocation, BufferedSource source,
    BufferedSink sink) {
  this.client = client;
  this.streamAllocation = streamAllocation;
  this.source = source;
  this.sink = sink;
}
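To get a feel for what that source and sink are, here is a hedged standalone Okio sketch; the class name and the connectedSocket parameter are illustrative. RealConnection wraps its socket in buffered Okio streams in roughly this way before handing them to Http1Codec.

import java.io.IOException;
import java.net.Socket;
import okio.BufferedSink;
import okio.BufferedSource;
import okio.Okio;

public class OkioSketch {
  // Writes a bare HTTP/1.1 request over an already-connected socket and reads back
  // the status line, using the same BufferedSource/BufferedSink types Http1Codec uses.
  static void rawHttp1Exchange(Socket connectedSocket) throws IOException {
    BufferedSource source = Okio.buffer(Okio.source(connectedSocket));
    BufferedSink sink = Okio.buffer(Okio.sink(connectedSocket));

    sink.writeUtf8("GET / HTTP/1.1\r\nHost: example.com\r\nConnection: close\r\n\r\n");
    sink.flush();

    System.out.println(source.readUtf8LineStrict());
  }
}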
Http2Codec is constructed with (client, streamAllocation, http2Connection). The first two parameters match Http1Codec; the third is different: while the connection is being established, OkHttp checks whether the negotiated protocol is HTTP/2 and, if so, creates an http2Connection. The source and sink are already passed in at creation time, and both are used later on.
if (protocol == Protocol.HTTP_2) {
  socket.setSoTimeout(0); // HTTP/2 connection timeouts are set per-stream.
  http2Connection = new Http2Connection.Builder(true)
      .socket(socket, route.address().url().host(), source, sink)
      .listener(this)
      .build();
  http2Connection.start();
}
Now the Request, StreamAllocation, HttpCodec, and RealConnection are all ready, and the request can actually be made.
CallServerInterceptor's intercept method is where the request is really executed and the response obtained.
One small piece of background first:
The 100-continue mechanism lets a client that is about to POST a body first ask the server whether it is willing to accept it. If the server declines, the client skips uploading the body; normally the server answers the probe with a 100 response or an error code.
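As a hedged illustration of the caller side (the URL and payload are made up), this is how a request would opt into that 100-continue handshake; the Expect header added here is exactly what the interceptor checks for below.

import okhttp3.MediaType;
import okhttp3.Request;
import okhttp3.RequestBody;

public class ExpectContinue {
  // Builds a POST that asks the server for permission before the body is sent.
  static Request buildExpectContinueRequest() {
    RequestBody body = RequestBody.create(
        MediaType.parse("application/json; charset=utf-8"),
        "{\"ping\":true}");
    return new Request.Builder()
        .url("https://example.com/upload")
        .header("Expect", "100-continue")
        .post(body)
        .build();
  }
}

With that background, here is CallServerInterceptor.intercept: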
@Override
public Response intercept(Chain chain) throws IOException {
  RealInterceptorChain realChain = (RealInterceptorChain) chain;
  HttpCodec httpCodec = realChain.httpStream();
  StreamAllocation streamAllocation = realChain.streamAllocation();
  RealConnection connection = (RealConnection) realChain.connection();
  Request request = realChain.request();

  long sentRequestMillis = System.currentTimeMillis();
  httpCodec.writeRequestHeaders(request);

  Response.Builder responseBuilder = null;
  // Check whether this request method may carry a body (essentially everything except GET
  // can) and whether a body was actually supplied.
  if (HttpMethod.permitsRequestBody(request.method()) && request.body() != null) {
    // If there's a "Expect: 100-continue" header on the request, wait for a "HTTP/1.1 100
    // Continue" response before transmitting the request body. If we don't get that, return
    // what we did get (such as a 4xx response) without ever transmitting the request body.
    // In other words: probe the server first to see whether it will accept the POST body.
    if ("100-continue".equalsIgnoreCase(request.header("Expect"))) {
      httpCodec.flushRequest();
      responseBuilder = httpCodec.readResponseHeaders(true);
    }

    // responseBuilder == null means the server is willing; write the request body.
    if (responseBuilder == null) {
      // Write the request body if the "Expect: 100-continue" expectation was met.
      Sink requestBodyOut = httpCodec.createRequestBody(request, request.body().contentLength());
      BufferedSink bufferedRequestBody = Okio.buffer(requestBodyOut);
      request.body().writeTo(bufferedRequestBody);
      bufferedRequestBody.close();
    } else if (!connection.isMultiplexed()) {
      // responseBuilder != null means the 100-continue probe already produced a response.
      // If the "Expect: 100-continue" expectation wasn't met, prevent the HTTP/1 connection
      // (which cannot multiplex streams) from being reused: mark it so no new streams can be
      // created, release it, and close its socket. Otherwise we're still obligated to transmit
      // the request body to leave the connection in a consistent state.
      streamAllocation.noNewStreams();
    }
  }

  // Finish the request by flushing the sink (an Okio operation).
  httpCodec.finishRequest();

  if (responseBuilder == null) {
    // Still null means the body (if any) has been written; now read the response headers.
    responseBuilder = httpCodec.readResponseHeaders(false);
  }

  // Attach the request, handshake, and send/receive timestamps to the response.
  Response response = responseBuilder
      .request(request)
      .handshake(streamAllocation.connection().handshake())
      .sentRequestAtMillis(sentRequestMillis)
      .receivedResponseAtMillis(System.currentTimeMillis())
      .build();

  int code = response.code();
  if (forWebSocket && code == 101) {
    // Connection is upgrading, but we need to ensure interceptors see a non-null response body.
    response = response.newBuilder()
        .body(Util.EMPTY_RESPONSE)
        .build();
  } else {
    // Rebuild the response once more, attaching the body stream.
    response = response.newBuilder()
        .body(httpCodec.openResponseBody(response))
        .build();
  }

  if ("close".equalsIgnoreCase(response.request().header("Connection"))
      || "close".equalsIgnoreCase(response.header("Connection"))) {
    streamAllocation.noNewStreams();
  }

  if ((code == 204 || code == 205) && response.body().contentLength() > 0) {
    throw new ProtocolException(
        "HTTP " + code + " had non-zero Content-Length: " + response.body().contentLength());
  }

  // Return the response.
  return response;
}
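Note that openResponseBody only attaches a stream over the connection; the bytes are pulled when the caller reads the body, and the connection goes back to the pool once the body is closed. A hedged caller-side sketch, with illustrative class, method, and URL:

import java.io.IOException;
import okhttp3.OkHttpClient;
import okhttp3.Request;
import okhttp3.Response;

public class ReadBody {
  // Reading the body drains the stream set up by CallServerInterceptor;
  // closing the response releases the stream and lets the connection be reused.
  static String fetch(OkHttpClient client) throws IOException {
    Request request = new Request.Builder().url("https://example.com/").build();
    try (Response response = client.newCall(request).execute()) {
      return response.body().string(); // reads everything, then releases the stream
    }
  }
}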
References:
https://www.jianshu.com/p/92a61357164b
https://m.aliyun.com/yunqi/articles/78101
https://blog.piasy.com/2016/07/11/Understand-OkHttp/