RxJava系列1:RxJava介绍
一.什么是响应式编程?
- 响应式编程就是异步数据流编程。
- 流是响应式的中心, 流廉价且无处不在,任何事物都可以当作一个流:变量、用户输入、属性、缓存、数据结构等等。
- 有一堆的函数能够创建(create)任何流,也能将任何流进行组合(combine)和过滤(filter)。 这正是“函数式”的魔力所在。一个流能作为另一个流的输入(input),甚至多个流也可以作为其它流的输入。你能合并(merge)两个流。你还能通过过滤(filter)一个流得到那些你感兴趣的事件。你能将一个流中的数据映射(map)到一个新的流中。
异步和数据流的对象是事件,这里重点理解事件、异步、数据流这三个词。
事件
何为事件?例如,你知道某个名人总是在发送有趣微博,每次他推发一条微博我们可以称之为一个“事件”。如果你看看这位名人微博系列,你会发现其实是一个随着时间的推移(一系列的事件)发生的一序列的“事件”。
在编程世界里,它是一个十分宽泛的概念,它可以是一个变量,一个对象,一段代码,一段业务逻辑…..但实际上我们往往把事件理解成一段业务逻辑。
数据流
事件形成流
响应编程能够简化编程,它依赖于事件,代码运行的顺序不是代码行的顺序,而是和一个以上的事件有关,这些事件发生是以随着时间的推移的序列。我们把这一系列事件称为“流”。
数据流是事件之间沟通的桥梁。每一个业务完成后,都会有一条数据(一个事件)流向下游,下游的业务收到这条数据(这个事件),才会开始自己的工作。
而RxJava就提供了对这个序列流的处理和控制,并且还能够指定在不同的线程中进行操作。上下游。即从上游发出,下游接受。
但是,只有数据流是不能完全正确的构建出事件之间的关系的。我们依然需要异步编程。
异步
因为有些事件是需要在别的线程中运行的。
二.什么是RxJava?
1.ReactiveX的历史
- ReactiveX是Reactive Extensions的缩写,一般简写为Rx。最初是LINQ的一个扩展,由微软的架构师Erik Meijer领导的团队开发,在2012年11月开源。
- Rx是一个编程模型,目标是提供一致的编程接口,帮助开发者更方便的处理异步数据流,Rx库支持.NET、js和c++,Rx近几年越来越流行了,几乎支持全部的流行编程语言了。Rx的大部分语言库由ReactiveX这个组织负责维护,比较流行的有RxJava、RxJs、Rx.NET。
- Rx是响应式编程的一种框架。
2.什么是ReactiveX
微软给的定义是,Rx是一个函数库,让开发者可以利用可观察序列和LINQ风格查询操作符来编写异步和基于事件的程序,使用Rx,开发者可以用Observables表示异步数据流,用LINQ操作符查询异步数据流,用Schedulers参数化异步数据流的并发处理,Rx可以这样定义:Rx=Observables+LINQ+Schedulers。
ReactiveX.io给的定义是,Rx是一个使用可观察数据流进行异步编程的编程接口,ReactiveX结合了观察者模式、迭代器模式和函数式编程的精华。
3.RxJava是什么
RxJava是Reactive Extensions的java vm实现,是一个使用可观察的序列来组成异步的、基于事件的程序的库。
它扩展了观察者模式来支持数据/事件序列,添加了很多可以让你把多个序列以声明的方式组合起来的操作符,同时去掉了我们对低优先级线程、同步、线程安全和同步数据结构等问题的顾虑。
三.观察者模式
定义对象之间一种的一对多依赖,这样一来,当这个对象
改变状态时,它的所有依赖者都会收到通知并自动更新。
RxJava中Observable充当被观察者,Observer/Subscriber充当观察者,两个角色之间通过subscribe建立关联。
与传统观察者模式不同,rx扩展了一下特点:
- 在没有subscribe之前,Observable不会产生事件
- 除了onNext,还定义了onComplete和onError
四.RxJava核心概念
基本组成元素
Observable:数据源
Observer/Subscriber:消费者
subscribe:一个订阅过程
operator:数据变化操作符
scheduler:线程切换
rxjava信息流
observable->operator1->operator2->operator3->subscriber
Action0
RxJava中的一个接口,它只有一个无参call()方法,且无返回值,同样还有Action1,Action2…Action9等,Action1封装了含有1 个参的call()方法,即call(T t),Action2封装了含有 2个参数的call方法,即call(T1 t1,T2 t2),以此类推;
Func0:
与Action0非常相似,也有call()方法,但是它是有返回值的,同样也有Func0、Func1…Func9;
在RxJava中它们常被作为一个参数传入subscribe() 以实现不完整定义的回调。
五.一个简单的RxJava实例
六.操作符
这里简单介绍几个操作符及项目中的应用场景。
创建操作符
create
作用:
完整创建1个被观察者对象(Observable)。
public static void saveImage(final Context context, final Bitmap bitmap) {
Observable.create(new Observable.OnSubscribe<String>() {
@Override
public void call(Subscriber<? super String> subscriber) {
String absPath = ImageUtil.saveBitmapResult(context, bitmap, String.valueOf(SntpClock.currentTimeMillis()) + ".jpg");
subscriber.onNext(absPath);
subscriber.onCompleted();
}
}).subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.subscribe(new Subscriber<String>() {
@Override
public void onCompleted() {
}
@Override
public void onError(Throwable e) {
}
@Override
public void onNext(String s) {
if (!TextUtils.isEmpty(s)) {
ToastUtils.showToast(context.getApplicationContext(), context.getString(R.string.gallery_save_success) + s);
} else {
ToastUtils.showToast(context.getApplicationContext(), context.getString(R.string.gallery_save_fail));
}
}
});
}
inteval
作用:
快速创建1个被观察者对象(Observable)
发送事件的特点:每隔指定时间 就发送 事件
发送的事件序列 = 从0开始、无限递增1的的整数序列
/**
* 保存底部icon下载下来的bitmap数据
* 每500毫秒执行一次,知道所有icon全部下载并保存到内存
*/
public void saveBottomIconBitmap(final int id, final ImageLoaderService imageLoader, final String url, final String type, final String title) {
//创建操作符:每隔500毫秒就发送一次事件
Observable.interval(500, TimeUnit.MILLISECONDS)
//条件操作符:执行到某个条件时,停止发送事件,getShouldStop(id, type)返回TRUE就停止发送
.takeUntil(new Func1<Long, Boolean>() {
@Override
public Boolean call(Long aLong) {
//id对应的图片是否下载完成
return getShouldStop(id, type);
}
})
//过滤操作符:指定观察者最多能接收到的事件数量5
.take(5)
.subscribeOn(Schedulers.io())
.subscribe(new Action1<Long>() {
@Override
public void call(Long aLong) {
Bitmap bitmap = imageLoader.getBitmap(url);
if (bitmap == null) {
return;
}
File bitmapFile = createIconFile(id, url);
if (bitmapFile != null) {
FileOutputStream outputStream = null;
try {
outputStream = new FileOutputStream(bitmapFile);
bitmap.compress(Bitmap.CompressFormat.PNG, 50, outputStream);
resetLevelStatus(id, type, !TextUtils.isEmpty(title));
outputStream.flush();
} catch (Exception e) {
} finally {
try {
if (outputStream != null) {
outputStream.close();
}
} catch (Exception e) {
}
}
}
}
}, new Action1<Throwable>() {
@Override
public void call(Throwable throwable) {
}
});
}
变化操作符
map
作用:
对被观察者发送的每1个事件都通过指定的函数处理,从而变换成另外一种事件.
private void createThumbBitmapAndSend(String img, Activity activity) {
Observable.just(img)
.map(new Func1<String, Bitmap>() {
@Override
public Bitmap call(String s) {
try {
ImageLoader imageLoader = RoboGuice.getInjector(MovieApplication.getContext()).getInstance(ImageLoader.class);
return imageLoader.loadBitmap(s);
} catch (Exception e) {
e.printStackTrace();
return null;
}
}
})
.compose(MovieSchedulersTransformer.applySchedulers())
.subscribe(new Subscriber<Bitmap>() {
@Override
public void onCompleted() {
sendMessageToWX();
}
@Override
public void onError(Throwable e) {
sendMessageToWX();
}
@Override
public void onNext(Bitmap thumb) {
bitmap = thumb;
sendMessageToWX();
}
});
}
flatmap
作用:
将被观察者发送的事件序列进行 拆分 & 单独转换,再合并成一个新的事件序列,最后再进行发送。常用于嵌套网络请求。
原理
1.为事件序列中每个事件都创建一个 Observable 对象;
2.将对每个 原始事件 转换后的 新事件 都放入到对应 Observable对象;
3.将新建的每个Observable 都合并到一个 新建的、总的Observable 对象;
4.新建的、总的Observable 对象 将 新合并的事件序列 发送给观察者(Observer)
flatmap涉及的泛型一个是输入,一个是输出。
举个栗子:
假如有这样一个需求:有一个菜品的接口(川菜、湘菜、粤菜),这个接口只会返回categoryId列表,再用categoryId去请求每一个品类的菜,然后将所有的菜展示在列表上,就可以这样写:
apiService.getCategories().flatMap(new Func1<Category, Observable<List<Food>>>() { @Override
public Observable<List<Food>> call(Category category) { return apiService.getFoodList(category.getCategoryId());
} }).subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread()) .subscribe(new Subscriber<List<Food>>() {
@Override
public void onCompleted() {
}
@Override
public void onError(Throwable e) {
}
@Override
public void onNext(List<Food> foodList) {
数据绑定列表
} });
和map区别:
1.map返回的是结果集,flatmap返回的是包含结果集的Observable(返回结果不同)
2.map被订阅时每传递一个事件执行一次onNext方法,flatmap多用于多对多,一对多,再被转化为多个时,一般利用from/just进行一一分发,被订阅时将所有数据传递完毕汇总到一个Observable然后一一执行onNext方法(执行顺序不同)>>>>(如单纯用于一对一转换则和map相同)
3.map只能单一转换,单一只的是只能一对一进行转换,指一个对象可以转化为另一个对象但是不能转换成对象数组(map返回结果集不能直接使用from/just再次进行事件分发,一旦转换成对象数组的话,再处理集合/数组的结果时需要利用for一一遍历取出,而使用RxJava就是为了剔除这样的嵌套结构,使得整体的逻辑性更强。)
flatmap既可以单一转换也可以一对多/多对多转换,flatmap要求返回Observable,因此可以再内部进行from/just的再次事件分发,一一取出单一对象(转换对象的能力不同)
https://www.jianshu.com/p/c820afafd94b
过滤操作符
filter
作用:过滤特定条件的事件
Observable.just(params)
.filter(param -> param != null)
.flatMap(param -> dealService.getDealPayStatusInfo(params.orderId))
.compose(MovieSchedulerTransformer.ofObserver())
.subscribe(data -> view.loadSuccess(data), Actions.empty()));
distinct
作用:
过滤事件序列中重复的事件
// 使用1:过滤事件序列中重复的事件
Observable.just(1, 2, 3, 1 , 2 )
.distinct()
.subscribe(new Consumer<Integer>() {
@Override
public void accept( Integer integer) throws Exception {
Log.d(TAG,"不重复的整型事件元素是: "+ integer);
}
});
参考资料
操作符相关的资料参考
谁来讲讲Rxjava、rxandroid中的操作符的作用?
https://www.zhihu.com/question/32209660
RxJava(1)—基本概念
http://www.jianshu.com/p/8e856b3f7242
在rxjava中,如果把整个事件流看做是工厂的流水线,observable就是原料,observer就是我们的产品经理,这个产品是怎么交到我们产品经理手上的呢?中间很重要的就是工人,也就是operator(操作符)。他负责在observable发出的事件和observer的响应之间做一些处理。
http://www.jcodecraeer.com/a/anzhuokaifa/androidkaifa/2016/0429/4196.html
http://blog.danlew.net/2014/09/15/grokking-rxjava-part-1/
通俗解释什么是响应式编程?
http://www.jdon.com/48275
深度 | 重新理解响应式编程
http://www.sohu.com/a/154648311_611601
上一篇: asp.net获取select值的方法
下一篇: React 组件状态(State)