欢迎您访问程序员文章站本站旨在为大家提供分享程序员计算机编程知识!
您现在的位置是: 首页  >  移动技术

RxJava2 线程调度的方法

程序员文章站 2022-05-26 16:20:39
subscribeon和observeon负责线程切换,同时某些操作符也默认指定了线程. 我们这里不分析在线程中怎么执行的.只看如何切换到某个指定线程. subscri...

subscribeon和observeon负责线程切换,同时某些操作符也默认指定了线程.

我们这里不分析在线程中怎么执行的.只看如何切换到某个指定线程.

subscribeon

observable.subscribeon()在方法内部生成了一个observablesubscribeon对象.

主要看一下observablesubscribeon的subscribeactual方法.

 @override
  public void subscribeactual(final observer<? super t> observer) {
    final subscribeonobserver<t> parent = new subscribeonobserver<t>(observer);
    //调用下游的observer的onsubscribe方法
    observer.onsubscribe(parent);
    //通过subscribetask执行了上游observable的subscribeactual方法
    parent.setdisposable(scheduler.scheduledirect(new subscribetask(parent)));
  }

scheduler.scheduledirect(runnable)用于执行subscribetask这个任务.subscribetask本身是runnable的实现类.看一下其run方法.

    @override
    public void run() {
      //上游的observable.subscribe方法被切换到了新的线程
      source.subscribe(parent);
    }

首先可以得出结论:subscribeon将上游的observable的subscribe方法切换到了新的线程.

如果多次调用subscribeon切换线程,会有什么效果?

由下往上,每次调用subscribeon,都会导致上游的observable的subscribeactual切换到指定的线程.那么最后一次调用的切换最上游的创建型操作符的subscribeactual的执行线程.如果操作符有默认执行线程怎么办?

操作符默认线程

如果是创建型操作符,处于最上游,那么subscribeon的线程切换对它不起作用.天高皇帝远,县官不如现管.就是这个道理.
如果是其它操作符,会是怎样的?

以操作符timeout为例:它对应observabletimeouttimed和timeoutobserver

 @override
    public void onnext(t t) {
      downstream.onnext(t);
      //超时计时
      starttimeout(idx + 1);
    }

    void starttimeout(long nextindex) {
      //交给操作符默认的线程执行
      task.replace(worker.schedule(new timeouttask(nextindex, this), timeout, unit));
    }

    @override
    public void onerror(throwable t) {
        downstream.onerror(t); 
    }

    @override
    public void oncomplete() {
        downstream.oncomplete();
      }
    }

    @override
    public void ontimeout(long idx) {
        downstream.onerror(new timeoutexception(timeoutmessage(timeout, unit)));
    }

//timeouttask.java
static final class timeouttask implements runnable {

    @override
    public void run() {
      parent.ontimeout(idx);
    }
  }

可以看到操作符默认的执行线程只用来做超时计时任务,如果超时了,会在操作符的默认线程执行onerror方法..操作符默认线程对下游的observer造成什么影响要做具体对待.

observeon

observeon对应observableobserveonobserveonobserver.

 //observableobserveon.java
 @override
  protected void subscribeactual(observer<? super t> observer) {
    if (scheduler instanceof trampolinescheduler) {
      source.subscribe(observer);
    } else {
      scheduler.worker w = scheduler.createworker();
      source.subscribe(new observeonobserver<t>(observer, w, delayerror, buffersize));
    }
  }
 //observeonobserver.java 
  @override
    public void onsubscribe(disposable d) {
      if (disposablehelper.validate(this.upstream, d)) {
        if (d instanceof queuedisposable) {
          if (m == queuedisposable.sync) {
          //执行下游observer的onsubscribe方法
            downstream.onsubscribe(this);
            schedule();
            return;
          }
          if (m == queuedisposable.async) {
           //执行下游observer的onsubscribe方法
            downstream.onsubscribe(this);
            return;
          }
        }
         //执行下游observer的onsubscribe方法
        downstream.onsubscribe(this);
      }
    }
    @override
    public void onnext(t t) {
     //省略
      schedule();
    }
    @override
    public void onerror(throwable t) {
     //省略
      schedule();
    }
     void schedule() {
      if (getandincrement() == 0) {
      /*
      observeonobserver是runnable的实现类.交给线程池执行
      */
        worker.schedule(this);
      }
    }
    
    
    void drainnormal() {
      final observer<? super t> a = downstream;
      for (;;) {
        for (;;) {
          t v;
          try {
            v = q.poll();
          } catch (throwable ex) {
            a.onerror(ex);
            return;
          }
          //执行下游observer的onnext方法
          a.onnext(v);
        }
      }
    }

    void drainfused() {
      for (;;) {
        if (!delayerror && d && ex != null) {
          //执行下游observer的onerror方法
          downstream.onerror(error);
          return;
        }
        downstream.onnext(null);
        if (d) {
          ex = error;
          if (ex != null) {
            //执行下游observer的onerror方法
            downstream.onerror(ex);
          } else {
            //执行下游observer的oncomplete方法
            downstream.oncomplete();
          }
          return;
        }
      }
    }
    //执行线程任务
    @override
    public void run() {
      if (outputfused) {
        drainfused();
      } else {
        drainnormal();
      }
    }

从上面可以看出observableobserveon在其subscribeactual方法中并没有切换上游observable的subscribe方法的执行线程.但是observeonobserver在其onnext,onerror和oncomplete中通过schedule()方法将下游observer的各个方法切换到了新的线程.

得出结论: observeon负责切换的是下游observer的各个方法的执行线程

如果下游多次通过observeon切换线程,会有什么效果?

每次切换都会对其下游造成影响,直到遇到下一个observeon为止.

observer(onsubscribe,onnext,onerror,oncomplete)

onnext,onerror,oncomplete与上游最近的observeon所切换的线程保持一致.onsubscribe则不同.
遇到线程切换的时候,会首先在对应的observable的subscribeactual方法内,先调用observer.onsubscribe方法.而observer.onsubscribe会逐级向上传递直到最上游,而最上游的observer.onsubscribe是在subscribeactual方法内调用,这是在主线程执行的.所以onsubscribe方法无论如何都是在主线程执行.

doonsubscribe

.doonsubscribe(new consumer<disposable>() {
          @override
          public void accept(disposable disposable) throws exception {
           
          }
        })

我们要看的是方法accept的执行线程.

通过源码找到对应的disposablelambdaobserver.

 @override
  public void onsubscribe(disposable d) {
  //在这里调用了accept方法.
      onsubscribe.accept(d);
  }

这就要看上游在哪个线程执行了observer.onsubscribe(disposable)方法.

在创建型操作符的subscribeactual方法和subscribeon对应的observable的subscribeactual方法内调用了observer.onsubscribe(disposable)方法.那么这两处的执行线程就决定了onsubscribe.accept(d);的执行线程.

dofinally

对应observabledofinally和dofinallyobserver

 //dofinallyobserver.java
 @override
    public void onerror(throwable t) {
      runfinally();
    }

    @override
    public void oncomplete() {
      runfinally();
    }

    @override
    public void dispose() {
      runfinally();
    }
    
     void runfinally() {
       onfinally.run();
    }

可以看到与它所对应的dofinallyobserver的onerror,oncomplete,dispose方法的执行线程有关,这三个方法的执行线程又受到上游的observeon的影响.如果没有observeon,则会受到最上游的observable.subscribeactual方法影响.

doonerror

对应observabledooneach和dooneachobserver

//dooneachobserver.java
 @override
    public void onerror(throwable t) {
        onerror.accept(t);
    }

和自身对应的observer.onerror所在线程保持一致.

doonnext

对应observabledooneach和dooneachobserver

//dooneachobserver.java
 @override
    public void onnext(t t) {
        onnext.accept(t);
    }

和自身对应的observer.onnext所在线程保持一致.

操作符对应方法参数的执行线程

包io.reactivex.functions下的接口类一般用于处理上游数据然后往下传递.这些接口类的方法一般在对应的observer.onnext中调用.所以他们的线程保持一致.

总结:

subscribeon由下往上逐级切换observable.subscribe的执行线程,不受observeon影响,也不受具有默认指定线程的非创建型操作符影响,但是会被更上游的subscribeon夺取线程切换的权利,直到最上游.如果最上游的创建型操作符也有默认执行线程,那么任何一个subscribeon的线程切换不起作用.subscribeon由下向上到达最上游后,然后由上往下影响下游的observer的执行线程.遇到observeon会被夺取线程切换的权利.observeon影响的是下游的observer的执行线程,由上往下,遇到另一个observeon会移交线程控制权力,遇到指定默认线程非创建型的操作符,要视具体情况对待.

以上就是本文的全部内容,希望对大家的学习有所帮助,也希望大家多多支持。