欢迎您访问程序员文章站本站旨在为大家提供分享程序员计算机编程知识!
您现在的位置是: 首页

RxJava2.0变换操作符(二)

程序员文章站 2024-02-29 08:50:34
...

1、转换操作符:

①、Map(作用:数据类型色转换)

//实现的功能:将Integer类型转换成String类型
        Observable.create(

                //创建发射器
                new ObservableOnSubscribe<Integer>() {
                    @Override
                    public void subscribe(ObservableEmitter<Integer> e) throws Exception {
                        //发射事件
                        e.onNext(0);
                        e.onNext(1);
                        e.onNext(2);
                    }
                })
                //转换
                .map(
                        //Integer类型转换成String类型
                        new Function<Integer, String>() {
                            @Override
                            public String apply(@NonNull Integer integer) throws Exception {
                                return "Integer类型转换成String类型:"+integer;
                            }
                        })
                //订阅
                .subscribe(
                        //创建简单的观察者
                        new Consumer<String>() {
                            @Override
                            public void accept(@NonNull String s) throws Exception {
                                System.out.println("接收到的事件:"+s);
                            }
                        }
                );
运行结果:

RxJava2.0变换操作符(二)

②FlatMap(作用:将被观察者发送的事件序列进行 拆分 & 单独转换,在合并成一个新的事件序列,最后进行发送)

原理:

1、为事件序列中每个事件都创建一个 Observable(被观察者)

2、将对每一个 原始事件 转换后的 新的事件 都放入到对应的 Observable

3、将新建的每一个 Observable 都合并到一个 新建的、总的 Observable

4、新建的、总的 Observable 将合并的事件序列 发送给观察者 Observer

示意图:

RxJava2.0变换操作符(二)

 Observable.create(new ObservableOnSubscribe<Integer>() {
            @Override
            public void subscribe(ObservableEmitter<Integer> e) throws Exception {
                e.onNext(0);
                e.onNext(1);
                e.onNext(2);
                e.onNext(3);
            }
        })

        .flatMap(new Function<Integer, ObservableSource<String>>() {
            @Override
            public ObservableSource<String> apply(@NonNull Integer integer) throws Exception {

                List<String> lists=new ArrayList<String>();

                //把一个事件拆分成n个子事件
                for (int i = 0; i < 5; i++) {
                    lists.add("我是事件:"+integer+",被拆分后的子事件:"+i);
                }

                //发射数组遍历后的数据
                return Observable.fromIterable(lists);
            }
        })
        //订阅
        .subscribe(
                //创建简单的观察者
                new Consumer<String>() {
                    @Override
                    public void accept(@NonNull String s) throws Exception {
                        System.out.println("接收到的事件:"+s);
                    }
                }
        );
运行结果:

RxJava2.0变换操作符(二)
注意:新合并生成的事件序列顺序是无序的,与旧序列发送的顺序无关。(上面的运行结果为有序纯属巧合


③ConcatMap(作用:类似FlatMap,区别,它是有序的)

RxJava2.0变换操作符(二)

Observable.create(
                new ObservableOnSubscribe<Integer>() {
                    @Override
                    public void subscribe(ObservableEmitter<Integer> e) throws Exception {
                        e.onNext(0);
                        e.onNext(1);
                        e.onNext(2);
                    }
                }
        )
        //有序的转换符
        .concatMap(new Function<Integer, ObservableSource<String>>() {
            @Override
            public ObservableSource<String> apply(@NonNull Integer integer) throws Exception {
                List<String> lists=new ArrayList<String>();

                //把事件拆分成N份
                for (int i = 0; i < 5; i++) {
                    lists.add("我是事件:"+integer+",被拆分成:"+i);
                }

                return Observable.fromIterable(lists);
            }
        })
        //订阅
        .subscribe(
                //创建观测者
                new Consumer<String>() {
                    @Override
                    public void accept(@NonNull String s) throws Exception {
                        System.out.println("接收到的事件:"+s);
                    }
                }
        );
运行结果:

RxJava2.0变换操作符(二)

总结:ConcatMap,新合并生成的事件序列顺序是有序的,即严格按照旧序列发送事件的顺序。而 FlatMap,新合并的事件序列是无序的。


④Buffer

作用:定期从 被观测者需要发送的事件中 取出一定数量的事件 并且 放到缓存区中,最终发送

RxJava2.0变换操作符(二)

 Observable.create(
                new ObservableOnSubscribe<Integer>() {
                    @Override
                    public void subscribe(ObservableEmitter<Integer> e) throws Exception {
                        e.onNext(0);
                        e.onNext(1);
                        e.onNext(2);
                        e.onNext(3);
                        e.onNext(4);
                        e.onNext(5);
                        

                    }
                }
        )
                //参数:设置缓存区的大小和步长
                //缓存区的大小 = 每次从被观察者中获取的事件数量
                //步长 = 每次获取新事件的数量
                .buffer(3,1)
                .subscribe(
                        new Consumer<List<Integer>>() {
                            @Override
                            public void accept(@NonNull List<Integer> integers) throws Exception {
                                System.out.println("缓存区里存放放的事件总数:"+integers.size());
                                for (Integer integer : integers) {
                                    System.out.println("从缓存区获取到的事件:"+integer);
                                }
                            }
                        }
                );

运行结果:

RxJava2.0变换操作符(二)

RxJava2.0变换操作符(二)


2、开发实际应用:使用FlatMap嵌套网络请求(比如我们实际开发中的,先注册,注册完成以后去做登录操作。这样就是两步的操作,可以直接用FlatMap来实现)

//举例,为了模拟注册登录,我只是模拟两次不同的网络请求,而实际的注册登录也是同一个道理:

final Observable<RegisterEntity> observableForRegister =   RetrofitFactory.getRetrofit().create(NetApi.class).doRegister();
        final Observable<LoginEntity> observableForLogin = RetrofitFactory.getRetrofit().create(NetApi.class).doLogin();

        observableForRegister
                .subscribeOn(Schedulers.io())
                .observeOn(AndroidSchedulers.mainThread())
                //先发起注册请求
                .doOnNext(new Consumer<RegisterEntity>() {
                    @Override
                    public void accept(@NonNull RegisterEntity registerEntity) throws Exception {
                        System.out.println("第一次网络请求:" + "注册成功");
                        System.out.println("注册返回的结果:" + registerEntity.toString());
                    }
                })

                //切换到io线程进行登录请求
                .subscribeOn(Schedulers.io())
                //切换到主线程处理登录请求返回的结果
                .observeOn(AndroidSchedulers.mainThread())

                .flatMap(new Function<RegisterEntity, ObservableSource<LoginEntity>>() {
                    @Override
                    public ObservableSource<LoginEntity> apply(@NonNull RegisterEntity registerEntity) throws Exception {

                        //将注册请求转换成登录请求,即发起第二次网络请求(即登录请求)
                        return observableForLogin;
                    }
                })


                .subscribe(
                        //创建观察者
                        new Consumer<LoginEntity>() {
                            @Override
                            public void accept(@NonNull LoginEntity loginEntity) throws Exception {
                                System.out.println("第二次网络请求:" + "登录成功");
                                System.out.println("登录成功还回的结果:" + loginEntity.toString());
                            }
                        }

                        , new Consumer<Throwable>() {
                            @Override
                            public void accept(@NonNull Throwable throwable) throws Exception {
                                System.out.println("登录失败" + throwable);
                            }
                        }

                );