[译]关于Node.js streams你须要知道的一切

Node.js的stream模块是有名的应用困难,更别说理解了。那如今能够告诉你,这些都不是问题了。node

多年来,开发人员在那里建立了大量的软件包,其惟一目的就是使用stream使用起来更简单,可是在这篇文章里,咱们专一于介绍原生的Node.js Steam Api。linux

"Stream 是Node.js中最好的却最容易被误解的部分" ----- Dominic Tarr程序员

Streams究竟是什么

Streams是数据的集合,就跟数组和字符串同样。不一样点就在于Streams可能不是马上就所有可用,而且不会所有载入内存。这使得他很是适合处理大量数据,或者处理每隔一段时间有一个数据片断传入的状况。web

可是,Stream并不只仅适用于处理大数据(大块的数据。。。)。使用它,一样也有利于组织咱们大代码。就像咱们使用管道去和合并强大的Linux命令。在Node.js中,咱们也能够作一样的事情。数组

clipboard.png

const grep = ... // A stream for the grep output
const wc = ... // A stream for the wc input
grep.pipe(wc)

Node.js的不少内置模块都实现了Stream接口服务器

clipboard.png

上面例子里面的Node.js对象列表包括了可读流和可写流,有一些对象既是可读流也是可写流,像TCP sockets, zlib 和 crypto streams。微信

注意这些对象是有很密切的关联的。当一个客户端的HTTP 响应对象是一个可读流,那么在服务器端这就是一个可写流。由于在HTTP例子中,咱们一般是从一个对象(http.IncomingMessage)读取再写入到另一个对象(http.ServerResponse)中去。异步

还要注意,当涉及到子进程时,stdio流(stdinstdoutstderr)具备逆流类型。这就容许咱们很是方便的使用管道从主进程链接子进程的Streamssocket

一些实例的Streams例子

理论都是很好的,但事实究竟是怎么样子的呢?让咱们看一些例子示范代码Streams在内存使用方面的比较。ide

咱们先建立一个大文件

const fs = require('fs');
const file = fs.createWriteStream('./big.file');

for(let i=0; i<= 1e6; i++) {
  file.write('Lorem ipsum dolor sit amet, consectetur adipisicing elit, sed do eiusmod tempor incididunt ut labore et dolore magna aliqua. Ut enim ad minim veniam, quis nostrud exercitation ullamco laboris nisi ut aliquip ex ea commodo consequat. Duis aute irure dolor in reprehenderit in voluptate velit esse cillum dolore eu fugiat nulla pariatur. Excepteur sint occaecat cupidatat non proident, sunt in culpa qui officia deserunt mollit anim id est laborum.\n');
}

file.end();

看看我使用什么建立文件的?一个可写流嘛

fs模块能够经过Stream接口来读取和写入文件。在上面的例子中,咱们在循环中经过可写流向big.file写入了1百万行数据。

运行上面的代码会生成一个大概400M的文件

这是一个简单的Node web服务器,专门为big.file提供服务:

const fs = require('fs');
const server = require('http').createServer();

server.on('request', (req, res) => {
  fs.readFile('./big.file', (err, data) => {
    if (err) throw err;
  
    res.end(data);
  });
});

server.listen(8000);

server收到请求,它会使用异步方法fs.readFile处理这个big file。可是这并不表明咱们会打断事件循环机制。一切都是正确的吗??

那如今当咱们启动server,看看内存监视器都发生了什么。

clipboard.png

如今访问这个服务器,看看内存的使用状况。

图片描述

内存占用马上飙升到434.8 MB。

在咱们把文件内容输出到客户端以前,咱们就把整个文件读入了内存。这是很低效的。

HTTP response对象(上文中的res对象)也是一个可写流,这就意味着若是咱们有一个表明着big file的可读流,咱们能够经过管道把他们俩链接起来实现一样的功能,而不须要使用400M内存。

Node的fs模块给咱们提供了一个能够操做任何文件的可读流,经过createReadStream方法建立。咱们能够把它和response对象链接起来。

const fs = require('fs');
const server = require('http').createServer();

server.on('request', (req, res) => {
  const src = fs.createReadStream('./big.file');
  src.pipe(res);
});

server.listen(8000);

如今再去访问server的时候,使人惊讶的事情发生了(看内存监视器)

clipboard.png

发生了什么?

当咱们访问服务器的时候,咱们经过流每次使用一段数据,这意味着咱们不是把所有的数据都加载到内存中。内存使用量只上升了不到25M。

能够把上面的例子用到极致,生成5百万行数据而不是1百万行。这样子的话,这个文件的大小会超过2GB,这实际上大于Node中的默认缓冲区限制。

若是你想在server上使用fs.readFile,这在默认状况下是行不通的,除非你改了Node.js的默认缓冲区限制。可是使用fs.createReadStream,把2 GB的数据返回给客户端根本不存在问题,甚至内存使用量都没有任何变化。

准备好学习Steam了吗?

Streams 101

在Node.js中有4中基本的流类型:Readable, Writable, Duplex, and Transform streams。

  • Readable 可读流是能够从中消耗数据的源的抽象,一个例子就是fs.createReadStream方法
  • Writable 可写流是能够写入数据的目标的抽象,一个例子就是fs.createWriteStream方法
  • duplex Steam是一个同时具备读写功能的流,一个例子就是TCP socket
  • Transform 是一个双工流,它能够在交换数据的时候作转换。一个例子就是zlib.createGzip使用gzip压缩数据。你能够把Transform streams当成是一个传入可读流,返回一个可写流的函数。它还有一个别名through streams

全部的Stream都是EventEmitter的实例对象。当流读和写的时候都会触发相应的事件。可是还有一个更简单的使用方法,那就是使用pipe

The pipe method

要记住下面这个魔幻方法

readableSrc.pipe(writableDest)

在这一行里面,咱们经过管道把可读流(源)输出到一个可写流里面去(目标),源必须是一个可写流,目标必须是可写流。固然,他们也均可以是duplex/Transform。事实上,当咱们使用管道链接流的时候,咱们能够像在linux中同样使用链式链接。

readableSrc
  .pipe(transformStream1)
  .pipe(transformStream2)
  .pipe(finalWrtitableDest)

pipe方法返回目标流,这保证了咱们可使用链式调用。对于streams a(可读流),b,c(可读可写流),d可写流,咱们可使用:

a.pipe(b).pipe(c).pipe(d)
# Which is equivalent to:
a.pipe(b)
b.pipe(c)
c.pipe(d)
# Which, in Linux, is equivalent to:
$ a | b | c | d

pipe方法是使用流最简便的方法。一般经过管道和事件的方法使用流,可是要尽可能避免二者混用。一般当你使用pipe方法就不须要使用事件了。可是当你须要更多定制的操做的话,使用事件的方式会更好。

Stream events

除了从可读流读取数据传输到可写流,pipe方法还自动处理一些其余事情。好比处理错误,处理文件结束操做,流之间速度快慢问题。

同时,流也能够直接使用事件操做。如下是和管道相等的经过事件操做流的方法。

# readable.pipe(writable)

readable.on('data', (chunk) => {
  writable.write(chunk);
});

readable.on('end', () => {
  writable.end();
});

下面是一些重要流的事件和方法。

clipboard.png

这些事件和方法在某种程度上是相关的,由于它们一般被一块儿使用。

可读流上的最重要的事件是

  • data事件,当可读流传输了一段数据的时候会触发
  • end事件,当没有数据被传输时触发

可写流上的最重要的事件是

  • drain事件,当可写流能够接收事件的时候被触发
  • finish事件,当全部数据被接收时被触发

事件和方法能够结合起来,以便定制和优化流的使用。读取可读流,咱们可使用pipe/unpipe方法,或者read/unshift/resume方法。使用可写流,咱们能够可写流做为pipe/unpipe方法的参数,或者使用write方法写入,使用end方法关闭。

可读流的暂停和流动

可读流有两个很重要的模式影响了咱们使用的方式。

  • 暂停模式
  • 流动模式

这些模式有时候被称为拉和推模式

全部的可读流开始的时候都是默认暂停模式,可是它们能够轻易的被切换成流动模式,当咱们须要的时候又能够切换成暂停模式。有时候这个切换是自动的。

当一个可读流是暂停模式的时候,咱们可使用read方法从流中读取。可是当一个流是流动模式的时候,数据是持续的流动,咱们须要使用事件去监听数据的变化。

在流动模式中,若是可读流没有监听者,可读流的数据会丢失。这就是为何当可读流逝流动模式的时候,咱们必须使用data事件去监听数据的变化。事实上,只需添加一个数据事件处理程序便可将暂停的流转换为流模式,删除数据事件处理程序将流切换回暂停模式。 其中一些是为了与旧的Node Stream接口进行向后兼容。

可使用resume()pause()方法在这两种模式之间切换。

clipboard.png

当咱们使用pipe方法操做可读流的时候是不须要担忧上面的这些操做的,由于pipe方法会自动帮咱们处理这些问题。

流的建立

当咱们讨论Node.js中的流时,有两项重要的任务:

  • 流的建立
  • 流的使用

咱们到如今为止讨论的都是如何使用流,那下面来看看如何建立吧!

Streams的建立一般使用stream模块。

建立一个可写流

为了建立一个可写流,咱们须要使用stream模块里面的Writable类。

const { Writable } = require('stream');

咱们能够有不少种方式实现一个可写流。例如,咱们能够继承Writable类。

class myWritableStream extends Writable {

}

可是我更喜欢使用构造函数的方式建立。经过给Writable传递一些参数来建立一个对象。惟一必需要传的选项时write方法,它须要暴漏须要写入的数据块。

const { Writable } = require('stream');
const outStream = new Writable({
  write(chunk, encoding, callback) {
    console.log(chunk.toString());
    callback();
  }
});

process.stdin.pipe(outStream);

write方法接收3个参数

  • chunk一般是一个buffer对象,咱们能够经过配置修改
  • encoding在这种状况下就须要了,不过一般状况是能够忽略的
  • callback是当咱们处理完这个数据块的时候须要调用的函数。这是一个写入是否成功的信号。若是失败了,给这个回调传递一个Error对象

outStream中,咱们简单的把chunk打印出来,由于并无发生错误,咱们直接调用了callback方法。这是这是简单并不实用的打印流。它会打印接收到的全部值。

为了使用这个流,咱们能够简单的process.stdin这个可读流。经过pipe方法链接起来。

当咱们运行上面的例子,任何咱们在控制台输入的内容都会被console.log打印出来。

这不是一个很是实用的流的实现,可是它已经被Node.js内置实现了。outStream功能和process.stdout基本相似。咱们也能够经过pipe方法把stdinstdout链接起来并实现一样的功能。

process.stdin.pipe(process.stdout);

建立一个可读流

建立可读流,咱们须要Readable

const { Readable } = require('stream');
const inStream = new Readable({});

建立一个可读流很是简单。可使用push方法推入数据给其余流使用

const { Readable } = require('stream'); 
const inStream = new Readable();
inStream.push('ABCDEFGHIJKLM');
inStream.push('NOPQRSTUVWXYZ');
inStream.push(null); // No more data
inStream.pipe(process.stdout);

当咱们push一个null对象进去的时候,这就标志着咱们要终止传输了。

咱们能够简单的把这个流经过pipe方法链接到一个可写流process.stdout

运行上面的代码,会获取全部的inStream的数据并打印出来。很是简单但有效。

咱们在经过pipe链接以前,就会把全部的数据推送到流里面。更好的方法是在消费者要求时按需推送数据。能够经过修改可读流配置里面的read()方法实现。

const inStream = new Readable({
  read(size) {
    // there is a demand on the data... Someone wants to read it.
  }
});

当读取方法在可读流上被调用时,该实现能够将部分数据推送到队列。 例如,咱们能够一次推一个字母,从字符代码65(表示A)开始,并在每次推送时递增:

const inStream = new Readable({
  read(size) {
    this.push(String.fromCharCode(this.currentCharCode++));
    if (this.currentCharCode > 90) {
      this.push(null);
    }
  }
});
inStream.currentCharCode = 65;
inStream.pipe(process.stdout);

当有流在读取可读流的数据的时候,read方法会持续执行,这样就会一直推出更多的字符。咱们须要在某个时刻终止它,这就是为何咱们设置了一个终止条件推入了null

咱们应该始终按需推送数据。

Duplex/Transform 流的实现

使用Duplex流,咱们经过同一个对象实现可读流和可写流。这相似同时实现了两个接口。

下面这个例子就结合了上面两个可读流和可写流的综合例子。

const { Duplex } = require('stream');

const inoutStream = new Duplex({
  write(chunk, encoding, callback) {
    console.log(chunk.toString());
    callback();
  },

  read(size) {
    this.push(String.fromCharCode(this.currentCharCode++));
    if (this.currentCharCode > 90) {
      this.push(null);
    }
  }
});

inoutStream.currentCharCode = 65;
process.stdin.pipe(inoutStream).pipe(process.stdout);

经过合并这些方法,咱们可使用这个duplex流读取从A-Z的字母也一样可使用它的打印功能。咱们把stdin流链接到这个duplex上去使用它的打印功能,再把这个duplex流自己链接到stdout上去就在控制台看到了A-Z。

双工流的可读写的两侧彻底独立运行。就像一个对象上两种独立的功能。

transform流是一种更有趣的duplex流。由于它的输出来源于她的输入。

对于一个transform流,咱们不须要实现readwrite方法,咱们仅仅须要实现transform方法,这个方法合并了它们两个。它具备写入方法的功能,也能够用它推送数据。

这是一个简单的transform例子,把任何输入转换成大写。

const { Transform } = require('stream');

const upperCaseTr = new Transform({
  transform(chunk, encoding, callback) {
    this.push(chunk.toString().toUpperCase());
    callback();
  }
});

process.stdin.pipe(upperCaseTr).pipe(process.stdout);

在这个transformstream里面,像上个例子中双工流同样。可是咱们只实现了transform()方法。咱们把chunk转换成大写,再把大写字母做为可读流的输入。

Streams Object Mode

默认,流会接收 Buffer/String 类型的数据。还有个字段 objectMode 设置,可让stream 接收任意类型的对象。

下面是一个这种类型的例子。如下变换流的组合使得将逗号分隔值的字符串映射为JavaScript对象的功能。 因此“a,b,c,d”成为{a:b,c:d}。

const { Transform } = require('stream');

const commaSplitter = new Transform({
  readableObjectMode: true,
  transform(chunk, encoding, callback) {
    this.push(chunk.toString().trim().split(','));
    callback();
  }
});

const arrayToObject = new Transform({
  readableObjectMode: true,
  writableObjectMode: true,
  transform(chunk, encoding, callback) {
    const obj = {};
    for(let i=0; i < chunk.length; i+=2) {
      obj[chunk[i]] = chunk[i+1];
    }
    this.push(obj);
    callback();
  }
});

const objectToString = new Transform({
  writableObjectMode: true,
  transform(chunk, encoding, callback) {
    this.push(JSON.stringify(chunk) + '\n');
    callback();
  }
});

process.stdin
  .pipe(commaSplitter)
  .pipe(arrayToObject)
  .pipe(objectToString)
  .pipe(process.stdout)

咱们经过commasplitter传递输入字符串(例如,“a,b,c,d”),它将数组做为其可读数据([“a”,“b”,“c”,“d”]))。 在该流上添加可读的ObjectMode标志是必要的,由于咱们正在将对象推送到其上,而不是字符串。

而后咱们把数组导入到arrayToObject数据流中,咱们须要把writableObjectMode设置为 true,以表示arrayToObject会接收一个对象。另外它还会推送一个对象出去,因此还要把他的readableObjectModetrue。最后一个objectToString接收一个对象可是输出字符串,因此就只须要设置一个writableObjectMode

clipboard.png

Node.js内置transform streams对象

Node有一些很是有用的内置transform streams对象。这包括zlibcrypto

下面这个例子使用了zlib.createGzip()结合了额fs readable/writable streams实现了文件压缩。

const fs = require('fs');
const zlib = require('zlib');
const file = process.argv[2];

fs.createReadStream(file)
  .pipe(zlib.createGzip())
  .pipe(fs.createWriteStream(file + '.gz'));

你可使用上面的脚本压缩任何你传入的参数文件。咱们把文件的可读流传入了zlib的内置转换流。再写入到新的.gz文件中。

使用管道还有一个很酷的事情,就是能够和事件结合起来。好比我想用户看到进度,并在结束的时候发个消息。由于pipe方法会返回目标流,咱们也能够经过链式注册事件。

const fs = require('fs');
const zlib = require('zlib');
const file = process.argv[2];

fs.createReadStream(file)
  .pipe(zlib.createGzip())
  .on('data', () => process.stdout.write('.'))
  .pipe(fs.createWriteStream(file + '.zz'))
  .on('finish', () => console.log('Done'));

因此使用管道方法,咱们能够轻松地操做流,可是咱们还可使用须要的事件进一步定制与这些流的交互。

管道方法的好处是,咱们能够用它来以一种可读的方式逐一构成咱们的程序。 例如,咱们能够简单地建立一个变换流来报告进度,而不用监听上面的数据事件,并用另外一个.pipe()调用替换 .on() 调用:

const fs = require('fs');
const zlib = require('zlib');
const file = process.argv[2];

const { Transform } = require('stream');

const reportProgress = new Transform({
  transform(chunk, encoding, callback) {
    process.stdout.write('.');
    callback(null, chunk);
  }
});

fs.createReadStream(file)
  .pipe(zlib.createGzip())
  .pipe(reportProgress)
  .pipe(fs.createWriteStream(file + '.zz'))
  .on('finish', () => console.log('Done'));

reportProgress流是一个简单的pass-through流,可是也跟标准事件同样报告进度。注意callback()函数的第二个参数,这至关于把数据推送出去。

结合流的应用是无止境的。例如,若是咱们须要在咱们gzip以前或以后加密文件,咱们须要作的就是按照咱们须要的确切顺序来管理另外一个转换流。使用Node的crypto模块处理这个事情。

const crypto = require('crypto');
// ...

fs.createReadStream(file)
  .pipe(zlib.createGzip())
  .pipe(crypto.createCipher('aes192', 'a_secret'))
  .pipe(reportProgress)
  .pipe(fs.createWriteStream(file + '.zz'))
  .on('finish', () => console.log('Done'));

上面的脚本压缩而后加密传递的文件,只有具备密码的人才可使用文件。 咱们没法使用正常的解压缩实用程序解压缩此文件,由于它已被加密。

为了可以解压缩文件,咱们须要使用彻底相反的操做,这也很简单。

fs.createReadStream(file)
  .pipe(crypto.createDecipher('aes192', 'a_secret'))
  .pipe(zlib.createGunzip())
  .pipe(reportProgress)
  .pipe(fs.createWriteStream(file.slice(0, -3)))
  .on('finish', () => console.log('Done'));

假设传递的文件是压缩版本,上面的代码将建立一个读取流,将其传输到crypto createDecipher()流中(使用相同的秘密),将其输出管道输入到zlib createGunzip()流中, 而后将文件写回到没有扩展名的文件中。

以上就是所有了,谢谢阅读!!

翻译自Node.js Streams: Everything you need to know

建立了一个程序员交流微信群,你们进群交流IT技术

图片描述

若是已过时,能够添加博主微信号15706211347,拉你进群

相关文章
相关标签/搜索