由于Rx针对异步系统设计,而且Rx也天然支持多线程,因此新的Rx开发人员有时会假设Rx默认是多线程的。在其余任何事情以前,重要的是澄清Rx默认是单线程的。ios
除非另有说明,不然每次调用onNext / onError / onCompleted都会同步执行整个运算符链,包括最终subscriber的操做。咱们能够看到比如下示例:git
输出:github
咱们在这里看到,咱们从3个不一样的线程中调用了onNext。每次订阅者的操做都在第一个onNext调用来自的同一个线程上执行。不管咱们将多少个连接在一块儿,状况也是如此。除非咱们另有请求,不然该值将同步。多线程
subscribeOn和observeOn容许您控制订阅的调用和通知的接收(什么线程将在您的观察者上调用onNext / onError / onCompleted)。异步
在Rx中,您不直接处理线程。而是将它们包装在名为Scheduler的策略中。咱们稍后会看到更多。函数
使用subscribeOn,您能够决定执行Observable.create的Scheduler。即便你不是本身调用create,也有内在的等价。请考虑如下示例spa
输出:线程
咱们在这里看到,不只一切都在同一个线程上执行,它其实是顺序的:在完成订阅)observable(包括执行create的lambda参数的主体)以前,subscribe不会解除阻塞。在lambda中对onNext的调用会执行整个运算符链,一直到println。实际上,订阅已建立的observable是阻塞的。设计
不管您要求什么,一些可观察者都会建立本身的线程。例如,Observable.interval是异步的。在这种状况下,subscribeOn将指定运行建立资源的函数的线程,这一般是没有用的。它使您没法控制将租用的资源。对象
输出:
observeOn控制管道的另外一侧。值的建立和发送将正常工做,可是您的观察器的操做将在调度程序策略指定的不一样线程上调用。
输出:
与subscribeOn不一样,observeOn的效果不会跳转到管道的开头。它只是改变了它以后的运算符的线程。您能够将其视为拦截事件并更改链的其他部分的线程。这是一个例子:
输出;
咱们能够在这里看到事件从调用onNext的线程开始并保持在该线程上,直到它们遇到observeOn运算符。以后,他们继续新线程。这样,您能够将不一样的线程策略分配给Rx管道的不一样部分。
正如咱们所看到的,一些可观察对象依赖于订阅租用的资源,并在订阅结束时释放。一般,释放资源很便宜。在特殊状况下,您须要取消订阅操做以不阻止或特定地发生在特殊线程上,您能够指定将使用unsubscribeOn执行这些操做的调度程序。
输出:
using方法执行3个函数,一个租用资源,一个使用它,另外一个释放它。使用unsubscribeOn,咱们只影响了释放资源的功能。
下节再续!
原文:https://github.com/Froussios/Intro-To-RxJava/blob/master/Part%204%20-%20Concurrency/1.%20Scheduling%20and%20threading.md