欢迎您访问程序员文章站本站旨在为大家提供分享程序员计算机编程知识!
您现在的位置是: 首页

OKHttp源码解析(一)

程序员文章站 2022-07-13 10:59:00
...

一、OKHttp的综述

OkHttp是一个高效的Http客户端,有如下的特点:

  1. 支持HTTP2/SPDY黑科技
  2. socket自动选择最好路线,并支持自动重连
  3. 拥有自动维护的socket连接池,减少握手次数
  4. 拥有队列线程池,轻松写并发
  5. 拥有Interceptors轻松处理请求与响应(比如透明GZIP压缩,LOGGING)
  6. 基于Headers的缓存策略

    Okhttp的整体流程图
    OKHttp源码解析(一)

二、网络请求

用OkHttpClient.newCall(request)进行execute/enenqueue时,实际是将请求Call放到了Dispatcher中,okhttp使用Dispatcher进行线程分发,它有两种方法,一个是普通的同步单线程;另一种是使用了队列进行并发任务的分发(Dispatch)与回调。接下来分析第二种并发任务的分发。
1. Dispatcher的结构
maxRequests = 64: 最大并发请求数为64
maxRequestsPerHost = 5: 每个主机最大请求数为5
Dispatcher: 分发者,也就是生产者(默认在主线程)
AsyncCall: 队列中需要处理的Runnable(包装了异步回调接口)
ExecutorService:消费者池(也就是线程池)
Deque:缓存(用数组实现,可自动扩容,无大小限制)
Deque:正在运行的任务,仅仅是用来引用正在运行的任务以判断并发量,注意它并不是消费者缓存

根据生产者消费者模型的模型理论,当入队(enqueue)请求时,如果满足(runningRequests<64 && runningRequestsPerHost<5),那么就直接把AsyncCall直接加到runningCalls的队列中,并在线程池中执行。如果消费者缓存满了,就放入readyAsyncCalls进行缓存等待。
当任务执行完成后,调用finished的promoteCalls()函数,手动移动缓存区(可以看出这里是主动清理的,因此不会发生死锁)
当我们希望使用OkHttp的异步请求时,一般进行如下构造

OkHttpClient client = new OkHttpClient.Builder().build();
Request request = new Request.Builder()
    .url("http://qq.com").get().build();
client.newCall(request).enqueue(new Callback() {
  @Override public void onFailure(Call call, IOException e) {

  }

  @Override public void onResponse(Call call, Response response) throws IOException {

  }
});

根据代码,我们可以发现实际上是Dispatcher进行了入队操作

synchronized void enqueue(AsyncCall call) {
  if (runningAsyncCalls.size() < maxRequests && runningCallsForHost(call) < maxRequestsPerHost) {
    //添加正在运行的请求
    runningAsyncCalls.add(call);
    //线程池执行请求
    executorService().execute(call);
  } else {
    //添加到缓存队列排队等待
    readyAsyncCalls.add(call);
  }
}

如果满足条件,那么就直接把AsyncCall直接加到runningCalls的队列中,并在线程池中执行(线程池会根据当前负载自动创建,销毁,缓存相应的线程)。反之就放入readyAsyncCalls进行缓存等待。
请求元素RealCall#execute(它实现了Runnable接口),它内部实现的execute方法如下

@Override protected void execute() {
      boolean signalledCallback = false;
      try {
        Response response = getResponseWithInterceptorChain();
        if (retryAndFollowUpInterceptor.isCanceled()) {
          signalledCallback = true;
          responseCallback.onFailure(RealCall.this, new IOException("Canceled"));
        } else {
          signalledCallback = true;
          responseCallback.onResponse(RealCall.this, response);
        }
      } catch (IOException e) {
        if (signalledCallback) {
          // Do not signal the callback twice!
          Platform.get().log(INFO, "Callback failure for " + toLoggableString(), e);
        } else {
          responseCallback.onFailure(RealCall.this, e);
        }
      } finally {
        client.dispatcher().finished(this);
      }
    }

调用 getResponseWithInterceptorChain() 函数获取 HTTP 返回结果,从函数名可以看出,这一步还会进行一系列“拦截”操作。

Response getResponseWithInterceptorChain() throws IOException {
    // Build a full stack of interceptors.
    List<Interceptor> interceptors = new ArrayList<>();
    interceptors.addAll(client.interceptors());
    interceptors.add(retryAndFollowUpInterceptor);
    interceptors.add(new BridgeInterceptor(client.cookieJar()));
    interceptors.add(new CacheInterceptor(client.internalCache()));
    interceptors.add(new ConnectInterceptor(client));
    if (!forWebSocket) {
      interceptors.addAll(client.networkInterceptors());
    }
    interceptors.add(new CallServerInterceptor(forWebSocket));

    Interceptor.Chain chain = new RealInterceptorChain(
        interceptors, null, null, null, 0, originalRequest);
    return chain.proceed(originalRequest);
  }

Interceptor 是 OkHttp 最核心的一个东西,不要误以为它只负责拦截请求进行一些额外的处理(例如 cookie),实际上它把实际的网络请求、缓存、透明压缩等功能都统一了起来,每一个功能都只是一个 Interceptor,它们再连接成一个 Interceptor.Chain,环环相扣,最终圆满完成一次网络请求。

三、Interceptor责任链

OKHttp源码解析(一)
以下是主要的拦截器的种类:
RetryAndFollowUpInterceptor 负责失败重试以及重定向的
BridgeInterceptor 负责把用户构造的请求转换为发送到服务器的请求、把服务器返回的响应转换为用户友好的响应的
CacheInterceptor 负责读取缓存直接返回、更新缓存的
ConnectInterceptor 负责和服务器建立连接的
networkInterceptors 配置 OkHttpClient 时设置的
CallServerInterceptor 负责向服务器发送请求数据、从服务器读取响应数据的

1、ConnectInterceptor(建立连接)
核心代码拦截如下:

@Override public Response intercept(Chain chain) throws IOException {
  RealInterceptorChain realChain = (RealInterceptorChain) chain;
  Request request = realChain.request();
  StreamAllocation streamAllocation = realChain.streamAllocation();

  // We need the network to satisfy this request. Possibly for validating a conditional GET.
  boolean doExtensiveHealthChecks = !request.method().equals("GET");
  HttpCodec httpCodec = streamAllocation.newStream(client, doExtensiveHealthChecks);
  RealConnection connection = streamAllocation.connection();

  return realChain.proceed(request, streamAllocation, httpCodec, connection);
}

创建了一个 HttpCodec 对象,它是对 HTTP 协议操作的抽象,有两个实现:Http1Codec 和 Http2Codec,顾名思义,它们分别对应 HTTP/1.1 和 HTTP/2 版本的实现。
2、CallServerInterceptor(发送和接收数据)
核心代码拦截如下:

@Override public Response intercept(Chain chain) throws IOException {
  HttpCodec httpCodec = ((RealInterceptorChain) chain).httpStream();
  StreamAllocation streamAllocation = ((RealInterceptorChain) chain).streamAllocation();
  Request request = chain.request();

  long sentRequestMillis = System.currentTimeMillis();
  httpCodec.writeRequestHeaders(request);

  if (HttpMethod.permitsRequestBody(request.method()) && request.body() != null) {
    Sink requestBodyOut = httpCodec.createRequestBody(request, request.body().contentLength());
    BufferedSink bufferedRequestBody = Okio.buffer(requestBodyOut);
    request.body().writeTo(bufferedRequestBody);
    bufferedRequestBody.close();
  }

  httpCodec.finishRequest();

  Response response = httpCodec.readResponseHeaders()
      .request(request)
      .handshake(streamAllocation.connection().handshake())
      .sentRequestAtMillis(sentRequestMillis)
      .receivedResponseAtMillis(System.currentTimeMillis())
      .build();

  if (!forWebSocket || response.code() != 101) {
    response = response.newBuilder()
        .body(httpCodec.openResponseBody(response))
        .build();
  }

  if ("close".equalsIgnoreCase(response.request().header("Connection"))
      || "close".equalsIgnoreCase(response.header("Connection"))) {
    streamAllocation.noNewStreams();
  }

  // 省略部分检查代码

  return response;
}

向服务器发送 request header;
如果有 request body,就向服务器发送;
读取 response header,先构造一个 Response 对象;
如果有 response body,就在 3 的基础上加上 body 构造一个新的 Response 对象;

四、连接池

HTTP中的keepalive连接在网络性能优化中,对于延迟降低与速度提升的有非常重要的作用。
Okhttp支持5个并发KeepAlive连接,默认链路生命为5分钟(链路空闲后,保持存活的时间)

1、连接池的的关键对象:

Connection: 对jdk的socket物理连接的包装,它内部有List

//Socket清理的Runnable,每当put操作时,就会被主动调用
//注意put操作是在网络线程
//而Socket清理是在`OkHttp ConnectionPool`线程池中调用
while (true) {
  //执行清理并返回下场需要清理的时间
  long waitNanos = cleanup(System.nanoTime());
  if (waitNanos == -1) return;
  if (waitNanos > 0) {
    synchronized (ConnectionPool.this) {
      try {
        //在timeout内释放锁与时间片
        ConnectionPool.this.wait(TimeUnit.NANOSECONDS.toMillis(waitNanos));
      } catch (InterruptedException ignored) {
      }
    }
  }
}

这段死循环实际上是一个阻塞的清理任务,首先进行清理(clean),并返回下次需要清理的间隔时间,然后调用wait(timeout)进行等待以释放锁与时间片,当等待时间到了后,再次进行清理,并返回下次要清理的间隔时间

long cleanup(long now) {
  int inUseConnectionCount = 0;
  int idleConnectionCount = 0;
  RealConnection longestIdleConnection = null;
  long longestIdleDurationNs = Long.MIN_VALUE;

  //遍历`Deque`中所有的`RealConnection`,标记泄漏的连接
  synchronized (this) {
    for (RealConnection connection : connections) {
      // 查询此连接内部StreamAllocation的引用数量
      if (pruneAndGetAllocationCount(connection, now) > 0) {
        inUseConnectionCount++;
        continue;
      }

      idleConnectionCount++;

      //选择排序法,标记出空闲连接
      long idleDurationNs = now - connection.idleAtNanos;
      if (idleDurationNs > longestIdleDurationNs) {
        longestIdleDurationNs = idleDurationNs;
        longestIdleConnection = connection;
      }
    }

    if (longestIdleDurationNs >= this.keepAliveDurationNs
        || idleConnectionCount > this.maxIdleConnections) {
      //如果(`空闲socket连接超过5个`
      //且`keepalive时间大于5分钟`)
      //就将此泄漏连接从`Deque`中移除
      connections.remove(longestIdleConnection);
    } else if (idleConnectionCount > 0) {
      //返回此连接即将到期的时间,供下次清理
      //这里依据是在上文`connectionBecameIdle`中设定的计时
      return keepAliveDurationNs - longestIdleDurationNs;
    } else if (inUseConnectionCount > 0) {
      //全部都是活跃的连接,5分钟后再次清理
      return keepAliveDurationNs;
    } else {
      //没有任何连接,跳出循环
      cleanupRunning = false;
      return -1;
    }
  }

  //关闭连接,返回`0`,也就是立刻再次清理
  closeQuietly(longestIdleConnection.socket());
  return 0;
}

遍历Deque中所有的RealConnection,标记泄漏的连接
如果被标记的连接满足(空闲socket连接超过5个&&keepalive时间大于5分钟),就将此连接从Deque中移除,并关闭连接,返回0,也就是将要执行wait(0),提醒立刻再次扫描
如果(目前还可以塞得下5个连接,但是有可能泄漏的连接(即空闲时间即将达到5分钟)),就返回此连接即将到期的剩余时间,供下次清理
如果(全部都是活跃的连接),就返回默认的keep-alive时间,也就是5分钟后再执行清理
如果(没有任何连接),就返回-1,跳出清理的死循环
这里的“并发”==(“空闲”+“活跃”)==5
标记并找到最不活跃的连接:pruneAndGetAllocationCount()方法中依据弱引用是否为null而判断这个连接是否泄漏

参考链接:https://blog.piasy.com/2016/07/11/Understand-OkHttp/

相关标签: OKhttp