Android之Rxjava2.X 4————Rxjava 组合操作符
程序员文章站
2022-04-02 18:17:38
...
Android之Rxjava2.X 4————Rxjava 创建操作符
一.目录
二.概述
1.作用
创建 被观察者( Observable) 对象 & 发送事件。
2. 类型
三.组合多个被观察者
1.concat()/concatArray()
- 作用:组合多个被观察者一起发送数据,合并后 按发送顺序串行执行
- 两者区别:组合被观察者的数量,即concat()组合被观察者数量≤4个,而concatArray()则可>4个
原理图:
具体使用
Observable.concat(Observable.just(1, 2, 3),
Observable.just("z", "x", "c")
).subscribe(new Observer() {
@Override
public void onSubscribe(Disposable d) {
Log.d(TAG, "onSubscribe: ");
}
@Override
public void onNext(Object value) {
Log.d(TAG, "onNext: "+value);
}
@Override
public void onError(Throwable e) {
Log.d(TAG, "onError: ");
}
@Override
public void onComplete() {
Log.d(TAG, "onComplete: ");
}
});
}
2.merge()/mergeArray()
- 作用:组合多个被观察者一起发送数据,合并后 按时间线并行执行
- merge()/mergeArray()的区别:组合被观察者的数量,即merge()组合被观察者数量≤4个,而mergeArray()则可>4个
- 和concat()操作符的区别:同样是组合多个被观察者一起发送数据,但concat()操作符合并后是按发送顺序串行执行
原理图:
具体使用:
Observable.concat( Observable.intervalRange(0, 3, 1, 1, TimeUnit.SECONDS), // 从0开始发送、共发送3个数据、第1次事件延迟发送时间 = 1s、间隔时间 = 1s
Observable.intervalRange(2, 3, 1, 1, TimeUnit.SECONDS) // 从2开始发送、共发送3个数据、第1次事件延迟发送时间 = 1s、间隔时间 = 1s
).subscribe(new Observer<Long>() {
@Override
public void onSubscribe(Disposable d) {
Log.d(TAG, "onSubscribe: ");
}
@Override
public void onNext(Long value) {
Log.d(TAG, "onNext: "+value);
}
@Override
public void onError(Throwable e) {
Log.d(TAG, "onError: ");
}
@Override
public void onComplete() {
Log.d(TAG, "onComplete: ");
}
});
3.concatDelayError() / mergeDelayError()
作用:在mergeArray()和concatArray()两个方法中,如果其中一个Observable发送了一个Error事件,那么就会停止发送事件,如果想onError()事件延迟到所有Observable都发送完事件后再执行,就可以使用mergeArrayDelayError()和concatArrayDelayError()
具体使用:无使用concatDelayError()的情况
Observable.concat(
Observable.create(new ObservableOnSubscribe<Integer>() {
@Override
public void subscribe(ObservableEmitter<Integer> emitter) throws Exception {
emitter.onNext(1);
emitter.onNext(2);
emitter.onNext(3);
emitter.onError(new NullPointerException()); // 发送Error事件,因为无使用concatDelayError,所以第2个Observable将不会发送事件 emitter.onComplete();
}
}),
Observable.just(4, 5, 6))
.subscribe(new Observer<Integer>() {
@Override
public void onSubscribe(Disposable d) {
Log.d(TAG, "onSubscribe: ");
}
@Override
public void onNext(Integer value) {
Log.d(TAG, "onNext: " + value);
}
@Override
public void onError(Throwable e) {
Log.d(TAG, "onError: ");
}
@Override
public void onComplete() {
Log.d(TAG, "onComplete: ");
}
});
使用concatDelayError()
Observable.concatArrayDelayError(
Observable.create(new ObservableOnSubscribe<Integer>() {
@Override
public void subscribe(ObservableEmitter<Integer> emitter) throws Exception {
emitter.onNext(1);
emitter.onNext(2);
emitter.onNext(3);
emitter.onError(new NullPointerException()); // 发送Error事件,因为无使用concatDelayError,所以第2个Observable将不会发送事件 emitter.onComplete();
}
}),
Observable.just(4, 5, 6))
.subscribe(new Observer<Integer>() {
@Override
public void onSubscribe(Disposable d) {
Log.d(TAG, "onSubscribe: ");
}
@Override
public void onNext(Integer value) {
Log.d(TAG, "onNext: " + value);
}
@Override
public void onError(Throwable e) {
Log.d(TAG, "onError: ");
}
@Override
public void onComplete() {
Log.d(TAG, "onComplete: ");
}
});
四.合并多个事件
1.zip()
- 作用:用来合并两个Observable发射的事件,根据BiFunction函数生成一个新的值发射出去。当其中一个Observable发送数据结束或者出现异常后,另一个Observable也将停止发送数据。也就是说正常的情况下数据长度会与两个Observable中最少事件的数量一样。
- 原理图
具体使用:
Observable.zip(
Observable.just("a", "b", "c"),
Observable.just(1, 2, 3),
new BiFunction<String, Integer, String>() {
@Override
public String apply(String s, Integer integer) throws Exception {
return s + integer;
}
}).subscribe(new Observer<String>() {
@Override
public void onSubscribe(Disposable d) {
Log.d(TAG, "onSubscribe: ");
}
@Override
public void onNext(String value) {
Log.d(TAG, "onNext: " + value);
}
@Override
public void onError(Throwable e) {
Log.d(TAG, "onError: ");
}
@Override
public void onComplete() {
Log.d(TAG, "onComplete: ");
}
});
2.combineLatest()
- 作用:当两个Observables中的任何一个发送了数据后,将先发送了数据的Observables 的最新(最后)一个数据 与 另外一个Observable发送的每个数据结合,最终基于该函数的结果发送数据
- 与Zip()的区别:Zip() = 按个数合并,即1对1合并;CombineLatest() = 按时间合并,即在同一个时间点上合并
原理图:
具体使用
Observable.combineLatest(
Observable.just("a", "b", "c"),
Observable.intervalRange(0, 3, 1, 1, TimeUnit.SECONDS), // 第2个发送数据事件的Observable:从0开始发送、共发送3个数据、第1次事件延迟发送时间 = 1s、间隔时间 = 1s
new BiFunction<String, Long, String>() {
@Override
public String apply(String s, Long l) throws Exception {
return s + l;
}
}).subscribe(new Observer<String>() {
@Override
public void onSubscribe(Disposable d) {
Log.d(TAG, "onSubscribe: ");
}
@Override
public void onNext(String value) {
Log.d(TAG, "onNext: " + value);
}
@Override
public void onError(Throwable e) {
Log.d(TAG, "onError: ");
}
@Override
public void onComplete() {
Log.d(TAG, "onComplete: ");
}
});
3.combineLatestDelayError()
- 作用类似于concatDelayError() / mergeDelayError() ,见上文
4.reduce()
- 作用:把被观察者需要发送的事件聚合成1个事件 & 发送
- 聚合的逻辑根据需求撰写,但本质都是前2个数据聚合,然后与后1个数据继续进行聚合,依次类推
具体使用:
Observable.just(1,2,3,4,5).reduce(new BiFunction<Integer, Integer, Integer>() {
@Override
public Integer apply(Integer integer, Integer integer2) throws Exception {
return integer + integer2;
}
}).subscribe(new Consumer<Integer>() {
@Override
public void accept(Integer integer) throws Exception {
Log.d(TAG, "accept: "+integer);
}
});
5.collect()
- 作用:将被观察者Observable发送的数据事件收集到一个数据结构里
具体使用
Observable.just("1","2","3","2")
.collect(new Callable<List<Integer>>() { //创建数据结构
@Override
public List<Integer> call() throws Exception {
return new ArrayList<Integer>();
}
}, new BiConsumer<List<Integer>, String>() {//收集器
@Override
public void accept(List<Integer> integers, String s) throws Exception {
integers.add(Integer.valueOf(s));
}
}).subscribe(new Consumer<List<Integer>>() {
@Override
public void accept(List<Integer> integers) throws Exception {
Log.e("---",integers+"");
}
});
五.发送事件前追加发送事件
1.startWith() / startWithArray()
- 作用:在一个被观察者发送事件前,追加发送一些数据 / 一个新的被观察者
具体使用:
Observable.just(1, 2, 3)
.startWith(0) // 追加单个数据 = startWith()
.startWithArray(1, 2, 3) // 追加多个数据 = startWithArray()
.subscribe(new Observer<Integer>() {
@Override
public void onSubscribe(Disposable d) {
Log.d(TAG, "onSubscribe: ");
}
@Override
public void onNext(Integer value) {
Log.d(TAG, "onNext: " + value);
}
@Override
public void onError(Throwable e) {
}
@Override
public void onComplete() {
Log.d(TAG, "onComplete: ");
}
});
Observable.just(1, 2, 3).
startWith(Observable.just(4, 5, 6)).
subscribe(new Observer<Integer>() {
@Override
public void onSubscribe(Disposable d) {
Log.d(TAG, "onSubscribe: ");
}
@Override
public void onNext(Integer value) {
Log.d(TAG, "onNext: " + value);
}
@Override
public void onError(Throwable e) {
}
@Override
public void onComplete() {
Log.d(TAG, "onComplete: ");
}
});
六.统计发送事件的数量
1.count()
- 作用:统计被观察者发送事件的数量
具体使用:
Observable.just(1, 2, 3)
.count()
.subscribe(new Consumer<Long>() {
@Override
public void accept(Long aLong) throws Exception {
Log.e(TAG, "发送的事件数量 = " + aLong);
}
});
![这里写图片描述](https://img-blog.csdn.net/20180814191208607)
七.参考资料
ReactiveX文档中文翻译
Android Rxjava:这是一篇 清晰 & 易懂的Rxjava 入门教程
Rxjava2入门教程一:函数响应式编程及概述
这可能是最好的RxJava 2.x 教程(完结版)
那些年我们错过的响应式编程