欢迎您访问程序员文章站本站旨在为大家提供分享程序员计算机编程知识!
您现在的位置是: 首页

教你轻松理解Rxjava之线程切换流程(observeOn与subscribeOn)

程序员文章站 2022-04-02 18:08:36
...
几句屁话

      Rxjava我开始就觉得不就是能把线程切换吗?我的Handler和AsyncTask就能轻松解决,我还学什么这玩意......现在回想我就是一个井底之蛙

理解RxJava

  1. 订阅,向上走的一个过程
  2. 数据流,向下走的一个过程
借用别人的图给讲解下
请看下面的示意图(向上的箭头表示订阅操作的方向,向下的箭头表示数据流向,箭头的颜色表示所在的线程,曲折的箭头表示发生了线程切换)

教你轻松理解Rxjava之线程切换流程(observeOn与subscribeOn)

订阅的操作符:create丶doOnSubscribe

数据流的操作符:map丶flatMap丶过滤等主要是对数据处理的操作符

走之前记住一句话:subscribeOn切换订阅线程,observeOn处理数据流方向

1:当我们调用了subscribe的发起订阅

2:向上走,我只需要关心subscribeOn和订阅的操作符

3:向下走,我只需要关心observeOn和数据流的操作符


看不懂?来一段代码!

 Observable
                .create(new ObservableOnSubscribe<Integer>() {
                    @Override
                    public void subscribe(ObservableEmitter<Integer> e) throws Exception {
                        Log.d("wyz", "create:" + Thread.currentThread().getName());
                        e.onNext(1);
                    }
                })
                .subscribeOn(Schedulers.io())
                .map(new Function<Integer, Integer>() {
                    @Override
                    public Integer apply(Integer integer) throws Exception {
                        Log.d("wyz", "map1:" + Thread.currentThread().getName());
                        return integer;
                    }
                })
                .doOnSubscribe(new Consumer<Disposable>() {
                    @Override
                    public void accept(Disposable disposable) throws Exception {
                        Log.d("wyz", "doOnSubscribe1:" + Thread.currentThread().getName());
                    }
                })
                .subscribeOn(AndroidSchedulers.mainThread())
                .doOnSubscribe(new Consumer<Disposable>() {
                    @Override
                    public void accept(Disposable disposable) throws Exception {
                        Log.d("wyz", "doOnSubscribe2:" + Thread.currentThread().getName());
                    }
                })
                .subscribeOn(Schedulers.io())
                .observeOn(Schedulers.io())
                .flatMap(new Function<Integer, ObservableSource<Integer>>() {
                    @Override
                    public ObservableSource<Integer> apply(Integer integer) throws Exception {
                        Log.d("wyz","flatMap:" + Thread.currentThread().getName());
                        return Observable.fromArray(integer);
                    }
                })
                .observeOn(AndroidSchedulers.mainThread())
                .subscribe(new Observer<Integer>() {
                    @Override
                    public void onSubscribe(Disposable d) {

                    }

                    @Override
                    public void onNext(Integer value) {
                        Log.d("wyz", "执行完毕:" + Thread.currentThread().getName());
                    }

                    @Override
                    public void onError(Throwable e) {

                    }

                    @Override
                    public void onComplete() {

                    }
                });

一步一步来

当前线程状态(main线程)

  1. 订阅,向上走
  2. 经过第一个操作符observeOn,是控制数据流线程的,跳过
  3. 经过flatMap处理数据流的操作符,跳过
  4. 经过observeOn跳过
  5. 经过subscribeOn,将线程切换到IO线程,现在订阅处于IO线程
  6. 经过doOnSubscribe,执行里面代码,在IO线程中执行  输出:doOnSubscribe2:RxCachedThreadScheduler-1
  7. 经过subscribeOn,将线程切换到main线程,现在订阅处于main线程
  8. 经过doOnSubscribe,执行里面代码,在main线程中执行  输出:doOnSubscribe1:main
  9. 经过map方法,处理数据流的方法,跳过
  10. 经过subscribeOn,将线程切换到IO线程,现在订阅处于IO线程
  11. 经过create方法,在IO线程中执行 输出:create:RxCachedThreadScheduler-1

订阅完成,开始发射数据 注意:从上面结束我们知道,当前处于IO线程哦

当前线程状态(IO线程)

  1. 发射数据,向下走
  2. 经过subscribeOn跳过
  3. 经过map,在IO线程中执行 输出:map1:RxCachedThreadScheduler-1
  4. 经过doOnSubscribe跳过
  5. 经过subscribeOn跳过
  6. 经过doOnSubscribe跳过
  7. 经过subscribeOn跳过
  8. 经过observeOn,将线程切换到IO线程
  9. 经过flatMap执行,在IO线程执行 输出:flatMap:RxCachedThreadScheduler-2
  10. 经过observeOn,将线程切换到main线程
  11. 结束执行onNext方法,在main线程 输出:执行完毕:main

好了,就是一个向上子向下走的过程,这下我理解清楚了....我一开始弄的一锅粥

相关标签: Rxjava