欢迎您访问程序员文章站本站旨在为大家提供分享程序员计算机编程知识!
您现在的位置是: 首页

RxJava2基础总结 (一)

程序员文章站 2022-04-02 18:08:42
...

发射器

Observable
被观察者,内部存放观察者,用于发射数据。

创建默认发射器

为什么称默认发射器,因为它是由Observable静态方法生成的一个Observable实例。后续的操作都会在这个实例上进行。 那么经过处理后的原始发射器笔者称为二次发射器。

Observable.create

创建一个含有执行过程(得到发射数据)的发射器,且整个过程处于不可观察状态,其实就是要发射的数据Observable预先不知道。

 Observable.create(new ObservableOnSubscribe<Object>() {

            @Override
            public void subscribe(ObservableEmitter<Object> emitter) throws Exception {
                    emitter.onNext("Observable");
            }   
        });

ObservableOnSubscribe是连接发射器与接受器的中枢,得益于它内部的方法subscribe,有几个接收器,Observable 就会调用几次subscribe, subscribe需要一个ObservableEmitter的参数,该参数里存放了一个接收器,我们通过操作ObservableEmitter来实现数据的交互。

发送完数据一定要调用 emitter.onComplete();否则某些操作符会失效
RxJava2基础总结 (一)

Observable.just()
直接发射现有的数据

RxJava2基础总结 (一)
可以同时放入多组不同类型数据

 Observable.just(new Integer(200), "Rxjava").subscribe(new Consumer<Object>() {

            /**
             * Consume the given value.
             *
             * @param o the value
             * @throws Exception on error
             */
            @Override
            public void accept(Object o) throws Exception {
                Logger.e(o.toString());
            }
        });

最后的处理结果

├┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄
E:200
E: └────────────────────────────────────────────────────────────────────────────────────────────────────────────────
E: ┌────────────────────────────────────────────────────────────────────────────────────────────────────────────────
E: │ Thread: main
E: ├┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄
E: │ LambdaObserver.onNext  (LambdaObserver.java:63)
E: │    MainActivity$1.accept  (MainActivity.java:63)
E: ├┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄
E: │ Rxjava
E: └──────────────────

Observable.fromArray()

同理只是参数发生变化而已

 List<String> list = new ArrayList<>();

        for (int i = 0; i < 5; i++) {
            list.add("Rxjava" + i);
        }

        Observable.fromArray(list.toArray(new String[]{})).subscribe(new Consumer<String>() {
            @Override
            public void accept(String s) throws Exception {
                Logger.e(s);
            }
        });

Observable.fromIterable()
直接使用List< T >类型即可

Observable.time()

定时发送一个Long 值为0的数据,默认回调观察者为子线程。

  Observable.timer(1000, TimeUnit.MILLISECONDS)
                .subscribe(new Consumer<Long>() {
                    @Override
                    public void accept(Long aLong) throws Exception {
                        Logger.e("当前运行线程"+Thread.currentThread());
                        Logger.e(aLong + "");
                    }
                });
E: │ 当前运行线程Thread[RxComputationThreadPool-1,5,main]
E: └────────────────────────────────────────────────────────────────────────────────────────────────────────────────
E: ┌────────────────────────────────────────────────────────────────────────────────────────────────────────────────
E: │ Thread: RxComputationThreadPool-1
E: ├┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄
E: │ MainActivity$1.accept  (MainActivity.java:53)
E: │    MainActivity$1.accept  (MainActivity.java:57)
E: ├┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄
E: │ 0
E: 

Observable.interval()

interval 操作符用于间隔时间执行某个操作,其接受三个参数,分别是第一次发送延迟,间隔时间,时间单位。

 Observable.interval(1000,2000, TimeUnit.MILLISECONDS).subscribe(new Consumer<Long>() {
            @Override
            public void accept(Long aLong) throws Exception {

                Logger.e("当前运行线程为:"+Thread.currentThread());
                Logger.e(aLong+"");

            }
        });

第一次间隔1秒,以后每次间隔2秒发送数据。 与timer一样也是0吗?

0
E: └────────────────────────────────────────────────────────────────────────────────────────────────────────────────
E: ┌────────────────────────────────────────────────────────────────────────────────────────────────────────────────
E: │ Thread: RxComputationThreadPool-1
E: ├┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄
E: │ MainActivity$1.accept  (MainActivity.java:52)
E: │    MainActivity$1.accept  (MainActivity.java:56)
E: ├┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄
E: │ 当前运行线程为:Thread[RxComputationThreadPool-1,5,main]
E: └────────────────────────────────────────────────────────────────────────────────────────────────────────────────
E: ┌────────────────────────────────────────────────────────────────────────────────────────────────────────────────
E: │ Thread: RxComputationThreadPool-1
E: ├┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄
E: │ MainActivity$1.accept  (MainActivity.java:52)
E: │    MainActivity$1.accept  (MainActivity.java:57)
E: ├┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄
E:1
E: └────────────────────────────────────────────────────────────────────────────────────────────────────────────────
E: ┌────────────────────────────────────────────────────────────────────────────────────────────────────────────────
E: │ Thread: RxComputationThreadPool-1
E: ├┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄
E: │ MainActivity$1.accept  (MainActivity.java:52)
E: │    MainActivity$1.accept  (MainActivity.java:56)
E: ├┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄
E: │ 当前运行线程为:Thread[RxComputationThreadPool-1,5,main]
E: └────────────────────────────────────────────────────────────────────────────────────────────────────────────────
E: ┌────────────────────────────────────────────────────────────────────────────────────────────────────────────────
E: │ Thread: RxComputationThreadPool-1
E: ├┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄
E: │ MainActivity$1.accept  (MainActivity.java:52)
E: │    MainActivity$1.accept  (MainActivity.java:57)
E: ├┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄
E:2
E: └────────────────────────────────────────────────────────────────────────────────────────────────────────────────
E: ┌────────────────────────────────────────────────────────────────────────────────────────────────────────────────
E: │ Thread: RxComputationThreadPool-1
E: ├┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄
E: │ MainActivity$1.accept  (MainActivity.java:52)
E: │    MainActivity$1.accept  (MainActivity.java:56)
E: ├┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄
E: │ 当前运行线程为:Thread[RxComputationThreadPool-1,5,main]
E: └────────────────────────────────────────────────────────────────────────────────────────────────────────────────
E: ┌────────────────────────────────────────────────────────────────────────────────────────────────────────────────
E: │ Thread: RxComputationThreadPool-1
E: ├┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄
E: │ MainActivity$1.accept  (MainActivity.java:52)
E: │    MainActivity$1.accept  (MainActivity.java:57)
E: ├┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄
E:3
E: └────────────────────────────────────────────────────────────────────────────────────────────────────────────────
E: ┌────────────────────────────────────────────────────────────────────────────────────────────────────────────────
E: │ Thread: RxComputationThreadPool-1
E: ├┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄
E: │ MainActivity$1.accept  (MainActivity.java:52)
E: │    MainActivity$1.accept  (MainActivity.java:56)
E: ├┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄
E: │ 当前运行线程为:Thread[RxComputationThreadPool-1,5,main]
E: └────────────────────────────────────────────────────────────────────────────────────────────────────────────────
E: ┌────────────────────────────────────────────────────────────────────────────────────────────────────────────────
E: │ Thread: RxComputationThreadPool-1
E: ├┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄
E: │ MainActivity$1.accept  (MainActivity.java:52)
E: │    MainActivity$1.accept  (MainActivity.java:57)
E: ├┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄
E:4
E: └────────────────────────────────────────────────────────────────────────────────────────────────────────────────
E: ┌───────────────────────

第一次为0,以后每次递增1,默认运行线程也是子线程。

接收器

观察者,被被观察者调用

Observer
一个完整的数据接收器

public interface Observer<T> {

    void onSubscribe(@NonNull Disposable d);  //订阅被观察者时执行,用来判断执行当前观察者是否继续接收数据


    void onNext(@NonNull T t);   //通过ObservableEmitter调用,将数据传入其中。


    void onError(@NonNull Throwable e);      //同上,表示数据错误


    void onComplete();            //同上,表示发送数据完毕

}

Consumer
只有一个借口,固定模式的接收数据。

public interface Consumer<T> {
    /**
     * Consume the given value.
     * @param t the value
     * @throws Exception on error
     */
    void accept(T t) throws Exception;
    }

数据接收

Observable.create(new ObservableOnSubscribe<String>() {

            @Override
            public void subscribe(ObservableEmitter<String> emitter) throws Exception {


                emitter.onNext("RxJava1");
                emitter.onNext("RxJava2");
                emitter.onNext("RxJava3");
                emitter.onNext("RxJava4");


            }

        }).subscribe(new Observer<String>() {
            @Override
            public void onSubscribe(Disposable d) {

            }

            @Override
            public void onNext(String s) {
                Logger.e(s);
            }

            @Override
            public void onError(Throwable e) {

            }

            @Override
            public void onComplete() {

            }
        });

这样便完成了一个简易数据发送接收过程。
最后的输出结果

E: │ RxJava1
E: └────────────────────────────────────────────────────────────────────────────────────────────────────────────────
E: ┌────────────────────────────────────────────────────────────────────────────────────────────────────────────────
E: │ Thread: main
E: ├┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄
E: │ MainActivity$1.onNext  (MainActivity.java:66)
E: │    MainActivity$1.onNext  (MainActivity.java:74)
E: ├┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄
E: │ RxJava2
E: └────────────────────────────────────────────────────────────────────────────────────────────────────────────────
E: ┌────────────────────────────────────────────────────────────────────────────────────────────────────────────────
E: │ Thread: main
E: ├┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄
E: │ MainActivity$1.onNext  (MainActivity.java:66)
E: │    MainActivity$1.onNext  (MainActivity.java:74)
E: ├┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄
E: │ RxJava3
E: └────────────────────────────────────────────────────────────────────────────────────────────────────────────────
E: ┌─────────────────────────────────────────────────────────────────────────────────────────────────────
E: │ Thread: main
E: ├┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄
E: │ MainActivity$1.onNext  (MainActivity.java:66)
E: │    MainActivity$1.onNext  (MainActivity.java:74)
E: ├┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄
E: │ RxJava4
E: └────────────────────────────────────────────────────────────────────────────────────────────────────────────────

onNext(): RxJava的事件回调方法,针对普通事件。
onCompleted(): 事件队列完结。RxJava 不仅把每个事件单独处理,还会把它们看做 一个队列。RxJava 规定,当不会再有新的 onNext() 发出时,需要触发 onCompleted() 方法作为标志。
onError(): 事件队列异常。在事件处理过程中出异常时,onError() 会被触发,同时队列自动终止,不允许再有事件发出。

在一个正确运行的事件序列中,onCompleted() 和 onError() 有且只有一个,并且是事件序列中的最后一个。需要注意的是,onCompleted() 和 onError() 二者也是互斥的,即在队列中调用了其中一个,就不应该再调用另一个。

onSubscribe():订阅的时候回调观察者里的此方法,并传入一个Disposable的参数控制后续数据是否继续接收。

实例

Observable.create(new ObservableOnSubscribe<String>() {

            @Override
            public void subscribe(ObservableEmitter<String> emitter) throws Exception {


                emitter.onNext("RxJava1");
                emitter.onNext("RxJava2");
                emitter.onNext("RxJava3");
                emitter.onNext("RxJava4");


            }

        }).subscribe(new Observer<String>() {

            Disposable d;

            @Override
            public void onSubscribe(Disposable d) {
                this.d = d;
            }

            @Override
            public void onNext(String s) {
                d.dispose();
                Logger.e(s);
            }

            @Override
            public void onError(Throwable e) {

            }

            @Override
            public void onComplete() {

            }
        });

当接收器拿到第一个数据后调用d.dispose();将数据中断,则后续数据不会再接受。结果:

┌────────────────────────────────────────────────────────────────────────────────────────────────────────────────
E: │ Thread: main
E: ├┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄
E: │ MainActivity$1.onNext  (MainActivity.java:66)
E: │    MainActivity$1.onNext  (MainActivity.java:78)
E: ├┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄
E: │ RxJava1
E: └───────────────

d.dispose()与emitter.onComplete()区别
一个在发射器端中断数据,一个在接收器端中断数据,且发射器端也可判断接收器端是否中断了数据,通过调用emitter.isDisposed()方法

Custom使用方法同理。

相关标签: Rxjava