Rxjs 响应式编程-第三章: 构建并发程序

Rxjs 响应式编程-第一章:响应式
Rxjs 响应式编程-第二章:序列的深刻研究
Rxjs 响应式编程-第三章: 构建并发程序
Rxjs 响应式编程-第四章 构建完整的Web应用程序
Rxjs 响应式编程-第五章 使用Schedulers管理时间
Rxjs 响应式编程-第六章 使用Cycle.js的响应式Web应用程序html

构建并发程序

并发是正确有效地同时作几件事的艺术。为了实现这一目标,咱们构建咱们的程序来利用时间,以最有效的方式一块儿运行任务。 应用程序中的平常并发示例包括在其余活动发生时保持用户界面响应,有效地处理数百个客户的订单。react

在本章中,咱们将经过为浏览器制做一个用于射击的太空飞船游戏来探索RxJS中的并发性和纯函数。咱们将首先介绍Observable管道,这是一种连接Observable运算符并在它们之间传递状态的技术。而后,我将向您展现如何使用管道来构建程序,而不依赖于外部状态或反作用,将全部逻辑和状态封装在Observables自己中。编程

视频游戏是须要保持不少状态的计算机程序,可是咱们将使用Observable管道和一些优秀的RxJS运算符的功能编写咱们的游戏,没有任何外部状态。canvas

简洁和可观察的管道

Observable管道是一组连接在一块儿的运算符,其中每一个运算符都将Observable做为输入并返回Observable做为输出。 咱们一直在使用本书中的管道; 在使用RxJS进行编程时,它们无处不在。 下面是一个简单的事例:segmentfault

spaceship_reactive/pipeline.js数组

Rx.Observable
    .from(1, 2, 3, 4, 5, 6, 7, 8)
    .filter(function(val) { return val % 2; })
    .map(function(val) { return val * 10; });

管道是独立的。 全部状态从一个运算符流向下一个运算符,而不须要任何外部变量。可是当咱们构建咱们的响应式程序时,咱们可能会想要将状态存储在Observable管道以外(咱们在Side Effects和External State中讨论了外部状态)。这迫使咱们跟踪咱们在管道外设置的变量,全部这些bean计数都很容易致使错误。为避免这种状况,管道中的运算符应始终使用纯函数。promise

在相同输入的状况下,纯函数始终返回相同的输出。当咱们能够保证程序中的函数不能修改其余函数依赖的状态时,设计具备高并发性的程序更容易。这就是纯粹的功能给咱们带来的东西。浏览器

避免外部状态

在下面的例子中,咱们计算到目前为止每隔一秒产生的偶数。咱们经过从interval建立一个Observable并在咱们收到的值是偶数时增长evenTicks:缓存

spaceship_reactive/state.js服务器

var evenTicks = 0;

function updateDistance(i) {
    if (i % 2 === 0) {
        evenTicks += 1;
    }
    return evenTicks;
}

var ticksObservable = Rx.Observable
    .interval(1000)
    .map(updateDistance)
    
ticksObservable.subscribe(function() {
    console.log('Subscriber 1 - evenTicks: ' + evenTicks + ' so far');
});

这是程序运行四秒后获得的输出:

Subscriber 1 - evenTicks: 1 so far
Subscriber 1 - evenTicks: 1 so far
Subscriber 1 - evenTicks: 2 so far
Subscriber 1 - evenTicks: 2 so far

如今,测试一下,让咱们为ticksObservable添加另外一个订阅者:

spaceship_reactive/state.js

var evenTicks = 0;
function updateDistance(i) {
    if (i % 2 === 0) {
        evenTicks += 1;
    }
    return evenTicks;
}

var ticksObservable = Rx.Observable
    .interval(1000)
    .map(updateDistance)
    
ticksObservable.subscribe(function() {
    console.log('Subscriber 1 - evenTicks: ' + evenTicks + ' so far');
});

ticksObservable.subscribe(function() {
    console.log('Subscriber 2 - evenTicks: ' + evenTicks + ' so far');
});

输出如今以下:

Subscriber 1 - evenTicks: 1 so far
Subscriber 2 - evenTicks: 2 so far
Subscriber 1 - evenTicks: 2 so far
Subscriber 2 - evenTicks: 2 so far
Subscriber 1 - evenTicks: 3 so far
Subscriber 2 - evenTicks: 4 so far
Subscriber 1 - evenTicks: 4 so far
Subscriber 2 - evenTicks: 4 so far

等等--第二个订阅者的偶数计数不该该起做用的!他应该跟第一个订阅者的计数彻底一致。正如您可能已经猜到的那样,Observable管道将为每一个订户运行一次,增evenTicks两次。

共享外部状态引发的问题一般比这个例子更微妙。在复杂的应用程序中,打开通向管道外部状态的大门会致使代码变得复杂,而且很快就会出现错误。解决方案是尽量多地封装管道内的信息。 这是咱们能够重构前面的代码以免外部状态的方法:

spaceship_reactive/state.js

function updateDistance(acc, i) {
    if (i % 2 === 0) {
        acc += 1;
    }
    return acc;
}

var ticksObservable = Rx.Observable
    .interval(1000)
    .scan(updateDistance, 0);
    
ticksObservable.subscribe(function(evenTicks) {
    console.log('Subscriber 1 - evenTicks: ' + evenTicks + ' so far');
});

ticksObservable.subscribe(function(evenTicks) {
    console.log('Subscriber 2 - evenTicks: ' + evenTicks + ' so far');
});

预期输出:

Subscriber 1 - evenTicks: 1 so far
Subscriber 2 - evenTicks: 1 so far
Subscriber 1 - evenTicks: 1 so far
Subscriber 2 - evenTicks: 1 so far
Subscriber 1 - evenTicks: 2 so far
Subscriber 2 - evenTicks: 2 so far
Subscriber 1 - evenTicks: 2 so far
Subscriber 2 - evenTicks: 2 so far

使用scan,咱们彻底避免外部状态。咱们将累计的偶数传递给updateDistance而不是依赖外部变量来保持累积值。 这样咱们就不会增长每一个新订户的计数。

大多数时候咱们能够避免依赖外部状态。使用它的常见方案是缓存值或跟踪程序中更改的值。 可是,正如您将在前面的Spaceship Reactive!中看到的那样,能够经过其余几种方式处理这些场景。例如,当咱们须要缓存值时,RxJS的Subject Class(后面会讲到)能够提供不少帮助,当咱们须要跟踪游戏的先前状态时,咱们可使用像Rx.Observable.scan这样的方法。

管道是高效的

我第一次将一堆操做符连接到管道中来转换序列,个人直觉是它不可能有效。我知道经过连接运算符在JavaScript中转换数组是很昂贵的。然而在本书中,咱们经过将序列转换为新序列来设计程序。 这会不会很低效呢?

连接在Observables和数组中看起来相似; 也都有filtermap等方法。可是有一个相当重要的区别:数组方法因为每一个操做而建立一个新数组,而且彻底由下一个操做符转换。另外一方面,可观察的管道不会建立中间的Observable,而且能够一次性将全部操做应用于每一个元素。所以,Observable仅被遍历一次,这使得Observable连接变得高效。 看看如下示例:

spaceship_reactive/array_chain.js

stringArray // represents an array of 1,000 strings
    .map(function(str) {
        return str.toUpperCase();
    })
    .filter(function(str) {
        return /^[A-Z]+$/.test(str);
    })
    .forEach(function(str) {
        console.log(str);
    });

假设stringArray是一个包含1,000个字符串的数组,咱们要将其转换为大写,而后过滤掉包含字母字符之外的任何字符串(或根本没有字母)。而后咱们要将结果数组的每一个字符串打印到控制台。

这是背后发生的事情:

  1. 遍历数组并建立一个包含全部项大写的新数组。
  2. 遍历大写数组,建立另外一个包含1,000个元素的数组。
  3. 遍历筛选的数组并将每一个结果记录到控制台。

在转换数组的过程当中,咱们迭代了三次数组并建立了两个全新的大数组。 这很是低效! 若是您关注性能或者处理大量项目,则不该该以这种方式编程。

spaceship_reactive/array_chain.js

stringObservable // represents an observable emitting 1,000 strings
    .map(function(str) {
        return str.toUpperCase();
    })
    .filter(function(str) {
        return /^[A-Z]+$/.test(str);
    })
    .subscribe(function(str) {
        console.log(str);
    });

Observable的管道看起来与数组链很是类似,可是又不一样。 在一个Observable中,在咱们订阅它以前,没有任何事情发生过,不管咱们应用了多少查询和转换。 当咱们调用像map这样的变换时,咱们其实只运行了一个函数,它将对数组的每一个项目进行一次操做。 所以,在前面的代码中,这将是会发生的事情:

  1. 建立一个大写函数,该函数将应用于Observable的每一个项目,并在Observer订阅它时返回将发出这些新项目的Observable。
  2. 使用先前的大写函数组合过滤器函数,并返回一个Observable,它将发出新项目,大写和过滤,但仅在Observable订阅时候,才会运行。
  3. 经过订阅Observable来发布,经过咱们全部操做器的数据将会被发布出来。

使用Observables,咱们只会查看一次列表,只有在绝对须要时才会应用转换。 例如,假设咱们在上一个示例中添加了一个take运算符:

spaceship_reactive/array_chain.js

stringObservable
    .map(function(str) {
        return str.toUpperCase();
    })
    .filter(function(str) {
        return /^[A-Z]+$/.test(str);
    })
    .take(5)
    .subscribe(function(str) {
        console.log(str);
    });

take使得Observable只发出咱们指定的前n个项目。在咱们的例子中,n是五,因此在数千个数据中,咱们只会收到前五个。 很酷的部分是咱们的代码永远不会遍历全部项目; 只会遍历前5个。

这使开发人员的生活更加轻松。 你能够放心,在操做序列时,RxJS只会作必要的工做。 这种操做方式称为惰性评估,在Haskell和Miranda等函数式语言中很是常见。

RxJS的主体类

Subject是一种实现Observer和Observable类型的类型。 做为Observer,它能够订阅Observable,而且做为Observable,它能够生成值并让Observers订阅它。

在某些状况下,单个Subject能够执行Observers和Observables组合的工做。例如,为了在数据源和Subject的侦听器之间建立代理对象,咱们可使用:

spaceship_reactive/subjects.js

var subject = new Rx.Subject();
var source = Rx.Observable.interval(300)
    .map(function(v) { return 'Interval message #' + v; })
    .take(5);
    
source.subscribe(subject);

var subscription = subject.subscribe(
    function onNext(x) { console.log('onNext: ' + x); },
    function onError(e) { console.log('onError: ' + e.message); },
    function onCompleted() { console.log('onCompleted'); }
);

subject.onNext('Our message #1');
subject.onNext('Our message #2');

setTimeout(function() {
    subject.onCompleted();
}, 1000);

输出:

onNext: Our message #1
onNext: Our message #2
onNext: Interval message #0
onNext: Interval message #1
onNext: Interval message #2
onCompleted

在前面的示例中,咱们建立了一个新的Subject和一个每秒发送一个整数的源Observable。 而后咱们给Subject订阅Observable。以后,咱们订阅了一个Observer到Subject自己。 Subject自己如今表现为Observable。

接下来,咱们使Subject发出本身的值(message1和message2)。在最终结果中,咱们获取Subject本身的消息,而后从源Observable获取代理值。来自Observable的值后来由于它们是异步的,而咱们当即使Subject的本身的值。请注意,即便咱们告诉源Observable采用前五个值,输出也只显示前三个。那是由于在一秒以后咱们在主题上调用onCompleted。 这将完成对全部订阅的通知,并在这种状况下覆盖take操做符。

Subject类为建立更专业的Subject提供了基础。事实上,RxJS带有一些有趣的:AsyncSubjectReplaySubjectBehaviorSubject

AsyncSubject

仅当序列完成时,AsyncSubject才会仅发出序列的最后一个值。而后永远缓存此值,而且在发出值以后订阅的任何Observer将当即接收它。AsyncSubject便于返回单个值的异步操做,例如Ajax请求。

让咱们看一个订阅range的AsyncSubject的简单示例:

spaceship_reactive/subjects.js

var delayedRange = Rx.Observable.range(0, 5).delay(1000);
var subject = new Rx.AsyncSubject();
delayedRange.subscribe(subject);
subject.subscribe(
    function onNext(item) { console.log('Value:', item); },
    function onError(err) { console.log('Error:', err); },
    function onCompleted() { console.log('Completed.'); }
);

在该示例中,delayedRange在延迟一秒以后发出值0到4。而后咱们建立一个新的AsyncSubject主题并将其订阅到delayedRange。 输出以下:

Value: 4
Completed.

按照预期,咱们只获得Observer发出的最后一个值。如今让咱们使用AsyncSubject来实现更真实的场景。 咱们将获取一些远程内容:

spaceship_reactive/subjects.js

function getProducts(url) {
    var subject;
    return Rx.Observable.create(function(observer) {
        if (!subject) {
            subject = new Rx.AsyncSubject();
            Rx.DOM.get(url).subscribe(subject);
        }
        return subject.subscribe(observer);
    });
}

var products = getProducts('/products');

// Will trigger request and receive the response when read
products.subscribe(
    function onNext(result) { console.log('Result 1:', result.response); },
    function onError(error) { console.log('ERROR', error); }
);

// Will receive the result immediately because it's cached
setTimeout(function() {
    products.subscribe(
        function onNext(result) { console.log('Result 2:', result.response); },
        function onError(error) { console.log('ERROR', error); }
    );
}, 5000)

在此代码中,当使用URL调用getProducts时,它将返回一个Observer,该Observer将发出HTTP GET请求的结果。 如下是它如何分解:

  1. getProducts返回一个Observable序列。 咱们在这里建立它。
  2. 若是咱们尚未建立AsyncSubject,咱们建立它并将订阅Rx.DOM.Request.get(url)返回的Observable。
  3. 咱们将Observer订阅到AsyncSubject。每次Observer订阅Observable时,它实际上都会订阅AsyncSubject,它做为Observable检索URL和Observers之间的代理。
  4. 咱们建立Observable来检索URL“products”并将其存储在products变量中。
  5. 这是第一个订阅,将启动URL检索并在检索URL时记录结果。
  6. 这是第二个订阅,在第一个订阅后运行五秒钟。因为此时已经检索到URL,所以不须要其余网络请求。 它将当即收到请求的结果,由于它已存储在AsyncSubject中了。

有趣的是,咱们正在使用一个订阅Rx.DOM.Request.get这个Observable的AsyncSubject。 因为AsyncSubject缓存最后的结果,所以对产品的任何后续订阅都将当即收到结果,而不会致使其余网络请求。每当咱们指望单个结果并但愿保留它时,咱们就可使用AsyncSubject。

这是否意味着AsyncSubject像Promise同样?

确实。

AsyncSubject表示异步操做的结果,您能够将其用做promise的替代。内部的区别在于promise只会处理单个值,而AsyncSubject处理序列中的全部值,只会发送(和缓存)最后一个值。

可以如此轻松地模拟Promise显示了RxJS模型的灵活性。(即便没有AsyncSubject,使用Observables模拟一个承诺也很容易。)

BehaviorSubject

当Observer订阅BehaviorSubject时,它接收最后发出的值,而后接收全部后续值。BehaviorSubject要求咱们提供一个起始值,以便全部Observers在订阅BehaviorSubject时始终会收到一个值。

想象一下,咱们想要检索一个远程文件并在HTML页面上输出它的内容,但咱们在等待内容时须要占位符文本。 咱们可使用BehaviorSubject

spaceship_reactive/behavior_subject.js

var subject = new Rx.BehaviorSubject('Waiting for content');
subject.subscribe(
    function(result) {
        document.body.textContent = result.response || result;
    },
    function(err) {
        document.body.textContent = 'There was an error retrieving content';
    }
);
Rx.DOM.get('/remote/content').subscribe(subject);

如今,HTML正文包含咱们的占位符文本,它将保持这种状态,直到Subject发出新值。最后,咱们请求咱们想要的资源,并将咱们的Subject订阅到生成的Observer。

BehaviorSubject保证始终至少发出一个值,由于咱们在其构造函数中提供了一个默认值。一旦BehaviorSubject完成,它将再也不发出任何值,释放缓存值使用的内存。

ReplaySubject

ReplaySubject缓存其值并将其从新发送到任何较晚的Observer。 与AsyncSubject不一样,不须要为此完成序列。

Subject

var subject = new Rx.Subject();

subject.onNext(1);
subject.subscribe(function(n) {         
    console.log('Received value:', n);
});
subject.onNext(2);
subject.onNext(3);

输出以下:

Received value: 2 
Received value: 3

ReplaySubject

var subject = new Rx.ReplaySubject();

subject.onNext(1);

subject.subscribe(function(n) {     
    console.log('Received value:', n);
});

subject.onNext(2);
subject.onNext(3);

输出以下:

Received value: 1
Received value: 2
Received value: 3

ReplaySubject有助于确保Observers从一开始就获取Observable发出的全部值。它使咱们免于编写凌乱的代码来缓存之前的值,从而帮助咱们减小了不少错误。

固然,要实现该行为,ReplaySubject会将全部值缓存在内存中。 为了防止它占用太多内存,咱们能够经过缓冲区大小限制它存储的数据量,或者经过将特定参数传递给构造函数来限制它。

ReplaySubject构造函数的第一个参数接受一个数字,表示咱们要缓冲的值的数量:

var subject = new Rx.ReplaySubject(2); // Buffer size of 2

subject.onNext(1);
subject.onNext(2);
subject.onNext(3);

subject.subscribe(function(n) { 
    console.log('Received value:', n);
});

输出以下:

Received value: 2 
Received value: 3

第二个参数采用一个数字来表示咱们想要缓冲值的时间(以毫秒为单位):

var subject = new Rx.ReplaySubject(null, 200); // Buffer size of 200ms

setTimeout(function() { 
    subject.onNext(1); 
}, 100);

setTimeout(function() { 
    subject.onNext(2); 
}, 200); 

setTimeout(function() { 
    subject.onNext(3); 
}, 300); 

setTimeout(function() {
    subject.subscribe(function(n) { 
        console.log('Received value:', n);
    });
     subject.onNext(4);
}, 350);

在这个例子中,咱们根据时间而不是值的数量设置缓冲区。 咱们的ReplaySubject将缓存最多200毫秒前发出的值。 咱们发出三个值,每一个值相隔100毫秒,350毫秒后咱们订阅一个Observer,而后咱们发出另外一个值。 在订阅时,缓存的项目是2和3,由于1发生在好久之前(大约250毫秒前),因此它再也不被缓存。

Subject是一个强大的工具,能够为您节省大量时间。 它们为缓存和重复等常见场景提供了很好的解决方案。 由于他们的核心只是观察者和观察者,因此你不须要学习任何新东西。

响应式的飞船

为了展现咱们如何保持一个应用程序的纯粹,咱们将构建一个视频游戏,其中咱们的英雄将和无尽的敌人宇宙飞船战斗。 咱们将大量使用Observable管道,而且我会指出在可能很容易将状态存储在管道外的状况以及如何避免它。

众所周知,视频游戏会保留不少外部状态分数,字符,定时器等的屏幕坐标。咱们的计划是在不依赖于保持状态的单个外部变量的状况下构建整个游戏。

在咱们的游戏中,玩家将使用鼠标水平移动飞船,并经过单击鼠标或点击空格键进行射击。咱们的游戏将有四个主要角色:背景中的移动星球场,玩家的宇宙飞船,敌人,以及来自玩家和敌人的子弹。

它看起来像这样:

image

在屏幕截图中,红色三角形是咱们的宇宙飞船,绿色三角形是敌人。 较小的三角形是子弹。

让咱们从设置阶段开始; 这将是咱们的HTML文件:

spaceship_reactive/spaceship.html

<!DOCTYPE html>
<html>
    <head>
        <meta charset="utf-8">
        <title>Spaceship Reactive!</title>
        <script src="../rx.all-4.0.0.js"></script> <style>
            html, body {
                margin: 0;
                padding: 0;
            }
        </style>
    </head>
    <body>
        <script src="spaceship.js"></script>
    </body>
</html>

它只是一个简单的HTML文件,能够加载咱们将在本章其他部分使用的JavaScript文件。在那个JavaScript文件中,咱们首先设置一个canvas元素,咱们将在其中渲染咱们的游戏:

spaceship_reactive/starfield_1.js

var canvas = document.createElement('canvas');
var ctx = canvas.getContext("2d");
document.body.appendChild(canvas); 
canvas.width = window.innerWidth; 
canvas.height = window.innerHeight;

有了这个,咱们就能够开始编写咱们游戏的组件了。 首先让咱们画出咱们的星空背景。

建立星空背景

咱们在太空中设置游戏所需的第一件事就是星空。咱们将建立一个向下滚动的星空,以提供穿越太空的感受。 为此,咱们首先使用range运算符生成星标:

spaceship_reactive/starfield_1.js

var SPEED = 40;
var STAR_NUMBER = 250;
var StarStream = Rx.Observable.range(1, STAR_NUMBER)
.map(function() { 
    return {
          x: parseInt(Math.random() * canvas.width),
          y: parseInt(Math.random() * canvas.height),
          size: Math.random() * 3 + 1
    }; 
})

每一个星形将由一个包含随机坐标和1到4之间大小的对象表示。这段代码将为咱们提供一个生成250个这些“星星”的流。

咱们但愿这些星星保持前进。一种方法是每隔几毫秒增长全部星星的y坐标。咱们将使用toArray将StarStream Observable转换为数组,而后将发出一个包含range生成的全部对象的数组。而后咱们可使用flatMap运算符来获取该数组,该运算符将Observable转换为每隔几毫秒产生一个值的数据。使用map咱们能够增长原始数组的每一个项目中的y坐标:

spaceship_reactive/starfield_1.js

var SPEED = 40;
var STAR_NUMBER = 250;
var StarStream = Rx.Observable.range(1, STAR_NUMBER)
.map(function() { 
    return {
          x: parseInt(Math.random() * canvas.width),
          y: parseInt(Math.random() * canvas.height),
          size: Math.random() * 3 + 1
    }; 
})
.toArray() 
.flatMap(function(starArray) {
    return Rx.Observable.interval(SPEED)
    .map(function() { 
        starArray.forEach(function(star) {
            if (star.y >= canvas.height) {
                star.y = 0; // Reset star to top of the screen
            }
            star.y += 3; // Move star 
        });
        return starArray; 
    });
})

在地图中,咱们检查星醒y坐标是否已经在屏幕以外,若是是的话,咱们将其重置为0.经过改变每一个星星中的坐标,咱们能够始终使用相同的星星阵列。

如今咱们须要一个小的辅助函数,在画布上“绘制”一系列星星:

spaceship_reactive/starfield_1.js

function paintStars(stars) {
ctx.fillStyle = '#000000';
ctx.fillRect(0, 0, canvas.width, canvas.height);
ctx.fillStyle = '#ffffff';
stars.forEach(function(star) {
    ctx.fillRect(star.x, star.y, star.size, star.size);
  });
}

paintStars绘制黑色背景并在画布上绘制星星。 实现移动星星的惟一方法是订阅Observable并使用生成的数组调用paintStars。 这是最终的代码:

spaceship_reactive/starfield_1.js

function paintStars(stars) {
    ctx.fillStyle = '#000000';
    ctx.fillRect(0, 0, canvas.width, canvas.height);
    ctx.fillStyle = '#ffffff';
    stars.forEach(function(star) {
        ctx.fillRect(star.x, star.y, star.size, star.size);
    });
}

var SPEED = 40;
var STAR_NUMBER = 250;
var StarStream = Rx.Observable.range(1, STAR_NUMBER)
.map(function() { 
    return {
          x: parseInt(Math.random() * canvas.width),
          y: parseInt(Math.random() * canvas.height),
          size: Math.random() * 3 + 1
    }; 
})
.toArray() 
.flatMap(function(starArray) {
    return Rx.Observable.interval(SPEED).map(function() { 
        starArray.forEach(function(star) {
            if (star.y >= canvas.height) {
                star.y = 0; // Reset star to top of the screen
            }
            star.y += 3; // Move star 
        });
        return starArray; });
}).subscribe(function(starArray) {
    paintStars(starArray);
});

如今咱们已经绘制好了舞台,如今是咱们的英雄出场的时候了。

添加玩家的太空飞船

如今咱们拥有美丽的星空背景,咱们已准备好对英雄的宇宙飞船编程。虽然咱们的宇宙飞船看似简单,但它是游戏中最重要的对象。它是鼠标移动的观察者,它发出当前的鼠标x坐标和恒定的y坐标(玩家只能水平移动,因此咱们永远不会改变y坐标):

spaceship_reactive/hero_1.js

var HERO_Y = canvas.height - 30;
var mouseMove = Rx.Observable.fromEvent(canvas, 'mousemove'); 
var SpaceShip = mouseMove
.map(function(event) { 
    return {
        x: event.clientX,
        y: HERO_Y
    };
})
.startWith({
    x: canvas.width / 2,
    y: HERO_Y 
});

请注意,我使用了startWith。这将设置Observable中的第一个值,并将其设置为屏幕中间的位置。没有startWith咱们的Observable只有在玩家移动鼠标时才开始发射。

让咱们在屏幕上渲染咱们的英雄。在这个游戏中,全部角色都是三角形(个人图形设计技巧不是很使人印象深入),因此咱们将定义一个辅助函数来在画布上渲染三角形,给定坐标,大小和颜色,以及它们的朝向:

spaceship_reactive/hero_1.js

function drawTriangle(x, y, width, color, direction) { 
    ctx.fillStyle = color;
    ctx.beginPath();
    ctx.moveTo(x - width, y);
    ctx.lineTo(x, direction === 'up' ? y - width : y + width); 
    ctx.lineTo(x + width, y);
    ctx.lineTo(x - width,y);
    ctx.fill();
}

咱们还将定义paintSpaceShip,它使用辅助函数:

spaceship_reactive/hero_1.js

function paintSpaceShip(x, y) { 
    drawTriangle(x, y, 20, '#ff0000', 'up');
}

但咱们如今面临一个问题。 若是咱们订阅了SpaceShip Observable并在订阅中调用了drawTriangle,咱们的太空船只有在咱们移动鼠标时才能看到,并且只是瞬间。 这是由于starStream每秒更新画布不少次,若是咱们不移动鼠标就擦除咱们的太空船。因为starStream没法直接访问太空船,所以咱们没法在starStream订阅中渲染太空船。 咱们能够将最新的太空船坐标保存到starStream能够访问的变量中,可是咱们将修改外部状态的规则。 该怎么办?

一般状况下,RxJS有一个很是方便的operator,咱们能够用它来解决咱们的问题。

Rx.Observable.combineLatest是一个方便的operator。 它须要两个或更多Observable并在每一个Observable发出新值时发出每一个Observable的最后结果。知道starStream如此频繁地发出一个新项目(星星数组),咱们能够删除starStream的订阅并使用combineLatest结合starStream和SpaceShip Observables,并在它们发出任何新项目时当即更新它们:

spaceship_reactive/hero_1.js

function renderScene(actors) {
    paintStars(actors.stars);
    paintSpaceShip(actors.spaceship.x, actors.spaceship.y);
}
var Game = Rx.Observable.combineLatest(StarStream, SpaceShip, function(stars, spaceship) {
    return { stars: stars, spaceship: spaceship }; 
});

Game.subscribe(renderScene);

咱们如今使用renderScene函数绘制屏幕上的全部内容,所以您能够删除StarStream的如下订阅代码:

.subscribe(function(starArray) {
    paintStars(starArray);
});

有了这个,每当Observable发出新项目时,咱们都会画出星空背景和宇宙飞船。咱们如今有一艘宇宙飞船在太空中飞行,咱们可使用咱们的鼠标随意移动它。这么简短的代码还不错吧!可是咱们的英雄宇宙飞船在浩瀚的太空中太孤独了。 给它一些同伴怎么样?

生成敌人

若是咱们没有任何敌人,这将是一个很是无聊的游戏。 因此让咱们创造一个无限的流!咱们想要每两秒半创造一个新的敌人。让咱们看一下Enemies Observable的代码,而后仔细阅读:

spaceship_reactive/enemy_1.js

var ENEMY_FREQ = 1500;
var Enemies = Rx.Observable.interval(ENEMY_FREQ)
.scan(function(enemyArray) { 
    var enemy = {
        x: parseInt(Math.random() * canvas.width),
        y: -30
    };
    enemyArray.push(enemy);
    return enemyArray; 
}, []);

var Game = Rx.Observable.combineLatest(
StarStream, SpaceShip, Enemies, function(stars, spaceship, enemies) {
    return {
        stars: stars, 
        spaceship: spaceship, 
        enemies: enemies
    }; 
});
Game.subscribe(renderScene);

为了创造敌人,咱们使用interval运算符每1,500毫秒运行一次,而后咱们使用scan运算符建立一个敌人阵列。

每次Observable发出一个值时,scan聚合结果,并发出每一个中间结果。 在Enemies Observable中,咱们从一个空数组开始,做为scan的第一个参数,咱们在每次迭代中将一个新对象推送到它。 该对象包含随机x坐标和可见屏幕外的固定y坐标。 有了这个,敌人将每1500毫秒发出一个包含全部当前敌人的阵列。

剩下的惟一的事情事渲染enemies的辅助函数。此函数也将更新enemies数组中每一个项目的坐标:

spaceship_reactive/enemy_1.js

// Helper function to get a random integer
function getRandomInt(min, max) {
    return Math.floor(Math.random() * (max - min + 1)) + min;
}

function paintEnemies(enemies) { 
    enemies.forEach(function(enemy) {
        enemy.y += 5;
        enemy.x += getRandomInt(-15, 15);
        drawTriangle(enemy.x, enemy.y, 20, '#00ff00', 'down'); 
    });
}

你能够在paintEnemies中看到咱们也在随机改变x坐标,这样敌人就会没法预测地移动到两侧。如今咱们须要更新函数renderScene以包含对paintEnemies的调用。

你可能已经注意到了咱们到目前为止玩游戏时的一个奇怪的效果:若是你移动鼠标,敌人会更快地向你走来!这多是游戏中的一个很好的功能,但咱们绝对不打算这样作。你能猜出致使这个bug的缘由吗?

若是你猜到它与paintEnemies功能有关,你就是对的。只要任何Observable产生一个值,combineLatest就会渲染咱们的场景。若是咱们不移动鼠标,最快的发射器将始终是starStream,由于它的间隔为40毫秒(Enemies Observable仅每1,500毫秒发出一次)。可是,当咱们移动鼠标时,SpaceShip将比starStream发射得更快(你的鼠标每秒发射屡次坐标),而后paintEnemies将执行屡次,更快地增长敌人的坐标。

为了不这种状况以及将来的相似问题,咱们须要规范游戏的速度,以便Observable不会比咱们的鼠标速度更快地发出值。

是的,正如您可能已经猜到的那样,RxJS有一个operator。

Avoid Drinking from the Firehose

咱们是否是接收数据的速度太快了。大多数状况下,咱们但愿得到全部速度,可是根据Observable流值的频率,咱们可能但愿删除一些咱们收到的值。咱们如今处于其中一种状况中。咱们在屏幕上渲染事物的速度与咱们拥有的最快Observable的速度成正比。事实证实,咱们最快的Observable对咱们来讲太快了,咱们须要在游戏中创建一个恒定的更新速度。

sample是Observable实例中的一个方法,给定一个以毫秒为单位的时间参数,返回一个Observable,它发出每一个时间间隔内父Observable发出的最后一个值。

image

请注意sample如何在间隔时刻丢弃最后一个值以前的任何值。 认清您是否须要此行为很是重要。在咱们的例子中,咱们不关心删除值,由于咱们只想每40毫秒渲染一个元素的当前状态。若是全部值对您都很重要,您可能须要考虑缓冲区运算符:

spaceship_reactive/enemy_2.js

Rx.Observable.combineLatest( StarStream, SpaceShip, Enemies, function(stars, spaceship, enemies) {
    return {
        stars: stars, 
        spaceship: spaceship, 
        enemies: enemies
    }; 
    
})
.sample(SPEED)
.subscribe(renderScene);

经过在combineLatest以后调用sample,咱们确保combineLatest永远不会在前一个以后的40毫秒内产生任何值(咱们的常量SPEED设置为40)。

射击

看到成群的敌人来到咱们身边有点可怕;咱们能作的就是走开,但愿他们不要看到咱们。若是让让咱们的英雄有能力射击邪恶的外星人宇宙飞船会怎么样?

咱们但愿咱们的太空船在咱们点击鼠标或按空格键时进行射击,所以咱们将为每一个事件建立一个Observable并将它们合并到一个名为playerShots的Observable中。 请注意,咱们经过空格键,空格键的键代码事32:

spaceship_reactive/hero_shots.js

var playerFiring = Rx.Observable .merge(
Rx.Observable.fromEvent(canvas, 'click'),
Rx.Observable.fromEvent(canvas, 'keydown')
.filter(function(evt) { 
    return evt.keycode === 32; 
}) )

如今咱们已经了解了sample,咱们能够用它来增长游戏的趣味并限制咱们太空船的射击频率。不然,玩家能够高速射击并轻易摧毁全部敌人。咱们这样作是为了让玩家最多只能每200毫秒射击一次:

spaceship_reactive/hero_shots.js

var playerFiring = Rx.Observable.merge(
    Rx.Observable.fromEvent(canvas, 'click'),
    Rx.Observable.fromEvent(canvas, 'keydown')
    .filter(function(evt) { 
        return evt.keycode === 32; 
    })
)
.sample(200)
.timestamp();

咱们还添加了一个时间戳操做符,它在咱们的Observable发出的每一个值中设置一个属性时间戳,以及它发出的确切时间。 咱们稍后会用它。

最后,为了从咱们的宇宙飞船发射射击,咱们须要知道射击时刻宇宙飞船的x坐标。这样咱们就能够将设计子弹渲染到正确的x坐标。 从SpaceShip Observable设置一个外部变量看起来比较简单,它会始终包含最后发出的x坐标,但这会破坏咱们不成文的协议,永远不会改变外部状态!

相反,咱们将经过再次使用咱们的好朋友combineLatest来实现这一目标:

spaceship_reactive/hero_shots.js

var HeroShots = Rx.Observable
.combineLatest(
playerFiring,
SpaceShip,
function(shotEvents, spaceShip) {
    return { x: spaceShip.x };
})
.scan(function(shotArray, shot) {
    shotArray.push({x: shot.x, y: HERO_Y});
    return shotArray;
}, []);

咱们如今从SpaceShip和playerFiring获取更新的值,这样咱们就能够获得咱们想要的x坐标。 咱们使用扫描的方式与咱们用于Enemy Observable的方式相同,为每一个子弹建立一个当前坐标数组。有了这个,咱们应该准备好在屏幕上绘制咱们的镜头。 咱们使用辅助函数绘制子弹数组中的每一个子弹:

spaceship_reactive/hero_shots.js

var SHOOTING_SPEED = 15;
function paintHeroShots(heroShots) {
    heroShots.forEach(function(shot) {
        shot.y -= SHOOTING_SPEED;
        drawTriangle(shot.x, shot.y, 5, '#ffff00', 'up');
    });
}

而后咱们从咱们的主要combineLatest操做中调用paintHeroShots

Rx.Observable.combineLatest(
StarStream, SpaceShip, Enemies, HeroShots,
function(stars, spaceship, enemies, heroShots) {
    return {
        stars: stars,
        spaceship: spaceship,
        enemies: enemies,
        eroShots: heroShots
    };
})
.sample(SPEED)
.subscribe(renderScene);

咱们在renderScene中添加对paintHeroShots的调用:

function renderScene(actors) {
    paintStars(actors.stars);
    paintSpaceShip(actors.spaceship.x, actors.spaceship.y);
    paintEnemies(actors.enemies);
    aintHeroShots(actors.heroShots);
}

如今,当你运行游戏时,你会注意到每次移动鼠标时,咱们的宇宙飞船都会疯狂的射击。 效果虽然不错,但这不是咱们想要的! 让咱们再看看HeroShots Observable。 在其中,咱们使用combineLatest,以便咱们拥有来自playerFiring和SpaceShip的值。 这与咱们以前遇到的问题相似。每次鼠标移动时,HeroShots中的combineLatest都会发出值,这就转化为被射击的子弹。在这种状况下,节流无济于事,由于咱们但愿用户随时随地进行射击,而且节流将限制射击次数并使其中的许屡次数降低。

每当Observable发出新值时,combineLatest会发出每一个Observable发出的最后一个值。 咱们能够利用这个优点。每当鼠标移动时,combineLatest会发出新的SpaceShip位置和playerFiring的最后一个发射值,除非咱们发射新子弹,不然它将保持不变。 而后,只有当发射的子弹与前一子弹不一样时,咱们才能发出一个值。 distinctUntilChanged操做符为咱们执行脏工做。

运算符distinctdistinctUntilChanged容许咱们过滤掉Observable已经发出的结果。 distinct过滤掉先前发出的任何结果,而distinctUntilChanged过滤掉相同的结果,除非在它们之间发出不一样的结果。咱们只须要确保新子弹与前一子弹不一样,因此distinctUntilChanged对咱们来讲已经足够了。(它还使咱们免于更高内存使用的不一样;不一样的须要将全部先前的结果保留在内存中。)

咱们修改了heroShots,所以它只根据时间戳发出新子弹:

spaceship_reactive/hero_shots2.js

var HeroShots = Rx.Observable
.combineLatest(
playerFiring,
SpaceShip,
function(shotEvents, spaceShip) {
    return {
        timestamp: shotEvents.timestamp,
        x: spaceShip.x
    };
})
.distinctUntilChanged(function(shot) { return shot.timestamp; })
.scan(function(shotArray, shot) {
    shotArray.push({ x:shot.x, y: HERO_Y });
    return shotArray;
}, []);

若是一切顺利,咱们如今可以从咱们的宇宙飞船射击敌人!

敌人射击

咱们应该容许敌人射击; 不然这是一个很是不公平的无聊游戏。 并且很无聊! 对于敌人射击,咱们将执行如下操做:

  • 每一个敌人都会保留更新的子弹阵列。
  • 每一个敌人都会以给定的频率射击。

为此,咱们将使用区间运算符来存储敌人值的新子弹。咱们还将介绍一个新的辅助函数isVisible,它有助于滤除坐标在可见屏幕以外的元素。 这就是Enemy Observable如今的样子:

spaceship_reactive/enemy_shots.js

function isVisible(obj) {
    return obj.x > -40 && obj.x < canvas.width + 40 &&
obj.y > -40 && obj.y < canvas.height + 40;
}
var ENEMY_FREQ = 1500;
var ENEMY_SHOOTING_FREQ = 750;
var Enemies = Rx.Observable.interval(ENEMY_FREQ)
.scan(function(enemyArray) {
    var enemy = {
        x: parseInt(Math.random() * canvas.width),
        y: -30,
        shots: []
    };
    
    Rx.Observable.interval(ENEMY_SHOOTING_FREQ).subscribe(function() {
        enemy.shots.push({ x: enemy.x, y: enemy.y });
        enemy.shots = enemy.shots.filter(isVisible);
    });
    
    enemyArray.push(enemy);
    return enemyArray.filter(isVisible);
}, []);

在该代码中,咱们每次建立新敌人时都会建立一个区间。此间隔将继续向敌方子弹阵列添加子弹,而后它将过滤掉屏幕外的子弹。咱们也可使用isVisible来过滤屏幕外的敌人,就像咱们在return语句中所作的那样。

咱们须要更新paintEnemies,以便渲染敌人的镜头并更新他们的y坐标。而后咱们使用咱们方便的drawTriangle函数来绘制镜头:

spaceship_reactive/enemy_shots.js

function paintEnemies(enemies) {
    enemies.forEach(function(enemy) {
        enemy.y += 5;
        enemy.x += getRandomInt(-15, 15);
        drawTriangle(enemy.x, enemy.y, 20, '#00ff00', 'down');
        enemy.shots.forEach(function(shot) {
            shot.y += SHOOTING_SPEED;
            drawTriangle(shot.x, shot.y, 5, '#00ffff', 'down');
        });
    });
}

有了这个,如今每一个人都在射击其余人,但没有人被摧毁!他们只是滑过敌人和咱们的宇宙飞船,由于咱们尚未定义当射击与太空飞船碰撞时会发生什么。

碰撞检测

当射击击中敌人时,咱们但愿子弹和敌人都能消失?让咱们定义一个辅助函数来检测两个目标是否发生了碰撞:

spaceship_reactive/enemy_shots2.js

function collision(target1, target2) {
    return (target1.x > target2.x - 20 && target1.x < target2.x + 20) &&
(target1.y > target2.y - 20 && target1.y < target2.y + 20);
}

如今让咱们修改辅助函数paintHeroShots来检查每一个子弹是否击中敌人。对于发生命中的状况,咱们将在已击中的敌人上将属性isDead设置为true,而且咱们将子弹的坐标设置为屏幕外。 子弹最终会被滤除,由于它在屏幕外。

spaceship_reactive/enemy_shots2.js

function paintEnemies(enemies) {
    enemies.forEach(function(enemy) {
        enemy.y += 5;
        enemy.x += getRandomInt(-15, 15);
        if (!enemy.isDead) {
        drawTriangle(enemy.x, enemy.y, 20, '#00ff00', 'down');
        }
        enemy.shots.forEach(function(shot) {
            shot.y += SHOOTING_SPEED;
            drawTriangle(shot.x, shot.y, 5, '#00ffff', 'down');
        });
    });
}

var SHOOTING_SPEED = 15;
function paintHeroShots(heroShots, enemies) {
    heroShots.forEach(function(shot, i) {
        for (var l=0; l<enemies.length; l++) {
            var enemy = enemies[l];
            if (!enemy.isDead && collision(shot, enemy)) {
                enemy.isDead = true;
                shot.x = shot.y = -100;
                break;
            }
        }
        shot.y -= SHOOTING_SPEED;
        drawTriangle(shot.x, shot.y, 5, '#ffff00', 'up');
    });
}

接下来让咱们摆脱任何将属性isDead设置为true的敌人。惟一须要注意的是,咱们须要等待那个特定敌人的全部射击消失;不然,当咱们击中一个敌人时,它的全部射击都会随之消失,这很奇怪。 所以,咱们检查其射击的长度,并仅在没有射击时过滤掉敌人物体:

spaceship_reactive/enemy_shots2.js

var Enemies = Rx.Observable.interval(ENEMY_FREQ)
.scan(function(enemyArray) {
    var enemy = {
        x: parseInt(Math.random() * canvas.width),
        y: -30,
        shots: []
    };
    Rx.Observable.interval(ENEMY_SHOOTING_FREQ).subscribe(function() {
        if (!enemy.isDead) {
            enemy.shots.push({ x: enemy.x, y: enemy.y });
        }
        enemy.shots = enemy.shots.filter(isVisible);
    });
    enemyArray.push(enemy);
    return enemyArray
    .filter(isVisible)
    .filter(function(enemy) {
        return !(enemy.isDead && enemy.shots.length === 0);
    });
}, []);

为了检查玩家的船是否被击中,咱们建立了一个函数gameOver:

spaceship_reactive/enemy_shots2.js

function gameOver(ship, enemies) {
    return enemies.some(function(enemy) {
        if (collision(ship, enemy)) {
            return true;
        }
        return enemy.shots.some(function(shot) {
            return collision(ship, shot);
        });
    });
}

若是敌人或敌人射击击中玩家的宇宙飞船,则此函数返回true。

在继续以前,让咱们了解一个有用的运算符:takeWhile。当咱们在现有的Observable上调用takeWhile时,Observable将继续发出值,直到函数做为参数传递给takeWhile返回false。

咱们可使用takeWhile告诉咱们的主要combineLatest Observable继续获取值,直到gameOver返回true:

spaceship_reactive/enemy_shots2.js

Rx.Observable.combineLatest(
    StarStream, SpaceShip, Enemies, HeroShots,
    function(stars, spaceship, enemies, heroShots) {
        return {
            stars: stars,
            spaceship: spaceship,
            enemies: enemies,
            heroShots: heroShots
        };
    })
.sample(SPEED)
.takeWhile(function(actors) {
    return gameOver(actors.spaceship, actors.enemies) === false;
})
.subscribe(renderScene);

当gameOver返回true时,combineLatest将中止发射值,从而马上中止游戏。

最后一件事:保持分数

若是咱们不能向朋友吹嘘咱们的结果,会是什么样的游戏?咱们显然须要一种方法来跟踪咱们的表现。 咱们须要得分。

让咱们建立一个简单的辅助函数来将分数绘制到屏幕的左上角:

spaceship_reactive/score.js

function paintScore(score) {
    ctx.fillStyle = '#ffffff';
    ctx.font = 'bold 26px sans-serif';
    ctx.fillText('Score: ' + score, 40, 43);
}

为了保持分数,咱们将使用Subject。咱们能够在基于combineLatest的主游戏循环中轻松使用它,就像它只是另外一个Observable同样,咱们能够随时将值推送到它。

spaceship_reactive/score.js

var ScoreSubject = new Rx.Subject();
var score = ScoreSubject.scan(function (prev, cur) {
    return prev + cur;
}, 0).concat(Rx.Observable.return(0));

在该代码中,咱们使用scan运算符将每一个新值与总聚合结果相加。因为咱们在游戏开始时不会有任何分数,咱们会链接一个返回0的Observable,所以咱们有一个起点。

如今,只要咱们击中敌人,咱们就必须将分数推向咱们的Subject;这是在paintHeroShots中发生的事情:

spaceship_reactive/score.js

var SCORE_INCREASE = 10;
function paintHeroShots(heroShots, enemies) {
    heroShots.forEach(function(shot, i) {
        for (var l=0; l<enemies.length; l++) {
            var enemy = enemies[l];
            if (!enemy.isDead && collision(shot, enemy)) {
                ScoreSubject.onNext(SCORE_INCREASE);
                enemy.isDead = true;
                shot.x = shot.y = -100;
                break;
            }
        }
        shot.y -= SHOOTING_SPEED;
        drawTriangle(shot.x, shot.y, 5, '#ffff00', 'up');
    });
}

固然,咱们将paintScore添加到renderScene,以便分数显示在屏幕上:

spaceship_reactive/score.js

function renderScene(actors) {
    paintStars(actors.stars);
    paintSpaceShip(actors.spaceship.x, actors.spaceship.y);
    paintEnemies(actors.enemies);
    paintHeroShots(actors.heroShots, actors.enemies);
    paintScore(actors.score);
}

这完成了咱们的Spaceship Reactive游戏。咱们已经设法在浏览器中对整个游戏进行编码,避免经过Observable管道的功能改变任何外部状态。

改进的想法

我相信你已经有了一些使游戏更使人兴奋的想法,我也有一些改进建议,让游戏更好,同时提升你的RxJS技能:

  • 添加以不一样速度移动的第二个(或第三个!)星形场以建立视差效果。 这能够经过几种不一样的方式完成。 尝试重用现有代码并尽量以声明方式执行。
  • 经过使它们以随机间隔发射而不是ENEMY_SHOOTING_FREQ中指定的固定敌人来制造更多不可预测的敌人。 若是玩家的分数越高,你可让他们更快地开火,这是额外的积分!
  • 容许玩家在短期内击中几个敌人得到更多积分。

总结

咱们只使用Observables为浏览器构建了一个完整的游戏,而且沿途咱们已经看到了几种很是方便的方法来处理并发以及组合和转换Observable。这是RxJS的优点之一:总有一种方法能够帮助解决您正在尝试解决的问题。请随意在RxJS文档中探索它们

反应式编程能够轻松编写并发程序。Observable抽象和强大的RxJS方法使程序的不一样部分可以有效地进行交互。不依赖外部状态进行编程可能须要一些时间来适应,但它有很大的好处。咱们能够将整个行为封装在一个Observable管道中,使咱们的程序更加可靠和可靠。

在下一章中,咱们将选择咱们离开它的地震可视化应用程序并添加一个显示与地震有关的推文的Node.js服务器部分。咱们还将改进其用户界面,使其看起来像一个真正的地震仪表板。

关注个人微信公众号,更多优质文章定时推送
clipboard.png

相关文章
相关标签/搜索