RxJava2源码分析(下):操作符和线程调度
目录
一、概述
在上一篇文章,咱们详细分析了Rxjava2的结构流程:RxJava2源码分析(上):架构解析https://blog.csdn.net/qq_29152241/article/details/82559595。那么这篇文章就来分析RxJava2的精髓:操作符和及其线程调度。
二、map操作符
在RxJava2中有将近50个操作符可以帮助用户做组合、变换等复杂操作,有关RxJava2操作符的用法和每个操作符的说明可以看我Github上的这个项目:https://github.com/cozing/RxJava2Retrofit2Hybrid
咱们现在采用链式调用方式重写上面栗子代码,并添加map操作符:
Observable.just(10086)
.map(new Function<Integer, Object>() {
@Override
public Object apply(Integer integer) throws Exception {
return "整形转成字符串" + integer;
}
})
.subscribe(new Observer<Object>() {
@Override
public void onSubscribe(Disposable d) {
}
@Override
public void onNext(Object o) {
System.out.println(o);
}
@Override
public void onError(Throwable e) {
}
@Override
public void onComplete() {
}
});
在这段代码中咱们添加了map操作符将原来整形数据“10086”转换成了字符串“整形转成字符串10086”,然后将数据发射出去,在观察者接收到数据时候打印出来。其中map操作符就是起到映射作用,将原数据转换成了另一个数据,充当着转换的功能,看其源码:
public abstract class Observable<T> implements ObservableSource<T> {
...
public final <R> Observable<R> map(Function<? super T, ? extends R> mapper) {
ObjectHelper.requireNonNull(mapper, "mapper is null");
return RxJavaPlugins.onAssembly(new ObservableMap<T, R>(this, mapper));
}
...
}
关于RxJavaPlugins.onAssembly的实现,咱们在上一篇文章中创建被观察者的时候讲述过,这是一个hook的抽象代理,在这里得到的就是他的实参,咱们直接看ObservableMap,也就是map方法返回的具体数据。
public final class ObservableMap<T, U> extends AbstractObservableWithUpstream<T, U> {
final Function<? super T, ? extends U> function;
public ObservableMap(ObservableSource<T> source, Function<? super T, ? extends U> function) {
super(source);
this.function = function;
}
@Override
public void subscribeActual(Observer<? super U> t) {
source.subscribe(new MapObserver<T, U>(t, function));
}
...
}
其中super(source):
abstract class AbstractObservableWithUpstream<T, U> extends Observable<U> implements HasUpstreamObservableSource<T> {
protected final ObservableSource<T> source;
AbstractObservableWithUpstream(ObservableSource<T> source) {
this.source = source;
}
...
}
可以看到,创建实例时候,将被观察者对象ObservableSource和变换函数Function保存起来。我们知道在调用订阅方法subscribe()时:
public final void subscribe(Observer<? super T> observer) {
ObjectHelper.requireNonNull(observer, "observer is null");
try {
observer = RxJavaPlugins.onSubscribe(this, observer);
ObjectHelper.requireNonNull(observer, "Plugin returned null Observer");
//调用订阅方法时,内部调用的是subscribeActual
subscribeActual(observer);
} catch (NullPointerException e) { // NOPMD
throw e;
} catch (Throwable e) {
Exceptions.throwIfFatal(e);
RxJavaPlugins.onError(e);
NullPointerException npe = new NullPointerException("Actually not, but can't throw other exceptions due to RS");
npe.initCause(e);
throw npe;
}
}
内部调用的是subscribeActual(),在这里就是
public final class ObservableMap<T, U> extends AbstractObservableWithUpstream<T, U> {
...
@Override
public void subscribeActual(Observer<? super U> t) {
source.subscribe(new MapObserver<T, U>(t, function));
}
...
}
因此真正的订阅实现是在这个方法,接着看MapObserver类实现,MapObserver是ObservableMap的一个内部类,在创建MapObserver类实例,构造方法传入观察者Observer和事件转换函数Function,看它实现:
public final class ObservableMap<T, U> extends AbstractObservableWithUpstream<T, U> {
...
static final class MapObserver<T, U> extends BasicFuseableObserver<T, U> {
final Function<? super T, ? extends U> mapper;
MapObserver(Observer<? super U> actual, Function<? super T, ? extends U> mapper) {
//构造方法内部将传进来的观察者actual和转换函数mapper保存起来
super(actual);
this.mapper = mapper;
}
//调用onNext的时候,先走到这里
@Override
public void onNext(T t) {
//前面对发送事件做一系列判断
if (done) {
return;
}
if (sourceMode != NONE) {
actual.onNext(null);
return;
}
U v;
try {
//最后v返回的是mapper.apply(t)
v = ObjectHelper.requireNonNull(mapper.apply(t), "The mapper function returned a null value.");
} catch (Throwable ex) {
fail(ex);
return;
}
//最终观察者onNext的v,也就是转换函数转换过后的数据
actual.onNext(v);
}
@Override
public int requestFusion(int mode) {
return transitiveBoundaryFusion(mode);
}
@Nullable
@Override
public U poll() throws Exception {
T t = qs.poll();
return t != null ? ObjectHelper.<U>requireNonNull(mapper.apply(t), "The mapper function returned a null value.") : null;
}
}
}
步骤如下:
1、首先构造方法内部将传进来的观察者actual和转换函数mapper保存起来;
2、对传入的发射事件做一系列判断;
3、最后v返回的是mapper.apply(t);
4、最终观察者onNext的v,也就是转换函数转换过后的数据。
先看mapper.apply(t):
public interface Function<T, R> {
R apply(@NonNull T t) throws Exception;
}
内部实现非常简单,就是将数据T转换成了R返回,因此在调用onNext发射事件时,将传入的事件转换成了新事件v,发射给了观察者。
至此map操作符的源码分析完毕,在RxJava2中有将近50个操作符,每个操作符都有它自己的功能和实现方式,篇幅有限咱们不对其他操作符做分析,童鞋们可以课后根据咱们分析map的方式对其他操作符做分析。
接下来咱们分析RxJava2中的线程调度。
三、线程调度
线程调度分为subscribeOn和observeOn,这两个属于RxJava2功能性操作符中的线程调度操作符,对应事件订阅线程和接收线程。
1、subscribeOn
将上面栗子的代码添加subscribeOn:
Observable.just(10086)
.map(new Function<Integer, Object>() {
@Override
public Object apply(Integer integer) throws Exception {
return "整形转成字符串" + integer;
}
})
.subscribeOn(Schedulers.io())
.subscribe(new Observer<Object>() {
@Override
public void onSubscribe(Disposable d) {
}
@Override
public void onNext(Object o) {
System.out.println("接收到数据:" + o);
}
@Override
public void onError(Throwable e) {
}
@Override
public void onComplete() {
}
});
表示订阅的时候发生在io线程,看源码:
@CheckReturnValue
@SchedulerSupport(SchedulerSupport.CUSTOM)
public final Observable<T> subscribeOn(Scheduler scheduler) {
ObjectHelper.requireNonNull(scheduler, "scheduler is null");
return RxJavaPlugins.onAssembly(new ObservableSubscribeOn<T>(this, scheduler));
}
是咱们非常熟悉的hook抽象代理了,直接看ObservableSubscribeOn:
public final class ObservableSubscribeOn<T> extends AbstractObservableWithUpstream<T, T> {
final Scheduler scheduler;
public ObservableSubscribeOn(ObservableSource<T> source, Scheduler scheduler) {
//存储被观察者ObservableSource和线程调度实例
super(source);
this.scheduler = scheduler;
}
@Override
public void subscribeActual(final Observer<? super T> s) {
final SubscribeOnObserver<T> parent = new SubscribeOnObserver<T>(s);
s.onSubscribe(parent);
parent.setDisposable(scheduler.scheduleDirect(new SubscribeTask(parent)));
}
...
}
仔细看,这部分代码是不是和上面分析map的时候非常相似?没错,和map操作符流程一样,ObservableSubscribeOn只是一个装饰类,当调用订阅subscribe方法时,最终调用的是这个类的subscribeActual()方法,subscribeActual()方法内部的实现也和map操作符内部实现一样,内部维护一个Observer,最后的订阅发生在该Observer内部,在这里是ObservableSubscribeOn的内部类SubscribeOnObserver:
static final class SubscribeOnObserver<T> extends AtomicReference<Disposable> implements Observer<T>, Disposable {
private static final long serialVersionUID = 8094547886072529208L;
final Observer<? super T> actual;
final AtomicReference<Disposable> s;
SubscribeOnObserver(Observer<? super T> actual) {
this.actual = actual;
this.s = new AtomicReference<Disposable>();
}
@Override
public void onSubscribe(Disposable s) {
DisposableHelper.setOnce(this.s, s);
}
@Override
public void onNext(T t) {
actual.onNext(t);
}
@Override
public void onError(Throwable t) {
actual.onError(t);
}
@Override
public void onComplete() {
actual.onComplete();
}
@Override
public void dispose() {
DisposableHelper.dispose(s);
DisposableHelper.dispose(this);
}
@Override
public boolean isDisposed() {
return DisposableHelper.isDisposed(get());
}
void setDisposable(Disposable d) {
DisposableHelper.setOnce(this, d);
}
}
回到subscribeActual()方法,看下面一句:
parent.setDisposable(scheduler.scheduleDirect(new SubscribeTask(parent)));
这部分有点绕,咱们一步步看,首先调用了SubscribeOnObserver的setDisposable(),传入scheduler.scheduleDirect()返回的Disposable,而Disposable通过传入SubscribeTask实例生成,看SubscribeTask实现:
public final class ObservableSubscribeOn<T> extends AbstractObservableWithUpstream<T, T> {
...
final class SubscribeTask implements Runnable {
private final SubscribeOnObserver<T> parent;
SubscribeTask(SubscribeOnObserver<T> parent) {
this.parent = parent;
}
@Override
public void run() {
source.subscribe(parent);
}
}
}
这是ObservableSubscribeOn的一个内部类,其中source为创建ObservableSubscribeOn时候调用构造方法传入的经过map处理转换后的ObserableSource被观察者,也就是最新的observable,SubscribeTask类实现了Runnable接口并重写其run方法,内部做source.subscribe(parent)订阅操作。
接着看scheduler.scheduleDirect:
@NonNull
public Disposable scheduleDirect(@NonNull Runnable run) {
return scheduleDirect(run, 0L, TimeUnit.NANOSECONDS);
}
继续看scheduleDirect方法:
@NonNull
public Disposable scheduleDirect(@NonNull Runnable run, long delay, @NonNull TimeUnit unit) {
final Worker w = createWorker();
final Runnable decoratedRun = RxJavaPlugins.onSchedule(run);
DisposeTask task = new DisposeTask(decoratedRun, w);
w.schedule(task, delay, unit);
return task;
}
该方法是在当前线程上立即执行任务,也就是SubscribeTask,负责将Observable转到新的线程做处理。首先createWorker()会创建一个ioScheduer,ioScheduer内部维护了一个缓存工作线程池CachedWorkerPool,其内部是维护一个ScheduledExecutorService线程池做循环处理事件,包括处理、移除等操作,在Scheduler内部其实已经将线程切换到了ioScheduler。其内部的代码非常繁杂,如需列出代码,将会需要非常大的篇幅,这部分代码确实可读性有些差,在读这部分代码的时候,建议大家结合注释跟踪,不然里面错综复杂的逻辑会把人绕晕,在此咱们知道他的作用即可。继续分析,当调用parent.setDisposable时候,会将一个个Woker添加到线程池中进行管理。最后在转换后的线程中,调用Runnable实现类SubscribeTask中的run()方法:
@Override
public void run() {
source.subscribe(parent);
}
对前面Observable执行订阅,当上游的被观察者发射事件的时候,通过中转到新的线程执行,最后在观察者Observer中接收到数据。
2、observeOn
继续对上面的例子做observeOn做处理:
Observable.just(10086)
.map(new Function<Integer, Object>() {
@Override
public Object apply(Integer integer) throws Exception {
return "整形转成字符串" + integer;
}
})
.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.subscribe(new Observer<Object>() {
@Override
public void onSubscribe(Disposable d) {
}
@Override
public void onNext(Object o) {
System.out.println("接收到数据:" + o);
}
@Override
public void onError(Throwable e) {
}
@Override
public void onComplete() {
}
});
表示在订阅的时候发生在io线程,在mainThread接收。直接看observeOn源码:
@CheckReturnValue
@SchedulerSupport(SchedulerSupport.CUSTOM)
public final Observable<T> observeOn(Scheduler scheduler) {
return observeOn(scheduler, false, bufferSize());
}
看其有三个参数的observeOn方法:
@CheckReturnValue
@SchedulerSupport(SchedulerSupport.CUSTOM)
public final Observable<T> observeOn(Scheduler scheduler, boolean delayError, int bufferSize) {
ObjectHelper.requireNonNull(scheduler, "scheduler is null");
ObjectHelper.verifyPositive(bufferSize, "bufferSize");
return RxJavaPlugins.onAssembly(new ObservableObserveOn<T>(this, scheduler, delayError, bufferSize));
}
还是老套路,不多解释,继续看ObservableObserveOn:
public final class ObservableObserveOn<T> extends AbstractObservableWithUpstream<T, T> {
final Scheduler scheduler;
final boolean delayError;
final int bufferSize;
public ObservableObserveOn(ObservableSource<T> source, Scheduler scheduler, boolean delayError, int bufferSize) {
super(source);
this.scheduler = scheduler;
this.delayError = delayError;
this.bufferSize = bufferSize;
}
@Override
protected void subscribeActual(Observer<? super T> observer) {
//不会走这里
if (scheduler instanceof TrampolineScheduler) {
source.subscribe(observer);
} else {
//创建Worker
Scheduler.Worker w = scheduler.createWorker();
//注释1
source.subscribe(new ObserveOnObserver<T>(observer, w, delayError, bufferSize));
}
}
...
]
继续老套路,省略,直接看订阅发生的真正方法subscribeActual(),看注释1,和subscribeOn不同的是,这里直接订阅了Observer,也就是ObserveOnObserver,看ObserveOnObserver的实现:
public final class ObservableObserveOn<T> extends AbstractObservableWithUpstream<T, T> {
...
static final class ObserveOnObserver<T> extends BasicIntQueueDisposable<T>
implements Observer<T>, Runnable {
...
final Observer<? super T> actual;
final Scheduler.Worker worker;
final boolean delayError;
final int bufferSize;
...
ObserveOnObserver(Observer<? super T> actual, Scheduler.Worker worker, boolean delayError, int bufferSize) {
this.actual = actual;
this.worker = worker;
this.delayError = delayError;
this.bufferSize = bufferSize;
}
...
}
是一个Observer代理类,属于ObservableObserveOn的内部类,最终上游发射数据将会在此处做真正的处理。因为咱们分析源码只需抓住主干,在几个on...f方法里,咱们直接看onNext():
@Override
public void onNext(T t) {
if (done) {
return;
}
if (sourceMode != QueueDisposable.ASYNC) {
queue.offer(t);
}
schedule();
}
看schedule():
void schedule() {
if (getAndIncrement() == 0) {
worker.schedule(this);
}
}
调用了worker的调度方法schedule(),内部传参为为一个Runnable,在这里就是ObserveOnObserver,直接看他的run()方法:
@Override
public void run() {
if (outputFused) {
//默认是false
drainFused();
} else {
drainNormal();
}
}
接着看drainNormal():
void drainNormal() {
int missed = 1;
final SimpleQueue<T> q = queue;
final Observer<? super T> a = actual;
for (;;) {
//检查是否已取消或者为空
if (checkTerminated(done, q.isEmpty(), a)) {
return;
}
for (;;) {
boolean d = done;
T v;
try {
//注释1,从队列中取出事件
v = q.poll();
} catch (Throwable ex) {
Exceptions.throwIfFatal(ex);
s.dispose();
q.clear();
a.onError(ex);
worker.dispose();
return;
}
boolean empty = v == null;
if (checkTerminated(d, empty, a)) {
return;
}
if (empty) {
break;
}
//将事件传递给新线程Observer的onNext()
a.onNext(v);
}
missed = addAndGet(-missed);
if (missed == 0) {
break;
}
}
}
咱们看注释1处,这个queue是在订阅发生时调用的函数onSubscribe中赋值,根据上面代码的逻辑可以发现最后将事件传递给了新线程中Observer的onNext(),这个onNext()方法就是暴露给开发者的接口中的onNext(),至此,observeOn这个观察者的线程切换完成。
四、总结
通过以上源码跟踪分析,咱们基本将map操作符、两个线程调度操作符subscribeOn/observeOn分析完成,RxJava2的源码相对来说是比较复杂的,里面使用了大量的接口定义,不过当你明白了其中一个操作符,再去看其他的时候就会发现都大同小异,所以还是比较好理解的。大家可以去看看其他操作符的源码并了解其原理,比如常用的zip、filt、concat等操作符,去理解RxJava2开发者的思想,能学到很多东西。
关注我的博客和github,有大量福利哟~
csdn博客:https://blog.csdn.net/qq_29152241
github:https://github.com/cozing