《Node.js设计模式》使用流进行编码

本系列文章为《Node.js Design Patterns Second Edition》的原文翻译和读书笔记,在GitHub连载更新,同步翻译版连接javascript

欢迎关注个人专栏,以后的博文将在专栏同步:html

Coding with Streams

StreamsNode.js最重要的组件和模式之一。 社区中有一句格言“Stream all the things(Steam就是全部的)”,仅此一点就足以描述流在Node.js中的地位。 Dominic Tarr做为Node.js社区的最大贡献者,它将流定义为Node.js最好,也是最难以理解的概念。前端

使Node.jsStreams如此吸引人还有其它缘由; 此外,Streams不只与性能或效率等技术特性有关,更重要的是它们的优雅性以及它们与Node.js的设计理念完美契合的方式。java

在本章中,将会学到如下内容:node

  • Streams对于Node.js的重要性。
  • 如何建立并使用Streams
  • Streams做为编程范式,不仅是对于I/O而言,在多种应用场景下它的应用和强大的功能。
  • 管道模式和在不一样的配置中链接Streams

发现Streams的重要性

在基于事件的平台(如Node.js)中,处理I / O的最有效的方法是实时处理,一旦有输入的信息,立马进行处理,一旦有须要输出的结果,也立马输出反馈。git

在本节中,咱们将首先介绍Node.jsStreams和它的优势。 请记住,这只是一个概述,由于本章后面将会详细介绍如何使用和组合Streamsgithub

Streams和Buffer的比较

咱们在本书中几乎全部看到过的异步API都是使用的Buffer模式。 对于输入操做,Buffer模式会未来自资源的全部数据收集到Buffer区中; 一旦读取完整个资源,就会把结果传递给回调函数。 下图显示了这个范例的一个真实的例子:算法

从上图咱们能够看到,在t1时刻,一些数据从资源接收并保存到缓冲区。 在t2时刻,最后一段数据被接收到另外一个数据块,完成读取操做,这时,把整个缓冲区的内容发送给消费者。npm

另外一方面,Streams容许你在数据到达时当即处理数据。 以下图所示:编程

这一张图显示了Streams如何从资源接收每一个新的数据块,并当即提供给消费者,消费者如今没必要等待缓冲区中收集全部数据再处理每一个数据块。

可是这两种方法有什么区别呢? 咱们能够将它们归纳为两点:

  • 空间效率
  • 时间效率

此外,Node.jsStreams具备另外一个重要的优势:可组合性(composability)。 如今让咱们看看这些属性对咱们设计和编写应用程序的方式会产生什么影响。

空间效率

首先,Streams容许咱们作一些看起来不可能的事情,经过缓冲数据并一次性处理。 例如,考虑一下咱们必须读取一个很是大的文件,好比说数百MB甚至千MB。 显然,等待彻底读取文件时返回大BufferAPI不是一个好主意。 想象一下,若是并发读取一些大文件, 咱们的应用程序很容易耗尽内存。 除此以外,V8中的Buffer不能大于0x3FFFFFFF字节(小于1GB)。 因此,在耗尽物理内存以前,咱们可能会碰壁。

使用Buffered的API进行压缩文件

举一个具体的例子,让咱们考虑一个简单的命令行接口(CLI)的应用程序,它使用Gzip格式压缩文件。 使用BufferedAPI,这样的应用程序在Node.js中大概这么编写(为简洁起见,省略了异常处理):

const fs = require('fs');
const zlib = require('zlib');
const file = process.argv[2];
fs.readFile(file, (err, buffer) => {
  zlib.gzip(buffer, (err, buffer) => {
    fs.writeFile(file + '.gz', buffer, err => {
      console.log('File successfully compressed');
    });
  });
});

如今,咱们能够尝试将前面的代码放在一个叫作gzip.js的文件中,而后执行下面的命令:

node gzip <path to file>

若是咱们选择一个足够大的文件,好比说大于1GB的文件,咱们会收到一个错误信息,说明咱们要读取的文件大于最大容许的缓冲区大小,以下所示:

RangeError: File size is greater than possible Buffer:0x3FFFFFFF

上面的例子中,没找到一个大文件,但确实对于大文件的读取速率慢了许多。

正如咱们所预料到的那样,使用Buffer来进行大文件的读取显然是错误的。

使用Streams进行压缩文件

咱们必须修复咱们的Gzip应用程序,并使其处理大文件的最简单方法是使用StreamsAPI。 让咱们看看如何实现这一点。 让咱们用下面的代码替换刚建立的模块的内容:

const fs = require('fs');
const zlib = require('zlib');
const file = process.argv[2];
fs.createReadStream(file)
  .pipe(zlib.createGzip())
  .pipe(fs.createWriteStream(file + '.gz'))
  .on('finish', () => console.log('File successfully compressed'));

“是吗?”你可能会问。是的;正如咱们所说的,因为Streams的接口和可组合性,所以咱们还能写出这样的更加简洁,优雅和精炼的代码。 咱们稍后会详细地看到这一点,可是如今须要认识到的重要一点是,程序能够顺畅地运行在任何大小的文件上,理想状况是内存利用率不变。 尝试一下(但考虑压缩一个大文件可能须要一段时间)。

时间效率

如今让咱们考虑一个压缩文件并将其上传到远程HTTP服务器的应用程序的例子,该远程HTTP服务器进而将其解压缩并保存到文件系统中。若是咱们的客户端是使用BufferedAPI实现的,那么只有当整个文件被读取和压缩时,上传才会开始。 另外一方面,只有在接收到全部数据的状况下,解压缩才会在服务器上启动。 实现相同结果的更好的解决方案涉及使用Streams。 在客户端机器上,Streams只要从文件系统中读取就能够压缩和发送数据块,而在服务器上,只要从远程对端接收到数据块,就能够解压每一个数据块。 咱们经过构建前面提到的应用程序来展现这一点,从服务器端开始。

咱们建立一个叫作gzipReceive.js的模块,代码以下:

const http = require('http');
const fs = require('fs');
const zlib = require('zlib');

const server = http.createServer((req, res) => {
  const filename = req.headers.filename;
  console.log('File request received: ' + filename);
  req
    .pipe(zlib.createGunzip())
    .pipe(fs.createWriteStream(filename))
    .on('finish', () => {
      res.writeHead(201, {
        'Content-Type': 'text/plain'
      });
      res.end('That\'s it\n');
      console.log(`File saved: ${filename}`);
    });
});

server.listen(3000, () => console.log('Listening'));

服务器从网络接收数据块,将其解压缩,并在接收到数据块后当即保存,这要归功于Node.jsStreams

咱们的应用程序的客户端将进入一个名为gzipSend.js的模块,以下所示:

在前面的代码中,咱们再次使用Streams从文件中读取数据,而后在从文件系统中读取的同时压缩并发送每一个数据块。

如今,运行这个应用程序,咱们首先使用如下命令启动服务器:

node gzipReceive

而后,咱们能够经过指定要发送的文件和服务器的地址(例如localhost)来启动客户端:

node gzipSend <path to file> localhost

若是咱们选择一个足够大的文件,咱们将更容易地看到数据如何从客户端流向服务器,但为何这种模式下,咱们使用Streams,比使用BufferedAPI更有效率? 下图应该给咱们一个提示:

一个文件被处理的过程,它通过如下阶段:

  1. 客户端从文件系统中读取
  2. 客户端压缩数据
  3. 客户端将数据发送到服务器
  4. 服务端接收数据
  5. 服务端解压数据
  6. 服务端将数据写入磁盘

为了完成处理,咱们必须按照流水线顺序那样通过每一个阶段,直到最后。在上图中,咱们能够看到,使用BufferedAPI,这个过程彻底是顺序的。为了压缩数据,咱们首先必须等待整个文件被读取完毕,而后,发送数据,咱们必须等待整个文件被读取和压缩,依此类推。当咱们使用Streams时,只要咱们收到第一个数据块,流水线就会被启动,而不须要等待整个文件的读取。但更使人惊讶的是,当下一块数据可用时,不须要等待上一组任务完成;相反,另外一条装配线是并行启动的。由于咱们执行的每一个任务都是异步的,这样显得很完美,因此能够经过Node.js来并行执行Streams的相关操做;惟一的限制就是每一个阶段都必须保证数据块的到达顺序。

从前面的图能够看出,使用Streams的结果是整个过程花费的时间更少,由于咱们不用等待全部数据被所有读取完毕和处理。

组合性

到目前为止,咱们已经看到的代码已经告诉咱们如何使用pipe()方法来组装Streams的数据块,Streams容许咱们链接不一样的处理单元,每一个处理单元负责单一的职责(这是符合Node.js风格的)。这是可能的,由于Streams具备统一的接口,而且就API而言,不一样Streams也能够很好的进行交互。惟一的先决条件是管道的下一个Streams必须支持上一个Streams生成的数据类型,能够是二进制,文本甚至是对象,咱们将在后面的章节中看到。

为了证实Streams组合性的优点,咱们能够尝试在咱们先前构建的gzipReceive / gzipSend应用程序中添加加密功能。
为此,咱们只须要经过向流水线添加另外一个Streams来更新客户端。 确切地说,由crypto.createChipher()返回的流。 由此产生的代码应以下所示:

const fs = require('fs');
const zlib = require('zlib');
const crypto = require('crypto');
const http = require('http');
const path = require('path');

const file = process.argv[2];
const server = process.argv[3];

const options = {
  hostname: server,
  port: 3000,
  path: '/',
  method: 'PUT',
  headers: {
    filename: path.basename(file),
    'Content-Type': 'application/octet-stream',
    'Content-Encoding': 'gzip'
  }
};

const req = http.request(options, res => {
  console.log('Server response: ' + res.statusCode);
});

fs.createReadStream(file)
  .pipe(zlib.createGzip())
  .pipe(crypto.createCipher('aes192', 'a_shared_secret'))
  .pipe(req)
  .on('finish', () => {
    console.log('File successfully sent');
  });

使用相同的方式,咱们更新服务端的代码,使得它能够在数据块进行解压以前先解密:

const http = require('http');
const fs = require('fs');
const zlib = require('zlib');
const crypto = require('crypto');

const server = http.createServer((req, res) => {
  const filename = req.headers.filename;
  console.log('File request received: ' + filename);
  req
    .pipe(crypto.createDecipher('aes192', 'a_shared_secret'))
    .pipe(zlib.createGunzip())
    .pipe(fs.createWriteStream(filename))
    .on('finish', () => {
      res.writeHead(201, {
        'Content-Type': 'text/plain'
      });
      res.end('That\'s it\n');
      console.log(`File saved: ${filename}`);
    });
});

server.listen(3000, () => console.log('Listening'));
crypto是Node.js的核心模块之一,提供了一系列加密算法。

只需几行代码,咱们就在应用程序中添加了一个加密层。 咱们只须要简单地经过把已经存在的Streams模块和加密层组合到一块儿,就能够。相似的,咱们能够添加和合并其余Streams,如同在玩乐高积木同样。

显然,这种方法的主要优势是可重用性,但正如咱们从目前为止所介绍的代码中能够看到的那样,Streams也能够实现更清晰,更模块化,更加简洁的代码。 出于这些缘由,流一般不只仅用于处理纯粹的I / O,并且它仍是简化和模块化代码的手段。

开始使用Streams

在前面的章节中,咱们了解了为何Streams如此强大,并且它在Node.js中无处不在,甚至在Node.js的核心模块中也有其身影。 例如,咱们已经看到,fs模块具备用于从文件读取的createReadStream()和用于写入文件的createWriteStream()HTTP请求和响应对象本质上是Streams,而且zlib模块容许咱们使用StreamsAPI压缩和解压缩数据块。

如今咱们知道为何Streams是如此重要,让咱们退后一步,开始更详细地探索它。

Streams的结构

Node.js中的每一个Streams都是Streams核心模块中可用的四个基本抽象类之一的实现:

  • stream.Readable
  • stream.Writable
  • stream.Duplex
  • stream.Transform

每一个stream类也是EventEmitter的一个实例。实际上,Streams能够产生几种类型的事件,好比end事件会在一个可读的Streams完成读取,或者错误读取,或其过程当中产生异常时触发。

请注意,为简洁起见,在本章介绍的例子中,咱们常常会忽略适当的错误处理。可是,在生产环境下中,老是建议为全部Stream注册错误事件侦听器。

Streams之因此如此灵活的缘由之一是它不只可以处理二进制数据,并且几乎能够处理任何JavaScript值。实际上,Streams能够支持两种操做模式:

  • 二进制模式:以数据块形式(例如buffersstrings)流式传输数据
  • 对象模式:将流数据视为一系列离散对象(这使得咱们几乎可使用任何JavaScript值)

这两种操做模式使咱们不只可使用I / O流,并且还能够做为一种工具,以函数式的风格优雅地组合处理单元,咱们将在本章后面看到。

在本章中,咱们将主要使用在Node.js 0.11中引入的Node.js流接口,也称为版本3。 有关与旧接口差别的更多详细信息,请参阅StrongLoop在 https://strongloop.com/strong...

可读的Streams

一个可读的Streams表示一个数据源,在Node.js中,它使用stream模块中的Readableabstract类实现。

从Streams中读取信息

从可读Streams接收数据有两种方式:non-flowing模式和flowing模式。 咱们来更详细地分析这些模式。

non-flowing模式(不流动模式)

从可读的Streams中读取数据的默认模式是为其附加一个可读事件侦听器,用于指示要读取的新数据的可用性。而后,在一个循环中,咱们读取全部的数据,直到内部buffer被清空。这可使用read()方法完成,该方法同步从内部缓冲区中读取数据,并返回表示数据块的BufferString对象。read()方法以以下使用模式:

readable.read([size]);

使用这种方法,数据随时能够直接从Streams中按需提取。

为了说明这是如何工做的,咱们建立一个名为readStdin.js的新模块,它实现了一个简单的程序,它从标准输入(一个可读流)中读取数据,并将全部数据回送到标准输出:

process.stdin
  .on('readable', () => {
    let chunk;
    console.log('New data available');
    while ((chunk = process.stdin.read()) !== null) {
      console.log(
        `Chunk read: (${chunk.length}) "${chunk.toString()}"`
      );
    }
  })
  .on('end', () => process.stdout.write('End of stream'));

read()方法是一个同步操做,它从可读Streams的内部Buffers区中提取数据块。若是Streams在二进制模式下工做,返回的数据块默认为一个Buffer对象。

在以二进制模式工做的可读的Stream中,咱们能够经过在Stream上调用setEncoding(encoding)来读取字符串而不是Buffer对象,并提供有效的编码格式(例如utf8)。

数据是从可读的侦听器中读取的,只要有新的数据,就会调用这个侦听器。当内部缓冲区中没有更多数据可用时,read()方法返回null;在这种状况下,咱们不得不等待另外一个可读的事件被触发,告诉咱们能够再次读取或者等待表示Streams读取过程结束的end事件触发。当一个流以二进制模式工做时,咱们也能够经过向read()方法传递一个size参数来指定咱们想要读取的数据大小。这在实现网络协议或解析特定数据格式时特别有用。

如今,咱们准备运行readStdin模块并进行实验。让咱们在控制台中键入一些字符,而后按Enter键查看回显到标准输出中的数据。要终止流并所以生成一个正常的结束事件,咱们须要插入一个EOF(文件结束)字符(在Windows上使用Ctrl + Z或在Linux上使用Ctrl + D)。

咱们也能够尝试将咱们的程序与其余程序链接起来;这可使用管道运算符(|),它将程序的标准输出重定向到另外一个程序的标准输入。例如,咱们能够运行以下命令:

cat <path to a file> | node readStdin

这是流式范例是一个通用接口的一个很好的例子,它使得咱们的程序可以进行通讯,而无论它们是用什么语言写的。

flowing模式(流动模式)

Streams中读取的另外一种方法是将侦听器附加到data事件;这会将Streams切换为flowing模式,其中数据不是使用read()函数来提取的,而是一旦有数据到达data监听器就被推送到监听器内。例如,咱们以前建立的readStdin应用程序将使用流动模式:

process.stdin
  .on('data', chunk => {
    console.log('New data available');
    console.log(
      `Chunk read: (${chunk.length}) "${chunk.toString()}"`
    );
  })
  .on('end', () => process.stdout.write('End of stream'));

flowing模式是旧版Streams接口(也称为Streams1)的继承,其灵活性较低,API较少。随着Streams2接口的引入,flowing模式不是默认的工做模式,要启用它,须要将侦听器附加到data事件或显式调用resume()方法。 要暂时中断Streams触发data事件,咱们能够调用pause()方法,致使任何传入数据缓存在内部buffer中。

调用pause()不会致使Streams切换回non-flowing模式。

实现可读的Streams

如今咱们知道如何从Streams中读取数据,下一步是学习如何实现一个新的Readable数据流。为此,有必要经过继承stream.Readable的原型来建立一个新的类。 具体流必须提供_read()方法的实现:

readable._read(size)

Readable类的内部将调用_read()方法,而该方法又将启动
使用push()填充内部缓冲区:

请注意,read()是Stream消费者调用的方法,而_read()是一个由Stream子类实现的方法,不能直接调用。下划线一般表示该方法为私有方法,不该该直接调用。

为了演示如何实现新的可读Streams,咱们能够尝试实现一个生成随机字符串的Streams。 咱们来建立一个名为randomStream.js的新模块,它将包含咱们的字符串的generator的代码:

const stream = require('stream');
const Chance = require('chance');

const chance = new Chance();

class RandomStream extends stream.Readable {
  constructor(options) {
    super(options);
  }

  _read(size) {
    const chunk = chance.string(); //[1]
    console.log(`Pushing chunk of size: ${chunk.length}`);
    this.push(chunk, 'utf8'); //[2]
    if (chance.bool({
        likelihood: 5
      })) { //[3]
      this.push(null);
    }
  }
}

module.exports = RandomStream;

在文件顶部,咱们将加载咱们的依赖关系。除了咱们正在加载一个chance的npm模块以外,没有什么特别之处,它是一个用于生成各类随机值的库,从数字到字符串到整个句子都能生成随机值。

下一步是建立一个名为RandomStream的新类,并指定stream.Readable做为其父类。 在前面的代码中,咱们调用父类的构造函数来初始化其内部状态,并将收到的options参数做为输入。经过options对象传递的可能参数包括如下内容:

  • 用于将Buffers转换为Stringsencoding参数(默认值为null
  • 是否启用对象模式(objectMode默认为false
  • 存储在内部buffer区中的数据的上限,一旦超过这个上限,则暂停从data source读取(highWaterMark默认为16KB

好的,如今让咱们来解释一下咱们重写的stream.Readable类的_read()方法:

  • 该方法使用chance生成随机字符串。
  • 它将字符串push内部buffer。 请注意,因为咱们push的是String,此外咱们还指定了编码为utf8(若是数据块只是一个二进制Buffer,则不须要)。
  • 5%的几率随机中断stream的随机字符串产生,经过push null到内部Buffer来表示EOF,即stream的结束。

咱们还能够看到在_read()函数的输入中给出的size参数被忽略了,由于它是一个建议的参数。 咱们能够简单地把全部可用的数据都push到内部的buffer中,可是若是在同一个调用中有多个推送,那么咱们应该检查push()是否返回false,由于这意味着内部buffer已经达到了highWaterMark限制,咱们应该中止添加更多的数据。

以上就是RandomStream模块,咱们如今准备好使用它。咱们来建立一个名为generateRandom.js的新模块,在这个模块中咱们实例化一个新的RandomStream对象并从中提取一些数据:

const RandomStream = require('./randomStream');
const randomStream = new RandomStream();

randomStream.on('readable', () => {
  let chunk;
  while ((chunk = randomStream.read()) !== null) {
    console.log(`Chunk received: ${chunk.toString()}`);
  }
});

如今,一切都准备好了,咱们尝试新的自定义的stream。 像往常同样简单地执行generateRandom模块,观察随机的字符串在屏幕上流动。

可写的Streams

一个可写的stream表示一个数据终点,在Node.js中,它使用stream模块中的Writable抽象类来实现。

写入一个stream

把一些数据放在可写入的stream中是一件简单的事情, 咱们所要作的就是使用write()方法,它具备如下格式:

writable.write(chunk, [encoding], [callback])

encoding参数是可选的,其在chunkString类型时指定(默认为utf8,若是chunkBuffer,则忽略);当数据块被刷新到底层资源中时,callback就会被调用,callback参数也是可选的。

为了表示没有更多的数据将被写入stream中,咱们必须使用end()方法:

writable.end([chunk], [encoding], [callback])

咱们能够经过end()方法提供最后一块数据。在这种状况下,callbak函数至关于为finish事件注册一个监听器,当数据块所有被写入stream中时,会触发该事件。

如今,让咱们经过建立一个输出随机字符串序列的小型HTTP服务器来演示这是如何工做的:

const Chance = require('chance');
const chance = new Chance();

require('http').createServer((req, res) => {
  res.writeHead(200, {
    'Content-Type': 'text/plain'
  }); //[1]
  while (chance.bool({
      likelihood: 95
    })) { //[2]
    res.write(chance.string() + '\n'); //[3]
  }
  res.end('\nThe end...\n'); //[4]
  res.on('finish', () => console.log('All data was sent')); //[5]
}).listen(8080, () => console.log('Listening on http://localhost:8080'));

咱们建立了一个HTTP服务器,并把数据写入res对象,res对象是http.ServerResponse的一个实例,也是一个可写入的stream。下面来解释上述代码发生了什么:

  1. 咱们首先写HTTP response的头部。请注意,writeHead()不是Writable接口的一部分,实际上,这个方法是http.ServerResponse类公开的辅助方法。
  2. 咱们开始一个5%的几率终止的循环(进入循环体的几率为chance.bool()产生,其为95%)。
  3. 在循环内部,咱们写入一个随机字符串到stream
  4. 一旦咱们不在循环中,咱们调用streamend(),表示没有更多

数据块将被写入。另外,咱们在结束以前提供一个最终的字符串写入流中。

  1. 最后,咱们注册一个finish事件的监听器,当全部的数据块都被刷新到底层socket中时,这个事件将被触发。

咱们能够调用这个小模块称为entropyServer.js,而后执行它。要测试这个服务器,咱们能够在地址http:// localhost:8080打开一个浏览器,或者从终端使用curl命令,以下所示:

curl localhost:8080

此时,服务器应该开始向您选择的HTTP客户端发送随机字符串(请注意,某些浏览器可能会缓冲数据,而且流式传输行为可能不明显)。

Back-pressure(反压)

相似于在真实管道系统中流动的液体,Node.jsstream也可能遭受瓶颈,数据写入速度可能快于stream的消耗。 解决这个问题的机制包括缓冲输入数据;然而,若是数据stream没有给生产者任何反馈,咱们可能会产生愈来愈多的数据被累积到内部缓冲区的状况,致使内存泄露的发生。

为了防止这种状况的发生,当内部buffer超过highWaterMark限制时,writable.write()将返回false。 可写入的stream具备highWaterMark属性,这是write()方法开始返回false的内部Buffer区大小的限制,一旦Buffer区的大小超过这个限制,表示应用程序应该中止写入。 当缓冲器被清空时,会触发一个叫作drain的事件,通知再次开始写入是安全的。 这种机制被称为back-pressure

本节介绍的机制一样适用于可读的stream。事实上,在可读stream中也存在back-pressure,而且在_read()内调用的push()方法返回false时触发。 可是,这对于stream实现者来讲是一个特定的问题,因此咱们将不常常处理它。

咱们能够经过修改以前建立的entropyServer模块来演示可写入的streamback-pressure

const Chance = require('chance');
const chance = new Chance();

require('http').createServer((req, res) => {
  res.writeHead(200, {
    'Content-Type': 'text/plain'
  });

  function generateMore() { //[1]
    while (chance.bool({
        likelihood: 95
      })) {
      const shouldContinue = res.write(
        chance.string({
          length: (16 * 1024) - 1
        }) //[2]
      );
      if (!shouldContinue) { //[3]
        console.log('Backpressure');
        return res.once('drain', generateMore);
      }
    }
    res.end('\nThe end...\n', () => console.log('All data was sent'));
  }
  generateMore();
}).listen(8080, () => console.log('Listening on http://localhost:8080'));

前面代码中最重要的步骤能够归纳以下:

  1. 咱们将主逻辑封装在一个名为generateMore()的函数中。
  2. 为了增长得到一些back-pressure的机会,咱们将数据块的大小增长到16KB-1Byte,这很是接近默认的highWaterMark限制。
  3. 在写入一大块数据以后,咱们检查res.write()的返回值。 若是它返回false,这意味着内部buffer已满,咱们应该中止发送更多的数据。在这种状况下,咱们从函数中退出,而后新注册一个写入事件的发布者,当drain事件触发时调用generateMore

若是咱们如今尝试再次运行服务器,而后使用curl生成客户端请求,则极可能会有一些back-pressure,由于服务器以很是高的速度生成数据,速度甚至会比底层socket更快。

实现可写入的Streams

咱们能够经过继承stream.Writable类来实现一个新的可写入的流,并为_write()方法提供一个实现。实现一个咱们自定义的可写入的Streams类。

让咱们构建一个可写入的stream,它接收对象的格式以下:

{
  path: <path to a file>
  content: <string or buffer>
}

这个类的做用是这样的:对于每个对象,咱们的stream必须将content部分保存到在给定路径中建立的文件中。 咱们能够当即看到,咱们stream的输入是对象,而不是StringsBuffers,这意味着咱们的stream必须以对象模式工做。

调用模块toFileStream.js

const stream = require('stream');
const fs = require('fs');
const path = require('path');
const mkdirp = require('mkdirp');

class ToFileStream extends stream.Writable {
  constructor() {
    super({
      objectMode: true
    });
  }

  _write(chunk, encoding, callback) {
    mkdirp(path.dirname(chunk.path), err => {
      if (err) {
        return callback(err);
      }
      fs.writeFile(chunk.path, chunk.content, callback);
    });
  }
}
module.exports = ToFileStream;

做为第一步,咱们加载全部咱们所须要的依赖包。注意,咱们须要模块mkdirp,正如你应该从前几章中所知道的,它应该使用npm安装。

咱们建立了一个新类,它从stream.Writable扩展而来。

咱们不得不调用父构造函数来初始化其内部状态;咱们还提供了一个option对象做为参数,用于指定流在对象模式下工做(objectMode:true)。stream.Writable接受的其余选项以下:

  • highWaterMark(默认值是16KB):控制back-pressure的上限。
  • decodeStrings(默认为true):在字符串传递给_write()方法以前,将字符串自动解码为二进制buffer区。 在对象模式下这个参数被忽略。

最后,咱们为_write()方法提供了一个实现。正如你所看到的,这个方法接受一个数据块,一个编码方式(只有在二进制模式下,stream选项decodeStrings设置为false时才有意义)。

另外,该方法接受一个回调函数,该函数在操做完成时须要调用;而没必要要传递操做的结果,可是若是须要的话,咱们仍然能够传递一个error对象,这将致使stream触发error事件。

如今,为了尝试咱们刚刚构建的stream,咱们能够建立一个名为writeToFile.js的新模块,并对该流执行一些写操做:

const ToFileStream = require('./toFileStream.js');
const tfs = new ToFileStream();

tfs.write({path: "file1.txt", content: "Hello"});
tfs.write({path: "file2.txt", content: "Node.js"});
tfs.write({path: "file3.txt", content: "Streams"});
tfs.end(() => console.log("All files created"));

有了这个,咱们建立并使用了咱们的第一个自定义的可写入流。 像往常同样运行新模块来检查其输出;你会看到执行后会建立三个新文件。

双重的Streams

双重的stream既是可读的,也可写的。 当咱们想描述一个既是数据源又是数据终点的实体时(例如socket),这就显得十分有用了。 双工流继承stream.Readablestream.Writable的方法,因此它对咱们来讲并不新鲜。这意味着咱们能够read()write()数据,或者能够监听readabledrain事件。

要建立一个自定义的双重stream,咱们必须为_read()_write()提供一个实现。传递给Duplex()构造函数的options对象在内部被转发给ReadableWritable的构造函数。options参数的内容与前面讨论的相同,options增长了一个名为allowHalfOpen值(默认为true),若是设置为false,则会致使只要stream的一方(ReadableWritable)结束,stream就结束了。

为了使双重的stream在一方以对象模式工做,而在另外一方以二进制模式工做,咱们须要在流构造器中手动设置如下属性:
this._writableState.objectMode
this._readableState.objectMode

转换的Streams

转换的Streams是专门设计用于处理数据转换的一种特殊类型的双重Streams

在一个简单的双重Streams中,从stream中读取的数据和写入到其中的数据之间没有直接的关系(至少stream是不可知的)。 想一想一个TCP socket,它只是向远程节点发送数据和从远程节点接收数据。TCP socket自身没有意识到输入和输出之间有任何关系。

下图说明了双重Streams中的数据流:

另外一方面,转换的Streams对从可写入端接收到的每一个数据块应用某种转换,而后在其可读端使转换的数据可用。

下图显示了数据如何在转换的Streams中流动:

从外面看,转换的Streams的接口与双重Streams的接口彻底相同。可是,当咱们想要构建一个新的双重Streams时,咱们必须提供_read()_write()方法,而为了实现一个新的变换流,咱们必须填写另外一对方法:_transform()_flush())。

咱们来演示如何用一个例子来建立一个新的转换的Streams

实现转换的Streams

咱们来实现一个转换的Streams,它将替换给定全部出现的字符串。 要作到这一点,咱们必须建立一个名为replaceStream.js的新模块。 让咱们直接看怎么实现它:

const stream = require('stream');
const util = require('util');

class ReplaceStream extends stream.Transform {
  constructor(searchString, replaceString) {
    super();
    this.searchString = searchString;
    this.replaceString = replaceString;
    this.tailPiece = '';
  }

  _transform(chunk, encoding, callback) {
    const pieces = (this.tailPiece + chunk)         //[1]
      .split(this.searchString);
    const lastPiece = pieces[pieces.length - 1];
    const tailPieceLen = this.searchString.length - 1;

    this.tailPiece = lastPiece.slice(-tailPieceLen);     //[2]
    pieces[pieces.length - 1] = lastPiece.slice(0,-tailPieceLen);

    this.push(pieces.join(this.replaceString));       //[3]
    callback();
  }

  _flush(callback) {
    this.push(this.tailPiece);
    callback();
  }
}

module.exports = ReplaceStream;

与往常同样,咱们将从其依赖项开始构建模块。此次咱们没有使用第三方模块。

而后咱们建立了一个从stream.Transform基类继承的新类。该类的构造函数接受两个参数:searchStringreplaceString。 正如你所想象的那样,它们容许咱们定义要匹配的文本以及用做替换的字符串。 咱们还初始化一个将由_transform()方法使用的tailPiece内部变量。

如今,咱们来分析一下_transform()方法,它是咱们新类的核心。_transform()方法与可写入的stream_write()方法具备几乎相同的格式,但不是将数据写入底层资源,而是使用this.push()将其推入内部buffer,这与咱们会在可读流的_read()方法中执行。这显示了转换的Streams的双方如何实际链接。

ReplaceStream_transform()方法实现了咱们这个新类的核心。正常状况下,搜索和替换buffer区中的字符串是一件容易的事情;可是,当数据流式传输时,状况则彻底不一样,可能的匹配可能分布在多个数据块中。代码后面的程序能够解释以下:

  1. 咱们的算法使用searchString函数做为分隔符来分割块。
  2. 而后,它取出分隔后生成的数组的最后一项lastPiece,并提取其最后一个字符searchString.length - 1。结果被保存到tailPiece变量中,它将会被做为下一个数据块的前缀。
  3. 最后,全部从split()获得的片断用replaceString做为分隔符链接在一块儿,并推入内部buffer区。

stream结束时,咱们可能仍然有最后一个tailPiece变量没有被压入内部缓冲区。这正是_flush()方法的用途;它在stream结束以前被调用,而且这是咱们最终有机会完成流或者在彻底结束流以前推送任何剩余数据的地方。

_flush()方法只须要一个回调函数做为参数,当全部的操做完成后,咱们必须确保调用这个回调函数。完成了这个,咱们已经完成了咱们的ReplaceStream类。

如今,是时候尝试新的stream。咱们能够建立另外一个名为replaceStreamTest.js的模块来写入一些数据,而后读取转换的结果:

const ReplaceStream = require('./replaceStream');

const rs = new ReplaceStream('World', 'Node.js');
rs.on('data', chunk => console.log(chunk.toString()));

rs.write('Hello W');
rs.write('orld!');
rs.end();

为了使得这个例子更复杂一些,咱们把搜索词分布在两个不一样的数据块上;而后,使用flowing模式,咱们从同一个stream中读取数据,记录每一个已转换的块。运行前面的程序应该产生如下输出:

Hel
lo Node.js
!
有一个值得说起是,第五种类型的stream:stream.PassThrough。 与咱们介绍的其余流类不一样,PassThrough不是抽象的,能够直接实例化,而不须要实现任何方法。实际上,这是一个可转换的stream,它能够输出每一个数据块,而不须要进行任何转换。

使用管道链接Streams

Unix管道的概念是由Douglas Mcllroy发明的;这使程序的输出可以链接到下一个的输入。看看下面的命令:

echo Hello World! | sed s/World/Node.js/g

在前面的命令中,echo会将Hello World!写入标准输出,而后被重定向到sed命令的标准输入(由于有管道操做符 |)。 而后sedNode.js替换任何World,并将结果打印到它的标准输出(此次是控制台)。

以相似的方式,可使用可读的Streamspipe()方法将Node.jsStreams链接在一块儿,它具备如下接口:

readable.pipe(writable, [options])

很是直观地,pipe()方法将从可读的Streams中发出的数据抽取到所提供的可写入的Streams中。 另外,当可读的Streams发出end事件(除非咱们指定{end:false}做为options)时,可写入的Streams将自动结束。 pipe()方法返回做为参数传递的可写入的Streams,若是这样的stream也是可读的(例如双重或可转换的Streams),则容许咱们建立链式调用。

将两个Streams链接到一块儿时,则容许数据自动流向可写入的Streams,因此不须要调用read()write()方法;但最重要的是不须要控制back-pressure,由于它会自动处理。

举个简单的例子(将会有大量的例子),咱们能够建立一个名为replace.js的新模块,它接受来自标准输入的文本流,应用替换转换,而后将数据返回到标准输出:

const ReplaceStream = require('./replaceStream');
process.stdin
  .pipe(new ReplaceStream(process.argv[2], process.argv[3]))
  .pipe(process.stdout);

上述程序未来自标准输入的数据传送到ReplaceStream,而后返回到标准输出。 如今,为了实践这个小应用程序,咱们能够利用Unix管道将一些数据重定向到它的标准输入,以下所示:

echo Hello World! | node replace World Node.js

运行上述程序,会输出以下结果:

Hello Node.js

这个简单的例子演示了Streams(特别是文本Streams)是一个通用接口,管道几乎是构成和链接全部这些接口的通用方式。

error事件不会经过管道自动传播。举个例子,看以下代码片断:
stream1
  .pipe(stream2)
  .on('error', function() {});
在前面的链式调用中,咱们将只捕获来自 stream2的错误,这是因为咱们给其添加了 erorr事件侦听器。这意味着,若是咱们想捕获从 stream1生成的任何错误,咱们必须直接附加另外一个错误侦听器。 稍后咱们将看到一种能够实现共同错误捕获的另外一种模式(合并 Streams)。 此外,咱们应该注意到,若是目标 Streams(读取的 Streams)发出错误,它将会对源 Streams通知一个 error,以后致使管道的中断。

Streams如何经过管道

到目前为止,咱们建立自定义Streams的方式并不彻底遵循Node定义的模式;实际上,从stream基类继承是违反small surface area的,并须要一些示例代码。 这并不意味着Streams设计得很差,实际上,咱们不该该忘记,由于StreamsNode.js核心的一部分,因此它们必须尽量地灵活,普遍拓展Streams以至于用户级模块可以将它们充分运用。

然而,大多数状况下,咱们并不须要原型继承能够给予的全部权力和可扩展性,但一般咱们想要的仅仅是定义新Streams的一种快速开发的模式。Node.js社区固然也为此建立了一个解决方案。 一个完美的例子是through2,一个使得咱们能够简单地建立转换的Streams的小型库。 经过through2,咱们能够经过调用一个简单的函数来建立一个新的可转换的Streams

const transform = through2([options], [_transform], [_flush]);

相似的,from2也容许咱们像下面这样建立一个可读的Streams

const readable = from2([options], _read);

接下来,咱们将在本章其他部分展现它们的用法,那时,咱们会清楚使用这些小型库的好处。

throughfrom是基于 Stream1规范的顶层库。

基于Streams的异步控制流

经过咱们已经介绍的例子,应该清楚的是,Streams不只能够用来处理I / O,并且能够用做处理任何类型数据的优雅编程模式。 但优势并不止这些;还能够利用Streams来实现异步控制流,在本节将会看到。

顺序执行

默认状况下,Streams将按顺序处理数据;例如,转换的Streams_transform()函数在前一个数据块执行callback()以后才会进行下一块数据块的调用。这是Streams的一个重要属性,按正确顺序处理每一个数据块相当重要,可是也能够利用这一属性将Streams实现优雅的传统控制流模式。

代码老是比太多的解释要好得多,因此让咱们来演示一下如何使用流来按顺序执行异步任务的例子。让咱们建立一个函数来链接一组接收到的文件做为输入,确保遵照提供的顺序。咱们建立一个名为concatFiles.js的新模块,并从其依赖开始:

const fromArray = require('from2-array');
const through = require('through2');
const fs = require('fs');

咱们将使用through2来简化转换的Streams的建立,并使用from2-array从一个对象数组中建立可读的Streams
接下来,咱们能够定义concatFiles()函数:

function concatFiles(destination, files, callback) {
  const destStream = fs.createWriteStream(destination);
  fromArray.obj(files)             //[1]
    .pipe(through.obj((file, enc, done) => {   //[2]
      const src = fs.createReadStream(file);
      src.pipe(destStream, {end: false});
      src.on('end', done); //[3]
    }))
    .on('finish', () => {         //[4]
      destStream.end();
      callback();
    });
}

module.exports = concatFiles;

前面的函数经过将files数组转换为Streams来实现对files数组的顺序迭代。 该函数所遵循的程序解释以下:

  1. 首先,咱们使用from2-arrayfiles数组建立一个可读的Streams
  2. 接下来,咱们使用through来建立一个转换的Streams来处理序列中的每一个文件。对于每一个文件,咱们建立一个可读的Streams,并经过管道将其输入到表示输出文件的destStream中。 在源文件完成读取后,经过在pipe()方法的第二个参数中指定{end:false},咱们确保不关闭destStream
  3. 当源文件的全部内容都被传送到destStream时,咱们调用through.obj公开的done函数来传递当前处理已经完成,在咱们的状况下这是须要触发处理下一个文件。
  4. 全部文件处理完后,finish事件被触发。咱们最后能够结束destStream并调用concatFiles()callback()函数,这个函数表示整个操做的完成。

咱们如今能够尝试使用咱们刚刚建立的小模块。让咱们建立一个名为concat.js的新文件来完成一个示例:

const concatFiles = require('./concatFiles');

concatFiles(process.argv[2], process.argv.slice(3), () => {
  console.log('Files concatenated successfully');
});

咱们如今能够运行上述程序,将目标文件做为第一个命令行参数,接着是要链接的文件列表,例如:

node concat allTogether.txt file1.txt file2.txt

执行这一条命令,会建立一个名为allTogether.txt的新文件,其中按顺序保存file1.txtfile2.txt的内容。

使用concatFiles()函数,咱们可以仅使用Streams实现异步操做的顺序执行。正如咱们在Chapter3 Asynchronous Control Flow Patters with Callbacks中看到的那样,若是使用纯JavaScript实现,或者使用async等外部库,则须要使用或实现迭代器。咱们如今提供了另一个能够达到一样效果的方法,正如咱们所看到的,它的实现方式很是优雅且可读性高。

模式:使用Streams或Streams的组合,能够轻松地按顺序遍历一组异步任务。

无序并行执行

咱们刚刚看到Streams按顺序处理每一个数据块,但有时这可能并不能这么作,由于这样并无充分利用Node.js的并发性。若是咱们必须对每一个数据块执行一个缓慢的异步操做,那么并行化执行这一组异步任务彻底是有必要的。固然,只有在每一个数据块之间没有关系的状况下才能应用这种模式,这些数据块可能常常发生在对象模式的Streams中,可是对于二进制模式的Streams不多使用无序的并行执行。

注意:当处理数据的顺序很重要时,不能使用无序并行执行的Streams。

为了并行化一个可转换的Streams的执行,咱们能够运用Chapter3 Asynchronous Control Flow Patters with Callbacks所讲到的无序并行执行的相同模式,而后作出一些改变使它们适用于Streams。让咱们看看这是如何更改的。

实现一个无序并行的Streams

让咱们用一个例子直接说明:咱们建立一个叫作parallelStream.js的模块,而后自定义一个普通的可转换的Streams,而后给出一系列可转换流的方法:

const stream = require('stream');

class ParallelStream extends stream.Transform {
  constructor(userTransform) {
    super({objectMode: true});
    this.userTransform = userTransform;
    this.running = 0;
    this.terminateCallback = null;
  }

  _transform(chunk, enc, done) {
    this.running++;
    this.userTransform(chunk, enc, this._onComplete.bind(this), this.push.bind(this));
    done();
  }

  _flush(done) {
    if(this.running > 0) {
      this.terminateCallback = done;
    } else {
      done();
    }
  }

  _onComplete(err) {
    this.running--;
    if(err) {
      return this.emit('error', err);
    }
    if(this.running === 0) {
      this.terminateCallback && this.terminateCallback();
    }
  }
}

module.exports = ParallelStream;

咱们来分析一下这个新的自定义的类。正如你所看到的同样,构造函数接受一个userTransform()函数做为参数,而后将其另存为一个实例变量;咱们也调用父构造函数,而且咱们默认启用对象模式。

接下来,来看_transform()方法,在这个方法中,咱们执行userTransform()函数,而后增长当前正在运行的任务个数; 最后,咱们经过调用done()来通知当前转换步骤已经完成。_transform()方法展现了如何并行处理另外一项任务。咱们不用等待userTransform()方法执行完毕再调用done()。 相反,咱们当即执行done()方法。另外一方面,咱们提供了一个特殊的回调函数给userTransform()方法,这就是this._onComplete()方法;以便咱们在userTransform()完成的时候收到通知。

Streams终止以前,会调用_flush()方法,因此若是仍有任务正在运行,咱们能够经过不当即调用done()回调函数来延迟finish事件的触发。相反,咱们将其分配给this.terminateCallback变量。为了理解Streams如何正确终止,来看_onComplete()方法。

在每组异步任务最终完成时,_onComplete()方法会被调用。首先,它会检查是否有任务正在运行,若是没有,则调用this.terminateCallback()函数,这将致使Streams结束,触发_flush()方法的finish事件。

利用刚刚构建的ParallelStream类能够轻松地建立一个无序并行执行的可转换的Streams实例,可是有个注意:它不会保留项目接收的顺序。实际上,异步操做能够在任什么时候候都有可能完成并推送数据,而跟它们开始的时刻并无必然的联系。所以咱们知道,对于二进制模式的Streams并不适用,由于二进制的Streams对顺序要求较高。

实现一个URL监控应用程序

如今,让咱们使用ParallelStream模块实现一个具体的例子。让咱们想象如下咱们想要构建一个简单的服务来监控一个大URL列表的状态,让咱们想象如下,全部的这些URL包含在一个单独的文件中,而且每个URL占据一个空行。

Streams可以为这个场景提供一个高效且优雅的解决方案。特别是当咱们使用咱们刚刚写的ParallelStream类来无序地审核这些URL

接下来,让咱们建立一个简单的放在checkUrls.js模块的应用程序。

const fs = require('fs');
const split = require('split');
const request = require('request');
const ParallelStream = require('./parallelStream');

fs.createReadStream(process.argv[2])         //[1]
  .pipe(split())                             //[2]
  .pipe(new ParallelStream((url, enc, done, push) => {     //[3]
    if(!url) return done();
    request.head(url, (err, response) => {
      push(url + ' is ' + (err ? 'down' : 'up') + '\n');
      done();
    });
  }))
  .pipe(fs.createWriteStream('results.txt'))   //[4]
  .on('finish', () => console.log('All urls were checked'))
;

正如咱们所看到的,经过流,咱们的代码看起来很是优雅,直观。 让咱们看看它是如何工做的:

  1. 首先,咱们经过给定的文件参数建立一个可读的Streams,便于接下来读取文件。
  2. 咱们经过split将输入的文件的Streams的内容输出一个可转换的Streams到管道中,而且分隔了数据块的每一行。
  3. 而后,是时候使用咱们的ParallelStream来检查URL了,咱们发送一个HEAD请求而后等待请求的response。当请求返回时,咱们把请求的结果pushstream中。
  4. 最后,经过管道把结果保存到results.txt文件中。
node checkUrls urlList.txt

这里的文件urlList.txt包含一组URL,例如:

  • http://www.mariocasciaro.me/
  • http://loige.co/
  • http://thiswillbedownforsure.com/

当应用执行完成后,咱们能够看到一个文件results.txt被建立,里面包含有操做的结果,例如:

  • http://thiswillbedownforsure.com is down
  • http://loige.co is up
  • http://www.mariocasciaro.me is up

输出的结果的顺序颇有可能与输入文件中指定URL的顺序不一样。这是Streams无序并行执行任务的明显特征。

出于好奇,咱们可能想尝试用一个正常的through2流替换ParallelStream,并比较二者的行为和性能(你可能想这样作的一个练习)。咱们将会看到,使用through2的方式会比较慢,由于每一个URL都将按顺序进行检查,并且文件results.txt中结果的顺序也会被保留。

无序限制并行执行

若是运行包含数千或数百万个URL的文件的checkUrls应用程序,咱们确定会遇到麻烦。咱们的应用程序将同时建立不受控制的链接数量,并行发送大量数据,并可能破坏应用程序的稳定性和整个系统的可用性。咱们已经知道,控制负载的无序限制并行执行是一个极好的解决方案。

让咱们经过建立一个limitedParallelStream.js模块来看看它是如何工做的,这个模块是改编自上一节中建立的parallelStream.js模块。

让咱们看看它的构造函数:

class LimitedParallelStream extends stream.Transform {
  constructor(concurrency, userTransform) {
    super({objectMode: true});
    this.concurrency = concurrency;
    this.userTransform = userTransform;
    this.running = 0;
    this.terminateCallback = null;
    this.continueCallback = null;
  }
// ...
}

咱们须要一个concurrency变量做为输入来限制并发量,此次咱们要保存两个回调函数,continueCallback用于任何挂起的_transform方法,terminateCallback用于_flush方法的回调。
接下来看_transform()方法:

_transform(chunk, enc, done) {
  this.running++;
  this.userTransform(chunk, enc,  this.push.bind(this), this._onComplete.bind(this));
  if(this.running < this.concurrency) {
    done();
  } else {
    this.continueCallback = done;
  }
}

此次在_transform()方法中,咱们必须在调用done()以前检查是否达到了最大并行数量的限制,若是没有达到了限制,才能触发下一个项目的处理。若是咱们已经达到最大并行数量的限制,咱们能够简单地将done()回调保存到continueCallback变量中,以便在任务完成后当即调用它。

_flush()方法与ParallelStream类保持彻底同样,因此咱们直接转到实现_onComplete()方法:

_onComplete(err) {
  this.running--;
  if(err) {
    return this.emit('error', err);
  }
  const tmpCallback = this.continueCallback;
  this.continueCallback = null;
  tmpCallback && tmpCallback();
  if(this.running === 0) {
    this.terminateCallback && this.terminateCallback();
  }
}

每当任务完成,咱们调用任何已保存的continueCallback()将致使
stream解锁,触发下一个项目的处理。

这就是limitedParallelStream模块。 咱们如今能够在checkUrls模块中使用它来代替parallelStream,而且将咱们的任务的并发限制在咱们设置的值上。

顺序并行执行

咱们之前建立的并行Streams可能会使得数据的顺序混乱,可是在某些状况下这是不可接受的。有时,实际上,有那种须要每一个数据块都以接收到的相同顺序发出的业务场景。咱们仍然能够并行运行transform函数。咱们所要作的就是对每一个任务发出的数据进行排序,使其遵循与接收数据相同的顺序。

这种技术涉及使用buffer,在每一个正在运行的任务发出时从新排序块。为简洁起见,咱们不打算提供这样一个stream的实现,由于这本书的范围是至关冗长的;咱们要作的就是重用为了这个特定目的而构建的npm上的一个可用包,例如through2-parallel

咱们能够经过修改现有的checkUrls模块来快速检查一个有序的并行执行的行为。 假设咱们但愿咱们的结果按照与输入文件中的URL相同的顺序编写。 咱们可使用经过through2-parallel来实现:

const fs = require('fs');
const split = require('split');
const request = require('request');
const throughParallel = require('through2-parallel');

fs.createReadStream(process.argv[2])
  .pipe(split())
  .pipe(throughParallel.obj({concurrency: 2}, function (url, enc, done) {
    if(!url) return done();
    request.head(url, (err, response) => {
      this.push(url + ' is ' + (err ? 'down' : 'up') + '\n');
      done();
    });
  }))
  .pipe(fs.createWriteStream('results.txt'))
  .on('finish', () => console.log('All urls were checked'))
;

正如咱们所看到的,through2-parallel的接口与through2的接口很是类似;惟一的不一样是在through2-parallel还能够为咱们提供的transform函数指定一个并发限制。若是咱们尝试运行这个新版本的checkUrls,咱们会看到results.txt文件列出结果的顺序与输入文件中
URLs的出现顺序是同样的。

经过这个,咱们总结了使用Streams实现异步控制流的分析;接下来,咱们研究管道模式。

管道模式

就像在现实生活中同样,Node.jsStreams也能够按照不一样的模式进行管道链接。事实上,咱们能够将两个不一样的Streams合并成一个Streams,将一个Streams分红两个或更多的管道,或者根据条件重定向流。 在本节中,咱们将探讨可应用于Node.jsStreams最重要的管道技术。

组合的Streams

在本章中,咱们强调Streams提供了一个简单的基础结构来模块化和重用咱们的代码,可是却漏掉了一个重要的部分:若是咱们想要模块化和重用整个流水线?若是咱们想要合并多个Streams,使它们看起来像外部的Streams,那该怎么办?下图显示了这是什么意思:

从上图中,咱们看到了如何组合几个流的了:

  • 当咱们写入组合的Streams的时候,实际上咱们是写入组合的Streams的第一个单元,即StreamA
  • 当咱们从组合的Streams中读取信息时,实际上咱们从组合的Streams的最后一个单元中读取。

一个组合的Streams一般是一个多重的Streams,经过链接第一个单元的写入端和链接最后一个单元的读取端。

要从两个不一样的Streams(一个可读的Streams和一个可写入的Streams)中建立一个多重的Streams,咱们可使用一个npm模块,例如 duplexer2

但上述这么作并不完整。实际上,组合的Streams还应该作到捕获到管道中任意一段Streams单元产生的错误。咱们已经说过,任何错误都不会自动传播到管道中。 因此,咱们必须有适当的错误管理,咱们将不得不显式附加一个错误监听器到每一个Streams。可是,组合的Streams其实是一个黑盒,这意味着咱们没法访问管道中间的任何单元,因此对于管道中任意单元的异常捕获,组合的Streams也充当聚合器的角色。

总而言之,组合的Streams具备两个主要优势:

  • 管道内部是一个黑盒,对使用者不可见。
  • 简化了错误管理,由于咱们没必要为管道中的每一个单元附加一个错误侦听器,而只须要给组合的Streams自身附加上就能够了。

组合的Streams是一个很是通用和广泛的作法,因此若是咱们没有任何特殊的须要,咱们可能只想重用现有的解决方案,如multipipecombine-stream

实现一个组合的Streams

为了说明一个简单的例子,咱们来考虑下面两个组合的Streams的状况:

  • 压缩和加密数据
  • 解压和解密数据

使用诸如multipipe之类的库,咱们能够经过组合一些核心库中已有的Streams(文件combinedStreams.js)来轻松地构建组合的Streams

const zlib = require('zlib');
const crypto = require('crypto');
const combine = require('multipipe');
module.exports.compressAndEncrypt = password => {
  return combine(
    zlib.createGzip(),
    crypto.createCipher('aes192', password)
  );
};
module.exports.decryptAndDecompress = password => {
  return combine(
    crypto.createDecipher('aes192', password),
    zlib.createGunzip()
  );
};

例如,咱们如今可使用这些组合的数据流,如同黑盒,这些对咱们均是不可见的,能够建立一个小型应用程序,经过压缩和加密来归档文件。 让咱们在一个名为archive.js的新模块中作这件事:

const fs = require('fs');
const compressAndEncryptStream = require('./combinedStreams').compressAndEncrypt;
fs.createReadStream(process.argv[3])
  .pipe(compressAndEncryptStream(process.argv[2]))
  .pipe(fs.createWriteStream(process.argv[3] + ".gz.enc"));

咱们能够经过从咱们建立的流水线中构建一个组合的Stream来进一步改进前面的代码,但此次并不仅是为了得到对外不可见的黑盒,而是为了进行异常捕获。 实际上,正如咱们已经提到过的那样,写下以下的代码只会捕获最后一个Stream单元发出的错误:

fs.createReadStream(process.argv[3])
  .pipe(compressAndEncryptStream(process.argv[2]))
  .pipe(fs.createWriteStream(process.argv[3] + ".gz.enc"))
  .on('error', function(err) {
    // 只会捕获最后一个单元的错误
    console.log(err);
  });

可是,经过把全部的Streams结合在一块儿,咱们能够优雅地解决这个问题。重构后的archive.js以下:

const combine = require('multipipe');
   const fs = require('fs');
   const compressAndEncryptStream =
     require('./combinedStreams').compressAndEncrypt;
   combine(
     fs.createReadStream(process.argv[3])
     .pipe(compressAndEncryptStream(process.argv[2]))
     .pipe(fs.createWriteStream(process.argv[3] + ".gz.enc"))
   ).on('error', err => {
     // 使用组合的Stream能够捕获任意位置的错误
     console.log(err);
   });

正如咱们所看到的,咱们如今能够将一个错误侦听器直接附加到组合的Streams,它将接收任何内部流发出的任何error事件。
如今,要运行archive模块,只需在命令行参数中指定passwordfile参数,即压缩模块的参数:

node archive mypassword /path/to/a/file.text

经过这个例子,咱们已经清楚地证实了组合的Stream是多么重要; 从一个方面来讲,它容许咱们建立流的可重用组合,从另外一方面来讲,它简化了管道的错误管理。

分开的Streams

咱们能够经过将单个可读的Stream管道化为多个可写入的Stream来执行Stream的分支。当咱们想要将相同的数据发送到不一样的目的地时,这便体现其做用了,例如,两个不一样的套接字或两个不一样的文件。当咱们想要对相同的数据执行不一样的转换时,或者当咱们想要根据一些标准拆分数据时,也可使用它。如图所示:

Node.js中分开的Stream是一件小事。举例说明。

实现一个多重校验和的生成器

让咱们建立一个输出给定文件的sha1md5散列的小工具。咱们来调用这个新模块generateHashes.js,看以下的代码:

const fs = require('fs');
const crypto = require('crypto');
const sha1Stream = crypto.createHash('sha1');
sha1Stream.setEncoding('base64');
const md5Stream = crypto.createHash('md5');
md5Stream.setEncoding('base64');

目前为止没什么特别的 该模块的下一个部分其实是咱们将从文件建立一个可读的Stream,并将其分叉到两个不一样的流,以得到另外两个文件,其中一个包含sha1散列,另外一个包含md5校验和:

const inputFile = process.argv[2];
const inputStream = fs.createReadStream(inputFile);
inputStream
  .pipe(sha1Stream)
  .pipe(fs.createWriteStream(inputFile + '.sha1'));
inputStream
  .pipe(md5Stream)
  .pipe(fs.createWriteStream(inputFile + '.md5'));

这很简单:inputStream变量经过管道一边输入到sha1Stream,另外一边输入到md5Stream。可是要注意:

  • inputStream结束时,md5Streamsha1Stream会自动结束,除非当调用pipe()时指定了end选项为false
  • Stream的两个分支会接受相同的数据块,所以当对数据执行一些反作用的操做时咱们必须很是谨慎,由于那样会影响另一个分支。
  • 黑盒外会产生背压,来自inputStream的数据流的流速会根据接收最慢的分支的流速做出调整。

合并的Streams

合并与分开相对,经过把一组可读的Streams合并到一个单独的可写的Stream里,如图所示:

将多个Streams合并为一个一般是一个简单的操做; 然而,咱们必须注意咱们处理end事件的方式,由于使用自动结束选项的管道系统会在一个源结束时当即结束目标流。 这一般会致使错误,由于其余还未结束的源将继续写入已终止的Stream。 解决此问题的方法是在将多个源传输到单个目标时使用选项{end:false},而且只有在全部源完成读取后才在目标Stream上调用end()

用多个源文件压缩为一个压缩包

举一个简单的例子,咱们来实现一个小程序,它根据两个不一样目录的内容建立一个压缩包。 为此,咱们将介绍两个新的npm模块:

  • tar用来建立压缩包
  • fstream从文件系统文件建立对象streams的库

咱们建立一个新模块mergeTar.js,以下开始初始化:

var tar = require('tar');
var fstream = require('fstream');
var path = require('path');
var destination = path.resolve(process.argv[2]);
var sourceA = path.resolve(process.argv[3]);
var sourceB = path.resolve(process.argv[4]);

在前面的代码中,咱们只加载所有依赖包和初始化包含目标文件和两个源目录(sourceAsourceB)的变量。

接下来,咱们建立tarStream并经过管道输出到一个可写入的Stream

const pack = tar.Pack();
pack.pipe(fstream.Writer(destination));

如今,咱们开始初始化源Stream

let endCount = 0;

function onEnd() {
  if (++endCount === 2) {
    pack.end();
  }
}

const sourceStreamA = fstream.Reader({
    type: "Directory",
    path: sourceA
  })
  .on('end', onEnd);

const sourceStreamB = fstream.Reader({
    type: "Directory",
    path: sourceB
  })
  .on('end', onEnd);

在前面的代码中,咱们建立了从两个源目录(sourceStreamAsourceStreamB)中读取的Stream那么对于每一个源Stream,咱们附加一个end事件订阅者,只有当这两个目录被彻底读取时,才会触发packend事件。

最后,合并两个Stream

sourceStreamA.pipe(pack, {end: false});
sourceStreamB.pipe(pack, {end: false});

咱们将两个源文件都压缩到pack这个Stream中,并经过设定pipe()option参数为{end:false}配置终点Stream的自动触发end事件。

这样,咱们已经完成了咱们简单的TAR程序。咱们能够经过提供目标文件做为第一个命令行参数,而后是两个源目录来尝试运行这个实用程序:

node mergeTar dest.tar /path/to/sourceA /path/to/sourceB

npm中咱们能够找到一些能够简化Stream的合并的模块:

要注意,流入目标Stream的数据是随机混合的,这是一个在某些类型的对象流中能够接受的属性(正如咱们在上一个例子中看到的那样),可是在处理二进制Stream时一般是一个不但愿这样。

然而,咱们能够经过一种模式按顺序合并Stream; 它包含一个接一个地合并源Stream,当前一个结束时,开始发送第二段数据块(就像链接全部源Stream的输出同样)。在npm上,咱们能够找到一些也处理这种状况的软件包。其中之一是multistream

多路复用和多路分解

合并Stream模式有一个特殊的模式,咱们并非真的只想将多个Stream合并在一块儿,而是使用一个共享通道来传送一组数据Stream。与以前的不同,由于源数据Stream在共享通道内保持逻辑分离,这使得一旦数据到达共享通道的另外一端,咱们就能够再次分离数据Stream。如图所示:

将多个Stream组合在单个Stream上传输的操做被称为多路复用,而相反的操做(即,从共享Stream接收数据重构原始的Stream)则被称为多路分用。执行这些操做的设备分别称为多路复用器和多路分解器(。 这是一个在计算机科学和电信领域普遍研究的话题,由于它是几乎任何类型的通讯媒体,如电话,广播,电视,固然还有互联网自己的基础之一。 对于本书的范围,咱们不会过多解释,由于这是一个很大的话题。

咱们想在本节中演示的是,如何使用共享的Node.js Streams来传送多个逻辑上分离的Stream,而后在共享Stream的另外一端再次分离,即实现一次多路复用和多路分解。

建立一个远程logger日志记录

举例说明,咱们但愿有一个小程序来启动子进程,并将其标准输出和标准错误都重定向到远程服务器,服务器接受它们而后保存为两个单独的文件。所以,在这种状况下,共享介质是TCP链接,而要复用的两个通道是子进程的stdoutstderr。 咱们将利用分组交换的技术,这种技术与IPTCPUDP等协议所使用的技术相同,包括将数据封装在数据包中,容许咱们指定各类源信息,这对多路复用,路由,控制 流程,检查损坏的数据都十分有帮助。

如图所示,这个例子的协议大概是这样,数据被封装成具备如下结构的数据包:

在客户端实现多路复用

先说客户端,建立一个名为client.js的模块,这是咱们这个应用程序的一部分,它负责启动一个子进程并实现Stream多路复用。

开始定义模块,首先加载依赖:

const child_process = require('child_process');
const net = require('net');

而后开始实现多路复用的函数:

function multiplexChannels(sources, destination) {
  let totalChannels = sources.length;

  for(let i = 0; i < sources.length; i++) {
    sources[i]
      .on('readable', function() { // [1]
        let chunk;
        while ((chunk = this.read()) !== null) {
          const outBuff = new Buffer(1 + 4 + chunk.length); // [2]
          outBuff.writeUInt8(i, 0);
          outBuff.writeUInt32BE(chunk.length, 1);
          chunk.copy(outBuff, 5);
          console.log('Sending packet to channel: ' + i);
          destination.write(outBuff); // [3]
        }
      })
      .on('end', () => { //[4]
        if (--totalChannels === 0) {
          destination.end();
        }
      });
  }
}

multiplexChannels()函数接受要复用的源Stream做为输入
和复用接口做为参数,而后执行如下步骤:

  1. 对于每一个源Stream,它会注册一个readable事件侦听器,咱们使用non-flowing模式从流中读取数据。
  2. 每读取一个数据块,咱们将其封装到一个首部中,首部的顺序为:channel ID为1字节(UInt8),数据包大小为4字节(UInt32BE),而后为实际数据。
  3. 数据包准备好后,咱们将其写入目标Stream
  4. 咱们为end事件注册一个监听器,以便当全部源Stream结束时,end事件触发,通知目标Stream触发end事件。
注意,咱们的协议最多可以复用多达256个不一样的源流,由于咱们只有1个字节来标识 channel
const socket = net.connect(3000, () => { // [1]
  const child = child_process.fork( // [2]
    process.argv[2],
    process.argv.slice(3), {
      silent: true
    }
  );
  multiplexChannels([child.stdout, child.stderr], socket); // [3]
});

在最后,咱们执行如下操做:

  1. 咱们建立一个新的TCP客户端链接到地址localhost:3000
  2. 咱们经过使用第一个命令行参数做为路径来启动子进程,同时咱们提供剩余的process.argv数组做为子进程的参数。咱们指定选项{silent:true},以便子进程不会继承父级的stdoutstderr
  3. 咱们使用mutiplexChannels()函数将stdoutstderr多路复用到socket里。
在服务端实现多路分解

如今来看服务端,建立server.js模块,在这里咱们未来自远程链接的Stream多路分解,并将它们传送到两个不一样的文件中。

首先建立一个名为demultiplexChannel()的函数:

function demultiplexChannel(source, destinations) {
  let currentChannel = null;
  let currentLength = null;
  source
    .on('readable', () => { //[1]
      let chunk;
      if(currentChannel === null) {          //[2]
        chunk = source.read(1);
        currentChannel = chunk && chunk.readUInt8(0);
      }
    
      if(currentLength === null) {          //[3]
        chunk = source.read(4);
        currentLength = chunk && chunk.readUInt32BE(0);
        if(currentLength === null) {
          return;
        }
      }
    
      chunk = source.read(currentLength);        //[4]
      if(chunk === null) {
        return;
      }
    
      console.log('Received packet from: ' + currentChannel);
    
      destinations[currentChannel].write(chunk);      //[5]
      currentChannel = null;
      currentLength = null;
    })
    .on('end', () => {            //[6]
      destinations.forEach(destination => destination.end());
      console.log('Source channel closed');
    })
  ;
}

上面的代码可能看起来很复杂,仔细阅读并不是如此;因为Node.js可读的Stream的拉动特性,咱们能够很容易地实现咱们的小协议的多路分解,以下所示:

  1. 咱们开始使用non-flowing模式从流中读取数据。
  2. 首先,若是咱们尚未读取channel ID,咱们尝试从流中读取1个字节,而后将其转换为数字。
  3. 下一步是读取首部的长度。咱们须要读取4个字节,因此有可能在内部Buffer尚未足够的数据,这将致使this.read()调用返回null。在这种状况下,咱们只是中断解析,而后重试下一个readable事件。
  4. 当咱们最终还能够读取数据大小时,咱们知道从内部Buffer中拉出多少数据,因此咱们尝试读取全部数据。
  5. 当咱们读取全部的数据时,咱们能够把它写到正确的目标通道,必定要记得重置currentChannelcurrentLength变量(这些变量将被用来解析下一个数据包)。
  6. 最后,当源channel结束时,必定不要忘记调用目标Streamend()方法。

既然咱们能够多路分解源Stream,进行以下调用:

net.createServer(socket => {
  const stdoutStream = fs.createWriteStream('stdout.log');
  const stderrStream = fs.createWriteStream('stderr.log');
  demultiplexChannel(socket, [stdoutStream, stderrStream]);
})
  .listen(3000, () => console.log('Server started'))
;

在上面的代码中,咱们首先在3000端口上启动一个TCP服务器,而后对于咱们接收到的每一个链接,咱们将建立两个可写入的Stream,指向两个不一样的文件,一个用于标准输出,另外一个用于标准错误; 这些是咱们的目标channel。 最后,咱们使用demultiplexChannel()将套接字流解复用为stdoutStreamstderrStream

运行多路复用和多路分解应用程序

如今,咱们准备尝试运行咱们的新的多路复用/多路分解应用程序,但首先让咱们建立一个小的Node.js程序来产生一些示例输出; 咱们把它叫作generateData.js

console.log("out1");
console.log("out2");
console.error("err1");
console.log("out3");
console.error("err2");

首先,让咱们开始运行服务端:

node server

而后运行客户端,须要提供做为子进程的文件参数:

node client generateData.js

客户端几乎立马运行,可是进程结束时,generateData应用程序的标准输入和标准输出通过一个TCP链接,而后在服务器端,被多路分解成两个文件。

注意,当咱们使用 child_process.fork()时,咱们的客户端可以启动别的 Node.js模块。

对象Streams的多路复用和多路分解

咱们刚刚展现的例子演示了如何复用和解复用二进制/文本Stream,但值得一提的是,相同的规则也适用于对象Stream。 最大的区别是,使用对象,咱们已经有了使用原子消息(对象)传输数据的方法,因此多路复用就像设置一个属性channel ID到每一个对象同样简单,而多路分解只须要读·channel ID属性,并将每一个对象路由到正确的目标Stream

还有一种模式是取一个对象上的几个属性并分发到多个目的Stream的模式 经过这种模式,咱们能够实现复杂的流程,以下图所示:

如上图所示,取一个对象Stream表示animals,而后根据动物类型:reptilesamphibiansmammals,而后分发到正确的目标Stream中。

总结

在本章中,咱们已经对Node.js Streams及其使用案例进行了阐述,但同时也应该为编程范式打开一扇大门,几乎具备无限的可能性。咱们了解了为何StreamNode.js社区赞誉,而且咱们掌握了它们的基本功能,使咱们可以利用它作更多有趣的事情。咱们分析了一些先进的模式,并开始了解如何将不一样配置的Streams链接在一块儿,掌握这些特性,从而使流如此多才多艺,功能强大。

若是咱们遇到不能用一个Stream来实现的功能,咱们能够经过将其余Streams链接在一块儿来实现,这是Node.js的一个很好的特性;Streams在处理二进制数据,字符串和对象都十分有用,并具备鲜明的特色。

在下一章中,咱们将重点介绍传统的面向对象的设计模式。尽管JavaScript在某种程度上是面向对象的语言,但在Node.js中,函数式或混合方法一般是首选。在阅读下一章便揭晓答案。

相关文章
相关标签/搜索