RxJava 2.0版本使用精炼详解
一,前期基础知识储备
笔者之前写过两篇关于RxJava1.0的文章
《RxJava常见操作符讲解》,感兴趣的读者可以看一看。里面有有关RxJava异步操作库的详细介绍,本文不再赘述。
RxJava1.0版本添加依赖:
implementation 'io.reactivex:rxandroid:1.2.1'
implementation 'io.reactivex:rxjava:1.2.1'
RxJava2.0版本添加依赖:
// RxJava2.0
implementation 'io.reactivex.rxjava2:rxandroid:2.0.1'
implementation 'io.reactivex.rxjava2:rxjava:2.1.3'
注意二者的依赖环境已经发生改变,所以类和接口所处的包都已经发生改变。
目前RxJava已经更新到2.0版本,2.0版本和1.0版本还是有比较大的区别,这里会在下面的代码中有所体现。
二,上代码,具体实现
1. RxJava2的基础使用
从打印“Hello world”开始,给出从简单到复杂的不同实现方式。
1)简单方式
private static void helloworldSimple() {
// 创建消费者,消费者接收一个String类型的事件
Consumer<String> consumer = new Consumer<String>() {
@Override
public void accept(String s) throws Exception {
Log.d(TAG, "accept: 0," + s);
}
};
Observable.just("Hello WOrld").subscribe(consumer);
}
打印结果:
accept: 0,Hello WOrld
在简单版本中,我们创建了一个消费者consumer,其实也可以称之为订阅者或者观察者,消费者实现accept()方法,接收一个字符串类型的数据或者事件。
被观察者Observable通过just()方法发出一个“Hello World”的呼唤,然后我们使用subscribe方法指定呼唤的接收者或者消费者,即consumer。那么consumer就能接收到被观察者的呼唤,打印出log。
2)复杂方式
/**
* Observer相比Consumer,会对消息的处理更加细化
*/
private static void helloworldComplex() {
// 创建一个观察者
Observer<String> observer = new Observer<String>() {
// 当Observable 调用subscribe方法时会回调该方法
// onSubscribe表示已经开始观察被观察者了
@Override
public void onSubscribe(Disposable d) {
Log.d(TAG, "onSubscribe: 1");
}
// 调用onSubscribe方法后
// 表示收到被观察者的消息
@Override
public void onNext(String s) {
Log.d(TAG, "onNext: 2," + s);
}
// 出错时调用
@Override
public void onError(Throwable e) {
}
// onNext之后调用
// 表示接收消息结束
@Override
public void onComplete() {
Log.d(TAG, "onComplete: 3");
}
};
Observable.just("Hello World").subscribe(observer);
}
打印结果为:
onSubscribe: 1
onNext: 2,Hello World
onComplete: 3
这里我们创建了一个Observer而不是一个Consumer,Observer在接口方法上要多很多,有onSubscribe(), onNext(), onError(), onComplete()。被观察者还是Observable,也发出一声“Hello World”,然后通过subscribe指定观察者observer。
从结果中,可以看到observer中方法的调用顺序,onSubscribe()表示已经开始观察被观察者了,onNext()表示收到被观察者的消息,onComplete()表示接收消息结束。所以Observer相比Consumer会对消息的处理更加细化。
3)受控制的复杂方式
/**
* 在onSubscribe方法中会接收到一个Disposable对象,
* 该对象相当于一个开关,如果开关关闭,则观察者不会接收到任何事件或数据
*/
private static void helloworldComplexDisposable() {
Observer<String> observer = new Observer<String>() {
// 声明一个Disposable对象
Disposable disposable;
@Override
public void onSubscribe(Disposable d) {
disposable = d; // 保存disposable对象
}
@Override
public void onNext(String s) {
Log.d(TAG, "onNext: 2," + s);
if (s.equals("No")) {
disposable.dispose();
}
}
@Override
public void onError(Throwable e) {
}
@Override
public void onComplete() {
Log.d(TAG, "onComplete: 3");
}
};
Observable.just("Hello World","No","No").subscribe(observer);
}
打印结果如下:
onNext: 2,Hello World
onNext: 2,No
在onSubscribe()方法中会接收一个Disposable对象,该对象相当于一个开关,如果开关关闭,则观察者不会收到任何事件和数据。
使用一个变量保存Disposable对象,在onNext方法中如果传过来的字符串为“No”,则调用dispose()方法关闭事件的接收。被观察者会发出3个字符串,执行结果可以看到最后一个字符串没有打印,甚至onComplete()方法都不会执行。
4)十分复杂的方式
/**
* 使用Create方法创建被观察者,并实现subscribe方法,
* 接收一个ObservableEmitter对象,即被观察者的发射器,发射器能够发出数据和事件
* 使用链式调用,让代码看起来更加整洁,
* 上面发出数据和事件,下面接收数据和事件
*/
private static void helloworldPlus() {
Observable.create(new ObservableOnSubscribe<String>() {
@Override
public void subscribe(ObservableEmitter<String> emitter) throws Exception {
Log.d(TAG, "subscribe: send hello world");
emitter.onNext("hello world");
Log.d(TAG, "subscribe: send No");
emitter.onNext("No");
Log.d(TAG, "subscribe: send No");
emitter.onNext("No");
Log.d(TAG, "subscribe: send complete");
emitter.onComplete();
}
}).subscribe(new Observer<String>() {
@Override
public void onSubscribe(Disposable d) {
Log.d(TAG, "onSubscribe: ");
}
@Override
public void onNext(String s) {
Log.d(TAG, "onNext: " + s);
}
@Override
public void onError(Throwable e) {
Log.d(TAG, "onError: ");
}
@Override
public void onComplete() {
Log.d(TAG, "onComplete: ");
}
});
}
观察者还是原来的观察者,被观察者则使用create()的方法创建出来,并实现了subscribe()方法,接收一个ObservableEmitter对象,即被观察者的发射器,发射器能够发出数据和事件。
打印结果为:
onSubscribe:
subscribe: send hello world
onNext: hello world
subscribe: send No
onNext: No
subscribe: send No
onNext: No
subscribe: send complete
onComplete:
从打印结果可知,发射器每发出一个数据或者事件,观察者就会收到。上述代码中使用了链式调用,让代码看起来更加整洁。上面发出数据和事件,下面接收数据和事件。
2.RxJava操作符
RxJava提供了大量的操作符来完成对数据的处理,这些操作符也可以理解为函数。如果把RxJava比作一条数据流水线,那么操作符就是一道工序,数据通过这些工序的加工变换,组装,最后生产出我们需要的数据。
记住:操作符都是对被观察发出数据的操作。
1)过滤操作符filter
2019男篮世界杯,中国最终负于委内瑞拉,其中关键一局中国对战波兰,周琦发球失误,罚球失误,将比赛拖入加时赛,最终输掉比赛,由此由来成语“摇头叹琦”。这里演示过滤“摇头叹琦”。
/**
* filter 过滤操作符
*/
private static void filter() {
Observable.just("姚明","阿联","摇头叹琦","大侄子")
.filter(new Predicate<String>() {
@Override
public boolean test(String s) throws Exception {
Log.d(TAG, "test: " + s);
return s.equals("摇头叹琦"); // 只检查出摇头叹琦
}
})
.subscribe(new Consumer<String>() {
@Override
public void accept(String s) throws Exception {
Log.d(TAG, "accept: " + s);
}
});
}
打印结果为:
test: 姚明
test: 阿联
test: 摇头叹琦
accept: 摇头叹琦
test: 大侄子
2)map变换操作符
private static void map () {
Student student = new Student("Jack");
// map操作符,从student类型转换成Developer
Observable.just(student).map(new Function<Student, Developer>() {
@Override
public Developer apply(Student student) throws Exception {
Log.d(TAG, "apply: " + student.toString());
Developer developer = new Developer();
developer.setName(student.getName());
developer.setSkill("Android");
return developer;
}
}).subscribe(new Observer<Developer>() {
@Override
public void onSubscribe(Disposable d) {
Log.d(TAG, "onSubscribe: ");
}
@Override
public void onNext(Developer developer) {
Log.d(TAG, "onNext: " + developer);
}
@Override
public void onError(Throwable e) {
Log.d(TAG, "onError: ");
}
@Override
public void onComplete() {
Log.d(TAG, "onComplete: ");
}
});
}
map操作符能够完成数据类型的转换。被观察者发送出一个student,而观察者想要接收一个developer(两个都是简单的实体类),那么在student发送给观察者之前,需要对student进行一些培训,让它转换成一个developer。
打印结果为:
onSubscribe:
apply: Student{name='Jack'}
onNext: Developer{name='Jack', skill='Android'}
onComplete:
结果:被观察者发送出一个Student,在观察者的onNext()方法中收到了一个Developer。
篇幅有限,文中只介绍了两种操作符的用法,更多内容请参考:
3.异步
作为一个异步操作库,RxJava提供了非常方便的API来完成线程的调度,内置的线程调度器有:
- Schedule.single(),单线程调度器,线程可复用;
- Schedule.newThread(),为每个任务创建新的线程;
- Schedule.io(),处理I/O任务,内部线程池实现,可根据需求增长;
- Schedule.computation(),处理计算任务,如事件循环和回调任务
- AndroidSchedulers.mainThread(),Android主线程调度器,属于RxAndroid。
线程调度器实际上指定事件或者数据在什么样的线程中处理。这里主要涉及两个API:subscribeOn()和observeOn()
1)subscribeOn
默认情况下,被观察者和观察者在同一线程中执行。而subscribeOn方法实际上是指定被观察者的代码在那种线程中执行。
需要注意的是,subscribeOn方法调用的位置没有特殊指定,它可以放置在其他操作符的前面,中间或者后面,subscribeOn也可以被多次调用,但以第一次调用为准。
2)observerOn
observeOn指定的是后续的操作符以及观察者的代码在什么样的线程中执行。而且observeOn可以被多次调用,每次都生效。
注意observerOn既可以指定被观察者操作符的线程;又可以指定观察者所在的线程。
______________________________________________________________________________
4. RxJava2.0与Retrofit2.0集成
笔者之前写过三篇Retrofit的文章
感兴趣的读者可以去看一看,里面详细阐述了Retrofit2.0的用法,并且结合RxJava1.0实现了简单的网络请求。
1)添加依赖并开启网络权限
compile 'io.reactivex.rxjava2:rxandroid:2.0.1'
compile 'io.reactivex.rxjava2:rxjava:2.1.3'
compile 'com.squareup.retrofit2:retrofit:2.3.0'
compile 'com.squareup.retrofit2:converter-gson:2.3.0'
compile 'com.squareup.retrofit2:adapter-rxjava2:2.3.0'
<uses-permission android:name="android.permission.INTERNET"/>
2)定义网络接口
通过豆瓣API去请求豆瓣电影Top250的网址:
http://douban.uieee.com/v2/movie/top250?start=0&count=10
所以定义的接口如下:
public interface Api {
@GET("top250")
Observable<MovieBean> listTop250(@Query("start") int start,
@Query("count") int count);
}
注意:豆瓣API接口网址可能会有变化,以豆瓣API的官方文档为准。
参数start表示查询数据的起始位置,count表示查询数据的个数。返回Observeable类型而不是Call类型。MovieBean为网络结果解析成的JavaBean。
3) 使用单例模式获取Retrofit实例
public class MovieRetrofit {
private static MovieRetrofit sMovieRetrofit;
private final Api mApi;
// 使用单例模式
public static MovieRetrofit getInstance() {
if (sMovieRetrofit == null) {
synchronized (MovieRetrofit.class) {
if (sMovieRetrofit == null) {
sMovieRetrofit = new MovieRetrofit();
}
}
}
return sMovieRetrofit;
}
private MovieRetrofit() {
Retrofit retrofit = new Retrofit.Builder().baseUrl("http://douban.uieee.com/v2/movie/")
.addConverterFactory(GsonConverterFactory.create())
.addCallAdapterFactory(RxJava2CallAdapterFactory.create())
.build();
// 创建网络接口代理
mApi = retrofit.create(Api.class);
}
// 返回API接口对象的实现
public Api getApi() {
return mApi;
}
}
使用单例模式获取Retrofit实例对象, 实现Api的接口,获取接口实例对象.
4)发送网络请求刷新列表
接下来就可以使用接口获取一个Observable对象,通过subscribeOn指定网络请求以及网络响应解析的线程,通过ObserverOn指定刷新UI的线程,实现如下:
/**
* 简化代码,Activity继承自ListActivity
*/
public class MovieListActivity extends ListActivity {
private static final String TAG = "MovieListActivity";
@Override
protected void onCreate(Bundle savedInstanceState) {
super.onCreate(savedInstanceState);
// 获取Observable对象,查询排名前25的电影
Observable<MovieBean> movieBeanObservable = MovieRetrofit.getInstance().getApi().listTop250(0, 25);
movieBeanObservable.subscribeOn(Schedulers.io())
// 将网络结果转为电影名的集合
.map(new Function<MovieBean, List<String>>() {
@Override
public List<String> apply(MovieBean movieBean) throws Exception {
List<String> array = new ArrayList<String>();
for (int i = 0; i < movieBean.getSubjects().size(); i++) {
String title = movieBean.getSubjects().get(i).getTitle();
Log.d(TAG, "apply: " + title);
array.add(title);
}
return array;
}
})
.observeOn(AndroidSchedulers.mainThread())
.subscribe(new Observer<List<String>>() {
@Override
public void onSubscribe(Disposable d) {
}
@Override
public void onNext(List<String> value) {
ArrayAdapter<String> arrayAdapter = new ArrayAdapter<String>(MovieListActivity.this,
android.R.layout.simple_list_item_1, value);
setListAdapter(arrayAdapter); // 设置Adapter刷新列表
}
@Override
public void onError(Throwable e) {
Toast.makeText(MovieListActivity.this, "onError",
Toast.LENGTH_SHORT).show();
Log.d(TAG, "onError: " + e.getLocalizedMessage());
}
@Override
public void onComplete() {
Toast.makeText(MovieListActivity.this, "onComplete",
Toast.LENGTH_SHORT).show();
}
});
}
}
Retrofit和RxJava结合后,就可以使用RxJava对结果进行一系列的操作,当需要对网络结果做一些复杂的处理时,RxJava的优势非常大.
更多RxJava在Android平台的使用,可以参考开源项目:
最后给出用到的实体类MovieBean(实体类比较复杂,建议使用Json网站去自动根据Json数据生成Java Bean.)
public class MovieBean {
private int count;
private int start;
private int total;
private String title;
private List<SubjectsBean> subjects;
public int getCount() {
return count;
}
public void setCount(int count) {
this.count = count;
}
public int getStart() {
return start;
}
public void setStart(int start) {
this.start = start;
}
public int getTotal() {
return total;
}
public void setTotal(int total) {
this.total = total;
}
public String getTitle() {
return title;
}
public void setTitle(String title) {
this.title = title;
}
public List<SubjectsBean> getSubjects() {
return subjects;
}
public void setSubjects(List<SubjectsBean> subjects) {
this.subjects = subjects;
}
public static class SubjectsBean {
private RatingBean rating;
private String title;
private int collect_count;
private String original_title;
private String subtype;
private String year;
private ImagesBean images;
private String alt;
private String id;
private List<String> genres;
private List<CastsBean> casts;
private List<DirectorsBean> directors;
public RatingBean getRating() {
return rating;
}
public void setRating(RatingBean rating) {
this.rating = rating;
}
public String getTitle() {
return title;
}
public void setTitle(String title) {
this.title = title;
}
public int getCollect_count() {
return collect_count;
}
public void setCollect_count(int collect_count) {
this.collect_count = collect_count;
}
public String getOriginal_title() {
return original_title;
}
public void setOriginal_title(String original_title) {
this.original_title = original_title;
}
public String getSubtype() {
return subtype;
}
public void setSubtype(String subtype) {
this.subtype = subtype;
}
public String getYear() {
return year;
}
public void setYear(String year) {
this.year = year;
}
public ImagesBean getImages() {
return images;
}
public void setImages(ImagesBean images) {
this.images = images;
}
public String getAlt() {
return alt;
}
public void setAlt(String alt) {
this.alt = alt;
}
public String getId() {
return id;
}
public void setId(String id) {
this.id = id;
}
public List<String> getGenres() {
return genres;
}
public void setGenres(List<String> genres) {
this.genres = genres;
}
public List<CastsBean> getCasts() {
return casts;
}
public void setCasts(List<CastsBean> casts) {
this.casts = casts;
}
public List<DirectorsBean> getDirectors() {
return directors;
}
public void setDirectors(List<DirectorsBean> directors) {
this.directors = directors;
}
public static class RatingBean {
private int max;
private double average;
private String stars;
private int min;
public int getMax() {
return max;
}
public void setMax(int max) {
this.max = max;
}
public double getAverage() {
return average;
}
public void setAverage(double average) {
this.average = average;
}
public String getStars() {
return stars;
}
public void setStars(String stars) {
this.stars = stars;
}
public int getMin() {
return min;
}
public void setMin(int min) {
this.min = min;
}
}
public static class ImagesBean {
private String small;
private String large;
private String medium;
public String getSmall() {
return small;
}
public void setSmall(String small) {
this.small = small;
}
public String getLarge() {
return large;
}
public void setLarge(String large) {
this.large = large;
}
public String getMedium() {
return medium;
}
public void setMedium(String medium) {
this.medium = medium;
}
}
public static class CastsBean {
private String alt;
private AvatarsBean avatars;
private String name;
private String id;
public String getAlt() {
return alt;
}
public void setAlt(String alt) {
this.alt = alt;
}
public AvatarsBean getAvatars() {
return avatars;
}
public void setAvatars(AvatarsBean avatars) {
this.avatars = avatars;
}
public String getName() {
return name;
}
public void setName(String name) {
this.name = name;
}
public String getId() {
return id;
}
public void setId(String id) {
this.id = id;
}
public static class AvatarsBean {
private String small;
private String large;
private String medium;
public String getSmall() {
return small;
}
public void setSmall(String small) {
this.small = small;
}
public String getLarge() {
return large;
}
public void setLarge(String large) {
this.large = large;
}
public String getMedium() {
return medium;
}
public void setMedium(String medium) {
this.medium = medium;
}
}
}
public static class DirectorsBean {
private String alt;
private AvatarsBean avatars;
private String name;
private String id;
public String getAlt() {
return alt;
}
public void setAlt(String alt) {
this.alt = alt;
}
public AvatarsBean getAvatars() {
return avatars;
}
public void setAvatars(AvatarsBean avatars) {
this.avatars = avatars;
}
public String getName() {
return name;
}
public void setName(String name) {
this.name = name;
}
public String getId() {
return id;
}
public void setId(String id) {
this.id = id;
}
public static class AvatarsBean {
private String small;
private String large;
private String medium;
public String getSmall() {
return small;
}
public void setSmall(String small) {
this.small = small;
}
public String getLarge() {
return large;
}
public void setLarge(String large) {
this.large = large;
}
public String getMedium() {
return medium;
}
public void setMedium(String medium) {
this.medium = medium;
}
}
}
}
}