详解用RxJava实现事件总线(Event Bus)
目前大多数开发者使用eventbus或者otto作为事件总线通信库,对于rxjava使用者来说,rxjava也可以轻松实现事件总线,因为它们都依据于观察者模式。
不多说,上代码
/** * rxbus * created by yokeyword on 2015/6/17. */ public class rxbus { private static volatile rxbus defaultinstance; private final subject<object, object> bus; // publishsubject只会把在订阅发生的时间点之后来自原始observable的数据发射给观察者 public rxbus() { bus = new serializedsubject<>(publishsubject.create()); } // 单例rxbus public static rxbus getdefault() { if (defaultinstance == null) { synchronized (rxbus.class) { if (defaultinstance == null) { defaultinstance = new rxbus(); } } } return defaultinstance ; } // 发送一个新的事件 public void post (object o) { bus.onnext(o); } // 根据传递的 eventtype 类型返回特定类型(eventtype)的 被观察者 public <t> observable<t> toobservable (class<t> eventtype) { return bus.oftype(eventtype); // 这里感谢小鄧子的提醒: oftype = filter + cast // return bus.filter(new func1<object, boolean>() { // @override // public boolean call(object o) { // return eventtype.isinstance(o); // } // }) .cast(eventtype); } }
注:
1、subject同时充当了observer和observable的角色,subject是非线程安全的,要避免该问题,需要将 subject转换为一个 serializedsubject ,上述rxbus类中把线程非安全的publishsubject包装成线程安全的subject。
2、publishsubject只会把在订阅发生的时间点之后来自原始observable的数据发射给观察者。
3、oftype操作符只发射指定类型的数据,其内部就是filter+cast(这里非常感谢@小鄧子 的提醒)
public final <r> observable<r> oftype(final class<r> klass) { return filter(new func1<t, boolean>() { @override public final boolean call(t t) { return klass.isinstance(t); } }).cast(klass); }
filter操作符可以使你提供一个指定的测试数据项,只有通过测试的数据才会被“发射”。
cast操作符可以将一个observable转换成指定类型的observable。
分析:
rxbus工作流程图
1、首先创建一个可同时充当observer和observable的subject;
2、在需要接收事件的地方,订阅该subject(此时subject是作为observable),在这之后,一旦subject接收到事件,立即发射给该订阅者;
3、在我们需要发送事件的地方,将事件post至subject,此时subject作为observer接收到事件(onnext),然后会发射给所有订阅该subject的订阅者。
对于rxbus的使用,就和普通的rxjava订阅事件很相似了。
先看发送事件的代码:
rxbus.getdefault().post(new userevent (1, "yoyo"));
userevent是要发送的事件,如果你用过eventbus, 很容易理解,userevent的代码:
public class userevent { long id; string name; public userevent(long id,string name) { this.id= id; this.name= name; } public long getid() { return id; } public string getname() { return name; } }
再看接收事件的代码:
// rxsubscription是一个subscription的全局变量,这段代码可以在oncreate/onstart等生命周期内 rxsubscription = rxbus.getdefault().toobserverable(userevent.class) .subscribe(new action1<userevent>() { @override public void call(userevent userevent) { long id = userevent.getid(); string name = userevent.getname(); ... } }, new action1<throwable>() { @override public void call(throwable throwable) { // todo: 处理异常 } });
最后,一定要记得在生命周期结束的地方取消订阅事件,防止rxjava可能会引起的内存泄漏问题。
@override protected void ondestroy() { super.ondestroy(); if(!rxsubscription.isunsubscribed()) { rxsubscription.unsubscribe(); } }
这样,一个简单的event bus就实现了!如果你的项目已经开始使用rxjava,也许可以考虑替换掉eventbus或otto,减小项目体积。
rxbus、eventbus因为解耦太彻底,滥用的话,项目可维护性会越来越低;一些简单场景更推荐用回调、subject来代替事件总线。
以上就是本文的全部内容,希望对大家的学习有所帮助,也希望大家多多支持。