The Reactive Extensions (Rx) is a library for composing asynchronous and event-based programs using observable sequences and LINQ-style query operators.app
Reactive Extensions represents all these data sequences as observable sequences. An application can subscribe to these observable sequences to receive asynchronous notifications as new data arrive.async
IObservable<T>
/IObserver<T>
IObservable<T>
就是observable sequences 的抽象,就像Pull-based中的IEnumerable<T>
相似,用来承载数据,表明了一个能够被观察的数据源,他能够在将sequences中的数据,推送给任何一个有兴趣的listener,咱们用IObserver<T>
来表明这种对IObservable<T>
感兴趣的listener。spa
一个IObservable<T>
的实现能够被视为一个T类型数据的集合, 例如IObservable<int>
能够被视为一个int类型数据的集合,他们会被推送给IObserver<T>
这样的订阅者。
咱们看一下接口的定义:code
public interface IObservable<out T>{ IDisposable Subscribe(IObserver<T> observer);}public interface IObserver<in T>{ void OnCompleted(); // Notifies the observer that the source has finished sending messages. void OnError(Exception error); // Notifies the observer about any exception or error. void OnNext(T value); // Pushes the next data value from the source to the observer.}
Rx还经过扩展的方式提供了一些订阅更方便的方式,不须要实现本身的IObserver<T>
,只须要实现对应的订阅事件(OnNext, OnError, OnComplete)对应的delegate方法便可,以下面的例子:server
IObservable<int> source = Observable.Range(1, 5); //creates an observable sequence of 5 integers, starting from 1IDisposable subscription = source.Subscribe( x => Console.WriteLine("OnNext: {0}", x), //prints out the value being pushed ex => Console.WriteLine("OnError: {0}", ex.Message), () => Console.WriteLine("OnCompleted"));
上面这只是两个例子,在实际的开发应用中,我没几乎没有必要本身去实现这两个接口,Rx经过Observable
和Observer
为咱们提供了丰富的足够的实现,后面将继续说明。接口