欢迎您访问程序员文章站本站旨在为大家提供分享程序员计算机编程知识!
您现在的位置是: 首页

RxJava详解-线程切换原理篇

程序员文章站 2024-03-24 17:45:52
...

概要

RxJava最神秘的部分莫过于此,我为了编写这篇文章也是一遍一遍的查看源码寻找它的运行原理,同样的也查阅了很多的相关资料,但是惭愧的是并未找到实质性有用的资料,很多都是避开这话题避而不谈,在此我们详细的对它进行剖析。
不得不说这个框架编写的真的很棒,相关类也是错综复杂,不过作为程序员我们必须拿出积极的态度来学习它的实现,以此提高自身的价值;好了废话不多说我们马上开始。

源码剖析

我们先来看一下外部的实现调用

public void onMineTask() {
        //声明一个ObservableCreate类型的 被观察者对象
        Observable mObservable = new ObservableCreate(new ObservableOnSubscribe() {
            @Override
            public void subscribe(ObservableEmitter e) throws Exception {
                e.onNext("ONE");
                e.onNext("TWO");
                e.onNext("THREE");
                e.onNext("FOUR");
            }
        });
        //声明一个Observer类型的观察者对象
        Observer<String> observer = new Observer<String>() {
            @Override
            public void onSubscribe(Disposable d) {
                disposable = d;
                Log.e("回调执行了onSubscribe函数->", "观察者已成功订阅");
            }

            @Override
            public void onNext(String value) {
                if (disposable.isDisposed())
                    onComplete();

                value = mine_result.getText().toString() + "\n" + value;
                mine_result.setText(value);
                Log.e("回调执行了onNext函数->", value);

            }

            @Override
            public void onError(Throwable e) {
                Log.e("回调执行了onError函数->", e.getMessage());
            }

            @Override
            public void onComplete() {
                Log.e("回调执行了onComplete函数->", "本次结束!");
            }
        };
       //指定被观察者和观察者运行的线程
       //指定 被观察者线程      ObservableSubscribeOn类型
        mObservable.subscribeOn(Schedulers.io())  
                 //指定 观察者线程    ObservableObserveOn类型
                .observeOn(AndroidSchedulers.mainThread()) 
                 //事件发布
                .subscribe(observer);
    }

我们通过上述的代码可以看出mObservable它是ObservableCreate类型,通过他调取了[1]subscribeOn(Schedulers.io())来指定被观察者的执行线程,并返回一个Observable类型的返回值,当然这个Observable类型是个抽象类型;
而获取到Observable类型的返回值之后,我们有调用了[2]observeOn(AndroidSchedulers.mainThread())函数方法,实现了对观察者执行线程的指定,此时我们得到返回值依旧是Observable类型;
最终我们依旧使用Observable类型的返回值,通过调取[3]subscribe(observer)函数实现主题发布,并携带着observer观察者实例。
那么我们接下来,就按序讲解他们的执行的过程:

[ 1 ] subscribeOn(Schedulers.io())函数方法,源码解析

    @CheckReturnValue
    @SchedulerSupport(SchedulerSupport.CUSTOM)
    public final Observable<T> subscribeOn(Scheduler scheduler) {
        //此处查Null操作
        ObjectHelper.requireNonNull(scheduler, "scheduler is null");
        //这里是一个钩子函数,在无扩展性特殊操作情况下,
        //原封不动的返回一个ObservableSubscribeOn类型的值,
        //而由于它的基类是 Observable类型,所以我们直接将其基类类型
        return RxJavaPlugins.onAssembly(new ObservableSubscribeOn<T>(this, scheduler));
    }

我们来看RxJavaPlugins.onAssembly(new ObservableSubscribeOn<T>(this, scheduler))函数方法,它其实是一个钩子函数,在无扩展性特殊操作情况下,会原封不动的返回一个ObservableSubscribeOn类型的值,而由于它的基类是 Observable类型,所以我们直接将其基类类型返回。

而我们重点来看这方法中的参数new ObservableSubscribeOn<T>(this, scheduler),它是声明了一个ObservableSubscribeOn类型对象,并传入了一个Observable类型的被观察者,但由于Observable实现了ObservableSource的接口,而我们也只需要ObservableSource这部分内容,所以最终声明ObservableSubscribeOn类型实例,我们传入的是一个ObservableSource类型参数和一个指定运行线程的Scheduler类型参数.

ObservableSubscribeOn类中我们对subscribeActual(final Observer<? super T> observer)抽象方法进行了实现.但目前还未进行调用

[ 2 ]observeOn(AndroidSchedulers.mainThread())函数方法,源码解析

    @CheckReturnValue
    @SchedulerSupport(SchedulerSupport.CUSTOM)
    public final Observable<T> observeOn(Scheduler scheduler) {
        //调用其自身的多态方法
        return observeOn(scheduler, false, bufferSize());
    }

    @CheckReturnValue
    @SchedulerSupport(SchedulerSupport.CUSTOM)
    public final Observable<T> observeOn(Scheduler scheduler, boolean delayError, int bufferSize) {
        //省略...
        //钩子函数,若无扩展性特殊逻辑实现依旧返回一个`ObservableObserveOn`类型的值,
        //由于`Observable`是其基类,所以这里直接将其基类类型返回值
        return RxJavaPlugins.onAssembly(new ObservableObserveOn<T>(this, scheduler, delayError, bufferSize));
    }

此方法与[ 1 ]中的方法一样这里不再赘述,我们的重点是其参数new ObservableObserveOn<T>(this, scheduler, delayError, bufferSize),它返回是一个ObservableObserveOn类型的值,但由于它的基类是Observable类型,所以此处直接按基类类型返回,它携带参数包括
ObservableSource类型的thisScheduler类型的执行线程;boolean类型的延迟错误,指示onError通知是否不能在另一侧的onNext通知之前切换;int类型的缓冲区大小.

ObservableObserveOn类同样对`subscribeActual(final Observer<? super T> observer)抽象方法进行了实现.但目前还未进行调用.

[ 3 ]subscribe(observer)函数实现主题发布,携带着观察者参数。源码解析:

 @SchedulerSupport(SchedulerSupport.NONE)
    @Override
    public final void subscribe(Observer<? super T> observer) {
       //省略...
            //调取 subscribeActual抽象方法
            subscribeActual(observer);
        } 
      //省略...
    }

我们来看subscribeActual(observer)这个函数,我们的被观察者的ObservableSubscribeOn和观察者的ObservableObserveOn分别都重写了此函数,那么此处的subscribeActual(observer)将进入哪个类呢?
我们将外部的调用链式表达分解一下来看:

     Observable observableSubscribeOn1 = mObservable.subscribeOn(Schedulers.io()); //第一次
        //指定 观察者线程    ObservableObserveOn类型
        Observable observeObserveOn2 = observableSubscribeOn1.observeOn(AndroidSchedulers.mainThread());
        //发布执行
        observeObserveOn2.subscribe(observer);

很清晰的我们能看到,最终对subscribe(observer)函数方法发起调用的是观察者ObservableObserveOn类,所以我们将会执行ObservableObserveOn类中的subscribeActual(observer)实现

 public ObservableObserveOn(ObservableSource<T> source, Scheduler 
    @Override
    protected void subscribeActual(Observer<? super T> observer) {
     //省略...
            //创建一个线程,当前创建出来的是一个Handler线程
            Scheduler.Worker w = scheduler.createWorker();
            //此处我们调用上一个Observable类型对象的发布事件
            //即 被观察者对象的发布事件
            source.subscribe(new ObserveOnObserver<T>(observer, w, delayError, bufferSize));
        }
    }

首先明确一点此处的source对象是被观察者的Observable类型实例,由于在指定运行线程时我们都传入一个Observable类型对象,而传递的Observable都是上一个对象的实例。
此处的参数new ObserveOnObserver<T>(observer, w, delayError, bufferSize)是声明的ObservableObserveOn类的静态内部类的实例。

    static final class ObserveOnObserver<T> extends BasicIntQueueDisposable<T>
    implements Observer<T>, Runnable {
    ObserveOnObserver(Observer<? super T> actual, Scheduler.Worker worker, boolean delayError, int bufferSize) {
            this.downstream = actual;
            this.worker = worker;
            this.delayError = delayError;
            this.bufferSize = bufferSize;
        }
}

可以看出它实现了Observer接口,而我们在source.subscribe(...)方法中也仅需要这部分内容,至此我们又回到了Observable类中的subscribe(Observer<? super T> observer)函数方法。

public final void subscribe(Observer<? super T> observer) {
       //省略...
            subscribeActual(observer);
       //省略...

而再次调用subscribeActual(observer)函数方法,不会再进入到ObservableObserveOn类中,而是进入ObservableSubscribeOn类,执行其subscribeActual(final Observer<? super T> observer)实现方法

@Override
    public void subscribeActual(final Observer<? super T> observer) {
  //将观察者包装成ObservableObserveOn类型
  final SubscribeOnObserver<T> parent = new SubscribeOnObserver<T>(observer);
        //[ 3.1 ]调用此方法会进入观察者`ObservableObserveOn`类中的实现方法
        //可以看出此处还没进行被观察者的线程指定
        observer.onSubscribe(parent);
        //[ 3.2 ]在此处设置被观察者的运行线程
        parent.setDisposable(scheduler.scheduleDirect(new SubscribeTask(parent)));
    }

[ 3.1 ] 我们来看看observer.onSubscribe(parent),它在ObservableObserveOn类的内部类ObserveOnObserveronSubscribe(Disposable d)做了什么

 @Override
        public void onSubscribe(Disposable d) {
               //省略...
                downstream.onSubscribe(this);
            }
        }

downstream对象使我们在创建ObserveOnObserver类型实例时传入的Observer类型实例,即观察者实例,而它调取onSubscribe(this)方法后直接回调至我们的外部实现中所创建的观察者对象的onSubscribe(Disposable d)中,通知观察者订阅成功。

[ 3.2 ] 接下来分析一下parent.setDisposable(scheduler.scheduleDirect(new SubscribeTask(parent)))方法是如何指定被观察者运行线程的,

先来分析他参数实现scheduler.scheduleDirect(new SubscribeTask(parent)),

[ 3.2.1 ] 来看new SubscribeTask(parent)做了什么

//实现Runnable 接口
final class SubscribeTask implements Runnable {
        private final SubscribeOnObserver<T> parent;

        SubscribeTask(SubscribeOnObserver<T> parent) {
            this.parent = parent;
        }

        @Override
        public void run() {
             //调取事件发布事件,而此处的source是`ObservableCreate`类型
            source.subscribe(parent);
        }
    }

此处实现了Runnable接口,并在重写的run()方法中调取了source.subscribe(parent),首先明确此时的source对象它的类型是谁?答案是ObservableCreate,因为在创建ObservableSubscribeOn类型实例时传入的ObservableSource类型参数是上一个对象,所以在执行subscribe(parent)方法后我们会通过subscribeActual(observer)方法函数,进入到ObservableCreate类中对subscribeActual(observer)方法实现;而在此处我们还未进行有效的调用.

[ 3.2.2 ] 分析完SubscribeTask后我们接着分析他外层scheduler.scheduleDirect(new SubscribeTask(parent))方法做了什么

   @NonNull
    public Disposable scheduleDirect(@NonNull Runnable run) {
        return scheduleDirect(run, 0L, TimeUnit.NANOSECONDS);
    }
  @NonNull
    public Disposable scheduleDirect(@NonNull Runnable run, long delay, @NonNull TimeUnit unit) {
        //创建一个Worker线程,
        final Worker w = createWorker();
         //钩子函数,若无扩展性特殊处理则返回参数本身
        final Runnable decoratedRun = RxJavaPlugins.onSchedule(run);
        //声明一个处理任务,将Runnable和Work封装成DisposeTask
        DisposeTask task = new DisposeTask(decoratedRun, w);
        //调取woker对象的schedule方法
        w.schedule(task, delay, unit);
        return task;
    }

<3.2.2.1> 我们先调用了createWorker()函数创建了一个EventLoopWorker类型对象,利用它实现线程调度;而EventLoopWorker继承于Scheduler.Worker抽象类

static final class EventLoopWorker extends Scheduler.Worker {
       //省略... 
         //结束操作
        @Override
        public void dispose() {
            if (once.compareAndSet(false, true)) {
                tasks.dispose();
                // 释放线程池操作
                pool.release(threadWorker);
            }
        }
        //是否结束
        @Override
        public boolean isDisposed() {
            return once.get();
        }
       //重写进程调度
        @NonNull
        @Override
        public Disposable schedule(@NonNull Runnable action, long delayTime, @NonNull TimeUnit unit) {
            if (tasks.isDisposed()) {
                // don't schedule, we are unsubscribed
                return EmptyDisposable.INSTANCE;
            }
            //执行线程调度执行
            return threadWorker.scheduleActual(action, delayTime, unit, tasks);
        }
    }

<3.2.2.2> 然后声明了一个DisposeTask类型对象,将Runnable对象和线程调度器包装在一起,而DisposeTask类实现了Runnable接口,重写了run()函数方法,

static final class DisposeTask implements Disposable, Runnable, SchedulerRunnableIntrospection {
      //省略...
        @Override
        public void run() {
            runner = Thread.currentThread();
            try {
                 //执行Runnable
                decoratedRun.run();
            } finally {
                dispose();
                runner = null;
            }
        }

        //省略...
    }

<3.2.2.3> 完成上述预备工作后,我们开始调用w.schedule(task, delay, unit)方法进行任务执行

     @NonNull
        @Override
        public Disposable schedule(@NonNull Runnable action, long delayTime, @NonNull TimeUnit unit) {
           //判定任务是否成功被订阅
            if (tasks.isDisposed()) {
                // 若没有被订阅,则不作任何处理
                return EmptyDisposable.INSTANCE;
            }
            //进行线程调度
            return threadWorker.scheduleActual(action, delayTime, unit, tasks);
        }
    }

<3.2.2.4> 调用 scheduleActual()进行线程任务调度

public ScheduledRunnable scheduleActual(final Runnable run, long delayTime, @NonNull TimeUnit unit, @Nullable DisposableContainer parent) {
        Runnable decoratedRun = RxJavaPlugins.onSchedule(run);
        ScheduledRunnable sr = new ScheduledRunnable(decoratedRun, parent);
 //      省略...
        try {
//      线程提交
            if (delayTime <= 0) {
                f = executor.submit((Callable<Object>)sr);
            } else {
                f = executor.schedule((Callable<Object>)sr, delayTime, unit);
            }
            sr.setFuture(f);
        } catch (RejectedExecutionException ex) {
            if (parent != null) {
                parent.remove(sr);
            }
            RxJavaPlugins.onError(ex);
        }

        return sr;
    }

线程被提交后,会交给Executor来执行,调用Runnable接口的run方法,那么这就简单了.

小结

在上述操作中我们成功的将[被观察者]提交给了指定的线程去执行,那么接下来就是线程向上执行调取被观察者run()的流程了,别头大,这个框架的复杂度就是那么绕~~~,想要成长我们就得迈过这道坎,毕竟成长的就是这么痛苦!!
这里我建议一下,阅读这样复杂的框架的时候我们最好在Debug模式下,跟随着框架流程进行学习····好了休息一下吧!我们马上开始下半场的学习。

[ 3.3 ]我们在<3.2.2.4>()中执行的scheduleActual()中将Runnable封装成了ScheduledRunnable类型,那么首先执行进入它的里面,看看它的run()方法做了什么诡谲的操作.

public void run() {
        lazySet(THREAD_INDEX, Thread.currentThread());
        try {
            try {
                //获取的上层的Runnable对象,执行其run方法
                actual.run();
            } catch (Throwable e) {
               //若发生异常,则直接调用onError通知观察者
                RxJavaPlugins.onError(e);
            }
        //省略....
    }

可以看到ScheduledRunnable类中的run()并未做什么特殊操作,而是执行了上层的Runnable类型对象的run()方法,那么它的上层是谁呢?

[ 3.4 ] 目光转向[ 3.2.2 ]的源码,我们将其中把RunnableWork类型对象关联了起来,并封装成了一个DisposeTask类型对象,那么废话不多说我们直接去看看其中的源码

  @Override
        public void run() {
            runner = Thread.currentThread();
            try {
                decoratedRun.run();
            } finally {
                dispose();
                runner = null;
            }
        }

[ 3.5 ]依旧是继续向上调取,我最终回到了ObservableSubscribeOn类中内部类SubscribeOnObserver中的内部类的SubscribeTaskrun()方法,饶了这么大一圈最终还是回到了最初始调取的地方才有实质的处理操作_!! 来看看吧

 final class SubscribeTask implements Runnable {
//      省略...
        @Override
        public void run() {
            //调取被观察者的 subscribe方法
            source.subscribe(parent);
        }
    }

[ 3.6 ] 此时的source就是外部第一次声明的Observable类型对象的实例了,即ObservableCreate类,那么我们将再次回到Observable类中执行其subscribe(Observer<? super T> observer)函数,老套路来看吧!

 public final void subscribe(Observer<? super T> observer) {
        //省略...
            subscribeActual(observer);
      //省略...
    }

[ 3.7 ]我们又一次要执行subscribeActual(observer)方法了,那么这次我们要进入哪个类呢?猜对了就是ObservableCreate

 @Override
    protected void subscribeActual(Observer<? super T> observer) {
        //将观察者封装成CreateEmitter类型
        CreateEmitter<T> parent = new CreateEmitter<T>(observer);
         //回调观察者的接口,告诉观察者订阅成功,
         //并将新封装的观察者传递过去
        observer.onSubscribe(parent);
        try {
            //回调外部被观察者的接口,告诉被观察者主题已被订阅,
           //可进行接下来相关操作了,并将最新封装的观察者对象传入
            source.subscribe(parent);
        } catch (Throwable ex) {
            Exceptions.throwIfFatal(ex);
            parent.onError(ex);
        }
    }

[ 3.8 ] 终于出去了=_=!! 我们首先通知观察者成功订阅主题,然后再告诉被观察者被成功订阅,可以新进行接下来的主题发布了!!

 Observable mObservable = new ObservableCreate(new ObservableOnSubscribe() {
            @Override
            public void subscribe(ObservableEmitter e) throws Exception {
                //[ 4 ]发布下一个
                e.onNext("ONE");
                e.onNext("TWO");
                e.onNext("THREE");
                e.onNext("FOUR");
            }
        });
        Observer<String> observer = new Observer<String>() {
            @Override
            public void onSubscribe(Disposable d) {
                Log.e("回调执行了onSubscribe函数->", "观察者已成功订阅");
            }
            @Override
            public void onNext(String value) {
                Log.e("回调执行了onNext函数->", value);
            }
            @Override
            public void onError(Throwable e) {
                Log.e("回调执行了onError函数->", e.getMessage());
            }
            @Override
            public void onComplete() {
                Log.e("回调执行了onComplete函数->", "本次结束!");
            }
        };

[ 3.8.1 ]调取ObservableEmitter类型的观察者的onNext(Object value)方法

 @Override
        public void onNext(T t) {
          //省略...
            if (!isDisposed()) {
                //调取未进行包装的观察者的onNext方法函数
                observer.onNext(t);
            }
        }

[ 3.8.2 ]这里的并没有进行任何处理,而是调取了上层的SubscribeOnObserver类型的观察者的onNext(Object value)

   @Override
        public void onNext(T t) {
            downstream.onNext(t);
        }

[ 3.8.3 ] 这里依旧是继续向上调取,通过调取上层的ObserveOnObserver类型的观察者的onNext(Object value)方法

 @Override
        public void onNext(T t) {
           //省略
            schedule();
        }
  void schedule() {
            if (getAndIncrement() == 0) {
                //进行线程调度,并将观察者传入
                worker.schedule(this);
            }
        }

[ 3.8.4 ]此处进行观察者执行线程的调度,由于我们在外部指定观察者运行于UI线程,所以此处我们HandlerScheduler类的schedule(Runnable run, long delay, TimeUnit unit)方法中

public abstract class Scheduler {
   public abstract static class Worker implements Disposable {     
        @NonNull
        public Disposable schedule(@NonNull Runnable run) {
            return schedule(run, 0L, TimeUnit.NANOSECONDS);
        }
   }
}
final class HandlerScheduler extends Scheduler {
   private static final class HandlerWorker extends Worker {
      public Disposable schedule(Runnable run, long delay, TimeUnit unit) {
            //省略...
             //将handle和run关联起来包装成新对象
            ScheduledRunnable scheduled = new ScheduledRunnable(handler, run);
            Message message = Message.obtain(handler, scheduled);
            message.obj = this;
            if (async) {
                message.setAsynchronous(true);
            }
           //handler发送消息
            handler.sendMessageDelayed(message, unit.toMillis(delay));
            //省略...
            return scheduled;
        }
    }
}

[ 3.8.5 ] 我们在这里将Runnable包装成Message同过Handler进行发送,这将进入到Hanlder类中的runWithScissors(final Runnable r, long timeout)方法

public final boolean runWithScissors(final Runnable r, long timeout) {
        //省略
        if (Looper.myLooper() == mLooper) {
            //执行Runnable接口的run()方法
            r.run();
            return true;
        }
        //省略...
    }

[ 3.8.6 ] 通过判定我们当前运行的线程是否跟Handler所在的线程是否一致 ,如果一致直接运行Runnablerun()方法,即我们再次回到了HandlerScheduler类中的内部类HandlerWorker下,执行其run()方法

 @Override
        public void run() {
            try {
                delegate.run();
            } catch (Throwable t) {
                RxJavaPlugins.onError(t);
            }
        }

[ 3.8.7 ]没有意外,这没有进行任何处理,继续向上传递进入ObservableObserveOn类的run()

 @Override
        public void run() {
         //省略...
         drainNormal();
        }
void drainNormal() {
           //省略...
                    a.onNext(v);
          //省略...
        }

[ 3.8.8 ] 可以看到在此处我们回调了外部声明观察者接口的onNext(Object value)方法,进而执行我们自定义的业务!!!

    @Override
            public void onNext(String value) {
                if (disposable.isDisposed())
                    onComplete();
                value = mine_result.getText().toString() + "\n" + value;
                mine_result.setText(value);
                Log.e("回调执行了onNext函数->", value);
            }

小结

至此完成了线程切换的全部过程,不得不说这个框架的复杂程度尤其之高,为了写这篇文章也是累的自己一头的汗=_=!! 一开始分析源码的时候真是一个头两个大,主要是其中的类粘连性太强了,特别容易把自己搞得晕头转向,不过分析完之后脑中也就豁然开朗了。
由于篇幅太长,我们将整体流程大致的再进行一下梳理,之后我会将流程图奉上!!

流程梳理

    1. 我们在外部声明一个ObservableCreate类型被观察者 0实例和Observer类型的观察者实例
    1. 调取[被观察者]的subscribeOn(Schedulers.io())方法指定被观察者的执行线程,方法执行期间会声明一个ObservableSubscribeOn类型对象作为参数传入,它的父类继承于Observable抽象类,随后获取一个新的被观察者1
    1. 然后再调取observeOn(AndroidSchedulers.mainThread())方法来指定观察者的执行线程,方法执行期间会声明一个ObservableObserveOn类型对象作为参数传入,它的父类同样继承于Observable抽象类,再次返回一个新的被观察者2
  • 4.使用最新的[被观察者]调取subscribe(observer)方法进行主题发布,并将观察者作为参数传入。
  • 5.调用新的被观察者2 对象的subscribeActual(Observer<? super T> observer)方法,创建一个执行线程,调取新的被观察者1subscribe(Observer<? super T> observer)方法,比创建一个ObserveOnObserver类型对象传入
  • 6.调用新的被观察者1 对象的subscribeActual(Observer<? super T> observer)方法,创建一个执行线程,在线程的run()方法中调取被观察者 0subscribe(Observer<? super T> observer)方法,声明一个SubscribeOnObserver类型的对象传入
  • 7.调用被观察者0 对象的subscribeActual(Observer<? super T> observer)方法,通过调取被观察者subscribe(ObservableEmitter e)方法,回调到了我们的外部实现.
  • 8.在外部的subscribe(...)方法,调取观察者onNext(Object value)方法,而此时的观察者经过一套流程下来也是被进行的层层包装,最终回调到了我们的外部实现.

这篇文章篇幅我有点长,希望有兴趣的同学慢慢读,之后我把流程图梳理完毕后会如数奉上~~~

此文章只代表个人理解观点,如有差错希望积极之处,我们共同交流!!!

This ALL! Thanks EveryBody!

转载于:https://www.jianshu.com/p/7e4420109731

上一篇: gulp使用

下一篇: gulp使用