OkHttp源码分析
本文基于OkHttp 4.9.0分析
OkHttp是什么?
众所周知,OkHttp是一个客户端用来发送HTTP消息并对服务器的响应做出处理的应用层框架。而且现在流行的Retrofit的底层同样也是基于Okhttp的。那么OkHttp有什么优点呢?我们来看下:
- 无缝的支持GZIP减少数据流量
- 缓存响应数据减少重复的网络请求
- 请求失败自动重试主机的其他ip,自动重定向。
- 如果 HTTP/2 不可用, 使用连接池复用减少请求延迟。
- …
使用方式
OkHttpClient client = new OkHttpClient();
Request request = new Request.Builder()
.url(url)
.build();
//同步
Response response = client.newCall(request).execute();
//异步
Response response = client.newCall(request).enqueue();
我们可以看到同步和异步是调用Call的execute和enqueue方法,我们来看下具体的实现:
override fun execute(): Response {
check(executed.compareAndSet(false, true)) { "Already Executed" }
timeout.enter()
callStart()
try {
client.dispatcher.executed(this)
return getResponseWithInterceptorChain()
} finally {
client.dispatcher.finished(this)
}
}
override fun enqueue(responseCallback: Callback) {
check(executed.compareAndSet(false, true)) { "Already Executed" }
callStart()
client.dispatcher.enqueue(AsyncCall(responseCallback))
}
可以看出,不管是同步还是异步,都会使用dispatcher,也就是任务分发器。
Dispatcher分发器
前面我们说到了不管同步还是异步,都会使用dispatcher,dispatcher内部会有一个线程池,也就是使用异步请求的时候我们会用到的。我们先来看下Dispatcher内部基本的成员变量:
//异步请求的最大数量
var maxRequests = 64
//每个主机同时请求的最大数量
var maxRequestsPerHost = 5
//闲置任务
var idleCallback: Runnable? = null
//异步请求线程池
private var executorServiceOrNull: ExecutorService? = null
val executorService: ExecutorService
//异步请求等待队列
private val readyAsyncCalls = ArrayDeque<AsyncCall>()
//异步请求执行队列
private val runningAsyncCalls = ArrayDeque<AsyncCall>()
//同步请求执行队列
private val runningSyncCalls = ArrayDeque<RealCall>()
同步请求
@Synchronized internal fun executed(call: RealCall) {
runningSyncCalls.add(call)
}
因为是同步请求,所以无需做任何操作,只需要把执行的callback放入同步队列中即可。
异步请求
internal fun enqueue(call: AsyncCall) {
synchronized(this) {
readyAsyncCalls.add(call)
if (!call.call.forWebSocket) {
val existingCall = findExistingCallWithHost(call.host)
if (existingCall != null) call.reuseCallsPerHostFrom(existingCall)
}
}
promoteAndExecute()
}
可以看到执行异步请求的时候,我们先把请求放入等待队列中,然后调用promoteAndExecute。我们看下这段代码是用来干什么的?
private fun promoteAndExecute(): Boolean {
this.assertThreadDoesntHoldLock()
val executableCalls = mutableListOf<AsyncCall>()
val isRunning: Boolean
synchronized(this) {
val i = readyAsyncCalls.iterator()
while (i.hasNext()) {
val asyncCall = i.next()
if (runningAsyncCalls.size >= this.maxRequests) break // Max capacity.
if (asyncCall.callsPerHost.get() >= this.maxRequestsPerHost) continue // Host max capacity.
i.remove()
asyncCall.callsPerHost.incrementAndGet()
executableCalls.add(asyncCall)
runningAsyncCalls.add(asyncCall)
}
isRunning = runningCallsCount() > 0
}
for (i in 0 until executableCalls.size) {
val asyncCall = executableCalls[i]
asyncCall.executeOn(executorService)
}
return isRunning
}
当正在执行的请求没有超过最大请求数64个时,并且同一个host的请求没有超过5个时,将它加入到执行队列。开始执行。
当请求执行完成后,还会调用分发器当finish方法,我们看下finish方法:
/** Used by [AsyncCall.run] to signal completion. */
internal fun finished(call: AsyncCall) {
call.callsPerHost.decrementAndGet()
finished(runningAsyncCalls, call)
}
/** Used by [Call.execute] to signal completion. */
internal fun finished(call: RealCall) {
finished(runningSyncCalls, call)
}
private fun <T> finished(calls: Deque<T>, call: T) {
val idleCallback: Runnable?
synchronized(this) {
if (!calls.remove(call)) throw AssertionError("Call wasn't in-flight!")
idleCallback = this.idleCallback
}
val isRunning = promoteAndExecute()
if (!isRunning && idleCallback != null) {
idleCallback.run()
}
}
不管同步还是异步,执行完成后,都需要从队列中移除,然后判断是否有正在执行的任务,如果没有的话,就执行闲置任务。
请求流程
前面梳理了分发器的同步和异步操作,但是真正的请求流程还是在RealCall中。我们来看下它的同步和异步方法:
override fun execute(): Response {
check(executed.compareAndSet(false, true)) { "Already Executed" }
timeout.enter()
callStart()
try {
client.dispatcher.executed(this)
return getResponseWithInterceptorChain()
} finally {
client.dispatcher.finished(this)
}
}
可以看到同步请求直接return了getResponseWithInterceptorChain()
方法。我们在看看异步的请求:
override fun enqueue(responseCallback: Callback) {
check(executed.compareAndSet(false, true)) { "Already Executed" }
callStart()
client.dispatcher.enqueue(AsyncCall(responseCallback))
}
可以看到,在异步请求的时候丢给了AsyncCall去处理。而AsyncCall是一个Runnable。我们直接看看它的run方法。
override fun run() {
threadName("OkHttp ${redactedUrl()}") {
var signalledCallback = false
timeout.enter()
try {
val response = getResponseWithInterceptorChain()
signalledCallback = true
responseCallback.onResponse(this@RealCall, response)
} catch (e: IOException) {
if (signalledCallback) {
// Do not signal the callback twice!
Platform.get().log("Callback failure for ${toLoggableString()}", Platform.INFO, e)
} else {
responseCallback.onFailure(this@RealCall, e)
}
} catch (t: Throwable) {
cancel()
if (!signalledCallback) {
val canceledException = IOException("canceled due to $t")
canceledException.addSuppressed(t)
responseCallback.onFailure(this@RealCall, canceledException)
}
throw t
} finally {
client.dispatcher.finished(this)
}
}
}
可以看到,这里也是getResponseWithInterceptorChain()
去拿到它的response。而OkHttp中最核心的也就是这个方法,处理了各种拦截器的逻辑。
拦截器
无论同异步请求都会调用到getResponseWithInterceptorChain()
,这个方法主要使用责任链模式将整个请求分为几个拦截器调用,简化了各自的责任和逻辑,而且还可以扩展一些自定义的拦截器。如果不清楚责任链模式,请先查看设计模式之责任链模式。
如何进行拦截
前面说到了责任链模式,那我们看下RealCall中是如何进行责任链形式的调用,其实主要方法是在RealInterceptorChain中。我们看下这个方法:
@Throws(IOException::class)
override fun proceed(request: Request): Response {
check(index < interceptors.size)
calls++
if (exchange != null) {
check(exchange.finder.sameHostAndPort(request.url)) {
"network interceptor ${interceptors[index - 1]} must retain the same host and port"
}
check(calls == 1) {
"network interceptor ${interceptors[index - 1]} must call proceed() exactly once"
}
}
//注释1
// Call the next interceptor in the chain.
val next = copy(index = index + 1, request = request)
val interceptor = interceptors[index]
@Suppress("USELESS_ELVIS")
val response = interceptor.intercept(next) ?: throw NullPointerException(
"interceptor $interceptor returned null")
if (exchange != null) {
check(index + 1 >= interceptors.size || next.calls == 1) {
"network interceptor $interceptor must call proceed() exactly once"
}
}
check(response.body != null) { "interceptor $interceptor returned a response with no body" }
return response
}
可以看到注释1处,拿到下一级的拦截器,不断执行它的intercept方法,最后return response 然后一层一层向上反馈数据。
拦截器分析
现在我们来看下拦截器具体的逻辑,直接查看getResponseWithInterceptorChain()
这个方法:
@Throws(IOException::class)
internal fun getResponseWithInterceptorChain(): Response {
// Build a full stack of interceptors.
val interceptors = mutableListOf<Interceptor>()
//自定义的拦截器
interceptors += client.interceptors
//处理重定向的后续请求和失败重试
interceptors += RetryAndFollowUpInterceptor(client)
//补全请求,处理网络桥接的
interceptors += BridgeInterceptor(client.cookieJar)
//处理缓存的
interceptors += CacheInterceptor(client.cache)
//处理tcp链接的
interceptors += ConnectInterceptor
//处理网络的
if (!forWebSocket) {
interceptors += client.networkInterceptors
}
//处理服务器通信,并封装请求数据与解析响应数据
interceptors += CallServerInterceptor(forWebSocket)
val chain = RealInterceptorChain(
call = this,
interceptors = interceptors,
index = 0,
exchange = null,
request = originalRequest,
connectTimeoutMillis = client.connectTimeoutMillis,
readTimeoutMillis = client.readTimeoutMillis,
writeTimeoutMillis = client.writeTimeoutMillis
)
var calledNoMoreExchanges = false
try {
val response = chain.proceed(originalRequest)
if (isCanceled()) {
response.closeQuietly()
throw IOException("Canceled")
}
return response
} catch (e: IOException) {
calledNoMoreExchanges = true
throw noMoreExchanges(e) as Throwable
} finally {
if (!calledNoMoreExchanges) {
noMoreExchanges(null)
}
}
}
RetryAndFollowUpInterceptor
这个拦截器主要处理重试以及重定向的,一般情况下,第一次请求不会涉及到,我们先来看下重试的方法。
try {
response = realChain.proceed(request)
newExchangeFinder = true
} catch (e: RouteException) {
// 尝试通过路由链接失败,请求不会发送
if (!recover(e.lastConnectException, call, request, requestSendStarted = false)) {
throw e.firstConnectException.withSuppressed(recoveredFailures)
} else {
recoveredFailures += e.firstConnectException
}
newExchangeFinder = false
continue
} catch (e: IOException) {
// 与服务器通信的时候发生了异常,请求可能已经发送了
if (!recover(e, call, request, requestSendStarted = e !is ConnectionShutdownException)) {
throw e.withSuppressed(recoveredFailures)
} else {
recoveredFailures += e
}
newExchangeFinder = false
continue
}
可以看到两个异常都是根据recover方法判断是否能够进行重试,如果返回true,则表示允许重试。那么我们来看下recover方法:
private fun recover( e: IOException, call: RealCall, userRequest: Request,
requestSendStarted: Boolean
): Boolean {
// 本身设置了不允许重试
if (!client.retryOnConnectionFailure) return false
// 如果没法再次发送请求
if (requestSendStarted && requestIsOneShot(e, userRequest)) return false
// 如果不是重试的异常
if (!isRecoverable(e, requestSendStarted)) return false
// 没有其他路径进行链接
if (!call.retryAfterFailure()) return false
// For failure recovery, use the same route selector with a new connection.
return true
}
如果请求结束没有出现异常,不代表当前的响应就是最终交互的,因为我们还需要判断是否需要重定向,而重定向的方法是followUpRequest。我们来看一下:
@Throws(IOException::class)
private fun followUpRequest(userResponse: Response, exchange: Exchange?): Request? {
val route = exchange?.connection?.route()
val responseCode = userResponse.code
val method = userResponse.request.method
when (responseCode) {
//407:使用了代理服务器,让代理服务器授权
HTTP_PROXY_AUTH -> {
val selectedProxy = route!!.proxy
if (selectedProxy.type() != Proxy.Type.HTTP) {
throw ProtocolException("Received HTTP_PROXY_AUTH (407) code while not using proxy")
}
return client.proxyAuthenticator.authenticate(route, userResponse)
}
//401:未经授权,进行授权
HTTP_UNAUTHORIZED -> return client.authenticator.authenticate(route, userResponse)
//300,301,302,303,307,308: 需要重定向,进行重定向操作。
HTTP_PERM_REDIRECT, HTTP_TEMP_REDIRECT, HTTP_MULT_CHOICE, HTTP_MOVED_PERM, HTTP_MOVED_TEMP, HTTP_SEE_OTHER -> {
return buildRedirectRequest(userResponse, method)
}
// 408 :请求超时
HTTP_CLIENT_TIMEOUT -> {
//如果客户端不允许,直接返回null
if (!client.retryOnConnectionFailure) {
return null
}
//如果尝试了,还是失败,就不管了,返回null
val requestBody = userResponse.request.body
if (requestBody != null && requestBody.isOneShot()) {
return null
}
val priorResponse = userResponse.priorResponse
if (priorResponse != null && priorResponse.code == HTTP_CLIENT_TIMEOUT) {
return null
}
//如果服务器告诉我们重试时间,我们也不管了,返回null
if (retryAfter(userResponse, 0) > 0) {
return null
}
return userResponse.request
}
//503:服务不可用,但是只在服务器告诉你 Retry-After:0(意思就是立即重试) 才重请求
HTTP_UNAVAILABLE -> {
val priorResponse = userResponse.priorResponse
if (priorResponse != null && priorResponse.code == HTTP_UNAVAILABLE) {
return null
}
if (retryAfter(userResponse, Integer.MAX_VALUE) == 0) {
return userResponse.request
}
return null
}
//421:即使域名不同,Okhttp还是可以合并Http2链接,当返回421时,可以用其他链接进行重试。
HTTP_MISDIRECTED_REQUEST -> {
val requestBody = userResponse.request.body
if (requestBody != null && requestBody.isOneShot()) {
return null
}
if (exchange == null || !exchange.isCoalescedConnection) {
return null
}
exchange.connection.noCoalescedConnections()
return userResponse.request
}
else -> return null
}
}
讲了这么多,其实就是如果此方法返回空,那就表示不需要再重定向了,直接返回响应;但是如果返回非空,那就要重新请求返回的Request。
BridgeInterceptor
桥接拦截器,其实就是http的请求头,每个网络请求都会有相关请求头才会最终到达服务器。这里不过多介绍。当前拦截器用到的请求头说明如下:
请求头 | 说明 |
---|---|
Content-Type | 请求体类型 |
Content-Length/Transfer-Encoding | 请求体解析方式 |
Host | 请求的主机站点 |
Connection: Keep-Alive | 保持长连接 |
Accept-Encoding: gzip | 接受响应支持gzip压缩 |
Cookie | cookie身份辨别 |
User-Agent | 请求的用户信息 |
CacheInterceptor
缓存拦截器,主要是做缓存相关的处理,如果本地存在缓存,并且发送时命中了缓存逻辑,就可以直接使用缓存的response。那么我们看下它的intercept方法:
@Throws(IOException::class)
override fun intercept(chain: Interceptor.Chain): Response {
val call = chain.call()
val cacheCandidate = cache?.get(chain.request())
val now = System.currentTimeMillis()
val strategy = CacheStrategy.Factory(now, chain.request(), cacheCandidate).compute()
val networkRequest = strategy.networkRequest
val cacheResponse = strategy.cacheResponse
cache?.trackResponse(strategy)
val listener = (call as? RealCall)?.eventListener ?: EventListener.NONE
//如果缓存的request不为空,缓存的response为空,则就不适用。直接关闭
if (cacheCandidate != null && cacheResponse == null) {
cacheCandidate.body?.closeQuietly()
}
//如果网络request为空,缓存response为空,则算是禁用了网络,直接返回504,超时
if (networkRequest == null && cacheResponse == null) {
return Response.Builder()
.request(chain.request())
.protocol(Protocol.HTTP_1_1)
.code(HTTP_GATEWAY_TIMEOUT)
.message("Unsatisfiable Request (only-if-cached)")
.body(EMPTY_RESPONSE)
.sentRequestAtMillis(-1L)
.receivedResponseAtMillis(System.currentTimeMillis())
.build().also {
listener.satisfactionFailure(call, it)
}
}
// 如果网络request为空,缓存response不为空,直接使用缓存
if (networkRequest == null) {
return cacheResponse!!.newBuilder()
.cacheResponse(stripBody(cacheResponse))
.build().also {
listener.cacheHit(call, it)
}
}
//如果网络request不为空,缓存也不为空,通知命中缓存,如果缓存为空,并且Cache不为空,通知缓存丢失。
if (cacheResponse != null) {
listener.cacheConditionalHit(call, cacheResponse)
} else if (cache != null) {
listener.cacheMiss(call)
}
var networkResponse: Response? = null
try {
networkResponse = chain.proceed(networkRequest)
} finally {
if (networkResponse == null && cacheCandidate != null) {
cacheCandidate.body?.closeQuietly()
}
}
// 如果网络请求的response拿到的code是304,则证明未修改,更新至缓存中
if (cacheResponse != null) {
if (networkResponse?.code == HTTP_NOT_MODIFIED) {
val response = cacheResponse.newBuilder()
.headers(combine(cacheResponse.headers, networkResponse.headers))
.sentRequestAtMillis(networkResponse.sentRequestAtMillis)
.receivedResponseAtMillis(networkResponse.receivedResponseAtMillis)
.cacheResponse(stripBody(cacheResponse))
.networkResponse(stripBody(networkResponse))
.build()
networkResponse.body!!.close()
cache!!.trackConditionalCacheHit()
cache.update(cacheResponse, response)
return response.also {
listener.cacheHit(call, it)
}
} else {
cacheResponse.body?.closeQuietly()
}
}
val response = networkResponse!!.newBuilder()
.cacheResponse(stripBody(cacheResponse))
.networkResponse(stripBody(networkResponse))
.build()
if (cache != null) {
//判断是否具有主体 并且 是否可以缓存供后续使用
if (response.promisesBody() && CacheStrategy.isCacheable(response, networkRequest)) {
//加入缓存
val cacheRequest = cache.put(response)
return cacheWritingResponse(cacheRequest, response).also {
if (cacheResponse != null) {
listener.cacheMiss(call)
}
}
}
//如果请求方法无效 就从缓存中remove掉
if (HttpMethod.invalidatesCache(networkRequest.method)) {
try {
cache.remove(networkRequest)
} catch (_: IOException) {
// The cache cannot be written.
}
}
}
return response
}
缓存拦截器的操作相对来说比较简单,只是稍微有点绕。真正是否可以缓存还是请求服务端都是通过CacheStrategy去判断的。也就是缓存拦截器中的这行代码:
val strategy = CacheStrategy.Factory(now, chain.request(), cacheCandidate).compute()
我们直接进去看看:
private fun computeCandidate(): CacheStrategy {
// 不存在缓存,直接返回
if (cacheResponse == null) {
return CacheStrategy(request, null)
}
// 如果缺少必要的握手,直接返回
if (request.isHttps && cacheResponse.handshake == null) {
return CacheStrategy(request, null)
}
// 根据响应头判断是否需要返回
if (!isCacheable(cacheResponse, request)) {
return CacheStrategy(request, null)
}
val requestCaching = request.cacheControl
//如果没有缓存,或者用户没有指定缓存,就直接返回
if (requestCaching.noCache || hasConditions(request)) {
return CacheStrategy(request, null)
}
val responseCaching = cacheResponse.cacheControl
//获得缓存的响应从创建到现在的时间
val ageMillis = cacheResponseAge()
//获取这个响应有效缓存的时长
var freshMillis = computeFreshnessLifetime()
//如果请求中指定了 max-age 表示指定了能拿的缓存有效时长,就需要综合响应有效缓存时长与请求能拿缓存的时长,获得最小的能够使用响应缓存的时长
if (requestCaching.maxAgeSeconds != -1) {
freshMillis = minOf(freshMillis, SECONDS.toMillis(requestCaching.maxAgeSeconds.toLong()))
}
var minFreshMillis: Long = 0
//请求认为的缓存有效时间
if (requestCaching.minFreshSeconds != -1) {
minFreshMillis = SECONDS.toMillis(requestCaching.minFreshSeconds.toLong())
}
//Cache-Control:must-revalidate 可缓存但必须再向源服务器进行确认
//Cache-Control:max-stale 缓存过期后还能使用指定的时长,如果未指定多少秒,则表示无论过期多长时间都可以;如果指定了,则只要是指定时间内就能使用缓存
// 前者会忽略后者,所以判断了不必须向服务器确认,再获得请求头中的max-stale
var maxStaleMillis: Long = 0
if (!responseCaching.mustRevalidate && requestCaching.maxStaleSeconds != -1) {
maxStaleMillis = SECONDS.toMillis(requestCaching.maxStaleSeconds.toLong())
}
//不需要与服务器验证有效性 && 响应存在的时间+请求认为的缓存有效时间 < 缓存有效时长+过期后还可以使用的时间
//可以使用缓存
if (!responseCaching.noCache && ageMillis + minFreshMillis < freshMillis + maxStaleMillis) {
val builder = cacheResponse.newBuilder()
if (ageMillis + minFreshMillis >= freshMillis) {
builder.addHeader("Warning", "110 HttpURLConnection \"Response is stale\"")
}
val oneDayMillis = 24 * 60 * 60 * 1000L
if (ageMillis > oneDayMillis && isFreshnessLifetimeHeuristic()) {
builder.addHeader("Warning", "113 HttpURLConnection \"Heuristic expiration\"")
}
return CacheStrategy(null, builder.build())
}
val conditionName: String
val conditionValue: String?
when {
etag != null -> {
conditionName = "If-None-Match"
conditionValue = etag
}
lastModified != null -> {
conditionName = "If-Modified-Since"
conditionValue = lastModifiedString
}
servedDate != null -> {
conditionName = "If-Modified-Since"
conditionValue = servedDateString
}
else -> return CacheStrategy(request, null)
}
val conditionalRequestHeaders = request.headers.newBuilder()
conditionalRequestHeaders.addLenient(conditionName, conditionValue!!)
val conditionalRequest = request.newBuilder()
.headers(conditionalRequestHeaders.build())
.build()
return CacheStrategy(conditionalRequest, cacheResponse)
}
ConnectInterceptor
此拦截器负责建⽴连接。包含了⽹络请求所需要的TCP连接(HTTP),或者TCP之前的TLS连接(HTTPS),并且会创建出对应的HttpCodec对象(⽤于编码解码HTTP请求)。我们来看下代码:
val realChain = chain as RealInterceptorChain
val exchange = realChain.call.initExchange(chain)
val connectedChain = realChain.copy(exchange = exchange)
return connectedChain.proceed(realChain.request)
嗯?怎么就四行代码,我们继续往下看RealCall的initExchange方法:
internal fun initExchange(chain: RealInterceptorChain): Exchange {
synchronized(this) {
check(expectMoreExchanges) { "released" }
check(!responseBodyOpen)
check(!requestBodyOpen)
}
val exchangeFinder = this.exchangeFinder!!
val codec = exchangeFinder.find(client, chain)
val result = Exchange(this, eventListener, exchangeFinder, codec)
this.interceptorScopedExchange = result
this.exchange = result
synchronized(this) {
this.requestBodyOpen = true
this.responseBodyOpen = true
}
if (canceled) throw IOException("Canceled")
return result
}
这个方法主要是用来查找新的或者合并后的链接以进行即将到来的请求和响应。我们来看一下查找逻辑:
fun find(client: OkHttpClient,chain: RealInterceptorChain): ExchangeCodec {
try {
val resultConnection = findHealthyConnection(
connectTimeout = chain.connectTimeoutMillis,
readTimeout = chain.readTimeoutMillis,
writeTimeout = chain.writeTimeoutMillis,
pingIntervalMillis = client.pingIntervalMillis,
connectionRetryEnabled = client.retryOnConnectionFailure,
doExtensiveHealthChecks = chain.request.method != "GET"
)
return resultConnection.newCodec(client, chain)
} catch (e: RouteException) {
trackFailure(e.lastConnectException)
throw e
} catch (e: IOException) {
trackFailure(e)
throw RouteException(e)
}
}
其内部调用了findHealthyConnection,也就是查找一个可用的链接,我们接着往下看:
@Throws(IOException::class)
private fun findHealthyConnection(connectTimeout: Int, readTimeout: Int, writeTimeout: Int, pingIntervalMillis: Int, connectionRetryEnabled: Boolean, doExtensiveHealthChecks: Boolean ): RealConnection {
while (true) {
//查找链接
val candidate = findConnection(
connectTimeout = connectTimeout,
readTimeout = readTimeout,
writeTimeout = writeTimeout,
pingIntervalMillis = pingIntervalMillis,
connectionRetryEnabled = connectionRetryEnabled
)
// 确定找到的链接可用并返回
if (candidate.isHealthy(doExtensiveHealthChecks)) {
return candidate
}
candidate.noNewExchanges()
if (nextRouteToTry != null) continue
val routesLeft = routeSelection?.hasNext() ?: true
if (routesLeft) continue
val routesSelectionLeft = routeSelector?.hasNext() ?: true
if (routesSelectionLeft) continue
throw IOException("exhausted all routes")
}
}
我们看下查找链接的方法:
@Throws(IOException::class)
private fun findConnection(
connectTimeout: Int,
readTimeout: Int,
writeTimeout: Int,
pingIntervalMillis: Int,
connectionRetryEnabled: Boolean
): RealConnection {
if (call.isCanceled()) throw IOException("Canceled")
// 尝试重用call的链接
val callConnection = call.connection
if (callConnection != null) {
var toClose: Socket? = null
synchronized(callConnection) {
if (callConnection.noNewExchanges || !sameHostAndPort(callConnection.route().address.url)) {
toClose = call.releaseConnectionNoEvents()
}
}
//如果不为null,就返回
if (call.connection != null) {
check(toClose == null)
return callConnection
}
toClose?.closeQuietly()
eventListener.connectionReleased(call, callConnection)
}
refusedStreamCount = 0
connectionShutdownCount = 0
otherFailureCount = 0
// 尝试从连接池中找一个连接,如果找到就返回连接
if (connectionPool.callAcquirePooledConnection(address, call, null, false)) {
val result = call.connection!!
eventListener.connectionAcquired(call, result)
return result
}
// 如果连接池没有,找到下一步需要尝试的连接
val routes: List<Route>?
val route: Route
if (nextRouteToTry != null) {
routes = null
route = nextRouteToTry!!
nextRouteToTry = null
} else if (routeSelection != null && routeSelection!!.hasNext()) {
routes = null
route = routeSelection!!.next()
} else {
var localRouteSelector = routeSelector
if (localRouteSelector == null) {
localRouteSelector = RouteSelector(address, call.client.routeDatabase, call, eventListener)
this.routeSelector = localRouteSelector
}
val localRouteSelection = localRouteSelector.next()
routeSelection = localRouteSelection
routes = localRouteSelection.routes
if (call.isCanceled()) throw IOException("Canceled")
if (connectionPool.callAcquirePooledConnection(address, call, routes, false)) {
val result = call.connection!!
eventListener.connectionAcquired(call, result)
return result
}
route = localRouteSelection.next()
}
// 进行连接
val newConnection = RealConnection(connectionPool, route)
call.connectionToCancel = newConnection
try {
newConnection.connect(
connectTimeout,
readTimeout,
writeTimeout,
pingIntervalMillis,
connectionRetryEnabled,
call,
eventListener
)
} finally {
call.connectionToCancel = null
}
call.client.routeDatabase.connected(newConnection.route())
//确定是不是多路复用,如果是就进行连接合并。
if (connectionPool.callAcquirePooledConnection(address, call, routes, true)) {
val result = call.connection!!
nextRouteToTry = route
newConnection.socket().closeQuietly()
eventListener.connectionAcquired(call, result)
return result
}
//丢到缓存池里面去
synchronized(newConnection) {
connectionPool.put(newConnection)
call.acquireConnectionNoEvents(newConnection)
}
eventListener.connectionAcquired(call, newConnection)
return newConnection
}
我们继续往下看看它到底是怎么建立连接的:
fun connect( connectTimeout: Int, readTimeout: Int, writeTimeout: Int, pingIntervalMillis: Int, connectionRetryEnabled: Boolean, call: Call, eventListener: EventListener) {
check(protocol == null) { "already connected" }
var routeException: RouteException? = null
val connectionSpecs = route.address.connectionSpecs
val connectionSpecSelector = ConnectionSpecSelector(connectionSpecs)
if (route.address.sslSocketFactory == null) {
if (ConnectionSpec.CLEARTEXT !in connectionSpecs) {
throw RouteException(UnknownServiceException(
"CLEARTEXT communication not enabled for client"))
}
val host = route.address.url.host
if (!Platform.get().isCleartextTrafficPermitted(host)) {
throw RouteException(UnknownServiceException(
"CLEARTEXT communication to $host not permitted by network security policy"))
}
} else {
if (Protocol.H2_PRIOR_KNOWLEDGE in route.address.protocols) {
throw RouteException(UnknownServiceException(
"H2_PRIOR_KNOWLEDGE cannot be used with HTTPS"))
}
}
while (true) {
try {
//如果需要建立通讯隧道,就建立隧道。也就是通过http代理访问https
if (route.requiresTunnel()) {
connectTunnel(connectTimeout, readTimeout, writeTimeout, call, eventListener)
if (rawSocket == null) {
// We were unable to connect the tunnel but properly closed down our resources.
break
}
} else {
//建立socket连接。
connectSocket(connectTimeout, readTimeout, call, eventListener)
}
establishProtocol(connectionSpecSelector, pingIntervalMillis, call, eventListener)
eventListener.connectEnd(call, route.socketAddress, route.proxy, protocol)
break
} catch (e: IOException) {
socket?.closeQuietly()
rawSocket?.closeQuietly()
socket = null
rawSocket = null
source = null
sink = null
handshake = null
protocol = null
http2Connection = null
allocationLimit = 1
eventListener.connectFailed(call, route.socketAddress, route.proxy, null, e)
if (routeException == null) {
routeException = RouteException(e)
} else {
routeException.addConnectException(e)
}
if (!connectionRetryEnabled || !connectionSpecSelector.connectionFailed(e)) {
throw routeException
}
}
}
if (route.requiresTunnel() && rawSocket == null) {
throw RouteException(ProtocolException(
"Too many tunnel connections attempted: $MAX_TUNNEL_ATTEMPTS"))
}
idleAtNs = System.nanoTime()
}
到这里,整个连接流程结束。
CallServerInterceptor
此拦截器主要负责具体的请求与响应的I/O操作,即往Socket⾥写⼊请求数据,和从Socket⾥读取响应数据。也就是发送请求到服务器然后直到数据解析生成response。话不多说,直接上代码:
override fun intercept(chain: Interceptor.Chain): Response {
val realChain = chain as RealInterceptorChain
val exchange = realChain.exchange!!
val request = realChain.request
val requestBody = request.body
val sentRequestMillis = System.currentTimeMillis()
exchange.writeRequestHeaders(request)
var invokeStartEvent = true
var responseBuilder: Response.Builder? = null
//判断请求头是不是POST
if (HttpMethod.permitsRequestBody(request.method) && requestBody != null) {
// 如果请求头包含"100-continue"的响应,就等待它完成响应,在执行正文,如果没有得到,返回正常的结果。
if ("100-continue".equals(request.header("Expect"), ignoreCase = true)) {
exchange.flushRequest()
responseBuilder = exchange.readResponseHeaders(expectContinue = true)
exchange.responseHeadersStart()
invokeStartEvent = false
}
if (responseBuilder == null) {
if (requestBody.isDuplex()) {
exchange.flushRequest()
val bufferedRequestBody = exchange.createRequestBody(request, true).buffer()
requestBody.writeTo(bufferedRequestBody)
} else {
val bufferedRequestBody = exchange.createRequestBody(request, false).buffer()
requestBody.writeTo(bufferedRequestBody)
bufferedRequestBody.close()
}
} else {
exchange.noRequestBody()
if (!exchange.connection.isMultiplexed) {
exchange.noNewExchangesOnConnection()
}
}
} else {
exchange.noRequestBody()
}
if (requestBody == null || !requestBody.isDuplex()) {
exchange.finishRequest()
}
if (responseBuilder == null) {
responseBuilder = exchange.readResponseHeaders(expectContinue = false)!!
if (invokeStartEvent) {
exchange.responseHeadersStart()
invokeStartEvent = false
}
}
var response = responseBuilder
.request(request)
.handshake(exchange.connection.handshake())
.sentRequestAtMillis(sentRequestMillis)
.receivedResponseAtMillis(System.currentTimeMillis())
.build()
var code = response.code
//如果响应是100,这代表了是请求Expect: 100-continue成功的响应,需要再读取一份响应头
if (code == 100) {
responseBuilder = exchange.readResponseHeaders(expectContinue = false)!!
if (invokeStartEvent) {
exchange.responseHeadersStart()
}
response = responseBuilder
.request(request)
.handshake(exchange.connection.handshake())
.sentRequestAtMillis(sentRequestMillis)
.receivedResponseAtMillis(System.currentTimeMillis())
.build()
code = response.code
}
exchange.responseHeadersEnd(response)
response = if (forWebSocket && code == 101) {
response.newBuilder()
.body(EMPTY_RESPONSE)
.build()
} else {
response.newBuilder()
.body(exchange.openResponseBody(response))
.build()
}
if ("close".equals(response.request.header("Connection"), ignoreCase = true) ||
"close".equals(response.header("Connection"), ignoreCase = true)) {
exchange.noNewExchangesOnConnection()
}
if ((code == 204 || code == 205) && response.body?.contentLength() ?: -1L > 0L) {
throw ProtocolException(
"HTTP $code had non-zero Content-Length: ${response.body?.contentLength()}")
}
return response
}
到此,已经完成了HTTP协议报文的封装与解析。
总结
整个OkHttp功能的实现就在这五个默认的拦截器中。具体的小结如下:
-
addInterceptor(Interceptor),这是由开发者设置的,会按照开发者的要求,在所有的拦截器处理之前进行最早的拦截处理,比如一些公共参数,Header都可以在这里添加。
-
RetryAndFollowUpInterceptor,这里会对连接做一些初始化工作,以及请求失败的充实工作,重定向的后续请求工作。跟他的名字一样,就是做重试工作还有一些连接跟踪工作。
-
BridgeInterceptor,这里会为用户构建一个能够进行网络访问的请求,同时后续工作将网络请求回来的响应Response转化为用户可用的Response,比如添加文件类型,content-length计算添加,gzip解包。
-
CacheInterceptor,这里主要是处理cache相关处理,会根据OkHttpClient的配置以及缓存策略对请求值进行缓存,而且如果本地有了可⽤的Cache,就可以在没有网络交互的情况下就返回缓存结果。
-
ConnectInterceptor,这里主要就是负责建立连接了,会建立TCP连接或者TLS连接,以及负责编码解码的HttpCodec。
-
networkInterceptors,这里也是开发者自己设置的,所以本质上和第一个拦截器差不多,但是由于位置不同,所以用处也不同。这个位置添加的拦截器可以看到请求和响应的数据了,所以可以做一些网络调试。
-
CallServerInterceptor,这里就是进行网络数据的请求和响应了,也就是实际的网络I/O操作,通过socket读写数据。
推荐阅读
-
OkHttp源码分析
-
Android进阶:四、RxJava2 源码解析 1
-
Win10系统从源码构建3D Slicer
-
学习操作系统:自己动手写操作系统 ->chapter5_e Loader分析
-
volatile关键字实例分析 博客分类: java并发编程 volatile并发线程javasynchronized
-
解决通过this.class.getResource()得到的URL中乱码的问题及源码解析:
-
split 陷阱分析 博客分类: Java javajdk java 字符串split有很多坑,使用时请小心!! Java代码
-
(一)UI绘制流程-源码分析
-
从源码角度分析NestedScrolling
-
从源码角度看 Spark 任务提交流程(上)