以前写过一篇流的文章,说说node中可读流和可写流,比较宏观的介绍了流的一些基本概念。
下面是以前文章里的一张图:
javascript
咱们来具体看看实现可写流的核心类的代码:java
WritableState
类用来存放配置,部分代码以下:
node
Writeable
类主要作了如下几件事情:web
WritableState
实例_write
等方法所有代码以下:
api
这是对外暴露的write
方法,核心逻辑是调用了writeOrBuffer
方法,根据字面意思就能够看出,是写入数据或者缓存起来。代码以下:缓存
1Writable.prototype.write = function(chunk, encoding, cb) {
2 const state = this._writableState;
3
4 // chunk转换成buffer
5 if (chunk === null) {
6 throw new ERR_STREAM_NULL_VALUES();
7 } else if (!state.objectMode) {
8 if (typeof chunk === 'string') {
9 if (state.decodeStrings !== false) {
10 chunk = Buffer.from(chunk, encoding);
11 encoding = 'buffer';
12 }
13 } else if (chunk instanceof Buffer) {
14 encoding = 'buffer';
15 } else if (Stream._isUint8Array(chunk)) {
16 chunk = Stream._uint8ArrayToBuffer(chunk);
17 encoding = 'buffer';
18 } else {
19 throw new ERR_INVALID_ARG_TYPE(
20 'chunk', ['string', 'Buffer', 'Uint8Array'], chunk);
21 }
22 }
23
24 // 若是流已经结束,继续写入会报错
25 let err;
26 if (state.ending) {
27 err = new ERR_STREAM_WRITE_AFTER_END();
28 } else if (state.destroyed) {
29 err = new ERR_STREAM_DESTROYED('write');
30 }
31
32 // 存在错误会在下个tick调用回掉。不存在错误调用writeOrBuffer方法马上写入或者存入缓冲区
33 if (err) {
34 process.nextTick(cb, err);
35 errorOrDestroy(this, err, true);
36 return false;
37 } else {
38 state.pendingcb++;
39 return writeOrBuffer(this, state, chunk, encoding, cb);
40 }
41};
复制代码
下面看看writeOrBuffer
方法,若是系统正在写入,则放入缓冲区。不然直接调用_write
方法写入。若是_write
返回fasle。则须要等待"drain"事件触发后才能够继续写入:app
前面介绍过WritableState
类,缓冲区就是这个类中的buffered
对象。排队写入的内容会暂时存放在这里。而higherWaterMark
标识是否已经超出了合理的缓存值(默认为16kb)
函数
思考一下:若是超出了
higherWaterMark
还继续写入会怎么样呢?post
若是正在写入,write
方法会返回false。当写入完成,就会发出“drain”事件通知咱们,下面是官网对“drain”事件的描述:ui
若是调用 stream.write(chunk) 返回 false,则当能够继续写入数据到流时会触发 'drain' 事件。
能够看出,可写流最终写入数据是经过_write
完成的,若是你想实现一个自定义的可写流,其实就是实现一个新的 _write
方法。具体的作法能够在官网中找到示例。
咱们通常经过fs.createWriteStream
方法来建立可写流。这个方法会实例化WriteStream
这个类。
咱们能够在/nodejs/node/blob/master/lib/internal/fs/streams.js路径下看到WriteStream
的实现,其实就是实现了一个_write
方法。伪代码以下:
1const { Writable } = require('stream');
2const fs = require('fs');
3
4// 继承Writeable
5function WriteStream(path, options) {
6 Writable.call(this, options);
7}
8ObjectSetPrototypeOf(WriteStream.prototype, Writable.prototype);
9ObjectSetPrototypeOf(WriteStream, Writable);
10
11// 实现_write方法
12WriteStream.prototype._write = function(data, encoding, cb) {
13 // 用法见fs
14 fs.write(this.fd, data, 0, data.length, this.pos, (er, bytes) => {
15 this.bytesWritten += bytes;
16 cb();
17 });
18
19 if (this.pos !== undefined) this.pos += data.length;
20};
复制代码
最后再看下Writeable
的父类,源码在internal/streams/legacy.js中,代码很少,主要作了如下几件事:
events
模块,得到事件通讯能力pipe
方法(经过监听“data”事件读取数据。由于读取数据较写入数据快不少,为了不内存溢出,经过监听“drain”事件来控制流量)因此说基类Stream主要就是提供事件能力,和pipe这个api,代码以下:
1const {
2 ObjectSetPrototypeOf,
3} = primordials;
4
5const EE = require('events');
6
7// 继承events模块
8function Stream(opts) {
9 EE.call(this, opts);
10}
11ObjectSetPrototypeOf(Stream.prototype, EE.prototype);
12ObjectSetPrototypeOf(Stream, EE);
13
14// 实现pipe方法
15Stream.prototype.pipe = function(dest, options) {
16 const source = this;
17
18 function ondata(chunk) {
19 // 若是可写流处于“不可写”状态,则暂停
20 if (dest.writable && dest.write(chunk) === false && source.pause) {
21 source.pause();
22 }
23 }
24 // 经过监听‘data’事件读取数据
25 source.on('data', ondata);
26 // 若是这是一个可读流,触发“drain”事件时,调用resume方法继续读取数据
27 function ondrain() {
28 if (source.readable && source.resume) {
29 source.resume();
30 }
31 }
32 // 监听“drain”事件
33 dest.on('drain', ondrain);
34
35 // 注册监听“onend”、“onclose”方法
36 // If the 'end' option is not supplied, dest.end() will be called when
37 // source gets the 'end' or 'close' events. Only dest.end() once.
38 if (!dest._isStdio && (!options || options.end !== false)) {
39 source.on('end', onend);
40 source.on('close', onclose);
41 }
42
43 let didOnEnd = false;
44 function onend() {
45 if (didOnEnd) return;
46 didOnEnd = true;
47
48 dest.end();
49 }
50
51
52 function onclose() {
53 if (didOnEnd) return;
54 didOnEnd = true;
55
56 if (typeof dest.destroy === 'function') dest.destroy();
57 }
58
59 // 下面作的都是清除工做
60 // Don't leave dangling pipes when there are errors.
61 function onerror(er) {
62 cleanup();
63 if (EE.listenerCount(this, 'error') === 0) {
64 throw er; // Unhandled stream error in pipe.
65 }
66 }
67
68 source.on('error', onerror);
69 dest.on('error', onerror);
70
71 // Remove all the event listeners that were added.
72 function cleanup() {
73 source.removeListener('data', ondata);
74 dest.removeListener('drain', ondrain);
75
76 source.removeListener('end', onend);
77 source.removeListener('close', onclose);
78
79 source.removeListener('error', onerror);
80 dest.removeListener('error', onerror);
81
82 source.removeListener('end', cleanup);
83 source.removeListener('close', cleanup);
84
85 dest.removeListener('close', cleanup);
86 }
87
88 source.on('end', cleanup);
89 source.on('close', cleanup);
90
91 dest.on('close', cleanup);
92 dest.emit('pipe', source);
93
94 // 返回这个实例,支持链式调用
95 // Allow for unix-like usage: A.pipe(B).pipe(C)
96 return dest;
97};
98
99module.exports = Stream;
复制代码