Rxjs 响应式编程-第一章:响应式

Rxjs 响应式编程-第一章:响应式
Rxjs 响应式编程-第二章:序列的深刻研究
Rxjs 响应式编程-第三章: 构建并发程序
Rxjs 响应式编程-第四章 构建完整的Web应用程序
Rxjs 响应式编程-第五章 使用Schedulers管理时间
Rxjs 响应式编程-第六章 使用Cycle.js的响应式Web应用程序javascript

响应式

现实世界至关混乱:事件不按照顺序发生,应用崩溃,网络不通。几乎没有应用是彻底同步的,因此咱们不得不写一些异步代码保持应用的可响应性。大多数的时候是很痛苦的,但也并非不可避免。java

现代应用须要超级快速的响应速度,而且但愿可以不漏掉一个字节的处理来自不一样数据源的数据。然而并无现成的解决方案,由于它们不会随着咱们添加并发和应用程序状态而扩展代码变得愈来愈复杂。程序员

本章向您介绍反应式编程,这是一种天然,简单的方法处理异步代码的方式。我会告诉你事件的流程 - 咱们称之为Observables - 是处理异步代码的一种很好的方式。
而后咱们将建立一个Observable,看看响应式思惟和RxJS是怎么样改善现有技术,让你成为更快乐,更多高效的程序员。数据库

什么是响应式?

让咱们从一个小的响应性RxJS程序开始。 这个程序须要经过单击按钮检索来自不一样来源的数据,它具备如下要求:编程

  • 它必须统一来自使用不一样源的JSON结构
  • 最终结果不该包含任何副本
  • 为了不屡次请求数据,用户不能重复点击按钮

使用RxJS,咱们的代码相似这样:json

var button = document.getElementById('retrieveDataBtn');
var source1 = Rx.DOM.getJSON('/resource1').pluck('name');
var source2 = Rx.DOM.getJSON('/resource2').pluck('props', 'name');
function getResults(amount) {
    return source1.merge(source2)
        .pluck('names')
        .flatMap(function(array) { return Rx.Observable.from(array); })
        .distinct()
        .take(amount);
}
var clicks = Rx.Observable.fromEvent(button, 'click');
clicks.debounce(1000)
    .flatMap(getResults(5))
    .subscribe(
        function(value) { console.log('Received value', value); },
        function(err) { console.error(err); },
        function() { console.log('All values retrieved!'); }
    );

不要担忧不理解这里的代码。只要关注于成果便可。你看到的第一件事是咱们使用更少的代码实现更多的功能。咱们经过使用Observable来实现这一目标。segmentfault

Observable表示数据流。程序也能够能够主要表示为数据流。在前面的示例中,两个远程源是Observables,用户点击鼠标也是如此。实际上,咱们的程序本质上是一个由按钮的单击事件构成的Observable,咱们把它转变成得到咱们想要的结果。api

响应式编程具备很强的表现力,举个例子来讲,限制鼠标重复点击的例子。想象一下咱们使用咱们使用promise和callback实现这个功能是有多复杂:咱们须要每秒重置一下点击次数,而且在用户点击以后每秒都要保存点击状态。可是这样子,对于这个小功能来讲就显得过于复杂了,而且所写代码与业务功能并无直观的联系。为了弥补基础代码库的功能不足,在一个大型应用中,这些很小的复杂功能会增长的很是快。数组

经过响应式编,咱们使用debounce方法来限制点击流次数。这样就保证每次点击的间隔时间至少1s,忽略1s之间的点击次数。咱们不关心内部如何实现,咱们只是表达咱们但愿代码执行的操做,而不是如何操做。promise

这就变得更有趣了。接下来,您将看到反应式编程如何帮助咱们提升课程效率和表现力。

电子表格是可响应的

让咱们从这样一个响应性系统的典型例子开始考虑:点子表格。咱们都是使用过吧,但咱们不多停下来思考它们是多么使人震惊的直观。假设咱们在电子表格的单元格A1中有一个值,而后咱们能够在电子表格中的其余单元格中引用它,而且每当咱们更改A1时,每一个依赖于A1的单元格都会自动更新与A1同步。

image

这些操做对咱们感受很天然,咱们并不会去告诉计算机去根据A1更新单元格或者如何更新;这些单元格就自动这样子作了。在点子表格中,咱们只须要简单的声明咱们须要处理的问题,不用操心计算机如何处理。

鼠标输入做为streams

理解如何把事件做为流,咱们回想一下本章开头的那个程序。在那里,咱们使用鼠标点击做为用户点击时实时生成的无限事件流。这个想法起源于Erik Meijer,也就是Rxjs的做者。他认为:你的鼠标就是一个数据库。

在响应式编程中,我把鼠标点击事件做为一个咱们能够查询和操做的持续的流事件。想象成流而不是一个孤立的事件,这种想法开辟了一种全新的思考方式。咱们能够在其中操纵还没有建立的整个值的流。

好好想一想。这种方式有别于咱们以往的编程方式,以前咱们把数据存储在数据库,或者数组而且等待这些数据可用以后在使用它们。若是它们尚不可用(举个例子:一个网络请求),咱们只能等它们好了才可使用。

image

咱们能够将流视为所在由时间而不是存储位置分开的数组。不管是时间仍是存储位,咱们都有元素序列:

image

将您的程序视为流动的数据序列是理解的RxJS程序的关键。这须要一些练习,但并不难。事实上,大多数咱们在任何应用程序中使用的数据均可以表示为序列。

序列查询

让咱们使用传统javascript传统的事件绑定技术来实现一个鼠标点击流。要记录鼠标点击的x和y坐标,咱们能够这样写:

ch1/thinking_sequences1.js

document.body.addEventListener('mousemove', function(e) {
    console.log(e.clientX, e.clientY);
});

此代码将按顺序打印每次鼠标单击的x坐标和y坐标。

输出以下:

252 183
211 232
153 323
...

看起来像一个序列,不是吗? 固然,问题在于操纵事件并不像操纵数组那么容易。 例如,若是咱们想要更改前面的代码,使其仅记录前10次位于屏幕右侧的单击事件(至关随机的目标),咱们会写像这样的东西:

var clicks = 0;
document.addEventListener('click', function registerClicks(e) {
    if (clicks < 10) {
        if (e.clientX > window.innerWidth / 2) {
            console.log(e.clientX, e.clientY);
            clicks += 1;
        }
    } else {
        document.removeEventListener('click', registerClicks);
    }
});

为了知足咱们的要求,咱们经过引入一个全局变量做为扩展状态来记录当前点击数。 咱们还须要使用嵌套的条件来检查两个不一样的条件。当咱们完成时,咱们必须注销事件,以避免泄漏内存。

反作用和外部状态

若是一个动做在其发生的范围以外产生影响,咱们称之为一方反作用。更改函数外部的变量,打印到控制台或更新数据库中的值,这些都是反作用。例如改变函数内部的变量是安全的,可是若是该变量超出了咱们函数的范围,那么其余函数也能够改变它的值,这就意味着这个功能再也不受控制,由于你没法预测外部会对这个变量做何操做。因此咱们须要跟踪它,添加检查以确保它的变化符合咱们的预期。可是这样子添加的代码其实与咱们程序无关,确增长程序的复杂度也更容易出错。虽然反作用老是会有的,可是咱们应该努力减小。这在响应式编程中尤为重要,由于咱们随着时间变换会产生不少状态片断。因此避免外部状态和反作用是贯穿本书一条宗旨。

咱们设法知足了咱们的简单要求,可是为了实现这样一个简单的目标,最终获得了至关复杂的代码。对于首次查看它的开发人员来讲,不容易懂且维护代码很困难。 更重要的是,由于咱们仍然须要保存外部撞他,因此咱们很容易在将来发展出玄妙的错误。

在这种状况下咱们想要的只是查询点击的“数据库”。若是咱们是使用关系数据库,咱们使用声明性语言SQL:

SELECT x, y FROM clicks LIMIT 10

若是咱们将点击事件流视为能够查询和转变的数据源,该怎么办?毕竟,它与数据库没有什么不一样,都是一个能够处理数据的东西。咱们所须要的只是一个为咱们提供抽象概念的数据类型。

输入RxJS及其Observable数据类型:

Rx.Observable.fromEvent(document, 'click')
    .filter(function(c) { return c.clientX > window.innerWidth / 2; })
    .take(10)
    .subscribe(function(c) { console.log(c.clientX, c.clientY) })

这段代码功能同以前,它能够这样子解读:

建立一个Observable的点击事件,并过滤掉在点击事件上发生屏幕左侧的点击。而后只在控制台打印前10次点击的坐标。

注意即便您不熟悉代码也很容易阅读,也没有必要建立外部变量来保持状态。这样使咱们的代码是自包含的,不容易产生bug。因此也就不必去清除你的状态。咱们能够合并,转换或者单纯的传递Observables。咱们已经将不容易处理的事件转变为有形数据结构,这种数据结构与数组同样易于使用,但更加灵活。

在下一节,咱们将看到使Observables如此强大的原理。

观察者和迭代者

要了解Observable的来源,咱们须要查看他们的基础:Observer和Iterator软件模式。在本节中咱们将快速浏览它们,而后咱们将看到Observables如何结合,简单而有力。

观察者模式

对于软件开发人员来讲,很难不听到Observables就想起观察者模式。在其中咱们有一个名为Producer的对象,内部保留订阅者的列表。当Producer对象发生改变时,订阅者的update方法会被自动调用。(在观察者模式的大部分解释中,这个实体被叫作Subject,为了不你们和RxJs的本身Subject混淆,咱们称它为Producer)。

ch1/observer_pattern.js

function Producer() {
    this.listeners = [];
}

Producer.prototype.add = function(listener) {
    this.listeners.push(listener);
};

Producer.prototype.remove = function(listener) {
    var index = this.listeners.indexOf(listener);
    this.listeners.splice(index, 1);
};

Producer.prototype.notify = function(message) {
    this.listeners.forEach(function(listener) {
        listener.update(message);
    });
};

Producer对象在实例的侦听器中保留一个动态的Listener列表,每当Producer更新的时候都会调用其notify方法。在下面的代码中,咱们建立了两个对象来监听
notifie,一个Producer的实例。

ch1/observer_pattern.js

// Any object with an 'update' method would work.
var listener1 = {
    update: function(message) {
    console.log('Listener 1 received:', message);
}
};
var listener2 = {
    update: function(message) {
    console.log('Listener 2 received:', message);
}
};
var notifier = new Producer();
notifier.add(listener1);
notifier.add(listener2);
notifier.notify('Hello there!');

当咱们运行这个程序的时候:

Listener 1 received: Hello there!
Listener 2 received: Hello there!

notifier更新内部状态的时候,listener1listener2都会被更新。这些都不须要咱们去操心。

咱们的实现很简单,但它说明了观察者模式容许观察者和监听器解耦。

迭代器模式

Observable的另外一主要部分来自Iterator模式。一个Iterator是一个为消费者提供简单的遍象它内容的方式,隐藏了消费者内部的实现。

Iterator接口很简单。它只须要两个方法:next()来获取序列中的下一个项目,以及hasNext()来检查是否还有项目序列。

下面是咱们如何编写一个对数字数组进行操做的迭代器,而且只返回divisor参数的倍数的元素:

ch1/iterator.js

function iterateOnMultiples(arr, divisor) {
    this.cursor = 0;
    this.array = arr;
    this.divisor = divisor || 1;
}
iterateOnMultiples.prototype.next = function() {
    while (this.cursor < this.array.length) {
    var value = this.array[this.cursor++];
    if (value % this.divisor === 0) {
        return value;
    }
}
};
iterateOnMultiples.prototype.hasNext = function() {
    var cur = this.cursor;
    while (cur < this.array.length) {
        if (this.array[cur++] % this.divisor === 0) {
            return true;
        }
    }
    return false;
};

咱们能够这样子使用咱们的迭代器:

ch1/iterator.js

var consumer = new iterateOnMultiples([1, 2, 3, 4, 5, 6, 7, 8, 9, 10], 3);

console.log(consumer.next(), consumer.hasNext()); // 3 true
console.log(consumer.next(), consumer.hasNext()); // 6 true
console.log(consumer.next(), consumer.hasNext()); // 9 false

迭代器很是适合封装任何类型数据结构的遍历逻辑。 正如咱们在前面的例子中看到的那样,迭代器在处理不一样类型的数据的时候就会变得颇有趣,或者在运行的时候作配置,就像咱们在带有divisor参数的示例中所作的那样。

Rx模式和Observable

虽然Observer和Iterator模式自己就很强大,可是二者的结合甚至更好。 咱们称之为Rx模式,命名为
在Reactive Extensions库以后。咱们将在本书的其他部分使用这种模式。

Observable序列或简单的Observable是Rx模式的核心。Observable按顺序传递出来它的值 - 就像迭代器同样 - 而不是消费者要求它传出来的值。这个和观察者模式有相同之处:获得数据并将它们推送到监听器。

pull和push

在编程中,基于推送的行为意味着应用程序的服务器组件向其客户端发送更新,而不是客户端必须轮询服务器以获取这些更新。这就像是说“不要打电话给咱们; 咱们会打电话给你。“
RxJS是基于推送的,所以事件源(Observable)将推进新值给消费者(观察者),消费者却不能去主动请求新值。

更简单地说,Observable是一个随着时间的推移可使用其数据的序列。Observables,也就是Observers的消费者至关于观察者模式中的监听器。当Observe订阅一个Observable时,它将在序列中接收到它们可用的值,而没必要主动请求它们。

到目前为止,彷佛与传统观察者没有太大区别。 但实际上有两个本质区别:

  • Observable在至少有一个Observer订阅它以前不会启动。
  • 与迭代器同样,Observable能够在序列完成时发出信号。

使用Observables,咱们能够声明如何对它们发出的元素序列作出反应,而不是对单个项目作出反应。咱们能够有效地复制,转换和查询序列,这些操做将应用于序列的全部元素。

建立Observables

有几种方法能够建立Observable,建立函数是最明显的一种。 Rx.Observable对象中的create方法接受一个Observer参数的回调。 该函数定义了Observable将如何传出值。这是咱们如何建立一个简单的Observable:

var observable = Rx.Observable.create(function(observer) {
    observer.onNext('Simon');
    observer.onNext('Jen');
    observer.onNext('Sergi');
    observer.onCompleted(); // We are done
});

当咱们订阅此Observable时,它经过在其侦听器上调用onNext方法来发出三个字符串。 而后它调用onCompleted来表示序列已完成。 可是咱们究竟如何订阅Observable呢?咱们使用Observers来作这件事情。

第一次接触Observers

Observers监听Observables。每当Observable中触发一个事件,它都会在全部Observers中调用相关的方法。

Observers有三种方法:onNextonCompletedonError

onNext    至关于观察者模式中的update。 当Observable发出新值时调用它。请注意该名称如何反映咱们订阅序列的事实,而不只仅是离散值。

onCompleted    表示没有更多可用数据。 调用onCompleted后,对onNext的进一步调用将不起做用。

onError    在Observable中发生错误时调用。 在调用以后,对onNext的进一步调用将不起做用

如下是咱们建立基本观察者的方法:

var observer = Rx.Observer.create(
    function onNext(x) { console.log('Next: ' + x); },
    function onError(err) { console.log('Error: ' + err); },
    function onCompleted() { console.log('Completed'); }
);

Rx.Observer对象中的create方法接受onNext,onCompleted和onError状况的函数,并返回一个Observer实例。这三个函数是可选的,您能够决定要包含哪些函数。例如,若是咱们订阅无限序列(例如点击按钮(用户能够永久点击)),则永远不会调用onCompleted处理程序。 若是咱们确信序列不能出错(例如,经过从数组中生成一个Observable),咱们就不须要onError方法了。

使用Observable进行Ajax调用

咱们尚未对Observables作过任何实用的事情。如何建立一个检索远程内容的Observable?为此,咱们将使用Rx.Observable.create包装XMLHttpRequest对象。

function get(url) {
    return rxjs.Observable.create(function(observer) {
        // Make a traditional Ajax request
        var req = new XMLHttpRequest(); req.open('GET', url);
        req.onload = function() { 
            if (req.status == 200) {
                // If the status is 200, meaning there have been no problems,     
                // Yield the result to listeners and complete the sequence     
                observer.next(req.response);
                observer.completed();
            }
            else {
                // Otherwise, signal to listeners that there has been an error
                observer.error(new Error(req.statusText)); }
            };
            req.onerror = function() {
            observer.error(new Error("Unknown Error"));
        };
        req.send();
    });
}
// Create an Ajax Observable
var test = get('/api/contents.json');

在前面的代码中,get函数使用create来包装XMLHttpRequest。若是HTTP GET请求成功,咱们emit它的内容并结束序列(咱们的Observable只会发出一个结果)。 不然,咱们会emit一个错误。在最后一行,咱们传入一个url进行调用。 这将建立Observable,但它不会发出任何请求。这很重要:Observable在至少有一个观察者描述它们以前不会作任何事情。 因此让咱们要这样子作:

// Subscribe an Observer to it
test.subscribe(
    function onNext(x) { console.log('Result: ' + x); }, 
    function onError(err) { console.log('Error: ' + err); }, 
    function onCompleted() { console.log('Completed'); }
);

首先要注意的是,咱们没有像以前的代码那样显式建立Observer。大多数时候咱们都会使用这个更短的版本,咱们在Observable中使用这三个订阅Observer案例的函数:next,completed和error。

subscribe而后一切就绪。在subscribe以前,咱们只是声明了Observable和Observer将如何交互。只有当咱们调用subscribe方法时,一切才开始运行。

始终会有一个Operator

在RxJS中,转换或查询序列的方法称为Operator。Operator位于静态Rx.Observable对象和Observable实例中。在咱们的示例中,create就是一个这样的Operator。

当咱们必须建立一个很是具体的Observable时,create是一个很好的选择,可是RxJS提供了许多其余Operator,能够很容易地为经常使用源建立Observable。

让咱们再看看前面的例子。对于像Ajax请求这样的常见操做,一般有一个Operator可供咱们使用。 在这种状况下,RxJS DOM库提供了几种从DOM相关源建立Observable的方法。因为咱们正在执行GET请求,咱们可使用Rx.DOM.Request.get,而后咱们的代码就变成了这个:

Rx.DOM.get('/api/contents.json').subscribe(
    function onNext(data) { console.log(data.response); }, 
    function onError(err) { console.error(err); }
);

rxjs-dom自己支持的rxjs版本比较旧,例子只能作为示意

这段代码与咱们以前的代码彻底相同,但咱们没必要建立XMLHttpRequest的包装器: 它已经存在了。另请注意,此次咱们省略了onCompleted回调,由于咱们不打算在Observable complete时作出反应。咱们知道它只会产生一个结果,咱们已经在onNext回调中使用它了。

在本书中咱们将使用这样的大量便利操做符。这都是基于rxjs自己的能量,这也正式rxjs强大的地方之一。

一种能够约束所有的数据类型

在RxJS程序中,咱们应该努力将全部数据都放在Observables中,而不只仅是来自异步源的数据。 这样作能够很容易地组合来自不一样来源的数据,例如现有数组与回调结果,或者XMLHttpRequest的结果与用户触发的某些事件。

例如,若是咱们有一个数组,其项目须要与来自其余地方的数据结合使用,最好将此数组转换为Observable。(显然,若是数组只是一个不须要组合的中间变量,则没有必要这样作。)在本书中,您将了解在哪些状况下值得将数据类型转换为Observables。

RxJS为operators提供了从大多数JavaScript数据类型建立Observable的功能。 让咱们回顾一下你将一直使用的最多见的:数组,事件和回调。

从数组建立Observable

咱们可使用通用的operators将任何相似数组或可迭代的对象转换为Observable。 from将数组做为参数并返回一个包含他全部元素的Observable。

Rx.Observable
.from(['Adrià', 'Jen', 'Sergi']) 
.subscribe(
    function(x) { console.log('Next: ' + x); }, 
    function(err) { console.log('Error:', err); },
    function() { console.log('Completed'); }
);

from是和fromEvent一块儿,是RxJS代码中最方便和最经常使用的operators之一。

从JavaScript事件建立Observable

当咱们将一个事件转换为一个Observable时,它就变成了一个能够组合和传递的第一类值。 例如,这是一个Observable,只要它移动就会传初鼠标指针的坐标。

var allMoves = Rx.Observable.fromEvent(document, 'mousemove')

allMoves.subscribe(function(e) {
  console.log(e.clientX, e.clientY);
});

将事件转换为Observable会将事件从以前的事件逻辑中释放出来。更重要的是,咱们能够基于原始的Observables建立新的Observable。这些新的是独立的,可用于不一样的任务。

var movesOnTheRight = allMoves.filter(function(e) { 
    return e.clientX > window.innerWidth / 2;
});
var movesOnTheLeft = allMoves.filter(function(e) { 
    return e.clientX < window.innerWidth / 2;
});
movesOnTheRight.subscribe(function(e) { 
    console.log('Mouse is on the right:', e.clientX);
});
movesOnTheLeft.subscribe(function(e) { 
    console.log('Mouse is on the left:', e.clientX);
});

在前面的代码中,咱们从原始的allMoves中建立了两个Observable。 这些专门的Observable只包含原始的过滤项:movesOnTheRight包含发生在屏幕右侧的鼠标事件,movesOnTheLeft包含发生在左侧的鼠标事件。 它们都没有修改原始的Observable:allMoves将继续发出全部鼠标移动。 Observable是不可变的,每一个应用于它们的operator都会建立一个新的Observable。

从回调函数建立Observable

若是您使用第三方JavaScript库,则可能须要与基于回调的代码进行交互。 咱们可使用fromCallbackfromNodeCallback两个函数将回调转换为Observable。Node.js遵循的是在回调函数的第一个参数传入错误对象,代表存在问题。而后咱们使用fromNodeCallback专门从Node.js样式的回调中建立Observable:

var Rx = require('rx'); // Load RxJS
var fs = require('fs'); // Load Node.js Filesystem module
// Create an Observable from the readdir method
var readdir = Rx.Observable.fromNodeCallback(fs.readdir); // Send a delayed message
var source = readdir('/Users/sergi');
var subscription = source.subscribe(
    function(res) { console.log('List of directories: ' + res);},
    function(err) { console.log('Error: ' + err); },
    function() { console.log('Done!'); 
});

前面的代码中,咱们使用Node.js的fs.readdir方法建立一个Observable readdir。 fs.readdir接受目录路径和回调函数delayedMsg,该函数在检索目录内容后调用。

咱们使用readdir和咱们传递给原始fs.readdir的相同参数,省掉了回调函数。 这将返回一个Observable,当咱们订阅一个Observer时,它将正确使用onNext,onError和onCompleted。

总结

在本章中,咱们探讨了响应式编程,并了解了RxJS如何经过Observable解决其余问题的方法,例如callback或promise。如今您了解为何Observables功能强大,而且您知道如何建立它们。有了这个基础,咱们如今能够继续建立更有趣的响应式程序。下一章将向您展现如何建立和组合基于序列的程序,这些程序为Web开发中的一些常见场景提供了更“可观察”的方法。

关注个人微信公众号,更多优质文章定时推送
clipboard.png

建立了一个程序员交流微信群,你们进群交流IT技术

图片描述

若是已过时,能够添加博主微信号15706211347,拉你进群

相关文章
相关标签/搜索