既然你已经知道了什么是 Rx,如今是开始建立和操做 sequences 的时候了。操做 sequences 的最初实现是基于c#linq,然后者则是从函数式编程中获得启发的。对于这些知识并非必要的,可是它会使读者更容易学习。根据原始的www.introtorx.com,咱们也将操做划分为一般从简单到更高级的主题。大多数 Rx 运算符操做现有序列。可是首先,咱们将看到如何建立一个 Observable 开始。java
在前面的示例中,咱们使用了 Subject 并手动将值推入其中以建立序列。咱们使用该序列演示了一些关键概念和第一个也是最重要的 Rx 方法--Subject。在大多数状况下,Subject 并非创造一个新的 Observable 最佳方式。咱们如今将看到更整洁的方法来创造可观察的序列。ios
Just方法建立一个 Observable,它将发出预先定义的值序列,在建立时提供,而后终止。git
输出:程序员
observable 会调用单一的 onCompleted 方法,没有其余的。github
输出:编程
这个 observable 不会输出任何值c#
输出:缓存
该 observable 会发出 error 事件和终止服务器
输出:并发
defer 并不定义一种新的 observable ,而是容许您在每次 subscriber 到达时都声明如何建立 observable。考虑如何建立一个 observable ,返回当前时间并终止。您正在发出一个值,所以它听起来就像一个 just。
输出:
注意 subscribers 分开1秒执行,可是看看同一时间。这是由于时间的值只请求了一次:当执行到达 just时。您想要的是当订阅者经过 subscribing 请求时要从新定义的时间。延迟将执行一个函数来建立和返回 Observable 。函数返回 Observable 延迟的,这里最重要的一点是,这个函数将再次执行每个新 subscription 。
creat 是一个很是强大的功能来建立 observables,让咱们看看签名。
static <T> Observable<T> create(Observable.OnSubscribe<T> f)
O使用nSubscribe<T>比它看起来简单。它基本上是一个函数,对于T类型,它接接受 Subscriber<T> ,咱们能够手动肯定推送事件到 subscriber 。
输出:
当有人订阅 observable (value)时,相应的 Subscriber 实例传递给您的函数。当代码执行时,值被推送到订 Subscriber。注意,若是您想要 sequence 来指示其完成,您要本身调用 onCompleted。
当现有的快速方法都不符合您的目的时,这个方法应该是您首选的建立自定义的 observable 。代码相似于咱们如何建立一个 subject 并将 value 推送给它,可是有几个重要的区别。首先,事件的源被整齐地封装,并与无关的代码分离。第二,subject 所包含的危险并不明显:对于管理状态的 subject ,任何访问实例的人均可以将值推入其中并更改顺序,稍后咱们会看到更多关于这个问题的内容。
使用 subject 的另外一个关键区别在于代码是惰性地执行的,当 observer 订阅时,代码执行。在上面的示例中,当建立 observable 时,代码并没运行(由于还没有订阅服务器),可是每次调用订阅都是这样。这意味着每一个订阅服务器都会再次生成每一个值,相似于ReplaySubject。最终结果相似于一个ReplaySubject,但没有发生缓存。可是,若是咱们习惯使用使用 ReplaySubject,建立方法比较耗时,那么就会阻塞执行建立的线程。您必须手动建立一个新线程来将值推送到 subject中。咱们尚未提出并发性的方法,可是也有一些方便的方法来并行执行中的函数。
您可能已经注意到,您可使用 Observable.create 来实现之前的任何一个 observables。实际上,咱们的建立示例至关于 Observable.just("hello")。
在函数式编程中,一般会建立长度不受限制或无限长的序列。RxJava有工厂方法来建立这样的序列。
对程序员来讲都是直接的和熟悉的方法,它发出指定的整数范围。
Observable<Integer> values = Observable.range(10, 15);
这个例子的发出值的整数范围 10 到24 之间。
此函数将建立一个无限的序列,以指定的持续时间分隔。
输出:
直到咱们再也不 unsubscribe 啦,这个 sequence 才会终止。
咱们应该注意为何在结尾的 read()方法是必要的。没有它,程序就会终止,而不会打印内容。这是由于咱们的操做是非阻塞的:咱们建立了一个 observable ,随着时间的推移,咱们会发出值,而后,若是值到达的话,咱们会注册执行这些操做。全部这些都不是阻塞的,主线程终止,计时器在本身的线程上运行,这并不阻止JVM终止。
对于Observable.timer有两个重载。第一个示例建立一个observable 等待给定时间,而后发出 0L 并终止。
输出:
第二个会等待必定的时间,而后以给定的频率开始像间隔同样发出。
输出:
上面的例子等待两秒后,每秒开始计数。
与咱们迄今所看到的大部分功能同样,您能够将任何类型的输入使用 create 转换为Rx observable。
Future 是 java 框架的功能,当你使用并发性的框架的时候可能使用他们。它们是一个和 RX 差很少强大的并发概念,由于它们只返回一个值。天然地,您可能但愿将它们变成 observables 。
输出:
observable 在可用时发出futureTask 的结果,而后终止。若是任务被取消,可观察到的将发出一个java.util.concurrent.CancerationException error。
若是您对 Future 的结果返回时间限制感兴趣,您能够提供这样的超时机制。
Observable<Integer> values = Observable.from(f, 1000, TimeUnit.MILLISECONDS);
若是 Future 没有在指定的时间内完成,observable 将忽略它,以 TimeoutException 失败。
您还可使用 Observable.from 将任何集合转换成 observable,这样每一个值都会发出,执行最后的 onCompleted 事件。
输出:
Observable 不是能够用Iterable或stream 替换的。Observables 是基于 push-based, 也就是说,调用onNext 会致使处理程序堆栈所有执行到最终订阅服务器方法(除非另有指定)。其余模型是基于 pull-based,这意味着在返回结果以前,尽量快地请求值和执行块。
参考:
https://github.com/Froussios/Intro-To-RxJava/blob/master/Part%202%20-%20Sequence%20Basics/1.%20Creating%20a%20sequence.md