构建流式应用—RxJS详解

最近在 Alloyteam Conf 2016 分享了《使用RxJS构建流式前端应用》,会后在线上线下跟你们交流时发现对于 RxJS 的态度呈现出两大类:有用过的都表达了 RxJS 带来的优雅编码体验,未用过的则反馈太难入门。因此,本文将结合本身对 RxJS 理解,经过 RxJS 的实现原理、基础实现及实例来一步步分析,提供 RxJS 较为全面的指引,感觉下使用 RxJS 编码是怎样的体验。html

目录

  • 常规方式实现搜索功能
  • RxJS · 流 Stream
  • RxJS 实现原理简析
    • 观察者模式
    • 迭代器模式
    • RxJS 的观察者 + 迭代器模式
  • RxJS 基础实现
    • Observable
    • Observer
  • RxJS · Operators
    • Operators ·入门
    • 一系列的 Operators 操做
  • 使用 RxJS 一步步实现搜索功能
  • 总结

常规方式实现搜索

作一个搜索功能在前端开发中其实并不陌生,通常的实现方式是:监听文本框的输入事件,将输入内容发送到后台,最终将后台返回的数据进行处理并展现成搜索结果。前端



    var text = document.querySelector('#text');
    text.addEventListener('keyup', (e) =>{
        var searchText = e.target.value;
        // 发送输入内容到后台
        $.ajax({
            url: `search.qq.com/${searchText}`,
            success: data => {
              // 拿到后台返回数据,并展现搜索结果
              render(data);
            }
        });
    });
复制代码

上面代码实现咱们要的功能,但存在两个较大的问题:react

  1. 多余的请求
    当想搜索“爱迪生”时,输入框可能会存在三种状况,“爱”、“爱迪”、“爱迪生”。而这三种状况将会发起 3 次请求,存在 2 次多余的请求。git

  2. 已无用的请求仍然执行
    一开始搜了“爱迪生”,而后立刻改搜索“达尔文”。结果后台返回了“爱迪生”的搜索结果,执行渲染逻辑后结果框展现了“爱迪生”的结果,而不是当前正在搜索的“达尔文”,这是不正确的。github

减小多余请求数,能够用 setTimeout 函数节流的方式来处理,核心代码以下ajax



    var text = document.querySelector('#text'),
        timer = null;
    text.addEventListener('keyup', (e) =>{
        // 在 250 毫秒内进行其余输入,则清除上一个定时器
        clearTimeout(timer);
        // 定时器,在 250 毫秒后触发
        timer = setTimeout(() => {
            console.log('发起请求..');
        },250)
    })
复制代码

已无用的请求仍然执行的解决方式,能够在发起请求前声明一个当前搜索的状态变量,后台将搜索的内容及结果一块儿返回,前端判断返回数据与当前搜索是否一致,一致才走到渲染逻辑。最终代码为编程



    var text = document.querySelector('#text'),
        timer = null,
        currentSearch = '';

    text.addEventListener('keyup', (e) =>{
        clearTimeout(timer)
        timer = setTimeout(() => {
            // 声明一个当前所搜的状态变量
            currentSearch = '书'; 

            var searchText = e.target.value;
            $.ajax({
                url: `search.qq.com/${searchText}`,
                success: data => {
                    // 判断后台返回的标志与咱们存的当前搜索变量是否一致
                    if (data.search === currentSearch) {
                        // 渲染展现
                        render(data);
                    } else {
                        // ..
                    }
                }           
            });
        },250)
    })
复制代码

上面代码基本知足需求,但代码开始显得乱糟糟。咱们来使用 RxJS 实现上面代码功能,以下api

var text = document.querySelector('#text');
var inputStream = Rx.Observable.fromEvent(text, 'keyup')
                    .debounceTime(250)
                    .pluck('target', 'value')
                    .switchMap(url => Http.get(url))
                    .subscribe(data => render(data));复制代码

能够明显看出,基于 RxJS 的实现,代码十分简洁!数组

RxJS · 流 Stream

RxJS 是 Reactive Extensions for JavaScript 的缩写,起源于 Reactive Extensions,是一个基于可观测数据流在异步编程应用中的库。RxJS 是 Reactive Extensions 在 JavaScript 上的实现,而其余语言也有相应的实现,如 RxJava、RxAndroid、RxSwift 等。学习 RxJS,咱们须要从可观测数据流(Streams)提及,它是 Rx 中一个重要的数据类型。框架

是在时间流逝的过程当中产生的一系列事件。它具备时间与事件响应的概念。

rxjs_stream

下雨天时,雨滴随时间推移逐渐产生,下落时对水面产生了水波纹的影响,这跟 Rx 中的流是很相似的。而在 Web 中,雨滴可能就是一系列的鼠标点击、键盘点击产生的事件或数据集合等等。

RxJS 基础实现原理简析

对流的概念有必定理解后,咱们来说讲 RxJS 是怎么围绕着流的概念来实现的,讲讲 RxJS 的基础实现原理。RxJS 是基于观察者模式和迭代器模式以函数式编程思惟来实现的。

观察者模式

观察者模式在 Web 中最多见的应该是 DOM 事件的监听和触发。

  • 订阅:经过 addEventListener 订阅 document.body 的 click 事件。
  • 发布:当 body 节点被点击时,body 节点便会向订阅者发布这个消息。
document.body.addEventListener('click', function listener(e) {
    console.log(e);
},false);

document.body.click(); // 模拟用户点击复制代码

将上述例子抽象模型,并对应通用的观察者模型

2016-11-01 9 53 52

迭代器模式

迭代器模式能够用 JavaScript 提供了 Iterable Protocol 可迭代协议来表示。Iterable Protocol 不是具体的变量类型,而是一种可实现协议。JavaScript 中像 Array、Set 等都属于内置的可迭代类型,能够经过 iterator 方法来获取一个迭代对象,调用迭代对象的 next 方法将获取一个元素对象,以下示例。

var iterable = [1, 2];

var iterator = iterable[Symbol.iterator]();

iterator.next(); // => { value: "1", done: false}
iterator.next(); // => { value: "2", done: false}

iterator.next(); // => { value: undefined, done: true}复制代码

元素对象中:value 表示返回值,done 表示是否已经到达最后。

遍历迭代器可使用下面作法。

var iterable = [1, 2];
var iterator = iterable[Symbol.iterator]();

var iterator = iterable();

while(true) {
    try {
        let result = iterator.next();  // 
    } catch (err) {
        handleError(err);  // 
    }
    if (result.done) {
        handleCompleted();  // 
        break;
    }
    doSomething(result.value);
}复制代码

主要对应三种状况:

  • 获取下一个值
    调用 next 能够将元素一个个地返回,这样就支持了返回屡次值。

  • 无更多值(已完成)
    当无更多值时,next 返回元素中 done 为 true。

  • 错误处理
    当 next 方法执行时报错,则会抛出 error 事件,因此能够用 try catch 包裹 next 方法处理可能出现的错误。

RxJS 的观察者 + 迭代器模式

RxJS 中含有两个基本概念:Observables 与 Observer。Observables 做为被观察者,是一个值或事件的流集合;而 Observer 则做为观察者,根据 Observables 进行处理。
Observables 与 Observer 之间的订阅发布关系(观察者模式) 以下:

  • 订阅:Observer 经过 Observable 提供的 subscribe() 方法订阅 Observable。
  • 发布:Observable 经过回调 next 方法向 Observer 发布事件。

下面为 Observable 与 Observer 的伪代码

// Observer
var Observer = {
    next(value) {
        alert(`收到${value}`);
    }
};

// Observable
function Observable (Observer) {
    setTimeout(()=>{
        Observer.next('A');
    },1000)
}

// subscribe
Observable(Observer);复制代码

上面实际也是观察者模式的表现,那么迭代器模式在 RxJS 中如何体现呢?

在 RxJS 中,Observer 除了有 next 方法来接收 Observable 的事件外,还能够提供了另外的两个方法:error() 和 complete(),与迭代器模式一一对应。

var Observer = {
    next(value) { /* 处理值*/ },
    error(error) { /* 处理异常 */ },
    complete() { /* 处理已完成态 */ }
};复制代码

结合迭代器 Iterator 进行理解:

  • next()
    Observer 提供一个 next 方法来接收 Observable 流,是一种 push 形式;而 Iterator 是经过调用 iterator.next() 来拿到值,是一种 pull 的形式。

  • complete()
    当再也不有新的值发出时,将触发 Observer 的 complete 方法;而在 Iterator 中,则须要在 next 的返回结果中,当返回元素 done 为 true 时,则表示 complete。

  • error()
    当在处理事件中出现异常报错时,Observer 提供 error 方法来接收错误进行统一处理;Iterator 则须要进行 try catch 包裹来处理可能出现的错误。

下面是 Observable 与 Observer 实现观察者 + 迭代器模式的伪代码,数据的逐渐传递传递与影响其实就是流的表现。

// Observer
var Observer = {
    next(value) {
        alert(`收到${value}`);
    },
    error(error) {
        alert(`收到${value}`);
    },
    complete() {
        alert("complete");
    },
};

// Observable
function Observable (Observer) {
    [1,2,3].map(item=>{
        Observer.next(item);
    });

    Observer.complete();
    // Observer.error("error message");
}

// subscribe
Observable(Observer);复制代码

RxJS 基础实现

有了上面的概念及伪代码,那么在 RxJS 中是怎么建立 Observable 与 Observer 的呢?

建立 Observable

RxJS 提供 create 的方法来自定义建立一个 Observable,可使用 next 来发出流。

var Observable = Rx.Observable.create(observer => {
    observer.next(2);
    observer.complete();
    return  () => console.log('disposed');
});复制代码

建立 Observer

Observer 能够声明 next、err、complete 方法来处理流的不一样状态。

var Observer = Rx.Observer.create(
    x => console.log('Next:', x),
    err => console.log('Error:', err),
    () => console.log('Completed')
);复制代码

最后将 Observable 与 Observer 经过 subscribe 订阅结合起来。

var subscription = Observable.subscribe(Observer);复制代码

RxJS 中流是能够被取消的,调用 subscribe 将返回一个 subscription,能够经过调用 subscription.unsubscribe() 将流进行取消,让流再也不产生。

看了起来挺复杂的?换一个实现形式:

// @Observables 建立一个 Observables
var streamA = Rx.Observable.of(2);

// @Observer streamA$.subscribe(Observer)
streamA.subscribe(v => console.log(v));复制代码

将上面代码改用链式写法,代码变得十分简洁:

Rx.Observable.of(2).subscribe(v => console.log(v));复制代码

RxJS · Operators 操做

Operators 操做·入门

Rx.Observable.of(2).subscribe(v => console.log(v));复制代码

上面代码至关于建立了一个流(2),最终打印出2。那么若是想将打印结果翻倍,变成4,应该怎么处理呢?

方案一?: 改变事件源,让 Observable 值 X 2

Rx.Observable.of(2 * 2 /* ).subscribe(v => console.log(v));复制代码

方案二?: 改变响应方式,让 Observer 处理 X 2

Rx.Observable.of(2).subscribe(v => console.log(v * 2 /* ));复制代码

优雅方案: RxJS 提供了优雅的处理方式,能够在事件源(Observable)与响应者(Observer)之间增长操做流的方法。

Rx.Observable.of(2)
             .map(v => v * 2) /* 
             .subscribe(v => console.log(v));复制代码

map 操做跟数组操做的做用是一致的,不一样的这里是将流进行改变,而后将新的流传出去。在 RxJS 中,把这类操做流的方式称之为 Operators(操做)。RxJS提供了一系列 Operators,像map、reduce、filter 等等。操做流将产生新流,从而保持流的不可变性,这也是 RxJS 中函数式编程的一点体现。关于函数式编程,这里暂很少讲,能够看看另一篇文章 《谈谈函数式编程》

到这里,咱们知道了,流从产生到最终处理,可能通过的一些操做。即 RxJS 中 Observable 将通过一系列 Operators 操做后,到达 Observer。

Operator1   Operator2
Observable ----|-----------|-------> Observer复制代码

一系列的 Operators 操做

RxJS 提供了很是多的操做,像下面这些。

Aggregate,All,Amb,ambArray,ambWith,AssertEqual,averageFloat,averageInteger,averageLong,blocking,blockingFirst,blockingForEach,blockingSubscribe,Buffer,bufferWithCount,bufferWithTime,bufferWithTimeOrCount,byLine,cache,cacheWithInitialCapacity,case,Cast,Catch,catchError,catchException,collect,concatWith,Connect,connect_forever,cons,Contains,doAction,doAfterTerminate,doOnComplete,doOnCompleted,doOnDispose,doOnEach,doOnError,doOnLifecycle,doOnNext,doOnRequest,dropUntil,dropWhile,ElementAt,ElementAtOrDefault,emptyObservable,fromNodeCallback,fromPromise,fromPublisher,fromRunnable,Generate,generateWithAbsoluteTime,generateWithRelativeTime,Interval,intervalRange,into,latest (Rx.rb version of Switch),length,mapTo,mapWithIndex,Materialize,Max,MaxBy,mergeArray,mergeArrayDelayError,mergeWith,Min,MinBy,multicastWithSelector,nest,Never,Next,Next (BlockingObservable version),partition,product,retryWhen,Return,returnElement,returnValue,runAsync,safeSubscribe,take_with_time,takeFirst,TakeLast,takeLastBuffer,takeLastBufferWithTime,windowed,withFilter,withLatestFrom,zipIterable,zipWith,zipWithIndex复制代码

关于每个操做的含义,能够查看官网进行了解。operators 具备静态(static)方法和实例( instance)方法,下面使用 Rx.Observable.xx 和 Rx.Observable.prototype.xx 来简单区分,举几个例子。

Rx.Observable.of
of 能够将普通数据转换成流式数据 Observable。如上面的 Rx.Observable.of(2)。

Rx.Observable.fromEvent
除了数值外,RxJS 还提供了关于事件的操做,fromEvent 能够用来监听事件。当事件触发时,将事件 event 转成可流动的 Observable 进行传输。下面示例表示:监听文本框的 keyup 事件,触发 keyup 能够产生一系列的 event Observable。

var text = document.querySelector('#text');
Rx.Observable.fromEvent(text, 'keyup')
             .subscribe(e => console.log(e));复制代码

Rx.Observable.prototype.map
map 方法跟咱们日常使用的方式是同样的,不一样的只是这里是将流进行改变,而后将新的流传出去。上面示例已有涉及,这里再也不多讲。

Rx.Observable.of(2)
             .map(v => 10 * v)
             .subscribe(v => console.log(v));复制代码

Rx 提供了许多的操做,为了更好的理解各个操做的做用,咱们能够经过一个可视化的工具 marbles 图 来辅助理解。如 map 方法对应的 marbles 图以下

map

箭头能够理解为时间轴,上面的数据通过中间的操做,转变成下面的模样。

Rx.Observable.prototype.mergeMap
mergeMap 也是 RxJS 中经常使用的接口,咱们来结合 marbles 图(flatMap(alias))来理解它

rxjs_flatmap

上面的数据流中,产生了新的分支流(流中流),mergeMap 的做用则是将分支流调整回主干上,最终分支上的数据流都通过主干的其余操做,其实也是将流中流进行扁平化。

Rx.Observable.prototype.switchMap
switchMap 与 mergeMap 都是将分支流疏通到主干上,而不一样的地方在于 switchMap 只会保留最后的流,而取消抛弃以前的流。

除了上面提到的 marbles,也能够 ASCII 字符的方式来绘制可视化图表,下面将结合 Map、mergeMap 和 switchMap 进行对比来理解。

@Map             @mergeMap            @switchMap
                         ↗  ↗                 ↗  ↗
-A------B-->           a2 b2                a2 b2  
-2A-----2B->          /  /                 /  /  
                    /  /                 /  /
                  a1 b1                a1 b1
                 /  /                 /  /
                -A-B----------->     -A-B---------->
                --a1-b1-a2-b2-->     --a1-b1---b2-->复制代码

mergeMap 和 switchMap 中,A 和 B 是主干上产生的流,a一、a2 为 A 在分支上产生,b一、b2 为 B 在分支上产生,可看到,最终将归并到主干上。switchMap 只保留最后的流,因此将 A 的 a2 抛弃掉。

Rx.Observable.prototype.debounceTime
debounceTime 操做能够操做一个时间戳 TIMES,表示通过 TIMES 毫秒后,没有流入新值,那么才将值转入下一个操做。

rxjs_debounce

RxJS 中的操做符是知足咱们之前的开发思惟的,像 map、reduce 这些。另外,不管是 marbles 图仍是用 ASCII 字符图这些可视化的方式,都对 RxJS 的学习和理解有很是大的帮助。

使用 RxJS 一步步实现搜索示例

RxJS 提供许多建立流或操做流的接口,应用这些接口,咱们来一步步将搜索的示例进行 Rx 化。

使用 RxJS 提供的 fromEvent 接口来监听咱们输入框的 keyup 事件,触发 keyup 将产生 Observable。

var text = document.querySelector('#text');
Rx.Observable.fromEvent(text, 'keyup')
             .subscribe(e => console.log(e));复制代码

这里咱们并不想输出事件,而想拿到文本输入值,请求搜索,最终渲染出结果。涉及到两个新的 Operators 操做,简单理解一下:

  • Rx.Observable.prototype.pluck('target', 'value')
    将输入的 event,输出成 event.target.value。

  • Rx.Observable.prototype.mergeMap()
    将请求搜索结果输出回给 Observer 上进行渲染。

var text = document.querySelector('#text');
Rx.Observable.fromEvent(text, 'keyup')
             .pluck('target', 'value') // 
             .mergeMap(url => Http.get(url)) // 
             .subscribe(data => render(data))复制代码

上面代码实现了简单搜索呈现,但一样存在一开始说起的两个问题。那么如何减小请求数,以及取消已无用的请求呢?咱们来了解 RxJS 提供的其余 Operators 操做,来解决上述问题。

  • Rx.Observable.prototype.debounceTime(TIMES)
    表示通过 TIMES 毫秒后,没有流入新值,那么才将值转入下一个环节。这个与前面使用 setTimeout 来实现函数节流的方式有一致效果。

  • Rx.Observable.prototype.switchMap()
    使用 switchMap 替换 mergeMap,将能取消上一个已无用的请求,只保留最后的请求结果流,这样就确保处理展现的是最后的搜索的结果。

最终实现以下,与一开始的实现进行对比,能够明显看出 RxJS 让代码变得十分简洁。

var text = document.querySelector('#text');
Rx.Observable.fromEvent(text, 'keyup')
             .debounceTime(250) // 
             .pluck('target', 'value')
             .switchMap(url => Http.get(url)) // 
             .subscribe(data => render(data))复制代码

总结

本篇做为 RxJS 入门篇到这里就结束,关于 RxJS 中的其余方面内容,后续再拎出来进一步分析学习。
RxJS 做为一个库,能够与众多框架结合使用,但并非每一种场合都须要使用到 RxJS。复杂的数据来源,异步多的状况下才能更好凸显 RxJS 做用,这一块能够看看民工叔写的《流动的数据——使用 RxJS 构造复杂单页应用的数据逻辑》 相信会有更好的理解。点击查看更多文章>>

附:
RxJS(JavaScript) github.com/Reactive-Ex…
RxJS(TypeScript ) github.com/ReactiveX/r…

相关文章
相关标签/搜索