Node.js 多线程彻底指南

翻译:疯狂的技术宅
原文: https://blog.logrocket.com/a-...

本文首发微信公众号:jingchengyideng
欢迎关注,天天都给你推送新鲜的前端技术文章javascript


不少人都想知道单线程的 Node.js 怎么能与多线程后端竞争。考虑到其所谓的单线程特性,许多大公司选择 Node 做为其后端彷佛违反直觉。要想知道缘由,必须理解其单线程的真正含义。前端

JavaScript 的设计很是适合在网上作比较简单的事情,好比验证表单,或者说建立彩虹色的鼠标轨迹。 在2009年,Node.js的创始人 Ryan Dahl使开发人员能够用该语言编写后端代码。java

一般支持多线程的后端语言具备各类机制,用于在线程和其余面向线程的功能之间同步数据。要向 JavaScript 添加对此类功能的支持,须要修改整个语言,这不是 Dahl 的目标。为了让纯 JavaScript 支持多线程,他必须想一个变通方法。接下来让咱们探索一下其中的奥秘……node

Node.js 是如何工做的

Node.js 使用两种线程:event loop 处理的主线程和 worker pool 中的几个辅助线程。git

事件循环是一种机制,它采用回调(函数)并注册它们,准备在未来的某个时刻执行。它与相关的 JavaScript 代码在同一个线程中运行。当 JavaScript 操做阻塞线程时,事件循环也会被阻止。程序员

工做池是一种执行模型,它产生并处理单独的线程,而后同步执行任务,并将结果返回到事件循环。事件循环使用返回的结果执行提供的回调。github

简而言之,它负责异步 I/O操做 —— 主要是与系统磁盘和网络的交互。它主要由诸如 fs(I/O 密集)或 crypto(CPU 密集)等模块使用。工做池用 libuv 实现,当 Node 须要在 JavaScript 和 C++ 之间进行内部通讯时,会致使轻微的延迟,但这几乎不可察觉。面试

基于这两种机制,咱们能够编写以下代码:算法

fs.readFile(path.join(__dirname, './package.json'), (err, content) => {
 if (err) {
   return null;
 }

 console.log(content.toString());
});

前面提到的 fs 模块告诉工做池使用其中一个线程来读取文件的内容,并在完成后通知事件循环。而后事件循环获取提供的回调函数,并用文件的内容执行它。typescript

以上是非阻塞代码的示例,咱们没必要同步等待某事的发生。只需告诉工做池去读取文件,并用结果去调用提供的函数便可。因为工做池有本身的线程,所以事件循环能够在读取文件时继续正常执行。

在不须要同步执行某些复杂操做时,这一切都相安无事:任何运行时间太长的函数都会阻塞线程。若是应用程序中有大量这类功能,就可能会明显下降服务器的吞吐量,甚至彻底冻结它。在这种状况下,没法继续将工做委派给工做池。

在须要对数据进行复杂的计算时(如AI、机器学习或大数据)没法真正有效地使用 Node.js,由于操做阻塞了主(且惟一)线程,使服务器无响应。在 Node.js v10.5.0 发布以前就是这种状况,在这一版本增长了对多线程的支持。

简介:worker_threads

worker_threads 模块容许咱们建立功能齐全的多线程 Node.js 程序。

thread worker 是在单独的线程中生成的一段代码(一般从文件中取出)。

注意,术语 thread workerworkerthread 常常互换使用,他们都指的是同一件事。

要想使用 thread worker,必须导入 worker_threads 模块。让咱们先写一个函数来帮助咱们生成这些thread worker,而后再讨论它们的属性。

type WorkerCallback = (err: any, result?: any) => any;

export function runWorker(path: string, cb: WorkerCallback, workerData: object | null = null) {
 const worker = new Worker(path, { workerData });

 worker.on('message', cb.bind(null, null));
 worker.on('error', cb);

 worker.on('exit', (exitCode) => {
   if (exitCode === 0) {
     return null;
   }

   return cb(new Error(`Worker has stopped with code ${exitCode}`));
 });

 return worker;
}

要建立一个 worker,首先必须建立一个 Worker 类的实例。它的第一个参数提供了包含 worker 的代码的文件的路径;第二个参数提供了一个名为 workerData 的包含一个属性的对象。这是咱们但愿线程在开始运行时能够访问的数据。

请注意:无论你是用的是 JavaScript, 仍是最终要转换为 JavaScript 的语言(例如,TypeScript),路径应该始终引用带有 .js.mjs 扩展名的文件。

我还想指出为何使用回调方法,而不是返回在触发 message 事件时将解决的 promise。这是由于 worker 能够发送许多 message 事件,而不是一个。

正如你在上面的例子中所看到的,线程间的通讯是基于事件的,这意味着咱们设置了 worker 在发送给定事件后调用的侦听器。

如下是最多见的事件:

worker.on('error', (error) => {});

只要 worker 中有未捕获的异常,就会发出 error 事件。而后终止 worker,错误能够做为提供的回调中的第一个参数。

worker.on('exit', (exitCode) => {});

在 worker 退出时会发出 exit 事件。若是在worker中调用了 process.exit(),那么 exitCode 将被提供给回调。若是 worker 以 worker.terminate() 终止,则代码为1。

worker.on('online', () => {});

只要 worker 中止解析 JavaScript 代码并开始执行,就会发出 online 事件。它不经常使用,但在特定状况下能够提供信息。

worker.on('message', (data) => {});

只要 worker 将数据发送到父线程,就会发出 message 事件。

如今让咱们来看看如何在线程之间共享数据。

在线程之间交换数据

要将数据发送到另外一个线程,能够用 port.postMessage() 方法。它的原型以下:

port.postMessage(data[, transferList])

port 对象能够是 parentPort,也能够是 MessagePort 的实例 —— 稍后会详细讲解。

数据参数

第一个参数 —— 这里被称为 data —— 是一个被复制到另外一个线程的对象。它能够是复制算法所支持的任何内容。

数据由结构化克隆算法进行复制。引用自 Mozilla:

它经过递归输入对象来进行克隆,同时保持以前访问过的引用的映射,以免无限遍历循环。

该算法不复制函数、错误、属性描述符或原型链。还须要注意的是,以这种方式复制对象与使用 JSON 不一样,由于它能够包含循环引用和类型化数组,而 JSON 不能。

因为可以复制类型化数组,该算法能够在线程之间共享内存。

在线程之间共享内存

人们可能会说像 clusterchild_process 这样的模块在好久之前就开始使用线程了。这话对,也不对。

cluster 模块能够建立多个节点实例,其中一个主进程在它们之间对请求进行路由。集群可以有效地增长服务器的吞吐量;可是咱们不能用 cluster 模块生成一个单独的线程。

人们倾向于用 PM2 这样的工具来集中管理他们的程序,而不是在本身的代码中手动执行,若是你有兴趣,能够研究一下如何使用 cluster 模块。

child_process 模块能够生成任何可执行文件,不管它是不是用 JavaScript 写的。它和 worker_threads 很是类似,但缺乏后者的几个重要功能。

具体来讲 thread workers 更轻量,而且与其父线程共享相同的进程 ID。它们还能够与父线程共享内存,这样能够避免对大的数据负载进行序列化,从而更有效地来回传递数据。

如今让咱们看一下如何在线程之间共享内存。为了共享内存,必须将 ArrayBufferSharedArrayBuffer 的实例做为数据参数发送到另外一个线程。

这是一个与其父线程共享内存的 worker:

import { parentPort } from 'worker_threads';

parentPort.on('message', () => {
 const numberOfElements = 100;
 const sharedBuffer = new SharedArrayBuffer(Int32Array.BYTES_PER_ELEMENT * numberOfElements);
 const arr = new Int32Array(sharedBuffer);

 for (let i = 0; i < numberOfElements; i += 1) {
   arr[i] = Math.round(Math.random() * 30);
 }

 parentPort.postMessage({ arr });
});

首先,咱们建立一个 SharedArrayBuffer,其内存须要包含100个32位整数。接下来建立一个 Int32Array 实例,它将用缓冲区来保存其结构,而后用一些随机数填充数组并将其发送到父线程。

在父线程中:

import path from 'path';

import { runWorker } from '../run-worker';

const worker = runWorker(path.join(__dirname, 'worker.js'), (err, { arr }) => {
 if (err) {
   return null;
 }

 arr[0] = 5;
});

worker.postMessage({});

arr [0] 的值改成5,实际上会在两个线程中修改它。

固然,经过共享内存,咱们冒险在一个线程中修改一个值,同时也在另外一个线程中进行了修改。可是咱们在这个过程当中也获得了一个好处:该值不须要进行序列化就能够另外一个线程中使用,这极大地提升了效率。只需记住管理数据正确的引用,以便在完成数据处理后对其进行垃圾回收。

共享一个整数数组当然很好,但咱们真正感兴趣的是共享对象 —— 这是存储信息的默认方式。不幸的是,没有 SharedObjectBuffer 或相似的东西,但咱们能够本身建立一个相似的结构

transferList参数

transferList 中只能包含 ArrayBufferMessagePort。一旦它们被传送到另外一个线程,就不能再次被传送了;由于内存里的内容已经被移动到了另外一个线程。

目前,还不能经过 transferList(可使用 child_process 模块)来传输网络套接字。

建立通讯渠道

线程之间的通讯是经过 port 进行的,port 是 MessagePort 类的实例,并启用基于事件的通讯。

使用 port 在线程之间进行通讯的方法有两种。第一个是默认值,这个方法比较容易。在 worker 的代码中,咱们从worker_threads 模块导入一个名为 parentPort 的对象,并使用对象的 .postMessage() 方法将消息发送到父线程。

这是一个例子:

import { parentPort } from 'worker_threads';
const data = {
 // ...
};

parentPort.postMessage(data);

parentPort 是 Node.js 在幕后建立的 MessagePort 实例,用于与父线程进行通讯。这样就能够用 parentPortworker 对象在线程之间进行通讯。

线程间的第二种通讯方式是建立一个 MessageChannel 并将其发送给 worker。如下代码是如何建立一个新的 MessagePort 并与咱们的 worker 共享它:

import path from 'path';
import { Worker, MessageChannel } from 'worker_threads';

const worker = new Worker(path.join(__dirname, 'worker.js'));

const { port1, port2 } = new MessageChannel();

port1.on('message', (message) => {
 console.log('message from worker:', message);
});

worker.postMessage({ port: port2 }, [port2]);

在建立 port1port2 以后,咱们在 port1 上设置事件监听器并将 port2 发送给 worker。咱们必须将它包含在 transferList 中,以便将其传输给 worker 。

在 worker 内部:

import { parentPort, MessagePort } from 'worker_threads';

parentPort.on('message', (data) => {
 const { port }: { port: MessagePort } = data;

 port.postMessage('heres your message!');
});

这样,咱们就能使用父线程发送的 port 了。

使用 parentPort 不必定是错误的方法,但最好用 MessageChannel 的实例建立一个新的 MessagePort,而后与生成的 worker 共享它。

请注意,在后面的例子中,为了简便起见,我用了 parentPort

使用 worker 的两种方式

能够经过两种方式使用 worker。第一种是生成一个 worker,而后执行它的代码,并将结果发送到父线程。经过这种方法,每当出现新任务时,都必须从新建立一个工做者。

第二种方法是生成一个 worker 并为 message 事件设置监听器。每次触发 message 时,它都会完成工做并将结果发送回父线程,这会使 worker 保持活动状态以供之后使用。

Node.js 文档推荐第二种方法,由于在建立 thread worker 时须要建立虚拟机并解析和执行代码,这会产生比较大的开销。因此这种方法比不断产生新 worker 的效率更高。

这种方法被称为工做池,由于咱们建立了一个工做池并让它们等待,在须要时调度 message 事件来完成工做。

如下是一个产生、执行而后关闭 worker 例子:

import { parentPort } from 'worker_threads';

const collection = [];

for (let i = 0; i < 10; i += 1) {
 collection[i] = i;
}

parentPort.postMessage(collection);

collection 发送到父线程后,它就会退出。

下面是一个 worker 的例子,它能够在给定任务以前等待很长一段时间:

import { parentPort } from 'worker_threads';

parentPort.on('message', (data: any) => {
 const result = doSomething(data);

 parentPort.postMessage(result);
});

worker_threads 模块中可用的重要属性

worker_threads 模块中有一些可用的属性:

isMainThread

当不在工做线程内操做时,该属性为 true 。若是你以为有必要,能够在 worker 文件的开头包含一个简单的 if 语句,以确保它只做为 worker 运行。

import { isMainThread } from 'worker_threads';

if (isMainThread) {
 throw new Error('Its not a worker');
}

workerData

产生线程时包含在 worker 的构造函数中的数据。

const worker = new Worker(path, { workerData });

在工做线程中:

import { workerData } from 'worker_threads';

console.log(workerData.property);

parentPort

前面提到的 MessagePort 实例,用于与父线程通讯。

threadId

分配给 worker 的惟一标识符。


如今咱们知道了技术细节,接下来实现一些东西并在实践中检验学到的知识。

实现 setTimeout

setTimeout 是一个无限循环,顾名思义,用来检测程序运行时间是否超时。它在循环中检查起始时间与给定毫秒数之和是否小于实际日期。

import { parentPort, workerData } from 'worker_threads';

const time = Date.now();

while (true) {
    if (time + workerData.time <= Date.now()) {
        parentPort.postMessage({});
        break;
    }
}

这个特定的实现产生一个线程,而后执行它的代码,最后在完成后退出。

接下来实现使用这个 worker 的代码。首先建立一个状态,用它来跟踪生成的 worker:

const timeoutState: { [key: string]: Worker } = {};

而后时负责建立 worker 并将其保存到状态的函数:

export function setTimeout(callback: (err: any) => any, time: number) {
 const id = uuidv4();

 const worker = runWorker(
   path.join(__dirname, './timeout-worker.js'),
   (err) => {
     if (!timeoutState[id]) {
       return null;
     }

     timeoutState[id] = null;

     if (err) {
       return callback(err);
     }

     callback(null);
   },
   {
     time,
   },
 );

 timeoutState[id] = worker;

 return id;
}

首先,咱们使用 UUID 包为 worker 建立一个惟一的标识符,而后用先前定义的函数 runWorker 来获取 worker。咱们还向 worker 传入一个回调函数,一旦 worker 发送了数据就会被触发。最后,把 worker 保存在状态中并返回 id

在回调函数中,咱们必须检查该 worker 是否仍然存在于该状态中,由于有可能会 cancelTimeout(),这将会把它删除。若是确实存在,就把它从状态中删除,并调用传给 setTimeout 函数的 callback

cancelTimeout 函数使用 .terminate() 方法强制 worker 退出,并从该状态中删除该这个worker:

export function cancelTimeout(id: string) {
 if (timeoutState[id]) {
   timeoutState[id].terminate();

   timeoutState[id] = undefined;

   return true;
 }

 return false;
}

若是你有兴趣,我也实现了 setInterval,代码在这里,但由于它对线程什么都没作(咱们重用setTimeout的代码),因此我决定不在这里进行解释。

我已经建立了一个短小的测试代码,目的是检查这种方法与原生方法的不一样之处。你能够在这里找到代码。这些是结果:

native setTimeout { ms: 7004, averageCPUCost: 0.1416 }
worker setTimeout { ms: 7046, averageCPUCost: 0.308 }

咱们能够看到 setTimeout 有一点延迟 - 大约40ms - 这时 worker 被建立时的消耗。平均 CPU 成本也略高,但没什么难以忍受的(CPU 成本是整个过程持续时间内 CPU 使用率的平均值)。

若是咱们能够重用 worker,就可以下降延迟和 CPU 使用率,这就是要实现工做池的缘由。

实现工做池

如上所述,工做池是给定数量的被事先建立的 worker,他们保持空闲并监听 message 事件。一旦 message 事件被触发,他们就会开始工做并发回结果。

为了更好地描述咱们将要作的事情,下面咱们来建立一个由八个 thread worker 组成的工做池:

const pool = new WorkerPool(path.join(__dirname, './test-worker.js'), 8);

若是你熟悉限制并发操做,那么你在这里看到的逻辑几乎相同,只是一个不一样的用例。

如上面的代码片断所示,咱们把指向 worker 的路径和要生成的 worker 数量传给了 WorkerPool 的构造函数。

export class WorkerPool<T, N> {
 private queue: QueueItem<T, N>[] = [];
 private workersById: { [key: number]: Worker } = {};
 private activeWorkersById: { [key: number]: boolean } = {};

 public constructor(public workerPath: string, public numberOfThreads: number) {
   this.init();
 }
}

这里还有其余一些属性,如 workersByIdactiveWorkersById,咱们能够分别保存现有的 worker 和当前正在运行的 worker 的 ID。还有 queue,咱们可使用如下结构来保存对象:

type QueueCallback<N> = (err: any, result?: N) => void;

interface QueueItem<T, N> {
 callback: QueueCallback<N>;
 getData: () => T;
}

callback 只是默认的节点回调,第一个参数是错误,第二个参数是可能的结果。 getData 是传递给工做池 .run() 方法的函数(以下所述),一旦项目开始处理就会被调用。 getData 函数返回的数据将传给工做线程。

.init() 方法中,咱们建立了 worker 并将它们保存在如下状态中:

private init() {
  if (this.numberOfThreads < 1) {
    return null;
  }

  for (let i = 0; i < this.numberOfThreads; i += 1) {
    const worker = new Worker(this.workerPath);

    this.workersById[i] = worker;
    this.activeWorkersById[i] = false;
  }
}

为避免无限循环,咱们首先要确保线程数 > 1。而后建立有效的 worker 数,并将它们的索引保存在 workersById 状态。咱们在 activeWorkersById 状态中保存了它们当前是否正在运行的信息,默认状况下该状态始终为false。

如今咱们必须实现前面提到的 .run() 方法来设置一个 worker 可用的任务。

public run(getData: () => T) {
  return new Promise<N>((resolve, reject) => {
    const availableWorkerId = this.getInactiveWorkerId();

    const queueItem: QueueItem<T, N> = {
      getData,
      callback: (error, result) => {
        if (error) {
          return reject(error);
        }
return resolve(result);
      },
    };

    if (availableWorkerId === -1) {
      this.queue.push(queueItem);

      return null;
    }

    this.runWorker(availableWorkerId, queueItem);
  });
}

在 promise 函数里,咱们首先经过调用 .getInactiveWorkerId() 来检查是否存在空闲的 worker 能够来处理数据:

private getInactiveWorkerId(): number {
  for (let i = 0; i < this.numberOfThreads; i += 1) {
    if (!this.activeWorkersById[i]) {
      return i;
    }
  }

  return -1;
}

接下来,咱们建立一个 queueItem,在其中保存传递给 .run() 方法的 getData 函数以及回调。在回调中,咱们要么 resolve 或者 reject promise,这取决于 worker 是否将错误传递给回调。

若是 availableWorkerId 的值是 -1,意味着当前没有可用的 worker,咱们将 queueItem 添加到 queue。若是有可用的 worker,则调用 .runWorker() 方法来执行 worker。

.runWorker() 方法中,咱们必须把当前 worker 的 activeWorkersById 设置为使用状态;为 messageerror 事件设置事件监听器(并在以后清理它们);最后将数据发送给 worker。

private async runWorker(workerId: number, queueItem: QueueItem<T, N>) {
 const worker = this.workersById[workerId];

 this.activeWorkersById[workerId] = true;

 const messageCallback = (result: N) => {
   queueItem.callback(null, result);

   cleanUp();
 };

 const errorCallback = (error: any) => {
   queueItem.callback(error);

   cleanUp();
 };

 const cleanUp = () => {
   worker.removeAllListeners('message');
   worker.removeAllListeners('error');

   this.activeWorkersById[workerId] = false;

   if (!this.queue.length) {
     return null;
   }

   this.runWorker(workerId, this.queue.shift());
 };

 worker.once('message', messageCallback);
 worker.once('error', errorCallback);

 worker.postMessage(await queueItem.getData());
}

首先,经过使用传递的 workerId,咱们从 workersById 中得到 worker 引用。而后,在 activeWorkersById 中,将 [workerId] 属性设置为true,这样咱们就能知道在 worker 在忙,不要运行其余任务。

接下来,分别建立 messageCallbackerrorCallback 用来在消息和错误事件上调用,而后注册所述函数来监听事件并将数据发送给 worker。

在回调中,咱们调用 queueItem 的回调,而后调用 cleanUp 函数。在 cleanUp 函数中,要删除事件侦听器,由于咱们会屡次重用同一个 worker。若是没有删除监听器的话就会发生内存泄漏,内存会被慢慢耗尽。

activeWorkersById 状态中,咱们将 [workerId] 属性设置为 false,并检查队列是否为空。若是不是,就从 queue 中删除第一个项目,并用另外一个 queueItem 再次调用 worker。

接着建立一个在收到 message 事件中的数据后进行一些计算的 worker:

import { isMainThread, parentPort } from 'worker_threads';

if (isMainThread) {
 throw new Error('Its not a worker');
}

const doCalcs = (data: any) => {
 const collection = [];

 for (let i = 0; i < 1000000; i += 1) {
   collection[i] = Math.round(Math.random() * 100000);
 }

 return collection.sort((a, b) => {
   if (a > b) {
     return 1;
   }

   return -1;
 });
};

parentPort.on('message', (data: any) => {
 const result = doCalcs(data);

 parentPort.postMessage(result);
});

worker 建立了一个包含 100 万个随机数的数组,而后对它们进行排序。只要可以多花费一些时间才能完成,作些什么事情并不重要。

如下是工做池简单用法的示例:

const pool = new WorkerPool<{ i: number }, number>(path.join(__dirname, './test-worker.js'), 8);

const items = [...new Array(100)].fill(null);

Promise.all(
 items.map(async (_, i) => {
   await pool.run(() => ({ i }));

   console.log('finished', i);
 }),
).then(() => {
 console.log('finished all');
});

首先建立一个由八个 worker 组成的工做池。而后建立一个包含 100 个元素的数组,对于每一个元素,咱们在工做池中运行一个任务。开始运行后将当即执行八个任务,其他任务被放入队列并逐个执行。经过使用工做池,咱们没必要每次都建立一个 worker,从而大大提升了效率。

结论

worker_threads 提供了一种为程序添加多线程支持的简单的方法。经过将繁重的 CPU 计算委托给其余线程,能够显着提升服务器的吞吐量。经过官方线程支持,咱们能够期待更多来自AI、机器学习和大数据等领域的开发人员和工程师使用 Node.js.

本文首发微信公众号:jingchengyideng

欢迎扫描二维码关注公众号,天天都给你推送新鲜的前端技术文章

欢迎扫描二维码关注公众号,天天都给你推送新鲜的前端技术文章



欢迎继续阅读本专栏其它高赞文章:

相关文章
相关标签/搜索