欢迎您访问程序员文章站本站旨在为大家提供分享程序员计算机编程知识!
您现在的位置是: 首页

RxJava subscribeOn和observeOn源码介绍

程序员文章站 2022-04-02 17:45:54
...

转载请以链接形式标明出处:
本文出自:103style的博客

Base on RxJava 2.X

简介

首先我们来看subscribeOnobserveOn这两个方法的实现:

  • subscribeOn
    @CheckReturnValue
    @SchedulerSupport(SchedulerSupport.CUSTOM)
    public final Observable<T> subscribeOn(Scheduler scheduler) {
        ObjectHelper.requireNonNull(scheduler, "scheduler is null");
        return RxJavaPlugins.onAssembly(new ObservableSubscribeOn<T>(this, scheduler));
    }
    
  • observeOn
    @CheckReturnValue
    @SchedulerSupport(SchedulerSupport.CUSTOM)
    public final Observable<T> observeOn(Scheduler scheduler) {
        return observeOn(scheduler, false, bufferSize());
    }
    @CheckReturnValue
    @SchedulerSupport(SchedulerSupport.CUSTOM)
    public final Observable<T> observeOn(Scheduler scheduler, boolean delayError) {
        return observeOn(scheduler, delayError, bufferSize());
    }
    
    @CheckReturnValue
    @SchedulerSupport(SchedulerSupport.CUSTOM)
    public final Observable<T> observeOn(Scheduler scheduler, boolean delayError, int bufferSize) {
        ObjectHelper.requireNonNull(scheduler, "scheduler is null");
        ObjectHelper.verifyPositive(bufferSize, "bufferSize");
        return RxJavaPlugins.onAssembly(new ObservableObserveOn<T>(this, scheduler, delayError, bufferSize));
    }
    

我们可以看到分别返回了ObservableSubscribeOnObservableObserveOn对象,下面对这两个类分别介绍。


ObservableSubscribeOn 源码解析

public final class ObservableSubscribeOn<T> extends AbstractObservableWithUpstream<T, T> {
    final Scheduler scheduler;

    public ObservableSubscribeOn(ObservableSource<T> source, Scheduler scheduler) {
        super(source);
        this.scheduler = scheduler;
    }

    @Override
    public void subscribeActual(final Observer<? super T> observer) {
        final SubscribeOnObserver<T> parent = new SubscribeOnObserver<T>(observer);
        observer.onSubscribe(parent);
        parent.setDisposable(scheduler.scheduleDirect(new SubscribeTask(parent)));
    }
    ....
}
  • 通过之前的 Rxjava之create操作符源码解析 的介绍,我们知道subscribe(observer)实际上是调用前一步返回对象的subscribeActual(observer);方法。
  • 这里首先构造了一个 SubscribeOnObserver对象,然后执行 观察者onSubscribe 方法。
  • 然后将在传入的Scheduler中执行任务完成返回的结果传入 SubscribeOnObserversetDisposable方法。
  • scheduler.scheduleDirect(new SubscribeTask(parent)),这里通过之前 RxJava之Schedulers源码介绍 我们知道,实际时候执行了 SubscribeTask(parent)run方法。通过下面的源代码source.subscribe(parent),我们知道 实际上 run 方法 就是 调用了subscribeOn前一步操作符返回对象的 subscribeActual(observer);方法。
    final class SubscribeTask implements Runnable {
        private final SubscribeOnObserver<T> parent;
    
        SubscribeTask(SubscribeOnObserver<T> parent) {
            this.parent = parent;
        }
    
        @Override
        public void run() {
            source.subscribe(parent);
        }
    }
    

SubscribeOnObserver源码:

static final class SubscribeOnObserver<T> extends AtomicReference<Disposable> implements Observer<T>, Disposable {
    private static final long serialVersionUID = 8094547886072529208L;
    final Observer<? super T> downstream;
    final AtomicReference<Disposable> upstream;

    SubscribeOnObserver(Observer<? super T> downstream) {
        this.downstream = downstream;
        this.upstream = new AtomicReference<Disposable>();
    }

    @Override
    public void onSubscribe(Disposable d) {
        DisposableHelper.setOnce(this.upstream, d);
    }

    @Override
    public void onNext(T t) {
        downstream.onNext(t);
    }

    @Override
    public void onError(Throwable t) {
        downstream.onError(t);
    }

    @Override
    public void onComplete() {
        downstream.onComplete();
    }
    ...
    void setDisposable(Disposable d) {
        DisposableHelper.setOnce(this, d);
    }
}
  • 我们可以看到 onNextonErroronComplete 实际上还是调用了 观察者的 对应方法。
  • DisposableHelper.setOnce(this, d); 即为设置SubscribeOnObservervalue值为线程池执行的任务结果。
    public static boolean setOnce(AtomicReference<Disposable> field, Disposable d) {
        ObjectHelper.requireNonNull(d, "d is null");
        if (!field.compareAndSet(null, d)) {
            d.dispose();
            if (field.get() != DISPOSED) {
                reportDisposableSet();
            }
            return false;
        }
        return true;
    }
    

我们来个示例介绍下:

Observable
        .create(new ObservableOnSubscribe<String>() {
            @Override
            public void subscribe(ObservableEmitter<String> emitter) throws Exception {
                System.out.println("subscribe = " + Thread.currentThread().getName());
                for (int i = 0; i < 3; i++) {
                    emitter.onNext(String.valueOf(i));
                }
                emitter.onComplete();
            }
        })
        .subscribeOn(Schedulers.single())
        .subscribe(new Observer<String>() {
            @Override
            public void onSubscribe(Disposable d) {
                System.out.println("onSubscribe thread name = " + Thread.currentThread().getName());
            }

            @Override
            public void onNext(String s) {
                System.out.println("onNext s = " + s + " thread name = " + Thread.currentThread().getName());
            }

            @Override
            public void onError(Throwable e) {
                System.out.println("onError thread name = " + Thread.currentThread().getName());
            }

            @Override
            public void onComplete() {
                System.out.println("onComplete thread name = " + Thread.currentThread().getName());
            }
        });

输出结果:

onSubscribe thread name = main
subscribe = RxSingleScheduler-1
onNext s = 0 thread name = RxSingleScheduler-1
onNext s = 1 thread name = RxSingleScheduler-1
onNext s = 2 thread name = RxSingleScheduler-1
onComplete thread name = RxSingleScheduler-1

通过输出结果我们可以看到 任务处理都是在 Schedulers.single()构建的线程池中执行的。
现在来一步一步介绍,顺便复习一下:
流程图大致如下:
RxJava subscribeOn和observeOn源码介绍

  • (1.0) create 操作符 返回的是 ObservableCreate对象。
  • (2.0) 然后 ObservableCreate.subscribeOn(Schedulers.single())返回 sourceObservableCreateschedulerSingleSchedulerObservableSubscribeOn对象。
  • (3.0) 然后 ObservableSubscribeOn.subscribe(new Observer<T>(){}),即调用 ObservableSubscribeOnsubscribeActual(observer)
  • (4.0) 然后执行 observer.onSubscribe(parent);,即执行观察者的 onSubscribe(...)方法。
  • (5.0) 接着在SingleScheduler构建的线程池中执行 SubscribeTaskrun方法(source.subscribe(parent))。
    即执行 ObservableCreate.subscribe(new SubscribeOnObserver<T>(observer))
    即为 ObservableCreate.subscribeActual(new SubscribeOnObserver<T>(observer))
  • (6.0) 然后执行 SubscribeOnObserveronSubscribe(...)
  • (7.0) 然后执行create操作符传进来的ObservableOnSubscribesubscribe(ObservableEmitter<String> emitter)方法。
  • (8.0) 接着我们在subscribe(...)中依次执行了 三次onNext和 一次onComplete
    即调用 new SubscribeOnObserver<T>(observer)的三次onNext和 一次onComplete
    即为subscribe传入的observer的三次onNext和 一次onComplete

ObservableObserveOn 源码解析

  • observeOn 方法:

    public final Observable<T> observeOn(Scheduler scheduler, boolean delayError, int bufferSize) {
        ObjectHelper.requireNonNull(scheduler, "scheduler is null");
        ObjectHelper.verifyPositive(bufferSize, "bufferSize");
        return RxJavaPlugins.onAssembly(new ObservableObserveOn<T>(this, scheduler, delayError, bufferSize));
    }
    
  • ObservableObserveOn主要的方法:

    public final class ObservableObserveOn<T> extends AbstractObservableWithUpstream<T, T> {
        final Scheduler scheduler;
        final boolean delayError;
        final int bufferSize;
        public ObservableObserveOn(ObservableSource<T> source, Scheduler scheduler, boolean delayError, int bufferSize) {
            super(source);
            this.scheduler = scheduler;
            this.delayError = delayError;
            this.bufferSize = bufferSize;
        }
    
        @Override
        protected void subscribeActual(Observer<? super T> observer) {
            if (scheduler instanceof TrampolineScheduler) {
                source.subscribe(observer);
            } else {
                Scheduler.Worker w = scheduler.createWorker();
                source.subscribe(new ObserveOnObserver<T>(observer, w, delayError, bufferSize));
            }
        }
        ...
    }
    
    • 赋值source为链式调用上一步返回的对象。
    • 保存传进来的 SchedulerdelayErrorbufferSize的值。
    • 然后在 subscribe 的时候 调用 subscribeActual, 先判断 scheduler是否是 TrampolineScheduler的子类:
      • 是的话直接把 observer 传给 链式调用上一步返回的对象的 subscribeActual方法。
      • 不是的话 就把observer 包装成一个ObserveOnObserver 对象传给 链式调用上一步返回的对象的 subscribeActual方法。
    • 通过上面 subscribeOn 的介绍, 我们知道接下来就是调用 观察者的 onSubscribe 方法,以及后续的调用逻辑 onNextonComplete以及onError,即ObserveOnObserver 对象对应的方法。
  • 接下来我们看看 ObserveOnObserver 的源码:

    static final class ObserveOnObserver<T> extends BasicIntQueueDisposable<T>
    implements Observer<T>, Runnable {
        final Observer<? super T> downstream;
        final Scheduler.Worker worker;
        final boolean delayError;
        final int bufferSize;
        ...
        ObserveOnObserver(Observer<? super T> actual, Scheduler.Worker worker, boolean delayError, int bufferSize) {
            this.downstream = actual;
            this.worker = worker;
            this.delayError = delayError;
            this.bufferSize = bufferSize;
        }
    
        @Override
        public void onSubscribe(Disposable d) {
            if (DisposableHelper.validate(this.upstream, d)) {
                ...
                downstream.onSubscribe(this);
            }
        }
        @Override
        public void onNext(T t) {
            if (done) {
                return;
            }
            if (sourceMode != QueueDisposable.ASYNC) {
                queue.offer(t);
            }
            schedule();
        }
    
        @Override
        public void onError(Throwable t) {
            if (done) {
                RxJavaPlugins.onError(t);
                return;
            }
            error = t;
            done = true;
            schedule();
        }
    
        @Override
        public void onComplete() {
            if (done) {
                return;
            }
            done = true;
            schedule();
        }
        ...
    }
    
    • 重写的onSubscribe 即调用观察者的 onSubscribe
    • onNextonErroronComplete都是调用 schedule()
    • 我们来看看schedule()的实现:即在传进来的 Scheduler 对象构建的线程池里执行当前类的 run()
      void schedule() {
          if (getAndIncrement() == 0) {
              worker.schedule(this);
          }
      }
      
    • run()的代码实现:
      public void run() {
          if (outputFused) {
              drainFused();
          } else {
              drainNormal();
          }
      }
      
    • outputFused 默认是 false,我们看看 drainNormal()的代码实现:
      outputFusedtrue是,则下面调用的onNext 改成 onComplete
      void drainNormal() {
          int missed = 1;
          final SimpleQueue<T> q = queue; //1.0
          final Observer<? super T> a = downstream;
          for (;;) {
              if (checkTerminated(done, q.isEmpty(), a)) {
                  return;
              }
              for (;;) {
                  boolean d = done;
                  T v;
                  try {
                      v = q.poll();//2.0
                  } catch (Throwable ex) {
                      ...
                      a.onError(ex);
                      worker.dispose();
                      return;
                  }
                  boolean empty = v == null;
                  if (checkTerminated(d, empty, a)) {//2.1
                      return;
                  }
                  if (empty) {//2.2
                      break;
                  }
                  a.onNext(v);//2.3
              }
              missed = addAndGet(-missed);//3.0
              if (missed == 0) {//3.1
                  break;
              }
          }
      }
      
      • (1.0): 我们在上面的 onNext() 中看到,每次调用都会把传入的对象存入queue中。
      • (2.0): 在循环中依次获取存入的对象,(2.1)如果 已经是done状态 或者 disposed则直接结束。(2.2)如果 队列中没有对象了,即终止循环。(2.3)否则调用 观察者onNext 方法。
      • (3.0): addAndGet(-missed);即通过原子操作把·missed·的值置为0(3.1)然后结束onNext

来我们继续举个例子:给subscribeOn例子加上observeOn 方法:

Observable
        .create(new ObservableOnSubscribe<String>() {
            @Override
            public void subscribe(ObservableEmitter<String> emitter) throws Exception {
                System.out.println("subscribe = " + Thread.currentThread().getName());
                for (int i = 0; i < 5; i++) {
                    emitter.onNext(String.valueOf(i));
                }
                emitter.onComplete();
            }
        })
        .subscribeOn(Schedulers.single())
        .observeOn(Schedulers.io())
        .subscribe(new Observer<String>() {
            @Override
            public void onSubscribe(Disposable d) {
                System.out.println("d.classname = " + d.getClass().getSimpleName());
                System.out.println("onSubscribe thread name = " + Thread.currentThread().getName());
            }

            @Override
            public void onNext(String s) {
                System.out.println("onNext s = " + s + " thread name = " + Thread.currentThread().getName());
            }

            @Override
            public void onError(Throwable e) {
                System.out.println("onError thread name = " + Thread.currentThread().getName());
            }

            @Override
            public void onComplete() {
                System.out.println("onComplete thread name = " + Thread.currentThread().getName());
            }
        });

输出结果:

System.out: d.classname = ObserveOnObserver
System.out: onSubscribe thread name = main
System.out: subscribe = RxSingleScheduler-1
System.out: onNext s = 0 thread name = RxCachedThreadScheduler-1
System.out: onNext s = 1 thread name = RxCachedThreadScheduler-1
System.out: onNext s = 2 thread name = RxCachedThreadScheduler-1
System.out: onComplete thread name = RxCachedThreadScheduler-1

通过输出结果我们可以看到 :

  • create操作符 传入的ObservableOnSubscribesubscribe方法是在Schedulers.single()构建的线程池中执行的。
  • onNextonComplete 则是在Schedulers.io()构建的线程池中执行的 。

继续来看下subscribeOn流程图:
RxJava subscribeOn和observeOn源码介绍

  • 上述示例相对于 subscribeOn来说只是 把 subscribe(observer) 里得参数改成了 ObserveOnObserver对象。
  • (4.0:) 执行ObserveOnObserveronSubscribe方法。即observer.onSubscribe(ObserveOnObserver) 即下面方法的 Disposable对象为ObserveOnObserver对象。
    new Observer<String>() {
        @Override
        public void onSubscribe(Disposable d) {
        }
         ...
    });
    
  • (5.0:)SingleScheduler构建的线程池中执行source.subscribe(parent);,即运行如下代码:
    ObservableCreate.subscribeActual(
            new ObserveOnObserver<T>(
                    observer,
                    new EventLoopWorker(new CachedWorkerPool(KEEP_ALIVE_TIME, KEEP_ALIVE_UNIT, threadFactory)),
                    delayError,
                    bufferSize)
    );
    
  • 我们再来回顾下ObservableCreate.subscribeActual(observer)
    protected void subscribeActual(Observer<? super T> observer) {
        CreateEmitter<T> parent = new CreateEmitter<T>(observer);
        observer.onSubscribe(parent);
        try {
            source.subscribe(parent);
        } catch (Throwable ex) {
            Exceptions.throwIfFatal(ex);
            parent.onError(ex);
        }
    }
    
    static final class CreateEmitter<T>
    extends AtomicReference<Disposable>
    implements ObservableEmitter<T>, Disposable {
        ...
        @Override
        public void onNext(T t) {
            if (t == null) {
                onError(new NullPointerException(...));
                return;
            }
            if (!isDisposed()) {
                observer.onNext(t);
            }
        }
        @Override
        public void onComplete() {
            if (!isDisposed()) {
                try {
                    observer.onComplete();
                } finally {
                    dispose();
                }
            }
        }
        ...
    }
    
  • (8.0:) 所以调用 onNext(T t)onComplete()即调用 ObserveOnObserver对象的 onNext(T t)onComplete()。 即切换到Schedulers.io()构建的线程池执行onNext(T t)onComplete()

小结

subscribeOn返回得即ObservableSubscribeOn对象。
ObservableSubscribeOnsubscribeActual即为在 传入的 XXXScheduler中 执行 上一步返回对象的 subscribeActual方法。

observeOn返回得即ObservableObserveOn对象。
ObservableObserveOnsubscribeActual即为把 传入的 XXXSchedulerobserver包装成一个 Observer 传给上一步返回对象的 subscribeActual方法,让 onNextonCompleteonNext都在传入的 XXXScheduler 构建的线程池中执行。

所以,你知道RxJava是如何完成线程切换的了吗?

以上

相关标签: RxJava