白洁血战Node并发编程 - 预览

预览。javascript

先给出一个基础类代码。java

const EventEmitter = require('events')
const debug = require('debug')('transform')

class Transform extends EventEmitter {

  constructor (options) {
    super()
    this.concurrency = 1

    Object.assign(this, options)

    this.pending = []
    this.working = []
    this.finished = []
    this.failed = []

    this.ins = []
    this.outs = []
  }

  push (x) {
    this.pending.push(x)
    this.schedule()
  }

  pull () {
    let xs = this.finished
    this.finished = []
    this.schedule()
    return xs
  }

  isBlocked () {
    return !!this.failed.length ||              // blocked by failed
      !!this.finished.length ||                 // blocked by output buffer (lazy)
      this.outs.some(t => t.isBlocked())        // blocked by outputs transform
  }

  isStopped () {
    return !this.working.length && this.outs.every(t => t.isStopped())
  }

  root () {
    return this.ins.length === 0 ? this : this.ins[0].root()
  }

  pipe (next) {
    this.outs.push(next)
    next.ins.push(this)
    return next
  }

  print () {
    debug(this.name,
      this.pending.map(x => x.name),
      this.working.map(x => x.name),
      this.finished.map(x => x.name),
      this.failed.map(x => x.name),
      this.isStopped())
    this.outs.forEach(t => t.print())
  }

  schedule () {
    // stop working if blocked
    if (this.isBlocked()) return

    this.pending = this.ins.reduce((acc, t) => [...acc, ...t.pull()], this.pending)

    while (this.working.length < this.concurrency && this.pending.length) {
      let x = this.pending.shift()
      this.working.push(x)
      this.transform(x, (err, y) => {
        this.working.splice(this.working.indexOf(x), 1)
        if (err) {
          x.error = err
          this.failed.push(x)
        } else {
          if (this.outs.length) {
            this.outs.forEach(t => t.push(y))
          } else {
            if (this.root().listenerCount('data')) {
              this.root().emit('data', y)
            } else {
              this.finished.push(y)
            }
          }
        }

        this.schedule()
        this.root().emit('step', this.name, x.name)
      })
    }
  }

}

module.exports = Transform

这段代码目前仍是雏形。node

Transform类的设计相似node里的stream.Transform,可是它的设计目的不是buffering或流性能,而是做为并发编程的基础模块。程序员

若是你熟悉流式编程,Transform的设计就很容易理解;在内部,Transform维护四个队列:编程

  1. pending是input bufferapi

  2. working是当前正在执行的任务数组

  3. finished是output buffer,它的目的不是为了buffer输出,而是在没有其余输出办法的时候做一下buffer。restful

  4. failed是失败的任务并发

Transform能够组合成DAG(Directed Acyclic Graph)使用,insouts用来存储前置和后置Transform的引用,pipe方法负责设置这种双向连接;最多见的状况是双向链表,即insouts都只有一个对象。但把他们设计成数组就能够容许fan-in, fan-out的结构。异步

pushpull是write和read的等价物。

schedule是核心函数,它的任务是填充working队列。在构造函数的参数里应该提供一个名字为transform的异步函数,schedule使用这个函数运行任务,在运行结束后,根据结果把任务推到failed队列、推到下一个Transformer、用root节点的emit输出、或者推到本身的finished队列里。

Transform设计的核心思想,就是把并发任务的状态,不使用对象属性来编码,只使用队列位置来编码;任何一个子任务,在任什么时候刻,仅存在于一个Transform对象的某个队列中。换句话说,它等于把并发任务用资源来建模。若是你熟悉restful api对过程或状态的建模方式就很容易理解这一点。

Transform中,任何transform异步函数的返回,都是一个stepstep是用Transform实现并发组合的最重要概念;

每一次transform函数返回,都会发生改变本身的队列或向后续的Transform对象push任务的动做,这个push动做会触发后续Transformschedule方法;step结束时本身的schedule方法也会被调用,它会从新填充任务。在这些动做结束后,全部Transform的队列变化,就是整个组合任务状态机的下一个状态。

这个状态是显式的,能够打印出来看,对debug很是有帮助;虽然异步i/o会让这种状态具备不肯定性,但至少这里坚持了组合状态机模型在处理并发问题时的同步原则,每一个step结束时总体作一次状态迁移,这个状态迁移能够良好定义和观察,这是Event模型下并发编程和Thread模型的重要区别。后者遇到并发逻辑引发的微妙错误时,很难捕捉现场分析,由于每个Thread是黑盒。

transform返回开始到emit(step)之间的一连串连锁动做都是中间过程,最终实现一次完整的状态迁移,这个过程必须是同步的。不该在这里出现异步、setImmediate或者process.nextTick等调用,这会带来额外的不肯定因素和极难发现和修复的bug。

在前面很长一段时间的并发编程实践中,我指出过Promise的race/settle和错误处理逻辑在一些场景下的困难。Promise的过程逻辑不完备。我也花了不少力气试图在Process代数层面上把error, success, finish, race, settle, abort, pause, resume, 和他们的组合逻辑定义出来,但最终发现这很困难,由于实际编程中各类处理状况太多了。

因此在Transform的设计中,这些逻辑所有被抛弃了,由于事实上它们都不是真正的基础并发逻辑

Transform试图实现组合的基础并发逻辑只有一个:stoppedstopped的定义很是简单:在一次step结束时,全部的Transformworking队列为空,就是(总体的)stopped。这里要再次强调前述的step结束时同步方法的必要性,若是你在schedule里使用了异步方法调用,那么这个stopped的判断就多是错的,由于schedule可能会在event loop里放置了一个立刻就会产生新的working任务的动做,而isStopped()的判断就错了。

stopped时,总体组合状态多是success, error, paused, 等等,都不难判断,但目前代码还没有稳定,我不打算加入语法糖。

在blocking i/o和同步的编程模式下,因果链和代码书写形式是一致的,可是在异步编程下,因果是异步和并发的,你只能去改变因,而后去观察果,这是不少程序员不适应异步编程的根本缘由,由于它要改变思惟的习惯。

使用Transform来处理并发编程,仍然是在试图重建这个因果链,即便他们是并发的,可是咱们要有一个办法把他们串起来;

前面说到的isStopped()是观察到的果,可以影响它的因,是isBlocked()函数,这个函数在schedule中被调用,若是估值为true,就会阻止schedule继续向working队列调度任务。

这里写的isBlocked()的代码实现只是一个例子;能够阻止schedule的缘由可能有不少,好比出现错误,或者输出buffer满了,这些能够由实现者本身去定义。他们是policy,isBlocked()自己是mechanism。这个策略的粒度是每一个Transform对象均可以有本身的策略。好比一个删除临时文件的操做,结果是无关痛痒的,那么它不应由于error就block。

isBlocked()逻辑能够象示例代码里那样向下chain起来,即只要有后续任务block了,前置任务就该停下来;这在绝大多数状况下都是合理的逻辑。由于虽然咱们写的是流式处理办法,可是咱们不是在处理octet-stream,追求性能的buffering和flow control都没什么意义,若是前面任务在copy文件后面的任务要移动到目标文件夹,若是目标文件夹出了问题前面快速移动了大量文件最终也没法成功。

若是组合状态机中止了,向其中的任何一个Transform对象执行push或者pull操做均可以让整个状态机继续动起来。从root节点push是常见状况,从leaf节点pull也是,向中间节点push也是可能的;

资源建模的一个好处是你能够把状态呈现给用户,若是一个复制文件的任务由于文件名冲突而fail,你还可让用户选择处理策略,例如覆盖或者重命名,在用户选择了操做以后,代码会从某个Transform对象的failed队列中取走一个对象,修改策略参数后从新push进去,那么这个状态机能够继续执行下去;这种可处理的错误不应成为block整个状态机工做(复制其余文件和文件夹)的缘由,除非他们积累到可观的数量,在Transform模式下这些都很是容易实现,开发者能够很简单的编写isBlocked()的策略;

和node的stream同样,Transform是lazy的,纯粹的push machine可能会在中间节点buffer大量的任务,这对把任务做为流处理来讲是不合适的;同时,Lazy对于停下来的组合状态机能继续run起来很重要,pull方法就是这个设计目的,它的schedule逻辑和push同样,只是方向相反;若是设置了Leaf节点会由于输出缓冲而block,它就能够block整个状态机(或者其中的一部分),这在某些状况下也是有用的功能,若是整个状态机的输出由于某种缘由暂时没法被马上消费掉。

abort逻辑没有在代码中实现,但它很容易,能够遍历全部的Transform,若是working队列中的对象有abort方法,就调用它;这不是个当即的停止,该对象仍然要经过callback返回才能stop。若是要全局的block,能够把全部的Leaf Node都pipe到一个sink节点去,把这个sink节点强制设置成isBlocked,能够block所有。pauseresume也是很是相似的逻辑。

固然你可能会遇到相似finally的逻辑是必须去执行的,即便在发生错误的时候,它意味着这个Transform要向前传递isBlocked信息,可是它的Schedule方法没必要中止工做。它能够一直运行到把全部队列任务都处理完为止。

重载schedule方法也是可能的;例如你的任务之间有先后依赖的逻辑,你就能够重载schedule方法实现本身的调度方式。另外这里的schedule代码只基于transform函数,很显然若是transform自己是一个Transform对象它也应该工做,实现组合过程,包括Sequencer,Parallel等等,这些都是须要实现的。

总而言之,isBlockedschedule是分开的逻辑,它们有各自不一样的设计目的和使命,你能够重载它们得到本身想要的结果。因此写在这里的代码,重要的不是他们的实现,而是其机制设计和界面设计,以及接口承诺;全部逻辑都是足够原子化的,每一个函数只作一件事,isBlocked是因,能够根据须要选择策略,isStopped是果,经过step观察和实现后续逻辑。应该避免经过向基类添加新方法来扩展能力,由于Transform使用队列和任务描述状态,这个描述是完备的,机制也是完善的。

就像我在另外一篇介绍JavaScript语言的文章里写的同样,若是针对问题的模型具备完备性,即便抽象,也能够经过组合基本操做和概念得到更多的特性,而不是在模型上增长概念,除非你认为模型不够完备。


软件工程中不是什么地方都要上状态机(automaton)这么严格的模型工具,项目软件里写到bug数量足够低就能够了,可是若是你要写系统软件或者对正确性有苛刻要求的东西,若是你没有用状态机建模,那么实际上你没有完备设计。

固然有了完备设计也不意味着软件没bug了,但一个好的设计可让你对问题的理解、遇到问题时找到缘由,有极大的帮助。

在复杂系统中,上述的同步方法状态机组合,和Hierarchical的状态机组合,是咱们目前已知的两种具备完备性的模型方法。可是二者不一样。虽然Transform的组合看起来是一个Hierarchy,可是它就像你在纸上画一棵树,它仍然是二维的,每一个step的总体状态联动的迁移只是在populate一次状态迁移的范围,并非几何级数的增长状态组合;因此咱们仍然能够构筑一个线性的因果链,每一个step因果因果这样的继续下去,和没有并发的状态机是同样。

本质上这是数学概括法:若是咱们能证实若是n正确,那么n+1是正确的,这就能够证实chain下去的状态组合即便是无穷也是正确的。


第二段代码是使用的一个示例,这个class没有必要,是为了保证和老代码接口兼容,由于有一些项目内其余代码的依赖性就不解释了,很容易看明白大概逻辑;列在这里只是展现一下Transform使用时pipe过程的代码样子。

const Promise = require('bluebird')
const path = require('path')
const fs = Promise.promisifyAll(require('fs'))
const EventEmitter = require('events')
const debug = require('debug')('dircopy')
const rimraf = require('rimraf')

const Transform = require('../lib/transform')
const { forceXstat } = require('../lib/xstat')
const fileCopy = require('./filecopy')

class DirCopy extends EventEmitter {

  constructor (src, tmp, files, getDirPath) {
    super()

    let dst = getDirPath()
    let pipe = new Transform({
      name: 'copy',
      concurrency: 4,
      transform: (x, callback) =>
        (x.abort = fileCopy(path.join(src, x.name), path.join(tmp, x.name),
          (err, fingerprint) => {
            delete x.abort
            if (err) {
              callback(err)
            } else {
              callback(null, (x.fingerprint = fingerprint, x))
            }
          }))
    }).pipe(new Transform({
      name: 'stamp',
      transform: (x, callback) =>
        forceXstat(path.join(tmp, x.name), { hash: x.fingerprint },
          (err, xstat) => err
            ? callback(err)
            : callback(null, (x.uuid = xstat.uuid, x)))
    })).pipe(new Transform({
      name: 'move',
      transform: (x, callback) =>
        fs.link(path.join(tmp, x.name), path.join(dst, x.name), err => err
          ? callback(err)
          : callback(null, x))
    })).pipe(new Transform({
      name: 'remove',
      transform: (x, callback) => rimraf(path.join(tmp, x.name), () => callback(null))
    })).root()

    let count = 0

    // drain data
    pipe.on('data', data => this.emit('data', data))
    pipe.on('step', (tname, xname) => {
      debug('------------------------------------------')
      debug(`step ${count++}`, tname, xname)
      pipe.print()
      if (pipe.isStopped()) this.emit('stopped')
    })

    files.forEach(name => pipe.push({ name }))
    pipe.print()
    this.pipe = pipe
  }

}

module.exports = DirCopy
相关文章
相关标签/搜索