RXJava之线程控制Scheduler(四)
阿里P7移动互联网架构师进阶视频(每日更新中)免费学习请点击:https://space.bilibili.com/474380680
线程控制:Scheduler
除了灵活的变换,RxJava 另一个牛逼的地方,就是线程的*控制。
1) Scheduler 的 API
前面讲到了,可以利用 subscribeOn()
结合 observeOn()
来实现线程控制,让事件的产生和消费发生在不同的线程。可是在了解了 map()
flatMap()
等变换方法后,有些好事的(其实就是当初刚接触 RxJava 时的我)就问了:能不能多切换几次线程?
答案是:能。因为 observeOn()
指定的是 Subscriber
的线程,而这个 Subscriber
并不是(严格说应该为『不一定是』,但这里不妨理解为『不是』)subscribe()
参数中的 Subscriber
,而是 observeOn()
执行时的当前 Observable
所对应的 Subscriber
,即它的直接下级 Subscriber
。换句话说,observeOn()
指定的是它之后的操作所在的线程。因此如果有多次切换线程的需求,只要在每个想要切换线程的位置调用一次 observeOn()
即可。上代码:
Observable.just(1, 2, 3, 4) // IO 线程,由 subscribeOn() 指定
.subscribeOn(Schedulers.io())
.observeOn(Schedulers.newThread())
.map(mapOperator) // 新线程,由 observeOn() 指定
.observeOn(Schedulers.io())
.map(mapOperator2) // IO 线程,由 observeOn() 指定
.observeOn(AndroidSchedulers.mainThread)
.subscribe(subscriber); // Android 主线程,由 observeOn() 指定
如上,通过 observeOn()
的多次调用,程序实现了线程的多次切换。
不过,不同于 observeOn()
, subscribeOn()
的位置放在哪里都可以,但它是只能调用一次的。
又有好事的(其实还是当初的我)问了:如果我非要调用多次 subscribeOn()
呢?会有什么效果?
这个问题先放着,我们还是从 RxJava 线程控制的原理说起吧。
2) Scheduler 的原理
其实, subscribeOn()
和 observeOn()
的内部实现,也是用的 lift()
。具体看图(不同颜色的箭头表示不同的线程):
subscribeOn()
原理图:
observeOn()
原理图:
从图中可以看出,subscribeOn()
和 observeOn()
都做了线程切换的工作(图中的 "schedule..." 部位)。不同的是, subscribeOn()
的线程切换发生在 OnSubscribe
中,即在它通知上一级 OnSubscribe
时,这时事件还没有开始发送,因此 subscribeOn()
的线程控制可以从事件发出的开端就造成影响;而 observeOn()
的线程切换则发生在它内建的 Subscriber
中,即发生在它即将给下一级 Subscriber
发送事件时,因此 observeOn()
控制的是它后面的线程。
最后,我用一张图来解释当多个 subscribeOn()
和 observeOn()
混合使用时,线程调度是怎么发生的(由于图中对象较多,相对于上面的图对结构做了一些简化调整):
图*有 5 处含有对事件的操作。由图中可以看出,①和②两处受第一个 subscribeOn()
影响,运行在红色线程;③和④处受第一个 observeOn()
的影响,运行在绿色线程;⑤处受第二个 onserveOn()
影响,运行在紫色线程;而第二个 subscribeOn()
,由于在通知过程中线程就被第一个 subscribeOn()
截断,因此对整个流程并没有任何影响。这里也就回答了前面的问题:当使用了多个 subscribeOn()
的时候,只有第一个 subscribeOn()
起作用。
3) 延伸:doOnSubscribe()
然而,虽然超过一个的 subscribeOn()
对事件处理的流程没有影响,但在流程之前却是可以利用的。
在前面讲 Subscriber
的时候,提到过 Subscriber
的 onStart()
可以用作流程开始前的初始化。然而 onStart()
由于在 subscribe()
发生时就被调用了,因此不能指定线程,而是只能执行在 subscribe()
被调用时的线程。这就导致如果 onStart()
中含有对线程有要求的代码(例如在界面上显示一个 ProgressBar,这必须在主线程执行),将会有线程非法的风险,因为有时你无法预测 subscribe()
将会在什么线程执行。
而与 Subscriber.onStart()
相对应的,有一个方法 Observable.doOnSubscribe()
。它和 Subscriber.onStart()
同样是在 subscribe()
调用后而且在事件发送前执行,但区别在于它可以指定线程。默认情况下, doOnSubscribe()
执行在 subscribe()
发生的线程;而如果在 doOnSubscribe()
之后有 subscribeOn()
的话,它将执行在离它最近的 subscribeOn()
所指定的线程。
示例代码:
Observable.create(onSubscribe)
.subscribeOn(Schedulers.io())
.doOnSubscribe(new Action0() {
@Override
public void call() {
progressBar.setVisibility(View.VISIBLE); // 需要在主线程执行
}
})
.subscribeOn(AndroidSchedulers.mainThread()) // 指定主线程
.observeOn(AndroidSchedulers.mainThread())
.subscribe(subscriber);
如上,在 doOnSubscribe()
的后面跟一个 subscribeOn()
,就能指定准备工作的线程了。
原文链接https://gank.io/post/560e15be2dca930e00da1083#toc_17
阿里P7移动互联网架构师进阶视频(每日更新中)免费学习请点击:https://space.bilibili.com/474380680
上一篇: 图片抓取【scrapy、splash】
下一篇: 回文数判断(Leetcode)