RxJS与Redux结合使用(一):打造本身的redux-observable

背景

Redux 的核心理念是单向数据流,只能经过 dispatch(action) 的方式修改状态,使用react-redux能够在组件和redux之间造成下面这么一个数据流闭环:javascript

view ->  action -> reducer -> state -> view
复制代码

然而,在实际业务中每每有大量异步场景,最直接的作法是在React组件中发起异步请求,在拿到数据后调用dispatch(action)去数据层修改数据。不过这样的作法使得视图层和数据层耦合在一块儿,会形成后期维护的困难。java

Redux做者建议用中间件来处理异步流,由于在中间件中咱们能够灵活地控制 dispatch的时机,这对于处理异步场景很是有效。较为常见的作法主要有两种:react

  1. 更改action的类型,如redux-thunk,用函数替换了action;
  2. 在middleware中接收到action的时候作出一些对应处理,如redux-saga。

而咱们今天要讲的rxjs与redux的结合,采用了第二种方式来处理异步流程。编程

中间件干预处理

市面上已经存在这么个中间件了:redux-observable。而咱们今天要作的就是带领你们,一步一步慢慢实现本身的一个redux-observable。redux

这个中间件的原理我能够简化为下面的代码:api

export default store => next => action => {
    const result = next(action);
    if (action.type === 'ping') {
        store.dispatch({ type: 'pong' })
    }
    return result;
}
复制代码

原理实在简单,在next(action)以后去根据action作判断,作一些异步逻辑,再发起dispatch修改数据便可,而redux-observable也只是在这个基础之上加入RxJs的一些特性。bash

处理异步逻辑的思路

若是你比较熟悉redux的话,就会知道,redux的中间件就是一个洋葱模型,上面咱们也说了,咱们会在中间件的后面根据你的action再去从新dispatch一些action,而Rxjs最核心的思想就是将数据都流化。因此你能够理解为action在中间件的末端流入一个管道,最后从管道又流出一些action,这些action最终会再次被store dispatch。异步

image

至于在这个管道中进行了什么样的变化、操做,那就是Rxjs的管辖范围,经过Rxjs的强大的操做符,咱们能够很是优雅地实现异步逻辑。async

因此,须要有个流来承载全部的action,这样你就能够经过这个action$来进行fetch:模块化

action$.pipe(
    switchMap(
        () => fromPromise(fetch('/api/whatever')).pipe(
            map(res => action)
        )
    ),
    catchError(() => {})
)
复制代码

这样就将异步逻辑嵌入到流当中。

建立Action流

咱们的核心思想是action in, action out,因此最终流出的action是要从新被store.dispatch消费的,因此action$是一个Observable对象。

同时,在dispatch的时候,action通过中间件,action中须要放入这个action,因此action也是一个observer。

所以,action$既是观察者又是可观察对象,是一个Subject对象:

替换中间件的简单写法,变成:

import { Subject } from 'rxjs/Subject';

export default (store) => {

  const action$ = new Subject();
  
  action$.subscribe(store.dispatch);
  
  return next => (action) => {
    const result = next(action);
    action$.next(action);
    return result;
  };
};
复制代码

在上面代码中咱们在middleware中去放入action,而后经过订阅,store会触发dispatch。

可是,若是咱们就这么写的话,这是个死循环,由于任何action在进入到action后就立马被消费者store.dipatch(action)执行了,这个action又会在后面的流程中从新被送到action去。

聪明的你应该想到了,咱们对于进入到action$的action尚未进行任何过滤,而这个过滤过程也正是咱们须要的处理异步逻辑的地方。

下面咱们要把这个步骤加上。

流的转化器Epic

为了达到action的一个转化处理,咱们将这个过程抽离出来,这个中间处理的逻辑称为Epic,epic的形式大概能够写为:

const epic = (action$) => {
    return action$.pipe(
        // 由于全部的action都会过来
        // 因此咱们只须要处理咱们想要的aciton
        filter(action => action.type === 'GET_USER'),
        switchMap(
            // 将fetch也转化为流
            () => fromPromise(fetch('/api/user/get', {
                method: 'POST',
                body: {
                    id: 1
                },
            })).pipe(
                map(res => ({ type: 'GET_USER_SUCCESS', payload: res })),
                catchError(error => ({ type: 'GET_USER_FAILED', payload: error }))
            )
        )
    )
}

复制代码

epic本质是一个函数,在这个函数中,咱们在action$的基础上,加入了管道控制,产生了另一个流,而这个流就是最终咱们要的,对action进行了控制的action流,上面的fetch只是一个例子,在这个管道中,你能够处理任意的异步逻辑。

而咱们要作的就是将这个Epic,整合进刚才的中间中。

作法也很简单,咱们只须要将订阅从action$换到新的流上就能够了:

import { Subject } from 'rxjs/Subject';

export default (store) => {

  const action$ = new Subject();
  const newAction$ = epic(action$);
  
  newAction$.subscribe(store.dispatch);
  
  return next => (action) => {
    const result = next(action);
    action$.next(action);
    return result;
  };
};
复制代码

这样,action$在接收到新的action的时候,会流经epic定义的管道,而后才出发dispatch

多个Epic合并

到此,咱们的中间件已经有初步处理异步逻辑的能力,可是,在现实中,咱们的异步逻辑不可能只有一个,因此epic是会有不少的,而store去订阅的流只能是一个,因此这么多的epic产生的流要合并成一个流。

合并流的操做,强大的RxJs天然是有安排的,相信你想到了操做符merge,咱们能够提供一个combineEpics的函数:

export const combineEpics = (...epics) => {
  const merger = (...args) => merge(
    ...epics.map((epic) => {
      const output$ = epic(...args);
      return output$;
    })
  );
  return merger;
};
复制代码

上面的代码不难理解,combineEpics整合了全部传入的epic,而后返回一个merger,这个merger是利用merge操做符,将全部的epic产生的流合并成一个流。

image

代码形式为:

const pingEpic = action$ => action$.pipe(
  filter(action => action.type === 'ping'),
  map(() => ({ type: 'pong' })),
);

const getUserEpic = action$ => action$.pipe(
  filter(action => action.type === 'GET_USER'),
  map(() => ({ type: 'GET_USER_SUCCESS', payload: { user: { name: 'kang' } } })),
);

const rootEpic = combineEpics(pingEpic, getUserEpic);

export default (store) => {
  const action$ = new Subject();
  const newAction$ = rootEpic(action$);

  newAction$.subscribe(store.dispatch);
  return next => (action) => {
    const result = next(action);
    action$.next(action);
    return result;
  };
};
复制代码

state获取

在epic中咱们不可避免地要借助state里面的数据进行不一样的处理,因此咱们是须要获取到state的,因此你能够在中间件中的epci执行函数中添加一个参数,将state获取函数暴露出去:

export default (store) => {
  ...
  const newAction$ = rootEpic(action$, store.getState);
  ...
};
复制代码

这样epic里就能够用getState()获取state:

const pingEpic = (action$, getState) => action$.pipe(
  filter(action => action.type === 'ping'),
  map(() => ({ type: 'pong', payload: getState() })),
);
复制代码

进一步优化:将state也流化

上面的作法是直接去获取state,这样的作法是主动获取,不符合函数响应式编程模式。函数响应式中,state的改变状态,应该是要能被观察的。

当state也能被响应观察,咱们就能够作更多的功能,例如:当state的某些数据在发生变化的时候,咱们要去进行实时保存。

在传统模式的作法中,你能够在中间件中这样写:

export default store => next => action => {
    const oldState = store.getState();
    const result = next(action);
    const newState = store.getState();
    // 相似这样的写法
    if (newState.xxx !== oldState.xxx) {
        fetch('/api/save', {
            method: 'POST',
            body: {
            
            }
        }).then(() => {}).catch(() => {})
    }
    return result;
}
复制代码

这个处理逻辑要独立为一个中间件,而若是你将state也流化,你能够直接使用epic这样处理:

const saveEpic = (action$, state$) => state$.pipe(
const autoSaveEpic = (action$, state$) =>
  return action$.pipe(
    filter(action => action.type === 'AUTO_SAVE_ENABLE'), // 自动保存的开启
    exhaustMap(() => state$.pipe(
        pluck('xxx'), // 获取state.xxx
        distinctUntilChanged(), // 先后值不一样时才将其发出。
        concatMap((value) => {
            // fetch to save
        }),
        // 自动保存的关闭
        takeUntil(action$.pipe(
            filter(action => action.type === 'AUTO_SAVE_DISABLE')
        ))
    ))
  )
)
复制代码

若是仔细阅读这段代码,能够发现这样的方式可使用很是优雅的方式控制这个自动保存,能够和action$结合使用,快速开关自动保存,能够利用RxJs的特性解决保存的异步执行延迟问题。

若是你只是单存想要获取最新state,可使用withLatestFrom操做符:

const countEpic = (action$, state$) => action$.pipe(
  filter(action => action.type === 'count'),
  withLatestFrom(state$),
  switchMap(([action, state]) => {
    return of({ type: 'whatever' });
  })
);
复制代码

在中间件加入state流:

export default (store) => {
  const action$ = new Subject();
  const state$ = new Subject();
  const source$ = rootEpic(action$, state$);

  source$.subscribe(store.dispatch);
  return next => (action) => {
    const result = next(action);
    state$.next(store.getState());
    action$.next(action);
    return result;
  };
};
复制代码

注意state.next要先执行,这样在epic中才会拿到最新的,另外能够知道一下,redux在init的时候不会通过中间件,因此当你没有dispatch任何action的时候,state最新值不是默认state。

Action的顺序问题

若是你有耐心看到这里,那么说明你对于redux结合RxJs的使用已经理解得差很少了,可是这里仍是有个问题,就是action的生效顺序,咱们能够直接看个例子说明,假设有下面这样两个epic:

const epic1 = action$ => action$.pipe(
  filter(action => action.type === 'one'),
  mergeMap(() => of({ type: 'two' }, { type: 'three' })),
);

const epic2 = action$ => action$.pipe(
  filter(action => action.type === 'two'),
  mergeMap(() => of({ type: 'four' })),
);
复制代码

store.dispatch({ type: 'one' }) 的时候,action的顺序为:

'one' -> 'two' -> 'four' -> 'three'
复制代码

可见,action的执行顺序并非如咱们预期的那样,在two触发后就发出了four,这是由于RxJs默认的调度器是同步的,用一段简单的代码,上面的效果相似于:

class Print {
  constructor(name, nexts = []) {
    this.name = name;
    this.nexts = nexts;
  }
  print() {
    console.log(this.name);
    this.nexts.forEach((p) => {
      p.print();
    });
  }
}
const three = new Print('three');
const four = new Print('four');
const two = new Print('two', [four]);
const one = new Print('one', [two, three]);
one.print(); // one, two, four, three
复制代码

换成上面的代码的话你就不陌生了吧,也会对于输出的结果表示确定,可是咱们须要的效果是

'one' -> 'two' -> 'three' -> 'four'
复制代码

这如何作到?

明显,须要将调度器换成其余的,RxJs有这么几种调度器:null(同步)、asap、queue、async、animationFrame。最后一种是动画场景的调度器,直接剔除,默认是第一种,那么就剩下asap、queue、async。在这个场景下,这三种调度器都是可行的,可是queue在大量的数据的时候对于性能是有利的,因此这里可使用它。不过,记住,这三种调度器是有区别的,你们有兴趣的本身去google一下,只提示:asap是Micro Task、async是Macro Task、queue在延迟为0的时候接近于同步,在延迟不为0的时候与async同样。

中间件中:

const action$ = new Subject().pipe(
    observeOn(queue)
  );
复制代码

这样获得的结果就是:

'one' -> 'two' -> 'three' -> 'four'
复制代码

若是用简单的代码,至关于发生了这样的变化:

class Print {
  constructor(name, nexts = []) {
    this.name = name;
    this.nexts = nexts;
  }
  print() {
    console.log(this.name);
    this.nexts.forEach((p) => {
      setTimeout(() => p.print(), 0);
    });
  }
}
const three = new Print('three');
const four = new Print('four');
const two = new Print('two', [four]);
const one = new Print('one', [two, three]);
one.print(); // one, two, three, four

复制代码

总结

本文就讲到这里,此次介绍了如何本身实现一个redux-observable,下次会讲redux-observable在实战中的一些应用,例如怎么相似dva那样进行模块化开发、如何统一处理loading、error等。

相关文章
相关标签/搜索