RxJava线程变换subscribeOn和observeOn源码分析
先看用法:
在一个子线程中创建一个Observable发射,Observer接受数据的全过程,执行看看Log信息:
可以看到onSuscribe,subscribe,onNext执行分别在不同的线程
源码分析
首先这样的链式写法非常简洁,但是对代码分析增加困难,所在以上代码可以转换成这样:
可以看到转换成比较直观的代码调用;
分别查看subscribeOn和observeOn:
所以subThreadObservable即为一个ObservableSubscribeOn对象,mainThreadObservable是一个ObservableObserveOn对象
这些方法都是生成了一个对象,真正是从mainThreadObservable.subscribe方法开始,即ObservableObserveOn.subscribe方法开始,因为ObservableObserveOn.subscribe类没有这个方法,因此只能从父类Observable开始:
进入RxJavaPlugins.onSubscribe方法后会发现将observer对象原路返回,所以直接看subscribeActual,即ObservableObserveOn类的subscribeActual方法:
创建主线程的Worker
scheduler即为传入的AndroidSchedulers.mainThread():
MAIN_THREAD是通过执行RxAndroidPlugins.initMainThreadScheduler得来:
通过调用call方法得来,为啥吧不直接new出来就完事了?它通过这种写法完成了一些装饰操作,还判断了非空,个人觉得这种写法可以学习一下。
所以最终返回了HandlerScheduler对象,并传入了主线程的Handler,这个就是线程切换的重点之一。
可以看到scheduler即为HandlerScheduler对象,它并不是TrampolineScheduler的子类,因此执行下面的代码,调用了createWorker方法:
记录下这个worker,后面线程切换时有用
创建主线程的Worker结束
了解了怎么创建后回到一开始的调用:
执行
source.subscribe(new ObserveOnObserver<T>(observer, w, delayError, bufferSize));
在ObservableObserveOn中并没有找到source,所以只能在父类AbstractObservableWithUpstream中:
在父类找到了source,并且它的通过构造方法传入值,即子类调用super(xx)传入值:
而子类的创建:
传入的是this,谁调用了observeOn方法?是subThreadObservable,它是个ObservableSubscribeOn对象,因此这个this即为subThreadObservable。调用它的subscribe方法,它同样也没有subscribe方法而是在父类,父类又会调用它的subscribeActual方法,所以直接看ObservableSubscribeOn的subscribeActual方法:
这里的s,即ObserveOnObserver类对象,作为参数传入SubscribeOnObserver构造方法并生成一个SubscribeOnObserver对象,然后调用:
s.onSubscribe(parent);
即:ObserveOnObserver.onSubscribe方法:
传入的SubscribeOnObserver不实现QueueDisposable接口,所以if语句跳过
直接执行:
queue = new SpscLinkedArrayQueue<T>(bufferSize);
actual.onSubscribe(this);
第一个很简单,new出一个SpscLinkedArrayQueue对象,这是一种链表+数组的数据结构。然后调用actual.onSubscribe(this)
这里的actual即为自定义的Observer接口对象,层层传递过来,调用它的onSubscribe方法:
这里并没有切换任何线程,所以,onSubscribe的执行是在创建它的当前线程,在子线程创建,那么它就在子线程执行,在主线程创建,则在主线程执行
回到ObservableSubscribeOn的subscribeActual方法,接着调用了:
parent.setDisposable(scheduler.scheduleDirect(new Runnable() {
@Override
public void run() {
source.subscribe(parent);
}
}));
可以看到将一个Runnable对象作为参数传给
scheduleDirect方法,run中执行source.subscribe(parent),这里盲猜这个runnable会在子线程执行。
直接看scheduler.scheduleDirect方法:
又创建了Worker,不过这次创建的Worker猜都知道应该是子线程的相关的Worker,因为刚才创建了个主线程的Worker。这个createWorker方法是个抽象方法,应该是在调用者scheduler中的实现,scheduler又是外部传入的Schedulers.io(),看看它的实现:
看看IO:
与之前创建HandlerScheduler很相似
这里创建的是一个IoScheduler,看看它的createWorker方法:
回到方法:
这里的
Worker w = new EventLoopWorker(pool.get());
可以确认
调用它的schedule方法,即调用EventLoopWorker的schedule方法,并传入runnable对象(decoratedRun其实和run是一样的,因为onSchedule里什么也没干):
调用:
threadWorker.scheduleActual(action, delayTime, unit, tasks);
这里的threadWorker这样来:
这里采用也采用了缓存的思想,一开始当然是没有的,所以创建一个ThreadWorker对象并返回
调用它的scheduleActual方法,可是它没有scheduleActual方法,找父类NewThreadWorker:
这里将Runnable对象传入ScheduledRunnable,其实就是做了一层封装,它本身也实现了Runnable接口,最终还是会调用的run方法,然后调用:
executor.submit((Callable<Object>)sr);
因为delayTime==0
executor是什么?找一下它的源头:
这里就很熟悉了,这是通过
Executors.newScheduledThreadPool(1, factory);
创建的一个线程池,然后将Runnable添加进去执行,所以runnable的执行会发生在子线程!
回到最初run方法执行内容:
source.subscribe(parent);
source与之前分析那个source传入方式相同,存在父类,通过构造方法赋值,值又是从子类传递给父类,子类又是外部传入的自定义接口实现对象:
调用它的subscribe方法,打印线程,此时这条代码执行会发生在线程池中。
调用e.onNext方法,e即为传入的SubscribeOnObserver对象,所以调用它的onNext方法:
这里的actual是外部传入的ObserveOnObserver对象,调用它的onNext方法:
将t值存入queue中,调用schedule:
这里的work即为外部传入的HandlerWorker,调用schedule方法,并传入this(ObserveOnObserver对象):
这里的逻辑就明朗了:同样的将Runnable对象(ObserveOnObserver实现了Runnable接口)封装成ScheduledRunnable,然后包装成Message通过handler(这里handler是主线程的handler)发送给主线程,所以ObserveOnObserver的run方法会在主线程被执行:
outputFused默认为false,执行下面的drainNormal:
主要看红箭头的代码,这是个死循环,从queue中取出数据,判断是否为null,为null退出循环,不为空就执行a.onNext,这个a=actual,而actual就是外部层层传入的自定义接口实现对象Observer:
所以代码的执行线程实现了从创建线程->线程池->主线程的过程,通过queue来存取数据。可以看到queue的创建,添加数据,取出数据分别在不同的线程中完成,因为queue是ObserveOnObserver的一个属性,而ObserveOnObserver是一个静态类,所有的线程都传递同一个ObserveOnObserver对象,所以可以实现共享。因此数据的共享就是通过一个线程将数据存在queue中,另一个线程再来取来实现。
执行链:
(在创建所在线程执行)
Observable.subscribe—>ObservableObserveOn.subscribeActual—>
Observable.subscribe—>ObservableSubscribeOn.subscribeActual—>
(在线程池中执行)
ObservableOnSubscribe.subscribe—>SubscribeOnObserver.onNext—>
ObserveOnObserver.onNext—>ObserveOnObserver.schedule–>
HandlerWorker–>schedule—>
(在主线程执行)
ObserveOnObserver.run—>ObserveOnObserver.run.drainNormal—>
Observer.onNext