这一篇是在实际工程中遇到的一个可贵的例子;反映在Node里两种编程范式的设计冲突。这种冲突具备普适性,但本文仅分析问题本质,不探讨更高层次的抽象。node
我在写一个相似HTTP的资源协议,叫RP,Resource Protocol,和HTTP不一样的地方,RP是构建在一个中立的传输层上的;这个传输层里最小的数据单元,message,是一个JSON对象。npm
协议内置支持multiplexing,即一个传输层链接能够同时维护多个RP请求应答过程。编程
考虑客户端request
类设计,相似Node内置的HTTP Client,或流行的npm包,如request
或 superagent
;promise
能够采用EventEmitter
方式emit error
和response
s事件,也能够采用Node Callback的形式,须要使用者提供接口形式为(err, res) => {}
的callback函数。服务器
随着async/await的流行,request
类也能够提供一个.then
接口,用以下方式实现(实际上superagent
就是这么实现的):并发
class Request extends Duplex { constructor () { super() ... this.promise = new Promise((resolve, reject) => { this.resolve = resolve this.reject = reject }) } then (...args) { return this.promise.then(...args) } }
RP的实际设计,形式和你们熟悉的HTTP Client有一点小区别,response对象自己不是stream,而是把stream作为一个property提供。换句话说,callback函数形式为:异步
(err, { data, chunk, stream }) => {}
若是请求返回的不是stream,则data
或者chunk
有值;若是返回的是stream,则仅stream
有值,且为stream.Readable
类型。async
这个形式上的区别和本文要讨论的问题无关。函数
RP底层从传输层取二进制数据,解析出message,而后emit给上层;它采用了一个简单方式,循环解析收到的data chunk,直到没有完整的message为止。性能
这意味着能够在一个tick里分发多个消息。request
对象也必须可以在一个tick里处理多个来自服务端的消息。
咱们具体要讨论的状况是服务器连续发了这样两条消息:
第一条意思是后面还有message stream,第二条abort指server发生意外没法继续发送了。
在request
对象收到第一条消息时,它建立response
对象,包含stream
对象:
this.res = { stream: new stream.Readabe({...}) } // this.emit('response', this.res) // this.callback(null, this.res) this.resolve(this.res)
象注释中emit
或trigger使用者提供的callback
,都没有问题;但若是调用resolve
,注意,Promise是保证异步的,这意味着使用者经过then
提供的onFulfilled
,不会在当前tick被调用。
接下来第二条消息,abort,在同一个tick被处理;但这个时候,由于使用者还没来得及挂载上任何listener,包括error handler,若是设计上要求这个stream emit error——很合理的设计要求——此时,按照Node的约定,error没有handler,整个程序crash了。
这个问题的dirty fix有不少种办法。
首先request.handleMessage
方法,若是没法同步完成对message的处理,而message的处理顺序又须要保证,它应该buffer message,这是node里最多见的一种synchronize方式,表明性的实现就是stream.Writable
。
但这里有一个困难,this.resolve
这个函数没有callback提供,必须预先知道运行环境的Promise实现方式;在node里是nextTick
,因此在this.resolve
以后nextTick
一下,同时buffer其它后续消息的处理,可让使用者在onFulfilled
函数中给stream挂载上handler。
这里能够看出,callback和emitter其实是同步的。
当调用callback或者listener时,request和使用者作了一个约定,你必须在这个函数内作什么(在对象上挂载全部的listener),而后我继续作什么(处理下一个消息,emit data或者error);这至关因而interface protocol对顺序的约定。
咱们能够称之为synchronous sequential composition,是程序语义意义上的。
对应的asynchronous版本呢?
若是咱们不去假设运行环境的Promise的实现呢?它应该和同步版本的语义同样对吧。
再回头看看问题,假如stream emit error不会致使系统crash,使用者在onFulfilled
拿到{ stream }
这个对象时,它看到了什么?一个已经发生错误后结束了的stream。
这个可能使用上会难过一点,须要判断一下,但还感受不出是多大的问题。
再进一步,若是是另外一种状况呢?Server在一个chunk里发来了3个消息;
这个时候使用者看到的仍是一个errored stream,data去哪里了呢?你还能说asynchronous sequential composition的语义和synchronous的一致么?不能了对吧,同步的版本处理了data,极可能对结果产生影响。
在理想的状况下,sequential composition,不管是synchronous的,仍是asynchronous的,语义(执行结果)应该一致。
那么来看看如何作到一个与Promise A+的实现无关的作法,保证异步和同步行为一致。
若是你愿意用『通信』理解计算,这个问题的答案很容易思考出来:假想这个异步的handler位于半人马座阿尔法星上,那咱们惟一能作的事情是老老实实按照事件发生的顺序,发送给它,不能打乱顺序,就像咱们收到他们时同样。
可是当咱们把进来的message,翻译实现成stream时,没能保证这个order,包括:
这是问题的root cause,当咱们异步处理一个消息序列时,前面写的实现break了顺序和内容的完整性。
在数学思惟上,咱们说Promise增长了一个callback/EventEmitter不具有的属性,deferred evaluation,是一个编程中罕见的temporal属性;固然这不奇怪,由于这就是Promise的目的。
同时Promise -> Value还有一个属性是它能够被不一样的使用者访问屡次,保持了Value的属性。
这也不奇怪。
只是Stream做为一种体积上能够为无穷大的值,在实践中不可能去cache全部的值,把它总体当成一个值处理,因此这个能够被无限提取的『值』属性就消失了。
可是这不意味着stream做为一个对象,它的行为,不能延迟等到它被构造且使用后才开始处理消息。
一种方式是写一个stream有这种能力的;stream.Readable
有一个flow属性,必须经过readable.resume
开始,这是一个触发方式;另外一个方式是有点tricky,能够截获response.stream
的getter,在它第一次被访问时触发异步处理buffered message。
这样的作法是不须要依赖Promise A+的实现的;但不是百分百asynchronous sequential composition,由于stream的handler确定是synchronous的。
彻底的asynchronous能够参照Dart的使用await消费stream的方式。
它的逻辑能够这样理解:把全部Event,不管哪里来的,包括error,都写到一个流里去,用await消费这个流;但实际上在await返回的时候仍然面对一个状态机,好处是
总结:
Node的Callback和EventEmitter在组合时handler/listener是同步的;Promise则反过来保证每一个handler/listener都是异步组合,这是二者的根本区别。
在顺序组合函数(或者进程代数意义上的进程)上,同步组合是紧耦合的;它体如今一旦功能上出现什么缘由,须要把一个同步逻辑修改为异步时,都要大动干戈,好比原本是读取内存,后来变成了读取文件。
若是程序天生写成异步组合,相似变化就不会对实现逻辑产生很大影响;可是细粒度的异步组合有巨大的性能损失,这和现代处理器和编译器的设计与实现有关。
真正理想的状况应该是开发者只表达“顺序”,并不表达它是同步仍是异步实现;就像前面看到的,实际上同步的实现都有能够对应的异步实现,差异只是执行效率和内存使用(buffer有更多的内存开销,同步处理实际上更可能是『阅后即焚』);
但咱们使用的imperative langugage不是如此,它在强制你表达顺序;而另一类号称将来其实狗屎的语言,在反过来强制你不得表达顺序。
都是神经病。学术界就不会真正理解产业界的实际问题。