欢迎您访问程序员文章站本站旨在为大家提供分享程序员计算机编程知识!
您现在的位置是: 首页  >  移动技术

RxJava操作符的简单使用

程序员文章站 2022-04-24 10:45:41
一、准备工作在app的build.gradle文件下的dependencies中添加依赖: 二、RxJava的操作符用法: 1、create 2、from 3、just 4、map 5、flatMap 此处用到的getCityWeathData方法返回出来一个泛型为WeathData的Observa ......

一、准备工作在app的build.gradle文件下的dependencies中添加依赖:

    compile 'io.reactivex:rxjava:1.3.0'
    compile 'io.reactivex:rxandroid:1.2.1'

二、rxjava的操作符用法:

1、create

//create  创建observable
        //observable 被观察者
        //subscribers观察者

        observable<string> observable = observable.create(new observable.onsubscribe<string>() {
            @override
            public void call(subscriber<? super string> subscriber) {
                log.i(tag,"call.....");
                if(!subscriber.isunsubscribed()) {
                    subscriber.onnext("我被执行了1"); //只有完成了下面的订阅方法才能执行onnext等方法
                    subscriber.onnext("我被执行了2");
                    subscriber.onnext("我被执行了3");
                    subscriber.onnext("我被执行了4");
                    subscriber.onnext("我被执行了5");
                    subscriber.onnext("我被执行了6");

                    subscriber.oncompleted();
                }
            }
        });

        //完成订阅
        observable.subscribe(new subscriber<string>() {
            @override
            public void oncompleted() {
                log.i(tag,"oncompleted") ;
            }

            @override
            public void onerror(throwable e) {
                log.i(tag,"onerror:"+e.getmessage());
            }

            @override
            public void onnext(string s) {
                log.i(tag,"onnext:"+s);
            }
        }) ;

2、from

//from 也可以创建observable  讲int类型的数组01234转化为observable,通过订阅的onnext可以一个个的取到数组的值
        integer[] arr = {0,1,2,3,4};
        //完成创建observable
        observable<integer> from = observable.from(arr);
        //完成订阅
        from.subscribe(new subscriber<integer>() {
            @override
            public void oncompleted() {
                log.i(tag,"oncompleted") ;
            }

            @override
            public void onerror(throwable e) {
                log.i(tag,"onerror:"+e.getmessage());
            }

            @override
            public void onnext(integer i) {
                log.i(tag,"onnext:"+i);
            }
        }) ;

3、just

//just  也可以创建observable  参数是可变参数
        observable.just(0, 1, 2, 3, 4, 5)
                .subscribe(new action1<integer>() {
                    @override
                    public void call(integer integer) {
                        log.i(tag,"call:"+integer);
                    }
                }) ;

4、map

//map 变换,在该实例中,将integer转换成string
//在实际开发中,需要通过string类型的url获取到图片

        string[] urls = {};
        observable.from(urls)
                .map(new func1<string, bitmap>() { //第一个参数为传入的类型,第二个参数为返回的类型
                    @override
                    public bitmap call(string s) {
                        //s 代表图片url,网络请求通过url 获取到图片
                        return null;
                    }
                }).subscribe(new action1<bitmap>() { // 获取的的bitmap类型再进行订阅处理
            @override
            public void call(bitmap bitmap) {
                //iv.setbackage()
            }
        });

        /*observable.just(0,1,2,3)
                .map(new func1<integer, string>() {
                    @override
                    public string call(integer integer) {
                        return integer+"转换了";
                    }
                }).subscribe(new action1<string>() {
            @override
            public void call(string s) {
                log.i(tag,"call:"+s);//0转换了  1转换了。。。
            }
        });*/

5、flatmap

//flatmap 转换  将多个observable转换成一个observalbe然后发送
        //获取城市的天气

        /**
         *
         * map 是将一种类型转换成另一种类型(可以是任意类型)
         *
         * flatmap 是将一种类转换成observable(泛型可以是任意的)
         *
         */

        string[] citys = {"北京","上海","杭州"};
        observable.from(citys)
                .flatmap(new func1<string, observable<weathdata>>() {
                    @override
                    public observable<weathdata> call(string s) {
                        return getcityweathdata(s);
                    }
                }).subscribe(new action1<weathdata>() {
            @override
            public void call(weathdata weathdata) {
                log.i(tag,weathdata.city+weathdata.state);
            }
        });

此处用到的getcityweathdata方法返回出来一个泛型为weathdata的observable:

/**
     * 获取一个城市的天气数据
     * @param city
     * @return
     */
    private observable<weathdata> getcityweathdata(final string city){
        return observable.just(city)
                .map(new func1<string, weathdata>() {
                    @override
                    public weathdata call(string s) {
                        //通过网络请求获取城市的天气数据
                        weathdata weathdata = new weathdata();
                        weathdata.city = city ;
                        weathdata.state = "晴天" ;
                        return weathdata ;
                    }
                });
    }

6、zip

//zip  将两个observable按照规则严格的合成一个observable
        observable<integer> observable1 = observable.just(10, 20, 30,40);
        observable<integer> observable2 = observable.just(1, 2, 3,4);

        observable.zip(observable1, observable2, new func2<integer, integer, string>() {
            @override
            public string call(integer integer, integer integer2) {
                //定义合并规则
                return integer + integer2 + "abc";
            }
        }).subscribe(new action1<string>() {
            @override
            public void call(string string) {
                log.i(tag,"call:"+string) ;
            }
        });

7、zipwith

//zipwith 将本身与其他的observable按照规则严格的合并成一个observable
        observable.just(10,20,30,40)
                .zipwith(observable.just("a", "b", "c"), new func2<integer, string, string>() {
                    @override
                    public string call(integer integer, string s) {
                        //合并规则
                        return integer + s ;
                    }
                }).subscribe(new action1<string>() {
            @override
            public void call(string s) {
                log.i(tag,"call:"+s) ;
            }
        });

8、retry

//retry 在出错的时候重试(异常的时候重新执行)
        //用处:网络连接异常的时候
        observable.create(new observable.onsubscribe<integer>() {
            @override
            public void call(subscriber<? super integer> subscriber) {
                try {
                    for (int i = 0; i < 5; i++) {
                        if (i == 3) {
                            throw new exception("出错了");
                        }
                        subscriber.onnext(i);
                    }
                    subscriber.oncompleted();
                }catch (exception e){
                    subscriber.onerror(e);
                }
            }
        }).retry(2).subscribe(new subscriber<integer>() {
            @override
            public void oncompleted() {
                log.i(tag,"oncompleted");
            }

            @override
            public void onerror(throwable e) {
                log.i(tag,"onerror:"+e.getmessage()) ;
            }

            @override
            public void onnext(integer integer) {
                log.i(tag,"onnext:"+integer) ;
            }
        });

9、retrywhen

//retrywhen 异常的时候执行
        //网络请求框架中,一般使用retrywhen  要执行操作是连接网络,连接出异常的时候,
        // 1、我们可以直接重复执行连接网络,retry
        // 2、同时我们也可以判断连接异常的类型,再做决定是否重连 retyrwhen
        observable.create(new observable.onsubscribe<integer>() {
            @override
            public void call(subscriber<? super integer> subscriber) {
                log.i(tag,"总出错");
                subscriber.onerror(new throwable("出错了"));
            }
        }).retrywhen(new func1<observable<? extends throwable>, observable<?>>() {
            @override
            public observable<?> call(observable<? extends throwable> observable) {
                return observable.zipwith(observable.range(1, 3), new func2<throwable, integer, integer>() {
                    @override
                    public integer call(throwable throwable, integer integer) {
                        return integer;
                    }
                }).flatmap(new func1<integer, observable<?>>() {
                    @override
                    public observable<?> call(integer integer) {
                        //timer 延迟执行的操作符
                        log.i(tag,"延迟"+integer+"秒");
                        return observable.timer(integer, timeunit.seconds);
                    }
                });
            }
        }).subscribe(new subscriber<integer>() {
            @override
            public void oncompleted() {
                log.i(tag,"oncompleted");
            }

            @override
            public void onerror(throwable e) {
                log.i(tag,"onerror:"+e.getmessage()) ;
            }

            @override
            public void onnext(integer integer) {
                log.i(tag,"onnext:"+integer) ;
            }
        }) ;

10、filter

//filter 按照规则过滤
        observable.just(0,1,2,3,4,5)
                .filter(new func1<integer, boolean>() {
                    @override
                    public boolean call(integer integer) {
                        return integer < 2;
                    }
                }).subscribe(new action1<integer>() {
            @override
            public void call(integer integer) {
                log.i(tag,"call:"+integer) ;
            }
        });

。。。