最近在学习函数式编程相关的知识,这篇文章源于学习自定义RxJs操做符的一点想法,若有理解不正确的地方,还请指出。html
mkdir rxjs && cd rxjs yarn init -y yarn add rxjs rxjs-compat webpack webpack-dev-server typescript ts-loader --save yarn add webpack-cli --save-dev
① 建立webpack.cofig.js文件node
const path = require('path'); module.exports = { entry: './src/index.ts', devtool: 'inline-source-map', module: { rules: [ { test: /\.tsx?$/, use: 'ts-loader', exclude: /node_modules/ } ] }, resolve: { extensions: [ '.ts', '.js', '.tsx' ] }, output: { filename: 'bundle.js', path: path.resolve(__dirname, 'dist') } };
② 建立typescript.json文件webpack
{ "compilerOptions": { "outDir": "./dist/", "noImplicitAny": true, "module": "es6", "moduleResolution": "node", "sourceMap": true, "target": "es6", "typeRoots": [ "node_modules/@types" ], "lib": [ "es2017", "dom" ] } }
③ 在package.json文件下添加启动命令es6
"scripts": { "start": "webpack-dev-server --mode development" },
① 建立index.html并复制粘贴以下代码web
<!DOCTYPE html> <html lang="en"> <head> <meta charset="UTF-8"> <meta name="viewport" content="width=device-width, initial-scale=1.0"> <meta http-equiv="X-UA-Compatible" content="ie=edge"> <title>Learn RxJS with Coursetro</title> <style> body { font-family: 'Arial'; background: #ececec; } ul { list-style-type: none; padding: 20px; } li { padding: 20px; background: white; margin-bottom: 5px; } </style> </head> <body> <script src="/bundle.js"></script> </body> </html>
② 建立src
文件夹,在项目下建立``index.tsajax
import * as Rx from "rxjs/Observable"; console.log(Rx);
③ 在浏览器中访问 http://localhost:8080,打开控制台便可看到 Rx 对象数据typescript
import {from} from "rxjs"; const observable$ = from([1,2,3,4,5]); const subscriber = { next: (value: any) => console.log(value), complete: () => console.log('has something done.'), error: (err: any) => console.error('something error: ', err) }; observable$.subscribe(subscriber); // 输出: // 1 // 2 // 3 // 4 // 5
Subscriber
并重写_next
函数class DoubleSubscriber extends Subscriber<number> { protected _next(value: number): void { // @ts-ignore this.destination.next(value * 2); } } observable$.subscribe(new DoubleSubscriber(subscriber)); // 输出: // 2 // 4 // 6 // 8 // 10
① 手动链接shell
const double = (source: any) => { const o$ = new Observable(); o$.source = source; o$.operator = { call(sub, source) { // @ts-ignore source.subscribe(new DoubleSubscriber(sub)) } }; return o$; }; observable$.pipe(double).subscribe(subscriber);
② 使用lift
连接source和subscriber编程
const double = (source: any) => source.lift({ call(subscriber: Subscriber<number>, source: any): any { source.subscribe(new DoubleSubscriber(subscriber)) } });
① 使用lift建立可复用的pipe操做符json
// src/operators/multiply/index.ts /** * @format * @Author: Alvin * @Date 2020-03-05 * @Last modified by: Alvin * @Last modified time: 2020-03-05 */ import {Subscriber} from "rxjs"; import {PartialObserver} from "rxjs/src/internal/types"; class MultiplySubscriber extends Subscriber<number> { constructor(subscriber: PartialObserver<any> | ((value: number) => void), number: number) { super(subscriber); // @ts-ignore this.number = number; } protected _next(value: number): void { // @ts-ignore this.destination.next(value * this.number); } } export const multiply = (number: number) => (source: any) => source.lift({ call(subscriber: Subscriber<number>, source: any): any { source.subscribe(new MultiplySubscriber(subscriber, number)) } }); // src/index.ts observable$.pipe(multiply(3)).subscribe(subscriber); observable$.pipe(multiply(45)).subscribe(subscriber);
② 复用现有的map实现相同功能的multiply
import {map} from "rxjs/operators"; export const multiply = (number: number) => map((value: number) => value * number);
③ 自定义pipe
构建自定义操做符
const pipe = (...fns: Array<Function>) => (source: any) => fns.reduce((acc: any, fn: any) => fn(acc), source) export const multiply = (number: number) => pipe( map((value: number) => value * number), filter((value: number) => value < 100) ); observable$.pipe(multiply(3)).subscribe(subscriber); observable$.pipe(multiply(45)).subscribe(subscriber);
在前面咱们拆解了与map相似的操做符multiply
的实现方式,这个操做符其实很是好理解,由于有JavaScript原生的map对应。接下来分析真正有RxJs特点的高阶map —— 把Observable对象玩得更加出神入化的操做符。
① mergeMap
mergeMap可以解决异步操做的问题,最典型的应该属于AJAX请求的处理。在网页应用当中,每点击按钮一次就发送一个AJAX
请求给server,同时还要根据返回的结果在ui上更新状态。传统的方法来解决这样的异步操做代码会比较繁杂。
mergeMap把用户的点击操做看做一个数据流。把AJAX的返回结果也看作一个数据流,为本来繁杂的解决方式提供了一种简单解。
fromEvent(document.querySelector("#send"), 'click').pipe( mergeMap(() => ajax("apiUrl")) ).subscribe((result: any) => { // 常规处理ajax返回的结果 })
稍微讲解了merge的使用场景,接下来看看若是手动实现这样的功能,该如何作呢?
import {fromEvent, Observable, of, Subscriber} from "rxjs"; import {PartialObserver} from "rxjs/src/internal/types"; import {delay, scan} from "rxjs/operators"; class MergeMapSubscriber extends Subscriber<Function> { constructor( destination: PartialObserver<any> | ((value: any) => void), private readonly func: Function ) { super(destination); this.func = func; } protected _next(value: any): void { const o$ = this.func(value); o$.subscribe({ next: (value: any) => { this.destination.next(value); } }) } } const mergeMap = (func: any) => (source: Observable<any>) => source.lift({ call(subscriber: Subscriber<any>, source: any): any { source.subscribe(new MergeMapSubscriber(subscriber, func)) } }); const observable$ = fromEvent(document, 'click').pipe( scan(i => i + 1, 0), mergeMap((value: any) => of(value).pipe(delay(500))) ); const subscriber = { next: (value: any) => console.log("subscriber function output ::: ", value), complete: () => console.log('has something done.'), error: (err: any) => console.error('something error: ', err) }; observable$.subscribe(subscriber);
② switchMap
上面介绍了一个适合于AJAX的mergeMap,但mergeMap会存在一个问题,每个上游数据都将会引起调用Ajax并且会将每个Ajax结果传递给下游。这样的处理方式彷佛并非适合全部的场景。好比对股票等网页系统等对数据显示实时性要求比较高的状况下相对来讲,若是没有处理好将会出现重大经济损失。而switchMap正是解决了mergeMap这样弊端的操做符。
import {fromEvent, Observable, of, Subscriber} from "rxjs"; import {PartialObserver} from "rxjs/src/internal/types"; import {delay, scan} from "rxjs/operators"; class SwitchMapSubscriber extends Subscriber<Function> { private innerSubscription: any; constructor( destination: PartialObserver<any> | ((value: any) => void), private readonly func: Function ) { super(destination); this.func = func; } protected _next(value: any): void { const o$ = this.func(value); // 保证获取到最新的数据流 if(this.innerSubscription) { this.innerSubscription.unsubscribe(); } this.innerSubscription = o$.subscribe({ next: (value: any) => { this.destination.next(value); } }) } } const switchMap = (func: any) => (source: Observable<any>) => source.lift({ call(subscriber: Subscriber<any>, source: any): any { source.subscribe(new SwitchMapSubscriber(subscriber, func)) } }); export const observable$ = fromEvent(document, 'click').pipe( scan(i => i + 1, 0), switchMap((value: any) => of(value).pipe(delay(500))) );
③ concatMap
concatMap的模式与switchMap基本类似,差异在于获取最新的数据流后concatMap将其顺序推动数据流,而switchMap则将本来的数据流取消订阅,转而订阅最新的数据流。
class ConcatMapSubscriber extends Subscriber<Function> { private innerSubscription: any; private buffer: Array<any> = []; constructor( destination: PartialObserver<any> | ((value: any) => void), private readonly func: Function ) { super(destination); this.func = func; } protected _next(value: any): void { const {isStopped} = this.innerSubscription || {isStopped: true}; if(!isStopped) { this.buffer = [...this.buffer, value]; } else { const o$ = this.func(value); this.innerSubscription = o$.subscribe({ next: (value: any) => { this.destination.next(value); }, complete: () => { if(this.buffer.length) { const [first, ...rest] = this.buffer; this.buffer = rest; this._next(first); } } }); // 顺序推动数据流 this.add(this.innerSubscription); } } } const concatMap = (func: any) => (source: Observable<any>) => source.lift({ call(subscriber: Subscriber<any>, source: any): any { source.subscribe(new ConcatMapSubscriber(subscriber, func)) } }); export const observable$ = fromEvent(document, 'click').pipe( scan(i => i + 1, 0), concatMap((value: any) => of(value).pipe(delay(500))), takeUntil(fromEvent(document, 'keydown')) );