Node.js多线程彻底指南

许多人想知道单线程的Node.js如何与多线程后端竞争。所以,考虑到Node既有的单线程特性,那么多的大公司选择Node做为它们的后端彷佛是违反直觉的。要知道为何,咱们必须理解Node单线程的真正含义。javascript

当初建立JavaScript的目的只是为了在web上作一些简单的事情,好比验证表单,或者建立一个彩虹色的鼠标轨迹。直到2009年,Ryan Dahl才建立了Node,使开发人员可以使用该语言编写后端代码。前端

后端语言一般支持多线程,有各类机制能够在线程之间同步数据,以及支持线程相关的其余特性。要在JavaScript中支持这些东西,就须要改变整个语言,而这并非Dahl真正的目标。为了让普通JavaScript支持多线程,他必须建立一个变通方案。让咱们了解一下……java

Node.js究竟是如何工做的

Node.js 使用两种线程:一个主线程由event loop处理,其余辅助线程由worker pool处理。node

事件循环是一种获取回调(函数)并将其注册以备未来执行的机制。它与日常的JavaScript代码在相同的线程中运行。当JavaScript操做阻塞线程时,事件循环也被阻塞。git

Worker pool 是一种执行模型,它生成并处理单独的线程,而后这些线程同步执行任务并将结果返回给事件循环。而后,事件循环使用上述结果执行提供的回调。github

简而言之,它负责异步I/O操做——主要是与系统的磁盘和网络的交互。它主要用于 fs(I/O密集型)或crypto (CPU密集型)等模块。Worker pool 是在libuv中实现的,每当Node须要在JavaScript和C++之间进行内部通讯时,它都会致使轻微的延迟,但这并不明显。web

有了这两种机制,咱们能够这样写代码:算法

fs.readFile(path.join(__dirname, './package.json'), (err, content) => {
 if (err) {
   return null;
 }

 console.log(content.toString());
});

复制代码

前面提到的fs模块告诉worker pool使用它的一个线程来读取文件的内容,并在完成时通知事件循环。而后,事件循环接受提供的回调函数并使用文件的内容执行它。json

以上是一个非阻塞代码的例子。所以,咱们没必要同步等待某件事发生。咱们告诉worker pool读取文件并使用执行结果调用提供的函数。因为worker pool有本身的线程,因此在读取文件时,事件循环能够继续正常执行。后端

在须要同步执行某些复杂的操做以前,一切都是正常的:任何花费太长时间运行的函数都会阻塞线程。若是一个应用程序有不少这样的功能,它可能会显著下降服务器的吞吐量,或者彻底卡死。在这种状况下,没法将工做分配给worker pool。

须要复杂计算的领域——如AI、机器学习或大数据——实际上不能有效地使用Node.js,由于这些操做阻塞了仅有的一个线程(主线程),使服务器无响应。这种状况一直持续到Node.js v10.5.0出现,它增长了对多线程的支持。

worker_threads

worker_threads模块是一个包,它容许咱们建立全功能的多线程Node.js应用程序。

线程worker是在单独的线程中生成的一段代码(一般从文件中获取)。

注意,术语thread workerworkerthread常常交替使用;它们都指的是同一件事。

要开始使用线程worker,咱们必须导入 worker_threads模块。咱们先建立一个函数来帮助生成这些线程worker,而后再讨论它们的属性。

type WorkerCallback = (err: any, result?: any) => any;

export function runWorker(path: string, cb: WorkerCallback, workerData: object | null = null) {
 const worker = new Worker(path, { workerData });

 worker.on('message', cb.bind(null, null));
 worker.on('error', cb);

 worker.on('exit', (exitCode) => {
   if (exitCode === 0) {
     return null;
   }

   return cb(new Error(`Worker has stopped with code ${exitCode}`));
 });

 return worker;
}

复制代码

要建立worker,咱们必须建立Worker类的一个实例。在第一个参数中,咱们提供了包含worker代码的文件的路径;第二个参数,咱们提供了一个对象,其中包含一个名为workerData的属性。这是咱们但愿线程在开始运行时可以访问的数据。

注意,不管你使用的是JavaScript自己,仍是可转换为JavaScript的语言(好比TypeScript),路径都应该是指向带有 .js.mjs扩展名的文件。

我还想指出为何咱们使用回调方法,而不是返回一个在message 事件触发时被解决的promise。这是由于worker能够发送多个 message 事件,而不是一个。

正如你在上面的示例中所看到的,线程之间的通讯是基于事件的,这就是说咱们能够设置事件监听器,以便在worker触发指定事件时调用它。

如下是最多见的事件:

worker.on('error', (error) => {});
复制代码

当worker中出现未捕获的异常时,就会发出error事件。而后终止worker,错误做为回调函数的第一个参数传递。

worker.on('exit', (exitCode) => {});
复制代码

当worker 退出时,会发送exit事件。若是在worker内部调用了process.exit() ,则会向回调函数提供状态码exitCode 。若是使用worker.terminate()终止worker ,状态码为1。

worker.on('online', () => {});

复制代码

在worker 中止解析JavaScript代码并开始执行时发送online事件。它不经常使用,但能够在特定的状况下提供有效信息。

worker.on('message', (data) => {});

复制代码

worker 向父线程发送数据时会发送message事件。

如今,咱们来看看如何在线程之间共享数据。

线程之间交换数据

要将数据发送到另外一个线程,咱们使用port.postMessage() 方法。函数签名以下:

port.postMessage(data[, transferList])

复制代码

端口对象能够是parentPort,也能够是 MessagePort的一个实例。稍后再详细介绍。

data 参数

第一个参数 data  是一个复制到另外一个线程的对象。它能够包含复制算法支持的任何内容。

数据由结构化克隆算法复制。据Mozilla:

它经过递归遍历输入对象来构建克隆,同时维护之前访问过的引用的映射,以免无限遍历循环。

该算法不复制函数、错误对象、属性描述符或原型链。还应该注意,以这种方式复制对象与JSON不一样,由于它能够包含循环引用和类型化数组,而JSON不能。

经过支持类型化数组的复制,该算法使得在线程之间共享内存成为可能。

线程间共享内存

人们可能会说,cluster 或child_process之类的模块在好久之前就启用了线程。对,也不对。

cluster模块能够建立多个node实例,由一个主进程在它们之间路由分发收到的请求。集群应用程序有效地成倍增长服务器吞吐量;可是,咱们不能使用 cluster 模块派生一个单独的线程。

人们倾向于使用PM2这样的工具管理集群应用程序,而不是在代码中手动处理。可是若是你有兴趣,你能够看下个人这篇关于如何使用cluster模块的帖子

child_process模块能够生成任何可执行文件,无论它是否是JavaScript。它很是相似,可是它缺乏worker_threads 所具备的几个重要特性。

具体来讲,线程worker更轻量级,而且与父线程共享相同的进程ID。它们还能够与父线程共享内存,这使它们能够避免序列化大的数据负载,从而更有效地来回发送数据。

如今让咱们看一个如何在线程之间共享内存的示例。为了共享内存,必须将ArrayBufferSharedArrayBuffer 的实例做为数据参数或置于数据参数内部发送给另外一个线程。

这是一个与父线程共享内存的worker :

import { parentPort } from 'worker_threads';

parentPort.on('message', () => {
 const numberOfElements = 100;
 const sharedBuffer = new SharedArrayBuffer(Int32Array.BYTES_PER_ELEMENT * numberOfElements);
 const arr = new Int32Array(sharedBuffer);

 for (let i = 0; i < numberOfElements; i += 1) {
   arr[i] = Math.round(Math.random() * 30);
 }

 parentPort.postMessage({ arr });
});

复制代码

首先,咱们建立一个SharedArrayBuffer,其中的内存须要包含100个32位整数。接下来,咱们建立一个Int32Array的实例,它将使用缓冲区来保存它的结构,而后咱们用一些随机数填充数组并将其发送给父线程。

父线程:

import path from 'path';

import { runWorker } from '../run-worker';

const worker = runWorker(path.join(__dirname, 'worker.js'), (err, { arr }) => {
 if (err) {
   return null;
 }

 arr[0] = 5;
});

worker.postMessage({});

复制代码

经过将'arr[0]更改成5,咱们实际上在两个线程中都修改了它。

固然,经过共享内存,咱们可能面临一个风险:在一个线程中更改一个值,另外一个线程中也随之改变了。可是咱们同时也得到了一个很是好的特性:不须要序列化值就能够在另外一个线程中使用,这极大地提升了效率。只需记住正确地管理对数据的引用,以便在完成数据处理后对其进行垃圾收集。

共享一个整数数组是能够的,但咱们真正感兴趣的是共享对象——存储信息的默认方式。不幸的是,没有SharedObjectBuffer或相似的东西,但咱们能够本身建立一个相似的结构

transferList 参数

transferList 只能包含ArrayBufferMessagePort。一旦它们被转移到另外一个线程,就不能再在发送线程中使用:内存被移动到另外一个线程,所以在发送线程中不可用。

目前,咱们还不能经过将它们包含在transferList中来传输网络套接字(这个能够经过child_process模块来实现)。

为通讯建立通道

线程之间的通讯经过端口进行,端口是MessagePort类的实例,支持基于事件的通讯。

使用端口在线程之间进行通讯有两种方法。第一个是默认的,也是两个中比较简单的一个。在worker的代码中,咱们从worker_threads模块导入一个名为parentPort的对象,并使用该对象的.postMessage() 方法向父线程发送消息。

这里有一个例子:

import { parentPort } from 'worker_threads';

const data = { 
    // ...
};

parentPort.postMessage(data);

复制代码

parentPort 是Node.js在后台为咱们建立的MessagePort的一个实例,它支持与父线程的通讯。这样,咱们能够经过使用parentPort 和 worker对象在线程之间进行通讯。

线程之间通讯的第二种方式是实际建立一个本身的MessageChannel并将它发送给worker。下面演示了如何建立一个新的MessagePort,并与worker共享:

import path from 'path';
import { Worker, MessageChannel } from 'worker_threads';

const worker = new Worker(path.join(__dirname, 'worker.js'));

const { port1, port2 } = new MessageChannel();

port1.on('message', (message) => {
 console.log('message from worker:', message);
});

worker.postMessage({ port: port2 }, [port2]);

复制代码

建立port1port2以后,咱们在port1上设置事件监听器,并将port2发送给wroker。咱们必须把它包括在transferList中,以便转移到worker一方。

如今,在worker内部:

import { parentPort, MessagePort } from 'worker_threads';

parentPort.on('message', (data) => {
 const { port }: { port: MessagePort } = data;

 port.postMessage('heres your message!');
});

复制代码

经过这种方式,咱们使用父线程发送的端口。

使用parentPort不必定是一个错误的方法,但更好的方法是建立一个新的MessagePort ,其中包含一个MessageChannel 实例,而后与派生的worker共享它(即:关注点分离)。

请注意,在下面的示例中,我使用了 parentPort来简化。

使用worker的两种方法

咱们有两种使用worker的方法。第一种方法是生成一个worker,执行它的代码,而后将结果发送给父线程。使用这种方法,每次出现新任务时,咱们都必须从新建立一个worker。

第二种方法是派生一个worker并为message事件设置侦听器。每次触发message时,它都会执行工做并将结果发送回父线程,父线程将保持worker的活动状态,以供之后使用。

Node.js 文档推荐使用第二种方法,由于实际建立一个线程worker须要花费不少精力,这须要建立一个虚拟机并解析和执行代码。这种方法也比不断建立worker更有效。

这种方法称为线程池,由于咱们建立一个线程池并让他们等待,在须要时发送message事件来完成工做。

下面是一个文件的例子,其中包含了一个worker的建立、执行和关闭的过程:

import { parentPort } from 'worker_threads';

const collection = [];

for (let i = 0; i < 10; i += 1) {
 collection[i] = i;
}

parentPort.postMessage(collection);

复制代码

collection发送到父线程后,它就退出了。

这里有一个worker的例子,它能够等待很长一段时间才获得任务:

import { parentPort } from 'worker_threads';

parentPort.on('message', (data: any) => {
 const result = doSomething(data);

 parentPort.postMessage(result);
});

复制代码

worker_threads 模块提供的有用属性

worker_threads模块中有一些可用的属性:

isMainThread

当不在worker线程内操做时,该属性为true 。若是须要,能够在worker文件的开头包含一个简单的if语句,以确保它只是做为worker运行。

import { isMainThread } from 'worker_threads';

if (isMainThread) {
 throw new Error('Its not a worker');
}

复制代码

workerData

包含在生成的线程worker构造函数中的数据。

const worker = new Worker(path, { workerData });

复制代码

在worker线程里:

import { workerData } from 'worker_threads';

console.log(workerData.property);

复制代码

parentPort

前面提到的用于与父线程通讯的MessagePort实例。

threadId

分配给worker的惟一标识符。


如今咱们知道了技术细节,让咱们实现一些东西并在实践中测试咱们的知识。

实现 setTimeout

setTimeout 是一个无限循环,顾名思义,它会让应用程序超时。实际上,它会在每次迭代中检查起始时间和给定毫秒数的总和是否小于当前时间。

import { parentPort, workerData } from 'worker_threads';

const time = Date.now();

while (true) {
 if (time + workerData.time <= Date.now()) {
   parentPort.postMessage({});
   break;
 }
}

复制代码

这个特定的实现生成一个线程,执行它的代码,而后在完成以后退出。

让咱们尝试实现使用这个worker的代码。首先,让咱们建立一个状态,在这个状态中,咱们将跟踪生成的worker:

const timeoutState: { [key: string]: Worker } = { };

复制代码

接下来是负责建立worker并将其保存到状态中的函数:

export function setTimeout(callback: (err: any) => any, time: number) {
 const id = uuidv4();

 const worker = runWorker(
   path.join(__dirname, './timeout-worker.js'),
   (err) => {
     if (!timeoutState[id]) {
       return null;
     }

     timeoutState[id] = null;

     if (err) {
       return callback(err);
     }

     callback(null);
   },
   {
     time,
   },
 );

 timeoutState[id] = worker;

 return id;
}

复制代码

首先,咱们使用UUID包为worker建立一个唯一的标识符,而后使用前面定义的帮助函数runWorker来获取worker。咱们还向worker传递一个回调函数,该函数在worker发送一些数据时触发。最后,咱们将worker保存到状态并返回id

在回调函数中,咱们必须检查worker是否仍然存在于状态中,由于可能存在cancelTimeout(),会把它删除。若是它确实存在,咱们将它从状态中删除,并调用传递给setTimeout 函数的callback

cancelTimeout 函数使用.terminate() 方法强迫worker退出,并将worker从状态中移除:

export function cancelTimeout(id: string) {
 if (timeoutState[id]) {
   timeoutState[id].terminate();

   timeoutState[id] = undefined;

   return true;
 }

 return false;
}

复制代码

我建立了一小段测试代码,用于检查这种方法与原生方法有多大差异。你能够查看这里的代码。结果以下:

native setTimeout { ms: 7004, averageCPUCost: 0.1416 }

复制代码
worker setTimeout { ms: 7046, averageCPUCost: 0.308 }

复制代码

咱们能够看到在setTimeout 中有一个轻微的延迟(大约40ms),这是由正在建立的worker形成的。平均CPU成本也稍微高一些,可是还能够接受(CPU成本是整个进程期间CPU使用量的平均值)。

若是咱们能够重用worker,咱们将下降延迟和CPU使用率,这就是为何咱们如今要研究如何实现咱们本身的线程池。

实现一个线程池

如前所述,一个线程池是一个给定数量的先前建立的worker线程,他们等待并监听message 事件。一旦触发了message 事件,它们就执行工做并发回结果。

为了更好地说明咱们将要作什么,下面是如何建立一个8个worker线程的线程池:

const pool = new WorkerPool(path.join(__dirname, './test-worker.js'), 8);

复制代码

若是你熟悉限制并发操做,那么你将看到这里的逻辑几乎是相同的,只是不一样的应用场景。

如上面的代码片断所示,咱们向WorkerPool的构造函数传递worker的路径和要建立的worker的数量。

export class WorkerPool<T, N> {
 private queue: QueueItem<T, N>[] = [];
 private workersById: { [key: number]: Worker } = {};
 private activeWorkersById: { [key: number]: boolean } = {};

 public constructor(public workerPath: string, public numberOfThreads: number) {
   this.init();
 }
}

复制代码

在这里,咱们有额外的属性,好比workersById 和activeWorkersById,在这些属性中,咱们能够分别保存现有worker和当前运行worker的id。还有queue,咱们能够用下面的结构保存对象:

type QueueCallback<N> = (err: any, result?: N) => void;

interface QueueItem<T, N> {
 callback: QueueCallback<N>;
 getData: () => T;
}

复制代码

callback 只是默认的node回调,它的第一个参数是error,第二个参数是可能的结果。getData是传递给线程池的.run()方法的函数(下面将对此进行解释),该方法在项目开始处理时调用。getData函数返回的数据将被传递给worker线程。

.init()方法中,咱们建立worker并将他们保存在状态中:

private init() {
  if (this.numberOfThreads < 1) {
    return null;
  }

  for (let i = 0; i < this.numberOfThreads; i += 1) {
    const worker = new Worker(this.workerPath);

    this.workersById[i] = worker;
    this.activeWorkersById[i] = false;
  }
}

复制代码

为了不无限循环,咱们首先确保线程的数量是>1。而后,咱们建立有效数量的worker,并经过它们在workersById 状态下的索引保存它们。咱们保存它们当前是否运行在 activeWorkersById 状态的信息,这个状态在默认状况下老是为false。

如今,咱们必须实现前面提到的.run() 方法来设置一个任务,以便在worker可用时运行。

public run(getData: () => T) {
  return new Promise<N>((resolve, reject) => {
    const availableWorkerId = this.getInactiveWorkerId();

    const queueItem: QueueItem<T, N> = {
      getData,
      callback: (error, result) => {
        if (error) {
          return reject(error);
        }
return resolve(result);
      },
    };

    if (availableWorkerId === -1) {
      this.queue.push(queueItem);

      return null;
    }

    this.runWorker(availableWorkerId, queueItem);
  });
}

复制代码

在传递给promise的函数内部,咱们首先检查是否有一个worker能够经过调用.getInactiveWorkerId()来处理数据:

private getInactiveWorkerId(): number {
  for (let i = 0; i < this.numberOfThreads; i += 1) {
    if (!this.activeWorkersById[i]) {
      return i;
    }
  }

  return -1;
}

复制代码

接下来,咱们建立了一个queueItem,其中保存传递给 .run() 方法的getData 函数和回调函数。在回调中,咱们要么resolve要么reject 该promise,这取决于worker是否将错误传递给回调。

若是 availableWorkerId是-1,那么没有可用的worker,咱们将queueItem添加到queue。若是有一个可用的worker,咱们调用 . runworker() 方法来执行这个worker。

.runWorker()方法中,咱们必须在activeWorkersById状态中设置当前正在使用的worker;为 message 和error事件设置事件监听器(并在随后清除);最后,将数据发送给worker。

private async runWorker(workerId: number, queueItem: QueueItem<T, N>) {
 const worker = this.workersById[workerId];

 this.activeWorkersById[workerId] = true;

 const messageCallback = (result: N) => {
   queueItem.callback(null, result);

   cleanUp();
 };

 const errorCallback = (error: any) => {
   queueItem.callback(error);

   cleanUp();
 };

 const cleanUp = () => {
   worker.removeAllListeners('message');
   worker.removeAllListeners('error');

   this.activeWorkersById[workerId] = false;

   if (!this.queue.length) {
     return null;
   }

   this.runWorker(workerId, this.queue.shift());
 };

 worker.once('message', messageCallback);
 worker.once('error', errorCallback);

 worker.postMessage(await queueItem.getData());
}

复制代码

首先,经过使用传递的workerId,咱们从workersById状态得到worker引用。而后,在activeWorkersById内部,咱们将[workerId]属性设置为true,这样咱们就知道在worker忙碌时不运行任何其余东西。

接下来,咱们分别在messageerror事件上建立messageCallbackerrorCallback,而后注册这些函数来监听事件并将数据发送给工做者。

在回调函数中,咱们调用queueItem的回调,而后调用cleanUp函数。在cleanUp函数中,咱们确保删除事件监听器,由于咱们屡次重用同一个worker。若是咱们不删除监听器,就会出现内存泄漏;实际上,咱们会慢慢地耗尽内存。

activeWorkersById状态中,咱们将[workerId]属性设置为false,并检查队列是否为空。若是不是,则从队列中删除第一个项目,并使用另外一个queueItem再次调用worker。

让咱们建立一个worker,在接收到message事件的数据后执行一些计算:

import { isMainThread, parentPort } from 'worker_threads';

if (isMainThread) {
 throw new Error('Its not a worker');
}

const doCalcs = (data: any) => {
 const collection = [];

 for (let i = 0; i < 1000000; i += 1) {
   collection[i] = Math.round(Math.random() * 100000);
 }

 return collection.sort((a, b) => {
   if (a > b) {
     return 1;
   }

   return -1;
 });
};

parentPort.on('message', (data: any) => {
 const result = doCalcs(data);

 parentPort.postMessage(result);
});

复制代码

worker建立了一个由100万个随机数组成的数组,而后对它们进行排序。只要花点时间完成,会发生什么事可有可无。

下面是一个简单的使用线程池的例子:

const pool = new WorkerPool<{ i: number }, number>(path.join(__dirname, './test-worker.js'), 8);

const items = [...new Array(100)].fill(null);

Promise.all(
 items.map(async (_, i) => {
   await pool.run(() => ({ i }));

   console.log('finished', i);
 }),
).then(() => {
 console.log('finished all');
});

复制代码

咱们首先建立一个包含8个worker的线程池。而后咱们建立一个包含100个元素的数组,对于每一个元素,咱们在工做池中运行一个任务。首先,8个任务将被当即执行,其他的将被放入队列并逐步执行。经过使用一个线程池,咱们没必要每次都建立一个worker,这极大地提升了效率。

总结

worker_threads为应用程序提供了一种很是简单的方法支持多线程。经过将繁重的CPU计算任务委托给其余线程,咱们能够显著提升服务器的吞吐量。有了官方的线程支持,咱们能够期待更多来自AI、机器学习和大数据等领域的开发人员和工程师开始使用Node.js。

交流

欢迎关注微信公众号“1204译站”,获取更多前端技术干货文章。

公众号:1024译站
相关文章
相关标签/搜索