欢迎您访问程序员文章站本站旨在为大家提供分享程序员计算机编程知识!
您现在的位置是: 首页

读源码之Rxjava2.x

程序员文章站 2024-02-29 08:42:10
...

关键词

观察者模式, 基于事件的响应式, 数据流, 异步, 函数式

PS:简洁只是对于会正确使用工具的人来说是正确的

  • 基于事件的响应式

    对人类现实社会事件响应处理的一种映射。电话、火灾救援等等,都是基于事件的响应,并且是异步的。我们将电话号码(唯一标识)注册到三大运营商,三大运营商作为可观察者或者中间代理者,进行电话呼叫事件的通知,而个人要做的就是对事件的响应,而不是一直等候。

  • 数据流
    读源码之Rxjava2.x
    图中的事件流类比川流,大坝类比Rxjava操作符。大坝对上游事件进行拦截后进行处理,之后开闸流向下游,同时也能改变流向(线程切换)。所以说原观察者拿到的不是最真实的数据,拿到的是上游观察者想要你看到的数据。

    有一点,个人觉得需要明确的是,可观察者和事件生产者代码逻辑基本是写在一起的,但是我们需要去区分两者。可观察者只是对事件进行发射,具体事件的生产是第三方具体的业务,如网络请求、IO处理等,这个就好比三大运营商不生产具体的事件,具体的事件是有沟通需求的客户产生的。

  • 函数式

    对于函数式,推荐一本书《Java 8 实战》。

    Java函数式是使用单一接口的Interface 和 Lambda 表达式 联合实现形如C++的函数式编程,个人觉得Lambda表达式返回的是类对象而非函数对象,所以并非真正意义的函数式编程,但是也可以说是Java形式的函数式编程,见仁见智吧。Rxjava对函数式有充分的实现。

    Java 8 函数式更重要的一点我觉得是引入了流数据处理,和Rxjava有异曲同工之妙。Rxjava Reactive Streams 介绍中有这么一句话:

    The RxJava team was part of the effort from the beginning and supports the use of Reactive Streams APIs and eventually the Java 9 Flow APIs which are resulting from the success of the Reactive Stream effort.

    也就是说Java Flow9 Flow APIs 实现了响应式编程思想。

    观察者模式和异步,自不必多说。

用法以及读源码

五种可观察者和两种全能者

Flowable(支持背压[Backpressure]), Observable, Maybe, Single, Completable

Subject,Processor(支持背压):既是可观察者也是观察者

链式操作符

读源码之Rxjava2.x
上图是对需要Function对象参数的操作符解释,subscribeOn/observeOn 线程切换操作符同理,新Observable/Observer将原Observable/Observer计算操作切换到指定线程中进行。

理解操作符,个人认为关键是要理解链式操作,再去理解其他问题就会比较容易。

  • 问题一:subscribeOn 对在其前面的代码起作用,observeOn对在其后面的代码起作用

  • 问题二:subscribeOn 作用于该操作符之前的 Observable 的创建操符以及 doOnSubscribe 操作符

回答这两个问题,优先需要明确的是哪些操作符是在可观察者中执行,哪些操作符是在观察者中执行。 大部分的操作符都是在观察者中执行的,目前常用且个人知道的有Observable创建操作符和doOnSubscribe是在可观察者中执行的。

回答问题一,不管是对其前面的代码起作用还是对其后面代码起作用,重点是代码的执行顺序是怎样的。结合上图可知,Observable链式调用顺序是从下到上,而Observer链式调用顺序是从上到下,所以问题一答案自明。
读源码之Rxjava2.x
回答问题二,每次调用subscribeOn操作符都会对可观察者的执行线程切换起作用,即对其前面在可观察者中执行的代码切换执行线程,只是第一次调用之后再次调用会有一个对比判断操作,重复设置会抛出onError,在onError中可以监听重复设置动作。Observer有一个特殊方法onSubscribe是在Observable中调用,doOnSubscribe是在Observer的onSubscribe中调用,最终doOnSubscribe是在Observable中执行。具体流程可以阅读下文关键代码:

public final class ObservableSubscribeOn<T> extends AbstractObservableWithUpstream<T, T> {
    @Override
    public void subscribeActual(final Observer<? super T> s) {
        
        // 观察者的onSubscribe方法在可观察者的中被调用
        s.onSubscribe(parent);

        // set 并且 切换线程执行上一个Observable subscribe方法
        parent.setDisposable(scheduler.scheduleDirect(new Runnable() {
            @Override
            public void run() {
                source.subscribe(parent);
            }
        }));
    }

    static final class SubscribeOnObserver<T> extends AtomicReference<Disposable> implements Observer<T>, Disposable {

        void setDisposable(Disposable d) {
            DisposableHelper.setOnce(this, d);
        }
    }

    public static boolean setOnce(AtomicReference<Disposable> field, Disposable d) {
        ObjectHelper.requireNonNull(d, "d is null");
        // 对比操作,如果重复设置会上报RxJavaPlugins.onError(new IllegalStateException("Disposable already set!"));
        if (!field.compareAndSet(null, d)) {
            d.dispose();
            if (field.get() != DISPOSED) {
                reportDisposableSet();
            }
            return false;
        }
        return true;
    }
}

Blog引用,不重复造*

扩展阅读

Help

  • ReactiveX
    响应式标准,最好的响应式学习文档。Rxjava2.x基于此标准重写Rxjava1.X。
  • API
  • Github Home
  • 响应式编程思想与实践 视频课程

资源

如需文章中书籍与视频资源,@邮箱aaa@qq.com