EventBus实现分析
最近看了一下EventBus实现的源码,分享一下自己的心得体会。
EventBus这个库实现很简单,如官方所说一样,库体积很小。将组件之间的通信通过解耦的方式表现的淋漓尽致,废话不多说,EventBus优点官网上列举了一堆,有兴趣的可以去看一下http://greenrobot.org/eventbus/。
分享主要有以下几个方面:
1)简单使用
- 简单的说明EventBus的使用
2)解决什么问题
- 比如解决了组件之间解耦问题,线程切换问题。
3)实现细节 - EventBus是怎样注册订阅者,怎样通过post将消息发送到订阅者。
4)优化 - EventBus中使用的一些细节优化,考虑很周全。
5)问题 - 针对EventBus中使用的一些问题。
中间可能会介绍新旧版本的区别,新旧版本指的是3.0及以后的版本和以前的版本。要注意的是新旧版本不仅是版本号的区别,包名也不一样。旧版本的包名是de.greenrobot.even,新版本的包名是org.greenrobot.eventbu。
一、简单使用
1.1 新版本使用
注册消息订阅者,代码如下所示:
public class EbNewActivity extends BaseNewActivity {
@Override
protected void onCreate(@Nullable Bundle savedInstanceState) {
super.onCreate(savedInstanceState);
setContentView(R.layout.activity_eb_new);
// 注册必须在Fragment初始化之前,因为订阅消息发送是在Fragment中
EventBus.getDefault().register(this);
getSupportFragmentManager()
.beginTransaction()
.add(R.id.frame_content, EbNewFragment.newInstance())
.commit();
}
@Subscribe(threadMode = ThreadMode.MAIN)
public void onEventBusMainEvent(String msg) {
Log.d("onEventBusMainEvent", Thread.currentThread().getName() + "," + msg);
}
// 默认是ThreadMode.POSTING,注意这里参数是Object,与其它不同
@Subscribe
public void onEventBusPostEvent(Object msg) {
Log.i("onEventBusPostEvent", Thread.currentThread().getName() + "," + msg);
}
@Subscribe(threadMode = ThreadMode.BACKGROUND)
public void onEventBusBackgroundEvent(String msg) {
Log.w("onEventBusBgEvent", Thread.currentThread().getName() + "," + msg);
}
@Subscribe(threadMode = ThreadMode.ASYNC)
public void onEventBusAsyncEvent(String msg) {
Log.e("onEventBusAsyncEvent", Thread.currentThread().getName() + "," + msg);
}
@Override
protected void onDestroy() {
super.onDestroy();
// 注销,否则会导致Activity内存泄露
EventBus.getDefault().unregister(this);
}
}
activity_eb_new.xml布局文件很简单,里面只有一个LinearLayout,代码如下所示:
<?xml version="1.0" encoding="utf-8"?>
<LinearLayout
xmlns:android="http://schemas.android.com/apk/res/android"
android:layout_width="match_parent"
android:layout_height="match_parent"
android:orientation="vertical"
android:id="@+id/frame_content"
/>
@Subscribe注解除了指定EventBus的线程模型,还可以指定订阅者的优先级以及是否为sticky模式。
优先级的意思是当EventBus#post消息时,根据订阅的方法的优先级来执行相应的方法。优先级高的即priority的值越大的,先收到订阅消息。在注册订阅者时,就会根据这个priority的大小对顺序进行排序。代码如下所示:
org.greenrobot.eventbus.EventBus#subscribe
private void subscribe(Object subscriber, SubscriberMethod subscriberMethod) {
Class<?> eventType = subscriberMethod.eventType;
Subscription newSubscription = new Subscription(subscriber, subscriberMethod);
CopyOnWriteArrayList<Subscription> subscriptions = subscriptionsByEventType.get(eventType);
if (subscriptions == null) {
subscriptions = new CopyOnWriteArrayList<>();
subscriptionsByEventType.put(eventType, subscriptions);
} else {
if (subscriptions.contains(newSubscription)) {
throw new EventBusException("Subscriber " + subscriber.getClass() + " already registered to event "
+ eventType);
}
}
int size = subscriptions.size();
for (int i = 0; i <= size; i++) {
// 根据优先级进行排序
if (i == size || subscriberMethod.priority > subscriptions.get(i).subscriberMethod.priority) {
subscriptions.add(i, newSubscription);
break;
}
}
}
stick模式是指当EventBus#postSticky消息时,该消息会被缓存,当下次调用EventBus#register注册订阅者后,会把相应接收消息的方法都执行一遍。
可能描述太抽象,举个例子:一个项目组在一个房间封闭开发项目,其中测试人员和UI设计都不在。这个时候产品经理跑过来说,改个需求,这时候只有开发人员都知道。产品经理口头通知改需求就相当于post,发送的消息只有已经注册过接收者接收。产品经理把要改的需求打印到一张纸上,然后粘在门上,并向房间里面的开发人员描述了修改的需求。测试人员和UI设计回来之后,发现粘在门上的需求变更通知,也都知道了。产品经理把修改的需求粘在门上并向室内开发人员描述修改的需求这个过程就相当于postSticky,后面注册的接收者也会收到这个消息,先注册的就不用说了。
postSticky可以用来做延迟接收消息。下面看下源码的实现:
postSticky操作:
1)缓存消息
org.greenrobot.eventbus.EventBus#postSticky
public void postSticky(Object event) {
synchronized (stickyEvents) {
// 这里会缓存消息
stickyEvents.put(event.getClass(), event);
}
// Should be posted after it is putted, in case the subscriber wants to remove immediately
post(event);
}
2)接收消息
org.greenrobot.eventbus.EventBus#register
public void register(Object subscriber) {
Class<?> subscriberClass = subscriber.getClass();
List<SubscriberMethod> subscriberMethods = subscriberMethodFinder.findSubscriberMethods(subscriberClass);
synchronized (this) {
for (SubscriberMethod subscriberMethod : subscriberMethods) {
// 这里处理订阅的方法
subscribe(subscriber, subscriberMethod);
}
}
}
private void subscribe(Object subscriber, SubscriberMethod subscriberMethod) {
// ... 中间代码省略
if (subscriberMethod.sticky) {
// 重点看这里
if (eventInheritance) {
// Existing sticky events of all subclasses of eventType have to be considered.
// Note: Iterating over all events may be inefficient with lots of sticky events,
// thus data structure should be changed to allow a more efficient lookup
// (e.g. an additional map storing sub classes of super classes: Class -> List<Class>).
Set<Map.Entry<Class<?>, Object>> entries = stickyEvents.entrySet();
for (Map.Entry<Class<?>, Object> entry : entries) {
Class<?> candidateEventType = entry.getKey();
if (eventType.isAssignableFrom(candidateEventType)) {
Object stickyEvent = entry.getValue();
// 执行消息会到这里
checkPostStickyEventToSubscription(newSubscription, stickyEvent);
}
}
} else {
Object stickyEvent = stickyEvents.get(eventType);
// 执行消息会到这里
checkPostStickyEventToSubscription(newSubscription, stickyEvent);
}
}
}
private void checkPostStickyEventToSubscription(Subscription newSubscription, Object stickyEvent) {
if (stickyEvent != null) {
// If the subscriber is trying to abort the event, it will fail (event is not tracked in posting state)
// --> Strange corner case, which we don't take care of here.
// 执行消息或者加入消息队列中
postToSubscription(newSubscription, stickyEvent, Looper.getMainLooper() == Looper.myLooper());
}
}
postToSubscription方法中会根据线程模型来执行订阅的方法。也就是后注册进来的订阅者也会收到之前的postStick消息。
- 线程模型
EventBus的线程模型有四种,分别如下所示:
public enum ThreadMode {
POSTING,
MAIN,
BACKGROUND,
ASYNC
}
POSTING表示在当前线程执行,也就是post方法在哪个线程中执行,对应的接收消息的方法就在哪个线程执行;
MAIN表示接收的消息在UI线程中执行;
BACKGROUND表示接收的消息在非UI线程中执行,如果当前是在非UI线程中执行post,则会在当前线程中执行接收消息的方法;如果是在UI线程中执行post方法,则会在非UI线程中执行接收消息的方法。
ASYNC表示在与执行post线程不同的另外一个非UI线程中执行接收消息的方法。
上面的解释可能不太好理解,上面的代码日志输出如下所示:
onEventBusAsyncEvent: pool-1-thread-1,onAttach
onEventBusMainEvent: main,onAttach
onEventBusBgEvent: pool-1-thread-2,onAttach
onEventBusPostEvent: main,onAttach
对比下日志前面的线程名称就清楚了。
-
发送消息
public class EbNewFragment extends Fragment {
public static EbNewFragment newInstance() { EbNewFragment fragment = new EbNewFragment(); return fragment; } @Override public void onAttach(Context context) { super.onAttach(context); // 发送消息 EventBus.getDefault().post("onAttach"); }
}
因为在Activity中注册的方法接收的消息类型是String和Object(eventInheritance参数为true,第二篇会解释),所以这里post(“onAttach”)后,那些方法都会收到消息。
1.2 旧版本使用
老版本接收消息的订阅方法都是以onEvent开头,代码如下所示:
public class EbOldActivity extends Activity {
@Override
protected void onCreate(@Nullable Bundle savedInstanceState) {
super.onCreate(savedInstanceState);
setContentView(R.layout.activity_eb_old);
EventBus.getDefault().register(this);
getSupportFragmentManager()
.beginTransaction()
.add(R.id.frame_content, EbOldFragment.newInstance())
.commit();
}
public void onEvent(String msg) {
log("onEvent", msg);
}
public void onEventMainThread(String msg) {
log("onEventMainThread", msg);
}
public void onEventBackgroundThread(String msg) {
log("onEventBackgroundThread", msg);
}
public void onEventAsync(String msg) {
log("onEventAsync", msg);
}
@Override
protected void onDestroy() {
super.onDestroy();
EventBus.getDefault().unregister(this);
}
}
activity_eb_old.xml布局文件内容与上面的activity_eb_new.xml布局文件类似。
onEvent后缀表示线程对应的线程模型,对应关系看下面的源码就清楚了:
de.greenrobot.event.SubscriberMethodFinder#findSubscriberMethods部分代码:
if (methodName.startsWith(ON_EVENT_METHOD_NAME)) {
int modifiers = method.getModifiers();
if ((modifiers & Modifier.PUBLIC) != 0 && (modifiers & MODIFIERS_IGNORE) == 0) {
Class<?>[] parameterTypes = method.getParameterTypes();
if (parameterTypes.length == 1) {
// ON_EVENT_METHOD_NAME为onEvent
String modifierString = methodName.substring(ON_EVENT_METHOD_NAME.length());
ThreadMode threadMode;
if (modifierString.length() == 0) {
// onEvent方法
threadMode = ThreadMode.PostThread;
} else if (modifierString.equals("MainThread")) {
// onEventMainThread方法
threadMode = ThreadMode.MainThread;
} else if (modifierString.equals("BackgroundThread")) {
// onEventBackgroundThread方法
threadMode = ThreadMode.BackgroundThread;
} else if (modifierString.equals("Async")) {
// onEventAsync方法
threadMode = ThreadMode.Async;
} else {
if (skipMethodVerificationForClasses.containsKey(clazz)) {
continue;
} else {
throw new EventBusException("Illegal onEvent method, check for typos: " + method);
}
}
// ...
}
}
}
从上面源码中也可以看出来注册的订阅方法有一些要求:
- 方法必须是public,不能是抽象和静态的;
- 方法参数只能有一个。
- 其它方法不能以onEvent开头,或者必须通过EventBusBuilder来创建EventBus实例,并且调用EventBusBuilder#skipMethodVerificationFor方法注册class,以忽略当前包含其它以onEvent开头的方法。
整个调用流程如下图所示:
OK,简单使用介绍完了,下面看看解决了什么问题。
二、解决什么问题
通过上面的使用,很明显知道EventBus解决了什么问题。
2.1 通信解耦
通过EventBus.register来注册订阅者,通过注解反射的方式提取接收消息的方法,再通过EventBus.post发送消息,从接收消息方法的缓存列表中找到对应消息接收者进行消息处理。这个时候也是通过反射调用方法。
所以这里是通过反射的方式来做到解耦。
2.2 线程切换
线程模型在老版本上是通过后缀来区别,新版本是以注解的方式来指定的,这样为线程切换提供很大的便利。EventBus自己提供了一个线程池来管理线程,ASYNC和BACKGROUND标记的方法都是在同一个线程池中的线程来执行的。
另外就是EventBus还提供了一个消息队列,发送的消息是先进消息队列还是立及执行,这个是根据线程模型来的。部分源码如下所示:
org.greenrobot.eventbus.EventBus#postToSubscription
private void postToSubscription(Subscription subscription, Object event, boolean isMainThread) {
switch (subscription.subscriberMethod.threadMode) {
case POSTING:
// 立及执行
invokeSubscriber(subscription, event);
break;
case MAIN:
if (isMainThread) {
// 立及执行
invokeSubscriber(subscription, event);
} else {
// UI线程的消息队列
mainThreadPoster.enqueue(subscription, event);
}
break;
case BACKGROUND:
if (isMainThread) {
// 后台线程的消息队列
backgroundPoster.enqueue(subscription, event);
} else {
// 立及执行
invokeSubscriber(subscription, event);
}
break;
case ASYNC:
// 异步线程的消息队列
asyncPoster.enqueue(subscription, event);
break;
default:
throw new IllegalStateException("Unknown thread mode: " + subscription.subscriberMethod.threadMode);
}
}
消息队列有三种,UI线程执行消息的消息队列、后台线程的消息队列、异步线程的消息队列。
三、实现细节
前面在介绍EventBus的使用时,也顺带着讲了一些细节。下面先从两个流程开始,一个是注册流程,另一个是发送消息并且处理的流程。
3.1 注册流程
注册过程要根据消息类型缓存对应消息接收的方法,缓存的集合是一个HashMap,缓存集合代码如下所示:
private final Map<Class<?>, CopyOnWriteArrayList<Subscription>> subscriptionsByEventType;
其中键是消息类型的Class对象,值中的org.greenrobot.eventbus.Subscription的对象中包含当前方法所在的对象,即订阅者,还包含订阅的方法。代码如下所示:
final class Subscription {
final Object subscriber;
final SubscriberMethod subscriberMethod;
// ... 其它代码省略
}
public class SubscriberMethod {
// 订阅方法
final Method method;
// 线程模型
final ThreadMode threadMode;
// 消息类型
final Class<?> eventType;
// 优先级
final int priority;
// stick模式
final boolean sticky;
/** Used for efficient comparison */
String methodString;
// ...其它代码省略
}
中间的解析缓存过程如下图所示:
3.2 消息处理流程
上面的缓存过程在这一步起作用了。
1)暂存消息到消息队列
当调用EventBus#post消息时,先从当前线程中取到消息队列,然后将消息放入到消息队列中,再从消息队列中取消息进行处理。代码如下所示:
org.greenrobot.eventbus.EventBus#post
public void post(Object event) {
// 每个线程都有一个消息队列,避免同步,提高效率
PostingThreadState postingState = currentPostingThreadState.get();
List<Object> eventQueue = postingState.eventQueue;
eventQueue.add(event);
if (!postingState.isPosting) {
// 参数准备
postingState.isMainThread = Looper.getMainLooper() == Looper.myLooper();
postingState.isPosting = true;
if (postingState.canceled) {
throw new EventBusException("Internal error. Abort state was not reset");
}
try {
while (!eventQueue.isEmpty()) {
// 执行消息
postSingleEvent(eventQueue.remove(0), postingState);
}
} finally {
postingState.isPosting = false;
postingState.isMainThread = false;
}
}
}
2)找到消息类型对应方法
这一步主要是从消息类型的HashMap缓存subscriptionsByEventType中取出对应的方法集合。代码如下postSingleEventForEventType方法所示:
private void postSingleEvent(Object event, PostingThreadState postingState) throws Error {
Class<?> eventClass = event.getClass();
boolean subscriptionFound = false;
// eventInheritance默认为true
if (eventInheritance) {
// 查找所有event类型
List<Class<?>> eventTypes = lookupAllEventTypes(eventClass);
int countTypes = eventTypes.size();
for (int h = 0; h < countTypes; h++) {
Class<?> clazz = eventTypes.get(h);
// 遍历event的类型,针对每种类型都会处理
subscriptionFound |= postSingleEventForEventType(event, postingState, clazz);
}
} else {
subscriptionFound = postSingleEventForEventType(event, postingState, eventClass);
}
// ... 省略部分代码
}
private boolean postSingleEventForEventType(Object event, PostingThreadState postingState, Class<?> eventClass) {
CopyOnWriteArrayList<Subscription> subscriptions;
synchronized (this) {
subscriptions = subscriptionsByEventType.get(eventClass);
}
if (subscriptions != null && !subscriptions.isEmpty()) {
for (Subscription subscription : subscriptions) {
postingState.event = event;
postingState.subscription = subscription;
boolean aborted = false;
try {
postToSubscription(subscription, event, postingState.isMainThread);
aborted = postingState.canceled;
} finally {
// ... 省略部分代码
}
// ...省略部分代码
}
return true;
}
return false;
}
在上面postSingleEvent方法中使用一个eventInheritance参数,这个参数默认为true,也就是说消息类型对应的父类都会被找出来。上一节的示例中,如果有订阅方法的参数是Object,post消息是String,这时候对应处理消息的方法参数是Object的方法也会被执行;如果是false,则参数为Object的方法不会执行,只有参数为String的方法会执行。这个参数是在EventBusBuilder中设置后,创建EventBus对象时使用。
3)执行对应方法
postToSubscription是将消息加入执行列表中还是直接执行,这个是根据线程模型来的,前面有说过。下面就不贴代码了。
四、优化
在最后将消息加入到消息队列中的时候,EventBus在处理消息时,做了优化处理。
4.1 主线程执行消息
当消息需要在UI线程中执行时,消息可能会被加入到消息队列中,也就是加入到mainThreadPoster队列中。代码如下所示:
org.greenrobot.eventbus.EventBus#postToSubscription
private void postToSubscription(Subscription subscription, Object event, boolean isMainThread) {
switch (subscription.subscriberMethod.threadMode) {
case MAIN:
if (isMainThread) {
invokeSubscriber(subscription, event);
} else {
mainThreadPoster.enqueue(subscription, event);
}
break;
// 部分代码省略...
}
}
在主线程中执行消息代码如下所示:
final class HandlerPoster extends Handler {
private final PendingPostQueue queue;
private final int maxMillisInsideHandleMessage;
private final EventBus eventBus;
private boolean handlerActive;
HandlerPoster(EventBus eventBus, Looper looper, int maxMillisInsideHandleMessage) {
super(looper);
this.eventBus = eventBus;
// 这个值是10,即10ms
this.maxMillisInsideHandleMessage = maxMillisInsideHandleMessage;
queue = new PendingPostQueue();
}
void enqueue(Subscription subscription, Object event) {
PendingPost pendingPost = PendingPost.obtainPendingPost(subscription, event);
synchronized (this) {
queue.enqueue(pendingPost);
if (!handlerActive) {
handlerActive = true;
if (!sendMessage(obtainMessage())) {
// 触发handleMessage方法被调用
throw new EventBusException("Could not send handler message");
}
}
}
}
@Override
public void handleMessage(Message msg) {
boolean rescheduled = false;
try {
long started = SystemClock.uptimeMillis();
while (true) {
PendingPost pendingPost = queue.poll();
if (pendingPost == null) {
synchronized (this) {
// Check again, this time in synchronized
pendingPost = queue.poll();
if (pendingPost == null) {
handlerActive = false;
return;
}
}
}
eventBus.invokeSubscriber(pendingPost);
long timeInMethod = SystemClock.uptimeMillis() - started;
if (timeInMethod >= maxMillisInsideHandleMessage) {
// 处理消息超过10ms
if (!sendMessage(obtainMessage())) {
throw new EventBusException("Could not send handler message");
}
rescheduled = true;
// 结束循环
return;
}
}
} finally {
handlerActive = rescheduled;
}
}
}
这里做了一个优化处理是当EventBusUI线程中待执行的消息队列queue中的消息执行耗时超过maxMillisInsideHandleMessage,即10ms,就会停止执行queue消息队列中的消息,不阻碍App中UI线程中的其他消息执行。等App中的UI线程中的消息执行完后,再执行到这里的handleMessage方法。在handleMessage方法中的sendMessage(obtainMessage())操作就是这个意思。
4.2 后台线程执行消息
同样,EventBus执行后台线程(非UI线程)中的消息队列中的消息时,也使用了优化。org.greenrobot.eventbus.EventBus#backgroundPoster为后台线程的消息队列,
final class BackgroundPoster implements Runnable {
private final PendingPostQueue queue;
private final EventBus eventBus;
private volatile boolean executorRunning;
BackgroundPoster(EventBus eventBus) {
this.eventBus = eventBus;
queue = new PendingPostQueue();
}
public void enqueue(Subscription subscription, Object event) {
PendingPost pendingPost = PendingPost.obtainPendingPost(subscription, event);
synchronized (this) {
// 先加入到队列中
queue.enqueue(pendingPost);
if (!executorRunning) {
executorRunning = true;
// 最后会在线程池中的线程中执行run方法
eventBus.getExecutorService().execute(this);
}
}
}
@Override
public void run() {
try {
try {
while (true) {
// 如果消息队列中有消息,则立及返回,否则会等1秒钟,再从消息队列取消息
PendingPost pendingPost = queue.poll(1000);
if (pendingPost == null) {
synchronized (this) {
// Check again, this time in synchronized
pendingPost = queue.poll();
if (pendingPost == null) {
executorRunning = false;
return;
}
}
}
// 执行对应订阅的方法
eventBus.invokeSubscriber(pendingPost);
}
} catch (InterruptedException e) {
Log.w("Event", Thread.currentThread().getName() + " was interruppted", e);
}
} finally {
executorRunning = false;
}
}
}
while循环中的queue.poll方法如下所示:
org.greenrobot.eventbus.PendingPostQueue
synchronized PendingPost poll() {
PendingPost pendingPost = head;
if (head != null) {
head = head.next;
if (head == null) {
tail = null;
}
}
return pendingPost;
}
synchronized PendingPost poll(int maxMillisToWait) throws InterruptedException {
if (head == null) {
// 线程等待
wait(maxMillisToWait);
}
// 这里调用上面的poll方法
return poll();
}
上面这种做法避免了线程切换带来的开销。也就是说,如果两次post消息间隔是在1秒内,则有可能这两次消息都是在同一个线程中执行,而不是两个线程。但不知道为什么这个值是1秒。
五、问题
5.1 订阅方法继承问题
5.1.1 新版本订阅方法的继承问题
- 子类override的方法,@subscribe注解不管是写在子类方法上还是基类的方法上,子类方法都会被执行。
前提条件是订阅对象使用的是子类对象,因为是遵循java多态语法规则,子类override基类中的方法,所以调用的是子类中的方法。如果子类方法中调用了super,则会调用基类方法。如果没有调用,则会完全override基类方法,基类方法不会执行。 - 基类方法上使用@subscribe注解后,子类override的方法则不需要注解标记,即使加了效果也是一样的。
5.1.2 旧版本的继承问题
旧版本解析订阅方法都是以onEvent开头的,规则更简单一点,override方法完全按照java多态语法来。
5.2 订阅方法重载问题
重载的方法如果也标记为订阅方法,消息类型都兼容,则都会执行。什么意思呢?就像前面的举例,如果post的是String类型,重载的参数是String和Object类型,则Object类型的方法也会执行。如果post的是Object类型,那么String参数的订阅方法不会被执行,Object参数的订阅方法会被执行。
为什么都订阅的重载方法都会执行呢?因为它们都订阅了啊,我是认真的。
六、写在最后
最后总结一下,用EventBus要注意register方法和unregister方法配对使用,否则容易导致内存泄露。EventBus要灵活使用,建议在项目某一解耦层使用,如果大面积使用会导致项目缺乏逻辑性,使用不当会导致代码臃肿,所以注意使用场景。
原文链接:http://www.jacpy.com/2018/07/26/eventbus-implement-analysis-2.html
上一篇: mysql 查询拓展 触发器 交叉表 存储过程_MySQL
下一篇: EventBus使用详解