rxjava 八:背压
问题
Observable线程 发送事件快
Observer线程 接收事件慢
Observable发送了无限个事件
Observer 接收了几个
那么Observable发送的事件将会缓存,缓存越来越多造成内存溢出 OOM
举例:同步不会出现这种问题
Observable.create(new ObservableOnSubscribe<Integer>() {
@Override
public void subscribe(ObservableEmitter<Integer> emitter) throws Exception {
int i=0;
while (true){
i++;
emitter.onNext(i);
Log.i("zqq","subscribe>>"+i);
}
}
})
.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.subscribe(new Consumer<Integer>() {
@Override
public void accept(Integer integer) throws Exception {
Thread.sleep(2000);
Log.i("zqq","integer>>"+integer);
}
});
Flowable使用
Flowable.create(new FlowableOnSubscribe<Integer>() {
@Override
public void subscribe(FlowableEmitter<Integer> emitter) throws Exception {
emitter.onNext(1);
emitter.onNext(2);
emitter.onNext(3);
emitter.onNext(4);
emitter.onNext(5);
emitter.onNext(6);
emitter.onNext(7);
emitter.onComplete();
}
}, BackpressureStrategy.ERROR)
.subscribe(new Subscriber<Integer>() {
@Override
public void onSubscribe(Subscription s) {
Log.i("zqq","onSubscribe");
}
@Override
public void onNext(Integer integer) {
Log.i("zqq","onNext>>"+integer);
}
@Override
public void onError(Throwable t) {
Log.i("zqq","onError>>"+t.toString());
}
@Override
public void onComplete() {
Log.i("zqq","onComplete");
}
});
结果:
出错了 MissingBackpressureException
修改代码:
Flowable.create(new FlowableOnSubscribe<Integer>() {
@Override
public void subscribe(FlowableEmitter<Integer> emitter) throws Exception {
emitter.onNext(1);
emitter.onNext(2);
emitter.onNext(3);
emitter.onNext(4);
emitter.onNext(5);
emitter.onNext(6);
emitter.onNext(7);
emitter.onComplete();
}
}, BackpressureStrategy.ERROR)
.subscribe(new Subscriber<Integer>() {
@Override
public void onSubscribe(Subscription s) {
Log.i("zqq","onSubscribe");
s.request(Integer.MAX_VALUE); //增加代码
}
@Override
public void onNext(Integer integer) {
Log.i("zqq","onNext>>"+integer);
}
@Override
public void onError(Throwable t) {
Log.i("zqq","onError>>"+t.toString());
}
@Override
public void onComplete() {
Log.i("zqq","onComplete");
}
});
结果
我们只增加的一句代码 s.request(Integer.MAX_VALUE); //增加代码
这句代码的意思是,接收者告知发送者,我可以接收这么多的事件,给我发吧
如果不添加这句话,发送者默认任务,接受者没有能力接收事件,就会抛出MissingBackpressureException异常
使用异步
Flowable.create(new FlowableOnSubscribe<Integer>() {
@Override
public void subscribe(FlowableEmitter<Integer> emitter) throws Exception {
emitter.onNext(1);
emitter.onNext(2);
emitter.onNext(3);
emitter.onNext(4);
emitter.onNext(5);
emitter.onNext(6);
emitter.onNext(7);
Log.i("zqq","发送事件》》");
emitter.onComplete();
}
}, BackpressureStrategy.ERROR)
.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.subscribe(new Subscriber<Integer>() {
@Override
public void onSubscribe(Subscription s) {
Log.i("zqq","onSubscribe");
// s.request(Integer.MAX_VALUE); //增加代码
}
@Override
public void onNext(Integer integer) {
Log.i("zqq","onNext>>"+integer);
}
@Override
public void onError(Throwable t) {
Log.i("zqq","onError>>"+t.toString());
}
@Override
public void onComplete() {
Log.i("zqq","onComplete");
}
});
结果:
同样接收者未接收事件
解决策略
1、减少事件,只取所需
2、延时发送,慢慢发
观察 Flowable.create的第二个参数
MISSING
ERROR
BUFFER : 增加发送者,发送事件缓存上限,默认为128,当使用BUFFER 的时候,相当于Oberver可以无线缓存,但可能会造成OOM
DROP : 把不存在的事件丢掉 即,request多少给多少,剩余的事件全部丢弃
LATEST :获取某时刻最终的128个事件,其余的丢弃 ,比如0-1000 发送者发送完毕,这是接受者接收,会获取 (1000-128 )到 1000的事件
若不是我们自己创建的Floable可以使用如下方法来进行背压
onBackpressureBuffer()
onBackpressureDrop()
onBackpressureLatest()
效果同上
FlowableEmitter 有一个方法
/**
* The current outstanding request amount.
* <p>This method is thread-safe.
* @return the current outstanding request amount
*/
long requested();
返回值,是接受者请求的可以承受的事件数量
并且次数量是动态的
即,如果发送者发出了一个事件,此返回值-1;
当为0的时候 发送者停止发送事件
不为0的时候,才开始发送。
Flowable.create(new FlowableOnSubscribe<Integer>() {
@Override
public void subscribe(FlowableEmitter<Integer> emitter) throws Exception {
int i=0;
while (true){
Log.i("zqq","emitter.requested()"+emitter.requested());
if(emitter.requested() == 0 && !emitter.isCancelled()){
break;
}
i++;
Log.i("zqq","发送事件》》"+i);
emitter.onNext(i);
}
}
}, BackpressureStrategy.ERROR)
.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.subscribe(new Subscriber<Integer>() {
@Override
public void onSubscribe(Subscription s) {
Log.i("zqq","onSubscribe");
s.request(100); //增加代码
}
@Override
public void onNext(Integer integer) {
Log.i("zqq","onNext>>"+integer);
}
@Override
public void onError(Throwable t) {
Log.i("zqq","onError>>"+t.toString());
}
@Override
public void onComplete() {
Log.i("zqq","onComplete");
}
});
为0的时候,不再发送
接收者只取了100