Node.js 可读流和可写流

Node.js操做按需数据使用sream API接口,stream 是一个数据集,数据可能不能立刻所有获取到,他们在缓冲区,不须要在内存中。适合处理大数据集或者来自外部的数据源的数据块 Node中不少内建模块实现了流式接口:javascript

上面的列表中的原生Node.js对象就是可读流和可写流的对象。有些对象是可读流也是可写流,如TCP sockets,zlib 和 crypto streamsjava

这些对象是密切相关。当一个HTTP响应在客户端上是一个可读流,相应的在服务端是一个可写流。这是由于在HTTP的状况下,咱们基于从一对象(http.IncomingMessage)读而从另一个对象(http.ServerResponse)写node

stdio流(stdin,stdout,stderr)在子进程中会有反向流类型。这样的话就能用很是简单的方式管道传送给其余流或者主进程的stdio流。bash

Node.js中有4个基本的流类型:socket

  1. 可读流(Readable)
  2. 可写流(Writable)
  3. 双工流(Duplex)
  4. 转换流(Transform streams)
  • 可读流是能够被消耗的数据源的抽象,典型例子就是fs.createReadStream方法。
  • 可写流是能够写入数据的目的地的抽象,典型例子就是 fs.createWriteStream 方法。
  • 双工流既是可读的也是可写的,典型例子是TCP套接字。
  • 转换流是基于双工流的,它能够用来修改或转换数据,由于它是写入和读取的。 zlib.createGzip 就是一个用gzip来压缩数据的转换流例子。你能够认为转换流就是一个函数,这个函数的输入是一个可写流,输出是一个可读流,你可能也据说过把转换流叫作" 经过流 "。

全部的流都是 EventEmitter 的实例。他们在数据可读或者可写的时候发出事件。然而,咱们也能够简单的经过 pipe 方法来使用流数据。函数

pipe方法:

**readable**.pipe(**writableDest**)
复制代码

这简单的一行,链接了可读流的输出——源数据和可写流的输入——目标。源必须是可读流,目标必须是可写流。固然也能够是双工流或者转换流,事实上,若是链接的是一个双工流,能够链式调用pipe:大数据

readable
  .pipe(transformStream1)
  .pipe(transformStream2)
  .pipe(finalWrtitableDest)

复制代码

pipe方法返回目标流,这使咱们可以执行上面的链式调用。对于流a(可读)、b和c(双工)和d(可写)优化

a.pipe(b).pipe(c).pipe(d)

复制代码
上面等价于
a.pipe(b)
b.pipe(c)
c.pipe(d)
复制代码

pipe 方法是最简单的方式去使用流,通常建议使用 pipe 方法或使用事件来处理流,可是要避免两个混合使用。一般,当你使用 pipe 方法时,你不须要使用事件,可是若是你须要用更多定制的方式来处理流,那你能够只用事件。ui

流事件

除了从可读流里读取数据和向可写流目标写数据外,pipe方法将自动管理沿途的一些事情。例如,它处理错误、文件结束以及当一个流比另外一个流慢或更快时的状况。this

然而,咱们也能够直接使用事件来操做流。下面是pipe方法主要用于读取和写入数据的事件的简化等效代码:

# readable.pipe(writable) 等于下面
复制代码
readable.on('data', (chunk) => {
  writable.write(chunk);
});

复制代码
readable.on('end', () => {
  writable.end();
});
复制代码

如下是可读流可写流的重要事件以及可用方法:

这些事件和函数在某种程度上是相关的,由于它们一般一块儿使用。

可读流中最重要的事件是:

  • data事件,每当流将数据块传递给消费者时,它就会触发。
  • end事件,当没有更多的数据从流中被消耗时触发。

可写流中最重要的事件是:

  • drain事件,这是可写流能够接收更多数据的信号。
  • finish事件,当全部数据都给到底层系统时触发。

能够结合事件和函数来定制和优化流的使用。使用一个可读的流,咱们能够用pipe / unpipe方法,或read / Unshift / resume方法。使用一个可写流,咱们能够把它pipe / unpipe目的地,或是写它的write方法调用end方法当咱们完成。

可读流的暂停和流(flowing)模式

可读流有两种主要模式,这影响咱们可使用它们的方式:

  • 它们能够是暂停(paused)模式
  • 或是流(flowing)模式

这些模式有时被称为拉和推模式。

全部可读的流默认状况下都是在暂停模式下启动的,但在须要时能够轻松切换到流模式或者返回到暂停状态。有时,转换是自动发生。

当一个可读流处于暂停模式,咱们可使用 read() 方法按需的从流中读取数据,然而,在流模式下的可读流,数据是不断流动的,咱们要监听事件来使用这些数据。

在流模式下,若是没有用户处理数据,那么实际上数据会丢失。这就是为何当咱们在流模式中有可读的流时,咱们须要一个 data 事件。事实上,只要添加一个 data 事件,就能够将暂停模式转换为流模式,删除 data 事件,流将切换回暂停模式。其中一些这样作事为了与旧的节点流接口向后兼容。

这两个流模式之间手动开关,可使用 resume()pause() 方法。

当使用 pipe 方法读取可读流时,咱们没必要担忧这些模式,由于pipe自动管理它们。

实现流

当咱们谈论Node.js中的流,主要有两种不一样的任务:

  • 实现流。
  • 使用流。

流的实现一般会 引入 (require)stream模块。

实现可写流

为了实现可写流,咱们须要使用流模块中的 Writable 构造函数。

const { Writable } = require('stream');
复制代码

咱们有不少方式来实现一个可写流。例如,若是咱们想要的话,咱们能够继承Writable构造函数。

class myWritableStream extends Writable {
}
复制代码

这里用简单的构造函数的方法。咱们只需给 Writable 构造函数传递一些选项并建立一个对象。惟一须要的选项是 Writable 函数,该函数揭露数据块要往哪里写。

const { Writable } = require('stream');
复制代码
const outStream = new Writable({
  **write**(chunk, encoding, callback) {
    console.log(chunk.toString());
    callback();
  }
});

process.stdin.pipe(outStream);
复制代码

这个write函数有3个参数:

  • chunk 一般是一个buffer,除非咱们配置不一样的流。
  • encoding 是在特定状况下须要的参数,一般咱们能够忽略它。
  • callback 是在完成处理数据块后须要调用的函数。这是写数据成功与否的标志。若要发出故障信号,请用错误对象调用回调函数。

outstream,咱们只是用 console.log 把数据块做为一个字符串打印到控制台,而后不用错误对象调用 callback 表示成功。这是一个很是简单的可能也不那么有用的 echo 流,它把收到的全部数据打印到控制台。

为了使用这个流,咱们能够直接用 process.stdin 这个可读流,就能够把 process.stdin pipe给 outStream.

执行上面的代码,任何咱们输入给 process.stdin 的内容都会被 outStreamconsole.log 输出到控制台。

实现这个流不怎么有用,由于它实际上被实现了并且node内置了,它等同于 process.stdout。如下一行代码,就是把 stdin pipe给 stdout ,就能实现以前的效果:

process.stdin.pipe(process.stdout);
复制代码

实现可读流

为了实现可读流,引用Readable接口并用它构造新对象:

const { Readable } = require('stream');
复制代码
const inStream = new Readable({});
复制代码

有一个简单的方法来实现可读流。咱们能够直接把供使用的数据 push 出去。

const { Readable } = require('stream');

const inStream = new Readable();

inStream.push('ABCDEFGHIJKLM');

inStream.push('NOPQRSTUVWXYZ');

inStream.push(null); // No more data

inStream.pipe(process.stdout);
复制代码

push 一个 null 对象就意味着咱们想发出信号——这个流没有更多数据了。

使用这个可写流,能够直接把它pipe给 process.stdout 这个可写流。

执行以上代码,会读取 inStream 中全部的数据,并输出在标准输出流。很简单,也不是颇有用。

咱们基本上在pipe给 process.stdout 以前把全部的数据都推到流里了。更好的方法是按需推送。咱们能够经过在一个可读流的配置实现 read() 方法来作这件事情:

const inStream = new Readable({
  **read**(size) {
    // there is a demand on the data... Someone wants to read it.
  }
});
复制代码

当在可读的流上调用读方法时,实现能够将部分数据推到队列中。例如,咱们能够一次推送一个字母,从字符代码65(表明A),而且每推一次增长1:

const inStream = new Readable({
  read(size) {
    **this.push**(String.fromCharCode(this.currentCharCode++));
    if (this.currentCharCode > 90) {
      **this.push**(null);
    }
  }
});
inStream.currentCharCode = 65;
inStream.pipe(process.stdout);
复制代码

当从可读流里读数据, read 方法将被持续调用,咱们就会推送更多的字母。咱们须要中止这个循环的条件,这就是为何会一个if语句当currentcharcode大于90(表明Z)是推送null。

这段代码至关于咱们开始使用的更简单的代码,可是当用户要求时,咱们正在按需推送数据。你应该常常这样作。

相关文章
相关标签/搜索