欢迎您访问程序员文章站本站旨在为大家提供分享程序员计算机编程知识!
您现在的位置是: 首页

RxJava 从源码分析流程

程序员文章站 2022-04-02 17:46:18
...

以下面这段代码为示例

public void run(final ThreadListener<T> listener) {
    Observable.create(new ObservableOnSubscribe<T>() {
        @Override
        public void subscribe(ObservableEmitter<T> e) throws Exception {
            listener.start(e);
        }
    }).subscribeOn(Schedulers.io())
            .observeOn(AndroidSchedulers.mainThread())
            .subscribe(new Observer<T>() {
                @Override
                public void onSubscribe(Disposable d) {

                }

                @Override
                public void onNext(T t) {
                    listener.onNext(t);
                }

                @Override
                public void onError(Throwable e) {
                    listener.onError(e.getMessage());
                }

                @Override
                public void onComplete() {

                }
            });
}

源码流程就看下面这个手画的图吧:
RxJava 从源码分析流程

可以看到类是一层一层被包裹的,那么当 ObservableEmitter 执行 e.next() 时,会调到 ObservableObserveOn 的内部类 ObserveOnObserver 的 onNext 方法, onNext 方法中调了 schedule, schedule 方法如下:

void schedule() {
    if (getAndIncrement() == 0) {
        worker.schedule(this); // worker 是 HandlerScheduler.createWorker, 返回的是 HandlerScheduler 的内部类 HandlerWorker
    }
}

wordker.schedule 的主要代码如下:

public Disposable schedule(Runnable run, long delay, TimeUnit unit) {
    // 这里的 handler 是 new Handler(Looper.getMainLooper())
    ScheduledRunnable scheduled = new ScheduledRunnable(handler, run);

    Message message = Message.obtain(handler, scheduled);
    message.obj = this; // Used as token for batch disposal of this worker's runnables.
    // 这里切换到主线程
    handler.sendMessageDelayed(message, Math.max(0L, unit.toMillis(delay)));

    return scheduled;
}

所以接下来会执行 ScheduledRunnable 的 run 方法,接着执行到 run 方法中,会调用 drainNormal

void drainNormal() {
    // ...
    final Observer<? super T> a = actual;
    for (;;) {
        // ...
        for (;;) {
            // ...
            try {
                // ...
            } catch (Throwable ex) {
                // ...
                a.onError(ex);
                return;
            }
            // ...
            // 这里执行 onNext 方法,也就是会执行到我们实现的代码
            a.onNext(v);
        }
        // ...
    }
}
相关标签: RxJava