在数据处理过程当中会出现一个叫作背压的常见问题,它描述了数据传输过程当中缓冲区后面数据的累积,当传输的接收端具备复杂的操做时,或者因为某种缘由速度较慢时,来自传入源的数据就有累积的趋势,就像阻塞同样。html
要解决这个问题,必须有一个委托系统来确保数据从一个源到另外一个源的平滑流动,不一样的社区已经针对他们的程序独特意解决了这个问题,Unix管道和TCP套接字就是很好的例子,而且一般被称为流量控制,在Node.js中,流是已采用的解决方案。node
本指南的目的是进一步详细说明背压是什么,以及精确流如何在Node.js的源代码中解决这个问题,本指南的第二部分将介绍建议的最佳实践,以确保在实现流时应用程序的代码是安全的和优化的。linux
咱们假设你对Node.js中背压、Buffer
和EventEmitter
的通常定义以及Stream
的一些经验有所了解。若是你尚未阅读这些文档,那么首先查看API文档并非一个坏主意,由于它有助于在阅读本指南时扩展你的理解。git
在计算机系统中,数据经过管道、sockets和信号从一个进程传输到另外一个进程,在Node.js中,咱们找到了一种名为Stream
的相似机制。流很好!他们为Node.js作了不少事情,几乎内部代码库的每一个部分都使用该模块,做为开发人员,咱们鼓励你使用它们!github
const readline = require('readline'); // process.stdin and process.stdout are both instances of Streams const rl = readline.createInterface({ input: process.stdin, output: process.stdout }); rl.question('Why should you use streams? ', (answer) => { console.log(`Maybe it's ${answer}, maybe it's because they are awesome! :)`); rl.close(); });
经过比较Node.js的Stream
实现的内部系统工具,能够证实为何经过流实现背压机制是一个很好的优化的一个很好的例子。shell
在一种状况下,咱们将使用一个大文件(约〜9gb)并使用熟悉的zip(1)工具对其进行压缩。segmentfault
$ zip The.Matrix.1080p.mkv
虽然这须要几分钟才能完成,但在另外一个shell中咱们能够运行一个脚本,该脚本采用Node.js的模块zlib
,它包含另外一个压缩工具gzip(1)。浏览器
const gzip = require('zlib').createGzip(); const fs = require('fs'); const inp = fs.createReadStream('The.Matrix.1080p.mkv'); const out = fs.createWriteStream('The.Matrix.1080p.mkv.gz'); inp.pipe(gzip).pipe(out);
要测试结果,请尝试打开每一个压缩文件,zip(1)工具压缩的文件将通知你文件已损坏,而Stream
完成的压缩将无错误地解压缩。安全
注意:在此示例中,咱们使用.pipe()
将数据源从一端获取到另外一端,可是,请注意没有附加正确的错误处理程序。若是没法正确接收数据块,Readable
源或gzip
流将不会被销毁,pump是一个实用工具,若是其中一个流失败或关闭,它将正确地销毁管道中的全部流,而且在这种状况下是必须的!app
只有Nodejs 8.x或更早版本才须要pump,对于Node 10.x或更高版本,引入pipeline
来替换pump。这是一个模块方法,用于在流传输之间转发错误和正确清理,并在管道完成时提供回调。
如下是使用管道的示例:
const { pipeline } = require('stream'); const fs = require('fs'); const zlib = require('zlib'); // Use the pipeline API to easily pipe a series of streams // together and get notified when the pipeline is fully done. // A pipeline to gzip a potentially huge video file efficiently: pipeline( fs.createReadStream('The.Matrix.1080p.mkv'), zlib.createGzip(), fs.createWriteStream('The.Matrix.1080p.mkv.gz'), (err) => { if (err) { console.error('Pipeline failed', err); } else { console.log('Pipeline succeeded'); } } );
你还能够在管道上调用promisify
以将其与async
/await
一块儿使用:
const stream = require('stream'); const fs = require('fs'); const zlib = require('zlib'); const pipeline = util.promisify(stream.pipeline); async function run() { try { await pipeline( fs.createReadStream('The.Matrix.1080p.mkv'), zlib.createGzip(), fs.createWriteStream('The.Matrix.1080p.mkv.gz'), ); console.log('Pipeline succeeded'); } catch (err) { console.error('Pipeline failed', err); } }
有些状况下,Readable
流可能会过快地为Writable
提供数据 — 远远超过消费者能够处理的数据!
当发生这种状况时,消费者将开始排队全部数据块以供之后消费,写入队列将变得愈来愈长,所以在整个过程完成以前,必须将更多数据保存在内存中。
写入磁盘比从磁盘读取要慢不少,所以,当咱们尝试压缩文件并将其写入咱们的硬盘时,将发生背压,由于写入磁盘将没法跟上读取的速度。
// Secretly the stream is saying: "whoa, whoa! hang on, this is way too much!" // Data will begin to build up on the read-side of the data buffer as // `write` tries to keep up with the incoming data flow. inp.pipe(gzip).pipe(outputFile);
这就是背压机制很重要的缘由,若是没有背压系统,该进程会耗尽系统的内存,有效地减缓了其余进程,并独占你系统的大部分直到完成。
这致使了一些事情:
在下面的示例中,咱们将取出.write()函数的返回值并将其更改成true
,这有效地禁用了Node.js核心中的背压支持,在任何对'modified'二进制文件的引用中,咱们正在谈论在没有return ret;
行的状况下运行node
二进制,而改成return true;
。
咱们来看看快速基准测试,使用上面的相同示例,咱们进行几回试验,以得到两个二进制的中位时间。
trial (#) | `node` binary (ms) | modified `node` binary (ms) ================================================================= 1 | 56924 | 55011 2 | 52686 | 55869 3 | 59479 | 54043 4 | 54473 | 55229 5 | 52933 | 59723 ================================================================= average time: | 55299 | 55975
二者都须要大约一分钟来运行,所以根本没有太大差异,但让咱们仔细看看以确认咱们的怀疑是否正确,咱们使用Linux工具dtrace来评估V8垃圾收集器发生了什么。
GC(垃圾收集器)测量时间表示垃圾收集器完成单次扫描的完整周期的间隔:
approx. time (ms) | GC (ms) | modified GC (ms) ================================================= 0 | 0 | 0 1 | 0 | 0 40 | 0 | 2 170 | 3 | 1 300 | 3 | 1 * * * * * * * * * 39000 | 6 | 26 42000 | 6 | 21 47000 | 5 | 32 50000 | 8 | 28 54000 | 6 | 35
虽然这两个过程开始时相同,但彷佛以相同的速率运行GC,很明显,在适当工做的背压系统几秒钟后,它将GC负载分布在4-8毫秒的一致间隔内,直到数据传输结束。
可是,当背压系统不到位时,V8垃圾收集开始拖延,正常二进制文件在一分钟内调用GC约75次,然而,修改后的二进制文件仅触发36次。
这是因为内存使用量增长而累积的缓慢而渐进的债务,随着数据传输,在没有背压系统的状况下,每一个块传输使用更多内存。
分配的内存越多,GC在一次扫描中须要处理的内存就越多,扫描越大,GC就越须要决定能够释放什么,而且在更大的内存空间中扫描分离的指针将消耗更多的计算能力。
为肯定每一个二进制的内存消耗,咱们使用/usr/bin/time -lp sudo ./node ./backpressure-example/zlib.js
单独为每一个进程计时。
这是正常二进制的输出:
Respecting the return value of .write() ============================================= real 58.88 user 56.79 sys 8.79 87810048 maximum resident set size 0 average shared memory size 0 average unshared data size 0 average unshared stack size 19427 page reclaims 3134 page faults 0 swaps 5 block input operations 194 block output operations 0 messages sent 0 messages received 1 signals received 12 voluntary context switches 666037 involuntary context switches
虚拟内存占用的最大字节大小约为87.81mb。
如今更改.write()
函数的返回值,咱们获得:
Without respecting the return value of .write(): ================================================== real 54.48 user 53.15 sys 7.43 1524965376 maximum resident set size 0 average shared memory size 0 average unshared data size 0 average unshared stack size 373617 page reclaims 3139 page faults 0 swaps 18 block input operations 199 block output operations 0 messages sent 0 messages received 1 signals received 25 voluntary context switches 629566 involuntary context switches
虚拟内存占用的最大字节大小约为1.52gb。
若是没有流来委托背压,则分配的内存空间要大一个数量级 — 同一进程之间的巨大差别!
这个实验展现了Node.js的反压机制是如何优化和节省成本的,如今,让咱们分析一下它是如何工做的!
将数据从一个进程传输到另外一个进程有不一样的函数,在Node.js中,有一个名为.pipe()
的内部内置函数,还有其余包也可使用!但最终,在这个过程的基本层面,咱们有两个独立的组件:数据来源和消费者。
当从源调用.pipe()
时,它向消费者发出信号,告知有数据要传输,管道函数有助于为事件触发器设置适当的背压闭合。
在Node.js中,源是Readable
流,而消费者是Writable
流(这些均可以与Duplex
或Transform
流互换,但这超出了本指南的范围)。
触发背压的时刻能够精确地缩小到Writable
的.write()
函数的返回值,固然,该返回值由几个条件决定。
在数据缓冲区已超过highWaterMark
或写入队列当前正忙的任何状况下,.write()
将返回false
。
当返回false
值时,背压系统启动,它会暂停传入的Readable
流发送任何数据,并等待消费者再次准备就绪,清空数据缓冲区后,将发出.drain()
事件并恢复传入的数据流。
队列完成后,背压将容许再次发送数据,正在使用的内存空间将自行释放并为下一批数据作好准备。
这有效地容许在任何给定时间为.pipe()
函数使用固定数量的内存,没有内存泄漏,没有无限缓冲,垃圾收集器只须要处理内存中的一个区域!
那么,若是背压如此重要,为何你(可能)没有据说过它?答案很简单:Node.js会自动为你完成全部这些工做。
那太好了!可是当咱们试图了解如何实现咱们本身的自定义流时,也不是那么好。
注意:在大多数机器中,有一个字节大小能够肯定缓冲区什么时候已满(在不一样的机器上会有所不一样),Node.js容许你设置本身的自定义highWaterMark
,但一般,默认设置为16kb
(16384
,或objectMode
流为16
),在你可能但愿提升该值的状况下,能够尝试,可是要当心!
.pipe()
的生命周期为了更好地理解背压,下面是一个关于Readable
流的生命周期的流程图,该流被管道传输到Writable
流中:
+===================+ x--> Piping functions +--> src.pipe(dest) | x are set up during |===================| x the .pipe method. | Event callbacks | +===============+ x |-------------------| | Your Data | x They exist outside | .on('close', cb) | +=======+=======+ x the data flow, but | .on('data', cb) | | x importantly attach | .on('drain', cb) | | x events, and their | .on('unpipe', cb) | +---------v---------+ x respective callbacks. | .on('error', cb) | | Readable Stream +----+ | .on('finish', cb) | +-^-------^-------^-+ | | .on('end', cb) | ^ | ^ | +-------------------+ | | | | | ^ | | ^ ^ ^ | +-------------------+ +=================+ ^ | ^ +----> Writable Stream +---------> .write(chunk) | | | | +-------------------+ +=======+=========+ | | | | | ^ | +------------------v---------+ ^ | +-> if (!chunk) | Is this chunk too big? | ^ | | emit .end(); | Is the queue busy? | | | +-> else +-------+----------------+---+ | ^ | emit .write(); | | | ^ ^ +--v---+ +---v---+ | | ^-----------------------------------< No | | Yes | ^ | +------+ +---v---+ ^ | | | ^ emit .pause(); +=================+ | | ^---------------^-----------------------+ return false; <-----+---+ | +=================+ | | | ^ when queue is empty +============+ | ^------------^-----------------------< Buffering | | | |============| | +> emit .drain(); | ^Buffer^ | | +> emit .resume(); +------------+ | | ^Buffer^ | | +------------+ add chunk to queue | | <---^---------------------< +============+
注意:若是要设置管道以将一些流连接在一块儿来操做数据,则极可能会实现Transform
流。
在这种状况下,你的Readable
流的输出将输入到Transform
中,并将管道到Writable
中。
Readable.pipe(Transformable).pipe(Writable);
背压将自动应用,但请注意,Transform
流的输入和输出highWaterMark
均可能被操纵并将影响背压系统。
从Node.js v0.10开始,Stream
类提供了经过使用这些相应函数的下划线版原本修改.read()
或.write()
的行为的功能(._read()
和._write()
)。
对于实现Readable
流和Writable
流,有文档化的指南,咱们假设你已阅读过这些内容,下一节将更深刻一些。
流的黄金法则始终是尊重背压,最佳实践的构成是非矛盾的实践,只要你当心避免与内部背压支持相冲突的行为,你就能够肯定你遵循良好作法。
通常来讲:
.push()
。false
后调用.write()
,而是等待'drain'。注意:关于第3点,构建浏览器流的很是有用的包是readable-stream,Rodd Vagg撰写了一篇很棒的博客文章,描述了这个库的实用性,简而言之,它为Readable
流提供了一种自动优雅降级,并支持旧版本的浏览器和Node.js。
到目前为止,咱们已经了解了.write()
如何影响背压,并将重点放在Writable
流上,因为Node.js的功能,数据在技术上从Readable
流向下游Writable
。可是,正如咱们能够在数据、物质或能量的任何传输中观察到的那样,源与目标同样重要,Readable
流对于如何处理背压相当重要。
这两个过程都相互依赖,有效地进行通讯,若是Readable
忽略Writable
流要求它中止发送数据的时候,那么.write()
的返回值不正确就会有问题。
所以,关于.write()
返回,咱们还必须尊重._read()
方法中使用的.push()
的返回值,若是.push()
返回false
值,则流将中止从源读取,不然,它将继续而不会停顿。
如下是使用.push()
的很差作法示例:
// This is problematic as it completely ignores return value from push // which may be a signal for backpressure from the destination stream! class MyReadable extends Readable { _read(size) { let chunk; while (null !== (chunk = getNextChunk())) { this.push(chunk); } } }
此外,在自定义流以外,存在忽略背压的陷阱,在这个良好的实践的反例中,应用程序的代码会在数据可用时强制经过(由.data
事件发出信号):
// This ignores the backpressure mechanisms Node.js has set in place, // and unconditionally pushes through data, regardless if the // destination stream is ready for it or not. readable.on('data', (data) => writable.write(data) );
回想一下.write()
可能会根据某些条件返回true
或false
,幸运的是,在构建咱们本身的Writable
流时,流状态机将处理咱们的回调并肯定什么时候处理背压并为咱们优化数据流。
可是,当咱们想直接使用Writable
时,咱们必须尊重.write()
返回值并密切注意这些条件:
.write()
将返回false
。.write()
将返回false
(该值由变量highWaterMark
指示)。// This writable is invalid because of the async nature of JavaScript callbacks. // Without a return statement for each callback prior to the last, // there is a great chance multiple callbacks will be called. class MyWritable extends Writable { _write(chunk, encoding, callback) { if (chunk.toString().indexOf('a') >= 0) callback(); else if (chunk.toString().indexOf('b') >= 0) callback(); callback(); } } // The proper way to write this would be: if (chunk.contains('a')) return callback(); else if (chunk.contains('b')) return callback(); callback();
在实现._writev()
时还须要注意一些事项,该函数与.cork()
结合使用,但写入时有一个常见错误:
// Using .uncork() twice here makes two calls on the C++ layer, rendering the // cork/uncork technique useless. ws.cork(); ws.write('hello '); ws.write('world '); ws.uncork(); ws.cork(); ws.write('from '); ws.write('Matteo'); ws.uncork(); // The correct way to write this is to utilize process.nextTick(), which fires // on the next event loop. ws.cork(); ws.write('hello '); ws.write('world '); process.nextTick(doUncork, ws); ws.cork(); ws.write('from '); ws.write('Matteo'); process.nextTick(doUncork, ws); // as a global function function doUncork(stream) { stream.uncork(); }
.cork()
能够被调用屡次,咱们只须要当心调用.uncork()
相同的次数,使其再次流动。
Streams是Node.js中常用的模块,它们对于内部结构很是重要,对于开发人员来讲,它们能够跨Node.js模块生态系统进行扩展和链接。
但愿你如今可以进行故障排除,安全地编写你本身的Writable
和Readable
流,并考虑背压,并与同事和朋友分享你的知识。
在使用Node.js构建应用程序时,请务必阅读有关其余API函数的Stream
的更多信息,以帮助改进和释放你的流功能。