欢迎您访问程序员文章站本站旨在为大家提供分享程序员计算机编程知识!
您现在的位置是: 首页

RXJava之线程控制Scheduler(四)

程序员文章站 2024-03-22 16:14:10
...

阿里P7移动互联网架构师进阶视频(每日更新中)免费学习请点击:https://space.bilibili.com/474380680

线程控制:Scheduler

除了灵活的变换,RxJava 另一个牛逼的地方,就是线程的*控制。

1) Scheduler 的 API

前面讲到了,可以利用 subscribeOn() 结合 observeOn() 来实现线程控制,让事件的产生和消费发生在不同的线程。可是在了解了 map() flatMap() 等变换方法后,有些好事的(其实就是当初刚接触 RxJava 时的我)就问了:能不能多切换几次线程?

答案是:能。因为 observeOn() 指定的是 Subscriber 的线程,而这个 Subscriber 并不是(严格说应该为『不一定是』,但这里不妨理解为『不是』)subscribe() 参数中的 Subscriber ,而是 observeOn() 执行时的当前 Observable 所对应的 Subscriber ,即它的直接下级 Subscriber 。换句话说,observeOn() 指定的是它之后的操作所在的线程。因此如果有多次切换线程的需求,只要在每个想要切换线程的位置调用一次 observeOn() 即可。上代码:

Observable.just(1, 2, 3, 4) // IO 线程,由 subscribeOn() 指定
    .subscribeOn(Schedulers.io())
    .observeOn(Schedulers.newThread())
    .map(mapOperator) // 新线程,由 observeOn() 指定
    .observeOn(Schedulers.io())
    .map(mapOperator2) // IO 线程,由 observeOn() 指定
    .observeOn(AndroidSchedulers.mainThread) 
    .subscribe(subscriber);  // Android 主线程,由 observeOn() 指定

如上,通过 observeOn() 的多次调用,程序实现了线程的多次切换。

不过,不同于 observeOn()subscribeOn() 的位置放在哪里都可以,但它是只能调用一次的。

又有好事的(其实还是当初的我)问了:如果我非要调用多次 subscribeOn() 呢?会有什么效果?

这个问题先放着,我们还是从 RxJava 线程控制的原理说起吧。

2) Scheduler 的原理

其实, subscribeOn()observeOn() 的内部实现,也是用的 lift()。具体看图(不同颜色的箭头表示不同的线程):

subscribeOn() 原理图:

RXJava之线程控制Scheduler(四)
subscribeOn() 原理

observeOn() 原理图:

RXJava之线程控制Scheduler(四)
observeOn() 原理

从图中可以看出,subscribeOn()observeOn() 都做了线程切换的工作(图中的 "schedule..." 部位)。不同的是, subscribeOn()的线程切换发生在 OnSubscribe 中,即在它通知上一级 OnSubscribe 时,这时事件还没有开始发送,因此 subscribeOn() 的线程控制可以从事件发出的开端就造成影响;而 observeOn() 的线程切换则发生在它内建的 Subscriber 中,即发生在它即将给下一级 Subscriber 发送事件时,因此 observeOn() 控制的是它后面的线程。

最后,我用一张图来解释当多个 subscribeOn()observeOn() 混合使用时,线程调度是怎么发生的(由于图中对象较多,相对于上面的图对结构做了一些简化调整):

RXJava之线程控制Scheduler(四)
线程控制综合调用

图*有 5 处含有对事件的操作。由图中可以看出,①和②两处受第一个 subscribeOn() 影响,运行在红色线程;③和④处受第一个 observeOn() 的影响,运行在绿色线程;⑤处受第二个 onserveOn() 影响,运行在紫色线程;而第二个 subscribeOn() ,由于在通知过程中线程就被第一个 subscribeOn() 截断,因此对整个流程并没有任何影响。这里也就回答了前面的问题:当使用了多个 subscribeOn() 的时候,只有第一个 subscribeOn() 起作用。

3) 延伸:doOnSubscribe()

然而,虽然超过一个的 subscribeOn() 对事件处理的流程没有影响,但在流程之前却是可以利用的。

在前面讲 Subscriber 的时候,提到过 SubscriberonStart() 可以用作流程开始前的初始化。然而 onStart() 由于在 subscribe() 发生时就被调用了,因此不能指定线程,而是只能执行在 subscribe() 被调用时的线程。这就导致如果 onStart() 中含有对线程有要求的代码(例如在界面上显示一个 ProgressBar,这必须在主线程执行),将会有线程非法的风险,因为有时你无法预测 subscribe() 将会在什么线程执行。

而与 Subscriber.onStart() 相对应的,有一个方法 Observable.doOnSubscribe() 。它和 Subscriber.onStart() 同样是在 subscribe() 调用后而且在事件发送前执行,但区别在于它可以指定线程。默认情况下, doOnSubscribe() 执行在 subscribe() 发生的线程;而如果在 doOnSubscribe() 之后有 subscribeOn() 的话,它将执行在离它最近的 subscribeOn() 所指定的线程。

示例代码:

Observable.create(onSubscribe)
    .subscribeOn(Schedulers.io())
    .doOnSubscribe(new Action0() {
        @Override
        public void call() {
            progressBar.setVisibility(View.VISIBLE); // 需要在主线程执行
        }
    })
    .subscribeOn(AndroidSchedulers.mainThread()) // 指定主线程
    .observeOn(AndroidSchedulers.mainThread())
    .subscribe(subscriber);

如上,在 doOnSubscribe()的后面跟一个 subscribeOn() ,就能指定准备工作的线程了。
原文链接https://gank.io/post/560e15be2dca930e00da1083#toc_17
阿里P7移动互联网架构师进阶视频(每日更新中)免费学习请点击:https://space.bilibili.com/474380680