欢迎您访问程序员文章站本站旨在为大家提供分享程序员计算机编程知识!
您现在的位置是: 首页  >  移动技术

Android中通过RxJava进行响应式程序设计的入门指南

程序员文章站 2024-03-04 16:50:11
错误处理 到目前为止,我们都没怎么介绍oncomplete()和onerror()函数。这两个函数用来通知订阅者,被观察的对象将停止发送数据以及为什么停止(成功的完成或者...

错误处理

到目前为止,我们都没怎么介绍oncomplete()和onerror()函数。这两个函数用来通知订阅者,被观察的对象将停止发送数据以及为什么停止(成功的完成或者出错了)。

下面的代码展示了怎么使用这两个函数:

observable.just("hello, world!")
  .map(s -> potentialexception(s))
  .map(s -> anotherpotentialexception(s))
  .subscribe(new subscriber<string>() {
    @override
    public void onnext(string s) { system.out.println(s); }

    @override
    public void oncompleted() { system.out.println("completed!"); }

    @override
    public void onerror(throwable e) { system.out.println("ouch!"); }
  });

代码中的potentialexception() 和 anotherpotentialexception()有可能会抛出异常。每一个observerable对象在终结的时候都会调用oncompleted()或者onerror()方法,所以demo中会打印”completed!”或者”ouch!”。

这种模式有以下几个优点:

1.只要有异常发生onerror()一定会被调用

这极大的简化了错误处理。只需要在一个地方处理错误即可以。

2.操作符不需要处理异常

将异常处理交给订阅者来做,observerable的操作符调用链中一旦有一个抛出了异常,就会直接执行onerror()方法。

3.你能够知道什么时候订阅者已经接收了全部的数据。

知道什么时候任务结束能够帮助简化代码的流程。(虽然有可能observable对象永远不会结束)

我觉得这种错误处理方式比传统的错误处理更简单。传统的错误处理中,通常是在每个回调中处理错误。这不仅导致了重复的代码,并且意味着每个回调都必须知道如何处理错误,你的回调代码将和调用者紧耦合在一起。

使用rxjava,observable对象根本不需要知道如何处理错误!操作符也不需要处理错误状态-一旦发生错误,就会跳过当前和后续的操作符。所有的错误处理都交给订阅者来做。

调度器

假设你编写的android app需要从网络请求数据(感觉这是必备的了,还有单机么?)。网络请求需要花费较长的时间,因此你打算在另外一个线程中加载数据。那么问题来了!

编写多线程的android应用程序是很难的,因为你必须确保代码在正确的线程中运行,否则的话可能会导致app崩溃。最常见的就是在非主线程更新ui。

使用rxjava,你可以使用subscribeon()指定观察者代码运行的线程,使用observeron()指定订阅者运行的线程:

myobservableservices.retrieveimage(url)
  .subscribeon(schedulers.io())
  .observeon(androidschedulers.mainthread())
  .subscribe(bitmap -> myimageview.setimagebitmap(bitmap));

是不是很简单?任何在我的subscriber前面执行的代码都是在i/o线程中运行。最后,操作view的代码在主线程中运行.

最棒的是我可以把subscribeon()和observeron()添加到任何observable对象上。这两个也是操作符!。我不需要关心observable对象以及它上面有哪些操作符。仅仅运用这两个操作符就可以实现在不同的线程中调度。

如果使用asynctask或者其他类似的,我将不得不仔细设计我的代码,找出需要并发执行的部分。使用rxjava,我可以保持代码不变,仅仅在需要并发的时候调用这两个操作符就可以。

订阅(subscriptions)

当调用observable.subscribe(),会返回一个subscription对象。这个对象代表了被观察者和订阅者之间的联系。

ubscription subscription = observable.just("hello, world!")
  .subscribe(s -> system.out.println(s));

你可以在后面使用这个subscription对象来操作被观察者和订阅者之间的联系.

subscription.unsubscribe();
system.out.println("unsubscribed=" + subscription.isunsubscribed());
// outputs "unsubscribed=true"

rxjava的另外一个好处就是它处理unsubscribing的时候,会停止整个调用链。如果你使用了一串很复杂的操作符,调用unsubscribe将会在他当前执行的地方终止。不需要做任何额外的工作!

rxandroid

rxandroid是rxjava的一个针对android平台的扩展。它包含了一些能够简化android开发的工具。

首先,androidschedulers提供了针对android的线程系统的调度器。需要在ui线程中运行某些代码?很简单,只需要使用androidschedulers.mainthread():

retrofitservice.getimage(url)
  .subscribeon(schedulers.io())
  .observeon(androidschedulers.mainthread())
  .subscribe(bitmap -> myimageview.setimagebitmap(bitmap));

如果你已经创建了自己的handler,你可以使用handlerthreadscheduler1将一个调度器链接到你的handler上。

接着要介绍的就是androidobservable,它提供了跟多的功能来配合android的生命周期。bindactivity()和bindfragment()方法默认使用androidschedulers.mainthread()来执行观察者代码,这两个方法会在activity或者fragment结束的时候通知被观察者停止发出新的消息。

androidobservable.bindactivity(this, retrofitservice.getimage(url))
  .subscribeon(schedulers.io())
  .subscribe(bitmap -> myimageview.setimagebitmap(bitmap);

我自己也很喜欢androidobservable.frombroadcast()方法,它允许你创建一个类似broadcastreceiver的observable对象。下面的例子展示了如何在网络变化的时候被通知到:

intentfilter filter = new intentfilter(connectivitymanager.connectivity_action);
androidobservable.frombroadcast(context, filter)
  .subscribe(intent -> handleconnectivitychange(intent));

最后要介绍的是viewobservable,使用它可以给view添加了一些绑定。如果你想在每次点击view的时候都收到一个事件,可以使用viewobservable.clicks(),或者你想监听textview的内容变化,可以使用viewobservable.text()。

viewobservable.clicks(mcardnameedittext, false)
  .subscribe(view -> handleclick(view));

retrofit

大名鼎鼎的retrofit库内置了对rxjava的支持(官方下载页http://square.github.io/retrofit/#download)。通常调用发可以通过使用一个callback对象来获取异步的结果:

@get("/user/{id}/photo")
void getuserphoto(@path("id") int id, callback<photo> cb);

使用rxjava,你可以直接返回一个observable对象。

@get("/user/{id}/photo")
observable<photo> getuserphoto(@path("id") int id);

现在你可以随意使用observable对象了。你不仅可以获取数据,还可以进行变换。
retrofit对observable的支持使得它可以很简单的将多个rest请求结合起来。比如我们有一个请求是获取照片的,还有一个请求是获取元数据的,我们就可以将这两个请求并发的发出,并且等待两个结果都返回之后再做处理:

observable.zip(
  service.getuserphoto(id),
  service.getphotometadata(id),
  (photo, metadata) -> createphotowithdata(photo, metadata))
  .subscribe(photowithdata -> showphoto(photowithdata));

在第二篇里我展示过一个类似的例子(使用flatmap())。这里我只是想展示以下使用rxjava+retrofit可以多么简单地组合多个rest请求。

遗留代码,运行极慢的代码

retrofit可以返回observable对象,但是如果你使用的别的库并不支持这样怎么办?或者说一个内部的内码,你想把他们转换成observable的?有什么简单的办法没?

绝大多数时候observable.just() 和 observable.from() 能够帮助你从遗留代码中创建 observable 对象:

private object oldmethod() { ... }

public observable<object> newmethod() {
  return observable.just(oldmethod());
}

上面的例子中如果oldmethod()足够快是没有什么问题的,但是如果很慢呢?调用oldmethod()将会阻塞住他所在的线程。
为了解决这个问题,可以参考我一直使用的方法–使用defer()来包装缓慢的代码:

private object slowblockingmethod() { ... }

public observable<object> newmethod() {
  return observable.defer(() -> observable.just(slowblockingmethod()));
}

现在,newmethod()的调用不会阻塞了,除非你订阅返回的observable对象。

生命周期

我把最难的不分留在了最后。如何处理activity的生命周期?主要就是两个问题:
1.在configuration改变(比如转屏)之后继续之前的subscription。

比如你使用retrofit发出了一个rest请求,接着想在listview中展示结果。如果在网络请求的时候用户旋转了屏幕怎么办?你当然想继续刚才的请求,但是怎么搞?

2.observable持有context导致的内存泄露

这个问题是因为创建subscription的时候,以某种方式持有了context的引用,尤其是当你和view交互的时候,这太容易发生!如果observable没有及时结束,内存占用就会越来越大。
不幸的是,没有银弹来解决这两个问题,但是这里有一些指导方案你可以参考。

第一个问题的解决方案就是使用rxjava内置的缓存机制,这样你就可以对同一个observable对象执行unsubscribe/resubscribe,却不用重复运行得到observable的代码。cache() (或者 replay())会继续执行网络请求(甚至你调用了unsubscribe也不会停止)。这就是说你可以在activity重新创建的时候从cache()的返回值中创建一个新的observable对象。

observable<photo> request = service.getuserphoto(id).cache();
subscription sub = request.subscribe(photo -> handleuserphoto(photo));

// ...when the activity is being recreated...
sub.unsubscribe();

// ...once the activity is recreated...
request.subscribe(photo -> handleuserphoto(photo));

注意,两次sub是使用的同一个缓存的请求。当然在哪里去存储请求的结果还是要你自己来做,和所有其他的生命周期相关的解决方案一延虎,必须在生命周期外的某个地方存储。(retained fragment或者单例等等)。

第二个问题的解决方案就是在生命周期的某个时刻取消订阅。一个很常见的模式就是使用compositesubscription来持有所有的subscriptions,然后在ondestroy()或者ondestroyview()里取消所有的订阅。

private compositesubscription mcompositesubscription
  = new compositesubscription();

private void dosomething() {
  mcompositesubscription.add(
    androidobservable.bindactivity(this, observable.just("hello, world!"))
    .subscribe(s -> system.out.println(s)));
}

@override
protected void ondestroy() {
  super.ondestroy();

  mcompositesubscription.unsubscribe();
}

你可以在activity/fragment的基类里创建一个compositesubscription对象,在子类中使用它。

注意! 一旦你调用了 compositesubscription.unsubscribe(),这个compositesubscription对象就不可用了, 如果你还想使用compositesubscription,就必须在创建一个新的对象了。