Rxjs 响应式编程-第五章 使用Schedulers管理时间

Rxjs 响应式编程-第一章:响应式
Rxjs 响应式编程-第二章:序列的深刻研究
Rxjs 响应式编程-第三章: 构建并发程序
Rxjs 响应式编程-第四章 构建完整的Web应用程序
Rxjs 响应式编程-第五章 使用Schedulers管理时间
Rxjs 响应式编程-第六章 使用Cycle.js的响应式Web应用程序react

使用Schedulers管理时间

自从接触RxJS,就开始在个人项目中使用它。有一段时间我觉得我知道如何有效地使用它,但有一个使人烦恼的问题:我怎么知道我使用的运算符是同步仍是异步?换句话说,Operators到底何时发出通知?这彷佛是正确使用RxJS的关键部分,但对我来讲感受有点模糊。编程

我认为,间隔运算符显然是异步的,因此它在内部使用相似setTimeout的东西来发出项目。可是,若是我使用范围怎么办?它也是异步发射的吗?它会阻止事件循环吗?来自哪里?我处处都在使用这些运算符,但我对它们的内部并发模型知之甚少。canvas

而后我了解了Schedulers。segmentfault

Schedulers是一种强大的机制,能够精确管理应用程序中的并发性。它们容许您随时更改其并发模型,从而对Observable如何发出通知进行细粒度控制。在本章中,您将学习如何使用调度程序并在常见场景中应用它们。咱们将专一于测试,调度程序特别有用,您将学习如何制做本身的Schedulers。数组

使用Schedulers

Schedulers是一种“安排”未来发生的操做的机制。 RxJS中的每一个运算符在内部使用一个Schedulers,选择该Schedulers以在最可能的状况下提供最佳性能。浏览器

让咱们看看咱们如何改变运算符中的Schedulers以及这样作的后果。 首先让咱们建立一个包含1,000个整数的数组:微信

var arr = [];
for (var i=0; i<1000; i++) {
    arr.push(i);
}

而后,咱们从arr建立一个Observable并强制它经过订阅它来发出全部通知。 在代码中,咱们还保存了发出全部通知所需的时间:并发

var timeStart = Date.now();
Rx.Observable.from(arr).subscribe(
    function onNext() {},
    function onError() {},
    function onCompleted() {
        console.log('Total time: ' + (Date.now() - timeStart) + 'ms');
    }
);
"Total time: 6ms”

六毫秒 - 不坏! from在内部使用Rx.Scheduler.currentThread,它计划在任何当前工做完成后运行。 一旦启动,它将同步处理全部通知。app

在让咱们将Scheduler更改成Rx.Scheduler.default框架

var timeStart = Date.now();
Rx.Observable.from(arr, null, null, Rx.Scheduler.default).subscribe(
    function onNext() {},
    function onError() {},
    function onCompleted() {
        console.log('Total time: ' + (Date.now() - timeStart) + 'ms');
    }
);
"Total time: 5337ms”

哇,咱们的代码运行速度比使用currentThread Scheduler慢几千倍。 那是由于默认的Scheduler异步运行每一个通知。 咱们能够经过在订阅后添加一个简单的日志语句来验证这一点。

使用currentThread Scheduler:

Rx.Observable.from(arr).subscribe( ... );
console.log('Hi there!’);
"Total time: 8ms"
"Hi there!"

使用默认Scheduler:

Rx.Observable.from(arr, null, null, Rx.Scheduler.timeout).subscribe( ... );
console.log('Hi there!’);
"Hi there!"
"Total time: 5423ms"

由于使用默认Schedule的Observer以异步方式发出其项目,因此咱们的console.log语句(它是同步的)在Observable甚至开始发出任何通知以前执行。 使用currentThread Scheduler,全部通知都会同步发生,所以只有在Observable发出全部通知时才会执行console.log语句。

所以,Scheduler确实能够改变咱们的Observable的工做方式。 在咱们的例子中,性能确实受到异步处理一个已经可用的大型阵列的影响。 但咱们实际上可使用Scheduler来提升性能。 例如,咱们能够在对Observable执行昂贵的操做以前动态切换Scheduler:

arr.groupBy(function(value) {
    return value % 2 === 0;
})
.map(function(value) {
    return value.observeOn(Rx.Scheduler.default);
})
.map(function(groupedObservable) {
    return expensiveOperation(groupedObservable);
});

在前面的代码中,咱们将数组中的全部值分为两组:偶数和非偶数。 groupBy返回一个Observable,它为每一个建立的组发出一个Observable。 这里是很酷的部分:在运行以前对每一个分组的Observable中的项目进行昂贵的操做,咱们使用observeOn将Scheduler切换到默认值,这样昂贵的操做将异步执行,而不是阻塞事件循环

observeOn和subscribeOn

在上一节中,咱们使用observeOn运算符来更改某些Observable中的Scheduler。 observeOn和subscribeOn是返回Observable实例副本的运算符,但它使用的Scheduler咱们做为参数传递的。

observeOn接受一个Scheduler并返回一个使用该Scheduler的新Observable。 它将使每一个onNext调用在新的Scheduler中运行。

subscribeOn强制Observable的订阅和取消订阅工做(而不是通知)在特定的Scheduler上运行。 与observeOn同样,它接受Scheduler做为参数。 例如,当咱们在浏览器中运行并在订阅调用中执行重要工做时,却不但愿用它来阻止UI线程,subscribeOn很是有用。

基本的Rx Scheduler

让咱们在咱们刚刚使用的Scheduler中深刻了解一下。 RxJS的运算符最经常使用的是immediate,default和currentThread。

Immediate Scheduler

Immediate Scheduler同步发出来自Observable的通知,所以不管什么时候在Immediate Scheduler上调度操做,它都将当即执行,从而阻塞该线程。 Rx.Observable.range是内部使用Immediate Scheduler序的运算符之一:

console.log('Before subscription');

Rx.Observable.range(1, 5)
.do(function(a) {
    console.log('Processing value', a);
})
.map(function(value) { return value * value; })
.subscribe(function(value) { console.log('Emitted', value); });

console.log('After subscription');
Before subscription
Processing value 1
Emitted 1
Processing value 2
Emitted 4
Processing value 3
Emitted 9
Processing value 4
Emitted 16
Processing value 5
Emitted 25
After subscription

程序输出按咱们指望的顺序发生。 每一个console.log语句在当前项的通知以前运行。

什么时候使用它

Immediate Scheduler很是适合于在每一个通知中执行可预测且很是昂贵的操做的Observable。 此外,Observable最终必须调用onCompleted。

Default Scheduler

Default Scheduler以异步方式运行操做。 您能够将其视为setTimeout的等价物,其延迟为零毫秒,从而保持序列中的顺序。 它使用其运行的平台上可用的最有效的异步实现(例如,Node.js中的process.nextTick或浏览器中的setTimeout)。

让咱们使用前一个使用了range示例,并使其在默认的Scheduler上运行。 为此,咱们将使用observeOn运算符:

console.log('Before subscription');

Rx.Observable.range(1, 5)
.do(function(value) {
    console.log('Processing value', value);
})
.observeOn(Rx.Scheduler.default)
.map(function(value) { return value * value; })
.subscribe(function(value) { console.log('Emitted', value); });

console.log('After subscription');
Before subscription
Processing value 1
Processing value 2
Processing value 3
Processing value 4
Processing value 5
After subscription
Emitted 1
Emitted 4
Emitted 9
Emitted 16
Emitted 25

这个输出有很大的不一样。 咱们的同步console.log语句输出每一个值,但咱们使Observable在默认的Scheduler上运行,它会异步生成每一个值。 这意味着咱们在do运算符中的日志语句在平方值以前处理。

什么时候使用它

Default Scheduler永远不会阻塞事件循环,所以它很是适合涉及时间的操做,如异步请求。 它也能够在从未完成的Observable中使用,由于它不会在等待新通知时阻塞程序(这可能永远不会发生)。

Current Thread Scheduler

currentThread Scheduler与Immediate Scheduler同样是同步的,可是若是咱们使用递归运算符,它会将要执行的操做排入队列,而不是当即执行它们。 递归运算符是一个本身调度另外一个运算符的运算符。 一个很好的例子就是repeatrepeat运算符 - 若是没有给出参数 - 将无限期地重复链中的先前Observable序列。

若是对使用Immediate Scheduler的运算符(例如return)调用repeat,则会遇到麻烦。 让咱们经过重复值10来尝试这个,而后使用take只取重复的第一个值。 理想状况下,代码将打印10次而后退出:

// Be careful: the code below will freeze your environment!
Rx.Observable.return(10).repeat().take(1)
.subscribe(function(value) {
    console.log(value);
});
Error: Too much recursion

此代码致使无限循环。 在订阅时,如return调用onNext(10)而后onCompleted,这使得repeat再次订阅return。 因为返回正在Immediate Scheduler上运行,所以该过程会重复,致使无限循环而且永远不会结束。

可是若是相反咱们经过将它做为第二个参数传递给currentThread Scheduler给return,咱们获得:

var scheduler = Rx.Scheduler.currentThread;
Rx.Observable.return(10, scheduler).repeat().take(1)
.subscribe(function(value) {
    console.log(value);
});
10

如今,当repeat从新订阅返回时,新的onNext调用将排队,由于以前的onCompleted仍在发生。 repeat而后返回一个可使用的一次性对象,它调用onCompleted并经过重复处理取消repeat,最终从subscribe返回调用。

什么时候使用它

currentThread Scheduler对于涉及递归运算符(如repeat)的操做很是有用,而且一般用于包含嵌套运算符的迭代。

动画调度

对于诸如canvas或DOM动画之类的快速视觉更新,咱们可使用具备很是小时间间隔的interval运算符,或者咱们能够在内部使用相似setTimeout的函数来调度通知。

但这两种方法都不理想。 在他们两个中咱们都在浏览器上抛出全部这些更新,这可能没法足够快地处理它们。之因此会发生这种状况,是由于浏览器正在尝试渲染一个帧,而后它会收到渲染下一帧的指令,所以它会丢弃当前帧以保持速度。 结果是致使动画的不流畅,卡顿。

浏览器具备处理动画的原生方式,而且它们提供了一个使用它的API,称为requestAnimationFramerequestAnimationFrame容许浏览器经过在最合适的时间排列动画来优化性能,并帮助咱们实现更流畅的动画。

有专门的Scheduler处理requestAnimationFrame

RxDOM库附带了一些额外的调度程序,其中一个是requestAnimationFrame Scheduler。

是的,你猜对了。 咱们可使用此Scheduler来改进咱们的太空飞船视频游戏。 在其中,咱们创建了40ms的刷新速度 - 大约每秒25帧 - 经过在该速度下建立一个interval Observable,而后使用combineLatest以间隔设置的速度更新整个游戏场景(由于它是最快速更新的Observable) )...但谁知道浏览器使用这种技术丢帧了多少帧! 使用requestAnimationFrame能够得到更好的性能。

让咱们建立一个使用Rx.Scheduler.requestAnimationFrame做为其调度程序的Observable。 请注意,它与interval运算符的工做方式相似:

ch_schedulers/starfield_raf.js

function animationLoop(scheduler) {
    return Rx.Observable.generate(
        0,
        function() { return true; }, // Keep generating forever
        function(x) { return x + 1; }, // Increment internal value
        function(x) { return x; }, // Value to return on each notification
        Rx.Scheduler.requestAnimationFrame
    ); // Schedule to requestAnimationFrame
}

如今,不管什么时候咱们使用了25 FPS动画,咱们均可以使用animationLoop函数。 因此咱们的Observable绘制了星星,以前看起来像这样:

spaceship_reactive/spaceship.js

var StarStream = Rx.Observable.range(1, 250)
.map(function() {
    return {
        x: parseInt(Math.random() * canvas.width),
        y: parseInt(Math.random() * canvas.height),
        size: Math.random() * 3 + 1
    };
})
.toArray()
.flatMap(function(arr) {
    return Rx.Observable.interval(SPEED).map(function() {
        return arr.map(function(star) {
            if (star.y >= canvas.height) {
                star.y = 0;
            }
            star.y += 3;
            return star;
        });
    });
});

变成这样:

ch_schedulers/starfield_raf.js

var StarStream = Rx.Observable.range(1, 250)
.map(function() {
    return {
        x: parseInt(Math.random() * canvas.width),
        y: parseInt(Math.random() * canvas.height),
        size: Math.random() * 3 + 1
    };
})
.toArray()
.flatMap(function(arr) {
    return animationLoop().map(function() {
        return arr.map(function(star) {
            if (star.y >= canvas.height) {
                star.y = 0;
            }
            star.y += 3;
            return star;
        });
    });
});

这给了咱们一个更流畅的动画。 代码也更简洁!

使用Scheduler进行测试

测试多是咱们可使用Scheduler的最引人注目的场景之一。 到目前为止,在本书中,咱们一直在编写咱们的核心代码而不考虑后果。 可是在现实世界的软件项目中,咱们将编写测试以确保咱们的代码按照咱们的意图运行。

测试异步代码很难。 咱们常常遇到如下问题之一:

  • 模拟异步事件很复杂且容易出错。 测试的重点是避免bug和错误,但若是你的测试自己有错误,那这显然是有问题的。
  • 若是咱们想要准确测试基于时间的功能,自动化测试变得很是缓慢。 例如,若是咱们须要准确测试在尝试检索远程文件四秒后调用错误,则每一个测试至少须要花费很长时间才能运行结束。 若是咱们不断运行咱们的测试套件,那将影响咱们的开发时间。

TestScheduler

RxJS为咱们提供了TestScheduler,一个旨在帮助测试的Scheduler。 TestScheduler容许咱们在方便时模拟时间并建立肯定性测试,确保它们100%可重复。 除此以外,它容许咱们执行须要花费大量时间并将其压缩到瞬间的操做,同时保持测试的准确性。

TestScheduler是VirtualTimeScheduler的专业化。 VirtualTimeSchedulers在“虚拟”时间而不是实时执行操做。 计划的操做进入队列并在虚拟时间内分配一个时刻。 而后,Scheduler在其时钟前进时按顺序运行操做。 由于它是虚拟时间,因此一切都当即运行,而没必要等待指定的时间。 咱们来看一个例子:

var onNext = Rx.ReactiveTest.onNext;
QUnit.test("Test value order", function(assert) {
    var scheduler = new Rx.TestScheduler();
    var subject = scheduler.createColdObservable(
        onNext(100, 'first'),
        onNext(200, 'second'),
        onNext(300, 'third')
    );
    var result = '';
    subject.subscribe(function(value) { result = value });
    scheduler.advanceBy(100);
    assert.equal(result, 'first');
    scheduler.advanceBy(100);
    assert.equal(result, 'second');
    scheduler.advanceBy(100);
    assert.equal(result, 'third');
});

在前面的代码中,咱们测试了来自冷Observable的一些值以正确的顺序到达。 为此,咱们在TestScheduler中使用helper方法createColdObservable来建立一个Observable,它回放咱们做为参数传递的onNext通知。 在每一个通知中,咱们指定应该发出通知值的时间。 在此以后,咱们订阅此Observable,手动提早调度程序中的虚拟时间,并检查它是否确实发出了预期值。 若是示例在正常时间运行,则须要300毫秒,但因为咱们使用TestScheduler来运行Observable,它将当即运行,但彻底按照咱们的顺序。

写一个真实的测试案例

没有比在现实世界中为时间敏感的任务编写测试更好的方法来理解如何使用虚拟时间来缩短期。 让咱们从咱们在缓冲值中制做的地震查看器中恢复一个Observable:

quakes
.pluck('properties')
.map(makeRow)
.bufferWithTime(500)
.filter(function(rows) { return rows.length > 0; })
.map(function(rows) {
    var fragment = document.createDocumentFragment();
    rows.forEach(function(row) {
        fragment.appendChild(row);
    });
    return fragment;
})
.subscribe(function(fragment) {
    table.appendChild(fragment);
});

为了使代码更易于测试,让咱们将Observable封装在一个函数中,该函数接受咱们在bufferWithTime运算符中使用的Scheduler。在Obpectables中参数化将要测试的Scheduler老是一个好主意。

ch_schedulers/testscheduler.js

function quakeBatches(scheduler) {
    return quakes.pluck('properties')
    .bufferWithTime(500, null, scheduler || null)
    .filter(function(rows) {
        return rows.length > 0;
    });
}

让咱们经过采起一些步骤来简化代码,但保持本质。 此代码采用包含属性属性的Observable JSON对象,将它们缓冲到每500毫秒释放的批次中,并过滤掉空的批次。

咱们想要验证此代码是否有效,但咱们绝对不但愿每次运行测试时都等待几秒钟,以确保咱们的缓冲按预期工做。 这是虚拟时间和TestScheduler将帮助咱们的地方:

ch_schedulers/testscheduler.js

❶ var onNext = Rx.ReactiveTest.onNext;
var onCompleted = Rx.ReactiveTest.onCompleted;
var subscribe = Rx.ReactiveTest.subscribe;
❷ var scheduler = new Rx.TestScheduler();
❸ var quakes = scheduler.createHotObservable(
    onNext(100, { properties: 1 }),
    onNext(300, { properties: 2 }),
    onNext(550, { properties: 3 }),
    onNext(750, { properties: 4 }),
    onNext(1000, { properties: 5 }),
    onCompleted(1100)
);
❹ QUnit.test("Test quake buffering", function(assert) {
    ❺ var results = scheduler.startScheduler(function() {
        return quakeBatches(scheduler)
    }, {
        created: 0,
        subscribed: 0,
        disposed: 1200
    });
    ❻ var messages = results.messages;
    console.log(results.scheduler === scheduler);
    ❼ assert.equal(
        messages[0].toString(),
        onNext(501, [1, 2]).toString()
    );
    assert.equal(
        messages[1].toString(),
        onNext(1001, [3, 4, 5]).toString()
    );
    assert.equal(
        messages[2].toString(),
        onCompleted(1100).toString()
    );
});

让咱们一步一步地剖析代码:

  1. 咱们首先从ReactiveTest加载一些辅助函数。 这些在虚拟时间内注册onNext,onCompleted和订阅事件。
  2. 咱们建立了一个新的TestScheduler,它将推进整个测试。
  3. 咱们使用TestScheduler中的方法createHotObservable建立一个假的热Observable,它将在虚拟时间内模拟特定点的通知。 特别是,它在第一秒发出五个通知,并在1100毫秒完成。 每次它发出一个具备特定属性的对象。
  4. 咱们可使用任何测试框架来运行测试。 对于咱们的例子,我选择了QUnit。
  5. 咱们使用startScheduler方法建立一个使用测试调度程序的Observable。 第一个参数是一个函数,它建立Observable以使用咱们的Scheduler运行。 在咱们的例子中,咱们只返回咱们传递TestScheduler的quakeBatches函数。 第二个参数是一个对象,它包含咱们想要建立Observable的不一样虚拟时间,订阅它并处理它。 对于咱们的示例,咱们在虚拟时间0开始和订阅,而且咱们在1200(虚拟)毫秒处理Observable。
  6. startScheduler方法返回一个带有scheduler和messages属性的对象。 在消息中,咱们能够在虚拟时间内找到Observable发出的全部通知。
  7. 咱们的第一个断言测试在501毫秒以后(在第一个缓冲时间限制以后),咱们的Observable产生值1和2。
    咱们的第二个断言测试在1001毫秒后,咱们的Observable产生剩余的值3,4和5.最后,咱们的第三个断言检查序列是否彻底在1100毫秒完成,正如咱们在热的Observable地震中所指出的那样。

该代码以很是可靠的方式有效地测试咱们的高度异步的Observable,而且无需跳过箍来模拟异步条件。咱们只是指定咱们但愿代码在虚拟时间内做出反应的时间,咱们使用测试调度程序来运行整个操做。

总结

Scheduler是RxJS的重要组成部分。 即便您能够在没有明确使用它们的状况下走很长的路,它们也是一种先进的概念,它可让您在程序中微调并发性。虚拟时间的概念是RxJS独有的,对于测试异步代码等任务很是有用。

在下一章中,咱们将使用Cycle.js,这是一种基于称为单向数据流的概念来建立使人惊叹的Web应用程序的反应方式。有了它,咱们将使用现代技术建立一个快速的Web应用程序,从而显着改进传统的Web应用程序制做方式。

关注个人微信公众号,更多优质文章定时推送
clipboard.png

相关文章
相关标签/搜索