RxJava系列9:理解rxjava的lift
程序员文章站
2022-04-03 07:58:28
...
理解rxjava的lift
一.为什么lift很重要
lift操作是很多操作符的基础操作,了解了lift操作,对大多数的操作符方法都能够理解了。
从源码可以看到,lift是把一个observable转换成另一个observable。Rx响应式编程中,observable发射某一种数据类型的数据流,怎么把这个数据流的类型转化为新的数据类型,只能把源数据流发射出来,进行转换。这就是lift的功用了。
二.lift源码分析
这里为方便理解:
源observable标记为observable<T>.转换后的observable标记为observable<R>.
public final <R> Observable<R> lift(final Operator<? extends R, ? super T> operator) {
return new Observable<R>(new OnSubscribeLift<T, R>(onSubscribe, operator));
}
public final class OnSubscribeLift<T, R> implements OnSubscribe<R> {
static final RxJavaObservableExecutionHook hook = RxJavaPlugins.getInstance().getObservableExecutionHook();
final OnSubscribe<T> parent;
final Operator<? extends R, ? super T> operator;
public OnSubscribeLift(OnSubscribe<T> parent, Operator<? extends R, ? super T> operator) {
this.parent = parent;
this.operator = operator;
}
@Override
public void call(Subscriber<? super R> o) {
try {
Subscriber<? super T> st = hook.onLift(operator).call(o);
try {
// new Subscriber created and being subscribed with so 'onStart' it
st.onStart();
parent.call(st);
} catch (Throwable e) {
// localized capture of errors rather than it skipping all operators
// and ending up in the try/catch of the subscribe method which then
// prevents onErrorResumeNext and other similar approaches to error handling
Exceptions.throwIfFatal(e);
st.onError(e);
}
} catch (Throwable e) {
Exceptions.throwIfFatal(e);
// if the lift function failed all we can do is pass the error to the final Subscriber
// as we don't have the operator available to us
o.onError(e);
}
}
}
几个要点如下:
1.lift返回的是一个新的Observable<R>。既有源observable<T>和新的observable<R>.对应了OnSubscribe1和OnSubscribe2对象。
2.Observable<T>.just.lift.subscribe(new Subscriber<R>).调用observable<R>的subscribe方法,传入的是Subscriber<R>,此时触发的是observable<R>.OnSubscribe.call方法,也就是如下的方法:
@Override
public void call(Subscriber<? super R> o) {
try {
Subscriber<? super T> st = hook.onLift(operator).call(o);
try {
// new Subscriber created and being subscribed with so 'onStart' it
st.onStart();
onSubscribe.call(st);
} catch (Throwable e) {
Exceptions.throwIfFatal(e);
st.onError(e);
}
} catch (Throwable e) {
Exceptions.throwIfFatal(e);
o.onError(e);
}
}
这里的o对应的就是Subscriber.
hook.onLift(operator).call(o)会创建一个MapSubscriber
public final class OperatorMap<T, R> implements Operator<R, T> {
//func
final Func1<? super T, ? extends R> transformer;
public OperatorMap(Func1<? super T, ? extends R> transformer) {
this.transformer = transformer;
}
@Override
public Subscriber<? super T> call(final Subscriber<? super R> o) {
MapSubscriber<T, R> parent = new MapSubscriber<T, R>(o, transformer);
o.add(parent);
return parent;
}
}
public interface Operator<R, T> extends Func1<Subscriber<? super R>, Subscriber<? super T>>
static final class MapSubscriber<T, R> extends Subscriber<T> {
final Subscriber<? super R> actual;
//func
final Func1<? super T, ? extends R> mapper;
boolean done;
public MapSubscriber(Subscriber<? super R> actual, Func1<? super T, ? extends R> mapper) {
this.actual = actual;
this.mapper = mapper;
}
@Override
public void onNext(T t) {
R result;
try {
//func的call方法会把输入的T转化为想要的结果R
result = mapper.call(t);
} catch (Throwable ex) {
Exceptions.throwIfFatal(ex);
unsubscribe();
onError(OnErrorThrowable.addValueAsLastCause(ex, t));
return;
}
actual.onNext(result);
}
@Override
public void onError(Throwable e) {
if (done) {
RxJavaPluginUtils.handleException(e);
return;
}
done = true;
actual.onError(e);
}
@Override
public void onCompleted() {
if (done) {
return;
}
actual.onCompleted();
}
@Override
public void setProducer(Producer p) {
actual.setProducer(p);
}
}
三.以map为例
Observable.just(1).map(new Func1<Integer, Integer>() {
@Override
public Integer call(Integer integer) {
return integer + 1;
}
}).subscribe(new Observer<Integer>() {
@Override
public void onCompleted() {
}
@Override
public void onError(Throwable e) {
}
@Override
public void onNext(Integer integer) {
}
});
Func1处理器
四.lift流程梳理
1.从流的角度来理解。
2.Observable<T>转化为Observable<R>。
Operator生成了Subscriber<T,R>。
Observable<T>向Subscriber<T,R>发送事件,发送的数据类型是T,通过转换器转化为R,然后发送给Subscriber<R>。
3.流的路线是:
Observable<T> -> Subscriber<T,R> -> Observable<R> -> Subscriber<R>.
4.即,在observable执行了lift操作后,会返回一个新的observable,这个新的observable像代理一样,负责接收原始的observable发出的数据,处理后发送给Subscriber。
借用抛物线的图:
参考资料
RxJava基本流程和lift源码分析
http://blog.csdn.net/lzyzsd/article/details/50110355
RxJava学习经验谈——lift操作
http://blog.csdn.net/guiyu_1985/article/details/54647027
上一篇: 可爱动漫卡通图片(大图)
下一篇: 线程池的原理解析