欢迎您访问程序员文章站本站旨在为大家提供分享程序员计算机编程知识!
您现在的位置是: 首页

RXJAVA2

程序员文章站 2022-06-09 23:13:51
...

RXJAVA2

RXJAVA2

 /**
         * Observable --- 被观察者
         * create  ---操作符
         * ObservableEmitter  ---  发射器向观察者发送事件
         */
        Observable<String> objectObservable = Observable.create(new ObservableOnSubscribe<String>() {
            @Override
            public void subscribe(ObservableEmitter<String> emitter) throws Exception {
                emitter.onNext("Observable");
                emitter.onComplete();

            }
        });


        // 观察者
        Observer<String> observer = new Observer<String>() {
            @Override
            public void onSubscribe(Disposable d) {
                System.out.println("onSubscribe====" + d.toString());
            }

            @Override
            public void onNext(String s) {
                System.out.println("onNext====" + s);
            }

            @Override
            public void onError(Throwable e) {

            }

            @Override
            public void onComplete() {
                System.out.println("onComplete====");
            }
        };
        //订阅
        objectObservable.subscribe(observer);


这样就将被观察者和观察者关联起来了。    被观察者执行onnext等操作时,观察者的回调就执行了。
 // Flowable被观察者(背压)的创建
        Flowable<Object> objectFlowable = Flowable.create(new FlowableOnSubscribe<Object>() {

            @Override
            public void subscribe(FlowableEmitter<Object> emitter) throws Exception {

            }
        }, BackpressureStrategy.BUFFER);

        //Single 被观察者
        Single.create(new SingleOnSubscribe<Object>() {

            @Override
            public void subscribe(SingleEmitter<Object> emitter) throws Exception {
            }
        }).subscribe(new SingleObserver<Object>() {

            @Override
            public void onSubscribe(Disposable d) {

            }

            @Override
            public void onSuccess(Object o) {

            }

            @Override
            public void onError(Throwable e) {

            }
        });

        //Completable 被观察者
        Completable.create(new CompletableOnSubscribe() {
            @Override
            public void subscribe(CompletableEmitter emitter) throws Exception {

            }
        });

        //Maybe 被观察者
        Maybe.create(new MaybeOnSubscribe<Object>() {

            @Override
            public void subscribe(MaybeEmitter<Object> emitter) throws Exception {
            }
        });



当被观察者连续执行onnext但是观察者中的onnext执行是要耗时操作,如果不做处理会处理不过来,导致oom。Flowable背压会将这些操作先存起来,注意背压的几种模式。

当上下游在不同的线程中,通过Observable发射,处理,响应数据流时,如果上游发射数据的速度快于下游接收处理数据的速度,这样对于那些没来得及处理的数据就会造成积压,这些数据既不会丢失,也不会被垃圾回收机制回收,而是存放在一个异步缓存池中,如果缓存池中的数据一直得不到处理,越积越多,最后就会造成内存溢出,这便是响应式编程中的背压(backpressure)问题

 

RXJAVA2

    Observable.just(1, 2, 3).subscribe(new Observer<Object>() {
            @Override
            public void onSubscribe(Disposable d) {

            }

            @Override
            public void onNext(Object integer) {
                System.out.println("just===" + integer);
            }

            @Override
            public void onError(Throwable e) {

            }

            @Override
            public void onComplete() {

            }
        });

最多只能放10个

RXJAVA2

RXJAVA2

        Observable.concat(Observable.just(1, 2),
                Observable.just(5, 6),
                Observable.just(3, 4),
                Observable.just(7, 8))
                .subscribe(new Observer<Integer>() {
                    @Override
                    public void onSubscribe(Disposable d) {

                    }

                    @Override
                    public void onNext(Integer integer) {
                        System.out.println(integer);
                    }

                    @Override
                    public void onError(Throwable e) {

                    }

                    @Override
                    public void onComplete() {

                    }
                });

 

RXJAVA2

Observable.just(1, 2, 3)
                .delay(2, TimeUnit.SECONDS)
                .subscribeOn(Schedulers.io())
                .subscribe(new Observer<Integer>() {
                    @Override
                    public void onSubscribe(Disposable d) {
                        System.out.println("onSubscribe()");
                    }

                    @Override
                    public void onNext(Integer integer) {
                        System.out.println(integer);
                    }

                    @Override
                    public void onError(Throwable e) {

                    }

                    @Override
                    public void onComplete() {

                    }
                });

RXJAVA2


        Observable.just(1, 2, 3)
                .filter(new Predicate<Integer>() {
                    @Override
                    public boolean test(Integer integer) throws Exception {
                        return integer < 3;
                    }
                }).subscribe(new Observer<Integer>() {
            @Override
            public void onSubscribe(Disposable d) {

            }

            @Override
            public void onNext(Integer integer) {
                System.out.println(integer);
            }

            @Override
            public void onError(Throwable e) {

            }

            @Override
            public void onComplete() {

            }
        });

RXJAVA2

Observable.just(1,2,3,4)
                .all(new Predicate<Integer>() {
                    @Override
                    public boolean test(Integer integer) throws Exception {
                        return integer < 4;
                    }
                }).subscribe(new Consumer<Boolean>() {
            @Override
            public void accept(Boolean aBoolean) throws Exception {
                System.out.println("accept()===" + aBoolean);
            }
        });

 

 

 

RXJAVA2

 

RXJAVA2