[译] Node.js Streams: 你应该知道的事情

阅读本文须要必定的 Node.js 基础,若是文中出现了一些不太理解的地方,记得用搜索引擎或者 Node 官方文档来解惑。html

生产者消费者问题:https://zh.wikipedia.org/zh-h...node

小胡子哥的文章:http://www.barretlee.com/blog...git

发现这篇文章也不错:https://github.com/zhengweike...github

阅读原文web

你们都以为 Node.js streams 很难用,更难理解。那么我有一个好消息告诉你,如今已经再也不这样了。api

这些年来,开发者为了让 streams 更好用造了不少轮子。在这篇文章中,我把重点放在原生的 Node.js stream API数组

Streams 是 Node 里最好的也是被误解最深的主意。缓存

streams 究竟是啥?

streams 是相似于数组或者字符串那样的数据集合。不一样之处在于,streams 有些时候不会一次性拿出全部数据,所以也没必要担忧其大小超过内存。 当处理大量数据时,streams 真的很强大。服务器

streams 不只被用在大数据量的处理上,使用 streams 还能让咱们的代码更具组合型。就像在 Linux 上能够经过管道符将其余较小的命令组成强大的命令同样,咱们能够在 Node 中使用 streams 实现同样的效果。
image.png!thumbnail异步

const grep = ... // A stream for the grep output
const wc = ... // A stream for the wc input
grep.pipe(wc)

Node.js 中许多内置模块都实现了 streams 接口。

image.png!thumbnail

上面的列表中罗列了一些原生 Node.js 对象,这些对象也是可读或者可写的 streams。在这些对象中,有些既是可读流,也是可写流,好比 TCP sockets,zlib 和 crypto 。

这些对象是紧密相关的。虽然 HTTP 响应流在客户端上是可读的,但它在服务器上是可写的。这是由于在 HTTP 状况下,咱们是从一个对象(http.IncomingMessage)读取,从另外一个对象写入(http.ServerResponse)。

还要注意,父进程中 stdin 是可读流,stdout 和 stderr 是可写流。而在子进程中,正好与父进程相反,stdin 是可写流,而 stdout 和 stderr 是可读流。这方便了咱们管理来自主进程的 stdio 流。(译者注:个人理解是,子进程的 stdin 通常须要接收来自父进程的输入,因此应当设计成可写的,子进程的输出,通常须要导回到父进程中,因此应当设计成可读的)

streams 的实际使用示例

理论都是美好的,但其实每每不具备足够的说服力。咱们来看一个可以彰显 streams 在内存消耗上的不一样之处的例子。

咱们先要制造一个大文件来充当测试用例:

const fs = require('fs');
const file = fs.createWriteStream('./big.file');

for(let i=0; i<= 1e6; i++) {
  file.write('Lorem ipsum dolor sit amet, consectetur adipisicing elit, sed do eiusmod tempor incididunt ut labore et dolore magna aliqua. Ut enim ad minim veniam, quis nostrud exercitation ullamco laboris nisi ut aliquip ex ea commodo consequat. Duis aute irure dolor in reprehenderit in voluptate velit esse cillum dolore eu fugiat nulla pariatur. Excepteur sint occaecat cupidatat non proident, sunt in culpa qui officia deserunt mollit anim id est laborum.\n');
}

file.end();

我用了可写流来制造这个大文件。

fs 模块实现了 streams 的接口,使其既能够被用来读取文件,又能够用来执行写入的操做。在上面的例子中,咱们经过循环的调用可写流,在该文件中写入了 100 万行。

跑一下上面的脚本将会生成一个大小 400 MB 左右的文件。

这是一个简单的 Node web 服务器,专门为 big.file 提供服务:

const fs = require('fs');
const server = require('http').createServer();

server.on('request', (req, res) => {
  fs.readFile('./big.file', (err, data) => {
    if (err) throw err;
  
    res.end(data);
  });
});

server.listen(8000);

当服务器得到请求时,它将使用异步方法 fs.readFile 读取大文件并返回给客户端。很简单的几行代码,看上去其表现不会跟一大堆事件循环或者其余复杂的代码那样。

那么,让咱们运行服务器,并在请求这个文件时监视内存,看看会发生什么。

当我启动服务时,它开始处于一个正常的内存量,8.7 MB:

image.png!thumbnail

而后我发起请求,注意看内存用量发生了什么变化:

图片描述

内存用量竟然涨到了 434.8 MB。

咱们基本上把整个 big.file 的内容都放在内存中了,而后再把它写到响应对象中。这是很是低效的。

HTTP 响应对象(上面的代码中的 res )也是可写的流。这意味着若是咱们有一个表明 big.file 的内容的可读流,咱们能够直接让这两个对象经过 pipe 链接,不用耗费 400 MB 的内存就能实现相同的功能。

fs 模块可使用 createReadStream 方法为任何文件提供可读的流。 咱们能够将其传递给响应对象:

const fs = require('fs');
const server = require('http').createServer();

server.on('request', (req, res) => {
  const src = fs.createReadStream('./big.file');
  src.pipe(res);
});

server.listen(8000);

如今,当你再次发出请求,会发生一个神奇的事情(看内存消耗):

图片描述

发生了什么?

当客户端请求这个大文件时,咱们每次流式传输一个块,这意味着咱们不会在内存中缓存该文件。内存的使用量大约增加了25 MB,就是这样。

你能够将此示例推到极限。 从新生成 500 万行而不是 100 万行的 big.file 文件,这将使文件占用超过 2 GB,实际上超过了 Node 中的默认缓冲区限制。

默认状况下(你能够更改限制),你是没办法用 fs.readFile 来向客户端提供该文件的。可是使用 fs.createReadStream,将2 GB的数据传输给请求者彻底没问题,最重要的是,进程内存使用量也跟以前大体相同。

如今,你准备好学习 streams 了么?

Streams 101

Node.js 中有四种基本流类型:Readable(可读流), Writeable(可写流), Duplex(双工流), Transform(转换流)

  • 可读流是能够从中消费数据的数据源的抽象。fs.createReadStream 方法就是个例子。
  • 可写流是能够被写入数据的目标的抽象。fs.createWriteStream 方法就是个例子。
  • 双工流既可读又可写。TCP socket 就是个例子。
  • 转换流基本上是一个双工流,能够用于在写入和读取数据时修改或转换数据。 一个例子是使用 gzip 压缩数据的 zlib.createGzip 流。你能够将转换流视为函数,其中输入是可写入流部分,输出是可读流部分。你也可能会听转换流被称为 ”through streams” 。

全部流都是 EventEmitter 的实例。它们发出可用于读取和写入数据的事件。但其实咱们有更简单的处理流数据的办法 —— pipe。

pipe 方法

下面这个魔法公式你应该记住:

readableSrc.pipe(writableDest)

这行简单的代码中,咱们将可读流的输出(数据源)做为可写流的输入。源必须是可读流,目标必须是可写流。固然,它们也能够是双工/转换流。实际上,若是咱们正在处理一个双工流,那咱们就能够像在Linux中同样将 pipe 链式调用:

readableSrc
  .pipe(transformStream1)
  .pipe(transformStream2)
  .pipe(finalWrtitableDest)

管道方法返回目标流,这使咱们可以在上面进行连续调用。 对于流 a(可读),b和c(双工)和d(可写),咱们能够这样作:

a.pipe(b).pipe(c).pipe(d)

# Which is equivalent to:
a.pipe(b)
b.pipe(c)
c.pipe(d)

# Which, in Linux, is equivalent to:
$ a | b | c | d

pipe 方法是最简单的消费流的方法。通常比较建议使用 pipe 方法或使用事件来消费流,但应该避免混合使用这两种方式。一般当你使用 pipe 方法时,不须要使用事件,可是若是你须要以自定义的方式使用 streams,那么使用事件可能有些必要。

Stream events

除了从可读流源读取并写入可写流目的地以外,pipe 方法还会自动管理一些事情。例如,错误处理,文件结尾以及一个流比另外一个流慢或快的状况。

然而,流也能够直接与事件一块儿使用。如下是 pipe 方法主要用于读取和写入数据的简化事件等效代码:

# readable.pipe(writable)

readable.on('data', (chunk) => {
  writable.write(chunk);
});

readable.on('end', () => {
  writable.end();
});

如下是能够被用来处理可读流和可写流重要事件和函数列表:

image.png!thumbnail

事件和函数在某种程度上是相关的,由于它们一般被一块儿使用。

可读流中最重要的事件是:

  • data 事件:当流将一块数据传递给消费者时,该事件被触发
  • end 事件:当 streams 中没有数据时,该事件被触发
    可写流中最重要的事件是:
  • drain 事件:可写入流能够接收更多数据的信号。
  • finish 事件:当全部数据写入完毕时会触发该事件。
    事件和函数能够结合起来使用,以便定制和优化流。要使用可读流,咱们可使用 pipe/unpipe ,或者 read/unshift/resume 方法。要使用可写流,咱们能够将其做为 pipe/unpipe 的目标,或者用 write 方法写入它,并在完成后调用 end 方法。

可读流的暂停和流动模式

可读流具备两种主要模式会影响咱们消费数据的方式:

  • 可读流可能处于暂停模式
  • 也可能处于流动模式
    这两种模式也常被称为 pull 和 push 模式。

默认状况下,全部可读流可在暂停模式下启动,在须要时也能够轻松切换到流动状态或者返回到暂停状态。 有时这两种模式会自动切换。

当可读流处于暂停模式时,咱们可使用 read() 方法从流按需读取,然而,对于流模式中的可读流,数据持续流动,咱们必须监听事件不断消耗它。

在流动模式下,若是没有消费者在处理它,数据实际上可能会丢失(译者注:生产者一直向缓冲区导入数据,等到缓冲区满了,却一直没有人来消费他,这块空间可能会被释放掉)。这就是为何在流动模式下有可读流时,须要一个 data 事件处理程序。事实上,只需添加一个 data 事件处理程序便可将暂停的流转换为流模式,而删除 data 事件的处理程序将把流切换回暂停模式。这么作的一部分缘由是为了兼容旧的 Node Streams 接口。

要手动切换这两种流模式,可使用 resume() 和 pause() 方法。

image.png!thumbnail

当使用管道方法消耗可读流时,咱们没必要担忧管道自动管理这些模式。

实现 Streams

当咱们在 Node 中使用 streams 时,主要有两种不一样的任务:

  • 一个任务是实现 streams
  • 另外一个是消费 streams
    到目前为止,咱们一直都在讨论怎么消耗流。接下来,让咱们实现一些 streams!

stream 的实现者一般是须要依赖 stream 模块。

实现一个可写流

要实现可写流,咱们须要使用 stream 模块中的 Writable 构造函数。

const { Writable } = require('stream');

咱们能够经过不少方式实现一个可写流。 例如,咱们能够继承 Writable

class myWritableStream extends Writable {
}

然而,我喜欢更简单的构造方法。咱们只是从 Writable 构造函数建立一个对象,并传递一些选项。 惟一必需的选项是 write 函数,它要处理写入的数据块。

const { Writable } = require('stream');
const outStream = new Writable({
  write(chunk, encoding, callback) {
    console.log(chunk.toString());
    callback();
  }
});

process.stdin.pipe(outStream);

write 函数接受三个参数:

  • chunk 参数一般是 buffer ,固然也能够经过配置修改参数的类型。
  • encoding 在某种状况下是必须的,但一般咱们能够忽略它。
  • callback 是处理完数据块后咱们须要调用的函数。这是写操做是否成功的信号。要发出失败信号,只要把 error 对象传入回调便可。

在 outStream 中,咱们只是将 chunk 做为一个字符串进行 console.log,而后在没有错误的状况下调用回调来表示成功。这是一个很简单的 echo 流,它会输出任何收到的东西。

要消费这个流,咱们能够直接使用 process.stdin,这是一个可读流,因此咱们能够将 process.stdin 经过 pipe 传入 outstream。

当咱们运行上面的代码时,咱们输入到 process.stdin 的任何内容都将会使用 outStream 中的 console.log 来输出。

这个例子其实没啥用,实际上 node 已经内置了这个功能 —— process.stdout。咱们能够将stdin 经过 pipe 输入到 stdout 中,咱们能够用一行代码实现刚刚的 echo 功能:

process.stdin.pipe(process.stdout);

实现一个可读流

为了实现可读流,咱们须要 Readable 接口并从中构造一个对象:

const { Readable } = require('stream');

const inStream = new Readable({});

有一种实现可读流的简单方法,咱们能够直接 push 咱们想要消费者消费的数据。

const { Readable } = require('stream');

const inStream = new Readable();

inStream.push('ABCDEFGHIJKLM');
inStream.push('NOPQRSTUVWXYZ');

inStream.push(null); // No more data

inStream.pipe(process.stdout);

当咱们向 inStream 中 push null 时,意味着没有数据要继续 push 了。

为了消费这些数据,咱们用 pipe 把他们导入到了 process.stdout 中。

当咱们运行上面的代码时,咱们将从 inStream 读取全部数据,并将其输出到标准输出流中。 很简单,但不高效。

咱们基本上将流中的全部数据推送到 process.stdout 。更好的方法是在消费者要求时按需推送数据。咱们能够经过在可读流配置中实现 read() 方法:

const inStream = new Readable({
  read(size) {
    // there is a demand on the data... Someone wants to read it.
  }
});

当读取方法在可读流上被调用时,该实现能够将部分数据推送到队列。例如,咱们能够一次推一个字母,从字符代码 65(表示A)开始,并在每次推送时递增:

const inStream = new Readable({
  read(size) {
    this.push(String.fromCharCode(this.currentCharCode++));
    if (this.currentCharCode > 90) {
      this.push(null);
    }
  }
});

inStream.currentCharCode = 65;

inStream.pipe(process.stdout);

当消费者正在消费可读流时,read 方法将被持续触发,每次触发,又会有新的流进入缓冲区。 咱们要在某个地方中止这个循环,这就是为何当 currentCharCode 大于90(表示Z)时,if 语句推送 null 。

此代码的效果等同于咱们刚开始使用的那段代码,不一样之处在于,如今能够按需推送数据。这么作永远不会错~

实现双工/转换流

双工流可让一个对象既可读又可写。就好像这个对象继承了两个接口。

下面这个例子经过双工流实现了上面那两个可读流和可写流的功能:

const { Duplex } = require('stream');

const inoutStream = new Duplex({
  write(chunk, encoding, callback) {
    console.log(chunk.toString());
    callback();
  },

  read(size) {
    this.push(String.fromCharCode(this.currentCharCode++));
    if (this.currentCharCode > 90) {
      this.push(null);
    }
  }
});

inoutStream.currentCharCode = 65;
process.stdin.pipe(inoutStream).pipe(process.stdout);

经过组合这些方法,咱们可使用这个双工流来读取从 A 到 Z 的字母,咱们还能利用它可以输出的特征。咱们将可读的 stdin 流经过 pipe 导入到这个双工流中以使用 echo 功能,并将双工流自己经过 pipe 导入到可写入的 stdout 流中,以查看字母 A 到 Z 。

重要的是要了解双工流的可读和可写是彻底独立运行的。双工流只是将两个特征组合到了一个对象上。

转换流是更有趣的双工流,由于它的输出是从其输入计算的。

对于一个转换流,咱们不须要实现 read 或者 write 方法,只须要实现一个结合两者的 transform 方法。咱们既能够经过 write 方法写入,也可使用它来 push 数据。

下面这个例子是一个简单的转换流,它将你的输入转换为大写格式后再输出:

const { Transform } = require('stream');

const upperCaseTr = new Transform({
  transform(chunk, encoding, callback) {
    this.push(chunk.toString().toUpperCase());
    callback();
  }
});

process.stdin.pipe(upperCaseTr).pipe(process.stdout);

在这个转换流中,咱们正在消费的数据与以前的双工流示例的数据同样,咱们只实现了一个transform() 方法。 在该方法中,咱们将 chunk 转换为大写版本,而后将该版本推送为可读部分。

流对象模式

默认状况下,流指望处理的值是 Buffer/String 类型。 有一个 objectMode 标志,咱们能够设置为让流接受任何 JavaScript 对象。

能够用一个简单的例子来演示一下。如下转换流的组合使得将逗号分隔值的字符串映射为 JavaScript 对象的功能。 因此 “a,b,c,d” 成为 {a:b,c:d} 。

const { Transform } = require('stream');

const commaSplitter = new Transform({
  readableObjectMode: true,  
  
  transform(chunk, encoding, callback) {
    this.push(chunk.toString().trim().split(','));
    callback();
  }
});

const arrayToObject = new Transform({
  readableObjectMode: true,
  writableObjectMode: true,  
  
  transform(chunk, encoding, callback) {
    const obj = {};
    for (let i=0; i < chunk.length; i+=2) {
      obj[chunk[i]] = chunk[i+1];
    }
    this.push(obj);
    callback();
  }
});

const objectToString = new Transform({
  writableObjectMode: true,
  
  transform(chunk, encoding, callback) {
    this.push(JSON.stringify(chunk) + '\n');
    callback();
  }
});

process.stdin
  .pipe(commaSplitter)
  .pipe(arrayToObject)
  .pipe(objectToString)
  .pipe(process.stdout)

咱们经过 commaSplitter 传递输入字符串(例如,“a,b,c,d”),它将输入转换为数组并将其做为可读数据([“a”,“b”,“c”,“d”])。readableObjectMode 这个属性的配置必不可少,由于咱们是要把一个 JavaScript 对象 push 进来。

而后咱们把数组用 pipe 导入到 arrayToObject 数据流中。咱们须要配置一个 writableObjectMode 属性来使该流接受一个对象。咱们还须要把最后的结果 push 到可读流中,所以 readableObjectMode 这个属性也是必要的。最后一个 objectToString 流接受一个对象可是导出的结果是一个字符串,因此咱们须要 writableObjectMode 属性来处理输入的对象,可是由于输出到可读流的是字符串,因此不须要配置 readableObjectMode 属性。(译者注:因此,若是是要让可读流支持 js 对象,要设置 readableObjectMode,要让可写流支持 js 对象,要设置 writeableObjectMode)

image.png!thumbnail

Node 内置 transform streams

Node 内置了一些很是有用的转换流。即 zlib 和 crypto。

下面是一个使用 zlib.createGzip() 流结合 fs 可读/可写流建立文件压缩脚本的示例:

const fs = require('fs');
const zlib = require('zlib');
const file = process.argv[2];

fs.createReadStream(file)
  .pipe(zlib.createGzip())
  .pipe(fs.createWriteStream(file + '.gz'));

你能够用这个脚本指定的文件进行 gzip 压缩。咱们将该文件处理成一个可读流并使用 pipe 输入到 zlib 内置的转换流中,而后将新的 gzip 压缩文件转换可写流写入文件。

使用 pipe 很酷的地方在于,只要咱们须要,就能够把它们跟事件结合起来。例如,我但愿用户在脚本运行时可以看到一个进度条,而且在脚本工做完成时可以显示 “Done” 消息。因为 pipe 方法刚好可以返回目标流,咱们能够链式调用事件处理程序的注册程序:

const fs = require('fs');
const zlib = require('zlib');
const file = process.argv[2];

fs.createReadStream(file)
  .pipe(zlib.createGzip())
  .on('data', () => process.stdout.write('.'))
  .pipe(fs.createWriteStream(file + '.gz'))
  .on('finish', () => console.log('Done'));

使用 pipe 方法,咱们能够很轻松地消费流,咱们还可以使用须要的事件进一步定制与这些流的交互。

pipe 方法可以把咱们的程序解构成一步一步的方式,让咱们的程序更具可读性。例如,咱们能够建立一个简单的转换流来报告进度,而不用像上面那样监听 data 事件,能够直接拿 pipe() 替换 on() :

const fs = require('fs');
const zlib = require('zlib');
const file = process.argv[2];

const { Transform } = require('stream');

const reportProgress = new Transform({
  transform(chunk, encoding, callback) {
    process.stdout.write('.');
    callback(null, chunk);
  }
});

fs.createReadStream(file)
  .pipe(zlib.createGzip())
  .pipe(reportProgress)
  .pipe(fs.createWriteStream(file + '.gz'))
  .on('finish', () => console.log('Done'));

reportProgress 流是一个简单的直通流,同时它也将进度标准化。注意我如何在回调函数中使用第二个参数来推送 transform() 方法中的数据。 这至关于 push 数据。

把各类流组合起来能够完成不少事,例如,若是咱们须要在 gzip 以前或以后加密文件,须要作的就是按照需求的顺序来管理另外一个转换流。咱们可使用 Node 的 crypto 模块:

const crypto = require('crypto');
// ...

fs.createReadStream(file)
  .pipe(zlib.createGzip())
  .pipe(crypto.createCipher('aes192', 'a_secret'))
  .pipe(reportProgress)
  .pipe(fs.createWriteStream(file + '.gz'))
  .on('finish', () => console.log('Done'));

上面的脚本压缩而后加密传递的文件,只有知道秘钥的人才可使用输出的文件。咱们没法使用正常的解压工具来解压这个文件,由于它已被加密。

为了可以解压以上脚本压缩的文件,咱们须要以相反的顺序使用 crypto 的解密功能和 zlib 的解压缩功能:

fs.createReadStream(file)
  .pipe(crypto.createDecipher('aes192', 'a_secret'))
  .pipe(zlib.createGunzip())
  .pipe(reportProgress)
  .pipe(fs.createWriteStream(file.slice(0, -3)))
  .on('finish', () => console.log('Done'));

假设传递的文件是压缩版本,上面的代码先建立了一个可读流,将该文件传输到 crypto.createDecipher() 流中(使用相同的秘钥),将结果经过 pipe 输入到 zlib.createGunzip() 流中, 而后将最终结果写到文件中。

感谢阅读~

相关文章
相关标签/搜索