RxJava操作符(四)——组合操作符
程序员文章站
2022-04-02 18:08:24
...
1、简介:
之前几篇讲解的操作符多是单个被观察者对象发送事件,本篇来介绍下组合操作符的使用,组合操作符的作用:
组合 多个被观察者(Observable) & 合并需要发送的事件
2、类型:
3、操作符介绍
- concat() / concatArray():
- 作用:合并多个被观察者 ,发送的顺序与产生的顺序相同(串行发送)
- 二者联系:concat 使用时最多只能发送4个被观察者对象,concatArray可发送大于4个
- 使用场景:多个被观察者发送事件
- 使用实例:
Observable.concatArray(
Observable.just("1"),
Observable.just("2", "3"),
Observable.just("4", "5", "6"),
Observable.just("7", "8", "9","10"),
Observable.just("A", "B", "C")
).subscribe(new Consumer<String>() {
@Override
public void accept(String s) throws Exception {
Log.e("concat=====", s);
}
});
输出的结果
05-03 20:04:52.379 6302-6302/? E/concat=====: 1
05-03 20:04:52.379 6302-6302/? E/concat=====: 2
05-03 20:04:52.379 6302-6302/? E/concat=====: 3
05-03 20:04:52.379 6302-6302/? E/concat=====: 4
05-03 20:04:52.379 6302-6302/? E/concat=====: 5
05-03 20:04:52.379 6302-6302/? E/concat=====: 6
05-03 20:04:52.379 6302-6302/? E/concat=====: 7
05-03 20:04:52.379 6302-6302/? E/concat=====: 8
05-03 20:04:52.379 6302-6302/? E/concat=====: 9
05-03 20:04:52.379 6302-6302/? E/concat=====: 10
05-03 20:04:52.379 6302-6302/? E/concat=====: A
05-03 20:04:52.379 6302-6302/? E/concat=====: B
05-03 20:04:52.379 6302-6302/? E/concat=====: C
- merge() / mergeArray()
- 作用:合并多个被观察者(并行发送)
- 二者联系:merge使用时最多只能发送4个被观察者对象,mergeArray可发送大于4个
- 使用场景:并行发送多个被观察者发送事件
- 使用实例:
Observable.mergeArray(
Observable.intervalRange(2,3,1000,1000, TimeUnit.MILLISECONDS),
Observable.intervalRange(5,3,1000,1000, TimeUnit.MILLISECONDS),
Observable.intervalRange(8,3,1000,1000, TimeUnit.MILLISECONDS)
).subscribe(new Consumer<Long>() {
@Override
public void accept(Long s) throws Exception {
Log.e("concat=====", s+"");
}
});
输出结果:05-03 20:14:55.558 6948-6974/? E/concat=====: 2
05-03 20:14:55.560 6948-6976/? E/concat=====: 8
05-03 20:14:55.562 6948-6975/? E/concat=====: 5
05-03 20:14:56.558 6948-6974/? E/concat=====: 3
05-03 20:14:56.559 6948-6975/? E/concat=====: 6
05-03 20:14:56.560 6948-6976/? E/concat=====: 9
05-03 20:14:57.558 6948-6974/? E/concat=====: 4
05-03 20:14:57.559 6948-6975/? E/concat=====: 7
05-03 20:14:57.560 6948-6976/? E/concat=====: 10
上面输出的顺序为 2,5,8 ——3,6,9——4,7,10 -
concatDelayError() / mergeDelayError()
Observable.mergeArray(
Observable.create(new ObservableOnSubscribe<Long>() {
@Override
public void subscribe(ObservableEmitter<Long> e) throws Exception {
e.onNext(1L);
e.onNext(2L);
e.onNext(3L);
e.onError(new Exception("Error"));
e.onComplete();
}
}),
Observable.just(4L,5L,6L)
).subscribe(new Consumer<Long>() {
@Override
public void accept(Long s) throws Exception {
Log.e("concat=====", s + "");
}
}, new Consumer<Throwable>() {
@Override
public void accept(Throwable throwable) throws Exception {
Log.e("throwable=====", throwable.getMessage());
}
});
输出结果:05-03 20:52:22.003 7661-7661/? E/concat=====: 1
05-03 20:52:22.003 7661-7661/? E/concat=====: 2
05-03 20:52:22.003 7661-7661/? E/concat=====: 3
05-03 20:52:22.005 7661-7661/? E/throwable=====: Error
由此可见当其中一个发送异常时,后面的被观察者将不会再发送数据,那么我们如果想让异常延后,是所有的时间都发送完成后在发送异常的话,这时就要用到concatDelayError() / mergeDelayError()方法,将上述代码中使用的mergeArray变换为对应的mergeDelayError(),运行程序输出结果为:
05-03 20:56:37.450 7954-7954/? E/concat=====: 1
05-03 20:56:37.450 7954-7954/? E/concat=====: 2
05-03 20:56:37.450 7954-7954/? E/concat=====: 3
05-03 20:56:37.451 7954-7954/? E/concat=====: 4
05-03 20:56:37.451 7954-7954/? E/concat=====: 5
05-03 20:56:37.451 7954-7954/? E/concat=====: 6
05-03 20:56:37.451 7954-7954/? E/throwable=====: Error
在1——6输出完成后才输出异常信息。- zip()
- 作用:对多个被观察进行某个操作后生成一个新的被观察者序列,然后发送数据,(一一对应操作)
- 原理:对每个观察者之间的数据按照发送的顺序进行合并,最后生成新的被观察者对象进行发送
- 使用场景——需要对个被观察者队形中的数据进行组合
- 使用实例:
- 创建两个发送数据 的被观察者对象
Observable observable = Observable.create(new ObservableOnSubscribe<Long>() {
@Override
public void subscribe(ObservableEmitter<Long> e) throws Exception {
e.onNext(1L);
Log.e("observable=====","发送"+1);
e.onNext(2L);
Log.e("observable=====","发送"+2);
e.onNext(3L);
Log.e("observable=====","发送"+3);
e.onComplete();
}
});
Observable observable2 = Observable.create(new ObservableOnSubscribe<String>() {
@Override
public void subscribe(ObservableEmitter<String> e) throws Exception {
e.onNext("=A");
Log.e("observable2=====","发送 =A");
e.onNext("=B");
Log.e("observable2=====","发送 =B");
e.onNext("=C");
Log.e("observable2=====","发送 =C");
e.onComplete();
}
});
- 使用Zip()操作符合并发送数据:
Observable.zip(observable, observable2, new BiFunction<Long, String, String>() {
@Override
public String apply(Long aLong, String s) throws Exception {
return aLong +"="+s;
}
}).subscribe(new Consumer<String>() {
@Override
public void accept(String s) throws Exception {
Log.e("zip=====",s);
}
});
输出结果:
05-03 21:09:14.206 8411-8411/? E/observable=====: 发送1
05-03 21:09:14.206 8411-8411/? E/observable=====: 发送2
05-03 21:09:14.206 8411-8411/? E/observable=====: 发送3
05-03 21:09:14.207 8411-8411/? E/zip=====: 1==A
05-03 21:09:14.207 8411-8411/? E/observable2=====: 发送 =A
05-03 21:09:14.207 8411-8411/? E/zip=====: 2==B
05-03 21:09:14.208 8411-8411/? E/observable2=====: 发送 =B
05-03 21:09:14.208 8411-8411/? E/zip=====: 3==C
05-03 21:09:14.208 8411-8411/? E/observable2=====: 发送 =C
第一个Observable发送的数据为1,2,3,第二个发送的为A,B,C ,组合后生成成对的数据 1==A、2==B、3==C
注意:
- 事件组合方式 = 严格按照原先事件序列 进行对位合并
- 最终合并的事件数量 = 多个被观察者(Observable)中数量最少的数量
- 没有配对的数据仍然会发送,只是没有组合输出
我们将上面的1,2,3换成1,2,3,4,看看发送的数据是否会改变
05-03 21:17:42.746 8860-8860/? E/observable=====: 发送1
05-03 21:17:42.746 8860-8860/? E/observable=====: 发送2
05-03 21:17:42.746 8860-8860/? E/observable=====: 发送3
05-03 21:17:42.746 8860-8860/? E/observable=====: 发送4
05-03 21:17:42.747 8860-8860/? E/zip=====: 1==A
05-03 21:17:42.747 8860-8860/? E/observable2=====: 发送 =A
05-03 21:17:42.748 8860-8860/? E/zip=====: 2==B
05-03 21:17:42.748 8860-8860/? E/observable2=====: 发送 =B
05-03 21:17:42.749 8860-8860/? E/zip=====: 3==C
05-03 21:17:42.749 8860-8860/? E/observable2=====: 发送 =C
上述的4虽然没有与之相应的字母配对,但数据仍是会发送,只是没有被打印
-
combineLatest()
1、作用:当两个Observables中的任何一个发送了数据后,将先发送了数据的Observables 的最新(最后)一个数据 与 另外一个Observable发送的每个数据结合,最终基于该函数的结果发送数据
2、原理:第一个Observables 最后发送的数据将被保存下来,与后面Observables 发送的数据进行组合
3、与zip的比较:zip是针对发送事件进行1对1组合,而combineLatest是将最后的数据和之后的事件组合,从发送的顺序和时间上组合
4、使用实例:
Observable observable = Observable.create(new ObservableOnSubscribe<Long>() {
@Override
public void subscribe(ObservableEmitter<Long> e) throws Exception {
e.onNext(1L);
Log.e("observable=====", "发送" + 1);
e.onNext(2L);
Log.e("observable=====", "发送" + 2);
e.onNext(3L);
Log.e("observable=====", "发送" + 3);
e.onNext(4L);
Log.e("observable=====", "发送" + 4);
e.onComplete();
}
});
Observable.combineLatest(observable,
Observable.just(6, 7, 8)
, new BiFunction<Long, Integer, Long>() {
@Override
public Long apply(Long aLong, Integer integer) throws Exception {
return aLong + integer;
}
}).subscribe(new Consumer<Long>() {
@Override
public void accept(Long o) throws Exception {
Log.e("combineLatest=====", o + "");
}
});
输出结果:
05-03 21:30:32.338 9218-9218/? E/observable=====: 发送1
05-03 21:30:32.338 9218-9218/? E/observable=====: 发送2
05-03 21:30:32.338 9218-9218/? E/observable=====: 发送3
05-03 21:30:32.338 9218-9218/? E/observable=====: 发送4
05-03 21:30:32.338 9218-9218/? E/combineLatest=====: 10
05-03 21:30:32.339 9218-9218/? E/combineLatest=====: 11
05-03 21:30:32.339 9218-9218/? E/combineLatest=====: 12
在第一个Observable发送的最后数字为4,然后4依次和后面发送的6,7,8进行相加,最终输出。
-
combineLatestDelayError()
作用类似于concatDelayError() / mergeDelayError() ,即错误处理,此处不作过多描述
- reduce()
- 作用:将要发送的一组数据按照相应的规则聚合为一个数据,一起发送
- 原理:将前两个数据组合后依次和后面的数据进行组合
- 使用场景——需要对数据进行整合
- 使用实例:数据的累加
Observable.just(1,2,3,4)
.reduce(new BiFunction<Integer, Integer, Integer>() {
@Override
public Integer apply(Integer integer, Integer integer2) throws Exception {
return integer + integer2;
}
}).subscribe(new Consumer<Integer>() {
@Override
public void accept(Integer integer) throws Exception {
Log.e("reduce=====", integer+"");
}
});
输出结果:
05-03 21:39:05.823 9565-9565/? E/reduce=====: 10
-
collect
- 作用:将发送的数据装换为一个集合,最后发送
- 使用示例:
Observable.just(1, 2, 3 ,4, 5, 6)
.collect(
// 1. 创建数据结构(容器),用于收集被观察者发送的数据
new Callable<ArrayList<Integer>>() {
@Override
public ArrayList<Integer> call() throws Exception {
return new ArrayList<>();
}
// 2. 对发送的数据进行收集
}, new BiConsumer<ArrayList<Integer>, Integer>() {
@Override
public void accept(ArrayList<Integer> list, Integer integer)
throws Exception {
// 参数说明:list = 容器,integer = 后者数据
list.add(integer);
// 对发送的数据进行收集
}
}).subscribe(new Consumer<ArrayList<Integer>>() {
@Override
public void accept(@NonNull ArrayList<Integer> s) throws Exception {
Log.e(TAG, "本次发送的数据是: "+s);
}
});
- startWith /starWithArray()
- 作用:在发送之前追加发送事件
- 二者比较:startWith 追加单个数据 或 被观察者对象,starWithArray 追加多个数据
- 使用示例:
Observable.just(4, 5, 6)
.startWith(0) // 追加单个数据 = startWith()
.startWithArray(1, 2, 3) // 追加多个数据 = startWithArray()
.subscribe(new Observer<Integer>() {
@Override
public void onSubscribe(Disposable d) {
}
@Override
public void onNext(Integer value) {
Log.d(TAG, "接收到了事件"+ value );
}
@Override
public void onError(Throwable e) {
Log.d(TAG, "对Error事件作出响应");
}
@Override
public void onComplete() {
Log.d(TAG, "对Complete事件作出响应");
}
});
注意:输出的顺序是按照 后调用先追加 ,按照追加的顺序 倒序输出,最后才输出原始发送的事件,例如上面会先发送
startWithArray(1, 2, 3) 的数据,然后是.startWith(0) 最后才是Observable发送的4,5,6.
要实现追加观察者对象,只需把上面发送的数据换成 被观察者对象,
Observable.just(4, 5, 6)
.startWith(Observable.just(1, 2, 3))
- count()
- 作用:获取发送事件的个数
- 使用场景:需要统计发送事件的数量
- 使用示例:
Observable.just(1, 2, 3, 4)
.count()
.subscribe(new Consumer<Long>() {
@Override
public void accept(Long aLong) throws Exception {
Log.e(TAG, "发送的事件数量 = "+aLong);
}
});
本篇的介绍就到这里,本篇介绍的操作符比较多,但组合使用可以解决我们实际开发的很多问题,关于本篇操作符的使用将在下一篇单独介绍。上一篇: vuex基本熟悉与使用