这一系列文章原本我发表在简书。最近开始转移到掘金。之后也会在掘金发表(慢慢抛弃简书了应该,掘金的技术环境确实比简书好些)。html
上篇简单讲到了一些关于Event/Rx bus的优缺点。而且提到了如何“正确”使用RxJava,而不是使用RxBus来本身从新发明轮子。java
其中也讲到了一个简单使用 create() 方法来进行封装Observable。但也留下了许多坑,好比内存泄漏,不能Multicast(多个Subscriber订阅同一个Observable) 等问题。因此这篇,咱们接着经过这个例子,来具体了解下,如何封装Observable。编程
首先咱们来简单看一下Observable的静态方法,just/from/create都怎么为你提供Observable。
咱们先看just:安全
public static <T> Observable<T> just(T item) { ObjectHelper.requireNonNull(item, "The item is null"); return RxJavaPlugins.onAssembly(new ObservableJust<T>(item)); }复制代码
咱们暂时不须要纠结 RxJavaPlugins.onAssembly() 这个方法。比较重要的是 just(T item) 方法会为你提供一个 ObservableJust(item) 的实例,而这个 ObservableJust 类,就是一个RxJava内部的实现类。
在 RxJava 2.x 中 Observable 是一个抽象类,只有一个抽象方法,subscribeActual(Observer observer);(可是Observable的源码足足有13518行!!!)markdown
public abstract class Observable<T> implements ObservableSource<T>{ //implemented methods protected abstract void subscribeActual(Observer<? super T> observer); //other implements/operators }复制代码
那么ObservableJust这个类究竟什么样呢?app
public final class ObservableJust<T> extends Observable<T> implements ScalarCallable<T> { private final T value; public ObservableJust(final T value) { this.value = value; } @Override protected void subscribeActual(Observer<? super T> s) { ScalarDisposable<T> sd = new ScalarDisposable<T>(s, value); s.onSubscribe(sd); sd.run(); } @Override public T call() { return value; } }复制代码
咱们首先看到构造方法里,直接把value赋给了ObservableJust的成员。这也就是为何Observable.just()里的代码会直接运行,而不是像create()方法,有Subscriber时候才能运行(Observable.create
的初始化方法在subscribeAcutal
里执行)。
再来看看两个item的just(T item1,T item2):ide
public static <T> Observable<T> just(T item1, T item2) { ObjectHelper.requireNonNull(item1, "The first item is null"); ObjectHelper.requireNonNull(item2, "The second item is null"); return fromArray(item1, item2); }复制代码
诶?怎么画风突变?不是ObservableJust了?其实除了只有一个item的just,其余的just方法也都是调用了这个fromArray。那咱们来看看这个fromArray:oop
public static <T> Observable<T> fromArray(T... items) { //NullCheck return RxJavaPlugins.onAssembly(new ObservableFromArray<T>(items)); }复制代码
前面一些check咱们忽略,这里咱们发现一些熟悉的身影了ObservableFromArray(items)。又一个Observable的实现类。源码分析
public final class ObservableFromArray<T> extends Observable<T> { final T[] array; public ObservableFromArray(T[] array) { this.array = array; } @Override public void subscribeActual(Observer<? super T> s) { FromArrayDisposable<T> d = new FromArrayDisposable<T>(s, array); s.onSubscribe(d); d.run(); } static final class FromArrayDisposable<T> extends BasicQueueDisposable<T> { //implements } }复制代码
是否是更熟悉?其实Observable几乎全部的静态方法和操做符都是这样,甚至包括一些著名的RxJava库好比RxBinding,也都是使用这种封装方法。内部实现Observable的subscribeActual()方法。对外只提供静态方法来为你生成Observable。为何这么作,咱们来了解一下subscribeActual()方法。
subscribeActual()其实就是Observable和Observer沟通的桥梁。这个Observer(Subscriber)就是你在Observable.subscribe()方法里写的那个类,或者是Consumer(只处理onNext方法)。
public final void subscribe(Observer<? super T> observer) { //NullCheck&Apply plugin subscribeActual(observer); }复制代码
咱们看到其实这个方法除了Check和Apply就只有这一行subscribeActual(observer),链接了Observable和Observer。因此咱们知道了,subscribeActual()方法里的代码,只有在subscribe()调用后,才回调用。
那么他们是如何连接的呢?其实很简单,根据你的逻辑一句一句的调用observer.onXX()方法就能够了。好比刚才咱们看到的ObservableJust:
@Override public void run() { if (get() == START && compareAndSet(START, ON_NEXT)) { observer.onNext(value); if (get() == ON_NEXT) { lazySet(ON_COMPLETE); observer.onComplete(); } } }复制代码
再好比咱们的ObservableFromArray:
void run() { T[] a = array; int n = a.length; for (int i = 0; i < n && !isDisposed(); i++) { T value = a[i]; if (value == null) { actual.onError(new NullPointerException("The " + i + "th element is null")); return; } actual.onNext(value); } if (!isDisposed()) { actual.onComplete(); } }复制代码
复杂点的例子,好比如何封装button的OnClick事件:
@Override protected void subscribeActual(Observer<? super Object> observer) { if (!checkMainThread(observer)) { return; } Listener listener = new Listener(view, observer); observer.onSubscribe(listener); view.setOnClickListener(listener); } static final class Listener extends MainThreadDisposable implements OnClickListener { private final View view; private final Observer<? super Object> observer; Listener(View view, Observer<? super Object> observer) { this.view = view; this.observer = observer; } @Override public void onClick(View v) { if (!isDisposed()) { observer.onNext(Notification.INSTANCE); } } @Override protected void onDispose() { view.setOnClickListener(null); } } }复制代码
可是细心的同窗应该看到了,每一个subscribeActual()方法里,都会有 observer.onSubscribe(disposable)
这句。那么这句又是作什么的呢?根据Observable Contract,onSubscribe
是告知已经准备好接收item。并且经过这个方法将Disposable传回给Subscriber。
Disposable其实就是控制你取消订阅的。他只有两个方法 dispose() 取消订阅,和 isDisposed() 来通知是否已经取消了订阅。
取消订阅时,要根据需求释放资源。
在subscribeActual()里逻辑要严谨,好比onComplete()以后不要有onNext()。须要注意的点不少,因此可能这也就是为何RxJava推荐用户使用静态方法生成Observable吧。若是有兴趣,能够直接阅读
create()
方法是一个历史遗留问题了。因为这个命名,不少人都以为Observable.create()
不就应该是生成Obseravble最早想到的方法吗? 在 RxJava 1.x 这是错误的,Observable.create()
在 1.x 版本几乎饱受诟病。不是他很差,而是他太难操控。 RxJava必定要遵循Observable Contract才会按照预期执行,而使用create()
你能够彻底无视这个规则。你能够在onComplete以后继续发送onNext事件,下游仍会收到事件。若是在1.x想正确的使用Observable.create()
你必须首先了解几乎全部的规则。因此一直以来 RxJava 1.x 版本使用Observable.create
是不推荐的。(在新版的RxJava 1.3中,create()方法已经标记@deprecated
)
在经历了1.x的失败后,RxJava 2.x 提供了安全的create()
方法。他经过ObservableEmitter做为中间人,代替处理。使得即使你在Emitter中没有参照ObservableContract,下游仍会按照预期的进行。
咱们上文说到的just
,from
,create
等等是生成Observable的操做符,那么如map
,filter
等等的操做符会有什么区别吗?
咱们来看下源码:
map:
public final <R> Observable<R> map(Function<? super T, ? extends R> mapper) { ObjectHelper.requireNonNull(mapper, "mapper is null"); return RxJavaPlugins.onAssembly(new ObservableMap<T, R>(this, mapper)); }复制代码
filter:
public final Observable<T> filter(Predicate<? super T> predicate) { ObjectHelper.requireNonNull(predicate, "predicate is null"); return RxJavaPlugins.onAssembly(new ObservableFilter<T>(this, predicate)); }复制代码
咱们看到,这个区别就是在生成新Observable的时候,会须要两个参数,一个是这个Observable自己,也就是代码中的this,另外一个就是须要进行操做的接口实现(固然也有更多参数的好比Schduler等等,大同小异,再也不赘述)。而这个Observable自己,也就是咱们口中常说的上游。上游下游是根据操做符的来讲,对于一个操做符,在这个操做符以前的就是上游,而这个操做符以后的就是下游。
好比咱们的map:
public final class ObservableMap<T, U> extends AbstractObservableWithUpstream<T, U> { @Override public void subscribeActual(Observer<? super U> t) { source.subscribe(new MapObserver<T, U>(t, function)); } }复制代码
source就是咱们的上游。而这个MapObserver就是咱们的中间人(其实也算是操做符自己),将数据根据需求,处理后发给下游。
操做符原理很是复杂,map能够说是最简单的了。若是有兴趣我推荐能够看一下publish(selector)
等等复杂的操做符。更深刻理解操做符。固然,有毅力的同窗也能够关注RxJava 主要负责人的系列博客(纯英文,并且很难懂,不是英语难懂,是原理很难懂)。
读过扔物线大神文章入门的同窗应该对lift
有一个了解。RxJava 1.x 几乎全部操做符都是基于lift
完成的。可是RxJava 2.x 能够说几乎看不到lift
。 目前lift
仅仅做为提供自定义操做符的一个接口(虽然更推荐使用简单好用的compose
,由于lift
须要复写七个抽象方法。)。
最后再说一下几点:
其实 Reactive Programming在Java上的实现不止 RxJava 一个。比较出名的还有Project Reactor和 google 的 agera 等等。 可是综合考虑,不管是性能,扩展性上RxJava在Android平台上是最优秀的。 因为都在JVM上,你们都决定统一接口因此推出 Reactive Streams定义了这一套的几个基本接口:
包括了 :
//对应RxJava中的Flowable public interface Publisher<T> { public void subscribe(Subscriber<? super T> s); } //RxJava并无直接对应,而是各类形式的实现类。 public interface Subscriber<T> { public void onSubscribe(Subscription s); public void onNext(T t); public void onError(Throwable t); public void onComplete(); } //同上,RxJava在flowable中直接使用Subscription public interface Subscription { public void request(long n); public void cancel(); } //Flowable版本的Subject public interface Processor<T, R> extends Subscriber<T>, Publisher<R> { }复制代码
正由于这四个接口的命名关系。本在RxJava 1.x 的Observable更名为Flowable。而RxJava 2.x的 Observable是彻底没有backpressure支持。由于起名冲突的缘由,将原本的Subscription改成Disposable,Subscriber改成Observer。