欢迎您访问程序员文章站本站旨在为大家提供分享程序员计算机编程知识!
您现在的位置是: 首页

RxJava 2.x入门教程

程序员文章站 2024-02-29 10:12:22
...

前言

首先来说一下rxjava1和rxjava2的区别吧,附带一些RxJava 1升级到RxJava 2过程中踩过的一些“坑”,RxJava 对大家而言肯定不陌生,其受欢迎程度不言而喻。而在去年的早些时候,官方便宣布,将在一段时间后不再对 RxJava 1.x 进行维护,而在仓库中另辟蹊径,开始对 RxJava 2.x 进行推广起来,我原本是不想写这么一套教程的,因为 RxJava 受欢迎度这么高,而且这 2.x 也出来了这么久,我坚信网上一定有很多超级大牛早已为大家避雷。然而很难过的是,我搜索了些时间,能搜出来的基本都是对 RxJava 1.x 的讲解。

与RxJava 1.x的差异

  • 这是一个很大的变化,熟悉 RxJava 1.x 的童鞋一定都知道,1.x 是允许我们在发射事件的时候传入 null 值的,但现在我们的 2.x 不支持了,不信你试试? 大大的 NullPointerException 教你做人。这意味着 Observable 不再发射任何值,而是正常结束或者抛出空指针。

  • 在 RxJava 1.x 中关于介绍 backpressure 部分有一个小小的遗憾,那就是没有用一个单独的类,而是使用 Observable 。而在 2.x 中 Observable 不支持背压了,将用一个全新的 Flowable 来支持背压。
    或许对于背压,有些小伙伴们还不是特别理解,这里简单说一下。大概就是指在异步场景中,被观察者发送事件的速度远快于观察者的处理速度的情况下,一种告诉上游的被观察者降低发送速度的策略。感兴趣的小伙伴可以模拟这种情况,在差距太大的时候,我们的内存会猛增,直到OOM。而我们的 Flowable 一定意义上可以解决这样的问题,但其实并不能完全解决,这个后面可能会提到。

  • 如 Func1…N 的变化,现在同样用 Consumer 和 BiConsumer 对 Action1 和 Action2
    进行了替换。后面的 Action 都被替换了,只保留了 ActionN。

附录

下面从官方截图展示 2.x 相对 1.x 的改动细节,仅供参考。
RxJava 2.x入门教程
RxJava 2.x入门教程
RxJava 2.x入门教程
RxJava 2.x入门教程

直接代码介绍操作符

package com.csl.text;

import android.os.Bundle;
import android.support.annotation.NonNull;
import android.support.v7.app.AppCompatActivity;
import android.util.Log;
import android.widget.TextView;

import java.io.Serializable;
import java.text.SimpleDateFormat;
import java.util.ArrayList;
import java.util.List;
import java.util.Random;
import java.util.concurrent.Callable;
import java.util.concurrent.TimeUnit;

import io.reactivex.Observable;
import io.reactivex.ObservableEmitter;
import io.reactivex.ObservableOnSubscribe;
import io.reactivex.ObservableSource;
import io.reactivex.Observer;
import io.reactivex.Single;
import io.reactivex.SingleObserver;
import io.reactivex.android.schedulers.AndroidSchedulers;
import io.reactivex.disposables.Disposable;
import io.reactivex.functions.BiFunction;
import io.reactivex.functions.Consumer;
import io.reactivex.functions.Function;
import io.reactivex.functions.Predicate;
import io.reactivex.schedulers.Schedulers;

/**
 * RxJava 2.x 入门教程-代码
 */
public class MainActivity extends AppCompatActivity {
    private TextView textView;
    private static String date;
    private Disposable mDisposable;
    protected final String TAG = this.getClass().getSimpleName();


    @Override
    protected void onCreate(Bundle savedInstanceState) {
        super.onCreate(savedInstanceState);
        setContentView(R.layout.activity_main);
        textView = (TextView) findViewById(R.id.tv_text);
        textdata();
        textwindow();
    }

    public void textdata() {
        SimpleDateFormat sDateFormat = new SimpleDateFormat("yyyy-MM-dd hh:mm:ss");
        date = sDateFormat.format(new java.util.Date());
    }

    /**
     * Map用法Integer-转Float
     * Map 基本算是 RxJava 中一个最简单的操作符了,
     * 熟悉 RxJava 1.x 的知道,它的作用是对发射时间发送的每一个事件应用一个函数,
     * 是的每一个事件都按照指定的函数去变化,而在 2.x 中它的作用几乎一致。
     */
    public void textMap() {

        Observable.create(new ObservableOnSubscribe<Integer>() {
            @Override
            public void subscribe(ObservableEmitter<Integer> e) throws Exception {
                e.onNext(1);
                e.onNext(2);
                e.onNext(3);
            }
        }).map(new Function<Integer, Float>() {
            @Override
            public Float apply(Integer integer) throws Exception {
                return Float.valueOf(integer);
            }
        }).subscribe(new Consumer<Float>() {
            @Override
            public void accept(Float aFloat) throws Exception {
                textView.append("accept : " + aFloat + "\n");
            }
        });
    }

    /**
     * Zip用法
     * zip 专用于合并事件,该合并不是连接(连接操作符后面会说),
     * 而是两两配对,也就意味着,最终配对出的 Observable 发射事件数目只和少的那个相同。
     */
    public void textZip() {
        Observable.zip(getStringObservable(), getIntegerObservable(), new BiFunction<String, Integer, String>() {
            @Override
            public String apply(String s, Integer integer) throws Exception {
                return s + integer;
            }
        }).subscribe(new Consumer<String>() {
            @Override
            public void accept(String s) throws Exception {
                textView.append("zip : accept : " + s + "\n");
                Log.e(TAG, "zip : accept : " + s + "\n");
            }
        });

    }


    /**
     * 用于Zip合并事件测试
     *
     * @return
     */
    private Observable<String> getStringObservable() {
        return Observable.create(new ObservableOnSubscribe<String>() {
            @Override
            public void subscribe(@NonNull ObservableEmitter<String> e) throws Exception {
                if (!e.isDisposed()) {
                    e.onNext("A");
                    textView.append("String emit : A \n");
                    e.onNext("B");
                    textView.append("String emit : B \n");
                    e.onNext("C");
                    textView.append("String emit : C \n");
                }
            }
        });
    }

    /**
     * 用于Zip合并事件测试
     *
     * @return
     */
    private Observable<Integer> getIntegerObservable() {
        return Observable.create(new ObservableOnSubscribe<Integer>() {
            @Override
            public void subscribe(@NonNull ObservableEmitter<Integer> e) throws Exception {
                if (!e.isDisposed()) {
                    e.onNext(1);
                    textView.append("Integer emit : 1 \n");
                    e.onNext(2);
                    textView.append("Integer emit : 2 \n");
     2               e.onNext(3);
                    textView.append("Integer emit : 3 \n");
                    e.onNext(4);
                    textView.append("Integer emit : 4 \n");
                    e.onNext(5);
                    textView.append("Integer emit : 5 \n");
                }
            }
        });
    }

    /**
     * Concat用法
     * 对于单一的把两个发射器连接成一个发射器,虽然 zip 不能完成,
     * 但我们还是可以自力更生,官方提供的 concat 让我们的问题得到了完美解决。
     */
    public void textConcat() {
        Observable.concat(Observable.just("陈力", 1, 2, 3), Observable.just("阿力", 4, 5, 6))
                .subscribe(new Consumer<Serializable>() {
                    @Override
                    public void accept(Serializable serializable) throws Exception {
                        textView.append("concat : " + serializable + "\n");
                        Log.e(TAG, "concat : " + serializable + "\n");
                    }
                });
    }

    /**
     * FlatMap用法
     * FlatMap 是一个很有趣的东西,我坚信你在实际开发中会经常用到。
     * 它可以把一个发射器 Observable 通过某种方法转换为多个 Observables,
     * 然后再把这些分散的 Observables装进一个单一的发射器 Observable。但有个需要注意的是,
     * flatMap 并不能保证事件的顺序,如果需要保证,需要用到我们下面要讲的 ConcatMap。
     */
    public void textFlatMap() {

        Observable.create(new ObservableOnSubscribe<Integer>() {
            @Override
            public void subscribe(@NonNull ObservableEmitter<Integer> e) throws Exception {
                e.onNext(1);
                e.onNext(2);
                e.onNext(3);
            }
        }).flatMap(new Function<Integer, ObservableSource<String>>() {
            @Override
            public ObservableSource<String> apply(@NonNull Integer integer) throws Exception {
                List<String> list = new ArrayList<>();
                for (int i = 0; i < 3; i++) {
                    list.add("I am value " + integer);
                }
                int delayTime = (int) (1 + Math.random() * 10);
                return Observable.fromIterable(list).delay(delayTime, TimeUnit.MILLISECONDS);
            }
        }).subscribeOn(Schedulers.newThread())
                .observeOn(AndroidSchedulers.mainThread())
                .subscribe(new Consumer<String>() {
                    @Override
                    public void accept(@NonNull String s) throws Exception {
                        Log.e(TAG, "flatMap : accept : " + s + "\n");
                        textView.append("flatMap : accept : " + s + "\n");
                    }
                });
    }

    /**
     * 上面其实就说了,concatMap 与 FlatMap 的唯一区别就是 concatMap 保证了顺序,
     * 所以,我们就直接把 flatMap 替换为 concatMap 验证吧。
     */
    public void textconcatMap() {
        Observable.create(new ObservableOnSubscribe<Integer>() {

            @Override
            public void subscribe(@NonNull ObservableEmitter<Integer> e) throws Exception {
                e.onNext(1);
                e.onNext(2);
                e.onNext(3);
            }
        }).concatMap(new Function<Integer, ObservableSource<String>>() {
            @Override
            public ObservableSource<String> apply(Integer integer) throws Exception {
                List<String> list = new ArrayList<>();
                for (int i = 0; i < 3; i++) {
                    list.add("I am value " + integer);
                }
                int delayTime = (int) (1 + Math.random() * 10);
                return Observable.fromIterable(list).delay(delayTime, TimeUnit.MILLISECONDS);
            }
        }).subscribeOn(Schedulers.newThread())
                .observeOn(AndroidSchedulers.mainThread())
                .subscribe(new Consumer<String>() {
                    @Override
                    public void accept(@NonNull String s) throws Exception {
      2                  Log.e(TAG, "flatMap : accept : " + s + "\n");
                        textView.append("flatMap : accept : " + s + "\n");
                    }
                });
    }

    /**
     * 这个操作符非常的简单、通俗、易懂,就是简单的去重嘛,
     * 我甚至都不想贴代码,但人嘛,总得持之以恒。
     */
    public void textdistinct() {
        Observable.just(1, 1, 1, 2, 2, 3, 4, 5)
                .distinct()
                .subscribe(new Consumer<Integer>() {
                    @Override
                    public void accept(Integer integer) throws Exception {
                        textView.append("distinct : " + integer + "\n");
                        Log.e(TAG, "distinct : " + integer + "\n");
                    }
                });
    }

    /**
     * Filter用法
     * 信我,Filter 你会很常用的,它的作用也很简单,过滤器嘛。
     * 可以接受一个参数,让其过滤掉不符合我们条件的值
     */
    public void textFilter() {
        Observable.just(1, 20, 65, -5, 7, 19)
                .filter(new Predicate<Integer>() {
                    @Override
                    public boolean test(Integer integer) throws Exception {
                        return integer >= 50;
                    }
                }).subscribe(new Consumer<Integer>() {
            @Override
            public void accept(Integer integer) throws Exception {
                textView.append("distinct : " + integer + "\n");
                Log.e(TAG, "distinct : " + integer + "\n");
            }
        });
    }

    /**
     * buffery用法
     * buffer 操作符接受两个参数,buffer(count,skip),
     * 作用是将 Observable 中的数据按 skip (步长) 分成最大不超过 count 的 buffer ,然后生成一个  Observable 。
     * 也许你还不太理解,我们可以通过我们的示例图和示例代码来进一步深化它。
     */
    public void textbuffer() {
        Observable.just(1, 2, 3, 4, 5)
                .buffer(3, 2)
                .subscribe(new Consumer<List<Integer>>() {
                    @Override
                    public void accept(List<Integer> integers) throws Exception {
                        textView.append("buffer size : " + integers.size() + "\n");
                        Log.e(TAG, "buffer size : " + integers.size() + "\n");
                        textView.append("buffer value : ");
                        Log.e(TAG, "buffer value : ");
                        for (Integer i : integers) {
                            textView.append(i + "");
                            Log.e(TAG, i + "");
                        }
                        textView.append("\n");
                        Log.e(TAG, "\n");
                    }
                });
    }

    /**
     * timer用法
     * timer 很有意思,相当于一个定时任务。在 1.x 中它还可以执行间隔逻辑,
     * 但在 2.x 中此功能被交给了 interval,下一个会介绍。但需要注意的是,
     * timer 和 interval 均默认在新线程。
     */
    public void texttimer() {
        textView.append("timer start : " + date + "\n");
        Log.e(TAG, "timer start : " + date + "\n");
        Observable.timer(5, TimeUnit.SECONDS)
                .subscribeOn(Schedulers.io())
                .observeOn(AndroidSchedulers.mainThread())
                .subscribe(new Consumer<Long>() {
                    @Override
                    public void accept(Long aLong) throws Exception {
                        textdata();
                        textView.append("timer :" + aLong + " at " + date + "\n");
                        Log.e(TAG, "timer :" + aLong + " at " + date + "\n");
                    }
                });
    }

    /**
     * interval用法
     * 如同我们上面可说,interval 操作符用于间隔时间执行某个操作,
     * 其接受三个参数,分别是第一次发送延迟,间隔时间,时间单位。
     */
    public void textinterval() {
        textView.append("timer start : " + date + "\n");
        Log.e(TAG, "timer start : " + date + "\n");
        mDisposable = Observable.interval(3, 2, TimeUnit.SECONDS)
                .subscribeOn(Schedulers.io())
                .observeOn(AndroidSchedulers.mainThread())
                .subscribe(new Consumer<Long>() {
                    @Override
                    public void accept(Long aLong) throws Exception {
                        textdata();
                        textView.append("interval :" + aLong + " at " + date + "\n");
                        Log.e(TAG, "interval :" + aLong + " at " + date + "\n");
                    }
                });
    }


    /**
     * doOnNext用法
     * 其实觉得 doOnNext 应该不算一个操作符,但考虑到其常用性,
     * 我们还是咬咬牙将它放在了这里。它的作用是让订阅者在接收到数据之前干点有意思的事情。
     * 假如我们在获取到数据之前想先保存一下它,无疑我们可以这样实现。
     */
    public void textdoOnNext() {
        Observable.just(1, 2, 3, 4)
                .doOnNext(new Consumer<Integer>() {
                    @Override
                    public void accept(Integer integer) throws Exception {
                        textView.append("doOnNext 保存 " + integer + "成功" + "\n");
                        Log.e(TAG, "doOnNext 保存 " + integer + "成功" + "\n");
                    }
                }).subscribe(new Consumer<Integer>() {
            @Override
            public void accept(Integer integer) throws Exception {
                textView.append("doOnNext :" + integer + "\n");
                Log.e(TAG, "doOnNext :" + integer + "\n");
            }
        });
    }

    /**
     * skip用法
     * skip 很有意思,其实作用就和字面意思一样,接受一个 long 型参数 count ,
     * 代表跳过 count 个数目开始接收。
     */
    public void textskip() {
        Observable.just(1, 2, 3, 4, 5)
                .skip(3)
                .subscribe(new Consumer<Integer>() {
                    @Override
                    public void accept(Integer integer) throws Exception {
                        textView.append("skip : " + integer + "\n");
                        Log.e(TAG, "skip : " + integer + "\n");
                    }
                });

    }

    /**
     * Single用法
     * 顾名思义,Single 只会接收一个参数,而 SingleObserver 只会调用 onError() 或者 onSuccess()。
     */
    public void textSingle() {
        Single.just(new Random().nextInt())
                .subscribe(new SingleObserver<Integer>() {
                    @Override
                    public void onSubscribe(Disposable d) {

                    }

                    @Override
                    public void onSuccess(Integer value) {
                        textView.append("single : onSuccess : " + value + "\n");
                        Log.e(TAG, "single : onSuccess : " + value + "\n");
                    }

                    @Override
                    public void onError(Throwable e) {
                        textView.append("single : onError : " + e.getMessage() + "\n");
                        Log.e(TAG, "single : onError : " + e.getMessage() + "\n");
                    }
                });
    }

    /**
     * debounce用法
     * 去除发送频率过快的项,看起来好像没啥用处,但你信我,后面绝对有地方很有用武之地。
     */
    public void textdebounce() {
        Observable.create(new ObservableOnSubscribe<Integer>() {
            @Override
            public void subscribe(ObservableEmitter<Integer> emitter) throws Exception {
                // send events with simulated time wait
                emitter.onNext(1); // skip
                Thread.sleep(400);
                emitter.onNext(2); // deliver
                Thread.sleep(505);
                emitter.onNext(3); // skip
                Thread.sleep(100);
                emitter.onNext(4); // deliver
                Thread.sleep(605);
                emitter.onNext(5); // deliver
                Thread.sleep(510);
                emitter.onComplete();
            }
        }).debounce(500, TimeUnit.MILLISECONDS)
                .subscribeOn(Schedulers.io())
                .observeOn(AndroidSchedulers.mainThread())
                .subscribe(new Consumer<Integer>() {
                    @Override
                    public void accept(Integer integer) throws Exception {
                        textView.append("debounce :" + integer + "\n");
                        Log.e(TAG, "debounce :" + integer + "\n");
                    }
                });
    }

    /**
     * defer用法
     * 简单地时候就是每次订阅都会创建一个新的 Observable,
     * 并且如果没有被订阅,就不会产生新的 Observable。
     */
    public void textdefer() {
        Observable<Integer> observable = Observable.defer(new Callable<ObservableSource<Integer>>() {

            @Override
            public ObservableSource<Integer> call() throws Exception {
                return Observable.just(1, 2, 3);
            }
        });
        observable.subscribe(new Observer<Integer>() {
            @Override
            public void onSubscribe(Disposable d) {

            }

            @Override
            public void onNext(Integer value) {
                textView.append("defer : " + value + "\n");
                Log.e(TAG, "defer : " + value + "\n");
            }

            @Override
            public void onError(Throwable e) {
                textView.append("defer : onError : " + e.getMessage() + "\n");
                Log.e(TAG, "defer : onError : " + e.getMessage() + "\n");
            }

            @Override
            public void onComplete() {
                textView.append("defer : onComplete\n");
                Log.e(TAG, "defer : onComplete\n");
            }
        });
    }

    /**
     * last用法
     * last 操作符仅取出可观察到的最后一个值,或者是满足某些条件的最后一项。
     */
    public void textlast() {
        Observable.just(1, 2, 3, 4)
                .last(0)
                .subscribe(new Consumer<Integer>() {
                    @Override
                    public void accept(Integer integer) throws Exception {
                        textView.append("last : " + integer + "\n");
                        Log.e(TAG, "last : " + integer + "\n");
                    }
                });
    }

    /**
     * merge用法
     * merge 顾名思义,熟悉版本控制工具的你一定不会不知道 merge 命令,
     * 而在 Rx 操作符中,merge 的作用是把多个 Observable 结合起来,接受可变参数,
     * 也支持迭代器集合。注意它和 concat 的区别在于,
     * 不用等到 发射器 A 发送完所有的事件再进行发射器 B 的发送。
     */
    public void textmerge() {
        Observable.merge(Observable.just(1, 2), Observable.just(3, 4, 5))
                .distinct()
                .subscribe(new Consumer<Integer>() {
                    @Override
                    public void accept(Integer integer) throws Exception {
                        textView.append("merge :" + integer + "\n");
                        Log.e(TAG, "accept: merge :" + integer + "\n");
                    }
                });
    }

    /**
     * reduce用法
     * reduce 操作符每次用一个方法处理一个值,可以有一个 seed 作为初始值。
     */
    public void textreduce() {
        Observable.just(1, 2, 3, 4, 5)
                .reduce(new BiFunction<Integer, Integer, Integer>() {
                    @Override
                    public Integer apply(Integer integer, Integer integer2) throws Exception {
                        return integer + integer2;
                    }
                }).subscribe(new Consumer<Integer>() {
            @Override
            public void accept(Integer integer) throws Exception {
                textView.append("reduce : " + integer + "\n");
                Log.e(TAG, "accept: reduce : " + integer + "\n");
            }
        });
    }

    /**
     * scan用法
     * scan 操作符作用和上面的 reduce 一致,
     * 唯一区别是 reduce 是个只追求结果的坏人,而 scan 会始终如一地把每一个步骤都输出。
     */
    public void textscan() {
        Observable.just(1, 2, 3, 4)
                .scan(new BiFunction<Integer, Integer, Integer>() {
                    @Override
                    public Integer apply(Integer integer, Integer integer2) throws Exception {
                        return integer + integer2;
                    }
                }).subscribe(new Consumer<Integer>() {
            @Override
            public void accept(Integer integer) throws Exception {
                textView.append("scan " + integer + "\n");
                Log.e(TAG, "accept: scan " + integer + "\n");
            }
        });
    }

    /**
     * window用法
     * 按照实际划分窗口,将数据发送给不同的 Observable
     */
    public void textwindow() {
        textView.append("window\n");
        Log.e(TAG, "window\n");
        Observable.interval(1, TimeUnit.SECONDS)
                .take(15)  //最多接收15个
                .window(3, TimeUnit.SECONDS)
                .subscribeOn(Schedulers.io())
                .observeOn(AndroidSchedulers.mainThread())
                .subscribe(new Consumer<Observable<Long>>() {
                    @Override
                    public void accept(Observable<Long> longObservable) throws Exception {
                        textView.append("Sub Divide begin...\n");
                        Log.e(TAG, "Sub Divide begin...\n");
                        longObservable.subscribeOn(Schedulers.io())
                                .observeOn(AndroidSchedulers.mainThread())
                                .subscribe(new Consumer<Long>() {
                                    @Override
                                    public void accept(@NonNull Long aLong) throws Exception {
                                        textView.append("Next:" + aLong + "\n");
                                        Log.e(TAG, "Next:" + aLong + "\n");
                                    }
                                });
                    }
                });
    }

    @Override
    protected void onDestroy() {
        super.onDestroy();
        if (mDisposable != null && !mDisposable.isDisposed()) {
            mDisposable.dispose();
        }
    }
}

rxjava1升级rxjava2填坑

  • RxJava1 跟 RxJava2 不能共存如果,在同一个module中同时使用RxJava1和RxJava2,类似如下:
compile 'io.reactivex.rxjava2:rxandroid:2.0.1'
compile 'io.reactivex.rxjava2:rxjava:2.0.7'
compile 'io.reactivex:rxandroid:1.2.0'
compile 'io.reactivex:rxjava:1.1.5'
  • 新增Flowable,RxJava1 中 Observable 不能很好地支持 backpressure,会抛出MissingBackpressureException。所以在 RxJava2 中 Oberservable 不再支持backpressure ,而使用新增的 Flowable 来支持 backpressure,Flowable的用法跟原先的Observable是一样的。

附加在7.0以上会出现解析包出现问题的错误导致更新失败

RxJava 2.x入门教程