RxJava 从源码分析流程
程序员文章站
2022-04-02 17:46:18
...
以下面这段代码为示例
public void run(final ThreadListener<T> listener) {
Observable.create(new ObservableOnSubscribe<T>() {
@Override
public void subscribe(ObservableEmitter<T> e) throws Exception {
listener.start(e);
}
}).subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.subscribe(new Observer<T>() {
@Override
public void onSubscribe(Disposable d) {
}
@Override
public void onNext(T t) {
listener.onNext(t);
}
@Override
public void onError(Throwable e) {
listener.onError(e.getMessage());
}
@Override
public void onComplete() {
}
});
}
源码流程就看下面这个手画的图吧:
可以看到类是一层一层被包裹的,那么当 ObservableEmitter 执行 e.next() 时,会调到 ObservableObserveOn 的内部类 ObserveOnObserver 的 onNext 方法, onNext 方法中调了 schedule, schedule 方法如下:
void schedule() {
if (getAndIncrement() == 0) {
worker.schedule(this); // worker 是 HandlerScheduler.createWorker, 返回的是 HandlerScheduler 的内部类 HandlerWorker
}
}
wordker.schedule 的主要代码如下:
public Disposable schedule(Runnable run, long delay, TimeUnit unit) {
// 这里的 handler 是 new Handler(Looper.getMainLooper())
ScheduledRunnable scheduled = new ScheduledRunnable(handler, run);
Message message = Message.obtain(handler, scheduled);
message.obj = this; // Used as token for batch disposal of this worker's runnables.
// 这里切换到主线程
handler.sendMessageDelayed(message, Math.max(0L, unit.toMillis(delay)));
return scheduled;
}
所以接下来会执行 ScheduledRunnable 的 run 方法,接着执行到 run 方法中,会调用 drainNormal
void drainNormal() {
// ...
final Observer<? super T> a = actual;
for (;;) {
// ...
for (;;) {
// ...
try {
// ...
} catch (Throwable ex) {
// ...
a.onError(ex);
return;
}
// ...
// 这里执行 onNext 方法,也就是会执行到我们实现的代码
a.onNext(v);
}
// ...
}
}