Node.js in Practice总结2

Streams

Streams概述

何时使用Streamsjavascript

不管咱们使用fs.readFileSync仍是fs.readFile进行文件的读取, 咱们都要将文件所有读取进内存中. 若是文件特别的大, 那么解决方案应该是对大数据一块一块的读取, 即读取完一块数据后, 要求继续读取下一块数据.java

屏幕快照 2016-11-20 下午1.52.07

内建Streams

读取单个文件算法

假设咱们使用fs.readFile进行单文件的读取:编程

var fs = require('fs');

fs.readFile(__filename, (err, chunk) => {
  if (err) return console.error(err);
  if (chunk) {
    console.log(chunk.toString());
  }
});

若是文件过大, 甚至文件大于0x3FFFFFFF(Node.js最大的缓存大小)怎么办? 这时候咱们应该使用流来处理数据.json

var fs = require('fs');

fs.createReadStream(__filename).pipe(process.stdout);

错误处理缓存

由于stream是继承于EventEmitter, 因此它一样监听error事件, 用于错误处理.服务器

var fs = require('fs');

var stream = fs.createReadStream('not-found');

stream.on('error', (err) => {
  console.trace();
  console.error('Stack:', err.stack);
  console.error('The error raised was:', err);
});

使用Stream base classes

正确的继承stream base classes网络

Readable: 输入流app

Writable: 输出流异步

Transform: 解析数据时候改变数据

Duplex: 输入输出流

PassThrough: 测试, 分析, 检查数据

实现一个readable stream

咱们能够经过继承于stream.Readable而且实现_read(size)方法, 来实现一个readable.stream.

在_read中, 须要执行push将数据读取出, 终止读取则push(null).

var fs = require('fs');
var ReadStream = require('stream').Readable;

class MyRead extends ReadStream {
  constructor(options) {
    super(options);
  }
  _read(size) {
    this.push('hello\n');
    this.push(null);
  }
}

var myRead = new MyRead();
myRead.pipe(process.stdout);

 

实现一个writable stream

经过继承stream.Writable并实现_write方法, 来实现一个输出流.

var fs = require('fs');
var WriteStream = require('stream').Writable;

class MyWrite extends WriteStream {
  constructor(options) {
    super(options);
  }
  _write(chunk, encoding, cb) {
    process.stdout.write(chunk);
    cb();
  }
}

var myWrite = new MyWrite();
process.stdin.pipe(myWrite);

实现一个duplex streams

继承stream.Duplex而且实现_read/_write方法.

var fs = require('fs');
var Duplex = require('stream').Duplex;

class MyDuplex extends Duplex {
  constructor(options) {
    super(options);
    this.waiting = false;
  }
  _read(size) {
    if (!this.waiting) {
      this.push('Feed me data! >');
      this.waiting = true;
    }
  }
  _write(chunk, encoding, cb) {
    this.waiting = false;
    this.push(chunk);
    cb();
  }
}

var myDuplex = new MyDuplex();

process.stdin.pipe(myDuplex).pipe(process.stdout);

实现一个transform streams

继承stream.Transform并实现_transform方法.

var fs = require('fs');
var Transform = require('stream').Transform;

class MyTransform extends Transform {
  constructor(options) {
    super(options);
  }
  _transform(chunk, encoding, cb) {
    this.push(chunk + ':::\n');
    cb();
  }
}

var myTransform = new MyTransform();

process.stdin.pipe(myTransform).pipe(process.stdout);

文件系统: 同步和异步操做文件

fs模块概述

fs其实是POSIX的包装模块:

屏幕快照 2016-11-20 下午7.15.38

POSIX方法提供了一些底层的操做, 例如:

var fs = require('fs');
var assert = require('assert');

var fd = fs.openSync('./file.txt', 'w+');
var writeBuf = Buffer.from('some data to write');
fs.writeSync(fd, writeBuf, 0, writeBuf.length, 0);

var readBuf = Buffer.alloc(writeBuf.length);
fs.readSync(fd, readBuf, 0, writeBuf.length, 0);
assert.equal(writeBuf.toString(), readBuf.toString());

fs.closeSync(fd);

Streaming

fs模块提供fs.createReadStream/fs.createWriteStream功能模块. 它们分别可建立一个输入输出流, 例如可用于pipe.

var fs = require('fs');

var read = fs.createReadStream('./file.txt');
var write = fs.createWriteStream('./out.txt');
read.pipe(write);

Bulk file I/O

fs模块也提供了fs.readFile/fs.writeFile/fs.appendFile, 用于将文件所有读取.

var fs = require('fs');
fs.readFile('./file.txt', function(err, buf) {
  console.log(buf.toString());
});

File watching

fs模块提供了fs.watch/fa.watchFile, 来观察文件是否被改变.

文件操做

同步异步操做文件

考虑存在config.json文件, 咱们分别使用同步和异步读取文件:

var fs = require('fs');
// 异步
fs.readFile('./config.json', function(err, buf) {
  if (err) throw err;
  var config = JSON.parse(buf.toString());
  doThisThing(config);
});

// 同步
try {
  var config = JSON.parse(fs.readFileSync('./config.json').toString());
  doThisThing(config);
} catch (err) {
  console.error(err);
}

备注: try/catch不可用于异步编程.

文件描述符的使用

任何一个文件操做, 均可经过文件描述符来操做, 默认状况下, 标准输入, 输出和错误分别对应0,1,2.

var fs = require('fs');
var fd = fs.openSync('./file.txt', 'r');
var buf = fs.readFileSync(fd);
console.log(buf.toString());
fs.closeSync(fd);

文件锁的操做

Node.js并未原生支持文件锁的操做. 通常咱们能够经过如下两种方法达到相似锁的操做: 使用exclusive标志来建立锁文件, 使用mkdir来建立锁文件.

var fs = require('fs');
fs.open('file.txt', 'wx', function(err) {
  if (err) return console.error(err);
});

存在'x'标志状况下, 若是文件存在, 则抛出异常.

同理, 咱们能够建立一个以前不存在的新目录, 在新目录下进行文件的读写, 也能够达到相似的锁功能.

文件递归操做

可查看fs的API, 经过readdir来读取文件夹, 经过fs.stat来判断当前路径为文件仍是目录.

var fs = require('fs');
function readFileName(filename, dir_path) {
  fs.stat(dir_path + '/' + filename, (err, stats) => {
    if (err) return console.error(err);
    if (stats.isFile()) console.log(dir_path + '/' + filename);
    else if (stats.isDirectory()) {
      fs.readdir(dir_path + '/' + filename, (err, files) => {
        if (err) return console.error(err);
        for (let i = 0; i < files.length; i++) {
          readFileName(files[i], dir_path + '/' + filename);
        }
      });
    }
  });
}

readFileName('bin', '/usr');

观察文件/文件夹的变更

可以使用fs.watch/fs.watchFile来观察文件/文件夹是否变更.

var fs = require('fs');

fs.watch('./watchdir', console.log);
fs.watchFile('./watchdir', console.log);

打开两个窗口, 一个运行上例代码, 一个在watchdir目录内执行touch/mv等操做, 则可看到效果.

网络

网络概述

基本网络层次

屏幕快照 2016-11-25 下午7.14.31

TCP/IP

在IP协议中, 一个host由一个IP地址标识. 在Node.js中由net模块产生TCP链接.

但IP协议不能保证数据传输的完整性, 因此须要加入TCP传输协议.

UDP

数据包是UDP中基本的单元. UDP是不能保证数据传输的完整性.

TCP客户端和服务端

在Node.js中, 建立服务端很简单, 使用net模块, 建立服务器, 监听端口便可:

var net = require('net');
var clients = 0;

var server = net.createServer((client) => {
  clients++;
  var clientId = clients;
  console.log('Client connected:', clientId);

  client.on('end', () => {
    console.log('Client disconnected:', clientId);
  });

  client.write('Welcome client:' + clientId + '\r\n');
  client.pipe(client);
});

server.listen(8000, () => {
  console.log('Server started on port 8000');
});

这里简单介绍一下可能难理解的一段代码:

client.pipe(client);

由于TCP/IP是一个duplex, 即既可输入, 又可输出的stream.

leicj@leicj:~/test$ telnet localhost 8000
Trying 127.0.0.1...
Connected to localhost.
Escape character is '^]'.
Welcome client:1
hello world
hello world
i love this wrold
i love this wrold

若是咱们想要编写一个低时延的实时应用, 则咱们须要socket.setNoDelay()去开启TCP_NODELAY.

因为Nagle's算法的存在, 小数据包会累积成大的数据包, 而后发送出去.

var net = require('net');

var server = net.createServer((c) => {
  // 设置非延时
  c.setNoDelay(true);
  c.write('377375042377373001', 'binary');
  console.log('server connected');
  c.on('end', () => {
    console.log('server disconnected');
    // 当没有客户端链接时候, 自动关闭
    server.unref();
  });
  c.on('data', (data) => {
    process.stdout.write(data.toString());
    c.write(data.toString());
  });
});

server.listen(8000, () => {
  console.log('Server started on port 8000');
});

UDP客户端和服务端

咱们尝试使用UDP进行文件的传输. 咱们能够使用dgram模块来建立数据包, 经过socket.send来发送数据.

var dgram = require('dgram'),
  fs = require('fs'),
  port = 41230,
  defaultSize = 16;

function Client(remoteIP) {
  var inStream = fs.createReadStream(__filename),
    socket = dgram.createSocket('udp4');

  inStream.on('readable', () => {
    sendData();
  });

  function sendData() {
    var msg = inStream.read(defaultSize);
    if (!msg) return socket.unref();
    socket.send(msg, 0, msg.length, port, remoteIP, (err, bytes) => {
      sendData();
    });
  }
}

function Server() {
  var socket = dgram.createSocket('udp4');

  socket.on('message', (msg, rinfo) => {
    process.stdout.write(msg.toString());
  });

  socket.on('listening', () => {
    console.log('Server ready:', socket.address());
  });

  socket.bind(port);
}

if (process.argv[2] === 'client') {
  new Client(process.argv[3]);
} else {
  new Server();
}

对上述代码进行简单的解释:

  1. 对于UDP来讲, 使用dgram.createSocket('udp4')建立一个socket对象, 使用socket.sendData进行数据的传输.
  2. 做为服务器的socket, 则不单单须要绑定端口(socket.bind(port)), 还会监听两个信息: message为接收数据时候的事件, 而listening为服务器作好接收数据.
相关文章
相关标签/搜索