欢迎您访问程序员文章站本站旨在为大家提供分享程序员计算机编程知识!
您现在的位置是: 首页

rxjava 八:背压

程序员文章站 2024-02-29 08:59:16
...

问题

Observable线程 发送事件快
Observer线程 接收事件慢
Observable发送了无限个事件
Observer 接收了几个
那么Observable发送的事件将会缓存,缓存越来越多造成内存溢出 OOM

举例:同步不会出现这种问题

    Observable.create(new ObservableOnSubscribe<Integer>() {
            @Override
            public void subscribe(ObservableEmitter<Integer> emitter) throws Exception {
                int i=0;
                while (true){
                    i++;
                    emitter.onNext(i);
                    Log.i("zqq","subscribe>>"+i);
                }
            }
        })
                .subscribeOn(Schedulers.io())
                .observeOn(AndroidSchedulers.mainThread())
                .subscribe(new Consumer<Integer>() {
                    @Override
                    public void accept(Integer integer) throws Exception {
                        Thread.sleep(2000);
                        Log.i("zqq","integer>>"+integer);
                    }
                });

rxjava 八:背压

Flowable使用

Flowable.create(new FlowableOnSubscribe<Integer>() {
            @Override
            public void subscribe(FlowableEmitter<Integer> emitter) throws Exception {
                emitter.onNext(1);
                emitter.onNext(2);
                emitter.onNext(3);
                emitter.onNext(4);
                emitter.onNext(5);
                emitter.onNext(6);
                emitter.onNext(7);
                emitter.onComplete();
            }
        }, BackpressureStrategy.ERROR)
                .subscribe(new Subscriber<Integer>() {
                    @Override
                    public void onSubscribe(Subscription s) {
                        Log.i("zqq","onSubscribe");
                    }

                    @Override
                    public void onNext(Integer integer) {
                        Log.i("zqq","onNext>>"+integer);
                    }

                    @Override
                    public void onError(Throwable t) {
                        Log.i("zqq","onError>>"+t.toString());
                    }

                    @Override
                    public void onComplete() {
                        Log.i("zqq","onComplete");
                    }
                });

结果:
rxjava 八:背压
出错了 MissingBackpressureException

修改代码:

   Flowable.create(new FlowableOnSubscribe<Integer>() {
            @Override
            public void subscribe(FlowableEmitter<Integer> emitter) throws Exception {
                emitter.onNext(1);
                emitter.onNext(2);
                emitter.onNext(3);
                emitter.onNext(4);
                emitter.onNext(5);
                emitter.onNext(6);
                emitter.onNext(7);
                emitter.onComplete();
            }
        }, BackpressureStrategy.ERROR)
                .subscribe(new Subscriber<Integer>() {
                    @Override
                    public void onSubscribe(Subscription s) {
                        Log.i("zqq","onSubscribe");
                        s.request(Integer.MAX_VALUE);  //增加代码
                    }

                    @Override
                    public void onNext(Integer integer) {
                        Log.i("zqq","onNext>>"+integer);
                    }

                    @Override
                    public void onError(Throwable t) {
                        Log.i("zqq","onError>>"+t.toString());
                    }

                    @Override
                    public void onComplete() {
                        Log.i("zqq","onComplete");
                    }
                });

结果
rxjava 八:背压

我们只增加的一句代码 s.request(Integer.MAX_VALUE); //增加代码
这句代码的意思是,接收者告知发送者,我可以接收这么多的事件,给我发吧
如果不添加这句话,发送者默认任务,接受者没有能力接收事件,就会抛出MissingBackpressureException异常

使用异步

  Flowable.create(new FlowableOnSubscribe<Integer>() {
            @Override
            public void subscribe(FlowableEmitter<Integer> emitter) throws Exception {
                emitter.onNext(1);
                emitter.onNext(2);
                emitter.onNext(3);
                emitter.onNext(4);
                emitter.onNext(5);
                emitter.onNext(6);
                emitter.onNext(7);
                Log.i("zqq","发送事件》》");
                emitter.onComplete();
            }
        }, BackpressureStrategy.ERROR)
                .subscribeOn(Schedulers.io())
                .observeOn(AndroidSchedulers.mainThread())
                .subscribe(new Subscriber<Integer>() {
                    @Override
                    public void onSubscribe(Subscription s) {
                        Log.i("zqq","onSubscribe");
//                        s.request(Integer.MAX_VALUE);  //增加代码
                    }

                    @Override
                    public void onNext(Integer integer) {
                        Log.i("zqq","onNext>>"+integer);
                    }

                    @Override
                    public void onError(Throwable t) {
                        Log.i("zqq","onError>>"+t.toString());
                    }

                    @Override
                    public void onComplete() {
                        Log.i("zqq","onComplete");
                    }
                });

结果:
rxjava 八:背压
同样接收者未接收事件

解决策略
1、减少事件,只取所需
2、延时发送,慢慢发

观察 Flowable.create的第二个参数
MISSING
ERROR
BUFFER : 增加发送者,发送事件缓存上限,默认为128,当使用BUFFER 的时候,相当于Oberver可以无线缓存,但可能会造成OOM
DROP : 把不存在的事件丢掉 即,request多少给多少,剩余的事件全部丢弃
LATEST :获取某时刻最终的128个事件,其余的丢弃 ,比如0-1000 发送者发送完毕,这是接受者接收,会获取 (1000-128 )到 1000的事件

若不是我们自己创建的Floable可以使用如下方法来进行背压
onBackpressureBuffer()
onBackpressureDrop()
onBackpressureLatest()
效果同上

FlowableEmitter 有一个方法

 /**
     * The current outstanding request amount.
     * <p>This method is thread-safe.
     * @return the current outstanding request amount
     */
    long requested();

返回值,是接受者请求的可以承受的事件数量
并且次数量是动态的
即,如果发送者发出了一个事件,此返回值-1;
当为0的时候 发送者停止发送事件
不为0的时候,才开始发送。

Flowable.create(new FlowableOnSubscribe<Integer>() {
            @Override
            public void subscribe(FlowableEmitter<Integer> emitter) throws Exception {

                int i=0;
                while (true){
                    Log.i("zqq","emitter.requested()"+emitter.requested());
                    if(emitter.requested() == 0 && !emitter.isCancelled()){
                        break;
                    }
                    i++;
                    Log.i("zqq","发送事件》》"+i);
                    emitter.onNext(i);
                }
            }
        }, BackpressureStrategy.ERROR)
                .subscribeOn(Schedulers.io())
                .observeOn(AndroidSchedulers.mainThread())
                .subscribe(new Subscriber<Integer>() {
                    @Override
                    public void onSubscribe(Subscription s) {
                        Log.i("zqq","onSubscribe");
                        s.request(100);  //增加代码
                    }

                    @Override
                    public void onNext(Integer integer) {
                        Log.i("zqq","onNext>>"+integer);
                    }

                    @Override
                    public void onError(Throwable t) {
                        Log.i("zqq","onError>>"+t.toString());
                    }

                    @Override
                    public void onComplete() {
                        Log.i("zqq","onComplete");
                    }
                });

为0的时候,不再发送
rxjava 八:背压
接收者只取了100
rxjava 八:背压

相关标签: android