EventBus是经过使用发布者/订阅者模式来实现解耦的Android和Java开源库。在Android开发中一般使用EventBus实现Activities, Fragments, Threads, Services等组件之间的通讯。但EventBus不能实现跨进程间的通讯。java
UML类图 android
源码分析 EventBus.java数组
package de.greenrobot.event;
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CopyOnWriteArrayList;
import android.os.Handler;
import android.os.Looper;
import android.os.Message;
import android.util.Log;
/**
*
Class based event bus, optimized for Android. By default, subscribers will handle events in methods named "onEvent".
*
* @author Markus Junginger, greenrobot
*/
public class EventBus {
/**
* Log tag, apps may override it.
*/
public static String TAG = "Event";
private static final EventBus defaultInstance = new EventBus();//默认实例
public enum ThreadMode {
/**
* Subscriber will be called in the same thread, which is posting the event.
*/
PostThread,//发布事件所在线程订阅
/**
* Subscriber will be called in Android's main thread (sometimes referred to as UI thread). */ MainThread,//主线程订阅 /* BackgroundThread */ } //表示某个类中同一个方法名的全部重载方法。(也就是一个类中全部的订阅方法),缓存,目的:提升性能 private static final Map<String, List<Method>> methodCache = new HashMap<String, List<Method>>(); //保存事件类型的全部的事件(包括父类和接口),懒加载的,缓存,目的:了提升性能 private static final Map<Class<?>, List<Class<?>>> eventTypesCache = new HashMap<Class<?>, List<Class<?>>>(); //一个事件类型的全部订阅者:发送事件时使用到(时间复杂度为:O(1)) //用CopyOnWriteArrayList是为了读取是线程安全的。 private final Map<Class<?>, CopyOnWriteArrayList<Subscription>> subscriptionsByEventType; //一个订阅对象的全部订阅事件: // 订阅(register)时是加入 // 取消订阅(unregister)时使用:获取当前订阅对象的全部订阅事件,而后在订阅事件中将这个订阅者删除。 private final Map<Object, List<Class<?>>> typesBySubscriber; //每一个线程的事件队列 private final ThreadLocal<List<Object>> currentThreadEventQueue = new ThreadLocal<List<Object>>() { @Override protected List<Object> initialValue() { return new ArrayList<Object>(); } }; //每一个线程的队列是否在运转中 private final ThreadLocal<BooleanWrapper> currentThreadIsPosting = new ThreadLocal<BooleanWrapper>() { @Override protected BooleanWrapper initialValue() { return new BooleanWrapper(); } }; private String defaultMethodName = "onEvent"; private PostViaHandler mainThreadPoster; public static EventBus getDefault() { return defaultInstance; } public EventBus() { subscriptionsByEventType = new HashMap<Class<?>, CopyOnWriteArrayList<Subscription>>(); typesBySubscriber = new HashMap<Object, List<Class<?>>>(); mainThreadPoster = new PostViaHandler(Looper.getMainLooper()); } public void register(Object subscriber) {//注册 register(subscriber, defaultMethodName, ThreadMode.PostThread);//默认方法名,即onEvent, } public void registerForMainThread(Object subscriber) { register(subscriber, defaultMethodName, ThreadMode.MainThread); } //这里应该加同步synchronized public void register(Object subscriber, String methodName, ThreadMode threadMode) {//传入订阅者,遍历查找其订阅的全部事件 List<Method> subscriberMethods = findSubscriberMethods(subscriber.getClass(), methodName); for (Method method : subscriberMethods) { Class<?> eventType = method.getParameterTypes()[0];//参数的类型 //查找的时候是根据参数的类型来肯定订阅者的 subscribe(subscriber, method, eventType, threadMode); } } //若是当前类含有父类,是否查找父类中的全部含指定名称的方法 private List<Method> findSubscriberMethods(Class<?> subscriberClass, String methodName) {//查找某个类中全部指定名称的方法 String key = subscriberClass.getName() + '.' + methodName; //类名+方法名为Key //key相同的同时进入,会出现两次建立 List<Method> subscriberMethods; synchronized (methodCache) { subscriberMethods = methodCache.get(key);//方法名称同样的存在多个重载方法 } if (subscriberMethods != null) { return subscriberMethods; } //同时进入 subscriberMethods = new ArrayList<Method>(); Class<?> clazz = subscriberClass; HashSet<Class<?>> eventTypesFound = new HashSet<Class<?>>();// 同一个类参数相同的方法只加入一次(就是重写的方法) while (clazz != null) { String name = clazz.getName(); if (name.startsWith("java.") || name.startsWith("javax.") || name.startsWith("android.")) {//过滤掉系统类 // Skip system classes, this just degrades performance break; } Method[] methods = clazz.getDeclaredMethods(); for (Method method : methods) { if (method.getName().equals(methodName)) { Class<?>[] parameterTypes = method.getParameterTypes(); if (parameterTypes.length == 1) {//参数为1个 if (eventTypesFound.add(parameterTypes[0])) { // Only add if not already found in a sub class subscriberMethods.add(method); } } } } clazz = clazz.getSuperclass();//查找父类,也就是父类方法也会被调用 } if (subscriberMethods.isEmpty()) {//没有订阅方法,抛出异常 throw new RuntimeException("Subscriber " + subscriberClass + " has no methods called " + methodName); } else { synchronized (methodCache) { methodCache.put(key, subscriberMethods); } return subscriberMethods; } } public void register(Object subscriber, Class<?> eventType, Class<?>... moreEventTypes) {//指定事件类型 register(subscriber, defaultMethodName, ThreadMode.PostThread, eventType, moreEventTypes); } public void registerForMainThread(Object subscriber, Class<?> eventType, Class<?>... moreEventTypes) {//指定事件类型注册 register(subscriber, defaultMethodName, ThreadMode.MainThread, eventType, moreEventTypes); } public synchronized void register(Object subscriber, String methodName, ThreadMode threadMode, Class<?> eventType, Class<?>... moreEventTypes) { Class<?> subscriberClass = subscriber.getClass(); Method method = findSubscriberMethod(subscriberClass, methodName, eventType); subscribe(subscriber, method, eventType, threadMode); for (Class<?> anothereventType : moreEventTypes) { method = findSubscriberMethod(subscriberClass, methodName, anothereventType); subscribe(subscriber, method, anothereventType, threadMode); } } //应该在对象lock中调用 private void subscribe(Object subscriber, Method subscriberMethod, Class<?> eventType, ThreadMode threadMode) { //调用方:register(Object subscriber, String methodName, ThreadMode threadMode)没有加锁 //并发的问题:多个线程同时进入,不一样页面都含有同一事件 CopyOnWriteArrayList<Subscription> subscriptions = subscriptionsByEventType.get(eventType); if (subscriptions == null) { subscriptions = new CopyOnWriteArrayList<Subscription>(); subscriptionsByEventType.put(eventType, subscriptions);//事件类型为key,订阅者为value,一对多 } else { for (Subscription subscription : subscriptions) {//同一事件的订阅者,屡次设置 if (subscription.subscriber == subscriber) { throw new RuntimeException("Subscriber " + subscriber.getClass() + " already registered to event " + eventType); } } } subscriberMethod.setAccessible(true); Subscription subscription = new Subscription(subscriber, subscriberMethod, threadMode); subscriptions.add(subscription); List<Class<?>> subscribedEvents = typesBySubscriber.get(subscriber);//订阅者为key,订阅的事件为value,一对多 if (subscribedEvents == null) { subscribedEvents = new ArrayList<Class<?>>(); typesBySubscriber.put(subscriber, subscribedEvents); } subscribedEvents.add(eventType); } /** * Class.getMethod is slow on Android 2.3 (and probably other versions), so use getDeclaredMethod and go up in the * class hierarchy if neccessary. */ private Method findSubscriberMethod(Class<?> subscriberClass, String methodName, Class<?> eventType) { Class<?> clazz = subscriberClass; while (clazz != null) { try { return clazz.getDeclaredMethod(methodName, eventType); } catch (NoSuchMethodException ex) { clazz = clazz.getSuperclass(); } } throw new RuntimeException("Method " + methodName + " not found in " + subscriberClass + " (must have single parameter of event type " + eventType + ")"); } /** * Unregisters the given subscriber for the given event classes. */ public synchronized void unregister(Object subscriber, Class<?>... eventTypes) { if (eventTypes.length == 0) { throw new IllegalArgumentException("Provide at least one event class"); } List<Class<?>> subscribedClasses = typesBySubscriber.get(subscriber); if (subscribedClasses != null) { for (Class<?> eventType : eventTypes) { unubscribeByEventType(subscriber, eventType); subscribedClasses.remove(eventType); } if (subscribedClasses.isEmpty()) { typesBySubscriber.remove(subscriber); } } else { Log.w(TAG, "Subscriber to unregister was not registered before: " + subscriber.getClass()); } } /** * Only updates subscriptionsByEventType, not typesBySubscriber! Caller must update typesBySubscriber. */ private void unubscribeByEventType(Object subscriber, Class<?> eventType) { List<Subscription> subscriptions = subscriptionsByEventType.get(eventType); if (subscriptions != null) { int size = subscriptions.size(); for (int i = 0; i < size; i++) { if (subscriptions.get(i).subscriber == subscriber) { subscriptions.remove(i); i--; size--; } } } } /** * Unregisters the given subscriber from all event classes. */ public synchronized void unregister(Object subscriber) { List<Class<?>> subscribedTypes = typesBySubscriber.get(subscriber);//删除全部的订阅事件 if (subscribedTypes != null) { for (Class<?> eventType : subscribedTypes) { unubscribeByEventType(subscriber, eventType);//删除事件类型 } typesBySubscriber.remove(subscriber); } else { Log.w(TAG, "Subscriber to unregister was not registered before: " + subscriber.getClass()); } } //post(Object event) //post(Object event) /** * Posts the given event to the event bus. */ public void post(Object event) {//String Object List<Object> eventQueue = currentThreadEventQueue.get(); //每一个线程一个队列,保证同一个线程发送事件的有序性 eventQueue.add(event); BooleanWrapper isPosting = currentThreadIsPosting.get(); if (isPosting.value) {//当前线程队列正在运转 return; } else {//没有运转开始运转 isPosting.value = true; try { while (!eventQueue.isEmpty()) { postSingleEvent(eventQueue.remove(0)); //开始发送事件 } } finally { isPosting.value = false; } } } private void postSingleEvent(Object event) throws Error { List<Class<?>> eventTypes = findEventTypes(event.getClass()); //找到全部的事件类型(须要给父事件和接口事件也发送) boolean subscriptionFound = false; int countTypes = eventTypes.size(); for (int h = 0; h < countTypes; h++) { Class<?> clazz = eventTypes.get(h); CopyOnWriteArrayList<Subscription> subscriptions; synchronized (this) {//只能保证Map的原子性 subscriptions = subscriptionsByEventType.get(clazz); } if (subscriptions != null) { for (Subscription subscription : subscriptions) {//由于是CopyOnWriteArrayList,因此写入时读取是线程安全的 if (subscription.threadMode == ThreadMode.PostThread) {//在发送线程中执行 postToSubscribtion(subscription, event); } else if (subscription.threadMode == ThreadMode.MainThread) {//在主线程中执行 mainThreadPoster.enqueue(event, subscription);//在主线程中执行 } else { throw new IllegalStateException("Unknown thread mode: " + subscription.threadMode); } } subscriptionFound = true; } } if (!subscriptionFound) { Log.d(TAG, "No subscripers registered for event " + event.getClass()); } } //查找全部的事件类型:当前类型和全部父类型,全部接口,包括父类型 // /** * Finds all Class objects including super classes and interfaces. */ private List<Class<?>> findEventTypes(Class<?> eventClass) { synchronized (eventTypesCache) { List<Class<?>> eventTypes = eventTypesCache.get(eventClass); //缓存中存在,直接返回 if (eventTypes == null) { eventTypes = new ArrayList<Class<?>>(); Class<?> clazz = eventClass; while (clazz != null) {//事件类型,父事件 eventTypes.add(clazz); //自己是一个事件类型 addInterfaces(eventTypes, clazz.getInterfaces()); clazz = clazz.getSuperclass(); } eventTypesCache.put(eventClass, eventTypes); } return eventTypes; } } /** * Recurses through super interfaces. */ static void addInterfaces(List<Class<?>> eventTypes, Class<?>[] interfaces) { for (Class<?> interfaceClass : interfaces) { if (!eventTypes.contains(interfaceClass)) { eventTypes.add(interfaceClass); addInterfaces(eventTypes, interfaceClass.getInterfaces()); } } } static void postToSubscribtion(Subscription subscription, Object event) throws Error { try { subscription.method.invoke(subscription.subscriber, event); } catch (InvocationTargetException e) { Throwable cause = e.getCause(); Log.e(TAG, "Could not dispatch event: " + event.getClass() + " to subscribing class " + subscription.subscriber.getClass(), cause); if (cause instanceof Error) { throw (Error) cause; } } catch (IllegalAccessException e) { throw new IllegalStateException("Unexpected exception", e); } } final static class Subscription { final Object subscriber; final Method method; final ThreadMode threadMode; Subscription(Object subscriber, Method method, ThreadMode threadMode) { this.subscriber = subscriber; this.method = method; this.threadMode = threadMode; } @Override public boolean equals(Object other) { if (other instanceof Subscription) { Subscription otherSubscription = (Subscription) other; // Super slow (improve once used): http://code.google.com/p/android/issues/detail?id=7811 return subscriber == otherSubscription.subscriber && method.equals(otherSubscription.method); } else { return false; } } @Override public int hashCode() { // Check performance once used return subscriber.hashCode() + method.hashCode(); } } /** * For ThreadLocal, much faster to set than storing a new Boolean. */ final static class BooleanWrapper {//提升性能,使用Boolean,修改值时须要每次set boolean value; } final static class PostViaHandler extends Handler {//主线程发送队列 PostViaHandler(Looper looper) { super(looper); } void enqueue(Object event, Subscription subscription) { PendingPost pendingPost = PendingPost.obtainPendingPost(event, subscription); //待发送对象 Message message = obtainMessage(); message.obj = pendingPost; if (!sendMessage(message)) {//发送失败:usually because the looper processing the message queue is exiting. throw new RuntimeException("Could not send handler message"); } } @Override public void handleMessage(Message msg) { PendingPost pendingPost = (PendingPost) msg.obj; Object event = pendingPost.event; Subscription subscription = pendingPost.subscription; PendingPost.releasePendingPost(pendingPost); postToSubscribtion(subscription, event); } } } 复制代码
PendingPost.java缓存
package de.greenrobot.event;
import java.util.ArrayList;
import java.util.List;
import de.greenrobot.event.EventBus.Subscription;
final class PendingPost {//使用了享元模式
private final static List<PendingPost> pendingPostPool = new ArrayList<PendingPost>();
Object event;
Subscription subscription;
private PendingPost(Object event, Subscription subscription) {
this.event = event;
this.subscription = subscription;
}
static PendingPost obtainPendingPost(Object event, Subscription subscription) {
synchronized (pendingPostPool) {
int size = pendingPostPool.size();
if (size > 0) {
PendingPost pendingPost = pendingPostPool.remove(size - 1);
pendingPost.event = event;
pendingPost.subscription = subscription;
return pendingPost;
}
}
return new PendingPost(event, subscription);
}
static void releasePendingPost(PendingPost pendingPost) {
synchronized (pendingPostPool) {
pendingPostPool.add(pendingPost);
}
}
}
复制代码
优势安全
1.在做者的注释中(Class based event bus[指Google guava库中的 event bus], optimized for Android. By default, subscribers will handle events in methods named "onEvent".)能够看到,为了优化性能,弃用了注解,使用固定或指定订阅方法的方式,性能优化
2.轻巧,不依赖Context。一些处理细节上也作了相应的优化,如:使用BooleanWrapper代替Boolean,这样就不须要每次更改值都调用ThreadLocal的set方法;PendingPost的建立采用了享元模式提升了获取其实例的性能,不须要频繁建立和销毁;利用CopyOnWriteArrayList在写入安全的状况下提升了读取性能。bash
3.能指定在主线程中订阅事件。由于一般状况下,咱们会在非UI线程中发送事件来更新UI,而更新UI须要在主线程中执行。并发
不足app
1.为了性能最求性能的极致,失去了订阅方法的灵活性,只能指定onEvent方法才能监听,或者注册时调用指定方法名,没有使用注解那么灵活。ide
2.订阅者接收事件的线程不能根据订阅事件设置。由于常常咱们在一个类中会订阅多个事件,而每一个事件基本上是没有相关性的,在哪一个线程上订阅是根据事件自己来决定的。
三、强制使用事件的传递性,如:咱们发送某个事件,那么这个事件全部的父类及接口都会被强制触发,最好有设置开关,让用户决定。
解读
public void post(Object event) {//String Object
List<Object> eventQueue = currentThreadEventQueue.get(); //每一个线程一个队列,保证同一个线程发送事件的有序性
eventQueue.add(event);
BooleanWrapper isPosting = currentThreadIsPosting.get();
if (isPosting.value) {//当前线程队列正在运转
//在同一个线程中何时会走到这里呢?若是走不到这里那么这个队列就没有意义
return;
} else {//没有运转开始运转
isPosting.value = true;
try {
while (!eventQueue.isEmpty()) {
postSingleEvent(eventQueue.remove(0)); //开始发送事件
}
} finally {
isPosting.value = false;
}
}
}
若是咱们能找到队列正在运转的状况下,那么队列就有意义,能保证事件处理的有序性。
举例:
public class RefreshEvent {}
public class Activity1 extends AppCompatActivity{
@Override
protected void onCreate(Bundle savedInstanceState) {
super.onCreate(savedInstanceState);
setContentView(R.layout.activity_main);
EventBus.getDefault().register(this);
}
@Override
protected void onDestroy() {
EventBus.getDefault().unregister(this);
super.onDestroy();
}
public void onEvent(TestEvent event) {
EventBus.getDefault().post(new RefreshEvent());
}
}
public class ActivityB extends AppCompatActivity{
@Override
protected void onCreate(Bundle savedInstanceState) {
super.onCreate(savedInstanceState);
setContentView(R.layout.activity_main);
EventBus.getDefault().register(this);
}
@Override
protected void onDestroy() {
EventBus.getDefault().unregister(this);
super.onDestroy();
}
public void onEvent(TestEvent event) {
}
}
咱们在后台业务方法中发送了刷新事件:
EventBus.getDefault().post(new RefreshEvent());
上面的例子中:一、一个事件被多个订阅者订阅;二、订阅者收到事件后再次发送相同事件。
在这种状况下,若是咱们不使用队列,那么在同一个线程中,先发送的事件订阅者就可能会被后收到。
这里使用ThreadLocal+队列,就很巧妙地维护了在同一个线程中事件处理的有序性。
复制代码
bug
public void register(Object subscriber, String methodName, ThreadMode threadMode) 方法存在线程安全的问题,除非调用者都加了锁,但代码中最经常使用的调用public void register(Object subscriber, String methodName, ThreadMode threadMode)却没有加锁。
第一步,实现主逻辑:
EventBus.java 代码
package org.hjb.eventbus;
import android.util.Log;
import androidx.annotation.NonNull;
import java.lang.reflect.Method;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CopyOnWriteArrayList;
public class EventBus {
private static final String TAG = "EventBus";
private static volatile EventBus mDefaultEventBus;
private static final String DEFAULT_METHOD_NAME = "onEvent";
//key:订阅事件
//value:订阅者信息,订阅者信息包括:订阅对象,订阅方法
private final HashMap<Class<?>, CopyOnWriteArrayList<Subscription>> subscriptionsByEventType = new HashMap<>();
private ThreadLocal<ArrayList<Object>> currentThreadEventQueue = new ThreadLocal<ArrayList<Object>>() {
@Override
protected ArrayList<Object> initialValue() {
return new ArrayList<>();
}
};
private ThreadLocal<Boolean> currentThreadIsPosting = new ThreadLocal<Boolean>() {
@Override
protected Boolean initialValue() {
return Boolean.FALSE;
}
};
public EventBus() {
}
public static EventBus getDefault() {
if (mDefaultEventBus == null) {
synchronized (EventBus.class) {
if (mDefaultEventBus == null) {
mDefaultEventBus = new EventBus();
}
}
}
return mDefaultEventBus;
}
/**
* 订阅
*
* @param subscriber 订阅者
*/
public synchronized void register(@NonNull Object subscriber) {//订阅
//保存当前订阅者的全部订阅事件
//由于查询是经过订阅事件来查找的,因此使用map来保存订阅数据
//查找当前订阅对象的全部订阅方法
ArrayList<Method> methods = findSubscriberMethods(subscriber, DEFAULT_METHOD_NAME);
for (Method method : methods) {
//订阅事件
Class<?> eventType = method.getParameterTypes()[0];
CopyOnWriteArrayList<Subscription> subscriptions = subscriptionsByEventType.get(eventType);
if (subscriptions == null) {
subscriptions = new CopyOnWriteArrayList<>();
subscriptionsByEventType.put(eventType, subscriptions);
} else { //须要判断是否重复添加(同一个事件已经加过订阅者),抛出异常
for (Subscription subscription : subscriptions) {
if (subscription.subscriber == subscriber) {
throw new RuntimeException("Subscriber " + subscriber.getClass() + " already registered to event " + eventType);
}
}
}
method.setAccessible(true);//防止私有方法不能访问
subscriptions.add(new Subscription(subscriber, method));
}
}
/**
* 须要查找当前对象全部带onEvent的重载方法,包括父类
* <p>
* 注意点:
* 一、覆盖的方法不要重复添加
* 二、过滤掉系统类,java/javax/android开头的
* 三、只查找单个参数的方法
* 四、私有方法也查找
*
* @param subscriber 订阅者
* @return 全部订阅对象
*/
private ArrayList<Method> findSubscriberMethods(Object subscriber, String methodName) {
ArrayList<Method> subscriberMethods = new ArrayList<>();
Class<?> clazz = subscriber.getClass();
while (clazz != null) {
//过滤系统类
String name = clazz.getName();
if (name.startsWith("java") || name.startsWith("javax") || name.startsWith("android")) {
break;
}
//查找目标方法
for (Method method : clazz.getDeclaredMethods()) {
if (method.getName().equals(methodName) && method.getParameterTypes().length == 1) {
int i = 0, size = subscriberMethods.size();
for (; i < size; i++) {
if (subscriberMethods.get(i).getParameterTypes()[0].equals(method.getParameterTypes()[0])) {
break;
}
}
if (i == size) {//表示不存在,加入
subscriberMethods.add(method);
}
}
}
clazz = clazz.getSuperclass();
}
return subscriberMethods;
}
/**
* 发送事件
* <p>
* 须要给全部订阅者发送事件
* <p>
* 注意:咱们要发送的不只是当前对象的订阅者,同时包括当前父类对象和接口对象的订阅者。
* <p>
* 解决有有序性的问题:咱们须要保证同一个线程中
*
* @param event 事件
*/
public void post(Object event) {
List<Object> eventQueue = currentThreadEventQueue.get();
eventQueue.add(event);
Boolean isPosting = currentThreadIsPosting.get();
if (!isPosting.booleanValue()) {
currentThreadIsPosting.set(Boolean.TRUE);
while (!eventQueue.isEmpty()) {
postEvent(eventQueue.remove(0));
}
currentThreadIsPosting.set(Boolean.FALSE);
} else {
Log.i("hejunbin", "正在运转");
}
}
/**
* 发送事件
*
* @param event 待发送事件
*/
private void postEvent(Object event) {
//一、找到当前对象的全部事件类型
ArrayList<Class<?>> eventTypes = findEventTypes(event.getClass());
//二、找到此事件全部的订阅者
boolean subscriptionFound = false; //是否找到了订阅者
for (Class<?> eventType : eventTypes) {
CopyOnWriteArrayList<Subscription> subscriptions;
synchronized (this) {
subscriptions = subscriptionsByEventType.get(eventType);
}
if (subscriptions != null && subscriptions.size() > 0) {
subscriptionFound = true;
for (Subscription subscription : subscriptions) {
try {
subscription.method.invoke(subscription.subscriber, event);
} catch (Exception e) {
Log.e(TAG, "Could not dispatch event: " + event.getClass() + " to subscribing class " + subscription.subscriber.getClass(), e.getCause());
}
}
}
}
if (!subscriptionFound) {
Log.d(TAG, "No subscripers registered for event " + event.getClass());
}
}
/**
* 找到当前对象的全部事件类型(包括父类和接口)
*
* @param eventClass 事件类型
* @return 全部事件类型
*/
private ArrayList<Class<?>> findEventTypes(Class<?> eventClass) {
ArrayList<Class<?>> eventTypes = new ArrayList<>();
Class<?> clazz = eventClass;
while (clazz != null) {
eventTypes.add(clazz);
//接口自己自己存在层级
addInterfaces(eventTypes, clazz.getInterfaces());
clazz = clazz.getSuperclass();
}
return eventTypes;
}
private void addInterfaces(List<Class<?>> eventTypes, Class<?>[] interfaces) {
for (Class<?> interfaceClass : interfaces) {
if (!eventTypes.contains(interfaceClass)) {
eventTypes.add(interfaceClass);
addInterfaces(eventTypes, interfaceClass.getInterfaces());
}
}
}
/**
* 取消订阅
*
* @param subscriber 订阅者
*/
public synchronized void unregister(Object subscriber) {//取消订阅
for (Map.Entry<Class<?>, CopyOnWriteArrayList<Subscription>> entry : subscriptionsByEventType.entrySet()) {
int size = entry.getValue().size();
for (int i = 0; i < size; i++) {
if (entry.getValue().get(i).subscriber == subscriber) {
entry.getValue().remove(i);
i--;
size--;
}
}
}
}
/**
* 订阅对象
*/
static class Subscription {
Object subscriber;
Method method;
public Subscription(Object subscriber, Method method) {
this.subscriber = subscriber;
this.method = method;
}
}
}
复制代码
能够在主线程中订阅事件
在Android利用Handler的机制,将事件发送到主队列中执行。
由于须要将订阅信息(Subscription)和事件(event)都须要传递,而Handler中的Message只能传递单个Object,因此本身建立了PendingPost包装类。
EventBus.java代码
package org.hjb.eventbus;
import android.os.Handler;
import android.os.Looper;
import android.os.Message;
import android.util.Log;
import androidx.annotation.MainThread;
import androidx.annotation.NonNull;
import java.lang.reflect.Method;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CopyOnWriteArrayList;
public class EventBus {
private static final String TAG = "EventBus";
private static volatile EventBus mDefaultEventBus;
private static final String DEFAULT_METHOD_NAME = "onEvent";
public enum ThreadMode {
PostThread, //post所在线程
MainThread,//主线程
}
//key:订阅事件
//value:订阅者信息,订阅者信息包括:订阅对象,订阅方法
private final HashMap<Class<?>, CopyOnWriteArrayList<Subscription>> subscriptionsByEventType = new HashMap<>();
private ThreadLocal<ArrayList<Object>> currentThreadEventQueue = new ThreadLocal<ArrayList<Object>>() {
@Override
protected ArrayList<Object> initialValue() {
return new ArrayList<>();
}
};
private ThreadLocal<Boolean> currentThreadIsPosting = new ThreadLocal<Boolean>() {
@Override
protected Boolean initialValue() {
return Boolean.FALSE;
}
};
private MainThreadHandler mainThreadHandler;
public EventBus() {
mainThreadHandler = new MainThreadHandler();
}
public static EventBus getDefault() {
if (mDefaultEventBus == null) {
synchronized (EventBus.class) {
if (mDefaultEventBus == null) {
mDefaultEventBus = new EventBus();
}
}
}
return mDefaultEventBus;
}
/**
* 订阅
*
* @param subscriber 订阅者
*/
public synchronized void register(@NonNull Object subscriber) {//订阅
subscribe(subscriber, ThreadMode.PostThread);
}
public synchronized void registerForMainThread(Object subscriber) {
subscribe(subscriber, ThreadMode.MainThread);
}
public synchronized void register(Object subscriber, ThreadMode mode) {
subscribe(subscriber, mode);
}
private void subscribe(Object subscriber, ThreadMode mode) {//须要在线程安全的方法中调用
//保存当前订阅者的全部订阅事件
//由于查询是经过订阅事件来查找的,因此使用map来保存订阅数据
//查找当前订阅对象的全部订阅方法
ArrayList<Method> methods = findSubscriberMethods(subscriber, DEFAULT_METHOD_NAME);
for (Method method : methods) {
//订阅事件
Class<?> eventType = method.getParameterTypes()[0];
CopyOnWriteArrayList<Subscription> subscriptions = subscriptionsByEventType.get(eventType);
if (subscriptions == null) {
subscriptions = new CopyOnWriteArrayList<>();
subscriptionsByEventType.put(eventType, subscriptions);
} else { //须要判断是否重复添加(同一个事件已经加过订阅者),抛出异常
for (Subscription subscription : subscriptions) {
if (subscription.subscriber == subscriber) {
throw new RuntimeException("Subscriber " + subscriber.getClass() + " already registered to event " + eventType);
}
}
}
method.setAccessible(true);//防止私有方法不能访问
subscriptions.add(new Subscription(subscriber, method, mode));
}
}
/**
* 须要查找当前对象全部带onEvent的重载方法,包括父类
* <p>
* 注意点:
* 一、覆盖的方法不要重复添加
* 二、过滤掉系统类,java/javax/android开头的
* 三、只查找单个参数的方法
* 四、私有方法也查找
*
* @param subscriber 订阅者
* @return 全部订阅对象
*/
private ArrayList<Method> findSubscriberMethods(Object subscriber, String methodName) {
ArrayList<Method> subscriberMethods = new ArrayList<>();
Class<?> clazz = subscriber.getClass();
while (clazz != null) {
//过滤系统类
String name = clazz.getName();
if (name.startsWith("java") || name.startsWith("javax") || name.startsWith("android")) {
break;
}
//查找目标方法
for (Method method : clazz.getDeclaredMethods()) {
if (method.getName().equals(methodName) && method.getParameterTypes().length == 1) {
int i = 0, size = subscriberMethods.size();
for (; i < size; i++) {
if (subscriberMethods.get(i).getParameterTypes()[0].equals(method.getParameterTypes()[0])) {
break;
}
}
if (i == size) {//表示不存在,加入
subscriberMethods.add(method);
}
}
}
clazz = clazz.getSuperclass();
}
return subscriberMethods;
}
/**
* 发送事件
* <p>
* 须要给全部订阅者发送事件
* <p>
* 注意:咱们要发送的不只是当前对象的订阅者,同时包括当前父类对象和接口对象的订阅者。
* <p>
* 解决有有序性的问题:咱们须要保证同一个线程中
*
* @param event 事件
*/
public void post(Object event) {
List<Object> eventQueue = currentThreadEventQueue.get();
eventQueue.add(event);
Boolean isPosting = currentThreadIsPosting.get();
if (!isPosting.booleanValue()) {
currentThreadIsPosting.set(Boolean.TRUE);
while (!eventQueue.isEmpty()) {
postEvent(eventQueue.remove(0));
}
currentThreadIsPosting.set(Boolean.FALSE);
} else {
Log.i("hejunbin", "正在运转");
}
}
/**
* 发送事件
*
* @param event 待发送事件
*/
private void postEvent(Object event) {
//一、找到当前对象的全部事件类型
ArrayList<Class<?>> eventTypes = findEventTypes(event.getClass());
//二、找到此事件全部的订阅者
boolean subscriptionFound = false; //是否找到了订阅者
for (Class<?> eventType : eventTypes) {
CopyOnWriteArrayList<Subscription> subscriptions;
synchronized (this) {
subscriptions = subscriptionsByEventType.get(eventType);
}
if (subscriptions != null && subscriptions.size() > 0) {
subscriptionFound = true;
for (Subscription subscription : subscriptions) {
if (subscription.mode == ThreadMode.PostThread) {
postEvent(subscription, event);
} else if (subscription.mode == ThreadMode.MainThread) {//在主线程中发送
mainThreadHandler.enqueue(subscription, event);
} else {//非法
throw new IllegalStateException("Unknown thread mode: " + subscription.mode);
}
}
}
}
if (!subscriptionFound) {
Log.d(TAG, "No subscripers registered for event " + event.getClass());
}
}
private static void postEvent(Subscription subscription, Object event) {
try {
subscription.method.invoke(subscription.subscriber, event);
} catch (Exception e) {
Log.e(TAG, "Could not dispatch event: " + event.getClass() + " to subscribing class " + subscription.subscriber.getClass(), e.getCause());
}
}
/**
* 找到当前对象的全部事件类型(包括父类和接口)
*
* @param eventClass 事件类型
* @return 全部事件类型
*/
private ArrayList<Class<?>> findEventTypes(Class<?> eventClass) {
ArrayList<Class<?>> eventTypes = new ArrayList<>();
Class<?> clazz = eventClass;
while (clazz != null) {
eventTypes.add(clazz);
//接口自己自己存在层级
addInterfaces(eventTypes, clazz.getInterfaces());
clazz = clazz.getSuperclass();
}
return eventTypes;
}
private void addInterfaces(List<Class<?>> eventTypes, Class<?>[] interfaces) {
for (Class<?> interfaceClass : interfaces) {
if (!eventTypes.contains(interfaceClass)) {
eventTypes.add(interfaceClass);
addInterfaces(eventTypes, interfaceClass.getInterfaces());
}
}
}
/**
* 取消订阅
*
* @param subscriber 订阅者
*/
public synchronized void unregister(Object subscriber) {//取消订阅
for (Map.Entry<Class<?>, CopyOnWriteArrayList<Subscription>> entry : subscriptionsByEventType.entrySet()) {
int size = entry.getValue().size();
for (int i = 0; i < size; i++) {
if (entry.getValue().get(i).subscriber == subscriber) {
entry.getValue().remove(i);
i--;
size--;
}
}
}
}
/**
* 订阅对象
*/
static class Subscription {
Object subscriber;
Method method;
ThreadMode mode;
public Subscription(Object subscriber, Method method, ThreadMode mode) {
this.subscriber = subscriber;
this.method = method;
this.mode = mode;
}
}
static class MainThreadHandler extends Handler {
MainThreadHandler() {
super(Looper.getMainLooper());
}
void enqueue(Subscription subscription, Object event) {
Message message = Message.obtain();
message.obj = new PendingPost(subscription, event);
//如今有两个对象须要传递,但Message只能传递单个对象,而Java没有元祖类型,因此须要本身建立对象或者放入数组中
if (!sendMessage(message)) {//发送失败:usually because the looper processing the message queue is exiting.
throw new RuntimeException("Could not send handler message");
}
}
@Override
public void handleMessage(Message msg) {
PendingPost pendingPost = (PendingPost) msg.obj;
postEvent(pendingPost.subscription, pendingPost.event);
}
}
}
复制代码
PendingPost.java代码
package de.greenrobot.event;
import java.util.ArrayList;
import java.util.List;
import de.greenrobot.event.EventBus.Subscription;
final class PendingPost {
Object event;
Subscription subscription;
PendingPost(Object event, Subscription subscription) {
this.event = event;
this.subscription = subscription;
}
复制代码
性能优化
findSubscriberMethods 每次都要循环遍历查找,能够第一次查找后缓存起来,下次直接从缓存中获取。(咱们常常在进入页面时register,在返回页面时unregister,当第二次进入时就可使用缓存中获取了)
实现代码为:
private static final Map<String, ArrayList<Method>> methodCache = new HashMap<>();
/**
* 须要查找当前对象全部带onEvent的重载方法,包括父类
* <p>
* 注意点:
* 一、覆盖的方法不要重复添加
* 二、过滤掉系统类,java/javax/android开头的
* 三、只查找单个参数的方法
* 四、私有方法也查找
*
* @param subscriberClass 订阅者类
* @return 全部订阅对象
*/
private ArrayList<Method> findSubscriberMethods(Class<?> subscriberClass, String methodName) {
String key = subscriberClass.getName() + '.' + methodName; //类名+方法名为Key
ArrayList<Method> subscriberMethods = methodCache.get(key);
if (subscriberMethods != null) {
return subscriberMethods;
}
synchronized (methodCache) {
subscriberMethods = methodCache.get(key);
if (subscriberMethods != null) {
return subscriberMethods;
}
subscriberMethods = new ArrayList<>();
Class<?> clazz = subscriberClass;
while (clazz != null) {
//过滤系统类
String name = clazz.getName();
if (name.startsWith("java") || name.startsWith("javax") || name.startsWith("android")) {
break;
}
//查找目标方法
for (Method method : clazz.getDeclaredMethods()) {
if (method.getName().equals(methodName) && method.getParameterTypes().length == 1) {
int i = 0, size = subscriberMethods.size();
for (; i < size; i++) {
if (subscriberMethods.get(i).getParameterTypes()[0].equals(method.getParameterTypes()[0])) {
break;
}
}
if (i == size) {//表示不存在,加入
subscriberMethods.add(method);
}
}
}
clazz = clazz.getSuperclass();
}
if (subscriberMethods.isEmpty()) {//没有订阅方法,抛出异常
throw new RuntimeException("Subscriber " + subscriberClass + " has no methods called " + methodName);
} else {
methodCache.put(key, subscriberMethods);
return subscriberMethods;
}
}
}
复制代码
findEventTypes 每次发送事件都会调用,而内部实现也须要循环遍历,也能够经过缓存来提升性能。
修改代码为:
private static final Map<Class<?>, ArrayList<Class<?>>> eventTypesCache = new HashMap<>();
/**
* 找到当前对象的全部事件类型(包括父类和接口)
*
* @param eventClass 事件类型
* @return 全部事件类型
*/
private ArrayList<Class<?>> findEventTypes(Class<?> eventClass) {
ArrayList<Class<?>> eventTypes = eventTypesCache.get(eventClass);
if (eventTypes != null) {
return eventTypes;
}
synchronized (eventTypesCache) {
eventTypes = eventTypesCache.get(eventClass);
if (eventTypes != null) {
return eventTypes;
}
eventTypes = new ArrayList<>();
Class<?> clazz = eventClass;
while (clazz != null) {
eventTypes.add(clazz);
//接口自己自己存在层级
addInterfaces(eventTypes, clazz.getInterfaces());
clazz = clazz.getSuperclass();
}
eventTypesCache.put(eventClass, eventTypes);
return eventTypes;
}
}
复制代码
unregister方法,无论有没有订阅过,都须要双层遍历,时间复杂度为O(n2),能够创建以subscriber为key的哈希表,将时间复杂度降为O(n)。
修改代码为:
//存储一个订阅对象的全部订阅事件(key:订阅对象,value:全部订阅事件)
private final HashMap<Object, List<Class<?>>> eventTypesBySubscriber = new HashMap<>();
private void subscribe(Object subscriber, ThreadMode mode) {//须要在线程安全的方法中调用
//保存当前订阅者的全部订阅事件
//由于查询是经过订阅事件来查找的,因此使用map来保存订阅数据
//查找当前订阅对象的全部订阅方法
ArrayList<Method> methods = findSubscriberMethods(subscriber.getClass(), DEFAULT_METHOD_NAME);
for (Method method : methods) {
//订阅事件
Class<?> eventType = method.getParameterTypes()[0];
CopyOnWriteArrayList<Subscription> subscriptions = subscriptionsByEventType.get(eventType);
if (subscriptions == null) {
subscriptions = new CopyOnWriteArrayList<>();
subscriptionsByEventType.put(eventType, subscriptions);
} else { //须要判断是否重复添加(同一个事件已经加过订阅者),抛出异常
for (Subscription subscription : subscriptions) {
if (subscription.subscriber == subscriber) {
throw new RuntimeException("Subscriber " + subscriber.getClass() + " already registered to event " + eventType);
}
}
}
method.setAccessible(true);//防止私有方法不能访问
subscriptions.add(new Subscription(subscriber, method, mode));
//订阅对象的所订阅的数据类型
List<Class<?>> subscribedEvents = eventTypesBySubscriber.get(subscriber);//订阅者为key,订阅的事件为value,一对多
if (subscribedEvents == null) {
subscribedEvents = new ArrayList<>();
eventTypesBySubscriber.put(subscriber, subscribedEvents);
}
subscribedEvents.add(eventType);
}
}
/**
* 取消订阅
*
* @param subscriber 订阅者
*/
public synchronized void unregister(Object subscriber) {//取消订阅
List<Class<?>> eventTypes = eventTypesBySubscriber.get(subscriber);
if (eventTypes == null) {
Log.w(TAG, "Subscriber to unregister was not registered before: " + subscriber.getClass());
return;
}
for (Class<?> eventType : eventTypes) { //虽然这里也是循环,可是这里的只是单个订阅者中的事件,而不是全局的事件的遍历
CopyOnWriteArrayList<Subscription> subscriptions = subscriptionsByEventType.get(eventType);
if (subscriptions != null) {
int size =subscriptions.size();
for (int i = 0; i < size; i++) {
if (subscriptions.get(i).subscriber == subscriber) {
subscriptions.remove(i);
i--;
size--;
}
}
}
}
eventTypesBySubscriber.remove(subscriber);
}
复制代码
PendingPost使用享元模式(重复利用对象)
修改代码为:
package org.hjb.eventbus;
import java.util.ArrayList;
import java.util.List;
import org.hjb.eventbus.EventBus.Subscription;
public class PendingPost { //使用了享元模式
private static final List<PendingPost> pendingPostPool = new ArrayList<>(0);
Subscription subscription;
Object event;
private PendingPost(EventBus.Subscription subscription, Object event) {
this.subscription = subscription;
this.event = event;
}
static PendingPost obtain(Subscription subscription, Object event) {
synchronized (pendingPostPool) {//看池子里是否有对象,若是有,使用池子里但对象
// if (!pendingPostPool.isEmpty()) {
// PendingPost pendingPost = pendingPostPool.remove(0);
// pendingPost.subscription = subscription;
// pendingPost.event = event;
// return pendingPost;
// }
//上面注释的代码使用第一个元素,应该使用最后一个元素,
//由于使用的是数组存储,若是remove第一个的话,那么每次取出的操做都涉及到数组的搬移操做时间复杂度为O(n)
//若是删除最后一个元素,不涉及搬移操做时间复杂度为O(1)
//因此应该写成
int size = pendingPostPool.size();
if (size > 0) {
PendingPost pendingPost = pendingPostPool.remove(size - 1);
pendingPost.subscription = subscription;
pendingPost.event = event;
return pendingPost;
}
}
return new PendingPost(subscription, event);//没有,建立
}
void recycle() {
synchronized (pendingPostPool) {
pendingPostPool.add(this);
}
}
}
复制代码
使用时也须要修改,代码为:
static class MainThreadHandler extends Handler {
MainThreadHandler() {
super(Looper.getMainLooper());
}
void enqueue(Subscription subscription, Object event) {
Message message = Message.obtain();
//如今有两个对象须要传递,但Message只能传递单个对象,而Java没有元祖类型,因此须要本身建立对象或者放入数组中
message.obj = PendingPost.obtain(subscription, event);
if (!sendMessage(message)) {//发送失败:usually because the looper processing the message queue is exiting.
throw new RuntimeException("Could not send handler message");
}
}
@Override
public void handleMessage(Message msg) {
PendingPost pendingPost = (PendingPost) msg.obj;
postEvent(pendingPost.subscription, pendingPost.event);
pendingPost.recycle();//回收
}
}
复制代码
咱们能够看出,1.0.1版本仍是有些bug和不足的,后面我将分析2.x和3.x版本。