Android中通过RxJava进行响应式程序设计的入门指南
错误处理
到目前为止,我们都没怎么介绍oncomplete()和onerror()函数。这两个函数用来通知订阅者,被观察的对象将停止发送数据以及为什么停止(成功的完成或者出错了)。
下面的代码展示了怎么使用这两个函数:
observable.just("hello, world!") .map(s -> potentialexception(s)) .map(s -> anotherpotentialexception(s)) .subscribe(new subscriber<string>() { @override public void onnext(string s) { system.out.println(s); } @override public void oncompleted() { system.out.println("completed!"); } @override public void onerror(throwable e) { system.out.println("ouch!"); } });
代码中的potentialexception() 和 anotherpotentialexception()有可能会抛出异常。每一个observerable对象在终结的时候都会调用oncompleted()或者onerror()方法,所以demo中会打印”completed!”或者”ouch!”。
这种模式有以下几个优点:
1.只要有异常发生onerror()一定会被调用
这极大的简化了错误处理。只需要在一个地方处理错误即可以。
2.操作符不需要处理异常
将异常处理交给订阅者来做,observerable的操作符调用链中一旦有一个抛出了异常,就会直接执行onerror()方法。
3.你能够知道什么时候订阅者已经接收了全部的数据。
知道什么时候任务结束能够帮助简化代码的流程。(虽然有可能observable对象永远不会结束)
我觉得这种错误处理方式比传统的错误处理更简单。传统的错误处理中,通常是在每个回调中处理错误。这不仅导致了重复的代码,并且意味着每个回调都必须知道如何处理错误,你的回调代码将和调用者紧耦合在一起。
使用rxjava,observable对象根本不需要知道如何处理错误!操作符也不需要处理错误状态-一旦发生错误,就会跳过当前和后续的操作符。所有的错误处理都交给订阅者来做。
调度器
假设你编写的android app需要从网络请求数据(感觉这是必备的了,还有单机么?)。网络请求需要花费较长的时间,因此你打算在另外一个线程中加载数据。那么问题来了!
编写多线程的android应用程序是很难的,因为你必须确保代码在正确的线程中运行,否则的话可能会导致app崩溃。最常见的就是在非主线程更新ui。
使用rxjava,你可以使用subscribeon()指定观察者代码运行的线程,使用observeron()指定订阅者运行的线程:
myobservableservices.retrieveimage(url) .subscribeon(schedulers.io()) .observeon(androidschedulers.mainthread()) .subscribe(bitmap -> myimageview.setimagebitmap(bitmap));
是不是很简单?任何在我的subscriber前面执行的代码都是在i/o线程中运行。最后,操作view的代码在主线程中运行.
最棒的是我可以把subscribeon()和observeron()添加到任何observable对象上。这两个也是操作符!。我不需要关心observable对象以及它上面有哪些操作符。仅仅运用这两个操作符就可以实现在不同的线程中调度。
如果使用asynctask或者其他类似的,我将不得不仔细设计我的代码,找出需要并发执行的部分。使用rxjava,我可以保持代码不变,仅仅在需要并发的时候调用这两个操作符就可以。
订阅(subscriptions)
当调用observable.subscribe(),会返回一个subscription对象。这个对象代表了被观察者和订阅者之间的联系。
ubscription subscription = observable.just("hello, world!") .subscribe(s -> system.out.println(s));
你可以在后面使用这个subscription对象来操作被观察者和订阅者之间的联系.
subscription.unsubscribe(); system.out.println("unsubscribed=" + subscription.isunsubscribed()); // outputs "unsubscribed=true"
rxjava的另外一个好处就是它处理unsubscribing的时候,会停止整个调用链。如果你使用了一串很复杂的操作符,调用unsubscribe将会在他当前执行的地方终止。不需要做任何额外的工作!
rxandroid
rxandroid是rxjava的一个针对android平台的扩展。它包含了一些能够简化android开发的工具。
首先,androidschedulers提供了针对android的线程系统的调度器。需要在ui线程中运行某些代码?很简单,只需要使用androidschedulers.mainthread():
retrofitservice.getimage(url) .subscribeon(schedulers.io()) .observeon(androidschedulers.mainthread()) .subscribe(bitmap -> myimageview.setimagebitmap(bitmap));
如果你已经创建了自己的handler,你可以使用handlerthreadscheduler1将一个调度器链接到你的handler上。
接着要介绍的就是androidobservable,它提供了跟多的功能来配合android的生命周期。bindactivity()和bindfragment()方法默认使用androidschedulers.mainthread()来执行观察者代码,这两个方法会在activity或者fragment结束的时候通知被观察者停止发出新的消息。
androidobservable.bindactivity(this, retrofitservice.getimage(url)) .subscribeon(schedulers.io()) .subscribe(bitmap -> myimageview.setimagebitmap(bitmap);
我自己也很喜欢androidobservable.frombroadcast()方法,它允许你创建一个类似broadcastreceiver的observable对象。下面的例子展示了如何在网络变化的时候被通知到:
intentfilter filter = new intentfilter(connectivitymanager.connectivity_action); androidobservable.frombroadcast(context, filter) .subscribe(intent -> handleconnectivitychange(intent));
最后要介绍的是viewobservable,使用它可以给view添加了一些绑定。如果你想在每次点击view的时候都收到一个事件,可以使用viewobservable.clicks(),或者你想监听textview的内容变化,可以使用viewobservable.text()。
viewobservable.clicks(mcardnameedittext, false) .subscribe(view -> handleclick(view));
retrofit
大名鼎鼎的retrofit库内置了对rxjava的支持(官方下载页http://square.github.io/retrofit/#download)。通常调用发可以通过使用一个callback对象来获取异步的结果:
@get("/user/{id}/photo") void getuserphoto(@path("id") int id, callback<photo> cb);
使用rxjava,你可以直接返回一个observable对象。
@get("/user/{id}/photo") observable<photo> getuserphoto(@path("id") int id);
现在你可以随意使用observable对象了。你不仅可以获取数据,还可以进行变换。
retrofit对observable的支持使得它可以很简单的将多个rest请求结合起来。比如我们有一个请求是获取照片的,还有一个请求是获取元数据的,我们就可以将这两个请求并发的发出,并且等待两个结果都返回之后再做处理:
observable.zip( service.getuserphoto(id), service.getphotometadata(id), (photo, metadata) -> createphotowithdata(photo, metadata)) .subscribe(photowithdata -> showphoto(photowithdata));
在第二篇里我展示过一个类似的例子(使用flatmap())。这里我只是想展示以下使用rxjava+retrofit可以多么简单地组合多个rest请求。
遗留代码,运行极慢的代码
retrofit可以返回observable对象,但是如果你使用的别的库并不支持这样怎么办?或者说一个内部的内码,你想把他们转换成observable的?有什么简单的办法没?
绝大多数时候observable.just() 和 observable.from() 能够帮助你从遗留代码中创建 observable 对象:
private object oldmethod() { ... } public observable<object> newmethod() { return observable.just(oldmethod()); }
上面的例子中如果oldmethod()足够快是没有什么问题的,但是如果很慢呢?调用oldmethod()将会阻塞住他所在的线程。
为了解决这个问题,可以参考我一直使用的方法–使用defer()来包装缓慢的代码:
private object slowblockingmethod() { ... } public observable<object> newmethod() { return observable.defer(() -> observable.just(slowblockingmethod())); }
现在,newmethod()的调用不会阻塞了,除非你订阅返回的observable对象。
生命周期
我把最难的不分留在了最后。如何处理activity的生命周期?主要就是两个问题:
1.在configuration改变(比如转屏)之后继续之前的subscription。
比如你使用retrofit发出了一个rest请求,接着想在listview中展示结果。如果在网络请求的时候用户旋转了屏幕怎么办?你当然想继续刚才的请求,但是怎么搞?
2.observable持有context导致的内存泄露
这个问题是因为创建subscription的时候,以某种方式持有了context的引用,尤其是当你和view交互的时候,这太容易发生!如果observable没有及时结束,内存占用就会越来越大。
不幸的是,没有银弹来解决这两个问题,但是这里有一些指导方案你可以参考。
第一个问题的解决方案就是使用rxjava内置的缓存机制,这样你就可以对同一个observable对象执行unsubscribe/resubscribe,却不用重复运行得到observable的代码。cache() (或者 replay())会继续执行网络请求(甚至你调用了unsubscribe也不会停止)。这就是说你可以在activity重新创建的时候从cache()的返回值中创建一个新的observable对象。
observable<photo> request = service.getuserphoto(id).cache(); subscription sub = request.subscribe(photo -> handleuserphoto(photo)); // ...when the activity is being recreated... sub.unsubscribe(); // ...once the activity is recreated... request.subscribe(photo -> handleuserphoto(photo));
注意,两次sub是使用的同一个缓存的请求。当然在哪里去存储请求的结果还是要你自己来做,和所有其他的生命周期相关的解决方案一延虎,必须在生命周期外的某个地方存储。(retained fragment或者单例等等)。
第二个问题的解决方案就是在生命周期的某个时刻取消订阅。一个很常见的模式就是使用compositesubscription来持有所有的subscriptions,然后在ondestroy()或者ondestroyview()里取消所有的订阅。
private compositesubscription mcompositesubscription = new compositesubscription(); private void dosomething() { mcompositesubscription.add( androidobservable.bindactivity(this, observable.just("hello, world!")) .subscribe(s -> system.out.println(s))); } @override protected void ondestroy() { super.ondestroy(); mcompositesubscription.unsubscribe(); }
你可以在activity/fragment的基类里创建一个compositesubscription对象,在子类中使用它。
注意! 一旦你调用了 compositesubscription.unsubscribe(),这个compositesubscription对象就不可用了, 如果你还想使用compositesubscription,就必须在创建一个新的对象了。
上一篇: File跟Io流的学习