Rx.Observalbe.create()
或者建立操做符,能够 建立(created) Observable流。
Observer则能够 订阅(subscribed) 这个流。
经过 执行(execute) next()
、error()
和complete()
能够向订阅者推送不一样的通知。
以后,执行过程可能被 处理掉(disposed) 。
这四个方面都被集成在Observable
实例当中,可是也有一些方面与其余类型有关,好比Observer
和Subscription
。react
Observable的核心关注点是:编程
建立Observable流安全
订阅Observable流异步
执行Observable流ide
终止Observable流函数
Rx.Observable.create
能够说是Observable构造函数的别名,他能够接受一个参数:subscribe
函数。性能
如下的例子建立了一个Observable流,每秒钟向Observer
发出一个字符串类性值hi
。atom
var observable = Rx.Observable.create(function subscribe(observer) { var id = setInterval(() => { observer.next('hi') }, 1000); });
Observables流可使用
create()
建立,可是一般咱们会使用所谓的建立操做符,像of()
,from()
,interval()
等等。code
在上面的例子中,订阅函数(subscribe function)是描述Observalbe最重要的部分。那么,让我来看看何谓订阅。
在例子中,Observalbe的实例observable
能够被订阅,像这样:
observable.subscribe(x => console.log(x));
也许你会注意到,observable.subscribe()
和subscribe
函数在Rx.Observable.create(function subscribe(observer){...})
中使用了相同的名字,这并非巧合。
在库中,他们是不一样的,但在实际使用中,你能够认为他们在概念上是相等的。
Observable不在多个Observer之间共享subscribe
。当调用observable.subscribe()
并获得观察者时,在Rx.Observable.create(function subscribe(observer){...})
中传入的函数将会被执行。每次执行observable.subscribe()
都会触发一个单独针对当前Observer的运行逻辑。
订阅一个Observable流就像调用一个函数,流中的数据将会被传递给回调函数中。
一个subscribe
函数被调用将会开启一个Observable执行流(Observable execution),向观察者们输出流中的值或者事件。
代码Rx.Observable.create(function subscribe(observer){...})
表明了一个“Observable流”,因为惰性计算,只用当有Observer订阅流时,函数才会被执行。
执行过程当中随着时间线产生多个数据,方式是同步或异步二选一。
有三个类型的值会在执行流中发出:
"Next"
通知:发出一个值,好比数字,字符串,对象等等。
"Error"
通知:发出一个js错误或者异常。
Complete
通知:不发出任何值,表示流的结束。
Next
通知是最重要也是最经常使用的类型:他表明了实际推送给Observer的值。Error
和Complete
通知只会在执行流中发出一次,要么是Error
,要么是Complete
。
用正表达式的规则能够很好的表达这种所谓的Observable语法和约定:
next*(error|complete)?
在一个Observable执行流中,会发出0到无限个
Next
通知。而一旦Error
或者Complete
通知被发出,执行流将不会再推送任何消息。
下面的例子展现了一个推送了3个Next
并Complete
的流:
var observable = Rx.Observable.create(function subscribe(observer) { observer.next(1); observer.next(2); observer.next(3); observer.complete(); });
Observables会严格遵照Observable约定,因此下面的代码将不会推送值4
:
var observable = Rx.Observable.create(function subscribe(observer) { observer.next(1); observer.next(2); observer.next(3); observer.complete(); observer.next(4); // Is not delivered because it would violate the contract });
在订阅函数中使用try/catch
捕获可能抛出的异常,也是一个很不错的作法:
var observable = Rx.Observable.create(function subscribe(observer) { try { observer.next(1); observer.next(2); observer.next(3); observer.complete(); } catch (err) { observer.error(err); // delivers an error if it caught one } });
Observable流的执行时间线多是无限长的,但一般咱们只用到有限的时间段和观察者处理业务,所以,咱们须要一种中断流执行的API。
因为一个执行过程对于每一个Observer是独有的,一旦Observer接收到值,那么也必然须要一种中断执行的方式,从而能够节省计算性能和内存空间。
当observable.subscribe()
被调用,Observer将被附加到新建立的Observable执行过程当中,同时返回了一个对象,Subscription
:
var subscription = observable.subscribe(x => console.log(x));
Subscription
表明了一个持续执行的过程,而且有一套最小化的API容许你中断流的执行过程。能够从这里进一步了解Subscription类型。如下例子展现了使用subscription.unsubscribe()
中断持续执行的过程:
var observable = Rx.Observable.from([10, 20, 30]); var subscription = observable.subscribe(x => console.log(x)); // Later: subscription.unsubscribe();
当你订阅流就能够获取一个Subscription,表明了持续执行的过程。调用
unsubscribe()
就能够中断执行过程。
当咱们使用create()
建立一个Observable流时,每个Observable都必须定义它如何处理获取到的资源的处理方式。你能够经过在subscribe()
函数中返回一个自定义的unsubscribe
函数,达到这个目的。
举个例子,如下展现了如何中断一个使用setInterval()
执行interval
的过程:
var observable = Rx.Observable.create(function subscribe(observer) { // Keep track of the interval resource var intervalID = setInterval(() => { observer.next('hi'); }, 1000); // Provide a way of canceling and disposing the interval resource return function unsubscribe() { clearInterval(intervalID); }; });
就像observable.subscribe()
相似Observable.create(function subscribe(){..})
同样,咱们从subscribe()
返回的unsubscribe()
也概念性的等同于subscription.unsubscribe()
。
事实上,若是咱们移除与响应式编程相关的概念,剩下的就是直白的js代码了:
function subscribe(observer) { var intervalID = setInterval(() => { observer.next('hi'); }, 1000); return function unsubscribe() { clearInterval(intervalID); }; } var unsubscribe = subscribe({next: (x) => console.log(x)}); // Later: unsubscribe(); // dispose the resources
咱们使用Rx,包括Observable
、Observer
和Subscription
,其缘由就是为了使用这些安全(就如Observable约定的)和可组合的操做符。