前期因为工做中须要将EventBus
替换成RxBus
,因此按照EventBus
的用法封装了一套本身的RxBus
,基本知足了使用,项目发出后有很多兄弟告诉我没有EventBus
的Sticky
功能,因此得闲完善这个功能,一样是按照EventBus3.0
注解的方式去实现和调用java
在Android开发中,Sticky事件只指事件消费者在事件发布以后才注册的也能接收到该事件的特殊类型。Android中就有这样的实例,也就是Sticky Broadcast,即粘性广播。正常状况下若是发送者发送了某个广播,而接收者在这个广播发送后才注册本身的Receiver,这时接收者便没法接收到刚才的广播,为此Android引入了StickyBroadcast,在广播发送结束后会保存刚刚发送的广播(Intent),这样当接收者注册完Receiver后就能够接收到刚才已经发布的广播。这就使得咱们能够预先处理一些事件,让有消费者时再把这些事件投递给消费者。github
彻底按照EventBus3.0版本的注解的方式去使用安全
RxBus.getDefault().post(new EventStickText("我是sticky消息"));复制代码
@Subscribe(threadMode = ThreadMode.MAIN, sticky = true)
public void event(EventStickText eventStickText) {
Observable.timer(1, TimeUnit.SECONDS).observeOn(AndroidSchedulers.mainThread()).subscribe(aLong -> {
textView.setText(eventStickText.getMsg());
});
}
@Override
protected void onStart() {
super.onStart();
RxBus.getDefault().register(this);
}
@Override
protected void onDestroy() {
super.onDestroy();
RxBus.getDefault().unRegister(this);
}复制代码
本篇的实现原理是基于以前的RxBus
封装的基础上的完善,因此须要大体了解RxBus
以前基本功能的封装原理方能更加全面的理解一下的内容ide
sticky
注解不懂注解的同窗能够先看下以前我写的两瓶关于注解的博客this
Java-注解详解spa
Android-注解详解.net
这里添加boolean sticky()
的方法,而且默认指定参数false
线程
@Documented
@Target(ElementType.METHOD)
@Retention(RetentionPolicy.RUNTIME)
public @interface Subscribe {
int code() default -1;
ThreadMode threadMode() default ThreadMode.CURRENT_THREAD;
boolean sticky() default false;
}复制代码
sticky
加入数据封装类中将数据新加入的注解sticky
加入到数据封装类中,在最后分发事件时用于区分sticky
事件
public class SubscriberMethod {
public Method method;
public ThreadMode threadMode;
public Class<?> eventType;
public Object subscriber;
public int code;
public boolean sticky;
public SubscriberMethod(Object subscriber, Method method, Class<?> eventType, int code,ThreadMode threadMode,boolean sticky ) {
this.method = method;
this.threadMode = threadMode;
this.eventType = eventType;
this.subscriber = subscriber;
this.code = code;
this.sticky=sticky;
}
******
}复制代码
post
事件分析由于sticky
的特殊性,而自带RxJava
提供给咱们四种方式处理Subject分发
ReplaySubject
在订阅者订阅时,会发送全部的数据给订阅者,不管它们是什么时候订阅的。
PublishSubject
只会给在订阅者订阅的时间点以后的数据发送给观察者。
AsyncSubject
只在原Observable
事件序列完成后,发送最后一个数据,后续若是还有订阅者继续订阅该Subject
, 则能够直接接收到最后一个值。
BehaviorSubject
在订阅者订阅时,会发送其最近发送的数据(若是此时尚未收到任何数据,它会发送一个默认值)。
因此只有ReplaySubject
和BehaviorSubject
具有Sticky
的特性。
可是:这两种方式都不适合
BehaviorSubject
由于只是保留最近一次的事件,这样会致使事件的覆盖问题
ReplaySubject能解决BehaviorSubject的事件丢失的问题,能保存全部的事件,可是分发起来确实一个难点,暂时没有找到合适的处理方法
还有咱们以前的封装采用的PublishSubject
的实现方式去分发RxBus
的事件,若是换成任何其余的分发机制都会致使sticky事件和正常事件数据须要独立来作,成本过高
public RxBus() {
bus = new SerializedSubject<>(PublishSubject.create());
}复制代码
解决办法:经过Map<事件类型,事件>手动记录消息事件,和PublishSubject
数据统一块儿来处理,避免速度的独立,这里选择线程安全的ConcurrentHashMap
ConcurrentHashMap
记录事件初始化
/*stick数据*/
private final Map<Class<?>, Object> stickyEvent =new ConcurrentHashMap<>();复制代码
在post
方法中添加事件
/** * 提供了一个新的事件,单一类型 * * @param o 事件数据 */
public void post(Object o) {
synchronized (stickyEvent) {
stickyEvent.put(o.getClass(), o);
}
bus.onNext(o);
}复制代码
sticky
注解获得Observable
对象在register(Object subscriber)
方法中经过反射获得自定义注解的数据,而后放入到自定义的数据类型SubscriberMethod
中
/** * 注册 * * @param subscriber 订阅者 */
public void register(Object subscriber) {
/*避免重复建立*/
if(eventTypesBySubscriber.containsKey(subscriber)){
return;
}
Class<?> subClass = subscriber.getClass();
Method[] methods = subClass.getDeclaredMethods();
for (Method method : methods) {
if (method.isAnnotationPresent(Subscribe.class)) {
//得到参数类型
Class[] parameterType = method.getParameterTypes();
//参数不为空 且参数个数为1
if (parameterType != null && parameterType.length == 1) {
Class eventType = parameterType[0];
addEventTypeToMap(subscriber, eventType);
Subscribe sub = method.getAnnotation(Subscribe.class);
int code = sub.code();
ThreadMode threadMode = sub.threadMode();
boolean sticky = sub.sticky();
SubscriberMethod subscriberMethod = new SubscriberMethod(subscriber, method, eventType, code, threadMode,
sticky);
addSubscriberToMap(eventType, subscriberMethod);
addSubscriber(subscriberMethod);
}
}
}
}复制代码
当事件触发之后,经过SubscriberMethod
记录的数据生成不一样的Observable对象,如今对sticky消息增长了响应的对象处理
/** * 根据传递的 eventType 类型返回特定类型(eventType)的 被观察者 */
public <T> Observable<T> toObservableSticky(final Class<T> eventType) {
synchronized (stickyEvent) {
Observable<T> observable = bus.ofType(eventType);
final Object event = stickyEvent.get(eventType);
if (event != null) {
return observable.mergeWith(Observable.create(new Observable.OnSubscribe<T>() {
@Override
public void call(Subscriber<? super T> subscriber) {
subscriber.onNext(eventType.cast(event));
}
}));
} else {
return observable;
}
}
}复制代码
这里使用merge
操做符:能够将多个Observables
合并生成一个Observable。
Observable
对象分发事件获得sticky的压缩Observable
对象后,还须要按照注解中被指定的线程去触发事件任务
/** * 用于处理订阅事件在那个线程中执行 * * @param observable * @param subscriberMethod * @return */
private Observable postToObservable(Observable observable, SubscriberMethod subscriberMethod) {
switch (subscriberMethod.threadMode) {
case MAIN:
observable.observeOn(AndroidSchedulers.mainThread());
break;
case NEW_THREAD:
observable.observeOn(Schedulers.newThread());
break;
case CURRENT_THREAD:
observable.observeOn(Schedulers.immediate());
break;
case IO:
observable.observeOn(Schedulers.io());
break;
default:
throw new IllegalStateException("Unknown thread mode: " + subscriberMethod.threadMode);
}
return observable;
}复制代码
这里很简单,直接运用RxJava和RxAndroid的线程去管理便可
由于这里的sticky消息采用的是map队列记录的方式去实现,因此当sticky消息不在被需求记录的时候,或者程序退出,须要手动清空map队列的数据,避免内存溢出和浪费
/** * 移除指定eventType的Sticky事件 */
public <T> T removeStickyEvent(Class<T> eventType) {
synchronized (stickyEvent) {
return eventType.cast(stickyEvent.remove(eventType));
}
}
/** * 移除全部的Sticky事件 */
public void removeAllStickyEvents() {
synchronized (stickyEvent) {
stickyEvent.clear();
}
}复制代码
经过sticky消息的完善,RxBus已经彻底实现了EventBus3.0的所有功能,而且所有安装EventBus3.0的使用方法来封装,方便项目的迁移和使用。
注解方式定义
post方式分发事件
sticky消息功能
注册销毁简单化