Rxjava2基础
1、简介
RxJava: Reactive Extensions for the JVM Rxjava : JVM的响应式扩展。
RxJava是响应式扩展的Java VM实现:通过使用可观察序列组合异步和基于事件的程序的库。
它扩展了observer模式以支持数据/事件序列,并添加了操作符,允许您以声明的方式将序列组合在一起,同时抽象出对底层线程、同步、线程安全性和并发数据结构等方面的关注。
2、使用
第一步是将RxJava 2包导入项目中,在build.gradle中添加:
compile "io.reactivex.rxjava2:rxjava:2.x.y" // 将x和y替换为最新的版本号 ,此时最新为2.1.16 。
样例 Hello World
public class HelloWorld {
public static void main(String[] args) {
Flowable.just("Hello world").subscribe(System.out::println);
}
}
如果您的平台不支持Java 8 lambdas(到目前为止),您必须手动创建一个Consumer内部类:
import io.reactivex.functions.Consumer;
Flowable.just("Hello world")
.subscribe(new Consumer<String>() {
@Override public void accept(String s) {
System.out.println(s);
}
});
3、Base classes
RxJava 2具有几个基本类,您可以在这些类上发现操作符:
io.reactivex.Flowable: 0..N flows, supporting Reactive-Streams and backpressure
io.reactivex.Observable: 0..N flows, no backpressure,
io.reactivex.Single: a flow of exactly 1 item or an error,
io.reactivex.Completable: a flow without items but only a completion or error signal,
io.reactivex.Maybe: a flow with no items, exactly one item or an error.
一些术语
1、Upstream, downstream
RxJava中的dataflow由一个源、零个或多个中间步骤组成,然后是一个数据使用者或组合器步骤(其中步骤负责以某种方式使用dataflow):
source.operator1().operator2().operator3().subscribe(consumer);
source.flatMap(value -> source.operator1().operator2().operator3());
在这里,如果我们假设我们在operator2上,向左看向源,称为上游。向右看为用户/消费者,称为下游。当每个元素都写在单独的一行上时,这一点通常更为明显 :
source
.operator1()
.operator2()
.operator3()
.subscribe(consumer)
2、Objects in motion
在RxJava的文档中,emission, emits, item, event, signal, data and message (发射、发出、项、事件、信号、数据和消息)被视为同义词,并表示沿着数据流移动的对象。
3、Backpressure
当数据流通过异步步骤运行时,每个步骤可能以不同的速度执行不同的事情。为了避免过多地使用此类步骤(通常表现为由于临时缓冲或需要跳过/删除数据而导致的内存使用量增加),应用了所谓的背压(backpressure),这是一种流控制形式,在这种形式下,这些步骤(the steps)可以表示它们准备处理多少项。这样就可以限制数据的内存使用,在这种情况下一个步骤(a step)通常都无法知道上游会发送多少条信息。
在RxJava中,Flowable class专门用于支持backpressure,而Observable则专门用于非backpressure操作(短序列、GUI交互等)。其他类型的,如Single, Maybe and Completable 不支持backpressure,也不应该支持 ; 总会有暂时存放一件物品的空间。
4、Assembly time(装配时间)
在所谓的装配时间内,应用不同的中间操作符来准备dataflow:
Flowable<Integer> flow = Flowable.range(1, 5)
.map(v -> v* v)
.filter(v -> v % 3 == 0);
此时,数据还没有流动,也没有出现任何副作用。
5、Subscription time
当一个流(a flow)上调用了subscribe()也就在内部建立处理步骤链时,这是一个临时状态:
flow.subscribe(System.out::println)
这时会触发订阅副作用(参见doOnSubscribe)。有些源会在这种状态下立即阻塞或开始发送项目。
5、Runtime
这是当流正在主动地发出项、错误或完成(emitting items, errors or completion)信号时的状态:
Observable.create(emitter -> {
while (!emitter.isDisposed()) {
long time = System.currentTimeMillis();
emitter.onNext(time);
if (time % 2 != 0) {
emitter.onError(new IllegalStateException("Odd millisecond!"));
break;
}
}
})
.subscribe(System.out::println, Throwable::printStackTrace);
实际上,这是上述给定实例的主体执行的时候。
Simple background computation
RxJava的一个常见用例是在后台线程上运行一些计算、网络请求,并在UI线程上显示结果(或错误):
import io.reactivex.schedulers.Schedulers;
Flowable.fromCallable(() -> {
Thread.sleep(1000); // imitate模仿 expensive computation
return "Done";
})
.subscribeOn(Schedulers.io())
.observeOn(Schedulers.single())
.subscribe(System.out::println, Throwable::printStackTrace);
Thread.sleep(2000); // <--- wait for the flow to finish
这种样式的链接方法称为a fluent(流利的) API,类似于构建器模式。但是,RxJava的响应类型是不可变的;每个方法调用都返回一个带有附加行为的新的Flowable。为了说明这一点,这个例子可以重写如下:
Flowable<String> source = Flowable.fromCallable(() -> {
Thread.sleep(1000); // imitate expensive computation
return "Done";
});
Flowable<String> runBackground = source.subscribeOn(Schedulers.io());
Flowable<String> showForeground = runBackground.observeOn(Schedulers.single());
showForeground.subscribe(System.out::println, Throwable::printStackTrace);
Thread.sleep(2000);
通常,您可以通过subscribeOn将计算或阻塞IO的操作移动到其他线程。一旦数据准备好,您可以确保它们在前台或GUI线程上通过observeOn进行处理。
6、Schedulers
RxJava操作符不直接使用线程或ExecutorServices,而是使用所谓的调度器,这些调度器在统一的API后面提取并发源。RxJava 2的特性是可以通过调度器实用类访问的几个标准调度器。
- Schedulers.computation() :在后台对固定数量的专用线程进行计算密集型工作。大多数异步操作符都将其作为默认调度器。
- Schedulers.io():在一组动态变化的线程上运行I/O或阻塞操作
- Schedulers.single():按顺序和FIFO方式在单个线程上运行工作。
- Schedulers.trampoline():在一个参与的线程中以顺序和FIFO方式运行工作,通常用于测试目的。
这些都可以在所有JVM平台上使用,但是一些特定的平台,比如Android,有它们自己定义的典型调度器:AndroidSchedulers.mainThread(), SwingScheduler.instance() or JavaFXSchedulers.gui().
此外,还可以选择将现有的执行程序(及其子类型,如ExecutorService)通过Schedulers.from(Executor)包装到调度器中。例如,可以使用它来拥有一个更大但仍然固定的线程池(不同于 computation() 和io() 独自地 )。
最后的代码thread . sleep(2000);并不是偶然的。在RxJava中,默认的调度器运行在守护进程线程上,这意味着一旦Java主线程退出,它们都会被停止,后台计算可能永远不会发生。在这个示例场景中,休眠一段时间让你可以看到在控制台上的流有时间输出。
7、Concurrency(并发性) within a flow
RxJava中的流本质上是连续的,根据它们可能同时运行 分为处理阶段:
Flowable.range(1, 10)
.observeOn(Schedulers.computation())
.map(v -> v * v)
.blockingSubscribe(System.out::println);
这个示例流将computation Scheduler上的数字从1到10进行平方,并使用“主”线程(更准确地说,是blockingSubscribe的调用者线程)消费结果。但是,对这个流来说v -> v * v不是并行运行的;它在相同的计算线程上一个接一个地接收值1到10。
8、Parallel processing
实际上,RxJava中的并行性意味着独立运行的流并将其结果合并到单个流中。操作符flatMap首先将从1到10的每一个数字映射到它自己的Flowable,运行它们各自的平方运算然后合并结果值。
但是,请注意,flatMap并不保证有任何顺序,内部流的最终结果可能会交叉。可选择的操作符: concatMap
每次映射并运行一个内部流 concatMapEager
它“同时”运行所有内部流,但输出流将按照创建这些内部流的顺序。
另外,还有一个beta运算符flow .parallel()和 ParallelFlowable 类型,它们都有助于实现相同的并行处理模式:
Flowable.range(1, 10)
.parallel()
.runOn(Schedulers.computation())
.map(v -> v * v)
.sequential()
.blockingSubscribe(System.out::println);
并行性和并发性 (Concurrence) 是既相似又有区别的两个概念,并行性是指两个或多个事件在同一时刻发生;而并发性是指两个或多个事件在同一时间间隔内发生。在多道程序环境下,并发性是指在一段时间内宏观上有多个程序在同时运行,但在单处理机系统中每一时刻却仅能有一道程序执行,故微观上这些程序只能是分时地交替执行。倘若在计算机系统中有多个处理机,则这些可以并发执行的程序便可被分配到多个处理机上,实现并行执行,即利用每个处理机来处理一个可并发执行的程序,这样,多个程序便可同时执行。
Dependent sub-flows
flatMap是一个强大的操作符,在很多情况下都有帮助。例如,给定一个返回Flowable的服务,我们想调用另一个服务,该服务的值由第一个服务发出:
Flowable<Inventory> inventorySource = warehouse.getInventoryAsync();
inventorySource.flatMap(inventoryItem ->
erp.getDemandAsync(inventoryItem.getId())
.map(demand
-> System.out.println("Item " + inventoryItem.getName() + " has demand " + demand));
)
.subscribe();
Continuations
有时,当一个item变得可利用的时,我们希望对它执行一些相关的计算。这有时被称为延续,根据应该发生的事情和涉及的类型,可能涉及到的各种操作符来完成。
Dependent
最典型的场景是给定一个值,调用另一个服务,等待其结果并继续 :
service.apiCall()
.flatMap(value -> service.anotherApiCall(value))
.flatMap(next -> service.finalCall(next))
通常,后面的序列也需要来自早期映射的值。这可以通过将the outer flatMap移动到the previous flatMap的内部部分来实现,例如:
service.apiCall()
.flatMap(value ->
service.anotherApiCall(value)
.flatMap(next -> service.finalCallBoth(value, next))
)
这里,原始值将在the inner flatMap中可用的,由lambda变量捕获提供。
Non-dependent
在其他场景中,第一个源/dataflow的结果是不相关的,我们希望继续使用一个准独立的另一个源。在这里,flatMap也可以工作:
Observable continued = sourceObservable.flatMapSingle(ignored -> someSingleSource)
continued.map(v -> v.toString())
.subscribe(System.out::println, Throwable::printStackTrace);
然而,在这种情况下的延续是可以观察到的,而不是更合适的单一。(这是可以理解的,因为从flatMapSingle的角度来看,sourceObservable是一个多值的源,因此映射也可以产生多个值)。
通常情况下,通过使用Completable作为中介人和它的运算符,然后用其他东西重新开始,会有一种表达能力更强(也更低开销)的方法:
sourceObservable
.ignoreElements() // returns Completable
.andThen(someSingleSource)
.map(v -> v.toString())
sourceObservable和someSingleSource之间的惟一依赖关系是,前者应该正常完成,以便后者被消费。
Deferred-dependent延迟依赖
有时,前一个序列和新序列之间存在隐式的数据依赖关系,由于某种原因,新序列没有通过“常规通道”。人们倾向于把这种延续写成如下:
AtomicInteger count = new AtomicInteger();
Observable.range(1, 10)
.doOnNext(ignored -> count.incrementAndGet())
.ignoreElements()
.andThen(Single.just(count.get()))
.subscribe(System.out::println);
不幸的是,这个打印输出为0,因为Single.just(count.get())是在数据流还没有运行时在组装时计算的。我们需要推迟这个Single来源的评价,直到运行时完成主要来源:
AtomicInteger count = new AtomicInteger();
Observable.range(1, 10)
.doOnNext(ignored -> count.incrementAndGet())
.ignoreElements()
.andThen(Single.defer(() -> Single.just(count.get())))
.subscribe(System.out::println);
or
AtomicInteger count = new AtomicInteger();
Observable.range(1, 10)
.doOnNext(ignored -> count.incrementAndGet())
.ignoreElements()
.andThen(Single.fromCallable(() -> count.get()))
.subscribe(System.out::println);
Type conversions
有时候,源或服务返回的类型与应该使用它的流不同。例如,在上面的库存示例中,getDemandAsync可以返回单个。如果代码示例保持不变,这将导致编译时错误(但是,经常会出现关于缺少过载的错误消息)。
在这种情况下,修复转换通常有两个选项:1)转换为所需的类型,2)查找并使用支持不同类型的特定操作符的重载。
Converting to the desired type
每个响应基类都具有操作符,它们可以执行此类转换,包括协议转换,以匹配其他类型。下面的矩阵显示了可用的转换选项:
1:当把一个多值源转换成一个单值源时,我们应该决定应该将多个源值中的哪个作为结果。
2:将可观测的流转换为可观测的流需要一个额外的决定:如何处理可观测源的无约束流?通过BackpressureStrategy参数或通过标准的可流动操作符(如onBackpressureBuffer、onBackpressureDrop、onbackpressure - est),有几种可用的策略(例如缓冲、删除、保持最新),这些策略还允许进一步定制backpressure行为。
3:当最多只有一个源项时,背压没有问题,因为它可以一直存储到下游准备好消耗为止。
Using an overload with the desired type
许多常用的操作符都有可以处理其他类型的重载。这些通常以目标类型的后缀命名:
不能使用关键字
在原始的Rx.NET中,发出单个项然后完成的操作符称为Return(T)。因为Java约定是用小写字母开始方法名,所以这应该是return(T),这是Java中的关键字,因此不可用。因此,RxJava选择将这个运算符命名为just(T)。操作符交换机也有同样的限制,必须命名为switchOnNext。另一个例子是Catch,它被命名为onErrorResumeNext。
类型擦除
许多期望用户提供返回活性类型的函数的操作符不能被重载,因为Function
Flowable<R> flatMap(Function<? super T, ? extends Publisher<? extends R>> mapper)
Flowable<R> flatMapMaybe(Function<? super T, ? extends MaybeSource<? extends R>> mapper)
类型模棱两可
尽管某些操作符在类型擦除方面没有问题,但它们的签名可能会出现歧义,特别是如果使用Java 8和lambdas的话。例如,有几个重载的concatWith将各种其他的反应基类型作为参数(用于在底层实现中提供方便和性能优势):
Flowable<T> concatWith(Publisher<? extends T> other);
Flowable<T> concatWith(SingleSource<? extends T> other);
Publisher和SingleSource都作为函数接口(使用一个抽象方法的类型)出现,并可能鼓励用户尝试提供lambda表达式:
someSource.concatWith(s -> Single.just(2))
.subscribe(System.out::println, Throwable::printStackTrace);
不幸的是,这种方法不起作用,而且示例根本没有打印2。事实上,自从2.1.10版本以来,它甚至没有编译,因为至少有4个包含重载的concatWith存在,编译器发现上面的代码不明确。
在这种情况下,用户可能希望延迟一些计算,直到someSource完成,因此正确的明确的操作符应该被defer:
someSource.concatWith(Single.defer(() -> Single.just(2)))
.subscribe(System.out::println, Throwable::printStackTrace);
有时,添加后缀以避免可能编译但在流中生成错误类型的逻辑歧义:
Flowable<T> merge(Publisher<? extends Publisher<? extends T>> sources);
Flowable<T> mergeArray(Publisher<? extends T>... sources);
函数接口类型作为类型参数T涉及时,这也可能变得不明确。
Error handling
数据流可能会失败,这时错误将被发送给使用者。但是,有时候多个源可能会失败,这时就可以选择是否等待它们全部完成或失败。为了指出这个时机,许多操作符的名称都以DelayError词作为后缀(其他操作符的一个重载中有DelayError或delayErrors boolean标志):
Flowable<T> concat(Publisher<? extends Publisher<? extends T>> sources);
Flowable<T> concatDelayError(Publisher<? extends Publisher<? extends T>> sources);
当然,各种后缀可能同时出现:
Flowable<T> concatArrayEagerDelayError(Publisher<? extends T>... sources);
Base class vs base type
由于静态和实例方法的数量非常多,所以基类可以被认为是很重的。RxJava 2的设计受到了响应式流规范的严重影响,因此,该库为每个响应式类型提供了一个类和一个接口:
1org.reactivestreams.Publisher是外部反应流库的一部分。它是通过受反应式流规范控制的标准机制与其他反应性库交互的主要类型。
2接口的命名约定是将源附加到半传统的类名。没有FlowableSource,因为发布者是由反应流库提供的(而且它的子类型也不能帮助进行互操作)。然而,这些接口在无反应流规范的意义上是不标准的,目前只针对RxJava