就在前不久作了一个关于RXJava的相关教学视频,事后整理了关于RxJava的预习资料和相关内容以及图文和相关源码,须要借鉴的能够和我联系~java
(顺手留下GitHub连接,须要获取相关面试等内容的能够本身去找)
https://github.com/xiangjiana/Android-MS
(VX:mm14525201314)react
观察者模式的定义:
在对象之间定义了一对多的依赖,这样一来,当一个对象改变状态,依赖它的对象会收到通知并自动更新。git
大白话:
其实就是发布订阅模式,发布者发布信息,订阅者获取信息,订阅了就能收到信息,没订阅就收不到信息。github
2丶这个模式的结构图:面试
有一个微信公众号服务,不定时发布一些消息,关注公众号就能够收到推送消息,取消关注就收不到推送消息。编程
ReactiveX是Reactive Extensions的缩写,通常简写为Rx,最初是LINQ的一个扩展,由微软的架构师Erik Meijer领导的团队开发,在2012年11月开源,Rx是一个编程模型,目标是提供一致的编程接口,帮助开发者更方便的处理异步数据流,Rx库支持.NET、JavaScript和C++,Rx近几年愈来愈流行了,如今已经支持几乎所有的流行编程语言了,Rx的大部分语言库由ReactiveX这个组织负责维护,比较流行的有RxJava/RxJS/Rx.NET,社区网站是 reactivex.io。设计模式
微软给的定义是,Rx是一个函数库,让开发者能够利用可观察序列和LINQ风格查询操做符来编写异步和基于事件的程序,使用Rx,开发者能够用Observables表示异步数据流,用LINQ操做符查询异步数据流, 用Schedulers参数化异步数据流的并发处理安全
Rx能够这样定义: Rx=Observables + LINQ + Schedulers。ReactiveX.io给的定义是,Rx是一个使用可观察数据流进行异步编程的编程接口,ReactiveX结合了观察者模式、迭代器模式和函数式编程的精华。微信
不少公司都在使用ReactiveX
,例如Microsoft、Netflix、Github、Trello、SoundCloud。网络
ReactiveX
不只仅是一个编程接口,它是一种编程思想的突破,它影响了许多其它的程序库和框架以及编程语言
Observables
和Schedulers
让开发者能够摆脱底层的线程同步和各类并发问题Rx扩展了观察者模式用于支持数据和事件序列,添加了一些操做符,它让你能够声明式的组合这些序列,而无需关注底层的实现:如线程、同步、线程安全、并发数据结构和非阻塞IO。
Observable经过使用最佳的方式访问异步数据序列填补了这个间隙
Rx的Observable模型让你能够像使用集合数据同样操做异步事件流,对异步事件流使用各类简单、可组合的操做。
对于单层的异步操做来讲,Java中Future对象的处理方式是很是简单有效的,可是一旦涉及到嵌套,它们就开始变得异常繁琐和复杂。使用Future很难很好的组合带条件的异步执行流程(考虑到运行时各类潜在的问题,甚至能够说是不可能的),固然,要想实现仍是能够作到的,可是很是困难,或许你能够用 Future.get() ,但这样作,异步执行的优点就彻底没有了。从另外一方面说,Rx的Observable一开始就是为组合异步数据流准备的。
Rx的Observable不只支持处理单独的标量值(就像Future能够作的),也支持数据序列,甚至是无穷的数据流。Observable 是一个抽象概念,适用于任何场景。Observable拥有它的近亲Iterable的所有优雅与灵活。Observable是异步的双向push,Iterable是同步的单向pull,对比:
Rx对于对于并发性或异步性没有任何特殊的偏好,Observable能够用任何方式实现,线程池、事件循环、非阻塞IO、Actor模式,任何知足你的需求的,你擅长或偏好的方式均可以。不管你选择怎样实现它,不管底层实现是阻塞的仍是非阻塞的,客户端代码将全部与Observable的交互都当作是异步的。
public observable<data>getData();
从Observer的视角看,这些都无所谓,重要的是:使用Rx,你能够改变你的观念,你能够在彻底不影响Observable程序库使用者的状况下,完全的改变Observable的底层实现。
回调在不阻塞任何事情的状况下,解决了 Future.get()
过早阻塞的问题。因为响应结果一旦就绪Callback就会被调用,它们天生就是高效率的。不过,就像使用Future同样,对于单层的异步执行来讲,回调很容易使用,对于嵌套的异步组合,它们显得很是笨拙。
Rx在大量的编程语言中都有实现,并尊重实现语言的风格,并且更多的实现正在飞速增长。
Rx提供了一系列的操做符,你可使用它们来过滤(filter)、选择(select)、变换(transform)、结合(combine)和组合(compose)多个Observable,这些操做符让执行和复合变得很是高效。你能够把Observable
当作Iterable
的推送方式的等价物,使用Iterable,消费者从生产者那拉取数据,线程阻塞直至数据准备好。使用Observable,在数据准备好时,生产者将数据推送给消费者。数据能够同步或异步的到达,这种
方式更灵活。
下面的例子展现了类似的高阶函数在Iterable
和Observable
上的应用
//Iterable getDataFromLoca1Memory() .skip(10) .take(5) .map({ s -> return s + " transformed" }) .forEach({ print1n "netx =>" + it }) // observable getDataFromNetwork() .skip(10) .take(5) .map({ s -> return s + " transformed" }) .subscribe({ print1n "onNetx =>" + it })
Observable类型给GOF的观察者模式添加了两种缺乏的语义,这样就和Iterable类型中可用的操做一致了:
- 生产者能够发信号给消费者,通知它没有更多数据可用了(对于Iterable,一个for循环正常完成表示没有数据了;对于Observable,就是调用观察者的 onCompleted 方法)
- 生产者能够发信号给消费者,通知它遇到了一个错误(对于Iterable,迭代过程当中发生错误会抛出异常;对于Observable,就是调用观察者(Observer)的 onError 方法
有了这两种功能,Rx就能使Observable与Iterable保持一致了,惟一的不一样是数据流的方向。任何对Iterable的操做,你均可以对Observable使用。
在ReactiveX中,一个观察者(Observer)订阅一个可观察对象(Observable)。观察者对Observable发射的数据或数据序列做出响应。这种模式能够极大地简化并发操做,由于它建立了一个处于待命状态的观察者哨兵,在将来某个时刻响应Observable的通知,不须要阻塞等待Observable发射数据。
这篇文章会解释什么是响应式编程模式(reactive pattern),以及什么是可观察对象(Observables)和观察者(observers),其它几篇文章会展现如何用操做符组合和改变Observable的行为。
Single - 一个特殊的Observable,只发射单个数据。
在不少软件编程任务中,或多或少你都会指望你写的代码能按照编写的顺序,一次一个的顺序执行和完成。可是在ReactiveX中,不少指令多是并行执行的,以后他们的执行结果才会被观察者捕获,顺序是不肯定的。为达到这个目的,你定义一种获取和变换数据的机制,而不是调用一个方法。在这种机制下,存在一个可观察对象(Observable),观察者(Observer)订阅(Subscribe)它,当数据就绪时,以前定义的机制就会分发数据给一直处于等待状态的观察者哨兵。
这种方法的优势是,若是你有大量的任务要处理,它们互相之间没有依赖关系。你能够同时开始执行它们,不用等待一个完成再开始下一个(用这种方式,你的整个任务队列能耗费的最长时间,不会超过任务里最耗时的那个)。
有不少术语可用于描述这种异步编程和设计模式,在在本文里咱们使用这些术语:一个观察者订阅一个可观察对象(An observer subscribes to an Observable)。经过调用观察者的方法,Observable发射数据或通知给它的观察者。
在其它的文档和场景里,有时咱们也将Observer叫作Subscriber、Watcher、Reactor。这个模型一般被称做Reactor模式。
本文使用相似于Groovy的伪代码举例,可是ReactiveX有多种语言的实现。普通的方法调用(不是某种异步方法,也不是Rx中的并行调用),流程一般是这样的:
- 调用某一个方法
- 用一个变量保存方法返回的结果
- 使用这个变量和它的新值作些有用的事
用代码描述就是:
// make the call, assign its return value to `returnVal` returnVal = someMethod(itsParameters); // do something useful with returnVal
在异步模型中流程更像这样的:
- 定义一个方法,这个方法拿着某个异步调用的返回值作一些有用的事情。这个方法是观察者的一部分。
- 将这个异步调用自己定义为一个Observable
- 观察者经过订阅(Subscribe)操做关联到那个Observable
- 继续你的业务逻辑,等方法返回时,Observable会发射结果,观察者的方法会开始处理结果或结果集
用代码描述就是:
// defines, but does not invoke, the Subscriber's onNext handler // (in this example, the observer is very simple and has only an onNext handler) def myOnNext = { it -> do something useful with it }; // defines, but does not invoke, the Observable def myObservable = someObservable(itsParameters); // subscribes the Subscriber to the Observable, and invokes the Observable myObservable.subscribe(myOnNext); // go on about my business
Subscribe方法用于将观察者链接到Observable,你的观察者须要实现如下方法的一个子集:
onNext(T item)
onError(Exception ex)
onComplete
根据Observable协议的定义,onNext可能会被调用零次或者不少次,最后会有一次onCompleted或onError调用(不会同时),传递数据给onNext一般被称做发射,onCompleted和onError被称做通知。
下面是一个更完整的例子:
def myOnNext = { item -> /* do something useful with item */ }; def myError = { throwable -> /* react sensibly to a failed call */ }; def myComplete = { /* clean up after the final response */ }; def myObservable = someMethod(itsParameters); myObservable.subscribe(myOnNext, myError, myComplete); // go on about my business
在一些ReactiveX实现中,有一个特殊的观察者接口Subscriber,它有一个unsubscribe方法。调用这个方法表示你不关心当前订阅的Observable了,所以Observable能够选择中止发射新的数据项(若是没有其它观察者订阅)。
取消订阅的结果会传递给这个Observable的操做符链,并且会致使这个链条上的每一个环节都中止发射数据项。这些并不保证会当即发生,然而,对一个Observable来讲,即便没有观察者了,它也能够在一个while循环中继续生成并尝试发射数据项。
ReactiveX的每种特定语言的实现都有本身的命名偏好,虽然不一样的实现之间有不少共同点,但并不存在一个统一的命名标准。
并且,在某些场景中,一些名字有不一样的隐含意义,或者在某些语言看来比较怪异。
例如,有一个onEvent
命名模式(onNext, onCompleted, onError
),在一些场景中,这些名字可能意味着事件处理器已经注册。然而在ReactiveX
里,他们是事件处理器的名字。
Observable何时开始发射数据序列?这取决于Observable的实现,一个"热"的Observable可能一建立完就开始发射数据,所以全部后续订阅它的观察者可能从序列中间的某个位置开始接受数据(有一些数据错过了)。一个"冷"的Observable会一直等待,直到有观察者订阅它才开始发射数据,所以这个观察者能够确保会收到整个数据序列。
在一些ReactiveX
实现里,还存在一种被称做Connectable的Observable,无论有没有观察者订阅它,这种Observable都不会开始发射数据,除非Connect方法被调用。
对于ReactiveX来讲,Observable和Observer仅仅是个开始,它们自己不过是标准观察者模式的一些轻量级扩展,目的是为了更好的处理事件序列。
ReactiveX真正强大的地方在于它的操做符,操做符让你能够变换、组合、操纵和处理Observable发射的数据。
Rx的操做符让你能够用声明式的风格组合异步操做序列,它拥有回调的全部效率优点,同时又避免了典型的异步系统中嵌套回调的缺点。
下面是经常使用的操做符列表:
- 建立操做
Create, Defer, Empty/Never/Throw, From, Interval, Just, Range, Repeat, Start, Timer
- 变换操做
Buffer, FlatMap, GroupBy, Map, Scan和Window
- 过滤操做
Debounce, Distinct, ElementAt, Filter, First, IgnoreElements, Last, Sample, Skip, SkipLast, Take,TakeLast
- 组合操做
And/Then/When, CombineLatest, Join, Merge, StartWith, Switch, Zip
- 错误处理
Catch
和Retry
- 辅助操做
Delay, Do, Materialize/Dematerialize, ObserveOn, Serialize, Subscribe, SubscribeOn,TimeInterval, Timeout, Timestamp, Using
- 条件和布尔操做
All, Amb, Contains, DefaultIfEmpty, SequenceEqual, SkipUntil, SkipWhile, TakeUntil,TakeWhile
- 算术和集合操做
Average, Concat, Count, Max, Min, Reduce, Sum
- 转换操做 To
- 链接操做
Connect, Publish, RefCount, Replay
- 反压操做,用于增长特殊的流程控制策略的操做符
这些操做符并不全都是ReactiveX
的核心组成部分,有一些是语言特定的实现或可选的模块。
(顺手留下GitHub连接,须要获取相关面试等内容的能够本身去找)
https://github.com/xiangjiana/Android-MS
(VX:mm14525201314)