经过Node.js Stream API 实现逐行读取的实例

Node 给 streaming 带来了简洁和美。Streams 目前是一种很棒的用于创建模块和应用的方式。原来的 streams API 存在一些问题,在 v0.10 版本中对这些问题进行了修复,而且扩展了一些 API 使得应用更简单而且能够归纳更多的应用场景。node

本篇文章将介绍并经过实例应用 v0.10 版本提供的新的 API。shell

逐行问题

具备良好组织的日志数据对一个公司的开发团队是很是宝贵的资源。为了更好的使用并分析它们,而不仅是经过 shell 命令行去操做,咱们须要逐行扫描日志数据。api

Stream 的优势就是咱们不须要把整个日志文件都一次性读入内存,由于它们可能很庞大,而是在它们能够准备好去读取的时候再处理它们。Stream 能够工做于任何 I/O 场景下,包括文件系统,网络等等。数组

经过使用心得 stream API,咱们能够建立可复用的实现上述逐行操做的 I/O 模块。网络

Transform Stream

Node 0.10 提供了很是优雅的 stream.Transform 类,用以处理输入输出是因果相关的。对于咱们的问题来讲,输入和输出的数据是彻底同样的,只是把输入的数据逐行分离为了更好的处理。异步

位于管道中间层的 Transform 是便可读也可写的:
Transform pipeline
如下是使用 Transform的初始化代码:函数

var stream = require('stream')
var liner = new stream.Transform( { objectMode: true } )

打开 objectMode

吼吼,这个 {objectMode: true} 是个啥?若是没有这个 objectMode,stream 默认把纯数据块送过来,不然会把数据快放到一个 object 中,固然这个 object 中还会包含其余信息。oop

_transform 方法

这只是一个开始,咱们继续。Transform 类在应用时须要咱们必须提供一个叫作 _transform 的方法,还有一个 _flush 方法能够选择提供。咱们先来看一下这个 _transform 方法究竟是什么。测试

_transform 方法在每次 stream 中有数据来了以后都会被执行,先看代码:ui

liner._transform = function (chunk, encoding, done) {
  var data = chunk.toString()
  if (this._lastLineData) data = this._lastLineData + data 

  var lines = data.split('\n') 
  this._lastLineData = lines.splice(lines.length-1,1)[0] 

  lines.forEach(this.push.bind(this)) 
  done()
}

数据一来,_transform 方法就会被执行。联同数据一块儿过来的还有数据的编码和一个表示此数据已经接受完毕的信号函数。

在这个问题中,咱们并不关心编码问题。经过 toString() 把数据转为须要的字符串,而后再经过 split('\n') 数据块字符串按换行符打散为一个数组。而后在把每一行 push 到对应的处理模块中。

注:push 方法是 Readable stream 类的内置方法,同时在 Node 0.8 版本中和产生 data 时间的的方法是同类的:

stream.emit(‘data’, data) ➤ stream.push(data)

最后经过调用 done() 方法来发出接受完成的信号。因为 done 方法是一个回调函数,咱们也能够把它在 _transform 中进行异步调用。

代码中的 _lastLineData 又是神马?在 stream 中咱们并不想一块数据的结尾是从一行的中间断开的,为了解决这个问题,咱们实际上并不会吧打散的数组中的最后一行送出去,而是留到下一次的数据块来的时候放到下一次数据块的开头。

_flush 方法

而后咱们再来看看这个 _flulsh 方法,还记得在 _transform 方法中每次 _lastLineData 的值都不会被送出去吗,是否是最后一次数据块的 _lastLindeData 就无法收到了?没错, _flush方法就是用来处理这种状况的。在全部的数据块都被 _transform 方法处理事后,才会调用 _flush 方法。因此它的做用就是处理残留数据的:

liner._flush = function (done) {
  if (this._lastLineData) this.push(this._lastLineData)
  this._lastLineData = null
  done()
}

若是有 _lastLineData 则把它 push 出去而后清空它,最后调用 `done()` 方法标志着完成处理残留数据的工做,同时这也意味着 stream 的结束。须要注意的是, _flush 方法并非必须的有些场景下就不须要。

简单代码实现

如下代码是一个简单的逐行读取的模块的实现:

var stream = require('stream')
var liner = new stream.Transform( { objectMode: true } )

liner._transform = function (chunk, encoding, done) {
  var data = chunk.toString()
  if (this._lastLineData) data = this._lastLineData + data

  var lines = data.split('\n')
  this._lastLineData = lines.splice(lines.length-1,1)[0]

  lines.forEach(this.push.bind(this))
  done()
}

liner._flush = function (done) {
     if (this._lastLineData) this.push(this._lastLineData)
     this._lastLineData = null
     done()
}

module.exports = liner

测试实例

看起来已经差很少了,是否是能够用起来了?

首先咱们须要一个数据源。任何由行组成的文件均可以,以日志文件为例:

var fs = require('fs')
var liner = require('./liner')
var source = fs.createReadStream('./access_log')
source.pipe(liner)
liner.on('readable', function () {
  var line
  while (line = liner.read()) {
    // do something with line
  }
})

原文连接

相关文章
相关标签/搜索