RxJava2基础总结 (一)
发射器
Observable
被观察者,内部存放观察者,用于发射数据。
创建默认发射器
为什么称默认发射器,因为它是由Observable静态方法生成的一个Observable实例。后续的操作都会在这个实例上进行。 那么经过处理后的原始发射器笔者称为二次发射器。
Observable.create
创建一个含有执行过程(得到发射数据)的发射器,且整个过程处于不可观察状态,其实就是要发射的数据Observable预先不知道。
Observable.create(new ObservableOnSubscribe<Object>() {
@Override
public void subscribe(ObservableEmitter<Object> emitter) throws Exception {
emitter.onNext("Observable");
}
});
ObservableOnSubscribe是连接发射器与接受器的中枢,得益于它内部的方法subscribe,有几个接收器,Observable 就会调用几次subscribe, subscribe需要一个ObservableEmitter的参数,该参数里存放了一个接收器,我们通过操作ObservableEmitter来实现数据的交互。
发送完数据一定要调用 emitter.onComplete();否则某些操作符会失效
Observable.just()
直接发射现有的数据
可以同时放入多组不同类型数据
Observable.just(new Integer(200), "Rxjava").subscribe(new Consumer<Object>() {
/**
* Consume the given value.
*
* @param o the value
* @throws Exception on error
*/
@Override
public void accept(Object o) throws Exception {
Logger.e(o.toString());
}
});
最后的处理结果
├┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄
E: │ 200
E: └────────────────────────────────────────────────────────────────────────────────────────────────────────────────
E: ┌────────────────────────────────────────────────────────────────────────────────────────────────────────────────
E: │ Thread: main
E: ├┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄
E: │ LambdaObserver.onNext (LambdaObserver.java:63)
E: │ MainActivity$1.accept (MainActivity.java:63)
E: ├┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄
E: │ Rxjava
E: └──────────────────
Observable.fromArray()
同理只是参数发生变化而已
List<String> list = new ArrayList<>();
for (int i = 0; i < 5; i++) {
list.add("Rxjava" + i);
}
Observable.fromArray(list.toArray(new String[]{})).subscribe(new Consumer<String>() {
@Override
public void accept(String s) throws Exception {
Logger.e(s);
}
});
Observable.fromIterable()
直接使用List< T >类型即可
Observable.time()
定时发送一个Long 值为0的数据,默认回调观察者为子线程。
Observable.timer(1000, TimeUnit.MILLISECONDS)
.subscribe(new Consumer<Long>() {
@Override
public void accept(Long aLong) throws Exception {
Logger.e("当前运行线程"+Thread.currentThread());
Logger.e(aLong + "");
}
});
E: │ 当前运行线程Thread[RxComputationThreadPool-1,5,main]
E: └────────────────────────────────────────────────────────────────────────────────────────────────────────────────
E: ┌────────────────────────────────────────────────────────────────────────────────────────────────────────────────
E: │ Thread: RxComputationThreadPool-1
E: ├┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄
E: │ MainActivity$1.accept (MainActivity.java:53)
E: │ MainActivity$1.accept (MainActivity.java:57)
E: ├┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄
E: │ 0
E:
Observable.interval()
interval 操作符用于间隔时间执行某个操作,其接受三个参数,分别是第一次发送延迟,间隔时间,时间单位。
Observable.interval(1000,2000, TimeUnit.MILLISECONDS).subscribe(new Consumer<Long>() {
@Override
public void accept(Long aLong) throws Exception {
Logger.e("当前运行线程为:"+Thread.currentThread());
Logger.e(aLong+"");
}
});
第一次间隔1秒,以后每次间隔2秒发送数据。 与timer一样也是0吗?
│ 0
E: └────────────────────────────────────────────────────────────────────────────────────────────────────────────────
E: ┌────────────────────────────────────────────────────────────────────────────────────────────────────────────────
E: │ Thread: RxComputationThreadPool-1
E: ├┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄
E: │ MainActivity$1.accept (MainActivity.java:52)
E: │ MainActivity$1.accept (MainActivity.java:56)
E: ├┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄
E: │ 当前运行线程为:Thread[RxComputationThreadPool-1,5,main]
E: └────────────────────────────────────────────────────────────────────────────────────────────────────────────────
E: ┌────────────────────────────────────────────────────────────────────────────────────────────────────────────────
E: │ Thread: RxComputationThreadPool-1
E: ├┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄
E: │ MainActivity$1.accept (MainActivity.java:52)
E: │ MainActivity$1.accept (MainActivity.java:57)
E: ├┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄
E: │ 1
E: └────────────────────────────────────────────────────────────────────────────────────────────────────────────────
E: ┌────────────────────────────────────────────────────────────────────────────────────────────────────────────────
E: │ Thread: RxComputationThreadPool-1
E: ├┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄
E: │ MainActivity$1.accept (MainActivity.java:52)
E: │ MainActivity$1.accept (MainActivity.java:56)
E: ├┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄
E: │ 当前运行线程为:Thread[RxComputationThreadPool-1,5,main]
E: └────────────────────────────────────────────────────────────────────────────────────────────────────────────────
E: ┌────────────────────────────────────────────────────────────────────────────────────────────────────────────────
E: │ Thread: RxComputationThreadPool-1
E: ├┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄
E: │ MainActivity$1.accept (MainActivity.java:52)
E: │ MainActivity$1.accept (MainActivity.java:57)
E: ├┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄
E: │ 2
E: └────────────────────────────────────────────────────────────────────────────────────────────────────────────────
E: ┌────────────────────────────────────────────────────────────────────────────────────────────────────────────────
E: │ Thread: RxComputationThreadPool-1
E: ├┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄
E: │ MainActivity$1.accept (MainActivity.java:52)
E: │ MainActivity$1.accept (MainActivity.java:56)
E: ├┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄
E: │ 当前运行线程为:Thread[RxComputationThreadPool-1,5,main]
E: └────────────────────────────────────────────────────────────────────────────────────────────────────────────────
E: ┌────────────────────────────────────────────────────────────────────────────────────────────────────────────────
E: │ Thread: RxComputationThreadPool-1
E: ├┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄
E: │ MainActivity$1.accept (MainActivity.java:52)
E: │ MainActivity$1.accept (MainActivity.java:57)
E: ├┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄
E: │ 3
E: └────────────────────────────────────────────────────────────────────────────────────────────────────────────────
E: ┌────────────────────────────────────────────────────────────────────────────────────────────────────────────────
E: │ Thread: RxComputationThreadPool-1
E: ├┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄
E: │ MainActivity$1.accept (MainActivity.java:52)
E: │ MainActivity$1.accept (MainActivity.java:56)
E: ├┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄
E: │ 当前运行线程为:Thread[RxComputationThreadPool-1,5,main]
E: └────────────────────────────────────────────────────────────────────────────────────────────────────────────────
E: ┌────────────────────────────────────────────────────────────────────────────────────────────────────────────────
E: │ Thread: RxComputationThreadPool-1
E: ├┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄
E: │ MainActivity$1.accept (MainActivity.java:52)
E: │ MainActivity$1.accept (MainActivity.java:57)
E: ├┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄
E: │ 4
E: └────────────────────────────────────────────────────────────────────────────────────────────────────────────────
E: ┌───────────────────────
第一次为0,以后每次递增1,默认运行线程也是子线程。
接收器
观察者,被被观察者调用
Observer
一个完整的数据接收器
public interface Observer<T> {
void onSubscribe(@NonNull Disposable d); //订阅被观察者时执行,用来判断执行当前观察者是否继续接收数据
void onNext(@NonNull T t); //通过ObservableEmitter调用,将数据传入其中。
void onError(@NonNull Throwable e); //同上,表示数据错误
void onComplete(); //同上,表示发送数据完毕
}
Consumer
只有一个借口,固定模式的接收数据。
public interface Consumer<T> {
/**
* Consume the given value.
* @param t the value
* @throws Exception on error
*/
void accept(T t) throws Exception;
}
数据接收
Observable.create(new ObservableOnSubscribe<String>() {
@Override
public void subscribe(ObservableEmitter<String> emitter) throws Exception {
emitter.onNext("RxJava1");
emitter.onNext("RxJava2");
emitter.onNext("RxJava3");
emitter.onNext("RxJava4");
}
}).subscribe(new Observer<String>() {
@Override
public void onSubscribe(Disposable d) {
}
@Override
public void onNext(String s) {
Logger.e(s);
}
@Override
public void onError(Throwable e) {
}
@Override
public void onComplete() {
}
});
这样便完成了一个简易数据发送接收过程。
最后的输出结果
E: │ RxJava1
E: └────────────────────────────────────────────────────────────────────────────────────────────────────────────────
E: ┌────────────────────────────────────────────────────────────────────────────────────────────────────────────────
E: │ Thread: main
E: ├┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄
E: │ MainActivity$1.onNext (MainActivity.java:66)
E: │ MainActivity$1.onNext (MainActivity.java:74)
E: ├┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄
E: │ RxJava2
E: └────────────────────────────────────────────────────────────────────────────────────────────────────────────────
E: ┌────────────────────────────────────────────────────────────────────────────────────────────────────────────────
E: │ Thread: main
E: ├┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄
E: │ MainActivity$1.onNext (MainActivity.java:66)
E: │ MainActivity$1.onNext (MainActivity.java:74)
E: ├┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄
E: │ RxJava3
E: └────────────────────────────────────────────────────────────────────────────────────────────────────────────────
E: ┌─────────────────────────────────────────────────────────────────────────────────────────────────────
E: │ Thread: main
E: ├┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄
E: │ MainActivity$1.onNext (MainActivity.java:66)
E: │ MainActivity$1.onNext (MainActivity.java:74)
E: ├┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄
E: │ RxJava4
E: └────────────────────────────────────────────────────────────────────────────────────────────────────────────────
onNext(): RxJava的事件回调方法,针对普通事件。
onCompleted(): 事件队列完结。RxJava 不仅把每个事件单独处理,还会把它们看做 一个队列。RxJava 规定,当不会再有新的 onNext() 发出时,需要触发 onCompleted() 方法作为标志。
onError(): 事件队列异常。在事件处理过程中出异常时,onError() 会被触发,同时队列自动终止,不允许再有事件发出。
在一个正确运行的事件序列中,onCompleted() 和 onError() 有且只有一个,并且是事件序列中的最后一个。需要注意的是,onCompleted() 和 onError() 二者也是互斥的,即在队列中调用了其中一个,就不应该再调用另一个。
onSubscribe():订阅的时候回调观察者里的此方法,并传入一个Disposable的参数控制后续数据是否继续接收。
实例
Observable.create(new ObservableOnSubscribe<String>() {
@Override
public void subscribe(ObservableEmitter<String> emitter) throws Exception {
emitter.onNext("RxJava1");
emitter.onNext("RxJava2");
emitter.onNext("RxJava3");
emitter.onNext("RxJava4");
}
}).subscribe(new Observer<String>() {
Disposable d;
@Override
public void onSubscribe(Disposable d) {
this.d = d;
}
@Override
public void onNext(String s) {
d.dispose();
Logger.e(s);
}
@Override
public void onError(Throwable e) {
}
@Override
public void onComplete() {
}
});
当接收器拿到第一个数据后调用d.dispose();将数据中断,则后续数据不会再接受。结果:
┌────────────────────────────────────────────────────────────────────────────────────────────────────────────────
E: │ Thread: main
E: ├┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄
E: │ MainActivity$1.onNext (MainActivity.java:66)
E: │ MainActivity$1.onNext (MainActivity.java:78)
E: ├┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄
E: │ RxJava1
E: └───────────────
d.dispose()与emitter.onComplete()区别
一个在发射器端中断数据,一个在接收器端中断数据,且发射器端也可判断接收器端是否中断了数据,通过调用emitter.isDisposed()方法
Custom使用方法同理。