欢迎您访问程序员文章站本站旨在为大家提供分享程序员计算机编程知识!
您现在的位置是: 首页

OkHttp源码分析之ConnectInterceptor和CallServerInterceptor

程序员文章站 2022-07-13 10:54:54
...

前一篇分析Okhttp源码我发现拦截器部分的最后两个拦截器ConnectInterceptor和CallServerInterceptor的内容有点多,而且他们两个的联系也是紧密的,这里就抽出来单独分析。

转载请注明出处
https://blog.csdn.net/dreamsever/article/details/80141224

先说ConnectInterceptor,连接拦截器其实就是为后面的CallServerInterceptor请求服务器拦截器做准备的,
如下图:
OkHttp源码分析之ConnectInterceptor和CallServerInterceptor

@Override 
public Response intercept(Chain chain) throws IOException {
    RealInterceptorChain realChain = (RealInterceptorChain) chain;
    Request request = realChain.request();
    StreamAllocation streamAllocation = realChain.streamAllocation();

    // We need the network to satisfy this request. Possibly for validating a conditional GET.
    boolean doExtensiveHealthChecks = !request.method().equals("GET");
    HttpCodec httpCodec = streamAllocation.newStream(client, doExtensiveHealthChecks);
    RealConnection connection = streamAllocation.connection();

    return realChain.proceed(request, streamAllocation, httpCodec, connection);
}

说说各自的作用:
Request: 一个请求,包含请求信息
StreamAllocation :是用来统筹协调其他三个类关系的,这三个类分别是Connections,Streams和Calls。
HttpCodec:用来编码Http请求和解码Http响应的,最终执行的Okio的请求在这里发生
RealConnection:实现Connection接口,为Http流或者socket建立的连接,相当于修建的管道,为以后传输数据

StreamAllocation 是在最开始的拦截器RetryAndFollowUpInterceptor的intercept方法中就新建好的,每次都赋值给新的RealInterceptorChain,一步步传递到这里才使用。

streamAllocation = new StreamAllocation(
        client.connectionPool(), createAddress(request.url()), callStackTrace);

先通过streamAllocation的newStream方法得到HttpCodec

public HttpCodec newStream(OkHttpClient client, boolean doExtensiveHealthChecks) {
    int connectTimeout = client.connectTimeoutMillis();
    int readTimeout = client.readTimeoutMillis();
    int writeTimeout = client.writeTimeoutMillis();
    boolean connectionRetryEnabled = client.retryOnConnectionFailure();

    try {
       //得到一个健康可用的RealConnection
      RealConnection resultConnection = findHealthyConnection(connectTimeout, readTimeout,
          writeTimeout, connectionRetryEnabled, doExtensiveHealthChecks);
      HttpCodec resultCodec = resultConnection.newCodec(client, this);

      synchronized (connectionPool) {
        codec = resultCodec;
        return resultCodec;
      }
    } catch (IOException e) {
      throw new RouteException(e);
    }
}


//循环调用findConnection,直到得到一个健康的RealConnection 
private RealConnection findHealthyConnection(int connectTimeout, int readTimeout,
    int writeTimeout, boolean connectionRetryEnabled, boolean doExtensiveHealthChecks)
    throws IOException {
    while (true) {
        RealConnection candidate = findConnection(connectTimeout, readTimeout, writeTimeout,connectionRetryEnabled);
        // 如果是一个全新的连接直接拿去使用不需要检查
        synchronized (connectionPool) {
            if (candidate.successCount == 0) {
                return candidate;
            }
        }

        // 检查这个连接是否是健康可用的,如果不是把它移除连接池
        if (!candidate.isHealthy(doExtensiveHealthChecks)) {
            noNewStreams();
            continue;
        }
        return candidate;
    }
}

下面去看findConnection方法,这里又出现了一个connectionPool,是一个连接池,用来维护管理着所有的连接,包括HTTP/1.1和 HTTP/2的连接,使用已经存在的连接可以减少等待时间。这个连接池的配置是最大连接数为5,最大空闲存活时间为5分钟。每次添加一个连接到连接池都会判断Runnable cleanupRunnable,清理线程是否在工作中,不在工作中,把它启动起来放入线程池执行。关于复用连接池这块感兴趣的可以去看看这一篇博客–OkHttp3源码分析复用连接池

private RealConnection findConnection(int connectTimeout, int readTimeout, int writeTimeout,
      boolean connectionRetryEnabled) throws IOException {
    Route selectedRoute;
    synchronized (connectionPool) {
      if (released) throw new IllegalStateException("released");
      if (codec != null) throw new IllegalStateException("codec != null");
      if (canceled) throw new IOException("Canceled");

      // 优先返回一个已经分配的连接,假如这个连接不为空,并且这个连接上面还可以建立新的stream
      RealConnection allocatedConnection = this.connection;
      if (allocatedConnection != null && !allocatedConnection.noNewStreams) {
        return allocatedConnection;
      }

      // 上面没成功,去连接池中获得一个connection
      Internal.instance.get(connectionPool, address, this, null);
      if (connection != null) {
        return connection;
      }

        //以上都没成功给selectedRoute赋值为当前路由
      selectedRoute = route;
    }

    // If we need a route, make one. This is a blocking operation.
    if (selectedRoute == null) {
      selectedRoute = routeSelector.next();
    }

    RealConnection result;
    synchronized (connectionPool) {
      if (canceled) throw new IOException("Canceled");

      // 再次尝试去连接池获取一个RealConnection,也许由于连接合并可以得到
      Internal.instance.get(connectionPool, address, this, selectedRoute);
      if (connection != null) return connection;

      // 最后也没得到可用的RealConnection,就去创建一个RealConnection
      route = selectedRoute;
      refusedStreamCount = 0;
      result = new RealConnection(connectionPool, selectedRoute);
      acquire(result);
    }
    //执行TCP + TLS握手
    result.connect(connectTimeout, readTimeout, writeTimeout, connectionRetryEnabled);
    routeDatabase().connected(result.route());

    Socket socket = null;
    synchronized (connectionPool) {
      // 将新建的连接放入连接池
      Internal.instance.put(connectionPool, result);

      // If another multiplexed connection to the same address was created concurrently, then
      // release this connection and acquire that one.
      //如果有另一个具有相同IP地址的连接被同时创建,那么把这个连接释放,去用另外一个
      //这种情况只有Http2的时候才会去执行
      if (result.isMultiplexed()) {
        socket = Internal.instance.deduplicate(connectionPool, address, this);
        result = connection;
      }
    }
    closeQuietly(socket);

    return result;
}

上面就是先去查找已经存在的可以用的RealConnection ,假如没有找到就去新建一个RealConnection ,新建连接的时候就分两种情况,HTTP/2与HTTP 1.1,下面先了解下他们的区别

HTTP/2与HTTP 1.1相比,主要区别包括
HTTP/2采用二进制格式而非文本格式
HTTP/2是完全多路复用的,而非有序并阻塞的——只需一个连接即可实现并行
使用报头压缩,HTTP/2降低了开销
HTTP/2让服务器可以将响应主动“推送”到客户端缓存中

所以HTTP/2时一个RealConnection可以用于多个stream,默认最大并发流为MAX_CONCURRENT_STREAMS = 4,而Http 1.1不支持多路复用,在RealConnection里面public int allocationLimit = 1;默认只有一个流可以分配。

Internal.instance其实还是连接池ConnectionPool

@Nullable 
Socket deduplicate(Address address, StreamAllocation streamAllocation) {
    assert (Thread.holdsLock(this));
    for (RealConnection connection : connections) {
      if (connection.isEligible(address, null)
          && connection.isMultiplexed()
          && connection != streamAllocation.connection()) {
        return streamAllocation.releaseAndAcquire(connection);
      }
    }
    return null;
}

循环查找到那个同步被创建的RealConnection, 然后传入执行releaseAndAcquire方法。
StreamAllocation.java

public Socket releaseAndAcquire(RealConnection newConnection) {
    assert (Thread.holdsLock(connectionPool));
    if (codec != null || connection.allocations.size() != 1) throw new IllegalStateException();

    // Release the old connection.
    Reference<StreamAllocation> onlyAllocation = connection.allocations.get(0);
    Socket socket = deallocate(true, false, false);

    // Acquire the new connection.
    this.connection = newConnection;
    newConnection.allocations.add(onlyAllocation);

    return socket;
  }

这里面,先把当前的StreamAllocation的this.connection,也就是刚刚新建的那个执行deallocate方法丢弃,然后把查找的newConnection赋值给 this.connection。最后把刚刚新建的connection中的socket关闭。

翻过千山万水,我们总算走完了RealConnection resultConnection = findHealthyConnection(connectTimeout, readTimeout,writeTimeout, connectionRetryEnabled, doExtensiveHealthChecks);这一句代码,找到了一个健康可用的RealConnection。比如我们需要找管道传送水到某处,我们首先做的是找管道,现在我们的管道建立好了并且处于可用状态。然后我们再看一看HttpCodec resultCodec = resultConnection.newCodec(client, this);resultCodec 相当于是一个水长,他们不生产水,它只是水的净化检测机构。它把河水转化为可以通过管道输送到用户的自来水。只不过这个水厂有点特别还可以接收用户的请求。


public HttpCodec newCodec(
      OkHttpClient client, StreamAllocation streamAllocation) throws SocketException {
    if (http2Connection != null) {
      return new Http2Codec(client, streamAllocation, http2Connection);
    } else {
      socket.setSoTimeout(client.readTimeoutMillis());
      source.timeout().timeout(client.readTimeoutMillis(), MILLISECONDS);
      sink.timeout().timeout(client.writeTimeoutMillis(), MILLISECONDS);
      return new Http1Codec(client, streamAllocation, source, sink);
    }
}

得到的HttpCodec 有两种情况,Http/1.1框架时是返回Http1Codec,Http/2时是返回Http2Codec,Http1Codec的时候传入的是(client, streamAllocation, source, sink),BufferedSource 和BufferedSink是Okio里面的用于数据流操作,这里不再深入

final BufferedSource source;
  final BufferedSink sink;
  int state = STATE_IDLE;

  public Http1Codec(OkHttpClient client, StreamAllocation streamAllocation, BufferedSource source,
      BufferedSink sink) {
    this.client = client;
    this.streamAllocation = streamAllocation;
    this.source = source;
    this.sink = sink;
  }

Http2Codec的时候传入的是(client, streamAllocation, http2Connection);前两个和Http1Codec一样,不一样的是第三个参数,第三个参数是在建立连接的时候判断是不是Http/2协议,如果是的话去创建一个http2Connection,在创建的时候其实已经传入了source和sink,这个后期都是要用到的。

if (protocol == Protocol.HTTP_2) {
      socket.setSoTimeout(0); // HTTP/2 connection timeouts are set per-stream.
      http2Connection = new Http2Connection.Builder(true)
          .socket(socket, route.address().url().host(), source, sink)
          .listener(this)
          .build();
      http2Connection.start();
    }

现在Request ,StreamAllocation ,HttpCodec, RealConnection都准备好了,可以去请求了。

CallServerInterceptor的intercept方法真正的去执行这个请求并拿到response

这里先了解一个小的知识点

100-continue 是用于客户端在发送 post 数据给服务器时,征询服务器情况,看服务器是否处理 post 的数据,如果不处理,客户端则不上传 post 数据,正常情况下服务器收到请求后,返回 100 或错误码。

@Override 
public Response intercept(Chain chain) throws IOException {
    RealInterceptorChain realChain = (RealInterceptorChain) chain;
    HttpCodec httpCodec = realChain.httpStream();
    StreamAllocation streamAllocation = realChain.streamAllocation();
    RealConnection connection = (RealConnection) realChain.connection();
    Request request = realChain.request();

    long sentRequestMillis = System.currentTimeMillis();
    httpCodec.writeRequestHeaders(request);

    Response.Builder responseBuilder = null;
    /**
    这里先去判断这个请求是不是包含请求体的请求,基本上我了解的除了GET请求都是,需要可以传递请求体的
    判断是否可以包含请求体的请求,并且请求体不为空
    */
    if (HttpMethod.permitsRequestBody(request.method()) && request.body() != null) {
      // If there's a "Expect: 100-continue" header on the request, wait for a "HTTP/1.1 100
      // Continue" response before transmitting the request body. If we don't get that, return what
      // we did get (such as a 4xx response) without ever transmitting the request body.
      //如果请求体中包含100-continue,先去执行一次请求征询服务器情况,看服务器是否处理 post 的数据
      if ("100-continue".equalsIgnoreCase(request.header("Expect"))) {
        httpCodec.flushRequest();
        responseBuilder = httpCodec.readResponseHeaders(true);
      }
        //responseBuilder == null表示支持,把请求体写入
      if (responseBuilder == null) {
        // Write the request body if the "Expect: 100-continue" expectation was met.
        Sink requestBodyOut = httpCodec.createRequestBody(request, request.body().contentLength());
        BufferedSink bufferedRequestBody = Okio.buffer(requestBodyOut);
        request.body().writeTo(bufferedRequestBody);
        bufferedRequestBody.close();
      } else if (!connection.isMultiplexed()) {
         //responseBuilder 不为空表示征询服务器情况没有触发,已经拿到返回
         //并且这个connection不是Http2,不支持流复用一个连接,将这个连接状态改为不可以新建流,
         //释放这个connection,最后关掉socket连接
        // If the "Expect: 100-continue" expectation wasn't met, prevent the HTTP/1 connection from
        // being reused. Otherwise we're still obligated to transmit the request body to leave the
        // connection in a consistent state.
        streamAllocation.noNewStreams();
      }
    }

    //执行一些完成操作,如关闭sink等都是OKio的操作
    httpCodec.finishRequest();

    if (responseBuilder == null) {
      //为空也就是刚才Expect: 100-continue校验成功的情况,这时候已经写入了post请求体,再次去请求读取响应
      responseBuilder = httpCodec.readResponseHeaders(false);
    }
    //给最终的响应设置信息,请求时间,接受时间请求和握手
    Response response = responseBuilder
        .request(request)
        .handshake(streamAllocation.connection().handshake())
        .sentRequestAtMillis(sentRequestMillis)
        .receivedResponseAtMillis(System.currentTimeMillis())
        .build();

    int code = response.code();
    if (forWebSocket && code == 101) {
      // Connection is upgrading, but we need to ensure interceptors see a non-null response body.
      response = response.newBuilder()
          .body(Util.EMPTY_RESPONSE)
          .build();
    } else {
        //再次构造一次response,将数据放入body
      response = response.newBuilder()
          .body(httpCodec.openResponseBody(response))
          .build();
    }

    if ("close".equalsIgnoreCase(response.request().header("Connection"))
        || "close".equalsIgnoreCase(response.header("Connection"))) {
      streamAllocation.noNewStreams();
    }

    if ((code == 204 || code == 205) && response.body().contentLength() > 0) {
      throw new ProtocolException(
          "HTTP " + code + " had non-zero Content-Length: " + response.body().contentLength());
    }
    //返回响应
    return response;
  }
}

参考链接:

https://www.jianshu.com/p/92a61357164b
https://m.aliyun.com/yunqi/articles/78101
https://blog.piasy.com/2016/07/11/Understand-OkHttp/

相关标签: okhttp