RxJS的另外四种实现方式(六)——使用Stream类实现

接上一篇 RxJS的另外四种实现方式(五)——使用生成器实现webpack

该实现方式与以前几种不一样的,该实现方式仅针对Nodejs环境。在Nodejs环境中,提供了Stream类,包括Readable、Transform、Writeable等子类都是可扩展的。从字面上看,正好对应Rx中的生产者、传递者、消费者。web

实现该库的原由是,一次在Nodejs中须要在koa框架里面提供event-stream功能,目前除了IE浏览器外其余浏览器都支持了服务端事件推送,这个功能能够很好的代替轮询。webpack用的热更新就是经过这个功能实现的。浏览器

言归正传,首先得实现生产者,咱们先来看interval框架

class Interval extends Readable {
    constructor(period) {
        super({ objectMode: true })
        this.period = period
        this.i = 0
    }
    _read(size) {
        setTimeout(() => this.push(this.i++), this.period)
    }
}
exports.interval = period => new Interval(period)

说明一下,构造函数传入objectMode:true的对象是让stream处于对象模式,而不是二进制流模式。_read函数必须覆盖父类,不然出错,当有订阅者链接上来后,就会调用_read方法。咱们在这个方法里面发送数据,即调用push方法,将数据发送给流的接收者。koa

当调用过push方法后,后面的接收者若是调用了callback回调,则表示数据消费完毕,会再次调用_read方法,直到push(null)表示生产者已经complete函数

FromArray也十分简单易读oop

class FromArray extends Readable {
    constructor(array) {
        super({ objectMode: true })
        this.array = array
        this.pos = 0
        this.size = array.length
    }
    _read(size) {
        if (this.pos < this.size) {
            this.push(this.array[this.pos++])
        } else
            this.push(null)
    }
}
exports.fromArray = array => new FromArray(array)

下面要实现一个转换器(操做符)Filterui

class Filter extends Transform {
    constructor(f) {
        super({ readableObjectMode: true, writableObjectMode: true })
        this.f = f
    }
    _transform(data, encoding, callback) {
        const f = this.f
        if (f(data)) {
            this.push(data);
        }
        callback();
    }
    _flush(callback) {
        callback()
    }
}
exports.filter = f => new Filter(f)

这时候咱们须要覆盖_transform、_flush函数,一样的,push方法会让数据流到下面的流中,而callback回调会使得上一个流继续发送数据。this

最后咱们来实现Subscriber.net

class Subscriber extends Writable {
    constructor(n, e, c) {
        super({ objectMode: true })
        this.n = n
        this.e = e
        this.c = c
    }
    _write(chunk, encoding, callback) {
        this.n(chunk)
        callback(null)
    }
    _final(callback) {
        this.c()
        callback()
    }
}

exports.subscribe = (n, e = noop, c = noop) => new Subscriber(n, e, c)

Subscriber是一个可写流,咱们必须覆盖_write方法用于消费数据,_final方法用于complete事件处理。这里没有实现error事件。有兴趣的同窗能够思考如何实现。

最后咱们须要把各类stream串起来,变成一个长长的水管

exports.pipe = pipeline || ((first, ...cbs) => cbs.reduce((aac, c) => aac.pipe(c), first));

高版本的Nodejs已经提供了pipeline方法,能够直接使用,低版本的话,能够用上面的方法进行链接。

至此,咱们已经使用Nodejs提供的Stream类实现了Rx的基本逻辑。(完)

相关文章
相关标签/搜索