欢迎您访问程序员文章站本站旨在为大家提供分享程序员计算机编程知识!
您现在的位置是: 首页

OkHttp3源码工作流程浅析

程序员文章站 2024-03-26 09:01:29
...

细节先放在一边,咱来把工作流程先理一下。

常规调用

  • 异步
        Request request = new Request.Builder()
                .url(url)
                .post(requestBody.build())
                .build();
        okHttpClient.newCall(request).enqueue(callback);
  • 同步
        Request request = new Request.Builder()
                .url(url)
                .post(requestBody.build())
                .build();
        okHttpClient.newCall(request).execute();

工作流程

先看下OkHttpClient的参数:

//OkHttpClient.java
  public static final class Builder {
  	//重点,分发器,主管分发任务,所以内部肯定有队列之类的存在
    Dispatcher dispatcher;
    @Nullable Proxy proxy;
    List<Protocol> protocols;
    List<ConnectionSpec> connectionSpecs;
    //重点,OkHttp核心工作流程就是顺序执行各种拦截器的
    final List<Interceptor> interceptors = new ArrayList<>();
    final List<Interceptor> networkInterceptors = new ArrayList<>();
    EventListener.Factory eventListenerFactory;
    ProxySelector proxySelector;
    CookieJar cookieJar;
    @Nullable Cache cache;
    @Nullable InternalCache internalCache;
    SocketFactory socketFactory;
    @Nullable SSLSocketFactory sslSocketFactory;
    @Nullable CertificateChainCleaner certificateChainCleaner;
    HostnameVerifier hostnameVerifier;
    CertificatePinner certificatePinner;
    Authenticator proxyAuthenticator;
    Authenticator authenticator;
    ConnectionPool connectionPool;
    Dns dns;
    boolean followSslRedirects;
    boolean followRedirects;
    boolean retryOnConnectionFailure;
    int connectTimeout;
    int readTimeout;
    int writeTimeout;
    int pingInterval;

    public Builder() {
      dispatcher = new Dispatcher();
      protocols = DEFAULT_PROTOCOLS;
      connectionSpecs = DEFAULT_CONNECTION_SPECS;
      eventListenerFactory = EventListener.factory(EventListener.NONE);
      proxySelector = ProxySelector.getDefault();
      cookieJar = CookieJar.NO_COOKIES;
      socketFactory = SocketFactory.getDefault();
      hostnameVerifier = OkHostnameVerifier.INSTANCE;
      certificatePinner = CertificatePinner.DEFAULT;
      proxyAuthenticator = Authenticator.NONE;
      authenticator = Authenticator.NONE;
      connectionPool = new ConnectionPool();
      dns = Dns.SYSTEM;
      followSslRedirects = true;
      followRedirects = true;
      retryOnConnectionFailure = true;
      connectTimeout = 10_000;
      readTimeout = 10_000;
      writeTimeout = 10_000;
      pingInterval = 0;
    }
 }

若只想了解工作流程的话,只需要关注DispatcherInterceptor就可以了。
再看newCall干了什么:

//OkHttpClient.java
  @Override public Call newCall(Request request) {
    return RealCall.newRealCall(this, request, false /* for web socket */);
  }
//RealCall.java
  static RealCall newRealCall(OkHttpClient client, Request originalRequest, boolean forWebSocket) {
 	//看来这就是请求主体了
    RealCall call = new RealCall(client, originalRequest, forWebSocket);
    call.eventListener = client.eventListenerFactory().create(call);
    return call;
  }

请求被封装成一个RealCall,然后调用了enqueue了扔进了队列。
再看enqueue:

//RealCall.java
  @Override public void enqueue(Callback responseCallback) {
    synchronized (this) {
      if (executed) throw new IllegalStateException("Already Executed");
      executed = true;
    }
    captureCallStackTrace();
    eventListener.callStart(this);
    //重点,就是这里了,出现了dispather
    client.dispatcher().enqueue(new AsyncCall(responseCallback));
  }

Dispatcher接手,并且此处封装了一个带回调的AsyncCall,这个AsyncCall就是任务的实体。
转战Dispatcher:

//Dispatcher.java
  //双向队列,已经准备就绪的异步任务
  private final Deque<AsyncCall> readyAsyncCalls = new ArrayDeque<>();
  //双向队列,正在执行的异步任务
  private final Deque<AsyncCall> runningAsyncCalls = new ArrayDeque<>();
  //双向队列,正在执行的同步任务
  private final Deque<RealCall> runningSyncCalls = new ArrayDeque<>();
  
  //将任务添进队列,线程池开始执行异步任务
  synchronized void enqueue(AsyncCall call) {
    if (runningAsyncCalls.size() < maxRequests && runningCallsForHost(call) < maxRequestsPerHost) {
      runningAsyncCalls.add(call);
      //开始执行
      executorService().execute(call);
    } else {
      //超出最大数量,则放入就绪队列,后面finish时就是从此队列取出下一个任务
      readyAsyncCalls.add(call);
    }
  }

  /**
   * 线程池由此而来,是用SynchronousQueue,一个阻塞队列,
   * 必须等队列中的添加元素被消费后才能继续添加新的元素
   */
  public synchronized ExecutorService executorService() {
    if (executorService == null) {
      executorService = new ThreadPoolExecutor(0, Integer.MAX_VALUE, 60, TimeUnit.SECONDS,new SynchronousQueue<Runnable>(), Util.threadFactory("OkHttp Dispatcher", false));
    }
    return executorService;
  }

再来看看这个AsynCall怎样封装的:

//RealCall.java
  final class AsyncCall extends NamedRunnable {
  	//应答的回调,enqueue传入的
    private final Callback responseCallback;

    AsyncCall(Callback responseCallback) {
	  //顾名思义,父类就是一个可以命名的线程
      super("OkHttp %s", redactedUrl());
      this.responseCallback = responseCallback;
    }

    String host() {
      return originalRequest.url().host();
    }

    Request request() {
      return originalRequest;
    }

    RealCall get() {
      return RealCall.this;
    }

	//重点,真正的执行流程从此开始
    @Override protected void execute() {
      boolean signalledCallback = false;
      try {
      	//重点,应答在此已经得到
        Response response = getResponseWithInterceptorChain();
        if (retryAndFollowUpInterceptor.isCanceled()) {
          signalledCallback = true;
          responseCallback.onFailure(RealCall.this, new IOException("Canceled"));
        } else {
          signalledCallback = true;
          responseCallback.onResponse(RealCall.this, response);
        }
      } catch (IOException e) {
        if (signalledCallback) {
          // Do not signal the callback twice!
          Platform.get().log(INFO, "Callback failure for " + toLoggableString(), e);
        } else {
          eventListener.callFailed(RealCall.this, e);
          responseCallback.onFailure(RealCall.this, e);
        }
      } finally {
        //重点,结束当前请求后的后续操作
        client.dispatcher().finished(this);
      }
    }
  }

AsyncCall就是一个Runnable,它自己的execute就是任务执行的具体操作了,而response应该就在getResponseWithInterceptorChain中拿到了:

//RealCall.java
  Response getResponseWithInterceptorChain() throws IOException {
    List<Interceptor> interceptors = new ArrayList<>();
    //所有自定义的拦截器
    interceptors.addAll(client.interceptors());
    //重定向拦,主要负责失败重连工作
    interceptors.add(retryAndFollowUpInterceptor);
    //桥接,主要负责设置内容长度、编码方式、设置gzip压缩、添加请求头、cookie等相关功能
    interceptors.add(new BridgeInterceptor(client.cookieJar()));
    //缓存,处理相关缓存策略
    interceptors.add(new CacheInterceptor(client.internalCache()));
    //连接,创建真实的connection,打开socket链接
    interceptors.add(new ConnectInterceptor(client));
    //forWebSocket默认为false,传入一般也为false
    if (!forWebSocket) {
      //所有自定义的网络拦截器
      interceptors.addAll(client.networkInterceptors());
    }
    //向服务器发送请求,返回Response
    interceptors.add(new CallServerInterceptor(forWebSocket));
	//组成了一个链
    Interceptor.Chain chain = new RealInterceptorChain(interceptors, null, null, null, 0,originalRequest, this, eventListener, client.connectTimeoutMillis(), client.readTimeoutMillis(), client.writeTimeoutMillis());
	//重点,最终在这里得到了response
    return chain.proceed(originalRequest);
  }

各种Interceptor就是网络请求所作的具体通讯操作,到这里,已经接近通讯层了,继续看RealInterceptorChain:

//RealInterceptorChain.java
  public Response proceed(Request request, StreamAllocation streamAllocation, HttpCodec httpCodec,
      RealConnection connection) throws IOException {
    if (index >= interceptors.size()) throw new AssertionError();

    calls++;

    // If we already have a stream, confirm that the incoming request will use it.
    if (this.httpCodec != null && !this.connection.supportsUrl(request.url())) {
      throw new IllegalStateException("network interceptor " + interceptors.get(index - 1)
          + " must retain the same host and port");
    }

    // If we already have a stream, confirm that this is the only call to chain.proceed().
    if (this.httpCodec != null && calls > 1) {
      throw new IllegalStateException("network interceptor " + interceptors.get(index - 1)
          + " must call proceed() exactly once");
    }

    // Call the next interceptor in the chain.
    RealInterceptorChain next = new RealInterceptorChain(interceptors, streamAllocation, httpCodec,
        connection, index + 1, request, call, eventListener, connectTimeout, readTimeout,
        writeTimeout);
    Interceptor interceptor = interceptors.get(index);
    Response response = interceptor.intercept(next);

    // Confirm that the next interceptor made its required call to chain.proceed().
    if (httpCodec != null && index + 1 < interceptors.size() && next.calls != 1) {
      throw new IllegalStateException("network interceptor " + interceptor
          + " must call proceed() exactly once");
    }

    // Confirm that the intercepted response isn't null.
    if (response == null) {
      throw new NullPointerException("interceptor " + interceptor + " returned null");
    }

    if (response.body() == null) {
      throw new IllegalStateException(
          "interceptor " + interceptor + " returned a response with no body");
    }

    return response;
  }

没错了,就在这里返回了最终的response,至于Interceptor究竟做了什么最终获取到了Response,这里就不讨论了。
应答找到了,看看应答后会发生什么事:

//Dispatcher.java
  /** Used by {@code AsyncCall#run} to signal completion. */
  void finished(AsyncCall call) {
    finished(runningAsyncCalls, call, true);
  }

  private <T> void finished(Deque<T> calls, T call, boolean promoteCalls) {
    int runningCallsCount;
    Runnable idleCallback;
    synchronized (this) {
      if (!calls.remove(call)) throw new AssertionError("Call wasn't in-flight!");
      //这里传入了true
      if (promoteCalls) promoteCalls();
      runningCallsCount = runningCallsCount();
      idleCallback = this.idleCallback;
    }
	
	//当前没有任务运行且“空闲任务”不为空时,开始执行这个“空闲任务”
    if (runningCallsCount == 0 && idleCallback != null) {
      idleCallback.run();
    }
  }

  //当一个网络请求任务完成后,默认是要到此方法来的
  private void promoteCalls() {
  	//正在执行的任务超限,不再往下走了
    if (runningAsyncCalls.size() >= maxRequests) return; 
    //就绪的任务队列空了,没有任务可执行了,也不再往下走了,看来这个方法就是为了接着执行下一个方法而写的了
    if (readyAsyncCalls.isEmpty()) return; 

	//把就绪队列的任务取出来,可以是多个,取决于runningAsyncCalls的上限,缺多少补多少个,然后这N个任务中某个或多个任务执行完后,又来到此处取出M个就绪的任务,一直到将readyAsyncCalls队列中所有的任务取完
    for (Iterator<AsyncCall> i = readyAsyncCalls.iterator(); i.hasNext(); ) {
      AsyncCall call = i.next();

      if (runningCallsForHost(call) < maxRequestsPerHost) {
        i.remove();
        runningAsyncCalls.add(call);
        //走正常的异步执行流程
        executorService().execute(call);
      }
	  //当前正在执行的任务数量到上限了,只有等待下一次添加任务了
      if (runningAsyncCalls.size() >= maxRequests) return; 
    }
  }

可以看到,当一个AsynCall执行完毕后,会根据相关限制从readyAsyncCalls取出更多的AsynCall进行执行,直到所有AsynCall执行完毕。

回调就不必多说了,在AsynCall.responseCallback中传回了response。
这就是异步的工作流程。

同步有一点区别,在RealCall中调用的方法不一样:

//RealCall.java
 @Override public Response execute() throws IOException {
    synchronized (this) {
      if (executed) throw new IllegalStateException("Already Executed");
      executed = true;
    }
    captureCallStackTrace();
    eventListener.callStart(this);
    try {
      //只做了添加正在执行的队列中
      client.dispatcher().executed(this);
      //直接执行了,进入了同样的方法
      Response result = getResponseWithInterceptorChain();
      if (result == null) throw new IOException("Canceled");
      return result;
    } catch (IOException e) {
      eventListener.callFailed(this, e);
      throw e;
    } finally {
      client.dispatcher().finished(this);
    }
  }

看得出来,同步是直接进行了获取response的操作,异步相比之下就多了线程池的调度操作,也是应有之意。

简而言之,OkHttp3的网络请求就是通过一个InterceptorChain获取response的过程,想深入了解okhttp3的原理,就要研究各个Interceptor的作用了。

以上。