上次给你们分享了cycle.js的内容,这个框架核心模块的代码其实只有一百多行,要理解这个看似复杂的框架,其实最核心的是理解它依赖的异步数据流处理框架——rx.js。今天,给你们分享一下rx.js的实现原理,你们有兴趣能够继续研究它的源码,会让你对异步和响应式编程有更深的理解,进而将rx.js、cycle.js或者仅仅是函数式、响应式编程的思想融入到本身手里的业务中。
为了更好地理解rx.js,须要先谈谈异步编程的实现方案。web
makeHttpCall('/items', items => { for (itemId of items) { makeHttpCall(`/items/${itemId}/info`, itemInfo => { makeHttpCall(`/items/${itemInfo.pic}`, img => { showImg(img); }); }); } }); beginUiRendering();
一旦你须要多块数据时你就陷入了流行的”末日金字塔“或者回调地狱。这段代码有不少的问题。 其中之一就是风格。当你在这些嵌套的回调函数中添加愈来愈多的逻辑,这段代码就会变得很复杂很难理解。由于循环还产生了一个更加细微的问题。for循环是同步的控制流语句,这并不能很好的配合异步调用,由于会有延迟,这可能会产生很奇怪的bug。express
makeHttpCall('/items') .then(itemId => makeHttpCall(`/items/${itemId}/info`)) .then(itemInfo => makeHttpCall(`/items/${itemInfo}.pic}`)) .then(showImg);
链式调用毫无疑问是一个进步,理解这段代码的难度显著降低。然而,尽管Promises在处理这种单值(或单个错误)时很是高效,它有也一些局限性。Promises在处理用户连续输入的数据流时效率怎么样呢? 这时Promises处理起来也并不高效,由于它没有事件的删除、分配、重试等等的语法定义。编程
使用async/await,配合Promise能够以同步的方式编写异步代码,是我如今最喜欢也最经常使用的异步编程方式。好比:上述实如今请求加载完图片后再显示图片的逻辑,也能够这样实现:promise
const showLoadedImg = async () => { let getImgInfo = makeHttpCall('/items') .then(itemId => makeHttpCall(`/items/${itemId}/info`)) .then(itemInfo => makeHttpCall(`/items/${itemInfo}.pic}`)); let loadedImg = await getImgInfo; showImg(loadedImg); }
generator我用的不多,以前用过一段时间koa(express原班团队搞的基于generator的现代web开发框架),不太喜欢代码在各个模块间跳来跳去的编写思路。整个 Generator 函数就是一个封装的异步任务,或者说是异步任务的容器。异步操做须要暂停的地方,都用 yield 语句注明。调用 Generator 函数,会返回一个内部指针(即遍历器 )g,调用指针 g 的 next 方法,会移动内部指针(即执行异步任务的第一段),指向第一个遇到的 yield 语句。下面贴一段koa中间件级联的代码,你们感觉一下:数据结构
var koa = require('koa'); var app = koa(); // x-response-time app.use(function *(next){ var start = new Date; yield next; var ms = new Date - start; this.set('X-Response-Time', ms + 'ms'); }); // logger app.use(function *(next){ var start = new Date; yield next; var ms = new Date - start; console.log('%s %s - %s', this.method, this.url, ms); }); // response app.use(function *(){ this.body = 'Hello World'; }); app.listen(3000);
上面的例子在页面中返回 "Hello World",然而当请求开始时,请求先通过 x-response-time 和 logging 中间件,并记录中间件执行起始时间。 而后将控制权交给 reponse 中间件。当中间件运行到 yield next 时,函数挂起并将控制前交给下一个中间件。当没有中间件执行 yield next 时,程序栈会逆序唤起被挂起的中间件来执行接下来的代码。app
RxJS是一个解决异步问题的JS开发库.它起源于 Reactive Extensions 项目,它带来了观察者模式和函数式编程的相结合的最佳实践。 观察者模式是一个被实践证实的模式,基于生产者(事件的建立者)和消费者(事件的监听者)的逻辑分离关系。框架
何况函数式编程方式的引入,如说明性编程,不可变数据结构,链式方法调用会使你极大的简化代码量。(和回调代码方式说再见吧)koa
若是你熟悉函数式编程,请把RxJS理解为异步化的Underscore.js。异步
RxJS 引入了一个重要的数据类型——流(stream)。async
观察者模式在 Web 中最多见的应该是 DOM 事件的监听和触发。
订阅:经过 addEventListener 订阅 document.body 的 click 事件。
发布:当 body 节点被点击时,body 节点便会向订阅者发布这个消息。
document.body.addEventListener('click', function listener(e) { console.log(e); },false); document.body.click(); // 模拟用户点击
迭代器模式能够用 JavaScript 提供了 Iterable Protocol 可迭代协议来表示。Iterable Protocol 不是具体的变量类型,而是一种可实现协议。JavaScript 中像 Array、Set 等都属于内置的可迭代类型,能够经过 iterator 方法来获取一个迭代对象,调用迭代对象的 next 方法将获取一个元素对象,以下示例。
var iterable = [1, 2]; var iterator = iterable[Symbol.iterator](); iterator.next(); // => { value: "1", done: false} iterator.next(); // => { value: "2", done: false} iterator.next(); // => { value: undefined, done: true}
元素对象中:value 表示返回值,done 表示是否已经到达最后。
遍历迭代器可使用下面作法。
var iterable = [1, 2]; var iterator = iterable[Symbol.iterator](); while(true) { let result; try { result = iterator.next(); // <= 获取下一个值 } catch (err) { handleError(err); // <= 错误处理 } if (result.done) { handleCompleted(); // <= 无更多值(已完成) break; } doSomething(result.value); }
主要对应三种状况:
获取下一个值
调用 next 能够将元素一个个地返回,这样就支持了返回屡次值。
无更多值(已完成)
当无更多值时,next 返回元素中 done 为 true。
错误处理
当 next 方法执行时报错,则会抛出 error 事件,因此能够用 try catch 包裹 next 方法处理可能出现的错误。
RxJS 中含有两个基本概念:Observables 与 Observer。Observables 做为被观察者,是一个值或事件的流集合;而 Observer 则做为观察者,根据 Observables 进行处理。
Observables 与 Observer 之间的订阅发布关系(观察者模式) 以下:
订阅:Observer 经过 Observable 提供的 subscribe() 方法订阅 Observable。
发布:Observable 经过回调 next 方法向 Observer 发布事件。
下面为 Observable 与 Observer 的伪代码:
// Observer var Observer = { next(value) { alert(`收到${value}`); } }; // Observable function Observable (Observer) { setTimeout(()=>{ Observer.next('A'); },1000) } // subscribe Observable(Observer);
上面实际也是观察者模式的表现,那么迭代器模式在 RxJS 中如何体现呢?
在 RxJS 中,Observer 除了有 next 方法来接收 Observable 的事件外,还能够提供了另外的两个方法:error() 和 complete(),与迭代器模式一一对应。
var Observer = { next(value) { /* 处理值*/ }, error(error) { /* 处理异常 */ }, complete() { /* 处理已完成态 */ } };
结合迭代器 Iterator 进行理解:
next()
Observer 提供一个 next 方法来接收 Observable 流,是一种 push 形式;而 Iterator 是经过调用 iterator.next() 来拿到值,是一种 pull 的形式。
complete()
当再也不有新的值发出时,将触发 Observer 的 complete 方法;而在 Iterator 中,则须要在 next 的返回结果中,当返回元素 done 为 true 时,则表示 complete。
error()
当在处理事件中出现异常报错时,Observer 提供 error 方法来接收错误进行统一处理;Iterator 则须要进行 try catch 包裹来处理可能出现的错误。
下面是 Observable 与 Observer 实现观察者 + 迭代器模式的伪代码,数据的逐渐传递传递与影响其实就是流的表现。
// Observer var Observer = { next(value) { alert(`收到${value}`); }, error(error) { alert(`收到${value}`); }, complete() { alert("complete"); }, }; // Observable function Observable (Observer) { [1,2,3].map(item=>{ Observer.next(item); }); Observer.complete(); // Observer.error("error message"); } // subscribe Observable(Observer);
有了上面的概念及伪代码,那么在 RxJS 中是怎么建立 Observable 与 Observer 的呢?
建立 Observable
RxJS 提供 create 的方法来自定义建立一个 Observable,可使用 next 来发出流。
var Observable = Rx.Observable.create(observer => { observer.next(2); observer.complete(); return () => console.log('disposed'); });
建立 Observer
Observer 能够声明 next、err、complete 方法来处理流的不一样状态。
var Observer = Rx.Observer.create( x => console.log('Next:', x), err => console.log('Error:', err), () => console.log('Completed') );
最后将 Observable 与 Observer 经过 subscribe 订阅结合起来。
var subscription = Observable.subscribe(Observer);
关于rx.js的使用和API就再也不赘述了,理解了其实现原理,使用起来就很简单了!