欢迎您访问程序员文章站本站旨在为大家提供分享程序员计算机编程知识!
您现在的位置是: 首页

RxJava系列9:理解rxjava的lift

程序员文章站 2022-04-03 07:58:28
...

理解rxjava的lift

一.为什么lift很重要

lift操作是很多操作符的基础操作,了解了lift操作,对大多数的操作符方法都能够理解了。

从源码可以看到,lift是把一个observable转换成另一个observable。Rx响应式编程中,observable发射某一种数据类型的数据流,怎么把这个数据流的类型转化为新的数据类型,只能把源数据流发射出来,进行转换。这就是lift的功用了。

二.lift源码分析

这里为方便理解:
源observable标记为observable<T>.转换后的observable标记为observable<R>.
public final <R> Observable<R> lift(final Operator<? extends R, ? super T> operator) {
        return new Observable<R>(new OnSubscribeLift<T, R>(onSubscribe, operator));
    }


    public final class OnSubscribeLift<T, R> implements OnSubscribe<R> {

    static final RxJavaObservableExecutionHook hook = RxJavaPlugins.getInstance().getObservableExecutionHook();

    final OnSubscribe<T> parent;

    final Operator<? extends R, ? super T> operator;

    public OnSubscribeLift(OnSubscribe<T> parent, Operator<? extends R, ? super T> operator) {
        this.parent = parent;
        this.operator = operator;
    }

    @Override
    public void call(Subscriber<? super R> o) {
        try {
            Subscriber<? super T> st = hook.onLift(operator).call(o);
            try {
                // new Subscriber created and being subscribed with so 'onStart' it
                st.onStart();
                parent.call(st);
            } catch (Throwable e) {
                // localized capture of errors rather than it skipping all operators 
                // and ending up in the try/catch of the subscribe method which then
                // prevents onErrorResumeNext and other similar approaches to error handling
                Exceptions.throwIfFatal(e);
                st.onError(e);
            }
        } catch (Throwable e) {
            Exceptions.throwIfFatal(e);
            // if the lift function failed all we can do is pass the error to the final Subscriber
            // as we don't have the operator available to us
            o.onError(e);
        }
    }
}
几个要点如下:
1.lift返回的是一个新的Observable<R>。既有源observable<T>和新的observable<R>.对应了OnSubscribe1和OnSubscribe2对象。
2.Observable<T>.just.lift.subscribe(new Subscriber<R>).调用observable<R>的subscribe方法,传入的是Subscriber<R>,此时触发的是observable<R>.OnSubscribe.call方法,也就是如下的方法:
@Override
       public void call(Subscriber<? super R> o) {
            try {
                Subscriber<? super T> st = hook.onLift(operator).call(o);
                try {
                    // new Subscriber created and being subscribed with so 'onStart' it
                    st.onStart();
                   onSubscribe.call(st);
                } catch (Throwable e) {
                    Exceptions.throwIfFatal(e);
                    st.onError(e);
                }
            } catch (Throwable e) {
                Exceptions.throwIfFatal(e);
                o.onError(e);
            }
        }

这里的o对应的就是Subscriber.
hook.onLift(operator).call(o)会创建一个MapSubscriber

public final class OperatorMap<T, R> implements Operator<R, T> {
    //func
    final Func1<? super T, ? extends R> transformer;

    public OperatorMap(Func1<? super T, ? extends R> transformer) {
        this.transformer = transformer;
    }

    @Override
    public Subscriber<? super T> call(final Subscriber<? super R> o) {
        MapSubscriber<T, R> parent = new MapSubscriber<T, R>(o, transformer);
        o.add(parent);
        return parent;
    }

    }
public interface Operator<R, T> extends Func1<Subscriber<? super R>, Subscriber<? super T>>
static final class MapSubscriber<T, R> extends Subscriber<T> {

        final Subscriber<? super R> actual;
        //func
        final Func1<? super T, ? extends R> mapper;

        boolean done;

        public MapSubscriber(Subscriber<? super R> actual, Func1<? super T, ? extends R> mapper) {
            this.actual = actual;
            this.mapper = mapper;
        }

        @Override
        public void onNext(T t) {
            R result;

            try {
                    //func的call方法会把输入的T转化为想要的结果R
                result = mapper.call(t);
            } catch (Throwable ex) {
                Exceptions.throwIfFatal(ex);
                unsubscribe();
                onError(OnErrorThrowable.addValueAsLastCause(ex, t));
                return;
            }

            actual.onNext(result);
        }

        @Override
        public void onError(Throwable e) {
            if (done) {
                RxJavaPluginUtils.handleException(e);
                return;
            }
            done = true;

            actual.onError(e);
        }


        @Override
        public void onCompleted() {
            if (done) {
                return;
            }
            actual.onCompleted();
        }

        @Override
        public void setProducer(Producer p) {
            actual.setProducer(p);
        }
    }

三.以map为例

Observable.just(1).map(new Func1<Integer, Integer>() {
            @Override
            public Integer call(Integer integer) {
                return integer + 1;
            }
        }).subscribe(new Observer<Integer>() {
            @Override
            public void onCompleted() {

            }

            @Override
            public void onError(Throwable e) {

            }

            @Override
            public void onNext(Integer integer) {

            }
        });

Func1处理器

四.lift流程梳理

1.从流的角度来理解。
2.Observable<T>转化为Observable<R>。
Operator生成了Subscriber<T,R>。
Observable<T>向Subscriber<T,R>发送事件,发送的数据类型是T,通过转换器转化为R,然后发送给Subscriber<R>。
3.流的路线是:
Observable<T> -> Subscriber<T,R> -> Observable<R> -> Subscriber<R>.
4.即,在observable执行了lift操作后,会返回一个新的observable,这个新的observable像代理一样,负责接收原始的observable发出的数据,处理后发送给Subscriber。

借用抛物线的图:

RxJava系列9:理解rxjava的lift

参考资料

RxJava基本流程和lift源码分析
http://blog.csdn.net/lzyzsd/article/details/50110355

RxJava学习经验谈——lift操作
http://blog.csdn.net/guiyu_1985/article/details/54647027