我把RXjava的源码和这份面试都给你了,你还告诉我面不过拿不到offer?(一)

就在前不久作了一个关于RXJava的相关教学视频,事后整理了关于RxJava的预习资料和相关内容以及图文和相关源码,须要借鉴的能够和我联系~java

一丶 面试辅助路线(所有内容在完整的PDF里都有讲解)

我把RXjava的源码和这份面试都给你了,你还告诉我面不过拿不到offer?(一)
顺手留下GitHub连接,须要获取相关面试等内容的能够本身去找
https://github.com/xiangjiana/Android-MS
(VX:mm14525201314)
我把RXjava的源码和这份面试都给你了,你还告诉我面不过拿不到offer?(一)react

二丶 RXJava预习:

JAVA设计模式之观察者模式
一、初步认识

观察者模式的定义:
在对象之间定义了一对多的依赖,这样一来,当一个对象改变状态,依赖它的对象会收到通知并自动更新。git

大白话:
其实就是发布订阅模式,发布者发布信息,订阅者获取信息,订阅了就能收到信息,没订阅就收不到信息。github

2丶这个模式的结构图
我把RXjava的源码和这份面试都给你了,你还告诉我面不过拿不到offer?(一)面试

三、能够看到,该模式包含四个角色

  • 抽象被观察者角色: 也就是一个抽象主题,它把全部对观察者对象的引用保存在一个集合中,每一个主题均可以有任意数量的观察者。抽象主题提供一个接口,能够增长和删除观察者角色。通常用一个抽象类和接口来实现。
  • 抽象观察者角色: 为全部的具体观察者定义一个接口,在获得主题通知时更新本身。
  • 具体被观察者角色: 也就是一个具体的主题,在集体主题的内部状态改变时,全部登记过的观察者发出通知。
  • 具体观察者角色: 实现抽象观察者角色所须要的更新接口,一边使自己的状态与制图的状态相协调。

四、使用场景例子

有一个微信公众号服务,不定时发布一些消息,关注公众号就能够收到推送消息,取消关注就收不到推送消息。编程

三丶Rxjava介绍

ReactiveX的历史

ReactiveX是Reactive Extensions的缩写,通常简写为Rx,最初是LINQ的一个扩展,由微软的架构师Erik Meijer领导的团队开发,在2012年11月开源,Rx是一个编程模型,目标是提供一致的编程接口,帮助开发者更方便的处理异步数据流,Rx库支持.NET、JavaScript和C++,Rx近几年愈来愈流行了,如今已经支持几乎所有的流行编程语言了,Rx的大部分语言库由ReactiveX这个组织负责维护,比较流行的有RxJava/RxJS/Rx.NET,社区网站是 reactivex.io。设计模式

什么是ReactiveX

微软给的定义是,Rx是一个函数库,让开发者能够利用可观察序列和LINQ风格查询操做符来编写异步和基于事件的程序,使用Rx,开发者能够用Observables表示异步数据流,用LINQ操做符查询异步数据流, 用Schedulers参数化异步数据流的并发处理安全

Rx能够这样定义: Rx=Observables + LINQ + Schedulers。ReactiveX.io给的定义是,Rx是一个使用可观察数据流进行异步编程的编程接口,ReactiveX结合了观察者模式、迭代器模式和函数式编程的精华。微信

ReactiveX的应用

不少公司都在使用ReactiveX,例如Microsoft、Netflix、Github、Trello、SoundCloud。网络

ReactiveX宣言

ReactiveX不只仅是一个编程接口,它是一种编程思想的突破,它影响了许多其它的程序库和框架以及编程语言

Rx模式

使用观察者模式
  • 建立: Rx能够方便的建立事件流和数据流
  • 组合: Rx使用查询式的操做符组合和变换数据流
  • 监听: Rx能够订阅任何可观察的数据流并执行操做
简化代码
  • 函数式风格: 对可观察数据流使用无反作用的输入输出函数,避免了程序里错综复杂的状态
  • 简化代码: Rx的操做符统统常能够将复杂的难题简化为不多的几行代码
  • 异步错误处理: 传统的try/catch没办法处理异步计算,Rx提供了合适的错误处理机制
  • 轻松使用并发: Rx的ObservablesSchedulers让开发者能够摆脱底层的线程同步和各类并发问题
使用Observable的优点

Rx扩展了观察者模式用于支持数据和事件序列,添加了一些操做符,它让你能够声明式的组合这些序列,而无需关注底层的实现:如线程、同步、线程安全、并发数据结构和非阻塞IO。

Observable经过使用最佳的方式访问异步数据序列填补了这个间隙
我把RXjava的源码和这份面试都给你了,你还告诉我面不过拿不到offer?(一)
Rx的Observable模型让你能够像使用集合数据同样操做异步事件流,对异步事件流使用各类简单、可组合的操做。

Observable可组合

对于单层的异步操做来讲,Java中Future对象的处理方式是很是简单有效的,可是一旦涉及到嵌套,它们就开始变得异常繁琐和复杂。使用Future很难很好的组合带条件的异步执行流程(考虑到运行时各类潜在的问题,甚至能够说是不可能的),固然,要想实现仍是能够作到的,可是很是困难,或许你能够用 Future.get() ,但这样作,异步执行的优点就彻底没有了。从另外一方面说,Rx的Observable一开始就是为组合异步数据流准备的。

Observable更灵活

Rx的Observable不只支持处理单独的标量值(就像Future能够作的),也支持数据序列,甚至是无穷的数据流。Observable 是一个抽象概念,适用于任何场景。Observable拥有它的近亲Iterable的所有优雅与灵活。Observable是异步的双向push,Iterable是同步的单向pull,对比:
我把RXjava的源码和这份面试都给你了,你还告诉我面不过拿不到offer?(一)

Observable无偏见

Rx对于对于并发性或异步性没有任何特殊的偏好,Observable能够用任何方式实现,线程池、事件循环、非阻塞IO、Actor模式,任何知足你的需求的,你擅长或偏好的方式均可以。不管你选择怎样实现它,不管底层实现是阻塞的仍是非阻塞的,客户端代码将全部与Observable的交互都当作是异步的。

Observable是如何实现的?
public observable<data>getData();
  • 它能与调用者在同一线程同步执行吗?
  • 它能异步地在单独的线程执行吗?
  • 它会将工做分发到多个线程,返回数据的顺序是任意的吗?
  • 它使用Actor模式而不是线程池吗?
  • 它使用NIO和事件循环执行异步网络访问吗?
  • 它使用事件循环将工做线程从回调线程分离出来吗?

从Observer的视角看,这些都无所谓,重要的是:使用Rx,你能够改变你的观念,你能够在彻底不影响Observable程序库使用者的状况下,完全的改变Observable的底层实现。

使用回调存在不少问题

回调在不阻塞任何事情的状况下,解决了 Future.get() 过早阻塞的问题。因为响应结果一旦就绪Callback就会被调用,它们天生就是高效率的。不过,就像使用Future同样,对于单层的异步执行来讲,回调很容易使用,对于嵌套的异步组合,它们显得很是笨拙。

Rx是一个多语言的实现

Rx在大量的编程语言中都有实现,并尊重实现语言的风格,并且更多的实现正在飞速增长。

响应式编程

Rx提供了一系列的操做符,你可使用它们来过滤(filter)、选择(select)、变换(transform)、结合(combine)和组合(compose)多个Observable,这些操做符让执行和复合变得很是高效。你能够把Observable当作Iterable的推送方式的等价物,使用Iterable,消费者从生产者那拉取数据,线程阻塞直至数据准备好。使用Observable,在数据准备好时,生产者将数据推送给消费者。数据能够同步或异步的到达,这种
方式更灵活。

下面的例子展现了类似的高阶函数在IterableObservable上的应用

//Iterable
  getDataFromLoca1Memory()
     .skip(10)
     .take(5)
     .map({ s -> return s + " transformed" })
     .forEach({ print1n "netx =>" + it })

  // observable
  getDataFromNetwork()
     .skip(10)
     .take(5)
     .map({ s -> return s + " transformed" })
     .subscribe({ print1n "onNetx =>" + it })

Observable类型给GOF的观察者模式添加了两种缺乏的语义,这样就和Iterable类型中可用的操做一致了:

  1. 生产者能够发信号给消费者,通知它没有更多数据可用了(对于Iterable,一个for循环正常完成表示没有数据了;对于Observable,就是调用观察者的 onCompleted 方法)
  2. 生产者能够发信号给消费者,通知它遇到了一个错误(对于Iterable,迭代过程当中发生错误会抛出异常;对于Observable,就是调用观察者(Observer)的 onError 方法

有了这两种功能,Rx就能使Observable与Iterable保持一致了,惟一的不一样是数据流的方向。任何对Iterable的操做,你均可以对Observable使用。

四丶名词定义

这里给出一些名词的翻译
  • Reactive 直译为反应性的,有活性的,根据上下文通常翻译为反应式、响应式
  • Iterable 可迭代对象,支持以迭代器的形式遍历,许多语言中都存在这个概念
  • Observable 可观察对象,在Rx中定义为更强大的Iterable,在观察者模式中是被观察的对象,一旦数据产生或发生变化,会经过某种方式通知观察者或订阅者
  • Observer 观察者对象,监听Observable发射的数据并作出响应,Subscriber是它的一个特殊实现
  • emit 直译为发射,发布,发出,含义是Observable在数据产生或变化时发送通知给Observer,调用Observer对应的方法,文章里一概译为发射
  • items 直译为项目,条目,在Rx里是指Observable发射的数据项,文章里一概译为数据,数据项
Observable
概述

在ReactiveX中,一个观察者(Observer)订阅一个可观察对象(Observable)。观察者对Observable发射的数据或数据序列做出响应。这种模式能够极大地简化并发操做,由于它建立了一个处于待命状态的观察者哨兵,在将来某个时刻响应Observable的通知,不须要阻塞等待Observable发射数据。

这篇文章会解释什么是响应式编程模式(reactive pattern),以及什么是可观察对象(Observables)和观察者(observers),其它几篇文章会展现如何用操做符组合和改变Observable的行为。

相关参考:

Single - 一个特殊的Observable,只发射单个数据。

背景知识

在不少软件编程任务中,或多或少你都会指望你写的代码能按照编写的顺序,一次一个的顺序执行和完成。可是在ReactiveX中,不少指令多是并行执行的,以后他们的执行结果才会被观察者捕获,顺序是不肯定的。为达到这个目的,你定义一种获取和变换数据的机制,而不是调用一个方法。在这种机制下,存在一个可观察对象(Observable),观察者(Observer)订阅(Subscribe)它,当数据就绪时,以前定义的机制就会分发数据给一直处于等待状态的观察者哨兵。

这种方法的优势是,若是你有大量的任务要处理,它们互相之间没有依赖关系。你能够同时开始执行它们,不用等待一个完成再开始下一个(用这种方式,你的整个任务队列能耗费的最长时间,不会超过任务里最耗时的那个)。

有不少术语可用于描述这种异步编程和设计模式,在在本文里咱们使用这些术语:一个观察者订阅一个可观察对象(An observer subscribes to an Observable)。经过调用观察者的方法,Observable发射数据或通知给它的观察者。

在其它的文档和场景里,有时咱们也将Observer叫作Subscriber、Watcher、Reactor。这个模型一般被称做Reactor模式。

建立观察者

本文使用相似于Groovy的伪代码举例,可是ReactiveX有多种语言的实现。普通的方法调用(不是某种异步方法,也不是Rx中的并行调用),流程一般是这样的:

  1. 调用某一个方法
  2. 用一个变量保存方法返回的结果
  3. 使用这个变量和它的新值作些有用的事

用代码描述就是:

// make the call, assign its return value to `returnVal`
  returnVal = someMethod(itsParameters);
  // do something useful with returnVal

在异步模型中流程更像这样的:

  1. 定义一个方法,这个方法拿着某个异步调用的返回值作一些有用的事情。这个方法是观察者的一部分。
  2. 将这个异步调用自己定义为一个Observable
  3. 观察者经过订阅(Subscribe)操做关联到那个Observable
  4. 继续你的业务逻辑,等方法返回时,Observable会发射结果,观察者的方法会开始处理结果或结果集

用代码描述就是:

// defines, but does not invoke, the Subscriber's onNext handler
  // (in this example, the observer is very simple and has only an onNext handler)
  def myOnNext = { it -> do something useful with it };
  // defines, but does not invoke, the Observable
  def myObservable = someObservable(itsParameters);
  // subscribes the Subscriber to the Observable, and invokes the Observable
  myObservable.subscribe(myOnNext);
  // go on about my business
回调方法 (onNext, onCompleted, onError)

Subscribe方法用于将观察者链接到Observable,你的观察者须要实现如下方法的一个子集:

  • onNext(T item)
    Observable调用这个方法发射数据,方法的参数就是Observable发射的数据,这个方法可能会被调用屡次,取决于你的实现。
  • onError(Exception ex)
    当Observable遇到错误或者没法返回指望的数据时会调用这个方法,这个调用会终止Observable,后续不会再调用onNext和onCompleted,onError方法的参数是抛出的异常。
  • onComplete
    正常终止,若是没有遇到错误,Observable在最后一次调用onNext以后调用此方法。

根据Observable协议的定义,onNext可能会被调用零次或者不少次,最后会有一次onCompleted或onError调用(不会同时),传递数据给onNext一般被称做发射,onCompleted和onError被称做通知。

下面是一个更完整的例子:

def myOnNext   = { item -> /* do something useful with item */ };
  def myError    = { throwable -> /* react sensibly to a failed call */ };
  def myComplete  = { /* clean up after the final response */ };
  def myObservable = someMethod(itsParameters);
  myObservable.subscribe(myOnNext, myError, myComplete);
  // go on about my business
取消订阅 (Unsubscribing)

在一些ReactiveX实现中,有一个特殊的观察者接口Subscriber,它有一个unsubscribe方法。调用这个方法表示你不关心当前订阅的Observable了,所以Observable能够选择中止发射新的数据项(若是没有其它观察者订阅)。

取消订阅的结果会传递给这个Observable的操做符链,并且会致使这个链条上的每一个环节都中止发射数据项。这些并不保证会当即发生,然而,对一个Observable来讲,即便没有观察者了,它也能够在一个while循环中继续生成并尝试发射数据项。

关于命名约定

ReactiveX的每种特定语言的实现都有本身的命名偏好,虽然不一样的实现之间有不少共同点,但并不存在一个统一的命名标准。

并且,在某些场景中,一些名字有不一样的隐含意义,或者在某些语言看来比较怪异。

例如,有一个onEvent命名模式(onNext, onCompleted, onError),在一些场景中,这些名字可能意味着事件处理器已经注册。然而在ReactiveX里,他们是事件处理器的名字。

Observables的"热"和"冷"

Observable何时开始发射数据序列?这取决于Observable的实现,一个"热"的Observable可能一建立完就开始发射数据,所以全部后续订阅它的观察者可能从序列中间的某个位置开始接受数据(有一些数据错过了)。一个"冷"的Observable会一直等待,直到有观察者订阅它才开始发射数据,所以这个观察者能够确保会收到整个数据序列。

在一些ReactiveX实现里,还存在一种被称做Connectable的Observable,无论有没有观察者订阅它,这种Observable都不会开始发射数据,除非Connect方法被调用。

用操做符组合Observable

对于ReactiveX来讲,Observable和Observer仅仅是个开始,它们自己不过是标准观察者模式的一些轻量级扩展,目的是为了更好的处理事件序列。

ReactiveX真正强大的地方在于它的操做符,操做符让你能够变换、组合、操纵和处理Observable发射的数据。

Rx的操做符让你能够用声明式的风格组合异步操做序列,它拥有回调的全部效率优点,同时又避免了典型的异步系统中嵌套回调的缺点。

下面是经常使用的操做符列表:

  1. 建立操做 Create, Defer, Empty/Never/Throw, From, Interval, Just, Range, Repeat, Start, Timer
  2. 变换操做 Buffer, FlatMap, GroupBy, Map, Scan和Window
  3. 过滤操做 Debounce, Distinct, ElementAt, Filter, First, IgnoreElements, Last, Sample, Skip, SkipLast, Take,TakeLast
  4. 组合操做 And/Then/When, CombineLatest, Join, Merge, StartWith, Switch, Zip
  5. 错误处理 CatchRetry
  6. 辅助操做 Delay, Do, Materialize/Dematerialize, ObserveOn, Serialize, Subscribe, SubscribeOn,TimeInterval, Timeout, Timestamp, Using
  7. 条件和布尔操做All, Amb, Contains, DefaultIfEmpty, SequenceEqual, SkipUntil, SkipWhile, TakeUntil,TakeWhile
  8. 算术和集合操做 Average, Concat, Count, Max, Min, Reduce, Sum
  9. 转换操做 To
  10. 链接操做 Connect, Publish, RefCount, Replay
  11. 反压操做,用于增长特殊的流程控制策略的操做符

这些操做符并不全都是ReactiveX的核心组成部分,有一些是语言特定的实现或可选的模块。

顺手留下GitHub连接,须要获取相关面试等内容的能够本身去找
https://github.com/xiangjiana/Android-MS
(VX:mm14525201314)

相关文章
相关标签/搜索