你没有必要本身手动实现Iobservable<T>接口来建立观察队列,一样的,你也没有必要实现Iobserver<T>接口来订阅这个队列。经过安装RX库,RX提供请多静态的方法来建立带有一个参数或多个或没有参数的简单队列。你能够很方便的使用这些静态方法。另外,RX还提供了订阅扩展方法来实现多种多样的OnNext,OnError,OnCompleted句柄委托。
建立和订阅简单的观察队列
下面的例子使用观察者类型的范围操做来建立简单的观察集合,经过Observable类订阅方法来订阅这个观察队列集合,而且提供要处理OnNext,OnError,OnCompleted事件的委托Action.
这个范围操做有不少重载的版本,在咱们的例子中,建立一个整数队列从x开始并并生y个数。
一旦开始订阅观察者,值就会被发送给订阅者,那么OnNext的委拖就会被执行。异步
using System; using System.Collections.Generic; using System.Linq; using System.Text; using System.Reactive; using System.Reactive.Linq; using System.Reactive.Subjects; namespace SimpleSequence { class Program { static void Main(string[] args) { IObservable<int> source = Observable.Range(1, 10); IDisposable subscription = source.Subscribe( x => Console.WriteLine("OnNext: {0}", x), ex => Console.WriteLine("OnError: {0}", ex.Message), () => Console.WriteLine("OnCompleted")); Console.WriteLine("Press ENTER to unsubscribe..."); Console.ReadLine(); subscription.Dispose(); } } }
当一个观察者订阅另外一个观察队列时,线程调用订阅的方法可能来源与不一样的线程,直到这些线程队列执行完成。所以,在观察队列执行完成前订阅调用是异步的不会阻塞其它线程。咱们会在后面讲计划任务里详细讲到。ide
注意订阅的方法返回的是Idisposable,因此你能够解除订阅关系而且释放资源很是容易。当你调用观察队列的Dispose的方法时,这时观察队列就会中止监听数据。通常状况下你除非要提早结束订阅不然你没有必要手动调用Dispose这个方法,当观察源的寿命比观察队列更长时,RX的设计能够去失去相关的关系情节并不用使用终结器。当IDispose被GC回收时,RX不会自动的释放订阅关系,然而,咱们应该注意到当观察者操做释放订阅关系时,订阅关系会被当即释放。如当OnCompeleted或者OnError消息发生时还有多是var x=Observable.Zip(a,b).Subscribe(),x订阅了a,b若是a异常了那么a,b与x的订阅关系都 会当即被释放。this
你能够调整上面的代码经过使用Observable类型的Create方法来返回一个observe,p 在这个方法中定义好OnNext,OnError,OnCompleted的委托,以后你能够经过observer来订阅到观察者类型上代码以下:spa
using System; using System.Collections.Generic; using System.Linq; using System.Text; using System.Reactive; using System.Reactive.Linq; using System.Reactive.Subjects; namespace SimpleSequence { class Program { static void Main(string[] args) { IObservable<int> source = Observable.Range(1, 10); IObserver<int> obsvr = Observer.Create<int>( x => Console.WriteLine("OnNext: {0}", x), ex => Console.WriteLine("OnError: {0}", ex.Message), () => Console.WriteLine("OnCompleted")); IDisposable subscription = source.Subscribe(obsvr); Console.WriteLine("Press ENTER to unsubscribe..."); Console.ReadLine(); subscription.Dispose(); } } }
除了本身建立observable队列你能够转换现存的.net事件和异步到观察队列中。其它的主题中会详细说到。
使用一个计时器
接下来咱们使用一个计时器来建立一个队列,这个队列将在5秒后开推送数据,以后每1秒推送一次,为了说明问题,咱们将在每一个操做推出值 的加上时间戳,经过这样,当咱们订阅这个数据源队列时咱们能够收到它的值 和时间戳。.net
Console.WriteLine(“Current Time: “ + DateTime.Now); var source = Observable.Timer(TimeSpan.FromSeconds(5), TimeSpan.FromSeconds(1)) .Timestamp(); using (source.Subscribe(x => Console.WriteLine("{0}: {1}", x.Value, x.Timestamp))) { Console.WriteLine("Press any key to unsubscribe"); Console.ReadKey(); } Console.WriteLine("Press any key to exit"); Console.ReadKey();
把普通的队列转换为观察者队列线程
IEnumerable<int> e = new List<int> { 1, 2, 3, 4, 5 }; IObservable<int> source = e.ToObservable(); IDisposable subscription = source.Subscribe( x => Console.WriteLine("OnNext: {0}", x), ex => Console.WriteLine("OnError: {0}", ex.Message), () => Console.WriteLine("OnCompleted")); Console.ReadKey();
冷与热观察者
冷观察者在开始订阅后,观察者只会像订阅的对象发送流,订阅之间是不能共享值 的。
热观察者是能够共享订阅的值 的。每个订阅者都 会获得推送的值设计
IObservable<int> source = Observable.Interval(TimeSpan.FromSeconds(1)); IDisposable subscription1 = source.Subscribe( x => Console.WriteLine("Observer 1: OnNext: {0}", x), ex => Console.WriteLine("Observer 1: OnError: {0}", ex.Message), () => Console.WriteLine("Observer 1: OnCompleted")); IDisposable subscription2 = source.Subscribe( x => Console.WriteLine("Observer 2: OnNext: {0}", x), ex => Console.WriteLine("Observer 2: OnError: {0}", ex.Message), () => Console.WriteLine("Observer 2: OnCompleted")); Console.WriteLine("Press any key to unsubscribe"); Console.ReadLine(); subscription1.Dispose(); subscription2.Dispose();
热的代码以下:code
Console.WriteLine("Current Time: " + DateTime.Now); var source = Observable.Interval(TimeSpan.FromSeconds(1)); //creates a sequence IConnectableObservable<long> hot = Observable.Publish<long>(source); // convert the sequence into a hot sequence IDisposable subscription1 = hot.Subscribe( // no value is pushed to 1st subscription at this point x => Console.WriteLine("Observer 1: OnNext: {0}", x), ex => Console.WriteLine("Observer 1: OnError: {0}", ex.Message), () => Console.WriteLine("Observer 1: OnCompleted")); Console.WriteLine("Current Time after 1st subscription: " + DateTime.Now); Thread.Sleep(3000); //idle for 3 seconds hot.Connect(); // hot is connected to source and starts pushing value to subscribers Console.WriteLine("Current Time after Connect: " + DateTime.Now); Thread.Sleep(3000); //idle for 3 seconds Console.WriteLine("Current Time just before 2nd subscription: " + DateTime.Now); IDisposable subscription2 = hot.Subscribe( // value will immediately be pushed to 2nd subscription x => Console.WriteLine("Observer 2: OnNext: {0}", x), ex => Console.WriteLine("Observer 2: OnError: {0}", ex.Message), () => Console.WriteLine("Observer 2: OnCompleted")); Console.ReadKey();
源文server