EventBus 原理简析
基于 eventbus:3.1.1 分析。
一.注解写法
关键字是@Subscribe
后面括号内可选参数是,threadMode表示可以选择切换到哪个线程。sticky是否粘性事件,priority表示优先级。
ThreadMode: POSTING 表示发送事件的线程,当前线程。
ThreadMode: MAIN 表示UI线程
ThreadMode: MAIN_ORDERED 会先进入主线程队列,然后主线程依次处理队列消息
ThreadMode: BACKGROUND 当前线程是后台线程则在当前线程处理,否则会丢到后台线程池处理。
ThreadMode: ASYNC 同MAIN_ORDERED类似,会丢进后台线程队列,让后台线程池依次处理。
@Documented
@Retention(RetentionPolicy.RUNTIME)
@Target({ElementType.METHOD})
public @interface Subscribe {
ThreadMode threadMode() default ThreadMode.POSTING;
/**
* If true, delivers the most recent sticky event (posted with
* {@link EventBus#postSticky(Object)}) to this subscriber (if event available).
*/
boolean sticky() default false;
/** Subscriber priority to influence the order of event delivery.
* Within the same delivery thread ({@link ThreadMode}), higher priority subscribers will receive events before
* others with a lower priority. The default priority is 0. Note: the priority does *NOT* affect the order of
* delivery among subscribers with different {@link ThreadMode}s! */
int priority() default 0;
}
二.注册与反注册:
EventBus.getDefault().register(subscriber);
EventBus.getDefault().unregister(subscriber);
EventBus 的特点就是可以在任意类注册,不像广播一样必须在有 context 的地方注册。
日常使用,可以把这些放在父类里,比如 BaseActivity 和 BaseFragment 这些。如果某个子类 Activity 根本没有任何Subscriber注解导致报错?没关系 try catch 一下就行。
getDefault()方法就是获取单例EventBus的单例,然后看 register 的调用流程,register 方法会把 this 这个当前类传进去。
public void register(Object subscriber) {
Class<?> subscriberClass = subscriber.getClass();
List<SubscriberMethod> subscriberMethods = subscriberMethodFinder.findSubscriberMethods(subscriberClass);
synchronized (this) {
for (SubscriberMethod subscriberMethod : subscriberMethods) {
subscribe(subscriber, subscriberMethod);
}
}
}
第三行看这名字,像是『找出所有 @Subscribe 标记的方法』,跟踪一下看看怎么实现的。
经过 findSubscriberMethods -> findUsingInfo -> findUsingReflectionInSingleClass (一般流程),我们到这个最后这个方法里看看
private void findUsingReflectionInSingleClass(FindState findState) {
Method[] methods;
try {
// This is faster than getMethods, especially when subscribers are fat classes like Activities
methods = findState.clazz.getDeclaredMethods();
} catch (Throwable th) {
// Workaround for java.lang.NoClassDefFoundError, see https://github.com/greenrobot/EventBus/issues/149
methods = findState.clazz.getMethods();
findState.skipSuperClasses = true;
}
for (Method method : methods) {
int modifiers = method.getModifiers();
if ((modifiers & Modifier.PUBLIC) != 0 && (modifiers & MODIFIERS_IGNORE) == 0) {
Class<?>[] parameterTypes = method.getParameterTypes();
if (parameterTypes.length == 1) {
Subscribe subscribeAnnotation = method.getAnnotation(Subscribe.class);
if (subscribeAnnotation != null) {
Class<?> eventType = parameterTypes[0];
if (findState.checkAdd(method, eventType)) {
ThreadMode threadMode = subscribeAnnotation.threadMode();
findState.subscriberMethods.add(new SubscriberMethod(method, eventType, threadMode,
subscribeAnnotation.priority(), subscribeAnnotation.sticky()));
}
}
} else if (strictMethodVerification && method.isAnnotationPresent(Subscribe.class)) {
String methodName = method.getDeclaringClass().getName() + "." + method.getName();
throw new EventBusException("@Subscribe method " + methodName +
"must have exactly 1 parameter but has " + parameterTypes.length);
}
} else if (strictMethodVerification && method.isAnnotationPresent(Subscribe.class)) {
String methodName = method.getDeclaringClass().getName() + "." + method.getName();
throw new EventBusException(methodName +
" is a illegal @Subscribe method: must be public, non-static, and non-abstract");
}
}
}
可以看到第五行是Java知名的反射用法 java.lang.Class.getDeclaredMethod() 。通过它获取到了类的所有方法,又在for循环下,使用一大堆代码中筛选出了我们想要的。可以看到条件包括以下几个条件,Public方法、非abstract或static等方法、方法的参数只有1个、使用了Subscribe注解。筛选后就得到我们想要的方法了,接着把以下这些信息『方法名 参数类型 注解上写的线程类型 注解上写的优先级 注解上写的是否粘性事件』一个打包放到一个叫 SubscriberMethod 类实例中。
注意在 findUsingInfo 中有不断向上遍历父类的操作。所以这个EventBus的注册操作是可以继承的。
然后回到 register 方法,我们看到紧接着就是对这堆 SubscriberMethod 执行了 subscribe 方法。
首先 EventBus 库有一个类叫 Subscription,两个成员 subscriber 和 subscriberMethod。
final class Subscription {
final Object subscriber;
final SubscriberMethod subscriberMethod;
...
}
了解反射机制的话,让容易知道如果我们有了包含在 SubscriberMethod 里面的方法名、参数类型,又有了这个实例对象。就可以通过反射调用的该方法了。所以 Subscription 把这两个捆绑在一个当做一个整体。
private void subscribe(Object subscriber, SubscriberMethod subscriberMethod) {
Class<?> eventType = subscriberMethod.eventType;
Subscription newSubscription = new Subscription(subscriber, subscriberMethod);
CopyOnWriteArrayList<Subscription> subscriptions = subscriptionsByEventType.get(eventType);
if (subscriptions == null) {
subscriptions = new CopyOnWriteArrayList<>();
subscriptionsByEventType.put(eventType, subscriptions);
} else {
if (subscriptions.contains(newSubscription)) {
throw new EventBusException("Subscriber " + subscriber.getClass() + " already registered to event "
+ eventType);
}
}
int size = subscriptions.size();
for (int i = 0; i <= size; i++) {
if (i == size || subscriberMethod.priority > subscriptions.get(i).subscriberMethod.priority) {
subscriptions.add(i, newSubscription);
break;
}
}
List<Class<?>> subscribedEvents = typesBySubscriber.get(subscriber);
if (subscribedEvents == null) {
subscribedEvents = new ArrayList<>();
typesBySubscriber.put(subscriber, subscribedEvents);
}
subscribedEvents.add(eventType);
...
}
subscribe 方法里主要是把这次注册的类实例和注解方法信息 放到 EventBus 这个控制中心的两个成员变量里面缓存着。
这两个成员变量分别是CopyOnWriteArrayList<Subscription>> subscriptionsByEventType 和 Map<Object, List<Class<?>>> typesBySubscriber。
在对 subscriptionsByEventType 的 Value List<Subscription> 进行添加的时候,会根据优先级插入到指定顺序。这样 Subscriber 方法被调用的时候就可以根据设置的优先级来处理了。
画个图展示一下内部结构就是。
注册流程就到此为止了,下面讲讲反注册流程。调用的是 unregister 这个方法。
public synchronized void unregister(Object subscriber) {
List<Class<?>> subscribedTypes = typesBySubscriber.get(subscriber);
if (subscribedTypes != null) {
for (Class<?> eventType : subscribedTypes) {
unsubscribeByEventType(subscriber, eventType);
}
typesBySubscriber.remove(subscriber);
} else {
logger.log(Level.WARNING, "Subscriber to unregister was not registered before: " + subscriber.getClass());
}
}
/** Only updates subscriptionsByEventType, not typesBySubscriber! Caller must update typesBySubscriber. */
private void unsubscribeByEventType(Object subscriber, Class<?> eventType) {
List<Subscription> subscriptions = subscriptionsByEventType.get(eventType);
if (subscriptions != null) {
int size = subscriptions.size();
for (int i = 0; i < size; i++) {
Subscription subscription = subscriptions.get(i);
if (subscription.subscriber == subscriber) {
subscription.active = false;
subscriptions.remove(i);
i--;
size--;
}
}
}
}
这下子对应起上面那两个Map的内部结构图,就很容易看懂了。
先通过 unregister(this) 传进来的 this 实例,在 typesBySubscriber 这个 Map 里面找到这个类实例的所有 Event 类型。然后删除 Subscription 里面,调用者 subscriber == this 的,最后在 typesBySubscriber 把对应的 Map.Entry 删掉。
三.发送事件 和 处理事件:
执行的代码是 EventBus.getDefault().post(new XXEvent())
/** Posts the given event to the event bus. */
public void post(Object event) {
PostingThreadState postingState = currentPostingThreadState.get();
List<Object> eventQueue = postingState.eventQueue;
eventQueue.add(event);
if (!postingState.isPosting) {
postingState.isMainThread = isMainThread();
postingState.isPosting = true;
if (postingState.canceled) {
throw new EventBusException("Internal error. Abort state was not reset");
}
try {
while (!eventQueue.isEmpty()) {
postSingleEvent(eventQueue.remove(0), postingState);
}
} finally {
postingState.isPosting = false;
postingState.isMainThread = false;
}
}
}
private void postSingleEvent(Object event, PostingThreadState postingState) throws Error {
Class<?> eventClass = event.getClass();
boolean subscriptionFound = false;
if (eventInheritance) {
List<Class<?>> eventTypes = lookupAllEventTypes(eventClass);
int countTypes = eventTypes.size();
for (int h = 0; h < countTypes; h++) {
Class<?> clazz = eventTypes.get(h);
subscriptionFound |= postSingleEventForEventType(event, postingState, clazz);
}
} else {
subscriptionFound = postSingleEventForEventType(event, postingState, eventClass);
}
if (!subscriptionFound) {
if (logNoSubscriberMessages) {
logger.log(Level.FINE, "No subscribers registered for event " + eventClass);
}
if (sendNoSubscriberEvent && eventClass != NoSubscriberEvent.class &&
eventClass != SubscriberExceptionEvent.class) {
post(new NoSubscriberEvent(this, event));
}
}
}
private boolean postSingleEventForEventType(Object event, PostingThreadState postingState, Class<?> eventClass) {
CopyOnWriteArrayList<Subscription> subscriptions;
synchronized (this) {
subscriptions = subscriptionsByEventType.get(eventClass);
}
if (subscriptions != null && !subscriptions.isEmpty()) {
for (Subscription subscription : subscriptions) {
postingState.event = event;
postingState.subscription = subscription;
boolean aborted = false;
try {
postToSubscription(subscription, event, postingState.isMainThread);
aborted = postingState.canceled;
} finally {
postingState.event = null;
postingState.subscription = null;
postingState.canceled = false;
}
if (aborted) {
break;
}
}
return true;
}
return false;
}
有一点有趣的是,postSingleEvent 方法里会判断一个属性 eventInheritance,看是否向上查找它的父类。然后搜了下代码,这个 eventInheritance 默认是为true的。让我学会了一个小技巧就是发送的 Event 有继承关系也是可以触达的。
在 postSingleEventForEventType 里,通过 subscriptionsByEventType 这个 Map 类型的属性,找到了这个事件的所有 Subscription。然后就是遍历这堆 Subscription,逐个执行 postToSubscription 方法,由于涉及到线程调度,所以需要传递一个 postingState.isMainThread 信息。
接下来就是处理事件了
private void postToSubscription(Subscription subscription, Object event, boolean isMainThread) {
switch (subscription.subscriberMethod.threadMode) {
case POSTING:
invokeSubscriber(subscription, event);
break;
case MAIN:
if (isMainThread) {
invokeSubscriber(subscription, event);
} else {
mainThreadPoster.enqueue(subscription, event);
}
break;
case MAIN_ORDERED:
if (mainThreadPoster != null) {
mainThreadPoster.enqueue(subscription, event);
} else {
// temporary: technically not correct as poster not decoupled from subscriber
invokeSubscriber(subscription, event);
}
break;
case BACKGROUND:
if (isMainThread) {
backgroundPoster.enqueue(subscription, event);
} else {
invokeSubscriber(subscription, event);
}
break;
case ASYNC:
asyncPoster.enqueue(subscription, event);
break;
default:
throw new IllegalStateException("Unknown thread mode: " + subscription.subscriberMethod.threadMode);
}
}
根据 Subscription 里面带有的 threadMode 属性,结合发送事件所处在的线程,会进入下面的5种case。本质还是分两种情况,一种是直接在当前线程上处理,即直接调用 invokeSubscriber(subscription, event) 。一种是把事件入队到主线程或后台线程。
void invokeSubscriber(Subscription subscription, Object event) {
try {
subscription.subscriberMethod.method.invoke(subscription.subscriber, event);
} catch (InvocationTargetException e) {
handleSubscriberException(subscription, event, e.getCause());
} catch (IllegalAccessException e) {
throw new IllegalStateException("Unexpected exception", e);
}
}
直接调用 invokeSubscriber(subscription, event) ,就是在这个方法里,利用反射执行这个方法了。
对于另外一种入队的操作。可以看到有类里有3种 Poster。其中 mainThreadPoster 使用的其实是子类 HandlerPoster。
private final Poster mainThreadPoster;
private final BackgroundPoster backgroundPoster;
private final AsyncPoster asyncPoster;
拿BackgroundPoster来举例
final class BackgroundPoster implements Runnable, Poster {
private final PendingPostQueue queue;
private final EventBus eventBus;
private volatile boolean executorRunning;
BackgroundPoster(EventBus eventBus) {
this.eventBus = eventBus;
queue = new PendingPostQueue();
}
public void enqueue(Subscription subscription, Object event) {
PendingPost pendingPost = PendingPost.obtainPendingPost(subscription, event);
synchronized (this) {
queue.enqueue(pendingPost);
if (!executorRunning) {
executorRunning = true;
eventBus.getExecutorService().execute(this);
}
}
}
@Override
public void run() {
try {
try {
while (true) {
PendingPost pendingPost = queue.poll(1000);
if (pendingPost == null) {
synchronized (this) {
// Check again, this time in synchronized
pendingPost = queue.poll();
if (pendingPost == null) {
executorRunning = false;
return;
}
}
}
eventBus.invokeSubscriber(pendingPost);
}
} catch (InterruptedException e) {
eventBus.getLogger().log(Level.WARNING, Thread.currentThread().getName() + " was interruppted", e);
}
} finally {
executorRunning = false;
}
}
}
这3个Poster都分别有一个 PendingPostQueue,看源码可以知道这是队列。里面的结点是一个个的 PendingPost。PendingPost 包含了事件event,我们熟悉的 Subscription,和下一个结点 next。
final class PendingPost {
private final static List<PendingPost> pendingPostPool = new ArrayList<PendingPost>();
Object event;
Subscription subscription;
PendingPost next;
private PendingPost(Object event, Subscription subscription) {
this.event = event;
this.subscription = subscription;
}
static PendingPost obtainPendingPost(Subscription subscription, Object event) {
synchronized (pendingPostPool) {
int size = pendingPostPool.size();
if (size > 0) {
PendingPost pendingPost = pendingPostPool.remove(size - 1);
pendingPost.event = event;
pendingPost.subscription = subscription;
pendingPost.next = null;
return pendingPost;
}
}
return new PendingPost(event, subscription);
}
static void releasePendingPost(PendingPost pendingPost) {
pendingPost.event = null;
pendingPost.subscription = null;
pendingPost.next = null;
synchronized (pendingPostPool) {
// Don't let the pool grow indefinitely
if (pendingPostPool.size() < 10000) {
pendingPostPool.add(pendingPost);
}
}
}
}
PendingPost 使用一个静态 List 做缓存,创建一个 PendingPost 会优先从这个 List 里面拿出并赋值(没有的话再考虑 new 一个)。删除也是判断如果没超过1万,则塞入缓存 List 中。
在 BackgroundPoster 的 enqueue 方法里,会先通过 subscription 和 event 生成一个 PendingPost。然后会让这个 Poster 的队列,对刚生成的 PendingPost 进行入队操作。接着是获取 eventBus 的线程池 ExecutorService,让线程池开始工作。在 EventBusBuilder 里面可以看到,默认的线程池是 Executors.newCachedThreadPool(),也就是允许建立无限多线程的线程池。
但是需要注意的是,在 BackgroundPoster 里其实只是拿出一个线程,在 run 方法里通过 while 循环不断拿出队列里的事件来执行。在AsyncPoster 里才是真正的,让线程池放开手脚的,可以无限制的开启新线程来执行。
run方法里核心还是调用到 invokeSubscriber 这个方法,同样利用反射执行到这个我们注解 @Subscriber 标记的方法。
对于主线程 HandlerPoster,是通过参数 Looper.getMainLooper() 创建出一个运行在主线程的 Handler,然后随便发个空消息到这个 Handler,就可以在主线程上一样的从队列拿出事件来处理了。见如下 HandlerPoster里面的源码
public void enqueue(Subscription subscription, Object event) {
PendingPost pendingPost = PendingPost.obtainPendingPost(subscription, event);
synchronized (this) {
queue.enqueue(pendingPost);
if (!handlerActive) {
handlerActive = true;
if (!sendMessage(obtainMessage())) {
throw new EventBusException("Could not send handler message");
}
}
}
}
@Override
public void handleMessage(Message msg) {
boolean rescheduled = false;
try {
long started = SystemClock.uptimeMillis();
while (true) {
PendingPost pendingPost = queue.poll();
if (pendingPost == null) {
synchronized (this) {
// Check again, this time in synchronized
pendingPost = queue.poll();
if (pendingPost == null) {
handlerActive = false;
return;
}
}
}
eventBus.invokeSubscriber(pendingPost);
long timeInMethod = SystemClock.uptimeMillis() - started;
if (timeInMethod >= maxMillisInsideHandleMessage) {
if (!sendMessage(obtainMessage())) {
throw new EventBusException("Could not send handler message");
}
rescheduled = true;
return;
}
}
} finally {
handlerActive = rescheduled;
}
}
到这里,整个通用的流程讲完了。
粘性事件
粘性事件就跟Android的粘性广播一样。允许在先发送事件,再进行注册的情况下,也能顺利接受到该事件。
发送粘性事件的代码是这样的 EventBus.getDefault().postSticky(new XXEvent())
private final Map<Class<?>, Object> stickyEvents;
public void postSticky(Object event) {
synchronized (stickyEvents) {
stickyEvents.put(event.getClass(), event);
}
// Should be posted after it is putted, in case the subscriber wants to remove immediately
post(event);
}
可以看到,粘性事件的发送,把事件保存在 stickyEvents 这个 Map 里面。这个 Map 的Value是一个 Object 类型(而不是List类型)也说明了,粘性事件只会保存最后一个事件,并不是全部都存下来。
然后在之前讲到的 subscribe 方法里( register 的时候调用到),有这么一段。
private void subscribe(Object subscriber, SubscriberMethod subscriberMethod) {
...
if (subscriberMethod.sticky) {
if (eventInheritance) {
// Existing sticky events of all subclasses of eventType have to be considered.
// Note: Iterating over all events may be inefficient with lots of sticky events,
// thus data structure should be changed to allow a more efficient lookup
// (e.g. an additional map storing sub classes of super classes: Class -> List<Class>).
Set<Map.Entry<Class<?>, Object>> entries = stickyEvents.entrySet();
for (Map.Entry<Class<?>, Object> entry : entries) {
Class<?> candidateEventType = entry.getKey();
if (eventType.isAssignableFrom(candidateEventType)) {
Object stickyEvent = entry.getValue();
checkPostStickyEventToSubscription(newSubscription, stickyEvent);
}
}
} else {
Object stickyEvent = stickyEvents.get(eventType);
checkPostStickyEventToSubscription(newSubscription, stickyEvent);
}
}
}
会在开启了 eventInheritance 的情况下, 通过 isAssignableFrom 判断出有没有哪个 Subscriber 订阅了当前这个事件或者当前事件的超类事件。然后通过 checkPostStickyEventToSubscription -> postToSubscription 最终回到我们之前讲到的处理事件的流程。
EventBusBuilder
通过这一次阅读源码,我也了解到了原来 EventBus 支持使用 EventBusBuilder 配置的。
public class EventBusBuilder {
private final static ExecutorService DEFAULT_EXECUTOR_SERVICE = Executors.newCachedThreadPool();
boolean logSubscriberExceptions = true;
boolean logNoSubscriberMessages = true;
boolean sendSubscriberExceptionEvent = true;
boolean sendNoSubscriberEvent = true;
boolean throwSubscriberException;
boolean eventInheritance = true;
boolean ignoreGeneratedIndex;
boolean strictMethodVerification;
ExecutorService executorService = DEFAULT_EXECUTOR_SERVICE;
List<Class<?>> skipMethodVerificationForClasses;
List<SubscriberInfoIndex> subscriberInfoIndexes;
Logger logger;
MainThreadSupport mainThreadSupport;
EventBusBuilder() {
}
/** Default: true */
public EventBusBuilder logSubscriberExceptions(boolean logSubscriberExceptions) {
this.logSubscriberExceptions = logSubscriberExceptions;
return this;
}
/** Default: true */
public EventBusBuilder logNoSubscriberMessages(boolean logNoSubscriberMessages) {
this.logNoSubscriberMessages = logNoSubscriberMessages;
return this;
}
/** Default: true */
public EventBusBuilder sendSubscriberExceptionEvent(boolean sendSubscriberExceptionEvent) {
this.sendSubscriberExceptionEvent = sendSubscriberExceptionEvent;
return this;
}
/** Default: true */
public EventBusBuilder sendNoSubscriberEvent(boolean sendNoSubscriberEvent) {
this.sendNoSubscriberEvent = sendNoSubscriberEvent;
return this;
}
/**
* Fails if an subscriber throws an exception (default: false).
* <p/>
* Tip: Use this with BuildConfig.DEBUG to let the app crash in DEBUG mode (only). This way, you won't miss
* exceptions during development.
*/
public EventBusBuilder throwSubscriberException(boolean throwSubscriberException) {
this.throwSubscriberException = throwSubscriberException;
return this;
}
/**
* By default, EventBus considers the event class hierarchy (subscribers to super classes will be notified).
* Switching this feature off will improve posting of events. For simple event classes extending Object directly,
* we measured a speed up of 20% for event posting. For more complex event hierarchies, the speed up should be
* >20%.
* <p/>
* However, keep in mind that event posting usually consumes just a small proportion of CPU time inside an app,
* unless it is posting at high rates, e.g. hundreds/thousands of events per second.
*/
public EventBusBuilder eventInheritance(boolean eventInheritance) {
this.eventInheritance = eventInheritance;
return this;
}
/**
* Provide a custom thread pool to EventBus used for async and background event delivery. This is an advanced
* setting to that can break things: ensure the given ExecutorService won't get stuck to avoid undefined behavior.
*/
public EventBusBuilder executorService(ExecutorService executorService) {
this.executorService = executorService;
return this;
}
/**
* Method name verification is done for methods starting with onEvent to avoid typos; using this method you can
* exclude subscriber classes from this check. Also disables checks for method modifiers (public, not static nor
* abstract).
*/
public EventBusBuilder skipMethodVerificationFor(Class<?> clazz) {
if (skipMethodVerificationForClasses == null) {
skipMethodVerificationForClasses = new ArrayList<>();
}
skipMethodVerificationForClasses.add(clazz);
return this;
}
/** Forces the use of reflection even if there's a generated index (default: false). */
public EventBusBuilder ignoreGeneratedIndex(boolean ignoreGeneratedIndex) {
this.ignoreGeneratedIndex = ignoreGeneratedIndex;
return this;
}
/** Enables strict method verification (default: false). */
public EventBusBuilder strictMethodVerification(boolean strictMethodVerification) {
this.strictMethodVerification = strictMethodVerification;
return this;
}
/** Adds an index generated by EventBus' annotation preprocessor. */
public EventBusBuilder addIndex(SubscriberInfoIndex index) {
if (subscriberInfoIndexes == null) {
subscriberInfoIndexes = new ArrayList<>();
}
subscriberInfoIndexes.add(index);
return this;
}
/**
* Set a specific log handler for all EventBus logging.
* <p/>
* By default all logging is via {@link android.util.Log} but if you want to use EventBus
* outside the Android environment then you will need to provide another log target.
*/
public EventBusBuilder logger(Logger logger) {
this.logger = logger;
return this;
}
比如说可以配置『继承的时间是否起作用』,可以配置后台线程池等操作。用法如下
EventBus myEventBus =
EventBus.builder()
.eventInheritance(false)
.executorService(Executors
.newSingleThreadExecutor())
.build();
然后把这个 myEventBus 缓存起来,将来通过这个实例发送消息即可。
上一篇: HighCharts 为每个column指定不同的颜色
下一篇: EventBus详解