欢迎您访问程序员文章站本站旨在为大家提供分享程序员计算机编程知识!
您现在的位置是: 首页

Android之Rxjava2.X 4————Rxjava 组合操作符

程序员文章站 2022-04-02 18:17:38
...

Android之Rxjava2.X 4————Rxjava 创建操作符

一.目录

二.概述

1.作用

创建 被观察者( Observable) 对象 & 发送事件。

2. 类型

Android之Rxjava2.X 4————Rxjava 组合操作符

三.组合多个被观察者

1.concat()/concatArray()

  • 作用:组合多个被观察者一起发送数据,合并后 按发送顺序串行执行
  • 两者区别:组合被观察者的数量,即concat()组合被观察者数量≤4个,而concatArray()则可>4个

原理图:
Android之Rxjava2.X 4————Rxjava 组合操作符

具体使用


        Observable.concat(Observable.just(1, 2, 3),
                Observable.just("z", "x", "c")
        ).subscribe(new Observer() {
            @Override
            public void onSubscribe(Disposable d) {
                Log.d(TAG, "onSubscribe: ");
            }

            @Override
            public void onNext(Object value) {

                Log.d(TAG, "onNext: "+value);
            }

            @Override
            public void onError(Throwable e) {
                Log.d(TAG, "onError: ");
            }

            @Override
            public void onComplete() {
                Log.d(TAG, "onComplete: ");
            }
        });
    }

Android之Rxjava2.X 4————Rxjava 组合操作符

2.merge()/mergeArray()

  • 作用:组合多个被观察者一起发送数据,合并后 按时间线并行执行
  • merge()/mergeArray()的区别:组合被观察者的数量,即merge()组合被观察者数量≤4个,而mergeArray()则可>4个
  • 和concat()操作符的区别:同样是组合多个被观察者一起发送数据,但concat()操作符合并后是按发送顺序串行执行

原理图:
Android之Rxjava2.X 4————Rxjava 组合操作符
具体使用:


        Observable.concat( Observable.intervalRange(0, 3, 1, 1, TimeUnit.SECONDS), // 从0开始发送、共发送3个数据、第1次事件延迟发送时间 = 1s、间隔时间 = 1s
                Observable.intervalRange(2, 3, 1, 1, TimeUnit.SECONDS) // 从2开始发送、共发送3个数据、第1次事件延迟发送时间 = 1s、间隔时间 = 1s
        ).subscribe(new Observer<Long>() {
            @Override
            public void onSubscribe(Disposable d) {
                Log.d(TAG, "onSubscribe: ");
            }

            @Override
            public void onNext(Long value) {

                Log.d(TAG, "onNext: "+value);
            }

            @Override
            public void onError(Throwable e) {
                Log.d(TAG, "onError: ");
            }

            @Override
            public void onComplete() {
                Log.d(TAG, "onComplete: ");
            }
        });

Android之Rxjava2.X 4————Rxjava 组合操作符

3.concatDelayError() / mergeDelayError()

作用:在mergeArray()和concatArray()两个方法中,如果其中一个Observable发送了一个Error事件,那么就会停止发送事件,如果想onError()事件延迟到所有Observable都发送完事件后再执行,就可以使用mergeArrayDelayError()和concatArrayDelayError()

具体使用:无使用concatDelayError()的情况

 Observable.concat(
                Observable.create(new ObservableOnSubscribe<Integer>() {
                    @Override
                    public void subscribe(ObservableEmitter<Integer> emitter) throws Exception {
                        emitter.onNext(1);
                        emitter.onNext(2);
                        emitter.onNext(3);
                        emitter.onError(new NullPointerException()); // 发送Error事件,因为无使用concatDelayError,所以第2个Observable将不会发送事件 emitter.onComplete();
                    }
                }),

                Observable.just(4, 5, 6))
                .subscribe(new Observer<Integer>() {
                    @Override
                    public void onSubscribe(Disposable d) {
                        Log.d(TAG, "onSubscribe: ");
                    }

                    @Override
                    public void onNext(Integer value) {

                        Log.d(TAG, "onNext: " + value);
                    }

                    @Override
                    public void onError(Throwable e) {
                        Log.d(TAG, "onError: ");
                    }

                    @Override
                    public void onComplete() {
                        Log.d(TAG, "onComplete: ");
                    }
                });

Android之Rxjava2.X 4————Rxjava 组合操作符

使用concatDelayError()

Observable.concatArrayDelayError(
                Observable.create(new ObservableOnSubscribe<Integer>() {
                    @Override
                    public void subscribe(ObservableEmitter<Integer> emitter) throws Exception {
                        emitter.onNext(1);
                        emitter.onNext(2);
                        emitter.onNext(3);
                        emitter.onError(new NullPointerException()); // 发送Error事件,因为无使用concatDelayError,所以第2个Observable将不会发送事件 emitter.onComplete();
                    }
                }),

                Observable.just(4, 5, 6))
                .subscribe(new Observer<Integer>() {
                    @Override
                    public void onSubscribe(Disposable d) {
                        Log.d(TAG, "onSubscribe: ");
                    }

                    @Override
                    public void onNext(Integer value) {

                        Log.d(TAG, "onNext: " + value);
                    }

                    @Override
                    public void onError(Throwable e) {
                        Log.d(TAG, "onError: ");
                    }

                    @Override
                    public void onComplete() {
                        Log.d(TAG, "onComplete: ");
                    }
                });

Android之Rxjava2.X 4————Rxjava 组合操作符

四.合并多个事件

1.zip()

  • 作用:用来合并两个Observable发射的事件,根据BiFunction函数生成一个新的值发射出去。当其中一个Observable发送数据结束或者出现异常后,另一个Observable也将停止发送数据。也就是说正常的情况下数据长度会与两个Observable中最少事件的数量一样。
  • 原理图
    Android之Rxjava2.X 4————Rxjava 组合操作符
    Android之Rxjava2.X 4————Rxjava 组合操作符

具体使用:

 Observable.zip(
                Observable.just("a", "b", "c"),
                Observable.just(1, 2, 3),
                new BiFunction<String, Integer, String>() {

                    @Override
                    public String apply(String s, Integer integer) throws Exception {
                        return s + integer;
                    }
                }).subscribe(new Observer<String>() {
                    @Override
                    public void onSubscribe(Disposable d) {
                        Log.d(TAG, "onSubscribe: ");
                    }

                    @Override
                    public void onNext(String value) {

                        Log.d(TAG, "onNext: " + value);
                    }

                    @Override
                    public void onError(Throwable e) {
                        Log.d(TAG, "onError: ");
                    }

                    @Override
                    public void onComplete() {
                        Log.d(TAG, "onComplete: ");
                    }
                });

Android之Rxjava2.X 4————Rxjava 组合操作符

2.combineLatest()

  • 作用:当两个Observables中的任何一个发送了数据后,将先发送了数据的Observables 的最新(最后)一个数据 与 另外一个Observable发送的每个数据结合,最终基于该函数的结果发送数据
  • 与Zip()的区别:Zip() = 按个数合并,即1对1合并;CombineLatest() = 按时间合并,即在同一个时间点上合并

原理图:
Android之Rxjava2.X 4————Rxjava 组合操作符
Android之Rxjava2.X 4————Rxjava 组合操作符

具体使用

Observable.combineLatest(
                Observable.just("a", "b", "c"),
                Observable.intervalRange(0, 3, 1, 1, TimeUnit.SECONDS), // 第2个发送数据事件的Observable:从0开始发送、共发送3个数据、第1次事件延迟发送时间 = 1s、间隔时间 = 1s
                new BiFunction<String, Long, String>() {

                    @Override
                    public String apply(String s, Long l) throws Exception {
                        return s + l;
                    }
                }).subscribe(new Observer<String>() {
                    @Override
                    public void onSubscribe(Disposable d) {
                        Log.d(TAG, "onSubscribe: ");
                    }

                    @Override
                    public void onNext(String value) {

                        Log.d(TAG, "onNext: " + value);
                    }

                    @Override
                    public void onError(Throwable e) {
                        Log.d(TAG, "onError: ");
                    }

                    @Override
                    public void onComplete() {
                        Log.d(TAG, "onComplete: ");
                    }
                });

Android之Rxjava2.X 4————Rxjava 组合操作符

3.combineLatestDelayError()

  • 作用类似于concatDelayError() / mergeDelayError() ,见上文

4.reduce()

  • 作用:把被观察者需要发送的事件聚合成1个事件 & 发送
  • 聚合的逻辑根据需求撰写,但本质都是前2个数据聚合,然后与后1个数据继续进行聚合,依次类推

具体使用:

  Observable.just(1,2,3,4,5).reduce(new BiFunction<Integer, Integer, Integer>() {
            @Override
            public Integer apply(Integer integer, Integer integer2) throws Exception {
                return integer + integer2;
            }
        }).subscribe(new Consumer<Integer>() {
            @Override
            public void accept(Integer integer) throws Exception {
               Log.d(TAG, "accept: "+integer);
            }
        });

Android之Rxjava2.X 4————Rxjava 组合操作符

5.collect()

  • 作用:将被观察者Observable发送的数据事件收集到一个数据结构里

具体使用

 Observable.just("1","2","3","2")
                .collect(new Callable<List<Integer>>() { //创建数据结构
                    @Override
                    public List<Integer> call() throws Exception {
                        return new ArrayList<Integer>();
                    }
                }, new BiConsumer<List<Integer>, String>() {//收集器
                    @Override
                    public void accept(List<Integer> integers, String s) throws Exception {
                        integers.add(Integer.valueOf(s));
                    }
                }).subscribe(new Consumer<List<Integer>>() {
            @Override
            public void accept(List<Integer> integers) throws Exception {
                Log.e("---",integers+"");
            }
        });

Android之Rxjava2.X 4————Rxjava 组合操作符

五.发送事件前追加发送事件

1.startWith() / startWithArray()

  • 作用:在一个被观察者发送事件前,追加发送一些数据 / 一个新的被观察者
    Android之Rxjava2.X 4————Rxjava 组合操作符
    Android之Rxjava2.X 4————Rxjava 组合操作符
    具体使用:
Observable.just(1, 2, 3)
                .startWith(0)  // 追加单个数据 = startWith()
                .startWithArray(1, 2, 3) // 追加多个数据 = startWithArray()
                .subscribe(new Observer<Integer>() {
                    @Override
                    public void onSubscribe(Disposable d) {
                        Log.d(TAG, "onSubscribe: ");
                    }

                    @Override
                    public void onNext(Integer value) {
                        Log.d(TAG, "onNext: " + value);
                    }

                    @Override
                    public void onError(Throwable e) {
                    }

                    @Override
                    public void onComplete() {
                        Log.d(TAG, "onComplete: ");
                    }
                });


        Observable.just(1, 2, 3).
                startWith(Observable.just(4, 5, 6)).
                subscribe(new Observer<Integer>() {
                    @Override
                    public void onSubscribe(Disposable d) {
                        Log.d(TAG, "onSubscribe: ");
                    }

                    @Override
                    public void onNext(Integer value) {
                        Log.d(TAG, "onNext: " + value);
                    }

                    @Override
                    public void onError(Throwable e) {
                    }

                    @Override
                    public void onComplete() {
                        Log.d(TAG, "onComplete: ");
                    }
                });

Android之Rxjava2.X 4————Rxjava 组合操作符

六.统计发送事件的数量

1.count()

  • 作用:统计被观察者发送事件的数量

具体使用:

  Observable.just(1, 2, 3)
                .count()
                .subscribe(new Consumer<Long>() {
                    @Override
                    public void accept(Long aLong) throws Exception {
                        Log.e(TAG, "发送的事件数量 =  " + aLong);

                    }
                });

![这里写图片描述](https://img-blog.csdn.net/20180814191208607)

七.参考资料

ReactiveX文档中文翻译
Android Rxjava:这是一篇 清晰 & 易懂的Rxjava 入门教程
Rxjava2入门教程一:函数响应式编程及概述
这可能是最好的RxJava 2.x 教程(完结版)
那些年我们错过的响应式编程

相关标签: rxjava