RxJava基本使用
更多文章请点击链接:
转载请标明出处:https://www.cnblogs.com/tangzh/p/12088300.html,
rxjava究竟是啥,从根本上来讲,它就是一个实现异步操作的库,并且能够使代码非常简洁。它的异步是使用观察者模式来实现的。
关于观察者模式的介绍,可以看我这一篇文章:
https://www.cnblogs.com/tangzh/p/11175120.html
这里我主要讲rxjava的一些基本用法,基本案例,原理的话暂时不深究:
一、自己构造事件
observable.create(new observableonsubscribe<integer>() { @override public void subscribe(observableemitter emitter) { int i = getnumber(); if (i < 0) { emitter.oncomplete(); return; } else { log.d(tag, thread.currentthread().getname()); emitter.onnext(i); emitter.oncomplete(); } } }) .subscribeon(schedulers.io()) .observeon(androidschedulers.mainthread()) .subscribe(new consumer<integer>() { @override public void accept(integer integer) throws exception { log.d(tag, thread.currentthread().getname()); log.d(tag, integer + ""); } }, new consumer<throwable>() { @override public void accept(throwable throwable) throws exception { } });
rxjava 有四个基本概念:observable
(可观察者,即被观察者)、 observer
(观察者)、 subscribe
(订阅)、事件。observable
和 observer
通过 subscribe()
方法实现订阅关系,从而 observable
可以在需要的时候发出事件来通知 observer
。
onnext():方法用来发送事件。
下面看看其他两个方法:
oncompleted()
: 事件队列完结。rxjava 不仅把每个事件单独处理,还会把它们看做一个队列。rxjava 规定,当不会再有新的onnext()
发出时,需要触发oncompleted()
方法作为标志。onerror()
: 事件队列异常。在事件处理过程中出异常时,onerror()
会被触发,同时队列自动终止,不允许再有事件发出。- 在一个正确运行的事件序列中,
oncompleted()
和onerror()
有且只有一个,并且是事件序列中的最后一个。需要注意的是,oncompleted()
和onerror()
二者也是互斥的,即在队列中调用了其中一个,就不应该再调用另一个。
讲一下我们上面的例子,上面这个例子是采用简洁的链式调用来写的:
首先使用 create()
方法来创建一个 observable ,并为它定义事件触发规则,然后通过emitter.onnext(i)传递出来,.subscribeon(schedulers.io())便是指定该事件产生的所在的线程为子线程,.observeon(androidschedulers.mainthread())指定观察者执行的线程为主线程。这时候为止返回的对象为observable对象。
然后该observable对象subscribe绑定观察者(也就是观察者进行订阅),里面有接收被观察者发出来的事件,有一个成功的方法,和一个失败的方法,这样就实现了由被观察者向观察传递事件。
二、对集合里的数据进行变换
list<integer> list = new arraylist<integer>() { { add(0); add(1); add(2); } }; observable.fromiterable(list).map(new function() { @override public object apply(object o) throws exception { int i = (int) o + 1; return string.valueof(i); } }) .tolist() .toobservable().subscribeon(schedulers.io()) .subscribeon(androidschedulers.mainthread()) .subscribe(new consumer() { @override public void accept(object o) throws exception { log.d(tag, o.tostring()); } });
且看,我们需要对某个集合里面的数据一一进行变换,然后发送出来执行其他操作。
上面便是对集合里面的每一项进行加一操作,然后再转换为string类型,然后tolist(),组合成集合发送出来,最后在观察者方法中打印出每一项。
三、合并执行
定义两个被观察者,各自产生事件,然后合并在一起,发送给一个观察者。
首先定义我们上面第一个例子的被观察者,用于发送一个数字:
observable observable1 = observable.create(new observableonsubscribe<integer>() { @override public void subscribe(observableemitter emitter) { int i = getnumber(); if (i < 0) { emitter.oncomplete(); return; } else { log.d(tag, thread.currentthread().getname()); emitter.onnext(i); emitter.oncomplete(); } } }) .subscribeon(schedulers.io());
其次再定义我们上面第二个例子的被观察者:
list<integer> list = new arraylist<integer>() { { add(0); add(1); add(2); } }; observable observable2 = observable.fromiterable(list).map(new function() { @override public object apply(object o) { int i = (int) o + 1; return string.valueof(i); } }) .tolist() .toobservable().subscribeon(schedulers.io());
最后将这两个被观察者的事件合并起来发送给一个观察者:
disposable disposable = observable.zip(observable1, observable2, new bifunction() { @override public object apply(object o, object o2) throws exception { int i = (int) o; string k = (string) ((list) o2).get(0); return k + i; } }) .subscribe(new consumer() { @override public void accept(object o) { log.d(tag, (string) o); } }, new consumer<throwable>() { @override public void accept(throwable throwable) { log.d(tag, throwable.getmessage()); } });
zip方法,顾名思义,有点类似与于打包的意思。
o为被观察者1返回的结果,o2为被观察2返回的结果,将这两个结果一起处理后发送给观察者。打印出来。
现在先介绍这几个,找个时间再整理一些其他的用法以及原理实现。