何时使用Streamsjavascript
不管咱们使用fs.readFileSync仍是fs.readFile进行文件的读取, 咱们都要将文件所有读取进内存中. 若是文件特别的大, 那么解决方案应该是对大数据一块一块的读取, 即读取完一块数据后, 要求继续读取下一块数据.java
读取单个文件算法
假设咱们使用fs.readFile进行单文件的读取:编程
var fs = require('fs'); fs.readFile(__filename, (err, chunk) => { if (err) return console.error(err); if (chunk) { console.log(chunk.toString()); } });
若是文件过大, 甚至文件大于0x3FFFFFFF(Node.js最大的缓存大小)怎么办? 这时候咱们应该使用流来处理数据.json
var fs = require('fs'); fs.createReadStream(__filename).pipe(process.stdout);
错误处理缓存
由于stream是继承于EventEmitter, 因此它一样监听error事件, 用于错误处理.服务器
var fs = require('fs'); var stream = fs.createReadStream('not-found'); stream.on('error', (err) => { console.trace(); console.error('Stack:', err.stack); console.error('The error raised was:', err); });
正确的继承stream base classes网络
Readable: 输入流app
Writable: 输出流异步
Transform: 解析数据时候改变数据
Duplex: 输入输出流
PassThrough: 测试, 分析, 检查数据
实现一个readable stream
咱们能够经过继承于stream.Readable而且实现_read(size)方法, 来实现一个readable.stream.
在_read中, 须要执行push将数据读取出, 终止读取则push(null).
var fs = require('fs'); var ReadStream = require('stream').Readable; class MyRead extends ReadStream { constructor(options) { super(options); } _read(size) { this.push('hello\n'); this.push(null); } } var myRead = new MyRead(); myRead.pipe(process.stdout);
实现一个writable stream
经过继承stream.Writable并实现_write方法, 来实现一个输出流.
var fs = require('fs'); var WriteStream = require('stream').Writable; class MyWrite extends WriteStream { constructor(options) { super(options); } _write(chunk, encoding, cb) { process.stdout.write(chunk); cb(); } } var myWrite = new MyWrite(); process.stdin.pipe(myWrite);
实现一个duplex streams
继承stream.Duplex而且实现_read/_write方法.
var fs = require('fs'); var Duplex = require('stream').Duplex; class MyDuplex extends Duplex { constructor(options) { super(options); this.waiting = false; } _read(size) { if (!this.waiting) { this.push('Feed me data! >'); this.waiting = true; } } _write(chunk, encoding, cb) { this.waiting = false; this.push(chunk); cb(); } } var myDuplex = new MyDuplex(); process.stdin.pipe(myDuplex).pipe(process.stdout);
实现一个transform streams
继承stream.Transform并实现_transform方法.
var fs = require('fs'); var Transform = require('stream').Transform; class MyTransform extends Transform { constructor(options) { super(options); } _transform(chunk, encoding, cb) { this.push(chunk + ':::\n'); cb(); } } var myTransform = new MyTransform(); process.stdin.pipe(myTransform).pipe(process.stdout);
fs其实是POSIX的包装模块:
POSIX方法提供了一些底层的操做, 例如:
var fs = require('fs'); var assert = require('assert'); var fd = fs.openSync('./file.txt', 'w+'); var writeBuf = Buffer.from('some data to write'); fs.writeSync(fd, writeBuf, 0, writeBuf.length, 0); var readBuf = Buffer.alloc(writeBuf.length); fs.readSync(fd, readBuf, 0, writeBuf.length, 0); assert.equal(writeBuf.toString(), readBuf.toString()); fs.closeSync(fd);
Streaming
fs模块提供fs.createReadStream/fs.createWriteStream功能模块. 它们分别可建立一个输入输出流, 例如可用于pipe.
var fs = require('fs'); var read = fs.createReadStream('./file.txt'); var write = fs.createWriteStream('./out.txt'); read.pipe(write);
Bulk file I/O
fs模块也提供了fs.readFile/fs.writeFile/fs.appendFile, 用于将文件所有读取.
var fs = require('fs'); fs.readFile('./file.txt', function(err, buf) { console.log(buf.toString()); });
File watching
fs模块提供了fs.watch/fa.watchFile, 来观察文件是否被改变.
同步异步操做文件
考虑存在config.json文件, 咱们分别使用同步和异步读取文件:
var fs = require('fs'); // 异步 fs.readFile('./config.json', function(err, buf) { if (err) throw err; var config = JSON.parse(buf.toString()); doThisThing(config); }); // 同步 try { var config = JSON.parse(fs.readFileSync('./config.json').toString()); doThisThing(config); } catch (err) { console.error(err); }
备注: try/catch不可用于异步编程.
文件描述符的使用
任何一个文件操做, 均可经过文件描述符来操做, 默认状况下, 标准输入, 输出和错误分别对应0,1,2.
var fs = require('fs'); var fd = fs.openSync('./file.txt', 'r'); var buf = fs.readFileSync(fd); console.log(buf.toString()); fs.closeSync(fd);
文件锁的操做
Node.js并未原生支持文件锁的操做. 通常咱们能够经过如下两种方法达到相似锁的操做: 使用exclusive标志来建立锁文件, 使用mkdir来建立锁文件.
var fs = require('fs'); fs.open('file.txt', 'wx', function(err) { if (err) return console.error(err); });
存在'x'标志状况下, 若是文件存在, 则抛出异常.
同理, 咱们能够建立一个以前不存在的新目录, 在新目录下进行文件的读写, 也能够达到相似的锁功能.
文件递归操做
可查看fs的API, 经过readdir来读取文件夹, 经过fs.stat来判断当前路径为文件仍是目录.
var fs = require('fs'); function readFileName(filename, dir_path) { fs.stat(dir_path + '/' + filename, (err, stats) => { if (err) return console.error(err); if (stats.isFile()) console.log(dir_path + '/' + filename); else if (stats.isDirectory()) { fs.readdir(dir_path + '/' + filename, (err, files) => { if (err) return console.error(err); for (let i = 0; i < files.length; i++) { readFileName(files[i], dir_path + '/' + filename); } }); } }); } readFileName('bin', '/usr');
观察文件/文件夹的变更
可以使用fs.watch/fs.watchFile来观察文件/文件夹是否变更.
var fs = require('fs'); fs.watch('./watchdir', console.log); fs.watchFile('./watchdir', console.log);
打开两个窗口, 一个运行上例代码, 一个在watchdir目录内执行touch/mv等操做, 则可看到效果.
基本网络层次
TCP/IP
在IP协议中, 一个host由一个IP地址标识. 在Node.js中由net模块产生TCP链接.
但IP协议不能保证数据传输的完整性, 因此须要加入TCP传输协议.
UDP
数据包是UDP中基本的单元. UDP是不能保证数据传输的完整性.
在Node.js中, 建立服务端很简单, 使用net模块, 建立服务器, 监听端口便可:
var net = require('net'); var clients = 0; var server = net.createServer((client) => { clients++; var clientId = clients; console.log('Client connected:', clientId); client.on('end', () => { console.log('Client disconnected:', clientId); }); client.write('Welcome client:' + clientId + '\r\n'); client.pipe(client); }); server.listen(8000, () => { console.log('Server started on port 8000'); });
这里简单介绍一下可能难理解的一段代码:
client.pipe(client);
由于TCP/IP是一个duplex, 即既可输入, 又可输出的stream.
leicj@leicj:~/test$ telnet localhost 8000 Trying 127.0.0.1... Connected to localhost. Escape character is '^]'. Welcome client:1 hello world hello world i love this wrold i love this wrold
若是咱们想要编写一个低时延的实时应用, 则咱们须要socket.setNoDelay()去开启TCP_NODELAY.
因为Nagle's算法的存在, 小数据包会累积成大的数据包, 而后发送出去.
var net = require('net'); var server = net.createServer((c) => { // 设置非延时 c.setNoDelay(true); c.write('377375042377373001', 'binary'); console.log('server connected'); c.on('end', () => { console.log('server disconnected'); // 当没有客户端链接时候, 自动关闭 server.unref(); }); c.on('data', (data) => { process.stdout.write(data.toString()); c.write(data.toString()); }); }); server.listen(8000, () => { console.log('Server started on port 8000'); });
咱们尝试使用UDP进行文件的传输. 咱们能够使用dgram模块来建立数据包, 经过socket.send来发送数据.
var dgram = require('dgram'), fs = require('fs'), port = 41230, defaultSize = 16; function Client(remoteIP) { var inStream = fs.createReadStream(__filename), socket = dgram.createSocket('udp4'); inStream.on('readable', () => { sendData(); }); function sendData() { var msg = inStream.read(defaultSize); if (!msg) return socket.unref(); socket.send(msg, 0, msg.length, port, remoteIP, (err, bytes) => { sendData(); }); } } function Server() { var socket = dgram.createSocket('udp4'); socket.on('message', (msg, rinfo) => { process.stdout.write(msg.toString()); }); socket.on('listening', () => { console.log('Server ready:', socket.address()); }); socket.bind(port); } if (process.argv[2] === 'client') { new Client(process.argv[3]); } else { new Server(); }
对上述代码进行简单的解释: