一块儿来看 rxjs

更新日志

  • 2018-05-26 校订
  • 2016-12-03 初版翻译

过去你错过的 Reactive Programming 的简介

你好奇于这名为Reactive Programming(反应式编程)的新事物, 更确切地说,你想了解它各类不一样的实现(好比 [Rx*], [Bacon.js], RAC 以及其它各类各样的框架或库)javascript

学习它比较困难, 由于比较缺好的学习材料(译者注: 原文写就时, RxJs 还在 v4 版本, 彼时社区对 RxJs 的探索还不够完善). 我在开始学习的时候, 试图找过教程, 不过能找到的实践指南屈指可数, 并且这些教程只不过隔靴搔痒, 并不能帮助你作真正了解 RxJs 的基本概念. 若是你想要理解其中一些函数, 每每代码库自带的文档帮不到你. 说白了, 你能一下看懂下面这种文档么:前端

Rx.Observable.prototype.flatMapLatest(selector, [thisArg])java

按照将元素的索引合并的方法, 把一个 "observable 队列 " 中的做为一个新的队列加入到 "observable 队列的队列" 中, 而后把 "observable 队列的队列" 中的一个 "observable 队列" 转换成一个 "仅从最近的 'observable 队列' 产生的值构成的一个新队列."react

这是都是什么鬼?git

我读了两本书, 一本只是画了个大体的蓝图, 另外一本则是一节一节教你 "如何使用 Reactive Libarary" . 最后我以一种艰难的方式来学习 Reactive Programming: 一遍写, 一遍理解. 在我就任于 Futurice 的时候, 我第一次在一个真实的项目中使用它, 我在遇到问题时, 获得了来自同事的支持.github

学习中最困难的地方是 以 Reactive(反应式) 的方式思考. 这意思就是, 放下你以往熟悉的编程中的命令式和状态化思惟习惯, 鼓励本身以一种不一样的范式去思考. 至今我还没在网上找到任何这方面的指南, 而我认为世界上应该有一个说明如何以 Reactive(反应式) 的方式思考的教程, 这样你才知道要如何开始使用它. 在阅读完本文后以后. 请继续阅读代码库自带的文档来指引你以后的学习. 我但愿, 这篇文档对你有所帮助.web

"什么是 Reactive Programming(反应式编程)?"

在网上能够找到大量对此糟糕的解释和定义. Wikipedia 的 意料之中地泛泛而谈和过于理论化. Stackoverflow 的 圣经般的答案也绝对不适合初学者. Reactive Manifesto 听起来就像是要给你公司的项目经理或者是老板看的东西. 微软的 Rx 术语 "Rx = Observables + LINQ + Schedulers" 也读起来太繁重, 太微软了, 以致于你看完后仍然一脸懵逼. 相似于 "reactive" 和 "propagation" 的术语传达出的含义给人感受无异于你之前用过的 MV* 框架和趁手的语言已经作到的事情. 咱们现有的框架视图固然是会对数据模型作出反应, 任何的变化固然也是要冒泡的. 要否则, 什么东西都不会被渲染出来嘛.ajax

因此, 让咱们撇开那些无用的说辞, 尝试去了解本质.sql

Reactive programming(反应式编程) 是在以异步数据流来编程

固然, 这也不是什么新东西. 事件总线或者是典型的点击事件确实就是异步事件流, 你能够对其进行 observe(观察) 或者作些别的事情. 不过, Reactive 是比之更优秀的思惟模型. 你可以建立任何事物的数据流, 而不仅是从点击和悬浮事件中. "流" 是广泛存在的, 一切均可能是流: 变量, 用户输入, 属性, 缓存, 数据结构等等. 好比, 想象你的 Twitter 时间线会成为点击事件一样形式的数据流.数据库

熟练掌握该思惟模型以后, 你还会接触到一个使人惊喜的函数集, 其中包含对任何的数据流进行合并、建立或者从中筛选数据的工具. 它充分展示了 "函数式" 的魅力所在. 一个流能够做为另外一个流的输入. 甚至多个流能够做为另外一个流的输入. 你能够合并两个流. 你能够筛选出一个仅包含你须要的数据的另外一个流. 你能够从一个流映射数据值到另外一个流.

让咱们基于 "流是 Reactive 的中心" 这个设想, 来细致地作看一下整个思惟模型, 就从咱们熟知的 "点击一个按钮" 事件流开始.

Click event stream

每一个流是一个按时序不间断的事件序列. 它可能派发出三个东西: (某种类型的)一个数值, 一个错误, 或者一个 "完成" 信号. 说到 "完成" , 举个例子, 当包含了这个按钮的当前窗口/视图关闭时, 也就是 "完成" 信号发生时.

咱们仅能异步地捕捉到这些事件: 经过定义三种函数, 分别用来捕捉派发出的数值、错误以及 "完成" 信号. 有时候后二者能够被忽略, 你只需定义用来捕捉数值的函数. 咱们把对流的 "侦听" 称为订阅(subscribing), 咱们定义的这三种函数合起来就是观察者, 流则是被观察的主体(或者叫"被观察者"). 这正是设计模式中的观察者模式.

描述这种方式的另外一种方式用 ASCII 字符来画个导图, 在本教程的后续的部分也能看到这种图形.

--a---b-c---d---X---|-> a, b, c, d 表明被派发出的值 X 表明错误 | 表明"完成"信号 ---> 则是时间线

这些都是是老生常谈了, 为了避免让你感到无聊, 如今来点新鲜的东西: 咱们将原生的点击事件流进行变换, 来建立新的点击事件流.

首先, 咱们作一个计数流, 来指明一个按钮被点击了多少次. 在通常的 Reactive 库中, 每一个流都附带了许多诸如mapfilterscan 等的方法. 当你调用这些方法之一(好比好比clickStream.map(f))时, 它返回一个基于 clickStream 的新的流. 它没有对原生的点击事件作任何修改. 这种(不对原有流做任何修改的)特性叫作immutability(不可变性), 而它和 Reactive(反应式) 这个概念的契合度之高比如班戟和糖浆(译者注: 班戟就是薄煎饼, 该称呼多见于中国广东地区. 此句意为 immutability 与 Reactive 两个概念高度契合). 这样的流容许咱们进行链式调用, 好比clickStream.map(f).scan(g):

clickStream: ---c----c--c----c------c--> vvvvv map(c becomes 1) vvvv ---1----1--1----1------1--> vvvvvvvvv scan(+) vvvvvvvvv counterStream: ---1----2--3----4------5-->

map(f) 方法根据你提供的函数f替换每一个被派发的元素造成一个新的流. 在上例中, 咱们将每次点击都映射为计数 1. scan(g) 方法则在内部运行x = g(accumulated, current), 以某种方式连续聚合处理该流以前全部的值, 在该例子中, g 就是简单的加法. 而后, 一次点击发生时, counterStream 就派发一个点击数的总值.

为了展现 Reactive 真正的能力, 咱们假设你想要作一个 "双击事件" 的流. 或者更厉害的, 咱们假设咱们想要获得一个 "三击事件流" , 甚至推广到更广泛的状况, "多击流". 如今, 深呼吸, 想象一下按照传统的命令式和状态化思惟习惯要如何完成这项工做? 我敢说那会烦死你了, 它必须包含各类各样用来保持状态的变量, 以及一些对周期性工做的处理.

然而, 以 Reactive 的方式, 它会很是简单. 事实上, 这个逻辑只不过是四行代码. 不过让咱们如今忘掉代码.不管你是个初学者仍是专家, 借助导图来思考, 才是理解和构建流最好的方法.

Multiple clicks stream

图中的灰色方框是将一个流转换成另外一个流的方法. 首先, 每通过 "250毫秒" 的 "事件静默" (简单地说, 这是在 buffer(stream.throttle(250ms)) 完成的. (如今先)没必要担忧对这点的细节的理解, 咱们主要是演示下 Reactive 的能力.), 咱们就获得了一个 "点击动做" 的列表, 即, 转换的结果是一个列表的流, 而从这个流中咱们应用 map() 将每一个列表映射成对应该队列的长度的整数值. 最后, 咱们使用 filter(x >= 2) 方法忽略掉全部的 1. 如上: 这 3 步操做将产生咱们指望的流. 咱们以后能够订阅("侦听")它, 并按咱们但愿的处理方式处理流中的数据.

我但愿你感觉到了这种方式的美妙. 这个例子只是一次不过揭示了冰山一角: 你能够将相同的操做应用到不一样种类的流上, 好比 API 返回的流中. 除此之外, 还有许多有效的函数.

"为何我应该采用反应式编程?"

Reactive Programming (反应式编程) 提高了你代码的抽象层次, 你能够更多地关注用于定义业务逻辑的事件之间的互相依赖, 而没必要写大量的细节代码来处理事件. RP(反应式编程)的代码会更简洁明了.

在现代网页应用和移动应用中, 这种好处是显而易见的, 这些场景下, 与数据事件关联的大量 UI 事件须要被高频地交互. 10 年前, 和 web 页面的交互只是很基础地提交一个长长的表单给后端, 而后执行一次简单的从新渲染. 在这 10 年间, App 逐渐变得更有实时性: 修改表单中的单个字段可以自动触发一次到后端的保存动做, 对某个内容的 "点赞" 须要实时反馈到其余相关的用户......

现今的 App 有大量的实时事件, 它们共同做用, 以带给用户良好的体验. 咱们要能简洁处理这些事件的工具, 而 Reactive Programming 方式咱们想要的.

举例说明如何以反应式编程的方式思考

如今咱们进入到实战. 一个真实的手把手教你如何以 RP(反应式编程) 的方式来思考的例子. 注意这里不是随处抄来的例子, 不是半吊子解释的概念. 到这篇教程结束为止, 咱们会在写出真正的功能性代码的同时, 理解咱们作的每一个动做.

我选择了 JavaScript 和 RxJS 做为工具, 缘由是, JavaScript 是当下最为人熟知的语言, 而 [Rx*] 支持多数语言和平台 (.NET, Java, Scala, Clojure, JavaScript, Ruby, Python, C++, Objective-C/Cocoa, Groovy等等). , 不管你的工具是什么, 你能够从这篇教程中收益.

实现一个"建议关注"盒子

在 Twitter 上, 有一个 UI 元素是建议你能够关注的其它帐户.

Twitter Who to follow suggestions box

咱们将着重讲解如何模仿出它的核心特性:

  • 在页面启动时, 从 API 中加载帐户数据, 并展现三个推荐关注者
  • 在点击"刷新"时, 加载另外的三个推荐关注的帐户, 造成新三行
  • 在点击一个帐户的 "x" 按钮时, 清除该帐户并展现一个新的
  • 每一行显示帐户的头像和到他们主页的连接

咱们能够忽略其它的特性和按钮, 它们都是次要的. 另外, Twitter 最近关闭了非认证请求接口, 做为替代, 咱们使用 [Github 的 API] 来构建这个关注别人 UI.(注: 到本稿的最新的校订为止, github 的该接口对非认证用户启用了一段时间内访问频次限制)

若是你想尽早看一下完整的代码, 请点击[样例代码].

请求和回复

你如何用 Rx 处理这个问题?

首先, (几乎) 万物皆可为流 .这是 "Rx 口诀". 让咱们从最容易的特性开始: "在页面启动时, 从 API 中加载帐户数据". 这没什么可贵, 只须要(1) 发一个请求, (2) 读取回复, (3) 渲染回复的中的数据. 因此咱们直接把咱们咱们的请求当作流. 一开始就用流也许很有"杀鸡焉用牛刀"的意味, 但为了理解, 咱们须要从基本的例子开始.

在应用启动的时候, 咱们只须要一个请求, 所以若是咱们将它做为一个数据流, 它将会只有一个派发的值. 咱们知道以后咱们将有更多的请求, 但刚开始时只有一个.

--a------|-> 其中 a 是字符串 'https://api.github.com/users'

这是一个将请求的 URL 的流. 不管请求什么时候发生, 它会告诉咱们两件事: 请求发生的时刻和内容. 请求执行之时就是事件派发之时, 请求的内容就是被派发的值: 一个 URL 字符串.

建立这样一个单值流对 [Rx*] 来讲很是简单, 官方对于流的术语, 是 "Observable"(可被观察者), 顾名思义它是可被观察的, 但我以为这名字有点傻, 因此我称呼它为 _流_.

var requestStream = Rx.Observable.just('https://api.github.com/users');

但如今, 这只是一个字符串流, 不包含其余操做, 因此咱们须要要在值被派发的时候作一些事情. 这依靠对流的订阅.

requestStream.subscribe(function(requestUrl) { // 执行该请求 jQuery.getJSON(requestUrl, function(responseData) { // ... }); }

注意咱们使用了 jQuery Ajax 回调(咱们假定你应已对此有了解)来处理请求操做的异步性. 但稍等, Rx 就是处理 异步 数据流的. 难道这个请求的回复不就是一个在将来某一刻会带回返回数据的流么? 从概念上讲, 它看起来就是的, 咱们来尝试写一下.

requestStream.subscribe(function(requestUrl) { // 执行该请求 var responseStream = Rx.Observable.create(function (observer) { jQuery.getJSON(requestUrl) .done(function(response) { observer.onNext(response); }) .fail(function(jqXHR, status, error) { observer.onError(error); }) .always(function() { observer.onCompleted(); }); }); responseStream.subscribe(function(response) { // 对回复作一些处理 }); }

Rx.Observable.create() 所作的是自定义一个流, 这个流会通知其每一个观察者(或者说其"订阅者" )有数据产生 (onNext()) 或发生了错误 (onError()). 咱们须要作的仅仅是包装 jQuery Ajax Promise. 稍等, 这难道是说 Promise 也是一个 Observable?

 

是的. Observable 就是一个 Promise++ 对象. 在 Rx 中, 经过运行 var stream = Rx.Observable.fromPromise(promise) 你就能够把一个 Promise 转换成一个 Observable. 仅有的区别在于 Observables 不符合 Promises/A+ 标准, 但他们在概念上是不冲突的. 一个 Promise 就是一个仅派发一个值的 Observable. Rx 流就是容许屡次返回值的 Promise.

这个例子很能够的, 它展现了 Observable 是如何至少有 Promise 的能力. 所以若是你喜欢 Promise, 请注意 Rx Observable 也能够作到一样的事.

如今回到咱们的例子上, 也许你已经注意到了, 咱们在一个中 subscribe() 调用了另外一个 subscribe(), 这有点像回调地狱. 另外, responseStream 的建立也依赖于 requestStream. 但正如前文所述, 在 Rx 中有简单的机制来最流做变换并支持从其余流建立一个新的流, 接下来咱们来作这件事.

到目前为止, 你应该知道的对流进行变换的一个基础方法是 map(f), 将 "流 A" 中的每个元素做 f() 处理, 而后在 "流 B" 中生成一一对应的值. 若是咱们这样处理咱们的请求和回复流, 咱们能够把请求 URL 映射到回复的 Promise (被当作是流) 中.

var responseMetastream = requestStream .map(function(requestUrl) { return Rx.Observable.fromPromise(jQuery.getJSON(requestUrl)); });

这下咱们建立了一个叫作 元流 (流的流) 的奇怪的东西. 没必要对此感到疑惑, 元流, 就是其中派发值是流的流. 你能够把它想象成 指针): 每一个被派发的值都是对其它另外一个流的 指针 . 在咱们的例子中, 每一个请求的 URL 都被映射为一个指针, 指向一个个包含 URL 对应的返回数据的 promise 流.

Response metastream

这个元流看上去有点让人迷惑, 并且对咱们根本没什么用. 咱们只是想要一个简单的回复流, 其中每一个派发的值都应是一个 JSON 对象, 而不是一个包含 JSON 对象的 Promise. 如今来认识 Flatmap: 它相似于 map(), 但它是把 "分支" 流中派发出的的每一项值在 "主干" 流中派发出来, 如此, 它就能够对元流进行扁平化处理.(译者注: 这里, "分支" 流指的是元流中每一个被派发的值, "主干" 流是指这些值有序构成的流, 因为元流中的每一个值都是流, 做者不得不用 "主干" 和 "分支" 这样的比喻来描述元流与其值的关系). 在此, Flatmap 并非起到了"修正"的做用, 元流也并非一个 bug, 相反, 它们正是 Rx 中处理异步回复流的工具.

var responseStream = requestStream .flatMap(function(requestUrl) { return Rx.Observable.fromPromise(jQuery.getJSON(requestUrl)); });

Response stream

漂亮. 由于回复流是依据请求流定义的, 设想以后
有更多的发生在请求流中的事件, 不难想象, 就会有对应的发生在回复流中的的回复事件:

requestStream:  --a-----b--c------------|-> responseStream: -----A--------B-----C---|-> (小写的是一个请求, 大写的是一个回复)

如今咱们终于获得了回复流, 咱们就能够渲染接收到的数据

responseStream.subscribe(function(response) { // 按你设想的方式渲染 `response` 为 DOM });

整理一下到目前为止的代码, 以下:

var requestStream = Rx.Observable.just('https://api.github.com/users'); var responseStream = requestStream .flatMap(function(requestUrl) { return Rx.Observable.fromPromise(jQuery.getJSON(requestUrl)); }); responseStream.subscribe(function(response) { // 按你设想的方式渲染 `response` 为 DOM });

刷新按钮

如今咱们注意到, 回复中的 JSON 是一个包含 100 个用户的列表. [Github 的 API] 只容许咱们指定一页的偏移量, 而不能指定读取的一页中的项目数量, 因此咱们只用到 3 个数据对象, 剩下的 97 个只能浪费掉. 咱们暂时忽略这个问题, 以后咱们会看到经过缓存回复来处理它.

每次刷新按钮被点击的时候, 请求流应该派发一个新的 URL, 所以咱们会获得一个新的回复. 咱们须要两样东西: 一个刷新按钮的点击事件流(口诀: 万物皆可成流), 而且咱们须要改变请求流以依赖刷新点击流. 好在, RxJs 拥有从事件监听器产生 Observable 的工具.

var refreshButton = document.querySelector('.refresh'); var refreshClickStream = Rx.Observable.fromEvent(refreshButton, 'click');

既然刷新点击事件自身不带任何 API URL, 咱们须要映射每次点击为一个实际的 URL. 如今咱们将请求流改为刷新点击流, 这个流被映射为每次带有随机的偏移参数的、到 API 的请求.

var requestStream = refreshClickStream .map(function() { var randomOffset = Math.floor(Math.random()*500); return 'https://api.github.com/users?since=' + randomOffset; });

若是我直接这样写, 也不作自动化测试, 那这段代码其实有个特性没实现. 即请求不会在页面加载完时发生, 只有当刷新按钮被点击的时候才会. 但其实, 两种行为咱们都须要: 刷新按钮被点击的时候的请求, 或者是页面刚打开时的请求.

两种场景下须要不一样的流:

var requestOnRefreshStream = refreshClickStream .map(function() { var randomOffset = Math.floor(Math.random()*500); return 'https://api.github.com/users?since=' + randomOffset; }); var startupRequestStream = Rx.Observable.just('https://api.github.com/users');

但咱们如何才能"合并"这二者为同一个呢? 有一个 merge() 方法. 用导图来解释的话, 它看起来像是这样的.

stream A: ---a--------e-----o-----> stream B: -----B---C-----D--------> vvvvvvvvv merge vvvvvvvvv ---a-B---C--e--D--o----->

那咱们要作的事就变得很容易了:

var requestOnRefreshStream = refreshClickStream .map(function() { var randomOffset = Math.floor(Math.random()*500); return 'https://api.github.com/users?since=' + randomOffset; }); var startupRequestStream = Rx.Observable.just('https://api.github.com/users'); var requestStream = Rx.Observable.merge( requestOnRefreshStream, startupRequestStream );

也有另一种更干净的、不须要中间流的写法:

var requestStream = refreshClickStream .map(function() { var randomOffset = Math.floor(Math.random()*500); return 'https://api.github.com/users?since=' + randomOffset; }) .merge(Rx.Observable.just('https://api.github.com/users'));

甚至再短、再有可读性一点:

var requestStream = refreshClickStream .map(function() { var randomOffset = Math.floor(Math.random()*500); return 'https://api.github.com/users?since=' + randomOffset; }) .startWith('https://api.github.com/users');

startWith() 会照你猜的那样去工做: 给流一个起点. 不管你的输入流是怎样的, 带 startWith(x) 的输出流总会以 x 做为起点. 但我这样作还不够 [DRY], 我把 API 字符串写了两次. 一种修正的作法是把 startWith() 用在 refreshClickStream 上, 这样能够从"模拟"在页面加载时一次刷新点击事件.

var requestStream = refreshClickStream.startWith('startup click') .map(function() { var randomOffset = Math.floor(Math.random()*500); return 'https://api.github.com/users?since=' + randomOffset; });

漂亮. 若是你如今回头去看我说 "有个特性没实现" 的那一段, 你应该能看出那里的代码和这里的代码的区别仅仅是多了一个 startWith().

使用流来创建"3个推荐关注者"的模型

到如今为止, 咱们只是写完了一个发生在回复流的 subscribe() 中的 推荐关注者 的 UI. 对于刷新按钮, 咱们要解决一个问题: 一旦你点击了"刷新", 如今的三个推荐关注者仍然没有被清理. 新的推荐关注者只在请求内回复后才能拿到, 不过为了让 UI 看上去使人温馨, 咱们须要在刷新按钮被点击的时候就清理当前的推荐关注者.

refreshClickStream.subscribe(function() { // 清理 3 个推荐关注者的 DOM 元素 });

稍等一下. 这样作不太好, 由于这样咱们就有两个会影响到推荐关注者的 DOM 元素的 subscriber (另外一个是 responseStream.subscribe()), 这听起来不符合 Separation of concerns. 还记得 Reactive 口诀吗?

Mantra

在 "万物皆可为流" 的指导下, 咱们把推荐关注者构建为一个流, 其中每一个派发出来的值都是一个包含了推荐关注人数据的 JSON 对象. 咱们会对三个推荐关注者的数据分别作这件事. 像这样来写:

var suggestion1Stream = responseStream .map(function(listUsers) { // 从列表中随机获取一个用户 return listUsers[Math.floor(Math.random()*listUsers.length)]; });

至于获取另外两个用户的流, 即 suggestion2Stream 和 suggestion3Stream, 只须要把 suggestion1Stream 复制一遍就好了. 这不够 [DRY], 不过对咱们的教程而言, 这样能让咱们的示例简单些, 同时我认为, 思考如何在这个场景下避免重复编写 suggestion[N]Stream 也是个好的思惟练习, 就留给读者去考虑吧.

咱们让渲染的过程发生在回复流的 subscribe() 中, 而是这样作:

suggestion1Stream.subscribe(function(suggestion) { // 渲染第 1 个推荐关注者 });

回想以前咱们说的 "刷新的时候, 清理推荐关注者", 咱们能够简单地将刷新单击事件映射为 "null" 数据(它表明当前的推荐关注者为空), 而且在 suggestion1Stream 作这项工做, 以下:

var suggestion1Stream = responseStream .map(function(listUsers) { // 从列表中随机获取一个用户 return listUsers[Math.floor(Math.random()*listUsers.length)]; }) .merge( refreshClickStream.map(function(){ return null; }) );

在渲染的时候, 咱们把 null 解释为 "没有数据", 隐藏它的 UI 元素.

suggestion1Stream.subscribe(function(suggestion) { if (suggestion === null) { // 隐藏第 1 个推荐关注者元素 } else { // 显示第 1 个推荐关注者元素并渲染数据 } });

整个情景是这样的:

refreshClickStream: ----------o--------o----> requestStream: -r--------r--------r----> responseStream: ----R---------R------R--> suggestion1Stream: ----s-----N---s----N-s--> suggestion2Stream: ----q-----N---q----N-q--> suggestion3Stream: ----t-----N---t----N-t-->

其中 N 表示 null

(译者注: 注意, 当 refreshClickStream 产生新值, 即用户进行点击时, null 的产生老是马上发生在 refreshClickStream 以后; 而 refreshClickStream => requestStream => responseStream, responseStream 中的值, 是发给 API 接口的异步请求的结果, 这个结果的产生每每会须要花一点时间, 必然在 null 以后, 所以能够达到 "为了让 UI 看上去使人温馨, 咱们须要在刷新按钮被点击的时候就清理当前的推荐关注者" 的效果).

稍微完善一下, 咱们会在页面启动的时候也会渲染 "空" 推荐关注人. 为此能够 startWith(null) 放在推荐关注人的流里:

var suggestion1Stream = responseStream .map(function(listUsers) { // 从列表中随机获取一个用户 return listUsers[Math.floor(Math.random()*listUsers.length)]; }) .merge( refreshClickStream.map(function(){ return null; }) ) .startWith(null);

最后咱们获得的流:

refreshClickStream: ----------o---------o----> requestStream: -r--------r---------r----> responseStream: ----R----------R------R--> suggestion1Stream: -N--s-----N----s----N-s--> suggestion2Stream: -N--q-----N----q----N-q--> suggestion3Stream: -N--t-----N----t----N-t-->

关闭推荐关注人, 并利用已缓存的回复数据

目前还有一个特性没有实现. 每一个推荐关注人格子应该有它本身的 'x' 按钮来关闭它, 而后加载另外一个数据来代替. 也许你的第一反应是, 用一种简单方法: 在点击关闭按钮的时候, 发起一个请求, 而后更新这个推荐人:

var close1Button = document.querySelector('.close1'); var close1ClickStream = Rx.Observable.fromEvent(close1Button, 'click'); // close2Button 和 close3Button 重复此过程 var requestStream = refreshClickStream.startWith('startup click') .merge(close1ClickStream) // 把关闭按钮加在这里 .map(function() { var randomOffset = Math.floor(Math.random()*500); return 'https://api.github.com/users?since=' + randomOffset; });

然而这没不对. (因为 refreshClickStream 影响了全部的推荐人流, 因此)该过程会关闭而且从新加载_全部的_推荐关注人, 而不是仅更新咱们想关掉的那一个. 这里有不少方式来解决这个问题, 为了玩点炫酷的, 咱们会重用以前的回复数据中别的推荐人. API 返回的数据每页包含 100 个用户, 但咱们每次只用到其中的 3 个, 因此咱们有不少有效的刷新数据能够用, 不必再请求新的.

再一次的, 让咱们用流的思惟来思考. 当一个 'close1'点击事件发生的时候, 咱们使用 responseStream中 最近被派发的 回复来从回复的用户列表中随机获取一个用户. 以下:

requestStream: --r---------------> responseStream: ------R-----------> close1ClickStream: ------------c-----> suggestion1Stream: ------s-----s----->

在 [Rx*] 中, 有一个合成器方法叫作 combineLatest, 彷佛能够完成咱们想作的事情. 它把两个流 A 和 B 做为其输入, 而当其中任何一个派发值的时候, combineLatest 会把二者最近派发的值 a 和 b 按照 c = f(x,y) 的方法合并处理再输出, 其中 f 是你能够定义的方法. 用图来解释也许更清楚:

stream A: --a-----------e--------i--------> stream B: -----b----c--------d-------q----> vvvvvvvv combineLatest(f) vvvvvvv ----AB---AC--EC---ED--ID--IQ----> 在该例中, f 是一个转换为全大写的函数

咱们能够把 combineLatest() 用在 close1ClickStream 和 responseStream 上, 所以一旦 "关闭按钮1" 被点击(致使 close1ClickStream 产生新值), 咱们都能获得最新的返回数据, 并在 suggestion1Stream中产生一个新的值. 因为 combineLatest() 的对称性的, 任什么时候候, 只要 responseStream 派发了一个新的回复, 它也将合并最新的一次 '关闭按钮1被点击' 事件来产生一个新的推荐关注人. 这个特性很是有趣, 由于它容许咱们简化咱们以前的 suggestion1Stream , 以下:

var suggestion1Stream = close1ClickStream .combineLatest(responseStream, function(click, listUsers) { return listUsers[Math.floor(Math.random()*listUsers.length)]; } ) .merge( refreshClickStream.map(function(){ return null; }) ) .startWith(null);

在上述思考中, 还有一点东西被遗漏. combineLatest() 使用了两个数据源中最近的数据, 可是若是这些源中的某个从未派发过任何东西, combineLatest() 就不能产生一个数据事件到输出流. 若是你再细看上面的 ASCII 图, 你会发现当第一个流派发 a 的时候, 不会有任何输出. 只有当第二个流派发 b 的时候才能产生一个输出值.

有几种方式来解决该问题, 咱们仍然采起最简单的一种, 就是在页面启动的时候模拟一次对 '关闭按钮1' 按钮的点击:

var suggestion1Stream = close1ClickStream.startWith('startup click') // 把对"关闭按钮1"的点击的模拟加在这里 .combineLatest(responseStream, function(click, listUsers) {l return listUsers[Math.floor(Math.random()*listUsers.length)]; } ) .merge( refreshClickStream.map(function(){ return null; }) ) .startWith(null);

总结整理

如今咱们的工做完成了. 完整的代码以下所示:

var refreshButton = document.querySelector('.refresh'); var refreshClickStream = Rx.Observable.fromEvent(refreshButton, 'click'); var closeButton1 = document.querySelector('.close1'); var close1ClickStream = Rx.Observable.fromEvent(closeButton1, 'click'); // close2 和 close3 是一样的逻辑 var requestStream = refreshClickStream.startWith('startup click') .map(function() { var randomOffset = Math.floor(Math.random()*500); return 'https://api.github.com/users?since=' + randomOffset; }); var responseStream = requestStream .flatMap(function (requestUrl) { return Rx.Observable.fromPromise($.ajax({url: requestUrl})); }); var suggestion1Stream = close1ClickStream.startWith('startup click') .combineLatest(responseStream, function(click, listUsers) { return listUsers[Math.floor(Math.random()*listUsers.length)]; } ) .merge( refreshClickStream.map(function(){ return null; }) ) .startWith(null); // suggestion2Stream 和 suggestion3Stream 是一样的逻辑 suggestion1Stream.subscribe(function(suggestion) { if (suggestion === null) { // 隐藏第 1 个推荐关注者元素 } else { // 显示第 1 个推荐关注者元素并渲染数据 } });

你能够在这里查看完整的[样例代码]

很惭愧, 这只是一个微小的代码示例, 但它的信息量很大: 它着重表现了, 如何对关注点进行适当的隔离, 从而对不一样流进行管理, 甚至充分利用了返回数据的流. 这样的函数式风格使得代码像声明式多于像命令式: 咱们并不用给出一个要执行的的结构化序列, 咱们只是经过定义流之间的关系来表达系统中每件事物是什么. 举例来讲, 经过 Rx, 咱们告诉计算机 _suggestion1Stream 就是 点击关闭按钮1 的流, 与最近一个API返回的(用户中随机选择的一个)的用户的流, 刷新时产生 null 的流, 和应用启动时产生 null 的流的合并流_.

回想一下那些你熟稔的流程控制的语句(好比 ifforwhile), 以及 Javascript 应用中随处可见的基于回调的控制流. (只要你愿意, )你甚至能够在上文的 subscribe() 中不写 if 和 else, 而是(在 observable 上)使用 filter()(这一块我就不写实现细节了, 留给你做为练习). 在 Rx 中, 有不少流处理方法, 好比 mapfilterscanmergecombineLateststartWith, 以及很是多用于控制一个事件驱动的程序的流的方法. 这个工具集让你用更少的代码而写出更强大的效果.

接下来还有什么?

若是你愿意用 [Rx*] 来作反应式编程, 请花一些时间来熟悉这个 函数列表, 其中涉及如何变换, 合并和建立 Observables (被观察者). 若是你想以图形的方式理解这些方法, 能够看一下 弹珠图解 RxJava. 一旦你对理解某物有困难的时候, 试着画一画图, 基于图来思考, 看一下函数列表, 再继续思考. 以个人经验, 这样的学习流程很是有用.

一旦你熟悉了如何使用 [Rx] 进行变成, 理解冷热酸甜, 想吃就吃...哦不, 冷热 Observables 就颇有必要了. 反正就算你跳过了这一节, 你也会回来从新看的, 勿谓言之不预也. 建议经过学习真正的函数式编程来磨练你的技巧, 而且熟悉影响各类议题, 好比"影响 [Rx] 的反作用"什么的.

不过, 实现了反应式编程的库并不是并不是只有 [Rx]. [Bacon.js] 的运行机制就很直观, 理解它不像理解 [Rx] 那么难; [Elm Language] 在特定的应用场景有很强的生命里: 它是一种会编译到 Javascript + HTML + CSS 的反应式编程语言, 它的特点在于 [time travelling debugger]. 这些都很不错.

Rx 在严重依赖事件的前端应用中表现优秀. 但它不仅是只为客户端应用服务的, 在接近数据库的后端场景中也大有可为. 实际上, [RxJava 正是激活 Netflex 服务端并发能力的关键]. Rx 不是一个严格限于某种特定类型应用的框架或者是语言. 它实际上是一种范式, 你能够在任何事件驱动的软件中实践它.

 


原文连接本文为云栖社区原创内容,未经容许不得转载。

相关文章
相关标签/搜索