本文介绍了使用 node.js streams 开发程序的基本方法。html
"We should have some ways of connecting programs like garden hose--screw in another segment when it becomes necessary to massage data in another way. This is the way of IO also."
Doug McIlroy. October 11, 1964node
最先接触Stream是从早期的unix开始的 数十年的实践证实Stream 思想能够很简单的开发出一些庞大的系统。在unix里,Stream是经过 |
实现的;在node中,做为内置的stream模块,不少核心模块和三方模块都使用到。和unix同样, node Stream主要的操做也是.pipe()
,使用者可使用反压力机制来控制读和写的平衡。 Stream 能够为开发者提供能够重复使用统一的接口,经过抽象的Stream接口来控制Stream之间的读写平衡。c++
node中的I/O是异步的,所以对磁盘和网络的读写须要经过回调函数来读取数据,下面是一个文件下载服务器git
的简单代码:github
var http = require('http'); var fs = require('fs'); var server = http.createServer(function (req, res) { fs.readFile(__dirname + '/data.txt', function (err, data) { res.end(data); }); }); server.listen(8000);
这些代码能够实现须要的功能,可是服务在发送文件数据以前须要缓存整个文件数据到内存,若是"data.txt"文件很 大而且并发量很大的话,会浪费不少内存。由于用户须要等到整个文件缓存到内存才能接受的文件数据,这样致使 用户体验至关很差。不过还好(req, res)
两个参数都是Stream,这样咱们能够用fs.createReadStream()
代替npm
fs.readFile()
:api
var http = require('http'); var fs = require('fs'); var server = http.createServer(function (req, res) { var stream = fs.createReadStream(__dirname + '/data.txt'); stream.pipe(res); }); server.listen(8000);
.pipe()
方法监听fs.createReadStream()
的'data'
和'end'
事件,这样"data.txt"文件就不须要缓存整 个文件,当客户端链接完成以后立刻能够发送一个数据块到客户端。使用.pipe()
另外一个好处是能够解决当客户 端延迟很是大时致使的读写不平衡问题。若是想压缩文件再发送,可使用三方模块实现:浏览器
var http = require('http'); var fs = require('fs'); var oppressor = require('oppressor'); var server = http.createServer(function (req, res) { var stream = fs.createReadStream(__dirname + '/data.txt'); stream.pipe(oppressor(req)).pipe(res); }); server.listen(8000);
这样文件就会对支持gzip和deflate的浏览器进行压缩。oppressor 模块会处理全部的content-encoding
。缓存
Stream使开发程序变得简单。服务器
有五种基本的Stream: readable, writable, transform, duplex, and"classic".
全部类型的Stream收是使用 .pipe()
来建立一个输入输出对,接收一个可读流src
并将其数据输出到可写流dst
,以下:
src.pipe(dst)
.pipe(dst)
方法为返回dst
流,这样就能够接连使用多个.pipe()
,以下:
a.pipe(b).pipe(c).pipe(d)
功能与下面的代码相同:
a.pipe(b); b.pipe(c); c.pipe(d);
这样的用法十分相似于unix命令下面用法:
a | b | c | d
经过调用Readable streams的 .pipe()
方法能够把Readable streams的数据写入一个
Writable , Transform, 或者Duplex stream。
readableStream.pipe(dst)
这里咱们建立一个readable stream!
var Readable = require('stream').Readable; var rs = new Readable; rs.push('beep '); rs.push('boop\n'); rs.push(null); rs.pipe(process.stdout); $ node read0.js beep boop
rs.push(null)
通知数据接收者数据已经发送完毕. 注意到咱们在将全部数据内容压入可读流以前并无调用rs.pipe(process.stdout);
,可是咱们压入的全部数据 内容仍是彻底的输出了,这是由于可读流在接收者没有读取数据以前,会缓存全部压入的数据。可是在不少状况下, 更好的方法是只有数据接收着请求数据的时候,才压入数据到可读流而不是缓存整个数据。下面咱们重写 一下
._read()
函数:
var Readable = require('stream').Readable; var rs = Readable(); var c = 97; rs._read = function () { rs.push(String.fromCharCode(c++)); if (c > 'z'.charCodeAt(0)) rs.push(null); }; rs.pipe(process.stdout); ``` ``` $ node read1.js abcdefghijklmnopqrstuvwxyz
上面的代码经过重写_read()
方法实现了只有在数据接受者请求数据才向可读流中压入数据。_read()
方法 也能够接收一个size
参数表示数据请求着请求的数据大小,可是可读流能够根据须要忽略这个参数。注意我 们也能够用util.inherits()
继承可读流。为了说明只有在数据接受者请求数据时_read()
方法才被调用,咱们 在向可读流压入数据时作一个延时,以下:
var Readable = require('stream').Readable; var rs = Readable(); var c = 97 - 1; rs._read = function () { if (c >= 'z'.charCodeAt(0)) return rs.push(null); setTimeout(function () { rs.push(String.fromCharCode(++c)); }, 100); }; rs.pipe(process.stdout); process.on('exit', function () { console.error('\n_read() called ' + (c - 97) + ' times'); }); process.stdout.on('error', process.exit);
用下面的命令运行程序咱们发现_read()
方法只调用了5次:
$ node read2.js | head -c5 abcde _read() called 5 times
使用计时器的缘由是系统须要时间来发送信号来通知程序关闭管道。使用process.stdout.on('error', fn)
是为了处理系统由于header
命令关闭管道而发送SIGPIPE信号,由于这样会致使process.stdout
触发 EPIPE事件。若是想建立一个的能够压入任意形式数据的可读流,只要在建立流的时候设置参数objectMode 为true便可,例如:Readable({ objectMode: true })
。
大部分状况下咱们只要简单的使用pipe
方法将可读流的数据重定向到另外形式的流,可是在某些状况下也许 直接从可读流中读取数据更有用。以下所示:
process.stdin.on('readable', function () { var buf = process.stdin.read(); console.dir(buf); }); $ (echo abc; sleep 1; echo def; sleep 1; echo ghi) | node consume0.js <Buffer 61 62 63 0a> <Buffer 64 65 66 0a> <Buffer 67 68 69 0a> null
当可读流中有数据可读取时,流会触发'readable'
事件,这样就能够调用.read()
方法来读取相 关数据,当可读流中没有数据可读取时,.read()
会返回null
,这样就能够结束.read()
的调用, 等待下一次'readable'
事件的触发。下面是一个使用.read(n)
从标准输入每次读取3个字节的例子:
process.stdin.on('readable', function () { var buf = process.stdin.read(3); console.dir(buf); });
以下运行程序发现,输出结果并不彻底!
$ (echo abc; sleep 1; echo def; sleep 1; echo ghi) | node consume1.js <Buffer 61 62 63> <Buffer 0a 64 65> <Buffer 66 0a 67>
这是应为额外的数据数据留在流的内部缓冲区里了,而咱们须要通知流咱们要读取更多的数据.read(0)
能够达到这个目的。
process.stdin.on('readable', function () { var buf = process.stdin.read(3); console.dir(buf); process.stdin.read(0); });
此次运行结果以下:
$ (echo abc; sleep 1; echo def; sleep 1; echo ghi) | node consume2.js <Buffer 61 62 63> <Buffer 0a 64 65> <Buffer 66 0a 67> <Buffer 68 69 0a>
咱们可使用 .unshift()
将数据从新押回流数据队列的头部,这样能够接续读取押回的数据。以下面 的代码,会按行输出标准输入的内容:
var offset = 0; process.stdin.on('readable', function () { var buf = process.stdin.read(); if (!buf) return; for (; offset < buf.length; offset++) { if (buf[offset] === 0x0a) { console.dir(buf.slice(0, offset).toString()); buf = buf.slice(offset + 1); offset = 0; process.stdin.unshift(buf); return; } } process.stdin.unshift(buf); }); $ tail -n +50000 /usr/share/dict/american-english | head -n10 | node lines.js 'hearties' 'heartiest' 'heartily' 'heartiness' 'heartiness\'s' 'heartland' 'heartland\'s' 'heartlands' 'heartless' 'heartlessly'
固然,有不少模块能够实现这个功能,如:split 。
writable streams只能够做为.pipe()
函数的目的参数。以下代码:
src.pipe(writableStream)
重写 ._write(chunk, enc, next)
方法就能够接受一个readable stream的数据。
var Writable = require('stream').Writable; var ws = Writable(); ws._write = function (chunk, enc, next) { console.dir(chunk); next(); }; process.stdin.pipe(ws); $ (echo beep; sleep 1; echo boop) | node write0.js <Buffer 62 65 65 70 0a> <Buffer 62 6f 6f 70 0a>
第一个参数chunk
是数据输入者写入的数据。第二个参数end
是数据的编码格式。第三个参数next(err)
经过回调函数通知数据写入者能够写入更多的时间。若是readable stream写入的是字符串,那么字符串会默认转换为Buffer
,若是在建立流的时候设置Writable({ decodeStrings: false })
参数,那么不会作转换。若是readable stream写入的数据时对象,那么须要这样建立writable stream
Writable({ objectMode: true })
调用writable stream的.write(data)
方法便可完成数据写入。
process.stdout.write('beep boop\n');
调用.end()
方法通知writable stream 数据已经写入完成。
var fs = require('fs'); var ws = fs.createWriteStream('message.txt'); ws.write('beep '); setTimeout(function () { ws.end('boop\n'); }, 1000); $ node writing1.js $ cat message.txt beep boop
若是须要设置writable stream的缓冲区的大小,那么在建立流的时候,须要设置opts.highWaterMark
, 这样若是缓冲区里的数据超过opts.highWaterMark
,.write(data)
方法会返回false。当缓冲区可写的 时候,writable stream会触发'drain'
事件。
Classic streams比较老的接口了,最先出如今node 0.4版本中,可是了解一下其运行原理仍是十分有好 处的。当一个流被注册了"data"
事件的回到函数,那么流就会工做在老版本模式下,即会使用老的API。
Classic readable streams事件就是一个事件触发器,若是Classic readable streams有数据可读取,那 么其触发 "data"
事件,等到数据读取完毕时,会触发"end"
事件。.pipe()
方法经过检查stream.readable
的值肯定流是否有数据可读。下面是一个使用Classic readable streams打印A-J字母的例子:
var Stream = require('stream'); var stream = new Stream; stream.readable = true; var c = 64; var iv = setInterval(function () { if (++c >= 75) { clearInterval(iv); stream.emit('end'); } else stream.emit('data', String.fromCharCode(c)); }, 100); stream.pipe(process.stdout); $ node classic0.js ABCDEFGHIJ
若是要从classic readable stream中读取数据,注册"data"
和"end"
两个事件的回调函数便可,代码以下:
process.stdin.on('data', function (buf) { console.log(buf); }); process.stdin.on('end', function () { console.log('__END__'); }); $ (echo beep; sleep 1; echo boop) | node classic1.js <Buffer 62 65 65 70 0a> <Buffer 62 6f 6f 70 0a> __END__
须要注意的是若是你使用这种方式读取数据,那么会失去使用新接口带来的好处。好比你在往一个 延迟很是大的流写数据时,须要注意读取数据和写数据的平衡问题,不然会致使大量数据缓存在内 存中,致使浪费大量内存。通常这时候强烈建议使用流的.pipe()
方法,这样就不用本身监听"data"
和"end"
事件了,也不用担忧读写不平衡的问题了。固然你也能够用 through代替本身监听
"data"
和"end"
事件,以下面的代码:
var through = require('through'); process.stdin.pipe(through(write, end)); function write (buf) { console.log(buf); } function end () { console.log('__END__'); } $ (echo beep; sleep 1; echo boop) | node through.js <Buffer 62 65 65 70 0a> <Buffer 62 6f 6f 70 0a> __END__
或者也可使用concat-stream来缓存整个流的内容:
var concat = require('concat-stream'); process.stdin.pipe(concat(function (body) { console.log(JSON.parse(body)); })); $ echo '{"beep":"boop"}' | node concat.js { beep: 'boop' }
固然若是你非要本身监听"data"
和"end"
事件,那么你能够在写数据的流不可写的 时候使用.pause()
方法暂停Classic readable streams继续触发"data"
事件。等到 写数据的流可写的时候再使用.resume()
方法通知流继续触发"data"
事件继续读取 数据。
Classic writable streams 很是简单。只有 .write(buf)
, .end(buf)
和.destroy()
三个方法。.end(buf)
方法的buf参数是可选的,若是选择该参数,至关于stream.write(buf); stream.end()
这样的操做,须要注意的是当流的缓冲区写满即流不可写时.write(buf)
方法会返回false,若是流再次可写时,流会触发drain
事件。
transform是一个对读入数据过滤然输出的流。
duplex stream是一个可读也可写的双向流,以下面的a就是一个duplex stream:
a.pipe(b).pipe(a)
require('readable-stream')
instead of require('stream')
after you npm install readable-stream
.