函数式编程 | 打造自定义的RxJs操做符

map_operator.jpeg

最近在学习函数式编程相关的知识,这篇文章源于学习自定义RxJs操做符的一点想法,若有理解不正确的地方,还请指出。html


第一步:搭建环境

1. 安装项目依赖
mkdir rxjs && cd rxjs
yarn init -y
yarn add rxjs rxjs-compat webpack webpack-dev-server typescript ts-loader --save
yarn add webpack-cli --save-dev
2. 设置webpack和typescript

① 建立webpack.cofig.js文件node

const path = require('path');

module.exports = {
  entry: './src/index.ts',
  devtool: 'inline-source-map',
  module: {
    rules: [
      {
        test: /\.tsx?$/,
        use: 'ts-loader',
        exclude: /node_modules/
      }
    ]
  },
  resolve: {
    extensions: [ '.ts', '.js', '.tsx' ]
  },
  output: {
    filename: 'bundle.js',
    path: path.resolve(__dirname, 'dist')
  }
};

② 建立typescript.json文件webpack

{
  "compilerOptions": {
    "outDir": "./dist/",
    "noImplicitAny": true,
    "module": "es6",
    "moduleResolution": "node",
    "sourceMap": true,
    "target": "es6",
    "typeRoots": [
      "node_modules/@types"
    ],
    "lib": [
      "es2017",
      "dom"
    ]
  }
}

③ 在package.json文件下添加启动命令es6

"scripts": {
    "start": "webpack-dev-server --mode development"
  },
3. 完成项目设置

① 建立index.html并复制粘贴以下代码web

<!DOCTYPE html>
<html lang="en">
<head>
    <meta charset="UTF-8">
    <meta name="viewport" content="width=device-width, initial-scale=1.0">
    <meta http-equiv="X-UA-Compatible" content="ie=edge">
    <title>Learn RxJS with Coursetro</title>

    <style>
        body { font-family: 'Arial'; background: #ececec; }
        ul { list-style-type: none; padding: 20px; }
        li { padding: 20px; background: white; margin-bottom: 5px; }
    </style>
</head>
<body>
    <script src="/bundle.js"></script>
</body>
</html>

② 建立src文件夹,在项目下建立``index.tsajax

import * as Rx from "rxjs/Observable";

console.log(Rx);

③ 在浏览器中访问 http://localhost:8080,打开控制台便可看到 Rx 对象数据typescript

第二步:建立通用subscriber

import {from} from "rxjs";

const observable$ = from([1,2,3,4,5]);

const subscriber = {
    next: (value: any) => console.log(value),
    complete: () => console.log('has something done.'),
    error: (err: any) => console.error('something error: ', err)
};

observable$.subscribe(subscriber);

// 输出:
// 1
// 2
// 3
// 4
// 5

第三步:继承Subscriber并重写_next函数

class DoubleSubscriber extends Subscriber<number> {
    protected _next(value: number): void {
        // @ts-ignore
        this.destination.next(value * 2);
    }
}

observable$.subscribe(new DoubleSubscriber(subscriber));

// 输出:
// 2
// 4
// 6
// 8
// 10

第四步:链接source和subscribe,建立一个基础 pipe 操做符

① 手动链接shell

const double = (source: any) => {
  const o$ = new Observable();
  o$.source = source;
  o$.operator = {
      call(sub, source) {
          // @ts-ignore
          source.subscribe(new DoubleSubscriber(sub))
      }
  };
  return o$;
};

observable$.pipe(double).subscribe(subscriber);

② 使用lift连接source和subscriber编程

const double = (source: any) => source.lift({
    call(subscriber: Subscriber<number>, source: any): any {
        source.subscribe(new DoubleSubscriber(subscriber))
    }
});

第五步:建立一个可复用的 操做符

① 使用lift建立可复用的pipe操做符json

// src/operators/multiply/index.ts
/**
 * @format
 * @Author: Alvin
 * @Date 2020-03-05
 * @Last modified by: Alvin
 * @Last modified time: 2020-03-05
 */
import {Subscriber} from "rxjs";
import {PartialObserver} from "rxjs/src/internal/types";

class MultiplySubscriber extends Subscriber<number> {
    constructor(subscriber: PartialObserver<any> | ((value: number) => void), number: number) {
        super(subscriber);
        // @ts-ignore
        this.number = number;
    }
    protected _next(value: number): void {
        // @ts-ignore
        this.destination.next(value * this.number);
    }
}

export const multiply = (number: number) => (source: any) => source.lift({
    call(subscriber: Subscriber<number>, source: any): any {
        source.subscribe(new MultiplySubscriber(subscriber, number))
    }
});


// src/index.ts
observable$.pipe(multiply(3)).subscribe(subscriber);
observable$.pipe(multiply(45)).subscribe(subscriber);

② 复用现有的map实现相同功能的multiply

import {map} from "rxjs/operators";

export const multiply = (number: number) => map((value: number) => value * number);

③ 自定义pipe构建自定义操做符

const pipe = (...fns: Array<Function>) => (source: any) => fns.reduce((acc: any, fn: any) => fn(acc), source)

export const multiply = (number: number) => pipe(
    map((value: number) => value * number),
    filter((value: number) => value < 100)
);

observable$.pipe(multiply(3)).subscribe(subscriber);
observable$.pipe(multiply(45)).subscribe(subscriber);

第六步:建立有RXJS特点的高阶map

在前面咱们拆解了与map相似的操做符multiply的实现方式,这个操做符其实很是好理解,由于有JavaScript原生的map对应。接下来分析真正有RxJs特点的高阶map —— 把Observable对象玩得更加出神入化的操做符。

① mergeMap

mergeMap marble diagram

mergeMap可以解决异步操做的问题,最典型的应该属于AJAX请求的处理。在网页应用当中,每点击按钮一次就发送一个AJAX

请求给server,同时还要根据返回的结果在ui上更新状态。传统的方法来解决这样的异步操做代码会比较繁杂。

mergeMap把用户的点击操做看做一个数据流。把AJAX的返回结果也看作一个数据流,为本来繁杂的解决方式提供了一种简单解。

fromEvent(document.querySelector("#send"), 'click').pipe(
    mergeMap(() => ajax("apiUrl"))
).subscribe((result: any) => {
    // 常规处理ajax返回的结果
})

稍微讲解了merge的使用场景,接下来看看若是手动实现这样的功能,该如何作呢?

import {fromEvent, Observable, of, Subscriber} from "rxjs";
import {PartialObserver} from "rxjs/src/internal/types";
import {delay, scan} from "rxjs/operators";

class MergeMapSubscriber extends Subscriber<Function> {
    constructor(
        destination: PartialObserver<any> | ((value: any) => void),
        private readonly func: Function
    ) {
        super(destination);
        this.func = func;
    }

    protected _next(value: any): void {
        const o$ = this.func(value);

        o$.subscribe({
            next: (value: any) => {
                this.destination.next(value);
            }
        })
    }
}

const mergeMap = (func: any) => (source: Observable<any>) => source.lift({
    call(subscriber: Subscriber<any>, source: any): any {
        source.subscribe(new MergeMapSubscriber(subscriber, func))
    }
});

const observable$ = fromEvent(document, 'click').pipe(
    scan(i => i + 1, 0),
    mergeMap((value: any) => of(value).pipe(delay(500)))
);

const subscriber = {
    next: (value: any) => console.log("subscriber function output ::: ", value),
    complete: () => console.log('has something done.'),
    error: (err: any) => console.error('something error: ', err)
};

observable$.subscribe(subscriber);

② switchMap

switchMap marble diagram

上面介绍了一个适合于AJAX的mergeMap,但mergeMap会存在一个问题,每个上游数据都将会引起调用Ajax并且会将每个Ajax结果传递给下游。这样的处理方式彷佛并非适合全部的场景。好比对股票等网页系统等对数据显示实时性要求比较高的状况下相对来讲,若是没有处理好将会出现重大经济损失。而switchMap正是解决了mergeMap这样弊端的操做符。

import {fromEvent, Observable, of, Subscriber} from "rxjs";
import {PartialObserver} from "rxjs/src/internal/types";
import {delay, scan} from "rxjs/operators";

class SwitchMapSubscriber extends Subscriber<Function> {
    private innerSubscription: any;
    constructor(
        destination: PartialObserver<any> | ((value: any) => void),
        private readonly func: Function
    ) {
        super(destination);
        this.func = func;
    }

    protected _next(value: any): void {
        const o$ = this.func(value);

          // 保证获取到最新的数据流
        if(this.innerSubscription) {
            this.innerSubscription.unsubscribe();
        }

        this.innerSubscription = o$.subscribe({
            next: (value: any) => {
                this.destination.next(value);
            }
        })
    }
}

const switchMap = (func: any) => (source: Observable<any>) => source.lift({
    call(subscriber: Subscriber<any>, source: any): any {
        source.subscribe(new SwitchMapSubscriber(subscriber, func))
    }
});

export const observable$ = fromEvent(document, 'click').pipe(
    scan(i => i + 1, 0),
    switchMap((value: any) => of(value).pipe(delay(500)))
);

③ concatMap

concatMap marble diagram

concatMap的模式与switchMap基本类似,差异在于获取最新的数据流后concatMap将其顺序推动数据流,而switchMap则将本来的数据流取消订阅,转而订阅最新的数据流。

class ConcatMapSubscriber extends Subscriber<Function> {
    private innerSubscription: any;
    private buffer: Array<any> = [];
    constructor(
        destination: PartialObserver<any> | ((value: any) => void),
        private readonly func: Function
    ) {
        super(destination);
        this.func = func;
    }

    protected _next(value: any): void {
        const {isStopped} = this.innerSubscription || {isStopped: true};

        if(!isStopped) {
            this.buffer = [...this.buffer, value];
        } else {
            const o$ = this.func(value);
            this.innerSubscription = o$.subscribe({
                next: (value: any) => {
                    this.destination.next(value);
                },
                complete: () => {
                    if(this.buffer.length) {
                        const [first, ...rest] = this.buffer;
                        this.buffer = rest;
                        this._next(first);
                    }
                }
            });
                        
              // 顺序推动数据流
            this.add(this.innerSubscription);
        }
    }
}

const concatMap = (func: any) => (source: Observable<any>) => source.lift({
    call(subscriber: Subscriber<any>, source: any): any {
        source.subscribe(new ConcatMapSubscriber(subscriber, func))
    }
});

export const observable$ = fromEvent(document, 'click').pipe(
    scan(i => i + 1, 0),
    concatMap((value: any) => of(value).pipe(delay(500))),
    takeUntil(fromEvent(document, 'keydown'))
);
相关文章
相关标签/搜索