monad(https://en.wikipedia.org/wiki/Monad_%28functional_programming%29)是一个从函数式编程中比较抽象的一个概念,可能对不少程序员来讲并不熟悉。它超出了本指南的范围,在www.introtorx.com中,咱们能够找到一个简短的定义:html
Monads are a kind of abstract data type constructor that encapsulate program logic instead of data in the domain model.java
Monads 是咱们感兴趣的,由于 observable 是 Monad。RX代码声明须要作什么,但实际处理不是在执行Rx语句时发生的,而是在发出值时发生的。读者可能会发现阅读更多关于 monads 的内容是颇有趣的。对于本指南,当说起Monad时,读者只需考虑 observable。react
想要离开Monad有两个主要缘由,第一个缘由是,一个新的Rx开发人员仍然会更习惯于更传统的范例。在不一样的范例中执行部分计算可能会使某些部分工做,而您仍在研究如何使用Rx进行操做。第二个缘由是,咱们一般与没有考虑到Rx的组件和库进行交互。当将现有代码重构为Rx时,让Rx以阻塞方式运行多是有用的。ios
以分块方式从可观察到的对象中获取数据,第一步是转换为 BlockingObservable.。任何可观察的对象均可以经过如下两种方式转换为 BlockingObservable:您可使用可观察对象的 toBlocking 方法。git
或者BlockingObservable的静态工厂方法程序员
BlockingObservable 没有继承 Observable,也不能与咱们一般的Rx操做符一块儿使用。它有本身的小型函数方法的实现,容许您以阻塞的方式从可观察到的数据中提取数据。这些方法中的许多都是与咱们已经看到的方法相对应的阻塞方法。github
Observable 有一个 forEach 方法,forEach 被定义为 subscribe 的别名,主要区别是它不返回 Subscription。来看一个例子编程
输出:api
这里的代码的行为与 subscribe 相似。首先注册一个观察者(forEach接受观察者没有重载,但语义是相同的)。而后执行继续打印“Subscribed”并退出咱们的代码段。当发出值(第一个延迟100 ms)时,它们被传递给咱们的观察者进行处理。缓存
BlockingObservable 没有 subscribe 方法,可是有 forEach,让咱们用 BLockingObservable 改造一下:
输出:
咱们在这里看到,对每一个对象的调用被阻塞,直到可观察到的任务完成。另外一个区别是没有onError和onCompleted的处理程序。onCompleted是在执行完成时给定的,而异常将被抛入运行时进行捕获:
输出:
BlockingObservable 拥有 first ,last 和 single 方法,也实现了firstOrDefault,lastOrDefault和singleOrDefault。在 Observable 中阅读了它们的 namesakes 以后,您已经知道返回的值是什么。一样,不一样的是方法的阻塞性质。它们不返回可观察到的值,当值可用时,它就会发出该值。相反,它们会阻塞,直到值可用并返回值自己,而不返回周围可见的值。
输出:
正如咱们所看到的,调用首先被阻塞,直到一个值可用,而后才返回一个值。
与forEach同样,异常也会在运行时中抛出以供捕获。
输出:
您能够经过BlockingViewable上的各类方法将您的可观察性转换为可迭代性。Iterable是基于拉的,与rx不一样的是,rx是基于推的。这意味着,当使用者准备使用一个值时,会在可迭代的Iterator上使用Next()请求一个值。对Next()的调用要么当即返回一个值,要么阻塞,直到一个值准备好为止。
有几种方法能够从BlockingViewable<T>到Iterable<T>,而且每种方法都有不一样的行为。
在此实现中,将收集和缓存全部发出的值。因为缓存,不会漏掉任何项目。迭代器将尽快获取下一个值,若是下一个值已经发生,则当即获取,或者阻塞,直到下一个值可用为止。
输出:
值得注意的是,在下一个通知可用以前,应该使用hasNext()或Next()块。若是完成,hasNext返回false,next抛出java.util.NoSuchElementException
在这个实现中,根本不缓存值。迭代器将始终等待下一个值并返回该值。
输出:
在本例中,使用者比生产者慢,而且老是错过下一个值。迭代器将得到下一个迭代器。
latest 方法相似于Next,不一样之处在于它将缓存一个值。迭代器只会在自上一个值被使用后可观察到的事件没有发出时阻塞。只要有新的事件发生,迭代器就会当即返回一个值,或者随着迭代的结束而返回。
输出:
使用 latest 迭代器时,若是在发出下一个事件以前没有提取值,则会跳过这些值。若是使用者比生产者快,迭代器将阻塞并等待下一个值。
有趣的是,4从未被消耗掉。那是由于一个onCompleted紧随其后,致使下一个拉看到一个终止的可观察到的。隐式iterator.hasNext()方法报了结止的可观察值,而不检查是否使用了最后一个值。
mostRecent 迭代器从不阻塞。它缓存这个值,所以,若是使用者速度慢,则能够跳过值。与最新版本不一样,最后一个缓存的值老是返回,若是使用者的速度快于生产者,就会致使重复。为了使最近的迭代器彻底无阻塞,须要一个初始值.。若是可观察到的对象还没有发出任何值,则返回该值。
输出:
BlockingObservable<T>能够用 toFuture 方法表示为 Future 。此方法仅建立Future的实例,而不阻止。获取值时根据须要执行块。Future 容许使用者决定如何处理异步操做。Future 还可以报告操做中的错误
输出:
以这种方式建立的Future 但愿observable 发出单个值,就像单个方法所作的那样。若是发出多个项,Future将报告java.lang.IllegalArgumentException。
到目前为止,咱们可以忽略潜在的死锁。Rx的非阻塞性使得建立没必要要的死锁变得更加困难。然而,在本章中,咱们又回到了阻塞方法,从而将死锁再次提到了最前沿。
下面的例子能够做为一个非阻塞案例。可是由于咱们使用了阻塞操做,因此它永远不会解除阻塞。
输出:
forEach仅在序列终止后返回。可是,终止事件要求Each在被推送以前返回。所以,每一个人都将永远不会解除封锁。
访问可观测数据的一些阻塞方法,如 last()
,要求被观测到的对象终止并解除阻塞。其余的,好比first(),要求它至少发出一个事件来解除阻塞。将这些方法用于可观察性并非很大的危险,由于它们只返回一个不可终止的可观察性.。若是使用者没有花时间强制执行一些保证,例如超时,BlockingViewable上的这些相同的方法可能会致使一个永久的块(咱们将看到在[Timeshifter sequences](/Part 3 - Taming the sequence/5. Time-shifted sequences.md)。
原文连接:
https://github.com/Froussios/Intro-To-RxJava/blob/master/Part%203%20-%20Taming%20the%20sequence/2.%20Leaving%20the%20monad.md