欢迎您访问程序员文章站本站旨在为大家提供分享程序员计算机编程知识!
您现在的位置是: 首页  >  IT编程

简单谈谈RxJava和多线程并发

程序员文章站 2024-03-05 23:57:43
前言 相信对于rxjava,大家应该都很熟悉,他最核心的两个字就是异步,诚然,它对异步的处理非常的出色,但是异步绝对不等于并发,更不等于线程安全,如果把这几个概念搞混了,...

前言

相信对于rxjava,大家应该都很熟悉,他最核心的两个字就是异步,诚然,它对异步的处理非常的出色,但是异步绝对不等于并发,更不等于线程安全,如果把这几个概念搞混了,错误的使用rxjava,是会来带非常多的问题的。

rxjava与并发

首先让我们来看一段rxjava协议的原文:

observables must issue notifications to observers serially (not in parallel). they may issue these notifications from different threads, but there must be a formal happens-before relationship between the notifications.

如上所述,rxjava对多线程并发其实并没有做非常的多保护,这段话中说,如果多个observables从多个线程中发射数据,必须要满足happens-before原则。

下面来看一个简单的例子:

final publishsubject<integer> subject = publishsubject.create();

subject.subscribe(new subscriber<integer>() {
 @override
 public void oncompleted() {

 }

 @override
 public void onerror(throwable e) {

 }

 @override
 public void onnext(integer integer) {
  unsafecount = unsafecount + integer;
  log.d("tag", "onnext: " + unsafecount);
 }
});

findviewbyid(r.id.send).setonclicklistener(new view.onclicklistener() {
 @override
 public void onclick(view v) {
  final int unit = 1;
  for(int i = 0;i < 10;i++) {
   new thread(new runnable() {
    @override
    public void run() {
     for (int j = 0; j < 1000; j++) {
      subject.onnext(unit);
     }
    }
   }).start();
  }
 }
});

这是一个最典型的多线程问题,从10个线程中发射数据并相加,这样最终得到的答案是小于10000的。虽然使用了rxjava,但是这样的使用对于并发是没有意义的,因为rxjava并没有去处理并发带来的问题。我们可以看下subject的onnext方法的源码,里面很简单,就是调用了对应observer的onnext方法而已。不止是这样,绝大多数的subject都是线程不安全的,所以当你在使用这样的类的时候(典型场景就是自制的rxbus),如果从多个线程中发射数据,那你就要小心了。

对于这样的问题,有两种解决方案:

第一种就是简单的使用传统的解决方法,比如用atomicinteger代替int。

第二种则是使用rxjava的解决方案,在这里就是用serializedsubject去代替subject:

final publishsubject<integer> subject = publishsubject.create();

subject.subscribe(new subscriber<integer>() {
 @override
 public void oncompleted() {

 }

 @override
 public void onerror(throwable e) {

 }

 @override
 public void onnext(integer integer) {
  unsafecount = unsafecount + integer;
  count.addandget(integer);

  log.d("tag", "onnext: " + count);
 }
});

final serializedsubject<integer, integer> ser = new serializedsubject<integer, integer>(subject);

findviewbyid(r.id.send).setonclicklistener(new view.onclicklistener() {
 @override
 public void onclick(view v) {
  final int unit = 1;

  for(int i = 0;i < 10;i++){
   new thread(new runnable() {
    @override
    public void run() {
     for(int j = 0;j < 1000;j++){
      ser.onnext(unit);
     }
    }
   }).start();
  }
 }
});

可以看一下serializedsubject的onnext方法做了什么:

@override
public void onnext(t t) {
 if (terminated) {
  return;
 }
 synchronized (this) {
  if (terminated) {
   return;
  }
  if (emitting) {
   fastlist list = queue;
   if (list == null) {
    list = new fastlist();
    queue = list;
   }
   list.add(nl.next(t));
   return;
  }
  emitting = true;
 }
 try {
  actual.onnext(t);
 } catch (throwable e) {
  terminated = true;
  exceptions.throworreport(e, actual, t);
  return;
 }
 for (;;) {
  for (int i = 0; i < max_drain_iteration; i++) {
   fastlist list;
   synchronized (this) {
    list = queue;
    if (list == null) {
     emitting = false;
     return;
    }
    queue = null;
   }
   for (object o : list.array) {
    if (o == null) {
     break;
    }
    try {
     if (nl.accept(actual, o)) {
      terminated = true;
      return;
     }
    } catch (throwable e) {
     terminated = true;
     exceptions.throwiffatal(e);
     actual.onerror(onerrorthrowable.addvalueaslastcause(e, t));
     return;
    }
   }
  }
 }
}

处理方式很简单,如果有其他线程在发射数据,那就将数据放置到队列中,等待下次发射。这保证了同一时间只会有一个线程调用onnext,oncomplete和onerror这些方法。

但是这样操作显然是会造成性能的影响的,所以rxjava并不会把所有的操作都打上线程安全的标签。

在这里就要引申出一个问题,那就是使用者对create方法的滥用,其实这个方法不应该被使用者频繁的调用的,因为你必须要小心的处理所有的数据发射,接收的逻辑。相反的,使用已有的操作符能很好的解决这个问题,所以下次大家在遇到问题的时候不要简单的使用create去自己写,而是应该想想有没有现成的操作符可以完成相应的需求。

rxjava中的一些操作符

rxjava中有一些操作符也和多线程并发有关,下面让我来讲一讲merge和concat,以及他们的一些变种操作符。

对于多线程发射数据,有时候我们需要得到的结果也保持和发射时候一样的顺序,这个时候如果我们使用merge这个操作符去结合多个发射源,那么就会产生一定的问题了(例子中做了非常不好的示范——使用了create操作符,请大家不要学习这样的写法,这里单纯是为了求证结果)。

observable o1 = observable.create(new observable.onsubscribe<integer>() {
 @override
 public void call(final subscriber<? super integer> subscriber) {
  new thread(new runnable() {
   @override
   public void run() {
    try {
     thread.sleep(1000);
     subscriber.onnext(1);
     subscriber.oncompleted();
    } catch (interruptedexception e) {
     e.printstacktrace();
    }
   }
  }).start();
 }
});
observable o2 = observable.create(new observable.onsubscribe<integer>() {
 @override
 public void call(subscriber<? super integer> subscriber) {
  subscriber.onnext(2);
  subscriber.oncompleted();
 }
});

observable.merge(o1,o2)
  .subscribe(new subscriber<integer>() {
   @override
   public void oncompleted() {

   }

   @override
   public void onerror(throwable e) {

   }

   @override
   public void onnext(integer i) {
    log.d("tag", "onnext: " + i);
   }
  });

对于这样的场景,我们得到的答案将是2,1而不是先得到o1发射的数据,再获取o2的数据。

究其原因,就是因为merge其实就是给什么传什么,也不会去管数据发射的顺序:

@override
public void onnext(observable<? extends t> t) {
  if (t == null) {
    return;
  }
  if (t == observable.empty()) {
    emitempty();
  } else
  if (t instanceof scalarsynchronousobservable) {
    tryemit(((scalarsynchronousobservable<? extends t>)t).get());
  } else {
    innersubscriber<t> inner = new innersubscriber<t>(this, uniqueid++);
    addinner(inner);
    t.unsafesubscribe(inner);
    emit();
  }
}

可以看到在经过lift操作之后,对应的中间人mergesubscriber的onnext,没有什么多余的代码,所以在多个observable从多线程中发射数据的时候,顺序当然不能得到保证。

一个单词说明这个问题:interleaving——交错。merge后的数据源可能是交错的。由于merge有这样数据交错的问题,所以它的变种—flatmap也会有同样的问题。

对于这样的场景,我们可以使用concat操作符来完成:

concat waits to subscribe to each additional observable that you pass to it until the previous observable completes.

根据文档,我们知道concat操作符是一个接一个的处理数据源的数据的。

if (wip.getandincrement() != 0) {
  return;
}

final int delayerrormode = this.delayerrormode;

for (;;) {
  if (actual.isunsubscribed()) {
    return;
  }

  if (!active) {
    if (delayerrormode == boundary) {
      if (error.get() != null) {
        throwable ex = exceptionsutils.terminate(error);
        if (!exceptionsutils.isterminated(ex)) {
          actual.onerror(ex);
        }
        return;
      }
    }

    boolean maindone = done;
    object v = queue.poll();
    boolean empty = v == null;

    if (maindone && empty) {
      throwable ex = exceptionsutils.terminate(error);
      if (ex == null) {
        actual.oncompleted();
      } else
      if (!exceptionsutils.isterminated(ex)) {
        actual.onerror(ex);
      }
      return;
    }

    if (!empty) {

      observable<? extends r> source;

      try {
        source = mapper.call(notificationlite.<t>instance().getvalue(v));
      } catch (throwable mappererror) {
        exceptions.throwiffatal(mappererror);
        drainerror(mappererror);
        return;
      }

      if (source == null) {
        drainerror(new nullpointerexception("the source returned by the mapper was null"));
        return;
      }

      if (source != observable.empty()) {

        if (source instanceof scalarsynchronousobservable) {
          scalarsynchronousobservable<? extends r> scalarsource = (scalarsynchronousobservable<? extends r>) source;

          active = true;

          arbiter.setproducer(new concatmapinnerscalarproducer<t, r>(scalarsource.get(), this));
        } else {
          concatmapinnersubscriber<t, r> innersubscriber = new concatmapinnersubscriber<t, r>(this);
          inner.set(innersubscriber);

          if (!innersubscriber.isunsubscribed()) {
            active = true;

            source.unsafesubscribe(innersubscriber);
          } else {
            return;
          }
        }
        request(1);
      } else {
        request(1);
        continue;
      }
    }
  }
  if (wip.decrementandget() == 0) {
    break;
  }
}

通过源码我们可以知道,active字段就保证了如果上一个数据源还没有发射完数据,就会一直在for循环中等待,直到上一个数据源发射完了数据重置了active字段。

对于concat,其实还存在一个问题,那就是多个observable变成了串行,会大大的增加整个rxjava事件流的处理时间,对于这个场景,我们可以使用concateager来解决。concateager的源码就不带大家分析了,有兴趣的同学可以自行查看。

总结

这篇文章比较短,讲的东西也比较浅显,其实就是讨论了一下rxjava中多线程并发的几个问题。最后我想说,rxjava并不是什么高大上的东西,在你的项目引入之前,要考虑一下是否真的有必要这么做。就算真的有场景需要rxjava,也请不要一口气把项目中所有的操作都换成rxjava,一些简单的操作不一定需要使用rxjava的操作符的实现,用了反而降低了代码的可读性,切勿为了使用rx而使用rx。

好了,以上就是这篇文章的全部内容了,希望本文的内容对大家的学习或者工作能带来一定的帮助,如果有疑问大家可以留言交流。