Java扩展库RxJava的基本结构与适用场景小结
基本结构
我们先来看一段最基本的代码,分析这段代码在rxjava中是如何实现的。
observable.onsubscribe<string> onsubscriber1 = new observable.onsubscribe<string>() { @override public void call(subscriber<? super string> subscriber) { subscriber.onnext("1"); subscriber.oncompleted(); } }; subscriber<string> subscriber1 = new subscriber<string>() { @override public void oncompleted() { } @override public void onerror(throwable e) { } @override public void onnext(string s) { } }; observable.create(onsubscriber1) .subscribe(subscriber1);
首先我们来看一下observable.create的代码
public final static <t> observable<t> create(onsubscribe<t> f) { return new observable<t>(hook.oncreate(f)); } protected observable(onsubscribe<t> f) { this.onsubscribe = f; }
直接就是调用了observable的构造函数来创建一个新的observable对象,这个对象我们暂时标记为observable1,以便后面追溯。
同时,会将我们传入的onsubscribe对象onsubscribe1保存在observable1的onsubscribe属性中,这个属性在后面的上下文中很重要,大家留心一下。
接下来我们来看看subscribe方法。
public final subscription subscribe(subscriber<? super t> subscriber) { return observable.subscribe(subscriber, this); } private static <t> subscription subscribe(subscriber<? super t> subscriber, observable<t> observable) { ... subscriber.onstart(); hook.onsubscribestart(observable, observable.onsubscribe).call(subscriber); return hook.onsubscribereturn(subscriber); }
可以看到,subscribe之后,就直接调用了observable1.onsubscribe.call方法,也就是我们代码中的onsubscribe1对象的call方法
,传入的参数就是我们代码中定义的subscriber1对象。call方法中所做的事情就是调用传入的subscriber1对象的onnext和oncomplete方法。
这样就实现了观察者和被观察者之间的通讯,是不是很简单?
public void call(subscriber<? super string> subscriber) { subscriber.onnext("1"); subscriber.oncompleted(); }
rxjava使用场景小结
1.取数据先检查缓存的场景
取数据,首先检查内存是否有缓存
然后检查文件缓存中是否有
最后才从网络中取
前面任何一个条件满足,就不会执行后面的
final observable<string> memory = observable.create(new observable.onsubscribe<string>() { @override public void call(subscriber<? super string> subscriber) { if (memorycache != null) { subscriber.onnext(memorycache); } else { subscriber.oncompleted(); } } }); observable<string> disk = observable.create(new observable.onsubscribe<string>() { @override public void call(subscriber<? super string> subscriber) { string cachepref = rxpreferences.getstring("cache").get(); if (!textutils.isempty(cachepref)) { subscriber.onnext(cachepref); } else { subscriber.oncompleted(); } } }); observable<string> network = observable.just("network"); //主要就是靠concat operator来实现 observable.concat(memory, disk, network) .first() .subscribeon(schedulers.newthread()) .subscribe(s -> { memorycache = "memory"; system.out.println("--------------subscribe: " + s); });
2.界面需要等到多个接口并发取完数据,再更新
//拼接两个observable的输出,不保证顺序,按照事件产生的顺序发送给订阅者 private void testmerge() { observable<string> observable1 = demoutils.createobservable1().subscribeon(schedulers.newthread()); observable<string> observable2 = demoutils.createobservable2().subscribeon(schedulers.newthread()); observable.merge(observable1, observable2) .subscribeon(schedulers.newthread()) .subscribe(system.out::println); }
3.一个接口的请求依赖另一个api请求返回的数据
举个例子,我们经常在需要登陆之后,根据拿到的token去获取消息列表。
这里用rxjava主要解决嵌套回调的问题,有一个专有名词叫callback hell
networkservice.gettoken("username", "password") .flatmap(s -> networkservice.getmessage(s)) .subscribe(s -> { system.out.println("message: " + s); });
4.界面按钮需要防止连续点击的情况
rxview.clicks(findviewbyid(r.id.btn_throttle)) .throttlefirst(1, timeunit.seconds) .subscribe(avoid -> { system.out.println("click"); });
5.响应式的界面
比如勾选了某个checkbox,自动更新对应的preference
sharedpreferences preferences = preferencemanager.getdefaultsharedpreferences(this); rxsharedpreferences rxpreferences = rxsharedpreferences.create(preferences); preference<boolean> checked = rxpreferences.getboolean("checked", true); checkbox checkbox = (checkbox) findviewbyid(r.id.cb_test); rxcompoundbutton.checkedchanges(checkbox) .subscribe(checked.asaction());
6.复杂的数据变换
observable.just("1", "2", "2", "3", "4", "5") .map(integer::parseint) .filter(s -> s > 1) .distinct() .take(3) .reduce((integer, integer2) -> integer.intvalue() + integer2.intvalue()) .subscribe(system.out::println);//9