欢迎您访问程序员文章站本站旨在为大家提供分享程序员计算机编程知识!
您现在的位置是: 首页

RxJava的原理解析

程序员文章站 2022-03-06 08:21:20
...

还没有找到工作,闲着也是闲着就记录一下之前学过的知识点。

本文分析的大致内容是以下三个部分。关于RxJava的基本使用,不属于本文要点。源码基于RxJava 1.1.9 

  • RxJava的基本流程分析
  • map操作符的分析
  • 线程切换的分析

1.RxJava的基本流程分析

 Observable.create(object :Observable.OnSubscribe<String>{
        override fun call(t: Subscriber<in String>?) {

        }
    }).subscribe(object :Subscriber<String>(){
        override fun onNext(t: String?) {
        }

        override fun onCompleted() {
        }

        override fun onError(e: Throwable?) {
        }

    })

以上是RxJava最基本的写法。为了更清晰的了解它们,我们可以分开创建

//创建一个被观察者observable
val observable = Observable.create(object :Observable.OnSubscribe<String>{
        override fun call(t: Subscriber<in String>?) {}
    })


//创建一个观察者subscriber
val subscriber = object :Subscriber<String>(){
        override fun onNext(t: String?) {}
        override fun onCompleted() {}
        override fun onError(e: Throwable?) {}
    }

//注册观察者
observable.subscribe(subscriber)

1.1  Observable.create(Onsubscribe)方法

 public static <T> Observable<T> create(OnSubscribe<T> f) {
        return new Observable<T>(RxJavaHooks.onCreate(f));
    }


//RxJavaHooks.onCreate(f)方法
 public static <T> Observable.OnSubscribe<T> onCreate(Observable.OnSubscribe<T> onSubscribe) {
        Func1<OnSubscribe, OnSubscribe> f = onObservableCreate;
        if (f != null) {
            return f.call(onSubscribe);
        }
        return onSubscribe;
    }

Observable.create(Onsubscribe)方法创建了一个OnSubscribe对象作为参数传入。

RxJavaHooks.onCreate()方法将OnSubscribe对象传入后,返回OnSubscribe对象。将返回的OnSubscribe对象作为

构造参数初始化Observable。我们再看Observable的源码:

public class Observable<T> {

    final OnSubscribe<T> onSubscribe;

    protected Observable(OnSubscribe<T> f) {
        this.onSubscribe = f;
    }
...
}

可以看到Observable只是记录了一个OnSubscribe对象而已。

总结:Observable.create()方法主要做的工作是创建了一个OnSubscribe对象,并在Observable内部记录下这个对象。

 

1.2 创建Subscriber

public abstract class Subscriber<T> implements Observer<T>, Subscription {
   
    private final SubscriptionList subscriptions;
    private final Subscriber<?> subscriber;  

    protected Subscriber() {
        this(null, false);
    }

    protected Subscriber(Subscriber<?> subscriber, boolean shareSubscriptions) {
        this.subscriber = subscriber;
        this.subscriptions = shareSubscriptions && subscriber != null ? 
        subscriber.subscriptions : new SubscriptionList();
    }

    @Override
    public final void unsubscribe() {
        subscriptions.unsubscribe();
    }

    @Override
    public final boolean isUnsubscribed() {
        return subscriptions.isUnsubscribed();
    }

}

Subscriber的代码很简单,它实现了Observer接口,说明它是一个观察者,并且实现了Subscription接口,为用户提供了

unsubscribe()取消注册方法 和 isUnsubscribed()判断是否取消注册方法。

 

1.3  Observable.subscribe(subscriber)方法

public final Subscription subscribe(Subscriber<? super T> subscriber) {
        return Observable.subscribe(subscriber, this);
}
//参数一:之前创建的观察者Subscriber
//参数二:之前创建的被观察者Observable

 static <T> Subscription subscribe(Subscriber<? super T> subscriber, Observable<T> observable) {
     
        if (subscriber == null) {
            throw new IllegalArgumentException("subscriber can not be null");
        }
        if (observable.onSubscribe == null) {
            throw new IllegalStateException("onSubscribe function can not be null.");
        }
        
        subscriber.onStart();//1
        
        if (!(subscriber instanceof SafeSubscriber)) {
            
            subscriber = new SafeSubscriber<T>(subscriber);//2
        }

        try {
            RxJavaHooks.onObservableStart(observable, observable.onSubscribe).call(subscriber);//3
            return RxJavaHooks.onObservableReturn(subscriber);
        } catch (Throwable e) {
            // special handling for certain Throwable/Error/Exception types
            Exceptions.throwIfFatal(e);
            // in case the subscriber can't listen to exceptions anymore
            if (subscriber.isUnsubscribed()) {
                RxJavaHooks.onError(RxJavaHooks.onObservableError(e));
            } else {
                try {
                    subscriber.onError(RxJavaHooks.onObservableError(e));
                } catch (Throwable e2) {
                    Exceptions.throwIfFatal(e2);
                    RuntimeException r = new OnErrorFailedException("Error occurred attempting to subscribe [" + e.getMessage() + "] and then again while trying to pass to onError.", e2);
                    RxJavaHooks.onObservableError(r);
                }
            }
            return Subscriptions.unsubscribed();
        }
    }

可以看到 注解1 :subscriber调用了onStart()方法表示开始执行。

注解2:将subscriber封装成SafeSubscriber类。主要的目的是对subscriber的方法有严格的限制,比如说保证onComplete()和onError()只会有一个被执行并且只会执行一次。一旦它们被执行了onNext将不再执行。

注解3:这是个重点。我们先来看看 RxJavaHooks.onObservableStart(observable, observable.onSubscribe)的源码

public static <T> OnSubscribe<T> onObservableStart(Observable<T> instance, OnSubscribe<T> onSubscribe) {
        Func2<Observable, OnSubscribe, OnSubscribe> f = onObservableStart;
        if (f != null) {
            return f.call(instance, onSubscribe);
        }
        return onSubscribe;
    }

onObservableStart()方法中参数一是observable,参数二就是Observable.create()时候创建并记录起来的Onsubscribe对象。

然后在这个方法中返回了OnSubscribe对象。

所以RxJavaHooks.onObservableStart(observable, observable.onSubscribe).call(subscriber);

可以看作是OnSubscribe.call(subscriber)

 

总结:当Observable.create()方法时,会创建一个Onsubscribe对象,并存储起来。当Observable.subscribe(subscriber)时调用的是Onsubscribe.call(subscriber)

RxJava的原理解析

 

2. map操作符流程

一个小例子:传入一个字符串,经过map操作符后变为大写的字符串。

 val observableA = Observable.create(object :Observable.OnSubscribe<String>{
        override fun call(t: Subscriber<in String>?) {
            t?.onNext("aaa")
            t?.onCompleted()
        }
    })
    val observableB = observableA.map(object :Func1<String,String?>{
        override fun call(t: String?): String? {
           return t?.toUpperCase()
        }
    })
    val subscriber = object :Subscriber<String>(){
        override fun onNext(t: String?) {
            println("onNext:$t")
        }
        override fun onCompleted() {
            println("onCompleted")
        }
        override fun onError(e: Throwable?) {}
    }
    observableB.subscribe(subscriber)

我这里将它们分开来写了。可以看到首先创建了一个ObservableA,然后ObservableA.map()方法后创建了ObservableB。
最后由ObservableB来注册观察者observableB.subscribe(subscriber)。

第一步创建observableA的步骤上面已经说了。

我们只需要知道第一步创建了observableA和OnSubscribe 对象

我们从observableA.map()方法开始说

2.1 observableA.map()

public final <R> Observable<R> map(Func1<? super T, ? extends R> func) {
        return create(new OnSubscribeMap<T, R>(this, func));
}

public final class OnSubscribeMap<T, R> implements OnSubscribe<R> {

    final Observable<T> source;
    
    final Func1<? super T, ? extends R> transformer;

    public OnSubscribeMap(Observable<T> source, Func1<? super T, ? extends R> transformer) {
        this.source = source;
        this.transformer = transformer;
    }
    
    @Override
    public void call(final Subscriber<? super R> o) {
        MapSubscriber<T, R> parent = new MapSubscriber<T, R>(o, transformer);
        o.add(parent);
        source.unsafeSubscribe(parent);
    }
    
    static final class MapSubscriber<T, R> extends Subscriber<T> {
        
        final Subscriber<? super R> actual;
        
        final Func1<? super T, ? extends R> mapper;

        boolean done;
        
        public MapSubscriber(Subscriber<? super R> actual, Func1<? super T, ? extends R> mapper) {
            this.actual = actual;
            this.mapper = mapper;
        }
        
        @Override
        public void onNext(T t) {
            R result;
            
            try {
                result = mapper.call(t);
            } catch (Throwable ex) {
                Exceptions.throwIfFatal(ex);
                unsubscribe();
                onError(OnErrorThrowable.addValueAsLastCause(ex, t));
                return;
            }
            
            actual.onNext(result);
        }

     ...
}

我们可以看到map方法创建并返回一个Observable 对象。我们标记为observableB。在看看它是怎么创建的。

首先它先创建了一个OnSubscribeMap对象。OnSubscribeMap类实现了OnSubscribe接口,在OnSubscribeMap类的构造函数中分别存储了observableA 和 Func1 方法。

在看以下create()方法做了什么操作。

public static <T> Observable<T> create(OnSubscribe<T> f) {
        return new Observable<T>(RxJavaHooks.onCreate(f));
    }

这个步骤之前有说过,就是存储了OnSubscribe对象。在这里存储的是observableB刚刚创建好的OnSubscribeMap对象。

 

2.2  observableB.subscribe(subscriber)

经过前面我们总结的当有observable.subscribe(subscriber)时,事实上就是OnSubscribe.call(subscriber)。

这里调用的是observableB创建的OnSubscribeMap调用call。即onSubscribemap.call(subscriber)

@Override
    public void call(final Subscriber<? super R> o) {
        MapSubscriber<T, R> parent = new MapSubscriber<T, R>(o, transformer);//1
        o.add(parent);
        source.unsafeSubscribe(parent);//2
    }

在call()方法中,

注解1:使用subscriber和transformer(Func1方法,之前存储好的)作为参数创建了MapSubscriber。

static final class MapSubscriber<T, R> extends Subscriber<T> {
        
        final Subscriber<? super R> actual;
        
        final Func1<? super T, ? extends R> mapper;
      
        public MapSubscriber(Subscriber<? super R> actual, Func1<? super T, ? extends R> mapper) {
            this.actual = actual;
            this.mapper = mapper;
        }
        
        @Override
        public void onNext(T t) {
            R result;
            
            try {
                result = mapper.call(t);
            } catch (Throwable ex) {
                Exceptions.throwIfFatal(ex);
                unsubscribe();
                onError(OnErrorThrowable.addValueAsLastCause(ex, t));
                return;
            }
            
            actual.onNext(result);
        }
        
        @Override
        public void onError(Throwable e) {
            if (done) {
                RxJavaHooks.onError(e);
                return;
            }
            done = true;
            
            actual.onError(e);
        }
        
        
        @Override
        public void onCompleted() {
            if (done) {
                return;
            }
            actual.onCompleted();
        }
        
      
    }

可以看出MapSubscriber就是一个观察者。在构造函数初始化的时候存储了原先的subscriber 为actaul ,存储Func1方法为mapper。

注解2:source.unsafeSubscribe(parent); source就是observableA。这句通过上面分析我们知道了实际上是observableA的OnSubscribe调用了call(parent),即onSubscribe.call(mapSubscriber)。

当mapSubscriber执行了onNext()方法的时候:

@Override
        public void onNext(T t) {
            R result;
            
            try {
                result = mapper.call(t);
            } catch (Throwable ex) {
                Exceptions.throwIfFatal(ex);
                unsubscribe();
                onError(OnErrorThrowable.addValueAsLastCause(ex, t));
                return;
            }
            
            actual.onNext(result);
        }

mapper(Func1)的call()方法先执行了并返回结果。最后原先的subscriber再调用onNext()方法。

 

RxJava的原理解析

 

3.线程切换原理

RxJava的线程切换需要用到两个方法subscribeOn() 和observeOn()

val observableA = Observable.create(object :Observable.OnSubscribe<String>{
        override fun call(t: Subscriber<in String>?) {
            t?.onNext("aaa")
            t?.onCompleted()
        }
    })

    val observableB = observableA.subscribeOn(Schedulers.io())
    val observableC = observableB.observeOn(Schedulers.io())//由于没有依赖RxAndroid所以找不到AndroidSchdulers.main()。不过没有关系

    val subscriber = object :Subscriber<String>(){
        override fun onNext(t: String?) {
            println("onNext:$t")
        }
        override fun onCompleted() {
            println("onCompleted")
        }
        override fun onError(e: Throwable?) {}
    }
    observableC.subscribe(subscriber)

3.1 observableA.subscribeOn(Schedulers.io())

public final Observable<T> subscribeOn(Scheduler scheduler) {
        if (this instanceof ScalarSynchronousObservable) {
            return ((ScalarSynchronousObservable<T>)this).scalarScheduleOn(scheduler);
        }
        return create(new OperatorSubscribeOn<T>(this, scheduler));
    }

还是跟前面说过的,创建并返回一个Observable对象。OperatorSubscribeOn不用想也能猜得出它是一个OnSubscribe对象。

public final class OperatorSubscribeOn<T> implements OnSubscribe<T> {

    final Scheduler scheduler;
    final Observable<T> source;

    public OperatorSubscribeOn(Observable<T> source, Scheduler scheduler) {//1
        this.scheduler = scheduler;
        this.source = source;
    }

    @Override
    public void call(final Subscriber<? super T> subscriber) {
        final Worker inner = scheduler.createWorker();//2
        subscriber.add(inner);
        
        inner.schedule(new Action0() {//3
            @Override
            public void call() {
                final Thread t = Thread.currentThread();
                
                Subscriber<T> s = new Subscriber<T>(subscriber) {
                    @Override
                    public void onNext(T t) {
                        subscriber.onNext(t);
                    }
                    
                    @Override
                    public void onError(Throwable e) {
                        try {
                            subscriber.onError(e);
                        } finally {
                            inner.unsubscribe();
                        }
                    }
                    
                    @Override
                    public void onCompleted() {
                        try {
                            subscriber.onCompleted();
                        } finally {
                            inner.unsubscribe();
                        }
                    }
                    
                    @Override
                    public void setProducer(final Producer p) {
                        subscriber.setProducer(new Producer() {
                            @Override
                            public void request(final long n) {
                                if (t == Thread.currentThread()) {
                                    p.request(n);
                                } else {
                                    inner.schedule(new Action0() {
                                        @Override
                                        public void call() {
                                            p.request(n);
                                        }
                                    });
                                }
                            }
                        });
                    }
                };
                
                source.unsafeSubscribe(s);
            }
        });
    }
}

 

注解1:跟之前的一样。记录创建它的observable对象。

在call()方法中,将subscriber作为参数,创建一个新的Subscriber对象,然后交给Worker操作。

Worker是什么?我们可以把它当作是一个线程调度的代理类。

注解2:final Worker inner = scheduler.createWorker(); 中的scheduler是我们指定的 Schedulers.io() 

 public static Scheduler io() {
        return RxJavaHooks.onIOScheduler(getInstance().ioScheduler);
    }


public final class Schedulers {

    private final Scheduler computationScheduler;
    private final Scheduler ioScheduler;
    private final Scheduler newThreadScheduler;

    private static final AtomicReference<Schedulers> INSTANCE = new AtomicReference<Schedulers>();

    private static Schedulers getInstance() {
        for (;;) {
            Schedulers current = INSTANCE.get();
            if (current != null) {
                return current;
            }
            current = new Schedulers();   //1
            if (INSTANCE.compareAndSet(null, current)) {
                return current;
            } else {
                current.shutdownInstance();
            }
        }
    }

    private Schedulers() {
        @SuppressWarnings("deprecation")
        RxJavaSchedulersHook hook = RxJavaPlugins.getInstance().getSchedulersHook();

        Scheduler c = hook.getComputationScheduler();
        if (c != null) {
            computationScheduler = c;
        } else {
            computationScheduler = RxJavaSchedulersHook.createComputationScheduler();
        }

        Scheduler io = hook.getIOScheduler();//这个就是我们执行的类型
        if (io != null) {
            ioScheduler = io;
        } else {
            ioScheduler = RxJavaSchedulersHook.createIoScheduler();//2
        }

        Scheduler nt = hook.getNewThreadScheduler();
        if (nt != null) {
            newThreadScheduler = nt;
        } else {
            newThreadScheduler = RxJavaSchedulersHook.createNewThreadScheduler();
        }
    }
}

注解1:Schedulers初始化

注解2:创建我们指定的线程类型

 

 RxJavaSchedulersHook.createIoScheduler()
 public static Scheduler createIoScheduler() {
        return createIoScheduler(new RxThreadFactory("RxIoScheduler-"));
    }

 public static Scheduler createIoScheduler(ThreadFactory threadFactory) {
        if (threadFactory == null) {
            throw new NullPointerException("threadFactory == null");
        }
        return new CachedThreadScheduler(threadFactory);
    }

返回的是CachedThreadScheduler 类型。我们再看CachedThreadScheduler的createWorker()方法

 @Override
    public Worker createWorker() {
        return new EventLoopWorker(pool.get());
    }

所以Worker 是 EventLoopWorker类型。

 

在call()方法中Worker调用了schedule方法

 @Override
        public Subscription schedule(final Action0 action, long delayTime, TimeUnit unit) {
            if (innerSubscription.isUnsubscribed()) {
                // don't schedule, we are unsubscribed
                return Subscriptions.unsubscribed();
            }

            ScheduledAction s = threadWorker.scheduleActual(new Action0() {
                @Override
                public void call() {
                    if (isUnsubscribed()) {
                        return;
                    }
                    action.call();
                }
            }, delayTime, unit);
            innerSubscription.add(s);
            s.addParent(innerSubscription);
            return s;
        }

我们需要注意的是scheduleActual方法

public ScheduledAction scheduleActual(final Action0 action, long delayTime, TimeUnit unit) {
        Action0 decoratedAction = RxJavaHooks.onScheduledAction(action);
        ScheduledAction run = new ScheduledAction(decoratedAction);
        Future<?> f;
        if (delayTime <= 0) {
            f = executor.submit(run);
        } else {
            f = executor.schedule(run, delayTime, unit);
        }
        run.add(f);

        return run;
    }



从上面的方法可知道 执行worker.schedule()的时候参数action0,被封装了一次,然后再次封装成ScheduleAction类。

ScheduleAction类实现了Runnable接口,所以subscribeOn所做的线程切换是通过线程池来操作的。

 

3.2  observeOn()

public final Observable<T> observeOn(Scheduler scheduler) {
        return observeOn(scheduler, RxRingBuffer.SIZE);
    }

public final Observable<T> observeOn(Scheduler scheduler, boolean delayError, int bufferSize) {
        if (this instanceof ScalarSynchronousObservable) {
            return ((ScalarSynchronousObservable<T>)this).scalarScheduleOn(scheduler);
        }
        return lift(new OperatorObserveOn<T>(scheduler, delayError, bufferSize));
    }

observeOn()方法还是创建一个observable对象并返回。只是它通过lift的方式创建的。

我们再来看看OperatorObserveOn类是什么?

public final class OperatorObserveOn<T> implements Operator<T, T> 

public interface Operator<R, T> extends Func1<Subscriber<? super R>, Subscriber<? super T>> 

OperatorObserveOn实现了Operator接口,而Operator接口又继承了Subscriber类。我们可以把它看作是一个
观察者Subscriber。

我们再来看看lift()方法是如何创建observable的

lift(new OperatorObserveOn<T>(scheduler, delayError, bufferSize));
public final <R> Observable<R> lift(final Operator<? extends R, ? super T> operator) {
        return create(new OnSubscribeLift<T, R>(onSubscribe, operator));
    }

还是跟之前一样通过create方法创建observable,另外也创建了OnSubscribeLift类,这个不难猜出它就是一个OnSubscribe。

public final class OnSubscribeLift<T, R> implements OnSubscribe<R> {
    
    final OnSubscribe<T> parent;

    final Operator<? extends R, ? super T> operator;

    public OnSubscribeLift(OnSubscribe<T> parent, Operator<? extends R, ? super T> operator) {
        this.parent = parent;
        this.operator = operator;
    }

实现了OnSubscribe接口。

分别记录了前一个Observable的OnSubscribe 和 刚刚创建的Operator对象。

 

当subscribe(subscriber)时,触发了OnSubscribeLift.call(subscriber)

 @Override
    public void call(Subscriber<? super R> o) {
        try {
            Subscriber<? super T> st = RxJavaHooks.onObservableLift(operator).call(o);//1
            try {
                // new Subscriber created and being subscribed with so 'onStart' it
                st.onStart();
                parent.call(st);
            } catch (Throwable e) {
                // localized capture of errors rather than it skipping all operators 
                // and ending up in the try/catch of the subscribe method which then
                // prevents onErrorResumeNext and other similar approaches to error handling
                Exceptions.throwIfFatal(e);
                st.onError(e);
            }
        } catch (Throwable e) {
            Exceptions.throwIfFatal(e);
            // if the lift function failed all we can do is pass the error to the final Subscriber
            // as we don't have the operator available to us
            o.onError(e);
        }
    }

在注解1 中 调用了OperatorObserveOn.call(o)

 @Override
    public Subscriber<? super T> call(Subscriber<? super T> child) {
        if (scheduler instanceof ImmediateScheduler) {
            // avoid overhead, execute directly
            return child;
        } else if (scheduler instanceof TrampolineScheduler) {
            // avoid overhead, execute directly
            return child;
        } else {
            ObserveOnSubscriber<T> parent = new ObserveOnSubscriber<T>
(scheduler, child, delayError, bufferSize);//1
            parent.init();
            return parent;
        }
    }

在注解1中,重新创建了一个subscriber对象。

 

static final class ObserveOnSubscriber<T> extends Subscriber<T> implements Action0 {

 @Override
        public void onNext(final T t) {
            if (isUnsubscribed() || finished) {
                return;
            }
            if (!queue.offer(on.next(t))) {
                onError(new MissingBackpressureException());
                return;
            }
            schedule();
        }

        @Override
        public void onCompleted() {
            if (isUnsubscribed() || finished) {
                return;
            }
            finished = true;
            schedule();
        }

        @Override
        public void onError(final Throwable e) {
            if (isUnsubscribed() || finished) {
                RxJavaHooks.onError(e);
                return;
            }
            error = e;
            finished = true;
            schedule();
        }


 protected void schedule() {
            if (counter.getAndIncrement() == 0) {
                recursiveScheduler.schedule(this);
            }
        }




}

在这个subscriber的onNext,onComplete和onError方法中都有调用了schedule()方法。

在schedule()方法中recursiveScheduler.schedule(this);则是线程切换。所以observeOn的线程切换也是通过线程池的方式实现的。

 

 

最后我们整理一下:

 val observableA = Observable.create(object :Observable.OnSubscribe<String>{
        override fun call(t: Subscriber<in String>?) {
            t?.onNext("aaa")
            t?.onCompleted()
        }
    })

    val observableB = observableA.subscribeOn(Schedulers.io())
    val observableC = observableB.observeOn(Schedulers.io())

    val subscriber = object :Subscriber<String>(){
        override fun onNext(t: String?) {
            println("onNext:$t")
        }
        override fun onCompleted() {
            println("onCompleted")
        }
        override fun onError(e: Throwable?) {}
    }
    observableC.subscribe(subscriber)

当observableC.subscribe(subscriber)时,触发了

OnSubscribeLift.call(subscriber)方法,在这个方法中将原先的subscriber封装到了ObserveOnSubscriber。
然后observableB创建的OperatorSubscribeOn.call(ObserveOnSubscriber)。在OperatorSubscribeOn.call方法中,又将传递过来的subscriber再次封装,然后交给Worker执行,这个Worker是线程池的代理类,做了线程的切换,在onNext()方法的时候,ObserveOnSubscriber也执行了OnNext()方法。在ObserveOnSubscriber的onNext()方法中也做了线程切换的操作。