RxJava操作符(六)——功能操作符
程序员文章站
2024-02-28 12:59:10
...
一、功能操作符:辅助被观察者(Observable) 在发送事件时实现一些功能性需求
二、功能操作符按照使用功能,大致分类:
- 订阅:subscribe()
- 线程调度:subscribeOn()、observeOn()
- 延迟:delay()
- do操作:do()
- 错误处理 :onErrorReturn() 、onErrorResumeNext() 、onExceptionResumeNext()、retry()、retryUntil() 、retryWhen()
- 重复操作符:repeat() 、repeatWhen()
三、使用详解
- subscribe()
- 作用:订阅观察者和被观察者
- 使用示例
//创建被观察者对象
Observable observable = Observable.just(1, 2, 3);
//创建观察者对象
Consumer<Integer> integerConsumer = new Consumer<Integer>() {
@Override
public void accept(Integer integer) throws Exception {
}
};
//订阅
observable.subscribe(integerConsumer);
完成订阅后,当被观察者发送事件时,观察者会执行其实现的方法。
- subscribeOn()/observeOn()
- 作用:切换执行线程
- 介绍:
- 在 RxJava模型中,被观察者 (Observable) / 观察者(Observer)的工作线程 = 创建自身的线程
- 创建被观察者 (Observable) / 观察者(Observer)的线程 = 主线程,所以整个过程都会执行在主线程
- 所以生产事件 / 接收& 响应事件都发生在主线程,而我们有些耗时操作或网络请求是不需要阻塞线程的,所以需要在其他线程执行操作后,此时就用到subscribeOn()
- 当我们在其他线程操作数据后需要更新UI时,此时需要切换到主线程使用 observeOn()
3、线程类型:在 RxJava
中,内置了多种用于调度的线程类型
类型 | 含义 | 应用场景 |
---|---|---|
Schedulers.immediate() | 当前线程 = 不指定线程 | 默认 |
AndroidSchedulers.mainThread() | Android主线程 | 操作UI |
Schedulers.newThread() | 常规新线程 | 耗时等操作 |
Schedulers.io() | io操作线程 | 网络请求、读写文件等io密集型操作 |
Schedulers.computation() | CPU计算操作线程 | 大量计算操作 |
- 注:
RxJava
内部使用 线程池 来维护这些线程,所以线程的调度效率非常高。
4、使用示例:
Observable.just(1, 2, 3)
.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.subscribe();
5、使用注意:- subscribeOn()针对的是目标过程是调用此方法之前的过程,observeOn()操作目标为观察者对象或其后的执行过程
- 对于subscribeOn()多次调用只有第一次起作用,observeOn()每次调用都会起作用
Observable.create(new ObservableOnSubscribe() {
@Override
public void subscribe(ObservableEmitter e) throws Exception {
Log.e("Thread===",Thread.currentThread().getName());
e.onNext(1);
}
}).subscribeOn(AndroidSchedulers.mainThread())
.subscribeOn(Schedulers.newThread())
.observeOn(AndroidSchedulers.mainThread())
.subscribe(new Consumer() {
@Override
public void accept(Object o) throws Exception {
Log.e("thread===",Thread.currentThread().getName());
}
});
输出结果:
05-06 06:04:00.454 4814-4814/com.example.administrator.googleplay E/Thread===: main
05-06 06:04:00.547 4814-4814/com.example.administrator.googleplay E/thread===: main
上述过程,制定了两次订阅的线程分别为主线程和创建线程,从输出结果上看被观察者是在main线程中发送的事件,后面的newThread并未起作用,下面看看observeOn(): Observable.create(new ObservableOnSubscribe() {
@Override
public void subscribe(ObservableEmitter e) throws Exception {
e.onNext(1);
}
}).subscribeOn(Schedulers.io())
.observeOn(Schedulers.newThread())
.doOnNext(new Consumer() {
@Override
public void accept(Object o) throws Exception {
Log.e("doOnNext Thread===",Thread.currentThread().getName());
}
})
.observeOn(AndroidSchedulers.mainThread())
.subscribe(new Consumer() {
@Override
public void accept(Object o) throws Exception {
Log.e("subscribe Thread===",Thread.currentThread().getName());
}
});
上面使用了两次observeOn切换线程,如果两者都起作用,那第一个应该创建新线程,第二打印的应该是主线程,看看输出结果:05-06 06:10:56.608 5303-5330/com.example.administrator.googleplay E/doOnNext Thread===: RxNewThreadScheduler-1
05-06 06:10:56.624 5303-5303/com.example.administrator.googleplay E/subscribe Thread===: main
输出结果与分析的一致,所以验证上面的结论:对于subscribeOn()多次调用只有第一次起作用,observeOn()每次调用都会起作用
-
delay()
- 作用:延迟操作符可以延迟一段时间发送事件
- delay()的重载
// 1. 指定延迟时间
// 参数1 = 时间;参数2 = 时间单位
delay(long delay,TimeUnit unit)
// 2. 指定延迟时间 & 调度器
// 参数1 = 时间;参数2 = 时间单位;参数3 = 线程调度器
delay(long delay,TimeUnit unit,mScheduler scheduler)
// 3. 指定延迟时间 & 错误延迟
// 错误延迟,即:若存在Error事件,则如常执行,执行后再抛出错误异常
// 参数1 = 时间;参数2 = 时间单位;参数3 = 错误延迟参数
delay(long delay,TimeUnit unit,boolean delayError)
// 4. 指定延迟时间 & 调度器 & 错误延迟
// 参数1 = 时间;参数2 = 时间单位;参数3 = 线程调度器;参数4 = 错误延迟参数
delay(long delay,TimeUnit unit,mScheduler scheduler,boolean delayError): 指定延迟多长时间并添加调度
3、使用示例
Observable.just(1,2,3)
.delay(1, TimeUnit.SECONDS)
.subscribe(new Consumer<Integer>() {
@Override
public void accept(Integer integer) throws Exception {
}
});
- do()操作
- 作用 :在某个事件的生命周期中调用
- 类型 :do()操作符有很多个,具体如下
3、使用示例
Observable.create(new ObservableOnSubscribe<Integer>() {
@Override
public void subscribe(ObservableEmitter<Integer> e) throws Exception {
e.onNext(1);
e.onNext(2);
e.onNext(3);
e.onError(new Throwable("发生错误了"));
}
})
// 1. 当Observable每发送1次数据事件就会调用1次
.doOnEach(new Consumer<Notification<Integer>>() {
@Override
public void accept(Notification<Integer> integerNotification) throws Exception {
Log.d(TAG, "doOnEach: " + integerNotification.getValue());
}
})
// 2. 执行Next事件前调用
.doOnNext(new Consumer<Integer>() {
@Override
public void accept(Integer integer) throws Exception {
Log.d(TAG, "doOnNext: " + integer);
}
})
// 3. 执行Next事件后调用
.doAfterNext(new Consumer<Integer>() {
@Override
public void accept(Integer integer) throws Exception {
Log.d(TAG, "doAfterNext: " + integer);
}
})
// 4. Observable正常发送事件完毕后调用
.doOnComplete(new Action() {
@Override
public void run() throws Exception {
Log.e(TAG, "doOnComplete: ");
}
})
// 5. Observable发送错误事件时调用
.doOnError(new Consumer<Throwable>() {
@Override
public void accept(Throwable throwable) throws Exception {
Log.d(TAG, "doOnError: " + throwable.getMessage());
}
})
// 6. 观察者订阅时调用
.doOnSubscribe(new Consumer<Disposable>() {
@Override
public void accept(@NonNull Disposable disposable) throws Exception {
Log.e(TAG, "doOnSubscribe: ");
}
})
// 7. Observable发送事件完毕后调用,无论正常发送完毕 / 异常终止
.doAfterTerminate(new Action() {
@Override
public void run() throws Exception {
Log.e(TAG, "doAfterTerminate: ");
}
})
// 8. 最后执行
.doFinally(new Action() {
@Override
public void run() throws Exception {
Log.e(TAG, "doFinally: ");
}
})
.subscribe(new Observer<Integer>() {
@Override
public void onSubscribe(Disposable d) {
}
@Override
public void onNext(Integer value) {
Log.d(TAG, "接收到了事件"+ value );
}
@Override
public void onError(Throwable e) {
Log.d(TAG, "对Error事件作出响应");
}
@Override
public void onComplete() {
Log.d(TAG, "对Complete事件作出响应");
}
});
输出结果:
错误处理符
- onErrorReturn()
- 作用: 当发生一个错误时会返回一个数据,并且立即停止操作
- 使用示例:
Observable.create(new ObservableOnSubscribe() {
@Override
public void subscribe(ObservableEmitter e) throws Exception {
e.onNext(1);
e.onNext(2);
e.onNext(3);
e.onError(new Throwable("发送异常!"));
}
}).onErrorReturn(new Function<Throwable,Integer>() {
@Override
public Integer apply(Throwable o) throws Exception {
Log.e("=====",o.getMessage());
return 110;
}
}).subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.subscribe(new Consumer() {
@Override
public void accept(Object o) throws Exception {
Log.e("=====",o + "");
}
}, new Consumer<Throwable>() {
@Override
public void accept(Throwable throwable) throws Exception {
Log.e("=====",throwable.getMessage());
}
});
输出结果:
05-06 06:43:17.498 6446-6470/com.example.administrator.googleplay E/=====: 发送异常!
05-06 06:43:17.513 6446-6446/com.example.administrator.googleplay E/=====: 1
05-06 06:43:17.513 6446-6446/com.example.administrator.googleplay E/=====: 2
05-06 06:43:17.513 6446-6446/com.example.administrator.googleplay E/=====: 3
05-06 06:43:17.513 6446-6446/com.example.administrator.googleplay E/=====: 110
在发送异常事件后首先执行了onErrorReturn输出异常信息,然后修改并返回一个数据,此数据作为一个新的事件发送,从而回掉onNext()方法。
-
onErrorResumeNext()
- 作用:当发生一个错误时会返回一个新的被观察着对象
- 使用示例:使用方法与onErrorReturn() 相同,只是将发送的数据换成了一个Onservable对象
.onErrorResumeNext(new Function<Throwable, ObservableSource>() {
@Override
public ObservableSource apply(Throwable throwable) throws Exception {
return Observable.just("A","B");
}
}
-
onExceptionResumeNext()
- 作用:当发生一个异常时返回一个新的被观察者
- 使用示例同上
注意:
- onExceptionResumeNext()拦截的错误 = Exception;若需拦截Throwable请用onErrorResumeNext()
- 若onExceptionResumeNext()拦截的错误 = Throwable,则会将错误传递给观察者的onError方法
-
retry()
- 作用:当发生异常时被观察者会重新发送事件
- 类型:
<-- 1. retry() -->
// 作用:出现错误时,让被观察者重新发送数据
// 注:若一直错误,则一直重新发送
<-- 2. retry(long time) -->
// 作用:出现错误时,让被观察者重新发送数据(具备重试次数限制
// 参数 = 重试次数
<-- 3. retry(Predicate predicate) -->
// 作用:出现错误后,判断是否需要重新发送数据(若需要重新发送& 持续遇到错误,则持续重试)
// 参数 = 判断逻辑 //返回false = 不重新重新发送数据 & 调用观察者的onError()结束 //返回true = 重新发送请求(最多重新发送3次)
<-- 4. retry(new BiPredicate<Integer, Throwable>) -->// 作用:出现错误后,判断是否需要重新发送数据(若需要重新发送 & 持续遇到错误,则持续重试// 参数 = 判断逻辑(传入当前重试次数 & 异常错误信息)//返回false = 不重新重新发送数据 & 调用观察者的onError()结束 //返回true = 重新发送请求(最多重新发送3次)<-- 5. retry(long time,Predicate predicate) -->// 作用:出现错误后,判断是否需要重新发送数据(具备重试次数限制// 参数 = 设置重试次数 & 判断逻辑 //返回false = 不重新重新发送数据 & 调用观察者的onError()结束 //返回true = 重新发送请求(最多重新发送3次)使用示例:
// 作用:出现错误后,判断是否需要重新发送数据(具备重试次数限制
// 参数 = 设置重试次数 & 判断逻辑
Observable.create(new ObservableOnSubscribe<Integer>() {
@Override
public void subscribe(ObservableEmitter<Integer> e) throws Exception {
e.onNext(1);
e.onNext(2);
e.onError(new Exception("发生错误了"));
e.onNext(3);
}
})
// 拦截错误后,判断是否需要重新发送请求
.retry(3, new Predicate<Throwable>() {
@Override
public boolean test(@NonNull Throwable throwable) throws Exception {
// 捕获异常
Log.e(TAG, "retry错误: "+throwable.toString());
//返回false = 不重新重新发送数据 & 调用观察者的onError()结束
//返回true = 重新发送请求(最多重新发送3次)
return true;
}
})
.subscribe(new Observer<Integer>() {
@Override
public void onSubscribe(Disposable d) {
}
@Override
public void onNext(Integer value) {
Log.d(TAG, "接收到了事件"+ value );
}
@Override
public void onError(Throwable e) {
Log.d(TAG, "对Error事件作出响应");
}
@Override
public void onComplete() {
Log.d(TAG, "对Complete事件作出响应");
}
});
-
retryUntil()
- 作用:当出现错误时判断是否需要重新发送数据,功能与上述类似
- 使用示例:
.retryUntil(new BooleanSupplier() {
@Override
public boolean getAsBoolean() throws Exception {
return true;
}
})
注意:当返回true时表示不重复发送,其实从名字中可以看出来retryUntil ,有个Until是当达到某种条件时即可停止
- retryWhen
- 作用:遇到错误时,将发生的错误传递给一个新的被观察者(Observable),并决定是否需要重新订阅原始被观察者Observable 发送事件
- 使用示例:
Observable.create(new ObservableOnSubscribe() {
@Override
public void subscribe(ObservableEmitter e) throws Exception {
e.onNext(1);
e.onNext(2);
e.onError(new Throwable("发送异常!"));
e.onNext(3);
}
}).retryWhen(new Function<Observable<Throwable>, ObservableSource<?>>() {
@Override
public ObservableSource<?> apply(Observable<Throwable> throwableObservable) throws Exception {
return throwableObservable.flatMap(new Function<Throwable, ObservableSource<?>>() {
@Override
public ObservableSource<?> apply(Throwable throwable) throws Exception {
return Observable.just(4);
}
});
}
})
输出结果:
/com.example.administrator.googleplay E/=====: 1
05-06 07:15:02.627 7459-7459/com.example.administrator.googleplay E/=====: 2
05-06 07:15:02.627 7459-7459/com.example.administrator.googleplay E/=====: 1
05-06 07:15:02.627 7459-7459/com.example.administrator.googleplay E/=====: 2
05-06 07:15:02.627 7459-7459/com.example.administrator.googleplay E/=====: 1
从上面可以看出当发送异常时会重新发送,发送数据为1,2 循环,
注:1. 若 新的被观察者 Observable发送的事件 = Error事件,那么 原始Observable则不重新发送事件:
2. 若 新的被观察者 Observable发送的事件 = Next事件 ,那么原始的Observable则重新发送事件:
现在修改上面的Observable.just(4)为 Observable.error(new Throwable("retryWhen终止啦"));,
Observable.error(new Throwable("retryWhen终止啦"));
再次运行程序,回掉onError()方法,输出结果:
05-06 07:19:35.237 8015-8015/com.example.administrator.googleplay E/=====: retryWhen终止啦
重复发送
-
repeat()
- 作用:无条件不停止循环
- 使用类型
Observable.just(1, 2, 3)
.repeat()
.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.subscribe(new Consumer() {
@Override
public void accept(Object o) throws Exception {
Log.e("=====", o + "");
}
});
输出结果:
05-06 07:24:51.407 8210-8210/com.example.administrator.googleplay E/=====: 1
05-06 07:24:51.408 8210-8210/com.example.administrator.googleplay E/=====: 2
05-06 07:24:51.408 8210-8210/com.example.administrator.googleplay E/=====: 3
05-06 07:24:51.408 8210-8210/com.example.administrator.googleplay E/=====: 1
05-06 07:24:51.408 8210-8210/com.example.administrator.googleplay E/=====: 2
05-06 07:24:51.408 8210-8210/com.example.administrator.googleplay E/=====: 3
05-06 07:24:51.408 8210-8210/com.example.administrator.googleplay E/=====: 1
05-06 07:24:51.408 8210-8210/com.example.administrator.googleplay E/=====: 2
05-06 07:24:51.408 8210-8210/com.example.administrator.googleplay E/=====: 3
05-06 07:24:51.408 8210-8210/com.example.administrator.googleplay E/=====: 1
05-06 07:24:51.408 8210-8210/com.example.administrator.googleplay E/=====: 2
05-06 07:24:51.408 8210-8210/com.example.administrator.googleplay E/=====: 3
05-06 07:24:51.408 8210-8210/com.example.administrator.googleplay E/=====: 1
05-06 07:24:51.408 8210-8210/com.example.administrator.googleplay E/=====: 2
05-06 07:24:51.408 8210-8210/com.example.administrator.googleplay E/=====: 3
重载:repeat(3)参数:循环的次数,先修改传入参数2,执行程序
.repeat(2)
输出结果:
05-06 07:28:17.482 8210-8210/com.example.administrator.googleplay E/=====: 1
05-06 07:28:17.490 8210-8210/com.example.administrator.googleplay E/=====: 2
05-06 07:28:17.490 8210-8210/com.example.administrator.googleplay E/=====: 3
05-06 07:28:17.490 8210-8210/com.example.administrator.googleplay E/=====: 1
05-06 07:28:17.490 8210-8210/com.example.administrator.googleplay E/=====: 2
05-06 07:28:17.509 8210-8210/com.example.administrator.googleplay E/=====: 3
- repeatUntil () / repeat When()
- 作用:按照条件决定是否重复发送事件
- 使用示例:二者的使用方法与上述的retryUntil()/retryWhen()相同
以上就是RxJava的功能操作符的介绍,功能强大之处在开发中会深有体会。