To use Rx, install the NuGet package System.Reactive in your application.

LINQ to events (based on IObservable<T>):

    // Requires: using System; using System.Reactive.Linq; using System.Threading;
    public static void ProgressRun()
    {
        var progress = new Progress<int>();
        var progressReports = Observable.FromEventPattern<int>(
                handler => progress.ProgressChanged += handler,
                handler => progress.ProgressChanged -= handler)
            //.Where(u => u.EventArgs % 2 == 0)
            ;
        progressReports.Subscribe(data =>
            Console.WriteLine($"OnNext:{data.EventArgs},ThreadId:{Thread.CurrentThread.ManagedThreadId}."));
        Reports(progress);
    }

    private static void Reports(IProgress<int> progress)
    {
        System.Console.WriteLine($"Reporting ThreadId:{Thread.CurrentThread.ManagedThreadId}.");
        for (int i = 0; i < 10; i++)
        {
            progress.Report(i);
        }
        System.Console.WriteLine($"Reported ThreadId:{Thread.CurrentThread.ManagedThreadId}.");
    }
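Output:

    Reporting ThreadId:1.
    Reported ThreadId:1.
    OnNext:5,ThreadId:9.
    OnNext:0,ThreadId:4.
    OnNext:6,ThreadId:10.
    OnNext:1,ThreadId:5.
    OnNext:2,ThreadId:6.
    OnNext:4,ThreadId:8.
    OnNext:3,ThreadId:7.
    OnNext:7,ThreadId:11.
    OnNext:9,ThreadId:13.
    OnNext:8,ThreadId:12.

The reports arrive out of order and on different threads because a console application has no SynchronizationContext to capture, so Progress<T> raises ProgressChanged on thread-pool threads.

The next example wraps the Elapsed event of System.Timers.Timer: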
    public static void TimerRun()
    {
        var timer = new System.Timers.Timer(interval: 300) { Enabled = true };
        var ticks = Observable.FromEventPattern<ElapsedEventHandler, ElapsedEventArgs>(
            handler => (s, a) => handler(s, a),
            handler => timer.Elapsed += handler,
            handler => timer.Elapsed -= handler);
        ticks.Subscribe(data =>
            Console.WriteLine($"OnNext:{data.EventArgs.SignalTime.Millisecond}, ThreadId:{Thread.CurrentThread.ManagedThreadId}."));
        System.Console.WriteLine($"Timer start ThreadId:{Thread.CurrentThread.ManagedThreadId}.");
        timer.Start();
        Thread.Sleep(2000);
        timer.Stop();
        System.Console.WriteLine($"Timer stop ThreadId:{Thread.CurrentThread.ManagedThreadId}.");
    }
Output:

    Timer start ThreadId:1.
    OnNext:473, ThreadId:4.
    OnNext:772, ThreadId:5.
    OnNext:73, ThreadId:5.
    OnNext:373, ThreadId:5.
    OnNext:673, ThreadId:5.
    OnNext:975, ThreadId:5.
    Timer stop ThreadId:1.

An event can also be wrapped by name, using reflection:
    public static void ObErrorRun()
    {
        var client = new WebClient();
        // This overload finds the event by name through reflection.
        var downloadedStrings = Observable.FromEventPattern(client, "DownloadStringCompleted");
        downloadedStrings.Subscribe(
            data =>
            {
                var eventArgs = (DownloadStringCompletedEventArgs)data.EventArgs;
                if (eventArgs.Error != null)
                {
                    Console.WriteLine("OnNext: (Error) " + eventArgs.Error.GetType());
                }
                else
                {
                    Console.WriteLine("OnNext: " + eventArgs.Result);
                }
            },
            ex => Console.WriteLine("OnError: " + ex.GetType()),
            () => Console.WriteLine("OnCompleted"));
        client.DownloadStringAsync(new Uri("http://invalid.example.com/"));
        //client.DownloadStringAsync(new Uri("http://www.baidu.com/"));
        Thread.Sleep(3000);
    }
Output:

    OnNext: (Error) System.Net.WebException
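Once an event is wrapped in an observable, OnNext is called every time the event is raised. Handling AsyncCompletedEventArgs can therefore produce a surprising result: any exception is delivered as data (through OnNext), not as an error (through OnError).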
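If the error should travel through OnError instead, one option is to project each completed-event notification into either a value or a failing sequence. The sketch below is only an illustration of that idea: the `ThrowOnError` helper and the `Demo` method are hypothetical names, and a `Subject<AsyncCompletedEventArgs>` stands in for a real event source such as WebClient.

```csharp
using System;
using System.ComponentModel;
using System.Reactive.Linq;
using System.Reactive.Subjects;

public static class CompletedEventErrors
{
    // Hypothetical helper (not part of System.Reactive): re-routes
    // AsyncCompletedEventArgs.Error from the data channel to OnError.
    public static IObservable<AsyncCompletedEventArgs> ThrowOnError(
        this IObservable<AsyncCompletedEventArgs> source) =>
        source.SelectMany(args => args.Error != null
            ? Observable.Throw<AsyncCompletedEventArgs>(args.Error)
            : Observable.Return(args));

    public static string Demo()
    {
        var log = "";
        // The subject simulates an event that reports completion twice.
        var completions = new Subject<AsyncCompletedEventArgs>();
        completions.ThrowOnError().Subscribe(
            _ => log += "OnNext;",
            ex => log += "OnError:" + ex.GetType().Name + ";");

        // First completion succeeds, second one carries an exception.
        completions.OnNext(new AsyncCompletedEventArgs(null, false, null));
        completions.OnNext(new AsyncCompletedEventArgs(new InvalidOperationException(), false, null));
        return log;
    }
}
```

Note that an OnError notification ends the subscription, so this approach fits one-shot operations better than long-lived event streams.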
For example, UI elements can only be accessed from the UI thread that owns them, so if you want to update the UI in response to Rx notifications, you need to move those notifications onto the UI thread.

The following example uses ObserveOn to move notifications to a thread-pool thread, performs the computation there, and then moves the result notifications back to the UI thread:

    public delegate void HelloEventHandler(object sender, HelloEventArgs e);

    public class HelloEventArgs : EventArgs
    {
        public string Name { get; set; }

        public HelloEventArgs(string name)
        {
            Name = name;
        }

        public int SayHello()
        {
            System.Console.WriteLine(Name + " Hello.");
            return DateTime.Now.Millisecond;
        }
    }

    public static event HelloEventHandler HelloHandlerEvent;

    public static void ObservableEventRun()
    {
        IDisposable ob = null;
        var task = Task.Run(() =>
        {
            Thread.Sleep(500);
            HelloHandlerEvent?.Invoke(new object(), new HelloEventArgs("lilei"));
            HelloHandlerEvent?.Invoke(new object(), new HelloEventArgs("HanMeimei"));
            HelloHandlerEvent?.Invoke(new object(), new HelloEventArgs("Tom"));
            HelloHandlerEvent?.Invoke(new object(), new HelloEventArgs("Jerry"));
            Thread.Sleep(2000);
            ob?.Dispose();
            // No output: the subscription was disposed above.
            HelloHandlerEvent?.Invoke(new object(), new HelloEventArgs("NoConsole"));
        });

        // AsyncContext (from Nito.AsyncEx) stands in for a UI context here.
        AsyncContext.Run(() =>
        {
            var uiContext = SynchronizationContext.Current;
            Console.WriteLine("UI thread is " + Environment.CurrentManagedThreadId);
            //Observable.FromEvent()
            ob = Observable.FromEventPattern<HelloEventHandler, HelloEventArgs>(
                    handler => (s, a) => handler.Invoke(s, a),
                    handler => HelloHandlerEvent += handler,
                    handler => HelloHandlerEvent -= handler)
                .Select(evt => evt.EventArgs.SayHello())
                .ObserveOn(Scheduler.Default)
                .Select(s =>
                {
                    // A complex calculation.
                    Thread.Sleep(100);
                    var result = s;
                    Console.WriteLine("Now Millisecond result " + result + " on thread " + Environment.CurrentManagedThreadId);
                    return result;
                })
                .ObserveOn(uiContext)
                .Subscribe(s => Console.WriteLine("Subscribe Result " + s + " on thread " + Environment.CurrentManagedThreadId));
            // Do not call task.Wait() here; it would deadlock with the delegate passed to Subscribe.
            System.Console.WriteLine("AsyncContext.Run Done on thread " + Environment.CurrentManagedThreadId);
        });
        task.Wait();
    }
Output:

    UI thread is 1
    AsyncContext.Run Done on thread 1
    lilei Hello.
    HanMeimei Hello.
    Tom Hello.
    Jerry Hello.
    Now Millisecond result 36 on thread 6
    Subscribe Result 36 on thread 1
    Now Millisecond result 44 on thread 6
    Subscribe Result 44 on thread 1
    Now Millisecond result 44 on thread 6
    Subscribe Result 44 on thread 1
    Now Millisecond result 44 on thread 6
    Subscribe Result 44 on thread 1
The following example uses Interval to create one OnNext notification every 10 milliseconds, and then uses Buffer to collect the notifications two at a time:
    public static void BufferRun()
    {
        System.Console.WriteLine($"Buffer start ThreadId:{Thread.CurrentThread.ManagedThreadId}.");
        var ob = Observable.Interval(TimeSpan.FromMilliseconds(10))
            .Buffer(2)
            .Subscribe(x =>
                Console.WriteLine($"{DateTime.Now.Millisecond}: Got {string.Join(",", x)}({Thread.CurrentThread.ManagedThreadId})"));
        Thread.Sleep(100);
        ob.Dispose();
        System.Console.WriteLine($"Buffer end ThreadId:{Thread.CurrentThread.ManagedThreadId}.");
    }
Output:

    Buffer start ThreadId:1.
    459: Got 0,1(5)
    478: Got 2,3(5)
    498: Got 4,5(5)
    516: Got 6,7(5)
    536: Got 8,9(5)
    Buffer end ThreadId:1.
The next example is similar, but uses Window to create groups of events, with two events in each group:
    public static void WindowRun()
    {
        System.Console.WriteLine($"Window start ThreadId:{Thread.CurrentThread.ManagedThreadId}.");
        var ob = Observable.Interval(TimeSpan.FromMilliseconds(10))
            .Window(2)
            .Subscribe(group =>
            {
                Console.WriteLine($"{DateTime.Now.Millisecond}: Starting new group({Thread.CurrentThread.ManagedThreadId})");
                group.Subscribe(
                    x => Console.WriteLine($"{DateTime.Now.Millisecond}: Saw {x},(TID:{Thread.CurrentThread.ManagedThreadId})"),
                    () => Console.WriteLine($"{DateTime.Now.Millisecond}: Ending group"));
            });
        Thread.Sleep(100);
        ob.Dispose();
        System.Console.WriteLine($"Window end ThreadId:{Thread.CurrentThread.ManagedThreadId}.");
    }
Output:

    Window start ThreadId:1.
    959: Starting new group(1)
    987: Saw 0,(TID:4)
    991: Saw 1,(TID:4)
    992: Ending group
    994: Starting new group(4)
    0: Saw 2,(TID:4)
    11: Saw 3,(TID:4)
    11: Ending group
    11: Starting new group(4)
    21: Saw 4,(TID:4)
    30: Saw 5,(TID:4)
    30: Ending group
    30: Starting new group(4)
    40: Saw 6,(TID:4)
    50: Saw 7,(TID:4)
    50: Ending group
    51: Starting new group(4)
    60: Saw 8,(TID:4)
    70: Saw 9,(TID:4)
    70: Ending group
    70: Starting new group(4)
    Window end ThreadId:1.
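The two timer-based runs above are hard to compare line by line because of scheduling noise. As a rough sketch, the same operators can be applied to a finite, timer-free source. `Observable.Range` replaces `Interval` here, and the class and method names are made up for this illustration.

```csharp
using System;
using System.Collections.Generic;
using System.Reactive.Linq;

public static class BufferVersusWindow
{
    // Buffer: each notification is a complete list of two items.
    public static List<string> BufferLog()
    {
        var log = new List<string>();
        Observable.Range(0, 4)
            .Buffer(2)
            .Subscribe(batch => log.Add("batch " + string.Join(",", batch)));
        return log;
    }

    // Window: each notification is a nested observable that is handed over
    // when the group opens; its items then flow through one at a time.
    public static List<string> WindowLog()
    {
        var log = new List<string>();
        Observable.Range(0, 4)
            .Window(2)
            .Subscribe(window =>
            {
                log.Add("open");
                window.Subscribe(
                    x => log.Add("item " + x),
                    () => log.Add("close"));
            });
        return log;
    }
}
```

With Buffer, the subscriber sees nothing until a pair is complete. With Window, the "open" entry is logged before any item of that group, which is what allows a subscriber to react to each item as soon as it arrives.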
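These examples illustrate the difference between Buffer and Window:
- Buffer waits for all the events in a group and then publishes them together as a single collection.
- Window groups events in the same way, but publishes each event as soon as it arrives.

The following example monitors mouse movement and uses Throttle, so that the most recent movement event is reported only after the mouse has stayed still for 500 milliseconds.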
    public delegate void MouseEventHandler(object sender, MouseEventArgs e);

    public class MouseEventArgs : EventArgs
    {
        public (int, int) XY { get; set; }

        public MouseEventArgs((int, int) xy)
        {
            XY = xy;
        }

        public (int, int) GetPosition()
        {
            return XY;
        }
    }

    public static event MouseEventHandler MouseMove;

    public static void ThrottleRun()
    {
        IDisposable ob = null;
        var task = Task.Run(() =>
        {
            Thread.Sleep(200);
            // Not reported: another move follows within 500 ms.
            MouseMoveProcess((1, 1));
            MouseMoveProcess((1, 11));
            MouseMoveProcess((1, 111));
            MouseMoveProcess((1, 1111));
            // Reported: followed by a 2000 ms pause.
            MouseMoveProcess((2, 2), 2000);
            // Not reported: the subscription is disposed after only 200 ms.
            MouseMoveProcess((2, 22));
            ob?.Dispose();
        });
        ob = Observable.FromEventPattern<MouseEventHandler, MouseEventArgs>(
                handler => (s, a) => handler(s, a),
                handler => MouseMove += handler,
                handler => MouseMove -= handler)
            .Select(x => x.EventArgs.GetPosition())
            .Throttle(TimeSpan.FromMilliseconds(500))
            .Subscribe(x => Console.WriteLine($"{DateTime.Now.Millisecond}: Saw {x.Item1},{x.Item2}"));
        task.Wait();
    }

    // Simulates a mouse move: raises MouseMove, then sleeps for the given time.
    private static void MouseMoveProcess((int, int) xy, int sleepMillsecond = 200)
    {
        System.Console.WriteLine($"Mouse Move {xy.Item1},{xy.Item2},After sleep {sleepMillsecond}.");
        MouseMove?.Invoke(new object(), new MouseEventArgs(xy));
        Thread.Sleep(sleepMillsecond);
    }
Output:

    Mouse Move 1,1,After sleep 200.
    Mouse Move 1,11,After sleep 200.
    Mouse Move 1,111,After sleep 200.
    Mouse Move 1,1111,After sleep 200.
    Mouse Move 2,2,After sleep 2000.
    251: Saw 2,2
    Mouse Move 2,22,After sleep 200.
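Throttle is often used in situations such as autocomplete in a text box.

Sample takes a different approach to taming a fast-moving event sequence: it sets up a regular timeout period and, at the end of each period, publishes the most recent value from that period. If no value arrived during the period, nothing is published.

The following example samples once every 500 milliseconds: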
    public static void SampleRun()
    {
        IDisposable ob = null;
        var task = Task.Run(() =>
        {
            Thread.Sleep(200);
            // Rapid moves: only the latest one in each sampling period is reported.
            MouseMoveProcess((1, 1));
            MouseMoveProcess((1, 11));
            MouseMoveProcess((1, 111));
            MouseMoveProcess((1, 1111));
            // Followed by a 2000 ms pause.
            MouseMoveProcess((2, 2), 2000);
            // Final move before the subscription is disposed.
            MouseMoveProcess((2, 22));
            ob?.Dispose();
        });
        ob = Observable.FromEventPattern<MouseEventHandler, MouseEventArgs>(
                handler => (s, a) => handler(s, a),
                handler => MouseMove += handler,
                handler => MouseMove -= handler)
            .Select(x => x.EventArgs.GetPosition())
            .Sample(TimeSpan.FromMilliseconds(500))
            .Subscribe(x => Console.WriteLine($"{DateTime.Now.Millisecond}: Saw {x.Item1},{x.Item2}"));
        task.Wait();
    }
Output:

    Mouse Move 1,1,After sleep 200.
    Mouse Move 1,11,After sleep 200.
    498: Saw 1,11
    Mouse Move 1,111,After sleep 200.
    Mouse Move 1,1111,After sleep 200.
    Mouse Move 2,2,After sleep 2000.
    991: Saw 2,2
    Mouse Move 2,22,After sleep 200.
    992: Saw 2,22
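The Throttle and Sample operators are much like Where, with one difference:
- Throttle and Sample filter by time period.
- Where filters by the event data.

These three operators provide three different ways to tame an input stream that arrives too quickly.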
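To see the three filters side by side without depending on wall-clock timing, the sketch below feeds the same four values into each operator and drives time with a virtual clock. It assumes the `HistoricalScheduler` from System.Reactive.Concurrency and a `Subject<int>` as a stand-in for the mouse events; the class name and the timings are invented for this illustration.

```csharp
using System;
using System.Collections.Generic;
using System.Reactive.Concurrency;
using System.Reactive.Linq;
using System.Reactive.Subjects;

public static class RateLimitingComparison
{
    public static (List<int> Throttled, List<int> Sampled, List<int> Filtered) Run()
    {
        var scheduler = new HistoricalScheduler(); // virtual clock, advanced manually
        var moves = new Subject<int>();            // stand-in for an event stream
        var period = TimeSpan.FromMilliseconds(500);

        var throttled = new List<int>();
        var sampled = new List<int>();
        var filtered = new List<int>();

        moves.Throttle(period, scheduler).Subscribe(x => throttled.Add(x));
        moves.Sample(period, scheduler).Subscribe(x => sampled.Add(x));
        moves.Where(x => x % 2 == 0).Subscribe(x => filtered.Add(x));

        // Values 1..4 arrive at virtual times 0, 200, 400 and 600 ms.
        for (var value = 1; value <= 4; value++)
        {
            moves.OnNext(value);
            if (value < 4)
            {
                scheduler.AdvanceBy(TimeSpan.FromMilliseconds(200));
            }
        }

        // Then the stream stays quiet for a full second (until 1600 ms).
        scheduler.AdvanceBy(TimeSpan.FromMilliseconds(1000));
        return (throttled, sampled, filtered);
    }
}
```

In this setup Throttle should keep only the value followed by 500 ms of silence, Sample should keep the latest value at each 500 ms tick that saw new data, and Where should keep the even values regardless of timing.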
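The Timeout operator establishes a sliding timeout window on its input stream. Every time a new event arrives, the timeout window is reset. If the timeout expires without another event arriving, the Timeout operator ends the stream with an OnError notification containing a TimeoutException.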
    public static void TimeoutRun()
    {
        IDisposable ob = null;
        var task = Task.Run(() =>
        {
            Thread.Sleep(200);
            // These moves arrive less than one second apart, so no timeout occurs.
            MouseMoveProcess((1, 1));
            MouseMoveProcess((1, 11));
            MouseMoveProcess((1, 111));
            MouseMoveProcess((1, 1111));
            // The 1100 ms pause after this move exceeds the timeout.
            MouseMoveProcess((2, 2), 1100);
            // Not reported: the stream has already ended with a TimeoutException.
            MouseMoveProcess((2, 22));
            ob?.Dispose();
        });
        ob = Observable.FromEventPattern<MouseEventHandler, MouseEventArgs>(
                handler => (s, a) => handler(s, a),
                handler => MouseMove += handler,
                handler => MouseMove -= handler)
            .Select(x => x.EventArgs.GetPosition())
            // One-second timeout, counted from Subscribe and reset by every event
            // (a continuous stream of events never times out).
            .Timeout(TimeSpan.FromSeconds(1))
            .Subscribe(
                x => Console.WriteLine($"{DateTime.Now.Millisecond}: Saw {x.Item1},{x.Item2}"),
                ex => Console.WriteLine($"{DateTime.Now.Millisecond}: {ex.GetType().Name}"),
                // onCompleted is never invoked.
                () => System.Console.WriteLine($"{DateTime.Now.Millisecond}: Finished."));
        System.Console.WriteLine($"{DateTime.Now.Millisecond} Subscribe Done");
        task.Wait();
    }
Output:

    138 Subscribe Done
    Mouse Move 1,1,After sleep 200.
    313: Saw 1,1
    Mouse Move 1,11,After sleep 200.
    517: Saw 1,11
    Mouse Move 1,111,After sleep 200.
    722: Saw 1,111
    Mouse Move 1,1111,After sleep 200.
    923: Saw 1,1111
    Mouse Move 2,2,After sleep 1100.
    124: Saw 2,2
    139: TimeoutException
    Mouse Move 2,22,After sleep 200.
The following example observes mouse movement until a timeout occurs, and then switches to another sequence:
    public static event MouseEventHandler OtherMouseMove;

    public static void TimeoutMoveRun()
    {
        IDisposable ob = null;
        var task = Task.Run(() =>
        {
            Thread.Sleep(200);
            // Reported: the main stream has not timed out yet.
            MouseMoveProcess((1, 1), 400);
            MouseMoveProcess((1, 11), 0);
            // Pause long enough to trigger the timeout.
            Thread.Sleep(1100);
            System.Console.WriteLine("sleep: 1100");
            // Because of the timeout, the stream has switched to other;
            // the moves below are not reported.
            MouseMoveProcess((2, 2), 400);
            MouseMoveProcess((2, 22), 400);
            // Events raised on other are reported.
            OtherMouseMoveProcess((3, 3), 400);
            OtherMouseMoveProcess((3, 33), 400);
            ob?.Dispose();
        });
        var other = Observable.FromEventPattern<MouseEventHandler, MouseEventArgs>(
                handler => (s, a) => handler(s, a),
                handler => OtherMouseMove += handler,
                handler => OtherMouseMove -= handler)
            .Select(x => x.EventArgs.GetPosition());
        ob = Observable.FromEventPattern<MouseEventHandler, MouseEventArgs>(
                handler => (s, a) => handler(s, a),
                handler => MouseMove += handler,
                handler => MouseMove -= handler)
            .Select(x => x.EventArgs.GetPosition())
            .Timeout(TimeSpan.FromSeconds(1), other)
            .Subscribe(
                x => Console.WriteLine($"{DateTime.Now.Millisecond}: Saw {x.Item1},{x.Item2}"),
                ex => Console.WriteLine($"{DateTime.Now.Millisecond}: {ex.GetType().Name}"));
        System.Console.WriteLine($"{DateTime.Now.Millisecond} Subscribe Done");
        task.Wait();
    }

    // Simulates a move on the second source: raises OtherMouseMove, then sleeps.
    private static void OtherMouseMoveProcess((int, int) xy, int sleepMillsecond = 200)
    {
        System.Console.WriteLine($"Other Mouse Move {xy.Item1},{xy.Item2},After sleep {sleepMillsecond}.");
        OtherMouseMove?.Invoke(new object(), new MouseEventArgs(xy));
        Thread.Sleep(sleepMillsecond);
    }
Output:

    793 Subscribe Done
    Mouse Move 1,1,After sleep 400.
    970: Saw 1,1
    Mouse Move 1,11,After sleep 0.
    373: Saw 1,11
    sleep: 1100
    Mouse Move 2,2,After sleep 400.
    Mouse Move 2,22,After sleep 400.
    Other Mouse Move 3,3,After sleep 400.
    281: Saw 3,3
    Other Mouse Move 3,33,After sleep 400.
    684: Saw 3,33
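The switch to the fallback sequence can also be reproduced without real delays. The following is a minimal sketch under a few assumptions: two `Subject<int>` instances stand in for the two mouse events, and a `HistoricalScheduler` supplies virtual time. All names in it are illustrative and do not come from the example above.

```csharp
using System;
using System.Collections.Generic;
using System.Reactive.Concurrency;
using System.Reactive.Linq;
using System.Reactive.Subjects;

public static class TimeoutFallback
{
    public static List<int> Run()
    {
        var scheduler = new HistoricalScheduler(); // virtual clock, advanced manually
        var primary = new Subject<int>();          // stand-in for MouseMove
        var fallback = new Subject<int>();         // stand-in for OtherMouseMove
        var seen = new List<int>();

        primary.Timeout(TimeSpan.FromSeconds(1), fallback, scheduler)
            .Subscribe(x => seen.Add(x));

        primary.OnNext(1);                                    // at 0 ms
        scheduler.AdvanceBy(TimeSpan.FromMilliseconds(400));
        primary.OnNext(2);                                    // at 400 ms, resets the timeout
        scheduler.AdvanceBy(TimeSpan.FromMilliseconds(1100)); // silence longer than 1 s

        primary.OnNext(3);   // arrives after the switch, so it is dropped
        fallback.OnNext(30); // the fallback sequence is now the live one
        return seen;
    }
}
```

Unlike the overload without a fallback, this form raises no TimeoutException: the subscriber simply continues with values from the second sequence.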