自己动手造一个 RxJava(三)—— 线程调度
目录
4.线程调度
终于来到最后一个 part 了。线程调度是 RxJava 中另一核心部分,这也是我花最多时间去理解的地方。
RxJava 是通过 subscribeOn(Scheduler scheduler)
和 observeOn(Scheduler scheduler)
两个方法来实现线程调度的。
subscribeOn()
,指定上游事件发送所在的线程,可以放在任何位置,但是只有第一次的指定是有效的。observeOn()
,指定下游事件接收所在的线程,可以多次指定,即如果有多次切换线程的需求,只要在每个需要切换的地方之前调用一次observeOn()
即可。- Scheduler 是一个调度器的类,它指定了事件应该运行在什么线程。
我们先来看下面这个例子。
Observable.just(1,2,3)
.subscribeOn(Schedulers.io())
.observeOn(Schedulers.newThread())
.map(new Func1<Integer, String>() {
@Override
public String call(Integer integer) {
return String.valueOf(integer);
}
})
.observeOn(Schedulers.computation())
.subscribe(new Subscriber<String>() {
@Override
public void onCompleted() {
System.out.println("onCompleted");
}
@Override
public void onError(Throwable e) {
}
@Override
public void onNext(String string) {
System.out.println("onNext:"+string);
}
});
使用 just()
方法创建一个 Observable,随后通过 subscribeOn(Schedulers.io())
指定 1,2,3 在 io 线程发送,并使用 observeOn(Schedulers.newThread())
指定 map()
操作在新的线程执行,最后调用 observeOn(Schedulers.computation())
让下游的回调在 computation 线程执行,总共完成了 3 次线程切换。
接下来我们来看怎么实现。
subscribeOn 的实现
我们先忽略 Schedule 的实现,只关注如何将上游的事件切换到新的线程中去执行。
在 事件发送 中,我们是在 action.call()
中通过调用 onNext()
、onCompleted()
来产生事件的,因此我们可以将这些方法的放到一个新的线程中去调用。
就像这样。
MyObservable.create(new MyAction1<MyObserver<Integer>>() {
@Override
public void call(MyObserver<Integer> myObserver) {
new Thread(new Runnable() {
@Override
public void run() {
myObserver.onNext(1);
myObserver.onNext(2);
myObserver.onNext(3);
myObserver.onCompleted();
}
}).start();
}
})
当然我们不能这么简单粗暴的将新建线程的操作暴露在外面,使用者在调用 create()
方法的时候只关注事件如何发送,线程切换应该放在 subscribeOn()
方法中实现,所以我们要思考如何将这一系列的事件包裹到新的线程中运行。
回顾 简单的映射 中,我们在 map()
方法中将原来的 MyObservable 转变为一个新的 MyObservable,结合这种思想,我们是不是可以将普通的 MyObservable 转变成一个新的封装了线程操作的 MyObservable 呢?
答案是肯定的。来看我们的 subscribeOn()
是怎么实现的。
public MyObservable<T> subscribeOn() {
MyObservable<T> upstream = this;
return new MyObservable<T>(new MyAction1<MyObserver<T>>() {
@Override
public void call(MyObserver<T> myObserver) {
new Thread(new Runnable() {
@Override
public void run() {
upstream.subscribe(new MyObserver<T>() {
@Override
public void onNext(T t) {
myObserver.onNext(t);
}
@Override
public void onCompleted() {
myObserver.onCompleted();
}
@Override
public void onError(Throwable e) {
myObserver.onError(e);
}
});
}
}).start();
}
});
}
同 map()
方法一样,我们用 upsteam
变量保存了当前的 MyObservable 实例,随后返回一个新的 MyObservable 对象,并在 call()
方法中开启了一个子线程,在 run()
方法中调用 upsteam.subscribe()
,将上游 upsteam 中的回调全部转移到新 MyObservable 的回调中去,于是我们就实现了将一个普通的 MyObservable 转变为一个新的含有线程操作的 MyObservable 。
看下使用效果。
MyObservable.create(new MyAction1<MyObserver<Integer>>() {
@Override
public void call(MyObserver<Integer> myObserver) {
System.out.println("call:" + Thread.currentThread().getName());
myObserver.onNext(1);
myObserver.onNext(2);
myObserver.onNext(3);
myObserver.onCompleted();
}
})
.subscribeOn()
.subscribe(new MyObserver<Integer>() {
@Override
public void onNext(Integer integer) {
System.out.println("onNext:" + Thread.currentThread().getName());
}
@Override
public void onCompleted() {
System.out.println("onCompleted:" + Thread.currentThread().getName());
}
@Override
public void onError(Throwable e) {
}
});
我们在 call()
、onNext()
和 onCompleted()
中打印了所在线程的名字,运行结果如下。
call:Thread-0
onNext:Thread-0
onNext:Thread-0
onNext:Thread-0
onCompleted:Thread-0
可以看到事件的发送和接收都在一个新的子线程 Thread-0 里面。
我们来梳理一下执行的流程。
通过 Observable.create()
创建了 MyObservable 1 ,随后调用 subscribeOn()
变换得到新的 MyObservable 2 ,最后调用 subscribe()
传入一个 MyObserver 。注意,这里的 MyObserver 是传给 MyObservable 2 的,所以我们将其命名为 MyObserver 2 。
在主线程的时候,由 MyObservable 2 调用 subscribe()
。
public void subscribe(MyObserver<T> myObserver) {
action.call(myObserver);
}
subscribe()
会调用 MyObservable 2 中的 action 执行 call()
方法,它的实现就在刚才的 subscribeOn()
里面。
public MyObservable<T> subscribeOn() {
MyObservable<T> upstream = this;
return new MyObservable<T>(new MyAction1<MyObserver<T>>() {
@Override
public void call(MyObserver<T> myObserver) {
new Thread(new Runnable() {
@Override
public void run() {
upstream.subscribe(new MyObserver<T>() {
@Override
public void onNext(T t) {
myObserver.onNext(t);
}
@Override
public void onCompleted() {
myObserver.onCompleted();
}
@Override
public void onError(Throwable e) {
myObserver.onError(e);
}
});
}
}).start();
}
});
}
这里的 call()
我们已经在内部开启一个新线程,所以会进入 Thread-0 线程。在线程执行体中,我们调用了 upsteam.subscirbe()
,即 1.subscribe()
, subscribe()
又会调用 MyObservable 1 中的 action 执行 1.call()
, 1.call()
的实现在我们最开始的 create()
里面。
MyObservable.create(new MyAction1<MyObserver<Integer>>() {
@Override
public void call(MyObserver<Integer> myObserver) {
System.out.println("call:" + Thread.currentThread().getName());
myObserver.onNext(1);
myObserver.onNext(2);
myObserver.onNext(3);
myObserver.onCompleted();
}
})
我们调用了三次 onNext()
和一次 onCompleted()
,上图我只画了第一个 onNext()
的调用,即 1.onNext()
, 1.onNext()
的回调在 subscribe()
中我们将其转发给了 2.onNext()
。
public MyObservable<T> subscribeOn() {
/*省略*/
upstream.subscribe(new MyObserver<T>() {
@Override
public void onNext(T t) {
myObserver.onNext(t);
}
@Override
public void onCompleted() {
myObserver.onCompleted();
}
@Override
public void onError(Throwable e) {
myObserver.onError(e);
}
});
/*省略*/
}
所以最终会来到一开始我们传入的 MyObserver 中,执行 System.out.println()
方法。
MyObservable.create()//省略实现
.subscribeOn()
.subscribe(new MyObserver<Integer>() {
@Override
public void onNext(Integer integer) {
System.out.println("onNext:" + Thread.currentThread().getName());
}
@Override
public void onCompleted() {
System.out.println("onCompleted" + Thread.currentThread().getName());
}
@Override
public void onError(Throwable e) {
}
});
为什么 subscribeOn 只在第一次生效
我们来看下面的例子。
MyObservable.create(new MyAction1<MyObserver<Integer>>() {
@Override
public void call(MyObserver<Integer> myObserver) {
System.out.println("call:" + Thread.currentThread().getName());
myObserver.onNext(1);
}
})
.subscribeOn()
.map(new Func<Integer, String>() {
@Override
public String call(Integer integer) {
System.out.println("map:" + Thread.currentThread().getName());
return String.valueOf(integer);
}
})
.subscribeOn()
.subscribe(new MyObserver<String>() {
@Override
public void onNext(String string) {
System.out.println("onNext:" + Thread.currentThread().getName());
}
@Override
public void onCompleted() {}
@Override
public void onError(Throwable e) {}
});
在 create()
后面和 map()
后面都调用了一次 subscribeOn()
,可能一开始我们会理所当然的觉得,create()
中 print()
方法会发生在子线程1,map()
中的 print()
会发生在子线程2,那么实际结果是怎样的呢?
call:Thread-1
map:Thread-1
onNext:Thread-1
所有的 print()
方法都发生在子线程1,也就是说第二个 subscribeOn()
是无效的。来看下流程图就知道为什么了。
可以看到,虽然我们在第二次调用 subscribeOn()
的时候,从主线程切换到了 Thread-0 线程,但是在第一次调用 subscribe()
的时候,它又让接下来的流程从 Thread-0 切换到 Thread-1 ,而真正的事件发送,即 onNext()
以及它们的回调,统统发生在 Thread-1 里面,所以不管我们在第一次调用 subscribeOn()
之后,又调用了几次 subscribeOn()
,它们的作用只会让你的线程从 main 切换 Thread-0,Thread-1,Thread-2,……,Thread-n,而 onNext()
以及它们的回调将会在最后一个新建出来的子线程执行(忽略 observeOn()
的影响)。
observeOn 的实现
前面讲过, observeOn()
方法作用的是它的直接下游,如果是在 subscribe()
前面调用的,那么它改变的是回调所在的线程,即 onNext()
、 onCompleted()
和 onError()
的实现。如果是在其他操作符如 map()
前面调用的呢?其实也是一样的,我们再次回顾 map()
的实现。
public <R> MyObservable<R> map(Func<T, R> func) {
final MyObservable<T> upstream = this;
return new MyObservable<R>(new MyAction1<MyObserver<R>>() {
@Override
public void call(MyObserver<R> myObserver) {
upstream.start(new MyObserver<T>() {
@Override
public void onNext(T t) {
myObserver.onNext(func.call(t));
}
@Override
public void onCompleted() {
myObserver.onCompleted();
}
@Override
public void onError(Throwable e) {
myObserver.onError(e);
}
});
}
});
}
map()
中的核心语句是 func.call(t)
的调用,并将其传递到下游的 myObserver ,所以要想切换 func.call(t)
所在的线程,就必须改变 onNext()
回调所在的线程。
写法很简单,我们返回一个新的 MyObservable,并在上游的 onNext()
回调中新建一个线程,再将回调传递给下游,也就是当前新返回的 MyObservable。
public MyObservable<T> observeOn(Scheduler scheduler) {
MyObservable<T> upstream = this;
return new MyObservable<T>(new MyAction1<MyObserver<T>>() {
@Override
public void call(MyObserver<T> myObserver) {
upstream.subscribe(new MyObserver<T>() {
@Override
public void onNext(T t) {
new Thread(new Runnable() {
@Override
public void run() {
myObserver.onNext(t);
}
}).start();
}
@Override
public void onCompleted() {
myObserver.onCompleted();
}
@Override
public void onError(Throwable e) {
myObserver.onError(e);
}
});
}
});
}
这里我们忽略了对 onCompleted()
和 onError()
的处理,因为我们要保证它们和 onNext()
是执行在同一个子线程中的,需要借助线程池来实现,这个我们待会再讨论,现在只需关注怎么改变下游的线程。先来看看我们的 observeOn()
怎么使用吧。
MyObservable.create(new MyAction1<MyObserver<Integer>>() {
@Override
public void call(MyObserver<Integer> myObserver) {
System.out.println("call:" + Thread.currentThread().getName());
myObserver.onNext(1);
}
})
.observeOn()
.subscribe(new MyObserver<Integer>() {
@Override
public void onNext(Integer integer) {
System.out.println("onNext:" + Thread.currentThread().getName());
}
@Override
public void onCompleted() {}
@Override
public void onError(Throwable e) {}
});
将下游的回调指定在新的子线程,运行结果如下。
call:main
onNext:Thread-0
达到了我们想要的效果,再来梳理下执行流程。
我们通过 create()
操作和 observeOn()
生成了 MyObservable2 对象,随后调用 subscribe()
方法, subscribe()
方法会调用 2.call()
方法,而 2.call()
的实现我们是在 observeOn()
中声明的,即调用上游 MyObservable1 的 subscribe()
方法, 1.subscribe()
方法调用 1.call()
方法, 它的实现在 create()
中已经声明,即调用 1.onNext()
方法, 1.onNext()
的回调同样在 observeOn()
内部,此时会开启一个新的子线程,进入 Thread-0 ,在线程体中调用 2.onNext()
,它的回调在我们声明的 MyObserver2 中,即打印输出当前线程。
讲起来很啰嗦,大家可以自己根据流程在纸上画一遍,一下子会清晰很多。
最后我们再来看一个比较复杂的场景,由一个 subscribeOn()
和多个 observeOn()
同时使用的例子,代码如下。
MyObservable.create(new MyAction1<MyObserver<Integer>>() {
@Override
public void call(MyObserver<Integer> myObserver) {
System.out.println("call:" + Thread.currentThread().getName());
myObserver.onNext(1);
}
})
.subscribeOn()
.observeOn()
.map(new Func<Integer, String>() {
@Override
public String call(Integer integer) {
System.out.println("map:" + Thread.currentThread().getName());
return String.valueOf(integer);
}
})
.observeOn()
.subscribe(new MyObserver<Integer>() {
@Override
public void onNext(Integer integer) {
System.out.println("onNext:" + Thread.currentThread().getName());
}
@Override
public void onCompleted() {}
@Override
public void onError(Throwable e) {}
});
这里一共切换了三次线程,运行结果如下。
call:Thread-0
map:Thread-1
onNext:Thread-2
发送事件运行在 Thread-0 线程,map 映射运行在 Thread-1 线程,结果回调发生在 Thread-2 线程。流程如下所示。
看起来非常复杂,这里就不再赘述,需要大家比较耐心的看下去,跟随代码,理解线程是如何在整个流程中发生切换的。
利用线程池进行调度
前面在写 observeOn()
方法的时候我们只对 onNext()
方法开启了子线程,而没有对 onCompleted()
和 onError()
进行操作。
public MyObservable<T> observeOn(Scheduler scheduler) {
MyObservable<T> upstream = this;
return new MyObservable<T>(new MyAction1<MyObserver<T>>() {
@Override
public void call(MyObserver<T> myObserver) {
upstream.subscribe(new MyObserver<T>() {
@Override
public void onNext(T t) {
new Thread(new Runnable() {
@Override
public void run() {
myObserver.onNext(t);
}
}).start();
}
@Override
public void onCompleted() {
myObserver.onCompleted();
}
@Override
public void onError(Throwable e) {
myObserver.onError(e);
}
});
}
});
}
因为它们之间其实是独立的关系,我们在 onNext()
中通过 new Thread().start()
的方式开启了一个子线程,但是我们没有办法让 onCompleted()
同样执行在这个新建出来的线程中。事实上,onNext()
的写法也是有问题的。一旦我们在发送事件的时候,调用了多次 onNext()
,那么它在每次回调的时候,就会新开辟一个线程,导致所有事件都在不同的子线程中去处理,就不能保证事件能够按照发送的顺序进行接收了。
那么解决的办法就是使用线程池来管理我们的线程。
还记得RxJava在切换线程的时候是怎么写的吗?
Observable.just(1,2,3)
.subscribeOn(Schedulers.io())
.observeOn(Schedulers.newThread())
在调用 subscribeOn()
和 observeOn()
的时候,需要传入一个是 Scheduler 类的对象,前面说过,它相当于一个调度器,能够指定我们事件执行在什么线程,而 Schedulers 是一个单例,它用来管理和提供不同的调度器(即线程池)供开发者调用。
我们可以模仿 RxJava 的方式来实现线程池的管理。首先定义一个 Scheduler 抽象类,它包含 schedule()
、 finish()
和 isFinished()
方法。
public abstract class Scheduler {
public abstract void schedule(Runnable runnable);
public abstract void finish();
public abstract boolean isFinished();
}
接下来是我们提供两个 Scheduler 的实现类。
public class NewThreadScheduler extends Scheduler {
private ExecutorService executor;
private boolean isFinished = false;
public NewThreadScheduler() {
ThreadFactory threadFactory = new ThreadFactory() {
@Override
public Thread newThread(Runnable r) {
return new Thread(r, "NewThread-" + System.currentTimeMillis());
}
};
executor = Executors.newSingleThreadExecutor(threadFactory);
}
@Override
public void schedule(Runnable runnable) {
if (!isFinished) {
executor.execute(runnable);
}
}
@Override
public void finish() {
if (!isFinished) {
executor.shutdown();
isFinished = true;
}
}
@Override
public boolean isFinished() {
return isFinished;
}
}
public class ChildThreadScheduler extends Scheduler {
private ExecutorService executor;
private boolean isFinished = false;
public ChildThreadScheduler() {
ThreadFactory threadFactory = new ThreadFactory() {
@Override
public Thread newThread(Runnable r) {
return new Thread(r, "ChildThread-" + System.currentTimeMillis());
}
};
executor = Executors.newSingleThreadExecutor(threadFactory);
}
@Override
public void schedule(Runnable runnable) {
if (!isFinished) {
executor.execute(runnable);
}
}
@Override
public void finish() {
if (!isFinished) {
executor.shutdown();
isFinished = true;
}
}
@Override
public boolean isFinished() {
return isFinished;
}
}
可以看到,我们分别在两个类的构造函数中,声明了一个 ThreadFactory,并将其传入 Executors.newSingleThreadExecutor()
方法中,返回一个 ExecutorService 对象。注意,这里使用 newSingleThreadExecutor()
是为了保证 runnable 对象能够按顺序进入线程池,以确保事件能够按照我们定义的顺序去执行。
在 schedule()
方法中,我们调用 executor.execute(runnable)
方法,让线程池执行runnable对象,在 finish()
方法中,调用了 executor.shutdown()
方法,它会在线程池执行完任务后,关闭线程池,在两个方法在执行前都会提前判断 isFinished 的值,避免抛出 RejectedExecutionException 的异常。
以上这些涉及到一些线程池的知识,不清楚地同学可以先去了解一下。
这两个类的唯一区别,就是构造函数中 ThreadFactory 返回的线程的名字不一样。在这里,我们只是为了做一个简单的区分。
接着我们定义一个 Schedulers 的单例。
public class Schedulers {
private static class Singleton {
private static Schedulers instance = new Schedulers();
}
private static Schedulers getInstance() {
return Singleton.instance;
}
private ChildThreadScheduler childThreadScheduler;
public static Scheduler newThread() {
return new NewThreadScheduler();
}
public static Scheduler childThread() {
if (getInstance().childThreadScheduler == null) {
getInstance().childThreadScheduler = new ChildThreadScheduler();
}else if (getInstance().childThreadScheduler.isFinished()){
getInstance().childThreadScheduler = new ChildThreadScheduler();
}
return getInstance().childThreadScheduler;
}
}
当调用 Schedulers.newThread()
方法时,直接返回一个新的 NewThreadScheduler 对象。
当调用 Schedulers.childThread()
方法时,会返回一个单例中维护的 ChildThreadScheduler 对象,如果这个线程池为空或者已经被关闭,那么再重新返回一个新的实例。
现在我们可以看出这两个线程池的区别,newThread()
每次都会开启一个新的线程池,而 childThread()
则会使用同一个线程池。
定义好线程管理相关的类后,我们就可以改造 subscribeOn()
方法了。
public MyObservable<T> subscribeOn(Scheduler scheduler) {
MyObservable<T> upstream = this;
return new MyObservable<T>(new MyAction1<MyObserver<T>>() {
@Override
public void call(MyObserver<T> myObserver) {
scheduler.schedule(new Runnable() {
@Override
public void run() {
upstream.subscribe(new MyObserver<T>() {
@Override
public void onNext(T t) {
myObserver.onNext(t);
}
@Override
public void onCompleted() {
myObserver.onCompleted();
}
@Override
public void onError(Throwable e) {
myObserver.onError(e);
}
});
}
});
}
});
}
我们将 new Thread().start()
的方式,改成了 scheduler.schedule()
,非常的简单。
再看看 ObserverOn()
方法。
public MyObservable<T> observeOn(Scheduler scheduler) {
MyObservable<T> upstream = this;
return new MyObservable<T>(new MyAction1<MyObserver<T>>() {
@Override
public void call(MyObserver<T> myObserver) {
upstream.subscribe(new MyObserver<T>() {
@Override
public void onNext(T t) {
scheduler.schedule(new Runnable() {
@Override
public void run() {
myObserver.onNext(t);
}
});
}
@Override
public void onCompleted() {
scheduler.schedule(new Runnable() {
@Override
public void run() {
myObserver.onCompleted();
scheduler.finish();
}
});
}
@Override
public void onError(Throwable e) {
scheduler.schedule(new Runnable() {
@Override
public void run() {
myObserver.onError(e);
scheduler.finish();
}
});
}
});
}
});
}
现在,我们不管是在 onNext()
、onCompleted()
还是 onError()
方法中,调用 scheduler.schedule()
方法,都是同一个 scheduler 对象在执行,即它们都跑在同一个线程池中。
再来测试一下。
MyObservable.create(new MyAction1<MyObserver<Integer>>() {
@Override
public void call(MyObserver<Integer> myObserver) {
System.out.println("call:" + Thread.currentThread().getName());
myObserver.onNext(1);
myObserver.onNext(2);
myObserver.onCompleted();
}
})
.subscribeOn(Schedulers.newThread())
.observeOn(Schedulers.newThread())
.subscribe(new MyObserver<Integer>() {
@Override
public void onNext(Integer integer) {
System.out.println("onNext:" + integer+" "+Thread.currentThread().getName());
}
@Override
public void onCompleted() {
System.out.println("onCompleted:"+Thread.currentThread().getName());
}
@Override
public void onError(Throwable e) {}
});
运行结果如下。
call:NewThread-1533382601665
onNext:1 NewThread-1533382601666
onNext:2 NewThread-1533382601666
onCompleted:NewThread-1533382601666
效果不错,我们成功让事件在 NewThread-1533382601665 线程中发送,并在 NewThread-1533382601666 中回调结果,但是我们会发现,程序依然在运行状态,不会自动结束进程。这是因为我们传进去的 scheduler 都没有被关闭,那么现在问题来了,我们要怎样关闭这个 scheduler?
关闭线程池
为了确保线程池在不再有任务的情况下关闭,我们必须在最后一刻才调用 scheduler.finish()
方法。观察前面的那几个流程图,我们知道整个流程在执行到最后都会来到我们一开始传进去的 MyObserver 回调中,所以我们可以对 subscribe()
方法做些改变,让它能够在 onCompleted()
或者 onError()
方法执行完关闭线程池。
新建一个 mySubscribe()
方法,同subscribe()
一样,它调用了 action.call()
方法,但是传进去的是一个新的 MyObserver ,在回调中再去调用外部传进去的 myObserver.onCompleted()
和 myObserver.onError()
,最后执行 finish()
方法,这样就能确保我们对线程池的关闭是在整个流程的最后一刻执行的。
public void mySubscribe(MyObserver<T> myObserver) {
action.call(new MyObserver<T>() {
@Override
public void onNext(T t) {
myObserver.onNext(t);
}
@Override
public void onCompleted() {
myObserver.onCompleted();
finish();//关闭线程池
}
@Override
public void onError(Throwable e) {
myObserver.onError(e);
finish();//关闭线程池
}
});
}
注意,现在这个方法与 subscribe()
的区别是, mySubscribe()
是我们在外部调用的,而 subscribe()
是在内部调用的。
再看下 finish()
怎么实现。
public class MyObservable<T> {
/*已省略*/
private Set<Scheduler> schedulers;
private MyObservable(MyAction1<MyObserver<T>> action) {
this.action = action;
this.schedulers = new HashSet<>();
}
private MyObservable(MyAction1<MyObserver<T>> action, Set<Scheduler> schedulers) {
this.action = action;
this.schedulers = schedulers;
}
private void finish(){
for (Scheduler scheduler : schedulers) {
scheduler.finish();
}
}
/*已省略*/
}
我们在内部新增了一个 Scheduler 的集合变量 schedulers ,在单参数的构造函数中初始化,并提供一个双参数的构造函数,方便我们在 map()
、subscribeOn()
和 observeOn()
中创建新实例时传递这个变量。
这几个方法的改动如下。
public <R> MyObservable<R> map(Func<T, R> func) {
final MyObservable<T> upstream = this;
return new MyObservable<R>(new MyAction1<MyObserver<R>>() {
@Override
public void call(MyObserver<R> myObserver) {
upstream.subscribe(new MyObserver<T>() {
@Override
public void onNext(T t) {
myObserver.onNext(func.call(t));
}
@Override
public void onCompleted() {
myObserver.onCompleted();
}
@Override
public void onError(Throwable e) {
myObserver.onError(e);
}
});
}
}, schedulers);
}
public MyObservable<T> subscribeOn(Scheduler scheduler) {
schedulers.add(scheduler);
MyObservable<T> upstream = this;
return new MyObservable<T>(new MyAction1<MyObserver<T>>() {
@Override
public void call(MyObserver<T> myObserver) {
scheduler.schedule(new Runnable() {
@Override
public void run() {
upstream.subscribe(new MyObserver<T>() {
@Override
public void onNext(T t) {
myObserver.onNext(t);
}
@Override
public void onCompleted() {
myObserver.onCompleted();
}
@Override
public void onError(Throwable e) {
myObserver.onError(e);
}
});
}
});
}
}, schedulers);
}
public MyObservable<T> observeOn(Scheduler scheduler) {
schedulers.add(scheduler);
MyObservable<T> upstream = this;
return new MyObservable<T>(new MyAction1<MyObserver<T>>() {
@Override
public void call(MyObserver<T> myObserver) {
upstream.subscribe(new MyObserver<T>() {
@Override
public void onNext(T t) {
scheduler.schedule(new Runnable() {
@Override
public void run() {
myObserver.onNext(t);
}
});
}
@Override
public void onCompleted() {
scheduler.schedule(new Runnable() {
@Override
public void run() {
myObserver.onCompleted();
}
});
}
@Override
public void onError(Throwable e) {
scheduler.schedule(new Runnable() {
@Override
public void run() {
myObserver.onError(e);
}
});
}
});
}
}, schedulers);
}
最后,我们用一个比较复杂的例子来演示。
MyObservable.create(new MyAction1<MyObserver<Integer>>() {
@Override
public void call(MyObserver<Integer> myObserver) {
System.out.println("call:" + Thread.currentThread().getName());
myObserver.onNext(1);
myObserver.onCompleted();
}
})
.subscribeOn(Schedulers.newThread())
.observeOn(Schedulers.childThread())
.map(new Func<Integer, String>() {
@Override
public String call(Integer integer) {
System.out.println("map:" + Thread.currentThread().getName());
return String.valueOf(integer);
}
})
.observeOn(Schedulers.newThread())
.map(new Func<String, Integer>() {
@Override
public Integer call(String string) {
System.out.println("map:" + Thread.currentThread().getName());
return Integer.parseInt(string);
}
})
.observeOn(Schedulers.childThread())
.mySubscribe(new MyObserver<Integer>() {
@Override
public void onNext(Integer string) {
System.out.println("onNext:" + Thread.currentThread().getName());
}
@Override
public void onCompleted() {
System.out.println("onCompleted:" + Thread.currentThread().getName());
}
@Override
public void onError(Throwable e) {}
});
执行结果如下。
call:NewThread-1533441899656
map:ChildThread-1533441899658
map:NewThread-1533441899658
onNext:ChildThread-1533441899658
onCompleted:ChildThread-1533441899658
控制台输出了每个打印事件所在的线程,并且自动结束了进程。可以看到,这个流程里面包含了三个不同的线程,两个不同的 NewThread 线程,还有一个 ChildThread 线程。
它们的流程图如下。
结语
到这里我们整个《自己动手造一个RxJava》的讲解就结束了,非常感谢大家的阅读,在写本文之前自己是花了一周的时间去理解,然后又花了一周的时间才把整个思路和分析整理出来,算是我第一次花这么大精力去写的一篇文章了。本文篇幅较长,某些地方可能讲得比较啰嗦,但是对新手而言如果能够耐心的看下去,是非常不错的学习资料。若有错误的地方,也请各位读者及时指出,欢迎大家一起探讨。
同时感谢以下两位作者提供的参考资料:
给 Android 开发者的 RxJava 详解
RxJava 系列文章
▷ 自己动手造一个 RxJava(一)—— 理解临时任务对象
▷ 自己动手造一个 RxJava(二)—— 事件的发送、接收与映射
▶ 自己动手造一个 RxJava(三)—— 线程调度