欢迎您访问程序员文章站本站旨在为大家提供分享程序员计算机编程知识!
您现在的位置是: 首页  >  IT编程

Java扩展库RxJava的基本结构与适用场景小结

程序员文章站 2024-03-12 20:53:44
基本结构 我们先来看一段最基本的代码,分析这段代码在rxjava中是如何实现的。 observable.onsubscribe on...

基本结构

我们先来看一段最基本的代码,分析这段代码在rxjava中是如何实现的。

observable.onsubscribe<string> onsubscriber1 = new observable.onsubscribe<string>() {
  @override
  public void call(subscriber<? super string> subscriber) {
    subscriber.onnext("1");
    subscriber.oncompleted();
  }
};
subscriber<string> subscriber1 = new subscriber<string>() {
  @override
  public void oncompleted() {

  }

  @override
  public void onerror(throwable e) {

  }

  @override
  public void onnext(string s) {

  }
};

observable.create(onsubscriber1)
    .subscribe(subscriber1);

首先我们来看一下observable.create的代码

public final static <t> observable<t> create(onsubscribe<t> f) {
  return new observable<t>(hook.oncreate(f));
}

protected observable(onsubscribe<t> f) {
  this.onsubscribe = f;
}

直接就是调用了observable的构造函数来创建一个新的observable对象,这个对象我们暂时标记为observable1,以便后面追溯。
同时,会将我们传入的onsubscribe对象onsubscribe1保存在observable1的onsubscribe属性中,这个属性在后面的上下文中很重要,大家留心一下。

接下来我们来看看subscribe方法。

public final subscription subscribe(subscriber<? super t> subscriber) {
  return observable.subscribe(subscriber, this);
}

private static <t> subscription subscribe(subscriber<? super t> subscriber, observable<t> observable) {
  ...
  subscriber.onstart();
  hook.onsubscribestart(observable, observable.onsubscribe).call(subscriber);
  return hook.onsubscribereturn(subscriber);
}

可以看到,subscribe之后,就直接调用了observable1.onsubscribe.call方法,也就是我们代码中的onsubscribe1对象的call方法
,传入的参数就是我们代码中定义的subscriber1对象。call方法中所做的事情就是调用传入的subscriber1对象的onnext和oncomplete方法。
这样就实现了观察者和被观察者之间的通讯,是不是很简单?

public void call(subscriber<? super string> subscriber) {
  subscriber.onnext("1");
  subscriber.oncompleted();
}

rxjava使用场景小结

1.取数据先检查缓存的场景
取数据,首先检查内存是否有缓存
然后检查文件缓存中是否有
最后才从网络中取
前面任何一个条件满足,就不会执行后面的

final observable<string> memory = observable.create(new observable.onsubscribe<string>() {
  @override
  public void call(subscriber<? super string> subscriber) {
    if (memorycache != null) {
      subscriber.onnext(memorycache);
    } else {
      subscriber.oncompleted();
    }
  }
});
observable<string> disk = observable.create(new observable.onsubscribe<string>() {
  @override
  public void call(subscriber<? super string> subscriber) {
    string cachepref = rxpreferences.getstring("cache").get();
    if (!textutils.isempty(cachepref)) {
      subscriber.onnext(cachepref);
    } else {
      subscriber.oncompleted();
    }
  }
});

observable<string> network = observable.just("network");

//主要就是靠concat operator来实现
observable.concat(memory, disk, network)
.first()
.subscribeon(schedulers.newthread())
.subscribe(s -> {
  memorycache = "memory";
  system.out.println("--------------subscribe: " + s);
});

2.界面需要等到多个接口并发取完数据,再更新

//拼接两个observable的输出,不保证顺序,按照事件产生的顺序发送给订阅者
private void testmerge() {
  observable<string> observable1 = demoutils.createobservable1().subscribeon(schedulers.newthread());
  observable<string> observable2 = demoutils.createobservable2().subscribeon(schedulers.newthread());

  observable.merge(observable1, observable2)
      .subscribeon(schedulers.newthread())
      .subscribe(system.out::println);
}

3.一个接口的请求依赖另一个api请求返回的数据

举个例子,我们经常在需要登陆之后,根据拿到的token去获取消息列表。

这里用rxjava主要解决嵌套回调的问题,有一个专有名词叫callback hell

networkservice.gettoken("username", "password")
  .flatmap(s -> networkservice.getmessage(s))
  .subscribe(s -> {
    system.out.println("message: " + s);
  });

4.界面按钮需要防止连续点击的情况

rxview.clicks(findviewbyid(r.id.btn_throttle))
  .throttlefirst(1, timeunit.seconds)
  .subscribe(avoid -> {
    system.out.println("click");
  });

5.响应式的界面

比如勾选了某个checkbox,自动更新对应的preference

sharedpreferences preferences = preferencemanager.getdefaultsharedpreferences(this);
rxsharedpreferences rxpreferences = rxsharedpreferences.create(preferences);

preference<boolean> checked = rxpreferences.getboolean("checked", true);

checkbox checkbox = (checkbox) findviewbyid(r.id.cb_test);
rxcompoundbutton.checkedchanges(checkbox)
    .subscribe(checked.asaction());

6.复杂的数据变换

observable.just("1", "2", "2", "3", "4", "5")
  .map(integer::parseint)
  .filter(s -> s > 1)
  .distinct()
  .take(3)
  .reduce((integer, integer2) -> integer.intvalue() + integer2.intvalue())
  .subscribe(system.out::println);//9