RxJava2|Flowable以及背压
rxjava2 flowable以及背压
前述
java-
1.8
maven-
3
rxjava-
2.2.3
背压
背压是指在异步场景中,被观察者发送事件速度远快于观察者的处理速度的情况下,一种告诉上游的被观察者降低发送速度的策略。
----https://www.jianshu.com/p/0cd258eecf60
flowable
的官方介绍:
io.reactivex.flowable
: 0..n flows, supporting reactive-streams and backpressure
0...n flows, 支持响应式流和背压(backpressure)
只有在需要处理背压问题时,才需要使用flowable。
由于只有在上下游运行在不同的线程中,且上游发射数据的速度大于下游接收处理数据的速度时,才会产生背压问题;
所以,如果能够确定:
上下游运行在同一个线程中,
上下游工作在不同的线程中,但是下游处理数据的速度不慢于上游发射数据的速度,
-
上下游工作在不同的线程中,但是数据流中只有一条数据
则不会产生背压问题,就没有必要使用flowable,以免影响性能。由于基于flowable发射的数据流,以及对数据加工处理的各操作符都添加了背压支持,附加了额外的逻辑,其运行效率要比observable慢得多。
此段出处: https://www.jianshu.com/p/ff8167c1d191
示例(flowable
简单使用)
flowable
逻辑类 - helloflowable.java
package yag; import io.reactivex.*; import org.reactivestreams.subscriber; import org.reactivestreams.subscription; public class helloflowable { public void helloflowable(){ // 基本上和observable一样. flowable .create((flowableonsubscribe<integer>) flowableemitter -> { integer i = 0; while ( i < 7) { i++; flowableemitter.onnext(i); } }, backpressurestrategy.error/* 背压 */) .subscribe(new subscriber<integer>() { private subscription subscription; @override public void onsubscribe(subscription subscription) { subscription.request(long.max_value); this.subscription = subscription; } @override public void onnext(integer i) { if (i == 5){ // 退出接收 subscription.cancel(); }else { system.out.println("现在接收到的信号是: 第" + i + "信号"); } } @override public void onerror(throwable throwable) { } @override public void oncomplete() { } }); } }
执行者 - runner.java
package yag; public class runner { public static void main(string[] args){ helloflowable helloflowable = new helloflowable(); helloflowable.helloflowable(); } }
执行结果
现在接收到的信号是: 第1信号 现在接收到的信号是: 第2信号 现在接收到的信号是: 第3信号 现在接收到的信号是: 第4信号 process finished with exit code 0
小结
request()
subscription.request(long.max_value);
这个方法就是用来向生产者申请可以消费的事件数量。这样我们便可以根据本身的消费能力进行消费事件。
当调用了request()方法后,生产者便发送对应数量的事件供消费者消费。
backpressurestrategy.error
参考: https://www.jianshu.com/p/1f4867ce3c01
这是一个背压操作策略. (backpressurestrategy
- 背压策略)
在error
策略下,如果缓存池溢出,就会立刻抛出missingbackpressureexception
异常。即保证在异步操作中,事件累积不能超过128,超过即出现异常。消费者不能再接收事件了,但生产者并不会停止。
其他
-
buffer
- 所谓buffer就是把rxjava中默认的只能存128个事件的缓存池换成一个大的缓存池,支持存很多很多的数据。消费者通过request()
即使传入一个很大的数字,生产者也会生产事件,并将处理不了的事件缓存。比较消耗内存, 除非是我们比较了解消费者的消费能力,能够把握具体情况,不会产生oom。(
outofmemoryerror
) drop
- 当消费者处理不了事件,就丢弃。latest
- 消费者通过request()传入其需求n,然后生产者把n个事件传递给消费者供其消费。其他消费不掉的事件就丢掉。
唯一的区别就是latest
总能使消费者能够接收到生产者产生的最后一个事件。
个人补充:
missing
- 写入过程中没有任何缓冲或丢弃, 即不操作.
上一篇: Input 标签 安卓 与 IOS 出现圆角 显示
下一篇: 小白一键重装系统工具v3.0使用教程