Rxjs 响应式编程-第二章:序列的深刻研究

Rxjs 响应式编程-第一章:响应式
Rxjs 响应式编程-第二章:序列的深刻研究
Rxjs 响应式编程-第三章: 构建并发程序
Rxjs 响应式编程-第四章 构建完整的Web应用程序
Rxjs 响应式编程-第五章 使用Schedulers管理时间
Rxjs 响应式编程-第六章 使用Cycle.js的响应式Web应用程序css

序列的深刻研究

童年的回忆中的益智视频游戏,你必须使用各类技巧在屏幕上引导降低的水流。您能够拆分流,稍后将它们合并,或者使用倾斜的木板来改变它们的方向。你必需要有创造力才能使水达到最终目标。html

我发现该游戏与使用Observable序列有不少类似之处。 Observable只是咱们能够转换,组合和查询的事件流。 不管咱们是在处理简单的Ajax回调仍是在Node.js中处理字节数据都不要紧。 咱们发现流的方式是同样的。 一旦咱们在流中思考,咱们程序的复杂性就会下降。数据库

在本章中,咱们将重点介绍如何在程序中有效地使用序列。 到目前为止,咱们已经介绍了如何建立Observable并使用它们进行简单的操做。为了释放它们的力量,咱们必须知道将咱们的程序输入和输出转换为带有咱们程序流程的序列。编程

在咱们弄清楚以前,咱们将会遇到一些能够帮助咱们开始操做序列的基本operator。接下来,咱们将实现一个真实的应用程序,显示(几乎)实时发生的地震。 开始吧!json

可视化的Observables

您将要学习咱们在RxJS程序中最常使用的一些运算符。 谈论对序列的操做可能感受很抽象。 为了帮助开发人员以简单的方式理解Operator,咱们将使用标准的可视化表示序列,称为大理石图。 它们直观地表示异步数据流,您能够在RxJS的每一个资源中找到它们。segmentfault

让咱们使用范围运算符,它返回一个Observable,它获得指定范围内的整数:Rx.Observable.range(1,3);api

它的大理石图看起来像这样:数组

image

长箭头表示Observable,x轴表示时间。每一个圆圈表示Observable经过内部调用onNext()传出的值。生成第三个值后,range调用了onCompleted,在图中用垂直线表示。promise

让咱们看一个涉及几个Observable的例子。合并运算符采用两个不一样的Observable并返回一个具备合并值的新Observable。 interval运算符返回一个Observable,它在给定的时间间隔内产生增量数,以毫秒为单位。服务器

在下面的代码中,咱们将合并两个不一样的Observable,它们使用interval来以不一样的间隔生成值:

var a = Rx.Observable.interval(200).map(function(i) { 
    return 'A' + i;
});
var b = Rx.Observable.interval(100).map(function(i) {
    return 'B' + i; 
    
});
Rx.Observable.merge(a, b).subscribe(function(x) {
    console.log(x);
});
B0, A0, B1, B2, A1, B3, B4...

合并运算符的大理石图以下所示:

image

这里,沿y轴的虚线箭头指向应用于序列A和B中每一个元素的变换的最终结果。获得的Observable由C表示,其中包含A和B的合并元素。若是不一样Observables同时传出元素,合并序列中这些元素的顺序是随机的。

基本序列运算符

在RxJS中转换Observables的数十个运算符中,最经常使用的是具备良好收集处理能力的其余语言也具备:map,filter和reduce。在JavaScript中,您能够在Array中找到这些operator。

RxJS遵循JavaScript约定,所以您会发现如下运算符的语法与数组运算符的语法几乎相同。实际上,咱们将使用数组和Observables同时实现,以显示两个API的类似程度。

Map

map是最经常使用的序列转换运算符。它接受一个Observable和一个函数,并将该函数应用于源Observable中的每一个值。 它返回一个带有转换值的新Observable。

image

JS Arrays

var src = [1, 2, 3, 4, 5];
var upper = src.map(function(name) {
    return name * 2; 
});
upper.forEach(logValue);

Observables

var src = Rx.Observable.range(1, 5); 
var upper = src.map(function(name) {
    return name * 2; 
});
upper.subscribe(logValue);

在这两种状况下,src都不会发生改变。

这段代码和后面的代码使用的logValue函数:

var logValue = function(val) { 
    console.log(val) 
};

有些状况下,咱们传递给map的函数会进行一些异步计算来转换值。在这种状况下,map将没法按预期工做。 对于这些状况,最好使用flatMap,后续会介绍到。

Filter

filter接受一个Observable和一个函数,并使用该函数检测Observable中的每一个元素。它返回一个Observable序列,其中包含函数返回true的全部元素。

image

JS Arrays

var isEven = (function(val) { return val % 2 !== 0; });
var src = [1, 2, 3, 4, 5];
var even = src.filter(isEven);
even.forEach(logValue);

Observables

var isEven = (function(val) { return val % 2 !== 0; });
var src = Rx.Observable.range(1, 5); 
var even = src.filter(isEven);
even.subscribe(logValue);

Reduce

reduce(也称为fold)接受一个Observable并返回一个始终包含单个项的新项,这是在每一个元素上应用函数的结果。 该函数接收当前元素和函数先前调用的结果。

image

JS Arrays

var src = [1, 2, 3, 4, 5];
var sum = src.reduce(function(a, b) {
    return a + b;
});
console.log(sum);

Observables

var src = Rx.Observable.range(1, 5);
var sum = src.reduce(function(acc, x) {
    return acc + x;
});
sum.subscribe(logValue);

reduce是操做序列的强大操做符。事实上,它是称为聚合运算符的基本实现。

聚合运算符

聚合运算符处理序列并返回单个值。例如, Rx.Observable.first接受一个Observable和一个可选函数,并返回知足函数条件布尔值的第一个元素。

计算序列的平均值也是一个聚合操做.RxJS提供了实例运算符的平均值,可是为了本节的目的,咱们想看看如何使用reduce实现它。每一个聚合运算符均可以经过仅使用reduce来实现:

sequences/marble.js

var avg = Rx.Observable.range(0, 5)
    .reduce(function(prev, cur) {
        return {
            sum: prev.sum + cur,
            count: prev.count + 1
        };
    }, { sum: 0, count: 0 })
    .map(function(o) {
        return o.sum / o.count;
    });
    
var subscription = avg.subscribe(function(x) {
    console.log('Average is: ', x);
});
Average is: 2

在此代码中,咱们使用reduce将每一个新值添加到前一个值。由于reduce不能为咱们提供序列中元素的总数,因此咱们须要对它们进行计数。咱们使用包含两个字段sum和count的对象组成的初始值调用reduce,其中咱们将存储到目前为止的元素总数和总数。每一个新元素都将返回具备更新值的同一对象。

当序列结束时,reduce能够经过调用onNex返回t包含最终总和和最终计数的对象。但在这里咱们使用map来返回将总和除以计数的结果。

咱们能够聚合无限Observables吗?

想象一下,咱们正在编写一个程序,让用户在行走时得到平均速度。即便用户还没有完成行走,咱们也须要可以使用咱们目前所知的速度值进行计算。咱们想要实时记录无限序列的平均值。 问题是若是序列永远不会结束,像reduce这样的聚合运算符将永远不会调用其Observers的onNext运算符。

对咱们来讲幸运的是,RxJS团队已经考虑过这种状况,并为咱们提供了scan操做符,其做用相似于reduce可是会发出每一个中间结果:

var avg = Rx.Observable.interval(1000)
    .scan(function (prev, cur) {
        return {
            sum: prev.sum + cur,
            count: prev.count + 1
        };
    }, { sum: 0, count: 0 })
    .map(function(o) {
        return o.sum / o.count;
    });
    
var subscription = avg.subscribe( function (x) {
    console.log(x);
});

这样,咱们能够聚合须要很长时间才能完成或无限的序列。在前面的示例中,咱们每秒生成一个增量整数,并调用scan替换先前的reduce。咱们如今每秒获得生成值的平均值。

flatMap

若是你的Observable的结果是仍是Observables,你要怎么处理?大多数状况下,您但愿在单个序列中统一这些嵌套Observable中的项目。 这正是flatMap的做用。

flatMap运算符接收参数Observable A,其元素也是Observables,并返回一个子元素也是Observable的Observable。让咱们用图表可视化它:

image

咱们能够看到A(A1,A2,A3)中的每一个元素也是可观察序列。 一旦咱们使用变换函数将flatMap应用于A,咱们获得一个Observable,其中包含A的不一样子元素中的全部元素。

flatMap是一个功能强大的运算符,但它比咱们迄今为止看到的运算符更难理解。能够把它想象成Observables的concatAll()

concatAll是一个函数,它接受一个数组数组并返回一个“flattened”单个数组,其中包含全部子数组的值,而不是子数组自己。 咱们可使用reduce来实现这样的功能:

function concatAll(source) {
    return source.reduce(function(a, b) {
        return a.concat(b); 
    });
}

咱们会像这样使用它:

concatAll([[0, 1, 2], [3, 4, 5], [6, 7, 8]]);
// [0, 1, 2, 3, 4, 5, 6, 7, 8]

flatMap作一样的事情,但它使Observables而不是数组变扁平。它须要一个源Observable和一个返回一个新的Observable的函数,并将该函数应用于源Observable中的每一个元素,就像map同样。若是程序在这里中止,咱们最终会获得一个会发出Observables的Observable。 可是flatMap向主序列发出每一个新Observable发出的值,将全部Observable“扁平化”为一个主序列。 最后,咱们得到了一个Observable。

取消序列

在RxJS中,咱们能够取消正在运行的Observable。 这是一种优于其余异步通讯形式的优点,例如回调和Promise,一旦被调用就没法直接取消(尽管某些Promise实现支持取消)。

咱们能够经过两种主要方式取消Observable:隐式和显式。

显式取消:Disposable

Observables自己没有取消的方法。相反,当咱们订阅Observable时,咱们会获得一个表明该特定订阅的Disposable对象。而后咱们能够在该对象中调用方法dispose,而且该订阅将中止从Observable接收通知。

在下面的示例中,咱们将两个Observers订阅到计数器Observable,它每秒发出一个递增的整数。 两秒后,咱们取消第二个订阅,咱们能够看到它的输出中止但第一个订阅者的输出继续:

sequences/disposable.js

var counter = Rx.Observable.interval(1000);

var subscription1 = counter.subscribe(function(i) {
    console.log('Subscription 1:', i);
});

var subscription2 = counter.subscribe(function(i) {
    console.log('Subscription 2:', i);
});

setTimeout(function() { 
    console.log('Canceling subscription2!');
    subscription2.dispose();
}, 2000);
Subscription 1: 0 
Subscription 2: 0 
Subscription 1: 1 
Subscription 2: 1 
Canceling subscription2! 
Subscription 1: 2 
Subscription 1: 3 
Subscription 1: 4
...

隐式取消:经过Operater

大多数时候,Operater会自动取消订阅。当序列结束或知足操做条件时,rangetake等操做符将取消订阅。更高级的操做符,如withLatestFromflatMapLatest,将根据须要在内部建立和销毁订阅,由于它们处理的是运行中的几个可观察的内容。简而言之,大部分订阅的取消都不该该是你该担忧的。

被封装以后的Observables

当您使用包含不提供取消的外部API的Observable时,Observable仍会在取消时中止发出通知,但基础API不必定会被取消。例如,若是您正在使用封装Promise的Observable,则Observable将在取消时中止发出,但不会取消基础Promise。

在下面的代码中,咱们尝试取消对包含promise p的Observable的订阅,同时咱们以传统的方式设置一个动做来解决promise。 promise应在五秒内resolve,但咱们在建立后当即取消订阅:

var p = new Promise(function(resolve, reject) {
    window.setTimeout(resolve, 5000);
});

p.then(function() {
    console.log('Potential side effect!');
});

var subscription = Rx.Observable.fromPromise(p).subscribe(function(msg) {
    console.log('Observable resolved!');
});

subscription.dispose();

5秒后,咱们看到:

Potential side effect!

若是咱们取消对Observable的订阅,它会有效地阻止它接收通知。 可是promise的then方法仍在运行,代表取消Observable并不会取消关联的Promsie。

了解咱们在Observable中使用的外部API的详细信息很是重要。您可能认为已取消序列,但底层API会继续运行并在程序中引发一些反作用。 这些错误真的很难捕捉到。

错误处理

咱们不能在回调中使用传统的try / catch机制,由于它是同步的。 它将在任何异步代码以前运行,而且没法捕获任何错误。

在回调函数中,能够经过将错误(若是有)做为参数传递到回调函数。这是有用的,但它使代码很是脆弱。

让咱们看看如何捕获Observables中的错误。

onError处理程序

还记得咱们在上面上讨论了第一次与观察者联系的观察者能够调用的三种方法吗? 咱们熟悉onNextonCompleted,可是咱们尚未使用onError; 它是有效处理Observable序列中错误的关键。

为了了解它是如何工做的,咱们将编写一个简单的函数来获取JSON字符串数组,并使用JSON.parse返回一个Observable,它发出从这些字符串解析的对象:

为了了解它是如何工做的,咱们将编写一个简单的函数来获取JSON字符串组成的数组,并使用JSON.parse返回一个Observable,它发出从这些字符串解析的对象:

function getJSON(arr) {
    return Rx.Observable.from(arr).map(function(str) {
        var parsedJSON = JSON.parse(str);
        return parsedJSON;
    });
}

咱们将带有三个JSON字符串的数组传递给getJSON,其中数组中的第二个字符串包含语法错误,所以JSON.parse将没法解析它。 而后咱们将订阅结果,为onNext和onError提供处理程序:

getJSON([
    '{"1": 1, "2": 2}',
    '{"success: true}', // Invalid JSON string
    '{"enabled": true}'
]).subscribe(
    function(json) {
        console.log('Parsed JSON: ', json);
    },
    function(err) {
        console.log(err.message);
    }
)
Parsed JSON: { 1: 1, 2: 2 }
JSON.parse: unterminated string at line 1 column 8 of the JSON data

Observable为第一个结果发出解析的JSON,但在尝试解析第二个结果时抛出异常。 onError处理程序捕获并打印出来。默认行为是,每当发生错误时,Observable都会中止发出项目,而且不会调用onCompleted。

错误捕获

到目前为止,咱们已经看到如何检测错误已经发生并对该信息作了些什么,可是咱们没法对它作出响应并继续咱们正在作的事情。Observable察实例具备catch运算符,它容许咱们对Observable中的错误作出反应并继续使用另外一个Observable。

catch接受一个Observable或一个接收错误的函数做为参数并返回另外一个Observable。 在咱们的场景中,若是原始Observable中存在错误,咱们但愿Observable发出包含error属性的JSON对象:

function getJSON(arr) {
    return Rx.Observable.from(arr).map(function(str) {
        var parsedJSON = JSON.parse(str);
        return parsedJSON;
    });
}

var caught = getJSON(['{"1": 1, "2": 2}', '{"1: 1}']).catch(
    Rx.Observable.return({
        error: 'There was an error parsing JSON'
    })
);

caught.subscribe(
    function(json) {
        console.log('Parsed JSON: ', json);
    },
    // Because we catch errors now, `onError` will not be executed
    function(e) {
        console.log('ERROR', e.message);
    }
);

在前面的代码中,咱们建立了一个新的Observable,它使用catch运算符来捕获原始Observable中的错误。 若是出现错误,它将使用仅发出一个项目的Observable继续序列,并使用描述错误的error属性。 这是输出:

Parsed JSON: Object { 1: 1, 2: 2 }
Parsed JSON: Object { error: "There was an error parsing JSON" }

这是catch操做符的大理石图:

image

注意X表示序列出错。 在这种状况下,Observable值 - 三角形的不一样形状意味着它们是来自另外一个Observable的值。在这里,这是咱们在发生错误时返回的Observable。

catch对于对序列中的错误做出反应很是有用,它的行为与传统的try / catch块很是类似。 可是,在某些状况下,忽略Observable中的项目发生的错误并让序列继续,这将是很是方便的。 在这些状况下,咱们可使用重试运算符。

序列重试

有时错误就会发生,咱们无能为力。例如,可能存在请求远程数据的超时,由于用户具备不稳定的Internet链接,或者咱们查询的远程服务器可能崩溃。在这些状况下,若是咱们可以继续请求咱们须要的数据直到成功,那将是很好的。 重试操做符的确如此:

sequences/error_handling.js

// This will try to retrieve the remote URL up to 5 times.
Rx.DOM.get('/products').retry(5)
.subscribe(
    function(xhr) { console.log(xhr); },
    function(err) { console.error('ERROR: ', err); }
);

在前面的代码中,咱们建立了一个函数,该函数返回一个Observable,它使用XMLHttpRequest从URL检索内容。 由于咱们的链接可能有点不稳定,因此咱们在订阅它以前添加retry(5),确保在出现错误的状况下,它会在放弃并显示错误以前尝试最多五次。

使用重试时须要了解两件重要事项。首先,若是咱们不传递任何参数,它将无限期地重试,直到序列完成没有错误。 若是Observable产生错误,这对性能是危险的。 若是咱们使用同步Observable,它将具备与无限循环相同的效果。

其次,重试将始终从新尝试整个Observable序列,即便某些项目没有错误。若是您在处理项目时形成任何反作用,这一点很重要,由于每次重试都会从新应用它们。

制做实时地震可视化器

使用咱们在本章中到目前为止所涵盖的概念,咱们将构建一个使用RxJS的Web应用程序,以向咱们展现实时发生地震的位置。咱们首先要创建一个功能性的反应性实施方案,咱们将随着时间的推移对其进行改进。 最终结果以下:

image

准备环境

咱们将使用USGS(美国地质调查局)地震数据库,该数据库提供多种格式的实时地震数据集。 咱们将以JSONP格式从每周数据集中获取数据。

咱们还将使用Leaflet(一个JavaScript库)来渲染交互式地。让咱们看看咱们的index.html看起来如何,并重点介绍:

examples_earthquake/index.html

<!DOCTYPE html>
<html lang="en-us">
<head>
    <meta charset="utf-8">
    <link rel="stylesheet"
    href="http://cdn.leafletjs.com/leaflet-0.7.3/leaflet.css" />
    <script src="http://cdn.leafletjs.com/leaflet-0.7.3/leaflet.js"></script>
    <script src="../rx.all-4.0.0.js"></script>
    <title>Earthquake map</title>
    <style type="text/css">
        html, body {
        margin: 0;
        padding: 0;
        height: 100%;
        }
        #map { height: 100%; }
    </style>
</head>
<body>
<div id="map"></div>
    <script>
        var QUAKE_URL = 'http://earthquake.usgs.gov/earthquakes/feed/v1.0/' +
        'summary/all_day.geojsonp';
        function loadJSONP(url) {
            var script = document.createElement('script');
            script.src = url;
            var head = document.getElementsByTagName('head')[0];
            head.appendChild(script);
        }
        var map = L.map('map').setView([33.858631, -118.279602], 7);
        L.tileLayer('http://{s}.tile.osm.org/{z}/{x}/{y}.png').addTo(map);
    </script>
    <script src="code.js"></script>
</body>
</html>

检索地震位置

如今咱们的HTML已准备就绪,咱们能够为咱们的应用程序编写逻辑。首先,咱们须要知道咱们得到了什么样的数据以及在地图上表明地震所需什么样的数据。

USGS网站给咱们的JSONP数据看起来像这样:

examples_earthquake/jsonp_example.txt

eqfeed_callback({
    "type": "FeatureCollection",
    "metadata": {
        "generated": 1408030886000,
        "url": "http://earthquake.usgs.gov/earthquakes/...",
        "title": "USGS All Earthquakes, Past Day",
        "status": 200, "api": "1.0.13", "count": 134
    },
    "features": [
        {
            "type": "Feature",
            "properties": {
                "mag": 0.82,
                "title": "M 0.8 - 3km WSW of Idyllwild-Pine Cove, California",
                "place": "3km WSW of Idyllwild-Pine Cove, California",
                "time": 1408030368460,
                ...
            },
            "geometry": {
                "type": "Point",
                "coordinates": [ -116.7636667, 33.7303333, 17.33 ]
            },
            "id": "ci15538377"
        },
        ...
    ]
})

features数组包含一个对象,其中包含今天发生的每次地震的数据。 那是一大堆数据! 一天以内发生了多少次地震是使人惊讶的(而且可怕)。对于咱们的程序,咱们只须要每次地震的坐标,标题和大小。

咱们首先要建立一个Observable来检索数据集并发出单个地震。 这是第一个版本:

examples_earthquake/code.js

var quakes = Rx.Observable.create(function(observer) {
    window.eqfeed_callback = function(response) {
        var quakes = response.features;
        quakes.forEach(function(quake) {
            observer.onNext(quake);
        });
    };
    loadJSONP(QUAKE_URL);
});

quakes.subscribe(function(quake) {
    var coords = quake.geometry.coordinates;
    var size = quake.properties.mag * 10000;
    L.circle([coords[1], coords[0]], size).addTo(map);
});

等等,那个明显的全局函数window.eqfeed_callback在咱们的代码中作了什么? 好吧,事实证实,JSONP URL一般在URL中添加查询字符串,以指定处理响应的函数名称,但USGS站点不容许这样作,所以咱们须要建立一个全局函数 他们决定咱们必须使用的名称,即eqfeed_callback

咱们的Observable按顺序发出全部地震。咱们如今有地震数据生成器!咱们没必要关心异步流程或者必须将全部逻辑放在同一个函数中。只要咱们订阅Observable,就会获得地震数据。

经过在地震观测中将地震检索“黑箱”,咱们如今能够订阅并处理每次地震。 而后咱们将为每一个地震绘制一个圆,其大小与其大小成比例。

深刻一些

咱们能够作得更好吗?你打赌!在前面的代码中,咱们仍然经过遍历数组并调用onNext来管理每一个地震,即便咱们在Observable中将其隔离。

这是可使用flatMap的完美状况。咱们将使用Rx.Observable.from检索数据并从features数组中生成一个Observable。 而后咱们将Observable合并回主Observable中:

var quakes = Rx.Observable.create(function(observer) {
    window.eqfeed_callback = function(response) {
        observer.onNext(response);
        observer.onCompleted();
    };
    loadJSONP(QUAKE_URL);
}).flatMap(function transform(dataset) {
    return Rx.Observable.from(dataset.response.features);
});

quakes.subscribe(function(quake) {
    var coords = quake.geometry.coordinates;
    var size = quake.properties.mag * 10000;
    L.circle([coords[1], coords[0]], size).addTo(map);
});

咱们再也不手动管理流程了。 没有循环或条件来提取单个地震对象并将其传递出去。 这是就是发生了什么:

  1. onNext只发生一次,它产生整个JSON字符串。
  2. 因为咱们只会产生一次,所以咱们在onNext以后发出完成信号。
  3. 咱们将flatMap调用连接到create的结果,所以flatMap将从Observable中获取每一个结果(在这种状况下只有一个),将它用做transform函数的参数,并将该函数产生的Observable合并到源Observable。
  4. 这里咱们采用包含全部地震的features数组,并从中建立一个Observable。因为flatMap,这将成为quakes变量将包含的实际Observable。

5.订阅不会改变; 它像之前同样继续处理地震的数据流。

始终有一种方法

到目前为止,咱们已经使用了rx.all.js中包含的RxJS运算符,但一般仍是须要借鉴其余基于RxJS的库附带的运算符。在咱们的例子中,咱们将看看RxJS-DOM。RxJS-DOM是一个外部库,其中包含一个处理JSONP请求的运算符:jsonpRequest。这为咱们节省了一些代码,由于咱们不须要使用讨厌的全局函数:

examples_earthquake/code1_2.js

var quakes = Rx.DOM.jsonpRequest({
    url: QUAKE_URL,
    jsonpCallback: 'eqfeed_callback'
})
.flatMap(function(result) {
    return Rx.Observable.from(result.response.features);
})
.map(function(quake) {
    return {
        lat: quake.geometry.coordinates[1],
        lng: quake.geometry.coordinates[0],
        size: quake.properties.mag * 10000
    };
});

quakes.subscribe(function(quake) {
    L.circle([quake.lat, quake.lng], quake.size).addTo(map);
});

请记住,要运行此代码,您须要在HTML中包含RxJS-DOM中的文件rx.dom.js。请注意咱们如何添加一个map运算符,将地震对象转换为仅包含咱们可视化所需信息的简单对象:纬度,经度和地震震级。 咱们在subscribeoperator中写的功能越少越好。

实时标记

咱们地震应用的版本不会实时更新地震图。为了实现这一点,咱们将使用咱们在本章前面看到的interval运算符 - 以及有用的distinct运算符。下面的代码,而后咱们将完成更改:

examples_earthquake/code1_3.js

var quakes = Rx.Observable
.interval(5000)
.flatMap(function() {
    return Rx.DOM.jsonpRequest({
        url: QUAKE_URL,
        jsonpCallback: 'eqfeed_callback'
    }).retry(3);
})
.flatMap(function(result) {
    return Rx.Observable.from(result.response.features);
})
.distinct(function(quake) { return quake.properties.code; });

quakes.subscribe(function(quake) {
    var coords = quake.geometry.coordinates;
    var size = quake.properties.mag * 10000;
    L.circle([coords[1], coords[0]], size).addTo(map);
});

在前面的代码中,咱们使用interval来发出新请求并以5秒的固定间隔处理它们。 interval建立一个Observable,每隔五秒发出一个递增的数字。咱们对这些数字没有作任何事情; 相反,咱们使用flatMap来检索jsonpRequest的数据。另请注意咱们如何在首先检索列表时出现问题时再次尝试重试。

咱们应用的最后一个运算符是distinct,它只发出以前未发出的元素。 它须要一个函数来返回属性以检查是否相等。 这样咱们就不会重绘已经绘制过的地震。

在不到20行中,咱们编写了一个应用程序,按期轮询外部JSONP URL,从其内容中提取具体数据,而后过滤掉已导入的地震。在那以后,咱们在地图上表示地震,其大小与其大小成比例-全部这些都以独立,清晰和简洁的方式编写,而不依赖于外部状态。这代表了Observables的表现力。

改进的想法

这里有一些想法可使用你新得到的RxJS技能,并使这个小应用程序更有趣:

  • 当用户将鼠标悬停在地震上时,提供一个弹出窗口,显示有关该特定地震的更多信息。 一种方法是从只有你想要显示的属性的地震中建立一个新的Observable,并在悬停时动态过滤它。
  • 在页面顶部放置一个计数器,显示当前到目前为止的地震次数,并天天重置

Operator详解

本章向您介绍了一些新的运算符,因此这里是对它们的回顾,以及咱们在应用程序中使用它们的方法。 请记住,您始终能够在RxJS GitHub站点上找到Operator的完整API文档。

  • Rx.Observable.from

默认行为:同步

因为您在应用程序中使用的许多数据源都来自数组或迭代器,所以有一个运算符能够从中建立Observable。 from是您最常使用的Operator之一。

使用from,咱们能够从数组,相似数组的对象(例如,arguments对象或DOM NodeLists)建立Observable,甚至能够实现可迭代协议的类型,例如StringMapSet

  • Rx.Observable.range

默认行为:同步

range运算符生成有限的Observable,它发出特定范围内的整数。它功能多样,可用于许多场景。 例如,您可使用范围在像扫雷同样的游戏板上生成初始方块。

  • Rx.Observable.interval

默认行为:异步

每次须要生成时间间隔的值时,您可能会以interval运算符做为生成器开始。因为interval每x毫秒发出一次顺序整数(其中x是咱们传递的参数),咱们只须要将值转换为咱们想要的任何值。 咱们在第3章“构建并发程序”中的游戏很大程度上基于该技术。

  • Rx.Observable.distinct

默认行为:与filter的Observable相同

distinct是这些很是简单的Operator之一,能够节省大量的开发工做。它会过滤掉已经发出的任何值。 这使咱们避免编写容易出错的样板代码,咱们将对比传入的结果决定返回值。就是返回不一样值。

image

distinct容许咱们使用指定比较方法的函数。另外,咱们能够不传递任何参数,它将使用严格的比较来比较数字或字符串等基本类型,并在更复杂的对象的状况下运行深度比较。

总结

在本章中,咱们介绍了如何使用大理石图表直观地表示和理解Observable流程。咱们已经介绍了最多见的运算符来转换Observables,更重要的是,咱们只使用Observable序列构建了一个真实的世界应用程序,避免设置任何外部状态,循环或条件分支。咱们以声明的方式表达了咱们的整个程序,而没必要编码完成手头任务的每一步。

在下一章中,咱们将继续探索Observable序列,此次咱们将介绍更高级的运算符,它们容许您控制程序中的流和数据,用以前没法想象的代码!

关注个人微信公众号,更多优质文章定时推送
clipboard.png

相关文章
相关标签/搜索