欢迎您访问程序员文章站本站旨在为大家提供分享程序员计算机编程知识!
您现在的位置是: 首页

RxJava操作符(四)——组合操作符

程序员文章站 2022-04-02 18:08:24
...

 1、简介:

 之前几篇讲解的操作符多是单个被观察者对象发送事件,本篇来介绍下组合操作符的使用,组合操作符的作用:

组合 多个被观察者(Observable) & 合并需要发送的事件

 2、类型:

RxJava操作符(四)——组合操作符

3、操作符介绍

  • concat() / concatArray()
  1. 作用:合并多个被观察者 ,发送的顺序与产生的顺序相同(串行发送)
  2. 二者联系:concat 使用时最多只能发送4个被观察者对象,concatArray可发送大于4个
  3. 使用场景:多个被观察者发送事件
  4. 使用实例:
Observable.concatArray(
                Observable.just("1"),
                Observable.just("2", "3"),
                Observable.just("4", "5", "6"),
                Observable.just("7", "8", "9","10"),
                Observable.just("A", "B", "C")

        ).subscribe(new Consumer<String>() {
            @Override
            public void accept(String s) throws Exception {
                Log.e("concat=====", s);
            }
        });

输出的结果

05-03 20:04:52.379 6302-6302/? E/concat=====: 1
05-03 20:04:52.379 6302-6302/? E/concat=====: 2
05-03 20:04:52.379 6302-6302/? E/concat=====: 3
05-03 20:04:52.379 6302-6302/? E/concat=====: 4
05-03 20:04:52.379 6302-6302/? E/concat=====: 5
05-03 20:04:52.379 6302-6302/? E/concat=====: 6
05-03 20:04:52.379 6302-6302/? E/concat=====: 7
05-03 20:04:52.379 6302-6302/? E/concat=====: 8
05-03 20:04:52.379 6302-6302/? E/concat=====: 9
05-03 20:04:52.379 6302-6302/? E/concat=====: 10
05-03 20:04:52.379 6302-6302/? E/concat=====: A
05-03 20:04:52.379 6302-6302/? E/concat=====: B
05-03 20:04:52.379 6302-6302/? E/concat=====: C
  • merge() / mergeArray()
  1. 作用:合并多个被观察者(并行发送)
  2. 二者联系:merge使用时最多只能发送4个被观察者对象,mergeArray可发送大于4个
  3. 使用场景:并行发送多个被观察者发送事件
  4. 使用实例:
Observable.mergeArray(
                Observable.intervalRange(2,3,1000,1000, TimeUnit.MILLISECONDS),
                Observable.intervalRange(5,3,1000,1000, TimeUnit.MILLISECONDS),
                Observable.intervalRange(8,3,1000,1000, TimeUnit.MILLISECONDS)
        ).subscribe(new Consumer<Long>() {
            @Override
            public void accept(Long s) throws Exception {
                Log.e("concat=====", s+"");
            }
        });
输出结果:
05-03 20:14:55.558 6948-6974/? E/concat=====: 2
05-03 20:14:55.560 6948-6976/? E/concat=====: 8
05-03 20:14:55.562 6948-6975/? E/concat=====: 5
05-03 20:14:56.558 6948-6974/? E/concat=====: 3
05-03 20:14:56.559 6948-6975/? E/concat=====: 6
05-03 20:14:56.560 6948-6976/? E/concat=====: 9
05-03 20:14:57.558 6948-6974/? E/concat=====: 4
05-03 20:14:57.559 6948-6975/? E/concat=====: 7
05-03 20:14:57.560 6948-6976/? E/concat=====: 10
上面输出的顺序为 2,5,8 ——3,6,9——4,7,10 
  • concatDelayError() / mergeDelayError()
 我们来看一种情况,在使用  concat() / concatArray()、   merge() / mergeArray()时,当其中一个 出现Exception时后面的Observale如何处理呢?下面我们尝试发送异常:
 
Observable.mergeArray(
                Observable.create(new ObservableOnSubscribe<Long>() {
                    @Override
                    public void subscribe(ObservableEmitter<Long> e) throws Exception {
                        e.onNext(1L);
                        e.onNext(2L);
                        e.onNext(3L);
                        e.onError(new Exception("Error"));
                        e.onComplete();
                    }
                }),
                Observable.just(4L,5L,6L)
        ).subscribe(new Consumer<Long>() {
            @Override
            public void accept(Long s) throws Exception {
                Log.e("concat=====", s + "");
            }
        }, new Consumer<Throwable>() {
            @Override
            public void accept(Throwable throwable) throws Exception {
                Log.e("throwable=====", throwable.getMessage());
            }
        });
输出结果:
05-03 20:52:22.003 7661-7661/? E/concat=====: 1
05-03 20:52:22.003 7661-7661/? E/concat=====: 2
05-03 20:52:22.003 7661-7661/? E/concat=====: 3
05-03 20:52:22.005 7661-7661/? E/throwable=====: Error
由此可见当其中一个发送异常时,后面的被观察者将不会再发送数据,那么我们如果想让异常延后,是所有的时间都发送完成后在发送异常的话,这时就要用到concatDelayError() / mergeDelayError()方法,将上述代码中使用的mergeArray变换为对应的
mergeDelayError(),运行程序输出结果为:
05-03 20:56:37.450 7954-7954/? E/concat=====: 1
05-03 20:56:37.450 7954-7954/? E/concat=====: 2
05-03 20:56:37.450 7954-7954/? E/concat=====: 3
05-03 20:56:37.451 7954-7954/? E/concat=====: 4
05-03 20:56:37.451 7954-7954/? E/concat=====: 5
05-03 20:56:37.451 7954-7954/? E/concat=====: 6
05-03 20:56:37.451 7954-7954/? E/throwable=====: Error
在1——6输出完成后才输出异常信息。
  • zip() 

  1. 作用:对多个被观察进行某个操作后生成一个新的被观察者序列,然后发送数据,(一一对应操作)
  2. 原理:对每个观察者之间的数据按照发送的顺序进行合并,最后生成新的被观察者对象进行发送
  3. 使用场景——需要对个被观察者队形中的数据进行组合
  4. 使用实例:
  • 创建两个发送数据 的被观察者对象                                                                   
  Observable observable = Observable.create(new ObservableOnSubscribe<Long>() {
            @Override
            public void subscribe(ObservableEmitter<Long> e) throws Exception {
                e.onNext(1L);
                Log.e("observable=====","发送"+1);
                e.onNext(2L);
                Log.e("observable=====","发送"+2);
                e.onNext(3L);
                Log.e("observable=====","发送"+3);
                e.onComplete();
            }
        });
        Observable observable2 = Observable.create(new ObservableOnSubscribe<String>() {
            @Override
            public void subscribe(ObservableEmitter<String> e) throws Exception {
                e.onNext("=A");
                Log.e("observable2=====","发送 =A");
                e.onNext("=B");
                Log.e("observable2=====","发送 =B");
                e.onNext("=C");
                Log.e("observable2=====","发送 =C");
                e.onComplete();
            }
        });
  • 使用Zip()操作符合并发送数据:
Observable.zip(observable, observable2, new BiFunction<Long, String, String>() {
            @Override
            public String apply(Long aLong, String s) throws Exception {
                return aLong +"="+s;
            }
        }).subscribe(new Consumer<String>() {
            @Override
            public void accept(String s) throws Exception {
                Log.e("zip=====",s);
            }
        });

输出结果:

05-03 21:09:14.206 8411-8411/? E/observable=====: 发送1
05-03 21:09:14.206 8411-8411/? E/observable=====: 发送2
05-03 21:09:14.206 8411-8411/? E/observable=====: 发送3
05-03 21:09:14.207 8411-8411/? E/zip=====: 1==A
05-03 21:09:14.207 8411-8411/? E/observable2=====: 发送 =A
05-03 21:09:14.207 8411-8411/? E/zip=====: 2==B
05-03 21:09:14.208 8411-8411/? E/observable2=====: 发送 =B
05-03 21:09:14.208 8411-8411/? E/zip=====: 3==C
05-03 21:09:14.208 8411-8411/? E/observable2=====: 发送 =C

第一个Observable发送的数据为1,2,3,第二个发送的为A,B,C ,组合后生成成对的数据 1==A、2==B、3==C

注意:

  • 事件组合方式 = 严格按照原先事件序列 进行对位合并
  • 最终合并的事件数量 = 多个被观察者(Observable)中数量最少的数量
  • 没有配对的数据仍然会发送,只是没有组合输出

我们将上面的1,2,3换成1,2,3,4,看看发送的数据是否会改变

05-03 21:17:42.746 8860-8860/? E/observable=====: 发送1
05-03 21:17:42.746 8860-8860/? E/observable=====: 发送2
05-03 21:17:42.746 8860-8860/? E/observable=====: 发送3
05-03 21:17:42.746 8860-8860/? E/observable=====: 发送4
05-03 21:17:42.747 8860-8860/? E/zip=====: 1==A
05-03 21:17:42.747 8860-8860/? E/observable2=====: 发送 =A
05-03 21:17:42.748 8860-8860/? E/zip=====: 2==B
05-03 21:17:42.748 8860-8860/? E/observable2=====: 发送 =B
05-03 21:17:42.749 8860-8860/? E/zip=====: 3==C
05-03 21:17:42.749 8860-8860/? E/observable2=====: 发送 =C

上述的4虽然没有与之相应的字母配对,但数据仍是会发送,只是没有被打印

  • combineLatest()

1、作用:当两个Observables中的任何一个发送了数据后,将先发送了数据的Observables 的最新(最后)一个数据 与 另外一个Observable发送的每个数据结合,最终基于该函数的结果发送数据

2、原理:第一个Observables 最后发送的数据将被保存下来,与后面Observables 发送的数据进行组合

3、与zip的比较:zip是针对发送事件进行1对1组合,而combineLatest是将最后的数据和之后的事件组合,从发送的顺序和时间上组合

4、使用实例:

 Observable observable = Observable.create(new ObservableOnSubscribe<Long>() {
            @Override
            public void subscribe(ObservableEmitter<Long> e) throws Exception {
                e.onNext(1L);
                Log.e("observable=====", "发送" + 1);
                e.onNext(2L);
                Log.e("observable=====", "发送" + 2);
                e.onNext(3L);
                Log.e("observable=====", "发送" + 3);
                e.onNext(4L);
                Log.e("observable=====", "发送" + 4);
                e.onComplete();
            }
        });

        Observable.combineLatest(observable,
                Observable.just(6, 7, 8)
                , new BiFunction<Long, Integer, Long>() {
                    @Override
                    public Long apply(Long aLong, Integer integer) throws Exception {
                        return aLong + integer;
                    }
                }).subscribe(new Consumer<Long>() {
            @Override
            public void accept(Long o) throws Exception {
                Log.e("combineLatest=====", o + "");
            }
        });

输出结果:

05-03 21:30:32.338 9218-9218/? E/observable=====: 发送1
05-03 21:30:32.338 9218-9218/? E/observable=====: 发送2
05-03 21:30:32.338 9218-9218/? E/observable=====: 发送3
05-03 21:30:32.338 9218-9218/? E/observable=====: 发送4
05-03 21:30:32.338 9218-9218/? E/combineLatest=====: 10
05-03 21:30:32.339 9218-9218/? E/combineLatest=====: 11
05-03 21:30:32.339 9218-9218/? E/combineLatest=====: 12

在第一个Observable发送的最后数字为4,然后4依次和后面发送的6,7,8进行相加,最终输出。

  • combineLatestDelayError()
    作用类似于concatDelayError() / mergeDelayError() ,即错误处理,此处不作过多描述
  • reduce()
  1. 作用:将要发送的一组数据按照相应的规则聚合为一个数据,一起发送
  2. 原理:将前两个数据组合后依次和后面的数据进行组合
  3. 使用场景——需要对数据进行整合
  4. 使用实例:数据的累加
Observable.just(1,2,3,4)
                .reduce(new BiFunction<Integer, Integer, Integer>() {
                    @Override
                    public Integer apply(Integer integer, Integer integer2) throws Exception {
                        return integer + integer2;
                    }
                }).subscribe(new Consumer<Integer>() {
            @Override
            public void accept(Integer integer) throws Exception {
                Log.e("reduce=====", integer+"");
            }
        });

输出结果:

05-03 21:39:05.823 9565-9565/? E/reduce=====: 10
  • collect

  1. 作用:将发送的数据装换为一个集合,最后发送
  2. 使用示例:
Observable.just(1, 2, 3 ,4, 5, 6)
                .collect(
                        // 1. 创建数据结构(容器),用于收集被观察者发送的数据
                        new Callable<ArrayList<Integer>>() {
                            @Override
                            public ArrayList<Integer> call() throws Exception {
                                return new ArrayList<>();
                            }
                            // 2. 对发送的数据进行收集
                        }, new BiConsumer<ArrayList<Integer>, Integer>() {
                            @Override
                            public void accept(ArrayList<Integer> list, Integer integer)
                                    throws Exception {
                                // 参数说明:list = 容器,integer = 后者数据
                                list.add(integer);
                                // 对发送的数据进行收集
                            }
                        }).subscribe(new Consumer<ArrayList<Integer>>() {
            @Override
            public void accept(@NonNull ArrayList<Integer> s) throws Exception {
                Log.e(TAG, "本次发送的数据是: "+s);

            }
        });
  • startWith /starWithArray()

  1. 作用:在发送之前追加发送事件
  2. 二者比较:startWith 追加单个数据 或 被观察者对象,starWithArray 追加多个数据
  3. 使用示例:
 Observable.just(4, 5, 6)
                  .startWith(0)  // 追加单个数据 = startWith()
                  .startWithArray(1, 2, 3) // 追加多个数据 = startWithArray()
                  .subscribe(new Observer<Integer>() {
                    @Override
                    public void onSubscribe(Disposable d) {
                    }

                    @Override
                    public void onNext(Integer value) {
                        Log.d(TAG, "接收到了事件"+ value  );
                    }

                    @Override
                    public void onError(Throwable e) {
                        Log.d(TAG, "对Error事件作出响应");
                    }

                    @Override
                    public void onComplete() {
                        Log.d(TAG, "对Complete事件作出响应");
                    }
                });

注意:输出的顺序是按照 后调用先追加 ,按照追加的顺序 倒序输出,最后才输出原始发送的事件,例如上面会先发送

startWithArray(1, 2, 3) 的数据,然后是.startWith(0) 最后才是Observable发送的4,5,6.

要实现追加观察者对象,只需把上面发送的数据换成 被观察者对象,

 Observable.just(4, 5, 6)
                .startWith(Observable.just(1, 2, 3))
  • count() 

  1. 作用:获取发送事件的个数
  2. 使用场景:需要统计发送事件的数量
  3. 使用示例:
        Observable.just(1, 2, 3, 4)
                  .count()
                  .subscribe(new Consumer<Long>() {
                    @Override
                    public void accept(Long aLong) throws Exception {
                        Log.e(TAG, "发送的事件数量 =  "+aLong);
                    }
                });
本篇的介绍就到这里,本篇介绍的操作符比较多,但组合使用可以解决我们实际开发的很多问题,关于本篇操作符的使用将在下一篇单独介绍。

相关标签: RxJava