本文节选自 Node.js CheatSheet | Node.js 语法基础、框架使用与实践技巧,也能够阅读 JavaScript CheatSheet 或者 现代 Web 开发基础与工程实践 了解更多 JavaScript/Node.js 的实际应用。
Stream 是 Node.js 中的基础概念,相似于 EventEmitter,专一于 IO 管道中事件驱动的数据处理方式;类比于数组或者映射,Stream 也是数据的集合,只不过其表明了不必定正在内存中的数据。。Node.js 的 Stream 分为如下类型:node
Stream 自己提供了一套接口规范,不少 Node.js 中的内建模块都遵循了该规范,譬如著名的 fs
模块,便是使用 Stream 接口来进行文件读写;一样的,每一个 HTTP 请求是可读流,而 HTTP 响应则是可写流。git
const stream = require('stream'); const fs = require('fs'); const readableStream = fs.createReadStream(process.argv[2], { encoding: 'utf8' }); // 手动设置流数据编码 // readableStream.setEncoding('utf8'); let wordCount = 0; readableStream.on('data', function(data) { wordCount += data.split(/\s{1,}/).length; }); readableStream.on('end', function() { // Don't count the end of the file. console.log('%d %s', --wordCount, process.argv[2]); });
当咱们建立某个可读流时,其还并未开始进行数据流动;添加了 data 的事件监听器,它才会变成流动态的。在这以后,它就会读取一小块数据,而后传到咱们的回调函数里面。 data
事件的触发频次一样是由实现者决定,譬如在进行文件读取时,可能每行都会触发一次;而在 HTTP 请求处理时,可能数 KB 的数据才会触发一次。能够参考 nodejs/readable-stream/_stream_readable 中的相关实现,发现 on 函数会触发 resume 方法,该方法又会调用 flow 函数进行流读取:github
// function on if (ev === 'data') { // Start flowing on next tick if stream isn't explicitly paused if (this._readableState.flowing !== false) this.resume(); } ... // function flow while (state.flowing && stream.read() !== null) {}
咱们还能够监听 readable
事件,而后手动地进行数据读取:数组
let data = ''; let chunk; readableStream.on('readable', function() { while ((chunk = readableStream.read()) != null) { data += chunk; } }); readableStream.on('end', function() { console.log(data); });
Readable Stream 还包括以下经常使用的方法:缓存
在平常开发中,咱们能够用 stream-wormhole 来模拟消耗可读流:服务器
sendToWormhole(readStream, true);
readableStream.on('data', function(chunk) { writableStream.write(chunk); }); writableStream.end();
当 end()
被调用时,全部数据会被写入,而后流会触发一个 finish
事件。注意在调用 end()
以后,你就不能再往可写流中写入数据了。app
const { Writable } = require('stream'); const outStream = new Writable({ write(chunk, encoding, callback) { console.log(chunk.toString()); callback(); } }); process.stdin.pipe(outStream);
Writable Stream 中一样包含一些与 Readable Stream 相关的重要事件:框架
const fs = require('fs'); const inputFile = fs.createReadStream('REALLY_BIG_FILE.x'); const outputFile = fs.createWriteStream('REALLY_BIG_FILE_DEST.x'); // 当创建管道时,才发生了流的流动 inputFile.pipe(outputFile);
多个管道顺序调用,便是构建了连接(Chaining):异步
const fs = require('fs'); const zlib = require('zlib'); fs.createReadStream('input.txt.gz') .pipe(zlib.createGunzip()) .pipe(fs.createWriteStream('output.txt'));
管道也经常使用于 Web 服务器中的文件处理,以 Egg.js 中的应用为例,咱们能够从 Context 中获取到文件流并将其传入到可写文件流中:分布式
📎 完整代码参考 Backend Boilerplate/egg
const awaitWriteStream = require('await-stream-ready').write; const sendToWormhole = require('stream-wormhole'); ... const stream = await ctx.getFileStream(); const filename = md5(stream.filename) + path.extname(stream.filename).toLocaleLowerCase(); //文件生成绝对路径 const target = path.join(this.config.baseDir, 'app/public/uploads', filename); //生成一个文件写入文件流 const writeStream = fs.createWriteStream(target); try { //异步把文件流写入 await awaitWriteStream(stream.pipe(writeStream)); } catch (err) { //若是出现错误,关闭管道 await sendToWormhole(stream); throw err; } ...
参照分布式系统导论,可知在典型的流处理场景中,咱们不能够避免地要处理所谓的背压(Backpressure)问题。不管是 Writable Stream 仍是 Readable Stream,实际上都是将数据存储在内部的 Buffer 中,能够经过 writable.writableBuffer
或者 readable.readableBuffer
来读取。当要处理的数据存储超过了 highWaterMark
或者当前写入流处于繁忙状态时,write 函数都会返回 false
。pipe
函数即会自动地帮咱们启用背压机制:
当 Node.js 的流机制监测到 write 函数返回了 false
,背压系统会自动介入;其会暂停当前 Readable Stream 的数据传递操做,直到消费者准备完毕。
+===============+ | Your_Data | +=======+=======+ | +-------v-----------+ +-------------------+ +=================+ | Readable Stream | | Writable Stream +---------> .write(chunk) | +-------+-----------+ +---------^---------+ +=======+=========+ | | | | +======================+ | +------------------v---------+ +-----> .pipe(destination) >---+ | Is this chunk too big? | +==^=======^========^==+ | Is the queue busy? | ^ ^ ^ +----------+-------------+---+ | | | | | | | | > if (!chunk) | | ^ | | emit .end(); | | ^ ^ | > else | | | ^ | emit .write(); +---v---+ +---v---+ | | ^----^-----------------< No | | Yes | ^ | +-------+ +---v---+ ^ | | | ^ emit .pause(); +=================+ | | ^---^---------------------+ return false; <-----+---+ | +=================+ | | | ^ when queue is empty +============+ | ^---^-----------------^---< Buffering | | | |============| | +> emit .drain(); | <Buffer> | | +> emit .resume(); +------------+ | | <Buffer> | | +------------+ add chunk to queue | | <--^-------------------< +============+
Duplex Stream 能够看作读写流的聚合体,其包含了相互独立、拥有独立内部缓存的两个读写流, 读取与写入操做也能够异步进行:
Duplex Stream ------------------| Read <----- External Source You ------------------| Write -----> External Sink ------------------|
咱们可使用 Duplex 模拟简单的套接字操做:
const { Duplex } = require('stream'); class Duplexer extends Duplex { constructor(props) { super(props); this.data = []; } _read(size) { const chunk = this.data.shift(); if (chunk == 'stop') { this.push(null); } else { if (chunk) { this.push(chunk); } } } _write(chunk, encoding, cb) { this.data.push(chunk); cb(); } } const d = new Duplexer({ allowHalfOpen: true }); d.on('data', function(chunk) { console.log('read: ', chunk.toString()); }); d.on('readable', function() { console.log('readable'); }); d.on('end', function() { console.log('Message Complete'); }); d.write('....');
在开发中咱们也常常须要直接将某个可读流输出到可写流中,此时也能够在其中引入 PassThrough,以方便进行额外地监听:
const { PassThrough } = require('stream'); const fs = require('fs'); const duplexStream = new PassThrough(); // can be piped from reaable stream fs.createReadStream('tmp.md').pipe(duplexStream); // can pipe to writable stream duplexStream.pipe(process.stdout); // 监听数据,这里直接输出的是 Buffer<Buffer 60 60 ... > duplexStream.on('data', console.log);
Transform Stream 则是实现了 _transform
方法的 Duplex Stream,其在兼具读写功能的同时,还能够对流进行转换:
Transform Stream --------------|-------------- You Write ----> ----> Read You --------------|--------------
这里咱们实现简单的 Base64 编码器:
const util = require('util'); const Transform = require('stream').Transform; function Base64Encoder(options) { Transform.call(this, options); } util.inherits(Base64Encoder, Transform); Base64Encoder.prototype._transform = function(data, encoding, callback) { callback(null, data.toString('base64')); }; process.stdin.pipe(new Base64Encoder()).pipe(process.stdout);