RxJS是一个基于可观测数据流在异步编程应用中的库。javascript
ReactiveX is a combination of the best ideas from
the Observer pattern, the Iterator pattern, and functional programming
正如官网所说,RxJS是基于观察者模式,迭代器模式和函数式编程。所以,首先要对这几个模式有所理解php
window.addEventListener('click', function(){ console.log('click!'); })
JS的事件监听就是天生的观察者模式。给window的click事件(被观察者)绑定了一个listener(观察者),当事件发生,回调函数就会被触发css
<!-- more -->html
迭代器模式,提供一种方法顺序访问一个聚合对象中的各类元素,而又不暴露该对象的内部表示。前端
ES6里的Iterator便可实现:java
let arr = ['a', 'b', 'c']; let iter = arr[Symbol.iterator](); iter.next() // { value: 'a', done: false } iter.next() // { value: 'b', done: false } iter.next() // { value: 'c', done: false } iter.next() // { value: undefined, done: true }
反复调用迭代对象的next
方法,便可顺序访问git
提到函数式编程,就要提到声明式编程和命令式编程
函数式编程是声明式编程的体现github
问题:将数组[1, 2, 3]
的每一个元素乘以2,而后计算总和。ajax
命令式编程编程
const arr = [1, 2, 3]; let total = 0; for(let i = 0; i < arr.length; i++) { total += arr[i] * 2; }
声明式编程
const arr = [1, 2, 3]; let total = arr.map(x => x * 2).reduce((total, value) => total + value)
声明式的特色是专一于描述结果自己,不关注到底怎么到达结果。而命令式就是真正实现结果的步骤
声明式编程把原始数据通过一系列转换(map, reduce),最后获得想要的数据
如今前端流行的MVC框架(Vue,React,Angular),也都是提倡:编写UI结构时使用声明式编程,在编写业务逻辑时使用命令式编程
RxJS里有两个重要的概念须要咱们理解:Observable
(可观察对象)Observer
(观察者)
var btn = document.getElementById('btn'); var handler = function() { console.log('click'); } btn.addEventListener('click', handler)
上面这个例子里:btn
这个DOM元素的click
事件就是一个Observablehandler
这个函数就是一个Observer,当btn的click事件被触发,就会调用该函数
改用RxJS编写;
Rx.Observable.fromEvent(btn, 'click') .subscribe(() => console.log('click'));
fromEvent
把一个event转成了一个Observable
,而后它就能够被订阅subscribe
了
Observable其实就是数据流stream
流是在时间流逝的过程当中产生的一系列事件。它具备时间与事件响应的概念。
咱们能够把一切输入都当作数据流来处理,好比说:
当产生了一个流后,咱们能够经过操做符(Operator)对这个流进行一系列加工操做,而后产生一个新的流
Rx.Observable.fromEvent(window, 'click') .map(e => 1) .scan((total, now) => total + now) .subscribe(value => { console.log(value) })
map
把流转换成了一个每次产生1的新流,而后scan
相似reduce
,也会产生一个新流,最后这个流被订阅。最终实现了:每次点击累加1的效果
能够用一个效果图来表示该过程:
也能够对若干个数据流进行组合:
例子:咱们要实现下面这个效果:
Rx.Observable.fromEvent(document.querySelector('input[name=plus]'), 'click') .mapTo(1) .merge( Rx.Observable.fromEvent(document.querySelector('input[name=minus]'), 'click') .mapTo(-1) ) .scan((total, now) => total + now) .subscribe(value => { document.querySelector('#counter').innerText = value; })
merge
能够把两个数据流整个在一块儿,效果能够参考以下:
刚才那个例子的数据流以下:
以RxJS的写法,就是把按下加1当成一个数据流,把按下减1当成一个数据流,再经过merge把两个数据流合并,最后经过scan
操做符,把新流上的数据累加,这就是咱们想要的计数器效果
有时候,咱们的Observable送出的是一个新的Observable:
var click = Rx.Observable.fromEvent(document.body, 'click'); var source = click.map(e => Rx.Observable.of(1, 2, 3)); source.subscribe(value => { console.log(value) });
这里,console打印出来的是对象,而不是咱们想要的1,2,3,这是由于map
返回的Rx.Observable.of(1, 2, 3)
自己也是个Observable
用图表示以下:
click : ------c------------c-------- map(e => Rx.Observable.of(1,2,3)) source : ------o------------o-------- \ \ (123)| (123)|
所以,咱们订阅到的value值就是一个Observable对象,而不是普通数据1,2,3
我想要的其实不是Observable自己,而是属于这个Observable里面的那些东西,如今这个情形就是Observable里面又有Observable,有两层,但是我想要让它变成一层就好,该怎么办呢?
这就须要把Observable扁平化
const arr = [1, [2, 3], 4]; // 扁平化后: const flatArr = [1, 2, 3, 4];
concatAll
这个操做符就能够把Observable扁平化
var click = Rx.Observable.fromEvent(document.body, 'click'); var source = click.map(e => Rx.Observable.of(1, 2, 3)); var example = source.concatAll(); example.subscribe(value => { console.log(value) })
click : ------c------------c-------- map(e => Rx.Observable.of(1,2,3)) source : ------o------------o-------- \ \ (123)| (123)| concatAll() example: ------(123)--------(123)------------
flatMap
操做符也能够实现一样的做用,就是写法有些不一样:
var click = Rx.Observable.fromEvent(document.body, 'click'); var source = click.flatMap(e => Rx.Observable.of(1, 2, 3)); source.subscribe(value => { console.log(value) })
click : ------c------------c-------- flatMap(e => Rx.Observable.of(1,2,3)) source: ------(123)--------(123)------------
学完前面几个操做符,咱们就能够写一个简单的实例了
拖拽的原理是:
<style type="text/css"> html, body { height: 100%; background-color: tomato; position: relative; } #drag { position: absolute; display: inline-block; width: 100px; height: 100px; background-color: #fff; cursor: all-scroll; } </style> <div id="drag"></div>
const mouseDown = Rx.Observable.fromEvent(dragDOM, 'mousedown'); const mouseUp = Rx.Observable.fromEvent(body, 'mouseup'); const mouseMove = Rx.Observable.fromEvent(body, 'mousemove');
首先给出3个Observable,分别表明3种事件,咱们但愿mousedown的时候监听mousemove,而后mouseup时中止监听,因而RxJS能够这么写:
const source = mouseDown .map(event => mouseMove.takeUntil(mouseUp))
takeUntil
操做符能够在某个条件符合时,发送complete
事件
source: -------e--------------e----- \ \ --m-m-m-m| -m--m-m--m-m|
从图上能够看出,咱们还须要把source扁平化,才能获取所需数据。
完整代码:
const dragDOM = document.getElementById('drag'); const body = document.body; const mouseDown = Rx.Observable.fromEvent(dragDOM, 'mousedown'); const mouseUp = Rx.Observable.fromEvent(body, 'mouseup'); const mouseMove = Rx.Observable.fromEvent(body, 'mousemove'); mouseDown .flatMap(event => mouseMove.takeUntil(mouseUp)) .map(event => ({ x: event.clientX, y: event.clientY })) .subscribe(pos => { dragDOM.style.left = pos.x + 'px'; dragDOM.style.top = pos.y + 'px'; })
前面的例子,咱们都在讨论fromEvent
转换的Observable,其实还有不少种方法产生一个Observable
,其中create
也是一种常见的方法,能够用来建立自定义的Observable
var observable = Rx.Observable.create(function (observer) { observer.next(1); observer.next(2); observer.next(3); setTimeout(() => { observer.next(4); observer.complete(); }, 1000); }); console.log('just before subscribe'); observable.subscribe({ next: x => console.log('got value ' + x), error: err => console.error('something wrong occurred: ' + err), complete: () => console.log('done'), }); console.log('just after subscribe');
控制台执行的结果:
just before subscribe got value 1 got value 2 got value 3 just after subscribe got value 4 done
Observable 执行能够传递三种类型的值:
"Next" 通知: 发送一个值,好比数字、字符串、对象,等等。
"Error" 通知: 发送一个 JavaScript 错误 或 异常。
"Complete" 通知: 再也不发送任何值。
"Next" 通知是最重要,也是最多见的类型:它们表示传递给观察者的实际数据。"Error" 和 "Complete" 通知可能只会在 Observable 执行期间发生一次,而且只会执行其中的一个。
var observable = Rx.Observable.create(function subscribe(observer) { try { observer.next(1); observer.next(2); observer.next(3); observer.complete(); } catch (err) { observer.error(err); // 若是捕获到异常会发送一个错误 } });
Observer观察者只是一组回调函数的集合,每一个回调函数对应一种 Observable 发送的通知类型:next、error 和 complete 。
var observer = { next: x => console.log('Observer got a next value: ' + x), error: err => console.error('Observer got an error: ' + err), complete: () => console.log('Observer got a complete notification'), };
Observer和Observable是经过subscribe方法创建联系的
observable.subscribe(observer);
observer订阅了Observable以后,还能够取消订阅
var observable = Rx.Observable.from([10, 20, 30]); var subscription = observable.subscribe(x => console.log(x)); // 稍后: subscription.unsubscribe();
unsubscribe陷阱:
let stream$ = new Rx.Observable.create((observer) => { let i = 0; let id = setInterval(() => { console.log('setInterval'); observer.next(i++); },1000) }) let subscription = stream$.subscribe((value) => { console.log('Value', value) }); setTimeout(() => { subscription.unsubscribe(); }, 3000)
3秒后虽然取消了订阅,可是开启的setInterval定时器并不会自动清理,咱们须要本身返回一个清理函数
let stream$ = new Rx.Observable.create((observer) => { let i = 0; let id = setInterval(() => { observer.next(i++); },1000) // 返回了一个清理函数 return function(){ clearInterval( id ); } }) let subscription = stream$.subscribe((value) => { console.log('Value', value) }); setTimeout(() => { subscription.unsubscribe() // 在这咱们调用了清理函数 }, 3000)
<input type="text">
function sendRequest(search) { return Rx.Observable.ajax.getJSON(`http://deepred5.com/cors.php?search=${search}`) .map(response => response) } Rx.Observable.fromEvent(document.querySelector('input'), 'keyup') .map(e => e.target.value) .flatMap(search => sendRequest(search)) .subscribe(value => { console.log(value) })
用户每次在input框每次进行输入,均会触发ajax请求,而且每一个ajax返回的值都会被打印一遍
如今须要实现这样一个功能:
但愿用户在300ms之内中止输入,才发送请求(防抖),而且console打印出来的值只要最近的一个ajax返回的
Rx.Observable.fromEvent(document.querySelector('input'), 'keyup') .debounceTime(300) .map(e => e.target.value) .switchMap(search => sendRequest(search)) .subscribe(value => { console.log(value) })
debounceTime
表示通过n毫秒后,没有流入新值,那么才将值转入下一个环节switchMap
能取消上一个已无用的请求,只保留最后的请求结果流,这样就确保处理展现的是最后的搜索的结果
能够看到,RxJS对异步的处理是很是优秀的,对异步的结果能进行各类复杂的处理和筛选。
Redux的action都是同步的,因此默认状况下也只能处理同步数据流。
为了生成异步action,处理异步数据流,有许多不一样的解決方案,例如 redux-thunk、redux-promise、redux-saga 等等。
以redux-thunk举例:
调用一个异步API,首先要先定义三个同步action构造函数,分别表示
而后再定义一个异步action构造函数,该函数再也不是返回普通的对象,而是返回一个函数,在这个函数里,进行ajax异步操做,而后根据返回的成功和失败,分别调用前面定义的同步action
actions.js
export const FETCH_STARTED = 'WEATHER/FETCH_STARTED'; export const FETCH_SUCCESS = 'WEATHER/FETCH_SUCCESS'; export const FETCH_FAILURE = 'WEATHER/FETCH_FAILURE'; // 普通action构造函数,返回普通对象 export const fetchWeatherStarted = () => ({ type: FETCH_STARTED }); export const fetchWeatherSuccess = (result) => ({ type: FETCH_SUCCESS, result }) export const fetchWeatherFailure = (error) => ({ type: FETCH_FAILURE, error }) // 异步action构造函数,返回一个函数 export const fetchWeather = (cityCode) => { return (dispatch) => { const apiUrl = `/data/cityinfo/${cityCode}.html`; dispatch(fetchWeatherStarted()) return fetch(apiUrl).then((response) => { if (response.status !== 200) { throw new Error('Fail to get response with status ' + response.status); } response.json().then((responseJson) => { dispatch(fetchWeatherSuccess(responseJson.weatherinfo)); }).catch((error) => { dispatch(fetchWeatherFailure(error)); }); }).catch((error) => { dispatch(fetchWeatherFailure(error)); }) }; }
如今若是想要异步请求,只要:
// fetchWeather是个异步action构造函数 dispatch(fetchWeather('23333'));
咱们再来看看redux-observable
:
调用一个异步API,再也不须要定义一个异步action构造函数,全部的action构造函数都只是返回普通的对象
那么ajax请求在哪里发送?
答案是在Epic进行异步操做
Epic是redux-observable的核心原语。
它是一个函数,接收 actions 流做为参数而且返回 actions 流。 Actions 入, actions 出.
export const FETCH_STARTED = 'WEATHER/FETCH_STARTED'; export const FETCH_SUCCESS = 'WEATHER/FETCH_SUCCESS'; export const FETCH_FAILURE = 'WEATHER/FETCH_FAILURE'; export const fetchWeather = cityCode => ({ type: FETCH_STARTED, cityCode }); export const fetchWeatherSuccess = result => ( { type: FETCH_SUCCESS, result }; ); export const fetchWeatherFailure = (error) => ( { type: FETCH_FAILURE, error } ) export const fetchWeatherEpic = action$ => action$.ofType(FETCH_STARTED) .mergeMap(action => ajax.getJSON(`/data/cityinfo/${action.cityCode}.html`) .map(response => fetchWeatherSuccess(response.weatherinfo)) // 这个处理异常的action必须使用Observable.of方法转为一个observable .catch(error => Observable.of(fetchWeatherFailure(error))) );
如今若是想要异步请求,只要:
// fetchWeather只是个普通的action构造函数 dispatch(fetchWeather('23333'));
相较于thunk中间件,使用redux-observable来处理异步action,有如下优势:
原生JS传统解决异步的方式:callback、Generator、Promise、async/await
RxJS解决的是数据流的问题,它可让批量数据处理起来更方便
能够想象的一些使用场景:
能够看出,这种须要对流进行复杂操做的场景更加适合RxJS
公司内部目前的大部分系统,前端就可能不太适合用RxJS,由于大部分是后台CRUD系统,总体性、实时性的要求都不高,而且也没有特别复杂的数据流操做
咱们推荐在适合RxJS的地方用RxJS,可是不强求RxJS for everything。RxJS给了咱们另外一种思考和解决问题的方式,但这不必定是必要的