欢迎您访问程序员文章站本站旨在为大家提供分享程序员计算机编程知识!
您现在的位置是: 首页

EventBus框架源码解析下(源码解析)

程序员文章站 2022-06-09 23:33:57
...

转载请标明出处:【顾林海的博客】

个人开发的微信小程序,目前功能是书籍推荐,后续会完善一些新功能,希望大家多多支持!
EventBus框架源码解析下(源码解析)


前言

EventBus是典型的发布订阅模式,多个订阅者可以订阅某个事件,发布者通过发布事件使多个订阅者接受到并处理事件。在EventBus中,事件会被发布到总线中,然后根据订阅者查找到对应的事件,同时把事件传递给订阅者。通过这种形式,可以在Activity与Activity、Fragment与Fragment、Activity与Fragment之间进行通信,降低了通信的成本,但随着事件的增多,整个事件传递的复杂程度会提高,对于刚加入项目的队员来说难以阅读,因此在使用EventBus作为项目之间通信时,可以专门创建一个Event的module,专门用于各个模块或各个页面之间的事件传递,当然良好的注释是非常有必要的。


register方法流程概要

public void register(Object subscriber) {
    Class<?> subscriberClass = subscriber.getClass();
    List<SubscriberMethod> subscriberMethods = subscriberMethodFinder.findSubscriberMethods(subscriberClass);
    synchronized (this) {
        for (SubscriberMethod subscriberMethod : subscriberMethods) {
            subscribe(subscriber, subscriberMethod);
        }
    }
}

register是EventBus的订阅者的注册方法,传入订阅者的对象,通过该对象可以找到该订阅者订阅的方法。在方法中通过getClass方法获取订阅者的类型,拿到订阅者类型后通过它获取该订阅者订阅的所有方法,获取对应订阅者所有的订阅方法的操作是在SubscriberMethodFinder类执行的,这个在稍后会讲到,之后通过遍历订阅方法执行subscriber方法。

总结register做的工作:

  1. 拿到订阅者类型。
  2. 拿到订阅者类中定义所有订阅方法和对应的订阅事件
  3. 遍历订阅者中订阅方法并调用subscribe方法。

在发送事件后,EventBus是如何找到订阅该事件的订阅者呢?从上面我们拿到了订阅者以及订阅者相关的订阅信息,接着要做的事情就是将订阅者和订阅事件进行绑定,subscriber方法做了就是这些事情,我们来看看subscriber方法的具体实现。

private void subscribe(Object subscriber, SubscriberMethod subscriberMethod) {
    //获取订阅方法内的事件类型
    Class<?> eventType = subscriberMethod.eventType;
    //将订阅者与订阅方法存储在Subscription对象中
    Subscription newSubscription = new Subscription(subscriber, subscriberMethod);
    //根据事件类型获取Subscription对象集合
    CopyOnWriteArrayList<Subscription> subscriptions = subscriptionsByEventType.get(eventType);
    if (subscriptions == null) {
        subscriptions = new CopyOnWriteArrayList<>();
        //存储对应事件类型的Subscription集合
        subscriptionsByEventType.put(eventType, subscriptions);
    } else {
        //如果多次注册会报以下异常
        if (subscriptions.contains(newSubscription)) {
            throw new EventBusException("Subscriber " + subscriber.getClass() + " already registered to event "
                    + eventType);
        }
    }

    int size = subscriptions.size();
    for (int i = 0; i <= size; i++) {
        if (i == size || subscriberMethod.priority > subscriptions.get(i).subscriberMethod.priority) {
            //优先级插入
            subscriptions.add(i, newSubscription);
            break;
        }
    }

    //根据订阅者获取所有订阅方法
    List<Class<?>> subscribedEvents = typesBySubscriber.get(subscriber);
    if (subscribedEvents == null) {
        subscribedEvents = new ArrayList<>();
        typesBySubscriber.put(subscriber, subscribedEvents);
    }
    //添加到订阅者对应的事件集合中
    subscribedEvents.add(eventType);

    //是否为粘性事件
    if (subscriberMethod.sticky) {
        if (eventInheritance) {
            //遍历粘性事件集合(事件类包含自身或当前事件类的子类或子接口)并处理
            Set<Map.Entry<Class<?>, Object>> entries = stickyEvents.entrySet();
            for (Map.Entry<Class<?>, Object> entry : entries) {
                Class<?> candidateEventType = entry.getKey();
                if (eventType.isAssignableFrom(candidateEventType)) {
                    Object stickyEvent = entry.getValue();
                    checkPostStickyEventToSubscription(newSubscription, stickyEvent);
                }
            }
        } else {
            //直接获取事件类对应的粘性事件并处理
            Object stickyEvent = stickyEvents.get(eventType);
            checkPostStickyEventToSubscription(newSubscription, stickyEvent);
        }
    }
}


private void checkPostStickyEventToSubscription(Subscription newSubscription, Object stickyEvent) {
    if (stickyEvent != null) {
        //进行事件处理
        postToSubscription(newSubscription, stickyEvent, isMainThread());
    }
}

subscribe方法传入的是订阅者和订阅方法,这个方法可以理解为做一些存储性的工作,subscriptionByEventType集合以事件为键,Subscription集合为值,Subscription存储着订阅者以及对应的订阅方法,我们可以通过事件类型从subscriptionByEventType集合中拿到所有该事件类型对应的订阅信息,处理subscriptionByEventType集合的时机是在后面执行post方法,事件发出后,所有注册过该事件的方法都需要执行,这时就需要subscriptionByEventType来获取对应事件的所有订阅信息。
值得注意的是EventBus在某个订阅者中是不能多次注册的,否则会报订阅类已注册的错误,事件对应的订阅信息添加会通过优先级添加。
订阅信息存储完毕后,通过typesBySubscriber集合以订阅者的类型为键,以该订阅者中的所有订阅事件为值,通过订阅者类型可以从typesBySubscriber集合中获取该订阅者的所有事件类型。
之后会判断当前的订阅方法订阅的事件是否为粘性事件,如果是粘性事件会执行相关的粘性事件,通过postToSubscription方法处理相关逻辑,后面会讲到。

总结subscribe做的工作:

  1. 相关事件(key)与对应的所有订阅信息(value:订阅者和订阅方法)的一个映射,通过subscriptionByEventType集合存储。
  2. 订阅者(key)与对应的所有事件(value)的一个映射,通过typesBySubscriber集合存储。
  3. 对粘性事件的处理。

EventBus的线程模式

subscribe方法最后是对粘性事件进行了处理,在讲解粘性事件之前,我们先来看看EventBus的线程模式。

@Documented
@Retention(RetentionPolicy.RUNTIME)
@Target({ElementType.METHOD})
public @interface Subscribe {
    //线程模式
    ThreadMode threadMode() default ThreadMode.POSTING;

    //是否粘性事件
    boolean sticky() default false;

    //优先级
    int priority() default 0;
}

Subscribe是一个注解类,我们在定义订阅某个方法时会通过它来修饰该方法是EventBus中的事件方法,其中ThreadMode是一个枚举类,它被定义为线程模式,也就是事件处理在哪个线程是由它来指定,sticky方法返回boolean类型,true代表该事件为粘性事件,false代表该事件不是粘性事件,所谓的粘性事件是指当Event发送出去后,如果订阅者没有注册,之后当订阅者注册时,该事件还是会接收到。priority方法的返回的值是int类型,代表事件处理的优先级,默认为0。

public enum ThreadMode {
    POSTING,
    MAIN,
    MAIN_ORDERED,
    BACKGROUND,
    ASYNC
}

在ThreadMode线程模式的枚举类中定义了5中模式。
POSTING是默认的线程模式,发送事件和事件处理是在同一线程中执行,效率是高,因为在同一线程执行,避免了线程间的切换,当然使用POSTING线程模式时需要注意在主线程中的情况,因为主线程是不允许做耗时操作的,所以使用该模式必须要避免耗时操作,以免造成线程阻塞。
使用MAIN线程模式时,事件处理会在主线程(UI线程)中执行,如果发布事件所在的线程是主线程,事件会立即执行,否则事件会被放在队列中等待处理。
使用MAIN_ORDERED线程模式时,事件处理会在主线程中执行,它与MAIN不同之处在于,处于MAIN_ORDERED线程模式的事件会被放在队列中等待处理,而处于MAIN线程模式时,会立即执行。
使用BACKGROUND线程模式时,如果事件是在主线程中发布的,事件会被放在工作线程中的队列中,等待处理,如果事件不在主线程中发布,说明事件的发布和事件的处理是在同一线程中(工作线程)。
使用ASYNC线程模式时,事件发送和处理都与UI线程无关,并且发送事件是异步的,通过线程池来管理发送事件和处理事件的线程。
值得注意的是,如果发送事件在子线程,处理事件需要在UI线程中进行,这时就需要通过Handler来实现,后面会讲到。

EventBus的粘性事件

理解了ThreadMode线程模式后,我们继续对register流程最后粘性事件处理的讲解,在前面register方法流程概要中,subscribe方法做了事件的存储工作并对粘性事件进行了处理。为了方便查看,这里再次贴出subscribe方法中对粘性事件处理的代码。

private final Map<Class<?>, Object> stickyEvents;
private void subscribe(Object subscriber, SubscriberMethod subscriberMethod) {
    subscriptionsByEventType集合处理
    ...
    typesBySubscriber集合处理
    ...
    
    //是否为粘性事件
    if (subscriberMethod.sticky) {
        if (eventInheritance) {
            //遍历粘性事件集合(事件类包含自身或当前事件类的子类或子接口)并处理
            Set<Map.Entry<Class<?>, Object>> entries = stickyEvents.entrySet();
            for (Map.Entry<Class<?>, Object> entry : entries) {
                Class<?> candidateEventType = entry.getKey();
                if (eventType.isAssignableFrom(candidateEventType)) {
                    Object stickyEvent = entry.getValue();
                    checkPostStickyEventToSubscription(newSubscription, stickyEvent);
                }
            }
        } else {
            //直接获取事件类对应的粘性事件并处理
            Object stickyEvent = stickyEvents.get(eventType);
            checkPostStickyEventToSubscription(newSubscription, stickyEvent);
        }
    }
}

private void checkPostStickyEventToSubscription(Subscription newSubscription, Object stickyEvent) {
    if (stickyEvent != null) {
        //进行事件处理
        postToSubscription(newSubscription, stickyEvent, isMainThread());
    }
}

在上面注册时,需要知道发送的事件是否为粘性事件,如果是粘性事件,需要检查该粘性事件之前是否发送过,如果发送过,需要立即处理对应的粘性事件。
eventInheritance是boolean类型,在获取EventBus实例时,eventInheritance值被初始化为true,说明我们在对事件进行处理时,需要对事件类的子类或接口也进行处理,上面代码中是对stickyEvents集合进行遍历,查看当前的粘性事件与集合中的粘性事件是否存在是子类或接口的关系,如果是就对事件进行处理。
stickyEvents是粘性事件的集合,并且该集合使用了ConcurrentHashMap,ConcurrentHashMap在线程安全的基础上提供了更好的写并发能力。eventInheritance如果为false,就是对当前的粘性事件进行处理,不管eventInheritance为何值,最后都会调用checkPostStickyEventToSubscription方法。

private void checkPostStickyEventToSubscription(Subscription newSubscription, Object stickyEvent) {
    if (stickyEvent != null) {
        //进行事件处理
        postToSubscription(newSubscription, stickyEvent, isMainThread());
    }
}

checkPostStickyEventToSubscription方法做的事情是对粘性事件进行了判断,如果粘性事件存在,调用postToSubscription方法,根据subscription(订阅者和订阅方法信息)和事件就可以执行事件的处理。

发送粘性事件是通过EventBus的postSticky方法来发送的,源码如下。

public void postSticky(Object event) {
    synchronized (stickyEvents) {
        stickyEvents.put(event.getClass(), event);
    }
    post(event);
}

postSticky方法中会通过同步代码块,将粘性事件存储在stickyEvents集合中,将粘性事件存储完毕,还会执行post方法,也就是将事件发送出去,为什么这么做,这是因为粘性事件处理的情况有两种:一是如果粘性事件先被注册,之后通过postSticky发送粘性事件,由于该粘性事件已经存在所以需要进行处理;二是粘性事件先发送,之后订阅者去订阅该粘性事件,这时需要去处理粘性事件。


事件发送与处理

在平时发送事件是通过EventBus的post方法来发送的,只不过postSticky方法比post方法多做了一步,就是存储粘性事件,post方法代码如下:

private final ThreadLocal<PostingThreadState> currentPostingThreadState = new ThreadLocal<PostingThreadState>() {
    @Override
    protected PostingThreadState initialValue() {
        return new PostingThreadState();
    }
};

public void post(Object event) {
    PostingThreadState postingState = currentPostingThreadState.get();
    List<Object> eventQueue = postingState.eventQueue;
    //将事件入队
    eventQueue.add(event);
    if (!postingState.isPosting) {
        //是否在主线程
        postingState.isMainThread = isMainThread();
        //事件是否已经发送
        postingState.isPosting = true;
        if (postingState.canceled) {
            throw new EventBusException("Internal error. Abort state was not reset");
        }
        try {
            while (!eventQueue.isEmpty()) {
                //循环遍历事件队列
                postSingleEvent(eventQueue.remove(0), postingState);
            }
        } finally {
            // 重置
            postingState.isPosting = false;
            postingState.isMainThread = false;
        }
    }
}

currentPostingThreadState使用ThreadLocal保存线程间的事件队列,PostingThreadState保存着一些状态信息,如下:

final static class PostingThreadState {
    //事件队列
    final List<Object> eventQueue = new ArrayList<>();
    //是否已经发送
    boolean isPosting;
    //是否在主线程
    boolean isMainThread;
    //订阅信息
    Subscription subscription;
    //事件
    Object event;
    boolean canceled;
}

在post方法中,会通过循环遍历事件列表,执行postSingleEvent方法。

private void postSingleEvent(Object event, PostingThreadState postingState) throws Error {
    //事件
    Class<?> eventClass = event.getClass();
    boolean subscriptionFound = false;
    if (eventInheritance) {
        List<Class<?>> eventTypes = lookupAllEventTypes(eventClass);
        int countTypes = eventTypes.size();
        for (int h = 0; h < countTypes; h++) {
            //遍历相关的事件类
            Class<?> clazz = eventTypes.get(h);
            subscriptionFound |= postSingleEventForEventType(event, postingState, clazz);
        }
    } else {
        subscriptionFound = postSingleEventForEventType(event, postingState, eventClass);
    }
    if (!subscriptionFound) {
        if (logNoSubscriberMessages) {
            logger.log(Level.FINE, "No subscribers registered for event " + eventClass);
        }
        if (sendNoSubscriberEvent && eventClass != NoSubscriberEvent.class &&
                eventClass != SubscriberExceptionEvent.class) {
            post(new NoSubscriberEvent(this, event));
        }
    }
}

postSingleEvent方法内部会根据当前的事件,通过lookupAllEventTypes方法递归获取事件的父类和接口,最终通过循环拿到类和接口,并对这些类和接口进行事件的处理。

private boolean postSingleEventForEventType(Object event, PostingThreadState postingState, Class<?> eventClass) {
    CopyOnWriteArrayList<Subscription> subscriptions;
    synchronized (this) {
        //获取订阅信息列表
        subscriptions = subscriptionsByEventType.get(eventClass);
    }
    if (subscriptions != null && !subscriptions.isEmpty()) {
        //遍历订阅信息列表
        for (Subscription subscription : subscriptions) {
            //postingState存有订阅信息(订阅者,订阅方法)和事件
            postingState.event = event;
            postingState.subscription = subscription;
            boolean aborted = false;
            try {
                //通知所有订阅该事件的订阅者都去执行该事件
                postToSubscription(subscription, event, postingState.isMainThread);
                //是否中断
                aborted = postingState.canceled;
            } finally {
                //重置
                postingState.event = null;
                postingState.subscription = null;
                postingState.canceled = false;
            }
            if (aborted) {
                break;
            }
        }
        return true;
    }
    return false;
}

postSingleEventForEventType方法中,通过对应事件获取所有的订阅信息,并遍历执行postToSubscription方法。

private void postToSubscription(Subscription subscription, Object event, boolean isMainThread) {
    switch (subscription.subscriberMethod.threadMode) {
        case POSTING:
            //立刻执行
            invokeSubscriber(subscription, event);
            break;
        case MAIN:
            if (isMainThread) {
                //发送事件在主线程,立刻执行
                invokeSubscriber(subscription, event);
            } else {
                //在非UI线程,通过Handler处理
                //循环遍历执行
                mainThreadPoster.enqueue(subscription, event);
            }
            break;
        case MAIN_ORDERED:
            if (mainThreadPoster != null) {
                //Handler,循环遍历执行
                mainThreadPoster.enqueue(subscription, event);
            } else {
                invokeSubscriber(subscription, event);
            }
            break;
        case BACKGROUND:
            if (isMainThread) {
                //发送事件在主线程,通过Handler处理,循环遍历执行
                backgroundPoster.enqueue(subscription, event);
            } else {
                //在非UI线程立即执行
                invokeSubscriber(subscription, event);
            }
            break;
        case ASYNC:
            //异步
            asyncPoster.enqueue(subscription, event);
            break;
        default:
            throw new IllegalStateException("Unknown thread mode: " + subscription.subscriberMethod.threadMode);
    }
}


通过前面对ThreadMode线程模式的讲解,这里的postToSubcription方法就很好理解了,事件通过不同的线程模式进行相应的处理,线程模式采用POSTING会执行invokeSubscriber方法。

void invokeSubscriber(Subscription subscription, Object event) {
    try {
        //通过反射执行相关事件方法
        subscription.subscriberMethod.method.invoke(subscription.subscriber, event);
    } catch (InvocationTargetException e) {
        handleSubscriberException(subscription, event, e.getCause());
    } catch (IllegalAccessException e) {
        throw new IllegalStateException("Unexpected exception", e);
    }
}

很明显EventBus对事件的处理是通过反射来执行的。
postToSubscription方法的逻辑很简单,就是通过对应的线程模式来处理事件,那如何发送事件以及处理事件?整个EventBus的核心就是通过以下3个Poster来实现的。

private final Poster mainThreadPoster;
EventBus(EventBusBuilder builder) {
    ...
    mainThreadSupport = builder.getMainThreadSupport();
    //创建HandlerPoster实例
    mainThreadPoster = mainThreadSupport != null ? mainThreadSupport.createPoster(this) : null;
    backgroundPoster = new BackgroundPoster(this);
    asyncPoster = new AsyncPoster(this);
   ...
}

第一种Poster,也就是mainThreadPoster,在初始化mainThreadPoster时会判断mainThreadSupport是否为空,其中EventBusBuilder类是EventBus的配置类,采用的是建造者模式,查看EventBusBuilder中的getMainThreadSupport方法。

MainThreadSupport getMainThreadSupport() {
    if (mainThreadSupport != null) {
        return mainThreadSupport;
    } else if (Logger.AndroidLogger.isAndroidLogAvailable()) {
        //获取主线程的Looper
        Object looperOrNull = getAndroidMainLooperOrNull();
        return looperOrNull == null ? null :
                new MainThreadSupport.AndroidHandlerMainThreadSupport((Looper) looperOrNull);
    } else {
        return null;
    }
}

Object getAndroidMainLooperOrNull() {
    try {
        return Looper.getMainLooper();
    } catch (RuntimeException e) {
        // Not really a functional Android (e.g. "Stub!" maven dependencies)
        return null;
    }
}

getMainThreadSupport方法的作用就是返回MainThreadSupport的实例,传入的是主线程的Looper。

public interface MainThreadSupport {

    boolean isMainThread();

    Poster createPoster(EventBus eventBus);

    class AndroidHandlerMainThreadSupport implements MainThreadSupport {

        private final Looper looper;

        public AndroidHandlerMainThreadSupport(Looper looper) {
            this.looper = looper;
        }

        @Override
        public boolean isMainThread() {
            return looper == Looper.myLooper();
        }

        @Override
        public Poster createPoster(EventBus eventBus) {
            return new HandlerPoster(eventBus, looper, 10);
        }
    }

}

mainThreadPoster = mainThreadSupport != null ? mainThreadSupport.createPoster(this) : null;

回到mainThreadPoster的初始化代码,看到mainThreadSupport不为null,就会执行MainThreadSupport的createPoster工厂方法创建处理事件的HandlerPoster。

public class HandlerPoster extends Handler implements Poster {

    private final PendingPostQueue queue;
    private final int maxMillisInsideHandleMessage;
    private final EventBus eventBus;
    private boolean handlerActive;

    protected HandlerPoster(EventBus eventBus, Looper looper, int maxMillisInsideHandleMessage) {
        super(looper);
        this.eventBus = eventBus;
        this.maxMillisInsideHandleMessage = maxMillisInsideHandleMessage;
        queue = new PendingPostQueue();
    }

    public void enqueue(Subscription subscription, Object event) {
        PendingPost pendingPost = PendingPost.obtainPendingPost(subscription, event);
        synchronized (this) {
            queue.enqueue(pendingPost);
            if (!handlerActive) {
                handlerActive = true;
                if (!sendMessage(obtainMessage())) {
                    throw new EventBusException("Could not send handler message");
                }
            }
        }
    }

    @Override
    public void handleMessage(Message msg) {
        boolean rescheduled = false;
        try {
            long started = SystemClock.uptimeMillis();
            while (true) {
                PendingPost pendingPost = queue.poll();
                if (pendingPost == null) {
                    //说明头结点为空,进行第二次判断
                    synchronized (this) {
                        // Check again, this time in synchronized
                        pendingPost = queue.poll();
                        if (pendingPost == null) {
                            //重新发送
                            handlerActive = false;
                            return;
                        }
                    }
                }
                //执行eventBus的invokeSubscriber方法,通过反射执行事件方法
                eventBus.invokeSubscriber(pendingPost);
                long timeInMethod = SystemClock.uptimeMillis() - started;
                if (timeInMethod >= maxMillisInsideHandleMessage) {
                    if (!sendMessage(obtainMessage())) {
                        throw new EventBusException("Could not send handler message");
                    }
                    rescheduled = true;
                    return;
                }
            }
        } finally {
            handlerActive = rescheduled;
        }
    }
}

HandlerPoster继承自Handler并实现了handleMessage方法,在初始化时传入的是主线程的Looper,通过HandlerPoster就可以将事件发送到主线程中并处理,HandlerPoster内部有一个队列,最终subscription(订阅者和订阅方法信息)和事件会被包装成PendingPost对象添加到这个队列中。enqueue方法做的就是添加操作,添加完毕后,会通过sendMessage发送一个消息,最后在handleMessage方法中处理。
handleMessage方法中通过一个死循环,不断的从队列中取出PendingPost实例,如果取出的PendingPost实例不为null,就执行EventBus的invokeSubscriber方法。

void invokeSubscriber(PendingPost pendingPost) {
    Object event = pendingPost.event;
    Subscription subscription = pendingPost.subscription;
    //清空
    PendingPost.releasePendingPost(pendingPost);
    if (subscription.active) {
        //处理事件
        invokeSubscriber(subscription, event);
    }
}

invokeSubscriber方法最终还是会执行invokeSubscriber方法通过反射来执行对应事件的方法。

第二种Poster,就是backgroundPoster,也就是说事件的处理会在子线程中执行。具体代码如下:

final class BackgroundPoster implements Runnable, Poster {

    private final PendingPostQueue queue;
    private final EventBus eventBus;

    private volatile boolean executorRunning;

    BackgroundPoster(EventBus eventBus) {
        this.eventBus = eventBus;
        queue = new PendingPostQueue();
    }

    public void enqueue(Subscription subscription, Object event) {
        PendingPost pendingPost = PendingPost.obtainPendingPost(subscription, event);
        synchronized (this) {
            queue.enqueue(pendingPost);
            if (!executorRunning) {
                executorRunning = true;
                eventBus.getExecutorService().execute(this);
            }
        }
    }

    @Override
    public void run() {
        try {
            try {
                while (true) {
                    PendingPost pendingPost = queue.poll(1000);
                    if (pendingPost == null) {
                        synchronized (this) {
                            // Check again, this time in synchronized
                            pendingPost = queue.poll();
                            if (pendingPost == null) {
                                executorRunning = false;
                                return;
                            }
                        }
                    }
                    //反射处理事件方法
                    eventBus.invokeSubscriber(pendingPost);
                }
            } catch (InterruptedException e) {
                eventBus.getLogger().log(Level.WARNING, Thread.currentThread().getName() + " was interruppted", e);
            }
        } finally {
            executorRunning = false;
        }
    }

}

BackgroundPoster实现了Runnable接口,实现run方法,内部也有一个队列用于存储PendingPost实例(subscription(订阅者和订阅方法信息)和事件),初次添加队列时,会通过线程池执行线程,在run方法中通过死循环从队列中取出PendingPost实例,PendingPost实例存在执行EventBus的invokeSubscriber方法,最终通过反射执行事件处理方法。

第三种Poster,就是asyncPoster,内部一个事件对应一个线程,事件发送是并行的,并且事件处理是在后台线程执行。

class AsyncPoster implements Runnable, Poster {

    private final PendingPostQueue queue;
    private final EventBus eventBus;

    AsyncPoster(EventBus eventBus) {
        this.eventBus = eventBus;
        queue = new PendingPostQueue();
    }

    public void enqueue(Subscription subscription, Object event) {
        PendingPost pendingPost = PendingPost.obtainPendingPost(subscription, event);
        queue.enqueue(pendingPost);
        eventBus.getExecutorService().execute(this);
    }

    @Override
    public void run() {
        PendingPost pendingPost = queue.poll();
        if(pendingPost == null) {
            throw new IllegalStateException("No pending post available");
        }
        eventBus.invokeSubscriber(pendingPost);
    }

}

在每次调用enqueue方法添加时,通过线程池对每个事件创建一个线程并执行,在run方法中发送一次事件执行一次,事件与事件之间互不干扰。


事件队列的处理

从上面三种Poster方式可以看到,事件发送时订阅者和订阅方法信息会被包装成PendingPost对象,如果事件过多,创建PendingPost对象也会过多,对象的频繁创建与销毁会带来性能上的消耗,因此EventBus采用的对象复用的方式,在PendingPost中提供了一个对象池用于复用,既然是用于对象的复用,这个池的肯定不是无限大的,在PendingPost中对这个池的最大个数不超过10000。

final class PendingPost {
    private final static List<PendingPost> pendingPostPool = new ArrayList<PendingPost>();

    Object event;
    Subscription subscription;
    PendingPost next;

    private PendingPost(Object event, Subscription subscription) {
        this.event = event;
        this.subscription = subscription;
    }

    static PendingPost obtainPendingPost(Subscription subscription, Object event) {
        synchronized (pendingPostPool) {
            int size = pendingPostPool.size();
            if (size > 0) {
                PendingPost pendingPost = pendingPostPool.remove(size - 1);
                pendingPost.event = event;
                pendingPost.subscription = subscription;
                pendingPost.next = null;
                return pendingPost;
            }
        }
        return new PendingPost(event, subscription);
    }

    static void releasePendingPost(PendingPost pendingPost) {
        pendingPost.event = null;
        pendingPost.subscription = null;
        pendingPost.next = null;
        synchronized (pendingPostPool) {
            if (pendingPostPool.size() < 10000) {
                //复用
                pendingPostPool.add(pendingPost);
            }
        }
    }

在PendingPost类中可以看到集合中的数据处理是非常精妙的,在执行obtainPendingPost方法时,会去查看pendingPostPool中是否有值,如果存在,先从列表尾部移除,同时复用移除的对象;在releasePendingPost方法中,进行了数据的清理,同时会查看pendingPostPool列表长度是否超过10000,没有超过,就会复用这个对象(这个对象内部数据已经进行了清空)。通过这种复用方式来避免对象的频繁创建,造成GC的频繁处理。

PendingPost实例会被添加到PendingPostQueue中,PendingPostQueue主要是对PendingPost对象的存取。代码如下:

final class PendingPostQueue {
    private PendingPost head;
    private PendingPost tail;

    synchronized void enqueue(PendingPost pendingPost) {
        if (pendingPost == null) {
            throw new NullPointerException("null cannot be enqueued");
        }
        if (tail != null) {
            //重置队尾
            tail.next = pendingPost;
            tail = pendingPost;
        } else if (head == null) {
            //如果队头为null,说明队列为空,设置队头和队尾
            head = tail = pendingPost;
        } else {
            throw new IllegalStateException("Head present, but no tail");
        }
        notifyAll();
    }

    synchronized PendingPost poll() {
        PendingPost pendingPost = head;
        if (head != null) {
            head = head.next;
            if (head == null) {
                tail = null;
            }
        }
        return pendingPost;
    }

    synchronized PendingPost poll(int maxMillisToWait) throws InterruptedException {
        if (head == null) {
            wait(maxMillisToWait);
        }
        return poll();
    }

}

PendingPostQueue内部的存储方式与单链表的结构是一样的。enqueue和poll方法分别是入队与出队,入队时会将添加的实例存储在队尾,出队时移出队列,遵循先进先出原则,如果调用第二个poll方法,传入一个maxMillisToWait的时长锁,比如在Background线程的run方法中,从PendingPostQueue中取出PendingPost,传入了1000,表示出队时会等待1000毫秒,在等待期间释放它所持有的锁,当执行了入队操作,入队完毕执行notifyAll方法以唤醒出队时的等待锁,这样做的原因是为了在出队时,如果同时进行了入队操作,这时获取的为空,通过wait方法等待指定时间,当等待超时并且队列为空,backgroundPoster会退出后台线程,mainThreadPoster也会停止发送消息。


解析订阅方法

在上面register方法中,第一步做的就是找到订阅者订阅的所有事件方法,事件方法的相关信息会被包装成SubscriberMethod实例。

public class SubscriberMethod {
    final Method method;
    final ThreadMode threadMode;
    final Class<?> eventType;
    final int priority;
    final boolean sticky;
    /** Used for efficient comparison */
    String methodString;

    public SubscriberMethod(Method method, Class<?> eventType, ThreadMode threadMode, int priority, boolean sticky) {
        this.method = method;
        this.threadMode = threadMode;
        this.eventType = eventType;
        this.priority = priority;
        this.sticky = sticky;
    }


}

SubscriberMethod存储着订阅方法、线程模式、事件类型、优先级、是否粘性事件以及方法名称。接下来重点是在如何查找订阅类中的订阅方法,相关操作在SubscriberMethodFinder中的findSubscriberMethods方法中。

private static final Map<Class<?>, List<SubscriberMethod>> METHOD_CACHE = new ConcurrentHashMap<>();
List<SubscriberMethod> findSubscriberMethods(Class<?> subscriberClass) {
    List<SubscriberMethod> subscriberMethods = METHOD_CACHE.get(subscriberClass);
    if (subscriberMethods != null) {
        return subscriberMethods;
    }

    if (ignoreGeneratedIndex) {
        subscriberMethods = findUsingReflection(subscriberClass);
    } else {
        subscriberMethods = findUsingInfo(subscriberClass);
    }
    if (subscriberMethods.isEmpty()) {
        throw new EventBusException("Subscriber " + subscriberClass
                + " and its super classes have no public methods with the @Subscribe annotation");
    } else {
        METHOD_CACHE.put(subscriberClass, subscriberMethods);
        return subscriberMethods;
    }
}

METHOD_CHCHE是缓存对应订阅类中的订阅方法的集合。方法中会先去查看缓存中对应订阅类是否存在,如果存在就直接返回订阅类对应的订阅方法的集合,如果不存在,判断ignoreGenerateIndex,由于ignoreGenerateIndex的初始化被设置为false,直接看findUsingInfo方法,从方法名可以看出该方法做的工作就是查找订阅方法并返回订阅方法信息的集合。

private List<SubscriberMethod> findUsingInfo(Class<?> subscriberClass) {
    FindState findState = prepareFindState();
    //对FindState对象中属性进行初始化
    findState.initForSubscriber(subscriberClass);
    //findState.clazz指向的就是订阅者的Class对象
    while (findState.clazz != null) {
        //通过debug,发现 findState.subscriberInfo=null
        findState.subscriberInfo = getSubscriberInfo(findState);
        if (findState.subscriberInfo != null) {
            SubscriberMethod[] array = findState.subscriberInfo.getSubscriberMethods();
            for (SubscriberMethod subscriberMethod : array) {
                if (findState.checkAdd(subscriberMethod.method, subscriberMethod.eventType)) {
                    findState.subscriberMethods.add(subscriberMethod);
                }
            }
        } else {
            findUsingReflectionInSingleClass(findState);
        }
        //对FindState对象中的属性进行清空
        findState.moveToSuperclass();
    }
    return getMethodsAndRelease(findState);
}

FindState是SubscriberMethodFinder的静态内部类,存储一些状态,比如订阅者和订阅事件的方法信息,内部会调用findUsingReflectionInsingleClass方法。

private void findUsingReflectionInSingleClass(FindState findState) {
    Method[] methods;
    try {
        // This is faster than getMethods, especially when subscribers are fat classes like Activities
        //获取订阅者订阅者的所有方法
        methods = findState.clazz.getDeclaredMethods();
    } catch (Throwable th) {
        // Workaround for java.lang.NoClassDefFoundError, see https://github.com/greenrobot/EventBus/issues/149
        //如果出现异常只取订阅者的public方法
        methods = findState.clazz.getMethods();
        findState.skipSuperClasses = true;
    }
    //循环遍历订阅者的方法
    for (Method method : methods) {
        //获取方法修饰类型
        int modifiers = method.getModifiers();
        //方法修饰类型是否为public
        if ((modifiers & Modifier.PUBLIC) != 0 && (modifiers & MODIFIERS_IGNORE) == 0) {
            //获取方法的参数列表
            Class<?>[] parameterTypes = method.getParameterTypes();
            //EventBus订阅者订阅的方法参数必须是一个
            if (parameterTypes.length == 1) {
                //获取注解类型是否为Subscribe
                Subscribe subscribeAnnotation = method.getAnnotation(Subscribe.class);
                if (subscribeAnnotation != null) {
                    //订阅方法中的参数Class对象
                    Class<?> eventType = parameterTypes[0];
                    if (findState.checkAdd(method, eventType)) {
                        /**
                         * 获取线程模型
                         */
                        ThreadMode threadMode = subscribeAnnotation.threadMode();
                        findState.subscriberMethods.add(new SubscriberMethod(method, eventType, threadMode,
                                subscribeAnnotation.priority(), subscribeAnnotation.sticky()));
                    }
                }
            } else if (strictMethodVerification && method.isAnnotationPresent(Subscribe.class)) {
                String methodName = method.getDeclaringClass().getName() + "." + method.getName();
                throw new EventBusException("@Subscribe method " + methodName +
                        "must have exactly 1 parameter but has " + parameterTypes.length);
            }
        } else if (strictMethodVerification && method.isAnnotationPresent(Subscribe.class)) {
            String methodName = method.getDeclaringClass().getName() + "." + method.getName();
            throw new EventBusException(methodName +
                    " is a illegal @Subscribe method: must be public, non-static, and non-abstract");
        }
    }
}

方法中通过getDeclaredMethods方法获取订阅类的所有方法,通过遍历方法,判断该方法的注解类型是否为Subscribe类型并且方法内的参数个数为1个,如果满足以上条件,说明该方法就是事件订阅的方法,最终会获取订阅方法上的注解的相关信息,将其包装成SubscriberMethod对象,添加到findState的subscriberMethods集合中。

private List<SubscriberMethod> getMethodsAndRelease(FindState findState) {
    List<SubscriberMethod> subscriberMethods = new ArrayList<>(findState.subscriberMethods);
    //清空
    findState.recycle();
    //对应prepareFindState方法,这里存储FindState对象到FIND_STATE_POOL数组中。
    synchronized (FIND_STATE_POOL) {
        for (int i = 0; i < POOL_SIZE; i++) {
            if (FIND_STATE_POOL[i] == null) {
                FIND_STATE_POOL[i] = findState;
                break;
            }
        }
    }
    return subscriberMethods;
}

获取到订阅类相关的订阅方法后,从findState中取出并进行清空,清空后的FindState对象会被存储在FIND_STATE_POOL数组中用于复用。

相关标签: EventBus