欢迎您访问程序员文章站本站旨在为大家提供分享程序员计算机编程知识!
您现在的位置是: 首页

RxJava系列1:RxJava介绍

程序员文章站 2024-02-28 09:20:58
...

一.什么是响应式编程?

  1. 响应式编程就是异步数据流编程。
  2. 流是响应式的中心, 流廉价且无处不在,任何事物都可以当作一个流:变量、用户输入、属性、缓存、数据结构等等。
  3. 有一堆的函数能够创建(create)任何流,也能将任何流进行组合(combine)和过滤(filter)。 这正是“函数式”的魔力所在。一个流能作为另一个流的输入(input),甚至多个流也可以作为其它流的输入。你能合并(merge)两个流。你还能通过过滤(filter)一个流得到那些你感兴趣的事件。你能将一个流中的数据映射(map)到一个新的流中。

异步和数据流的对象是事件,这里重点理解事件、异步、数据流这三个词。

事件
何为事件?例如,你知道某个名人总是在发送有趣微博,每次他推发一条微博我们可以称之为一个“事件”。如果你看看这位名人微博系列,你会发现其实是一个随着时间的推移(一系列的事件)发生的一序列的“事件”。

在编程世界里,它是一个十分宽泛的概念,它可以是一个变量,一个对象,一段代码,一段业务逻辑…..但实际上我们往往把事件理解成一段业务逻辑。

数据流
事件形成流
响应编程能够简化编程,它依赖于事件,代码运行的顺序不是代码行的顺序,而是和一个以上的事件有关,这些事件发生是以随着时间的推移的序列。我们把这一系列事件称为“流”。
数据流是事件之间沟通的桥梁。每一个业务完成后,都会有一条数据(一个事件)流向下游,下游的业务收到这条数据(这个事件),才会开始自己的工作。
而RxJava就提供了对这个序列流的处理和控制,并且还能够指定在不同的线程中进行操作。上下游。即从上游发出,下游接受。

但是,只有数据流是不能完全正确的构建出事件之间的关系的。我们依然需要异步编程。

异步
因为有些事件是需要在别的线程中运行的。

二.什么是RxJava?

1.ReactiveX的历史

  1. ReactiveX是Reactive Extensions的缩写,一般简写为Rx。最初是LINQ的一个扩展,由微软的架构师Erik Meijer领导的团队开发,在2012年11月开源。
  2. Rx是一个编程模型,目标是提供一致的编程接口,帮助开发者更方便的处理异步数据流,Rx库支持.NET、js和c++,Rx近几年越来越流行了,几乎支持全部的流行编程语言了。Rx的大部分语言库由ReactiveX这个组织负责维护,比较流行的有RxJava、RxJs、Rx.NET。
  3. Rx是响应式编程的一种框架。

2.什么是ReactiveX

微软给的定义是,Rx是一个函数库,让开发者可以利用可观察序列和LINQ风格查询操作符来编写异步和基于事件的程序,使用Rx,开发者可以用Observables表示异步数据流,用LINQ操作符查询异步数据流,用Schedulers参数化异步数据流的并发处理,Rx可以这样定义:Rx=Observables+LINQ+Schedulers。
ReactiveX.io给的定义是,Rx是一个使用可观察数据流进行异步编程的编程接口,ReactiveX结合了观察者模式、迭代器模式和函数式编程的精华。

3.RxJava是什么

RxJava是Reactive Extensions的java vm实现,是一个使用可观察的序列来组成异步的、基于事件的程序的库。
它扩展了观察者模式来支持数据/事件序列,添加了很多可以让你把多个序列以声明的方式组合起来的操作符,同时去掉了我们对低优先级线程、同步、线程安全和同步数据结构等问题的顾虑。

三.观察者模式

定义对象之间一种的一对多依赖,这样一来,当这个对象
改变状态时,它的所有依赖者都会收到通知并自动更新。
RxJava系列1:RxJava介绍
RxJava中Observable充当被观察者,Observer/Subscriber充当观察者,两个角色之间通过subscribe建立关联。
与传统观察者模式不同,rx扩展了一下特点:
- 在没有subscribe之前,Observable不会产生事件
- 除了onNext,还定义了onComplete和onError

四.RxJava核心概念

基本组成元素

Observable:数据源
Observer/Subscriber:消费者
subscribe:一个订阅过程
operator:数据变化操作符
scheduler:线程切换

rxjava信息流

observable->operator1->operator2->operator3->subscriber

Action0

RxJava中的一个接口,它只有一个无参call()方法,且无返回值,同样还有Action1,Action2…Action9等,Action1封装了含有1 个参的call()方法,即call(T t),Action2封装了含有 2个参数的call方法,即call(T1 t1,T2 t2),以此类推;

Func0:

与Action0非常相似,也有call()方法,但是它是有返回值的,同样也有Func0、Func1…Func9;

在RxJava中它们常被作为一个参数传入subscribe() 以实现不完整定义的回调。

五.一个简单的RxJava实例

RxJava系列1:RxJava介绍

六.操作符

RxJava系列1:RxJava介绍

这里简单介绍几个操作符及项目中的应用场景。

创建操作符

create
作用:
完整创建1个被观察者对象(Observable)。

public static void saveImage(final Context context, final Bitmap bitmap) {
        Observable.create(new Observable.OnSubscribe<String>() {
            @Override
            public void call(Subscriber<? super String> subscriber) {
                String absPath = ImageUtil.saveBitmapResult(context, bitmap, String.valueOf(SntpClock.currentTimeMillis()) + ".jpg");
                subscriber.onNext(absPath);
                subscriber.onCompleted();
            }
        }).subscribeOn(Schedulers.io())
                .observeOn(AndroidSchedulers.mainThread())
                .subscribe(new Subscriber<String>() {
                    @Override
                    public void onCompleted() {
                    }

                    @Override
                    public void onError(Throwable e) {
                    }

                    @Override
                    public void onNext(String s) {
                        if (!TextUtils.isEmpty(s)) {
                            ToastUtils.showToast(context.getApplicationContext(), context.getString(R.string.gallery_save_success) + s);
                        } else {
                            ToastUtils.showToast(context.getApplicationContext(), context.getString(R.string.gallery_save_fail));
                        }
                    }
                });
    }

inteval
作用:
快速创建1个被观察者对象(Observable)
发送事件的特点:每隔指定时间 就发送 事件
发送的事件序列 = 从0开始、无限递增1的的整数序列

    /**
     * 保存底部icon下载下来的bitmap数据
     * 每500毫秒执行一次,知道所有icon全部下载并保存到内存
     */
    public void saveBottomIconBitmap(final int id, final ImageLoaderService imageLoader, final String url, final String type, final String title) {
        //创建操作符:每隔500毫秒就发送一次事件
        Observable.interval(500, TimeUnit.MILLISECONDS)
                //条件操作符:执行到某个条件时,停止发送事件,getShouldStop(id, type)返回TRUE就停止发送
                .takeUntil(new Func1<Long, Boolean>() {
                    @Override
                    public Boolean call(Long aLong) {
                        //id对应的图片是否下载完成
                        return getShouldStop(id, type);
                    }
                })
                //过滤操作符:指定观察者最多能接收到的事件数量5
                .take(5)
                .subscribeOn(Schedulers.io())
                .subscribe(new Action1<Long>() {
                    @Override
                    public void call(Long aLong) {
                        Bitmap bitmap = imageLoader.getBitmap(url);
                        if (bitmap == null) {
                            return;
                        }
                        File bitmapFile = createIconFile(id, url);
                        if (bitmapFile != null) {
                            FileOutputStream outputStream = null;
                            try {
                                outputStream = new FileOutputStream(bitmapFile);
                                bitmap.compress(Bitmap.CompressFormat.PNG, 50, outputStream);
                                resetLevelStatus(id, type, !TextUtils.isEmpty(title));
                                outputStream.flush();
                            } catch (Exception e) {
                            } finally {
                                try {
                                    if (outputStream != null) {
                                        outputStream.close();
                                    }
                                } catch (Exception e) {
                                }
                            }
                        }
                    }
                }, new Action1<Throwable>() {
                    @Override
                    public void call(Throwable throwable) {

                    }
                });
    }

变化操作符

map
作用:
对被观察者发送的每1个事件都通过指定的函数处理,从而变换成另外一种事件.

private void createThumbBitmapAndSend(String img, Activity activity) {
        Observable.just(img)
                .map(new Func1<String, Bitmap>() {
                    @Override
                    public Bitmap call(String s) {
                        try {
                            ImageLoader imageLoader = RoboGuice.getInjector(MovieApplication.getContext()).getInstance(ImageLoader.class);
                            return imageLoader.loadBitmap(s);
                        } catch (Exception e) {
                            e.printStackTrace();
                            return null;
                        }
                    }
                })
                .compose(MovieSchedulersTransformer.applySchedulers())
                .subscribe(new Subscriber<Bitmap>() {
                    @Override
                    public void onCompleted() {
                        sendMessageToWX();
                    }

                    @Override
                    public void onError(Throwable e) {
                        sendMessageToWX();
                    }

                    @Override
                    public void onNext(Bitmap thumb) {
                        bitmap = thumb;
                        sendMessageToWX();
                    }
                });
    }

flatmap

作用:
将被观察者发送的事件序列进行 拆分 & 单独转换,再合并成一个新的事件序列,最后再进行发送。常用于嵌套网络请求。

原理
1.为事件序列中每个事件都创建一个 Observable 对象;
2.将对每个 原始事件 转换后的 新事件 都放入到对应 Observable对象;
3.将新建的每个Observable 都合并到一个 新建的、总的Observable 对象;
4.新建的、总的Observable 对象 将 新合并的事件序列 发送给观察者(Observer)

flatmap涉及的泛型一个是输入,一个是输出。

举个栗子:
假如有这样一个需求:有一个菜品的接口(川菜、湘菜、粤菜),这个接口只会返回categoryId列表,再用categoryId去请求每一个品类的菜,然后将所有的菜展示在列表上,就可以这样写:

apiService.getCategories().flatMap(new Func1<Category, Observable<List<Food>>>() { @Override
public Observable<List<Food>> call(Category category) { return apiService.getFoodList(category.getCategoryId());
} }).subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread()) .subscribe(new Subscriber<List<Food>>() {
@Override
public void onCompleted() {
}
@Override
public void onError(Throwable e) {
}
@Override
public void onNext(List<Food> foodList) {
数据绑定列表
} });

和map区别:
1.map返回的是结果集,flatmap返回的是包含结果集的Observable(返回结果不同)
2.map被订阅时每传递一个事件执行一次onNext方法,flatmap多用于多对多,一对多,再被转化为多个时,一般利用from/just进行一一分发,被订阅时将所有数据传递完毕汇总到一个Observable然后一一执行onNext方法(执行顺序不同)>>>>(如单纯用于一对一转换则和map相同)
3.map只能单一转换,单一只的是只能一对一进行转换,指一个对象可以转化为另一个对象但是不能转换成对象数组(map返回结果集不能直接使用from/just再次进行事件分发,一旦转换成对象数组的话,再处理集合/数组的结果时需要利用for一一遍历取出,而使用RxJava就是为了剔除这样的嵌套结构,使得整体的逻辑性更强。)
flatmap既可以单一转换也可以一对多/多对多转换,flatmap要求返回Observable,因此可以再内部进行from/just的再次事件分发,一一取出单一对象(转换对象的能力不同)

https://www.jianshu.com/p/c820afafd94b

过滤操作符

filter
作用:过滤特定条件的事件

Observable.just(params)
                    .filter(param -> param != null)
                    .flatMap(param -> dealService.getDealPayStatusInfo(params.orderId))
                    .compose(MovieSchedulerTransformer.ofObserver())
                    .subscribe(data -> view.loadSuccess(data), Actions.empty()));

distinct
作用:
过滤事件序列中重复的事件

// 使用1:过滤事件序列中重复的事件
        Observable.just(1, 2, 3, 1 , 2 )
                .distinct()
                .subscribe(new Consumer<Integer>() {
                      @Override
                      public void accept( Integer integer) throws Exception {
                          Log.d(TAG,"不重复的整型事件元素是: "+ integer);
                      }
        });

参考资料

操作符相关的资料参考
谁来讲讲Rxjava、rxandroid中的操作符的作用?
https://www.zhihu.com/question/32209660

RxJava(1)—基本概念
http://www.jianshu.com/p/8e856b3f7242

在rxjava中,如果把整个事件流看做是工厂的流水线,observable就是原料,observer就是我们的产品经理,这个产品是怎么交到我们产品经理手上的呢?中间很重要的就是工人,也就是operator(操作符)。他负责在observable发出的事件和observer的响应之间做一些处理。
http://www.jcodecraeer.com/a/anzhuokaifa/androidkaifa/2016/0429/4196.html

http://blog.danlew.net/2014/09/15/grokking-rxjava-part-1/

通俗解释什么是响应式编程?
http://www.jdon.com/48275

深度 | 重新理解响应式编程
http://www.sohu.com/a/154648311_611601