欢迎您访问程序员文章站本站旨在为大家提供分享程序员计算机编程知识!
您现在的位置是: 首页

RxJava使用操作符(二)——变换操作符

程序员文章站 2022-06-07 12:51:41
...

1、简介

RxJava 的作用之强大相信每个用过的同学都深有体会,在介绍基本的创建操作符之后,我们继续来看变换操作符,正是因为变换操作符的存在,RxJava的才可以满足不同场景的功能。

变换操作符的作用:对事件序列中的事件 / 整个事件序列 进行加工处理(即变换),使得其转变成不同的事件 / 整个事件序列

2、操作符类型

  • map()
  • flatMap() 
  • ConcatMap ()
  • buffer ()

3、使用介绍

  • map

(1)被观察着发送的每个数据经过固定的函数处理,返回某个数据类型

(2)使用场景 ——数据类型转换

(3)具体实例,将数字转换为字符串:

Observable.just(1, 2, 3)
                .map(new Function<Integer, String>() {
                    @Override
                    public String apply(Integer integer) throws Exception {
                        return String.valueOf(integer);
                    }
                }).subscribe(new Consumer<String>() {
            @Override
            public void accept(String s) throws Exception {
                
            }
        });
从上面可以看出,map() 将参数中的 Integer 类型对象转换成一个 String类型 对象后返回,事件的参数类型也转换为String


  • flatMap

(1)将被观察者发送的事件序列进行 拆分 & 单独转换,再合并成一个新的事件序列,最后再进行发送

(2)原理:

  • 为每个分发序列中的事件都创建一个Observable对象
  • 对创建的每个Observable对象都执行相应的方法进行装换并储存
  • 对变换后的Observable对象组合成一个对象并发送给观察者

(3)使用场景——对发送的事件进行拆分装换成新的事件

(4)具体实例——将每个数字转换为集合并添加一个0

 Observable.just(1, 2, 3)
                .flatMap(new Function<Integer, ObservableSource<Integer>>() {
                    @Override
                    public ObservableSource<Integer> apply(Integer integer) throws Exception {
                        ArrayList<Integer> list = new ArrayList<>();
                        list.add(0);
                        list.add(integer);
                        return Observable.fromIterable(list);
                    }
                }).subscribe(new Consumer<Integer>() {
            @Override
            public void accept(Integer integer) throws Exception {

            }
        });

输出结果:


注:此时输出的顺序是无序的

  • ConcatMap 

(1)作用和flatMap()相同,不过 拆分 & 重新合并生成的事件序列 的顺序 = 被观察者旧序列生产的顺序

(2)原理与flatMap相同

(3)使用场景——对发送的事件进行拆分装换成新的事件

(4)具体实例——将每个数字转换为集合并添加一个0

 Observable.just(1, 2, 3)
                .flatMap(new Function<Integer, ObservableSource<Integer>>() {
                    @Override
                    public ObservableSource<Integer> apply(Integer integer) throws Exception {
                        ArrayList<Integer> list = new ArrayList<>();
                        list.add(0);
                        list.add(integer);
                        return Observable.fromIterable(list);
                    }
                }).subscribe(new Consumer<Integer>() {
            @Override
            public void accept(Integer integer) throws Exception {

            }
        });

输出结果:

05-02 20:16:09.046 15718-15718/com.example.administrator.glide E/concatMap========: 0
05-02 20:16:09.046 15718-15718/com.example.administrator.glide E/concatMap========: 1
05-02 20:16:09.046 15718-15718/com.example.administrator.glide E/concatMap========: 2
05-02 20:16:09.046 15718-15718/com.example.administrator.glide E/concatMap========: 0
05-02 20:16:09.046 15718-15718/com.example.administrator.glide E/concatMap========: 1
05-02 20:16:09.047 15718-15718/com.example.administrator.glide E/concatMap========: 2
05-02 20:16:09.047 15718-15718/com.example.administrator.glide E/concatMap========: 0
05-02 20:16:09.047 15718-15718/com.example.administrator.glide E/concatMap========: 1
05-02 20:16:09.047 15718-15718/com.example.administrator.glide E/concatMap========: 2

注:此时输出的顺序是按照事件分发顺序的

  • buffer 

(1)定期从被观察者中取出固定的事件放到缓存区中,在一起发送

(2)原理:

  • 按照设置的缓存大小和步长拿去事件
  • 发送缓存区的事件

(3)使用场景——缓存发送的事件

(4)使用实例——发送数据

Observable.just(1,2,3,4,5)
                .buffer(3,1)
                .subscribe(new Consumer<List<Integer>>() {
                    @Override
                    public void accept(List<Integer> integers) throws Exception {
                        
                    }
                });

输出结果:

05-02 20:16:09.050 15718-15718/com.example.administrator.glide E/buffer=========: 1
05-02 20:16:09.050 15718-15718/com.example.administrator.glide E/buffer=========: 2
05-02 20:16:09.050 15718-15718/com.example.administrator.glide E/buffer=========: 3
05-02 20:16:09.050 15718-15718/com.example.administrator.glide E/buffer=========: 2
05-02 20:16:09.050 15718-15718/com.example.administrator.glide E/buffer=========: 3
05-02 20:16:09.051 15718-15718/com.example.administrator.glide E/buffer=========: 4
05-02 20:16:09.051 15718-15718/com.example.administrator.glide E/buffer=========: 3
05-02 20:16:09.052 15718-15718/com.example.administrator.glide E/buffer=========: 4
05-02 20:16:09.052 15718-15718/com.example.administrator.glide E/buffer=========: 5
05-02 20:16:09.053 15718-15718/com.example.administrator.glide E/buffer=========: 4
05-02 20:16:09.054 15718-15718/com.example.administrator.glide E/buffer=========: 5
05-02 20:16:09.054 15718-15718/com.example.administrator.glide E/buffer=========: 5

4、开发实例

实现注册后自动登陆的请求,业务逻辑

  • 验证注册信息
  • 注册成功后发起登陆请求
  • 验证登陆信息
  • 登陆成功

模拟注册请求,注册一个用户账号:

 final Observable<User> observableLogin = Observable.create(new ObservableOnSubscribe() {
            @Override
            public void subscribe(ObservableEmitter e) throws Exception {
                e.onNext(new User("AAAAA","123"));
            }
        });

模拟登陆请求:

Observable<User> observableRegister = Observable.create(new ObservableOnSubscribe() {
            @Override
            public void subscribe(ObservableEmitter e) throws Exception {
                e.onNext(new User("AAAAA","123"));
            }
        });

实现业务逻辑:

observableRegister.filter(new Predicate<User>() {
            @Override
            public boolean test(User user) throws Exception {
                return user.getName().equals("AAAAA");
            }
        }).subscribeOn(Schedulers.io())
                .observeOn(AndroidSchedulers.mainThread())
                .doOnNext(new Consumer<User>() {
                    @Override
                    public void accept(User user) throws Exception {
                        Log.e("========",user.getName()+"注册成功");
                    }
                }).observeOn(Schedulers.io())
                .flatMap(new Function<User, ObservableSource<User>>() {
                    @Override
                    public ObservableSource<User> apply(User user) throws Exception {
                        return observableLogin;
                    }
                }).filter(new Predicate<User>() {
            @Override
            public boolean test(User user) throws Exception {
                return user.getName().equals("AAAAA") && user.getPswd().equals("123");
            }
        }).observeOn(AndroidSchedulers.mainThread())
                .subscribe(new Consumer<User>() {
                    @Override
                    public void accept(User user) throws Exception {
                        Log.e("========",user.getName() + "登陆成功");
                    }
                });

上面过程使用observableRegister在注册时先验证注册信息是否为“AAAAA”,验证成功后完成注册,注册成功后切换为登陆的被观察者对象,完成登陆功能。

输出结果:

05-03 01:36:40.142 2310-2310/? E/========: AAAAA注册成功
05-03 01:36:40.316 2310-2310/? E/========: AAAAA登陆成功

这种使用在平时的业务逻辑中会起很大作用,可以在实现更能的同时使代码逻辑更清晰。

相关标签: RxJava flatMap