欢迎您访问程序员文章站本站旨在为大家提供分享程序员计算机编程知识!
您现在的位置是: 首页  >  IT编程

RxJava2.0入门篇

程序员文章站 2024-03-30 22:43:21
传统用法:开启子线程去做耗时任务,业务逻辑越复杂,代码嵌套越严重,Rx系列出来很久了,想自己做一个总结,希望能帮到一部分人 观察者模式先提一嘴 这个老模式简直不想说太多,就说一下流程 1创建被观察者 2创建观察者 3被观察者与观察者进行绑定 4当被观察者状态改变,观察者收到后做响应处理 第一步,Rx ......

传统用法:开启子线程去做耗时任务,业务逻辑越复杂,代码嵌套越严重,rx系列出来很久了,想自己做一个总结,希望能帮到一部分人

观察者模式先提一嘴

这个老模式简直不想说太多,就说一下流程

1创建被观察者

2创建观察者

3被观察者与观察者进行绑定

4当被观察者状态改变,观察者收到后做响应处理

第一步,rxjava创建被观察者

第一种方法:通过observable.create(observableonsubscribe)

这里边的emitter来发射数据和信息

二:通过observable.just(参数);

三:通过observable.from();

第二部,创建观察者

 1 observer<object> observer = new observer<object>() {
 2             @override
 3             public void onsubscribe(disposable d) {
 4                 //被订阅时调用
 5             }
 6 
 7             @override
 8             public void onnext(object o) {
 9           //当被观察者改变的时候调用的方法
10             }
11 
12             @override
13             public void onerror(throwable e) {
14           //处理异常的方法
15             }
16 
17             @override
18             public void oncomplete() {
19           //不再有新的事件的时候调用
20             }
21         };

订阅

observable.subscribe(observer);

订阅之后,代码将依次调用observer的onsubscribe(),observable的subscribe(),observer的onnext与oncomplete

一个简单的模式就形成了

操作符

map -->把一个事件转化成另一个事件

举个栗子:integer转string操作

observable.create(new observableonsubscribe<integer>() {
            @override
            public void subscribe(observableemitter<integer> e) throws exception {
                log.d(tag, "subscribe: ");
                e.onnext(1);
                e.onnext(2);
                e.onnext(3);
            }
        }).map(new function<integer, string>() {
            @override
            public string apply(integer integer) throws exception {
                string mapstr = string.valueof(integer + 1);
                return mapstr;
            }
        }).subscribe(new consumer<string>() {
            @override
            public void accept(string s) throws exception {
                log.d(tag, "accept: " + s);
            }
        });

flatmap -->flatmap是一个非常强大的操作符,flatmap将一个发送事件的上游observable变换为多个发送事件的observables,然后将它们发射的事件合并后放进一个单独的observable里,但是flatmap不能保证事件的顺序

observable.create(new observableonsubscribe<integer>() {
            @override
            public void subscribe(observableemitter<integer> e) throws exception {
                e.onnext(1);
                e.onnext(2);
                e.onnext(3);
            }
        }).flatmap(new function<integer, observable<string>>() {
            @override
            public observable<string> apply(integer integer) throws exception {
                arraylist<string> arraylist = new arraylist<>();
                for (int i = 0; i < 5; i++) {
                    string istr = "flatmap value" + integer;
                    arraylist.add(istr);
                }
                return observable.fromiterable(arraylist).delay(10, timeunit.microseconds);
            }
        }).subscribe(new consumer<string>() {
            @override
            public void accept(string s) throws exception {
                log.d(tag, "accept: " + s);
            }
        });

 

concatmap -->作用和flatmap一样,但是保证了顺序

observable.create(new observableonsubscribe<integer>() {
            @override
            public void subscribe(observableemitter<integer> e) throws exception {
                e.onnext(1);
                e.onnext(11);
                e.onnext(111);
            }
        }).concatmap(new function<integer, observablesource<string>>() {
            @override
            public observablesource<string> apply(integer integer) throws exception {
                arraylist<string> arraylist = new arraylist<>();
                for (int i = 0; i < 3; i++) {
                    arraylist.add("concatmap value" + i + "integer" + integer);
                }
                return observable.fromiterable(arraylist).delay(5, timeunit.milliseconds);
            }
        }).subscribe(new consumer<string>() {
            @override
            public void accept(string s) throws exception {
                log.d(tag, "accept: " + s);
            }
        });

buffer -->

buffer操作符会定期收集observable的数据放进一个数据包裹,然后发射这些包裹,并不是一次发射一个值
buffer操作符将一个observable变换为另一个,原来的observable正常发射数据,变换产生的observable发射这些数据的缓存集合。如果原来的observable发射了一个onerror通知,buffer会立即传递这个通知,而不是首先发射缓存的数据。

scan -->
scan连续地对数据序列的每一项应用一个函数,然后连续发射结果
scan操作符对原始observable发射的第一项数据应用一个函数,然后将这个函数的结果作为自己的第一项数据发射。将函数的结果同第二项数据一起填充给这个函数来产生自己的第二项数据。持续进行这个过程来产生剩余的数据序列。
observable.just(1,2,3,4,5).scan(new bifunction<integer, integer, integer>() {
            @override
            public integer apply(integer integer, integer integer2) throws exception {
                return integer + integer2;
            }
        }).subscribe(new consumer<integer>() {
            @override
            public void accept(integer integer) throws exception {
                log.d(tag, "accept: " + integer);
            }
        });

window -->

window定期将来自原始observable的数据分解为一个observable窗口,发射这些窗口而不是每次发射一项数据

window和buffer类似,但不是发射来自原始observable的数据包,发射的是observables,这些observables中的每一个都发射原始observable数据的一个子集,最后发射一个oncomplete通知。

zip -->

zip通过一个函数将多个observable发送的事件结合到一起,然后发送这些组合到一起的事件。按照严格的顺序应用这个函数,只发射与发射项最少的那个observable一样多的数据,zip在android中的使用,可以适用于如下场景,一个界面需要展示用户的一些信息,这些信息分别要从两个服务器接口中获取,只有当两个数据都获取后才能进行展示。这类同时的信息请求比较适用zip

        //第一个事件
        observable<integer> observable1 = observable.range(1, 5);
        //第二个事件
        observable<integer> observable2 = observable.range(6, 10);
        //合并事件
        observable.zip(observable1, observable2, new bifunction<integer, integer, string>() {
            @override
            public string apply(integer integer, integer integer2) throws exception {
                return string.valueof(integer + integer2);
            }
        }).subscribe(new consumer<string>() {
            @override
            public void accept(string s) throws exception {
                log.d(tag, "accept: " + s);
            }
        });