响应式编程(Reactive Programming)(Rx)介绍

很明显你是有兴趣学习这种被称做响应式编程的新技术才来看这篇文章的。html

学习响应式编程是很困难的一个过程,特别是在缺少优秀资料的前提下。刚开始学习时,我试过去找一些教程,并找到了为数很少的实用教程,可是它们都流于表面,从没有围绕响应式编程构建起一个完整的知识体系。库的文档每每也没法帮助你去了解它的函数。不信的话能够看一下这个:前端

经过合并元素的指针,将每个可观察的元素序列放射到一个新的可观察的序列中,而后将多个可观察的序列中的一个转换成一个只从最近的可观察序列中产生值得可观察的序列。java

天啊。react

我看过两本书,一本只是讲述了一些概念,而另外一本则纠结于如何使用响应式编程库。我最终放弃了这种痛苦的学习方式,决定在开发中一边使用响应式编程,一边理解它。在 Futurice 工做期间,我尝试在真实项目中使用响应式编程,而且当我遇到困难时,获得了同事们的帮助。jquery

在学习过程当中最困难的一部分是 以响应式编程的方式思考 。这意味着要放弃命令式且带状态的编程习惯,而且要强迫你的大脑以一种不一样的方式去工做。在互联网上我找不到任何关于这方面的教程,而我以为这世界须要一份关于怎么以响应式编程的方式思考的实用教程,这样你就有足够的资料去起步。库的文档没法为你的学习提供指引,而我但愿这篇文章能够。git

“什么是响应式编程?”

在互联网上有着一大堆糟糕的解释与定义。Wikipedia 一如既往的空泛与理论化。Stackoverflow 的权威答案明显不适合初学者。Reactive Manifesto 看起来是你展现给你公司的项目经理或者老板们看的东西。微软的 Rx terminology "Rx = Observables + LINQ + Schedulers" 过于重量级且微软味十足,只会让大部分人困惑。相对于你所使用的 MV* 框架以及钟爱的编程语言,"Reactive" 和 "Propagation of change" 这些术语并无传达任何有意义的概念。框架的 Views 层固然要对 Models 层做出反应,改变固然会传播。若是没有这些,就没有东西会被渲染了。github

因此不要再扯这些废话了。ajax

响应式编程是使用异步数据流进行编程

一方面,这并非什么新东西。Event buses 或者 Click events 本质上就是异步事件流,你能够监听并处理这些事件。响应式编程的思路大概以下:你能够用包括 Click 和 Hover 事件在内的任何东西建立 Data stream。Stream 廉价且常见,任何东西均可以是一个 Stream:变量、用户输入、属性、Cache、数据结构等等。举个例子,想像一下你的 Twitter feed 就像是 Click events 那样的 Data stream,你能够监听它并相应的做出响应。数据库

在这个基础上,你还有使人惊艳的函数去组合、建立、过滤这些 Streams。这就是函数式魔法的用武之地。Stream 能接受一个,甚至多个 Stream 为输入。你能够融合两个 Stream,也能够从一个 Stream 中过滤出你感兴趣的 Events 以生成一个新的 Stream,还能够把一个 Stream 中的数据值 映射到一个新的 Stream 中。编程

既然 Stream 在响应式编程中如此重要,那么咱们就应该好好的了解它们,就从咱们熟悉的"Clicks on a button" Event stream 开始。

响应式编程

Stream 就是一个按时间排序的 Events 序列,它能够放射三种不一样的 Events:(某种类型的)Value、Error 或者一个" Completed" Signal。考虑一下"Completed"发生的时机,例如,当包含这个按钮的窗口或者视图被关闭时。

经过分别为 Value、Error、"Completed"定义事件处理函数,咱们将会异步地捕获这些 Events。有时能够忽略 Error 与"Completed",你只须要定义 Value 的事件处理函数就行。监听一个 Stream 也被称做是订阅 ,而咱们所定义的函数就是观察者,Stream则是被观察者,其实就是 Observer Design Pattern

上面的示意图也可使用ASCII重画为下图,在下面的部分教程中咱们会使用这幅图:

--a---b-c---d---X---|->

    a, b, c, d are emitted values
    X is an error
    | is the 'completed' signal
    ---> is the timeline

既然已经开始对响应式编程感到熟悉,为了避免让你以为无聊,咱们能够尝试作一些新东西:咱们将会把一个 Click event stream 转为新的 Click event stream。

首先,让咱们作一个能记录一个按钮点击了多少次的计数器 Stream。在常见的响应式编程库中,每一个Stream都会有多个方法,如 mapfilterscan, 等等。当你调用其中一个方法时,例如 clickStream.map(f),它就会基于原来的 Click stream 返回一个新的 Stream 。它不会对原来的 Click steam 做任何修改。这个特性称为不可变性,它对于响应式编程 Stream,就若是汁对于薄煎饼。咱们也能够对方法进行链式调用,如 clickStream.map(f).scan(g)

clickStream: ---c----c--c----c------c-->
               vvvvv map(c becomes 1) vvvv
               ---1----1--1----1------1-->
               vvvvvvvvv scan(+) vvvvvvvvv
    counterStream: ---1----2--3----4------5-->

map(f) 会根据你提供的 f 函数把原 Stream 中的 Value 分别映射到新的 Stream 中。在咱们的例子中,咱们把每一次 Click 都映射为数字 1。scan(g) 会根据你提供的 g 函数把 Stream 中的全部 Value 聚合成一个 Value x = g(accumulated, current) ,这个示例中 g 只是一个简单的添加函数。而后,每 Click 一次, counterStream 就会把点击的总次数发给它的观察者。

为了展现响应式编程真正的实力,让咱们假设你想获得一个包含“双击”事件的 Stream。为了让它更加有趣,假设咱们想要的这个 Stream 要同时考虑三击(Triple clicks),或者更加宽泛,连击(两次或更多)。深呼吸一下,而后想像一下在传统的命令式且带状态的方式中你会怎么实现。我敢打赌代码会像一堆乱麻,而且会使用一些变量保存状态,同时也有一些计算时间间隔的代码。

而在响应式编程中,这个功能的实现就很是简单。事实上,这逻辑只有 4 行代码。但如今咱们先无论那些代码。用图表的方式思考是理解怎样构建Stream的最好方法,不管你是初学者仍是专家。

响应式编程

灰色的方框是用来转换 Stream 函数的。首先,简而言之,咱们把连续 250 ms 内的 Click 都积累到一个列表中(就是buffer(stream.throttle(250ms) 作的事。不要在乎这些细节,咱们只是展现一下响应式编程而已)。结果是一个列表的 Stream ,而后咱们使用 map() 把每一个列表映射为一个整数,即它的长度。最终,咱们使用 filter(x >= 2) 把整数 1 给过滤掉。就这样,3 个操做就生成了咱们想要的 Stream。而后咱们就能够订阅(“监听”)这个 Stream,并以咱们所但愿的方式做出反应。

我但愿你能感觉到这个示例的优美之处。这个示例只是冰山一角:你能够把一样的操做应用到不一样种类的 Stream 上,例如,一个 API 响应的 Stream;另外一方面,还有不少其它可用的函数。

“为何我要使用响应式编程(RP)?”

响应式编程提升了代码的抽象层级,因此你能够只关注定义了业务逻辑的那些相互依赖的事件,而非纠缠于大量的实现细节。RP 的代码每每会更加简明。

特别是在开发如今这些有着大量与数据事件相关的 UI events 的高互动性 Webapps、手机 apps 的时候,RP 的优点就更加明显。10年前,网页的交互就只是提交一个很长的表单到后端,而在前端只产生简单的渲染。Apps 就表现得更加的实时了:修改一个表单域就能自动地把修改后的值保存到后端,为一些内容"点赞"时,会实时的反应到其它在线用户那里等等。

如今的 Apps 有着大量各类各样的实时 Events,以给用户提供一个交互性较高的体验。咱们须要工具去应对这个变化,而响应式编程就是一个答案。

以 RP 方式思考的例子

让咱们作一些实践。一个真实的例子一步一步的指导咱们以 RP 的方式思考。不是虚构的例子,也没有只解释了一半的概念。学完教程以后,咱们将写出真实可用的代码,并作到知其然,知其因此然。

在这个教程中,我将会使用 JavaScript 和 RxJS 做为工具 ,由于JavaScript是如今最多人会的语言,而 Rx* library family 有多种语言版本,并支持多种平台(.NETJava, Scala, Clojure, JavaScriptRubyPythonC++Objective-C/Cocoa, Groovy等等)。因此,不管你用的是什么工具,你都能从下面这个教程中受益。

实现"Who to follow"推荐界面

在 Twitter 上,这个代表其余帐户的 UI 元素看起来是这样的:

响应式编程

咱们将会重点模拟它的核心功能,以下:

  • 启动时从 API 那里加载账户数据,并显示 3 个推荐
  • 点击"Refresh"时,加载另外 3 个推荐用户到这三行中
  • 点击账户所在行的'x'按钮时,只清除那一个推荐而后显示一个新的推荐
  • 每行都会显示账户的头像,以及他们主页的连接

咱们能够忽略其它的特性和按钮,由于它们是次要的。同时,由于 Twitter 最近关闭了对非受权用户的 API,咱们将会为 Github 实现这个推荐界面,而非 Twitter。这是Github获取用户的API

若是你想先看一下最终效果,这里有完成后的代码 http://jsfiddle.net/staltz/8jFJH/48/

请求和响应

在 Rx 中你该怎么处理这个问题呢? 好吧,首先,(几乎) 全部的东西均可以转为一个Stream 。这就是Rx的咒语。让咱们先从最简单的特性开始:"在启动时,从API加载3个账户的数据"。这并无什么特别,就只是简单的(1)发出一个请求,(2)收到一个响应,(3)渲染这个响应。因此,让咱们继续,并用Stream表明咱们的请求。一开始可能会以为杀鸡用牛刀,但咱们应当从最基本的开始,对吧?

在启动的时候,咱们只须要发出一个请求,因此若是咱们把它转为一个Data stream的话,那就是一个只有一个Value的Stream。稍后,咱们知道将会有多个请求发生,但如今,就只有一个请求。

--a------|->

    Where a is the string 'https://api.github.com/users'

这是一个咱们想向其发出请求的 URL 的 Stream。每当一个请求事件发生时,它会告诉咱们两件事:"何时"与"什么东西"。"何时"这个请求会被执行,就是何时这个 Event 会被映射。"什么东西"会被请求,就是这个映射出来的值:一个包含 URL 的 String。

在 RX 中,建立只有一个值的 Stream 是很是简单的。官方把一个 Stream 称做“Observable”,由于它能够被观察,可是我发现那是个很愚蠢的名子,因此我把它叫作 Stream*。

var requestStream = Rx.Observable.just('https://api.github.com/users');

可是如今,那只是一个包含了String的Stream,并无其余操做,因此咱们须要以某种方式使那个值被映射。就是经过subscribing 这个 Stream。

requestStream.subscribe(function(requestUrl) {
    // execute the request
    jQuery.getJSON(requestUrl, function(responseData) {
        // ...
    });
    }

留意一下咱们使用了 jQuery 的 Ajax 函数(咱们假设你已经知道 should know already)去处理异步请求操做。但先等等,Rx 能够用来处理异步 Data stream。那这个请求的响应就不能看成一个包含了将会到达的数据的 Stream 吗?固然,从理论上来说,应该是能够的,因此咱们尝试一下。

requestStream.subscribe(function(requestUrl) {
    // execute the request
    var responseStream = Rx.Observable.create(function (observer) {
        jQuery.getJSON(requestUrl)
        .done(function(response) { observer.onNext(response); })
        .fail(function(jqXHR, status, error) { observer.onError(error); })
        .always(function() { observer.onCompleted(); });
    });

    responseStream.subscribe(function(response) {
        // do something with the response
    });
    }

Rx.Observable.create()所作的事就是经过显式的通知每个 Observer (或者说是“Subscriber”) Data events(onNext() )或者 Errors ( onError() )来建立你本身的 Stream。而咱们所作的就只是把 jQuery Ajax Promise 包装起来而已。打扰一下,这意味者Promise本质上就是一个Observable?

响应式编程

是的。

Observable 就是 Promise++。在 Rx 中,你能够用 var stream = Rx.Observable.fromPromise(promise) 轻易的把一个 Promise 转为 Observable,因此咱们就这样子作吧。惟一的不一样就是 Observable 并不遵循 Promises/A+,但概念上没有冲突。Promise 就是只有一个映射值的 Observable。Rx Stream 比 Promise 更进一步的是容许返回多个值。

这样很是不错,并展示了 Observables 至少有 Promise 那么强大。因此若是你相信 Promise 宣传的那些东西,那么也请留意一下 Rx Observables 能胜任些什么。

如今回到咱们的例子,若是你已经注意到了咱们在 subscribe() 内又调用了另一个 subscribe() ,这相似于 Callback hell。一样,你应该也注意到 responseStream 是创建在 requestStream 之上的。就像你以前了解到的那样,在 Rx 内有简单的机制能够从其它 Stream 中转换并建立出新的 Stream,因此咱们也应该这样子作。

你如今须要知道的一个基本的函数是 [map(f)](https://github.com/Reactive-Extensions/RxJS/blob/master/doc/api/core/observable.md#rxobservableprototypemapselector-thisarg) ,它分别把 f() 应用到 Stream A 中的每个值中,并把返回的值放进 Stream B 里。若是咱们也对请求 Stream 与响应 Stream 进行一样的处理,咱们能够把 Request URL 映射为响应 Promise(而 Promise 能够转为 Streams)。

var responseMetastream = requestStream
    .map(function(requestUrl) {
     return Rx.Observable.fromPromise(jQuery.getJSON(requestUrl));
      });

而后,咱们将会创造一个叫作" Metastream "的怪物:包含 Stream 的 Stream。暂时不须要惧怕。Metastream 就是一个 Stream,其中映射的值仍是另一个 Stream。你能够把它想像为 pointers:每一个映射的值都是一个指向其它 Stream 的指针。在咱们的例子里,每一个请求 URL 都会被映射一个指向包含响应 Promise stream 的指针。

响应式编程

Response 的 Metastream 看起来会让人困惑,而且看起来也没有帮到咱们什么。咱们只想要一个简单的响应 stream,其中每一个映射的值应该是 JSON 对象,而不是一个 JSON 对象的'Promise'。是时候介绍 (Mr. Flatmap)(https://github.com/Reactive-Extensions/RxJS/blob/master/doc/api/core/observable.md#rxobservableprototypeflatmapselector-resultselector) 了:它是 map() 的一个版本,经过把应用到"trunk" Stream 上的全部操做都应用到"branch" Stream 上,能够"flatten" Metastream。Flatmap 并非用来"修复" Metastream 的,由于 Metastream 也不是一个漏洞,这只是一些用来处理 Rx 中的异步响应的工具。

var responseStream = requestStream
    .flatMap(function(requestUrl) {
        return Rx.Observable.fromPromise(jQuery.getJSON(requestUrl));
      });

响应式编程

很好。由于响应stream是根据请求 stream定义的,因此 若是 咱们后面在请求 stream上发起更多的请求的话,在响应 stream上咱们将会获得相应的响应事件,就像预期的那样:

requestStream:  --a-----b--c------------|->
    responseStream: -----A--------B-----C---|->

    (lowercase is a request, uppercase is its response)

如今,咱们终于有了一个响应 stream,因此能够把收到的数据渲染出来了:

responseStream.subscribe(function(response) {
    // render `response` to the DOM however you wish
    });

把目前为止全部的代码放到一块儿就是这样:

var requestStream = Rx.Observable.just('https://api.github.com/users');

    var responseStream = requestStream
    .flatMap(function(requestUrl) {
        return Rx.Observable.fromPromise(jQuery.getJSON(requestUrl));
    });

    responseStream.subscribe(function(response) {
    // render `response` to the DOM however you wish
    });

刷新按钮

我以前并无提到返回的 JSON 是一个有着 100 个用户数据的列表。由于这个 API 只容许咱们设置偏移量,而没法设置返回的用户数,因此咱们如今是只用了 3 个用户的数据而浪费了另外 97 个的数据。这个问题暂时能够忽略,稍后咱们会学习怎么缓存这些数据。

每点击一次刷新按钮,请求 stream 就会映射一个新的 URL,同时咱们也能获得一个新的响应。咱们须要两样东西:一个是刷新按钮上 Click events 组成的 Stream(咒语:一切都能是 Stream),同时咱们须要根据刷新 click stream 而改变请求 stream。幸运的是,RxJS 提供了从 Event listener 生成 Observable 的函数。

var refreshButton = document.querySelector('.refresh');
    var refreshClickStream = Rx.Observable.fromEvent(refreshButton, 'click');

既然刷新 click event 自己并无提供任何要请求的 API URL,咱们须要把每一次的 Click 都映射为一个实际的 URL。如今,咱们把刷新 click stream 改成新的请求 stream,其中每个 Click 都分别映射为带有随机偏移量的 API 端点。

var requestStream = refreshClickStream
    .map(function() {
        var randomOffset = Math.floor(Math.random()*500);
        return 'https://api.github.com/users?since=' + randomOffset;
      });

由于我比较笨而且也没有使用自动化测试,因此我刚把以前作好的一个特性毁掉了。如今在启动时不会再发出任何的请求,而只有在点击刷新按钮时才会。额...这两个行为我都须要:不管是点击刷新按钮时仍是刚打开页面时都该发出一个请求。

咱们知道怎么分别为这两种状况生成 Stream:

var requestOnRefreshStream = refreshClickStream
    .map(function() {
        var randomOffset = Math.floor(Math.random()*500);
        return 'https://api.github.com/users?since=' + randomOffset;
    });

    var startupRequestStream = Rx.Observable.just('https://api.github.com/users');

但咱们怎样才能把这两个"融合"为一个呢?好吧,有 merge() 函数。这就是它作的事的图解:

stream A: ---a--------e-----o----->
    stream B: -----B---C-----D-------->
          vvvvvvvvv merge vvvvvvvvv
          ---a-B---C--e--D--o----->

这样就简单了:

var requestOnRefreshStream = refreshClickStream
    .map(function() {
        var randomOffset = Math.floor(Math.random()*500);
        return 'https://api.github.com/users?since=' + randomOffset;
    });

    var startupRequestStream = Rx.Observable.just('https://api.github.com/users');

    var requestStream = Rx.Observable.merge(
    requestOnRefreshStream, startupRequestStream
    );

还有一个更加简洁的可选方案,不须要使用中间变量。

var requestStream = refreshClickStream
    .map(function() {
        var randomOffset = Math.floor(Math.random()*500);
        return 'https://api.github.com/users?since=' + randomOffset;
    })
      .merge(Rx.Observable.just('https://api.github.com/users'));

甚至能够更简短,更具备可读性:

var requestStream = refreshClickStream
    .map(function() {
        var randomOffset = Math.floor(Math.random()*500);
        return 'https://api.github.com/users?since=' + randomOffset;
    })
      .startWith('https://api.github.com/users');

startWith() 函数作的事和你预期的彻底同样。不管你输入的 Stream 是怎样,startWith(x) 输出的 Stream 一开始都是 x 。可是还不够 DRY,我重复了 API 终端 string。一种修复的方法是去掉 refreshClickStream 最后的startWith() ,并在一开始的时候"模拟"一次刷新 Click。

var requestStream = refreshClickStream.startWith('startup click')
  .map(function() {
    var randomOffset = Math.floor(Math.random()*500);
    return 'https://api.github.com/users?since=' + randomOffset;
  });

很好。若是你把以前我"毁掉了的版本"的代码和如今的相比,就会发现惟一的不一样是加了 startWith() 函数。

用 Stream 构建三个推荐

到如今为止,咱们只是谈及了这个推荐 UI 元素在 responeStream 的 subscribe() 内执行的渲染步骤。对于刷新按钮,咱们还有一个问题:当你点击‘刷新’ 时,当前存在的三个推荐并不会被清除。新的推荐会在响应到达后出现,为了让 UI 看起来舒服一些,当点击刷新时,咱们须要清理掉当前的推荐。

refreshClickStream.subscribe(function() {
    // clear the 3 suggestion DOM elements 
    });

不,别那么快,朋友。这样很差,咱们如今有两个订阅者会影响到推荐的 DOM 元素(另一个是responseStream.subscribe() ),并且这样彻底不符合 Separation of concerns。还记得响应式编程的咒语么?

响应式编程

因此让咱们把显示的推荐设计成一个 stream,其中每个映射的值都是包含了推荐内容的 JSON 对象。咱们以此把三个推荐内容分开来。如今第一个推荐看起来是这样子的:

var suggestion1Stream = responseStream
    .map(function(listUsers) {
        // get one random user from the list
        return listUsers[Math.floor(Math.random()*listUsers.length)];
      });

其余的, suggestion2Stream 和 suggestion3Stream 能够简单的拷贝 suggestion1Stream 的代码来使用。这不是 DRY,它会让咱们的例子变得更加简单一些,加之我以为这是一个能够帮助考虑如何减小重复的良好实践。

咱们不在 responseStream 的 subscribe() 中处理渲染了,咱们这么处理:

suggestion1Stream.subscribe(function(suggestion) {
    // render the 1st suggestion to the DOM
    });

回到"当刷新时,清理掉当前的推荐",咱们能够很简单的把刷新点击映射为 null,而且在 suggestion1Stream 中包含进来,以下:

var suggestion1Stream = responseStream
    .map(function(listUsers) {
        // get one random user from the list
        return listUsers[Math.floor(Math.random()*listUsers.length)];
    })
    .merge(
        refreshClickStream.map(function(){ return null; })
      );

当渲染时,null 解释为"没有数据",因此把 UI 元素隐藏起来。

suggestion1Stream.subscribe(function(suggestion) {
    if (suggestion === null) {
        // hide the first suggestion DOM element
    }
    else {
        // show the first suggestion DOM element
        // and render the data
    }
    });

如今的示意图:

refreshClickStream: ----------o--------o---->
    requestStream: -r--------r--------r---->
    responseStream: ----R---------R------R-->   
    suggestion1Stream: ----s-----N---s----N-s-->
    suggestion2Stream: ----q-----N---q----N-q-->
    suggestion3Stream: ----t-----N---t----N-t-->

其中,N 表明了 null

做为一种补充,咱们也能够在一开始的时候就渲染“空的”推荐内容。这经过把 startWith(null) 添加到 Suggestion stream 就完成了:

var suggestion1Stream = responseStream
    .map(function(listUsers) {
        // get one random user from the list
        return listUsers[Math.floor(Math.random()*listUsers.length)];
    })
    .merge(
        refreshClickStream.map(function(){ return null; })
    )
      .startWith(null);

如今结果是:

refreshClickStream: ----------o---------o---->
     requestStream: -r--------r---------r---->
    responseStream: ----R----------R------R-->   
    suggestion1Stream: -N--s-----N----s----N-s-->
    suggestion2Stream: -N--q-----N----q----N-q-->
    suggestion3Stream: -N--t-----N----t----N-t-->

关闭推荐并使用缓存的响应

还有一个功能须要实现。每个推荐,都该有本身的"X"按钮以关闭它,而后在该位置加载另外一个推荐。最初的想法,点击任何关闭按钮时都须要发起一个新的请求:

var close1Button = document.querySelector('.close1');
    var close1ClickStream = Rx.Observable.fromEvent(close1Button, 'click');
    // and the same for close2Button and close3Button

    var requestStream = refreshClickStream.startWith('startup click')
    .merge(close1ClickStream) // we added this
    .map(function() {
        var randomOffset = Math.floor(Math.random()*500);
        return 'https://api.github.com/users?since=' + randomOffset;
      });

这个没有效果。这将会关闭而且从新加载 全部 的推荐,而不是仅仅处理咱们点击的那一个。有一些不同的方法能够解决,而且让它变得更加有趣,咱们能够经过复用以前的请求来解决它。API 的响应页面有 100 个用户,而咱们仅仅使用其中的三个,因此还有不少的新数据可使用,无须从新发起请求。

一样的,咱们用Stream的方式来思考。当点击'close1'时,咱们想要用 responseStream 最近的映射从响应列表中获取一个随机的用户,如:

requestStream: --r--------------->
    responseStream: ------R----------->
    close1ClickStream: ------------c----->
    suggestion1Stream: ------s-----s----->

在 Rx* 中, 叫作链接符函数的 combineLatest 彷佛实现了咱们想要的功能。它接受两个 Stream,A 和 B 做为输入,当其中一个 Stream 发射一个值时, combineLatest 把最近两个发射的值 a 和 b 从各自的 Stream 中取出而且返回一个 c = f(x,y) ,其中 f 为你定义的函数。用图来表示更好:

stream A: --a-----------e--------i-------->
    stream B: -----b----c--------d-------q---->
          vvvvvvvv combineLatest(f) vvvvvvv
          ----AB---AC--EC---ED--ID--IQ---->

    where f is the uppercase function

咱们能够在 close1ClickStream 和 responseStream 上使用 combineLatest(),因此不管何时当一个按钮被点击时,咱们能够得到最新的响应发射值,而且在 suggestion1Stream 上产生一个新的值。另外一方面,combineLatest() 是对称的,当一个新的响应在 responseStream 发射时,它将会把最后的'关闭 1'的点击事件一块儿合并来产生一个新的推荐。这是有趣的,由于它容许咱们把以前的 suggestion1Stream 代码简化成下边这个样子:

var suggestion1Stream = close1ClickStream
    .combineLatest(responseStream,             
        function(click, listUsers) {
        return listUsers[Math.floor(Math.random()*listUsers.length)];
        }
    )
    .merge(
        refreshClickStream.map(function(){ return null; })
    )
      .startWith(null);

还有一个问题须要解决。combineLatest() 使用最近的两个数据源,可是当其中一个来源没发起任何事件时,combineLatest() 没法在 Output stream 中产生一个 Data event。从上边的 ASCII 图中,你能够看到,当第一个 Stream 发射值 a 时,这个值时并无任何输出产生,只有当第二个 Stream 发射值 b 时才有值输出。

有多种方法能够解决这个问题,咱们选择最简单的一种,一开始在'close 1'按钮上模拟一个点击事件:

var suggestion1Stream = close1ClickStream.startWith('startup click') // we added this
    .combineLatest(responseStream,             
        function(click, listUsers) {l
         return listUsers[Math.floor(Math.random()*listUsers.length)];
        }
    )
    .merge(
        refreshClickStream.map(function(){ return null; })
    )
      .startWith(null);

结束

终于完成了,全部的代码合在一块儿是这样子:

var refreshButton = document.querySelector('.refresh');
    var refreshClickStream = Rx.Observable.fromEvent(refreshButton, 'click');

    var closeButton1 = document.querySelector('.close1');
    var close1ClickStream = Rx.Observable.fromEvent(closeButton1, 'click');
    // and the same logic for close2 and close3

    var requestStream = refreshClickStream.startWith('startup click')
    .map(function() {
        var randomOffset = Math.floor(Math.random()*500);
        return 'https://api.github.com/users?since=' + randomOffset;
    });

    var responseStream = requestStream
    .flatMap(function (requestUrl) {
        return Rx.Observable.fromPromise($.ajax({url: requestUrl}));
    });

    var suggestion1Stream = close1ClickStream.startWith('startup click')
    .combineLatest(responseStream,             
        function(click, listUsers) {
        return listUsers[Math.floor(Math.random()*listUsers.length)];
        }
    )
    .merge(
        refreshClickStream.map(function(){ return null; })
    )
    .startWith(null);
    // and the same logic for suggestion2Stream and suggestion3Stream

    suggestion1Stream.subscribe(function(suggestion) {
    if (suggestion === null) {
        // hide the first suggestion DOM element
    }
    else {
        // show the first suggestion DOM element
        // and render the data
    }
     });

你能够查看这个最终效果 http://jsfiddle.net/staltz/8jFJH/48/

这段代码虽然短小,但实现了很多功能:它适当的使用 Separation of concerns 实现了对 Multiple events 的管理,甚至缓存了响应。函数式的风格让代码看起来更加 Declarative 而非 Imperative:咱们并不是给出一组指令去执行,而是经过定义 Stream 之间的关系 定义这是什么 。举个例子,咱们使用 Rx 告诉计算机 *suggestion1Stream* 是 由 'close 1' Stream 与最新响应中的一个用户合并而来,在程序刚运行或者刷新时则是 null 。

留意一下代码中并无出现如 if 、 for 、 while 这样的控制语句,或者通常 JavaScript 应用中典型的基于回调的控制流。若是你想使用 filter() ,上面的 subscribe() 中甚至能够不用 if 、 else (实现细节留给读者做为练习)。在 Rx 中,咱们有着像 map 、 filter 、 scan 、 merge 、 combineLatest 、 startWith 这样的 Stream 函数,甚至更多相似的函数去控制一个事件驱动(Event-driven)的程序。这个工具集让你能够用更少的代码实现更多的功能。

接下来会发生什么

若是你以为 Rx* 会成为你首选的响应式编程库,花点时间去熟悉这个big list of functions,它包括了如何转换、合并、以及建立 Observable。若是你想经过图表去理解这些函数,请看 RxJava's very useful documentation with marble diagrams。不管何时你遇到问题,画一下这些图,思考一下,看一下这一大串函数,而后继续思考。以我我的经验,这样效果很明显。

一旦你开始使用 Rx 去编程,颇有必要去理解 Cold vs Hot Observables 中的概念。若是忽略了这些,你一不当心就会被它坑了。我提醒过你了。经过学习真正的函数式编程去提高本身的技能,并熟悉那些会影响到 Rx 的问题,好比反作用。

可是响应式编程不只仅是 Rx。还有相对容易理解的 Bacon.js,它没有 Rx 那些怪癖。Elm Language 则以它本身的方式支持 RP:它是一门会编译成 Javascript + HTML + CSS 的响应式编程语言 ,并有一个 time travelling debugger。很是厉害。

Rx 在须要处理大量事件的 Frontend 和 Apps 中很是有用。但它不只仅能用在客户端,在后端或者与数据库交互时也很是有用。事实上,RxJava 是实现Netflix's API服务器端并发的一个重要组件 。Rx 并非一个只能在某种应用或者语言中使用的 Framework。它本质上是一个在开发任何 Event-driven 软件中都能使用的编程范式。

若是教程帮到你了,请支持

相关文章
相关标签/搜索