欢迎您访问程序员文章站本站旨在为大家提供分享程序员计算机编程知识!
您现在的位置是: 首页

rxjava2.x

程序员文章站 2024-02-29 09:12:10
...

1、添加依赖

compile 'io.reactivex.rxjava2:rxjava:2.1.1'
compile 'io.reactivex.rxjava2:rxandroid:2.0.1'

2、
在编译的时候可能会出现如下错误
Error:Execution failed for task ':rxjava2demo:transformResourcesWithMergeJavaResForDebug'.
> com.android.build.api.transform.TransformException: com.android.builder.packaging.DuplicateFileException: Duplicate files copied in APK META-INF/rxjava.properties
	File1: C:\Users\huangyi\.gradle\caches\modules-2\files-2.1\io.reactivex.rxjava2\rxjava\2.1.1\99895d4dc6d79efbb74360f13c556d95533ad8f8\rxjava-2.1.1.jar
	File2: C:\Users\huangyi\.gradle\caches\modules-2\files-2.1\io.reactivex\rxjava\1.1.5\ece7b5d0870e66d8226dab6dcf47a2b12afff061\rxjava-1.1.5.jar
需要在build.gradlede的android标签中加入:

packagingOptions {
    exclude 'META-INF/rxjava.properties'
}
然后再同步一下编译。

3、观察者模式多了一种现在有两种

Observable--Observer Flowable--Subscriber 有这两种观察者被观察者模式。为什么添加了一种模式,

看图1.x版本上次说到过,这两种订阅都可以,因为本身Subscrible就是实现了Observer的接口

rxjava2.x


rxjava2.x


2.0中写法就改成上面这样了。 Observeable用于订阅Observer,是不支持背压的;而Flowable用于订阅Subscriber,是支持背压(Backpressure)的。


4、

1)背压demo1

  private void flowable01() {
        //先建立一个Flowable对象
        Flowable<String> mFlowable = Flowable.create(new FlowableOnSubscribe<String>() {
            @Override
            public void subscribe(@NonNull FlowableEmitter<String> e) throws Exception {
                e.onNext("Hello RxJava2.0");
                e.onComplete();
            }
        }, BackpressureStrategy.BUFFER);


        Subscriber<String> mSubscriber = new Subscriber<String>() {
            @Override
            public void onSubscribe(Subscription s) {
                Log.d(TAG, "onSubscribe");
                //request的方法的含义是控制上游也就是被观察者发送通知事件的速率
                s.request(Long.MAX_VALUE);
            }

            @Override
            public void onNext(String s) {
                Log.d(TAG, "onNext s===>" + s);
            }

            @Override
            public void onError(Throwable t) {

            }

            @Override
            public void onComplete() {

            }
        };

        mFlowable.subscribe(mSubscriber);
    }


或者可写为:

 Flowable.create(new FlowableOnSubscribe<String>() {
            @Override
            public void subscribe(@NonNull FlowableEmitter<String> e) throws Exception {
                e.onNext("Hello RxJava2.0");
                e.onComplete();
            }
        }, BackpressureStrategy.BUFFER).subscribe(new Subscriber<String>() {
            @Override
            public void onSubscribe(Subscription s) {
                Log.d(TAG, "onSubscribe");
                //request的方法的含义是控制上游也就是被观察者发送通知事件的速率
                s.request(Long.MAX_VALUE);
            }

            @Override
            public void onNext(String s) {
                Log.d(TAG, "onNext s===>" + s);
            }

            @Override
            public void onError(Throwable t) {

            }

            @Override
            public void onComplete() {

            }
        });

打印结果:

07-04 16:07:23.781 11112-11112/com.example.rxjava2demo D/MainActivity: onSubscribe
07-04 16:07:23.782 11112-11112/com.example.rxjava2demo D/MainActivity: onNext s===>Hello RxJava2.0


Flowable所持有的对象是FlowableSubscriber,而和观察者交互的对象是FlowableEmitter。


接着来分析观察者和1.x的不同点:

1:多了onsubscriber方法,写的时候会自动生成。调用了request方法。request的方法的含义是控制上游也就是被观察者发送通知事件的速率。如果不加控制就是MAX最大值。如果去掉request这句,会面的OnNext以及OnComplete就不会被执行

2)请求仅仅获取一个上游事件。但是OnNext发送好几次

  Flowable.create(new FlowableOnSubscribe<String>() {
            @Override
            public void subscribe(@NonNull FlowableEmitter<String> e) throws Exception {
                Log.d(TAG, "onNext发送第1次");
                e.onNext("onNext 1");

                Log.d(TAG, "onNext发送第2次");
                e.onNext("onNext 2");

                e.onComplete();
            }
        }, BackpressureStrategy.BUFFER).subscribe(new Subscriber<String>() {
            @Override
            public void onSubscribe(Subscription s) {
                Log.d(TAG, "onSubscribe");
                //request的方法的含义是控制上游也就是被观察者发送通知事件的速率
                s.request(1);
            }

            @Override
            public void onNext(String s) {
                Log.d(TAG, "onNext s===>" + s);
            }

            @Override
            public void onError(Throwable t) {

            }

            @Override
            public void onComplete() {

            }
        });


打印结果:

07-04 16:22:57.046 28030-28030/com.example.rxjava2demo D/MainActivity: onSubscribe
07-04 16:22:57.047 28030-28030/com.example.rxjava2demo D/MainActivity: onNext发送第1次
07-04 16:22:57.047 28030-28030/com.example.rxjava2demo D/MainActivity: onNext s===>onNext 1
07-04 16:22:57.047 28030-28030/com.example.rxjava2demo D/MainActivity: onNext发送第2次
07-04 16:22:57.047 28030-28030/com.example.rxjava2demo D/MainActivity: onNext发送第3次


再改动

 Flowable.create(new FlowableOnSubscribe<String>() {
            @Override
            public void subscribe(@NonNull FlowableEmitter<String> e) throws Exception {
                Log.d(TAG, "onNext发送第1次");
                e.onNext("onNext 1");

                Log.d(TAG, "onNext发送第2次");
                e.onNext("onNext 2");

                Log.d(TAG, "onNext发送第3次");
                e.onNext("onNext 3");

                e.onComplete();
            }
        }, BackpressureStrategy.BUFFER).subscribe(new Subscriber<String>() {
            @Override
            public void onSubscribe(Subscription s) {
                Log.d(TAG, "onSubscribe");
                //request的方法的含义是控制上游也就是被观察者发送通知事件的速率
                s.request(Long.MAX_VALUE);
            }

            @Override
            public void onNext(String s) {
                Log.d(TAG, "onNext s===>" + s);
            }

            @Override
            public void onError(Throwable t) {

            }

            @Override
            public void onComplete() {

            }
        });


打印结果:

07-04 16:21:38.137 26478-26478/com.example.rxjava2demo D/MainActivity: onSubscribe
07-04 16:21:38.138 26478-26478/com.example.rxjava2demo D/MainActivity: onNext发送第1次
07-04 16:21:38.138 26478-26478/com.example.rxjava2demo D/MainActivity: onNext s===>onNext 1
07-04 16:21:38.138 26478-26478/com.example.rxjava2demo D/MainActivity: onNext发送第2次
07-04 16:21:38.138 26478-26478/com.example.rxjava2demo D/MainActivity: onNext s===>onNext 2
07-04 16:21:38.138 26478-26478/com.example.rxjava2demo D/MainActivity: onNext发送第3次
07-04 16:21:38.138 26478-26478/com.example.rxjava2demo D/MainActivity: onNext s===>onNext 3

对比两次结果发现当s.request(1),上游设为1的时候e.onNext只执行1次;当s.request(Long.MAX_VALUE)设为最大的时候,

可以多次执行。


3)完整版本的形式

(1)2.x背压模式:

Flowable.create(new FlowableOnSubscribe<String>() {
            @Override
            public void subscribe(@NonNull FlowableEmitter<String> e) throws Exception {
                e.onNext("onNext");
                e.onComplete();
            }
        }, BackpressureStrategy.BUFFER).subscribe(new Subscriber<String>() {
            @Override
            public void onSubscribe(Subscription s) {
                Log.d(TAG, "onSubscribe");
                //request的方法的含义是控制上游也就是被观察者发送通知事件的速率
                s.request(Long.MAX_VALUE);
            }

            @Override
            public void onNext(String s) {
                Log.d(TAG, "onNext s===>" + s);
            }

            @Override
            public void onError(Throwable t) {

            }

            @Override
            public void onComplete() {

            }
        });

(2)2.x Observable模式(一般模式)

Observable.create(new ObservableOnSubscribe<String>() {
            @Override
            public void subscribe(@NonNull ObservableEmitter<String> e) throws Exception {
                e.onNext("Hello RxJava2.0");
                e.onComplete();
            }
        }).subscribe(new Observer<String>() {
            @Override
            public void onSubscribe(@NonNull Disposable d) {

            }

            @Override
            public void onNext(@NonNull String s) {
                Log.d(TAG, "onNext s===>" + s);
            }

            @Override
            public void onError(@NonNull Throwable e) {

            }

            @Override
            public void onComplete() {

            }
        });

(3)1.x

Observable.create(new Observable.OnSubscribe<String>() {

            @Override
            public void call(Subscriber<? super String> subscriber) {
                subscriber.onNext("Hello World!");
                subscriber.onCompleted();
            }
        }).subscribe(new Subscriber<String>() {
            @Override
            public void onCompleted() {

            }

            @Override
            public void onError(Throwable e) {

            }

            @Override
            public void onNext(String s) {
                Log.d(TAG, "onNext s====>" + s);
            }
        });

说明:被观察者在1.x中方法名是subscribe,在2.x中的方法是call;

背压模式观察者中添加了方法onSubscribe;

1.x Observable------Subscriber

2.x  Observable------Observer


3)简单的形式
 Flowable.just("hello 2017").subscribe(new Consumer<String>() {
            @Override
            public void accept(@NonNull String s) throws Exception {
                Log.d(TAG, "s====>" + s);
            }
        });
2.x普通模式:

 Observable.just("hello 2017").subscribe(new Consumer<String>() {
            @Override
            public void accept(@NonNull String s) throws Exception {
                Log.d(TAG, "accept s===>" + s);
            }
        });
rxjava1.x版本是这样写的:
Observable.just("hello 2017").subscribe(new Action1<String>() {

            @Override
            public void call(String s) {
                Log.d(TAG, "s====>" + s);
            }
        });
a.观察者中的对象不一样,1.x是Action1,2.x是Consumer;方法也不一样1.x是call,2.x是accept。
4)map
2.x


  Observable.just("c").map(new Function<String, Integer>() {

            @Override
            public Integer apply(@NonNull String s) throws Exception {
                return s.hashCode();
            }
        }).subscribe(new Consumer<Integer>() {

            @Override
            public void accept(@NonNull Integer integer) throws Exception {
                Log.d(TAG, "integer===>" + integer);
            }
        });

1.x

 Observable.just("hello").map(new Func1<String, String>() {
            @Override
            public String call(String s) {
                return s + " 2017";
            }
        }).subscribe(new Action1<String>() {
            @Override
            public void call(String s) {
                Log.d(TAG, "s====>" + s);
            }
        });

代码下载:http://download.csdn.net/detail/yihuangol/9889023


相关标签: android rxjava