RXJAVA2
程序员文章站
2022-06-09 23:13:51
...
/**
* Observable --- 被观察者
* create ---操作符
* ObservableEmitter --- 发射器向观察者发送事件
*/
Observable<String> objectObservable = Observable.create(new ObservableOnSubscribe<String>() {
@Override
public void subscribe(ObservableEmitter<String> emitter) throws Exception {
emitter.onNext("Observable");
emitter.onComplete();
}
});
// 观察者
Observer<String> observer = new Observer<String>() {
@Override
public void onSubscribe(Disposable d) {
System.out.println("onSubscribe====" + d.toString());
}
@Override
public void onNext(String s) {
System.out.println("onNext====" + s);
}
@Override
public void onError(Throwable e) {
}
@Override
public void onComplete() {
System.out.println("onComplete====");
}
};
//订阅
objectObservable.subscribe(observer);
这样就将被观察者和观察者关联起来了。 被观察者执行onnext等操作时,观察者的回调就执行了。
// Flowable被观察者(背压)的创建
Flowable<Object> objectFlowable = Flowable.create(new FlowableOnSubscribe<Object>() {
@Override
public void subscribe(FlowableEmitter<Object> emitter) throws Exception {
}
}, BackpressureStrategy.BUFFER);
//Single 被观察者
Single.create(new SingleOnSubscribe<Object>() {
@Override
public void subscribe(SingleEmitter<Object> emitter) throws Exception {
}
}).subscribe(new SingleObserver<Object>() {
@Override
public void onSubscribe(Disposable d) {
}
@Override
public void onSuccess(Object o) {
}
@Override
public void onError(Throwable e) {
}
});
//Completable 被观察者
Completable.create(new CompletableOnSubscribe() {
@Override
public void subscribe(CompletableEmitter emitter) throws Exception {
}
});
//Maybe 被观察者
Maybe.create(new MaybeOnSubscribe<Object>() {
@Override
public void subscribe(MaybeEmitter<Object> emitter) throws Exception {
}
});
当被观察者连续执行onnext但是观察者中的onnext执行是要耗时操作,如果不做处理会处理不过来,导致oom。Flowable背压会将这些操作先存起来,注意背压的几种模式。
当上下游在不同的线程中,通过Observable发射,处理,响应数据流时,如果上游发射数据的速度快于下游接收处理数据的速度,这样对于那些没来得及处理的数据就会造成积压,这些数据既不会丢失,也不会被垃圾回收机制回收,而是存放在一个异步缓存池中,如果缓存池中的数据一直得不到处理,越积越多,最后就会造成内存溢出,这便是响应式编程中的背压(backpressure)问题
Observable.just(1, 2, 3).subscribe(new Observer<Object>() {
@Override
public void onSubscribe(Disposable d) {
}
@Override
public void onNext(Object integer) {
System.out.println("just===" + integer);
}
@Override
public void onError(Throwable e) {
}
@Override
public void onComplete() {
}
});
最多只能放10个
Observable.concat(Observable.just(1, 2),
Observable.just(5, 6),
Observable.just(3, 4),
Observable.just(7, 8))
.subscribe(new Observer<Integer>() {
@Override
public void onSubscribe(Disposable d) {
}
@Override
public void onNext(Integer integer) {
System.out.println(integer);
}
@Override
public void onError(Throwable e) {
}
@Override
public void onComplete() {
}
});
Observable.just(1, 2, 3)
.delay(2, TimeUnit.SECONDS)
.subscribeOn(Schedulers.io())
.subscribe(new Observer<Integer>() {
@Override
public void onSubscribe(Disposable d) {
System.out.println("onSubscribe()");
}
@Override
public void onNext(Integer integer) {
System.out.println(integer);
}
@Override
public void onError(Throwable e) {
}
@Override
public void onComplete() {
}
});
Observable.just(1, 2, 3)
.filter(new Predicate<Integer>() {
@Override
public boolean test(Integer integer) throws Exception {
return integer < 3;
}
}).subscribe(new Observer<Integer>() {
@Override
public void onSubscribe(Disposable d) {
}
@Override
public void onNext(Integer integer) {
System.out.println(integer);
}
@Override
public void onError(Throwable e) {
}
@Override
public void onComplete() {
}
});
Observable.just(1,2,3,4)
.all(new Predicate<Integer>() {
@Override
public boolean test(Integer integer) throws Exception {
return integer < 4;
}
}).subscribe(new Consumer<Boolean>() {
@Override
public void accept(Boolean aBoolean) throws Exception {
System.out.println("accept()===" + aBoolean);
}
});
上一篇: dataGrip显示clickhouse时间字段不正确的问题
下一篇: 生成树的算法