EventBus内部是如何实现的? 让咱们爱不释手

简言:java

相信你们都使用过EventBus,用起来真的是奥利给啊,使用简单,开销还小,这种异步框架真的是让人爱不释手啊,固然了,有一大部分人更加喜欢Rxjava这个框架,毕竟这个框架更NB, 可是今天得主角不是Rxjava,是EventBus,带你们看看他的内部实现是什么?固然在讲源码以前,仍是先介绍一下EventBus,我保证,只是介绍啊, 毕竟我也不想啰嗦。。。android

1.简介
EvenBus是一个android端优化的消息总线,简化了应用程序内各组件,组件与后台线程间的通讯,好比请求网络,等网络返回时经过Handler或者BroadCast通知UI,两个Fragement之间须要经过listener通讯,这些需求均可以经过Evenbus实现;
evenBus是一款针对 android 优化的发布/订阅事件总线。主要功能是代替Intent handler Broacaset 在fragement activity
service线程之间的传递消息,优势是开销小,代码更优雅,以及将发送者和接收者解耦.git


2.基本的使用:github

1,自定义一个类,能够是空类;
2.在要接收消息的页面注册
3.发送消息
4.接收消息的页面实现(共有四个函数,各功能不一样)
5.解除注册
 网络

3.EventBus的流程 (如图:官网找的)app

publisher是一个发布器,而后将咱们的事件Event经过post()方法发送到EventBus的主线当中,而后在这个EventBus的主线中,会经过事件(Event)类型匹配相应的订阅者 Subscriber。框架

4.源码分析(EventBus)异步

@Override
    protected void onCreate(Bundle savedInstanceState) {
        super.onCreate(savedInstanceState);
        setContentView(R.layout.activity_main2);
        findViewById(R.id.btn).setOnClickListener(new View.OnClickListener() {
            @Override
            public void onClick(View v) {
                //发送消息
                EventBus.getDefault().post(new MyEvents("hello eventbus"));
            }
        });
    }

    @Override
    protected void onStart() {
        super.onStart();
        //注册EventBus
        EventBus.getDefault().register(this);
    }

    @Override
    protected void onStop() {
        super.onStop();
        //注销EventBus
        EventBus.getDefault().unregister(this);
    }

    @Subscribe(threadMode = ThreadMode.MAIN)
    public void onMessageEvent(MyEvents events){
        Toast.makeText(this,events.message,Toast.LENGTH_SHORT).show();
    }

这个是Event的使用类,咱们看在使用EventBus时,不管注册仍是注销发送消息,都有getDefault()这个方法,因此咱们先看看这个getDefault方法实现了什么?async

/** Convenience singleton for apps using a process-wide EventBus instance. */
    public static EventBus getDefault() {
        if (defaultInstance == null) {
            synchronized (EventBus.class) {
                if (defaultInstance == null) {
                    defaultInstance = new EventBus();
                }
            }
        }
        return defaultInstance;
    }

  public static EventBusBuilder builder() {
        return new EventBusBuilder();
    }

    /** For unit test primarily. */
    public static void clearCaches() {
        SubscriberMethodFinder.clearCaches();
        eventTypesCache.clear();
    }

    /**
     * Creates a new EventBus instance; each instance is a separate scope in which events are delivered. To use a
     * central bus, consider {@link #getDefault()}.
     */
    public EventBus() {
        this(DEFAULT_BUILDER);
    }

这里没有什么可介绍的,这是一个单例模式,咱们关注一下这个EventBus的构造函数,DEFAULT_BUILDER,咱们跟进去看一下:ide

咱们能够看出,这个EventBus最终是构建者模式来建立对象的。咱们看看他是如何建立的,

EventBus(EventBusBuilder builder) {
        subscriptionsByEventType = new HashMap<>();
        typesBySubscriber = new HashMap<>();
        stickyEvents = new ConcurrentHashMap<>();
        mainThreadPoster = new HandlerPoster(this, Looper.getMainLooper(), 10);
        backgroundPoster = new BackgroundPoster(this);
        asyncPoster = new AsyncPoster(this);
        indexCount = builder.subscriberInfoIndexes != null ? builder.subscriberInfoIndexes.size() : 0;
        subscriberMethodFinder = new SubscriberMethodFinder(builder.subscriberInfoIndexes,
                builder.strictMethodVerification, builder.ignoreGeneratedIndex);
        logSubscriberExceptions = builder.logSubscriberExceptions;
        logNoSubscriberMessages = builder.logNoSubscriberMessages;
        sendSubscriberExceptionEvent = builder.sendSubscriberExceptionEvent;
        sendNoSubscriberEvent = builder.sendNoSubscriberEvent;
        throwSubscriberException = builder.throwSubscriberException;
        eventInheritance = builder.eventInheritance;
        executorService = builder.executorService;
    }
 

  subscriptionsByEventType = new HashMap<>();
  typesBySubscriber = new HashMap<>();
  stickyEvents = new ConcurrentHashMap<>();

这三个HashMap,都是作什么?

第一个:它是以Event为key,subscript为value,当发送Event时,均可以经过这个HashMap找到对应订阅者。

第二个:它是以Subscriber为key,event为value,当咱们作反注册的时候都会操做这个hashMap

第三个:这个是维护的粘性事件,我以前讲过粘性事件的定义,这里就不追溯了。

  
mainThreadPoster = new HandlerPoster(this, Looper.getMainLooper(), 10);
  backgroundPoster = new BackgroundPoster(this);
  asyncPoster = new AsyncPoster(this);

咱们在看看这三个post(重要)

1)HandlerPoster:咱们看参数,传递的是主线程的Looper,是handler现实的,咱们追进去看看handerPoster作了什么?

final class HandlerPoster extends Handler {

    private final PendingPostQueue queue;
    private final int maxMillisInsideHandleMessage;
    private final EventBus eventBus;
    private boolean handlerActive;

    HandlerPoster(EventBus eventBus, Looper looper, int maxMillisInsideHandleMessage) {
        super(looper);
        this.eventBus = eventBus;
        this.maxMillisInsideHandleMessage = maxMillisInsideHandleMessage;
        queue = new PendingPostQueue();
    }

    void enqueue(Subscription subscription, Object event) {
        PendingPost pendingPost = PendingPost.obtainPendingPost(subscription, event);
        synchronized (this) {
            queue.enqueue(pendingPost);
            if (!handlerActive) {
                handlerActive = true;
                if (!sendMessage(obtainMessage())) {
                    throw new EventBusException("Could not send handler message");
                }
            }
        }
    }

    @Override
    public void handleMessage(Message msg) {
        boolean rescheduled = false;
        try {
            long started = SystemClock.uptimeMillis();
            while (true) {
                PendingPost pendingPost = queue.poll();
                if (pendingPost == null) {
                    synchronized (this) {
                        // Check again, this time in synchronized
                        pendingPost = queue.poll();
                        if (pendingPost == null) {
                            handlerActive = false;
                            return;
                        }
                    }
                }
                eventBus.invokeSubscriber(pendingPost);
                long timeInMethod = SystemClock.uptimeMillis() - started;
                if (timeInMethod >= maxMillisInsideHandleMessage) {
                    if (!sendMessage(obtainMessage())) {
                        throw new EventBusException("Could not send handler message");
                    }
                    rescheduled = true;
                    return;
                }
            }
        } finally {
            handlerActive = rescheduled;
        }
    }
}

这个HandlerPoser继承Handler,

   PendingPostQueue queue;  这是一个队列
   int maxMillisInsideHandleMessage; post这个事件在hanlder中最大的时间值
    EventBus eventBus;
   handlerActive; 他标识的是是否运行起来了

咱们关注一下handleMessage方法作了什么?

经过do,while这个循环,从队列中获取数据,并调用eventBus,invokeSubscriber()分发事件,每分发完一次事件,就对比一下时间,判断这个时间,与上边定义的最大时间值,作比较,若是大于就跳出循环。

2)backgroundPoster (处理后台操做的)

final class BackgroundPoster implements Runnable {

    private final PendingPostQueue queue;
    private final EventBus eventBus;

    private volatile boolean executorRunning;

    BackgroundPoster(EventBus eventBus) {
        this.eventBus = eventBus;
        queue = new PendingPostQueue();
    }

    public void enqueue(Subscription subscription, Object event) {
        PendingPost pendingPost = PendingPost.obtainPendingPost(subscription, event);
        synchronized (this) {
            queue.enqueue(pendingPost);
            if (!executorRunning) {
                executorRunning = true;
                eventBus.getExecutorService().execute(this);
            }
        }
    }

    @Override
    public void run() {
        try {
            try {
                while (true) {
                    PendingPost pendingPost = queue.poll(1000);
                    if (pendingPost == null) {
                        synchronized (this) {
                            // Check again, this time in synchronized
                            pendingPost = queue.poll();
                            if (pendingPost == null) {
                                executorRunning = false;
                                return;
                            }
                        }
                    }
                    eventBus.invokeSubscriber(pendingPost);
                }
            } catch (InterruptedException e) {
                Log.w("Event", Thread.currentThread().getName() + " was interruppted", e);
            }
        } finally {
            executorRunning = false;
        }
    }

}

他实现的是Runnalbe,咱们直接看他的run()方法:run()主要是不断的从咱们队列中获取消息,而后经过invokeSubscruber进行事件分发。直到取完为止。

3)asyncPoster

class AsyncPoster implements Runnable {

    private final PendingPostQueue queue;
    private final EventBus eventBus;

    AsyncPoster(EventBus eventBus) {
        this.eventBus = eventBus;
        queue = new PendingPostQueue();
    }

    public void enqueue(Subscription subscription, Object event) {
        PendingPost pendingPost = PendingPost.obtainPendingPost(subscription, event);
        queue.enqueue(pendingPost);
        eventBus.getExecutorService().execute(this);
    }

    @Override
    public void run() {
        PendingPost pendingPost = queue.poll();
        if(pendingPost == null) {
            throw new IllegalStateException("No pending post available");
        }
        eventBus.invokeSubscriber(pendingPost);
    }

}

它也是实现的Runnable,

他的run方法,是获取队列中的一个,进行事件分发,与上边的post是不一样的,

 

上边三个post是EventBus中最核心的类,

 

3.subscriberMethodFinder(这个也是重要的,他是注解的找寻器)

 

二,关于注解的分析

@Subscribe(threadMode = ThreadMode.MAIN)
    public void onMessageEvent(MyEvents events){
        Toast.makeText(this,events.message,Toast.LENGTH_SHORT).show();
    }

咱们仍是看一下@Subscribe的源码:

@Documented
@Retention(RetentionPolicy.RUNTIME)
@Target({ElementType.METHOD})
public @interface Subscribe {
    ThreadMode threadMode() default ThreadMode.POSTING;

    /**
     * If true, delivers the most recent sticky event (posted with
     * {@link EventBus#postSticky(Object)}) to this subscriber (if event available).
     */
    boolean sticky() default false;

    /** Subscriber priority to influence the order of event delivery.
     * Within the same delivery thread ({@link ThreadMode}), higher priority subscribers will receive events before
     * others with a lower priority. The default priority is 0. Note: the priority does *NOT* affect the order of
     * delivery among subscribers with different {@link ThreadMode}s! */
    int priority() default 0;
}

ThreadMode 线程模式。这个是很是重要的,咱们分析一下:

public enum ThreadMode {
   
    POSTING,
     
   
    MAIN,


    BACKGROUND,


    ASYNC
}

1)POSTING:一种默认线程模式,表示在执行post事件操做的时候线程直接调订阅者的方法,不管该线程是否在主线程。

2)MAIN:表示在主线程中执行这个方法,

3)BACKGROUND:在后台线程中执行相应的方法。

4)ASYNC:不管发布的是否在主线程。它都会发布一个空线程进行处理。它线程独立,不会出现卡顿。

2.sticky: 这是定义的粘性事件,

3.priority: 优先级,默认状况下是POSTING.

 

三,register() 注册订阅

public void register(Object subscriber) {
        Class<?> subscriberClass = subscriber.getClass();
        List<SubscriberMethod> subscriberMethods = subscriberMethodFinder.findSubscriberMethods(subscriberClass);
        synchronized (this) {
            for (SubscriberMethod subscriberMethod : subscriberMethods) {
                subscribe(subscriber, subscriberMethod);
            }
        }
    }

第一行代码:经过反射获取到咱们的subscriberClass对象,

第二行代码:经过咱们获取到的对象找到对应的集合。subscriberMethodFinder这个找寻器进行寻找。

咱们看一下findSubscriberMethods这个方法源码:

List<SubscriberMethod> findSubscriberMethods(Class<?> subscriberClass) {
        List<SubscriberMethod> subscriberMethods = METHOD_CACHE.get(subscriberClass);
        if (subscriberMethods != null) {
            return subscriberMethods;
        }

        if (ignoreGeneratedIndex) {
            subscriberMethods = findUsingReflection(subscriberClass);
        } else {
            subscriberMethods = findUsingInfo(subscriberClass);
        }
        if (subscriberMethods.isEmpty()) {
            throw new EventBusException("Subscriber " + subscriberClass
                    + " and its super classes have no public methods with the @Subscribe annotation");
        } else {
            METHOD_CACHE.put(subscriberClass, subscriberMethods);
            return subscriberMethods;
        }
    }

咱们看一下findUsingInfo这个方法的源码:

private List<SubscriberMethod> findUsingInfo(Class<?> subscriberClass) {
        FindState findState = prepareFindState();
        findState.initForSubscriber(subscriberClass);
        while (findState.clazz != null) {
            findState.subscriberInfo = getSubscriberInfo(findState);
            if (findState.subscriberInfo != null) {
                SubscriberMethod[] array = findState.subscriberInfo.getSubscriberMethods();
                for (SubscriberMethod subscriberMethod : array) {
                    if (findState.checkAdd(subscriberMethod.method, subscriberMethod.eventType)) {
                        findState.subscriberMethods.add(subscriberMethod);
                    }
                }
            } else {
                findUsingReflectionInSingleClass(findState);
            }
            findState.moveToSuperclass();
        }
        return getMethodsAndRelease(findState);
    }

while这个循环,经过getSubScriberInfo这个方法返回的不为null,

首先获取订阅方法的集合,经过for循序遍历咱们的订阅方法,并经过checkAdd这个方法进行过滤,将符合的添加到subscriberMethods这个集合中。

若是etSubScriberInfo这个方法返回的为null,会走findUsingReflectionInSingleClass这个方法:咱们经过命名就和以猜出,他是经过反射进行查询的。咱们看一下源码:

private void findUsingReflectionInSingleClass(FindState findState) {
        Method[] methods;
        try {
            // This is faster than getMethods, especially when subscribers are fat classes like Activities
            methods = findState.clazz.getDeclaredMethods();
        } catch (Throwable th) {
            // Workaround for java.lang.NoClassDefFoundError, see https://github.com/greenrobot/EventBus/issues/149
            methods = findState.clazz.getMethods();
            findState.skipSuperClasses = true;
        }
        for (Method method : methods) {
            int modifiers = method.getModifiers();
            if ((modifiers & Modifier.PUBLIC) != 0 && (modifiers & MODIFIERS_IGNORE) == 0) {
                Class<?>[] parameterTypes = method.getParameterTypes();
                if (parameterTypes.length == 1) {
                    Subscribe subscribeAnnotation = method.getAnnotation(Subscribe.class);
                    if (subscribeAnnotation != null) {
                        Class<?> eventType = parameterTypes[0];
                        if (findState.checkAdd(method, eventType)) {
                            ThreadMode threadMode = subscribeAnnotation.threadMode();
                            findState.subscriberMethods.add(new SubscriberMethod(method, eventType, threadMode,
                                    subscribeAnnotation.priority(), subscribeAnnotation.sticky()));
                        }
                    }
                } else if (strictMethodVerification && method.isAnnotationPresent(Subscribe.class)) {
                    String methodName = method.getDeclaringClass().getName() + "." + method.getName();
                    throw new EventBusException("@Subscribe method " + methodName +
                            "must have exactly 1 parameter but has " + parameterTypes.length);
                }
            } else if (strictMethodVerification && method.isAnnotationPresent(Subscribe.class)) {
                String methodName = method.getDeclaringClass().getName() + "." + method.getName();
                throw new EventBusException(methodName +
                        " is a illegal @Subscribe method: must be public, non-static, and non-abstract");
            }
        }
    }

   methods = findState.clazz.getDeclaredMethods(); 经过反射作的,咱们猜的没错,这里就获取到全部订阅者的方法,

而后对获取的方法进行了依次的遍历,经过getParameterTpyes方法获取到咱们的参数,经过getAnnotation这个方法获取到咱们的Subscribe对象,这里其实主要作的就是过滤出被Subscribe修饰过的方法,subscribeAnnotation.threadMode,获取线程模式,经过这个线程模式进行调度,

2.咱们继续分析register 中的subscribe

// Must be called in synchronized block
    private void subscribe(Object subscriber, SubscriberMethod subscriberMethod) {
        Class<?> eventType = subscriberMethod.eventType;
        Subscription newSubscription = new Subscription(subscriber, subscriberMethod);
        CopyOnWriteArrayList<Subscription> subscriptions = subscriptionsByEventType.get(eventType);
        if (subscriptions == null) {
            subscriptions = new CopyOnWriteArrayList<>();
            subscriptionsByEventType.put(eventType, subscriptions);
        } else {
            if (subscriptions.contains(newSubscription)) {
                throw new EventBusException("Subscriber " + subscriber.getClass() + " already registered to event "
                        + eventType);
            }
        }

        int size = subscriptions.size();
        for (int i = 0; i <= size; i++) {
            if (i == size || subscriberMethod.priority > subscriptions.get(i).subscriberMethod.priority) {
                subscriptions.add(i, newSubscription);
                break;
            }
        }

        List<Class<?>> subscribedEvents = typesBySubscriber.get(subscriber);
        if (subscribedEvents == null) {
            subscribedEvents = new ArrayList<>();
            typesBySubscriber.put(subscriber, subscribedEvents);
        }
        subscribedEvents.add(eventType);

        if (subscriberMethod.sticky) {
            if (eventInheritance) {
                // Existing sticky events of all subclasses of eventType have to be considered.
                // Note: Iterating over all events may be inefficient with lots of sticky events,
                // thus data structure should be changed to allow a more efficient lookup
                // (e.g. an additional map storing sub classes of super classes: Class -> List<Class>).
                Set<Map.Entry<Class<?>, Object>> entries = stickyEvents.entrySet();
                for (Map.Entry<Class<?>, Object> entry : entries) {
                    Class<?> candidateEventType = entry.getKey();
                    if (eventType.isAssignableFrom(candidateEventType)) {
                        Object stickyEvent = entry.getValue();
                        checkPostStickyEventToSubscription(newSubscription, stickyEvent);
                    }
                }
            } else {
                Object stickyEvent = stickyEvents.get(eventType);
                checkPostStickyEventToSubscription(newSubscription, stickyEvent);
            }
        }
    }

Subscription 这个类是作什么的?咱们看一下:

final class Subscription {
    final Object subscriber;
    final SubscriberMethod subscriberMethod;
    /**
     * Becomes false as soon as {@link EventBus#unregister(Object)} is called, which is checked by queued event delivery
     * {@link EventBus#invokeSubscriber(PendingPost)} to prevent race conditions.
     */
    volatile boolean active;

    Subscription(Object subscriber, SubscriberMethod subscriberMethod) {
        this.subscriber = subscriber;
        this.subscriberMethod = subscriberMethod;
        active = true;
    }

    @Override
    public boolean equals(Object other) {
        if (other instanceof Subscription) {
            Subscription otherSubscription = (Subscription) other;
            return subscriber == otherSubscription.subscriber
                    && subscriberMethod.equals(otherSubscription.subscriberMethod);
        } else {
            return false;
        }
    }

    @Override
    public int hashCode() {
        return subscriber.hashCode() + subscriberMethod.methodString.hashCode();
    }
}

这里处理了订阅者,和封装的一些订阅方法,线程模式,等,

咱们回去继续看:

subscriptions这个为null.证实这个事件尚未注册过,咱们就新建立一个CopyOnWriteArrayList,并添加到SubScriptByEventType这个hashMap当中,

subscriptions为null的话就会抛出异常,证实这个时间已经注册过了。

subscriptions.size回去这个集合大小,

经过for循环遍历,经过优先级的条件添加到subscriptions中。

这里对粘性事件也作了处理,这里不讲粘性事件,

最后    checkPostStickyEventToSubscription(newSubscription, stickyEvent);完成了注册。

 

总结:subscribe

1.判断是否有注册过该事件

2.而后按照优先级加入到SubscriptionByEventType的value的list中

3.而后再添加到typesBySubscriber的value的List中。

4.分发事件