异步编程RxJava-介绍
前言
前段时间写了一篇对协程的一些理解,里面提到了不管是协程还是callback,本质上其实提供的是一种异步无阻塞的编程模式;并且介绍了java中对异步无阻赛这种编程模式的支持,主要提到了future和completablefuture;之后有同学在下面留言提到了rxjava,刚好最近在看这本书,里面提到了响应式扩展(reactive extensions,rx),而rxjava是rx在jvm上的实现,所有打算对rxjava进一步了解。
rxjava简介
rxjava的官网地址:https://github.com/reactivex/rxjava,
其中对rxjava进行了一句话描述:rxjava – reactive extensions for the jvm – a library for composing asynchronous and event-based programs using observable sequences for the java vm.
大意就是:一个在java vm上使用可观测的序列来组成异步的、基于事件的程序的库。
更详细的说明在netflix技术博客的一篇文章中描述了rxjava的主要特点:
1.易于并发从而更好的利用服务器的能力。
2.易于有条件的异步执行。
3.一种更好的方式来避免回调地狱。
4.一种响应式方法。
与completablefuture对比
之前提到completablefuture真正的实现了异步的编程模式,一个比较常见的使用场景:
completablefuture<integer> future = completablefuture.supplyasync(耗时函数); future<integer> f = future.whencomplete((v, e) -> { system.out.println(v); system.out.println(e); }); system.out.println("other...");
下面用一个简单的例子来看一下rxjava是如何实现异步的编程模式:
observable<long> observable = observable.just(1, 2) .subscribeon(schedulers.io()).map(new func1<integer, long>() { @override public long call(integer t) { try { thread.sleep(1000); //耗时的操作 } catch (interruptedexception e) { e.printstacktrace(); } return (long) (t * 2); } }); observable.subscribe(new subscriber<long>() { @override public void oncompleted() { system.out.println("oncompleted"); } @override public void onerror(throwable e) { system.out.println("error" + e); } @override public void onnext(long result) { system.out.println("result = " + result); } }); system.out.println("other...");
func1中以异步的方式执行了一个耗时的操作,subscriber(观察者)被订阅到observable(被观察者)中,当耗时操作执行完会回调subscriber中的onnext方法。
其中的异步方式是在subscribeon(schedulers.io())中指定的,schedulers.io()可以理解为每次执行耗时操作都启动一个新的线程。
结构上其实和completablefuture很像,都是异步的执行一个耗时的操作,然后在有结果的时候主动告诉我结果。那我们还需要rxjava干嘛,不知道你有没有注意,上面的例子中其实提供2条数据流[1,2],并且处理完任何一个都会主动告诉我,当然这只是它其中的一项功能,rxjava还有很多好用的功能,在下面的内容会进行介绍。
异步观察者模式
上面这段代码有没有发现特别像设计模式中的:观察者模式;首先提供一个被观察者observable,然后把观察者subscriber添加到了被观察者列表中;
rxjava中一共提供了四种角色:observable、observer、subscriber、subjects
observables和subjects是两个被观察者,observers和subscribers是观察者;
当然我们也可以查看一下源码,看一下jdk中的observer和rxjava的observer
jdk中的observer:
public interface observer { void update(observable o, object arg); }
rxjava的observer:
public interface observer<t> { void oncompleted(); void onerror(throwable e); void onnext(t t); }
同时可以发现subscriber是implements observer的:
public abstract class subscriber<t> implements observer<t>, subscription
可以发现rxjava中在observer中引入了2个新的方法:oncompleted()和onerror()
oncompleted():即通知观察者observable没有更多的数据,事件队列完结
onerror():在事件处理过程中出异常时,onerror()会被触发,同时队列自动终止,不允许再有事件发出。
正是因为rxjava提供了同步和异步两种方式进行事件的处理,个人觉得异步的方式更能体现rxjava的价值,所以这里给他命名为异步观察者模式。
好了,下面正式介绍rxjava的那些灵活的操作符,这里仅仅是简单的介绍和简单的实例,具体用在什么场景下,会在以后的文章中介绍
maven引入
<dependency> <groupid>io.reactivex</groupid> <artifactid>rxjava</artifactid> <version>1.2.4</version> </dependency>
创建observable
1.create()创建一个observable,并为它定义事件触发规则
observable<integer> observable = observable .create(new observable.onsubscribe<integer>() { @override public void call(subscriber<? super integer> observer) { for (int i = 0; i < 5; i++) { observer.onnext(i); } observer.oncompleted(); } }); observable.subscribe(new observer<integer>() {...});
2.from()可以从一个列表中创建一个observable,observable将发射出列表中的每一个元素
list<integer> items = new arraylist<integer>(); for (int i = 0; i < 5; i++) { items.add(i); } observable<integer> observable = observable.from(items); observable.subscribe(new observer<integer>() {...});
3.just()将传入的参数依次发送出来
observable<integer> observable = observable.just(1, 2, 3); observable.subscribe(new observer<integer>() {...});
过滤observable
1.filter()来过滤我们观测序列中不想要的值
list<integer> items = new arraylist<integer>(); for (int i = 0; i < 5; i++) { items.add(i); } observable<integer> observable = observable.from(items).filter( new func1<integer, boolean>() { @override public boolean call(integer t) { return t == 1; } }); observable.subscribe(new observer<integer>() {...});
2.take()和tasklast()分别取前几个元素和后几个元素
list<integer> items = new arraylist<integer>(); for (int i = 0; i < 5; i++) { items.add(i); } observable<integer> observable = observable.from(items).take(3); observable.subscribe(new observer<integer>() {...});
observable<integer> observable = observable.from(items).takelast(2);
3.distinct()和distinctuntilchanged()
distinct()过滤掉重复的值
list<integer> items = new arraylist<integer>(); items.add(1); items.add(10); items.add(10); observable<integer> observable = observable.from(items).distinct(); observable.subscribe(new observer<integer>() {...});
distinctuntilchanged()列发射一个不同于之前的一个新值时让我们得到通知
list<integer> items = new arraylist<integer>(); items.add(1); items.add(100); items.add(100); items.add(200); observable<integer> observable = observable.from(items).distinctuntilchanged(); observable.subscribe(new observer<integer>() {...});
4.first()和last()分别取第一个元素和最后一个元素
list<integer> items = new arraylist<integer>(); for (int i = 0; i < 5; i++) { items.add(i); } // observable<integer> observable = observable.from(items).first(); observable<integer> observable = observable.from(items).last(); observable.subscribe(new observer<integer>() {...});
5.skip()和skiplast()分别从前或者后跳过几个元素
list<integer> items = new arraylist<integer>(); for (int i = 0; i < 5; i++) { items.add(i); } // observable<integer> observable = observable.from(items).skip(2); observable<integer> observable = observable.from(items).skiplast(2); observable.subscribe(new observer<integer>() {...});
6.elementat()取第几个元素进行发射
list<integer> items = new arraylist<integer>(); for (int i = 0; i < 5; i++) { items.add(i); } observable<integer> observable = observable.from(items).elementat(2); observable.subscribe(new observer<integer>() {...});
7.sample()指定发射间隔进行发射
list<integer> items = new arraylist<integer>(); for (int i = 0; i < 50000; i++) { items.add(i); } observable<integer> observable = observable.from(items).sample(1,timeunit.microseconds); observable.subscribe(new observer<integer>() {...});
8.timeout()设定的时间间隔内如果没有得到一个值则发射一个错误
list<integer> items = new arraylist<integer>(); for (int i = 0; i < 5; i++) { items.add(i); } observable<integer> observable = observable.from(items).timeout(1,timeunit.microseconds); observable.subscribe(new observer<integer>() {...onerror()...});
9.debounce()在一个指定的时间间隔过去了仍旧没有发射一个,那么它将发射最后的那个
list<integer> items = new arraylist<integer>(); for (int i = 0; i < 5; i++) { items.add(i); } observable<integer> observable = observable.from(items).debounce(1,timeunit.microseconds); observable.subscribe(new observer<integer>() {...});
转换observable
1.map()接收一个指定的func对象然后将它应用到每一个由observable发射的值上
list<integer> items = new arraylist<integer>(); for (int i = 0; i < 5; i++) { items.add(i); } observable<integer> observable = observable.from(items).map( new func1<integer, integer>() { @override public integer call(integer t) { return t * 2; } }); observable.subscribe(new observer<integer>() {...});
2.flatmap()函数提供一种铺平序列的方式,然后合并这些observables发射的数据
final scheduler scheduler = schedulers.from(executors.newfixedthreadpool(3)); list<integer> items = new arraylist<integer>(); for (int i = 0; i < 5; i++) { items.add(i); } observable<integer> observable = observable.from(items).flatmap( new func1<integer, observable<? extends integer>>() { @override public observable<? extends integer> call(integer t) { list<integer> items = new arraylist<integer>(); items.add(t); items.add(99999); return observable.from(items).subscribeon(scheduler); } }); observable.subscribe(new observer<integer>() {...});
重要的一点提示是关于合并部分:它允许交叉。这意味着flatmap()不能够保证在最终生成的observable中源observables确切的发射
顺序。
3.concatmap()函数解决了flatmap()的交叉问题,提供了一种能够把发射的值连续在一起的铺平函数,而不是合并它们。
示例代码同上,将flatmap替换为concatmap,输出的结果来看是有序的
4.switchmap()和flatmap()很像,除了一点:每当源observable发射一个新的数据项(observable)时,它将取消订阅并停止监视之前那个数据项产生的observable,并开始监视当前发射的这一个。
示例代码同上,将flatmap替换为switchmap,输出的结果只剩最后一个值
5.scan()是一个累积函数,对原始observable发射的每一项数据都应用一个函数,计算出函数的结果值,并将该值填充回可观测序列,等待和下一次发射的数据一起使用。
list<integer> items = new arraylist<integer>(); for (int i = 0; i < 5; i++) { items.add(i); } observable<integer> observable = observable.from(items).scan( new func2<integer, integer, integer>() { @override public integer call(integer t1, integer t2) { system.out.println(t1 + "+" + t2); return t1 + t2; } }); observable.subscribe(new observer<integer>() {...});
6.groupby()来分组元素
list<integer> items = new arraylist<integer>(); for (int i = 0; i < 5; i++) { items.add(i); } observable<groupedobservable<integer, integer>> observable = observable .from(items).groupby(new func1<integer, integer>() { @override public integer call(integer t) { return t % 3; } }); observable.subscribe(new observer<groupedobservable<integer, integer>>() { @override public void onnext(final groupedobservable<integer, integer> t) { t.subscribe(new action1<integer>() { @override public void call(integer value) { system.out.println("key:" + t.getkey()+ ", value:" + value); } }); });
7.buffer()函数将源observable变换一个新的observable,这个新的observable每次发射一组列表值而不是一个一个发射。
list<integer> items = new arraylist<integer>(); for (int i = 0; i < 5; i++) { items.add(i); } observable<list<integer>> observable = observable.from(items).buffer(2); observable.subscribe(new observer<list<integer>>() {...});
8.window()函数和 buffer()很像,但是它发射的是observable而不是列表
list<integer> items = new arraylist<integer>(); for (int i = 0; i < 5; i++) { items.add(i); } observable<observable<integer>> observable = observable.from(items).window(2); observable.subscribe(new observer<observable<integer>>() { @override public void onnext(observable<integer> t) { t.subscribe(new action1<integer>() { @override public void call(integer t) { system.out.println("this action1 = " + this+ ",result = " + t); } }); //oncompleted和onerror });
9.cast()它将源observable中的每一项数据都转换为新的类型,把它变成了不同的class
list<father> items = new arraylist<father>(); items.add(new son()); items.add(new son()); items.add(new father()); items.add(new father()); observable<son> observable = observable.from(items).cast(son.class); observable.subscribe(new observer<son>() {...}); class father { } class son extends father { }
组合observables
1.merge()方法将帮助你把两个甚至更多的observables合并到他们发射的数据项里
list<integer> items1 = new arraylist<integer>(); for (int i = 0; i < 5; i++) { items1.add(i); } list<integer> items2 = new arraylist<integer>(); for (int i = 5; i < 10; i++) { items2.add(i); } observable<integer> observable1 = observable.from(items1); observable<integer> observable2 = observable.from(items2); observable<integer> observablemerge = observable.merge(observable1,observable2); observable.subscribe(new observer<integer>() {...});
2.zip()合并两个或者多个observables发射出的数据项,根据指定的函数 func* 变换它们,并发射一个新值
list<integer> items1 = new arraylist<integer>(); for (int i = 0; i < 5; i++) { items1.add(i); } list<integer> items2 = new arraylist<integer>(); for (int i = 5; i < 10; i++) { items2.add(i); } observable<integer> observable1 = observable.from(items1); observable<integer> observable2 = observable.from(items2); observable<integer> observablezip = observable.zip(observable1, observable2, new func2<integer, integer, integer>() { @override public integer call(integer t1, integer t2) { return t1 * t2; } }); observable.subscribe(new observer<integer>() {...});
3.combinelatest()把两个observable产生的结果进行合并,这两个observable中任意一个observable产生的结果,都和另一个observable最后产生的结果,按照一定的规则进行合并。
observable<long> observable1 = observable.interval(1000,timeunit.milliseconds); observable<long> observable2 = observable.interval(1000,timeunit.milliseconds); observable.combinelatest(observable1, observable2, new func2<long, long, long>() { @override public long call(long t1, long t2) { system.out.println("t1 = " + t1 + ",t2 = " + t2); return t1 + t2; } }).subscribe(new observer<long>() {...}); thread.sleep(100000);
4.join()类似combinelatest(),但是join操作符可以控制每个observable产生结果的生命周期,在每个结果的生命周期内,可以与另一个observable产生的结果按照一定的规则进行合并
observable<long> observable1 = observable.interval(1000, timeunit.milliseconds); observable<long> observable2 = observable.interval(1000, timeunit.milliseconds); observable1.join(observable2, new func1<long, observable<long>>() { @override public observable<long> call(long t) { system.out.println("left=" + t); return observable.just(t).delay(1000, timeunit.milliseconds); } }, new func1<long, observable<long>>() { @override public observable<long> call(long t) { system.out.println("right=" + t); return observable.just(t).delay(1000, timeunit.milliseconds); } }, new func2<long, long, long>() { @override public long call(long t1, long t2) { return t1 + t2; } }).subscribe(new observer<long>() { @override public void oncompleted() { system.out.println("observable completed"); } @override public void onerror(throwable e) { system.out.println("oh,no! something wrong happened!"); } @override public void onnext(long t) { system.out.println("[result=]" + t); } }); thread.sleep(100000);
5.switchonnext()把一组observable转换成一个observable,对于这组observable中的每一个observable所产生的结果,如果在同一个时间内存在两个或多个observable提交的结果,只取最后一个observable提交的结果给订阅者
observable<observable<long>> observable = observable.interval(2, timeunit.seconds) .map(new func1<long, observable<long>>() { @override public observable<long> call(long along) { return observable.interval(1, timeunit.milliseconds).take(5); } }).take(2); observable.switchonnext(observable).subscribe(new observer<long>() {...}); thread.sleep(1000000);
6.startwith()在observable开始发射他们的数据之前,startwith()通过传递一个参数来先发射一个数据序列
observable.just(1000, 2000).startwith(1, 2).subscribe(new observer<integer>() {...});
总结
本文主要对rxjava进行了简单的介绍,从异步编程这个角度对rxjava进行了分析;并且针对observable的过滤,转换,组合的api进行了简单的介绍,当然我们更关心的是rxjava有哪些应用场景。