nodejs Stream使用手册

介绍

本文介绍了使用 node.js streams 开发程序的基本方法。html

cc-by-3.0

"We should have some ways of connecting programs like garden hose--screw in
another segment when it becomes necessary to massage data in
another way. This is the way of IO also."

Doug McIlroy. October 11, 1964node

doug mcilroy


最先接触Stream是从早期的unix开始的 数十年的实践证实Stream 思想能够很简单的开发出一些庞大的系统。在unix里,Stream是经过 |实现的;在node中,做为内置的stream模块,不少核心模块和三方模块都使用到。和unix同样, node Stream主要的操做也是.pipe(),使用者可使用反压力机制来控制读和写的平衡。 Stream 能够为开发者提供能够重复使用统一的接口,经过抽象的Stream接口来控制Stream之间的读写平衡。c++


为何使用Stream

node中的I/O是异步的,所以对磁盘和网络的读写须要经过回调函数来读取数据,下面是一个文件下载服务器git

的简单代码:github

var http = require('http');
var fs = require('fs');

var server = http.createServer(function (req, res) {
    fs.readFile(__dirname + '/data.txt', function (err, data) {
        res.end(data);
    });
});
server.listen(8000);

这些代码能够实现须要的功能,可是服务在发送文件数据以前须要缓存整个文件数据到内存,若是"data.txt"文件很 大而且并发量很大的话,会浪费不少内存。由于用户须要等到整个文件缓存到内存才能接受的文件数据,这样致使 用户体验至关很差。不过还好(req, res)两个参数都是Stream,这样咱们能够用fs.createReadStream()代替npm

fs.readFile():api

var http = require('http');
var fs = require('fs');

var server = http.createServer(function (req, res) {
    var stream = fs.createReadStream(__dirname + '/data.txt');
    stream.pipe(res);
});
server.listen(8000);

.pipe()方法监听fs.createReadStream()'data''end'事件,这样"data.txt"文件就不须要缓存整 个文件,当客户端链接完成以后立刻能够发送一个数据块到客户端。使用.pipe()另外一个好处是能够解决当客户 端延迟很是大时致使的读写不平衡问题。若是想压缩文件再发送,可使用三方模块实现:浏览器

var http = require('http');
var fs = require('fs');
var oppressor = require('oppressor');

var server = http.createServer(function (req, res) {
    var stream = fs.createReadStream(__dirname + '/data.txt');
    stream.pipe(oppressor(req)).pipe(res);
});
server.listen(8000);

这样文件就会对支持gzip和deflate的浏览器进行压缩。oppressor 模块会处理全部的content-encoding缓存

Stream使开发程序变得简单。服务器

基础概念

有五种基本的Stream: readable, writable, transform, duplex, and"classic".

pipe

全部类型的Stream收是使用 .pipe() 来建立一个输入输出对,接收一个可读流src并将其数据输出到可写流dst,以下:

src.pipe(dst)

.pipe(dst)方法为返回dst流,这样就能够接连使用多个.pipe(),以下:

a.pipe(b).pipe(c).pipe(d)

功能与下面的代码相同:

a.pipe(b);
b.pipe(c);
c.pipe(d);

这样的用法十分相似于unix命令下面用法:

a | b | c | d

readable streams

经过调用Readable streams的 .pipe()方法能够把Readable streams的数据写入一个

Writable , Transform, 或者Duplex stream。

readableStream.pipe(dst)

建立 readable stream

这里咱们建立一个readable stream!

var Readable = require('stream').Readable;

var rs = new Readable;
rs.push('beep ');
rs.push('boop\n');
rs.push(null);

rs.pipe(process.stdout);


$ node read0.js
beep boop

rs.push(null) 通知数据接收者数据已经发送完毕. 注意到咱们在将全部数据内容压入可读流以前并无调用rs.pipe(process.stdout);,可是咱们压入的全部数据 内容仍是彻底的输出了,这是由于可读流在接收者没有读取数据以前,会缓存全部压入的数据。可是在不少状况下, 更好的方法是只有数据接收着请求数据的时候,才压入数据到可读流而不是缓存整个数据。下面咱们重写 一下

._read()函数:

var Readable = require('stream').Readable;
var rs = Readable();

var c = 97;
rs._read = function () {
    rs.push(String.fromCharCode(c++));
    if (c > 'z'.charCodeAt(0)) rs.push(null);
};

rs.pipe(process.stdout);
```

```
$ node read1.js
abcdefghijklmnopqrstuvwxyz

上面的代码经过重写_read()方法实现了只有在数据接受者请求数据才向可读流中压入数据。_read()方法 也能够接收一个size参数表示数据请求着请求的数据大小,可是可读流能够根据须要忽略这个参数。注意我 们也能够用util.inherits()继承可读流。为了说明只有在数据接受者请求数据时_read()方法才被调用,咱们 在向可读流压入数据时作一个延时,以下:

var Readable = require('stream').Readable;
var rs = Readable();

var c = 97 - 1;

rs._read = function () {
    if (c >= 'z'.charCodeAt(0)) return rs.push(null);
    
    setTimeout(function () {
        rs.push(String.fromCharCode(++c));
    }, 100);
};

rs.pipe(process.stdout);

process.on('exit', function () {
    console.error('\n_read() called ' + (c - 97) + ' times');
});
process.stdout.on('error', process.exit);

用下面的命令运行程序咱们发现_read()方法只调用了5次:

$ node read2.js | head -c5
abcde
_read() called 5 times

使用计时器的缘由是系统须要时间来发送信号来通知程序关闭管道。使用process.stdout.on('error', fn) 是为了处理系统由于header命令关闭管道而发送SIGPIPE信号,由于这样会致使process.stdout触发 EPIPE事件。若是想建立一个的能够压入任意形式数据的可读流,只要在建立流的时候设置参数objectMode 为true便可,例如:Readable({ objectMode: true })

读取readable stream数据

大部分状况下咱们只要简单的使用pipe方法将可读流的数据重定向到另外形式的流,可是在某些状况下也许 直接从可读流中读取数据更有用。以下所示:

process.stdin.on('readable', function () {
    var buf = process.stdin.read();
    console.dir(buf);
});



$ (echo abc; sleep 1; echo def; sleep 1; echo ghi) | node consume0.js 
<Buffer 61 62 63 0a>
<Buffer 64 65 66 0a>
<Buffer 67 68 69 0a>
null

当可读流中有数据可读取时,流会触发'readable' 事件,这样就能够调用.read()方法来读取相 关数据,当可读流中没有数据可读取时,.read() 会返回null,这样就能够结束.read() 的调用, 等待下一次'readable' 事件的触发。下面是一个使用.read(n)从标准输入每次读取3个字节的例子:

process.stdin.on('readable', function () {
    var buf = process.stdin.read(3);
    console.dir(buf);
});

以下运行程序发现,输出结果并不彻底!

$ (echo abc; sleep 1; echo def; sleep 1; echo ghi) | node consume1.js 
<Buffer 61 62 63>
<Buffer 0a 64 65>
<Buffer 66 0a 67>

这是应为额外的数据数据留在流的内部缓冲区里了,而咱们须要通知流咱们要读取更多的数据.read(0) 能够达到这个目的。

process.stdin.on('readable', function () {
    var buf = process.stdin.read(3);
    console.dir(buf);
    process.stdin.read(0);
});

此次运行结果以下:

$ (echo abc; sleep 1; echo def; sleep 1; echo ghi) | node consume2.js 
<Buffer 61 62 63>
<Buffer 0a 64 65>
<Buffer 66 0a 67>
<Buffer 68 69 0a>

咱们可使用 .unshift() 将数据从新押回流数据队列的头部,这样能够接续读取押回的数据。以下面 的代码,会按行输出标准输入的内容:

var offset = 0;

process.stdin.on('readable', function () {
    var buf = process.stdin.read();
    if (!buf) return;
    for (; offset < buf.length; offset++) {
        if (buf[offset] === 0x0a) {
            console.dir(buf.slice(0, offset).toString());
            buf = buf.slice(offset + 1);
            offset = 0;
            process.stdin.unshift(buf);
            return;
        }
    }
    process.stdin.unshift(buf);
});

$ tail -n +50000 /usr/share/dict/american-english | head -n10 | node lines.js 
'hearties'
'heartiest'
'heartily'
'heartiness'
'heartiness\'s'
'heartland'
'heartland\'s'
'heartlands'
'heartless'
'heartlessly'

固然,有不少模块能够实现这个功能,如:split

writable streams

writable streams只能够做为.pipe()函数的目的参数。以下代码:

src.pipe(writableStream)

建立 writable stream

重写 ._write(chunk, enc, next) 方法就能够接受一个readable stream的数据。

var Writable = require('stream').Writable;
var ws = Writable();
ws._write = function (chunk, enc, next) {
    console.dir(chunk);
    next();
};

process.stdin.pipe(ws);

$ (echo beep; sleep 1; echo boop) | node write0.js 
<Buffer 62 65 65 70 0a>
<Buffer 62 6f 6f 70 0a>

第一个参数chunk是数据输入者写入的数据。第二个参数end是数据的编码格式。第三个参数next(err)经过回调函数通知数据写入者能够写入更多的时间。若是readable stream写入的是字符串,那么字符串会默认转换为Buffer,若是在建立流的时候设置Writable({ decodeStrings: false })参数,那么不会作转换。若是readable stream写入的数据时对象,那么须要这样建立writable stream

Writable({ objectMode: true })

写数据到 writable stream

调用writable stream的.write(data)方法便可完成数据写入。

process.stdout.write('beep boop\n');

调用.end()方法通知writable stream 数据已经写入完成。

var fs = require('fs');
var ws = fs.createWriteStream('message.txt');

ws.write('beep ');

setTimeout(function () {
    ws.end('boop\n');
}, 1000);

$ node writing1.js 
$ cat message.txt
beep boop

若是须要设置writable stream的缓冲区的大小,那么在建立流的时候,须要设置opts.highWaterMark, 这样若是缓冲区里的数据超过opts.highWaterMark.write(data)方法会返回false。当缓冲区可写的 时候,writable stream会触发'drain' 事件。

classic streams

Classic streams比较老的接口了,最先出如今node 0.4版本中,可是了解一下其运行原理仍是十分有好 处的。当一个流被注册了"data" 事件的回到函数,那么流就会工做在老版本模式下,即会使用老的API。

classic readable streams

Classic readable streams事件就是一个事件触发器,若是Classic readable streams有数据可读取,那 么其触发 "data" 事件,等到数据读取完毕时,会触发"end" 事件。.pipe() 方法经过检查stream.readable 的值肯定流是否有数据可读。下面是一个使用Classic readable streams打印A-J字母的例子:

var Stream = require('stream');
var stream = new Stream;
stream.readable = true;

var c = 64;
var iv = setInterval(function () {
    if (++c >= 75) {
        clearInterval(iv);
        stream.emit('end');
    }
    else stream.emit('data', String.fromCharCode(c));
}, 100);

stream.pipe(process.stdout);


$ node classic0.js
ABCDEFGHIJ

若是要从classic readable stream中读取数据,注册"data""end"两个事件的回调函数便可,代码以下:

process.stdin.on('data', function (buf) {
    console.log(buf);
});
process.stdin.on('end', function () {
    console.log('__END__');
});


$ (echo beep; sleep 1; echo boop) | node classic1.js 
<Buffer 62 65 65 70 0a>
<Buffer 62 6f 6f 70 0a>
__END__

须要注意的是若是你使用这种方式读取数据,那么会失去使用新接口带来的好处。好比你在往一个 延迟很是大的流写数据时,须要注意读取数据和写数据的平衡问题,不然会致使大量数据缓存在内 存中,致使浪费大量内存。通常这时候强烈建议使用流的.pipe()方法,这样就不用本身监听"data""end"事件了,也不用担忧读写不平衡的问题了。固然你也能够用 through代替本身监听

"data""end" 事件,以下面的代码:

var through = require('through');
process.stdin.pipe(through(write, end));

function write (buf) {
    console.log(buf);
}
function end () {
    console.log('__END__');
}

$ (echo beep; sleep 1; echo boop) | node through.js 
<Buffer 62 65 65 70 0a>
<Buffer 62 6f 6f 70 0a>
__END__

或者也可使用concat-stream来缓存整个流的内容:

var concat = require('concat-stream');
process.stdin.pipe(concat(function (body) {
    console.log(JSON.parse(body));
}));


$ echo '{"beep":"boop"}' | node concat.js 
{ beep: 'boop' }

固然若是你非要本身监听"data""end"事件,那么你能够在写数据的流不可写的 时候使用.pause()方法暂停Classic readable streams继续触发"data" 事件。等到 写数据的流可写的时候再使用.resume() 方法通知流继续触发"data" 事件继续读取 数据。

classic writable streams

Classic writable streams 很是简单。只有 .write(buf), .end(buf).destroy()三个方法。.end(buf) 方法的buf参数是可选的,若是选择该参数,至关于stream.write(buf); stream.end() 这样的操做,须要注意的是当流的缓冲区写满即流不可写时.write(buf)方法会返回false,若是流再次可写时,流会触发drain事件。

transform

transform是一个对读入数据过滤然输出的流。

duplex

duplex stream是一个可读也可写的双向流,以下面的a就是一个duplex stream:

a.pipe(b).pipe(a)

read more

  • core stream documentation
  • You can use the readable-stream module to make your streams2 code compliant with node 0.8 and below. Just require('readable-stream') instead of require('stream') after you npm install readable-stream.
相关文章
相关标签/搜索