欢迎您访问程序员文章站本站旨在为大家提供分享程序员计算机编程知识!
您现在的位置是: 首页

rxjava2.0 线程切换的原理及SubscribeOn,ObserveOn的用法

程序员文章站 2024-02-28 10:00:04
...

android开发过程中经常会用到线程切换,比如数据加载、文件存储、数据库操作等都是在io线程处理,而处理结果的展示刷新ui需要在ui线程进行。
如果不用rxjava,我们可能会asycTask,或者retrofit自己默认的callback来在ui线程刷新ui。但是当我们的操作变复杂时,比如一个接口的数据依赖于另一个接口的返回,或者一次上传多张图片功能,就会造成接口多层嵌套进而增加维护成本

注:由于observable实现了observablesource接口所以下文所说的observable与source等价

线程切换方式

先上一段代码及运行结果,呈现出一种直观的认识

        Observable.just("Some String")
                .map(new Function<String, Integer>() {

                    @Override
                    public Integer apply(String s) throws Exception {
                        Log.d("cong", "1 threadName:"+Thread.currentThread().getName());
                        return s.length();
                    }
                })
                .subscribeOn(AndroidSchedulers.mainThread())
                .map(new Function<Integer, Integer>() {
                    @Override
                    public Integer apply(Integer integer) throws Exception {
                        Log.d("cong", "2 threadName:"+Thread.currentThread().getName());
                        return 2 * integer;
                    }
                })
                .subscribeOn(Schedulers.newThread())
                .map(new Function<Integer, Integer>() {
                    @Override
                    public Integer apply(Integer integer) throws Exception {
                        Log.d("cong", "3 threadName:"+Thread.currentThread().getName());
                        return 2 * integer;
                    }
                })
                .observeOn(Schedulers.computation()) // change thread
                .map(new Function<Integer, Integer>() {
                    @Override
                    public Integer apply(Integer integer) throws Exception {
                        Log.d("cong", "4 threadName:"+Thread.currentThread().getName());
                        return 2 * integer;
                    }
                })
                .subscribeOn(AndroidSchedulers.mainThread())
                .map(new Function<Integer, Integer>() {
                    @Override
                    public Integer apply(Integer integer) throws Exception {
                        Log.d("cong", "5 threadName:"+Thread.currentThread().getName());
                        return 2 * integer;
                    }
                })
                .observeOn(Schedulers.newThread())
                .subscribe(new Consumer<Integer>() {
                    @Override
                    public void accept(Integer integer) throws Exception {
                        Log.d("cong", "6 threadName:"+Thread.currentThread().getName());
                    }
                });

rxjava2.0 线程切换的原理及SubscribeOn,ObserveOn的用法
可以发现5处线程切换其实只生效了3处,第一个subscribeOn,两个ObserveOn。
要理解原因必须要了解rxjava操作符的设计模式,rxjava操作符的设计其实是大量利用包装者模式来包装observer,observable。

以map操作符为例:

public interface Function<T, R> {
   
   //传递一个类型的值并将值转换为另一个类型
    R apply(@NonNull T t) throws Exception;
}

 public final <R> Observable<R> map(Function<? super T, ? extends R> mapper) {
        
        // 新生成一个observable,并且将本类,及mapper 作为参数传递到新的observable 中
        return  new ObservableMap<T, R>(this, mapper));
 }
  
 public final class ObservableMap<T, U> extends AbstractObservableWithUpstream<T, U> {
    final Function<? super T, ? extends U> function;

    public ObservableMap(ObservableSource<T> source, Function<? super T, ? extends U> function) {
        super(source);
        this.function = function;
    }

    @Override
    public void subscribeActual(Observer<? super U> t) {
     
       /**1: 每个source其实就是调用操作符的时候的observable主体即被包装的observable
       *  2: MapperObserver是对当前source接收的observer类型的封装,当upstreamSource observable类型的数据传递过来的时候会在mapperobserser中做转换,最后调用到真实的被包装的observer的onnext
       *       
       *  **/
        source.subscribe(new MapObserver<T, U>(t, function));
    }


    static final class MapObserver<T, U> extends BasicFuseableObserver<T, U> {
        final Function<? super T, ? extends U> mapper;

        MapObserver(Observer<? super U> actual, Function<? super T, ? extends U> mapper) {
            super(actual);
            this.mapper = mapper;
        }

        @Override
        public void onNext(T t) {
            if (done) {
                return;
            }

            if (sourceMode != NONE) {
                actual.onNext(null);
                return;
            }

            U v;

            try {
                v = ObjectHelper.requireNonNull(mapper.apply(t), "The mapper function returned a null value.");
            } catch (Throwable ex) {
                fail(ex);
                return;
            }
            //真实的被包装的observer
            actual.onNext(v);
        }

        @Override
        public int requestFusion(int mode) {
            return transitiveBoundaryFusion(mode);
        }

        @Nullable
        @Override
        public U poll() throws Exception {
            T t = qs.poll();
            return t != null ? ObjectHelper.<U>requireNonNull(mapper.apply(t), "The mapper function returned a null value.") : null;
        }
    }
}
     

线程切换操作符的原理类似,首先我们还是先给出第一个示例的流程图。
由于source包装的都是调用操作符的source,及所谓的upstreamsource

什么是upstream downstream

        upstream       downstream

source <------------- operator ------------------> consumer/further operators
操作符的左边是upstream 操作符的右边是dowmstream

操作符中理解这张图的关键就是subscribeOn既会影响upstream也会影响downstream,而observeOn只会影响downStream

图中向上的箭头是subscribe过程,向下的箭头是onnext通知过程

rxjava2.0 线程切换的原理及SubscribeOn,ObserveOn的用法