欢迎您访问程序员文章站本站旨在为大家提供分享程序员计算机编程知识!
您现在的位置是: 首页

rxjava2

程序员文章站 2022-06-09 23:26:30
...

rxjava1:


下面文章的代码下载地址:https://github.com/nanchen2251/RxJava2Examples

001.
与RxJava 1.x的差异

其实,我标题为入门教程,按理说应该从简单入门开始讲的,原谅我突然偏题了,因为我觉得可能大多数人都了解或者使用过RxJava 1.x(因为它真的太棒了)。虽然可能熟悉1.x 的你可以直接扒文档就可以了,但这么大的变化,请原谅我还在这里瞎比比。

  • Nulls
    这是一个很大的变化,熟悉 RxJava 1.x 的童鞋一定都知道,1.x 是允许我们在发射事件的时候传入 null 值的,但现在我们的 2.x 不支持了,不信你试试? 大大的 NullPointerException 教你做人。这意味着 Observable<Void> 不再发射任何值,而是正常结束或者抛出空指针。

  • 2、Flowable
    在 RxJava 1.x 中关于介绍 backpressure 部分有一个小小的遗憾,那就是没有用一个单独的类,而是使用 Observable 。而在 2.x 中 Observable 不支持背压了,将用一个全新的 Flowable 来支持背压。
    或许对于背压,有些小伙伴们还不是特别理解,这里简单说一下。大概就是指在异步场景中,被观察者发送事件的速度远快于观察者的处理速度的情况下,一种告诉上游的被观察者降低发送速度的策略。感兴趣的小伙伴可以模拟这种情况,在差距太大的时候,我们的内存会猛增,直到OOM。而我们的 Flowable 一定意义上可以解决这样的问题,但其实并不能完全解决,这个后面可能会提到。

  • Single/Completable/Maybe
    其实这三者都差不多,Single 顾名思义,只能发送一个事件,和 Observable接受可变参数完全不同。而 Completable 侧重于观察结果,而 Maybe 是上面两种的结合体。也就是说,当你只想要某个事件的结果(true or false)的时候,你可以使用这种观察者模式。

  • 线程调度相关
    这一块基本没什么改动,但细心的小伙伴一定会发现,RxJava 2.x 中已经没有了 Schedulers.immediate() 这个线程环境,还有 Schedulers.test()

  • Function相关
    熟悉 1.x 的小伙伴一定都知道,我们在1.x 中是有 Func1Func2.....FuncN的,但 2.x 中将它们移除,而采用 Function 替换了 Func1,采用 BiFunction 替换了 Func 2..N。并且,它们都增加了 throws Exception,也就是说,妈妈再也不用担心我们做某些操作还需要 try-catch 了。

  • 其他操作符相关
    Func1...N 的变化,现在同样用 ConsumerBiConsumerAction1Action2 进行了替换。后面的 Action 都被替换了,只保留了 ActionN

附录

下面从官方截图展示 2.x 相对 1.x 的改动细节,仅供参考。

rxjava2
rxjava2
rxjava2
rxjava2
rxjava2
rxjava2
rxjava2
rxjava2


002.

Create

create 操作符应该是最常见的操作符了,主要用于产生一个 Obserable 被观察者对象,为了方便大家的认知,以后的教程中统一把被观察者 Observable 称为发射器(上游事件),观察者 Observer 称为接收器(下游事件)。

rxjava2

Observable.create(new ObservableOnSubscribe<Integer>() {
            @Override
            public void subscribe(@NonNull ObservableEmitter<Integer> e) throws Exception {
                mRxOperatorsText.append("Observable emit 1" + "\n");
                Log.e(TAG, "Observable emit 1" + "\n");
                e.onNext(1);
                mRxOperatorsText.append("Observable emit 2" + "\n");
                Log.e(TAG, "Observable emit 2" + "\n");
                e.onNext(2);
                mRxOperatorsText.append("Observable emit 3" + "\n");
                Log.e(TAG, "Observable emit 3" + "\n");
                e.onNext(3);
                e.onComplete();
                mRxOperatorsText.append("Observable emit 4" + "\n");
                Log.e(TAG, "Observable emit 4" + "\n" );
                e.onNext(4);
            }
        }).subscribe(new Observer<Integer>() {
            private int i;
            private Disposable mDisposable;

            @Override
            public void onSubscribe(@NonNull Disposable d) {
                mRxOperatorsText.append("onSubscribe : " + d.isDisposed() + "\n");
                Log.e(TAG, "onSubscribe : " + d.isDisposed() + "\n" );
                mDisposable = d;
            }

            @Override
            public void onNext(@NonNull Integer integer) {
                mRxOperatorsText.append("onNext : value : " + integer + "\n");
                Log.e(TAG, "onNext : value : " + integer + "\n" );
                i++;
                if (i == 2) {
                    // 在RxJava 2.x 中,新增的Disposable可以做到切断的操作,让Observer观察者不再接收上游事件
                    mDisposable.dispose();
                    mRxOperatorsText.append("onNext : isDisposable : " + mDisposable.isDisposed() + "\n");
                    Log.e(TAG, "onNext : isDisposable : " + mDisposable.isDisposed() + "\n");
                }
            }

            @Override
            public void onError(@NonNull Throwable e) {
                mRxOperatorsText.append("onError : value : " + e.getMessage() + "\n");
                Log.e(TAG, "onError : value : " + e.getMessage() + "\n" );
            }

            @Override
            public void onComplete() {
                mRxOperatorsText.append("onComplete" + "\n");
                Log.e(TAG, "onComplete" + "\n" );
            }
        });

输出:

rxjava2

需要注意的几点是:

  • 在发射事件中,我们在发射了数值 3 之后,直接调用了 e.onComlete(),虽然无法接收事件,但发送事件还是继续的。

  • 另外一个值得注意的点是,在 RxJava 2.x 中,可以看到发射事件方法相比 1.x 多了一个 throws Excetion,意味着我们做一些特定操作再也不用 try-catch 了。

  • 并且 2.x 中有一个 Disposable 概念,这个东西可以直接调用切断,可以看到,当它的 isDisposed() 返回为 false 的时候,接收器能正常接收事件,但当其为 true 的时候,接收器停止了接收。所以可以通过此参数动态控制接收事件了。

Map

Map 基本算是 RxJava 中一个最简单的操作符了,熟悉 RxJava 1.x 的知道,它的作用是对发射时间发送的每一个事件应用一个函数,是的每一个事件都按照指定的函数去变化,而在 2.x 中它的作用几乎一致。

rxjava2

Observable.create(new ObservableOnSubscribe<Integer>() {
            @Override
            public void subscribe(@NonNull ObservableEmitter<Integer> e) throws Exception {
                e.onNext(1);
                e.onNext(2);
                e.onNext(3);
            }
        }).map(new Function<Integer, String>() {
            @Override
            public String apply(@NonNull Integer integer) throws Exception {
                return "This is result " + integer;
            }
        }).subscribe(new Consumer<String>() {
            @Override
            public void accept(@NonNull String s) throws Exception {
                mRxOperatorsText.append("accept : " + s +"\n");
                Log.e(TAG, "accept : " + s +"\n" );
            }
        });

输出:

rxjava2

是的,map 基本作用就是将一个 Observable 通过某种函数关系,转换为另一种 Observable,上面例子中就是把我们的 Integer 数据变成了 String 类型。从Log日志显而易见。

Zip

zip 专用于合并事件,该合并不是连接(连接操作符后面会说),而是两两配对,也就意味着,最终配对出的 Observable 发射事件数目只和少的那个相同。

Observable.zip(getStringObservable(), getIntegerObservable(), new BiFunction<String, Integer, String>() {
            @Override
            public String apply(@NonNull String s, @NonNull Integer integer) throws Exception {
                return s + integer;
            }
        }).subscribe(new Consumer<String>() {
            @Override
            public void accept(@NonNull String s) throws Exception {
                mRxOperatorsText.append("zip : accept : " + s + "\n");
                Log.e(TAG, "zip : accept : " + s + "\n");
            }
        });
private Observable<String> getStringObservable() {
        return Observable.create(new ObservableOnSubscribe<String>() {
            @Override
            public void subscribe(@NonNull ObservableEmitter<String> e) throws Exception {
                if (!e.isDisposed()) {
                    e.onNext("A");
                    mRxOperatorsText.append("String emit : A \n");
                    Log.e(TAG, "String emit : A \n");
                    e.onNext("B");
                    mRxOperatorsText.append("String emit : B \n");
                    Log.e(TAG, "String emit : B \n");
                    e.onNext("C");
                    mRxOperatorsText.append("String emit : C \n");
                    Log.e(TAG, "String emit : C \n");
                }
            }
        });
    }

    private Observable<Integer> getIntegerObservable() {
        return Observable.create(new ObservableOnSubscribe<Integer>() {
            @Override
            public void subscribe(@NonNull ObservableEmitter<Integer> e) throws Exception {
                if (!e.isDisposed()) {
                    e.onNext(1);
                    mRxOperatorsText.append("Integer emit : 1 \n");
                    Log.e(TAG, "Integer emit : 1 \n");
                    e.onNext(2);
                    mRxOperatorsText.append("Integer emit : 2 \n");
                    Log.e(TAG, "Integer emit : 2 \n");
                    e.onNext(3);
                    mRxOperatorsText.append("Integer emit : 3 \n");
                    Log.e(TAG, "Integer emit : 3 \n");
                    e.onNext(4);
                    mRxOperatorsText.append("Integer emit : 4 \n");
                    Log.e(TAG, "Integer emit : 4 \n");
                    e.onNext(5);
                    mRxOperatorsText.append("Integer emit : 5 \n");
                    Log.e(TAG, "Integer emit : 5 \n");
                }
            }
        });
    }

输出:

rxjava2

需要注意的是:

  • zip 组合事件的过程就是分别从发射器 A 和发射器 B 各取出一个事件来组合,并且一个事件只能被使用一次,组合的顺序是严格按照事件发送的顺序来进行的,所以上面截图中,可以看到,1 永远是和 A 结合的,2 永远是和 B 结合的。

  • 最终接收器收到的事件数量是和发送器发送事件最少的那个发送器的发送事件数目相同,所以如截图中,5 很孤单,没有人愿意和它交往,孤独终老的单身狗。

Concat

对于单一的把两个发射器连接成一个发射器,虽然 zip 不能完成,但我们还是可以自力更生,官方提供的 concat 让我们的问题得到了完美解决。

rxjava2

Observable.concat(Observable.just(1,2,3), Observable.just(4,5,6))
                .subscribe(new Consumer<Integer>() {
                    @Override
                    public void accept(@NonNull Integer integer) throws Exception {
                        mRxOperatorsText.append("concat : "+ integer + "\n");
                        Log.e(TAG, "concat : "+ integer + "\n" );
                    }
                });

输出:

rxjava2

如图,可以看到。发射器 B 把自己的三个孩子送给了发射器 A,让他们组合成了一个新的发射器,非常懂事的孩子,有条不紊的排序接收。

FlatMap

FlatMap 是一个很有趣的东西,我坚信你在实际开发中会经常用到。它可以把一个发射器 Observable 通过某种方法转换为多个 Observables,然后再把这些分散的 Observables装进一个单一的发射器 Observable。但有个需要注意的是,flatMap 并不能保证事件的顺序,如果需要保证,需要用到我们下面要讲的 ConcatMap

rxjava2
Observable.create(new ObservableOnSubscribe<Integer>() {
           @Override
           public void subscribe(@NonNull ObservableEmitter<Integer> e) throws Exception {
               e.onNext(1);
               e.onNext(2);
               e.onNext(3);
           }
       }).flatMap(new Function<Integer, ObservableSource<String>>() {
           @Override
           public ObservableSource<String> apply(@NonNull Integer integer) throws Exception {
               List<String> list = new ArrayList<>();
               for (int i = 0; i < 3; i++) {
                   list.add("I am value " + integer);
               }
               int delayTime = (int) (1 + Math.random() * 10);
               return Observable.fromIterable(list).delay(delayTime, TimeUnit.MILLISECONDS);
           }
       }).subscribeOn(Schedulers.newThread())
               .observeOn(AndroidSchedulers.mainThread())
               .subscribe(new Consumer<String>() {
                   @Override
                   public void accept(@NonNull String s) throws Exception {
                       Log.e(TAG, "flatMap : accept : " + s + "\n");
                       mRxOperatorsText.append("flatMap : accept : " + s + "\n");
                   }
               });

输出:

rxjava2

一切都如我们预期中的有意思,为了区分 concatMap(下一个会讲),我在代码中特意动了一点小手脚,我采用一个随机数,生成一个时间,然后通过 delay(后面会讲)操作符,做一个小延时操作,而查看 Log 日志也确认验证了我们上面的说法,它是无序的。

concatMap

上面其实就说了,concatMapFlatMap 的唯一区别就是 concatMap 保证了顺序,所以,我们就直接把 flatMap 替换为 concatMap 验证吧。

Observable.create(new ObservableOnSubscribe<Integer>() {
            @Override
            public void subscribe(@NonNull ObservableEmitter<Integer> e) throws Exception {
                e.onNext(1);
                e.onNext(2);
                e.onNext(3);
            }
        }).concatMap(new Function<Integer, ObservableSource<String>>() {
            @Override
            public ObservableSource<String> apply(@NonNull Integer integer) throws Exception {
                List<String> list = new ArrayList<>();
                for (int i = 0; i < 3; i++) {
                    list.add("I am value " + integer);
                }
                int delayTime = (int) (1 + Math.random() * 10);
                return Observable.fromIterable(list).delay(delayTime, TimeUnit.MILLISECONDS);
            }
        }).subscribeOn(Schedulers.newThread())
                .observeOn(AndroidSchedulers.mainThread())
                .subscribe(new Consumer<String>() {
                    @Override
                    public void accept(@NonNull String s) throws Exception {
                        Log.e(TAG, "flatMap : accept : " + s + "\n");
                        mRxOperatorsText.append("flatMap : accept : " + s + "\n");
                    }
                });

输出:

rxjava2

结果的确和我们预想的一样。

003.

distinct

这个操作符非常的简单、通俗、易懂,就是简单的去重嘛,我甚至都不想贴代码,但人嘛,总得持之以恒。

rxjava2
 Observable.just(1, 1, 1, 2, 2, 3, 4, 5)
                .distinct()
                .subscribe(new Consumer<Integer>() {
                    @Override
                    public void accept(@NonNull Integer integer) throws Exception {
                        mRxOperatorsText.append("distinct : " + integer + "\n");
                        Log.e(TAG, "distinct : " + integer + "\n");
                    }
                });

输出:

rxjava2

Log 日志显而易见,我们在经过 dinstinct() 后接收器接收到的事件只有1,2,3,4,5了。

Filter

信我,Filter 你会很常用的,它的作用也很简单,过滤器嘛。可以接受一个参数,让其过滤掉不符合我们条件的值

rxjava2

Observable.just(1, 20, 65, -5, 7, 19)
                .filter(new Predicate<Integer>() {
                    @Override
                    public boolean test(@NonNull Integer integer) throws Exception {
                        return integer >= 10;
                    }
                }).subscribe(new Consumer<Integer>() {
            @Override
            public void accept(@NonNull Integer integer) throws Exception {
                mRxOperatorsText.append("filter : " + integer + "\n");
                Log.e(TAG, "filter : " + integer + "\n");
            }
        });

输出:

rxjava2

可以看到,我们过滤器舍去了小于 10 的值,所以最好的输出只有 20, 65, 19。

buffer

buffer 操作符接受两个参数,buffer(count,skip),作用是将 Observable 中的数据按 skip (步长) 分成最大不超过 count 的 buffer ,然后生成一个 Observable 。也许你还不太理解,我们可以通过我们的示例图和示例代码来进一步深化它。

rxjava2

Observable.just(1, 2, 3, 4, 5)
                .buffer(3, 2)
                .subscribe(new Consumer<List<Integer>>() {
                    @Override
                    public void accept(@NonNull List<Integer> integers) throws Exception {
                        mRxOperatorsText.append("buffer size : " + integers.size() + "\n");
                        Log.e(TAG, "buffer size : " + integers.size() + "\n");
                        mRxOperatorsText.append("buffer value : ");
                        Log.e(TAG, "buffer value : " );
                        for (Integer i : integers) {
                            mRxOperatorsText.append(i + "");
                            Log.e(TAG, i + "");
                        }
                        mRxOperatorsText.append("\n");
                        Log.e(TAG, "\n");
                    }
                });

输出:

rxjava2

如图,我们把 1, 2, 3, 4, 5 依次发射出来,经过 buffer 操作符,其中参数 skip 为 2, count 为 3,而我们的输出 依次是 123,345,5。显而易见,我们 buffer 的第一个参数是 count,代表最大取值,在事件足够的时候,一般都是取 count 个值,然后每次跳过 skip 个事件。其实看 Log 日志,我相信大家都明白了。

timer

timer 很有意思,相当于一个定时任务。在 1.x 中它还可以执行间隔逻辑,但在 2.x 中此功能被交给了 interval,下一个会介绍。但需要注意的是,timerinterval 均默认在新线程。

rxjava2

mRxOperatorsText.append("timer start : " + TimeUtil.getNowStrTime() + "\n");
        Log.e(TAG, "timer start : " + TimeUtil.getNowStrTime() + "\n");
        Observable.timer(2, TimeUnit.SECONDS)
                .subscribeOn(Schedulers.io())
                .observeOn(AndroidSchedulers.mainThread()) // timer 默认在新线程,所以需要切换回主线程
                .subscribe(new Consumer<Long>() {
                    @Override
                    public void accept(@NonNull Long aLong) throws Exception {
                        mRxOperatorsText.append("timer :" + aLong + " at " + TimeUtil.getNowStrTime() + "\n");
                        Log.e(TAG, "timer :" + aLong + " at " + TimeUtil.getNowStrTime() + "\n");
                    }
                });

输出:

rxjava2

显而易见,当我们两次点击按钮触发这个事件的时候,接收被延迟了 2 秒。

interval

如同我们上面可说,interval 操作符用于间隔时间执行某个操作,其接受三个参数,分别是第一次发送延迟,间隔时间,时间单位。

rxjava2

mRxOperatorsText.append("interval start : " + TimeUtil.getNowStrTime() + "\n");
       Log.e(TAG, "interval start : " + TimeUtil.getNowStrTime() + "\n");
       Observable.interval(3,2, TimeUnit.SECONDS)
               .subscribeOn(Schedulers.io())
               .observeOn(AndroidSchedulers.mainThread()) // 由于interval默认在新线程,所以我们应该切回主线程
               .subscribe(new Consumer<Long>() {
                   @Override
                   public void accept(@NonNull Long aLong) throws Exception {
                       mRxOperatorsText.append("interval :" + aLong + " at " + TimeUtil.getNowStrTime() + "\n");
                       Log.e(TAG, "interval :" + aLong + " at " + TimeUtil.getNowStrTime() + "\n");
                   }
               });

输出:

rxjava2

如同 Log 日志一样,第一次延迟了 3 秒后接收到,后面每次间隔了 2 秒。
然而,心细的小伙伴可能会发现,由于我们这个是间隔执行,所以当我们的Activity 都销毁的时候,实际上这个操作还依然在进行,所以,我们得花点小心思让我们在不需要它的时候干掉它。查看源码发现,我们subscribe(Cousumer<? super T> onNext)返回的是Disposable,我们可以在这上面做文章。

rxjava2
@Override
   protected void doSomething() {
       mRxOperatorsText.append("interval start : " + TimeUtil.getNowStrTime() + "\n");
       Log.e(TAG, "interval start : " + TimeUtil.getNowStrTime() + "\n");
       mDisposable = Observable.interval(3, 2, TimeUnit.SECONDS)
               .subscribeOn(Schedulers.io())
               .observeOn(AndroidSchedulers.mainThread()) // 由于interval默认在新线程,所以我们应该切回主线程
               .subscribe(new Consumer<Long>() {
                   @Override
                   public void accept(@NonNull Long aLong) throws Exception {
                       mRxOperatorsText.append("interval :" + aLong + " at " + TimeUtil.getNowStrTime() + "\n");
                       Log.e(TAG, "interval :" + aLong + " at " + TimeUtil.getNowStrTime() + "\n");
                   }
               });
   }

   @Override
   protected void onDestroy() {
       super.onDestroy();
       if (mDisposable != null && !mDisposable.isDisposed()) {
           mDisposable.dispose();
       }
   }

哈哈,再次验证,解决了我们的疑惑。

doOnNext

其实觉得 doOnNext 应该不算一个操作符,但考虑到其常用性,我们还是咬咬牙将它放在了这里。它的作用是让订阅者在接收到数据之前干点有意思的事情。假如我们在获取到数据之前想先保存一下它,无疑我们可以这样实现。

Observable.just(1, 2, 3, 4)
                .doOnNext(new Consumer<Integer>() {
                    @Override
                    public void accept(@NonNull Integer integer) throws Exception {
                        mRxOperatorsText.append("doOnNext 保存 " + integer + "成功" + "\n");
                        Log.e(TAG, "doOnNext 保存 " + integer + "成功" + "\n");
                    }
                }).subscribe(new Consumer<Integer>() {
            @Override
            public void accept(@NonNull Integer integer) throws Exception {
                mRxOperatorsText.append("doOnNext :" + integer + "\n");
                Log.e(TAG, "doOnNext :" + integer + "\n");
            }
        });

输出:

rxjava2

skip

skip 很有意思,其实作用就和字面意思一样,接受一个 long 型参数 count ,代表跳过 count 个数目开始接收。

rxjava2

Observable.just(1,2,3,4,5)
                .skip(2)
                .subscribe(new Consumer<Integer>() {
                    @Override
                    public void accept(@NonNull Integer integer) throws Exception {
                        mRxOperatorsText.append("skip : "+integer + "\n");
                        Log.e(TAG, "skip : "+integer + "\n");
                    }
                });

输出:

rxjava2

take

take,接受一个 long 型参数 count ,代表至多接收 count 个数据。

rxjava2

Flowable.fromArray(1,2,3,4,5)
                .take(2)
                .subscribe(new Consumer<Integer>() {
                    @Override
                    public void accept(@NonNull Integer integer) throws Exception {
                        mRxOperatorsText.append("take : "+integer + "\n");
                        Log.e(TAG, "accept: take : "+integer + "\n" );
                    }
                });

输出:

rxjava2

just

just,没什么好说的,其实在前面各种例子都说明了,就是一个简单的发射器依次调用 onNext() 方法。

rxjava2

Observable.just("1", "2", "3")
                .subscribeOn(Schedulers.io())
                .observeOn(AndroidSchedulers.mainThread())
                .subscribe(new Consumer<String>() {
                    @Override
                    public void accept(@NonNull String s) throws Exception {
                        mRxOperatorsText.append("accept : onNext : " + s + "\n");
                        Log.e(TAG,"accept : onNext : " + s + "\n" );
                    }
                });

输出:

rxjava2


004.

Single

顾名思义,Single 只会接收一个参数,而 SingleObserver 只会调用 onError() 或者 onSuccess()

Single.just(new Random().nextInt())
                .subscribe(new SingleObserver<Integer>() {
                    @Override
                    public void onSubscribe(@NonNull Disposable d) {

                    }

                    @Override
                    public void onSuccess(@NonNull Integer integer) {
                        mRxOperatorsText.append("single : onSuccess : "+integer+"\n");
                        Log.e(TAG, "single : onSuccess : "+integer+"\n" );
                    }

                    @Override
                    public void onError(@NonNull Throwable e) {
                        mRxOperatorsText.append("single : onError : "+e.getMessage()+"\n");
                        Log.e(TAG, "single : onError : "+e.getMessage()+"\n");
                    }
                });

输出:

rxjava2

distinct

去重操作符,简单的作用就是去重。

rxjava2
Observable.just(1, 1, 1, 2, 2, 3, 4, 5)
                .distinct()
                .subscribe(new Consumer<Integer>() {
                    @Override
                    public void accept(@NonNull Integer integer) throws Exception {
                        mRxOperatorsText.append("distinct : " + integer + "\n");
                        Log.e(TAG, "distinct : " + integer + "\n");
                    }
                });

输出:

rxjava2

很明显,发射器发送的事件,在接收的时候被去重了。

debounce

去除发送频率过快的项,看起来好像没啥用处,但你信我,后面绝对有地方很有用武之地。

rxjava2
Observable.create(new ObservableOnSubscribe<Integer>() {
            @Override
            public void subscribe(@NonNull ObservableEmitter<Integer> emitter) throws Exception {
                // send events with simulated time wait
                emitter.onNext(1); // skip
                Thread.sleep(400);
                emitter.onNext(2); // deliver
                Thread.sleep(505);
                emitter.onNext(3); // skip
                Thread.sleep(100);
                emitter.onNext(4); // deliver
                Thread.sleep(605);
                emitter.onNext(5); // deliver
                Thread.sleep(510);
                emitter.onComplete();
            }
        }).debounce(500, TimeUnit.MILLISECONDS)
                .subscribeOn(Schedulers.io())
                .observeOn(AndroidSchedulers.mainThread())
                .subscribe(new Consumer<Integer>() {
                    @Override
                    public void accept(@NonNull Integer integer) throws Exception {
                        mRxOperatorsText.append("debounce :" + integer + "\n");
                        Log.e(TAG,"debounce :" + integer + "\n");
                    }
                });

输出:

rxjava2

代码很清晰,去除发送间隔时间小于 500 毫秒的发射事件,所以 1 和 3 被去掉了。

defer

简单地时候就是每次订阅都会创建一个新的 Observable,并且如果没有被订阅,就不会产生新的 Observable

rxjava2

Observable<Integer> observable = Observable.defer(new Callable<ObservableSource<Integer>>() {
            @Override
            public ObservableSource<Integer> call() throws Exception {
                return Observable.just(1, 2, 3);
            }
        });


        observable.subscribe(new Observer<Integer>() {
            @Override
            public void onSubscribe(@NonNull Disposable d) {

            }

            @Override
            public void onNext(@NonNull Integer integer) {
                mRxOperatorsText.append("defer : " + integer + "\n");
                Log.e(TAG, "defer : " + integer + "\n");
            }

            @Override
            public void onError(@NonNull Throwable e) {
                mRxOperatorsText.append("defer : onError : " + e.getMessage() + "\n");
                Log.e(TAG, "defer : onError : " + e.getMessage() + "\n");
            }

            @Override
            public void onComplete() {
                mRxOperatorsText.append("defer : onComplete\n");
                Log.e(TAG, "defer : onComplete\n");
            }
        });

输出:

rxjava2

last

last 操作符仅取出可观察到的最后一个值,或者是满足某些条件的最后一项。
rxjava2

Observable.just(1, 2, 3)
                .last(4)
                .subscribe(new Consumer<Integer>() {
                    @Override
                    public void accept(@NonNull Integer integer) throws Exception {
                        mRxOperatorsText.append("last : " + integer + "\n");
                        Log.e(TAG, "last : " + integer + "\n");
                    }
                });

输出:

rxjava2

merge

merge 顾名思义,熟悉版本控制工具的你一定不会不知道 merge 命令,而在 Rx 操作符中,merge 的作用是把多个 Observable 结合起来,接受可变参数,也支持迭代器集合。注意它和 concat 的区别在于,不用等到 发射器 A 发送完所有的事件再进行发射器 B 的发送。

rxjava2

Observable.merge(Observable.just(1, 2), Observable.just(3, 4, 5))
                .subscribe(new Consumer<Integer>() {
                    @Override
                    public void accept(@NonNull Integer integer) throws Exception {
                        mRxOperatorsText.append("merge :" + integer + "\n");
                        Log.e(TAG, "accept: merge :" + integer + "\n" );
                    }
                });

输出:

rxjava2

reduce

reduce 操作符每次用一个方法处理一个值,可以有一个 seed 作为初始值。

rxjava2

Observable.just(1, 2, 3)
               .reduce(new BiFunction<Integer, Integer, Integer>() {
                   @Override
                   public Integer apply(@NonNull Integer integer, @NonNull Integer integer2) throws Exception {
                       return integer + integer2;
                   }
               }).subscribe(new Consumer<Integer>() {
           @Override
           public void accept(@NonNull Integer integer) throws Exception {
               mRxOperatorsText.append("reduce : " + integer + "\n");
               Log.e(TAG, "accept: reduce : " + integer + "\n");
           }
       });

输出:

rxjava2

可以看到,代码中,我们中间采用 reduce ,支持一个 function 为两数值相加,所以应该最后的值是:1 + 2 = 3 + 3 = 6 , 而Log 日志完美解决了我们的问题。

scan

scan 操作符作用和上面的 reduce 一致,唯一区别是 reduce 是个只追求结果的坏人,而 scan 会始终如一地把每一个步骤都输出。

rxjava2

Observable.just(1, 2, 3)
                .scan(new BiFunction<Integer, Integer, Integer>() {
                    @Override
                    public Integer apply(@NonNull Integer integer, @NonNull Integer integer2) throws Exception {
                        return integer + integer2;
                    }
                }).subscribe(new Consumer<Integer>() {
            @Override
            public void accept(@NonNull Integer integer) throws Exception {
                mRxOperatorsText.append("scan " + integer + "\n");
                Log.e(TAG, "accept: scan " + integer + "\n");
            }
        });

输出:

rxjava2

看日志,没毛病。

window

按照实际划分窗口,将数据发送给不同的 Observable

rxjava2

mRxOperatorsText.append("window\n");
       Log.e(TAG, "window\n");
       Observable.interval(1, TimeUnit.SECONDS) // 间隔一秒发一次
               .take(15) // 最多接收15个
               .window(3, TimeUnit.SECONDS)
               .subscribeOn(Schedulers.io())
               .observeOn(AndroidSchedulers.mainThread())
               .subscribe(new Consumer<Observable<Long>>() {
                   @Override
                   public void accept(@NonNull Observable<Long> longObservable) throws Exception {
                       mRxOperatorsText.append("Sub Divide begin...\n");
                       Log.e(TAG, "Sub Divide begin...\n");
                       longObservable.subscribeOn(Schedulers.io())
                               .observeOn(AndroidSchedulers.mainThread())
                               .subscribe(new Consumer<Long>() {
                                   @Override
                                   public void accept(@NonNull Long aLong) throws Exception {
                                       mRxOperatorsText.append("Next:" + aLong + "\n");
                                       Log.e(TAG, "Next:" + aLong + "\n");
                                   }
                               });
                   }
               });

输出:

rxjava2

005.

简单的网络请求

想必大家都知道,很多时候我们在使用 RxJava 的时候总是和 Retrofit 进行结合使用,而为了方便演示,这里我们就暂且采用 OkHttp3 进行演示,配合 mapdoOnNext ,线程切换进行简单的网络请求:

  • 1)通过 Observable.create() 方法,调用 OkHttp 网络请求;
  • 2)通过 map 操作符集合 gson,将 Response 转换为 bean 类;
  • 3)通过 doOnNext() 方法,解析 bean 中的数据,并进行数据库存储等操作;
  • 4)调度线程,在子线程中进行耗时操作任务,在主线程中更新 UI ;
  • 5)通过 subscribe(),根据请求成功或者失败来更新 UI 。
Observable.create(new ObservableOnSubscribe<Response>() {
            @Override
            public void subscribe(@NonNull ObservableEmitter<Response> e) throws Exception {
                Builder builder = new Builder()
                        .url("http://api.avatardata.cn/MobilePlace/LookUp?key=ec47b85086be4dc8b5d941f5abd37a4e&mobileNumber=13021671512")
                        .get();
                Request request = builder.build();
                Call call = new OkHttpClient().newCall(request);
                Response response = call.execute();
                e.onNext(response);
            }
        }).map(new Function<Response, MobileAddress>() {
                    @Override
                    public MobileAddress apply(@NonNull Response response) throws Exception {

                        Log.e(TAG, "map 线程:" + Thread.currentThread().getName() + "\n");
                        if (response.isSuccessful()) {
                            ResponseBody body = response.body();
                            if (body != null) {
                                Log.e(TAG, "map:转换前:" + response.body());
                                return new Gson().fromJson(body.string(), MobileAddress.class);
                            }
                        }
                        return null;
                    }
                }).observeOn(AndroidSchedulers.mainThread())
                .doOnNext(new Consumer<MobileAddress>() {
                    @Override
                    public void accept(@NonNull MobileAddress s) throws Exception {
                        Log.e(TAG, "doOnNext 线程:" + Thread.currentThread().getName() + "\n");
                        mRxOperatorsText.append("\ndoOnNext 线程:" + Thread.currentThread().getName() + "\n");
                        Log.e(TAG, "doOnNext: 保存成功:" + s.toString() + "\n");
                        mRxOperatorsText.append("doOnNext: 保存成功:" + s.toString() + "\n");

                    }
                }).subscribeOn(Schedulers.io())
                .observeOn(AndroidSchedulers.mainThread())
                .subscribe(new Consumer<MobileAddress>() {
                    @Override
                    public void accept(@NonNull MobileAddress data) throws Exception {
                        Log.e(TAG, "subscribe 线程:" + Thread.currentThread().getName() + "\n");
                        mRxOperatorsText.append("\nsubscribe 线程:" + Thread.currentThread().getName() + "\n");
                        Log.e(TAG, "成功:" + data.toString() + "\n");
                        mRxOperatorsText.append("成功:" + data.toString() + "\n");
                    }
                }, new Consumer<Throwable>() {
                    @Override
                    public void accept(@NonNull Throwable throwable) throws Exception {
                        Log.e(TAG, "subscribe 线程:" + Thread.currentThread().getName() + "\n");
                        mRxOperatorsText.append("\nsubscribe 线程:" + Thread.currentThread().getName() + "\n");

                        Log.e(TAG, "失败:" + throwable.getMessage() + "\n");
                        mRxOperatorsText.append("失败:" + throwable.getMessage() + "\n");
                    }
                });

为了方便,我们后面的讲解大部分采用开源的 Rx2AndroidNetworking 来处理,数据来源于天狗网等多个公共API接口。

mRxOperatorsText.append("RxNetworkActivity\n");
        Rx2AndroidNetworking.get("http://api.avatardata.cn/MobilePlace/LookUp?key=ec47b85086be4dc8b5d941f5abd37a4e&mobileNumber=13021671512")
                .build()
                .getObjectObservable(MobileAddress.class)
                .observeOn(AndroidSchedulers.mainThread()) // 为doOnNext() 指定在主线程,否则报错
                .doOnNext(new Consumer<MobileAddress>() {
                    @Override
                    public void accept(@NonNull MobileAddress data) throws Exception {
                        Log.e(TAG, "doOnNext:"+Thread.currentThread().getName()+"\n" );
                        mRxOperatorsText.append("\ndoOnNext:"+Thread.currentThread().getName()+"\n" );
                        Log.e(TAG,"doOnNext:"+data.toString()+"\n");
                        mRxOperatorsText.append("doOnNext:"+data.toString()+"\n");
                    }
                })
                .map(new Function<MobileAddress, ResultBean>() {
                    @Override
                    public ResultBean apply(@NonNull MobileAddress mobileAddress) throws Exception {
                        Log.e(TAG, "\n" );
                        mRxOperatorsText.append("\n");
                        Log.e(TAG, "map:"+Thread.currentThread().getName()+"\n" );
                        mRxOperatorsText.append("map:"+Thread.currentThread().getName()+"\n" );
                        return mobileAddress.getResult();
                    }
                })
                .subscribeOn(Schedulers.io())
                .observeOn(AndroidSchedulers.mainThread())
                .subscribe(new Consumer<ResultBean>() {
                    @Override
                    public void accept(@NonNull ResultBean data) throws Exception {
                        Log.e(TAG, "subscribe 成功:"+Thread.currentThread().getName()+"\n" );
                        mRxOperatorsText.append("\nsubscribe 成功:"+Thread.currentThread().getName()+"\n" );
                        Log.e(TAG, "成功:" + data.toString() + "\n");
                        mRxOperatorsText.append("成功:" + data.toString() + "\n");
                    }
                }, new Consumer<Throwable>() {
                    @Override
                    public void accept(@NonNull Throwable throwable) throws Exception {
                        Log.e(TAG, "subscribe 失败:"+Thread.currentThread().getName()+"\n" );
                        mRxOperatorsText.append("\nsubscribe 失败:"+Thread.currentThread().getName()+"\n" );
                        Log.e(TAG, "失败:"+ throwable.getMessage()+"\n" );
                        mRxOperatorsText.append("失败:"+ throwable.getMessage()+"\n");
                    }
                });

先读取缓存,如果缓存没数据再通过网络请求获取数据后更新UI

想必在实际应用中,很多时候(对数据操作不敏感时)都需要我们先读取缓存的数据,如果缓存没有数据,再通过网络请求获取,随后在主线程更新我们的 UI。

concat 操作符简直就是为我们这种需求量身定做。

concat 可以做到不交错的发射两个甚至多个 Observable 的发射事件,并且只有前一个 Observable 终止( onComplete() ) 后才会定义下一个 Observable

利用这个特性,我们就可以先读取缓存数据,倘若获取到的缓存数据不是我们想要的,再调用 onComplete() 以执行获取网络数据的 Observable,如果缓存数据能应我们所需,则直接调用 onNext() ,防止过度的网络请求,浪费用户的流量。

Observable<FoodList> cache = Observable.create(new ObservableOnSubscribe<FoodList>() {
            @Override
            public void subscribe(@NonNull ObservableEmitter<FoodList> e) throws Exception {
                Log.e(TAG, "create当前线程:"+Thread.currentThread().getName() );
                FoodList data = CacheManager.getInstance().getFoodListData();

                // 在操作符 concat 中,只有调用 onComplete 之后才会执行下一个 Observable
                if (data != null){ // 如果缓存数据不为空,则直接读取缓存数据,而不读取网络数据
                    isFromNet = false;
                    Log.e(TAG, "\nsubscribe: 读取缓存数据:" );
                    runOnUiThread(new Runnable() {
                        @Override
                        public void run() {
                            mRxOperatorsText.append("\nsubscribe: 读取缓存数据:\n");
                        }
                    });

                    e.onNext(data);
                }else {
                    isFromNet = true;
                    runOnUiThread(new Runnable() {
                        @Override
                        public void run() {
                            mRxOperatorsText.append("\nsubscribe: 读取网络数据:\n");
                        }
                    });
                    Log.e(TAG, "\nsubscribe: 读取网络数据:" );
                    e.onComplete();
                }


            }
        });

        Observable<FoodList> network = Rx2AndroidNetworking.get("http://www.tngou.net/api/food/list")
                .addQueryParameter("rows",10+"")
                .build()
                .getObjectObservable(FoodList.class);


        // 两个 Observable 的泛型应当保持一致

        Observable.concat(cache,network)
                .subscribeOn(Schedulers.io())
                .observeOn(AndroidSchedulers.mainThread())
                .subscribe(new Consumer<FoodList>() {
                    @Override
                    public void accept(@NonNull FoodList tngouBeen) throws Exception {
                        Log.e(TAG, "subscribe 成功:"+Thread.currentThread().getName() );
                        if (isFromNet){
                            mRxOperatorsText.append("accept : 网络获取数据设置缓存: \n");
                            Log.e(TAG, "accept : 网络获取数据设置缓存: \n"+tngouBeen.toString() );
                            CacheManager.getInstance().setFoodListData(tngouBeen);
                        }

                        mRxOperatorsText.append("accept: 读取数据成功:" + tngouBeen.toString()+"\n");
                        Log.e(TAG, "accept: 读取数据成功:" + tngouBeen.toString());
                    }
                }, new Consumer<Throwable>() {
                    @Override
                    public void accept(@NonNull Throwable throwable) throws Exception {
                        Log.e(TAG, "subscribe 失败:"+Thread.currentThread().getName() );
                        Log.e(TAG, "accept: 读取数据失败:"+throwable.getMessage() );
                        mRxOperatorsText.append("accept: 读取数据失败:"+throwable.getMessage()+"\n");
                    }
                });

有时候我们的缓存可能还会分为 memory 和 disk ,实际上都差不多,无非是多写点 Observable ,然后通过 concat 合并即可。

多个网络请求依次依赖

想必这种情况也在实际情况中比比皆是,例如用户注册成功后需要自动登录,我们只需要先通过注册接口注册用户信息,注册成功后马上调用登录接口进行自动登录即可。

我们的 flatMap 恰好解决了这种应用场景,flatMap 操作符可以将一个发射数据的 Observable 变换为多个 Observables ,然后将它们发射的数据合并后放到一个单独的 Observable,利用这个特性,我们很轻松地达到了我们的需求。

Rx2AndroidNetworking.get("http://www.tngou.net/api/food/list")
                .addQueryParameter("rows", 1 + "")
                .build()
                .getObjectObservable(FoodList.class) // 发起获取食品列表的请求,并解析到FootList
                .subscribeOn(Schedulers.io())        // 在io线程进行网络请求
                .observeOn(AndroidSchedulers.mainThread()) // 在主线程处理获取食品列表的请求结果
                .doOnNext(new Consumer<FoodList>() {
                    @Override
                    public void accept(@NonNull FoodList foodList) throws Exception {
                        // 先根据获取食品列表的响应结果做一些操作
                        Log.e(TAG, "accept: doOnNext :" + foodList.toString());
                        mRxOperatorsText.append("accept: doOnNext :" + foodList.toString()+"\n");
                    }
                })
                .observeOn(Schedulers.io()) // 回到 io 线程去处理获取食品详情的请求
                .flatMap(new Function<FoodList, ObservableSource<FoodDetail>>() {
                    @Override
                    public ObservableSource<FoodDetail> apply(@NonNull FoodList foodList) throws Exception {
                        if (foodList != null && foodList.getTngou() != null && foodList.getTngou().size() > 0) {
                            return Rx2AndroidNetworking.post("http://www.tngou.net/api/food/show")
                                    .addBodyParameter("id", foodList.getTngou().get(0).getId() + "")
                                    .build()
                                    .getObjectObservable(FoodDetail.class);
                        }
                        return null;

                    }
                })
                .observeOn(AndroidSchedulers.mainThread())
                .subscribe(new Consumer<FoodDetail>() {
                    @Override
                    public void accept(@NonNull FoodDetail foodDetail) throws Exception {
                        Log.e(TAG, "accept: success :" + foodDetail.toString());
                        mRxOperatorsText.append("accept: success :" + foodDetail.toString()+"\n");
                    }
                }, new Consumer<Throwable>() {
                    @Override
                    public void accept(@NonNull Throwable throwable) throws Exception {
                        Log.e(TAG, "accept: error :" + throwable.getMessage());
                        mRxOperatorsText.append("accept: error :" + throwable.getMessage()+"\n");
                    }
                });

结合多个接口的数据更新 UI

在实际应用中,我们极有可能会在一个页面显示的数据来源于多个接口,这时候我们的 zip 操作符为我们排忧解难。

zip 操作符可以将多个 Observable 的数据结合为一个数据源再发射出去。

Observable<MobileAddress> observable1 = Rx2AndroidNetworking.get("http://api.avatardata.cn/MobilePlace/LookUp?key=ec47b85086be4dc8b5d941f5abd37a4e&mobileNumber=13021671512")
                .build()
                .getObjectObservable(MobileAddress.class);

        Observable<CategoryResult> observable2 = Network.getGankApi()
                .getCategoryData("Android",1,1);

        Observable.zip(observable1, observable2, new BiFunction<MobileAddress, CategoryResult, String>() {
            @Override
            public String apply(@NonNull MobileAddress mobileAddress, @NonNull CategoryResult categoryResult) throws Exception {
                return "合并后的数据为:手机归属地:"+mobileAddress.getResult().getMobilearea()+"人名:"+categoryResult.results.get(0).who;
            }
        }).subscribeOn(Schedulers.io())
                .observeOn(AndroidSchedulers.mainThread())
                .subscribe(new Consumer<String>() {
                    @Override
                    public void accept(@NonNull String s) throws Exception {
                        Log.e(TAG, "accept: 成功:" + s+"\n");
                    }
                }, new Consumer<Throwable>() {
                    @Override
                    public void accept(@NonNull Throwable throwable) throws Exception {
                        Log.e(TAG, "accept: 失败:" + throwable+"\n");
                    }
                });

间隔任务实现心跳

想必即时通讯等需要轮训的任务在如今的 APP 中已是很常见,而 RxJava 2.x 的 interval 操作符可谓完美地解决了我们的疑惑。

这里就简单的意思一下轮训。

private Disposable mDisposable;
    @Override
    protected void doSomething() {
        mDisposable = Flowable.interval(1, TimeUnit.SECONDS)
                .doOnNext(new Consumer<Long>() {
                    @Override
                    public void accept(@NonNull Long aLong) throws Exception {
                        Log.e(TAG, "accept: doOnNext : "+aLong );
                    }
                })
                .observeOn(AndroidSchedulers.mainThread())
                .subscribe(new Consumer<Long>() {
                    @Override
                    public void accept(@NonNull Long aLong) throws Exception {
                        Log.e(TAG, "accept: 设置文本 :"+aLong );
                        mRxOperatorsText.append("accept: 设置文本 :"+aLong +"\n");
                    }
                });
    }

    /**
     * 销毁时停止心跳
     */
    @Override
    protected void onDestroy() {
        super.onDestroy();
        if (mDisposable != null){
            mDisposable.dispose();
        }
    }


compose( )操作符

RxJava的另一个好处在于,我们可以清楚地看到数据是如何在一系列操作符之间进行转换的。

Observable.from(someSource)
    .map(new Func1<Data, Data>() {
      @Override public Data call(Data data) {
        return manipulate(data);
      }
    }).subscribeOn(Schedulers.io())
    .observeOn(AndroidSchedulers.mainThread())
    .subscribe(new Action1<Data>() {
      @Override public void call(Data data) {
        doSomething(data);
      }
    });

如何将一组操作符重用于多个数据流中呢?例如,因为希望在工作线程中处理数据,然后在主线程中处理结果,所以我会频繁使用subscribeOn()observeOn()。如果我能够通过重用的方式,将这种逻辑运用到我所有的数据流中,将是一件多么伟大的事。

糟糕的实现方式

下面这些代码是我在过去几个月里一直都在使用的,正好可以拿来当反面教材。

首先,使用schedulers(调度)创建一个方法:

<T> Observable<T> applySchedulers(Observable<T> observable) {
return observable.subscribeOn(Schedulers.io())
    .observeOn(AndroidSchedulers.mainThread());
}

然后,封装Observable数据链:

applySchedulers(Observable.from(someSource).map(new Func1<Data, Data>() {
      @Override public Data call(Data data) {
        return manipulate(data);
      }
    })
).subscribe(new Action1<Data>() {
  @Override public void call(Data data) {
    doSomething(data);
  }
});

虽然这样做也能达到目的,但是它看起来不仅丑,还容易让人产生困惑,applySchedulers()到底什么鬼?它不再符合操作符链路式结构,所以,看起来很难理解。然而,我找不到任何办法去格式化这段代码,因此,这并不尴尬。

现在,试想一下,如果在一个数据流中反复使用的话,这个反面教材将会变得要多烂有多烂。(译者注:OMG)

Transformers简介

聪明的同学可能已经意识到了这个问题,但是RxJava早已提供了一种解决方案:Transformer(译者注:有转换器意思),一般情况下可以通过使用操作符Observable.compose()来实现。

Transformer实际上就是一个Func1<Observable<T>, Observable<R>>,换言之就是:可以通过它将一种类型的Observable转换成另一种类型的Observable,和调用一系列的内联操作符是一模一样的。

接下来,我们一起创建一个Transformerschedulers相结合的方法:

 <T> Transformer<T, T> applySchedulers() {
 return new Transformer<T, T>() {
   @Override
   public Observable<T> call(Observable<T> observable) {
     return observable.subscribeOn(Schedulers.io())
         .observeOn(AndroidSchedulers.mainThread());
   }
 };
}

使用Lambda表达式,会变得更加优雅:

<T> Transformer<T, T> applySchedulers() {  
return observable -> observable.subscribeOn(Schedulers.io())
    .observeOn(AndroidSchedulers.mainThread());
}

更新于2015年3月11日:如果使用 JDK 7或者更早版本进行编译的话,就不得不做出一些额外的改变,为compose()添加泛型。显式声明返回类型,就像这样:

 Observable.from(someSource)
    .map(new Func1<Data, Data>() {
       @Override public Data call(Data data) {
         return manipulate(data);
       }
    })
    .compose(this.<YourType>applySchedulers())
    .subscribe(new Action1<Data>() {
      @Override public void call(Data data) {
        doSomething(data);
      }
    });

重用Transformers

上一个实例中,我在所有的方法调用中都new(译者注:新建)了一个Transformer实例。你可以创建一个实例化版本,节省不必要的实例化对象。毕竟,Transformers关乎着代码重用。

如果,总是将一个具体的类型转换成另一个具体的类型,那么可以很容易的创建一个Transformer实例:

Transformer<String, String> myTransformer = new Transformer<String, String>() {  
   // ...Do your work here...
};

那么scheduler Transformer(调度转换器)应该怎样实现呢?因为,它根本不需要关心类型,所以就无法定义一个泛型实例:

// Doesn't compile; where would T come from?(译者注:编译无法通过;因为根本不知道 T 从何而来。)
Transformer<T, T> myTransformer;  

可能有人会这样定义Transformer<Object, Object>,然而这并没有什么卵用,并且造成的后果就是Observable将失去其类型信息。

为了解决这个问题,我从Collections中得到了一些启发,这个包装类有这样一堆方法,能够创建类型安全并且不可变的空集合(比如,Collections.emptyList())。由于在其内部使用了一个无泛型实例,所以需要封装在一个添加泛型约束的方法里。

可以像下面这样定义我们的schedulers Transformer(调度转换器)实例:

 final Observable.Transformer schedulersTransformer = new  Observable.Transformer() {
 @Override public Object call(Object observable) {
   return ((Observable)  observable).subscribeOn(Schedulers.newThread())
       .observeOn(AndroidSchedulers.mainThread());
  }
};

 <T> Observable.Transformer<T, T> applySchedulers() {
 return (Observable.Transformer<T, T>) schedulersTransformer;
}

现在我们终于把他做成一个“单例”了(译者注:此单例非彼单例)。

警告:如果不做类型转换检查,可能陷入麻烦。确保Transformer真正与类型无关。否则,即使代码通过了编译,在运行时仍然存在抛出ClassCastException异常的隐患。在当前场景中,我们知道它是安全的,因为schedulers(译者注:调度)并不会与发送出的事件产生任何的互动操作。

** flatMap()操作符怎么样?**

现在你可能会好奇,compose()操作符和flatMap()操作符有何区别。他们最终都会发送出Observable<R>,这就意味着,两者都能够用于操作符的重用?

不同点在于compose()操作符拥有更高层次的抽象概念:它操作于整个数据流中,不仅仅是某一个被发送的事件。具体如下:

  1. compose()是唯一一个能够从数据流中得到原始Observable<T>的操作符,所以,那些需要对整个数据流产生作用的操作(比如,subscribeOn()observeOn())需要使用compose()来实现。相较而言,如果在flatMap()中使用subscribeOn()或者observeOn()那么它仅仅对在flatMap()中创建的Observable起作用,而不会对剩下的流产生影响(译者注:深坑,会在后面的系列着重讲解,欢迎关注)。

  2. 当创建Observable流的时候,compose()会立即执行,犹如已经提前写好了一个操作符一样,而flatMap()则是在onNext()被调用后执行,onNext()的每一次调用都会触发flatMap(),也就是说,flatMap()转换每一个事件,而compose()转换的是整个数据流。

  3. 因为每一次调用onNext()后,都不得不新建一个Observable,所以flatMap()的效率较低。事实上,compose()操作符只在主干数据流上执行操作。

如果想重用一些操作符,还是使用compose()吧,虽然flatMap()的用处很多,但作为重用代码这一点来讲,并不适用。


Observable<List<String>> listObservable = Observable.just(getColorList());
listObservable.subscribe(new Observer<List<String>>() { 

    @Override 
    public void onCompleted() { } 

    @Override 
    public void onError(Throwable e) { } 

    @Override
    public void onNext(List<String> colors) {
        mSimpleStringAdapter.setStrings(colors);
    }
});

注意:只有当subscribe()订阅了Observable之后才会执行onNext()方法;如果不再有数据可以发送(我们在Observable.just()中只让Observable发送一个数据),onComplete()方法会被调用。

多使用Observable.fromCallable

使用Observable.fromCallable的好处
- 获取发送数据的代码只会在有Observer订阅之后执行
- 获取数据的代码可以在子线程中执行.

每当Observer订阅Observable时就会生成一个Subscription对象。一个Subscription代表了一个Observer与Observable之间的连接。使用它可以在onDestory方法中解除订阅.

使用Single

Single是Observable的一个精简版,它的回调方法是onSuccess/onError,它使用的是SingleSubscriber来订阅.

Single<List<String>> tvShowSingle = Single.fromCallable(new Callable<List<String>>() { 
    @Override
    public List<String> call() throws Exception {
        mRestClient.getFavoriteTvShows(); 
    }
});

然后订阅一下:

mTvShowSubscription = tvShowSingle
    .subscribeOn(Schedulers.io())
    .observeOn(AndroidSchedulers.mainThread())
    .subscribe(new SingleSubscriber<List<String>>() {

        @Override 
        public void onSuccess(List<String> tvShows) {
            displayTvShows(tvShows); 
        }

        @Override 
        public void onError(Throwable error) {
            displayErrorMessage(); 
        } 
    });

Subjects

Subject这个对象既是Observable又是Observer,我会把Subject想象成一个管道:从一端把数据注入,结果就会从另一端输出。

mCounterEmitter = PublishSubject.create(); 
mCounterEmitter.subscribe(new Observer<Integer>() {

    @Override
    public void onCompleted() { } 

    @Override
    public void onError(Throwable e) { } 

    @Override
    public void onNext(Integer integer) { 
        mCounterDisplay.setText(String.valueOf(integer));
    } 
});

mIncrementButton.setOnClickListener(new View.OnClickListener() {

    @Override 
    public void onClick(View v) { 
        mCounter++;
        mCounterEmitter.onNext(mCounter);
    }
});

如上,它既可以subcribe()从管道输出 又可以 onNext()向管道输入
注意:如果一个Observer订阅了(subscribe)Observable,那么这个Observable就不能再被其他的Observer订阅. 但是PublishSubject就不存在这种情况.

deboundce与throttleFirst的区别

  • debounce(400, TimeUnit.MILLISECONDS) 当没有数据传入达到400ms之后,才去发送数据
  • throttleFirst(400, TimeUnit.MILLISECONDS) 在每一个400ms内,如果有数据传入就发送.且每个400ms内只发送一次或零次数据.

take(1) 与 first

这两个都代表只发送数据列表的第一个数据. 但是当列表是空的时候如: Observable.just(“”).isEmpty(),这时候take(1)不会crash,而first不会.

subscribeOn 与 observeOn

  • observeOn 指定的是订阅者所在的线程
  • subscribeOn 指定的是被订阅者所在的线程, 但是它可以切换多次.
    如下所示:
 Observable.just("abs").subscribeOn(Schedulers.io())
                //执行在io线程
                .map(new Func1<String, String>() {
                    @Override
                    public String call(String s) {

                        return s + "aaaa";
                    }
                }).subscribeOn(Schedulers.newThread())
                //执行在newThread
                .flatMap(new Func1<String, Observable<MovieEntity>>() {
                    @Override
                    public Observable<MovieEntity> call(String o) {
                        MovieEntity movieEntity = new MovieEntity();
                        movieEntity.name = o;
                        return Observable.just(movieEntity);
                    }
                }).observeOn(AndroidSchedulers.mainThread())
                //执行在activity mainThread
                .subscribe(new Observer<MovieEntity>() {
                    @Override
                    public void onCompleted() {

                    }

                    @Override
                    public void onError(Throwable e) {

                    }

                    @Override
                    public void onNext(MovieEntity movieEntity) {

                    }
                });


006

onErrorReturn

public final Observable<T> onErrorReturn(Func1<java.lang.Throwable,? extends T> resumeFunction)

默认情况下,Observer订阅的Observable如果发生异常或错误,那么Observable就不再释放item而是调用Observer的onError方法之后终止。现在可以使用onErrorReturn来改变这个行为了。可以看到onErrorReturn接受一个Func1实现,当源Observable正常执行时onErrorReturn就直接把源Observable释放的item释放出去,一旦源Observable发生了错误或异常,替代原先源Observable调用Observer的onError方法,onErrorReturn中那个Func1实现被调用并接受这个错误或异常作为参数,这个Func1实现的返回值将作为onErrorReturn返回的Observable的item释放出去然后调用Observer的onCompleted方法终止。

onErrorResumeNext

public final Observable<T> onErrorResumeNext(Func1<java.lang.Throwable,? extends Observable<? extends T>> resumeFunction)
public final Observable<T> onErrorResumeNext(Observable<? extends T> resumeSequence)

和上面差不多,都是源Observable正常的时候由这个操作符新建的Observable把源Observable释放的item直接释放释放出去。一旦源Observable遇到错误,这个onErrorResumeNext会把源Observable用一个新的Observable替掉,然后这个新的Observable如果没遇到什么问题就会释放item给Observer。你可以直接将一个Observable实例传入onErrorResumeNext作为这个新的Observable,也可以传给onErrorResumeNext一个Func1实现,这个Func1实现接受源Observable的错误作为参数,返回新的Observable。

onExceptionResumeNext

public final Observable<T> onExceptionResumeNext(Observable<? extends T> resumeSequence)

和上面的onErrorResumeNext差不多,不过触发onErrorResumeNext的是源Observable发生了错误(Error),而触发onExceptionResumeNext的是源Observable发生了异常(Exception),如果源Observable发生了错误仍然像默认情况一样调用Observer的onError方法。

retry

public final Observable<T> retry()
public final Observable<T> retry(long count)
public final Observable<T> retry(Func2<java.lang.Integer,java.lang.Throwable,java.lang.Boolean> predicate)

默认retry在trampoline Scheduler上执行。正常情况一下retry新建的Observable直接把源Observable释放的item释放出去。当源Observable遇到错误或异常时,对于那个无参retry()方法,ta会重新订阅源Observable,只要源Observable遇到错误或异常ta都会重新订阅源Observable,没次数限制;对于retry(long count)可以限制重新订阅的次数,超过这个次数限制,直接调用Observer的onError终止。还有个retry接受一个Func2实现作为参数,Func2接受两个参数第一个是到现在为止retry新建的Observable已经重新订阅源Observable的次数,第二参数是导致源Observable要调用Observer的onError的错误或异常实例,Fun2的返回值是个Boolean用来指示retry新建的Observable是否再次重新订阅源Observable,如果不再订阅,直接将把源Observable最后的调用onError通知传给Observer。值得注意的是,retry新建的Observable会把源Observable释放的所有item都释放出去,即使中间某次重新订阅发生失败,失败时的源Observable释放的item在下次成功订阅时也会随着释放出去,比如第一次订阅失败了,在此期间源Observable发出[1,2],但是第二次成功了源Observable释放了[1,2,3,4,5],那么这次retry新建的Observable释放的完整序列是[1,2,1,2,3,4,5]。

retryWhen

public final Observable<T> retryWhen(Func1<? super Observable<? extends java.lang.Throwable>,? extends Observable<?>> notificationHandler)
public final Observable<T> retryWhen(Func1<? super Observable<? extends java.lang.Throwable>,? extends Observable<?>> notificationHandler, Scheduler scheduler)

retryWhen默认在trampoline Scheduler上执行,也可以指定ta的Scheduler。源Observable没出状况时直接镜像ta释放的item。一旦源Observable遇到异常,将把这个异常包装成一个Observable传给retryWhen的参数Func1实现,这个Func1实现返回一个Observable,接下来如果这个Observable正常释放一个item,那么retryWhen新建的Observable重新订阅源Obervable,然后镜像释放源Observable释放的item,如果Func1返回的Observable发生错误或异常,发出调用onError通知,那么就直接把这个调用onError通知传递给Observer然后retry新建的Observable终止。