RxBus-实现EventBus之Sticky

背景

前期因为工做中须要将EventBus替换成RxBus,因此按照EventBus的用法封装了一套本身的RxBus,基本知足了使用,项目发出后有很多兄弟告诉我没有EventBusSticky功能,因此得闲完善这个功能,一样是按照EventBus3.0注解的方式去实现和调用java

RxBus-实现EventBus之postgit


效果

这里写图片描述

sticky是什么

在Android开发中,Sticky事件只指事件消费者在事件发布以后才注册的也能接收到该事件的特殊类型。Android中就有这样的实例,也就是Sticky Broadcast,即粘性广播。正常状况下若是发送者发送了某个广播,而接收者在这个广播发送后才注册本身的Receiver,这时接收者便没法接收到刚才的广播,为此Android引入了StickyBroadcast,在广播发送结束后会保存刚刚发送的广播(Intent),这样当接收者注册完Receiver后就能够接收到刚才已经发布的广播。这就使得咱们能够预先处理一些事件,让有消费者时再把这些事件投递给消费者。github

使用

彻底按照EventBus3.0版本的注解的方式去使用安全

  • 发送消息
    RxBus.getDefault().post(new EventStickText("我是sticky消息"));复制代码
  • 接收消息
@Subscribe(threadMode = ThreadMode.MAIN, sticky = true)
    public void event(EventStickText eventStickText) {
        Observable.timer(1, TimeUnit.SECONDS).observeOn(AndroidSchedulers.mainThread()).subscribe(aLong -> {
            textView.setText(eventStickText.getMsg());
        });
    }


    @Override
    protected void onStart() {
        super.onStart();
        RxBus.getDefault().register(this);
    }

    @Override
    protected void onDestroy() {
        super.onDestroy();
        RxBus.getDefault().unRegister(this);
    }复制代码

实现

本篇的实现原理是基于以前的RxBus封装的基础上的完善,因此须要大体了解RxBus以前基本功能的封装原理方能更加全面的理解一下的内容ide

原RxBus基本功能实现原理:EventBus彻底同样的RxBuspost

1.添加sticky注解

不懂注解的同窗能够先看下以前我写的两瓶关于注解的博客this

Java-注解详解spa

Android-注解详解.net

这里添加boolean sticky()的方法,而且默认指定参数false线程

@Documented
@Target(ElementType.METHOD)
@Retention(RetentionPolicy.RUNTIME)
public @interface Subscribe {
    int code() default -1;
    ThreadMode threadMode() default ThreadMode.CURRENT_THREAD;
    boolean sticky() default false;
}复制代码

2.将sticky加入数据封装类中

将数据新加入的注解sticky加入到数据封装类中,在最后分发事件时用于区分sticky事件

public class SubscriberMethod {
    public Method method;
    public ThreadMode threadMode;
    public Class<?> eventType;
    public Object subscriber;
    public int code;
    public boolean sticky;

    public SubscriberMethod(Object subscriber, Method method, Class<?> eventType, int code,ThreadMode threadMode,boolean sticky ) {
        this.method = method;
        this.threadMode = threadMode;
        this.eventType = eventType;
        this.subscriber = subscriber;
        this.code = code;
        this.sticky=sticky;
    }
 ******
 }复制代码

3.记录post事件分析

由于sticky的特殊性,而自带RxJava提供给咱们四种方式处理Subject分发

  • ReplaySubject在订阅者订阅时,会发送全部的数据给订阅者,不管它们是什么时候订阅的。

  • PublishSubject只会给在订阅者订阅的时间点以后的数据发送给观察者。

  • AsyncSubject只在原Observable事件序列完成后,发送最后一个数据,后续若是还有订阅者继续订阅该Subject, 则能够直接接收到最后一个值。

  • BehaviorSubject在订阅者订阅时,会发送其最近发送的数据(若是此时尚未收到任何数据,它会发送一个默认值)。

因此只有ReplaySubjectBehaviorSubject具有Sticky的特性。

可是:这两种方式都不适合

  • BehaviorSubject由于只是保留最近一次的事件,这样会致使事件的覆盖问题

  • ReplaySubject能解决BehaviorSubject的事件丢失的问题,能保存全部的事件,可是分发起来确实一个难点,暂时没有找到合适的处理方法

  • 还有咱们以前的封装采用的PublishSubject的实现方式去分发RxBus的事件,若是换成任何其余的分发机制都会致使sticky事件和正常事件数据须要独立来作,成本过高

public RxBus() {
        bus = new SerializedSubject<>(PublishSubject.create());
    }复制代码

解决办法:经过Map<事件类型,事件>手动记录消息事件,和PublishSubject数据统一块儿来处理,避免速度的独立,这里选择线程安全的ConcurrentHashMap

4.ConcurrentHashMap记录事件

初始化

/*stick数据*/
    private final Map<Class<?>, Object> stickyEvent =new ConcurrentHashMap<>();复制代码

post方法中添加事件

/** * 提供了一个新的事件,单一类型 * * @param o 事件数据 */
    public void post(Object o) {
        synchronized (stickyEvent) {
            stickyEvent.put(o.getClass(), o);
        }
        bus.onNext(o);
    }复制代码

5.经过sticky注解获得Observable对象

register(Object subscriber)方法中经过反射获得自定义注解的数据,而后放入到自定义的数据类型SubscriberMethod

/** * 注册 * * @param subscriber 订阅者 */
    public void register(Object subscriber) {
          /*避免重复建立*/
        if(eventTypesBySubscriber.containsKey(subscriber)){
            return;
        }
        Class<?> subClass = subscriber.getClass();
        Method[] methods = subClass.getDeclaredMethods();
        for (Method method : methods) {
            if (method.isAnnotationPresent(Subscribe.class)) {
                //得到参数类型
                Class[] parameterType = method.getParameterTypes();
                //参数不为空 且参数个数为1
                if (parameterType != null && parameterType.length == 1) {

                    Class eventType = parameterType[0];

                    addEventTypeToMap(subscriber, eventType);
                    Subscribe sub = method.getAnnotation(Subscribe.class);
                    int code = sub.code();
                    ThreadMode threadMode = sub.threadMode();
                    boolean sticky = sub.sticky();

                    SubscriberMethod subscriberMethod = new SubscriberMethod(subscriber, method, eventType, code, threadMode,
                            sticky);
                    addSubscriberToMap(eventType, subscriberMethod);

                    addSubscriber(subscriberMethod);
                }
            }
        }
    }复制代码

当事件触发之后,经过SubscriberMethod记录的数据生成不一样的Observable对象,如今对sticky消息增长了响应的对象处理

/** * 根据传递的 eventType 类型返回特定类型(eventType)的 被观察者 */
    public <T> Observable<T> toObservableSticky(final Class<T> eventType) {
        synchronized (stickyEvent) {
            Observable<T> observable = bus.ofType(eventType);
            final Object event = stickyEvent.get(eventType);

            if (event != null) {
                return observable.mergeWith(Observable.create(new Observable.OnSubscribe<T>() {
                    @Override
                    public void call(Subscriber<? super T> subscriber) {
                        subscriber.onNext(eventType.cast(event));
                    }
                }));
            } else {
                return observable;
            }
        }
    }复制代码

这里使用merge操做符:能够将多个Observables合并生成一个Observable。

6.Observable对象分发事件

获得sticky的压缩Observable对象后,还须要按照注解中被指定的线程去触发事件任务

/** * 用于处理订阅事件在那个线程中执行 * * @param observable * @param subscriberMethod * @return */
    private Observable postToObservable(Observable observable, SubscriberMethod subscriberMethod) {

        switch (subscriberMethod.threadMode) {
            case MAIN:
                observable.observeOn(AndroidSchedulers.mainThread());
                break;
            case NEW_THREAD:
                observable.observeOn(Schedulers.newThread());
                break;
            case CURRENT_THREAD:
                observable.observeOn(Schedulers.immediate());
                break;
            case IO:
                observable.observeOn(Schedulers.io());
                break;
            default:
                throw new IllegalStateException("Unknown thread mode: " + subscriberMethod.threadMode);
        }
        return observable;
    }复制代码

这里很简单,直接运用RxJava和RxAndroid的线程去管理便可

7.sticky消息的销毁

由于这里的sticky消息采用的是map队列记录的方式去实现,因此当sticky消息不在被需求记录的时候,或者程序退出,须要手动清空map队列的数据,避免内存溢出和浪费

/** * 移除指定eventType的Sticky事件 */
    public <T> T removeStickyEvent(Class<T> eventType) {
        synchronized (stickyEvent) {
            return eventType.cast(stickyEvent.remove(eventType));
        }
    }

    /** * 移除全部的Sticky事件 */
    public void removeAllStickyEvents() {
        synchronized (stickyEvent) {
            stickyEvent.clear();
        }
    }复制代码

结果

经过sticky消息的完善,RxBus已经彻底实现了EventBus3.0的所有功能,而且所有安装EventBus3.0的使用方法来封装,方便项目的迁移和使用。

  • 注解方式定义

  • post方式分发事件

  • sticky消息功能

  • 注册销毁简单化


源码

传送门-GitHub项目地址-戳我


建议

若是你对这套封装有任何的问题和建议欢迎加入QQ群告诉我!

相关文章
相关标签/搜索