教你轻松理解Rxjava之线程切换流程(observeOn与subscribeOn)
程序员文章站
2022-04-02 18:08:36
...
几句屁话
Rxjava我开始就觉得不就是能把线程切换吗?我的Handler和AsyncTask就能轻松解决,我还学什么这玩意......现在回想我就是一个井底之蛙
理解RxJava
- 订阅,向上走的一个过程
- 数据流,向下走的一个过程
借用别人的图给讲解下
请看下面的示意图(向上的箭头表示订阅操作的方向,向下的箭头表示数据流向,箭头的颜色表示所在的线程,曲折的箭头表示发生了线程切换)订阅的操作符:create丶doOnSubscribe
数据流的操作符:map丶flatMap丶过滤等主要是对数据处理的操作符
走之前记住一句话:subscribeOn切换订阅线程,observeOn处理数据流方向
1:当我们调用了subscribe的发起订阅
2:向上走,我只需要关心subscribeOn和订阅的操作符
3:向下走,我只需要关心observeOn和数据流的操作符
看不懂?来一段代码!
Observable
.create(new ObservableOnSubscribe<Integer>() {
@Override
public void subscribe(ObservableEmitter<Integer> e) throws Exception {
Log.d("wyz", "create:" + Thread.currentThread().getName());
e.onNext(1);
}
})
.subscribeOn(Schedulers.io())
.map(new Function<Integer, Integer>() {
@Override
public Integer apply(Integer integer) throws Exception {
Log.d("wyz", "map1:" + Thread.currentThread().getName());
return integer;
}
})
.doOnSubscribe(new Consumer<Disposable>() {
@Override
public void accept(Disposable disposable) throws Exception {
Log.d("wyz", "doOnSubscribe1:" + Thread.currentThread().getName());
}
})
.subscribeOn(AndroidSchedulers.mainThread())
.doOnSubscribe(new Consumer<Disposable>() {
@Override
public void accept(Disposable disposable) throws Exception {
Log.d("wyz", "doOnSubscribe2:" + Thread.currentThread().getName());
}
})
.subscribeOn(Schedulers.io())
.observeOn(Schedulers.io())
.flatMap(new Function<Integer, ObservableSource<Integer>>() {
@Override
public ObservableSource<Integer> apply(Integer integer) throws Exception {
Log.d("wyz","flatMap:" + Thread.currentThread().getName());
return Observable.fromArray(integer);
}
})
.observeOn(AndroidSchedulers.mainThread())
.subscribe(new Observer<Integer>() {
@Override
public void onSubscribe(Disposable d) {
}
@Override
public void onNext(Integer value) {
Log.d("wyz", "执行完毕:" + Thread.currentThread().getName());
}
@Override
public void onError(Throwable e) {
}
@Override
public void onComplete() {
}
});
一步一步来
当前线程状态(main线程)
- 订阅,向上走
- 经过第一个操作符observeOn,是控制数据流线程的,跳过
- 经过flatMap处理数据流的操作符,跳过
- 经过observeOn跳过
- 经过subscribeOn,将线程切换到IO线程,现在订阅处于IO线程
- 经过doOnSubscribe,执行里面代码,在IO线程中执行 输出:doOnSubscribe2:RxCachedThreadScheduler-1
- 经过subscribeOn,将线程切换到main线程,现在订阅处于main线程
- 经过doOnSubscribe,执行里面代码,在main线程中执行 输出:doOnSubscribe1:main
- 经过map方法,处理数据流的方法,跳过
- 经过subscribeOn,将线程切换到IO线程,现在订阅处于IO线程
- 经过create方法,在IO线程中执行 输出:create:RxCachedThreadScheduler-1
订阅完成,开始发射数据 注意:从上面结束我们知道,当前处于IO线程哦
当前线程状态(IO线程)
- 发射数据,向下走
- 经过subscribeOn跳过
- 经过map,在IO线程中执行 输出:map1:RxCachedThreadScheduler-1
- 经过doOnSubscribe跳过
- 经过subscribeOn跳过
- 经过doOnSubscribe跳过
- 经过subscribeOn跳过
- 经过observeOn,将线程切换到IO线程
- 经过flatMap执行,在IO线程执行 输出:flatMap:RxCachedThreadScheduler-2
- 经过observeOn,将线程切换到main线程
- 结束执行onNext方法,在main线程 输出:执行完毕:main
好了,就是一个向上子向下走的过程,这下我理解清楚了....我一开始弄的一锅粥
上一篇: RxJava操作符(七)——过滤操作符
下一篇: vuex基本熟悉与使用