Android 使用RxJava实现一个发布/订阅事件总线

1.简单介绍

1.1.发布/订阅事件主要用于网络请求的回调。html

  事件总线可使Android各组件之间的通讯变得简单,并且能够解耦。react

  其实RxJava实现事件总线和EventBus比较相似,他们都依据与观察者模式。git

  我的比较习惯用RxJava来实现,由于很是简单而清晰。github

  

 

1.2.固然EventBus实现总线的方式也有不少人用。web

  这里给个传送门==>EventBus的github地址:https://github.com/greenrobot/EventBus网络

  而后Otto实现总线也不错==>Otto的github地址:https://github.com/square/ottoapp

 

 

1.3.使用RxJava的好处以及注意点ide

  最明显的好处就是:项目体积缩小了。函数

  注意:使用RxLifecycle来解决RxJava内存泄漏的问题。工具

  ==>参考个人另外一篇博客:RxLifecycle第三方库的使用。

  

 

1.4.理解一下观察者模式。

  这是一种行为模式。

  当你的类或者主对象(称为被观察者)的状态发生改变就会通知全部对此感兴趣的类或对象(称为观察者)。

  详情了解请参考这篇文章:观察者模式--千军万马穿云箭。

 

 

1.5.理解一下发布/订阅

  发布/订阅 模式的功能和观察者模式是同样的,都是完成特定事件发生后的消息通知。

  观察者模式和发布/订阅模式之间仍是存在了一些差异,在发布/订阅模式中重点是发布消息,而后由调度中心

  统一调度,不须要知道具体有哪些订阅者。(这样就能够匿名)

为何要匿名?
在计算机程序设计中有一个很是棒的思想叫“解耦”。你一般但愿在你的设计中保持尽量低的耦合度。
一般状况下,你但愿消息发布商可以直接了解全部须要接收消息的订阅者,
这样,一旦“事件”或消息准备好就能够及时通知每个订阅者。
可是使用事件总线,发布者能够免除这种职责并实现独立性,
由于消息发布者和消息订阅者能够相互不知道对方,只关心对应的消息,从而接触二者之间的依赖关系
怎么实现匿名?
提到匿名,天然而然你就会问:你是如何真正实现发布者和订阅者之间的匿名? 
很简单,只要找到一个中间人,让这个中间人负责两方的消息调度。事件总线就是一个这样的中间人。 综上所述,事件总线就是这么简单。

 

 

1.6.使用RxJava实现事件总线的简单案例

  案例来源:用RxJava实现事件总线-RxBus。

  github参考案例地址:https://github.com/kaushikgopal/RxJava-Android-Samples

  以下面的例子:

  咱们从顶部片断(绿色部分)发布事件,并从底部片断(蓝色部分)监听点击事件(经过事件总线)。

  

  怎么实现这个功能呢?

  第一步自定义一个事件总线 

public class RxBus {
 
  private final Subject<Object, Object> _bus = new SerializedSubject<>(PublishSubject.create());
 
  public void send(Object o) {
    _bus.onNext(o);
  }
 
  public Observable<Object> toObserverable() {
    return _bus;
  }
}

  第二步将事件发布到总线中。

@OnClick(R.id.btn_demo_rxbus_tap)
public void onTapButtonClicked() {
 
    _rxBus.send(new TapEvent());
}

  第三步监听来自其余组件或服务事件

_rxBus.toObserverable()
    .subscribe(new Action1<Object>() {
      @Override
      public void call(Object event) {
 
        if(event instanceof TapEvent) {
          _showTapText();
 
        }else if(event instanceof SomeOtherEvent) {
          _doSomethingElse();
        }
      }
    });

 

 

1.7.本篇文章的参考文献

  Android RxJava实现RxBus。

  Android基于RxJava、RxAndroid的EventBus实现。

  用RxJava实现事件总线-RxBus。


2.封装好的总线类

2.1.RxJava1.x的总线实现方式

/**
 * desc   : 利用 PublishSubject的特性:与普通的Subject不一样,在订阅时并不当即触发订阅事件,
 * 而是容许咱们在任意时刻手动调用onNext(),onError(),onCompleted来触发事件。
 */
public class RxBus {

    private ConcurrentHashMap<Object, List<Subject>> subjectMapper = new ConcurrentHashMap<>();

    private RxBus() {

    }

    private static class Holder {
        private static RxBus instance = new RxBus();
    }

    public static RxBus getInstance() {
        return Holder.instance;
    }

    public <T> Observable<T> register(@NonNull Class<T> clz) {
        return register(clz.getName());
    }

    public <T> Observable<T> register(@NonNull Object tag) {
        List<Subject> subjectList = subjectMapper.get(tag);
        if (null == subjectList) {
            subjectList = new ArrayList<>();
            subjectMapper.put(tag, subjectList);
        }

        Subject<T, T> subject = PublishSubject.create();
        subjectList.add(subject);

        //System.out.println("注册到rxbus");
        return subject;
    }

    public <T> void unregister(@NonNull Class<T> clz, @NonNull Observable observable) {
        unregister(clz.getName(), observable);
    }

    public void unregister(@NonNull Object tag, @NonNull Observable observable) {
        List<Subject> subjects = subjectMapper.get(tag);
        if (null != subjects) {
            subjects.remove(observable);
            if (subjects.isEmpty()) {
                subjectMapper.remove(tag);
                //System.out.println("从rxbus取消注册");
            }
        }
    }

    public void post(@NonNull Object content) {
        post(content.getClass().getName(), content);
    }

    public void post(@NonNull Object tag, @NonNull Object content) {
        List<Subject> subjects = subjectMapper.get(tag);
        if (!subjects.isEmpty()) {
            for (Subject subject: subjects) {
                subject.onNext(content);
            }
        }
    }
}
几个关键方法: 
register —— 由tag,生成一个subject List,同时利用PublishSubject建立一个Subject并返回,
它同时也是Observable的子类。
unregister —— 移除tag对应subject List 中的Observable。若subject List为空,也将被移除。
post —— 遍历tag对应subject List 中的Subject,执行onNext()。
这里实际执行的是观察者Observer的onNext(),
Subject的定义:
public abstract class Subject<T, R> extends Observable<R> implements Observer<T>。

  测试代码:

/*
rxbus
 */
Observable<String> observable = RxBus.getInstance().register(String.class);
observable.map(s -> {
    try {
        int v = Integer.valueOf(s);
        System.out.println("map变换成功, source = " + s);
        return v;
    } catch (Exception e) {
        System.out.println("map变换失败, source = " + s);
        return s;
    }
}).subscribe(value -> {
    System.out.println("订阅 " + value);
});

RxBus.getInstance().post("888");
RxBus.getInstance().post("发发发");
RxBus.getInstance().unregister(String.class, observable);
//这里比较有意思的是,使用了lambda表达式。
//在map变换时,若是将字符串转成Integer,没有问题就返回整型;
//若报异常,就返回String型。
//一样的,在最终订阅时,value参数的类型也是由map变换来决定的。

 

 

2.2.RxJava2.0总线实现类

  由于在RxJava2.0以后,io.reactivex.Observable中没有进行背压处理了。

  若是有大量消息堆积在总线中来不及处理会产生OutOfMemoryError。

  有新类io.reactivex.Flowable专门针对背压问题。

 

  无背压处理的Observable实现,跟RxJava1.0x中同样,使用PublishSubject来实现。

  要实现有背压的2.0x版,使用FlowableProcessor的子类PublishProcessor来产生Flowable。

  

  源代码以下:

public class RxBus {

    private final FlowableProcessor<Object> mBus;

    private RxBus() {
        mBus = PublishProcessor.create().toSerialized();
    }

    private static class Holder {
        private static RxBus instance = new RxBus();
    }

    public static RxBus getInstance() {
        return Holder.instance;
    }

    public void post(@NonNull Object obj) {
        mBus.onNext(obj);
    }

    public <T> Flowable<T> register(Class<T> clz) {
        return mBus.ofType(clz);
    }

    public void unregisterAll() {
        //会将全部由mBus 生成的 Flowable 都置  completed 状态  后续的 全部消息  都收不到了
        mBus.onComplete();
    }

    public boolean hasSubscribers() {
        return mBus.hasSubscribers();
    }

}

  测试代码:

Flowable<Integer> f1 = RxBus.getInstance().register(Integer.class);
f1.subscribe(value -> System.out.println("订阅f1消息 .. " + value));
RxBus.getInstance().post(999);

 


3.实际项目调用方式

3.1.首先自定义一个RxBus。

  这个类感受有点像工具类。和其余函数没有任何耦合关系。

  这个类见在上面2中封装好的RxBus类。

 

 

3.2.在BaseListFragment实现了LazyLoadFragment中的抽象函数。

  这里解释一下:

  BaseListFragment是一个能够刷新能够加载更多的一个碎片。

  LazyLoadFragment是一个懒加载的被BaseListFragmetn继承的一个基类。

  LazyLoadFragment经过判断是否可见的函数setUserVisibleHint执行了一个抽象函数fetchData()。

  adapter是页面内容的一个适配器。

  而后在BaseListFragment中重写这个抽象函数。

 @Override
    public void fetchData() {
        observable = RxBus.getInstance().register(BaseListFragment.TAG);
        observable.subscribe(new Consumer<Integer>() {
            @Override
            public void accept(@NonNull Integer integer) throws Exception {
                adapter.notifyDataSetChanged();
            }
        });
    }

  observable.subscribe(new Consumer<Integer>)返回的是一个Disposable类型。

  以下面Disposable的简单使用方式。

 Disposable disposable = observable.subscribe(new Consumer<Integer>() {
            @Override
            public void accept(Integer integer) throws Exception {
                  //这里接收数据项
            }
        }, new Consumer<Throwable>() {
            @Override
            public void accept(Throwable throwable) throws Exception {
              //这里接收onError
            }
        }, new Action() {
            @Override
            public void run() throws Exception {
              //这里接收onComplete。
            }
        });

 

 

3.3.小贴士

  RxBus的注册与反注册必定要对应出现。

  通常在活动或者Fragment中的onStart中register这个活动或者片断的TAG(也就是一个惟一标识字符串)。

  通常在活动或者Fragment中的onDestroy中ungister这个活动或者片断的TAG。

  post用于传递消息,看状况调用呗。

相关文章
相关标签/搜索