欢迎您访问程序员文章站本站旨在为大家提供分享程序员计算机编程知识!
您现在的位置是: 首页

RxJava2.0中flatMap操作符用法和源码分析

程序员文章站 2022-04-02 18:13:44
...

flatMap基本使用

flatMap是变换操作符,使用一个指定的函数对原始Observable发射的每一项数据执行变换操作,这个函数返回一个本身也发射数据的Observable,然后flatMap合并这些Observable发射的数据,最后将合并后的结果当作它自己的数据序列发射。

注意:flatMap对这些Observable发射的数据做的是合并(merge)操作,因此它们可能是交错的。RxJava2.0中flatMap操作符用法和源码分析
我们可以用代码示例说明:

Observable.create(new ObservableOnSubscribe<Integer>() {
    @Override
    public void subscribe(@NonNull ObservableEmitter<Integer> e) throws Exception {
        e.onNext(1);
        e.onNext(2);
        e.onNext(3);
    }
}).flatMap(new Function<Integer, ObservableSource<String>>() {
    @Override
    public ObservableSource<String> apply(Integer integer) throws Exception {
        String flatMap = "I am value " + integer;
        return Observable.just(flatMap);
    }
}).subscribe(new Consumer<String>() {

    @Override
    public void accept(String s) throws Exception {
        println("flatMap : accept : " + s + "\n");
    }
});

输出结果:
flatMap : accept : I am value 1

flatMap : accept : I am value 2

flatMap : accept : I am value 3

下面我们将从源码的角度来分析下:

这里我们首先使用create操作符创建一个Observable,并且发射指定的数据。关于create如何创建Observable对象,我们这里不做分析,前面文章中已经说明。我们直接分析flatMap的源码:

@CheckReturnValue
@SchedulerSupport(SchedulerSupport.NONE)
public final <R> Observable<R> flatMap(Function<? super T, ? extends ObservableSource<? extends R>> mapper) {
    return flatMap(mapper, false);
}

@CheckReturnValue
@SchedulerSupport(SchedulerSupport.NONE)
public final <R> Observable<R> flatMap(Function<? super T, ? extends ObservableSource<? extends R>> mapper, boolean delayErrors) {
    return flatMap(mapper, delayErrors, Integer.MAX_VALUE);
}

@CheckReturnValue
@SchedulerSupport(SchedulerSupport.NONE)
public final <R> Observable<R> flatMap(Function<? super T, ? extends ObservableSource<? extends R>> mapper, boolean delayErrors, int maxConcurrency) {
    return flatMap(mapper, delayErrors, maxConcurrency, bufferSize());
}

@CheckReturnValue
@SchedulerSupport(SchedulerSupport.NONE)
public final <R> Observable<R> flatMap(Function<? super T, ? extends ObservableSource<? extends R>> mapper,
        boolean delayErrors, int maxConcurrency, int bufferSize) {
    ObjectHelper.requireNonNull(mapper, "mapper is null");
    ObjectHelper.verifyPositive(maxConcurrency, "maxConcurrency");
    ObjectHelper.verifyPositive(bufferSize, "bufferSize");
    if (this instanceof ScalarCallable) {
        @SuppressWarnings("unchecked")
        T v = ((ScalarCallable<T>)this).call();
        if (v == null) {
            return empty();
        }
        return ObservableScalarXMap.scalarXMap(v, mapper);
    }
    return RxJavaPlugins.onAssembly(new ObservableFlatMap<T, R>(this, mapper, delayErrors, maxConcurrency, bufferSize));
}

flatMap方法具有多个重载方法,在调用flatMap方法时,首先需要传入一个Function的对象,但对是Function类中的泛型参数有要求,第二个参数必须是继承自ObservableSource类,然后在抽象的apply方法中返回该类型参数。而我们都知道Observable就是继承自该类。所以在Function中,最后必须返回的是一个Observable的对象。

我们知道在执行操作符方法时,最后都会生成一个具体的Observable对象,这个对象作为Observable的具体实现类,里面实现了具体的业务逻辑,然后通过多态的方式进行调用,而flatMap中对应的就是ObservableFlatMap类。

public final class ObservableFlatMap<T, U> extends AbstractObservableWithUpstream<T, U> {
    final Function<? super T, ? extends ObservableSource<? extends U>> mapper;
    final boolean delayErrors;
    final int maxConcurrency;
    final int bufferSize;

    public ObservableFlatMap(ObservableSource<T> source,
            Function<? super T, ? extends ObservableSource<? extends U>> mapper,
            boolean delayErrors, int maxConcurrency, int bufferSize) {
        super(source);
        this.mapper = mapper;
        this.delayErrors = delayErrors;
        this.maxConcurrency = maxConcurrency;
        this.bufferSize = bufferSize;
    }

    .......

}

ObservableFlatMap的构造方法中,传入了五个参数。如下:

public final <R> Observable<R> flatMap(Function<? super T, ? extends ObservableSource<? extends R>> mapper,
        boolean delayErrors, int maxConcurrency, int bufferSize) {
    ......
    return RxJavaPlugins.onAssembly(new ObservableFlatMap<T, R>(this, mapper, delayErrors, maxConcurrency, bufferSize));
}

其中this参数,代表的是当前创建的Observable对象,本例子中使用的是create操作符创建的对象,所以具体的对象就是ObservableCreate实例。
mapper就是Function中的第二个参数,是一个新的Observable对象。其他参数都是系统默认参数。

完成了被观察者的准备工作后,接下来就是订阅观察者了。从之前的分析我们知道在订阅观察者时,实际调用的都是subscribeActual的方法。那么现在我们具体来分析ObservableFlatMap类中的方法:
ObservableFlatMap#subscribeActual

@Override
public void subscribeActual(Observer<? super U> t) {

    if (ObservableScalarXMap.tryScalarXMapSubscribe(source, t, mapper)) {
        return;
    }

    source.subscribe(new MergeObserver<T, U>(t, mapper, delayErrors, maxConcurrency, bufferSize));
}

里面再次调用了source.subscribe方法,而我们刚才分析了这个source就是this这个参数指代的对象,这里即ObservableCreate对象。

Observable#subscribe

@SchedulerSupport(SchedulerSupport.NONE)
@Override
public final void subscribe(Observer<? super T> observer) {
    ObjectHelper.requireNonNull(observer, "observer is null");
    try {
        observer = RxJavaPlugins.onSubscribe(this, observer);

        ObjectHelper.requireNonNull(observer, "Plugin returned null Observer");

        subscribeActual(observer);
    } catch (NullPointerException e) { // NOPMD
        throw e;
    } catch (Throwable e) {
        Exceptions.throwIfFatal(e);
        // can't call onError because no way to know if a Disposable has been set or not
        // can't call onSubscribe because the call might have set a Subscription already
        RxJavaPlugins.onError(e);

        NullPointerException npe = new NullPointerException("Actually not, but can't throw other exceptions due to RS");
        npe.initCause(e);
        throw npe;
    }
}

参数observer就是上面新建的MergeObserver对象。由于subscribeActual是个抽象方法,这里执行的是ObservableCreate中的方法。我们在看一下该类中的方法:

ObservableCreate#subscribeActual

@Override
protected void subscribeActual(Observer<? super T> observer) {
    CreateEmitter<T> parent = new CreateEmitter<T>(observer);
    observer.onSubscribe(parent);

    try {
        source.subscribe(parent);
    } catch (Throwable ex) {
        Exceptions.throwIfFatal(ex);
        parent.onError(ex);
    }
}

这里面的通过调用ObservableOnSubscribe类中的subscribe方法,开发发射数据。但是参数observer就是刚才的MergeObserver对象。
在通过onNext发射数据时,其实执行的就是MergeObserver中的onNext的对象。

@Override
public void onNext(T t) {
    // safeguard against misbehaving sources
    if (done) {
        return;
    }
    ObservableSource<? extends U> p;
    try {
        p = ObjectHelper.requireNonNull(mapper.apply(t), "The mapper returned a null ObservableSource");
    } catch (Throwable e) {
        Exceptions.throwIfFatal(e);
        s.dispose();
        onError(e);
        return;
    }

    if (maxConcurrency != Integer.MAX_VALUE) {
        synchronized (this) {
            if (wip == maxConcurrency) {
                sources.offer(p);
                return;
            }
            wip++;
        }
    }

    subscribeInner(p);
}

方法中,首先接收了发射的数据,然后在通过mapper.apply(t)将数据转化为具有发射数据的ObservableSource对象,这里就对应了之前所说的Function将发射的数据转换为ObservableSource的对象。然后将新生成的ObservableSource的数据发射,回到之前的代码可以知道这里的p是mapper.apply(t)返回也就是ObservableJust对象

    @CheckReturnValue
    @SchedulerSupport(SchedulerSupport.NONE)
    public static <T> Observable<T> just(T item) {
        ObjectHelper.requireNonNull(item, "The item is null");
        return RxJavaPlugins.onAssembly(new ObservableJust<T>(item));
    }

接着代码继续往下看,会执行ObservableJust的subscribe方法

@SuppressWarnings("unchecked")
        void subscribeInner(ObservableSource<? extends U> p) {
            for (;;) {
                if (p instanceof Callable) {
                    tryEmitScalar(((Callable<? extends U>)p));

                    if (maxConcurrency != Integer.MAX_VALUE) {
                        synchronized (this) {
                            p = sources.poll();
                            if (p == null) {
                                wip--;
                                break;
                            }
                        }
                    } else {
                        break;
                    }
                } else {
                    InnerObserver<T, U> inner = new InnerObserver<T, U>(this, uniqueId++);
                    if (addInner(inner)) {
                        p.subscribe(inner);
                    }
                    break;
                }
            }
        }

接着看下ObservableJust的源码

@Override
    protected void subscribeActual(Observer<? super T> s) {
        ScalarDisposable<T> sd = new ScalarDisposable<T>(s, value);
        s.onSubscribe(sd);
        sd.run();
    }
        @Override
        public void run() {
            if (get() == START && compareAndSet(START, ON_NEXT)) {
                observer.onNext(value);
                if (get() == ON_NEXT) {
                    lazySet(ON_COMPLETE);
                    observer.onComplete();
                }
            }
        }

这里的s其实就是InnerObserver对象到了这里会执行InnerObserver的next方法,进而执行MergeObserver的tryEmit方法

        @Override
        public void onNext(U t) {
            if (fusionMode == QueueDisposable.NONE) {
                parent.tryEmit(t, this);
            } else {
                parent.drain();
            }
        }
        void tryEmit(U value, InnerObserver<T, U> inner) {
            if (get() == 0 && compareAndSet(0, 1)) {
                actual.onNext(value);
                if (decrementAndGet() == 0) {
                    return;
                }
            } else {
                SimpleQueue<U> q = inner.queue;
                if (q == null) {
                    q = new SpscLinkedArrayQueue<U>(bufferSize);
                    inner.queue = q;
                }
                q.offer(value);
                if (getAndIncrement() != 0) {
                    return;
                }
            }
            drainLoop();
        }

这里的actual对象其实就是我们代码订阅的观察者其实就是LambdaObserver,最终代码执行的是LambdaObserver的next方法,其实就是Consumer的next方法

    @CheckReturnValue
    @SchedulerSupport(SchedulerSupport.NONE)
    public final Disposable subscribe(Consumer<? super T> onNext) {
        return subscribe(onNext, Functions.ON_ERROR_MISSING, Functions.EMPTY_ACTION, Functions.emptyConsumer());
    }
    @CheckReturnValue
    @SchedulerSupport(SchedulerSupport.NONE)
    public final Disposable subscribe(Consumer<? super T> onNext, Consumer<? super Throwable> onError,
            Action onComplete, Consumer<? super Disposable> onSubscribe) {
        ObjectHelper.requireNonNull(onNext, "onNext is null");
        ObjectHelper.requireNonNull(onError, "onError is null");
        ObjectHelper.requireNonNull(onComplete, "onComplete is null");
        ObjectHelper.requireNonNull(onSubscribe, "onSubscribe is null");

        LambdaObserver<T> ls = new LambdaObserver<T>(onNext, onError, onComplete, onSubscribe);

        subscribe(ls);

        return ls;
    }
    @Override
    public void onNext(T t) {
        if (!isDisposed()) {
            try {
                onNext.accept(t);
            } catch (Throwable e) {
                Exceptions.throwIfFatal(e);
                get().dispose();
                onError(e);
            }
        }
    }

好了 到这里整个执行流程分析完毕。(其实还有一些细节的东西没有分析到位,希望大神指教)

相关标签: RxJava