欢迎您访问程序员文章站本站旨在为大家提供分享程序员计算机编程知识!
您现在的位置是: 首页

Android RxJava源码分析-RxJava的线程调度机制

程序员文章站 2024-03-03 20:13:22
...

前言

本次主要讲解的内容:

1、RxJava是如何实现线程调度的

2、RxJava实现线程调度的原理

在上篇分析中我们知道RxJava的流程是如何运作的,并对运作流程结合源码进行了分析

Rxjava的流程是:

  •  1、Observable.create 创建事件源,但并不生产也不发射事件。

  •  2、实现observer接口,但此时没有也无法接受到任何发射来的事件。

  •  3、订阅 observable.subscribe(observer), 此时会调用具体Observable的实现类中的subscribeActual方法, 此时会才会真正触发事件源生产事件,事件源生产出来的事件通过Emitter的onNext,onError,onComplete发射给observer对应的方法由下游observer消费掉。从而完成整个事件流的处理。

RxJava的定义:

回顾一下RxJava结束中是如何定义的

https://github.com/ReactiveX/RxJava/wiki

//RxJava主页地址:https://github.com/ReactiveX/RxJava/wiki

RxJava is a Java VM implementation of ReactiveX (Reactive Extensions): a library for composing asynchronous and event-based programs by using observable sequences.
// 翻译:RxJava 是ReactiveX (Reactive Extensions)在JavaVM上使用Observable序列组合异步和基于事件的程序的库。

RxJava is Lightweight
RxJava tries to be very lightweight. It is implemented as a single JAR that is focused on just the Observable abstraction and related higher-order functions.
// 翻译:RxJava尽力做到非常轻巧。它仅关注Observable的抽象和与之相关的高层函数,实现为一个单独的JAR文件。
  • 总结:RxJava 是一个 基于事件流、实现异步操作 的库

那么问题来了,我们知道RxJava 是一个 基于事件流、实现异步操作 的库,结合RxJava的流程分析我们发现,哪里实现了异步操作,没看到有异步操作的影子呀,接下来一起来看看RxJava是如何实现异步操作的和RxJava实现异步操作的原理。


RxJava是如何实现线程调度的

分析下之前RxJava的例子,打印一下RxJava执行流程中各个方法的线程名称:

private void doRxJava() {
    Observable.create(new ObservableOnSubscribe<Integer>() {
        // 1. 创建被观察者 & 生产事件
        @Override
        public void subscribe(ObservableEmitter<Integer> emitter) throws Exception {
            Log.e("hyl", " subscribe: " + Thread.currentThread().getName());
            emitter.onNext(1);
            emitter.onNext(2);
            emitter.onNext(3);
            emitter.onComplete();
        }
    }).subscribe(new Observer<Integer>() {
        // 2. 通过通过订阅(subscribe)连接观察者和被观察者
        // 3. 创建观察者 & 定义响应事件的行为
        @Override
        public void onSubscribe(Disposable d) {
            Log.e("hyl", " onSubscribe: " + Thread.currentThread().getName());
            Log.d("TAG", "开始采用subscribe连接");
        }
        // 默认最先调用复写的 onSubscribe()

        @Override
        public void onNext(Integer value) {
            Log.e("hyl", " onNext: " + Thread.currentThread().getName());
            Log.d("TAG", "对Next事件" + value + "作出响应");
        }

        @Override
        public void onError(Throwable e) {
            Log.e("hyl", " onError: " + Thread.currentThread().getName());
            Log.d("TAG", "对Error事件作出响应");
        }

        @Override
        public void onComplete() {
            Log.e("hyl", " onComplete: " + Thread.currentThread().getName());
            Log.d("TAG", "对Complete事件作出响应");
        }

    });
}

运行的结果如下:

2019-09-28 17:27:55.994 8725-8725/com.study.rxjava E/hyl:  onSubscribe: main
2019-09-28 17:27:55.994 8725-8725/com.study.rxjava E/hyl:  subscribe: main
2019-09-28 17:27:55.994 8725-8725/com.study.rxjava E/hyl:  onNext: main
2019-09-28 17:27:55.994 8725-8725/com.study.rxjava E/hyl:  onNext: main
2019-09-28 17:27:55.994 8725-8725/com.study.rxjava E/hyl:  onNext: main
2019-09-28 17:27:55.994 8725-8725/com.study.rxjava E/hyl:  onComplete: main

该方法是在主线程中调用的,所以RxJava正常调用的情况下,所有方法都运行在main线程中。

那么RxJava如何进行异步操作呢?我们知道Android的UI线程是不能做网络操作,也不能做耗时操作,所以一般我们把网络或耗时操作都放在非UI线程中执行;联想下项目中使用RxJava实现网络请求,其实定义Observable时有这么句代码:

.subscribeOn(Schedulers.io()).observeOn(AndroidSchedulers.mainThread())

1、线程调度subscribeOn

首先我们先增加.subscribeOn(Schedulers.io())来看看效果

private void doThreadRxJava() {
        Observable.create(new ObservableOnSubscribe<Integer>() {
            // 1. 创建被观察者 & 生产事件
            @Override
            public void subscribe(ObservableEmitter<Integer> emitter) throws Exception {
                Log.e("hyl", " subscribe: " + Thread.currentThread().getName());
                emitter.onNext(1);
                emitter.onNext(2);
                emitter.onNext(3);
                emitter.onComplete();
            }
        }).subscribeOn(Schedulers.io())//这里增加了这一句
        .subscribe(new Observer<Integer>() {
            // 2. 通过通过订阅(subscribe)连接观察者和被观察者
            // 3. 创建观察者 & 定义响应事件的行为
            @Override
            public void onSubscribe(Disposable d) {
                Log.e("hyl", " onSubscribe: " + Thread.currentThread().getName());
                Log.d("TAG", "开始采用subscribe连接");
            }
            // 默认最先调用复写的 onSubscribe()

            @Override
            public void onNext(Integer value) {
                Log.e("hyl", " onNext: " + Thread.currentThread().getName());
                Log.d("TAG", "对Next事件" + value + "作出响应");
            }

            @Override
            public void onError(Throwable e) {
                Log.e("hyl", " onError: " + Thread.currentThread().getName());
                Log.d("TAG", "对Error事件作出响应");
            }

            @Override
            public void onComplete() {
                Log.e("hyl", " onComplete: " + Thread.currentThread().getName());
                Log.d("TAG", "对Complete事件作出响应");
            }

        });
    }

运行的结果如下:

2019-09-28 18:24:43.424 8725-8725/com.study.rxjava E/hyl:  onSubscribe: main
2019-09-28 18:24:43.426 8725-19508/com.study.rxjava E/hyl:  subscribe: RxCachedThreadScheduler-2
2019-09-28 18:24:43.426 8725-19508/com.study.rxjava E/hyl:  onNext: RxCachedThreadScheduler-2
2019-09-28 18:24:43.426 8725-19508/com.study.rxjava E/hyl:  onNext: RxCachedThreadScheduler-2
2019-09-28 18:24:43.426 8725-19508/com.study.rxjava E/hyl:  onNext: RxCachedThreadScheduler-2
2019-09-28 18:24:43.426 8725-19508/com.study.rxjava E/hyl:  onComplete: RxCachedThreadScheduler-2

我们发现情况发生了改变:除了onSubscribe方法还运行在main线程(订阅发生的线程)其它方法全部都运行在一个名为RxCachedThreadScheduler-2的线程中。我们来看看rxjava是怎么完成这个线程调度的。

在.subscribeOn(Schedulers.io())中我们先来分析一下Schedulers.io() 是什么。

public final class Schedulers {
    @NonNull
    public static Scheduler io() {
        //hook函数,这里相当于返回的是IO
        return RxJavaPlugins.onIoScheduler(IO);
    }
}

//进入RxJavaPlugins.onIoScheduler(IO)
@NonNull
public static Scheduler onIoScheduler(@NonNull Scheduler defaultScheduler) {
    Function<? super Scheduler, ? extends Scheduler> f = onIoHandler;
    if (f == null) {
        //这里相当于返回的是传入的值
        return defaultScheduler;
    }
    return apply(f, defaultScheduler);
}

往下跟,继续分析IO是什么?

public final class Schedulers {
    //1、IO是个static变量
    @NonNull
    static final Scheduler IO;
    
    static {
        SINGLE = RxJavaPlugins.initSingleScheduler(new SingleTask());
    
        COMPUTATION = RxJavaPlugins.initComputationScheduler(new ComputationTask());
        //2、IO初始化的地方是这里,还是一个hook函数
        IO = RxJavaPlugins.initIoScheduler(new IOTask());
    
        TRAMPOLINE = TrampolineScheduler.instance();
    
        NEW_THREAD = RxJavaPlugins.initNewThreadScheduler(new NewThreadTask());
    }
}

//继续分析RxJavaPlugins.initIoScheduler(new IOTask())
public final class RxJavaPlugins {
    @NonNull
    public static Scheduler initIoScheduler(@NonNull Callable<Scheduler> defaultScheduler) {
        ObjectHelper.requireNonNull(defaultScheduler, "Scheduler Callable can't be null");
        Function<? super Callable<Scheduler>, ? extends Scheduler> f = onInitIoHandler;
        if (f == null) {
            //3、这里是返回结果,相当于IO等价于callRequireNonNull(defaultScheduler)
            return callRequireNonNull(defaultScheduler);
        }
        return applyRequireNonNull(f, defaultScheduler);
    }
    
    //继续分析callRequireNonNull(defaultScheduler)
    @NonNull
    static Scheduler callRequireNonNull(@NonNull Callable<Scheduler> s) {
        try {
            //4、IO相当于是new IOTask().call
            return ObjectHelper.requireNonNull(s.call(), "Scheduler Callable result can't be null");
        } catch (Throwable ex) {
            throw ExceptionHelper.wrapOrThrow(ex);
        }
    }
}

//分析ObjectHelper.requireNonNull(s.call(), "Scheduler Callable result can't be null");
public final class ObjectHelper {
    public static <T> T requireNonNull(T object, String message) {
        if (object == null) {
            throw new NullPointerException(message);
        }
        return object;
    }
}

从源码跟踪得知IO相当于是new IOTask().call,再一起分析一下IOTask是什么?

public final class Schedulers {

    static final class IoHolder {
        static final Scheduler DEFAULT = new IoScheduler();
    }
    
    static final class IOTask implements Callable<Scheduler> {
        @Override
        public Scheduler call() throws Exception {
            //new IOTask().call相当于就是new IoScheduler()
            return IoHolder.DEFAULT;
        }
    }
}

IoScheduler又是什么呢?翻译了一下字面意思是 IO调度器 。

public final class IoScheduler extends Scheduler {

    final ThreadFactory threadFactory;
    final AtomicReference<CachedWorkerPool> pool;

    //2、第2步
    public IoScheduler(ThreadFactory threadFactory) {
        this.threadFactory = threadFactory;
        this.pool = new AtomicReference<CachedWorkerPool>(NONE);
        start();
    }
    //1、第1步
    public IoScheduler() {
        this(WORKER_THREAD_FACTORY);
    }
    
    //3、第3步
    @Override
    public void start() {
        //
        CachedWorkerPool update = new CachedWorkerPool(KEEP_ALIVE_TIME, KEEP_ALIVE_UNIT, threadFactory);
        if (!pool.compareAndSet(NONE, update)) {
            update.shutdown();
        }
    }
    
    static final class CachedWorkerPool implements Runnable {
        private final long keepAliveTime;
        private final ConcurrentLinkedQueue<ThreadWorker> expiringWorkerQueue;
        final CompositeDisposable allWorkers;
        private final ScheduledExecutorService evictorService;
        private final Future<?> evictorTask;
        private final ThreadFactory threadFactory;
        //4、第4步,new IoScheduler()后Rxjava会创建CachedWorkerPool的线程池
        CachedWorkerPool(long keepAliveTime, TimeUnit unit, ThreadFactory threadFactory) {
            this.keepAliveTime = unit != null ? unit.toNanos(keepAliveTime) : 0L;
            this.expiringWorkerQueue = new ConcurrentLinkedQueue<ThreadWorker>();
            this.allWorkers = new CompositeDisposable();
            this.threadFactory = threadFactory;

            ScheduledExecutorService evictor = null;
            Future<?> task = null;
            if (unit != null) {
                //5、第5步创建并运行了一个名为RxCachedWorkerPoolEvictor的清除线程,主要作用是清除不再使用的一些线程。
                evictor = Executors.newScheduledThreadPool(1, EVICTOR_THREAD_FACTORY);
                //ScheduledExecutorService,是基于线程池设计的定时任务类,每个调度任务都会分配到线程池中的一个线程去执行,也就是说,任务是并发执行,互不影响。
                task = evictor.scheduleWithFixedDelay(this, this.keepAliveTime, this.keepAliveTime, TimeUnit.NANOSECONDS);
            }
            evictorService = evictor;
            evictorTask = task;
        }

        @Override
        public void run() {
            evictExpiredWorkers();
        }

        ThreadWorker get() {
            if (allWorkers.isDisposed()) {
                return SHUTDOWN_THREAD_WORKER;
            }
            while (!expiringWorkerQueue.isEmpty()) {
                ThreadWorker threadWorker = expiringWorkerQueue.poll();
                if (threadWorker != null) {
                    return threadWorker;
                }
            }

            // No cached worker found, so create a new one.
            ThreadWorker w = new ThreadWorker(threadFactory);
            allWorkers.add(w);
            return w;
        }

        void release(ThreadWorker threadWorker) {
            // Refresh expire time before putting worker back in pool
            threadWorker.setExpirationTime(now() + keepAliveTime);

            expiringWorkerQueue.offer(threadWorker);
        }

        void evictExpiredWorkers() {
            if (!expiringWorkerQueue.isEmpty()) {
                long currentTimestamp = now();

                for (ThreadWorker threadWorker : expiringWorkerQueue) {
                    if (threadWorker.getExpirationTime() <= currentTimestamp) {
                        if (expiringWorkerQueue.remove(threadWorker)) {
                            allWorkers.remove(threadWorker);
                        }
                    } else {
                        // Queue is ordered with the worker that will expire first in the beginning, so when we
                        // find a non-expired worker we can stop evicting.
                        break;
                    }
                }
            }
        }

        long now() {
            return System.nanoTime();
        }

        void shutdown() {
            allWorkers.dispose();
            if (evictorTask != null) {
                evictorTask.cancel(true);
            }
            if (evictorService != null) {
                evictorService.shutdownNow();
            }
        }
    }
    
}

根据代码注释得知,它就是一个用来创建和缓存线程的线程池,原来Rxjava就是通过这个调度器来调度线程的,从上面的代码可以看出,new IoScheduler()后Rxjava会创建CachedWorkerPool的线程池,同时也创建并运行了一个名为RxCachedWorkerPoolEvictor的清除线程.

但目前只创建了线程池并没有实际的thread,所以Schedulers.io()相当于只做了线程调度的前期准备。

接下来回去分析一下subscribeOn方法

    public final Observable<T> subscribeOn(Scheduler scheduler) {
        //判空
        ObjectHelper.requireNonNull(scheduler, "scheduler is null");
        //hook function 相当于return new ObservableSubscribeOn<T>(this, scheduler)
        return RxJavaPlugins.onAssembly(new ObservableSubscribeOn<T>(this, scheduler));
    }

.subscribeOn(Schedulers.io())其实就 相当于return new ObservableSubscribeOn<T>(this, scheduler),而根据上面的分析我们知道知道了这里的scheduler其实就是IoScheduler。

进入 ObservableSubscribeOn,来看看这是什么?

//1、AbstractObservableWithUpstream是继承Observable的
public final class ObservableSubscribeOn<T> extends AbstractObservableWithUpstream<T, T> {
    //2、新增了scheduler属性
    final Scheduler scheduler;

    public ObservableSubscribeOn(ObservableSource<T> source, Scheduler scheduler) {
        super(source);
        this.scheduler = scheduler;
    }
    //3、上篇文章分析得知:Observable.subscribe()方法最终都是调用了对应的实现类的subscribeActual方法
    @Override
    public void subscribeActual(final Observer<? super T> observer) {
        //5、SubscribeOnObserver也是装饰模式的体现, 是对observer的一个扩展装饰,只是添加了Disposable的管理。
        final SubscribeOnObserver<T> parent = new SubscribeOnObserver<T>(observer);
        
        // 4、没有任何线程调度,直接调用的,所以观察者的onSubscribe方法没有切换线程, 
        //所以demo是观察者方法,只有onSubscribe还运行在main线程
        observer.onSubscribe(parent);
        
        //6、添加了Disposable的管理。
        parent.setDisposable(scheduler.scheduleDirect(new SubscribeTask(parent)));
    }

    //再分析下SubscribeTask
    static final class SubscribeOnObserver<T> extends AtomicReference<Disposable> implements Observer<T>, Disposable {

        private static final long serialVersionUID = 8094547886072529208L;
        final Observer<? super T> downstream;

        final AtomicReference<Disposable> upstream;

        SubscribeOnObserver(Observer<? super T> downstream) {
            this.downstream = downstream;
            this.upstream = new AtomicReference<Disposable>();
        }

        @Override
        public void onSubscribe(Disposable d) {
            DisposableHelper.setOnce(this.upstream, d);
        }

        @Override
        public void onNext(T t) {
            downstream.onNext(t);
        }

        @Override
        public void onError(Throwable t) {
            downstream.onError(t);
        }

        @Override
        public void onComplete() {
            downstream.onComplete();
        }

        @Override
        public void dispose() {
            DisposableHelper.dispose(upstream);
            DisposableHelper.dispose(this);
        }

        @Override
        public boolean isDisposed() {
            return DisposableHelper.isDisposed(get());
        }

        void setDisposable(Disposable d) {
            DisposableHelper.setOnce(this, d);
        }
    }

    //就是一个Runnable,最终运行Observable的subscribe方法
    final class SubscribeTask implements Runnable {
        private final SubscribeOnObserver<T> parent;

        SubscribeTask(SubscribeOnObserver<T> parent) {
            this.parent = parent;
        }

        @Override
        public void run() {
            //这里的source其实就是Observable执行subscribe订阅的方法,所以可以确定Observable的内部实现方法subscribe是运行在子线程的
            source.subscribe(parent);
        }
    }
}

//*分析一下scheduler.scheduleDirect(new SubscribeTask(parent))
public abstract class Scheduler {
    public Disposable scheduleDirect(@NonNull Runnable run, long delay, @NonNull TimeUnit unit) {
        // 抽象方法,这个是IoSchedular 中的执行了createWorker()的实现
        final Worker w = createWorker();
        //hook函数,就相当于返回的它自己run
        final Runnable decoratedRun = RxJavaPlugins.onSchedule(run);
        //Runnable的包装,管理Dispose
        DisposeTask task = new DisposeTask(decoratedRun, w);
        // 线程调度,内部是异步实现
        w.schedule(task, delay, unit);

        return task;
    }
}

//简化IoScheduler代码
public final class IoScheduler extends Scheduler {
    @NonNull
    @Override
    public Worker createWorker() {
        //创建线程
        return new EventLoopWorker(pool.get());
    }
    
    @NonNull
    @Override
    public Disposable schedule(@NonNull Runnable action, long delayTime, @NonNull TimeUnit unit) {
        if (tasks.isDisposed()) {
            // don't schedule, we are unsubscribed
            return EmptyDisposable.INSTANCE;
        }
        //异步执行
        return threadWorker.scheduleActual(action, delayTime, unit, tasks);
    }
}

//分析下threadWorker.scheduleActual(action, delayTime, unit, tasks);
public class NewThreadWorker extends Scheduler.Worker implements Disposable {
    private final ScheduledExecutorService executor;

    public NewThreadWorker(ThreadFactory threadFactory) {
        executor = SchedulerPoolFactory.create(threadFactory);
    }
    @NonNull
    public ScheduledRunnable scheduleActual(final Runnable run, long delayTime, @NonNull TimeUnit unit, @Nullable DisposableContainer parent) {
        Runnable decoratedRun = RxJavaPlugins.onSchedule(run);

        ScheduledRunnable sr = new ScheduledRunnable(decoratedRun, parent);

        if (parent != null) {
            if (!parent.add(sr)) {
                return sr;
            }
        }

        Future<?> f;
        try {
            if (delayTime <= 0) {
                f = executor.submit((Callable<Object>)sr);
            } else {
                f = executor.schedule((Callable<Object>)sr, delayTime, unit);
            }
            sr.setFuture(f);
        } catch (RejectedExecutionException ex) {
            if (parent != null) {
                parent.remove(sr);
            }
            RxJavaPlugins.onError(ex);
        }

        return sr;
    }
}

总结一下ObservableSubscribeOn:

1、ObservableSubscribeOn extend AbstractObservableWithUpstream是继承Observable的

2、新增了scheduler属性,这里运用了装饰模式,RxJava好多地方用到了装饰模式

3、重写了subscribeActual方法,上篇文章分析得知:Observable.subscribe()方法最终都是调用了对应的实现类的subscribeActual方法

4、执行了observer.onSubscribe(parent);这里没有任何线程调度,直接调用的,所以观察者的onSubscribe方法没有切换线程,所以demo是观察者方法,只有onSubscribe还运行在main线程

5、SubscribeOnObserver也是装饰模式的体现, 是对observer的一个扩展装饰,只是添加了Disposable的管理。

6、Observer添加了Disposable的管理。parent.setDisposable(scheduler.scheduleDirect(new SubscribeTask(parent)));

7、SubscribeTask就是一个Runnable,最终运行Observable的生产事件的subscribe方法

再运行的结果:

2019-10-09 14:27:29.626 13521-13521/com.study.rxjava E/hyl:  onSubscribe: main
2019-10-09 14:27:29.627 13521-13590/com.study.rxjava E/hyl:  subscribe: RxCachedThreadScheduler-1
2019-10-09 14:27:29.627 13521-13590/com.study.rxjava E/hyl:  onNext: RxCachedThreadScheduler-1
2019-10-09 14:27:29.627 13521-13590/com.study.rxjava E/hyl:  onNext: RxCachedThreadScheduler-1
2019-10-09 14:27:29.627 13521-13590/com.study.rxjava E/hyl:  onNext: RxCachedThreadScheduler-1
2019-10-09 14:27:29.627 13521-13590/com.study.rxjava E/hyl:  onComplete: RxCachedThreadScheduler-1

从上面的运行结果来看,因为生产者Observable已被调度到RxCachedThreadScheduler-1线程中,同时发射事件并没有切换线程,所以发射后消费事件的onNext onErro onComplete也在RxCachedThreadScheduler-1线程中。

总结:

Android RxJava源码分析-RxJava的线程调度机制

2、线程调度observeOn

private void doThreadMainRxJava() {
        Observable.create(new ObservableOnSubscribe<Integer>() {
            // 1. 创建被观察者 & 生产事件
            @Override
            public void subscribe(ObservableEmitter<Integer> emitter) throws Exception {
                Log.e("hyl", " subscribe: " + Thread.currentThread().getName());
                emitter.onNext(1);
                emitter.onNext(2);
                emitter.onNext(3);
                emitter.onComplete();
            }
        }).subscribeOn(Schedulers.io())
          .observeOn(AndroidSchedulers.mainThread())
        .subscribe(new Observer<Integer>() {
            // 2. 通过通过订阅(subscribe)连接观察者和被观察者
            // 3. 创建观察者 & 定义响应事件的行为
            @Override
            public void onSubscribe(Disposable d) {
                Log.e("hyl", " onSubscribe: " + Thread.currentThread().getName());
                Log.d("TAG", "开始采用subscribe连接");
            }
            // 默认最先调用复写的 onSubscribe()

            @Override
            public void onNext(Integer value) {
                Log.e("hyl", " onNext: " + Thread.currentThread().getName());
                Log.d("TAG", "对Next事件" + value + "作出响应");
            }

            @Override
            public void onError(Throwable e) {
                Log.e("hyl", " onError: " + Thread.currentThread().getName());
                Log.d("TAG", "对Error事件作出响应");
            }

            @Override
            public void onComplete() {
                Log.e("hyl", " onComplete: " + Thread.currentThread().getName());
                Log.d("TAG", "对Complete事件作出响应");
            }

        });
    }

添加.observeOn(AndroidSchedulers.mainThread())后查看下运行结果

2019-09-29 14:52:40.605 32096-32096/com.study.rxjava E/hyl:  onSubscribe: main
2019-09-29 14:52:40.608 32096-32174/com.study.rxjava E/hyl:  subscribe: RxCachedThreadScheduler-1
2019-09-29 14:52:40.609 32096-32096/com.study.rxjava E/hyl:  onNext: main
2019-09-29 14:52:40.610 32096-32096/com.study.rxjava E/hyl:  onNext: main
2019-09-29 14:52:40.610 32096-32096/com.study.rxjava E/hyl:  onNext: main
2019-09-29 14:52:40.610 32096-32096/com.study.rxjava E/hyl:  onComplete: main

从结果看出 事件的生产线程运行在RxCachedThreadScheduler-1中,而事件的接收线程则被调度到了main线程中。关键代码是因为.observeOn(AndroidSchedulers.mainThread())的作用。下面我们着重分析下这句代码都做了哪些事情。

AndroidSchedulers.mainThread()

AndroidSchedulers.mainThread()是RxAndroid库中的,需要引入

implementation 'io.reactivex.rxjava2:rxandroid:2.1.1'

看一下AndroidSchedulers.mainThread()内部的具体实现

/** A {@link Scheduler} which executes actions on the Android main thread. */
public static Scheduler mainThread() {
    return RxAndroidPlugins.onMainThreadScheduler(MAIN_THREAD);
}

这是一个在android主线程上执行操作的scheduler,看一下MAIN_THREAD

/** Android-specific Schedulers. */
public final class AndroidSchedulers {
    //2、handler机制 new HandlerScheduler(new Handler(Looper.getMainLooper()), false)
    private static final class MainHolder {
        static final Scheduler DEFAULT
            = new HandlerScheduler(new Handler(Looper.getMainLooper()), false);
    }
    //1、 MainHolder.DEFAULT
    private static final Scheduler MAIN_THREAD = RxAndroidPlugins.initMainThreadScheduler(
            new Callable<Scheduler>() {
                @Override public Scheduler call() throws Exception {
                    return MainHolder.DEFAULT;
                }
            });
}

代码分析得出AndroidSchedulers.mainThread()就是 new HandlerScheduler(new Handler(Looper.getMainLooper()), false),其实就是使用了handler机制到main线程执行的。

observeOn

public abstract class Observable<T> implements ObservableSource<T> {

    //1、return observeOn(scheduler, false, bufferSize());
    public final Observable<T> observeOn(Scheduler scheduler) {
        return observeOn(scheduler, false, bufferSize());
    }
    
    //2、new ObservableObserveOn<T>(this, scheduler, delayError, bufferSize)
    public final Observable<T> observeOn(Scheduler scheduler, boolean delayError, int bufferSize) {
        ObjectHelper.requireNonNull(scheduler, "scheduler is null");
        ObjectHelper.verifyPositive(bufferSize, "bufferSize");
        return RxJavaPlugins.onAssembly(new ObservableObserveOn<T>(this, scheduler, delayError, bufferSize));
    }
}

从源码分析主要是new ObservableObserveOn,这个ObservableObserveOn眼熟,就是分析subscribeOn时的ObservableSubscribeOn,都是集成Observable的,确认过眼神,遇见对的人,他们是兄弟,哈哈!

public final class ObservableObserveOn<T> extends AbstractObservableWithUpstream<T, T> {
    final Scheduler scheduler;
    final boolean delayError;
    final int bufferSize;
    public ObservableObserveOn(ObservableSource<T> source, Scheduler scheduler, boolean delayError, int bufferSize) {
        super(source);
        this.scheduler = scheduler;
        this.delayError = delayError;
        this.bufferSize = bufferSize;
    }
    
    //Observable的subscribe方法最终都是调用subscribeActual方法,重点在这里
    @Override
    protected void subscribeActual(Observer<? super T> observer) {
        //传入的AndroidSchedulers.mainThread(),它的本质就是一个HandlerScheduler;
        //scheduler 就是前面提到的HandlerScheduler;
        //HandlerScheduler并不是TrampolineScheduler实例,所以执行入else
        if (scheduler instanceof TrampolineScheduler) {
            source.subscribe(observer);
        } else {
            //获取Worker实例: 创建 HandlerWorker
            Scheduler.Worker w = scheduler.createWorker();
            //传递订阅关系: 调用上层Observable的subscribe,将订阅向上传递
            source.subscribe(new ObserveOnObserver<T>(observer, w, delayError, bufferSize));
        }
    }
}

ObserveOnObserver类对observer进行装饰,事件源发射的事件,是通过observer的onNext,onError,onComplete发射到观察者的。

static final class ObserveOnObserver<T> extends BasicIntQueueDisposable<T>
    implements Observer<T>, Runnable {


        @Override
        public void onNext(T t) {
            //控制不二次执行
            if (done) {
                return;
            }
            // 默认是0,不等于 QueueDisposable.ASYNC,所以将发射的T对象入队
            if (sourceMode != QueueDisposable.ASYNC) {
                queue.offer(t);
            }
            //1,执行该方法
            schedule();
        }

        @Override
        public void onError(Throwable t) {
            if (done) {
                RxJavaPlugins.onError(t);
                return;
            }
            error = t;
            done = true;
            schedule();
        }

        @Override
        public void onComplete() {
            if (done) {
                return;
            }
            done = true;
            schedule();
        }

        void schedule() {
            // 2、保证只有唯一任务在运行
            if (getAndIncrement() == 0) {
                // 3、调用的就是HandlerWorker的schedule方法(参数为Runnable对象)
                worker.schedule(this);
            }
        }
        
        void drainNormal() {
            int missed = 1;

            final SimpleQueue<T> q = queue;
            final Observer<? super T> a = downstream;

            for (;;) {
                if (checkTerminated(done, q.isEmpty(), a)) {
                    return;
                }

                for (;;) {
                    boolean d = done;
                    T v;

                    try {
                        // 5、从队列中queue中取出事件,即取出参数T
                        v = q.poll();
                    } catch (Throwable ex) {
                        Exceptions.throwIfFatal(ex);
                        disposed = true;
                        upstream.dispose();
                        q.clear();
                        a.onError(ex);
                        worker.dispose();
                        return;
                    }
                    boolean empty = v == null;
                    
                    if (checkTerminated(d, empty, a)) {
                        return;
                    }

                    if (empty) {
                        break;
                    }
                    //6、此时,因为经由Hanlder回调在主线程了,调用下游observer的onNext将事件v发射出去
                    a.onNext(v);
                }

                missed = addAndGet(-missed);
                if (missed == 0) {
                    break;
                }
            }
        }
        
        //这里run方法的执行,需要看HandlerWorker类的分析,见下面
        @Override
        public void run() {
            if (outputFused) {
                drainFused();
            } else {//4、执行此判断
                drainNormal();
            }
        }
        
        //7、onError和onComplete的逻辑
        boolean checkTerminated(boolean d, boolean empty, Observer<? super T> a) {
            if (disposed) {
                queue.clear();
                return true;
            }
             //d为done,即true
            if (d) {
                Throwable e = error;
                //delayError为false
                if (delayError) {
                    if (empty) {
                        disposed = true;
                        if (e != null) {
                            a.onError(e);
                        } else {
                            a.onComplete();
                        }
                        worker.dispose();
                        return true;
                    }
                } else {
                    //此处我们回调的是onComplete,所以e为null
                    if (e != null) {
                        disposed = true;
                        queue.clear();
                        a.onError(e);
                        worker.dispose();
                        return true;
                    } else
                    if (empty) {
                        //因为此时onNext任务队列为空,所以走到这
                        disposed = true;
                        a.onComplete();
                        worker.dispose();
                        return true;
                    }
                }
            }
            return false;
        }
    }
    
//1、在回调的onNext和onComplete都没有做什么特殊处理,不过它们都调用了schedule
//2、因为AtomicInteger的VALUE值,默认是0,我们还没有进行任何操作,所以会走到里面worker.schedule
//3、上面的的worker是HandlerWorker,我们去看一下它的schedule:

//HandlerWorker就会调用到主线程的handler
private static final class HandlerWorker extends Worker {
        private final Handler handler;
        private final boolean async;

        private volatile boolean disposed;

        HandlerWorker(Handler handler, boolean async) {
            this.handler = handler;
            this.async = async;
        }

        @Override
        @SuppressLint("NewApi") // Async will only be true when the API is available to call.
        public Disposable schedule(Runnable run, long delay, TimeUnit unit) {
            if (run == null) throw new NullPointerException("run == null");
            if (unit == null) throw new NullPointerException("unit == null");

            if (disposed) {
                return Disposables.disposed();
            }

            run = RxJavaPlugins.onSchedule(run);

            ScheduledRunnable scheduled = new ScheduledRunnable(handler, run);

            Message message = Message.obtain(handler, scheduled);
            message.obj = this; // Used as token for batch disposal of this worker's runnables.

            if (async) {
                message.setAsynchronous(true);
            }
            //1、我想看到这里,大家可能就明白了吧,因为看到了熟悉的handler(前面我们说了,它是一个在主线程的Handler)
            //2、Message,把当前Runnable发送到轮训器里面,取出执行其run方法,
            //3、所以onNext和onComplete都是在主线程回调的。
            //4、我们现在要关注的是ObserveOnObserver的run方法,因为我们上面把它的封装对象传入的轮训器中,回到上面的代码的run方法
            handler.sendMessageDelayed(message, unit.toMillis(delay));

            // Re-check disposed state for removing in case we were racing a call to dispose().
            if (disposed) {
                handler.removeCallbacks(scheduled);
                return Disposables.disposed();
            }

            return scheduled;
        }

        @Override
        public void dispose() {
            disposed = true;
            handler.removeCallbacksAndMessages(this /* token */);
        }

        @Override
        public boolean isDisposed() {
            return disposed;
        }
    }

总结:

1、AndroidSchedulers.mainThread()先创建一个包含handler的Scheduler, 这个handler是主线程的handler。

2、observeOn方法创建ObservableObserveOn,它是上游Observable的一个装饰类,其中包含前面创建的Scheduler和bufferSize等.

3、当订阅方法subscribe被调用后,ObservableObserveOn的subscribeActual方法创建Scheduler.Worker并调用上游的subscribe方法,同时将自身接收的参数'observer'用装饰类ObserveOnObserver装饰后传递给上游。

4、当上游调用ObserveOnObserver的onNext、onError和onComplete方法时,ObserveOnObserver将上游发送的事件加入到队列queue中,然后再调用scheduler将处理事件的方法调度到对应的线程中(main thread)。处理事件的方法将queue中保存的事件取出来,调用下游原始的observer再发射出去。

5、经过以上流程,下游处理事件的消费者线程就运行在了observeOn调度后的主线程中。

(1)subscribeOn只对上游有效,因为是在订阅过程中传递的,如果有多个,那么只有第一个”生效”(其实对于传递订阅关系都生效了,只是最终事件发射只体现出了最上游subscribeOn的作用)

(2)observerOn只对下游有效,因为它是在事件发射出来之后,回调事件的过程中生效的