RxJava的消息订阅和线程切换原理
0.版权声明
本文由玉刚说写作平台提供写作赞助,版权归玉刚说微信公众号所有
原作者:四月葡萄
版权声明:未经玉刚说许可,不得以任何形式转载
1.前言
本文主要是对RxJava的消息订阅和线程切换进行源码分析,相关的使用方式等不作详细介绍。
本文源码基于rxjava:2.1.14
。
2. RxJava简介
RxJava is a Java VM implementation of Reactive Extensions: a library for composing asynchronous and event-based programs by using observable sequences.
It extends the observer pattern to support sequences of data/events and adds operators that allow you to compose sequences together declaratively while abstracting away concerns about things like low-level threading, synchronization, thread-safety and concurrent data structures.
上面这段话来自于RxJava在github上面的官方介绍。翻译成中文的大概意思就是:
RxJava是一个在Java虚拟机上的响应式扩展,通过使用可观察的序列将异步和基于事件的程序组合起来的一个库。
它扩展了观察者模式来支持数据/事件序列,并且添加了操作符,这些操作符允许你声明性地组合序列,同时抽象出要关注的问题:比如低级线程、同步、线程安全和并发数据结构等。
简单点来说, RxJava就是一个使用了观察者模式,能够异步的库。
3. 观察者模式
上面说到,RxJava扩展了观察者模式,那么什么是观察模式呢?我们先来了解一下。
举个例子,以微信公众号为例,一个微信公众号会不断产生新的内容,如果我们读者对这个微信公众号的内容感兴趣,就会订阅这个公众号,当公众号有新内容时,就会推送给我们。我们收到新内容时,如果是我们感兴趣的,就会点进去看下;如果是广告的话,就可能直接忽略掉。这就是我们生活中遇到的典型的观察者模式。
在上面的例子中,微信公众号就是一个被观察者(Observable
),不断的产生内容(事件),而我们读者就是一个观察者(Observer
) ,通过订阅(subscribe
)就能够接受到微信公众号(被观察者)推送的内容(事件),根据不同的内容(事件)做出不同的操作。
3.1 Rxjava角色说明
RxJava的扩展观察者模式中就是存在这么4种角色:
角色 | 角色功能 |
---|---|
被观察者(Observable ) |
产生事件 |
观察者(Observer ) |
响应事件并做出处理 |
事件(Event ) |
被观察者和观察者的消息载体 |
订阅(Subscribe ) |
连接被观察者和观察者 |
3.2 RxJava事件类型
RxJava中的事件分为三种类型:Next
事件、Complete
事件和Error
事件。具体如下:
事件类型 | 含义 | 说明 |
---|---|---|
Next |
常规事件 | 被观察者可以发送无数个Next事件,观察者也可以接受无数个Next事件 |
Complete |
结束事件 | 被观察者发送Complete事件后可以继续发送事件,观察者收到Complete事件后将不会接受其他任何事件 |
Error |
异常事件 | 被观察者发送Error事件后,其他事件将被终止发送,观察者收到Error事件后将不会接受其他任何事件 |
4.RxJava的消息订阅
在分析RxJava消息订阅原理前,我们还是先来看下它的简单使用步骤。这里为了方便讲解,就不用链式代码来举例了,而是采用分步骤的方式来逐一说明(平时写代码的话还是建议使用链式代码来调用,因为更加简洁)。其使用步骤如下:
- 创建被观察者(
Observable
),定义要发送的事件。- 创建观察者(
Observer
),接受事件并做出响应操作。- 观察者通过订阅(
subscribe
)被观察者把它们连接到一起。
4.1 RxJava的消息订阅例子
这里我们就根据上面的步骤来实现这个例子,如下:
//步骤1. 创建被观察者(Observable),定义要发送的事件。
Observable observable = Observable.create(new ObservableOnSubscribe<String>() {
@Override
public void subscribe(ObservableEmitter<String> emitter) throws Exception {
emitter.onNext("文章1");
emitter.onNext("文章2");
emitter.onNext("文章3");
emitter.onComplete();
}
});
//步骤2. 创建观察者(Observer),接受事件并做出响应操作。
Observer<String> observer = new Observer<String>() {
@Override
public void onSubscribe(Disposable d) {
Log.d(TAG, "onSubscribe");
}
@Override
public void onNext(String s) {
Log.d(TAG, "onNext : " + s);
}
@Override
public void onError(Throwable e) {
Log.d(TAG, "onError : " + e.toString());
}
@Override
public void onComplete() {
Log.d(TAG, "onComplete");
}
};
//步骤3. 观察者通过订阅(subscribe)被观察者把它们连接到一起。
observable.subscribe(observer);
其输出结果为:
onSubscribe
onNext : 文章1
onNext : 文章2
onNext : 文章3
onComplete
4.2 源码分析
下面我们对消息订阅过程中的源码进行分析,分为两部分:创建被观察者过程和订阅过程。
4.2.1 创建被观察者过程
首先来看下创建被观察者(Observable
)的过程,上面的例子中我们是直接使用Observable.create()
来创建Observable
,我们点进去这个方法看下。
4.2.1.1 Observable类的create()
public static <T> Observable<T> create(ObservableOnSubscribe<T> source) {
ObjectHelper.requireNonNull(source, "source is null");
return RxJavaPlugins.onAssembly(new ObservableCreate<T>(source));
}
可以看到,create()
方法中也没做什么,就是创建一个ObservableCreate
对象出来,然后把我们自定义的ObservableOnSubscribe
作为参数传到ObservableCreate
中去,最后就是调用 RxJavaPlugins.onAssembly()
方法。
我们先来看看ObservableCreate
类:
4.2.1.2 ObservableCreate类
public final class ObservableCreate<T> extends Observable<T> {//继承自Observable
public ObservableCreate(ObservableOnSubscribe<T> source) {
this.source = source;//把我们创建的ObservableOnSubscribe对象赋值给source。
}
}
可以看到,ObservableCreate
是继承自Observable
的,并且会把ObservableOnSubscribe
对象给存起来。
再看下RxJavaPlugins.onAssembly()
方法
4.2.1.3 RxJavaPlugins类的onAssembly()
public static <T> Observable<T> onAssembly(@NonNull Observable<T> source) {
//省略无关代码
return source;
}
很简单,就是把上面创建的ObservableCreate
给返回。
4.2.1.4 简单总结
所以Observable.create()
中就是把我们自定义的ObservableOnSubscribe
对象重新包装成一个ObservableCreate
对象,然后返回这个ObservableCreate
对象。
注意,这种重新包装新对象的用法在RxJava中会频繁用到,后面的分析中我们还会多次遇到。
放个图好理解,包起来哈~
4.2.1.5 时序图
Observable.create()
的时序图如下所示:
4.2.2 订阅过程
接下来我们就看下订阅过程的代码,同样,点进去Observable.subscribe()
:
4.2.2.1 Observable类的subscribe()
public final void subscribe(Observer<? super T> observer) {
//省略无关代码
observer = RxJavaPlugins.onSubscribe(this, observer);
subscribeActual(observer);
//省略无关代码
}
可以看到,实际上其核心的代码也就两句,我们分开来看下:
4.2.2.2 RxJavaPlugins类的onSubscribe()
public static <T> Observer<? super T> onSubscribe(@NonNull Observable<T> source, @NonNull Observer<? super T> observer) {
//省略无关代码
return observer;
}
跟之前代码一样,这里同样也是把原来的observer
返回而已。
再来看下subscribeActual()
方法。
4.2.2.3 Observable类的subscribeActual()
protected abstract void subscribeActual(Observer<? super T> observer);
Observable
类的subscribeActual()
中的方法是一个抽象方法,那么其具体实现在哪呢?还记得我们前面创建被观察者的过程吗,最终会返回一个ObservableCreate
对象,这个ObservableCreate
就是Observable
的子类,我们点进去看下:
4.2.2.4 ObservableCreate类的subscribeActual()
@Override
protected void subscribeActual(Observer<? super T> observer) {
CreateEmitter<T> parent = new CreateEmitter<T>(observer);
//触发我们自定义的Observer的onSubscribe(Disposable)方法
observer.onSubscribe(parent);
try {
source.subscribe(parent);
} catch (Throwable ex) {
Exceptions.throwIfFatal(ex);
parent.onError(ex);
}
}
可以看到,subscribeActual()
方法中首先会创建一个CreateEmitter
对象,然后把我们自定义的观察者observer
作为参数给传进去。这里同样也是包装起来,放个图:
这个CreateEmitter
实现了ObservableEmitter
接口和Disposable
接口,如下:
static final class CreateEmitter<T>
extends AtomicReference<Disposable>
implements ObservableEmitter<T>, Disposable {
//代码省略
}
然后就是调用了observer.onSubscribe(parent)
,实际上就是调用观察者的onSubscribe()
方法,即告诉观察者已经成功订阅到了被观察者。
继续往下看,subscribeActual()
方法中会继续调用source.subscribe(parent)
,这里的source
就是ObservableOnSubscribe
对象,即这里会调用ObservableOnSubscribe
的subscribe()
方法。
我们具体定义的subscribe()
方法如下:
Observable observable = Observable.create(new ObservableOnSubscribe<String>() {
@Override
public void subscribe(ObservableEmitter<String> emitter) throws Exception {
emitter.onNext("文章1");
emitter.onNext("文章2");
emitter.onNext("文章3");
emitter.onComplete();
}
});
ObservableEmitter
,顾名思义,就是被观察者发射器。
所以,subscribe()
里面的三个onNext()
方法和一个onComplete()
会逐一被调用。
这里的ObservableEmitter
接口其具体实现为CreateEmitter
,我们看看CreateEmitte
类的onNext()
方法和onComplete()
的实现:
4.2.2.5 CreateEmitter类的onNext()和onComplete()等
//省略其他代码
@Override
public void onNext(T t) {
//省略无关代码
if (!isDisposed()) {
//调用观察者的onNext()
observer.onNext(t);
}
}
@Override
public void onComplete() {
if (!isDisposed()) {
try {
//调用观察者的onComplete()
observer.onComplete();
} finally {
dispose();
}
}
}
可以看到,最终就是会调用到观察者的onNext()
和onComplete()
方法。至此,一个完整的消息订阅流程就完成了。
另外,可以看到,上面有个isDisposed()
方法能控制消息的走向,即能够切断消息的传递,这个后面再来说。
4.2.2.6 简单总结
Observable
(被观察者)和Observer
(观察者)建立连接(订阅)之后,会创建出一个发射器CreateEmitter
,发射器会把被观察者中产生的事件发送到观察者中去,观察者对发射器中发出的事件做出响应处理。可以看到,是订阅之后,Observable
(被观察者)才会开始发送事件。
放张事件流的传递图:
4.2.2.7 时序流程图
再来看下订阅过程的时序流程图:
4.3 切断消息
之前有提到过切断消息的传递,我们先来看下如何使用:
4.3.1 切断消息
Observable observable = Observable.create(new ObservableOnSubscribe<String>() {
@Override
public void subscribe(ObservableEmitter<String> emitter) throws Exception {
emitter.onNext("文章1");
emitter.onNext("文章2");
emitter.onNext("文章3");
emitter.onComplete();
}
});
Observer<String> observer = new Observer<String>() {
private Disposable mDisposable;
@Override
public void onSubscribe(Disposable d) {
Log.d(TAG, "onSubscribe : " + d);
mDisposable=d;
}
@Override
public void onNext(String s) {
Log.d(TAG, "onNext : " + s);
mDisposable.dispose();
Log.d(TAG, "切断观察者与被观察者的连接");
}
@Override
public void onError(Throwable e) {
Log.d(TAG, "onError : " + e.toString());
}
@Override
public void onComplete() {
Log.d(TAG, "onComplete");
}
};
observable.subscribe(observer);
输出结果为:
onSubscribe : null
onNext : 文章1
切断观察者与被观察者的连接
可以看到,要切断消息的传递很简单,调用下Disposable
的dispose()
方法即可。调用dispose()
之后,被观察者虽然能继续发送消息,但是观察者却收不到消息了。
另外有一点需要注意,上面onSubscribe
输出的Disposable
值是"null"
,并不是空引用null
。
4.3.2 切断消息源码分析
我们这里来看看下dispose()
的实现。Disposable
是一个接口,可以理解Disposable
为一个连接器,调用dispose()
后,这个连接器将会中断。其具体实现在CreateEmitter
类,之前也有提到过。我们来看下CreateEmitter
的dispose()
方法:
4.3.2.1 CreateEmitter的dispose()
@Override
public void dispose() {
DisposableHelper.dispose(this);
}
就是调用DisposableHelper.dispose(this)
而已。
4.3.2.2 DisposableHelper类
public enum DisposableHelper implements Disposable {
DISPOSED
;
//其他代码省略
public static boolean isDisposed(Disposable d) {
//判断Disposable类型的变量的引用是否等于DISPOSED
//即判断该连接器是否被中断
return d == DISPOSED;
}
public static boolean dispose(AtomicReference<Disposable> field) {
Disposable current = field.get();
Disposable d = DISPOSED;
if (current != d) {
//这里会把field给设为DISPOSED
current = field.getAndSet(d);
if (current != d) {
if (current != null) {
current.dispose();
}
return true;
}
}
return false;
}
}
可以看到DisposableHelper
是一个枚举类,并且只有一个值:DISPOSED
。dispose()
方法中会把一个原子引用field
设为DISPOSED
,即标记为中断状态。因此后面通过isDisposed()
方法即可以判断连接器是否被中断。
4.3.2.3 CreateEmitter类中的方法
再回头看看CreateEmitter
类中的方法:
@Override
public void onNext(T t) {
//省略无关代码
if (!isDisposed()) {
//如果没有dispose(),才会调用onNext()
observer.onNext(t);
}
}
@Override
public void onError(Throwable t) {
if (!tryOnError(t)) {
//如果dispose()了,会调用到这里,即最终会崩溃
RxJavaPlugins.onError(t);
}
}
@Override
public boolean tryOnError(Throwable t) {
//省略无关代码
if (!isDisposed()) {
try {
//如果没有dispose(),才会调用onError()
observer.onError(t);
} finally {
//onError()之后会dispose()
dispose();
}
//如果没有dispose(),返回true
return true;
}
//如果dispose()了,返回false
return false;
}
@Override
public void onComplete() {
if (!isDisposed()) {
try {
//如果没有dispose(),才会调用onComplete()
observer.onComplete();
} finally {
//onComplete()之后会dispose()
dispose();
}
}
}
从上面的代码可以看到:
- 如果没有
dispose
,observer.onNext()
才会被调用到。onError()
和onComplete()
互斥,只能其中一个被调用到,因为调用了他们的任意一个之后都会调用dispose()
。- 先
onError()
后onComplete()
,onComplete()
不会被调用到。反过来,则会崩溃,因为onError()
中抛出了异常:RxJavaPlugins.onError(t)
。实际上是dispose
后继续调用onError()
都会炸。
5.RxJava的线程切换
上面的例子和分析都是在同一个线程中进行,这中间也没涉及到线程切换的相关问题。但是在实际开发中,我们通常需要在一个子线程中去进行一些数据获取操作,然后要在主线程中去更新UI,这就涉及到线程切换的问题了,通过RxJava我们也可以把线程切换写得还简洁。
5.1 线程切换例子
关于RxJava如何使用线程切换,这里就不详细讲了。
我们直接来看一个例子,并分别打印RxJava在运行过程中各个角色所在的线程。
new Thread() {
@Override
public void run() {
Log.d(TAG, "Thread run() 所在线程为 :" + Thread.currentThread().getName());
Observable
.create(new ObservableOnSubscribe<String>() {
@Override
public void subscribe(ObservableEmitter<String> emitter) throws Exception {
Log.d(TAG, "Observable subscribe() 所在线程为 :" + Thread.currentThread().getName());
emitter.onNext("文章1");
emitter.onNext("文章2");
emitter.onComplete();
}
})
.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.subscribe(new Observer<String>() {
@Override
public void onSubscribe(Disposable d) {
Log.d(TAG, "Observer onSubscribe() 所在线程为 :" + Thread.currentThread().getName());
}
@Override
public void onNext(String s) {
Log.d(TAG, "Observer onNext() 所在线程为 :" + Thread.currentThread().getName());
}
@Override
public void onError(Throwable e) {
Log.d(TAG, "Observer onError() 所在线程为 :" + Thread.currentThread().getName());
}
@Override
public void onComplete() {
Log.d(TAG, "Observer onComplete() 所在线程为 :" + Thread.currentThread().getName());
}
});
}
}.start();
输出结果为:
Thread run() 所在线程为 :Thread-2
Observer onSubscribe() 所在线程为 :Thread-2
Observable subscribe() 所在线程为 :RxCachedThreadScheduler-1
Observer onNext() 所在线程为 :main
Observer onNext() 所在线程为 :main
Observer onComplete() 所在线程为 :main
从上面的例子可以看到:
Observer
(观察者)的onSubscribe()
方法运行在当前线程中。Observable
(被观察者)中的subscribe()
运行在subscribeOn()
指定的线程中。Observer
(观察者)的onNext()
和onComplete()
等方法运行在observeOn()
指定的线程中。
5.2 源码分析
下面我们对线程切换的源码进行一下分析,分为两部分:subscribeOn()
和observeOn()
。
5.2.1 subscribeOn()源码分析
首先来看下subscribeOn()
,我们的例子中是这么个使用的:
.subscribeOn(Schedulers.io())
subscribeOn()
方法要传入一个Scheduler
类对象作为参数,Scheduler
是一个调度类,能够延时或周期性地去执行一个任务。
5.2.1.1 Scheduler类型
通过Schedulers
类我们可以获取到各种Scheduler
的子类。RxJava提供了以下这些线程调度类供我们使用:
Scheduler类型 | 使用方式 | 含义 | 使用场景 |
---|---|---|---|
IoScheduler | Schedulers.io() |
io操作线程 | 读写SD卡文件,查询数据库,访问网络等IO密集型操作 |
NewThreadScheduler | Schedulers.newThread() |
创建新线程 | 耗时操作等 |
SingleScheduler | Schedulers.single() |
单例线程 | 只需一个单例线程时 |
ComputationScheduler | Schedulers.computation() |
CPU计算操作线程 | 图片压缩取样、xml,json解析等CPU密集型计算 |
TrampolineScheduler | Schedulers.trampoline() |
当前线程 | 需要在当前线程立即执行任务时 |
HandlerScheduler | AndroidSchedulers.mainThread() |
Android主线程 | 更新UI等 |
5.2.1.2 Schedulers类的io()
下面我们来看下Schedulers.io()
的代码,其他的Scheduler
子类都差不多,就不逐以分析了,有兴趣的请自行查看哈~
@NonNull
static final Scheduler IO;
@NonNull
public static Scheduler io() {
//1.直接返回一个名为IO的Scheduler对象
return RxJavaPlugins.onIoScheduler(IO);
}
static {
//省略无关代码
//2.IO对象是在静态代码块中实例化的,这里会创建按一个IOTask()
IO = RxJavaPlugins.initIoScheduler(new IOTask());
}
static final class IOTask implements Callable<Scheduler> {
@Override
public Scheduler call() throws Exception {
//3.IOTask中会返回一个IoHolder对象
return IoHolder.DEFAULT;
}
}
static final class IoHolder {
//4.IoHolder中会就是new一个IoScheduler对象出来
static final Scheduler DEFAULT = new IoScheduler();
}
可以看到,Schedulers.io()
中使用了静态内部类的方式来创建出了一个单例IoScheduler
对象出来,这个IoScheduler
是继承自Scheduler的。这里mark一发,后面会用到这个IoScheduler
的。
5.2.1.3 Observable类的subscribeOn()
然后,我们就来看下subscribeOn()的代码:
public final Observable<T> subscribeOn(Scheduler scheduler) {
//省略无关代码
return RxJavaPlugins.onAssembly(new ObservableSubscribeOn<T>(this, scheduler));
}
可以看到,首先会将当前的Observable
(其具体实现为ObservableCreate
)包装成一个新的ObservableSubscribeOn
对象。
放个图:
跟前面一样,RxJavaPlugins.onAssembly()
也是将ObservableSubscribeOn
对象原样返回而已,这里就不看了。
可以看下ObservableSubscribeOn
的构造方法:
5.2.1.4 ObservableSubscribeOn类的构造方法
public ObservableSubscribeOn(ObservableSource<T> source, Scheduler scheduler) {
super(source);
this.scheduler = scheduler;
}
也就是把source
和scheduler
这两个保存一下,后面会用到。
然后subscribeOn()
方法就完了。好像也没做什么,就是重新包装一下对象而已,然后将新对象返回。即将一个旧的被观察者包装成一个新的被观察者。
5.2.1.5 ObservableSubscribeOn类的subscribeActual()
接下来我们回到订阅过程,为什么要回到订阅过程呢?因为事件的发送是从订阅过程开始的啊。
虽然我们这里用到了线程切换,但是呢,其订阅过程前面的内容跟上一节分析的是一样的,我们这里就不重复了,直接从不一样的地方开始。还记得订阅过程中Observable
类的subscribeActual()
是个抽象方法吗?因此要看其子类的具体实现。在上一节订阅过程中,其具体实现是在ObservableCreate
类。但是由于我们调用subscribeOn()
之后,ObservableCreate
对象被包装成了一个新的ObservableSubscribeOn
对象了。因此我们就来看看ObservableSubscribeOn
类中的subscribeActual()
方法:
@Override
public void subscribeActual(final Observer<? super T> s) {
final SubscribeOnObserver<T> parent = new SubscribeOnObserver<T>(s);
s.onSubscribe(parent);
parent.setDisposable(scheduler.scheduleDirect(new SubscribeTask(parent)));
}
subscribeActual()
中同样也将我们自定义的Observer
给包装成了一个新的SubscribeOnObserver
对象。同样,放张图:
然后就是调用Observer
的onSubscribe()
方法,可以看到,到目前为止,还没出现过任何线程相关的东西,所以Observer
的onSubscribe()
方法就是运行在当前线程中。
然后我们重点看下最后一行代码,首先创建一个SubscribeTask
对象,然后就是调用scheduler.scheduleDirect()
.。
我们先来看下SubscribeTask
类:
5.2.1.6 SubscribeTask类
//SubscribeTask是ObservableSubscribeOn的内部类
final class SubscribeTask implements Runnable {
private final SubscribeOnObserver<T> parent;
SubscribeTask(SubscribeOnObserver<T> parent) {
this.parent = parent;
}
@Override
public void run() {
//这里的source就是我们自定义的Observable对象,即ObservableCreate
source.subscribe(parent);
}
}
很简单的一个类,就是实现了Runnable
接口,然后run()
中调用Observer.subscribe()
。
5.2.1.7 Scheduler类的scheduleDirect()
再来看下scheduler.scheduleDirect()
方法
public Disposable scheduleDirect(@NonNull Runnable run) {
return scheduleDirect(run, 0L, TimeUnit.NANOSECONDS);
}
往下看:
public Disposable scheduleDirect(@NonNull Runnable run, long delay, @NonNull TimeUnit unit) {
//createWorker()在Scheduler类中是个抽象方法,所以其具体实现在其子类中
//因此这里的createWorker()应当是在IoScheduler中实现的。
//Worker中可以执行Runnable
final Worker w = createWorker();
//实际上decoratedRun还是这个run对象,即SubscribeTask
final Runnable decoratedRun = RxJavaPlugins.onSchedule(run);
//将Runnable和Worker包装成一个DisposeTask
DisposeTask task = new DisposeTask(decoratedRun, w);
//Worker执行这个task
w.schedule(task, delay, unit);
return task;
}
我们来看下创建Worker
和Worker
执行任务的过程。
5.2.1.8 IoScheduler的createWorker()和schedule()
final AtomicReference<CachedWorkerPool> pool;
public Worker createWorker() {
//就是new一个EventLoopWorker,并且传一个Worker缓存池进去
return new EventLoopWorker(pool.get());
}
static final class EventLoopWorker extends Scheduler.Worker {
private final CompositeDisposable tasks;
private final CachedWorkerPool pool;
private final ThreadWorker threadWorker;
final AtomicBoolean once = new AtomicBoolean();
//构造方法
EventLoopWorker(CachedWorkerPool pool) {
this.pool = pool;
this.tasks = new CompositeDisposable();
//从缓存Worker池中取一个Worker出来
this.threadWorker = pool.get();
}
@NonNull
@Override
public Disposable schedule(@NonNull Runnable action, long delayTime, @NonNull TimeUnit unit) {
//省略无关代码
//Runnable交给threadWorker去执行
return threadWorker.scheduleActual(action, delayTime, unit, tasks);
}
}
注意,不同的Scheduler
类会有不同的Worker
实现,因为Scheduler
类最终是交到Worker
中去执行调度的。
我们来看下Worker
缓存池的操作:
5.2.1.9 CachedWorkerPool的get()
static final class CachedWorkerPool implements Runnable {
ThreadWorker get() {
if (allWorkers.isDisposed()) {
return SHUTDOWN_THREAD_WORKER;
}
while (!expiringWorkerQueue.isEmpty()) {
//如果缓冲池不为空,就从缓存池中取threadWorker
ThreadWorker threadWorker = expiringWorkerQueue.poll();
if (threadWorker != null) {
return threadWorker;
}
}
//如果缓冲池中为空,就创建一个并返回。
ThreadWorker w = new ThreadWorker(threadFactory);
allWorkers.add(w);
return w;
}
}
5.2.1.10 NewThreadWorker的scheduleActual()
我们再来看下threadWorker.scheduleActual()
。 ThreadWorker
类没有实现scheduleActual()
方法,其父类NewThreadWorker
实现了该方法,我们点进去看下:
public class NewThreadWorker extends Scheduler.Worker implements Disposable {
private final ScheduledExecutorService executor;
volatile boolean disposed;
public NewThreadWorker(ThreadFactory threadFactory) {
//构造方法中创建一个ScheduledExecutorService对象,可以通过ScheduledExecutorService来使用线程池
executor = SchedulerPoolFactory.create(threadFactory);
}
public ScheduledRunnable scheduleActual(final Runnable run, long delayTime, @NonNull TimeUnit unit, @Nullable DisposableContainer parent) {
//这里的decoratedRun实际还是run对象
Runnable decoratedRun = RxJavaPlugins.onSchedule(run);
//将decoratedRun包装成一个新对象ScheduledRunnable
ScheduledRunnable sr = new ScheduledRunnable(decoratedRun, parent);
//省略无关代码
if (delayTime <= 0) {
//线程池中立即执行ScheduledRunnable
f = executor.submit((Callable<Object>)sr);
} else {
//线程池中延迟执行ScheduledRunnable
f = executor.schedule((Callable<Object>)sr, delayTime, unit);
}
//省略无关代码
return sr;
}
}
这里的executor
就是使用线程池去执行任务,最终SubscribeTask
的run()
方法会在线程池中被执行,即Observable
的subscribe()
方法会在IO线程中被调用。这与上面例子中的输出结果符合:
Observable subscribe() 所在线程为 :RxCachedThreadScheduler-1
5.2.1.11 简单总结
Observer
(观察者)的onSubscribe()
方法运行在当前线程中,因为在这之前都没涉及到线程切换。- 如果设置了
subscribeOn(指定线程)
,那么Observable
(被观察者)中subscribe()
方法将会运行在这个指定线程中去。
5.2.1.12 时序图
来张总的subscribeOn()
切换线程时序图
5.2.1.13 多次设置subscribeOn()的问题
如果我们多次设置subscribeOn()
,那么其执行线程是在哪一个呢?先来看下例子
//省略前后代码,看重点部分
.subscribeOn(Schedulers.io())//第一次
.subscribeOn(Schedulers.newThread())//第二次
.subscribeOn(AndroidSchedulers.mainThread())//第三次
其输出结果为:
Observable subscribe() 所在线程为 :RxCachedThreadScheduler-1
即只有第一次的subscribeOn()
起作用了。这是为什么呢?
我们知道,每调用一次subscribeOn()
就会把旧的被观察者包装成一个新的被观察者,经过了三次调用之后,就变成了下面这个样子:
同时,我们知道,被观察者被订阅时是从最外面的一层通知到里面的一层,那么当传到上图第三层时,也就是ObservableSubscribeOn
(第一次)那一层时,管你之前是在哪个线程,subscribeOn(Schedulers.io())
都会把线程切到IO线程中去执行,所以多次设置subscribeOn()
时,只有第一次生效。
5.2.2 observeOn()
我们再来看下observeOn()
,还是先来回顾一下我们例子中的设置:
//指定在Android主线程中执行
.observeOn(AndroidSchedulers.mainThread())
5.2.2.1 Observable类的observeOn()
public final Observable<T> observeOn(Scheduler scheduler) {
return observeOn(scheduler, false, bufferSize());
}
public final Observable<T> observeOn(Scheduler scheduler, boolean delayError, int bufferSize) {
//省略无关代码
return RxJavaPlugins.onAssembly(new ObservableObserveOn<T>(this, scheduler, delayError, bufferSize));
}
同样,这里也是新包装一个ObservableObserveOn
对象,注意,这里包装的旧被观察者是ObservableSubscribeOn
对象了,因为之前调用过subscribeOn()
包装了一层了,所以现在是如下图所示:
RxJavaPlugins.onAssembly()
也是原样返回。
我们看看ObservableObserveOn
的构造方法。
5.2.2.2 ObservableObserveOn类的构造方法
public ObservableObserveOn(ObservableSource<T> source, Scheduler scheduler, boolean delayError, int bufferSize) {
super(source);
this.scheduler = scheduler;
this.delayError = delayError;
this.bufferSize = bufferSize;
}
里面就是一些变量赋值而已。
5.2.2.3 ObservableObserveOn的subscribeActual()
和subscribeOn()
差不多,我们就直接来看ObservableObserveOn
的subscribeActual()
方法了。
@Override
protected void subscribeActual(Observer<? super T> observer) {
//判断是否当前线程
if (scheduler instanceof TrampolineScheduler) {
//是当前线程的话,直接调用里面一层的subscribe()方法
//即调用ObservableSubscribeOn的subscribe()方法
source.subscribe(observer);
} else {
//创建Worker
//本例子中的scheduler为AndroidSchedulers.mainThread()
Scheduler.Worker w = scheduler.createWorker();
//这里会将Worker包装到ObserveOnObserver对象中去
//注意:source.subscribe没有涉及到Worker,所以还是在之前设置的线程中去执行
//本例子中source.subscribe就是在IO线程中执行。
source.subscribe(new ObserveOnObserver<T>(observer, w, delayError, bufferSize));
}
}
同样,这里也将observer
给包装了一层,如下图所示:
source.subscribe()
中将会把事件逐一发送出去,我们这里只看下ObserveOnObserver
中的onNext()
方法的处理,onComplete()
等就不看了,实际上都差不多。
5.2.2.4 ObserveOnObserver的onNext()
@Override
public void onNext(T t) {
//省略无关代码
if (sourceMode != QueueDisposable.ASYNC) {
//将信息存入队列中
queue.offer(t);
}
schedule();
}
就是调用schedule()
而已。
5.2.2.5 ObserveOnObserver的schedule()
void schedule() {
if (getAndIncrement() == 0) {
//ObserveOnObserver同样实现了Runnable接口,所以就把它自己交给worker去调度了
worker.schedule(this);
}
}
Android主线程调度器里面的代码就不分析了,里面实际上是用handler
来发送Message
去实现的,感兴趣的可以看下。
既然ObserveOnObserver
实现了Runnable
接口,那么就是其run()
方法会在主线程中被调用。
我们来看下ObserveOnObserver
的run()
方法:
5.2.2.6 ObserveOnObserver的run()
@Override
public void run() {
//outputFused默认是false
if (outputFused) {
drainFused();
} else {
drainNormal();
}
}
这里会走到drainNormal()
方法。
5.2.2.7 ObserveOnObserver的drainNormal()
void drainNormal() {
int missed = 1;
//存储消息的队列
final SimpleQueue<T> q = queue;
//这里的actual实际上是SubscribeOnObserver
final Observer<? super T> a = actual;
//省略无关代码
//从队列中取出消息
v = q.poll();
//...
//这里调用的是里面一层的onNext()方法
//在本例子中,就是调用SubscribeOnObserver.onNext()
a.onNext(v);
//...
}
至于SubscribeOnObserver.onNext()
,里面也没切换线程的逻辑,就是调用里面一层的onNext()
,所以最终会调用到我们自定义的Observer
中的onNext()
方法。因此,Observer
的onNext()
方法就在observeOn()
中指定的线程中给调用了,在本例中,就是在Android主线程中给调用。
5.2.2.8 简单总结
- 如果设置了
observeOn(指定线程)
,那么Observer
(观察者)中的onNext()
、onComplete()
等方法将会运行在这个指定线程中去。subscribeOn()
设置的线程不会影响到observeOn()
。
5.2.2.9 时序图
最后,来张observeOn()时序图:
6.其他
因本人水平有限,如有错误,欢迎指出并交流~四月葡萄的博客
另外,打个广告哈~
广深求工作介绍和内推哈~
上一篇: 设计模式-策略模式
下一篇: 设计模式之——策略模式