欢迎您访问程序员文章站本站旨在为大家提供分享程序员计算机编程知识!
您现在的位置是: 首页

RxJava线程变换subscribeOn和observeOn源码分析

程序员文章站 2024-02-28 09:17:04
...

先看用法:

RxJava线程变换subscribeOn和observeOn源码分析

在一个子线程中创建一个Observable发射,Observer接受数据的全过程,执行看看Log信息:

RxJava线程变换subscribeOn和observeOn源码分析

可以看到onSuscribe,subscribe,onNext执行分别在不同的线程

源码分析

首先这样的链式写法非常简洁,但是对代码分析增加困难,所在以上代码可以转换成这样:

RxJava线程变换subscribeOn和observeOn源码分析

可以看到转换成比较直观的代码调用;

分别查看subscribeOn和observeOn:

RxJava线程变换subscribeOn和observeOn源码分析

RxJava线程变换subscribeOn和observeOn源码分析

RxJava线程变换subscribeOn和observeOn源码分析

所以subThreadObservable即为一个ObservableSubscribeOn对象,mainThreadObservable是一个ObservableObserveOn对象

这些方法都是生成了一个对象,真正是从mainThreadObservable.subscribe方法开始,即ObservableObserveOn.subscribe方法开始,因为ObservableObserveOn.subscribe类没有这个方法,因此只能从父类Observable开始:

RxJava线程变换subscribeOn和observeOn源码分析

进入RxJavaPlugins.onSubscribe方法后会发现将observer对象原路返回,所以直接看subscribeActual,即ObservableObserveOn类的subscribeActual方法:

RxJava线程变换subscribeOn和observeOn源码分析

创建主线程的Worker


scheduler即为传入的AndroidSchedulers.mainThread():
RxJava线程变换subscribeOn和observeOn源码分析

MAIN_THREAD是通过执行RxAndroidPlugins.initMainThreadScheduler得来:

RxJava线程变换subscribeOn和observeOn源码分析RxJava线程变换subscribeOn和observeOn源码分析

通过调用call方法得来,为啥吧不直接new出来就完事了?它通过这种写法完成了一些装饰操作,还判断了非空,个人觉得这种写法可以学习一下。

RxJava线程变换subscribeOn和observeOn源码分析

所以最终返回了HandlerScheduler对象,并传入了主线程的Handler,这个就是线程切换的重点之一。

可以看到scheduler即为HandlerScheduler对象,它并不是TrampolineScheduler的子类,因此执行下面的代码,调用了createWorker方法:

RxJava线程变换subscribeOn和observeOn源码分析

记录下这个worker,后面线程切换时有用


创建主线程的Worker结束

了解了怎么创建后回到一开始的调用:

RxJava线程变换subscribeOn和observeOn源码分析

执行

source.subscribe(new ObserveOnObserver<T>(observer, w, delayError, bufferSize));

在ObservableObserveOn中并没有找到source,所以只能在父类AbstractObservableWithUpstream中:

RxJava线程变换subscribeOn和observeOn源码分析

在父类找到了source,并且它的通过构造方法传入值,即子类调用super(xx)传入值:

RxJava线程变换subscribeOn和observeOn源码分析

而子类的创建:

RxJava线程变换subscribeOn和observeOn源码分析

传入的是this,谁调用了observeOn方法?是subThreadObservable,它是个ObservableSubscribeOn对象,因此这个this即为subThreadObservable。调用它的subscribe方法,它同样也没有subscribe方法而是在父类,父类又会调用它的subscribeActual方法,所以直接看ObservableSubscribeOn的subscribeActual方法:

RxJava线程变换subscribeOn和observeOn源码分析
这里的s,即ObserveOnObserver类对象,作为参数传入SubscribeOnObserver构造方法并生成一个SubscribeOnObserver对象,然后调用:

s.onSubscribe(parent);

即:ObserveOnObserver.onSubscribe方法:

RxJava线程变换subscribeOn和observeOn源码分析

传入的SubscribeOnObserver不实现QueueDisposable接口,所以if语句跳过

直接执行:

queue = new SpscLinkedArrayQueue<T>(bufferSize);

actual.onSubscribe(this);

第一个很简单,new出一个SpscLinkedArrayQueue对象,这是一种链表+数组的数据结构。然后调用actual.onSubscribe(this)

这里的actual即为自定义的Observer接口对象,层层传递过来,调用它的onSubscribe方法:

RxJava线程变换subscribeOn和observeOn源码分析

这里并没有切换任何线程,所以,onSubscribe的执行是在创建它的当前线程,在子线程创建,那么它就在子线程执行,在主线程创建,则在主线程执行

回到ObservableSubscribeOn的subscribeActual方法,接着调用了:

parent.setDisposable(scheduler.scheduleDirect(new Runnable() {
            @Override
            public void run() {
                source.subscribe(parent);
            }
        }));

可以看到将一个Runnable对象作为参数传给
scheduleDirect方法,run中执行source.subscribe(parent),这里盲猜这个runnable会在子线程执行。

直接看scheduler.scheduleDirect方法:

RxJava线程变换subscribeOn和observeOn源码分析
RxJava线程变换subscribeOn和observeOn源码分析

又创建了Worker,不过这次创建的Worker猜都知道应该是子线程的相关的Worker,因为刚才创建了个主线程的Worker。这个createWorker方法是个抽象方法,应该是在调用者scheduler中的实现,scheduler又是外部传入的Schedulers.io(),看看它的实现:

RxJava线程变换subscribeOn和observeOn源码分析

看看IO:

RxJava线程变换subscribeOn和observeOn源码分析

与之前创建HandlerScheduler很相似

RxJava线程变换subscribeOn和observeOn源码分析

这里创建的是一个IoScheduler,看看它的createWorker方法:

RxJava线程变换subscribeOn和observeOn源码分析

回到方法:

RxJava线程变换subscribeOn和observeOn源码分析

这里的

Worker w = new EventLoopWorker(pool.get());

可以确认

调用它的schedule方法,即调用EventLoopWorker的schedule方法,并传入runnable对象(decoratedRun其实和run是一样的,因为onSchedule里什么也没干):

RxJava线程变换subscribeOn和observeOn源码分析

调用:

threadWorker.scheduleActual(action, delayTime, unit, tasks);

这里的threadWorker这样来:

RxJava线程变换subscribeOn和observeOn源码分析

RxJava线程变换subscribeOn和observeOn源码分析
这里采用也采用了缓存的思想,一开始当然是没有的,所以创建一个ThreadWorker对象并返回

RxJava线程变换subscribeOn和observeOn源码分析

调用它的scheduleActual方法,可是它没有scheduleActual方法,找父类NewThreadWorker:

RxJava线程变换subscribeOn和observeOn源码分析

这里将Runnable对象传入ScheduledRunnable,其实就是做了一层封装,它本身也实现了Runnable接口,最终还是会调用的run方法,然后调用:

executor.submit((Callable<Object>)sr);

因为delayTime==0

executor是什么?找一下它的源头:

RxJava线程变换subscribeOn和observeOn源码分析

RxJava线程变换subscribeOn和observeOn源码分析

这里就很熟悉了,这是通过

Executors.newScheduledThreadPool(1, factory);

创建的一个线程池,然后将Runnable添加进去执行,所以runnable的执行会发生在子线程!

回到最初run方法执行内容:

source.subscribe(parent);

source与之前分析那个source传入方式相同,存在父类,通过构造方法赋值,值又是从子类传递给父类,子类又是外部传入的自定义接口实现对象:

RxJava线程变换subscribeOn和observeOn源码分析

调用它的subscribe方法,打印线程,此时这条代码执行会发生在线程池中。

调用e.onNext方法,e即为传入的SubscribeOnObserver对象,所以调用它的onNext方法:

RxJava线程变换subscribeOn和observeOn源码分析

这里的actual是外部传入的ObserveOnObserver对象,调用它的onNext方法:

RxJava线程变换subscribeOn和observeOn源码分析

RxJava线程变换subscribeOn和observeOn源码分析

将t值存入queue中,调用schedule:

RxJava线程变换subscribeOn和observeOn源码分析

这里的work即为外部传入的HandlerWorker,调用schedule方法,并传入this(ObserveOnObserver对象):

RxJava线程变换subscribeOn和observeOn源码分析

这里的逻辑就明朗了:同样的将Runnable对象(ObserveOnObserver实现了Runnable接口)封装成ScheduledRunnable,然后包装成Message通过handler(这里handler是主线程的handler)发送给主线程,所以ObserveOnObserver的run方法会在主线程被执行:

RxJava线程变换subscribeOn和observeOn源码分析

outputFused默认为false,执行下面的drainNormal:

RxJava线程变换subscribeOn和observeOn源码分析

主要看红箭头的代码,这是个死循环,从queue中取出数据,判断是否为null,为null退出循环,不为空就执行a.onNext,这个a=actual,而actual就是外部层层传入的自定义接口实现对象Observer:

RxJava线程变换subscribeOn和observeOn源码分析

所以代码的执行线程实现了从创建线程->线程池->主线程的过程,通过queue来存取数据。可以看到queue的创建,添加数据,取出数据分别在不同的线程中完成,因为queue是ObserveOnObserver的一个属性,而ObserveOnObserver是一个静态类,所有的线程都传递同一个ObserveOnObserver对象,所以可以实现共享。因此数据的共享就是通过一个线程将数据存在queue中,另一个线程再来取来实现。

执行链:

(在创建所在线程执行)
Observable.subscribe—>ObservableObserveOn.subscribeActual—>
Observable.subscribe—>ObservableSubscribeOn.subscribeActual—>
(在线程池中执行)
ObservableOnSubscribe.subscribe—>SubscribeOnObserver.onNext—>
ObserveOnObserver.onNext—>ObserveOnObserver.schedule–>
HandlerWorker–>schedule—>
(在主线程执行)
ObserveOnObserver.run—>ObserveOnObserver.run.drainNormal—>
Observer.onNext