欢迎您访问程序员文章站本站旨在为大家提供分享程序员计算机编程知识!
您现在的位置是: 首页

自己动手造一个 RxJava(三)—— 线程调度

程序员文章站 2024-03-03 19:51:52
...

目录

4.线程调度

终于来到最后一个 part 了。线程调度是 RxJava 中另一核心部分,这也是我花最多时间去理解的地方。

RxJava 是通过 subscribeOn(Scheduler scheduler)observeOn(Scheduler scheduler) 两个方法来实现线程调度的。

  • subscribeOn(),指定上游事件发送所在的线程,可以放在任何位置,但是只有第一次的指定是有效的。
  • observeOn(),指定下游事件接收所在的线程,可以多次指定,即如果有多次切换线程的需求,只要在每个需要切换的地方之前调用一次 observeOn() 即可。
  • Scheduler 是一个调度器的类,它指定了事件应该运行在什么线程。

我们先来看下面这个例子。

Observable.just(1,2,3)
        .subscribeOn(Schedulers.io())
        .observeOn(Schedulers.newThread())
        .map(new Func1<Integer, String>() {
            @Override
            public String call(Integer integer) {
                return String.valueOf(integer);
            }
        })
        .observeOn(Schedulers.computation())
        .subscribe(new Subscriber<String>() {
            @Override
            public void onCompleted() {
                System.out.println("onCompleted");
            }

            @Override
            public void onError(Throwable e) {
            }

            @Override
            public void onNext(String string) {
                System.out.println("onNext:"+string);
            }
        });

使用 just() 方法创建一个 Observable,随后通过 subscribeOn(Schedulers.io()) 指定 1,2,3 在 io 线程发送,并使用 observeOn(Schedulers.newThread()) 指定 map() 操作在新的线程执行,最后调用 observeOn(Schedulers.computation()) 让下游的回调在 computation 线程执行,总共完成了 3 次线程切换。

接下来我们来看怎么实现。

subscribeOn 的实现

我们先忽略 Schedule 的实现,只关注如何将上游的事件切换到新的线程中去执行。

事件发送 中,我们是在 action.call() 中通过调用 onNext()onCompleted() 来产生事件的,因此我们可以将这些方法的放到一个新的线程中去调用。

就像这样。

MyObservable.create(new MyAction1<MyObserver<Integer>>() {
    @Override
    public void call(MyObserver<Integer> myObserver) {
        new Thread(new Runnable() {
            @Override
            public void run() {
                myObserver.onNext(1);
                myObserver.onNext(2);
                myObserver.onNext(3);
                myObserver.onCompleted();
            }
        }).start();
    }
})

当然我们不能这么简单粗暴的将新建线程的操作暴露在外面,使用者在调用 create() 方法的时候只关注事件如何发送,线程切换应该放在 subscribeOn() 方法中实现,所以我们要思考如何将这一系列的事件包裹到新的线程中运行。

回顾 简单的映射 中,我们在 map() 方法中将原来的 MyObservable 转变为一个新的 MyObservable,结合这种思想,我们是不是可以将普通的 MyObservable 转变成一个新的封装了线程操作的 MyObservable 呢?

答案是肯定的。来看我们的 subscribeOn() 是怎么实现的。

public MyObservable<T> subscribeOn() {
    MyObservable<T> upstream = this;
    return new MyObservable<T>(new MyAction1<MyObserver<T>>() {
        @Override
        public void call(MyObserver<T> myObserver) {
            new Thread(new Runnable() {
                @Override
                public void run() {
                    upstream.subscribe(new MyObserver<T>() {
                        @Override
                        public void onNext(T t) {
                            myObserver.onNext(t);
                        }

                        @Override
                        public void onCompleted() {
                            myObserver.onCompleted();
                        }

                        @Override
                        public void onError(Throwable e) {
                            myObserver.onError(e);
                        }
                    });
                }
            }).start();
        }
    });
}

map() 方法一样,我们用 upsteam 变量保存了当前的 MyObservable 实例,随后返回一个新的 MyObservable 对象,并在 call() 方法中开启了一个子线程,在 run() 方法中调用 upsteam.subscribe(),将上游 upsteam 中的回调全部转移到新 MyObservable 的回调中去,于是我们就实现了将一个普通的 MyObservable 转变为一个新的含有线程操作的 MyObservable 。

看下使用效果。

MyObservable.create(new MyAction1<MyObserver<Integer>>() {
        @Override
        public void call(MyObserver<Integer> myObserver) {
            System.out.println("call:" + Thread.currentThread().getName());
            myObserver.onNext(1);
            myObserver.onNext(2);
            myObserver.onNext(3);
            myObserver.onCompleted();
        }
    })
            .subscribeOn()
            .subscribe(new MyObserver<Integer>() {
                @Override
                public void onNext(Integer integer) {
                    System.out.println("onNext:" + Thread.currentThread().getName());
                }

                @Override
                public void onCompleted() {
                    System.out.println("onCompleted:" + Thread.currentThread().getName());
                }

                @Override
                public void onError(Throwable e) {

                }
            });

我们在 call()onNext()onCompleted() 中打印了所在线程的名字,运行结果如下。

call:Thread-0
onNext:Thread-0
onNext:Thread-0
onNext:Thread-0
onCompleted:Thread-0

可以看到事件的发送和接收都在一个新的子线程 Thread-0 里面。

我们来梳理一下执行的流程。

自己动手造一个 RxJava(三)—— 线程调度

通过 Observable.create() 创建了 MyObservable 1 ,随后调用 subscribeOn() 变换得到新的 MyObservable 2 ,最后调用 subscribe() 传入一个 MyObserver 。注意,这里的 MyObserver 是传给 MyObservable 2 的,所以我们将其命名为 MyObserver 2 。

在主线程的时候,由 MyObservable 2 调用 subscribe()

public void subscribe(MyObserver<T> myObserver) {
    action.call(myObserver);
}

subscribe() 会调用 MyObservable 2 中的 action 执行 call() 方法,它的实现就在刚才的 subscribeOn() 里面。

public MyObservable<T> subscribeOn() {
    MyObservable<T> upstream = this;
    return new MyObservable<T>(new MyAction1<MyObserver<T>>() {
        @Override
        public void call(MyObserver<T> myObserver) {
            new Thread(new Runnable() {
                @Override
                public void run() {
                    upstream.subscribe(new MyObserver<T>() {
                        @Override
                        public void onNext(T t) {
                            myObserver.onNext(t);
                        }

                        @Override
                        public void onCompleted() {
                            myObserver.onCompleted();
                        }

                        @Override
                        public void onError(Throwable e) {
                            myObserver.onError(e);
                        }
                    });
                }
            }).start();
        }
    });
}

这里的 call() 我们已经在内部开启一个新线程,所以会进入 Thread-0 线程。在线程执行体中,我们调用了 upsteam.subscirbe() ,即 1.subscribe()subscribe() 又会调用 MyObservable 1 中的 action 执行 1.call()1.call() 的实现在我们最开始的 create() 里面。

MyObservable.create(new MyAction1<MyObserver<Integer>>() {
        @Override
        public void call(MyObserver<Integer> myObserver) {
            System.out.println("call:" + Thread.currentThread().getName());
            myObserver.onNext(1);
            myObserver.onNext(2);
            myObserver.onNext(3);
            myObserver.onCompleted();
        }
    })

我们调用了三次 onNext() 和一次 onCompleted() ,上图我只画了第一个 onNext() 的调用,即 1.onNext()1.onNext() 的回调在 subscribe() 中我们将其转发给了 2.onNext()

public MyObservable<T> subscribeOn() {
    /*省略*/
            upstream.subscribe(new MyObserver<T>() {
                @Override
                public void onNext(T t) {
                    myObserver.onNext(t);
                }

                @Override
                public void onCompleted() {
                    myObserver.onCompleted();
                }

                @Override
                public void onError(Throwable e) {
                    myObserver.onError(e);
                }
            });
    /*省略*/
}

所以最终会来到一开始我们传入的 MyObserver 中,执行 System.out.println() 方法。

MyObservable.create()//省略实现
            .subscribeOn()
            .subscribe(new MyObserver<Integer>() {
                @Override
                public void onNext(Integer integer) {
                    System.out.println("onNext:" + Thread.currentThread().getName());
                }

                @Override
                public void onCompleted() {
                    System.out.println("onCompleted" + Thread.currentThread().getName());
                }

                @Override
                public void onError(Throwable e) {

                }
            });

为什么 subscribeOn 只在第一次生效

我们来看下面的例子。

MyObservable.create(new MyAction1<MyObserver<Integer>>() {
        @Override
        public void call(MyObserver<Integer> myObserver) {
            System.out.println("call:" + Thread.currentThread().getName());
            myObserver.onNext(1);
        }
    })
            .subscribeOn()
            .map(new Func<Integer, String>() {
                @Override
                public String call(Integer integer) {
                    System.out.println("map:" + Thread.currentThread().getName());
                    return String.valueOf(integer);
                }
            })
            .subscribeOn()
            .subscribe(new MyObserver<String>() {
                @Override
                public void onNext(String string) {
                    System.out.println("onNext:" + Thread.currentThread().getName());
                }

                @Override
                public void onCompleted() {}

                @Override
                public void onError(Throwable e) {}
            });

create() 后面和 map() 后面都调用了一次 subscribeOn() ,可能一开始我们会理所当然的觉得,create()print() 方法会发生在子线程1,map() 中的 print() 会发生在子线程2,那么实际结果是怎样的呢?

call:Thread-1
map:Thread-1
onNext:Thread-1

所有的 print() 方法都发生在子线程1,也就是说第二个 subscribeOn() 是无效的。来看下流程图就知道为什么了。

自己动手造一个 RxJava(三)—— 线程调度

可以看到,虽然我们在第二次调用 subscribeOn() 的时候,从主线程切换到了 Thread-0 线程,但是在第一次调用 subscribe() 的时候,它又让接下来的流程从 Thread-0 切换到 Thread-1 ,而真正的事件发送,即 onNext() 以及它们的回调,统统发生在 Thread-1 里面,所以不管我们在第一次调用 subscribeOn() 之后,又调用了几次 subscribeOn() ,它们的作用只会让你的线程从 main 切换 Thread-0,Thread-1,Thread-2,……,Thread-n,而 onNext() 以及它们的回调将会在最后一个新建出来的子线程执行(忽略 observeOn() 的影响)。

observeOn 的实现

前面讲过, observeOn() 方法作用的是它的直接下游,如果是在 subscribe() 前面调用的,那么它改变的是回调所在的线程,即 onNext()onCompleted()onError() 的实现。如果是在其他操作符如 map() 前面调用的呢?其实也是一样的,我们再次回顾 map() 的实现。

public <R> MyObservable<R> map(Func<T, R> func) {
    final MyObservable<T> upstream = this;
    return new MyObservable<R>(new MyAction1<MyObserver<R>>() {
        @Override
        public void call(MyObserver<R> myObserver) {
            upstream.start(new MyObserver<T>() {
                @Override
                public void onNext(T t) {
                    myObserver.onNext(func.call(t));
                }
                @Override
                public void onCompleted() {
                    myObserver.onCompleted();
                }
                @Override
                public void onError(Throwable e) {
                    myObserver.onError(e);
                }
            });
        }
    });
}

map() 中的核心语句是 func.call(t) 的调用,并将其传递到下游的 myObserver ,所以要想切换 func.call(t) 所在的线程,就必须改变 onNext() 回调所在的线程。

写法很简单,我们返回一个新的 MyObservable,并在上游的 onNext() 回调中新建一个线程,再将回调传递给下游,也就是当前新返回的 MyObservable。

public MyObservable<T> observeOn(Scheduler scheduler) {
    MyObservable<T> upstream = this;
    return new MyObservable<T>(new MyAction1<MyObserver<T>>() {
        @Override
        public void call(MyObserver<T> myObserver) {
            upstream.subscribe(new MyObserver<T>() {
                @Override
                public void onNext(T t) {
                    new Thread(new Runnable() {
                        @Override
                        public void run() {
                            myObserver.onNext(t);
                        }
                    }).start();
                }

                @Override
                public void onCompleted() {
                    myObserver.onCompleted();
                }

                @Override
                public void onError(Throwable e) {
                    myObserver.onError(e);
                }
            });
        }
    });
}

这里我们忽略了对 onCompleted()onError() 的处理,因为我们要保证它们和 onNext() 是执行在同一个子线程中的,需要借助线程池来实现,这个我们待会再讨论,现在只需关注怎么改变下游的线程。先来看看我们的 observeOn() 怎么使用吧。

MyObservable.create(new MyAction1<MyObserver<Integer>>() {
    @Override
    public void call(MyObserver<Integer> myObserver) {
        System.out.println("call:" + Thread.currentThread().getName());
        myObserver.onNext(1);
    }
})
        .observeOn()
        .subscribe(new MyObserver<Integer>() {
            @Override
            public void onNext(Integer integer) {
                System.out.println("onNext:" + Thread.currentThread().getName());
            }

            @Override
            public void onCompleted() {}

            @Override
            public void onError(Throwable e) {}
        });

将下游的回调指定在新的子线程,运行结果如下。

call:main
onNext:Thread-0

达到了我们想要的效果,再来梳理下执行流程。

自己动手造一个 RxJava(三)—— 线程调度

我们通过 create() 操作和 observeOn() 生成了 MyObservable2 对象,随后调用 subscribe() 方法, subscribe() 方法会调用 2.call() 方法,而 2.call() 的实现我们是在 observeOn() 中声明的,即调用上游 MyObservable1 的 subscribe() 方法, 1.subscribe() 方法调用 1.call() 方法, 它的实现在 create() 中已经声明,即调用 1.onNext() 方法, 1.onNext() 的回调同样在 observeOn() 内部,此时会开启一个新的子线程,进入 Thread-0 ,在线程体中调用 2.onNext() ,它的回调在我们声明的 MyObserver2 中,即打印输出当前线程。

讲起来很啰嗦,大家可以自己根据流程在纸上画一遍,一下子会清晰很多。

最后我们再来看一个比较复杂的场景,由一个 subscribeOn() 和多个 observeOn() 同时使用的例子,代码如下。

MyObservable.create(new MyAction1<MyObserver<Integer>>() {
    @Override
    public void call(MyObserver<Integer> myObserver) {
        System.out.println("call:" + Thread.currentThread().getName());
        myObserver.onNext(1);
    }
})
        .subscribeOn()
        .observeOn()
        .map(new Func<Integer, String>() {
            @Override
            public String call(Integer integer) {
                System.out.println("map:" + Thread.currentThread().getName());
                return String.valueOf(integer);
            }
        })
        .observeOn()
        .subscribe(new MyObserver<Integer>() {
            @Override
            public void onNext(Integer integer) {
                System.out.println("onNext:" + Thread.currentThread().getName());
            }

            @Override
            public void onCompleted() {}

            @Override
            public void onError(Throwable e) {}
        });

这里一共切换了三次线程,运行结果如下。

call:Thread-0
map:Thread-1
onNext:Thread-2

发送事件运行在 Thread-0 线程,map 映射运行在 Thread-1 线程,结果回调发生在 Thread-2 线程。流程如下所示。

自己动手造一个 RxJava(三)—— 线程调度

看起来非常复杂,这里就不再赘述,需要大家比较耐心的看下去,跟随代码,理解线程是如何在整个流程中发生切换的。

利用线程池进行调度

前面在写 observeOn() 方法的时候我们只对 onNext() 方法开启了子线程,而没有对 onCompleted()onError() 进行操作。

public MyObservable<T> observeOn(Scheduler scheduler) {
    MyObservable<T> upstream = this;
    return new MyObservable<T>(new MyAction1<MyObserver<T>>() {
        @Override
        public void call(MyObserver<T> myObserver) {
            upstream.subscribe(new MyObserver<T>() {
                @Override
                public void onNext(T t) {
                    new Thread(new Runnable() {
                        @Override
                        public void run() {
                            myObserver.onNext(t);
                        }
                    }).start();
                }

                @Override
                public void onCompleted() {
                    myObserver.onCompleted();
                }

                @Override
                public void onError(Throwable e) {
                    myObserver.onError(e);
                }
            });
        }
    });
}

因为它们之间其实是独立的关系,我们在 onNext() 中通过 new Thread().start() 的方式开启了一个子线程,但是我们没有办法让 onCompleted() 同样执行在这个新建出来的线程中。事实上,onNext() 的写法也是有问题的。一旦我们在发送事件的时候,调用了多次 onNext() ,那么它在每次回调的时候,就会新开辟一个线程,导致所有事件都在不同的子线程中去处理,就不能保证事件能够按照发送的顺序进行接收了。

那么解决的办法就是使用线程池来管理我们的线程。

还记得RxJava在切换线程的时候是怎么写的吗?

Observable.just(1,2,3)
        .subscribeOn(Schedulers.io())
        .observeOn(Schedulers.newThread())

在调用 subscribeOn()observeOn() 的时候,需要传入一个是 Scheduler 类的对象,前面说过,它相当于一个调度器,能够指定我们事件执行在什么线程,而 Schedulers 是一个单例,它用来管理和提供不同的调度器(即线程池)供开发者调用。

我们可以模仿 RxJava 的方式来实现线程池的管理。首先定义一个 Scheduler 抽象类,它包含 schedule()finish()isFinished() 方法。

public abstract class Scheduler {
    public abstract void schedule(Runnable runnable);
    public abstract void finish();
    public abstract boolean isFinished();
}

接下来是我们提供两个 Scheduler 的实现类。

public class NewThreadScheduler extends Scheduler {

    private ExecutorService executor;

    private boolean isFinished = false;

    public NewThreadScheduler() {
        ThreadFactory threadFactory = new ThreadFactory() {
            @Override
            public Thread newThread(Runnable r) {
                return new Thread(r, "NewThread-" + System.currentTimeMillis());
            }
        };
        executor = Executors.newSingleThreadExecutor(threadFactory);
    }

    @Override
    public void schedule(Runnable runnable) {
        if (!isFinished) {
            executor.execute(runnable);
        }
    }

    @Override
    public void finish() {
        if (!isFinished) {
            executor.shutdown();
            isFinished = true;
        }
    }

    @Override
    public boolean isFinished() {
        return isFinished;
    }

}
public class ChildThreadScheduler extends Scheduler {

    private ExecutorService executor;

    private boolean isFinished = false;

    public ChildThreadScheduler() {
        ThreadFactory threadFactory = new ThreadFactory() {
            @Override
            public Thread newThread(Runnable r) {
                return new Thread(r, "ChildThread-" + System.currentTimeMillis());
            }
        };
        executor = Executors.newSingleThreadExecutor(threadFactory);
    }

    @Override
    public void schedule(Runnable runnable) {
        if (!isFinished) {
            executor.execute(runnable);
        }
    }

    @Override
    public void finish() {
        if (!isFinished) {
            executor.shutdown();
            isFinished = true;
        }
    }

    @Override
    public boolean isFinished() {
        return isFinished;
    }

}

可以看到,我们分别在两个类的构造函数中,声明了一个 ThreadFactory,并将其传入 Executors.newSingleThreadExecutor() 方法中,返回一个 ExecutorService 对象。注意,这里使用 newSingleThreadExecutor() 是为了保证 runnable 对象能够按顺序进入线程池,以确保事件能够按照我们定义的顺序去执行。

schedule() 方法中,我们调用 executor.execute(runnable) 方法,让线程池执行runnable对象,在 finish() 方法中,调用了 executor.shutdown() 方法,它会在线程池执行完任务后,关闭线程池,在两个方法在执行前都会提前判断 isFinished 的值,避免抛出 RejectedExecutionException 的异常。

以上这些涉及到一些线程池的知识,不清楚地同学可以先去了解一下。

这两个类的唯一区别,就是构造函数中 ThreadFactory 返回的线程的名字不一样。在这里,我们只是为了做一个简单的区分。

接着我们定义一个 Schedulers 的单例。

public class Schedulers {

    private static class Singleton {
        private static Schedulers instance = new Schedulers();
    }

    private static Schedulers getInstance() {
        return Singleton.instance;
    }

    private ChildThreadScheduler childThreadScheduler;

    public static Scheduler newThread() {
        return new NewThreadScheduler();
    }

    public static Scheduler childThread() {
        if (getInstance().childThreadScheduler == null) {
            getInstance().childThreadScheduler = new ChildThreadScheduler();
        }else if (getInstance().childThreadScheduler.isFinished()){
            getInstance().childThreadScheduler = new ChildThreadScheduler();
        }
        return getInstance().childThreadScheduler;
    }
}

当调用 Schedulers.newThread() 方法时,直接返回一个新的 NewThreadScheduler 对象。
当调用 Schedulers.childThread() 方法时,会返回一个单例中维护的 ChildThreadScheduler 对象,如果这个线程池为空或者已经被关闭,那么再重新返回一个新的实例。

现在我们可以看出这两个线程池的区别,newThread() 每次都会开启一个新的线程池,而 childThread() 则会使用同一个线程池。

定义好线程管理相关的类后,我们就可以改造 subscribeOn() 方法了。

public MyObservable<T> subscribeOn(Scheduler scheduler) {
    MyObservable<T> upstream = this;
    return new MyObservable<T>(new MyAction1<MyObserver<T>>() {
        @Override
        public void call(MyObserver<T> myObserver) {
            scheduler.schedule(new Runnable() {
                @Override
                public void run() {
                    upstream.subscribe(new MyObserver<T>() {
                        @Override
                        public void onNext(T t) {
                            myObserver.onNext(t);
                        }

                        @Override
                        public void onCompleted() {
                            myObserver.onCompleted();
                        }

                        @Override
                        public void onError(Throwable e) {
                            myObserver.onError(e);
                        }
                    });
                }
            });
        }
    });
}

我们将 new Thread().start() 的方式,改成了 scheduler.schedule() ,非常的简单。

再看看 ObserverOn() 方法。

public MyObservable<T> observeOn(Scheduler scheduler) {
    MyObservable<T> upstream = this;
    return new MyObservable<T>(new MyAction1<MyObserver<T>>() {
        @Override
        public void call(MyObserver<T> myObserver) {
            upstream.subscribe(new MyObserver<T>() {
                @Override
                public void onNext(T t) {
                    scheduler.schedule(new Runnable() {
                        @Override
                        public void run() {
                            myObserver.onNext(t);
                        }
                    });
                }

                @Override
                public void onCompleted() {
                    scheduler.schedule(new Runnable() {
                        @Override
                        public void run() {
                            myObserver.onCompleted();
                            scheduler.finish();
                        }
                    });
                }

                @Override
                public void onError(Throwable e) {
                    scheduler.schedule(new Runnable() {
                        @Override
                        public void run() {
                            myObserver.onError(e);
                            scheduler.finish();
                        }
                    });
                }
            });
        }
    });
}

现在,我们不管是在 onNext()onCompleted() 还是 onError() 方法中,调用 scheduler.schedule() 方法,都是同一个 scheduler 对象在执行,即它们都跑在同一个线程池中。

再来测试一下。

MyObservable.create(new MyAction1<MyObserver<Integer>>() {
        @Override
        public void call(MyObserver<Integer> myObserver) {
            System.out.println("call:" + Thread.currentThread().getName());
            myObserver.onNext(1);
            myObserver.onNext(2);
            myObserver.onCompleted();
        }
    })
            .subscribeOn(Schedulers.newThread())
            .observeOn(Schedulers.newThread())
            .subscribe(new MyObserver<Integer>() {
                @Override
                public void onNext(Integer integer) {
                    System.out.println("onNext:" + integer+" "+Thread.currentThread().getName());
                }

                @Override
                public void onCompleted() {
                    System.out.println("onCompleted:"+Thread.currentThread().getName());
                }

                @Override
                public void onError(Throwable e) {}
            });

运行结果如下。

call:NewThread-1533382601665
onNext:1 NewThread-1533382601666
onNext:2 NewThread-1533382601666
onCompleted:NewThread-1533382601666

效果不错,我们成功让事件在 NewThread-1533382601665 线程中发送,并在 NewThread-1533382601666 中回调结果,但是我们会发现,程序依然在运行状态,不会自动结束进程。这是因为我们传进去的 scheduler 都没有被关闭,那么现在问题来了,我们要怎样关闭这个 scheduler?

关闭线程池

为了确保线程池在不再有任务的情况下关闭,我们必须在最后一刻才调用 scheduler.finish() 方法。观察前面的那几个流程图,我们知道整个流程在执行到最后都会来到我们一开始传进去的 MyObserver 回调中,所以我们可以对 subscribe() 方法做些改变,让它能够在 onCompleted() 或者 onError() 方法执行完关闭线程池。

新建一个 mySubscribe() 方法,同subscribe() 一样,它调用了 action.call() 方法,但是传进去的是一个新的 MyObserver ,在回调中再去调用外部传进去的 myObserver.onCompleted()myObserver.onError() ,最后执行 finish() 方法,这样就能确保我们对线程池的关闭是在整个流程的最后一刻执行的。

public void mySubscribe(MyObserver<T> myObserver) {
    action.call(new MyObserver<T>() {
        @Override
        public void onNext(T t) {
            myObserver.onNext(t);
        }

        @Override
        public void onCompleted() {
            myObserver.onCompleted();
            finish();//关闭线程池
        }

        @Override
        public void onError(Throwable e) {
            myObserver.onError(e);
            finish();//关闭线程池
        }
    });
}

注意,现在这个方法与 subscribe() 的区别是, mySubscribe() 是我们在外部调用的,而 subscribe() 是在内部调用的。

再看下 finish() 怎么实现。

public class MyObservable<T> {

    /*已省略*/

    private Set<Scheduler> schedulers;

    private MyObservable(MyAction1<MyObserver<T>> action) {
        this.action = action;
        this.schedulers = new HashSet<>();
    }

    private MyObservable(MyAction1<MyObserver<T>> action, Set<Scheduler> schedulers) {
        this.action = action;
        this.schedulers = schedulers;
    }

    private void finish(){
        for (Scheduler scheduler : schedulers) {
            scheduler.finish();
        }
    }

    /*已省略*/
}

我们在内部新增了一个 Scheduler 的集合变量 schedulers ,在单参数的构造函数中初始化,并提供一个双参数的构造函数,方便我们在 map()subscribeOn()observeOn() 中创建新实例时传递这个变量。

这几个方法的改动如下。

public <R> MyObservable<R> map(Func<T, R> func) {
    final MyObservable<T> upstream = this;
    return new MyObservable<R>(new MyAction1<MyObserver<R>>() {
        @Override
        public void call(MyObserver<R> myObserver) {
            upstream.subscribe(new MyObserver<T>() {
                @Override
                public void onNext(T t) {
                    myObserver.onNext(func.call(t));
                }

                @Override
                public void onCompleted() {
                    myObserver.onCompleted();
                }

                @Override
                public void onError(Throwable e) {
                    myObserver.onError(e);
                }
            });
        }
    }, schedulers);
}
public MyObservable<T> subscribeOn(Scheduler scheduler) {
    schedulers.add(scheduler);
    MyObservable<T> upstream = this;
    return new MyObservable<T>(new MyAction1<MyObserver<T>>() {
        @Override
        public void call(MyObserver<T> myObserver) {
            scheduler.schedule(new Runnable() {
                @Override
                public void run() {
                    upstream.subscribe(new MyObserver<T>() {
                        @Override
                        public void onNext(T t) {
                            myObserver.onNext(t);
                        }

                        @Override
                        public void onCompleted() {
                            myObserver.onCompleted();
                        }

                        @Override
                        public void onError(Throwable e) {
                            myObserver.onError(e);
                        }   
                    });
                }
            });
        }
    }, schedulers);
}
public MyObservable<T> observeOn(Scheduler scheduler) {
    schedulers.add(scheduler);
    MyObservable<T> upstream = this;
    return new MyObservable<T>(new MyAction1<MyObserver<T>>() {
        @Override
        public void call(MyObserver<T> myObserver) {
            upstream.subscribe(new MyObserver<T>() {
                @Override
                public void onNext(T t) {
                    scheduler.schedule(new Runnable() {
                        @Override
                        public void run() {
                            myObserver.onNext(t);
                        }
                    });
                }

                @Override
                public void onCompleted() {
                    scheduler.schedule(new Runnable() {
                        @Override
                        public void run() {
                            myObserver.onCompleted();
                        }
                    });
                }

                @Override
                public void onError(Throwable e) {
                    scheduler.schedule(new Runnable() {
                        @Override
                        public void run() {
                            myObserver.onError(e);
                        }
                    });
                }
            });
        }
    }, schedulers);
}

最后,我们用一个比较复杂的例子来演示。

MyObservable.create(new MyAction1<MyObserver<Integer>>() {
        @Override
        public void call(MyObserver<Integer> myObserver) {
            System.out.println("call:" + Thread.currentThread().getName());
            myObserver.onNext(1);
            myObserver.onCompleted();
        }
    })
            .subscribeOn(Schedulers.newThread())
            .observeOn(Schedulers.childThread())
            .map(new Func<Integer, String>() {
                @Override
                public String call(Integer integer) {
                    System.out.println("map:" + Thread.currentThread().getName());
                    return String.valueOf(integer);
                }
            })
            .observeOn(Schedulers.newThread())
            .map(new Func<String, Integer>() {
                @Override
                public Integer call(String string) {
                    System.out.println("map:" + Thread.currentThread().getName());
                    return Integer.parseInt(string);
                }
            })
            .observeOn(Schedulers.childThread())
            .mySubscribe(new MyObserver<Integer>() {
                @Override
                public void onNext(Integer string) {
                    System.out.println("onNext:" + Thread.currentThread().getName());
                }

                @Override
                public void onCompleted() {
                    System.out.println("onCompleted:" + Thread.currentThread().getName());
                }

                @Override
                public void onError(Throwable e) {}
            });

执行结果如下。

call:NewThread-1533441899656
map:ChildThread-1533441899658
map:NewThread-1533441899658
onNext:ChildThread-1533441899658
onCompleted:ChildThread-1533441899658

控制台输出了每个打印事件所在的线程,并且自动结束了进程。可以看到,这个流程里面包含了三个不同的线程,两个不同的 NewThread 线程,还有一个 ChildThread 线程。

它们的流程图如下。

自己动手造一个 RxJava(三)—— 线程调度

结语

到这里我们整个《自己动手造一个RxJava》的讲解就结束了,非常感谢大家的阅读,在写本文之前自己是花了一周的时间去理解,然后又花了一周的时间才把整个思路和分析整理出来,算是我第一次花这么大精力去写的一篇文章了。本文篇幅较长,某些地方可能讲得比较啰嗦,但是对新手而言如果能够耐心的看下去,是非常不错的学习资料。若有错误的地方,也请各位读者及时指出,欢迎大家一起探讨。

同时感谢以下两位作者提供的参考资料:

给 Android 开发者的 RxJava 详解
RxJava 系列文章


自己动手造一个 RxJava(一)—— 理解临时任务对象
自己动手造一个 RxJava(二)—— 事件的发送、接收与映射
自己动手造一个 RxJava(三)—— 线程调度