1、反应式编程(Reactive Programming)html
一、什么是反应式编程:反应式编程(Reactive programming)简称Rx,他是一个使用LINQ风格编写基于观察者模式的异步编程模型。简单点说Rx = Observables + LINQ + Schedulers。python
二、为何会产生这种风格的编程模型?我在本系列文章开始的时候说过一个使用事件的例子:react
var watch = new FileSystemWatcher(); watch.Created += (s, e) => { var fileType = Path.GetExtension(e.FullPath); if (fileType.ToLower() == "jpg") { //do some thing } };
这个代码定义了一个FileSystemWatcher,而后在Watcher事件上注册了一个匿名函数。事件的使用是一种命令式代码风格,有没有办法写出声明性更强的代码风格?咱们知道使用高阶函数可让代码更具声明性,整个LINQ扩展就是一个高阶函数库,常见的LINQ风格代码以下:git
var list = Enumerable.Range(1, 10) .Where(x => x > 8) .Select(x => x.ToString()) .First();
可否使用这样的风格来编写事件呢?github
三、事件流
LINQ是对IEnumerable<T>的一系列扩展方法,咱们能够简单的将IEnumerable<T>认为是一个集合。当咱们将事件放在一个时间范围内,事件也变成了集合。咱们能够将这个事件集合理解为事件流。编程
事件流的出现给了咱们一个可以对事件进行LINQ操做的灵感。bash
2、反应式编程中的两个重要类型多线程
事件模型从本质上来讲是观察者模式,因此IObservable<T>和IObserver<T>也是该模型的重头戏。让咱们来看看这两个接口的定义:架构
public interface IObservable<out T> { //Notifies the provider that an observer is to receive notifications. IDisposable Subscribe(IObserver<T> observer); }
public interface IObserver<in T> { //Notifies the observer that the provider has finished sending push-based notifications. void OnCompleted(); //Notifies the observer that the provider has experienced an error condition. void OnError(Exception error); //Provides the observer with new data. void OnNext(T value); }
这两个名称准确的反应出了它两的职责:IObservable<T>-可观察的事物,IObserver<T>-观察者。框架
IObservable<T>只有一个方法Subscribe(IObserver<T> observer),此方法用来对事件流注册一个观察者。
IObserver<T>有三个回调方法。当事件流中有新的事件产生的时候会回调OnNext(T value),观察者会获得事件中的数据。OnCompleted()和OnError(Exception error)则分别用来通知观察者事件流已结束,事件流发生错误。
显然事件流是可观察的事物,咱们用Rx改写上面的例子:
Observable.FromEventPattern<FileSystemEventArgs>(watch, "Created") .Where(e => Path.GetExtension(e.EventArgs.FullPath).ToLower() == "jpg") .Subscribe(e => { //do some thing });
注:在.net下使用Rx编程须要安装如下Nuget组件:
Install-Package Rx-main
3、UI编程中使用Rx
Rx模型不但使得代码更加具备声明性,Rx还能够用在UI编程中。
一、UI编程中的第一段Rx代码
为了简单的展现如何在UI编程中使用Rx,咱们以Winform中的Button为例,看看事件模型和Rx有何不一样。
private void BindFirstGroupButtons() { btnFirstEventMode.Click += btnFirstEventMode_Click; } void btnFirstEventMode_Click(object sender, EventArgs e) { MessageBox.Show("hello world"); }
添加了一个Button,点击Button的时候弹出一个对话框。使用Rx作一样的实现:
//获得了Button的Click事件流。 var clickedStream = Observable.FromEventPattern<EventArgs>(btnFirstReactiveMode, "Click"); //在事件流上注册了一个观察者。 clickedStream.Subscribe(e => MessageBox.Show("Hello world"));
有朋友指出字符串“Click”很是让人不爽,这确实是个问题。因为Click是一个event类型,没法用表达式树获取其名称,最终我想到使用扩展方法来实现:
public static IObservable<EventPattern<EventArgs>> FromClickEventPattern(this Button button) { return Observable.FromEventPattern<EventArgs>(button, "Click"); } public static IObservable<EventPattern<EventArgs>> FromDoubleClickEventPattern(this Button button) { return Observable.FromEventPattern<EventArgs>(button, "DoubleClick"); }
咱们平时经常使用的事件类型也就那么几个,能够暂时经过这种方案来实现,该方案算不上完美,可是比起直接使用字符串又能优雅很多。
btnFirstReactiveMode.FromClickEventPattern() .Subscribe(e => MessageBox.Show("hello world"));
二、UI编程中存在一个很常见的场景:当一个事件的注册者阻塞了线程时,整个界面都处于假死状态。.net中的异步模型也从APM,EAP,TPL不断演化直至async/await模型的出现才使得异步编程更加简单易用。咱们来看看界面假死的代码:
void btnSecondEventMode_Click(object sender, EventArgs e) { btnSecondEventMode.BackColor = Color.Coral; Thread.Sleep(2000); lblMessage.Text = "event mode"; }
Thread.Sleep(2000);模拟了一个长时间的操做,当你点下Button时整个界面处于假死状态而且此时的程序没法响应其余的界面事件。传统的解决方案是使用多线程来解决假死:
BtnSecondEventAsyncModel.BackColor = Color.Coral; Task.Run(() => { Thread.Sleep(2000); Action showMessage = () => lblMessage.Text = "async event mode"; lblMessage.Invoke(showMessage); });
这个代码的复杂点在于:普通的多线程没法对UI进行操做,在Winform中须要用Control.BeginInvoke(Action action)通过包装后,多线程中的UI操做才能正确执行,WPF则要使用Dispatcher.BeginInvoke(Action action)包装。
Rx方案:
btnSecondReactiveMode.FromClickEventPattern() .Subscribe(e => { Observable.Start(() => { btnSecondReactiveMode.BackColor = Color.Coral; Thread.Sleep(2000); return "reactive mode"; }) .SubscribeOn(ThreadPoolScheduler.Instance) .ObserveOn(this) .Subscribe(x => { lblMessage.Text = x; }); });
一句SubscribeOn(ThreadPoolScheduler.Instance)将费时的操做跑在了新线程中,ObserveOn(this)让后面的观察者跑在了UI线程中。
注:使用ObserveOn(this)须要使用Rx-WinForms
Install-Package Rx-WinForms
这个例子虽然成功了,可是并无比BeginInvoke(Action action)的方案有明显的进步之处。在一个事件流中再次使用Ovservable.Start()开启新的观察者让人更加摸不着头脑。这并非Rx的问题,而是事件模型在UI编程中存在局限性:不方便使用异步,不具有可测试性等。以XMAL和MVVM为核心的UI编程模型将在将来处于主导地位,因为在MVVM中能够将UI绑定到一个Command,从而解耦了事件模型。
开源项目ReactiveUI提供了一个以Rx基础的UI编程方案,可使用在XMAL和MVVM为核心的UI编程中,例如:Xamarin,WFP,Windows Phone8等开发中。
注:在WPF中使用ObserveOn()须要安装Rx-WPF
Install-Package Rx-WPF
三、再来一个例子,让咱们感觉一下Rx的魅力
界面上有两个Button分别为+和-操做,点击+按钮则+1,点击-按钮则-1,最终的结果显示在一个Label中。
这样的一个需求使用经典事件模型只须要维护一个内部变量,两个按钮的Click事件分别对变量作加1或减1的操做便可。
Rx做为一种函数式编程模型讲求immutable-不可变性,即不使用变量来维护内部状态。
var increasedEventStream = btnIncreasement.FromClickEventPattern() .Select(_ => 1); var decreasedEventStream = btnDecrement.FromClickEventPattern() .Select(_ => -1); increasedEventStream.Merge(decreasedEventStream) .Scan(0, (result, s) => result + s) .Subscribe(x => lblResult.Text = x.ToString());
这个例子使用了IObservable<T>的”谓词”来对事件流作了一些操做。
下面就让咱们来看看IObservable<T>中经常使用的“谓词”
4、IObservable<T>中的谓词
IObservable<T>的灵感来源于LINQ,因此不少操做也跟LINQ中的操做差很少,例如Where、First、Last、Single、Max、Any。
还有一些“谓词”则是新出现的,例如上面提到的”Merge”、“Scan”等,为了理解这些“谓词”的含义,咱们请出一个神器RxSandbox。
一、Merge操做,从下面的图中咱们能够清晰的看出Merge操做将三个事件流中的事件合并在了同一个时间轴上。
二、Where操做则是根据指定的条件筛选出事件。
有了这个工具咱们能够更加方便的了解这些“谓词”的用途。
5、IObservable<T>的建立
Observable类提供了不少静态方法用来建立IObservable<T>,以前的例子咱们都使用FromEventPattern方法来将事件转化为IObservable<T>,接下来再看看别的方法。
Return能够建立一个具体的IObservable<T>:
public static void UsingReturn() { var greeting = Observable.Return("Hello world"); greeting.Subscribe(Console.WriteLine); }
Create也能够建立一个IObservable<T>,而且拥有更加丰富的重载:
public static void UsingCreate() { var greeting = Observable.Create<string>(observer => { observer.OnNext("Hello world"); return Disposable.Create(() => Console.WriteLine("Observer has unsubscribed")); }); greeting.Subscribe(Console.WriteLine); }
Range方法能够产生一个指定范围内的IObservable<T>
Observable.Range(1, 10) .Subscribe(x => Console.WriteLine(x.ToString()));
Generate方法是一个折叠操做的逆向操做,又称Unfold方法:
public static void UsingGenerate() { var range = Observable.Generate(0, x => x < 10, x => x + 1, x => x); range.Subscribe(Console.WriteLine); }
Interval方法能够每隔必定时间产生一个IObservable<T>:
Observable.Interval(TimeSpan.FromSeconds(1)) .Subscribe(x => Console.WriteLine(x.ToString()));
Subscribe方法有一个重载,能够分别对Observable发生异常和Observable完成定义一个回调函数。
Observable.Range(1, 10) .Subscribe(x => Console.WriteLine(x.ToString()), e => Console.WriteLine("Error" + e.Message), () => Console.WriteLine("Completed"));
还能够将IEnumerable<T>转化为IObservable<T>类型:
Enumerable.Range(1, 10).ToObservable() .Subscribe(x => Console.WriteLine(x.ToString()));
也能够将IObservable<T>转化为IEnumerable<T>
var list= Observable.Range(1, 10).ToEnumerable();
6、Scheduler
Rx的核心是观察者模式和异步,Scheduler正是为异步而生。咱们在以前的例子中已经接触过一些具体的Scheduler了,那么他们都具体是作什么的呢?
一、先看下面的代码:
public static void UsingScheduler() { Console.WriteLine("Starting on threadId:{0}", Thread.CurrentThread.ManagedThreadId); var source = Observable.Create<int>( o => { Console.WriteLine("Invoked on threadId:{0}", Thread.CurrentThread.ManagedThreadId); o.OnNext(1); o.OnNext(2); o.OnNext(3); o.OnCompleted(); Console.WriteLine("Finished on threadId:{0}",Thread.CurrentThread.ManagedThreadId); return Disposable.Empty; }); source //.SubscribeOn(NewThreadScheduler.Default) //.SubscribeOn(ThreadPoolScheduler.Instance) .Subscribe( o => Console.WriteLine("Received {1} on threadId:{0}",Thread.CurrentThread.ManagedThreadId,o), () => Console.WriteLine("OnCompleted on threadId:{0}",Thread.CurrentThread.ManagedThreadId)); Console.WriteLine("Subscribed on threadId:{0}", Thread.CurrentThread.ManagedThreadId); }
当咱们不使用任何Scheduler的时候,整个Rx的观察者和主题都跑在主线程中,也就是说并无异步执行。正以下面的截图,全部的操做都跑在threadId=1的线程中。
当咱们使用SubscribeOn(NewThreadScheduler.Default)或者SubscribeOn(ThreadPoolScheduler.Instance)的时候,观察者和主题都跑在了theadId=3的线程中。
这两个Scheduler的区别在于:NewThreadScheduler用于执行一个长时间的操做,ThreadPoolScheduler用来执行短期的操做。
二、SubscribeOn和ObserveOn的区别
上面的例子仅仅展现了SubscribeOn()方法,Rx中还有一个ObserveOn()方法。stackoverflow上有一个这样的问题:What's the difference between SubscribeOn and ObserveOn,其中一个简单的例子很好的诠释了这个区别。
public static void DifferenceBetweenSubscribeOnAndObserveOn() { Thread.CurrentThread.Name = "Main"; IScheduler thread1 = new NewThreadScheduler(x => new Thread(x) { Name = "Thread1" }); IScheduler thread2 = new NewThreadScheduler(x => new Thread(x) { Name = "Thread2" }); Observable.Create<int>(o => { Console.WriteLine("Subscribing on " + Thread.CurrentThread.Name); o.OnNext(1); return Disposable.Create(() => { }); }) //.SubscribeOn(thread1) //.ObserveOn(thread2) .Subscribe(x => Console.WriteLine("Observing '" + x + "' on " + Thread.CurrentThread.Name)); }
当咱们注释掉:SubscribeOn(thread1)和ObserveOn(thread2)时的结果以下:
观察者和主题都跑在name为Main的thread中。
当咱们放开SubscribeOn(thread1):
主题和观察者都跑在了name为Thread1的线程中
当咱们注释掉:SubscribeOn(thread1),放开ObserveOn(thread2)时的结果以下:
主题跑在name为Main的主线程中,观察者跑在了name=Thread2的线程中。
当咱们同时放开SubscribeOn(thread1)和ObserveOn(thread2)时的结果以下:
主题跑在name为Thread1的线程中,观察者跑在了name为Thread2的线程中。
至此结论应该很是清晰了:SubscribeOn()和ObserveOn()分别控制着主题和观察者的异步。
7、其余Rx资源
除了.net中的Rx.net,其余语言也纷纷推出了本身的Rx框架。
参考资源:
http://rxwiki.wikidot.com/101samples
http://introtorx.com/Content/v1.0.10621.0/01_WhyRx.html#WhyRx
http://www.codeproject.com/Articles/646361/Reactive-Programming-For-NET-And-Csharp-Developers