nodejs中使用worker_threads来建立新的线程

[toc]javascript

nodejs中使用worker_threads来建立新的线程java

简介

以前的文章中提到了,nodejs中有两种线程,一种是event loop用来相应用户的请求和处理各类callback。另外一种就是worker pool用来处理各类耗时操做。node

nodejs的官网提到了一个可以使用nodejs本地woker pool的lib叫作webworker-threads。web

惋惜的是webworker-threads的最后一次更新仍是在2年前,而在最新的nodejs 12中,根本没法使用。安全

而webworker-threads的做者则推荐了一个新的lib叫作web-worker。异步

web-worker是构建于nodejs的worker_threads之上的,本文将会详细讲解worker_threads和web-worker的使用。async

worker_threads

worker_threads模块的源代码源自lib/worker_threads.js,它指的是工做线程,能够开启一个新的线程来并行执行javascript程序。函数

worker_threads主要用来处理CPU密集型操做,而不是IO操做,由于nodejs自己的异步IO已经很是强大了。oop

worker_threads中主要有5个属性,3个class和3个主要的方法。接下来咱们将会一一讲解。post

isMainThread

isMainThread用来判断代码是否在主线程中运行,咱们看一个使用的例子:

const { Worker, isMainThread } = require('worker_threads');

if (isMainThread) {
    console.log('在主线程中');
  new Worker(__filename);
} else {
  console.log('在工做线程中');
  console.log(isMainThread);  // 打印 'false'。
}

上面的例子中,咱们从worker_threads模块中引入了Worker和isMainThread,Worker就是工做线程的主类,咱们将会在后面详细讲解,这里咱们使用Worker建立了一个工做线程。

MessageChannel

MessageChannel表明的是一个异步双向通讯channel。MessageChannel中没有方法,主要经过MessageChannel来链接两端的MessagePort。

class MessageChannel {
        readonly port1: MessagePort;
        readonly port2: MessagePort;
    }

当咱们使用new MessageChannel()的时候,会自动建立两个MessagePort。

const { MessageChannel } = require('worker_threads');

const { port1, port2 } = new MessageChannel();
port1.on('message', (message) => console.log('received', message));
port2.postMessage({ foo: 'bar' });
// Prints: received { foo: 'bar' } from the `port1.on('message')` listener

经过MessageChannel,咱们能够进行MessagePort间的通讯。

parentPort和MessagePort

parentPort是一个MessagePort类型,parentPort主要用于worker线程和主线程进行消息交互。

经过parentPort.postMessage()发送的消息在主线程中将能够经过worker.on('message')接收。

主线程中经过worker.postMessage()发送的消息将能够在工做线程中经过parentPort.on('message')接收。

咱们看一下MessagePort的定义:

class MessagePort extends EventEmitter {
        close(): void;
        postMessage(value: any, transferList?: Array<ArrayBuffer | MessagePort>): void;
        ref(): void;
        unref(): void;
        start(): void;

        addListener(event: "close", listener: () => void): this;
        addListener(event: "message", listener: (value: any) => void): this;
        addListener(event: string | symbol, listener: (...args: any[]) => void): this;

        emit(event: "close"): boolean;
        emit(event: "message", value: any): boolean;
        emit(event: string | symbol, ...args: any[]): boolean;

        on(event: "close", listener: () => void): this;
        on(event: "message", listener: (value: any) => void): this;
        on(event: string | symbol, listener: (...args: any[]) => void): this;

        once(event: "close", listener: () => void): this;
        once(event: "message", listener: (value: any) => void): this;
        once(event: string | symbol, listener: (...args: any[]) => void): this;

        prependListener(event: "close", listener: () => void): this;
        prependListener(event: "message", listener: (value: any) => void): this;
        prependListener(event: string | symbol, listener: (...args: any[]) => void): this;

        prependOnceListener(event: "close", listener: () => void): this;
        prependOnceListener(event: "message", listener: (value: any) => void): this;
        prependOnceListener(event: string | symbol, listener: (...args: any[]) => void): this;

        removeListener(event: "close", listener: () => void): this;
        removeListener(event: "message", listener: (value: any) => void): this;
        removeListener(event: string | symbol, listener: (...args: any[]) => void): this;

        off(event: "close", listener: () => void): this;
        off(event: "message", listener: (value: any) => void): this;
        off(event: string | symbol, listener: (...args: any[]) => void): this;
    }

MessagePort继承自EventEmitter,它表示的是异步双向通讯channel的一端。这个channel就叫作MessageChannel,MessagePort经过MessageChannel来进行通讯。

咱们能够经过MessagePort来传输结构体数据,内存区域或者其余的MessagePorts。

从源代码中,咱们能够看到MessagePort中有两个事件,close和message。

close事件将会在channel的中任何一端断开链接的时候触发,而message事件将会在port.postMessage时候触发,下面咱们看一个例子:

const { MessageChannel } = require('worker_threads');
const { port1, port2 } = new MessageChannel();

// Prints:
//   foobar
//   closed!
port2.on('message', (message) => console.log(message));
port2.on('close', () => console.log('closed!'));

port1.postMessage('foobar');
port1.close();

port.on('message')实际上为message事件添加了一个listener,port还提供了addListener方法来手动添加listener。

port.on('message')会自动触发port.start()方法,表示启动一个port。

当port有listener存在的时候,这表示port存在一个ref,当存在ref的时候,程序是不会结束的。咱们能够经过调用port.unref方法来取消这个ref。

接下来咱们看一下怎么经过port来传输消息:

port.postMessage(value[, transferList])

postMessage能够接受两个参数,第一个参数是value,这是一个JavaScript对象。第二个参数是transferList。

先看一个传递一个参数的状况:

const { MessageChannel } = require('worker_threads');
const { port1, port2 } = new MessageChannel();

port1.on('message', (message) => console.log(message));

const circularData = {};
circularData.foo = circularData;
// Prints: { foo: [Circular] }
port2.postMessage(circularData);

一般来讲postMessage发送的对象都是value的拷贝,可是若是你指定了transferList,那么在transferList中的对象将会被transfer到channel的接受端,而且再也不存在于发送端,就好像把对象传送出去同样。

transferList是一个list,list中的对象能够是ArrayBuffer, MessagePort 和 FileHandle。

若是value中包含SharedArrayBuffer对象,那么该对象不能被包含在transferList中。

看一个包含两个参数的例子:

const { MessageChannel } = require('worker_threads');
const { port1, port2 } = new MessageChannel();

port1.on('message', (message) => console.log(message));

const uint8Array = new Uint8Array([ 1, 2, 3, 4 ]);
// post uint8Array的拷贝:
port2.postMessage(uint8Array);

port2.postMessage(uint8Array, [ uint8Array.buffer ]);

//port2.postMessage(uint8Array);

上面的例子将输出:

Uint8Array(4) [ 1, 2, 3, 4 ]
Uint8Array(4) [ 1, 2, 3, 4 ]

第一个postMessage是拷贝,第二个postMessage是transfer Uint8Array底层的buffer。

若是咱们再次调用port2.postMessage(uint8Array),咱们会获得下面的错误:

DOMException [DataCloneError]: An ArrayBuffer is detached and could not be cloned.

buffer是TypedArray的底层存储结构,若是buffer被transfer,那么以前的TypedArray将会变得不可用。

markAsUntransferable

要想避免这个问题,咱们能够调用markAsUntransferable将buffer标记为不可transferable. 咱们看一个markAsUntransferable的例子:

const { MessageChannel, markAsUntransferable } = require('worker_threads');

const pooledBuffer = new ArrayBuffer(8);
const typedArray1 = new Uint8Array(pooledBuffer);
const typedArray2 = new Float64Array(pooledBuffer);

markAsUntransferable(pooledBuffer);

const { port1 } = new MessageChannel();
port1.postMessage(typedArray1, [ typedArray1.buffer ]);

console.log(typedArray1);
console.log(typedArray2);

SHARE_ENV

SHARE_ENV是传递给worker构造函数的一个env变量,经过设置这个变量,咱们能够在主线程与工做线程进行共享环境变量的读写。

const { Worker, SHARE_ENV } = require('worker_threads');
new Worker('process.env.SET_IN_WORKER = "foo"', { eval: true, env: SHARE_ENV })
  .on('exit', () => {
    console.log(process.env.SET_IN_WORKER);  // Prints 'foo'.
  });

workerData

除了postMessage(),还能够经过在主线程中传递workerData给worker的构造函数,从而将主线程中的数据传递给worker:

const { Worker, isMainThread, workerData } = require('worker_threads');

if (isMainThread) {
  const worker = new Worker(__filename, { workerData: 'Hello, world!' });
} else {
  console.log(workerData);  // Prints 'Hello, world!'.
}

worker类

先看一下worker的定义:

class Worker extends EventEmitter {
        readonly stdin: Writable | null;
        readonly stdout: Readable;
        readonly stderr: Readable;
        readonly threadId: number;
        readonly resourceLimits?: ResourceLimits;

        constructor(filename: string | URL, options?: WorkerOptions);

        postMessage(value: any, transferList?: Array<ArrayBuffer | MessagePort>): void;
        ref(): void;
        unref(): void;

        terminate(): Promise<number>;

        getHeapSnapshot(): Promise<Readable>;

        addListener(event: "error", listener: (err: Error) => void): this;
        addListener(event: "exit", listener: (exitCode: number) => void): this;
        addListener(event: "message", listener: (value: any) => void): this;
        addListener(event: "online", listener: () => void): this;
        addListener(event: string | symbol, listener: (...args: any[]) => void): this;

       ... 
    }

worker继承自EventEmitter,而且包含了4个重要的事件:error,exit,message和online。

worker表示的是一个独立的 JavaScript 执行线程,咱们能够经过传递filename或者URL来构造worker。

每个worker都有一对内置的MessagePort,在worker建立的时候就会相互关联。worker使用这对内置的MessagePort来和父线程进行通讯。

经过parentPort.postMessage()发送的消息在主线程中将能够经过worker.on('message')接收。

主线程中经过worker.postMessage()发送的消息将能够在工做线程中经过parentPort.on('message')接收。

固然,你也能够显式的建立MessageChannel 对象,而后将MessagePort做为消息传递给其余线程,咱们看一个例子:

const assert = require('assert');
const {
  Worker, MessageChannel, MessagePort, isMainThread, parentPort
} = require('worker_threads');
if (isMainThread) {
  const worker = new Worker(__filename);
  const subChannel = new MessageChannel();
  worker.postMessage({ hereIsYourPort: subChannel.port1 }, [subChannel.port1]);
  subChannel.port2.on('message', (value) => {
    console.log('接收到:', value);
  });
} else {
  parentPort.once('message', (value) => {
    assert(value.hereIsYourPort instanceof MessagePort);
    value.hereIsYourPort.postMessage('工做线程正在发送此消息');
    value.hereIsYourPort.close();
  });
}

上面的例子中,咱们借助了worker和parentPort自己的消息传递功能,传递了一个显式的MessageChannel中的MessagePort。

而后又经过该MessagePort来进行消息的分发。

receiveMessageOnPort

除了port的on('message')方法以外,咱们还可使用receiveMessageOnPort来手动接收消息:

const { MessageChannel, receiveMessageOnPort } = require('worker_threads');
const { port1, port2 } = new MessageChannel();
port1.postMessage({ hello: 'world' });

console.log(receiveMessageOnPort(port2));
// Prints: { message: { hello: 'world' } }
console.log(receiveMessageOnPort(port2));
// Prints: undefined

moveMessagePortToContext

先了解一下nodejs中的Context的概念,咱们能够从vm中建立context,它是一个隔离的上下文环境,从而保证不一样运行环境的安全性,咱们看一个context的例子:

const vm = require('vm');

const x = 1;

const context = { x: 2 };
vm.createContext(context); // 上下文隔离化对象。

const code = 'x += 40; var y = 17;';
// `x` and `y` 是上下文中的全局变量。
// 最初,x 的值为 2,由于这是 context.x 的值。
vm.runInContext(code, context);

console.log(context.x); // 42
console.log(context.y); // 17

console.log(x); // 1; y 没有定义。

在worker中,咱们能够将一个MessagePort move到其余的context中。

worker.moveMessagePortToContext(port, contextifiedSandbox)

这个方法接收两个参数,第一个参数就是要move的MessagePort,第二个参数就是vm.createContext()建立的context对象。

worker_threads的线程池

上面咱们提到了使用单个的worker thread,可是如今程序中一个线程每每是不够的,咱们须要建立一个线程池来维护worker thread对象。

nodejs提供了AsyncResource类,来做为对异步资源的扩展。

AsyncResource类是async_hooks模块中的。

下面咱们看下怎么使用AsyncResource类来建立worker的线程池。

假设咱们有一个task,使用来执行两个数相加,脚本名字叫作task_processor.js:

const { parentPort } = require('worker_threads');
parentPort.on('message', (task) => {
  parentPort.postMessage(task.a + task.b);
});

下面是worker pool的实现:

const { AsyncResource } = require('async_hooks');
const { EventEmitter } = require('events');
const path = require('path');
const { Worker } = require('worker_threads');

const kTaskInfo = Symbol('kTaskInfo');
const kWorkerFreedEvent = Symbol('kWorkerFreedEvent');

class WorkerPoolTaskInfo extends AsyncResource {
  constructor(callback) {
    super('WorkerPoolTaskInfo');
    this.callback = callback;
  }

  done(err, result) {
    this.runInAsyncScope(this.callback, null, err, result);
    this.emitDestroy();  // `TaskInfo`s are used only once.
  }
}

class WorkerPool extends EventEmitter {
  constructor(numThreads) {
    super();
    this.numThreads = numThreads;
    this.workers = [];
    this.freeWorkers = [];

    for (let i = 0; i < numThreads; i++)
      this.addNewWorker();
  }

  addNewWorker() {
    const worker = new Worker(path.resolve(__dirname, 'task_processor.js'));
    worker.on('message', (result) => {
      // In case of success: Call the callback that was passed to `runTask`,
      // remove the `TaskInfo` associated with the Worker, and mark it as free
      // again.
      worker[kTaskInfo].done(null, result);
      worker[kTaskInfo] = null;
      this.freeWorkers.push(worker);
      this.emit(kWorkerFreedEvent);
    });
    worker.on('error', (err) => {
      // In case of an uncaught exception: Call the callback that was passed to
      // `runTask` with the error.
      if (worker[kTaskInfo])
        worker[kTaskInfo].done(err, null);
      else
        this.emit('error', err);
      // Remove the worker from the list and start a new Worker to replace the
      // current one.
      this.workers.splice(this.workers.indexOf(worker), 1);
      this.addNewWorker();
    });
    this.workers.push(worker);
    this.freeWorkers.push(worker);
    this.emit(kWorkerFreedEvent);
  }

  runTask(task, callback) {
    if (this.freeWorkers.length === 0) {
      // No free threads, wait until a worker thread becomes free.
      this.once(kWorkerFreedEvent, () => this.runTask(task, callback));
      return;
    }

    const worker = this.freeWorkers.pop();
    worker[kTaskInfo] = new WorkerPoolTaskInfo(callback);
    worker.postMessage(task);
  }

  close() {
    for (const worker of this.workers) worker.terminate();
  }
}

module.exports = WorkerPool;

咱们给worker建立了一个新的kTaskInfo属性,而且将异步的callback封装到WorkerPoolTaskInfo中,赋值给worker.kTaskInfo.

接下来咱们就可使用workerPool了:

const WorkerPool = require('./worker_pool.js');
const os = require('os');

const pool = new WorkerPool(os.cpus().length);

let finished = 0;
for (let i = 0; i < 10; i++) {
  pool.runTask({ a: 42, b: 100 }, (err, result) => {
    console.log(i, err, result);
    if (++finished === 10)
      pool.close();
  });
}
本文做者:flydean程序那些事

本文连接:http://www.flydean.com/nodejs-worker-thread/

本文来源:flydean的博客

欢迎关注个人公众号:「程序那些事」最通俗的解读,最深入的干货,最简洁的教程,众多你不知道的小技巧等你来发现!

相关文章
相关标签/搜索