欢迎您访问程序员文章站本站旨在为大家提供分享程序员计算机编程知识!
您现在的位置是: 首页  >  移动技术

RxJava入门之介绍与基本运用

程序员文章站 2024-03-05 20:46:31
前言 因为这个rxjava内容不算少,而且应用场景非常广,所以这个关于rxjava的文章我们会陆续更新,今天就来先来个入门rxjava吧 初识rxjava 什么是...

前言

因为这个rxjava内容不算少,而且应用场景非常广,所以这个关于rxjava的文章我们会陆续更新,今天就来先来个入门rxjava吧

初识rxjava

什么是rx

很多教程在讲解rxjava的时候,上来就介绍了什么是rxjava。这里我先说一下什么是rx,rx就是reactivex,官方定义是:

    rx是一个函数库,让开发者可以利用可观察序列和linq风格查询操作符来编写异步和基于事件的程序

看到这个定义我只能呵呵,稍微通俗点说是这样的:

    rx是微软.net的一个响应式扩展。rx借助可观测的序列提供一种简单的方式来创建异步的,基于事件驱动的程序。

这个有点清晰了,至少看到我们熟悉的异步与事件驱动,所以简单点且不准确地来说:

     rx就是一种响应式编程,来创建基于事件的异步程序

注意,这个定义是不准确的,但是对于初学者来说,已经可以有个基本的认知了。

另外还有一点就是rx其实是一种编程思想,用很多语言都可以实现,比如rxjava、rxjs、rxphp等等。而现在我们要说的就是rxjava。

rxjava是什么

二话不说,先上定义:

     rxjava就是一种用java语言实现的响应式编程,来创建基于事件的异步程序

有人问你这不是废话么,好吧那我上官方定义:

     一个在 java vm 上使用可观测的序列来组成异步的、基于事件的程序的库

反正我刚看这句话的时候也呵呵了,当然现在有所领悟了。

     除此之外,就是:异步,它就是一个实现异步操作的库。

扩展的观察者模式

对于普通的观察者模式,这里我就不细说了。简单概括就是,观察者(observer)需要在被观察者(observable)变化的一瞬间做出反应。

而两者通过注册(register)或者订阅(subscribe)的方式进行绑定。

就拿扔物线老师给的例子来说,我丰富了一下如图所示:

RxJava入门之介绍与基本运用

其中这个button就是被观察者(observable),onclicklistener就是观察者(observer),两者通过setonclicklistener达成订阅(subscribe)关系,之后当button产生onclick事件的时候,会直接发送给onclicklistener,它做出相应的响应处理。

当然还有其他的例子,比如android四大组件中的contentprovider与contentobserver之间也存在这样的关系。

而rxjava的观察者模式呢,跟这个差不多,但是也有几点差别:

      observer与observable是通过 subscribe() 来达成订阅关系。

      rxjava中事件回调有三种:onnext() oncompleted() onerror()

      如果一个observerble没有任何的observer,那么这个observable是不会发出任何事件的。

其中关于第三点,这里想说明一下,在rx中,其实observable有两种形式:热启动observable和冷启动observable。

      热启动observable任何时候都会发送消息,即使没有任何观察者监听它。

      冷启动observable只有在至少有一个订阅者的时候才会发送消息

这个地方虽然对于初学者来说区别不大,但是要注意一下,所以上面的第三点其实就针对于冷启动来说的。

另外,关于rxjava的回调事件的总结:

      onnext() :基本事件。

      oncompleted() : 事件队列完结。rxjava 不仅把每个事件单独处理,还会把它们看做一个队列。rxjava 规定,当不会再有新的 onnext()  发出时,需要触发 oncompleted() 方法作为标志。

      onerror() : 事件队列异常。在事件处理过程中出异常时,onerror() 会被触发,同时队列自动终止,不允许再有事件发出。

值得注意的是在一个正确运行的事件序列中, oncompleted()onerror() 有且只有一个,并且是事件序列中的最后一个。如果在队列中调用了其中一个,就不应该再调用另一个。

好了,那我们也附一张图对比一下吧:

RxJava入门之介绍与基本运用

如何实现rxjava

关于实现rxjava的步骤,这里我就大体总结概括一下。

创建observer

在java中,一想到要创建一个对象,我们马上就想要new一个。没错,这里我们也是要new一个observer出来,其实就是实现observer的接口,注意string是接收参数的类型:

//创建observer
observer<string> observer = new observer<string>() {
 @override
 public void onnext(string s) {
  log.i("onnext ---> ", "item: " + s);
 }

 @override
 public void oncompleted() {
  log.i("oncompleted ---> ", "完成");
 }

 @override
 public void onerror(throwable e) {
  log.i("onerror ---> ", e.tostring());
 }
};

当然这里也要提一个实现了 observer 接口的抽象类:subscriber ,它跟 observer 接口几乎完全一样,只是多了两个方法,看看总结:

      onstart() :  它会在 subscribe 刚开始,而事件还未发送之前被调用,可以用于做一些准备工作,例如数据的清零或重置。这是一个可选方法,默认情况下它的实现为空。需要注意的是,如果对准备工作的线程有要求(例如弹出一个显示进度的对话框,这必须在主线程执行), onstart() 就不适用了,因为它总是在 subscribe 所发生的线程被调用,而不能指定线程。

      unsubscribe() : 用于取消订阅。在这个方法被调用后,subscriber 将不再接收事件。一般在这个方法调用前,可以使用 isunsubscribed() 先判断一下状态。 要在不再使用的时候尽快在合适的地方(例如 onpause() onstop() 等方法中)调用 unsubscribe() 来解除引用关系,以避免内存泄露的发生。

虽然多了两个方法,但是基本实现方式跟observer是一样的,所以暂时可以不考虑两者的区别。不过值得注意的是:

实质上,在 rxjava 的 subscribe 过程中,observer 也总是会先被转换成一个 subscriber 再使用。

创建observable

与observer不同的是,observable是通过 create() 方法来创建的。注意string是发送参数的类型:

//创建observable
observable observable = observable.create(new observable.onsubscribe<string>() {
 @override
 public void call(subscriber<? super string> subscriber) {
  subscriber.onnext("hello");
  subscriber.onnext("world");
  subscriber.oncompleted();
 }
});

关于这其中的流程,我们暂且不考虑。

订阅(subscribe)

在之前,我们创建了 observable 和 observer ,现在就需要用 subscribe() 方法来将它们连接起来,形成一种订阅关系:

//订阅
observable.subscribe(observer);

这里其实确实有点奇怪,为什么是observable(被观察者)订阅了observer(观察者)呢?其实我们想一想之前button的点击事件:

button.setonclicklistener(new view.onclicklistener())

button是被观察者,onclicklistener是观察者,setonclicklistener是订阅。我们惊讶地发现,也是被观察者订阅了观察者,所以应该是一种流式api的设计吧,也没啥影响。

完整代码如下:

 //创建observer
 observer<string> observer = new observer<string>() {
  @override
  public void onnext(string s) {
   log.i("onnext ---> ", "item: " + s);
  }

  @override
  public void oncompleted() {
   log.i("oncompleted ---> ", "完成");
  }

  @override
  public void onerror(throwable e) {
   log.i("onerror ---> ", e.tostring());
  }
 };

 //创建observable
 observable observable = observable.create(new observable.onsubscribe<string>() {
  @override
  public void call(subscriber<? super string> subscriber) {
   subscriber.onnext("hello");
   subscriber.onnext("world");
   subscriber.oncompleted();
  }
 });

 //订阅
 observable.subscribe(observer);

运行的结果如下,可以看到observable中发送的string已经被observer接收并打印了出来:

RxJava入门之介绍与基本运用

线程控制——scheduler

好了,这里就是rxjava的精髓之一了。

在rxjava中,scheduler相当于线程控制器,可以通过它来指定每一段代码运行的线程。

rxjava已经内置了几个scheduler,下面是总结:

      schedulers.immediate() : 直接在当前线程运行,相当于不指定线程。这是默认的scheduler。

      schedulers.newthread() : 总是启用新线程,并在新线程执行操作。

      schedulers.io() : i/o 操作(读写文件、读写数据库、网络信息交互等)所使用的scheduler。行为模式和newthread()差不多,区别在于io()的内部实现是是用一个无数量上限的线程池,可以重用空闲的线程,因此多数情况下io()比newthread()更有效率。不要把计算工作放在io()中,可以避免创建不必要的线程。

      schedulers.computation() : 计算所使用的scheduler。这个计算指的是 cpu 密集型计算,即不会被 i/o 等操作限制性能的操作,例如图形的计算。这个scheduler使用的固定的线程池,大小为 cpu 核数。不要把 i/o 操作放在computation()中,否则 i/o 操作的等待时间会浪费 cpu。

       androidschedulers.mainthread() ,android专用线程,指定操作在主线程运行。

那我们如何切换线程呢?rxjava中提供了两个方法:subscribeon() observeon() ,两者的不同点在于:

       subscribeon() : 指定subscribe()订阅所发生的线程,即 call() 执行的线程。或者叫做事件产生的线程。

       observeon() : 指定observer所运行在的线程,即onnext()执行的线程。或者叫做事件消费的线程。

具体实现如下:

//改变运行的线程
observable.subscribeon(schedulers.io());
observable.observeon(androidschedulers.mainthread());

这里确实不好理解,没关系,下面我们在具体例子中观察现象。

而这其中的原理,会在之后的源码级分析的文章中详细解释,现在我们暂且搁下。

第一个rxjava案例

好了,当看完之前的所有基础东西,现在我们就完全可以写一个基于rxjava的demo了。

这里我们用一个基于rxjava的异步加载网络图片来演示。

由于重点在于rxjava对于异步的处理,所以关于如何通过网络请求获取图片,这里就不详细说明了。

另外这里采用的是链式调用,并为重要位置打上log日志,观察方法执行的所在线程。

首先需要添加依赖,这没什么好说的:

dependencies {
 compile filetree(include: ['*.jar'], dir: 'libs')
 testcompile 'junit:junit:4.12'
 ...
 compile 'io.reactivex:rxjava:1.1.6'

}

然后按照步骤来,首先通过create创建observable,注意发送参数的类型是bitmap:

//创建被观察者
observable.create(new observable.onsubscribe<bitmap>() {
 /**
 * 复写call方法
 *
 * @param subscriber 观察者对象
 */
 @override
 public void call(subscriber<? super bitmap> subscriber) {
  //通过url得到图片的bitmap对象
  bitmap bitmap = getbitmapforurl.getbitmap(url);
  //回调观察者方法
  subscriber.onnext(bitmap);
  subscriber.oncompleted();
  log.i(" call ---> ", "运行在 " + thread.currentthread().getname() + " 线程");
 }
})

然后我们需要创建observer,并进行订阅,这里是链式调用

.subscribe(new observer<bitmap>() { //订阅观察者(其实是观察者订阅被观察者)

 @override
 public void onnext(bitmap bitmap) {
  mainimageview.setimagebitmap(bitmap);
  log.i(" onnext ---> ", "运行在 " + thread.currentthread().getname() + " 线程");
 }

 @override
 public void oncompleted() {
  mainprogressbar.setvisibility(view.gone);
  log.i(" oncompleted ---> ", "完成");
 }

 @override
 public void onerror(throwable e) {
  log.e(" onerror --->", e.tostring());
 }
 });

当然网络请求是耗时操作,我们需要在其他线程中执行,而更新ui需要在主线程中执行,所以需要设置线程:

.subscribeon(schedulers.io()) // 指定subscribe()发生在io线程
.observeon(androidschedulers.mainthread()) // 指定subscriber的回调发生在ui线程

这样我们就完成了一个rxjava的基本编写,现在整体看一下代码:

//创建被观察者
observable.create(new observable.onsubscribe<bitmap>() {
 /**
 * 复写call方法
 *
 * @param subscriber 观察者对象
 */
 @override
 public void call(subscriber<? super bitmap> subscriber) {
  //通过url得到图片的bitmap对象
  bitmap bitmap = getbitmapforurl.getbitmap(url);
  //回调观察者方法
  subscriber.onnext(bitmap);
  subscriber.oncompleted();
  log.i(" call ---> ", "运行在 " + thread.currentthread().getname() + " 线程");
 }
})
.subscribeon(schedulers.io()) // 指定subscribe()发生在io线程
.observeon(androidschedulers.mainthread()) // 指定subscriber的回调发生在ui线程
.subscribe(new observer<bitmap>() { //订阅观察者(其实是观察者订阅被观察者)

 @override
 public void onnext(bitmap bitmap) {
  mainimageview.setimagebitmap(bitmap);
  log.i(" onnext ---> ", "运行在 " + thread.currentthread().getname() + " 线程");
 }

 @override
 public void oncompleted() {
  mainprogressbar.setvisibility(view.gone);
  log.i(" oncompleted ---> ", "完成");
 }

 @override
 public void onerror(throwable e) {
  log.e(" onerror --->", e.tostring());
 }
 });

好了,下面是运行的动态图:

RxJava入门之介绍与基本运用
rxjava异步加载网络图片

现在来看一下运行的log日志:

RxJava入门之介绍与基本运用
log

可以看到,call方法(事件产生)执行在io线程,而onnext方法(事件消费)执行在main线程。说明之前分析的是对的。

总结

好了,由于本文是一个rxjava的基础,所以篇幅稍微过长了点。即使这样,很多细节性问题都没有交代清楚。但所幸的是,本文已经将rxjava必要的基础入门知识讲解完了。可能由于技术水平有限,文中难免会有错误或者疏忽之处,欢迎大家指正与交流。希望这篇文章对大家的学习或者工作带来一定的帮助,小编还会陆续更新相关的文章,感兴趣的朋友们请继续关注。