欢迎您访问程序员文章站本站旨在为大家提供分享程序员计算机编程知识!
您现在的位置是: 首页

RxJava2源码分析(下):操作符和线程调度

程序员文章站 2024-03-03 19:51:46
...

目录

 

一、概述

二、map操作符

三、线程调度

1、subscribeOn

2、observeOn

四、总结


一、概述

    在上一篇文章,咱们详细分析了Rxjava2的结构流程:RxJava2源码分析(上):架构解析​​​​​​​https://blog.csdn.net/qq_29152241/article/details/82559595​​​​​​​。那么这篇文章就来分析RxJava2的精髓:操作符和及其线程调度。

 

二、map操作符

    在RxJava2中有将近50个操作符可以帮助用户做组合、变换等复杂操作,有关RxJava2操作符的用法和每个操作符的说明可以看我Github上的这个项目:https://github.com/cozing/RxJava2Retrofit2Hybrid

    咱们现在采用链式调用方式重写上面栗子代码,并添加map操作符:

Observable.just(10086)
        .map(new Function<Integer, Object>() {
            @Override
            public Object apply(Integer integer) throws Exception {
                return "整形转成字符串" + integer;
            }
        })
        .subscribe(new Observer<Object>() {
            @Override
            public void onSubscribe(Disposable d) {

            }

            @Override
            public void onNext(Object o) {
                System.out.println(o);
            }

            @Override
            public void onError(Throwable e) {

            }

            @Override
            public void onComplete() {

            }
        });

在这段代码中咱们添加了map操作符将原来整形数据“10086”转换成了字符串“整形转成字符串10086”,然后将数据发射出去,在观察者接收到数据时候打印出来。其中map操作符就是起到映射作用,将原数据转换成了另一个数据,充当着转换的功能,看其源码:

public abstract class Observable<T> implements ObservableSource<T> {
...
    public final <R> Observable<R> map(Function<? super T, ? extends R> mapper) {
        ObjectHelper.requireNonNull(mapper, "mapper is null");
        return RxJavaPlugins.onAssembly(new ObservableMap<T, R>(this, mapper));
    }
...
}

关于RxJavaPlugins.onAssembly的实现,咱们在上一篇文章中创建被观察者的时候讲述过,这是一个hook的抽象代理,在这里得到的就是他的实参,咱们直接看ObservableMap,也就是map方法返回的具体数据。

public final class ObservableMap<T, U> extends AbstractObservableWithUpstream<T, U> {
    final Function<? super T, ? extends U> function;

    public ObservableMap(ObservableSource<T> source, Function<? super T, ? extends U> function) {
        super(source);
        this.function = function;
    }

    @Override
    public void subscribeActual(Observer<? super U> t) {
        source.subscribe(new MapObserver<T, U>(t, function));
    }
...
}

其中super(source):

abstract class AbstractObservableWithUpstream<T, U> extends Observable<U> implements HasUpstreamObservableSource<T> {
    protected final ObservableSource<T> source;
    AbstractObservableWithUpstream(ObservableSource<T> source) {
        this.source = source;
    }
...
}

可以看到,创建实例时候,将被观察者对象ObservableSource和变换函数Function保存起来。我们知道在调用订阅方法subscribe()时:

    public final void subscribe(Observer<? super T> observer) {
        ObjectHelper.requireNonNull(observer, "observer is null");
        try {
            observer = RxJavaPlugins.onSubscribe(this, observer);

            ObjectHelper.requireNonNull(observer, "Plugin returned null Observer");

            //调用订阅方法时,内部调用的是subscribeActual
            subscribeActual(observer);
        } catch (NullPointerException e) { // NOPMD
            throw e;
        } catch (Throwable e) {
            Exceptions.throwIfFatal(e);
            RxJavaPlugins.onError(e);
            NullPointerException npe = new NullPointerException("Actually not, but can't throw other exceptions due to RS");
            npe.initCause(e);
            throw npe;
        }
    }

内部调用的是subscribeActual(),在这里就是

public final class ObservableMap<T, U> extends AbstractObservableWithUpstream<T, U> {
...
    @Override
    public void subscribeActual(Observer<? super U> t) {
        source.subscribe(new MapObserver<T, U>(t, function));
    }
...
}

因此真正的订阅实现是在这个方法,接着看MapObserver类实现,MapObserver是ObservableMap的一个内部类,在创建MapObserver类实例,构造方法传入观察者Observer和事件转换函数Function,看它实现:

public final class ObservableMap<T, U> extends AbstractObservableWithUpstream<T, U> {
...
    static final class MapObserver<T, U> extends BasicFuseableObserver<T, U> {
        final Function<? super T, ? extends U> mapper;

        MapObserver(Observer<? super U> actual, Function<? super T, ? extends U> mapper) {
           //构造方法内部将传进来的观察者actual和转换函数mapper保存起来
            super(actual);
            this.mapper = mapper;
        }

        //调用onNext的时候,先走到这里
        @Override
        public void onNext(T t) {

            //前面对发送事件做一系列判断
            if (done) {
                return;
            }

            if (sourceMode != NONE) {
                actual.onNext(null);
                return;
            }

            U v;

            try {
                //最后v返回的是mapper.apply(t)
                v = ObjectHelper.requireNonNull(mapper.apply(t), "The mapper function returned a null value.");
            } catch (Throwable ex) {
                fail(ex);
                return;
            }
            //最终观察者onNext的v,也就是转换函数转换过后的数据
            actual.onNext(v);
        }

        @Override
        public int requestFusion(int mode) {
            return transitiveBoundaryFusion(mode);
        }

        @Nullable
        @Override
        public U poll() throws Exception {
            T t = qs.poll();
            return t != null ? ObjectHelper.<U>requireNonNull(mapper.apply(t), "The mapper function returned a null value.") : null;
        }
    }
}

步骤如下:

1、首先构造方法内部将传进来的观察者actual和转换函数mapper保存起来;

2、对传入的发射事件做一系列判断;

3、最后v返回的是mapper.apply(t);

4、最终观察者onNext的v,也就是转换函数转换过后的数据。

先看mapper.apply(t):

public interface Function<T, R> {

    R apply(@NonNull T t) throws Exception;
}

内部实现非常简单,就是将数据T转换成了R返回,因此在调用onNext发射事件时,将传入的事件转换成了新事件v,发射给了观察者。

    至此map操作符的源码分析完毕,在RxJava2中有将近50个操作符,每个操作符都有它自己的功能和实现方式,篇幅有限咱们不对其他操作符做分析,童鞋们可以课后根据咱们分析map的方式对其他操作符做分析。

    接下来咱们分析RxJava2中的线程调度。

 

三、线程调度

    线程调度分为subscribeOn和observeOn,这两个属于RxJava2功能性操作符中的线程调度操作符,对应事件订阅线程和接收线程。

1、subscribeOn

    将上面栗子的代码添加subscribeOn:

Observable.just(10086)
        .map(new Function<Integer, Object>() {
            @Override
            public Object apply(Integer integer) throws Exception {
                return "整形转成字符串" + integer;
            }
        })
        .subscribeOn(Schedulers.io())
        .subscribe(new Observer<Object>() {
            @Override
            public void onSubscribe(Disposable d) {

            }

            @Override
            public void onNext(Object o) {
                System.out.println("接收到数据:" + o);
            }

            @Override
            public void onError(Throwable e) {

            }

            @Override
            public void onComplete() {

            }
        });

表示订阅的时候发生在io线程,看源码:

    @CheckReturnValue
    @SchedulerSupport(SchedulerSupport.CUSTOM)
    public final Observable<T> subscribeOn(Scheduler scheduler) {
        ObjectHelper.requireNonNull(scheduler, "scheduler is null");
        return RxJavaPlugins.onAssembly(new ObservableSubscribeOn<T>(this, scheduler));
    }

是咱们非常熟悉的hook抽象代理了,直接看ObservableSubscribeOn:

public final class ObservableSubscribeOn<T> extends AbstractObservableWithUpstream<T, T> {
    final Scheduler scheduler;

    public ObservableSubscribeOn(ObservableSource<T> source, Scheduler scheduler) {
        //存储被观察者ObservableSource和线程调度实例
        super(source);
        this.scheduler = scheduler;
    }

    @Override
    public void subscribeActual(final Observer<? super T> s) {
        final SubscribeOnObserver<T> parent = new SubscribeOnObserver<T>(s);

        s.onSubscribe(parent);

        parent.setDisposable(scheduler.scheduleDirect(new SubscribeTask(parent)));
    }
...
}

仔细看,这部分代码是不是和上面分析map的时候非常相似?没错,和map操作符流程一样,ObservableSubscribeOn只是一个装饰类,当调用订阅subscribe方法时,最终调用的是这个类的subscribeActual()方法,subscribeActual()方法内部的实现也和map操作符内部实现一样,内部维护一个Observer,最后的订阅发生在该Observer内部,在这里是ObservableSubscribeOn的内部类SubscribeOnObserver:

    static final class SubscribeOnObserver<T> extends AtomicReference<Disposable> implements Observer<T>, Disposable {

        private static final long serialVersionUID = 8094547886072529208L;
        final Observer<? super T> actual;

        final AtomicReference<Disposable> s;

        SubscribeOnObserver(Observer<? super T> actual) {
            this.actual = actual;
            this.s = new AtomicReference<Disposable>();
        }

        @Override
        public void onSubscribe(Disposable s) {
            DisposableHelper.setOnce(this.s, s);
        }

        @Override
        public void onNext(T t) {
            actual.onNext(t);
        }

        @Override
        public void onError(Throwable t) {
            actual.onError(t);
        }

        @Override
        public void onComplete() {
            actual.onComplete();
        }

        @Override
        public void dispose() {
            DisposableHelper.dispose(s);
            DisposableHelper.dispose(this);
        }

        @Override
        public boolean isDisposed() {
            return DisposableHelper.isDisposed(get());
        }

        void setDisposable(Disposable d) {
            DisposableHelper.setOnce(this, d);
        }
    }

回到subscribeActual()方法,看下面一句:

parent.setDisposable(scheduler.scheduleDirect(new SubscribeTask(parent)));

这部分有点绕,咱们一步步看,首先调用了SubscribeOnObserver的setDisposable(),传入scheduler.scheduleDirect()返回的Disposable,而Disposable通过传入SubscribeTask实例生成,看SubscribeTask实现:

public final class ObservableSubscribeOn<T> extends AbstractObservableWithUpstream<T, T> {
...
    final class SubscribeTask implements Runnable {
        private final SubscribeOnObserver<T> parent;

        SubscribeTask(SubscribeOnObserver<T> parent) {
            this.parent = parent;
        }

        @Override
        public void run() {
            source.subscribe(parent);
        }
    }
}

这是ObservableSubscribeOn的一个内部类,其中source为创建ObservableSubscribeOn时候调用构造方法传入的经过map处理转换后的ObserableSource被观察者,也就是最新的observable,SubscribeTask类实现了Runnable接口并重写其run方法,内部做source.subscribe(parent)订阅操作。

    接着看scheduler.scheduleDirect:

    @NonNull
    public Disposable scheduleDirect(@NonNull Runnable run) {
        return scheduleDirect(run, 0L, TimeUnit.NANOSECONDS);
    }

继续看scheduleDirect方法:

    @NonNull
    public Disposable scheduleDirect(@NonNull Runnable run, long delay, @NonNull TimeUnit unit) {
        final Worker w = createWorker();

        final Runnable decoratedRun = RxJavaPlugins.onSchedule(run);

        DisposeTask task = new DisposeTask(decoratedRun, w);

        w.schedule(task, delay, unit);

        return task;
    }

该方法是在当前线程上立即执行任务,也就是SubscribeTask,负责将Observable转到新的线程做处理。首先createWorker()会创建一个ioScheduer,ioScheduer内部维护了一个缓存工作线程池CachedWorkerPool,其内部是维护一个ScheduledExecutorService线程池做循环处理事件,包括处理、移除等操作,在Scheduler内部其实已经将线程切换到了ioScheduler。其内部的代码非常繁杂,如需列出代码,将会需要非常大的篇幅,这部分代码确实可读性有些差,在读这部分代码的时候,建议大家结合注释跟踪,不然里面错综复杂的逻辑会把人绕晕,在此咱们知道他的作用即可。继续分析,当调用parent.setDisposable时候,会将一个个Woker添加到线程池中进行管理。最后在转换后的线程中,调用Runnable实现类SubscribeTask中的run()方法:

@Override
public void run() {
    source.subscribe(parent);
}

对前面Observable执行订阅,当上游的被观察者发射事件的时候,通过中转到新的线程执行,最后在观察者Observer中接收到数据。

 

2、observeOn

    继续对上面的例子做observeOn做处理:

Observable.just(10086)
        .map(new Function<Integer, Object>() {
            @Override
            public Object apply(Integer integer) throws Exception {
                return "整形转成字符串" + integer;
            }
        })
        .subscribeOn(Schedulers.io())
        .observeOn(AndroidSchedulers.mainThread())
        .subscribe(new Observer<Object>() {
            @Override
            public void onSubscribe(Disposable d) {

            }

            @Override
            public void onNext(Object o) {
                System.out.println("接收到数据:" + o);
            }

            @Override
            public void onError(Throwable e) {

            }

            @Override
            public void onComplete() {

            }
        });

表示在订阅的时候发生在io线程,在mainThread接收。直接看observeOn源码:

    @CheckReturnValue
    @SchedulerSupport(SchedulerSupport.CUSTOM)
    public final Observable<T> observeOn(Scheduler scheduler) {
        return observeOn(scheduler, false, bufferSize());
    }

看其有三个参数的observeOn方法:

    @CheckReturnValue
    @SchedulerSupport(SchedulerSupport.CUSTOM)
    public final Observable<T> observeOn(Scheduler scheduler, boolean delayError, int bufferSize) {
        ObjectHelper.requireNonNull(scheduler, "scheduler is null");
        ObjectHelper.verifyPositive(bufferSize, "bufferSize");
        return RxJavaPlugins.onAssembly(new ObservableObserveOn<T>(this, scheduler, delayError, bufferSize));
    }

还是老套路,不多解释,继续看ObservableObserveOn:

public final class ObservableObserveOn<T> extends AbstractObservableWithUpstream<T, T> {
    final Scheduler scheduler;
    final boolean delayError;
    final int bufferSize;
    public ObservableObserveOn(ObservableSource<T> source, Scheduler scheduler, boolean delayError, int bufferSize) {
        super(source);
        this.scheduler = scheduler;
        this.delayError = delayError;
        this.bufferSize = bufferSize;
    }

    @Override
    protected void subscribeActual(Observer<? super T> observer) {
        //不会走这里
        if (scheduler instanceof TrampolineScheduler) {
            source.subscribe(observer);
        } else {
           //创建Worker
            Scheduler.Worker w = scheduler.createWorker();
           //注释1
            source.subscribe(new ObserveOnObserver<T>(observer, w, delayError, bufferSize));
        }
    }
...
]

继续老套路,省略,直接看订阅发生的真正方法subscribeActual(),看注释1,和subscribeOn不同的是,这里直接订阅了Observer,也就是ObserveOnObserver,看ObserveOnObserver的实现:

public final class ObservableObserveOn<T> extends AbstractObservableWithUpstream<T, T> {
...
    static final class ObserveOnObserver<T> extends BasicIntQueueDisposable<T>
    implements Observer<T>, Runnable {
...
       final Observer<? super T> actual;
        final Scheduler.Worker worker;
        final boolean delayError;
        final int bufferSize;
...
        ObserveOnObserver(Observer<? super T> actual, Scheduler.Worker worker, boolean delayError, int bufferSize) {
            this.actual = actual;
            this.worker = worker;
            this.delayError = delayError;
            this.bufferSize = bufferSize;
        }
...
}

是一个Observer代理类,属于ObservableObserveOn的内部类,最终上游发射数据将会在此处做真正的处理。因为咱们分析源码只需抓住主干,在几个on...f方法里,咱们直接看onNext():

@Override
public void onNext(T t) {
    if (done) {
        return;
    }

    if (sourceMode != QueueDisposable.ASYNC) {
        queue.offer(t);
    }
    schedule();
}

看schedule():

void schedule() {
    if (getAndIncrement() == 0) {
        worker.schedule(this);
    }
}

调用了worker的调度方法schedule(),内部传参为为一个Runnable,在这里就是ObserveOnObserver,直接看他的run()方法:

@Override
public void run() {
    if (outputFused) {
       //默认是false
        drainFused();
    } else {
        drainNormal();
    }
}

接着看drainNormal():

void drainNormal() {
    int missed = 1;

    final SimpleQueue<T> q = queue;
    final Observer<? super T> a = actual;

    for (;;) {
        //检查是否已取消或者为空
        if (checkTerminated(done, q.isEmpty(), a)) {
            return;
        }
        for (;;) {
            boolean d = done;
            T v;

            try {
               //注释1,从队列中取出事件
                v = q.poll();
            } catch (Throwable ex) {
                Exceptions.throwIfFatal(ex);
                s.dispose();
                q.clear();
                a.onError(ex);
                worker.dispose();
                return;
            }
            boolean empty = v == null;

            if (checkTerminated(d, empty, a)) {
                return;
            }

            if (empty) {
                break;
            }

            //将事件传递给新线程Observer的onNext()
            a.onNext(v);
        }

        missed = addAndGet(-missed);
        if (missed == 0) {
            break;
        }
    }
}

咱们看注释1处,这个queue是在订阅发生时调用的函数onSubscribe中赋值,根据上面代码的逻辑可以发现最后将事件传递给了新线程中Observer的onNext(),这个onNext()方法就是暴露给开发者的接口中的onNext(),至此,observeOn这个观察者的线程切换完成。

 

四、总结

    通过以上源码跟踪分析,咱们基本将map操作符、两个线程调度操作符subscribeOn/observeOn分析完成,RxJava2的源码相对来说是比较复杂的,里面使用了大量的接口定义,不过当你明白了其中一个操作符,再去看其他的时候就会发现都大同小异,所以还是比较好理解的。大家可以去看看其他操作符的源码并了解其原理,比如常用的zip、filt、concat等操作符,去理解RxJava2开发者的思想,能学到很多东西。

RxJava2源码分析(下):操作符和线程调度

 

    关注我的博客和github,有大量福利哟~

    csdn博客:https://blog.csdn.net/qq_29152241

    github:https://github.com/cozing