参考文章:html
图片来源 视觉中国前端
流(stream)是一种在 Node.js 中处理流式数据的抽象接口 ——官方文档node
流是数据的集合,你能够将它理解成数据链表或者字符串的形式,区别在于流中的数据并不能当即可用,这里又能够将其理解成水流。你无需将全部的数据一次性所有放入内存,相反,你可使用流这一特有的性质,完成对大量数据的操做以及逐段处理的操做gulp
在node异步处理数据的基础上,流将要传输的数据处理成小份数据(chunk)连续传输,这样经过更少的内存消耗,从而带来更多的性能提高api
Node.js中有四种基本类型的流:缓存
Readable
-- 可读流 能够读取数据的源的抽象。 eg. fs.createReadStream()
Writable
-- 可写流 能够写入数据目标的抽象。 eg. fs.createWriteStream()
Duplex
-- 双向流(双工流) 既是可读的,又是可写的。 eg. not.Socket
Transform
-- 变换流(可变流) 读写过程当中能够修改或者转化数据的双向流
。 eg. zlib.createDeflate()
全部的流都是 EventEmitter
的实例,他们发出能够被读和写的事件,在这个基础上,咱们可以很方便的利用 pipe
方法对这些流进行操做服务器
readableSrc.pipe(writableDest)
复制代码
上面这个简单的例子中,咱们利用 readable stream
的输出做为 writable stream
的输入。 那么再来想,若是咱们的输入输出都是 Duplex
那就能够一直 pipe
下去,实现如 Linux 命令般连续的操做。 若是你有用过 gulp
进行前端资源的压缩整合,对于此必定会印象深入网络
下表中所有数据Node.js中原生的对象,这些对象也是能够读写的流,一部分是双工流与可变流 注意:一个 HTTP 相应在客户端是可读流,但在服务端就是可写流。 stdio
流(stdin
, stdout
, stdout
)在子进程中有着与父进程中相反的类型,也正是这样,父子通讯才变的简单curl
Readable Stream | Writable Stream |
---|---|
HTTP response (客户端) | HTTP request (客户端) |
HTTP request (服务端) | HTTP response (服务端) |
fs read streams | fs write streams |
zlib streams | zlib streams |
crypto streams | crypto streams |
TCP sockets | TCP sockets |
child process stdout, stderr | child process stdin |
process.stdin | process.stdout, process.stderr |
big.file
写入100万行数据,文件大约 400Mconst fs = require('fs');
const file = fs.createWriteStream('./big.file');
for(let i=0; i<= 1e6; i++) {
file.write('Lorem ipsum dolor sit amet, consectetur adipisicing elit, sed do eiusmod tempor incididunt ut labore et dolore magna aliqua. Ut enim ad minim veniam, quis nostrud exercitation ullamco laboris nisi ut aliquip ex ea commodo consequat. Duis aute irure dolor in reprehenderit in voluptate velit esse cillum dolore eu fugiat nulla pariatur. Excepteur sint occaecat cupidatat non proident, sunt in culpa qui officia deserunt mollit anim id est laborum.\n');
}
file.end();
复制代码
big.file
的 node 服务。使用 curl
链接启动的 node 服务const fs = require('fs');
const server = require('http').createServer();
server.on('request', (req, res) => {
fs.readFile('./big.file', (err, data) => {
if (err) throw err;
res.end(data);
});
});
server.listen(8000);
复制代码
当启动 node 服务并未链接时候,内存的占用为 8.7M,属于正常状况(下图)异步
当使用 curl localhost:8000
链接服务器,能够清晰看到一次性读取会消耗多少内存(下图)
createReadStram
方法,咱们能够利用此方法将读取到的流 pipe
到响应,减轻服务器负担。代码以及效果以下const fs = require('fs');
const server = require('http').createServer();
server.on('request', (req, res) => {
const src = fs.createReadStream('./big.file');
src.pipe(res);
});
server.listen(8000);
复制代码
能够看到,node服务对于内存的压力获得了力度极大的释放,而且在输出速度上依然很快。这里即很直观的体现了此文一开始提到的,node利用流经过极少内存的占用,高效完成了对大文件的操做。最后用一个形象的例子比喻一下上面的操做: 货运工人有一车的货物须要搬运,他能够选择将车上的货物所有卸下,而后一块儿搬到目的地;他还能够选择经过使用履带,将货物一件一件运输到目的地。试想一下,这两种方式操做的效率。
上面举了一个“不恰当”可是可意会的“履带”例子,其实node stream在计算机中真实的运做并无这么简单
生产者消费者问题 将会有助于更好理解流的原理
可读流分为两种模式
data
事件或执行resume
方法改变)流动模式,能够在比喻的基础上理解为三个节点,水泵(水源,数据源)、水桶(缓存容器,内存)、目的地(水流向的位置)
资源的数据流不会直接流向消费者,而会先通过 highWaterMark 的判断,push 到缓存池(内存)中。若是超过 highWaterMark, push操做返回 false。最后的 resume()
、pause()
是通向消费者的一个阀门
原理与读流相似,写入速度足够快会直接写入资源,当写入速度比较慢或者暂停写入时候,数据会在缓存池中缓存起来,当生产者写入过快,缓存池被放满时候,这时候应当通知生产者暂停生产(好比下文write
方法返回false
),当缓存池被释放空,Writable Stream 会给生产者发送 drain
消息,通知生产者再次开始写入。ps:这里的内容下文介绍 writable stream 时会有代码示例
上面总体介绍了流的概念
、流的类型
、使用流的优势
,接下来经过具体的代码,整理一些在fs模块中流的使用方式。
fs.createReadStream(path, )
const fs = require('fs);
const rs = fs.createReadStream('text.txt'); // options
/**
fs.createReadStream(path, {
flags: 'r', // 读文件,文件不存在报错,默认'r'
encoding: 'utf-8', // 以什么编码格式读取文件(能够被Buffer接收的任何格式),默认读取buffer
autoClose: true, // 读取后是否自动关闭文件,默认true
highWarterMark: 100, // 每次读取的字节数,默认64k(65536)
start: 0, // 开始读取的位置,默认0
end: 200 // 读取文件的终点索引,默认 Infinity
})
**/
复制代码
注意:
end
若是设置为100,则须要读取的字节数为101,即0~100,包括100
由于默认flags
为'r'
,若是path
指向的文件不存在,即会报错
open
、data
、end
、close
、error
事件上文提到:全部的流都是 EventEmitrer
的实例
const fs = require('fs);
const rs = fs.createReadStream('text.txt');
rs.on('open', () => {
console.log('open');
});
rs.on('data', (datas) => {
console.log('file is read', datas);
})
rs.on('close', () => {
console.log('file is closed');
});
rs.on('error', (err) => {
console.log(err);
});
/**
依次输出
open
文件的内容(buffer)
file is closed
**/
复制代码
注意:
data
事件可能被屡次触发,若是将highWarterMark
设置为3,读取写有0123456789
的text.txt
文件时,会触发四次,依次输出0十二、34五、67八、9对应的buffer
pause
、resume
,暂停、恢复/** * text.txt文件内容 0123456789 */
const fs = require('fs');
const rs = fs.createReadStream('text.txt', {
encoding: 'utf-8',
highWaterMark: 3,
});
rs.on('data', (datas) => {
console.log(datas);
rs.pause();
console.log('stream is paused now');
});
rs.on('end', () => {
console.log('stream is end');
clearInterval(interval); // 清除定时器,不然会一直打印stream is resumed now
});
const interval = setInterval(() => {
rs.resume();
console.log('stream is resumed now');
}, 1000);
/** 输出: 012 stream is paused now stream is resumed now 345 stream is paused now stream is resumed now 678 stream is paused now stream is resumed now 9 stream is paused now stream is end **/
复制代码
注意: 没什么注意的
fs.createWriteStream(path, )
const fs = require('fs');
fs.createWriteStream(path, options);
const ws = fs.createWriteStream('2.txt', {
flags: 'w', // 默认'w'写入文件,不存在则建立
encoding: 'utf-8'
fd: null, // 文件描述符
mode: 0o666, // 文件操做权限,同438
autoClose: true,
start: 0 // 开始写入位置
highWarterMark: 16384 // !!! 文档没有给出这一设置,默认 16k,文末将验证
});
复制代码
注意:
options 参数与createReadStream
不一样
也能够设置highWaterMark
选项,官方文档没有给出,默认的写入大小为 16k,在可写流对象执行write
方法的时候若是超出highWaterMark
,返回值将变成false
write
、 end
、 drain
、 finish
true
、 false
, 分别表明,表明当前内存中被写入的数据是否超出 highWaterMark
(上面刚刚提到)write
以后数据并不会当即被写入文件,而会在内存中缓存,而后依次写入/** * write 方法 * chunk 写入数据的buffer/string * encoding 编码格式,可选。且chunk为字符串时有用 * callback 写入成功回调函数 **/
ws.write(chunk,[encoding],[callback]);
/** * end 方法,代表接下来没有数据要被写入 * chunk 写入数据的buffer/string * encoding 编码格式,可选。且chunk为字符串时有用 * callback 回调函数,若是传入,将做为 finish 事件的回调函数 **/
ws.end(chunk,[encoding],[callback]);
/** * finish 方法,在调用了 stream.end() 方法,且缓冲区数据都已经传给底层系统以后, 'finish' 事件将被触发。 **/
const writer = fs.createWriteStream('2.txt');
for (let i = 0; i < 100; i++) {
writer.write(`hello, ${i}!\n`);
}
writer.end('结束\n');
writer.on('finish', () => {
console.error('全部的写入已经完成!');
});
复制代码
drain
方法
const fs = require('fs');
const ws = fs.createWriteStream('2.txt', {
encoding: 'utf-8',
highWaterMark: 3
});
let i = 10;
function write() {
let flag = true;
while(i && flag) {
flag = ws.write('1');
i --;
console.log(flag);
}
}
write();
ws.on('drain', () => {
console.log('drain');
write();
});
复制代码
注意:
- 当一个流处在
drain
状态,对write
的调用会被缓存(下面解释),而且返回false
。一旦全部缓存的数据都被排空(被操做系统用来进行输出),那么drain
事件将被触发,意思为内存中缓存的数据已经被所有写入到文件中,接下来能够继续执行write
向内存中写入数据了- 若是你在手动控制读写以及缓存,建议这么作,一旦
write
方法返回false,在drain
事件触发前,最好不要写入任何数据,固然这样须要配合createWriteStream
的highWaterMark
参数,(这一参数文档没有给出)
pipe
、unpipe
、cork
、uncork
方法pipe
方法上面题目的几种方法中,pipe
无疑使用最多,在流通常的使用场景下,pipe
能解决大部分的须要,下面一句很简单的语义代码就是 pipe
的使用方式,readable
经过 pipe
将数据传输给 writable
,正如其名,管道
readable.pipe(writable)
复制代码
其基本原理为:
pipe
方法,通知写入一个简单的例子:
const from = fs.createReadStream('./1.txt');
const to = fs.createWriteStream('./2.txt');
from.pipe(to);
复制代码
以上的例子都是可读流做为输入源,可写流做为返回结果,固然,若是咱们操做的是 duplex
/transform
,这时候就能够很容易写做链式调用
// 伪代码
readableSrc
.pipe(transformStream1)
.pipe(transformStream2)
.pipe(finalWrtitableDest)
复制代码
unpipe
方法/** * dest 当前readable pipe 管道的目标可写流 **/
readable.unpipe(dest)
复制代码
dest
未被指定,则 readable 绑定的全部流都将被分离cork
、uncork
方法stream.cork();
stream.write('1');
stream.write('2');
process.nextTick(() => stream.uncork());
复制代码
Readable Stream 可读流的事件与方法
Event | Functions |
---|---|
data | pipe()、unpipe() |
end | read()、unshift() |
error | pause()、resume() |
close | isPaused() |
readable | setEncoding() |
Writable Stream 可写流的事件与方法
Event | Functions |
---|---|
drain | write() |
finish | end() |
error | cork() |
close | uncork() |
pipe/unpipe | setDefaultEncoding() |
highWaterMark
fs.createWriteStream()
option 中 highWaterMark
做用,我在此文屡次提到,但愿能够加深印象方式一:
const fs = require('fs');
let count = 0;
const ws = fs.createWriteStream('testInput.txt');
for (let i = 0; i < 10000; i ++) {
count ++;
let flag = ws.write(i.toString());
if (!flag) { // 返回false即到达了highWaterMark
console.log('写入' + count + '次');
break;
}
}
ws.end(function() {
console.log('文件写入结束,输出的总字节为', ws.bytesWritten);
});
// 输出:
写入4374次
文件写入结束,输出的总字节为 16386
16386 / 1024
// 结果:
16.001953125
复制代码
方式二:
function writeOneMillionTimes(writer, data, encoding, callback) {
let i = 10000;
write();
function write() {
let ok = true;
while (i-- > 0 && ok) {
// 写入结束时回调
= writer.write(data, encoding, i === 0 ? callback : null);
}
if (i > 0) {
// 这里提早停下了,'drain' 事件触发后才能够继续写入
console.log('drain', i);
writer.once('drain', write);
}
}
}
const Writable = require('stream').Writable;
const writer = new Writable({
write(chunk, encoding, callback) {
// 比 process.nextTick() 稍慢
setTimeout(() => {
callback && callback();
});
}
});
writeOneMillionTimes(writer, '123456', 'utf8', () => {
console.log('end');
});
// 输出
drain 7268
drain 4536
drain 1804
end
// 计算:
(10000-7268) * 6 / 1024
// 结果:16.0078125
复制代码
本文主要从文件操做的角度探究流的原理以及使用方法,node应用中你可使用流作不少事情,网络请求、文件上传、命令行工具等等。 在Node.js应用中,流随处可见,文件操做,网络请求,进程、socket中流无处不在。正是这样,流的特性能让你的node应用真正体现出“小而美”的特性,
文章目的为我的笔记,本人也是Node.js初学者,文中若有不恰当描述以及说明,欢迎指正交流。 文章借鉴了学习了不少大佬的文章(文首传送门),很是感谢 后续有时间会继续更新,祝本身node之路顺利吧🤡