初学rxjs,本着一个新手的角度完成一个小demo,相信过程当中会有不少你们也遇到过的问题,同时整个过程不断发散,讲解一些rxjs的核心知识点和API,但愿这篇文章能给学习rxjs的同窗们一些启发。javascript
项目地址html
折线图有12个点(按时间分布),每隔2秒(为了演示方便)刷新出一个点。java
先简单点想,react
须要一个集中存储状态
的地方,这里的状态其实就是图表对应的数据,这个地方每通过一个时间间隔就向服务器请求一次数据,它须要存储最近12个点对应的数据ios
把这种想法往rxjs上靠。首先咱们先写个最基本的可观察对象fetchData$
git
新建src/app.tses6
import {Observable, Observer} from 'rxjs'
import {Mock} from './mock'
const print = x => console.log('x: ', x)
const intervalEmit$ = Observable.interval(2000)
const fetchData$ = Observable.fromPromise(Mock.fetch())
intervalEmit$.subscribe(print)
fetchData$.subscribe(print)复制代码
新建src/mock.tsgithub
import axios from 'axios'
export class Mock {
static fetch():Promise<Number> {
// base : 20
return axios.get('https://zan.wilddogio.com/age.json')
.then(res => Number(res.data) + Mock.randomAge(10))
}
// random 1 ~ x
static randomAge(x) {
return Math.floor(1 + Math.random() * x)
}
}复制代码
很简单一个是每两秒produce一个递增值,一个是请求回来一个promiseable值并produce
如今咱们作个组合,也就是每隔两秒请求回来一个promiseable值并produce,咱们修改app.tsajax
const intervalEmit$ = Observable.interval(2000)
// 第一种
const app$ = intervalEmit$.switchMap(e => Observable.fromPromise(Mock.fetch()))
// 第二种,将switchMap拆开
const fetchData$ = intervalEmit$.map(e => Observable.fromPromise(Mock.fetch()))
const app$ = fetchData$.switch()
// 第三种,使用defer工厂建立Observable
const deferPromise$ = Observable.defer(function () {
return Observable.fromPromise(Mock.fetch())
})
const app$ = intervalEmit$.switchMap(e => deferPromise$)
app$.subscribe(print)复制代码
先说第三种,它相对单纯:),咱们先看下defer
的定义,Creates an Observable that, on subscribe, calls an Observable factory to make an Observable for each new Observer. 意思也比较好理解,defer接受一个产生observable的函数,当defer所建立的observable被订阅时就经过该函数建立一个observable对象。express
第一种和第二种放在一块说,map就不用说了,就是将一个observable通过一个函数
转换造成另外一个observable,和Array.prototyp.map很像,可是你能够把它理解成一个时间点上的值或者对一个值的一对一变换。重点说下switch,一样咱们先看下定义,Converts a higher-order Observable into a first-order Observable by subscribing to only the most recently emitted of those inner Observables. 解释一下,经过订阅的方式将一个高阶observable转换为一个低阶observable,同时仅产生一个低阶最近产生的值。
首先要先清楚什么叫高阶,
var fn = function(a, b) {return a + b}复制代码
经过typeof fn能够看到fn的类型是function
,继续
var fn1 = fn(1,2)复制代码
经过typeof fn1能够看到fn1的类型是number
,OK,它已经不是函数了,那么如何让fn1继续是函数呢,咱们改写一下
var fn = function(a) {return function(b) {return a + b}}复制代码
若是此次你还想获得1+2=3,那么你须要fn(1)(2)才能获得,也就是说咱们想获得最终的结果调用了一次以上的函数,好的这就叫作高阶,超过一次就是高阶,这和数学里的高阶导数相似的。好了咱们回到switch的主题。
var ob$ // 一个可观察对象
var higher$ = ob$.实例operator(静态operator)复制代码
这里有一个实例operator
,它就是一个转换器,它将一个源observable做为一个模版转变为另一个observable,并且源observable是不被改变的,而静态operator
就像一个observable制造器同样,一启动(subscribe)就开始生产。所以
var higher$ = ob$.实例operator(静态operator)
这里获得的higher$就是一个高阶observable
了,由于当你订阅它时,它不像静态operator产生数据,而是产生observable,因此就像你执行fn(1)产生的是一个新的函数而不是值同样。下面是个小栗子,能够看到打印出的是observable。
var print = x=>console.log('x: ', x)
var clicks = Rx.Observable.fromEvent(document, 'click');
var higherOrder = clicks.map((ev) => Rx.Observable.interval(1000));
higherOrder.subscribe(print)
// x: IntervalObservable {_isScalar: false, period: 1000, scheduler: AsyncScheduler}复制代码
所以咱们须要switch将high$转换成低阶observable,
var lower$ = higher$.switch()
这样当咱们订阅lower$的时候,将会获得静态operator
所产生的值,看官方栗子,
var print = x=>console.log('x: ', x)
var clicks = Rx.Observable.fromEvent(document, 'click');
var higherOrder = clicks.map((ev) => Rx.Observable.interval(1000));
var lowerOrder = higherOrder.switch()
lowerOrder.subscribe(print)
//== 第一次点击 ==
// x: 0
// x: 1
//== 第二次点击 ==
// x: 0复制代码
能够看到,如今打印出的是值了,并且当咱们再次点击时,Rx.Observable.interval(1000)被从新执行了,这也正是Flattens an Observable-of-Observables by dropping the previous inner Observable once a new one appears.的含义,当外层observable产生值时,它会触发丢弃最近一次被订阅的内层observable。咱们知道promise对象一旦建立,它处于pending状态,最终变为onFulfille或者onRejected状态,所以它是不能被取消的。而经过rxjs能够达到目的,看一个栗子。咱们用express作一个restFul服务器,
app.js
var express = require('express');
var app = express()
app.use(express.static('blog'));
app.get('/delay', function(req, res) {
setTimeout(function(){
res.send('hello world')
},3000)
})
var server = app.listen(3000, function () {
var host = server.address().address
var port = server.address().port
console.log('app listening at http://%s:%s', host, port)
})复制代码
当服务器接收到http://localhost:3000/delay请求时,延迟三秒发送响应。再看客户端代码
最近被取消.html
<script> window.onload = function () { var print = x=>console.log('x: ', x) var ajax$ = Rx.Observable.fromPromise($.ajax('/delay')) var click$ = Rx.Observable.fromEvent(document, 'click') var higher$ = click$.map(e=>Rx.Observable.fromPromise($.ajax('/delay'))) var app$ = higher$.switch() app$.subscribe(print) //当我在三秒内疯狂点击5次,其实只返回一次数据,也就是说前四次被unsubscribe了 } </script>复制代码
此时我在页面疯狂点击五次(三秒以内),你会看到发出了五次请求,可是最终缺只打印出一条hello world,是的前四次都被unsubscribe
了也就是官网中多说的drop,这就达到了撤销promise的效果。
咱们继续,如今咱们实现了每两秒发送一个请求,接下来咱们实现数据的存储
首先咱们要先存储够24个点,以后每来一个点丢弃一个最旧的点。咱们小时候都听过磁带,录音机有倒带的功能(不是周杰伦给蔡依林写的那首),所以磁带存储了整个过程,你能够回退到以前播放的任意一个时间点从新播放,其实咱们的一次次请求就像在播放磁带,咱们想获取到以前点的最好办法就是能够存储它们,磁带也有存储大小,那么咱们也不可能无限存储,因此咱们就暂存最近24次记录。下面rxjs的倒带replay登场。
在rxjs的api文档中搜索replay能够看到两个东东ReplaySubject
和publishReplay
,前者是一个Subject类,后者是一个Observable实例operator,他们之间有没有什么关联,咱们仍是先来看看他俩该怎么用吧,先说和Observable更关系更紧密的publishReplay。
public publishReplay(bufferSize: , windowTime: , scheduler: *): ConnectableObservable
export function publishReplay(bufferSize = Number.POSITIVE_INFINITY, windowTime = Number.POSITIVE_INFINITY, scheduler) {
return multicast.call(this, new ReplaySubject(bufferSize, windowTime, scheduler));
}复制代码
原来publishReplay的三个参数都是为ReplaySubject实例化服务的,那么对于参数咱们先按下不谈,看看这个multicast
,这个this表明Observable实例,那么在咱们看看这个operator以前,咱们先说下单播
和多播
,这对咱们理解该operator颇有帮助。
虽然到目前为止咱们尚未讲Subject,可是先白话一下单播Observable和多播Subject,单播很高冷(cold)很专一(独立),她从不主动联系别人,只有在别人关注她后,才会和这我的侃侃而谈。再来一我的关注她,和她交流中感觉不到还有别人的存在。而Subject就很热情(hot)喜欢分享(不独立)。不论什么时候关注她,她都乐于将经验与人分享。下面看两个小栗子。
Obserable单播
const printA = (val) => console.log('observerA :' + val)
const printB = (val) => console.log('observerB :' + val)
var clicks = Rx.Observable.fromEvent(document, 'click');
var ones = clicks.mapTo(1);
var seed = 0;
var count = ones.scan((acc, one) => acc + one, seed);
count.subscribe(printA);
setTimeout(function() {
console.log('another subscribe.')
ones.scan((acc, one) => acc + one, seed).subscribe(printB)
}, 3000)复制代码
从图中能够看到,3秒之后observerB依然从1开始打印,同时也能够看出只有别人订阅她的时候,她才会和别人沟通。
从这个图能够更直观的看出,当咱们订阅蓝色scan转换后的observable和红色scan转换后的observable时,其实走的是两个独立的分支,每次订阅也都是经过fromEvent建立了一个新的observable,其实observable就是一个函数,当收到订阅时,就执行函数,在函数中经过订阅者留下的通知方式通知到订阅者。再来看Subject多播。
Subject多播
var subject = new Rx.Subject()
subject.subscribe(printA)
setTimeout(function() {
console.log('another subscribe.')
subject.subscribe(printB)
}, 3000)
Rx.Observable.fromEvent(document, 'click').mapTo(1).scan((acc, one) => acc + one, 0)
.do(num => subject.next(num))
.subscribe()复制代码
从图中看到虽然observerB3秒后姗姗来到,可是依然分享到了observerA的努力成果,从3开始打印。同时看到subject是主动告知订阅者,so hot~
能够看出Subject和Observable的区别,三秒后的订阅并无建立一个新的分支,也就是没有新的observable实例以及后续的一些列变换。
这里咱们简单讲解了Observable的冷、单播和独立性以及Subject的热、多播和共享性。那么咱们回来,继续说multicast,接受一个Subject实例做为参数,咱们有理由相信,这个operator是observable实例经过subject实例被赋予了多播的特性。咱们看一个multicast的小栗子。
var clickAddOne$ = Rx.Observable.fromEvent(document, 'click').mapTo(1).scan((acc, one) => acc + one, 0)
var subject = new Rx.Subject
subject.subscribe(printA)
setTimeout(function() {
console.log('another subscribe.')
subject.subscribe(printB)
}, 3000)
var app$ = clickAddOne$.multicast(subject)
app$.subscribe()复制代码
这段代码运行起来除了another subscribe.,不论你如何点击都不会打印其余信息。看来这个app$不是单纯的observable实例,咱们看下rxjs官网对于multicast的描述:
意思大概是,返回值是一个ConnectableObservable实例
,该实例能够产生数据共享给潜在的订阅者(即Subject实例上的订阅者),咱们修改一下代码。
// app$.subscribe()
app$.connect()复制代码
从图中咱们看到了和上面Subject多播一致的结果。这里咱们看到了一个陌生的方法connect
,ConnectableObservable
继承自Observable
,同时具备一个connect
方法和一个refCount
方法。connect方法决定什么时候订阅生效,同时返回一个方法以决定什么时候取消全部订阅。
var clickAddOne$ = Rx.Observable.fromEvent(document, 'click').mapTo(1).scan((acc, one) => acc + one, 0).do(x=>console.log('do: ' + x))
var subject = new Rx.Subject
subject.subscribe(printA)
setTimeout(function() {
console.log('another subscribe.')
subject.subscribe(printB)
}, 3000)
var app$ = clickAddOne$.multicast(subject)
var connector = app$.connect()
setTimeout(function() {
connector.unsubscribe()
}, 6000)复制代码
6秒事后,点击不会产生任何打印信息。这里显示调用connect和返回实例上的unsubscribe显得太命令式了,这里咱们还可使用refCount使得这个过程的关注点放在observer的订阅和取消上。改写下上面的例子
var clickAddOne$ = Rx.Observable.fromEvent(document, 'click').mapTo(1).scan((acc, one) => acc + one, 0).do(x=>console.log('do: ' + x))
var subject = new Rx.Subject
var app$ = clickAddOne$.multicast(subject).refCount()
app$.subscribe(printA)
setTimeout(function() {
console.log('another subscribe.')
app$.subscribe(printB)
}, 3000)复制代码
这更加Observable,同时咱们也达到了Observable多播化的目的,破费!
兜了一大圈回到publishReplay,再看下面的源码就更清楚了许多
export function publishReplay(bufferSize = Number.POSITIVE_INFINITY, windowTime = Number.POSITIVE_INFINITY, scheduler) {
return multicast.call(this, new ReplaySubject(bufferSize, windowTime, scheduler));
}复制代码
publishReplay自己就是observable.multicast(new ReplaySubject)的语法糖,那么咱们就来看下ReplaySubject是个啥。先上一个小栗子
const printA = (val) => console.log('observerA :' + val)
const printB = (val) => console.log('observerB :' + val)
var subject = new Rx.ReplaySubject(3);
subject.subscribe({
next: (v) => console.log('observerA: ' + v)
});
subject.next(1);
subject.next(2);
subject.next(3);
subject.next(4);
subject.subscribe({
next: (v) => console.log('observerB: ' + v)
});
subject.next(5);
subject.subscribe({
next: (v) => console.log('observerC: ' + v)
});复制代码
能够看出后两次subscribe,就打印出了前三次
可观察对象产生的值,这有点像Observable订阅,但又不会建立新的Observable实例,这种带有从新发送之前数据的能力就是ReplaySubject了,所以下面两端代码是所实现的功能是同样的
var app$ = Rx.Observable.interval(1000).multicast(new Rx.ReplaySubject(3)).refCount()
app$.subscribe(printA)
setTimeout(function () {
app$.subscribe(printB)
}, 3000)复制代码
var app$ = Rx.Observable.interval(1000).publishReplay(3).refCount()
app$.subscribe(printA)
setTimeout(function () {
app$.subscribe(printB)
}, 3000)复制代码
通过一个个引伸咱们掌握了很多rxjs的核心知识点和api使用,那么回到demo上,咱们已经完成了每两秒完成一次rest请求,下面咱们先完成这样一个任务,当咱们缓存到第23个点时,后面每新增一个点打印update画图
。联系以前的内容,首先咱们要有一个buffersize为24的ReplaySubject实例。每次订阅都会产生以前24个值,可是这里会有个问题须要经过订阅来获取旧的值,订阅完之后其实这个订阅就没有意义了,Replay功能的基础其实就是buffer能力,但Subject提供的这种Replay能力倒是cold、lazy的,咱们更但愿这种replay能力能够更hot,当到达一个bufferSize,就自动把这个bufferSize的数据produce出来,这有点像interval,通过一个时间间隔就produce一个数据,那么有没有相似intervalBuffer这种的静态operator呢:),咱们先来搜搜和buffer有关的API。
一看这个bufferCount好像挺适合咱们的,估计是buffer了count个数据后,就会产生count个buffer数据。仍是看个小栗子
var source$ = Rx.Observable.interval(1000)
var buffer$ = source$.bufferCount(10)
buffer$.subscribe(x => console.log(x))复制代码
从图中能够看到每隔10秒打印出了一组长度为10的数字,这显然不是咱们想要的,咱们但愿每秒打印出一组数字,且丢弃最旧的一个数字,看下bufferCount的函数签名,
public bufferCount(bufferSize: number, startBufferEvery: number): Observable
bufferCount还接受第二个参数,该参数表明了表明了计算bufferSize的起始位置,第一次达到bufferSize就produce,而从第二次起bufferSize从上一次buffer数据的startBufferEvery开始计算,也就是说当第一次produce后,bufferCount为bufferSize-startBufferEvery,也就是还须要缓存startBufferEvery个才会produce下一个buffer。改造下上一个栗子。
var source$ = Rx.Observable.interval(1000)
var buffer$ = source$.bufferCount(10, 1)
buffer$.subscribe(x => console.log(x))复制代码
能够看到达到了咱们预期。如今咱们完成子任务2,这里为了演示方便缓存5个点。
const print = x => console.log('x: ', x)
const intervalEmit$ = Observable.interval(2000)
const fetch$ = intervalEmit$.switchMap(e => Observable.fromPromise(Mock.fetch()))
const app$ = fetch$.bufferCount(5, 1).do('update画图')
app$.subscribe(print)复制代码
OK!
下面咱们完成画图功能。
const line = new LineChart(document.getElementById('showAge') as HTMLDivElement)
line.setOptions({
title: {
left: 'center',
text: '动态数据(年龄)'
},
xAxis: {
type: 'time',
splitLine: {
show: false
}
},
yAxis: {
type: 'value',
boundaryGap: [0, '100%'],
splitLine: {
show: false
}
},
series: [{
type: 'line',
data: []
}]
})
line.showLoading()
const now = new Date().getTime()
const span = 2 * 1000
const bufferSize = 12
let counter = 0
const intervalEmit$ = Observable.interval(span)
const fetch$ = intervalEmit$.switchMap(e => Observable.fromPromise(Mock.fetch()))
const app$ = fetch$.bufferCount(bufferSize, 1).map(
buffer => {
counter === 0 && line.hideLoading()
const points = buffer.map((b, index) => {
const point = []
point[0] = now + index * span + span * counter
point[1] = b
return point
})
counter++
return points
}
).do(data => {
debugger;
line.setOptions({
series: [{
data
}]
})
})
app$.subscribe()复制代码
效果以下
一个简单的实时监控折线图的demo就完成了,因为本人也是初学rxjs,一些知识点不免会有疏漏,但也尽可能作到不误导,相信你们仍是会有些收获的。