Rxjs 响应式编程-第四章 构建完整的Web应用程序

Rxjs 响应式编程-第一章:响应式
Rxjs 响应式编程-第二章:序列的深刻研究
Rxjs 响应式编程-第三章: 构建并发程序
Rxjs 响应式编程-第四章 构建完整的Web应用程序
Rxjs 响应式编程-第五章 使用Schedulers管理时间
Rxjs 响应式编程-第六章 使用Cycle.js的响应式Web应用程序html

构建完整的Web应用程序

在本章中,咱们将构建一个典型的Web应用程序,在前端和后端使用RxJS。咱们将转换文档对象模型(DOM)并使用Node.js服务器中的WebSockets进行客户端 - 服务器通讯。前端

对于用户界面位,咱们将使用RxJS-DOM库,这是由RxJS制做的同一团队的库,它提供了方便的Operator来处理DOM和浏览器相关的东西,这将使咱们的编程更简洁。对于服务器部分,咱们将使用两个完善的节点库,并将一些API与Observables包装在一块儿,以便在咱们的应用程序中使用它们。node

在本章以后,您将可以使用RxJS以声明方式构建用户界面,使用咱们目前为止看到的技术并将它们应用于DOM。 您还能够在任何Node.js项目中使用RxJS,而且可以在任何项目中使用反应式编程和RxJS。程序员

创建一个实时地震Dashboard

咱们将为地震仪表板应用程序构建服务器和客户端部件,实时记录地震的位置并可视化显示。咱们将在Node.js中构建服务器,而且改进咱们的应用程序,使其更具互动性和更充足的信息量。正则表达式

一开始的代码以下:数据库

examples_earthquake/code1_3.jsnpm

var quakes = Rx.Observable
.interval(5000)
.flatMap(function() {
    return Rx.DOM.jsonpRequest({
        url: QUAKE_URL,
        jsonpCallback: 'eqfeed_callback'
    }).retry(3);
})
.flatMap(function(result) {
    return Rx.Observable.from(result.response.features);
})
.distinct(function(quake) { return quake.properties.code; });

quakes.subscribe(function(quake) {
    var coords = quake.geometry.coordinates;
    var size = quake.properties.mag * 10000;
    L.circle([coords[1], coords[0]], size).addTo(map);
});

image

这段代码已经有一个潜在的错误:它能够在DOM准备好以前执行,每当咱们尝试在代码中使用DOM元素时就会抛出错误。咱们想要的是在触发DOMContentLoaded事件以后加载咱们的代码,这表示浏览器已经准备好dom了。编程

RxJS-DOM提供Rx.DOM.readyObservable,当触发DOMContentLoaded时,它会发出一次。 所以,让咱们将代码包装在initialize函数中,并在订阅Rx.DOM.ready时执行它:json

examples_earthquake_ui/code1.jssegmentfault

function initialize() {
    var quakes = Rx.Observable
    .interval(5000)
    .flatMap(function() {
    return Rx.DOM.jsonpRequest({
        url: QUAKE_URL,
        jsonpCallback: 'eqfeed_callback'
    });
    })
    .flatMap(function(result) {
        return Rx.Observable.from(result.response.features);
    })
    .distinct(function(quake) { return quake.properties.code; });
    
    quakes.subscribe(function(quake) {
        var coords = quake.geometry.coordinates;
        var size = quake.properties.mag * 10000;
        L.circle([coords[1], coords[0]], size).addTo(map);
    });
}
Rx.DOM.ready().subscribe(initialize);

接下来,咱们将在HTML中添加一个空表,咱们将在下一部分填充地震数据:

<table>
    <thead>
        <tr>
            <th>Location</th>
            <th>Magnitude</th>
            <th>Time</th>
        </tr>
    </thead>
    <tbody id="quakes_info">
    </tbody>
</table>

有了这个,咱们准备开始为咱们的仪表板编写新代码。

添加地震列表

新仪表板的第一个功能是显示地震的实时列表,包括有关其位置,大小和日期的信息。此列表的数据与来自USGS网站的地图相同。咱们首先建立一个函数,在给定props对象参数的状况下返回一个row元素:

examples_earthquake_ui/code2.js

function makeRow(props) {
    var row = document.createElement('tr');
    row.id = props.net + props.code;
    var date = new Date(props.time);
    var time = date.toString();
    [props.place, props.mag, time].forEach(function(text) {
        var cell = document.createElement('td');
        cell.textContent = text;
        row.appendChild(cell);
    });
    return row;
}

props参数与咱们从USGS站点检索的JSON中的properties属性相同。

为了生成行,咱们将再次订阅地震Observable。此订阅会在表格中为每次收到的新地震建立一行。 咱们在initialize函数的末尾添加代码:

examples_earthquake_ui/code2.js

var table = document.getElementById('quakes_info');
quakes
.pluck('properties')
.map(makeRow)
.subscribe(function(row) { table.appendChild(row); });

pluck运算符从每一个地震对象中提取属性值,由于它包含makeRow所需的全部信息。 而后咱们将每一个地震对象映射到makeRow,将其转换为填充的HTML tr元素。 最后,在订阅中,咱们将每一个发出的行追加到咱们的table中。

每当咱们收到地震数据时,这应该获得一个数据稠密的表格。

看起来不错,并且很容易!不过,咱们能够作一些改进。首先,咱们须要探索RxJS中的一个重要概念:冷热Observable。

冷热Observable

不管Observers是否订阅它们,“热”Observable都会发出值。另外一方面,“冷”Observables从Observer开始订阅就发出整个值序列。

热Observable

订阅热Observable的Observer将接收从订阅它的确切时刻发出的值。在那一刻订阅的每一个其余Observer将收到彻底相同的值。 这相似于JavaScript事件的工做方式。

鼠标事件和股票交易代码是热的Observables的例子。在这两种状况下,Observable都会发出值,不管它是否有订阅者,而且在任何订阅者收听以前可能已经生成了值。这是一个例子:

hot_cold.js

var onMove = Rx.Observable.fromEvent(document, 'mousemove');
var subscriber1 = onMove.subscribe(function(e) {
    console.log('Subscriber1:', e.clientX, e.clientY);
});
var subscriber2 = onMove.subscribe(function(e) {
    console.log('Subscriber2:', e.clientX, e.clientY);
});
// Result:
// Subscriber1: 23 24
// Subscriber2: 23 24
// Subscriber1: 34 37
// Subscriber2: 34 37
// Subscriber1: 46 49
// Subscriber2: 46 49
// ...

在该示例中,两个订阅者在发出Observable时都会收到相同的值。 对于JavaScript程序员来讲,这种行为感受很天然,由于它相似于JavaScript事件的工做方式。

如今让咱们看看冷Observables是如何工做的。

冷Observable

只有当Observers订阅它时,冷Observable才会发出值。

例如,Rx.Observable.range返回一个冷Observable。订阅它的每一个新观察者都将收到整个范围:

hot_cold.js

function printValue(value) {
    console.log(value);
}
var rangeToFive = Rx.Observable.range(1, 5);
var obs1 = rangeToFive.subscribe(printValue); // 1, 2, 3, 4, 5
var obs2 = Rx.Observable
.delay(2000)
.flatMap(function() {
    return rangeToFive.subscribe(printValue); // 1, 2, 3, 4, 5
});

了解咱们什么时候处理热或冷的Observable对于避免细微和隐藏的错误相当重要。例如,Rx.Observable.interval返回一个Observable,它以固定的时间间隔生成一个递增的整数值。 想象一下,咱们想用它来将相同的值推送给几个观察者。 咱们能够像这样实现它:

hot_cold.js

var source = Rx.Observable.interval(2000);

var observer1 = source.subscribe(function (x) {
    console.log('Observer 1, next value: ' + x);
});

var observer2 = source.subscribe(function (x) {
    console.log('Observer 2: next value: ' + x);
});

输出

Observer 1, next value: 0
Observer 2: next value: 0
Observer 1, next value: 1
Observer 2: next value: 1
...

这彷佛没什么问题。 但如今想象咱们须要第二个用户在第一个用户加入后三秒钟加入:

hot_cold.js

var source = Rx.Observable.interval(1000);
var observer1 = source.subscribe(function (x) {
    console.log('Observer 1: ' + x);
});
setTimeout(function() {
    var observer2 = source.subscribe(function (x) {
        console.log('Observer 2: ' + x);
    });
}, 3000);

输出

Observer 1: 0
Observer 1: 1
Observer 1: 2
Observer 1: 3
Observer 2: 0
Observer 1: 4
Observer 2: 1
...

如今咱们看到有些东西真的没了。三秒后订阅时,observer2接收源已经推送过的全部值,而不是从当前值开始并从那里继续,由于Rx.Observable.interval是一个冷Observable。 若是热和冷Observables之间的的区别不是很清楚的话,那么这样的场景可能会使人惊讶。

若是咱们有几个Observers订阅冷的Observable,他们将收到相同序列值的副本。严格来讲,尽管观察者共享相同的Observable,但它们并无共享相同的值序列。若是咱们但愿Observers共享相同的序列,咱们须要一个热的Observable。

从冷到热使用publish

咱们可使用publish将冷的Observable变成热的。调用publish会建立一个新的Observable,它充当原始Observable的代理。它经过订阅原始版本并将其收到的值推送给订阅者来实现。

已发布的Observable其实是一个ConnectableObservable,它有一个名为connect的额外方法,咱们调用它来开始接收值。 这容许咱们在开始运行以前订阅它:

hot_cold.js

// Create an Observable that yields a value every second
var source = Rx.Observable.interval(1000);
var publisher = source.publish();
// Even if we are subscribing, no values are pushed yet.
var observer1 = publisher.subscribe(function (x) {
    console.log('Observer 1: ' + x);
});
// publisher connects and starts publishing values
publisher.connect();
setTimeout(function() {
    // 5 seconds later, observer2 subscribes to it and starts receiving
    // current values, not the whole sequence.
    var observer2 = publisher.subscribe(function (x) {
        console.log('Observer 2: ' + x);
    });
}, 5000);

共享冷Observable

让咱们回到咱们的地震示例。到目前为止,咱们的代码看起来很合理;咱们有一个带有两个订阅的Observable地震:一个在地图上绘制地震,另外一个在表格中列出地震。

但咱们可使代码更有效率。 经过让两个地震用户,咱们实际上要求两次数据。 您能够经过在quakesflatMap操做符中放入一个console.log来检查。

发生这种状况是由于quakes是一个冷Observable,而且它会将全部值从新发送给每一个新订阅者,所以新订阅意味着新的JSONP请求。这会经过网络请求两次相同的资源来影响咱们的应用程序性能。

对于下一个示例,咱们将使用`share·运算符,当Observers的数量从0变为1时,它自动建立对Observable的预订。 这使咱们免于从新链接:

examples_earthquake_ui/code2.js

var quakes = Rx.Observable
.interval(5000)
.flatMap(function() {
    return Rx.DOM.jsonpRequest({
        url: QUAKE_URL,
        jsonpCallback: 'eqfeed_callback'
    });
})
.flatMap(function(result) {
    return Rx.Observable.from(result.response.features);
})
.distinct(function(quake) { return quake.properties.code; })
.share()

如今地震的行为就像一个热的Observable,咱们没必要担忧咱们链接多少观察者,由于他们都会收到彻底相同的数据。

缓冲值

咱们以前的代码运行良好,但请注意,每次咱们收到有关地震的信息时都会插入一个tr节点。 这是低效的,由于每次插入咱们都会修改DOM并致使从新绘制页面,使浏览器没必要要地计算新布局。 这可能会致使性能降低。

理想状况下,咱们会批处理几个传入的地震对象,并每隔几秒插入一批地震对象。手动实现会很棘手,由于咱们必须保留计数器和元素缓冲区,咱们必须记住每次批量重置它们。 可是使用RxJS,咱们可使用一个基于缓冲区的RxJS运算符,好比bufferWithTime

使用bufferWithTime,咱们能够缓冲传入的值,并在每x个时间段将它们做为数组释放:

examples_earthquake_ui/code3.bufferWithTime.js

var table = document.getElementById('quakes_info');
quakes
.pluck('properties')
.map(makeRow)
❶ .bufferWithTime(500)
❷ .filter(function(rows) { return rows.length > 0; }
.map(function(rows) {
var fragment = document.createDocumentFragment();
rows.forEach(function(row) {
❸ fragment.appendChild(row);
});
return fragment;
})
.subscribe(function(fragment) {
❹ table.appendChild(fragment);
});

这是新代码中正在发生的事情:

  1. B缓存每一个传入值并每500毫秒释放一批值。
  2. 不管如何,bufferWithTime每500ms执行一次,若是没有传入值,它将产生一个空数组。 咱们会过滤掉这些空数组。
  3. 咱们将每一行插入一个文档片断,这是一个没有父文档的文档。这意味着它不在DOM中,而且修改其内容很是快速和有效。
  4. 最后,咱们将片断附加到DOM。附加片断的一个优势是它被视为单个操做,只会致使一次重绘。 它还将片断的子元素附加到咱们附加片断自己的同一元素。

使用缓冲区和片断,咱们设法保持行插入性能,同时保持应用程序的实时性(最大延迟为半秒)。 如今咱们已准备好为咱们的仪表板添加下一个功能:交互性!

添加交互

咱们如今在地图上和列表中发生地震,但两个表示之间没有相互做用。例如,每当咱们点击列表上的地图时,就能够在地图上居中地震,并在咱们将鼠标移动到其行上时突出显示地图上带圆圈的地震。 咱们开始吧。

在Leaflet中,您能够在地图上绘制并将绘图放在各自的图层中,以便您能够单独操做它们。 让咱们建立一组名为quakeLayer的图层,咱们将存储全部地震圈。每一个圆圈都是该组内的一个图层。 咱们还将建立一个对象codeLayers,咱们将存储地震代码和内部图层ID之间的相关性,以便咱们能够经过地震ID来查找圆圈:

examples_earthquake_ui/code3.js

var codeLayers = {};
var quakeLayer = L.layerGroup([]).addTo(map);

如今,在初始化内部的地震Observable订阅中,咱们将每一个圆圈添加到图层组并将其ID存储在codeLayers中。 若是这看起来有点错综复杂,那是由于这是Leaflet容许咱们在地图中引用图层的惟一方式。

examples_earthquake_ui/code3.js

quakes.subscribe(function(quake) {
    var coords = quake.geometry.coordinates;
    var size = quake.properties.mag * 10000;
    var circle = L.circle([coords[1], coords[0]], size).addTo(map);
    quakeLayer.addLayer(circle);
    codeLayers[quake.id] = quakeLayer.getLayerId(circle);
});

咱们如今建立悬停效果。咱们将编写一个新函数isHovering,它返回一个Observable,它发出一个布尔值,表示在任何给定时刻鼠标是否在特定地震圈上:

examples_earthquake_ui/code3.js

❶ var identity = Rx.helpers.identity;
function isHovering(element) {
❷ var over = Rx.DOM.mouseover(element).map(identity(true));
❸ var out = Rx.DOM.mouseout(element).map(identity(false));
❹ return over.merge(out);
}
  1. Rx.helpers.identity是定义函数。 给定参数x,它返回x。 这样咱们就没必要编写返回它们收到的值的函数。
  2. over是一个Observable,当用户将鼠标悬停在元素上时会发出true。
  3. out是一个Observable,当用户将鼠标移动到元素以外时,它会发出false。
  4. isHovering将over和out合并,返回一个Observable,当鼠标悬停在元素上时发出true,当它离开时返回false。

使用isHovering,咱们能够修改建立rows的订阅,这样咱们就能够在建立时订阅每行中的事件:

examples_earthquake_ui/code3.js

var table = document.getElementById('quakes_info');
quakes
.pluck('properties')
.map(makeRow)
.bufferWithTime(500)
.filter(function(rows) { return rows.length > 0; })
.map(function(rows) {
    var fragment = document.createDocumentFragment();
    rows.forEach(function(row) {
        fragment.appendChild(row);
    });
    return fragment;
})
.subscribe(function(fragment) {
    var row = fragment.firstChild; // Get row from inside the fragment
    ❶ var circle = quakeLayer.getLayer(codeLayers[row.id]);
    ❷ isHovering(row).subscribe(function(hovering) {
        circle.setStyle({ color: hovering ? '#ff0000' : '#0000ff' });
    });
    ❸ Rx.DOM.click(row).subscribe(function() {
        map.panTo(circle.getLatLng());
    });
    table.appendChild(fragment);
})
  1. 咱们使用从行元素得到的ID在地图上获取地震的圆元素。 有了它,codeLayers为咱们提供了相应的内部ID,它使用quakeLayer.getLayer获取了circle元素。
  2. 咱们用当前行调用isHovering,而后咱们订阅生成的Observable。 若是悬停参数为真,咱们会将圆圈画成红色; 否则,它会是蓝色的。
  3. 咱们订阅了从当前行中的click事件建立的Observable。 单击列表中的行时,地图将以地图中相应圆圈为中心。

使其更高效

经验丰富的前端开发人员知道在页面上建立许多事件是致使性能不佳的一个因素。 在前面的示例中,咱们为每一行建立了三个事件。 若是咱们在列表中得到100次地震,咱们将在页面周围浮动300个事件,只是为了作一些亮点突出工做! 这对于表现来讲太糟糕了,咱们能够作得更好。

由于DOM中的事件老是冒泡(从子元素到父元素),前端开发人员中一个众所周知的技术是避免将鼠标事件单独附加到多个元素,而是将它们附加到父元素。 一旦在父项上触发了事件,咱们就可使用事件的target属性来查找做为事件目标的子元素。

由于咱们须要为事件click和mouseover提供相似的功能,因此咱们将建立一个函数getRowFromEvent:

examples_earthquake_ui/code3.pairwise.js

function getRowFromEvent(event) {
return Rx.Observable
.fromEvent(table, event)
❶ .filter(function(event) {
    var el = event.target;
    return el.tagName === 'TD' && el.parentNode.id.length;
})
❷ .pluck('target', 'parentNode')
❸ .distinctUntilChanged();
}

getRowFromEvent为咱们提供了事件发生的表行。 如下是详细信息:

  1. 咱们确保在表格单元格中发生事件,并检查该单元格的父级是不是具备ID属性的行。 这些行是咱们用地震ID标记的行。
  2. pluck运算符在element的target属性中提取嵌套属性parentNode。
  3. 这能够防止屡次得到相同的元素。 例如,使用mouseover事件会发生不少事情。

examples_earthquake_ui/code3.pairwise.js

在上一节中,咱们在每行上附加事件mouseover和mouseout,以便在每次鼠标输入或退出行时更改地震圈颜色。 如今,咱们将仅使用桌面上的mouseover事件,并结合方便的pairwise运算符:

examples_earthquake_ui/code3.pairwise.js

getRowFromEvent('mouseover')
.pairwise()
.subscribe(function(rows) {
    var prevCircle = quakeLayer.getLayer(codeLayers[rows[0].id]);
    var currCircle = quakeLayer.getLayer(codeLayers[rows[1].id]);
    prevCircle.setStyle({ color: '#0000ff' });
    currCircle.setStyle({ color: '#ff0000' });
});

pairwise将每一个发射值与先前在阵列中发射的值进行分组。 由于咱们老是得到不一样的行,因此成对将始终产生鼠标刚刚离开的行和鼠标如今悬停的行。 有了这些信息,就能够相应地为每一个地震圈着色。

处理click事件更简单:

examples_earthquake_ui/code3.pairwise.js

getRowFromEvent('click')
.subscribe(function(row) {
    var circle = quakeLayer.getLayer(codeLayers[row.id]);
    map.panTo(circle.getLatLng());
});

咱们能够回到订阅quakes来生成行:

examples_earthquake_ui/code3.pairwise.js

quakes
.pluck('properties')
.map(makeRow)
.subscribe(function(row) { table.appendChild(row); });

咱们的代码如今更加干净,而且它不依赖于别处的row。 若是没有row,getRowFromEvent将不会尝试产生任何item。

更重要的是,咱们的代码如今很是高效。 不管咱们检索的地震信息量如何,咱们老是只有一个鼠标悬停事件和单击事件,而不是数百个事件。

从Twitter获取实时更新

咱们为地震制做实时仪表板的计划的第二部分是从Twitter添加与地球上发生的不一样地震有关的报告和信息。 为此,咱们将建立一个小型Node.js程序,该程序将获取与地震相关的文章流。

设置咱们的Node.js环境

让咱们开始配置咱们的Node.js应用程序吧。除了RxJS,咱们将使用两个第三方模块:ws和twit。这种相似的模块都是让咱们保持最少的代码。

首先,让咱们为咱们的应用程序建立一个文件夹,并安装咱们将使用的模块。 (请注意,npm命令的输出可能会因软件包的当前版本而异。)

image

客户端 - 服务器通讯

如今咱们准备开始构建咱们的应用程序了。让咱们在tweet_stream文件夹中建立一个名为index.js的新文件来加载咱们将使用的模块:

examples_earthquake_ui/tweet_stream/index.js

var WebSocketServer = require('ws').Server; var Twit = require('twit');
var Rx = require('rx');

要使用Twitter API,您须要在Twitter网站中请求使用者密钥和访问令牌。 完成后,使用配置对象建立一个新的Twit对象,以下所示:

examples_earthquake_ui/tweet_stream/index.js

var T = new Twit({
    consumer_key: 'rFhfB5hFlth0BHC7iqQkEtTyw',
    consumer_secret: 'zcrXEM1jiOdKyiFFlGYFAOo43Hsz383i0cdHYYWqBXTBoVAr1x', 
    access_token: '14343133-nlxZbtLuTEwgAlaLsmfrr3D4QAoiV2fa6xXUVEwW9', 
    access_token_secret: '57Dr99wECljyyQ9tViJWz0H3obNG3V4cr5Lix9sQBXju1'
});

如今咱们能够建立一个函数onConnect,它将完成搜索推文和未来与客户端通讯的全部工做,而且咱们能够启动一个WebSocket服务器,一旦WebSocket链接并准备好就会调用onConnect:

examples_earthquake_ui/tweet_stream/index.js

function onConnect(ws) {
    console.log('Client connected on localhost:8080');
}
var Server = new WebSocketServer({ port: 8080 });
Rx.Observable.fromEvent(Server, 'connection').subscribe(onConnect);

咱们如今能够启动咱们的应用程序,它应该在端口8080上启动WebSocket链接:

~/tweet_stream$ node index.js

因为咱们还没有将任何浏览器链接到此服务器,所以还没有打印有关客户端链接的消息。如今让咱们切换到dashboard的代码并执行此操做。咱们将在RxJS-DOM中使用fromWebSocket运算符:

examples_earthquake_ui/code4.js

function initialize() {
    var socket = Rx.DOM.fromWebSocket('ws://127.0.0.1:8080'); ...

在前面的代码中,fromWebSocket建立一个Subject,做为WebSocket服务器的消息的发送者和接收者。 经过调用socket.onNext,咱们将可以向服务器发送消息,经过订阅套接字,咱们将收到服务器发送给咱们的任何消息。

咱们如今能够发送包含咱们收到的地震数据的服务器消息:

examples_earthquake_ui/code4.js

quakes.bufferWithCount(100)
.subscribe(function(quakes) {
    console.log(quakes);
    var quakesData = quakes.map(function(quake) {
        return {
            id: quake.properties.net + quake.properties.code,
            lat: quake.geometry.coordinates[1],
            lng: quake.geometry.coordinates[0],
            mag: quake.properties.mag
        };
    });
    socket.onNext(JSON.stringify({quakes: quakesData }));
});

咱们能够为来自服务器的消息设置订阅者:

examples_earthquake_ui/code4.js

socket.subscribe(function(message) {
    console.log(JSON.parse(message.data));
});

如今,当咱们从新加载浏览器时,客户端消息应出如今服务器终端中:

~/tweet_stream$ node index.js
Client connected on localhost:8080

太棒了! 一旦开始从远程JSONP资源接收地震,浏览器就应该向服务器发送命令。 可是如今,服务器彻底忽略了这些消息。 是时候回到咱们的推文流代码并用它们作点什么了。

首先,咱们将链接到从浏览器客户端到达服务器的消息事件。 每当客户端发送消息时,WebSocket服务器都会发出包含消息内容的消息事件。 在咱们的例子中,内容是一个JSON字符串。

咱们能够在onConnect函数中编写如下代码:

examples_earthquake_ui/tweet_stream/index.js

var onMessage = Rx.Observable.fromEvent(ws, 'message')
.subscribe(function(quake) {
    quake = JSON.parse(quake);
    console.log(quake);
});

若是咱们从新启动服务器(终端中的Ctrl-C)并从新加载浏览器,咱们应该会看到终端上的地震细节打印出来。这是完美的。 如今咱们已经准备好开始寻找与咱们的地震有关的推文了。

检索和发送推文

咱们正在使用Node.js twit的流式Twitter客户端链接到Twitter和搜索推文。 从如今开始,服务器中的全部代码都将在onConnect函数内部发生,由于它假定已经创建了与WebSocket的链接。 让咱们初始化推文流:

examples_earthquake_ui/tweet_stream/index.js

var stream = T.stream('statuses/filter', {
    track: 'earthquake',
    locations: []
});

这告诉咱们的Twit实例T开始流式传输Twitter状态,按关键字地震过滤。 固然,这是很是通用的,而不是与如今发生的地震直接相关。 但请注意空位置数组。 这是一个纬度和经度边界的数组,咱们能够用它们按地理位置过滤推文,以及地震一词。 那更加具体! 好的,让咱们订阅这个流并开始向浏览器发送推文:

examples_earthquake_ui/tweet_stream/index.js

Rx.Observable.fromEvent(stream, 'tweet').subscribe(function(tweetObject) {
    ws.send(JSON.stringify(tweetObject), function(err) {
        if (err) {
            console.log('There was an error sending the message');
        }
    });
});

若是咱们从新启动服务器并从新加载浏览器,咱们应该在浏览器中收到推文,开发面板中的控制台应该打印推文。

这些推文还没有按地震位置进行过滤。 为此,咱们须要对收到的每一条地震信息作如下事情:

  • 取每一个地震的经度和纬度对的震中坐标,建立一个边界框,界定咱们认为与地震相关的推文的地理区域。
  • 累积全部边界坐标,以便发送给客户端的推文与地图上的地震保持相关。
  • 每次收到新地震的消息时,都会使用新坐标更新twit流。

这是一种方法:

examples_earthquake_ui/tweet_stream/index.js

Rx.Observable
.fromEvent(ws, 'message')
.flatMap(function(quakesObj){
    quakesObj = JSON.parse(quakesObj);
    return Rx.Observable.from(quakesObj.quakes);
    })
❶ .scan([], function(boundsArray, quake) {
❷ var bounds = [
    quake.lng - 0.3, quake.lat - 0.15,
    quake.lng + 0.3, quake.lat + 0.15
].map(function(coordinate) {
    coordinate = coordinate.toString();
    return coordinate.match(/\-?\d+(\.\-?\d{2})?/)[0];
});
boundsArray.concat(bounds);
❸   return boundsArray.slice(Math.max(boundsArray.length - 50, 0));
})
❹ .subscribe(function(boundsArray) {
    stream.stop();
    stream.params.locations = boundsArray.toString();
    stream.start();
});

如下是前面代码中发生的事情的一步一步:

  1. 咱们再次见到咱们的老朋友scan。 任什么时候候咱们须要累积结果并产生每一个中间结果,scan是咱们的朋友。 在这种状况下,咱们将继续在boundsArray数组中累积地震坐标。
  2. 从地震震中的单纬度/经度坐标对,咱们建立一个阵列,其中包含由西北坐标和东南坐标肯定的区域。 用于近似边界的数字建立了一个大城市大小的矩形。以后,咱们使用正则表达式将每一个坐标的小数精度限制为两位小数,以符合Twitter API要求。
  3. 咱们将生成的边界链接到boundsArray,它包含之前每一个地震的边界。 而后咱们采用最后25对边界(数组中的50个项目),由于这是Twitter API的限制。
  4. 最后,咱们订阅了Observable,在onNext函数中,咱们从新启动当前的twit流来从新加载更新的位置,以便经过咱们新的累积位置数组进行过滤,转换为字符串。

从新启动服务器并从新加载浏览器后,咱们应该在浏览器应用程序中收到相关的推文。 可是如今,咱们只能看到开发人员控制台中显示的原始对象。 在下一节中,咱们将生成HTML以在仪表板中显示推文。

在Dashboard上显示推文

既然咱们正在接收来自服务器的推文,那么剩下要作的就是在屏幕上很好地展现它们。 为此,咱们将建立一个新的HTML元素,咱们附加传入的推文:

examples_earthquake_ui/index_final.html

<div id="tweet_container"></div>

咱们还将更新socket Observable订阅以处理传入的tweet对象并将它们附加到咱们刚刚建立的tweet_container元素:

examples_earthquake_ui/code5.js

socket
.map(function(message) { return JSON.parse(message.data); })
.subscribe(function(data) {
    var container = document.getElementById('tweet_container');
    container.insertBefore(makeTweetElement(data), container.firstChild);
});

任何新的推文都会出如今列表的顶部,它们将由makeTweetElement建立,这是一个建立推文元素的简单函数,并使用咱们做为参数传递的数据填充它:

examples_earthquake_ui/code5.js

function makeTweetElement(tweetObj) {
    var tweetEl = document.createElement('div');
    tweetEl.className = 'tweet';
    var content = '<img src="$tweetImg" class="avatar" />' +
    '<div class="content">$text</div>' +
    '<div class="time">$time</div>';
    
    var time = new Date(tweetObj.created_at);
    var timeText = time.toLocaleDateString() + ' ' + time.toLocaleTimeString();
    content = content.replace('$tweetImg', tweetObj.user.profile_image_url);
    content = content.replace('$text', tweetObj.text);
    content = content.replace('$time', timeText);
    tweetEl.innerHTML = content;
    return tweetEl;
}

有了这个,咱们终于有了一个带有相关的地理定位推文的侧边栏,可让咱们更深刻地了解受地震影响的区域。

改进的想法

此仪表板已经正常运行,但能够进行许多改进。 一些想法,使它更好:

  • 添加更多地震数据库。 USGS是一个很棒的资源,但它主要提供在美国发生的地震。 合并来自世界各地的地震报告,而不只仅是美国,并在地图中将它们所有展现在一块儿将会颇有趣。 为此,您可使用mergemergeAll的帮助,并使用distinct与选择器函数来避免重复。
  • 每当用户点击推文时,将地图置于相关地震中心。 这将涉及经过地震在服务器上对推文进行分组,而且您可能但愿使用groupBy运算符将推文分组到特定地理区域。

总结

在本章中,咱们使用RxJS建立了一个响应式用户界面,使咱们可以实时查看地球上发生的地震的各类数据。咱们在浏览器客户端和Node.js服务器中都使用了RxJS,显示了使用Observable管理应用程序的不一样区域是多么容易。

更重要的是,咱们已经看到咱们能够在客户端和服务器上以相同的方式使用RxJS,在咱们的应用程序中随处可见Observable序列抽象。 不只如此。咱们实际上能够在其余编程语言中使用RxJS概念和运算符,由于许多编程语言都支持RxJS。

接下来咱们将介绍Scheduler,它是RxJS中更高级的对象类型,它容许咱们更精确地控制时间和并发性,并为测试代码提供了很大的帮助。

关注个人微信公众号,更多优质文章定时推送
clipboard.png

相关文章
相关标签/搜索