最近在面试求职的时候有被问到 Node 有没有办法实现多线程,我一拍脑壳,害,这个我会:javascript
"利用 cluster 模式经过 fork() 实例化多个 node 服务实例, 好比一个8核服务器,就能够启动8个独立实例,相互之间并不影响,十分稳定.最典型的工具就是 pm2."html
"恩恩,你说的是 cluster 多进程模式,有没有多线程方式呢?"java
"啊...这个...那就经过 child_process 去 fork/spawn 一个子进程..."node
"这也是调起了一个子进程,并非真正的多线程,有关注过 Node 的一些新特性/api 吗?"git
"饿...这个(此时的我是当真不知道 worker_threads...."github
而后结局很明显,在欢声笑语中打出GG...面试
不甘心的我去Node中文文档查阅了相关资料,果不其然,我仍是太年轻了:)算法
害! 为啥 Node中文文档竟然英文API,因而我学习的同时顺便翻译了一哈.有兴趣学习或者了解 worker_threads 模块的同窗能够点击这里查阅~~(谷歌翻译+做者人肉翻译,若有错谬,还望海涵)~~segmentfault
ps: Node不是单线程的么? 实际上并非, Node 单线程是指 v8 引擎执行 js 时是单线程的,就好像浏览器一个Tab进程中,就有GUI渲染线程, JS引擎线程,事件触发线程,定时器触发线程,异步请求http线程. Node.js 异步原理实际上是经过 libuv 的线程池完成异步调用的;当你启动一个Node服务时,打开任务管理器,你会发现 Node 任务的线程数为7(主线程,编译/优化线程,分析器线程,垃圾回收线程等)api
首先确认本身 Node 版本环境支持 worker_threads 模块.如不支持,能够经过nvm下个最新的 Node.
笔者在 Node.js 开发版本v12.6.0上测试该模块. 首先引入模块:
// main.js
const { Worker, isMainThread, parentPort, workerData, MessageChannel } = require('worker_threads');
if (isMainThread) {
console.log('我是主线程', isMainThread);
const worker = new Worker(__filename);
} else {
console.log('我不是主线程', isMainThread);
}
=> 我是主线程 true
=> 我不是主线程 false
复制代码
例子经过 new Worker 生成子线程从新执行了 main.js ,执行完毕, worker子线程自动销毁.
__filename
你能够写成你所要执行的具体 worker.js
所在的路径.
除了经过 new Worker 去加载执行 js 文件,有没有办法直接执行 js 代码呢, 以下所示:
let code = ` for(let i = 0;i < 5;i++) { console.log('worker线程执行中:', i); } `
let worker = new Worker(code, { eval: true });
console.log('主线程执行完毕');
=> 主线程执行完毕
=> worker线程执行中: 0
=> worker线程执行中: 1
=> worker线程执行中: 2
=> worker线程执行中: 3
=> worker线程执行中: 4
复制代码
若是经过 port 设置了 port.on 监听事件, 除非手动 terminate 终结, 不然线程不会自动中断(或者和我同样使用 port.once 即监听一次)
即经过workerData
完成完成线程数据初始化
const data = {
name: 'ego同窗',
age: 23,
sex: 'male',
addr: '深圳南山',
arr: [{ skill: 'coding'}, { hobby: 'basketball' }]
}
if (isMainThread) {
const worker = new Worker(__filename, { workerData: data });
} else {
workerData.age = 16;
workerData.arr[0].skill = 'sleep';
console.log(data);
console.log(workerData);
}
=>
{
name: 'ego同窗',
age: 23,
sex: 'male',
addr: '深圳南山',
arr: [ { skill: 'coding' }, { hobby: 'basketball' } ]
}
{
name: 'ego同窗',
age: 16,
sex: 'male',
addr: '深圳南山',
arr: [ { skill: 'sleep' }, { hobby: 'basketball' } ]
}
复制代码
if (isMainThread) {
const worker = new Worker(__filename);
worker.postMessage({name: 'ego同窗'});
worker.once('message', (message) => {
console.log('主线程接收信息:', message);
});
} else {
parentPort.once('message', (obj) => {
console.log('子线程接收信息:', obj);
parentPort.postMessage(obj.name);
})
}
=> 子线程接收信息: { name: 'ego同窗' }
=> 主线程接收信息: ego同窗
复制代码
parentPort
是生成 worker 线程时自动建立的MessagePort
实例,用于与父进程进行通讯.
//main.js
const path = require('path');
const { port1, port2 } = new MessageChannel();
if (isMainThread) {
const worker1 = new Worker(__filename);
const worker2 = new Worker(path.join(__dirname, 'worker.js'));
worker1.postMessage({ port1 }, [ port1 ]);
worker2.postMessage({ port2 }, [ port2 ]);
} else {
parentPort.once('message', ({ port1 }) => {
console.log('子线程1收到port1', port1);
port1.once('message', (msg) => {
console.log('子线程1收到', msg);
})
port1.postMessage('port1 向 port2 发消息啦');
})
}
// worker.js
const { parentPort } = require('worker_threads');
parentPort.once('message', ({ port2 }) => {
console.log('子线程2收到port2');
port2.once('message', (msg) => {
console.log('子线程2收到', msg);
})
port2.postMessage('这里是port2, over!');
})
=>
子线程1收到port1
子线程2收到port2
子线程1收到 这里是port2, over!
子线程2收到 port1 向 port2 发消息啦
复制代码
简单来讲就是父线程将MessageChannel
类生成的MessagePort
对象实例分别发送到子线程中,两个子线程便可经过 port1
,port2
进行通讯.
菜徐坤疑问:
worker线程实例可不能够经过workerData
传递到另外一个worker线程里直接使用呢?试一下:
// main.js
const worker1 = new Worker(__filename);
const worker2 = new Worker(path.join(__dirname, 'worker.js'), { workerData: worker1 });
// worker.js
const { workerData } = require('worker_threads');
console.log(workerData);
=>
internal/worker.js:144
this[kPort].postMessage({
^
DOMException [DataCloneError]: (name) => {
if (name === eventName && eventEmitter.listenerCount(eventName) === 0) {
port.ref();
...<omitted>... } could not be cloned.
复制代码
抛出数据克隆错误 !
OK,咱们来看看 Node worker_thread模块源码 以下:
// node/lib/internal/worker.js
...
const url = options.eval ? null : pathToFileURL(filename);
// Set up the C++ handle for the worker, as well as some internal wiring.
// 为工做程序设置C ++句柄以及一些内部连线。
this[kHandle] = new WorkerImpl(url, options.execArgv);
...
this[kPort] = this[kHandle].messagePort;
this[kPort].on('message', (data) => this[kOnMessage](data));
...
const { port1, port2 } = new MessageChannel();
this[kPublicPort] = port1;
this[kPublicPort].on('message', (message) => this.emit('message', message));
setupPortReferencing(this[kPublicPort], this, 'message');
this[kPort].postMessage({
type: messageTypes.LOAD_SCRIPT,
filename,
doEval: !!options.eval,
cwdCounter: cwdCounter || workerIo.sharedCwdCounter,
workerData: options.workerData,
publicPort: port2,
manifestSrc: getOptionValue('--experimental-policy') ?
require('internal/process/policy').src :
null,
hasStdin: !!options.stdin
}, [port2]);
// Actually start the new thread now that everything is in place.
// 如今,一切就绪,实际上开始新线程。
this[kHandle].startThread();
复制代码
workerData
是经过 port.postMessagePort(value[, transferList])
克隆副本传输给目标线程,即workerData
经过结构化克隆算法进行复制:
It builds up a clone by recursing through the input object while maintaining a map of previously visited references in order to avoid infinitely traversing cycles.
经过递归遍历输入对象而创建一个副本,同时保持先前访问的引用的映射,以免无限遍历循环。
该算法不复制函数、错误、属性描述符或原型链,能够包含循环引用和类型化数组.
port.postMessage(value[, transferList])
transferList
能够是ArrayBuffer
和MessagePort
对象的列表, 传递ArrayBuffer
后,访问权限被修改,归属于消息接收方,它们将再也不可用于频道的发送方 !!!
线程间经过 clone 第一个参数来互相传递消息, 那若是我不想处处 clone 处处传递数据呢, 有办法解决吗? 答案是有的.
cluster
和 child_process
时一般使用 SharedArrayBuffer
来实现须要多进程共享的内存, 一样的 value
能够包含SharedArrayBuffer
实例,从而能够在任一线程访问这些实例 !
const { Worker, isMainThread, parentPort, MessageChannel, threadId } = require('worker_threads');
if (isMainThread) {
const worker1 = new Worker(__filename);
const worker2 = new Worker(__filename);
const { port1, port2 } = new MessageChannel();
const sharedUint8Array = new Uint8Array(new SharedArrayBuffer(4));
// 输出一下sharedUint8Array
console.log(sharedUint8Array);
worker1.postMessage({ uPort: port1, data: sharedUint8Array }, [ port1 ]);
worker2.postMessage({ uPort: port2, data: sharedUint8Array }, [ port2 ]);
worker2.once('message', (message) => {
console.log(`${message}, 查看共享内存:${sharedUint8Array}`);
});
} else {
parentPort.once('message', ({ uPort, data }) => {
uPort.postMessage(`我是${threadId}号线程`);
uPort.on('message', (msg) => {
console.log(`${threadId}号收到:${msg}`);
if (threadId === 2) {
data[1] = 2;
parentPort.postMessage('2号线程修改了共享内存!!!');
}
console.log(`${threadId}号查看共享内存:${data}`);
})
})
}
=>
Uint8Array [ 0, 0, 0, 0 ]
2号收到:我是1号线程
2号线程修改了共享内存!!!, 查看共享内存:0,2,0,0
1号收到:我是2号线程
2号查看共享内存:0,2,0,0
1号查看共享内存:0,2,0,0
复制代码
经过共享内存, 咱们在一个线程中修改它, 意味全部线程中中进行了修改, 意味着数据的传递修改无需屡次序列化clone, 是否是方便不少呢.
若是不知足共享一个 Buffer 数组, 一般数据都是以对象的形式来存储传递的, 咱们能够建立相似的结构来达到咱们的目的.
worker 工做线程通常有两种使用方法:
官方推荐线程池的使用方法, 毕竟一次次的建立销毁 worker 须要占用不小的开销, 咱们能够根据实际业务状况来选择本身的使用方式.
下面咱们来实现一个简单的 worker_thread 线程池:
// main.js
const path = require('path');
const { Worker } = require('worker_threads');
class WorkerPool {
_workers = []; // 线程引用数组
_activeWorkers = []; // 激活的线程数组
_queue = []; // 任务队列
constructor(workerPath, numOfThreads) {
this.workerPath = workerPath;
this.numOfThreads = numOfThreads;
this.init();
}
// 初始化多线程
init() {
if (this.numOfThreads < 1) {
throw new Error('线程池最小线程数应为1');
}
for (let i = 0;i < this.numOfThreads; i++) {
const worker = new Worker(this.workerPath);
this._workers[i] = worker;
this._activeWorkers[i] = false;
}
}
// 结束线程池中全部线程
destroy() {
for (let i = 0; i < this.numOfThreads; i++) {
if (this._activeWorkers[i]) {
throw new Error(`${i}号线程仍在工做中...`);
}
this._workers[i].terminate();
}
}
// 检查是否有空闲worker
checkWorkers() {
for (let i = 0; i < this.numOfThreads; i++) {
if (!this._activeWorkers[i]) {
return i;
}
}
return -1;
}
run(getData) {
return new Promise((resolve, reject) => {
const restWorkerId = this.checkWorkers();
const queueItem = {
getData,
callback: (error, result) => {
if (error) {
return reject(error);
}
return resolve(result);
}
}
// 线程池已满,将任务加入任务队列
if (restWorkerId === -1) {
this._queue.push(queueItem);
return null;
}
// 空闲线程执行任务
this.runWorker(restWorkerId, queueItem);
})
}
async runWorker(workerId, queueItem) {
const worker = this._workers[workerId];
this._activeWorkers[workerId] = true;
// 线程结果回调
const messageCallback = (result) => {
queueItem.callback(null, result);
cleanUp();
};
// 线程错误回调
const errorCallback = (error) => {
queueItem.callback(error);
cleanUp();
};
// 任务结束消除旧监听器,若还有待完成任务,继续完成
const cleanUp = () => {
worker.removeAllListeners('message');
worker.removeAllListeners('error');
this._activeWorkers[workerId] = false;
if (!this._queue.length) {
return null;
}
this.runWorker(workerId, this._queue.shift());
}
// 线程建立监听结果/错误回调
worker.once('message', messageCallback);
worker.once('error', errorCallback);
// 向子线程传递初始data
worker.postMessage(queueItem.getData);
}
}
复制代码
建立一个workerPool
类,在构造函数里传入要执行的js文件路径和要启动的线程池数,而后经过init()
方法初始化多线程,并将它们的引用存储在_workers
数组里,初始状态默认都为false
不活跃存储在_activeWorkers
数组中.
run
方法分配执行任务,返回Promise
调用任务的回调去resolve/reject
,使用空闲线程runWorker
执行任务,若是暂时没有空闲线程,就把任务push
进_queue
任务队列等待执行.
runWorker
使用空闲线程指定任务,定义好结果回调和error
回调,经过设置子线程的监听事件传递回调结果,把子线程的计算结果传递出来
而后咱们建立一个 worker.js
在里面写CPU密集耗时操做:
// worker.js
const { isMainThread, parentPort } = require('worker_threads');
if (isMainThread) {
throw new Error('Its not a worker');
}
const doCalcs = (data) => {
const collection = [];
for (let i = 0; i < 10; i++) {
collection[i] = Math.round(Math.random() * 1000);
}
return collection.sort((a, b) => { return a - b });
};
parentPort.on('message', (data) => {
const result = doCalcs(data);
parentPort.postMessage(result);
});
复制代码
这里就简单写了点排序,方便输出...
而后经过new WorkerPool
生成实例执行任务:
const pool = new WorkerPool(path.join(__dirname, 'worker.js'), 4);
const items = [...new Array(10)].fill(null);
Promise.all(items.map(async (_, i) => {
const res = await pool.run();
console.log(`任务${i}完成结果:`, res);
})).then(() => {
console.log('全部任务完成 !');
// 销毁线程池
pool.destroy();
});
=>
任务1完成结果: [
45, 96, 197, 314,
606, 631, 648, 648,
658, 874
]
任务4完成结果: [
68, 86, 124, 330,
330, 469, 533, 766,
772, 900
]
任务5完成结果: [
107, 344, 370, 499,
504, 627, 750, 840,
873, 972
]
任务6完成结果: [
218, 257, 282, 284,
500, 607, 699, 723,
739, 826
]
任务7完成结果: [
31, 98, 141, 190,
428, 507, 685, 686,
794, 945
]
任务8完成结果: [
27, 100, 188, 245,
471, 497, 514, 620,
645, 993
]
任务9完成结果: [
193, 336, 407, 455,
478, 534, 564, 651,
755, 963
]
任务2完成结果: [
319, 337, 398, 549,
587, 659, 670, 781,
792, 843
]
任务3完成结果: [
173, 188, 273, 406,
445, 450, 582, 678,
727, 882
]
任务0完成结果: [
38, 76, 134, 239,
439, 468, 568, 696,
910, 923
]
全部任务完成 !
复制代码
worker_threads
模块提供了真正的单进程多线程使用方法,咱们能够将CPU密集的任务交给线程去解决,等有告终果后经过MessageChannel
跨线程通讯/或者使用共享内存.
固然,上面这些例子都是最简单最基本的使用方式,真正运用到生产中根据不一样的业务复杂度,worker_threads
可能会有各类花里胡哨的运用和实现.
害 每日扎心一问:今天你找到新工做了嘛
这些文章在我学习过程给予我很是大的帮助,有些做者已经在实际生产业务中有所实践,他们的学习过程和经验很是值得咱们学习~有兴趣的同窗能够选读一些加以学习,有所收获!
Node.js 真·多线程 Worker Threads 初探