RxJava的原理解析
还没有找到工作,闲着也是闲着就记录一下之前学过的知识点。
本文分析的大致内容是以下三个部分。关于RxJava的基本使用,不属于本文要点。源码基于RxJava 1.1.9
- RxJava的基本流程分析
- map操作符的分析
- 线程切换的分析
1.RxJava的基本流程分析
Observable.create(object :Observable.OnSubscribe<String>{
override fun call(t: Subscriber<in String>?) {
}
}).subscribe(object :Subscriber<String>(){
override fun onNext(t: String?) {
}
override fun onCompleted() {
}
override fun onError(e: Throwable?) {
}
})
以上是RxJava最基本的写法。为了更清晰的了解它们,我们可以分开创建
//创建一个被观察者observable
val observable = Observable.create(object :Observable.OnSubscribe<String>{
override fun call(t: Subscriber<in String>?) {}
})
//创建一个观察者subscriber
val subscriber = object :Subscriber<String>(){
override fun onNext(t: String?) {}
override fun onCompleted() {}
override fun onError(e: Throwable?) {}
}
//注册观察者
observable.subscribe(subscriber)
1.1 Observable.create(Onsubscribe)方法
public static <T> Observable<T> create(OnSubscribe<T> f) {
return new Observable<T>(RxJavaHooks.onCreate(f));
}
//RxJavaHooks.onCreate(f)方法
public static <T> Observable.OnSubscribe<T> onCreate(Observable.OnSubscribe<T> onSubscribe) {
Func1<OnSubscribe, OnSubscribe> f = onObservableCreate;
if (f != null) {
return f.call(onSubscribe);
}
return onSubscribe;
}
Observable.create(Onsubscribe)方法创建了一个OnSubscribe对象作为参数传入。
RxJavaHooks.onCreate()方法将OnSubscribe对象传入后,返回OnSubscribe对象。将返回的OnSubscribe对象作为
构造参数初始化Observable。我们再看Observable的源码:
public class Observable<T> {
final OnSubscribe<T> onSubscribe;
protected Observable(OnSubscribe<T> f) {
this.onSubscribe = f;
}
...
}
可以看到Observable只是记录了一个OnSubscribe对象而已。
总结:Observable.create()方法主要做的工作是创建了一个OnSubscribe对象,并在Observable内部记录下这个对象。
1.2 创建Subscriber
public abstract class Subscriber<T> implements Observer<T>, Subscription {
private final SubscriptionList subscriptions;
private final Subscriber<?> subscriber;
protected Subscriber() {
this(null, false);
}
protected Subscriber(Subscriber<?> subscriber, boolean shareSubscriptions) {
this.subscriber = subscriber;
this.subscriptions = shareSubscriptions && subscriber != null ?
subscriber.subscriptions : new SubscriptionList();
}
@Override
public final void unsubscribe() {
subscriptions.unsubscribe();
}
@Override
public final boolean isUnsubscribed() {
return subscriptions.isUnsubscribed();
}
}
Subscriber的代码很简单,它实现了Observer接口,说明它是一个观察者,并且实现了Subscription接口,为用户提供了
unsubscribe()取消注册方法 和 isUnsubscribed()判断是否取消注册方法。
1.3 Observable.subscribe(subscriber)方法
public final Subscription subscribe(Subscriber<? super T> subscriber) {
return Observable.subscribe(subscriber, this);
}
//参数一:之前创建的观察者Subscriber
//参数二:之前创建的被观察者Observable
static <T> Subscription subscribe(Subscriber<? super T> subscriber, Observable<T> observable) {
if (subscriber == null) {
throw new IllegalArgumentException("subscriber can not be null");
}
if (observable.onSubscribe == null) {
throw new IllegalStateException("onSubscribe function can not be null.");
}
subscriber.onStart();//1
if (!(subscriber instanceof SafeSubscriber)) {
subscriber = new SafeSubscriber<T>(subscriber);//2
}
try {
RxJavaHooks.onObservableStart(observable, observable.onSubscribe).call(subscriber);//3
return RxJavaHooks.onObservableReturn(subscriber);
} catch (Throwable e) {
// special handling for certain Throwable/Error/Exception types
Exceptions.throwIfFatal(e);
// in case the subscriber can't listen to exceptions anymore
if (subscriber.isUnsubscribed()) {
RxJavaHooks.onError(RxJavaHooks.onObservableError(e));
} else {
try {
subscriber.onError(RxJavaHooks.onObservableError(e));
} catch (Throwable e2) {
Exceptions.throwIfFatal(e2);
RuntimeException r = new OnErrorFailedException("Error occurred attempting to subscribe [" + e.getMessage() + "] and then again while trying to pass to onError.", e2);
RxJavaHooks.onObservableError(r);
}
}
return Subscriptions.unsubscribed();
}
}
可以看到 注解1 :subscriber调用了onStart()方法表示开始执行。
注解2:将subscriber封装成SafeSubscriber类。主要的目的是对subscriber的方法有严格的限制,比如说保证onComplete()和onError()只会有一个被执行并且只会执行一次。一旦它们被执行了onNext将不再执行。
注解3:这是个重点。我们先来看看 RxJavaHooks.onObservableStart(observable, observable.onSubscribe)的源码
public static <T> OnSubscribe<T> onObservableStart(Observable<T> instance, OnSubscribe<T> onSubscribe) {
Func2<Observable, OnSubscribe, OnSubscribe> f = onObservableStart;
if (f != null) {
return f.call(instance, onSubscribe);
}
return onSubscribe;
}
onObservableStart()方法中参数一是observable,参数二就是Observable.create()时候创建并记录起来的Onsubscribe对象。
然后在这个方法中返回了OnSubscribe对象。
所以RxJavaHooks.onObservableStart(observable, observable.onSubscribe).call(subscriber);
可以看作是OnSubscribe.call(subscriber)
总结:当Observable.create()方法时,会创建一个Onsubscribe对象,并存储起来。当Observable.subscribe(subscriber)时调用的是Onsubscribe.call(subscriber)
2. map操作符流程
一个小例子:传入一个字符串,经过map操作符后变为大写的字符串。
val observableA = Observable.create(object :Observable.OnSubscribe<String>{
override fun call(t: Subscriber<in String>?) {
t?.onNext("aaa")
t?.onCompleted()
}
})
val observableB = observableA.map(object :Func1<String,String?>{
override fun call(t: String?): String? {
return t?.toUpperCase()
}
})
val subscriber = object :Subscriber<String>(){
override fun onNext(t: String?) {
println("onNext:$t")
}
override fun onCompleted() {
println("onCompleted")
}
override fun onError(e: Throwable?) {}
}
observableB.subscribe(subscriber)
我这里将它们分开来写了。可以看到首先创建了一个ObservableA,然后ObservableA.map()方法后创建了ObservableB。
最后由ObservableB来注册观察者observableB.subscribe(subscriber)。
第一步创建observableA的步骤上面已经说了。
我们只需要知道第一步创建了observableA和OnSubscribe 对象
我们从observableA.map()方法开始说
2.1 observableA.map()
public final <R> Observable<R> map(Func1<? super T, ? extends R> func) {
return create(new OnSubscribeMap<T, R>(this, func));
}
public final class OnSubscribeMap<T, R> implements OnSubscribe<R> {
final Observable<T> source;
final Func1<? super T, ? extends R> transformer;
public OnSubscribeMap(Observable<T> source, Func1<? super T, ? extends R> transformer) {
this.source = source;
this.transformer = transformer;
}
@Override
public void call(final Subscriber<? super R> o) {
MapSubscriber<T, R> parent = new MapSubscriber<T, R>(o, transformer);
o.add(parent);
source.unsafeSubscribe(parent);
}
static final class MapSubscriber<T, R> extends Subscriber<T> {
final Subscriber<? super R> actual;
final Func1<? super T, ? extends R> mapper;
boolean done;
public MapSubscriber(Subscriber<? super R> actual, Func1<? super T, ? extends R> mapper) {
this.actual = actual;
this.mapper = mapper;
}
@Override
public void onNext(T t) {
R result;
try {
result = mapper.call(t);
} catch (Throwable ex) {
Exceptions.throwIfFatal(ex);
unsubscribe();
onError(OnErrorThrowable.addValueAsLastCause(ex, t));
return;
}
actual.onNext(result);
}
...
}
我们可以看到map方法创建并返回一个Observable 对象。我们标记为observableB。在看看它是怎么创建的。
首先它先创建了一个OnSubscribeMap对象。OnSubscribeMap类实现了OnSubscribe接口,在OnSubscribeMap类的构造函数中分别存储了observableA 和 Func1 方法。
在看以下create()方法做了什么操作。
public static <T> Observable<T> create(OnSubscribe<T> f) {
return new Observable<T>(RxJavaHooks.onCreate(f));
}
这个步骤之前有说过,就是存储了OnSubscribe对象。在这里存储的是observableB刚刚创建好的OnSubscribeMap对象。
2.2 observableB.subscribe(subscriber)
经过前面我们总结的当有observable.subscribe(subscriber)时,事实上就是OnSubscribe.call(subscriber)。
这里调用的是observableB创建的OnSubscribeMap调用call。即onSubscribemap.call(subscriber)
@Override
public void call(final Subscriber<? super R> o) {
MapSubscriber<T, R> parent = new MapSubscriber<T, R>(o, transformer);//1
o.add(parent);
source.unsafeSubscribe(parent);//2
}
在call()方法中,
注解1:使用subscriber和transformer(Func1方法,之前存储好的)作为参数创建了MapSubscriber。
static final class MapSubscriber<T, R> extends Subscriber<T> {
final Subscriber<? super R> actual;
final Func1<? super T, ? extends R> mapper;
public MapSubscriber(Subscriber<? super R> actual, Func1<? super T, ? extends R> mapper) {
this.actual = actual;
this.mapper = mapper;
}
@Override
public void onNext(T t) {
R result;
try {
result = mapper.call(t);
} catch (Throwable ex) {
Exceptions.throwIfFatal(ex);
unsubscribe();
onError(OnErrorThrowable.addValueAsLastCause(ex, t));
return;
}
actual.onNext(result);
}
@Override
public void onError(Throwable e) {
if (done) {
RxJavaHooks.onError(e);
return;
}
done = true;
actual.onError(e);
}
@Override
public void onCompleted() {
if (done) {
return;
}
actual.onCompleted();
}
}
可以看出MapSubscriber就是一个观察者。在构造函数初始化的时候存储了原先的subscriber 为actaul ,存储Func1方法为mapper。
注解2:source.unsafeSubscribe(parent); source就是observableA。这句通过上面分析我们知道了实际上是observableA的OnSubscribe调用了call(parent),即onSubscribe.call(mapSubscriber)。
当mapSubscriber执行了onNext()方法的时候:
@Override
public void onNext(T t) {
R result;
try {
result = mapper.call(t);
} catch (Throwable ex) {
Exceptions.throwIfFatal(ex);
unsubscribe();
onError(OnErrorThrowable.addValueAsLastCause(ex, t));
return;
}
actual.onNext(result);
}
mapper(Func1)的call()方法先执行了并返回结果。最后原先的subscriber再调用onNext()方法。
3.线程切换原理
RxJava的线程切换需要用到两个方法subscribeOn() 和observeOn()
val observableA = Observable.create(object :Observable.OnSubscribe<String>{
override fun call(t: Subscriber<in String>?) {
t?.onNext("aaa")
t?.onCompleted()
}
})
val observableB = observableA.subscribeOn(Schedulers.io())
val observableC = observableB.observeOn(Schedulers.io())//由于没有依赖RxAndroid所以找不到AndroidSchdulers.main()。不过没有关系
val subscriber = object :Subscriber<String>(){
override fun onNext(t: String?) {
println("onNext:$t")
}
override fun onCompleted() {
println("onCompleted")
}
override fun onError(e: Throwable?) {}
}
observableC.subscribe(subscriber)
3.1 observableA.subscribeOn(Schedulers.io())
public final Observable<T> subscribeOn(Scheduler scheduler) {
if (this instanceof ScalarSynchronousObservable) {
return ((ScalarSynchronousObservable<T>)this).scalarScheduleOn(scheduler);
}
return create(new OperatorSubscribeOn<T>(this, scheduler));
}
还是跟前面说过的,创建并返回一个Observable对象。OperatorSubscribeOn不用想也能猜得出它是一个OnSubscribe对象。
public final class OperatorSubscribeOn<T> implements OnSubscribe<T> {
final Scheduler scheduler;
final Observable<T> source;
public OperatorSubscribeOn(Observable<T> source, Scheduler scheduler) {//1
this.scheduler = scheduler;
this.source = source;
}
@Override
public void call(final Subscriber<? super T> subscriber) {
final Worker inner = scheduler.createWorker();//2
subscriber.add(inner);
inner.schedule(new Action0() {//3
@Override
public void call() {
final Thread t = Thread.currentThread();
Subscriber<T> s = new Subscriber<T>(subscriber) {
@Override
public void onNext(T t) {
subscriber.onNext(t);
}
@Override
public void onError(Throwable e) {
try {
subscriber.onError(e);
} finally {
inner.unsubscribe();
}
}
@Override
public void onCompleted() {
try {
subscriber.onCompleted();
} finally {
inner.unsubscribe();
}
}
@Override
public void setProducer(final Producer p) {
subscriber.setProducer(new Producer() {
@Override
public void request(final long n) {
if (t == Thread.currentThread()) {
p.request(n);
} else {
inner.schedule(new Action0() {
@Override
public void call() {
p.request(n);
}
});
}
}
});
}
};
source.unsafeSubscribe(s);
}
});
}
}
注解1:跟之前的一样。记录创建它的observable对象。
在call()方法中,将subscriber作为参数,创建一个新的Subscriber对象,然后交给Worker操作。
Worker是什么?我们可以把它当作是一个线程调度的代理类。
注解2:final Worker inner = scheduler.createWorker(); 中的scheduler是我们指定的 Schedulers.io()
public static Scheduler io() {
return RxJavaHooks.onIOScheduler(getInstance().ioScheduler);
}
public final class Schedulers {
private final Scheduler computationScheduler;
private final Scheduler ioScheduler;
private final Scheduler newThreadScheduler;
private static final AtomicReference<Schedulers> INSTANCE = new AtomicReference<Schedulers>();
private static Schedulers getInstance() {
for (;;) {
Schedulers current = INSTANCE.get();
if (current != null) {
return current;
}
current = new Schedulers(); //1
if (INSTANCE.compareAndSet(null, current)) {
return current;
} else {
current.shutdownInstance();
}
}
}
private Schedulers() {
@SuppressWarnings("deprecation")
RxJavaSchedulersHook hook = RxJavaPlugins.getInstance().getSchedulersHook();
Scheduler c = hook.getComputationScheduler();
if (c != null) {
computationScheduler = c;
} else {
computationScheduler = RxJavaSchedulersHook.createComputationScheduler();
}
Scheduler io = hook.getIOScheduler();//这个就是我们执行的类型
if (io != null) {
ioScheduler = io;
} else {
ioScheduler = RxJavaSchedulersHook.createIoScheduler();//2
}
Scheduler nt = hook.getNewThreadScheduler();
if (nt != null) {
newThreadScheduler = nt;
} else {
newThreadScheduler = RxJavaSchedulersHook.createNewThreadScheduler();
}
}
}
注解1:Schedulers初始化
注解2:创建我们指定的线程类型
RxJavaSchedulersHook.createIoScheduler()
public static Scheduler createIoScheduler() {
return createIoScheduler(new RxThreadFactory("RxIoScheduler-"));
}
public static Scheduler createIoScheduler(ThreadFactory threadFactory) {
if (threadFactory == null) {
throw new NullPointerException("threadFactory == null");
}
return new CachedThreadScheduler(threadFactory);
}
返回的是CachedThreadScheduler 类型。我们再看CachedThreadScheduler的createWorker()方法
@Override
public Worker createWorker() {
return new EventLoopWorker(pool.get());
}
所以Worker 是 EventLoopWorker类型。
在call()方法中Worker调用了schedule方法
@Override
public Subscription schedule(final Action0 action, long delayTime, TimeUnit unit) {
if (innerSubscription.isUnsubscribed()) {
// don't schedule, we are unsubscribed
return Subscriptions.unsubscribed();
}
ScheduledAction s = threadWorker.scheduleActual(new Action0() {
@Override
public void call() {
if (isUnsubscribed()) {
return;
}
action.call();
}
}, delayTime, unit);
innerSubscription.add(s);
s.addParent(innerSubscription);
return s;
}
我们需要注意的是scheduleActual方法
public ScheduledAction scheduleActual(final Action0 action, long delayTime, TimeUnit unit) {
Action0 decoratedAction = RxJavaHooks.onScheduledAction(action);
ScheduledAction run = new ScheduledAction(decoratedAction);
Future<?> f;
if (delayTime <= 0) {
f = executor.submit(run);
} else {
f = executor.schedule(run, delayTime, unit);
}
run.add(f);
return run;
}
从上面的方法可知道 执行worker.schedule()的时候参数action0,被封装了一次,然后再次封装成ScheduleAction类。
ScheduleAction类实现了Runnable接口,所以subscribeOn所做的线程切换是通过线程池来操作的。
3.2 observeOn()
public final Observable<T> observeOn(Scheduler scheduler) {
return observeOn(scheduler, RxRingBuffer.SIZE);
}
public final Observable<T> observeOn(Scheduler scheduler, boolean delayError, int bufferSize) {
if (this instanceof ScalarSynchronousObservable) {
return ((ScalarSynchronousObservable<T>)this).scalarScheduleOn(scheduler);
}
return lift(new OperatorObserveOn<T>(scheduler, delayError, bufferSize));
}
observeOn()方法还是创建一个observable对象并返回。只是它通过lift的方式创建的。
我们再来看看OperatorObserveOn类是什么?
public final class OperatorObserveOn<T> implements Operator<T, T>
public interface Operator<R, T> extends Func1<Subscriber<? super R>, Subscriber<? super T>>
OperatorObserveOn实现了Operator接口,而Operator接口又继承了Subscriber类。我们可以把它看作是一个
观察者Subscriber。
我们再来看看lift()方法是如何创建observable的
lift(new OperatorObserveOn<T>(scheduler, delayError, bufferSize));
public final <R> Observable<R> lift(final Operator<? extends R, ? super T> operator) {
return create(new OnSubscribeLift<T, R>(onSubscribe, operator));
}
还是跟之前一样通过create方法创建observable,另外也创建了OnSubscribeLift类,这个不难猜出它就是一个OnSubscribe。
public final class OnSubscribeLift<T, R> implements OnSubscribe<R> {
final OnSubscribe<T> parent;
final Operator<? extends R, ? super T> operator;
public OnSubscribeLift(OnSubscribe<T> parent, Operator<? extends R, ? super T> operator) {
this.parent = parent;
this.operator = operator;
}
实现了OnSubscribe接口。
分别记录了前一个Observable的OnSubscribe 和 刚刚创建的Operator对象。
当subscribe(subscriber)时,触发了OnSubscribeLift.call(subscriber)
@Override
public void call(Subscriber<? super R> o) {
try {
Subscriber<? super T> st = RxJavaHooks.onObservableLift(operator).call(o);//1
try {
// new Subscriber created and being subscribed with so 'onStart' it
st.onStart();
parent.call(st);
} catch (Throwable e) {
// localized capture of errors rather than it skipping all operators
// and ending up in the try/catch of the subscribe method which then
// prevents onErrorResumeNext and other similar approaches to error handling
Exceptions.throwIfFatal(e);
st.onError(e);
}
} catch (Throwable e) {
Exceptions.throwIfFatal(e);
// if the lift function failed all we can do is pass the error to the final Subscriber
// as we don't have the operator available to us
o.onError(e);
}
}
在注解1 中 调用了OperatorObserveOn.call(o)
@Override
public Subscriber<? super T> call(Subscriber<? super T> child) {
if (scheduler instanceof ImmediateScheduler) {
// avoid overhead, execute directly
return child;
} else if (scheduler instanceof TrampolineScheduler) {
// avoid overhead, execute directly
return child;
} else {
ObserveOnSubscriber<T> parent = new ObserveOnSubscriber<T>
(scheduler, child, delayError, bufferSize);//1
parent.init();
return parent;
}
}
在注解1中,重新创建了一个subscriber对象。
static final class ObserveOnSubscriber<T> extends Subscriber<T> implements Action0 {
@Override
public void onNext(final T t) {
if (isUnsubscribed() || finished) {
return;
}
if (!queue.offer(on.next(t))) {
onError(new MissingBackpressureException());
return;
}
schedule();
}
@Override
public void onCompleted() {
if (isUnsubscribed() || finished) {
return;
}
finished = true;
schedule();
}
@Override
public void onError(final Throwable e) {
if (isUnsubscribed() || finished) {
RxJavaHooks.onError(e);
return;
}
error = e;
finished = true;
schedule();
}
protected void schedule() {
if (counter.getAndIncrement() == 0) {
recursiveScheduler.schedule(this);
}
}
}
在这个subscriber的onNext,onComplete和onError方法中都有调用了schedule()方法。
在schedule()方法中recursiveScheduler.schedule(this);则是线程切换。所以observeOn的线程切换也是通过线程池的方式实现的。
最后我们整理一下:
val observableA = Observable.create(object :Observable.OnSubscribe<String>{
override fun call(t: Subscriber<in String>?) {
t?.onNext("aaa")
t?.onCompleted()
}
})
val observableB = observableA.subscribeOn(Schedulers.io())
val observableC = observableB.observeOn(Schedulers.io())
val subscriber = object :Subscriber<String>(){
override fun onNext(t: String?) {
println("onNext:$t")
}
override fun onCompleted() {
println("onCompleted")
}
override fun onError(e: Throwable?) {}
}
observableC.subscribe(subscriber)
当observableC.subscribe(subscriber)时,触发了
OnSubscribeLift.call(subscriber)方法,在这个方法中将原先的subscriber封装到了ObserveOnSubscriber。 然后observableB创建的OperatorSubscribeOn.call(ObserveOnSubscriber)。在OperatorSubscribeOn.call方法中,又将传递过来的subscriber再次封装,然后交给Worker执行,这个Worker是线程池的代理类,做了线程的切换,在onNext()方法的时候,ObserveOnSubscriber也执行了OnNext()方法。在ObserveOnSubscriber的onNext()方法中也做了线程切换的操作。
上一篇: Linux命令基础使用
下一篇: gitlab安装注册记录
推荐阅读