rxjava2.0 线程切换的原理及SubscribeOn,ObserveOn的用法
程序员文章站
2024-02-28 10:00:04
...
android开发过程中经常会用到线程切换,比如数据加载、文件存储、数据库操作等都是在io线程处理,而处理结果的展示刷新ui需要在ui线程进行。
如果不用rxjava,我们可能会asycTask,或者retrofit自己默认的callback来在ui线程刷新ui。但是当我们的操作变复杂时,比如一个接口的数据依赖于另一个接口的返回,或者一次上传多张图片功能,就会造成接口多层嵌套进而增加维护成本
注:由于observable实现了observablesource接口所以下文所说的observable与source等价
线程切换方式
先上一段代码及运行结果,呈现出一种直观的认识
Observable.just("Some String")
.map(new Function<String, Integer>() {
@Override
public Integer apply(String s) throws Exception {
Log.d("cong", "1 threadName:"+Thread.currentThread().getName());
return s.length();
}
})
.subscribeOn(AndroidSchedulers.mainThread())
.map(new Function<Integer, Integer>() {
@Override
public Integer apply(Integer integer) throws Exception {
Log.d("cong", "2 threadName:"+Thread.currentThread().getName());
return 2 * integer;
}
})
.subscribeOn(Schedulers.newThread())
.map(new Function<Integer, Integer>() {
@Override
public Integer apply(Integer integer) throws Exception {
Log.d("cong", "3 threadName:"+Thread.currentThread().getName());
return 2 * integer;
}
})
.observeOn(Schedulers.computation()) // change thread
.map(new Function<Integer, Integer>() {
@Override
public Integer apply(Integer integer) throws Exception {
Log.d("cong", "4 threadName:"+Thread.currentThread().getName());
return 2 * integer;
}
})
.subscribeOn(AndroidSchedulers.mainThread())
.map(new Function<Integer, Integer>() {
@Override
public Integer apply(Integer integer) throws Exception {
Log.d("cong", "5 threadName:"+Thread.currentThread().getName());
return 2 * integer;
}
})
.observeOn(Schedulers.newThread())
.subscribe(new Consumer<Integer>() {
@Override
public void accept(Integer integer) throws Exception {
Log.d("cong", "6 threadName:"+Thread.currentThread().getName());
}
});
可以发现5处线程切换其实只生效了3处,第一个subscribeOn,两个ObserveOn。
要理解原因必须要了解rxjava操作符的设计模式,rxjava操作符的设计其实是大量利用包装者模式来包装observer,observable。
以map操作符为例:
public interface Function<T, R> {
//传递一个类型的值并将值转换为另一个类型
R apply(@NonNull T t) throws Exception;
}
public final <R> Observable<R> map(Function<? super T, ? extends R> mapper) {
// 新生成一个observable,并且将本类,及mapper 作为参数传递到新的observable 中
return new ObservableMap<T, R>(this, mapper));
}
public final class ObservableMap<T, U> extends AbstractObservableWithUpstream<T, U> {
final Function<? super T, ? extends U> function;
public ObservableMap(ObservableSource<T> source, Function<? super T, ? extends U> function) {
super(source);
this.function = function;
}
@Override
public void subscribeActual(Observer<? super U> t) {
/**1: 每个source其实就是调用操作符的时候的observable主体即被包装的observable
* 2: MapperObserver是对当前source接收的observer类型的封装,当upstreamSource observable类型的数据传递过来的时候会在mapperobserser中做转换,最后调用到真实的被包装的observer的onnext
*
* **/
source.subscribe(new MapObserver<T, U>(t, function));
}
static final class MapObserver<T, U> extends BasicFuseableObserver<T, U> {
final Function<? super T, ? extends U> mapper;
MapObserver(Observer<? super U> actual, Function<? super T, ? extends U> mapper) {
super(actual);
this.mapper = mapper;
}
@Override
public void onNext(T t) {
if (done) {
return;
}
if (sourceMode != NONE) {
actual.onNext(null);
return;
}
U v;
try {
v = ObjectHelper.requireNonNull(mapper.apply(t), "The mapper function returned a null value.");
} catch (Throwable ex) {
fail(ex);
return;
}
//真实的被包装的observer
actual.onNext(v);
}
@Override
public int requestFusion(int mode) {
return transitiveBoundaryFusion(mode);
}
@Nullable
@Override
public U poll() throws Exception {
T t = qs.poll();
return t != null ? ObjectHelper.<U>requireNonNull(mapper.apply(t), "The mapper function returned a null value.") : null;
}
}
}
线程切换操作符的原理类似,首先我们还是先给出第一个示例的流程图。
由于source包装的都是调用操作符的source,及所谓的upstreamsource
什么是upstream downstream
upstream downstream
source <------------- operator ------------------> consumer/further operators
操作符的左边是upstream 操作符的右边是dowmstream
操作符中理解这张图的关键就是subscribeOn既会影响upstream也会影响downstream,而observeOn只会影响downStream
图中向上的箭头是subscribe过程,向下的箭头是onnext通知过程