EventBus源码解析

原文出处:TmWork。 

正文前言

在写这篇源码分析的时候,我看过网上的几乎所有的关于EventBus3.0的源码分析,但是看完之后我觉得这样的源码分析不是我想要的,他们只是很简单的解释表面的意思,甚至于很重要的地方都没有解释清楚。并没有一片文章去分析作者为什么这样设计、引入这样一个类的好处是什么?于是我尝试着去探索作者的想法初衷,并记下过程的所在所想。

说实话看源码的中途我确实放弃了,因为对我来说很是晦涩,大概中断了4天,我又开始看了,好歹是第一次看源代码,不能留下对源码阴影吧。

写完这篇博客,我还是有很多疑问,思考了很久,最后问了学长,他们也说不知道(虽然我们一致认为有个地方checkAddWithMethodSignature里面的一个判断没啥用)。然后去eventbus提了issue,但是可能是因为表达的不清楚,至今还没有人回,这也让我认识到提问题也是很讲究的。然后在百万哥群里感觉stackoverflow还不错,又去了stackflow。最后没办法准备email greenrobot,发现似乎没有email,只好求助于jakewharton,第一次写英语email,感觉英语还是很烂啊,其实也不抱啥希望,多尝试一下呗。仅仅是为了解决心中的疑惑,解决疑惑对我来说是一种快乐。

如果你在看这篇博客的时候思路不是那么清晰,我在文章最后的部分画了一些图,可能对你梳理逻辑会有一些帮助。

开始正文

EventBus

EventBus基于观察者模式的Android事件分发总线。看一下这个图:

blob.png

从这个图可以看出,EventBus使得 事件发送 和 事件接收 很好的解耦。另外使得Android的组件例如Activity和Fragment之间的通信变得简单。最总要的一点是可以使得代码变得整洁。

使用EventBus

使用EventBus也是非常的简单,首先当然是加依赖啦:

compile 'org.greenrobot:eventbus:3.0.0'

使用分为三步:

  1. 定义消息事件MessageEvent,也就是创建事件类型

    public class MessageEvent {
        public final String message;
        public MessageEvent(String message) {
            this.message = message;
        }
    }
    
  2. 选择你要订阅的订阅者(subscriber),例如我选择的是Activity,可以在onCreate()加入 EventBus.getDefault().register(this)。在不需要接收事件发生时可以 EventBus.getDefault().unregister(this);

    在订阅者里需要用注解关键字 @Subscribe来“告诉”EventBus使用什么方法处理event

    @Subscribe
    public void onMessageEvent(MessageEvent event) {
        Toast.makeText(this, event.message, Toast.LENGTH_SHORT).show();
    }
    

           注意方法只能被public修饰,在EventBus3.0之后该方法名字就可以自由的取了,之前的好像只能是onEvent().

  3. 最后就是发送事件了

    EventBus.getDefault().post(new MessageEvent("HelloEveryone"));
    

这样我选择的Activity就会接收到该事件,并且触发onMessageEvent方法。

源代码分析

由于是第一次分析源代码, 不知道咋搞,那我就从使用的三个步骤来分析源代码。
本文是针对和我一样的新手来写的,所以正式看源代码之前,先来了解一下注解吧,我们以@Subscribe为例来讲:

注解@Subscribe

现在来看看在订阅者中使用的@Subscribe,此注解关键字的参数有三个。看源代码:

@Retention(RetentionPolicy.RUNTIME)
@Target({ElementType.METHOD})
public @interface Subscribe {
  ThreadMode threadMode() default ThreadMode.POSTING;
    /**
     * If true, delivers the most recent sticky event (posted with
     * {@link EventBus#postSticky(Object)}) to this subscriber (if event available).
     */
    boolean sticky() default false;
    /** Subscriber priority to influence the order of event delivery.
     * Within the same delivery thread ({@link ThreadMode}), higher priority subscribers will receive events before
     * others with a lower priority. The default priority is 0. Note: the priority does *NOT* affect the order of
     * delivery among subscribers with different {@link ThreadMode}s! */
    int priority() default 0;
}

@Target指明作用的是Method

ThreadMode 是enum(枚举)类型,threadMode默认值是POSTING。ThreadMode有四种模式:

  • POSTING :Subscriber会在post event的所在线程回调,故它不需要切换线程来分发事件,因此开销最小。它要求task完成的要快,不能请求MainThread,适用简单的task。

  • MAIN :Subscriber会在主线程(有时候也被叫做UI线程)回调,如果post event所在线程是MainThread,则可直接回调。注意不能阻塞主线程。

  • BACKGROUND :Subscriber会在后台线程中回调。如果post event所在线程不是MainThread,那么可直接回调;如果是MainThread,EventBus会用单例background thread来有序地分发事件。注意不能阻塞background thread。

  • ASYNC:当处理事件的Method是耗时的,需要使用此模式。尽量避免同时触发大量的耗时较长的异步操作,EventBus使用线程池高效的复用已经完成异步操作的线程。

再回到@Subscribe,还有两个参数,sticky(粘性)默认值是false,如果是true,那么可以通过EventBus的postSticky方法分发最近的粘性事件给该订阅者(前提是该事件可获得)。

最后一个参数是priority,Method的优先级,优先级高的可以先获得分发的事件。这个不会影响不同的ThreadMode的分发事件顺序。

获取EventBus实例(单例)

使用 EventBus.getDefault()会比较方便的获取到EventBus的实例。看一下此方法:

/** Convenience singleton for apps using a process-wide EventBus instance. */
   public static EventBus getDefault() {
       if (defaultInstance == null) {
           synchronized (EventBus.class) {
               if (defaultInstance == null) {
                   defaultInstance = new EventBus();
               }
           }
       }
       return defaultInstance;
   }

声明defaultInstance:

static volatile EventBus defaultInstance;

注意volatile关键字在java并发编程中常用,比synchronized的开销成本要小,轻便。作用是线程能访问共享变量,什么是共享变量呢?共享变量包括所有的实例变量,静态变量和数组元素,他们都存放在堆内存中。

getDefault方法使用了double check(双重检查锁定模式),多了一层判断,故可以减少上锁开销。另外和volatile一起使用保证了单例。

初始化:

blob.png

使用Builder设计模式,可以生成定制的EventBus单例,或者默认的EventBus单例。暂时不深究。

register

获取EventBus单例之后就可以register了

public void register(Object subscriber) {
        Class<?> subscriberClass = subscriber.getClass();
        List<SubscriberMethod> subscriberMethods = subscriberMethodFinder.findSubscriberMethods(subscriberClass);
        synchronized (this) {
            for (SubscriberMethod subscriberMethod : subscriberMethods) {
                subscribe(subscriber, subscriberMethod);
            }
        }
    }

仔细看代码不难读懂,名字已经表明了功能。首先得到订阅者的Class对象,然后通过subscriberMethodFinder寻找subscriberMethods。findSubscriberMethods()方法具体实现:

blob.png

方法缓存METHOD_CACHE map的key是Class,value是List。
首先获取从此map(从名字可以理解成缓存)中获取subscriberMethods,如果不是null,直接返回。若是null,则就要面对ignoreGeneratedIndex这个变量了,该变量暂时不用深挖。只需要知道,有两种方式可以获取subscriberMethods即可。 ignoreGeneratedIndex的意义我觉得在研究了两种方式之后大概会明白。

首先看看一下subscriberMethods = findUsingReflection(subscriberClass);的实现:

private List<SubscriberMethod> findUsingReflection(Class<?> subscriberClass) {
        FindState findState = prepareFindState();
        findState.initForSubscriber(subscriberClass);
        while (findState.clazz != null) {
            findUsingReflectionInSingleClass(findState);
            findState.moveToSuperclass();
        }
        return getMethodsAndRelease(findState);
    }

请注意最后的return getMethodsAndRelease(findState)方法,这个方法非常重要!!一个是获取methods,另一个是释放findState里面的map信息。看一下方法实现:

blob.png

这里先把methods取了出来,然后recycle。接下来的这个for循环和之前的prepareFindState这个方法是前后呼应的。我们来看看这里面的智慧:

blob.png

对比发现,这是个复用池。第一次prepareFindState时,FIND_STATE_POOL[i]全不是null,有了一个state之后把相应的FIND_STATE_POOL[i]的引用变为null,这是为了防止并发编程时造成相互干扰。用完了这个state之后,recycle下面的for循环,会找到一个引用为null的FIND_STATE_POOL[i],并把自身引用赋给他。这样在并发的时候,这个复用池的效果就出来了:用的时候隔离开,用完了放回去。

接下来最重要的findUsingReflectionInSingleClass(findState)

private void findUsingReflectionInSingleClass(FindState findState) {
        Method\[\] methods;
        try {
            // This is faster than getMethods, especially when subscribers are fat classes like Activities
            methods = findState.clazz.getDeclaredMethods();
        } catch (Throwable th) {
            // Workaround for java.lang.NoClassDefFoundError, see https://github.com/greenrobot/EventBus/issues/149
            methods = findState.clazz.getMethods();
            findState.skipSuperClasses = true;
        }
        for (Method method : methods) {
            int modifiers = method.getModifiers();
            if ((modifiers & Modifier.PUBLIC) != 0 && (modifiers & MODIFIERS_IGNORE) == 0) {
                Class<?>\[\] parameterTypes = method.getParameterTypes();
                if (parameterTypes.length == 1) {
                    Subscribe subscribeAnnotation = method.getAnnotation(Subscribe.class);
                    if (subscribeAnnotation != null) {
                        Class<?> eventType = parameterTypes\[0\];
                        if (findState.checkAdd(method, eventType)) {
                            ThreadMode threadMode = subscribeAnnotation.threadMode();
                            findState.subscriberMethods.add(new SubscriberMethod(method, eventType, threadMode,
                                    subscribeAnnotation.priority(), subscribeAnnotation.sticky()));
                        }
                    }
                } else if (strictMethodVerification && method.isAnnotationPresent(Subscribe.class)) {
                    String methodName = method.getDeclaringClass().getName() + "." + method.getName();
                    throw new EventBusException("@Subscribe method " + methodName +
                            "must have exactly 1 parameter but has " + parameterTypes.length);
                }
            } else if (strictMethodVerification && method.isAnnotationPresent(Subscribe.class)) {
                String methodName = method.getDeclaringClass().getName() + "." + method.getName();
                throw new EventBusException(methodName +
                        " is a illegal @Subscribe method: must be public, non-static, and non-abstract");
            }
        }
    }

使用反射的方式查找subscriberMethods。初始findState之后,开始while循环查找,在每次查找时,只用一个Class对象,然后在继承结构上往上查找超类的subscriberMethods。可是看到这,我就想问了为啥不直接用Class.getMethods直接获取该类的全部方法呢?作者用getDeclaredMethods其实是做过深层次考虑的,如果这个类比较庞大,用getMethods查找所有的方法就显得很笨重了,如果使用的是getDeclaredMethods(该类声明的方法不包括从父类那里继承来的public方法),速度就会快一些,因为找的方法变少了,没有什么equlas,toString,hashCode等系统类的方法。这里说的比较抽象,读者可以实验一下,或者goole这两个方法的区别

从新手的角度来解释:

1.Java的反射机制中,可以通过Class对象getMethods,这里用的是getDeclaredMethods,原因是对某些像Activity庞大的Class对象速度更快。clazz这种写法在很多程序中都出现了,这样的叫法是为了避免关键字而采取的变音(s和z读音像)。

2.modifier中文意思是修饰符,从if语句可以很容易知道这里需要解析获取的methods,并且知道需public修饰.其中有一个变量int MODIFIERS_IGNORE

private static final int MODIFIERS_IGNORE = Modifier.ABSTRACT | Modifier.STATIC | BRIDGE | SYNTHETIC;

这是位运算,具体的计算可以google,但是这里的写法确实比较高级。(modifiers & MODIFIERS_IGNORE) == 0 这一句就可以检查不能是这四个关键字的一个,按照平常的写法会写的比较冗长。

3.if (parameterTypes.length == 1) 表明methods的参数只能有一个

Subscribe subscribeAnnotation = method.getAnnotation(Subscribe.class);
if (subscribeAnnotation != null)

很自然,需要的method是被@Subscribe注解的,如果不了解注解的可以去看看Java编程思想。

5.这里还有一个判断:

findState.checkAdd(method, eventType)

从checkAdd()控制的程序流向,知道他是来控制findState.subscriberMethods是否添加找到的method。为什么要这个判断呢?
看到这里我觉得是时候看看这个FindState类了:

static class FindState {
        final Map<Class, Object> anyMethodByEventType = new HashMap<>();
        ..............
        boolean checkAdd(Method method, Class<?> eventType) {
            // 2 level check: 1st level with event type only (fast), 2nd level with complete signature when required.
            // Usually a subscriber doesn't have methods listening to the same event type.
            Object existing = anyMethodByEventType.put(eventType, method);
            if (existing == null) {
                return true;
            } else {
                if (existing instanceof Method) {
                    if (!checkAddWithMethodSignature((Method) existing, eventType)) {
                        // Paranoia check
                        throw new IllegalStateException();
                    }
                    // Put any non-Method object to "consume" the existing Method
                    anyMethodByEventType.put(eventType, this);
                }
                return checkAddWithMethodSignature(method, eventType);
            }
        }
        private boolean checkAddWithMethodSignature(Method method, Class<?> eventType) {
            methodKeyBuilder.setLength(0);
            methodKeyBuilder.append(method.getName());
            methodKeyBuilder.append('>').append(eventType.getName());
            String methodKey = methodKeyBuilder.toString();
            Class<?> methodClass = method.getDeclaringClass();
            Class<?> methodClassOld = subscriberClassByMethodKey.put(methodKey, methodClass);
            if (methodClassOld == null || methodClassOld.isAssignableFrom(methodClass)) {
                // Only add if not already found in a sub class
                return true;
            } else {
                // Revert the put, old class is further down the class hierarchy
                subscriberClassByMethodKey.put(methodKey, methodClassOld);
                return false;
            }
        }
    }

这个涉及到两层检查,第一层判断有无method监听此eventType,如果没有则可直接把找到的method加到subscriberMethods中。这里有一个细节,对我来说比较新, Object existing = anyMethodByEventType.put(eventType, method);,没有用get方法,而是用map的put方法,如果eventType键对应有value,则返回此value,并用method替换此value;如果没有对应的value,则返回null,同样也需要替换。这样写就比map的get方法要简洁,不然又要写判断和put。

第二层检查,有的人说是避免一个类出现多个方法监听同一个事件,但是我想说的是no!!实验出真知,明明是可以的,那些没试验过的人不要乱说。第二层检查的关键是读懂checkAddWithMethodSignature(),从字面意思来说是从MethodSignature(方法签名)判断能否把找到的method加进去。

那什么是方法签名呢?仔细看代码,发现其实方法签名是methodName + eventTypeName。区分method很关键的是从方法名和参数类型以及参数个数入手,由于之前的判断中已经限制了参数的个数是一个了,所以只需要考虑方法名和方法的参数类型。

再看看methodClassOld.isAssignableFrom(methodClass)这个方法,(强烈建议看英文注释)发现这是判断methodClassOld是否是methodClass的父类。这样以来这个方法返回的就一直是false,毫无意义,这里作者到底想干什么?我不知道,希望有人来解释一下。

我的猜想:作者这么做很有可能是为了防止在找父类时覆盖了子类的方法,因为此方法是子类是重写,方法名参数名完全一样(方法签名);另一个原因是可能是当一个类有多个方法监听同一个event(尽管一般不会这样做),也能将这些方法加进去。

findUsingInfo

findUsingInfo是EventBus 3.0刚加的功能,特点是比反射快,因为他是在编译期完成subscriber的注册register,而不是在运行期。如何使用这个可以查看Subscriber Index。强烈建议和我一样的新手多看官方文档。

经过这么多苦难,方法总算找完了,那么现在我们来看看设计者是如何设计subcribe的。

subscribe

让我们先不看源代码思考一下,如果你要注册这些方法 ,你会怎么做呢?第一个我会判断这些方法是否已经被注册过该事件了,不能让你平白无故的注册两次吧。第二个需要考虑@subscribe里面的参数问题,你还记得三个参数吗?threadMode,priority,sticky。由于threadMode是处理post的,所以暂时不用管他。第三个我会考虑该怎么组织这些找到的方法,最简单也是最自然的想法是用event来组织这些找到的方法。

我们先验证我们的看法:

private void subscribe(Object subscriber, SubscriberMethod subscriberMethod) {
        Class<?> eventType = subscriberMethod.eventType;
        Subscription newSubscription = new Subscription(subscriber, subscriberMethod);
        CopyOnWriteArrayList<Subscription> subscriptions = subscriptionsByEventType.get(eventType);
        if (subscriptions == null) {
            subscriptions = new CopyOnWriteArrayList<>();
            subscriptionsByEventType.put(eventType, subscriptions);
        } else {
            if (subscriptions.contains(newSubscription)) {
                throw new EventBusException("Subscriber " + subscriber.getClass() + " already registered to event "
                        + eventType);
            }
        }
		......
}

方法CopyOnWriteArrayList subscriptions = subscriptionsByEventType.get(eventType);这句话很好的验证了第三个看法。

假若subscriptions里已经有了newSubscription,那么就会抛出异常,这验证了第一个看法。这里有个比较重要的类Subscription把subcriber和subcriberMethod包装了一下。看看他的成员变量:

final class Subscription {
    final Object subscriber;
    final SubscriberMethod subscriberMethod;
    /**
     * Becomes false as soon as {@link EventBus#unregister(Object)} is called, which is checked by queued event delivery
     * {@link EventBus#invokeSubscriber(PendingPost)} to prevent race conditions.
     */
    volatile boolean active;
}

重要的变量active,会在unregister调用之后变成false,active会在事件分发时被检查,若是false,此时unregister的功能就实现了。

现在我们来分析设计者如何处理priority和sticky这两个参数:

先看priority:

int size = subscriptions.size();
       for (int i = 0; i <= size; i++) {
           if (i == size || subscriberMethod.priority > subscriptions.get(i).subscriberMethod.priority) {
               subscriptions.add(i, newSubscription);
               break;
           }
       }

遍历subscriptions,找到比subscriberMethod.priority小的位置,把newSubscription插进去,这也表明了subscription中priority大的在前,这样在事件分发时就会先获取。

接着sticky:

再说sticky之前,我们应该先看看Sticky Event,不然下面的分析会很模糊不清晰。

看完之后我觉得Sticky Event的设计初衷是:当我们post一个event时,订阅了的subcriber都会收到信息并进行相应的操作,这没问题,可是我的需求还没完,我还需要在创建一个对象如Activity之后再次使用event携带的信息,这下怎么办呢。换做是我会先把这些信息存起来等着用,好消息EventBus已经用sticky机制帮我们实现了!而且根据官方文档这个机制的实质是在内存中存储这个event,那么在哪里存储的呢?用什么存储的呢?我们看看官方提到的:EventBus.getDefault().postSticky(event):

public void postSticky(Object event) {
       synchronized (stickyEvents) {
           stickyEvents.put(event.getClass(), event);
       }
       // Should be posted after it is putted, in case the subscriber wants to remove immediately
       post(event);
   }

原来就是用的stickyEvents这个map存储的,并且从这个put我们可以知道stickyEvents的key是event的Class对象,value是event。

我们做个小demo,在一个对象如Activity中先调用EventBus.getDefault().postSticky(new MessageEvent(“Hello everyone!”));,再调用EventBus.register,结果如何呢?从结果可以预想在register(更准确的说应该是subscribe)中肯定会有post的调用。带着这个设想,我们继续看subcribe方法:

if (subscriberMethod.sticky) {
            if (eventInheritance) {
                // Existing sticky events of all subclasses of eventType have to be considered.
                // Note: Iterating over all events may be inefficient with lots of sticky events,
                // thus data structure should be changed to allow a more efficient lookup
                // (e.g. an additional map storing sub classes of super classes: Class -> List<Class>).
                Set<Map.Entry<Class<?>, Object>> entries = stickyEvents.entrySet();
                for (Map.Entry<Class<?>, Object> entry : entries) {
                    Class<?> candidateEventType = entry.getKey();
                    if (eventType.isAssignableFrom(candidateEventType)) {
                        Object stickyEvent = entry.getValue();
                        checkPostStickyEventToSubscription(newSubscription, stickyEvent);
                    }
                }
            } else {
                Object stickyEvent = stickyEvents.get(eventType);
                checkPostStickyEventToSubscription(newSubscription, stickyEvent);
            }
        }

这里的四行注释,你看懂了多少,如果很少,那么用有道查查你不认识的单词吧,再读读。此时你已经站在必须读懂这些注释的路口了。

我来说说我的理解吧,抛砖引玉:
event和其子类中存在的sticky events应该在继承关系中被考虑。在迭代的过程中,所有的event可能会因为大量的sticky events变得低效,为了使得查询变得高效应该改变数据结构,例如增加一个map来存储一个superclass的subclass,map<superclass,list>表面的意思看懂了,可还是不懂真正的意思,接着往下看:eventInheritance ,初始化是在EventBusBuilder中,初始值是true。

比较有意思的是stickyEvents.entrySet(),这个的用处就是为了既可以获取key,也可以获取value。此方法遍历map比一般的用迭代器的效率高。

这里又碰到了eventType.isAssignableFrom(candidateEventType)这个方法,我感觉这里应该是多态的原因,如果postSticky()的参数是subMessageEvent,那么@Subscribe注解方法中的参数是SupMessageEvent也可以接收到此消息。

这里找到stickyEvent之后会checkPostStickyEventToSubscription(newSubscription, stickyEvent);接着在这个方法里又会调用postToSubscription,这验证了之前的猜想,确实会在subcribe中post stickyEvent。有木有一种柯南的成就感呢?

到这里subscribe已经结合我们的猜想全部讲解完,下面我们接着讲最后一个post的细节

post

接着上面的我们先看看postToSubscription:

private void postToSubscription(Subscription subscription, Object event, boolean isMainThread) {
       switch (subscription.subscriberMethod.threadMode) {
           case POSTING:
               invokeSubscriber(subscription, event);
               break;
           case MAIN:
               if (isMainThread) {
                   invokeSubscriber(subscription, event);
               } else {
                   mainThreadPoster.enqueue(subscription, event);
               }
               break;
           case BACKGROUND:
               if (isMainThread) {
                   backgroundPoster.enqueue(subscription, event);
               } else {
                   invokeSubscriber(subscription, event);
               }
               break;
           case ASYNC:
               asyncPoster.enqueue(subscription, event);
               break;
           default:
               throw new IllegalStateException("Unknown thread mode: " + subscription.subscriberMethod.threadMode);
       }
   }

由于需要分发事件,所以之前没讲的ThreadMode此时起到作用了,如果忘了它的作用看看上面的@Subcribe回顾一下。在几个分支中都会有invokeSubscriber(subscription, event):

void invokeSubscriber(Subscription subscription, Object event) {
        try {
            subscription.subscriberMethod.method.invoke(subscription.subscriber, event);
        } catch (InvocationTargetException e) {
            handleSubscriberException(subscription, event, e.getCause());
        } catch (IllegalAccessException e) {
            throw new IllegalStateException("Unexpected exception", e);
        }
    }

很显然用的反射来调用method,第一个参数是订阅此event的对象object,第二个参数是event。

第一个分支POSTING:直接调用invokeSubscriber,这也验证了之前说的post和方法回调会在同一线程中。

第二个分支MAIN:如果post是在MainThread则直接调用上面的方法;如果不是则向mainThreadPoster加入队列。mainThreadPoster类型是HandlerPoster,其继承自Handler。到这里我们应该回顾一下Android的消息机制了,简单说一下:handler 涉及到发送和处理message,sendMessage 之后,Looper会接受message,最终的message会由Looper交给handler处理。这里先看看HandlerPoster的enqueue:

void enqueue(Subscription subscription, Object event) {
       PendingPost pendingPost = PendingPost.obtainPendingPost(subscription, event);
       synchronized (this) {
           queue.enqueue(pendingPost);
           if (!handlerActive) {
               handlerActive = true;
               if (!sendMessage(obtainMessage())) {
                   throw new EventBusException("Could not send handler message");
               }
           }
       }
   }

找到sendMessage方法了吧。发送之后,Handler就会有相应的处理方法:hadnlerMessage:

@Override
   public void handleMessage(Message msg) {
       boolean rescheduled = false;
       try {
           long started = SystemClock.uptimeMillis();
           while (true) {
               PendingPost pendingPost = queue.poll();
               if (pendingPost == null) {
                   synchronized (this) {
                       // Check again, this time in synchronized
                       pendingPost = queue.poll();
                       if (pendingPost == null) {
                           handlerActive = false;
                           return;
                       }
                   }
               }
               eventBus.invokeSubscriber(pendingPost);
               long timeInMethod = SystemClock.uptimeMillis() - started;
               if (timeInMethod >= maxMillisInsideHandleMessage) {
                   if (!sendMessage(obtainMessage())) {
                       throw new EventBusException("Could not send handler message");
                   }
                   rescheduled = true;
                   return;
               }
           }
       } finally {
           handlerActive = rescheduled;
       }
   }

这里不追究细节,只需要知道在这个方法里是怎么处理接收到的消息的。这里调用的是 eventBus.invokeSubscriber(pendingPost);。源码:

void invokeSubscriber(PendingPost pendingPost) {
       Object event = pendingPost.event;
       Subscription subscription = pendingPost.subscription;
       PendingPost.releasePendingPost(pendingPost);
       if (subscription.active) {
           invokeSubscriber(subscription, event);
       }
   }

还记得之前介绍Subscription时说过的active变量吗?
其实我之前很仔细看上面的enqueue和hadnleMessage时,看到PendingPost这个类时,很是懵,但是到这我就懂了一点:为了包装,不然你又要传subcription和event,不仅如此,上面的入队操作也会变难。其实PendingPost的设计上还有一个很好的地方:

static PendingPost obtainPendingPost(Subscription subscription, Object event) {
        synchronized (pendingPostPool) {
            int size = pendingPostPool.size();
            if (size > 0) {
                PendingPost pendingPost = pendingPostPool.remove(size - 1);
                pendingPost.event = event;
                pendingPost.subscription = subscription;
                pendingPost.next = null;
                return pendingPost;
            }
        }
        return new PendingPost(event, subscription);
    }
    static void releasePendingPost(PendingPost pendingPost) {
        pendingPost.event = null;
        pendingPost.subscription = null;
        pendingPost.next = null;
        synchronized (pendingPostPool) {
            // Don't let the pool grow indefinitely
            if (pendingPostPool.size() < 10000) {
                pendingPostPool.add(pendingPost);
            }
        }
    }

不难发现这里很好的控制了pendingPostPool的大小,因为当你obtain一个post时,pendingPostPool的大小会减一,然后在你release时又会增加一。不得不说,设计者在细节上还是花了一些心思的。PendingPostQueue很简单很单纯,就是入队和出队。

第三个分支BACKGROUND:看一下BackgroundPoster:

final class BackgroundPoster implements Runnable {
   	......
    public void enqueue(Subscription subscription, Object event) {
       ......
	   if (!executorRunning) {
             executorRunning = true;
             eventBus.getExecutorService().execute(this);
          }
    }
    @Override
    public void run() {
        ......
    }
}

BackgroundPoster实现Runnable接口,通过eventBus.getExecutorService获取ExecutorService,并通过getExecutorService().execute(Runnable)执行下面的run方法,这就好比handlerMessage,其他的细节和上面的HandlerPoster类似。

最后一个分支ASYNC:这里只有一点和第三分支不同,他不管post event所在的线程是不是MainThread,其他的都差不多一样。

看完postToSubscription之后,我们对如何分发一个事件已经很熟悉了,有了“底层”知识,我们来看看EventBus的post入口吧:

/** Posts the given event to the event bus. */
    public void post(Object event) {
        PostingThreadState postingState = currentPostingThreadState.get();
        List<Object> eventQueue = postingState.eventQueue;
        eventQueue.add(event);
        if (!postingState.isPosting) {
            postingState.isMainThread = Looper.getMainLooper() == Looper.myLooper();
            postingState.isPosting = true;
            if (postingState.canceled) {
                throw new EventBusException("Internal error. Abort state was not reset");
            }
            try {
                while (!eventQueue.isEmpty()) {
                    postSingleEvent(eventQueue.remove(0), postingState);
                }
            } finally {
                postingState.isPosting = false;
                postingState.isMainThread = false;
            }
        }
    }

这里有一个比较有意思的东西currentPostingThreadState,类型是ThreadLocal,之前看过《Android开发艺术探索》,知道了ThreadLoacl可以在不同的线程中维护一套数据的副本,换个说法就是ThreadLoacl会根据不同的线程拿到相应的数据,彼此互不干扰。

在while循环中一直执行postSingleEvent(eventQueue.remove(0), postingState);直至事件队列为空。我们再来看看这个方法:

private void postSingleEvent(Object event, PostingThreadState postingState) throws Error {
        Class<?> eventClass = event.getClass();
        boolean subscriptionFound = false;
        if (eventInheritance) {
            List<Class<?>> eventTypes = lookupAllEventTypes(eventClass);
            int countTypes = eventTypes.size();
            for (int h = 0; h < countTypes; h++) {
                Class<?> clazz = eventTypes.get(h);
                subscriptionFound |= postSingleEventForEventType(event, postingState, clazz);
            }
        } else {
            subscriptionFound = postSingleEventForEventType(event, postingState, eventClass);
        }
        if (!subscriptionFound) {
            if (logNoSubscriberMessages) {
                Log.d(TAG, "No subscribers registered for event " + eventClass);
            }
            if (sendNoSubscriberEvent && eventClass != NoSubscriberEvent.class &&
                    eventClass != SubscriberExceptionEvent.class) {
                post(new NoSubscriberEvent(this, event));
            }
        }
    }

如果eventInheritance为true,也就是考虑event的继承关系。订阅了此event超类的method也会收到此消息。剩下的postSingleEventForEventType(eventQueue.remove(0) postingState);我不打算继续讲了,因为已经毫无难度了,其中会调用之前讲的postToSubscription。

到此大题的源码分析已经完成,unregister和subcribe过程相对应,相对而言比较简单。

分析整体框架

弄清楚上面的四个步骤之后,我们一起来梳理一下设计者是如何完成观察者模式中的订阅和发送事件以及取消订阅。这里我尝试用图的形式来表达我的所思所想。

我们先看subscriber。

blob.png

一个subscriber可以订阅多个类型的Event,而且对于同一种类型的Event可以有多个method进行处理(尽管我们一般不会这样做)。

在subscribe方法中,引入Subscription对subscriber、event进行了封装。经过判断之后,把“合格”的subscription加入subscriptionsByEventType中看下面的图来梳理这个map的结构。

blob.png

当我们分发事件时,也就是post(Object event)时,他会间接调用postSingleEventForEventType这个方法,通过传入的参数event,在subscriptionsByEventType找到event对应的value,再继续相应的操作。

为了取消订阅这里引入typesBySubscriber和上面的类似,那我们再画一个图来梳理逻辑层次。

blob.png

我们取消一个subscriber的订阅是,也就是unregister(Object subscriber).我们会在typesBySubscriber中找到该subsriber对应的evnt。然后再由此event去subscriptionsByEventType找到一系列的subscription,并把他们remove。