欢迎您访问程序员文章站本站旨在为大家提供分享程序员计算机编程知识!
您现在的位置是: 首页

RxJava 2.x详解

程序员文章站 2024-02-29 10:12:04
...
implementation "io.reactivex.rxjava2:rxjava:2.1.1"
implementation 'io.reactivex.rxjava2:rxandroid:2.0.2'

先来个入门案例:

  Observable.create(new ObservableOnSubscribe<String>() { //创建观察者
            //ObservableEmitter:发射器
            @Override
            public void subscribe(ObservableEmitter<String> emitter) throws Exception {
                emitter.onNext("1");
                emitter.onNext("2");
                emitter.onNext("3");
                emitter.onNext("4");
                emitter.onComplete();
            }
        }).subscribe(new Observer<String>() { //订阅

            //创建观察者

            /**
             * RxJava 2.x 新增的方法,Disposable可以做到切断的操作,让Observer观察者不再接收上游事件
             * @param d
             */
            @Override
            public void onSubscribe(Disposable d) {
            }

            @Override
            public void onNext(String s) {
                Log.e("xyh", "onNext: " + s);
            }

            @Override
            public void onError(Throwable e) {

            }

            @Override
            public void onComplete() {

            }
        });

不难看出,RxJava 2.x 与 1.x 还是存在着一些区别的。首先,创建 Observable 时,回调的是 ObservableEmitter ,字面意思即发射器,并且直接 throws Exception。其次,在创建的 Observer 中,也多了一个回调方法:onSubscribe,传递参数为Disposable,Disposable 相当于 RxJava 1.x 中的 Subscription, 用于解除订阅。

当然,我们的 RxJava 2.x 也为我们保留了简化订阅方法,我们可以根据需求,进行相应的简化订阅,只不过传入对象改为了 Consumer。

Consumer 即消费者,用于接收单个值,BiConsumer 则是接收两个值,Function 用于变换对象,Predicate 用于判断。这些接口命名大多参照了 Java 8 ,熟悉 Java 8 新特性的应该都知道意思,这里也不再赘述。

  Observable.create(new ObservableOnSubscribe<String>() { 
            @Override
            public void subscribe(ObservableEmitter<String> emitter) throws Exception {
                emitter.onNext("1");
                emitter.onComplete();
            }
        }).subscribe(new Consumer<String>() {
            @Override
            public void accept(String s) throws Exception {
                Log.e("xyh", "accept: " + s);
            }
        });

线程调度

关于线程切换这点,RxJava 1.x 和 RxJava 2.x 的实现思路是一样的。

subscribeOn() 指定的就是发射事件的线程
observerOn 指定的就是订阅者接收事件的线程

多次指定发射事件的线程只有第一次指定的有效,也就是说多次调用 subscribeOn() 只有第一次的有效,其余的会被忽略。
但多次指定订阅者接收线程是可以的,也就是说每调用一次 observerOn(),下游的线程就会切换一次。

  Observable.create(new ObservableOnSubscribe<String>() {
            @Override
            public void subscribe(ObservableEmitter<String> emitter) throws Exception {
                emitter.onNext("1");
                emitter.onComplete();
            }
        }).subscribeOn(Schedulers.io())
                .observeOn(AndroidSchedulers.mainThread())
                .subscribe(new Consumer<String>() {
                    @Override
                    public void accept(String s) throws Exception {
                        Log.e("xyh", "accept: " + s);
                    }
                });

RxJava 中,已经内置了很多线程选项供我们选择,例如有:

1.Schedulers.io() 代表io操作的线程, 通常用于网络,读写文件等io密集型的操作;
2.Schedulers.computation() 代表CPU计算密集型的操作, 例如需要大量计算的操作;
3.Schedulers.newThread() 代表一个常规的新线程;
4.AndroidSchedulers.mainThread() 代表Android的主线程

观察者模式

大家可能都知道, RxJava 以观察者模式为骨架,在 2.0 中依旧如此。

不过此次更新中,出现了两种观察者模式:

**Observable ( 被观察者 ) / Observer ( 观察者 )
Flowable (被观察者)/ Subscriber (观察者)**

在 RxJava 2.x 中,Observable 用于订阅 Observer,不再支持背压(1.x 中可以使用背压策略),而 Flowable 用于订阅 Subscriber , 是支持背压(Backpressure)的。


        Flowable.create(new FlowableOnSubscribe<String>() {
            @Override
            public void subscribe(FlowableEmitter<String> emitter) throws Exception {

            }
        }, BackpressureStrategy.BUFFER)
                .subscribeOn(Schedulers.io())
                .observeOn(AndroidSchedulers.mainThread())
                .subscribe(new Subscriber<String>() {
                    @Override
                    public void onSubscribe(Subscription s) {

                    }

                    @Override
                    public void onNext(String s) {

                    }

                    @Override
                    public void onError(Throwable t) {

                    }

                    @Override
                    public void onComplete() {

                    }
                });

RxJava2.x相比RxJava1.x的变化:

RxJava 2.x 是按照 Reactive-Streams specification 规范完全的重写的,完全独立于 RxJava 1.x 而存在,它改变了以往 RxJava 的用法。

1.Nulls

这是一个很大的变化,熟悉 RxJava 1.x 的童鞋一定都知道,1.x 是允许我们在发射事件的时候传入 null 值的,但现在我们的 2.x 不支持了,不信你试试? 大大的 NullPointerException 教你做人。这意味着 Observable 不再发射任何值,而是正常结束或者抛出空指针。

2、Flowable

RxJava 2.x 最大的改动就是对于 backpressure 的处理,为此将原来的 Observable 拆分成了新的 Observable 和 Flowable

什么是背压?
大概就是指在异步场景中,被观察者发送事件的速度远快于观察者的处理速度的情况下,一种告诉上游的被观察者降低发送速度的策略。在差距太大的时候,我们的内存会猛增,直到OOM。而我们的 Flowable 一定意义上可以解决这样的问题,但其实并不能完全解决。
RxJava 2.x详解
RxJava 2.x详解

3.Single/Completable/Maybe

其实这三者都差不多,Single 顾名思义,只能发送一个事件,和 Observable接受可变参数完全不同。而 Completable 侧重于观察结果,而 Maybe 是上面两种的结合体。也就是说,当你只想要某个事件的结果(true or false)的时候,你可以使用这种观察者模式。

4.线程调度相关

这一块基本没什么改动,但细心的小伙伴一定会发现,RxJava 2.x 中已经没有了 Schedulers.immediate() 这个线程环境,还有 Schedulers.test()。

5.Function相关

熟悉 1.x 的小伙伴一定都知道,我们在1.x 中是有 Func1,Func2…..FuncN的,但 2.x 中将它们移除,而采用 Function 替换了 Func1,采用 BiFunction 替换了 Func 2..N。并且,它们都增加了 throws Exception,也就是说,妈妈再也不用担心我们做某些操作还需要 try-catch 了。

6.其他操作符相关

如 Func1…N 的变化,现在同样用 Consumer 和 BiConsumer 对 Action1 和 Action2 进行了替换。后面的 Action 都被替换了,只保留了 ActionN。

操作符

1.Create:
create 操作符应该是最常见的操作符了,主要用于产生一个 Obserable 被观察者对象,


        Observable.create(new ObservableOnSubscribe<String>() {
            @Override
            public void subscribe(ObservableEmitter<String> emitter) throws Exception {
                emitter.onNext("1");
                emitter.onNext("2");
                emitter.onNext("3");
                emitter.onNext("4");
                emitter.onComplete();
            }
        }).subscribe(new Observer<String>() {

            private int i;
            private Disposable mDisposable;

            @Override
            public void onSubscribe(Disposable d) {
                mDisposable = d;
            }

            @Override
            public void onNext(String s) {
                i++;
                if (i == 2) {
                    // 在RxJava 2.x 中,新增的Disposable可以做到切断的操作,让Observer观察者不再接收上游事件
                    mDisposable.dispose();
                }
                Log.e("xyh", "onNext: " + s);
            }

            @Override
            public void onError(Throwable e) {

            }

            @Override
            public void onComplete() {

            }
        });

2.x 中有一个 Disposable 概念,这个东西可以直接调用切断,可以看到,当它的 isDisposed() 返回为 false 的时候,接收器能正常接收事件,但当其为 true 的时候,接收器停止了接收。所以可以通过此参数动态控制接收事件了。

2.Map
Map 基本算是 RxJava 中一个最简单的操作符了,熟悉 RxJava 1.x 的知道,它的作用是对发射时间发送的每一个事件应用一个函数,是的每一个事件都按照指定的函数去变化,而在 2.x 中它的作用几乎一致。

  Observable.create(new ObservableOnSubscribe<Integer>() {
            @Override
            public void subscribe(ObservableEmitter<Integer> emitter) throws Exception {
                emitter.onNext(1);
                emitter.onNext(2);
                emitter.onNext(3);
                emitter.onComplete();
            }
        }).map(new Function<Integer, String>() {

            @Override
            public String apply(Integer integer) throws Exception {
                return "This is" + integer;
            }
        }).subscribe(new Consumer<String>() {
            @Override
            public void accept(String s) throws Exception {
                Log.e("xyh", "accept: " + s);
            }
        });
05-25 15:47:13.512 10791-10791/com.xiaoyehai.threadpool E/xyh: accept: This is  1
05-25 15:47:13.513 10791-10791/com.xiaoyehai.threadpool E/xyh: accept: This is  2
05-25 15:47:13.513 10791-10791/com.xiaoyehai.threadpool E/xyh: accept: This is  3

map 基本作用就是将一个 Observable 通过某种函数关系,转换为另一种 Observable,上面例子中就是把我们的 Integer 数据变成了 String 类型。

3.FlatMap

FlatMap 是一个很有趣的东西,我坚信你在实际开发中会经常用到。它可以把一个发射器 Observable 通过某种方法转换为多个 Observables,然后再把这些分散的 Observables装进一个单一的发射器 Observable。但有个需要注意的是,flatMap 并不能保证事件的顺序,如果需要保证,需要用到我们下面要讲的 ConcatMap。

 Observable.create(new ObservableOnSubscribe<Integer>() {
            @Override
            public void subscribe(ObservableEmitter<Integer> emitter) throws Exception {
                emitter.onNext(1);
                emitter.onNext(2);
                emitter.onNext(3);
                emitter.onComplete();
            }
        }).flatMap(new Function<Integer, ObservableSource<String>>() {
            @Override
            public ObservableSource<String> apply(Integer integer) throws Exception {
                List<String> list = new ArrayList<>();
                for (int i = 0; i < 3; i++) {
                    list.add("I am value " + integer);
                }
                int delayTime = (int) (1 + Math.random() * 10);
                return Observable.fromIterable(list).delay(delayTime, TimeUnit.MILLISECONDS);
            }
        }).subscribeOn(Schedulers.io())
                .observeOn(AndroidSchedulers.mainThread())
                .subscribe(new Consumer<String>() {
                    @Override
                    public void accept(String s) throws Exception {
                        Log.e("xyh", "accept: " + s);
                    }
                });
05-25 16:03:01.362 11840-11840/com.xiaoyehai.threadpool E/xyh: accept: I am value 1
05-25 16:03:01.362 11840-11840/com.xiaoyehai.threadpool E/xyh: accept: I am value 1
05-25 16:03:01.362 11840-11840/com.xiaoyehai.threadpool E/xyh: accept: I am value 1
05-25 16:03:01.362 11840-11840/com.xiaoyehai.threadpool E/xyh: accept: I am value 3
05-25 16:03:01.362 11840-11840/com.xiaoyehai.threadpool E/xyh: accept: I am value 3
05-25 16:03:01.362 11840-11840/com.xiaoyehai.threadpool E/xyh: accept: I am value 3
05-25 16:03:01.362 11840-11840/com.xiaoyehai.threadpool E/xyh: accept: I am value 2
05-25 16:03:01.362 11840-11840/com.xiaoyehai.threadpool E/xyh: accept: I am value 2
05-25 16:03:01.362 11840-11840/com.xiaoyehai.threadpool E/xyh: accept: I am value 2

4.concatMap

上面其实就说了,concatMap 与 FlatMap 的唯一区别就是 concatMap 保证了顺序,所以,我们就直接把 flatMap 替换为 concatMap 验证吧。

  Observable.create(new ObservableOnSubscribe<Integer>() {
            @Override
            public void subscribe(ObservableEmitter<Integer> emitter) throws Exception {
                emitter.onNext(1);
                emitter.onNext(2);
                emitter.onNext(3);
                emitter.onComplete();
            }
        }).concatMap(new Function<Integer, ObservableSource<String>>() {
            @Override
            public ObservableSource<String> apply(Integer integer) throws Exception {
                List<String> list = new ArrayList<>();
                for (int i = 0; i < 3; i++) {
                    list.add("I am value " + integer);
                }
                int delayTime = (int) (1 + Math.random() * 10);
                return Observable.fromIterable(list).delay(delayTime, TimeUnit.MILLISECONDS);
            }
        }).subscribeOn(Schedulers.io())
                .observeOn(AndroidSchedulers.mainThread())
                .subscribe(new Consumer<String>() {
                    @Override
                    public void accept(String s) throws Exception {
                        Log.e("xyh", "accept: " + s);
                    }
                });
05-25 16:03:51.653 12037-12037/com.xiaoyehai.threadpool E/xyh: accept: I am value 1
05-25 16:03:51.653 12037-12037/com.xiaoyehai.threadpool E/xyh: accept: I am value 1
05-25 16:03:51.653 12037-12037/com.xiaoyehai.threadpool E/xyh: accept: I am value 1
05-25 16:03:51.653 12037-12037/com.xiaoyehai.threadpool E/xyh: accept: I am value 2
05-25 16:03:51.653 12037-12037/com.xiaoyehai.threadpool E/xyh: accept: I am value 2
05-25 16:03:51.653 12037-12037/com.xiaoyehai.threadpool E/xyh: accept: I am value 2
05-25 16:03:51.653 12037-12037/com.xiaoyehai.threadpool E/xyh: accept: I am value 3
05-25 16:03:51.653 12037-12037/com.xiaoyehai.threadpool E/xyh: accept: I am value 3
05-25 16:03:51.653 12037-12037/com.xiaoyehai.threadpool E/xyh: accept: I am value 3

5.distinct

这个操作符非常的简单、通俗、易懂,就是简单的去重嘛。

 Observable.just(1, 1, 2, 2, 3, 4, 4, 5)
                .distinct()
                .subscribe(new Consumer<Integer>() {
                    @Override
                    public void accept(Integer integer) throws Exception {
                        Log.e("xyh", "accept: " + integer);
                    }
                });
05-25 16:06:12.762 12268-12268/com.xiaoyehai.threadpool E/xyh: accept: 1
05-25 16:06:12.762 12268-12268/com.xiaoyehai.threadpool E/xyh: accept: 2
05-25 16:06:12.762 12268-12268/com.xiaoyehai.threadpool E/xyh: accept: 3
05-25 16:06:12.762 12268-12268/com.xiaoyehai.threadpool E/xyh: accept: 4
05-25 16:06:12.762 12268-12268/com.xiaoyehai.threadpool E/xyh: accept: 5

6.Filter

Filter 你会很常用的,它的作用也很简单,过滤器嘛。可以接受一个参数,让其过滤掉不符合我们条件的值

 //过滤掉大于30的数
        Observable.just(10, 20, 30, 40, 50)
                .filter(new Predicate<Integer>() {
                    @Override
                    public boolean test(Integer integer) throws Exception {
                        return integer <= 30;
                    }
                }).subscribe(new Consumer<Integer>() {
            @Override
            public void accept(Integer integer) throws Exception {
                Log.e("xyh", "accept: " + integer);
            }
        });
05-25 16:09:56.025 12535-12535/com.xiaoyehai.threadpool E/xyh: accept: 10
05-25 16:09:56.025 12535-12535/com.xiaoyehai.threadpool E/xyh: accept: 20
05-25 16:09:56.025 12535-12535/com.xiaoyehai.threadpool E/xyh: accept: 30

7.buffer

buffer 操作符接受两个参数,buffer(count,skip),作用是将 Observable 中的数据按 skip (步长) 分成最大不超过 count 的 buffer ,然后生成一个 Observable 。

  Observable.just(1, 2, 3, 4, 5)
                .buffer(3, 2)
                .subscribe(new Consumer<List<Integer>>() {
                    @Override
                    public void accept(List<Integer> integers) throws Exception {
                        Log.e("xyh", "buffer size : " + integers.size() + "\n");
                        for (Integer integer : integers) {
                            Log.e("xyh", "accept: " + integer);
                        }
                    }
                });
05-25 16:14:33.918 12925-12925/com.xiaoyehai.threadpool E/xyh: buffer size : 3
05-25 16:14:33.918 12925-12925/com.xiaoyehai.threadpool E/xyh: accept: 1
05-25 16:14:33.918 12925-12925/com.xiaoyehai.threadpool E/xyh: accept: 2
05-25 16:14:33.918 12925-12925/com.xiaoyehai.threadpool E/xyh: accept: 3
05-25 16:14:33.918 12925-12925/com.xiaoyehai.threadpool E/xyh: buffer size : 3
05-25 16:14:33.918 12925-12925/com.xiaoyehai.threadpool E/xyh: accept: 3
05-25 16:14:33.918 12925-12925/com.xiaoyehai.threadpool E/xyh: accept: 4
05-25 16:14:33.918 12925-12925/com.xiaoyehai.threadpool E/xyh: accept: 5
05-25 16:14:33.918 12925-12925/com.xiaoyehai.threadpool E/xyh: buffer size : 1
05-25 16:14:33.918 12925-12925/com.xiaoyehai.threadpool E/xyh: accept: 5

我们把 1, 2, 3, 4, 5 依次发射出来,经过 buffer 操作符,其中参数 skip 为 2, count 为 3,而我们的输出 依次是 123,345,5。显而易见,我们 buffer 的第一个参数是 count,代表最大取值,在事件足够的时候,一般都是取 count 个值,然后每次跳过 skip 个事件。其实看 Log 日志,我相信大家都明白了。

8.timer

timer 很有意思,相当于一个定时任务。在 1.x 中它还可以执行间隔逻辑,但在 2.x 中此功能被交给了 interval,下一个会介绍。但需要注意的是,timer 和 interval 均默认在新线程。等同于Android中Handler的postDelay( )方法:

 Observable.timer(2, TimeUnit.SECONDS)
                .subscribeOn(Schedulers.io())
                .observeOn(AndroidSchedulers.mainThread()) // // timer 默认在新线程,所以需要切换回主线程
                .subscribe(new Consumer<Long>() {
                    @Override
                    public void accept(Long aLong) throws Exception {
                        //2秒后执行该方法
                        Log.e("xyh", "accept: " + aLong);
                    }
                });

9.interval

如同我们上面可说,interval 操作符用于间隔时间执行某个操作,其接受三个参数,分别是第一次发送延迟,间隔时间,时间单位。

 Observable.interval(3, 2, TimeUnit.SECONDS)
                .subscribeOn(Schedulers.io())
                .observeOn(AndroidSchedulers.mainThread()) // interval 默认在新线程,所以需要切换回主线程
                .subscribe(new Consumer<Long>() {
                    @Override
                    public void accept(Long aLong) throws Exception {
                        //3秒后执行该方法,然后每隔两秒执行该方法,无限循环
                        Log.e("xyh", "accept: " + aLong);
                    }
                });

当我们的Activity 都销毁的时候,实际上这个操作还依然在进行,所以,我们得花点小心思让我们在不需要它的时候干掉它。

  mDisposable = Observable.interval(3, 2, TimeUnit.SECONDS)
                .subscribeOn(Schedulers.io())
                .observeOn(AndroidSchedulers.mainThread()) // interval 默认在新线程,所以需要切换回主线程
                .subscribe(new Consumer<Long>() {
                    @Override
                    public void accept(Long aLong) throws Exception {
                        //3秒后执行该方法,然后每隔两秒执行该方法,无限循环
                        Log.e("xyh", "accept: " + aLong);
                    }
                });
    }

    @Override
    protected void onDestroy() {
        if (mDisposable != null && !mDisposable.isDisposed()) {
            mDisposable.dispose();
        }
    }

10.doOnNext

其实觉得 doOnNext 应该不算一个操作符,但考虑到其常用性,我们还是咬咬牙将它放在了这里。它的作用是让订阅者在接收到数据之前干点有意思的事情。假如我们在获取到数据之前想先保存一下它,无疑我们可以这样实现。

   Observable.just(1, 2, 3, 4)
                .doOnNext(new Consumer<Integer>() {
                    @Override
                    public void accept(Integer integer) throws Exception {
                        //让订阅者在接收到数据之前干点有意思的事情
                    }
                }).subscribe(new Consumer<Integer>() {
            @Override
            public void accept(Integer integer) throws Exception {

            }
        });

11.skip

skip 很有意思,其实作用就和字面意思一样,接受一个 long 型参数 count ,代表跳过 count 个数目开始接收。

   Observable.just(1, 2, 3, 4)
                .skip(2) //代表跳过 count 个数目开始接收。
                .subscribe(new Consumer<Integer>() {
                    @Override
                    public void accept(Integer integer) throws Exception {
                        Log.e("xyh", "accept: " + integer);
                    }
                });
05-25 16:30:06.819 13813-13813/com.xiaoyehai.threadpool E/xyh: accept: 3
05-25 16:30:06.819 13813-13813/com.xiaoyehai.threadpool E/xyh: accept: 4

12.take

take,接受一个 long 型参数 count ,代表至多接收 count 个数据。

 Observable.just(1, 2, 3, 4)
                .take(2) //代表至多接收 2 个数据
                .subscribe(new Consumer<Integer>() {
                    @Override
                    public void accept(Integer integer) throws Exception {
                        Log.e("xyh", "accept: " + integer);
                    }
                });
05-25 16:31:37.110 14079-14079/com.xiaoyehai.threadpool E/xyh: accept: 1
05-25 16:31:37.110 14079-14079/com.xiaoyehai.threadpool E/xyh: accept: 2

13.just

just,没什么好说的,其实在前面各种例子都说明了,就是一个简单的发射器依次调用 onNext() 方法。

Observable.just("1", "2", "3")
                .subscribeOn(Schedulers.io())
                .observeOn(AndroidSchedulers.mainThread())
                .subscribe(new Consumer<String>() {
                    @Override
                    public void accept(@NonNull String s) throws Exception {

                       Log.e(TAG,"accept : onNext : " + s + "\n" );
                    }
                });

13.Single

顾名思义,Single 只会接收一个参数,而 SingleObserver 只会调用 onError() 或者 onSuccess()。

  Single.just(1)
                .subscribe(new SingleObserver<Integer>() {
                    @Override
                    public void onSubscribe(Disposable d) {

                    }

                    @Override
                    public void onSuccess(Integer integer) {
                        Log.e("xyh", "onSuccess: " + integer);
                    }

                    @Override
                    public void onError(Throwable e) {

                    }
                });
05-25 16:37:18.342 14390-14390/com.xiaoyehai.threadpool E/xyh: onSuccess: 1

15.debounce

去除发送频率过快的项,看起来好像没啥用处,但你信我,后面绝对有地方很有用武之地。


        Observable.create(new ObservableOnSubscribe<Integer>() {
            @Override
            public void subscribe(ObservableEmitter<Integer> emitter) throws Exception {
                emitter.onNext(1);
                Thread.sleep(300);
                emitter.onNext(2);
                Thread.sleep(200);
                emitter.onNext(3);
                Thread.sleep(500);
                emitter.onNext(4);
                Thread.sleep(600);
            }
        }).debounce(500, TimeUnit.MILLISECONDS)
                //去除发送间隔时间小于等于 500 毫秒的发射事件
                .subscribeOn(Schedulers.io())
                .observeOn(AndroidSchedulers.mainThread())
                .subscribe(new Consumer<Integer>() {
                    @Override
                    public void accept(Integer integer) throws Exception {
                        Log.e("xyh", "accept: " + integer);
                    }
                });
05-25 16:41:27.632 14706-14706/com.xiaoyehai.threadpool E/xyh: accept: 4

16.defer

简单地时候就是每次订阅都会创建一个新的 Observable,并且如果没有被订阅,就不会产生新的 Observable。


        //每次订阅都会创建一个新的 Observable,并且如果没有被订阅,就不会产生新的 Observable。
        Observable.defer(new Callable<ObservableSource<Integer>>() {
            @Override
            public ObservableSource<Integer> call() throws Exception {
                return Observable.just(1, 2, 3);
            }
        }).subscribe(new Consumer<Integer>() {
            @Override
            public void accept(Integer integer) throws Exception {
                Log.e("xyh", "accept: " + integer);
            }
        });

17.last

last 操作符仅取出可观察到的最后一个值,或者是满足某些条件的最后一项。

Observable.just(1, 2, 3, 4)
                .last(2)
                .subscribe(new Consumer<Integer>() {
                    @Override
                    public void accept(Integer integer) throws Exception {
                        Log.e("xyh", "accept: " + integer);
                    }
                });
05-25 16:47:25.135 15322-15322/com.xiaoyehai.threadpool E/xyh: accept: 4

18.merge

merge 顾名思义,熟悉版本控制工具的你一定不会不知道 merge 命令,而在 Rx 操作符中,merge 的作用是把多个 Observable 结合起来,接受可变参数,也支持迭代器集合。注意它和 concat 的区别在于,不用等到 发射器 A 发送完所有的事件再进行发射器 B 的发送。

 Observable.merge(Observable.just(1, 2, 3), Observable.just(4, 5))
                .subscribe(new Consumer<Integer>() {
                    @Override
                    public void accept(Integer integer) throws Exception {
                        Log.e("xyh", "accept: " + integer);
                    }
                });
05-25 16:49:18.973 15551-15551/com.xiaoyehai.threadpool E/xyh: accept: 1
05-25 16:49:18.973 15551-15551/com.xiaoyehai.threadpool E/xyh: accept: 2
05-25 16:49:18.973 15551-15551/com.xiaoyehai.threadpool E/xyh: accept: 3
05-25 16:49:18.973 15551-15551/com.xiaoyehai.threadpool E/xyh: accept: 4
05-25 16:49:18.973 15551-15551/com.xiaoyehai.threadpool E/xyh: accept: 5

19.reduce

reduce 操作符每次用一个方法处理一个值,可以有一个 seed 作为初始值。

 Observable.just(1, 2, 3)
                .reduce(new BiFunction<Integer, Integer, Integer>() {
                    @Override
                    public Integer apply(Integer integer, Integer integer2) throws Exception {
                        return integer + integer2;
                    }
                }).subscribe(new Consumer<Integer>() {
            @Override
            public void accept(Integer integer) throws Exception {
                Log.e("xyh", "accept: " + integer);
            }
        });
05-25 16:52:50.621 15827-15827/com.xiaoyehai.threadpool E/xyh: accept: 6

可以看到,代码中,我们中间采用 reduce ,支持一个 function 为两数值相加,所以应该最后的值是:1 + 2 = 3 + 3 = 6 , 而Log 日志完美解决了我们的问题。

20.scan

scan 操作符作用和上面的 reduce 一致,唯一区别是 reduce 是个只追求结果的坏人,而 scan 会始终如一地把每一个步骤都输出。

Observable.just(1, 2, 3)
                .scan(new BiFunction<Integer, Integer, Integer>() {
                    @Override
                    public Integer apply(Integer integer, Integer integer2) throws Exception {
                        return integer + integer2;
                    }
                }).subscribe(new Consumer<Integer>() {
            @Override
            public void accept(Integer integer) throws Exception {
                Log.e("xyh", "accept: " + integer);
            }
        });
05-25 16:55:03.932 16206-16206/com.xiaoyehai.threadpool E/xyh: accept: 1
05-25 16:55:03.933 16206-16206/com.xiaoyehai.threadpool E/xyh: accept: 3
05-25 16:55:03.933 16206-16206/com.xiaoyehai.threadpool E/xyh: accept: 6

21.window

按照实际划分窗口,将数据发送给不同的 Observable

至此,大部分 RxJava 2.x 的操作符就告一段落了,当然还有一些没有提到的操作符。

网络请求:

  //网络请求
     Observable.create(new ObservableOnSubscribe<String>() {
            @Override
            public void subscribe(ObservableEmitter<String> emitter) throws Exception {
                //在这里请求网络获取数据
                String result = "";
                emitter.onNext(result);
            }
        }).map(new Function<String, User>() {
            //把string转换为实体类
            @Override
            public User apply(String s) throws Exception {
                //Gson
                return null;
            }
        }).doOnNext(new Consumer<User>() {
            @Override
            public void accept(User user) throws Exception {
                //在这里可以进行数据库存储操作
            }
        }).subscribeOn(Schedulers.io())
                .observeOn(AndroidSchedulers.mainThread())
                .subscribe(new Consumer<User>() {
                    @Override
                    public void accept(User user) throws Exception {

                    }
                }, new Consumer<Throwable>() {
                    @Override
                    public void accept(Throwable throwable) throws Exception {

                    }
                });
相关标签: android RxJava2