读源码之Rxjava2.x
关键词
观察者模式, 基于事件的响应式, 数据流, 异步, 函数式
PS:简洁只是对于会正确使用工具的人来说是正确的
-
基于事件的响应式
对人类现实社会事件响应处理的一种映射。电话、火灾救援等等,都是基于事件的响应,并且是异步的。我们将电话号码(唯一标识)注册到三大运营商,三大运营商作为可观察者或者中间代理者,进行电话呼叫事件的通知,而个人要做的就是对事件的响应,而不是一直等候。 -
数据流
图中的事件流类比川流,大坝类比Rxjava操作符。大坝对上游事件进行拦截后进行处理,之后开闸流向下游,同时也能改变流向(线程切换)。所以说原观察者拿到的不是最真实的数据,拿到的是上游观察者想要你看到的数据。有一点,个人觉得需要明确的是,可观察者和事件生产者代码逻辑基本是写在一起的,但是我们需要去区分两者。可观察者只是对事件进行发射,具体事件的生产是第三方具体的业务,如网络请求、IO处理等,这个就好比三大运营商不生产具体的事件,具体的事件是有沟通需求的客户产生的。
-
函数式
对于函数式,推荐一本书《Java 8 实战》。
Java函数式是使用单一接口的Interface 和 Lambda 表达式 联合实现形如C++的函数式编程,个人觉得Lambda表达式返回的是类对象而非函数对象,所以并非真正意义的函数式编程,但是也可以说是Java形式的函数式编程,见仁见智吧。Rxjava对函数式有充分的实现。
Java 8 函数式更重要的一点我觉得是引入了流数据处理,和Rxjava有异曲同工之妙。Rxjava Reactive Streams 介绍中有这么一句话:
The RxJava team was part of the effort from the beginning and supports the use of Reactive Streams APIs and eventually the Java 9 Flow APIs which are resulting from the success of the Reactive Stream effort.
也就是说Java Flow9 Flow APIs 实现了响应式编程思想。
观察者模式和异步,自不必多说。
用法以及读源码
五种可观察者和两种全能者
Flowable(支持背压[Backpressure]), Observable, Maybe, Single, Completable
Subject,Processor(支持背压):既是可观察者也是观察者
链式操作符
上图是对需要Function对象参数的操作符解释,subscribeOn/observeOn 线程切换操作符同理,新Observable/Observer将原Observable/Observer计算操作切换到指定线程中进行。
理解操作符,个人认为关键是要理解链式操作,再去理解其他问题就会比较容易。
-
问题一:subscribeOn 对在其前面的代码起作用,observeOn对在其后面的代码起作用
-
问题二:subscribeOn 作用于该操作符之前的 Observable 的创建操符以及 doOnSubscribe 操作符
回答这两个问题,优先需要明确的是哪些操作符是在可观察者中执行,哪些操作符是在观察者中执行。 大部分的操作符都是在观察者中执行的,目前常用且个人知道的有Observable创建操作符和doOnSubscribe是在可观察者中执行的。
回答问题一,不管是对其前面的代码起作用还是对其后面代码起作用,重点是代码的执行顺序是怎样的。结合上图可知,Observable链式调用顺序是从下到上,而Observer链式调用顺序是从上到下,所以问题一答案自明。
回答问题二,每次调用subscribeOn操作符都会对可观察者的执行线程切换起作用,即对其前面在可观察者中执行的代码切换执行线程,只是第一次调用之后再次调用会有一个对比判断操作,重复设置会抛出onError,在onError中可以监听重复设置动作。Observer有一个特殊方法onSubscribe是在Observable中调用,doOnSubscribe是在Observer的onSubscribe中调用,最终doOnSubscribe是在Observable中执行。具体流程可以阅读下文关键代码:
public final class ObservableSubscribeOn<T> extends AbstractObservableWithUpstream<T, T> {
@Override
public void subscribeActual(final Observer<? super T> s) {
// 观察者的onSubscribe方法在可观察者的中被调用
s.onSubscribe(parent);
// set 并且 切换线程执行上一个Observable subscribe方法
parent.setDisposable(scheduler.scheduleDirect(new Runnable() {
@Override
public void run() {
source.subscribe(parent);
}
}));
}
static final class SubscribeOnObserver<T> extends AtomicReference<Disposable> implements Observer<T>, Disposable {
void setDisposable(Disposable d) {
DisposableHelper.setOnce(this, d);
}
}
public static boolean setOnce(AtomicReference<Disposable> field, Disposable d) {
ObjectHelper.requireNonNull(d, "d is null");
// 对比操作,如果重复设置会上报RxJavaPlugins.onError(new IllegalStateException("Disposable already set!"));
if (!field.compareAndSet(null, d)) {
d.dispose();
if (field.get() != DISPOSED) {
reportDisposableSet();
}
return false;
}
return true;
}
}
Blog引用,不重复造*
扩展阅读
Help
-
ReactiveX
响应式标准,最好的响应式学习文档。Rxjava2.x基于此标准重写Rxjava1.X。 - API
- Github Home
- 响应式编程思想与实践 视频课程
资源
如需文章中书籍与视频资源,@邮箱aaa@qq.com
上一篇: RxJava2系列之背压策略(一)