RxJava 1.x使用与理解——2018.5.22html
前一段时间,项目引入RxJava,用起来很简单,可是对原理不甚理解,因而参考各类资料,对照源码,进行了深刻学习,写在这里,但愿对看到的小伙伴有所帮助react
RxJava源码理解并不简单,感谢各位前辈们的无私分析分享,才让我能更高效地学习进步,再次感谢!android
学习过程记录,加深理解,提高文字组合表达能力。也但愿能给学习RxJava的同窗一些灵感
大部份内容整理于[此文]数据库
tips:无相关基础的话,看了下面的相关定义,是不太可能直接理解的,因此,不建议一开始纠结于此段内容,能够先看后面内容,回头就会豁然开朗了。编程
RxJava – Reactive Extensions for the JVM – a library for composing asynchronous and event-based programs using observable sequences for the Java VM. ————官网
翻译:RxJava – Reactive Extensions for the JVM – 一个在 Java VM 上使用可观测的序列来组成异步的、基于事件的程序的库网络
这是须要强调的是**Reactive Extensions**,响应式编程扩展,RxJava实际是实现了Java中的响应式编程风格,RxJava是ReactiveX((有多语言版本)的Java实现数据结构
ReactiveX:An API for asynchronous programming with observable streams异步
在Java平台实现了响应式编程风格async
分步操做,每一步阅读起来清晰:
复杂操做分步进行,使代码在复杂逻辑下看下来依然逻辑清晰。经过链式调用,将每一步功能组合起来实现复杂功能。ide
异步操做很关键的一点是程序的简洁性,由于在调度过程比较复杂的状况下,异步代码常常会既难写也难被读懂。 Android 创造的 AsyncTask 和Handler ,其实都是为了让异步代码更加简洁。RxJava 的优点也是简洁,但它的简洁的不同凡响之处在于,随着程序逻辑变得愈来愈复杂,它依然可以保持简洁。
数据流:
Event buses 或者 Click events 本质上就是异步事件流,你能够监听并处理这些事件。响应式编程的思路大概以下:你能够用包括 Click 和 Hover 事件在内的任何东西建立 Data stream。Stream 廉价且常见,任何东西均可以是一个 Stream:变量、用户输入、属性、Cache、数据结构等等。举个例子,想像一下你的 Twitter feed 就像是 Click events 那样的 Data stream,你能够监听它并相应的做出响应。——jikexueyuan
数据流操做:
在这个基础上,你还有使人惊艳的函数去组合、建立、过滤这些 Streams。这就是函数式魔法的用武之地。Stream 能接受一个,甚至多个 Stream 为输入。你能够融合两个 Stream,也能够从一个 Stream 中过滤出你感兴趣的 Events 以生成一个新的 Stream,还能够把一个 Stream 中的数据值 映射到一个新的 Stream 中。
观察者模式面向的需求是:A 对象(观察者)对 B 对象(被观察者)的某种变化高度敏感,须要在 B 变化的一瞬间作出反应。
Android 开发中一个比较典型的例子是点击监听器 OnClickListener 。对设置 OnClickListener 来讲, View 是被观察者, OnClickListener 是观察者,两者经过 setOnClickListener() 方法达成订阅关系。订阅以后用户点击按钮的瞬间,Android Framework 就会将点击事件发送给已经注册的 OnClickListener 。
采起这样被动的观察方式,既省去了反复检索状态的资源消耗,也可以获得最高的反馈速度。
RxJava 有四个基本概念:Observable (可观察者,即被观察者)、 Observer (观察者)、 subscribe (订阅)、事件。Observable 和 Observer 经过 subscribe() 方法实现订阅关系,从而 Observable 能够在须要的时候发出事件来通知 Observer。
与传统观察者模式不一样, RxJava 的事件回调方法除了普通事件 onNext() (至关于 onClick() / onEvent())以外,还定义了两个特殊的事件:onCompleted() 和 onError()。
所谓变换,就是将事件序列中的对象或整个序列进行加工处理,转换成不一样的事件或事件序列。
RxJava 提供了对事件序列进行变换的支持,这是它的核心功能之一,也是大多数人说『RxJava 真是太好用了』的最大缘由。
RxJava的API,主要都是基于变换实现的。
这些变换虽然功能各有不一样,但实质上都是针对事件序列的处理和再发送。而在 RxJava 的内部,它们是基于同一个基础的变换方法: lift(Operator)。首先看一下 lift() 的内部实现(仅核心代码):
// 注意:这不是 lift() 的源码,而是将源码中与性能、兼容性、扩展性有关的代码剔除后的核心代码。 // 若是须要看源码,能够去 RxJava 的 GitHub 仓库下载。 public <R> Observable<R> lift(Operator<? extends R, ? super T> operator) { return Observable.create(new OnSubscribe<R>() { @Override public void call(Subscriber subscriber) { Subscriber newSubscriber = operator.call(subscriber); newSubscriber.onStart(); onSubscribe.call(newSubscriber); } }); }
这段代码颇有意思:它生成了一个新的 Observable 并返回,并且建立新 Observable 所用的参数 OnSubscribe 的回调方法 call() 中的实现居然看起来和前面讲过的 Observable.subscribe() 同样!然而它们并不同哟~不同的地方关键就在于第二行 onSubscribe.call(subscriber) 中的 onSubscribe 所指代的对象不一样
subscribe() 中这句话的 onSubscribe 指的是 Observable 中的 onSubscribe 对象,这个没有问题,可是 lift() 以后的状况就复杂了点。
当含有 lift() 时:
这样就实现了 lift() 过程,有点像一种代理机制,经过事件拦截和处理实现事件序列的变换。
精简掉细节的话,也能够这么说:在 Observable 执行了 lift(Operator) 方法以后,会返回一个新的 Observable,这个新的 Observable 会像一个代理同样,负责接收原始的 Observable 发出的事件,并在处理后发送给 Subscriber。
两次和屡次的 lift() 图示:
在不指定线程的状况下, RxJava 遵循的是线程不变的原则,即:在哪一个线程调用 subscribe(),就在哪一个线程生产事件;在哪一个线程生产事件,就在哪一个线程消费事件。若是须要切换线程,就须要用到 Scheduler (调度器)。
在RxJava 中,Scheduler ——调度器,至关于线程控制器,RxJava 经过它来指定每一段代码应该运行在什么样的线程。RxJava 已经内置了几个 Scheduler ,它们已经适合大多数的使用场景:
有了这几个 Scheduler ,就可使用 subscribeOn() 和 observeOn() 两个方法来对线程进行控制了。 * subscribeOn(): 指定 subscribe() 所发生的线程,即 Observable.OnSubscribe 被激活时所处的线程。或者叫作事件产生的线程。 * observeOn(): 指定 Subscriber 所运行在的线程。或者叫作事件消费的线程。
Observable.just(1, 2, 3, 4) .subscribeOn(Schedulers.io()) // 指定 subscribe() 发生在 IO 线程 .observeOn(AndroidSchedulers.mainThread()) // 指定 Subscriber 的回调发生在主线程 .subscribe(new Action1<Integer>() { @Override public void call(Integer number) { Log.d(tag, "number:" + number); } });
前面讲到了,能够利用 subscribeOn() 结合 observeOn() 来实现线程控制,让事件的产生和消费发生在不一样的线程。但是在了解了 map() flatMap() 等变换方法后,有些好事的(其实就是当初刚接触 RxJava 时的我)就问了:能不能多切换几回线程?
答案是:能。由于 observeOn() 指定的是 Subscriber 的线程,而这个 Subscriber 并非(严格说应该为『不必定是』,但这里不妨理解为『不是』)subscribe() 参数中的 Subscriber ,而是 observeOn() 执行时的当前 Observable 所对应的 Subscriber ,即它的直接下级 Subscriber 。换句话说,observeOn() 指定的是它以后的操做所在的线程。所以若是有屡次切换线程的需求,只要在每一个想要切换线程的位置调用一次 observeOn() 便可。
Observable.just(1, 2, 3, 4) // IO 线程,由 subscribeOn() 指定 .subscribeOn(Schedulers.io()) .observeOn(Schedulers.newThread()) .map(mapOperator) // 新线程,由 observeOn() 指定 .observeOn(Schedulers.io()) .map(mapOperator2) // IO 线程,由 observeOn() 指定 .observeOn(AndroidSchedulers.mainThread) .subscribe(subscriber); // Android 主线程,由 observeOn() 指定
不一样于 observeOn() , subscribeOn() 的位置放在哪里均可以,但它是只能调用一次的。
subscribeOn() 和 observeOn() 的内部实现,也是用的 lift()
subscribeOn()在新返回的Observable中,线程切换发生在 OnSubscribe 中,即在它通知上一级 OnSubscribe 时,这时事件尚未开始发送,所以 subscribeOn() 的线程控制能够从事件发出的开端就形成影响
而 observeOn() 的线程切换则发生在它内建的 Subscriber 中,即发生在它即将给下一级 Subscriber 发送事件时,所以 observeOn() 控制的是它后面的线程
多个 subscribeOn() 和 observeOn() 混合使用时,线程调度图解
图中共有 5 处含有对事件的操做。由图中能够看出,①和②两处受第一个 subscribeOn() 影响,运行在红色线程;③和④处受第一个 observeOn() 的影响,运行在绿色线程;⑤处受第二个 onserveOn() 影响,运行在紫色线程;而第二个 subscribeOn() ,因为在通知过程当中线程就被第一个 subscribeOn() 截断,所以对整个流程并无任何影响。这里也就回答了前面的问题:当使用了多个 subscribeOn() 的时候,只有第一个 subscribeOn() 起做用。
综合来说:对于能够分步进行的复杂的数据变换很是友好
举例: