当处理大文件读取、压缩、归档、媒体文件和巨大的日志文件时,数据都会被读入内存,内存很快就会被使用完,这将会给程序带来很大的问题。javascript
若是在进行这些操做的时候,配合一个合适的缓冲区,一次读取固定的长度,就会使用更少的内存,这就是流式的API。php
fs.createReadStream()
).fs.createWriteStream()
).net.Socket
).zlib.createDeflate()
).Node 的文件系统和网络操做的核心模块 fs 和 net 都提供了流接口。使用流来处理 I/O 问题会至关简单。html
使用Node 核心模块,实现简单的静态服务器:java
const http = require('http');
const fs = require('fs');
const server = http.createServer(function(req,res){
fs.readFile(__dirname + '/index.html', function(err,data){
if(err){
res.statusCode = 500;
res.end(String(err))
return;
}
res.end(data)
})
})
server.listen(3000)
复制代码
虽然上述代码是用来非阻塞的 readFile, 一旦读取的文件很是大或很是多的文件访问,将会很快耗完内存,所以须要使用fs.createReadStream 方法进行改进:node
const http = require('http');
const fs = require('fs');
const server = http.createServer(function(req,res){
// 数据经过流的方式,从html 文件输出到 http 的请求响应
fs.createReadStream(__dirname + '/index.html').pipe(res);
})
server.listen(3000)
复制代码
上述代码提供一个缓冲器来发送到客户端,若是客户端链接较慢,网络流将会发送信号暂停I/O资源直到客户端准备好接受更多数据。web
使用流实现一个简单的静态文件服务器:api
const http = require('http');
const fs = require('fs');
const server = http.createServer(function(req,res){
let filename = req.url
if(filename === '/'){
filename = '/index.html'
}
fs.createReadStream(__dirname + filename ).pipe(res);
})
server.listen(3000)
复制代码
使用gzip压缩的静态服务器bash
const http = require('http');
const fs = require('fs');
const zlib = require('zlib')
const server = http.createServer(function(req,res){
res.writeHead(200, { 'content-encoding': 'gzip' })
fs.createReadStream(__dirname + '/index.html' )
.pipe(zlib.createGzip())
.pipe(res);
})
server.listen(3000)
复制代码
stream 继承自 events, 所以有事件中的 on、emit 方法。服务器
一、事件网络
二、方法
继承可读流的注意事项:
readable._read()
方法去得到底层的数据资源,并仅能由Readable对象内部方法调用,不该该被用户程序直接调用。在 readable._read()
实现中,只有还有数据可读取,就应该调用 readable.push(chunk)
方法把数据加入到内部的可读队列,由readable.read
方法读取供应用程序使用。实例:实现一个可读流
const { Readable } = require('stream');
const util = require('util');
util.inherits(MyReadStream, Readable)
function MyReadStream(arr){
this.source = arr;
Readable.call(this);
}
MyReadStream.prototype._read = function(){
if(this.source.length){
this.push(this.source[0])
this.source.splice(0,1)
}else{
this.push(null)
}
}
let myStream = new MyReadStream(['php','js','java'])
myStream.on('readable',function(){
let output_buf = myStream.read();
console.log(output_buf,'output') // null
})
myStream.on('data',function(res){
console.log(res.toString(),'data')
})
myStream.on('end',function(){
console.log('end')
})
复制代码
在上述代码中,在 readable
事件中调用 read
方法,来读取一段字符串,并监听 data
事件来输出读取的数据。
Writable 流接口是对写入数据的目标的抽象。
write(chunk,[encoding],[callback]) --- 将数据写入流。chunk(数据块)中包含要写入的数据,encoding指定字符串的编码,callback指定当数据已经彻底刷新时执行的一个回调函数。若是成功写入,write()返回true.
end([chunk],[encoding],[callback]) ---与write()相同,它把Writable对象设为再也不接受数据的状态,并发送finish事件。
drain -- 在write()调用返回false后,当准备好开始写更多数据时,发出此事件通知监视器。
finish -- 当end()在Writable对象上调用,因此数据被刷新,并不会有更多的数据被接受时触发
pipe -- 当pipe()方法在Readable流上调用,已添加此writable为目的地时发出
unpipe -- 当unpipe()方法被调用,以删除Writable为目的地时发出。
继承可写流的注意事项:
writable.write()
方法向流中写入数据,并在数据处理完成后调用 callback
。若是有错误发生, callback
不必定以这个错误做为第一个参数并被调用。要确保可靠地检测到写入错误,应该监听 'error'
事件。writable._write()
方法将数据发送到底层资源。实例:实现一个标准输入到标准输出的可写流,并判断若是输入的字符包含a, 则报错并退出
const { Writable } = require('stream');
const util = require('util');
util.inherits(MyWriteStream, Writable)
function MyWriteStream(options){
Writable.call(this, options);
}
MyWriteStream.prototype._write = function(chunk, encoding, callback){
if(chunk.toString().indexOf('a') > -1){
process.stdout.write("新写入的:"+ chunk)
callback(null)
}else{
callback(new Error('no a'))
}
}
let myStream = new MyWriteStream();
myStream.write('abc\n')
process.stdin.pipe(myStream)复制代码
注意:必须调用callback
方法来表示写入成功或失败。若是出现错误,callback
第一个参数必须是Error
对象,成功时参数为null
。
继承 stream.Duplex
便可实现一个双工流
示例:实现一个改变标准输入内容的颜色,再从标准输出打印出来
const { Duplex } = require('stream');
const util = require('util');
util.inherits(MyDuplexStream, Duplex)
function MyDuplexStream(options){
Duplex.call(this, options);
this.wating = false;
}
MyDuplexStream.prototype._write = function(chunk, encoding, callback){
this.wating = false;
// 把数据推进到内部队列
this.push('\u001b[32m' + chunk + '\u001b[39m');
callback()
}
MyDuplexStream.prototype._read = function(chunk, encoding, callback){
if(!this.wating){
// 在等待数据时展现一个提示
this.push('等待输入> ')
this.wating = true;
}
}
let myStream = new MyDuplexStream();
// 获取标准输入,用管道传给双工流,单后返回给标准输出
process.stdin.pipe(myStream).pipe(process.stdout)
复制代码
转换流很像双工流,也实现了 Readable 和 Writable 的接口。不一样的是,转换流是转换数据,仍是用 _transform 实现的。这个方法有三个参数,thunk数据块、encoding编码、callback回调(很像_write), 当数据转换完成后执行回调,容许转换流异步解析数据。
示例待补。