[书籍翻译] 《JavaScript并发编程》第七章 抽取并发逻辑

本文是我翻译《JavaScript Concurrency》书籍的第七章 抽取并发逻辑,该书主要以Promises、Generator、Web workers等技术来说解JavaScript并发编程方面的实践。
完整书籍翻译地址: https://github.com/yzsunlei/javascript_concurrency_translation 。因为能力有限,确定存在翻译不清楚甚至翻译错误的地方,欢迎朋友们提issue指出,感谢。

到本书这里,咱们已经在代码中明确地模拟了并发问题。使用promises,咱们同步化了两个或更多异步操做。使用生成器,咱们按需建立数据,避免没必要要的内存分配。最后,咱们了解到Web worker是利用多个CPU内核的主要工具。javascript

在本章中,咱们将采用全部这些方法并将它们放入应用程序代码的上下文中。也就是说,若是并发是默认的,那么咱们须要使并发尽量不那么明显。咱们将首先探索各类技术,这些技术将帮助咱们在使用的组件中封装并发机制。而后,咱们将经过使用promises来帮助worker通讯,直接改进前两章的代码。前端

一旦咱们可以使用promises抽象worker通讯,咱们将尝试在生成器的帮助下实现惰性的worker。咱们还将使用Parallel.js库来介绍worker抽象,而后是worker线程池的概念。java

编写并发代码

并发编程很难作到。即便是人为的示例应用程序,大部分复杂性来自并发代码。咱们显然但愿咱们的代码可读性好,同时保持并发的好处。咱们但愿充分利用系统上的每一个CPU。咱们只想在须要时计算咱们须要的东西。咱们不但愿意大利面条式的代码将多个异步操做混在一块儿。在开发应用程序的同时关注并发编程的全部这些方面会削弱咱们应该关注的内容 - 提供应用程序有价值的功能。node

在本节中,咱们将介绍可能用于将咱们的应用程序的其他部分与棘手的并发隔离的方法。这一般意味着将并发做为默认模式 - 即便在引擎下没有发生真正的并发时也是如此。最后,咱们不但愿咱们的代码包含90%的并发处理技巧,而只有10%的功能。git

隐藏并发机制

在咱们全部的代码中暴露并发机制的难度是,他们每个都稍微不一样于另外一个。这扩大了咱们可能已经发现所在的回调地狱的状况。例如,不是全部的并发操做都是从一些远程资源获取数据的网络请求。异步数据可能来自一个worker或一些自己就是异步的回调。想象一下场景咱们使用了三个不一样的数据源来计算一个咱们须要的值,全部的这些都是异步的。这里是这个问题的示图:github

image143.gif

此图中的数据是咱们在应用程序代码中关注的内容。从咱们正在构建的功能的角度来看,咱们并不关心上述任何事情。所以,咱们的前端架构须要封装与并发相关的复杂性。这意味着咱们的每一个组件都应该可以以相同的方式访问数据。除了咱们全部的异步数据源以外,还有另外一个要考虑的复杂因素 - 当数据不是异步的而且来自本地数据源呢?那么同步本地数据源和HTTP请求呢?咱们将在下一节中介绍这些。web

没有并发性

仅仅由于咱们正在编写并发JavaScript应用程序,并不是每一个操做自己都是并发的。例如,若是一个组件向另外一个组件询问它已经在内存中的数据,则它不是异步操做并会当即返回。咱们的应用程序可能处处都是这些操做,其中并发性根本就没有意义。其中存在的挑战 - 咱们如何将异步操做与同步操做无缝混合?编程

简单的答案是咱们在每处作出并发的默认假设。promise使这个问题易于处理。如下是使用promise来封装异步和同步操做的示图说明:segmentfault

image144.gif

这看起来很像前面的那个图,但有两个重要区别。咱们添加了一个synchronous()操做; 这没有回调函数,由于它不须要回调函数。它不是在等待其余任何东西,因此它会直接地返回。其余两个函数就像在上图中同样;二者都依赖回调函数将数据提供给咱们的应用程序。第二个重要区别是有一个promise对象。这取代了sync()操做和数据概念。或者更确切地说,它将它们融合到同一个概念中。后端

这是promise的关键做用 - 它们为咱们抽象同步问题提供能力。这不只适用于网络请求,还适用于Web worker消息或依赖于回调的任何其余异步操做。它须要一些调整来考虑下咱们的数据,由于咱们得保证它最终会到达这里。可是,一旦咱们消除了这种心理差距,默认状况下就会启用并发。就咱们的功能而言,并发是默认的,而咱们在操做背后所作的事情并非最具破坏性的。

如今让咱们看一些代码。咱们将建立两个函数:一个是异步的,另外一个是简单返回值的普通函数。这里的目标是使运行这些函数的代码相同,尽管生成值的方式有很大不一样:

//一个异步“fetch”函数。咱们使用“setTimeout()”
//在1秒后经过“callback()”返回一些数据。
function fetchAsync(callback) {
    setTimeout(() => {
        callback({hello: 'world'});
    }, 1000);
}

//同步操做只简单地返回数据。
function fetchSync() {
    return {hello: 'world'};
}

//对“fetchAsync()”调用的promise。
//咱们经过了“resolve”函数做为回调。
var asyncPromise = new Promise((resolve, reject) => {
    fetchAsync(resolve);
});

//对“fetchSync()”调用的promise。
//这个promise当即完成使用返回值。
var syncPromise = new Promise((resolve, reject) => {
    resolve(fetchSync());
});

//建立一个等待两个promise完成的promise。
//这让咱们无缝混合同步值和异步值。
Promise.all([
    asyncPromise,
    syncPromise
    ]).then((results) => {
        var [asyncResult, syncResult] = results;
        console.log('async', asyncResult);
        //→async {hello: 'world'}
    });

console.log('sync', syncResult);
//→sync {hello:'world'}

在这里权衡的是增长promise的复杂性,包裹它们而不是让简单的返回值函数立刻返回。但在现实中,封装promise的复杂性中,若是咱们不是写一个并发应用,咱们显然须要关心这类问题自己。这些的好处是巨大的。当一切都是promise的值时,咱们能够安全地排除使人讨厌的致使不一致的并发错误。

worker与promise通讯

咱们如今已经知道了为何将原始值视为promise有益于咱们的代码。是时候将这个概念应用于web workers了。在前两章中,咱们的代码同步来自Web worker的响应看起来有点棘手。这是由于咱们基本上试图模仿许多promise善于处理的样板工做。咱们首先尝试经过建立辅助函数来解决这些问题,这些函数为咱们包装worker通讯,返回promise。而后咱们将尝试另外一种涉及在较低级别扩展Web worker的方法。最后,咱们将介绍一些涉及多个worker的更复杂的同步方案,例如上一章中的那些worker方案。

辅助函数

若是咱们可以以promise解决的形式得到Web worker响应,那将是理想的。可是,咱们须要首先创造promise - 咱们该怎么作这个?好吧,咱们能够手动建立promise,其中发送给worker的消息是从promise executor函数中发送的。可是,若是咱们采用这种方法,咱们就不会比引入promise以前好多少了。

技巧是在单个辅助函数中封装发布到worker的消息和从worker接收的任何消息,以下所示:

image145.gif

咱们来看一个实现这种模式的辅助函数示例。首先,咱们须要一个执行某项任务的worker - 咱们将从这开始:

//吃掉一些CPU循环...
//源自http://adambom.github.io/parallel.js/
function work(n) {
    var i = 0;
    while (++i < n * n) {}
    return i;
}

//当咱们收到消息时,咱们会发布一条消息id,
//以及在“number”上执行“work()”的结果。
addEventListener('message', (e) => {
    postMessage({
        id: e.data.id,
        result: work(e.data.number)
    });
});

在这里,咱们有一个worker,它会对咱们传递的任何数字进行平方。这个work()函数特地很慢,以便咱们能够看到咱们的应用程序做为一个总体在Web worker花费比平时更长的时间来完成任务时的表现。它还使用咱们在以前的Web worker示例中看到的id,所以它能够与发送消息的代码协调。让咱们如今实现使用此worker的辅助函数:

//这将生成惟一ID。
//咱们须要它们将Web worker执行的任务
//映射到更大的建立它们的操做。
function* genID() {
    var id = 0;
    while (true) {
        yield id++;
    }
}

//建立全局“id”生成器。
var id = genID();

//这个对象包含promises的解析器函数,
//当结果从worker那里返回时,咱们经过id在这里查看。
var resolvers = {};

//开始咱们的worker...
var worker = new Worker('worker.js');
worker.addEventListener('message', (e) => {

    //找到合适的解析器函数。
    var resolver = resolvers[e.data.id];

    //从“resolvers”对象中删除它。
    delete resolvers[e.data.id];
    
    //经过调用解析器函数将worker数据传递给promise。
    resolver(e.data.result);
});

//这是咱们的辅助函数。
//它处理向worker发送消息,
//并将promise绑定到worker的响应。
function square(number) {
    return new Promise((resolve, reject) => {
        //用于将Web worker响应和解析器函数绑定在一块儿的id。
        var msgId = id.next().value;

        //存储解析器以便之后在Web worker消息回调中可使用。
        resolvers[msgId] = resolve;

        //发布消息 - id和number参数
        worker.postMessage({
            id: msgId,
            number: number
        });
    });
}

square(10).then((result) => {
    console.log('square(10)', result);
    //→square(10) 100
});

square(100).then((result) => {
    console.log('square(100)', result);
    //→square(100) 10000
});

square(1000).then((result) => {
    console.log('square(1000)', result);
    //→square(1000) 1000000
});

若是咱们关注square()函数的使用方式,传递一个数字参数并将一个promise做为返回值,咱们能够看到这符合咱们以前关于默认状况下使代码并发的讨论。例如,咱们能够从这个场景中彻底删除worker,只需更改辅助函数解析它返回的promise的方式,咱们的其他代码将继续保持不变。

辅助函数策略只是一种使用promises简化worker通讯的方法。也许咱们能够决定咱们不必定要维护一堆辅助函数。接下来,咱们将看一个比辅助函数更细粒度的方法。

扩展postMessage()

咱们能够采用更通用的方法,而不是积聚大量辅助功能。辅助函数自己没有什么问题;他们是直接并且重要的。若是咱们达到了数百个这样的函数,它们的做用就会开始大打折扣了。更通用的方法是继续使用worker.postMessage()。

因此让咱们看看是否可使这个方法返回一个promise,就像咱们上一节中的helper函数同样。这样,咱们继续使用细粒度postMessage()方法,但改进了咱们的同步语义。首先,看看这里的worker代码:

addEventListener('message', (e) => {

    //咱们将发回主线程的结果,
    //它应该始终包含消息id。
    var result = {id: e.data.id};

    //基于“action”,计算响应值“value”。
    //选项是单独保留文本,
    //将其转换为大写,或将其转换为小写。
    if (e.data.action === 'echo') {
        result.value = e.data.value;
    } else if (e.data.action === 'upper') {
        result.value = e.data.value.toUpperCase();
    } else if (e.data.action === 'lower') {
        result.value = e.data.value.toLowerCase();
    }
});

//经过等待延时模拟一个运行时间很长的worker,
//它在1秒后返回结果。
setTimeout(() => {
    postMessage(result);
}, 1000);

这与咱们迄今为止在Web worker代码中看到的彻底不一样。如今,在主线程中,咱们必须弄清楚如何改变Worker的接口。咱们如今就这样作。而后,咱们将尝试向此worker发布一些消息并将处理promises做为响应:

//这个对象包含promises的解析器函数,
//当结果从worker那里返回时,咱们经过id在这里查看。
var resolvers = {};

//保持“postMessage()”的原始实现,
//因此咱们能够稍后在咱们的自定义“postMessage()”中调用它。
var postMessage = Worker.prototype.postMessage;

//用咱们的自定义实现替换“postMessage()”。
Worker.prototype.postMessage = function(data) {
    return new Promise((resolve, reject) => {

    //用于将Web worker响应和解析器函数绑定在一块儿的id。
    var msgId = id.next().value;

    //存储解析器以便之后能够在Web worker消息回调使用。
    resolvers[msgId] = resolve;

    //运行原始的“Worker.postMessage()”实现,
    //实际上负责将消息发布到worker线程。
    postMessage.call(this, Object.assign({
            id: msgId
        }, data));
    });
};

//开始咱们的worker...
var worker = new Worker('worker.js');

worker.addEventListener('message', (e) => {

    //找到合适的解析器函数。
    var resolver = resolvers[e.data.id];

    //从“resolvers”对象中删除它。
    delete resolvers[e.data.id];
    
    //经过调用解析器函数将worker数据传递给promise。
    resolver(e.data.value);
});

worker.postMessage({
    action: 'echo',
    value: 'Hello World'
}).then((value) => {
    console.log('echo', `"${value}"`);
    //→echo “Hello World”
});

worker.postMessage({
    action: 'upper',
    value: 'Hello World'
}).then((value) => {
    console.log('upper', `"${value}"`);
    //→upper “HELLO WORLD”
});

worker.postMessage({
    action: 'lower',
    value: 'Hello World'
}).then((value) => {
    console.log('lower',`"${value}"`);
    //→lower “hello world”
});

嗯,这正是咱们须要的,对吧?咱们能够直接将消息数据发布给worker,并经过promise解析将响应数据发送给咱们。做为一个额外的好处,若是咱们如此倾向,咱们实际上能够围绕这个新的postMessage()函数实现包装辅助函数。主要参与完成这项工做的技巧是存储对原始postMessage()的引用。而后,咱们覆盖web worker属性postMessage,而不是函数自己。最后,咱们能够复用它并添加必要的协调来保证好用。

同步worker结果

该代码在最后2段已经充分下降了web workers回调地狱到可接受的水平。在事实上,如今咱们已经有了一个方法处理如何封装web workers通讯由具备的postMessage()返回一个promise,咱们准备要开始简化一些未使用这种方法的混乱的worker代码。咱们已经了解了这些例子的,因此到目前为止,已经从promise中获益良多,他们是简单的; 没有这些抽象不会是世界末日。

那么咱们映射数据集合而后映射和迭代集合的场景呢?咱们能够回顾map/reduce代码在“第6章,实用的并行”。这主要是因为全部worker通讯模板代码与尝试执行map/reduce操做的代码混合在一块儿。让咱们看看使用promise技术是否更好。首先,咱们将建立一个很是基本的worker:

//返回一个输入数组的映射,
//它经过平方数组中的每一个数字。
addEventListener('message', (e) => {
    postMessage({
        id: e.data.id,
        value: e.data.value.map(v => v * v)
    });
});

咱们可使用此worker传递数组进行映射。所以,咱们将建立其中两个并在两个worker之间拆分工做负载,以下所示:

function onMessage(e) {

    //找到合适的解析器函数。
    var resolver = resolvers[e.data.id];

    //从“resolvers”对象中删除它。
    delete resolvers[e.data.id];

    //经过调用解析器函数将worker数据传递给promise
    resolver(e.data.value);
}

//开始咱们的worker...
var worker1 = new Worker('worker.js'),
    worker2 = new Worker('worker.js');

//建立一些要处理的数据。
var array = new Array(50000).fill(null).map((v, i) => i);

//当worker返回数据时,找到适当的解析器函数来调用。
worker1.addEventListener('message', onMessage);
worker2.addEventListener('message', onMessage);

//将输入数据拆分为2,给出前半部分到第一个worker,
//给出后一部分到第二个worker。在这一点上,咱们有两个promises。
var promise1 = worker1.postMessage({
    value: array.slice(0, Math.floor(array.length / 2))
});

var promise2 = worker2.postMessage({
    value: array.slice(Math.floor(array.length / 2))
});

//使用“Promise.all()”来同步workers
//比手动尝试协调整个worker回调函数要容易得多。
Promise.all([promise1, promise2]).then((values) => {
    console.log('reduced', [].concat(...values).reduce((r, v) => r + v));
    //→reduced 41665416675000
});

这就是咱们须要向worker发布数据以及同步来自两个或更多worker的数据时,咱们实际上就有动力编写并发代码 - 它看起来与如今的其余应用程序代码相同。

惰性workers

如今是咱们从不一样角度看待web workers的时候了。咱们使用worker的根本原​​因是咱们想要在相同的时间内计算比过去更多的数据。正如咱们如今所知,这样作涉及消息传递错综复杂,能够说是分而治之的策略。咱们必须经过将数据输入和输出worker,一般使用数组。

生成器帮助咱们实现惰性地计算。也就是说,咱们不想在内存中计算内容或分配数据,直到咱们确实须要它。web workers难以实现这一目标吗?或者咱们能够利用生成器来惰性地并行计算吗?

在本节中,咱们将探讨有关在Web worker中使用生成器的方法。首先,咱们将研究与Web worker相关的开销问题。而后,咱们将编写一些代码经过使用生成器来将数据输入或者输出worker。最后,咱们将看看咱们是否能够惰性地经过一个生成器链在web worker上传递全部数据。

减小开销

主线程能够拆分开销大的Web workers操做,在另外一个线程中运行它们。这意味着DOM可以渲染挂起的更新并处理挂起的用户事件。可是,咱们仍然面临分配大型数组的开销和更新UI所需的时间。尽管与Web worker并行处理,但咱们的用户仍然可能面临运行缓慢,由于在处理完整个数据集以前,UI没有更新。这是常见的模式的示图:

image146.gif

这是具备单个worker的数据所采用的通用路径; 当有多个worker时,一样的方法也适用。使用这种方法,咱们没法避免须要将数据序列化两次这一事实,咱们必须分配两次。这些开销仅仅是为了促进worker的通讯,而与咱们试图实现的应用程序功能几乎没有关系。

worker通讯所需的数组和序列化开销一般不是什么大问题。可是,对于更大的集合,咱们可能会面临真正的性能问题,这源于咱们用于提升性能的机制。所以,从另外一个角度看worker通讯不会受到损失,即便最初没有必要。

这是大多数worker采用的通用路径的变体。不是预先分配和序列化大量数据,而是将单个项传入和传出worker。这使得UI有机会在全部处理的数据到达以前使用已处理的数据进行更新。

image147.gif

在workers中生成值

若是咱们想要在workers生成结果时更新UI,那么他们没法将结果集打包为数组,以便在完成全部计算后发送回主线程。当发生这种状况时,UI就停在那里而不响应用户。咱们但愿一个惰性的方法,其中值是在一段时间产生一个,这样UI就能够越快被更新。让咱们创建一个例子,将输入发送到该web workers,而后将结果以一个比咱们以前在这本书已经看到的更细微的水平发送回来:

首先,咱们将创造一个worker; 它的代码以下:

//消耗一些CPU循环...
//源自http://adambom.github.io/parallel.js/
function work(n) {
    var i = 0;
    while(++i < n * n) {}
    return i;
}

//将调用“work()”的结果发回给主线程
addEventListener('message', (e) => {
    postMessage(work(e.data));
});

这里没有什么可大不了的。它与咱们已经习惯的经过低效率地对数字进行减慢运行的代码的work()函数相同。worker内部没有使用实际的生成器。这是由于咱们真的不须要,咱们立刻就会明白为何:

//建立一个“update()”协程,
//在生成结果时持续更新UI。
var update = coroutine(function* () {
    var input;
    
    while (true) {
        input = yield;
        console.log('result', input.data);
    }
});

//建立worker,并指定“update()”协程
//做为“message”回调处理程序。
var worker = new Worker('worker.js');
worker.addEventListener('message', update);

//一个数字逐渐变大的数组
var array = new Array(10).fill(null).map((v, i) => i * 10000);

//迭代数组,将每一个数字做为私有消息传递给worker。
for(let item of array) {
    worker.postMessage(item);
}
//→
//result 1
//result 100000000
//result 400000000
//result 900000000
//result 1600000000
//result 2500000000
//result 3600000000
//result 4900000000
//result 6400000000
//result 8100000000

传递给咱们worker的每一个数字的处理成本都比前一个数字要高。总的来讲,在向用户显示任何内容以前处理整个输入数组会以为应用程序挂起或出错了。可是,这不是这种状况,由于虽然每一个数字的处理开销很高,但咱们会在结果可用时将结果发布回来。

咱们经过传入一个数组来执行和将数组做为输出返回来执行有着相同的工做量。然而,这种方法只是改变了事情发生的顺序。咱们在演示中引入了协做式多任务 - 在一个任务中计算一些数据并在另外一个任务中更新UI。完成工做所花费的总时间是相同的,但对于用户来讲,感受要快得多。总得说来,用户可感知的应用程序性能是惟一重要的性能指标。

咱们将输入做为单独的消息传递。咱们能够将输入做为数组传递,单独发布结果,并得到相同的效果。可是,这可能
仅仅是没必要要的复杂性。对于模式有天然的对应关系,由于它是 - 项目输入,项目输出。若是你不须要就不要改变它。

惰性worker链

正如咱们在“第4章,使用Generator实现惰性计算”看到,咱们能够组装生成器链。这就是咱们惰性地实现复杂函数的方式;一个项流经一系列生成器函数,这些函数在生成以前将项转换为下一个生成器,直到它到达调用者。没有生成器,咱们必须分配大量的中间数据结构,只是为了将数据从一个函数传递到下一个函数。

在本文以前的部分中,咱们看到Web worker可使用相似于生成器的模式。因为咱们在这里面临相似的问题,咱们不但愿分配大型数据结构。咱们能够经过在更细粒度级别传递项目来避免这样作。这具备保持UI响应的额外好处,由于咱们可以在最后一个项目从worker到达以前更新它。鉴于咱们能够与worker作不少事情,咱们难道不能基于在这个想法并组装更复杂的worker处理节点链吗?

例如,假设咱们有一组数字和几个转换。咱们在UI中显示这些转换以前,咱们须要按特定顺序进行这些转换。理想状况下,咱们会设置一个worker链,每一个worker负责执行其指定的转换,而后将输出传递给下一个worker。最终,主线程得到一个能够在DOM中显示的值。

这个目标的问题在于它所涉及的很棘手的通讯。因为专用worker只与建立它们的主线程进行通讯,所以将结果发送回主线程,而后发送到链中的下一个worker线程,这几乎没有什么益处。好吧,事实证实,专用worker能够直接通讯而不涉及主线程。咱们能够在这里使用称为频道消息的东西。这个想法很简单; 它涉及建立一个频道,它有两个端口 - 消息在一个端口上发布并在另外一个端口上接收。

咱们一直在使用消息传递频道和端口。他们被卷入web workers。这是消息事件和postMessage()方法模式的来源。如下是咱们如何使用频道和端口链接咱们的Web worker的示图:

image151.gif

咱们能够看到,每一个频道使用两个消息传递端口。第一端口是用于发布消息,而所述第二端口被使用来接收消息事件。主线程惟一一次使用是当所述处理链首先被用于发布一个消息给第一信道和当该消息从第三信道被接收到的消息。不要让worker通讯所需的六个端口吓倒咱们,让咱们写一些代码; 也许,那里看起来会更易于理解。首先,咱们将建立链中使用的worker。实际上,他们是同一个worker的两个实例。下面是代码:

addEventListener('message', (e) => {

    //获取用于发送和接收消息的端口。
    var [port1, port2] = e.ports;

    //侦听第一个端口的传入消息。
    port1.addEventListener('message', (e) => {
        
        //在第二个端口上响应,结果为调用“work()”。
        port2.postMessage(work(e.data));
    });

    //启动两个端口。
    port1.start();
    port2.start();
});

这是颇有趣的。在这个worker中,咱们有消息端口可使用。第一个端口用于接收输入,第二个端口用于发送输出。该work()函数简单地使用咱们如今熟悉的平方数消耗CPU周期来看workers如何表现。咱们在主线程中想要作的是建立这个worker的两个实例,这样咱们就能够传递第一个平方数的实例。而后,在不将结果传递回主线程的状况下,它将结果传递给下一个worker,并再次对数字进行平方。通讯路线应该与前面的图表很是类似。让咱们看一下使用消息传递通道链接worker的一些代码:

//开始咱们的worker...
var worker1 = new Worker('worker.js');
var worker2 = new Worker('worker.js');

//建立通讯所需的在两个worker之间的消息通道。
var channel1 = new MessageChannel();
var channel2 = new MessageChannel();
var channel3 = new MessageChannel();

//咱们的“update()”协程会记录worker的响应
var update = coroutine(function* () {
    var input;
    
    while (true) {
        input = yield;
        console.log('result', input.data);
    }
});

//使用“worker1”链接“channel1”和“channel2”。
worker1.postMessage(null, [
    channel1.port2,
    channel2.port1
]);

//使用“worker2”链接“channel2”和“channel3”。
worker2.postMessage(null, [
    channel2.port2,
    channel3.port1
]);

//将咱们的协程“update()”链接到收到“channel3”任何消息。
channel3.port2.addEventListener('message', update);
channel3.port2.start();

//咱们的输入数据 - 一组数字。
var array = new array(25)
    .fill(null)
    .map((v, i) => i*10);

//将每一个数组项发布到“channel1”。
for (let item of array) {
    channel1.port1.postMessage(item);
}

除了咱们要发送给worker的数据以外,咱们还能够发送一个消息端口列表,咱们但愿将这些消息端口传输到worker上下文。这就是咱们对发送给worker的前两条消息的处理方式。消息数据为空,由于咱们没有对它作任何事情。实际上,这些是咱们发送的惟一消息直接给worker。通讯的其他部分经过咱们建立的消息通道进行。开销大的计算发生在worker上,由于那是消息处理程序所在的位置。

使用Parallel.js

使用Parallel.js库的目的是为了使与Web worker交互尽量的无缝。在事实上,它完成了这本书的一个关键目标,它隐藏并发机制,并让咱们可以专一于咱们正在构建的应用程序。

在本节中,咱们将介绍Parallel.js对worker通讯采起的方法以及将代码传递给worker的通用方法。而后,咱们将介绍一些使用Parallel.js生成新worker线程的代码。最后,咱们将探索这个库已经提供的内置map/reduce功能。

它怎么工做的

在本书中到目前为止咱们使用的全部worker都是咱们本身创造的。咱们在worker中实现了消息事件处理,计算某些值,而后发布响应。使用Parallel.js,咱们不实现worker。相反,咱们实现函数,而后将函数传递给由库管理的workers。

这给咱们带来了一些麻烦。咱们全部的代码都在主线程中实现,这意味着更容易使用在主线程中实现的函数,由于咱们不须要使用importScripts()将它们导入到Web worker中。咱们也不须要经过脚本目录建立Web worker并手动启动它们。相反,咱们让Parallel.js为咱们生成新的worker,而后咱们能够经过将函数和数据传递给他们来告诉worker该作什么。那么,这到底是如何工做的呢?

workers须要一个脚本参数。没有有效的脚本,worker根本没法工做。Parallel.js有一个简单的eval脚本。这是传递给库建立的worker的内容。而后,主线程中的API将在worker中进行评估代码,并在须要与workers通讯时将其发送。

这是可行的,由于Parallel.js的目的不是暴露worker支持的大量功能。相反,目标是使worker通讯机制尽量无缝,同时提供最小的功能。这样能够轻松构建与咱们的应用程序相关的并发功能,而不是咱们永远不会使用的许多其余功能。

如下是咱们如何使用Parallel.js和它的eval脚本将数据和代码传递给worker的说明:

image152.gif

生成workers

Parallel.js库有一个做业的概念。做业的主要输入是做业要处理的数据。做业的建立并不直接与后台worker的建立联系在一块儿。workers与Parallel.js中的做业不一样;使用库时,咱们不直接与worker交互。一旦咱们有了做业实例,而且它提供了咱们的数据,咱们就会使用一个做业方法来调用workers。

最基本的方法是spawn(),它将一个函数做为参数并在Web worker中运行它。咱们传递给它一个函数做为参数而且在web worker中运行。咱们传递给它的函数能够返回结果,而后将它们解析为一个thenable对象被spawn()函数返回。让咱们看一下使用Parallel.js生成由一个web worker返回的新做业的代码:

//一个数字输入数组。
var array = new Array(2500)
    .fill(null)
    .map((v, i) => i);

//建立一个新的并行做业。
//在这里没有worker的建立 - 
//咱们只传递咱们正在使用的构造数据。
var job = new Parallel(array);

//为咱们的“spawn()”做业启动一个定时器。
console.time(`${array.length} items`);

//建立一个新的Web worker,并将咱们的数据和这个函数传递给它。
//咱们正在慢慢映射数组的每一个数字到它的平方。
job.spawn((coll) => {
    return coll.map((n) => {
        var i = 0;
        while(++i < n*n) {}
        return i;
    });

    //“spawn()”的返回值是thenable。含义
    //咱们能够分配一个“then()”回调函数,
    //就像返回的promise那样。
}).then((result) => {
    console.timeEnd(`${array.length} items`);
    //→2500 items:3408.078ms
});

那么如今,这很不错; 咱们没必要担忧任何单调的Web worker生命周期任务。咱们有一些数据和一些咱们想要应用于数据的函数,咱们但愿与页面上发生的其余做业并行运行。最吸引人的是熟悉的thenable,从那里返回的spawn()方法。它适用于咱们的并发应用程序,其中全部其余应用程序都被视为promise。

咱们记录处理咱们提供的输入数据的函数所需的时间。咱们只为这个任务生成一个Web worker,所以在主线程中计算获得的结果与原来的时间相同。除了释放主线程来处理DOM事件和重绘以外,没有实际的性能提高。咱们将看看是否可使用一个不一样的方法来提高并发级别。

当咱们完成后,spawn()建立的worker当即终止。这为咱们释放了内存。可是,没有并发级别来管理
spawn()的使用,若是咱们愿意,咱们能够连续调用它100次。

Mapping and reducing

在上一节中,咱们使用spawn()方法生成了一个worker线程。Parallel.js还有一个map()方法和一个reduce()方法。这个方法是让事情变得更轻松。经过传递map()函数,库将自动将其应用于做业数据中的每一个项。相似的语义适用于reduce()方法。让咱们经过编写一些代码来看看它是如何工做的:

//一个数字输入数组。
var array = new Array(2500)
    .fill(null)
    .map((v, i) => i);

//建立一个新的并行做业。
//这里不会建立workers - 咱们只传递咱们正在使用的构造数据。
var job1 = new Parallel(array);

//为咱们的“spawn()”做业启动一个计时器。
console.time('JOB1');

//这里的问题是Parallel.js会为每一个数组元素建立一个新的worker,
//致使并行减速。
job1.map((n) => {
    var i = 0;
    while (++i < n*n) {}
    return i;
}).reduce((pair) => {
 
    //将数组项reduce为一个总和。
    return pair[0] + pair[1];
}).then((data) => {
    console.log('job1 reduced', data);
    //→job1 reduced 5205208751
    
    console.timeEnd('job1');
    //→job1:59443.863ms
});

哎哟! 这是一个很是重要的性能 - 这里发生了什么?咱们在这里看到的是一种称为并行减速的现象。当并行通讯开销过多时,会发生这种减速。在这个特定示例中发生这种状况的缘由是因为Parallel.js在map()中处理数组的方式。每一个数组项都经过一个worker。这并不意味着建立了2500个worker - 一个worker用于数组中的每一个元素。建立的worker数量最多只能达到4或者咱们在本书前面看到的navigator.hardwareConcurrency值。

在真正的开销来自于发送的消息并收到了worker-5000个消息!这显然不是最优的,由于由代码中的定时器给证实。让咱们看看是否可以作出一个对这些数字的大幅改善,同时保持大体相同的代码结构:

//更快的执行。
var job2 = new Parallel(array);

console.time('job2');

//在映射数组以前,将数组拆分为较小的数组块。
//这样,每一个Parallel.js worker都是处理数组而不是数组项。
//这避免了发送数千个Web worker消息。
job2.spawn((data) => {
    var index = 0,
        size = 1000,
        results = [];

    while (true) {
        let chunk = data.slice(index, index + size);

        if (chunk.length) {
            results.push(chunk);
            index += size;
        } else {
            return result;
        }
    }
}).map((array) => {

    //返回数组块的映射。
    return array.map((n) => {
        var i = 0;
        while(++i < n * n) {}
        return i;
    });
}).reduce((pair) => {

    //将数组块或数字reduce为一个总和。
    return(Array.isArray(pair[0]) ? 
        pair[0].reduce((r, v) => r + v) :
        pair[0]) + (Array.isArray(pair[1]) ? 
        pair[1].reduce((r, v) => r + v) : 
        pair[1]);
}).then((data) => {

    console.log('job2 reduced', data);
    //→job2 resuced 5205208751
});

console.timeEnd('job2');
//→job2:2723.661ms

在这里,咱们能够看到的是在一样的结果被产生,而且快得多。不一样之处在于咱们开始工做以前将数组切片成的阵列较小的数组块。这些数组就是传递给workers的项,而不是单个的数。因此映射做业略微有好的改变,而平方一个数字,它映射一个较小的数组到平方的数组。该reduce的逻辑是稍微复杂一些,但整体来讲,咱们的作法是仍然是相同的。最重要的是,咱们已经删除了大量的消息传递瓶颈,他们在第一次执行形成不可接受的性能缺陷。

就像spawn()方法在返回时清理worker同样,Parallel.js中的map()和reduce()方法也是如此。
释放worker的缺点是,不管什么时候调用这些方法,都须要从新建立它们。咱们将在下一节讨论这个挑战。

worker线程池

本章的最后一节介绍了worker线程池的概念。在上一节关于Parallel.js的介绍中,咱们遇到了常常建立和终止worker的问题。这须要不少开销。若是知道咱们可以运行的并发级别,那么为何不分配一个能够承担工做的静态大小的worker线程池?

建立worker线程池的第一个设计任务是分配worker。下一步是经过将做业分发给池中的可用worker来计划做业。最后,当全部worker都在运行时,咱们须要考虑忙碌状态。让咱们开始吧。

分配池

在考虑分配worker线程池以前,咱们须要查看整体worker抽象池。咱们如何但愿它的外观和行为是怎样的?理想状况下,咱们但愿抽象池的外观和行为相似于普通的专用worker。咱们能够向线程池发布消息并得到promise做为响应。所以,虽然咱们没法直接扩展Worker原型,但咱们能够建立一个与Worker API很是类似的新的抽象。

咱们如今来看一些代码吧。这是咱们将使用的初始抽象:

//表示Web worker线程的“池”,
//隐藏在后面单个Web worker接口的接口。
function WorkerPool(script) {
    //并发级别,或者Web worker要创造的数量。
    //这使用了“hardwareConcurrency”属性(若是存在)。
    //不然,默认为4,
    //由于这是对最多见的CPU结构进行的合理猜想。
    var concurrency = navigator.hardwareConcurrency || 4;

    //worker实例自己存储在Map中,做为键。
    //咱们立刻就会明白为何。
    var workers = this.workers = new Map();

    //对于发布的消息存在队列,全部worker都很忙。
    //因此这可能永远不会被用到的。
    var queue = this.queue = [];

    //用于下面建立worker程序实例,
    //以及添加事件监听器。
    var worker;
    for (var i = 0; i < concurrency; i++) {
        worker = new Worker(script);
        worker.addEventListener('message', function(e) {

            //咱们使用“get()”方法来查找promise的“resolve()”函数。
            //该worker是关键。咱们调用的从worker返回的数据的解析器
            //而且能够将其重置为null。
            //这个很重要,由于它表示worker是空闲的,
            //能够承担更多工做。
            workers.get(this)(e.data);
            workers.set(this, null);

            //若是有排队的数据,咱们获得第一个
            //队列中的“data”和“resolver”。
            //咱们用数据调用“postMessage()”以前,
            //咱们使用新的“resolve()”函数更新“workers”映射。
            if (queue.length) {
                var [data, resolver] = queue.shift();
                workers.set(this, resolver);
                this.postMessage(data);
            }
            
            //这是worker的初始设置,做为在“worker”映射中的键。
            //它的值为null,意味着没有解析函数,它能够承担工做。
            this.workers.set(worker, null);
        }.bind(worker));
    }
}

建立新的WorkerPool时,给定的脚本用于生成线程池中的全部worker。该worker属性是一个Map实例,worker实例自己是做为键。咱们将worker存储为映射键的缘由是咱们能够轻松地查找适当的解析器函数来调用。

当给定的worker程序响应时,调用咱们添加到每一个worker的消息事件处理程序,这就是咱们找的等待调用的解析器函数的地方。咱们不可能调用错误的解析器,由于给定的worker在完成当前任务以前不会接受新的任务。

调度任务

如今咱们将实现postMessage()方法。这是调用者用于将消息发布到池中的一个worker。调用者不知道哪一个worker知足了他们的要求,他们也不关心。他们将promise做为返回值,并以worker响应做为解析值:

WorkerPool.prototype.postMessage = function(data) {

    //“workers”Map映射实例,其中包含全部存储的Web worker。
    var workers = this.workers;

    //当全部worker都很忙时消息被放在“queue”队列中
    var queue = this.queue;

    //尝试找一个可用的worker。
    var worker = this.getWorker();

    //promise会当即返回给调用者,
    //即便没有worker可用。
    return new Promise(function(resolve) {

        //若是找到了worker,咱们能够更新Map映射,
        //使用worker做为键,并使用“resolve()”函数做为值。
        //若是没有worker,那么消息数据以及“resolve()”函数被推送到“queue”队列。
        if (worker) {
            workers.set(worker, resolve);
            worker.postMessage(data);
        } else {
            queue.push([data, resolve]);
        }
    });
};

它是promise执行器函数,实际负责查找第一个可用的worker并在那里发布咱们的消息。当找到可用的worker时,咱们还在咱们的worker映射中设置了worker的解析器函数。若是池中没有可用的worker程序,已发布的消息则将进入队列。此队列在消息事件处理程序中清空。这是由于当worker返回消息时,这意味着worker是空闲的能够承担更多工做,而且在返回空闲状态以前检查是否有任何worker排队。

该getWorker()方法是一个简单的辅助函数为咱们查找下一个可用的worker。咱们知道若是一个worker在workers映射中将其解析器函数设置为null,则能够执行该任务。最后,让咱们看看这个worker线程池的应用:

//建立一个新的线程池和一个负载计数器。
var pool = new WorkerPool('worker.js');
var workload = 0;

document.getElementById('work').addEventListener('click', function(e) {

    //获取咱们要传递给worker的数据,
    //并为此负载建立计数器。
    var amount = +document.getElementById('amount').value,
        timer = 'Workload' + (++workload);

    console.time(timer);

    //将消息传递给线程池,并在promise完成时,中止计时器。
    pool.postMessage(amount).then(function(result) {
        console.timeEnd(timer);
    });

    //若是消息开始排队,
    //咱们的线程池就是过载并显示警告。
    if (pool.queue.length) {
        console.warn('worker pool is getting busy...');
    }
});

在这种使用场景中,咱们有几个表单控件将参数化工做发送给worker。数字越大,工做时间越长; 它使用标准的work()函数来缓慢地对数字做平方。若是咱们使用大量数字并频繁单击按钮将消息发布到线程池中,那么最终咱们将耗尽线程池中可用的资源。若是是这种状况,咱们将显示警告。可是,这仅用于故障排除,当线程池繁忙时,发布的消息不会丢失,它们只是排队等候。

小结

本章的重点是从代码中删除突兀的并发语法。它只是提升了咱们应用程序成功运行的可能性,由于咱们将拥有易于维护和构建的代码。咱们解决的第一个问题是经过使全部内容都是并发的方式来编写并发代码。当没有所涉及的猜想成分时,咱们的代码就是一致的,不易受并发错误的影响。

而后,咱们研究了抽象Web worker通讯能够采起的各类方法。辅助函数是一个选项,所以扩展了postMessage()方法。而后,当咱们须要UI响应时,咱们解决了Web workers的一些限制。即便咱们的大型数据集处理速度更快,咱们仍然存在更新UI的问题。这是经过将Web worker做为生成器处理来完成的。

咱们没必要本身编写全部这些JavaScript并发工具方法。咱们花了一些时间来研究Parallel.js库的各类功能和限制。咱们以介绍Web worker线程池结束了本章。这些消除了与worker建立和终止相关的大量开销,而且它们极大地简化了任务的分配和结果的协调。

这些都是适用于前端的并发话题。如今是时候切换一下,使用NodeJS查看后端的JavaScript并发性。

最后补充下书籍章节目录

另外还有讲解两章nodeJs后端并发方面的,和一章项目实战方面的,这里就再也不贴了,有兴趣可转向https://github.com/yzsunlei/javascript_concurrency_translation查看。

相关文章
相关标签/搜索