目录javascript
本文是Rxjs 响应式编程-第二章:序列的深刻研究这篇文章的学习笔记。html
示例代码托管在:http://www.github.com/dashnowords/blogs前端
更多博文:《大史住在大前端》目录java
文中使用到的一些基本运算符:git
map
-映射filter
-过滤reduce
-有限列聚合scan
-无限列聚合flatMap
-拉平操做(重点)catch
-捕获错误retry
-序列重试from
-生成可观测序列range
-生成有限的可观测序列interval
-每隔指定时间发出一次顺序整数distinct
-去除出现过的重复值建议本身动手尝试一下,记住就能够了,有过lodash
使用经验的开发者来讲并不难。github
原文中在http
请求拿到获取到数据后,最初使用了forEach
实现了手动流程管理,因而原文提出了优化设想,试图探究如何依赖响应式编程的特性将手动的数据加工转换改造为对流的转换,好让最终的消费者可以拿到直接可用的数据,而不是获得一个响应后手动进行不少后处理。在代码层面须要解决的问题就是,如何在不使用手动遍历的前提下将一个有限序列中的数据逐个发给订阅者,而不是一次性将整个数据集发过去。编程
假设咱们如今并不知道有flatMap
这样一个可使用的方法,那么先来作一些尝试:segmentfault
var quakes = Rx.Observable.create(function(observer) { //模拟获得的响应流 var response = { features:[{ earth:1 },{ earth:2 }], test:1 } /* 最初的手动遍历代码 var quakes = response.features; quakes.forEach(function(quake) { observer.onNext(quake); });*/ observer.onNext(response); }) //为了能将features数组中的元素逐个发送给订阅者,须要构建新的流 .map(dataset){ return Rx.Observable.from(dataset.features) }
当咱们订阅quakes
这个事件流的时候,每次都会获得另外一个Observable
,它是由于数据源通过了映射变换,从数据变成了可观测对象。那么为了获得最终的序列值,就须要再次订阅这个Observable
,这里须要注意的是可观测对象被订阅前是不启动的,因此不用担忧它的时序问题。数组
quakes.subscribe(function(data){ data.subscribe(function(quake){ console.log(quake); }) });
若是将Observable
当作一个盒子,那么每一层盒子只是实现了流程控制功能性的封装,为了取得真正须要使用的数据,最终的订阅者不得不像剥洋葱似的经过subscribe
一层层打开盒子拿到最里面的数据,这样的封装性对于数据在流中的传递具备很好的隔离性,可是对最终的数据消费者而言,倒是一件很麻烦的事情。缓存
这时flatMap
运算符就派上用场了,它能够将冗余的包裹除掉,从而在主流被订阅时直接拿到要使用的数据,从大理石图来直观感觉一下flatMap
:
乍看之下会以为它和merge
好像是同样的,其实仍是有一些区别的。merge
的做用是将多个不一样的流合并成为一个流,而上图中A1,A2,A3这三个流都是当主流A返回数据时新生成的,能够将他们想象为A的支流,若是你想在支流里捞鱼,就须要在每一个支流里布网,而flatMap
至关于提供了一张大网,将全部A的支流里的鱼都给捞上来。
因此在使用了flatMap
后,就能够直接在一级订阅中拿到须要的数据了:
var quakes = Rx.Observable.create(function(observer) { var response = { features:[{ earth:1 },{ earth:2 }], test:1 } observer.onNext(response); }).flatMap((data)=>{ return Rx.Observable.from(data.features); }); quakes.subscribe(function(quake) { console.log(quake) });
若是本节的基本知识你尚不熟悉,能够经过javascript基础修炼(8)——指向FP世界的箭头函数这篇文章来简单回顾一下函数式编程的基本知识,而后再继续后续的部分。
/*map运算符的做用 *对全部容器类而言,它至关于打开容器,进行操做,而后把容器再盖上。 *Container在这里只是一个抽象定义,为了看清楚它对于容器中包含的值意味着什么。 *你会发现它其实就是Observable的抽象原型。 */ Container.prototype.map = function(f){ return Container.of(f(this.__value)) } //基本的科里化函数 var curry = function(fn){ args = [].slice.call(arguments, 1); return function(){ [].push.apply(args, arguments); return fn.apply(this, args); } } //map pointfree风格的map运算符 var map = curry(function(f, any_functor_at_all) { return any_functor_at_all.map(f); }); /*compose函数组合方法 *运行后返回一个新函数,这个函数接受一个参数。 *函数科里化的基本应用,也是函数式编程中运算管道构建的基本方法。 */ var compose = function (f, g) { return function (x) { return f(g(x)); } }; /*IO容器 *一个简单的Container实现,用来作流程管理 *这里须要注意,IO实现的做用是函数的缓存,且老是返回新的IO实例 *能够看作一个简化的Promise,重点是直观感觉一下它做为函数的 *容器是如何被使用的,对于理解Observable有很大帮助 */ var IO = function(f) { this.__value = f; } IO.of = function(x) { return new IO(function() { return x; }); } IO.prototype.map = function(f) { return new IO(compose(f, this.__value)); }
若是上面的基本知识没有问题,那么就继续。
如今来实现这样一个功能,读入一个文件的内容,将其中的a
字符所有换成b
字符,接着存入另外一个文件,完成后在控制台输出一个消息,为了更明显地看到数据容器的做用,咱们使用同步方法并将其包裹在IO
容器中,而后利用函数式编程:
var fs = require('fs'); //读取文件 var readFile = (filename)=>IO.of(fs.readFileSync(filename,'utf-8')); //转换字符 var transContent = (content)=>IO.of((content)=>content.replace('a','b')); //写入字符串 var writeFile = (content)=>IO.of(fs.writeFileSync('dest.txt',content));
当具体的函数被IO
容器包裹起来而实现延迟执行的效果时,就没法按原来的方式使用compose( )
运算符直接对功能进行组合,由于readFile
函数运行时的输出结果(一个io
容器实例)和transContent
函数须要的参数类型(字符串)再也不匹配,在不修改原有函数定义的前提下,函数式编程中采用的作法是使用map
操做符来预置一个参数:
/* *map(transContent)是一个高阶函数,它的返回函数就能够接收一个容器实例, *并对容器中的内容执行map操做。 */ var taskStep12 = compose(map(transContent), readFile);
这里比较晦涩,涉及到不少功能性函数的嵌套,建议手动推导一下taskStep12
这个变量的值,它的结构是这样一种形式:
io{ __value:io{ __value:someComposedFnExpression } }
若是试图一次性将全部的步骤组合在一块儿,就须要采用下面的形式:
var task = compose(map(map(writeFile)),map(transContent),readFile); //组合后的task形式就是 //io{io{io{__value:someComposedFnExpression}}}
问题已经浮出水面了,每多加一个针对容器操做的步骤,书写时就须要多包裹一层map
,而运行时就须要多进入一层才能触及组合好的能够实现真正功能的函数表达式,真的是很麻烦。
👉 提示一:
如今来回想一下原示例中的Observable对象,将其看作是一个容器(含有map类方法),那么若是map方法调用时传入的参数是一个运行时会生成新的Observable对象的方法时,就会产生Observable嵌套,获得observable{observable{.....}}这样的结构,那么在最终的数据消费者经过
subscribe
方法订阅数据时,就不得不用不少个subscribe
才能拿到实际须要的数据。👉提示二:
没有相关经验的读者在使用pointfree风格的
map
操做符时可能会感到很是不适应,若是你以为它很难理解,也能够尝试直接使用IO.prototype.map
这种链式调用风格的写法将上例中的三个步骤组合在一块儿来查看最后的结果,毕竟在Rxjs
中常使用的也就是Observable
这一个容器类。
当咱们看到问题所在后就不难发现,其实这个问题的解决方法并不复杂,咱们要作的不过就是在必要的时候合并内容的容器,为此来定义两个合并运算的方法:
//链式调用风格 IO.prototype.join = function(){ return this.isNothing() ? IO.of(null):this.__value; } //pointfree风格运算符 var join = (m)=>m.join();
这里引入一个新的概念Monad
,它的定义是能够被展平的容器,也就是说拥有join
和of
方法并遵循必定规则的容器,都是Monad
,在这种设定下,3.1中的示例就能够被改写为下面的形式:
var task = compose(join,map(writeFile),join,map(transContent),readFile);
不难发现map
和join
老是须要成对出现的,那么再利用函数科里化的技巧将map
和join
连起来:
var chain = curry(function(f,m){ return m.map(f).join(); })
那么组合后的函数就变成了下面的形式:
var task = compose(chain(writeFile),chain(transContent),readFile);
这里的chain
,就是FlatMap
。
最后将上面几种形式放在一块儿再来回顾一下:
//原有形式 var task = compose(map(map(writeFile)),map(transContent),readFile); //map-join形式 var task = compose(join,map(writeFile),join,map(transContent),readFile); //chain形式(flatMap) var task = compose(chain(writeFile),chain(transContent),readFile);
若是理解了这几种形式,就不难理解flatMap
的拉平效应了,所谓flatMap
,说白了其实就是将容器展开的一种操做。
flatMap
所解决问题,是在函数式编程引入了Functor
的概念将逻辑函数包裹在容器中后才产生的,那么这种容器概念的引入对函数式编程到底有什么意义,笔者还没有搞清楚,相关内容留做之后补充。
《javascript函数式编程指南》https://llh911001.gitbooks.io/mostly-adequate-guide-chinese/content/