上一篇 提到的官方源码也能够在个人共享(http://pan.baidu.com/s/1qWqKe5Y)下载。不了解观察者设计模式的读者阅读源码会有意义点困难(好比我),能够参考(.NET设计模式(19):观察者模式(Observer Pattern))入门。html
HelloInsightObservable官方源码利用toToPointStream方法将观察者的实例转化为点事件流,接着在点事件流中使用linq查询e>50的输入,并将其输出c#
运行结果以下:设计模式
其不足之处在于代码有点混乱,并且只有一个观察者。网络
接下来本文就逐步修改,而且实现多个观察者的状况。app
program.cs中定义观察者,将观察者“订阅”到目标对象的语句以下:dom
var outputObserver = new OutputObserver();ide
var outputObservable = query.ToObservable();//将事件流转化为可观察的输出 函数
outputObservable.Subscribe(outputObserver);//提供通知信息到outputObserverthis
那容易想到的思路是直接在program.cs中添加多个观察者,再使用Subscribe方法订阅多个观察者。可是输出每次都有变更,因为不一样观察者输出同样也看不出明显规律,偶尔还会因为枚举观察者的过程当中观察者集合变更而产生异常spa
这是由于InputObservable.cs中模拟输入流的GenerateInput是Timer的回调函数。每个观察者在运行以后都会将Timer设为中止状态,别的观察者在Timer已经启动的状况下加入不是很恰当。使人奇怪的是官方源码在InputObservable. cs的构造函数中启动了Timer,既然没打算添加多个观察者,那在GenerateInput中遍历观察者集合Observers的语句有什么意义?
添加以下引用:
Microsoft.ComplexEventProcessing;
Microsoft.ComplexEventProcessing.Observable;
System.Reactive;
System.Reactive.Providers;
namespace HelloInsight_edit { public class OutputObserver:IObserver<int>//实现IObserver接口 { private string name; public OutputObserver(string name){ this.name = name; } public virtual void OnCompleted() { Console.WriteLine("Stopping query..."); } public virtual void OnError(Exception e) { Console.WriteLine("Unexpected error occured"); } public virtual void OnNext(int value) { Console.WriteLine("{0}观察到的value: {1}", this.name,value); } } }
为简单起见,IObserver的抽象类型都使用int型,之后Main方法建立事件流的时候也会相应修改。
咱们要删掉构造方法中的timer.change(timeSpan,timeSpan),新建了update方法,用来调用这句话。这样可使得多个observer都添加到observers中以后再启动Timer。
public class EventSource:IObservable<int> { private List<IObserver<int>> observers = new List<IObserver<int>>(); private readonly int dataNumber; private int generatedNumber; private Random random; private readonly Timer timer; private readonly int timeSpan; //add private int _randomNumber; public EventSource(int dataNumber) { Console.WriteLine("我是构造方法"); this.random = new Random(); this.dataNumber = dataNumber; this.generatedNumber = 0; this.timer = new Timer(GenerateInput);//callback是一个委托,表示要执行的方法 this.timeSpan = 100;//每一个随机数字产生的时间间隔 1000ms //timer.Change(timeSpan, timeSpan);//此语句控制数据 this._randomNumber = -1;//初始化随机数字 } public int RandomNumber { get { return _randomNumber; } set { this._randomNumber = value; } } public void Update() { timer.Change(timeSpan, timeSpan); } private void GenerateInput(object _) { foreach (var observer in observers) { _randomNumber= random.Next(100); Console.WriteLine("Random generated data {0} : {1}", generatedNumber, _randomNumber); observer.OnNext(_randomNumber); generatedNumber++; if (generatedNumber >= dataNumber) { observer.OnCompleted(); timer.Change(Timeout.Infinite, timeSpan); return; } } timer.Change(timeSpan, timeSpan); } public void AddObserver(IObserver<int> observer) { observers.Add(observer); } public void RemoveObserver(IObserver<int> observer) { observers.Remove(observer); } //必须实现的方法 public IDisposable Subscribe(IObserver<int> observer) { if (observer != null && !observers.Contains(observer)) { observers.Add(observer); } Console.WriteLine("我是subscriber"); return observer as IDisposable; } }
将输入源的实例es转化为点事件流stream,query过滤获得stream中大于50的事件流,query2过滤获得stream大于70的事件流。创建了3个观察者roger、luffy和nami,咱们用luffy观察query,用nami观察query2。
修好program.cs以后就能够调试了噢耶……
using System; using System.Collections.Generic; using System.Linq; using System.Text; using System.Threading.Tasks; //add using Microsoft.ComplexEventProcessing; using Microsoft.ComplexEventProcessing.Linq; namespace HelloInsight_edit { class Program { static void Main(string[] args) { //将EventSource类做为CEP引擎的输入。 EventSource es = new EventSource(10); var server = Server.Create("Default"); var application = server.CreateApplication("Observable Application"); //注意如下4行,这里与适配器方式的程序不一样的是,没有插入CTI事件。 var stream = es.ToPointStream(application, e => PointEvent.CreateInsert(DateTime.Now, e), AdvanceTimeSettings.StrictlyIncreasingStartTime, "Observable Stream"); var query = from e in stream where e > 50 select e; OutputObserver roger = new OutputObserver("roger"); OutputObserver luffy = new OutputObserver("luffy"); OutputObserver nami = new OutputObserver("nami"); Console.WriteLine("Starting query..."); //直接对原始流添加观察者 //es.AddObserver(roger); es.AddObserver(luffy); es.AddObserver(nami); //对newStream添加观察者 var newStream = query.ToObservable(); newStream.Subscribe(luffy); //newStream.Subscribe(nami);//添加多个订阅者可能会有异常 //对newStream2添加观察者 var query2 = from e in stream where e > 70 select e; var newStream2 = query2.ToObservable(); newStream2.Subscribe(nami); //调用timer.change(定义callback的等待时间和时间间隔) es.Update(); Console.ReadLine(); } } }
运行结果:
能够看出,Subscribe两个观察者的操做先执行。
在遍历观察者集合observers的过程当中,每组显示2个随机数。luffy和nami依次观察第一个和第二个。
{?我的理解为newStream.Subscribe(luffy);的功能相似于一个绑定了luffy的线程,遍历结束以后所有用户开始依次输出。全局变量generatedNumber负责总体次数}
这不是咱们要的功能。
对于流中每一个事件,不一样观察者都观察到才行。
将生成随机数的语句放到遍历操做foreach以前
private void GenerateInput(object _) { _randomNumber = random.Next(100); if (generatedNumber <= dataNumber) { Console.WriteLine("Random generated data {0} : {1}", generatedNumber, _randomNumber); foreach (var observer in observers) observer.OnNext(_randomNumber);//使用最大程度实现的OnNext } else { observers.ElementAt(0).OnCompleted(); timer.Change(Timeout.Infinite, timeSpan); } generatedNumber++; timer.Change(timeSpan, timeSpan); }
运行结果:
能够看出,对于流中每一个事件,luffy检测到了大于50的事件,nami检测到了大于70的事件,实现了预约的目标。
{!接下来咱们要将观察者模式、点事件流检测和WCF(Windows Communication Foundation)相结合,实现事件源和观察者WCF通讯,便于接下来部署到网络中}
[1]IObserver<T>接口
(http://msdn.microsoft.com/zh-cn/library/dd783449(v=vs.110).aspx)
[2]IDisposable接口
(http://msdn.microsoft.com/zh-cn/library/system.idisposable(v=vs.110).aspx)
[3]virtual方法
(http://www.cnblogs.com/hacker/archive/2004/08/10/31774.html)