Node中的流

一.流的概念
stream是数据集合,与数组、字符串差很少。但stream不一次性访问所有数据,而是一部分一部分发送/接收(chunk式的),因此没必要占用那么大块内存,尤为适用于处理大量(外部)数据的场景数组

stream具备管道(pipeline)特性,例如:缓存

const grep = ... // A stream for the grep output
const wc = ... // A stream for the wc input
grep.pipe(wc)

不少原生模块都是基于stream的,包括进程的stdin/stdout/stderr:socket

Node中的流

例如常见的场景:ide

const fs = require('fs');
const server = require('http').createServer();

server.on('request', (req, res) => {
  const src = fs.createReadStream('./big.file');
  src.pipe(res);
});

server.listen(8000);

其中pipe方法把可读流的输出(数据源)做为可写流的输入(目标),直接把读文件的输出流做为输入链接到HTTP响应的输出流,从而避免把整个文件读入内存函数

P.S.甚至平常使用的console.log()内部实现也是stream工具

二.流的类型
Node中有4种基础流:ui

Readablethis

可读流是对源的抽象, 从中能够消耗数据,如fs.createReadStream命令行

Writable设计

可写流是对可写入数据的目标的抽象,如fs.createWriteStream

Duplex(双工)

双工流既可读又可写,如TCP socket

Transform(转换)

转换流本质上是双工流,用于在写入和读取数据时对其进行修改或转换,如zlib.createGzip用gzip压缩数据

转换流看一看作一个输入可写流,输出可读流的函数

P.S.有一种转换流叫(Pass)Through Stream(经过流),相似于FP中的identity = x => x

三.管道
src.pipe(res)要求源必须可读,目标必须可写,因此,若是是对双工流进行管道传输,就能够像Linux的管道同样链式调用:

readableSrc
  .pipe(transformStream1)
  .pipe(transformStream2)
  .pipe(finalWrtitableDest)

pipe()方法返回目标流,因此:

// a (readable), b and c (duplex), and d (writable)
a.pipe(b).pipe(c).pipe(d)
// 等价于
a.pipe(b)
b.pipe(c)
c.pipe(d)
# Linux下,等价于
$ a | b | c | d

四.流与事件
事件驱动是Node在设计上的一个重要特色,不少Node原生对象都是基于事件机制(EventEmitter模块)实现的,包括流(stream模块):

Most of Node’s objects — like HTTP requests, responses, and streams — implement the EventEmitter module so they can provide a way to emit and listen to events.

全部stream都是EventEmitter实例,经过事件机制来读写数据,例如上面提到的pipe()方法至关于:

// readable.pipe(writable)

readable.on('data', (chunk) => {
  writable.write(chunk);
});
readable.on('end', () => {
  writable.end();
});

P.S.pipe还处理了一些别的事情,好比错误处理,EoF以及某个流的速度较快/较慢的状况

Readable与Writable stream的主要事件和方法以下:

Node中的流

Readable的主要事件有:

data事件:stream把一个chunk传递给使用者时触发

end事件:再没有要从stream中获取(consume)的数据时触发

Writable的主要事件有:

drain事件,断流了,这是Writable stream能够接收更多数据的信号

finish事件,当全部数据都已flush到下层系统时触发

五.Readable stream的两种模式:Paused与Flowing
一个Readable stream要么流动(Flowing)要么暂停(Paused),也被称为拉(pull)和推(push)两种模式

建立出来后默认处于Paused状态,能够经过read()方法读取数据。若是处于Flowing状态,数据会持续地流出来,此时只须要经过监听事件来使用这些数据,若是没有使用者的话,数据会丢失,因此都会监听Readable stream的data事件,实际上监听data事件会把Readable stream从Paused状态切换到Flowing,移除data事件监听会再切回来。须要手动切换的话,能够经过resume()和pause()来作

使用pipe()方式时不用关心这些,都会自动处理稳当:

Readable触发data事件,直到Writable忙不过来了

pipe收到信号后调用Readable.pause(),进入Paused模式

Writable再干一下子压力不大了的时候,会触发drain事件,此时pipe调用Readable.resume()进入Flowing模式,让Readable接着触发data事件

highWaterMark与backpressure
其实drain事件就是用来应对Backpressure现象的,简单的说,Backpressure就是下游的消费速度限制了传输,形成下游向上游的反向压力

若是消费速度慢于生产速度,会在下游产生堆积,来不及处理的数据会存放到Writable的buffer里,若是不加(限流)处理,这个buffer会持续增加,可能溢出进而形成错误或数据丢失

Backpressure现象发生的标志是Writable.write()返回了false,说明来自上游的待处理数据量已经触及highWaterMark(高水位线,默认16kb):

Buffer level when stream.write() starts returning false. Defaults to 16384 (16kb), or 16 for objectMode streams.

这是下游开始有点紧张了(todo项足够忙一阵子了)的信号。建议在此时对上游限流,即调用Readable.pause()先给停了,给下游多点时间处理堆积的数据,下游以为轻松了会触发darin事件,表示此时有能力处理更多数据了,因此这时候应该开闸放水(Readable.resume())

注意,Readable的数据会存放在缓存中,直到有个Writable来消耗这些数据。因此Paused状态只是说不往下流了,已经缓存的数据还在Readable的buffer里。因此若是不限流,来不及处理的数据就缓存在下游,并持续堆积,限流的话,这部分数据被缓存在上游,由于限流了而再也不持续堆积

另外,Readable也有highWaterMark的概念:

The maximum number of bytes to store in the internal buffer before ceasing to read from the underlying resource. Defaults to 16384 (16kb), or 16 for objectMode streams

是对从实际数据源读取速度的限制(好比从磁盘读文件),防止生产速度太快引起缓存堆积(好比一顿猛push())。因此Flowing Readable的正常工做方式是被push()–push()–push()…诶,发现buffer里的量已经攒够一个chunk了,吐给下游。一样,Readable触及highWaterMark的标志是push()返回false,说明Readable的buffer不那么十分空了,此时若是还持续push(),没错,也会出现BackPressure(Readable消费能力限制了从数据源到Readable的传输速度):

快-------------慢
数据源-------->Readable------->Writable
                 快--------------慢

只要上游(生产)快,下游(消费)慢就会出现BackPressure,因此在readable.pipe(writable)的简单场景,可能会出现上面两段BackPressure

六.示例
Writable stream
常见的造大文件:

const fs = require('fs');
const file = fs.createWriteStream('./big.file');

for(let i=0; i<= 1e6; i++) {
  file.write('Lorem ipsum dolor sit amet, consectetur adipisicing elit, sed do eiusmod tempor incididunt ut labore et dolore magna aliqua. Ut enim ad minim veniam, quis nostrud exercitation ullamco laboris nisi ut aliquip ex ea commodo consequat. Duis aute irure dolor in reprehenderit in voluptate velit esse cillum dolore eu fugiat nulla pariatur. Excepteur sint occaecat cupidatat non proident, sunt in culpa qui officia deserunt mollit anim id est laborum.\n');
}

file.end();

经过fs.createWriteStream()建立指向文件的Writable stream,经过write()填充数据,写完后end()

或者更通常的,直接new一个Writable:

const { Writable } = require('stream');
const outStream = new Writable({
  write(chunk, encoding, callback) {
    console.log(chunk.toString());
    // nowrap version
    // process.stdout.write(chunk.toString());
    callback();
  }
});

process.stdin.pipe(outStream);

一个最简单的echo实现,把当前进程的标准输入接到自定义输出流outStream,像日志中间件同样(标准输入流经outStream,再该干吗干吗去callback):

cc
oo
nn
ss
oo
ll
ee

Console {
  log: [Function: bound consoleCall],
  ...
}

write()方法的3个参数中,chunk是个Buffer,encoding在某些场景下须要,大多数时候能够忽略,callback是应该在chunk处理完毕后调用的通知函数,代表写入成功与否(失败的话,传Error对象进去),相似于尾触发机制中的next()

或者更简单的echo实现:

process.stdin.pipe(process.stdout);

直接把标准输入流链接到标准输出流

Readable stream
const { Readable } = require('stream'); 
const inStream = new Readable();
inStream.push('ABCDEFGHIJKLM');
inStream.push('NOPQRSTUVWXYZ');
inStream.push(null); // No more data
inStream.pipe(process.stdout);

经过push向Readable stream里填充数据,push(null)表示结束。上例中把全部数据都读进来,而后才交给标准输出,实际上有更高效的方式(按需推数据给使用者):

const { Readable } = require('stream'); 
const inStream = new Readable({
  read(size) {
    this.push(String.fromCharCode(this.currentCharCode++));
    if (this.currentCharCode > 90) {
      this.push(null);
    }
  }
});
inStream.currentCharCode = 65;
inStream.pipe(process.stdout);

read()方法每次吐一个字符,使用者从Readable stream取数据的时候,read()会持续触发

Duplex/Transform stream

Duplex stream兼具Readable和Writable的特色:既能够做为数据源(生产者),也能够做为目标(消费者)。例如:

const { Duplex } = require('stream');

const inoutStream = new Duplex({
  write(chunk, encoding, callback) {
    console.log(chunk.toString());
    callback();
  },

  read(size) {
    this.push(String.fromCharCode(this.currentCharCode++));
    if (this.currentCharCode > 90) {
      this.push(null);
    }
  }
});

inoutStream.currentCharCode = 65;
process.stdin.pipe(inoutStream).pipe(process.stdout);

上例把前2个例子结合起来了,inoutStream被链接到标准输出流了,A-Z会做为数据源传递给标准输出(打印出来),同时标准输入流被接到inoutStream,来自标准输入的全部数据会被log出来,效果以下:

ABCDEFGHIJKLMNOPQRSTUVWXYZcc
oo
nn
ss
oo
ll
ee

Console {
  log: [Function: bound consoleCall],
  ...
}

P.S.先输出A-Z是由于pipe()会把Readable stream切换到Flowing模式,因此一开始就把A-Z“流”出来了

注意,Duplex stream的Readable与Writable部分是彻底独立的,读写互不影响,Duplex只是把两个特性组合成一个对象了,就像两根筷子同样绑在一块儿的单向管道

Transform stream是一种有意思的Duplex stream:其输出是根据输入计算得来的。因此不用分别实现read/write()方法,只实现一个transform()方法就够了:

const { Transform } = require('stream');

const upperCaseTr = new Transform({
  // 函数签名与write一致
  transform(chunk, encoding, callback) {
    this.push(chunk.toString().toUpperCase());
    callback();
  }
});

process.stdin.pipe(upperCaseTr).pipe(process.stdout);

一样,Transform stream的Readable与Writable部分也是独立的(不手动push就不会自动传递到Readable部分),只是形式上结合起来了

P.S.另外,stream之间除了能够传递Buffer/String,还能够传递Object(包括Array),具体见Streams Object Mode

Node提供了一些原生Transform stream,例如zlib和crypto stream:

const fs = require('fs');
const zlib = require('zlib');
const file = process.argv[2];

fs.createReadStream(file)
  .pipe(zlib.createGzip())
  .pipe(fs.createWriteStream(file + '.gz'));

简单的命令行工具,gzip压缩。更多示例见Node’s built-in transform streams

参考资料
Node.js Streams: Everything you need to know

Node.js writable.write return false?

探究 Node.js 中的 drain 事件

深刻理解 Node.js Stream 内部机制

Backpressuring in Streams

相关文章
相关标签/搜索