如何实现一个 redux-observable

本文是 《使用 RxJS + Redux 管理应用状态》系列第二篇文章,将会介绍 redux-observable 的设计哲学和实现思路。返回第一篇:使用 redux-observable 实现组件自治html

本系列的文章地址汇总:前端

Redux

Redux 脱胎于 Elm 架构,其状态管理视角和流程很是清晰和明确:react

  1. dispatch 了一个 action
  2. reducer 俘获 action,并根据 action 类型进行不一样的状态更新逻辑
  3. 周而复始地进行这个过程

这个过程是同步的,Redux 为了保护 reducer 的纯度是不推荐在 reducer 中处理反作用的(如 HTTP 请求)。所以,就出现了 redux-thunk、redux-saga 这样的 Redux 中间件去处理反作用。git

这些中间件本质都是俘获 dispatch 的内容,并在这个过程当中进行反作用处理,最终 dispatch 一个新的 action 给 reducer,让 reducer 专心作一个纯的状态机。es6

用 observable 管理反作用

假定咱们在 UI 层能派发出一个数据拉取的 FETCH action,拉取数据后,将派发拉取成功的 FETCH_SUCCESS action 或者是数据拉取失败的 FETCH_ERROR action 到 reducer。github

FETCH
             |
       fetching data...
             |
            / \
           /   \
 FETCH_SUCCESS FETCH_ERROR
复制代码

若是咱们用 FRP 模式来思考这个过程,FETCH 就不是一个独立的个体,而是存在于一条会派发 FETCH action 的流上(observable):编程

---- FETCH ---- FETCH ---- 

---- FETCH_SUCCESS ---- FETCH_SUCCESS ----

---- FETCH_ERROR ---- FETCH_ERROR ----
复制代码

若咱们将 FETCH 流定义为 fetch$,则 FETCH_SUCCESS 和 FETCH_ERROR 都未来自于 fetch$redux

const fetch$: Observable<FetchAction> = //....
fetch$.pipe(
  switchMap(() => from(api.fetch).pipe(
    // 拉取数据成功
    switchMap(resp => ({
      type: FETCH_SUCCESS,
      payload: {
        // ...
      }
    }),
    // 拉取数据失败
    catchError(error => of({
      type: FETCH_ERROR,
      payload: {
        // ....
      }
    }))
  ))
)
复制代码

除此以外,咱们能够用一个流来承载页面全部的 action:api

const action$: Observable<Action>
复制代码

那么, fetch$ 亦能够由 action$ 流转获得:缓存

const fetch$ = action$.pipe(
  filter(({type}) => type === FETCH)
)
复制代码

这样,咱们就造成了使用 observable 流转 action 的模式:

接下来,咱们尝试讲这个模式整合到 Redux 中,让 observable 来负责应用的 action 流转和反作用处理。

构建中间件

Redux 提供的中间件机制能让咱们干预每一个到来的 action, 借此处理一些业务逻辑,而后再返还一个 action 给 reducer:

中间件的函数构成以下:

const middleware: Middleware = store => {
  // 初始化中间件
  return next => action => { 
  	// do something
  }
}

const store = createStore(
  rootReducer,
  applyMiddleware(middleware)
)
复制代码

如今,当中间件初始化时,咱们进行 action$ 。当新的 action 到来时:

  1. 将 action 交给 reducer 处理
  2. action$ 中放入 action
  3. action$ 能够转化另外一个的 action 流

所以,action$ 既是观察者又是可观察对象,是一个 Subject 对象:

const createMiddleware = (): Middleware => {
  const action$ = new Subject()
  const middleware: Middleware = store => next => action => {
    // 将 action 交给 reducer 处理
    const result = next(action)
    // 将 action 放到 action$ 中进行流转
    action$.next(action)
    return result
  }
  return middleware
}
复制代码

流的转换器

如今,在中间件中,咱们初始化了 action$,可是如何获得 fetch$ 这些由 action$ 派生的流呢?所以,咱们还须要告知中间件若是经过 action$ 生成更多的流,不妨定义一个转换器,由它负责 action$ 的流转,并在当中处理反作用:

interface Transformer {
  (action$: Observable<Action>): Observable<Action>
}

const fetchTransformer: Transformer = (action$) => {
  action$.pipe(
    filter(({type}) => type === FETCH),
    switchMap(() => from(api.fetch).pipe(
      switchMap(resp => ({
        type: FETCH_SUCCESS,
        payload: {
          // ...
        }
      }),
      catchError(error => of({
        type: FETCH_ERROR,
        payload: {
          // ....
        }
      }))
    ))
  )
}
复制代码

应用中,咱们可能定义不一样的转换器,从而获得派发不一样 action 的流:

const newActionsStreams: Observable<Action>[] = transformers.map(transformer => transformer(action$))
复制代码

因为这些 action 还具备一致的数据结构,所以咱们能够将这些流进行合并,由合并后的流负责派发 action 到 reducer:

const newAction$ = merge(newActionStreams)
复制代码

那么,修改咱们的中间件实现:

const createMiddleware = (...transformers): Middleware => {
  const action$ = new Subject()
  // 运行各个 transformer,并将转换的流进行合并
  const newAction$ = merge(tramsformer.map(transformer => transformer(action$)))
  const middleware: Middleware = store => {
    // 订阅 newAction$
    newAction$.subscribe(action => store.dispatch(action))
    return next => action => {
      // 将 action 交给 reducer 处理
      const result = next(action)
      // 将 action 放到 action$ 中进行流转
      action$.next(action)
      return result
    }
  }
  return middleware
}
复制代码

优化:ofType operator

因为咱们老是须要 filter(action => action.type === SOME_TYPE) 来过滤 action,所以能够封装一个 operator 来优化这个过程:

const ofType: OperatorFunction<Observable<Action>, Observable<Action>> = (type: String) => pipe(
  filter(action => action.type === type)
)
复制代码
const fetchTransformer: Transformer = (action$) {
  return action$.pipe(
    filter(({type}) => type === FETCH),
    switchMap(() => from(api.fetch)),
    // ...
  )
}
复制代码

再考虑到咱们可能不仅过滤一个 action type,所以能够优化咱们的 ofType operator 为:

const ofType: OperatorFunction<Observable<Action>, Observable<Action>> = 
  (...types: String[]) => pipe(
    filter((action: Action) => types.indexOf(action.type) > -1)
  )
复制代码
const counterTransformer: Transformer = (action$) {
  return action$.pipe(
    ofType(INCREMENT, DECREMENT),
    // ...
  )
}
复制代码

下面这个测试用例将用来测试咱们的中间件是否可以工做了:

it('should transform action', () => {
   const reducer: Reducer = (state = 0, action) => {
    switch(action.type) {
      case 'PONG':
        return state + 1
      default:
        return state
    }
  }

  const transformer: Transformer = (action$) => {
    return action$.pipe(
        ofType('PING'),
        mapTo({type: 'PONG'})
      )
    )
  }

  const middleware = createMiddleware(transformer)
  const store = createStore(reducer, applyMiddleware(middleware))
  store.dispatch({type: 'PING'})
  expect(store.getState()).to.be.equal(1)
})
复制代码

优化:得到 state

在 action 的流转过程可能还须要得到应用状态,例如,fetch$ 中获取数据前,须要封装请求参数,部分参数可能来自于应用状态。所以,咱们能够考虑为每一个 transformer 再传递当前的 store 对象,使它能拿到当前的应用状态:

interface Transformer {
  (action$: Observable<Action>, store: Store): Observable<Action>
}

// ...

const createMiddleware = (...transformers): Middleware => {
  const action$ = new Subject()
  const middleware: Middleware = store => {
    // 将 store 也传递给 transformer
    const newAction$ = merge(tramsformer.map(transformer => transformer(action$, store)))
    newAction$.subscribe(action => store.dispatch(action))
    return next => action => {
      const result = next(action)
      action$.next(action)
      return result
    }
  }
  return middleware
}
复制代码

如今,当须要取用状态的时候,就经过 store.getState() 拿取:

const fetchTransformer: Transformer = (action$, store) {
  return action$.pipe(
    filter(({type}) => type === FETCH),
    switchMap(() => {
      const { query, page, pageSize } = store.getState()
      const params = { query, page, pageSize }
      return from(api.fetch, params)
    }),
    // ...
  )
}
复制代码

优化:观察状态

在响应式编程体系下,一切数据源都应当是可被观察的,而上面咱们对状态的取值确是主动的(proactive)的,正确的方式是应当观察状态的变化,并在变化时做出决策:

为此,相似 action$,咱们也将 state 流化,使得应用状态成为一个可观察对象,并将 state$ 传递给 transformer:

interface Transformer {
  (action$: Observable<Action>, state$: Observable<State>): Observable<Action>
}

// ...

const createMiddleware = (...transformers): Middleware => {
  const action$ = new Subject()
  const state$ = new Subject()
  const middleware: Middleware = store => {
    // 由各个 transformer 得到应用的 action$
    const newAction$ = merge(tramsformer.map(transformer => transformer(action$, state$)))
    // 新的 action 到来时,将其又 dispatch 到 Redux 生态
    newAction$.subscribe(action => store.dispatch(action))
    return next => action => {
      // 将 action 交给 reducer
      const result = next(action)
      // 得到 reducer 处理后的新状态
      state$.next(state)
		  // 将 action 放入 action$
      action$.next(action)
      return result
    }
  }
  return middleware
}
复制代码

当业务流程须要状态时,就能够自由组合 state$ 获得:

const fetchTransformer: Transformer = (action$, state$) {
  return action$.pipe(
    filter(({type}) => type === FETCH),
    withLatestFrom(state$),
    switchMap(([action, state]) => {
      const { query, page, pageSize } = state
      const params = { query, page, pageSize }
      return from(api.fetch, params)
    }),
    // ...
  )
}
复制代码

乍看之下,彷佛不如 store.getState() 来的方便,为了得到当前状态,咱们还额外引入了一个 operator withLatestFrom。可是,要注意到,咱们引入 state$ 不仅为了得到状态和统一模式,更重要是为了观察状态。

举个例子,咱们有一个备忘录组件,每次内容变更时,咱们就存储一下草稿。若是咱们能观察状态变更,经过响应式编程模式,当状态变更时,自动造成草稿存储的业务:

const saveDraft$: Observable<Action> = state$.pipe(
  // 选出当前
	pluck('content'),
  // 只有当内容变更时才考虑存储草稿
  distinctUntilChanged(),
  // 只在 1 s 内保存一次
  throttleTime(1000),
  // 调用服务存储草稿
  switchMap(content => from(api.saveDraft(content)))
  // ....
)
复制代码

你们也能够在回顾系列第一篇所介绍的内容,正是因为 redux-observable 在 1.0 版本引入了 state$,咱们才得以解耦组件的业务关系,实现单个组件的自治。

优化:响应初始状态

如今,咱们能够测试一下如今的中间件,看可否观察应用状态了:

it('should observe state', () => {
   const reducer: Reducer = (state = {step: 10, counter: 0}, action) => {
    switch(action.type) {
      case 'PONG':
        return {
          ...state,
          counter: action.counter
        }
      default:
        return state
    }
  }

  const transformer: Transformer = (action$, state$) => {
    return action$.pipe(
        ofType('PING'),
      	withLatestFrom(state$, (action, state) => state.step + state.counter),
        map(counter => ({type: 'PONG', counter}))
      )
    )
  }

  const middleware = createMiddleware(transformer)
  const store = createStore(reducer, applyMiddleware(middleware))
  store.dispatch({type: 'PING'})
  expect(store.getState().counter).to.be.equal(10)
})
复制代码

遗憾的是,这个测试用例将不会经过,经过调试发现,当咱们 dispatch 了 PING action 后,withLatestFrom 没有拿到最近一次的 state。这是为何呢?原来是由于 Redux 的 init action 并无暴露给中间件进行拦截,所以,应用的初始状态没能被送入 state$ 中,观察者没法观察到初始状态。

为了解决这个问题,在建立了 store 后,咱们能够尝试 dispatch 一个无心义的 action 给中间件,强制将初始状态先送入 state$ 中:

const middleware = createMiddleware(transformer)
const store = createStore(reducer, applyMiddleware(middleware))
// 派发一个 action 去得到初始状态
store.dispatch({type: '@@INIT_STATE'})
复制代码

这个方式虽然能让测试经过,但缺不是很优雅,咱们让用户手动去派发一个无心义的 action,这会让用户感受很困惑。所以,咱们考虑为中间件单独设置一个 API,用以在 store 建立后,完成一些任务:

// 设置一个 store 副本
let cachedStore: Store
const createMiddleware = (...transformers): Middleware => {
  const action$ = new Subject()
  const state$ = new Subject()
  const newAction$ = merge(transformers.map(transformer => transformer(action$, state$)))
  
  const middleware: Middleware = store => {
    cachedStore = store
    
    return next => action => {
      // 将 action 交给 reducer
      const result = next(action)
      // 得到 reducer 处理后的新状态
      state$.next(state)
		  // 将 action 放入 action$
      action$.next(action)
      return result
    }
  }
  
  middleware.run = function() {
    // 1. 开始对 action 的订阅
    newAction$.subscribe(cachedStore.dispatch)
    // 2. 将初始状态传递给 state$
    state$.next(cachedStore.getState())
  }
  return middleware
}
复制代码

如今,咱们为中间件提供了一个 run 方法,来让中间件在 store 建立之后完成一些工做。当咱们建立好 store 后,运行 run 方法来运行中间件:

const middleware = createMiddleware(transformer)
const store = createStore(reducer, applyMiddleware(middleware))
// 运行咱们的中间件
middleware.run()
复制代码

优化:相互关联的 transformer

再考虑一个更加场景,各个 transformer 之间可能存在关联,各个 trasformer 也可能直接发出 action,而不须要依赖于 action$

it('should queue synchronous actions', () => {
    const reducer = (state = [], action) => state.concat(action)
    const transformer1 = (action$, state$) => action$.pipe(
      ofType('FIRST'),
      mergeMap(() => of({ type: 'SECOND' }, { type: 'THIRD'} ))
    )
    const transformer2 = (action$, state$) => action$.pipe(
        ofType('SECOND'),
        mapTo({type: 'FORTH'})
    )
    
    const middleware = createMiddleware(transformer1, transformer2)
    const store = createStore(reducer, applyMiddleware(middleware))
    middleware.run()
    
    const actions = store.getState()
    actions.shift() // remove redux init action
    expect(actions).to.deep.equal([
      { type: 'FIRST' },
      { type: 'SECOND' },
      { type: 'THIRD' },
      { type: 'FORTH' }
    ])
})
复制代码

在这个测试用例中,咱们看到的 action 序列是:

FIRST
SECOND
THIRD
FORTH
复制代码

可是,在当前的实现中,你将获得:

FIRST
SECOND
FORTH
THIRD
复制代码

这并不符合预期。可是,问题又出在哪里呢?咱们分析下程序执行过程:

  1. 发出 first action
  2. 调度 first action,派生出 second action 及 third action 的 observable
  3. 调度 second action,派生出 forth action 的 observable
  4. 调度 forth action
  5. 调度 third action

问题显然就出在第 二、3 步,若是第 2 步中,咱们控制 observable 吐出值的速度,将同时到来的 second 和 third action 缓存到队列,并依次执行,就能获得咱们指望的输出。

幸运的是,RxJS 中提供了 observeOn 这个 operator 来控制数据源发出值的节奏。其第一个参数接收一个调度器,用于告知数据源以怎样的速录调度任务,这里咱们将使用 Queue Scheduler 将各个 action 缓存到队列,当此时再无 action 时,各个 action 出队并被调度:

export const createEpicMiddleware = (...epics) => {
  const action$ = new Subject().pipe(observeOn(queueScheduler)) as Subject<Action>
  
  // ...
  
  return middleware
}
复制代码

如今,再次运行测试用例,你讲看到符合指望的 action 序列:

FIRST
SECOND
THIRD
FORTH
复制代码

这是由于:

  1. 发出 first action
  2. 调度 first action,入队
  3. 此时没有 action,first action 出队,store.dispatch(first),派生出 second action 及 third action 的 observable
  4. second action 入队,third action 入队
  5. 此时没有等待的 action,则 second action 出队,store.dispatch(second),派生出 forth action 的 observable
  6. forth action 入队
  7. 此时没有等待的 action,队首元素 third action 出队,store.dispatch(third)
  8. forth action 出队,store.dispatch(forth)

总结

截止目前,咱们的中间件已经容许咱们经过 FRP 模式梳理应用状态了,这个中间件的实现已经很是相似于 redux-observable 的实现了。固然,你们生产环境仍是用更流行,更稳定的 redux-observable,本文旨在帮助你们更好的理解如何在 Redux 中集成 RxJS 更好的管理状态,经过一步一步对中间件的优化,也让你们理解了了 redux-observable 的设计哲学和实现原理。本文实现的 mini redux-observable 我也放到了个人 github 上,包含了一些测试用例和一个小的 demo。

接下来,咱们将探索将 redux-observable 以及 FRP 这套模式集成到 dva 架构的前端框架中,dva 架构帮助砍掉 Redux 冗长的样板代码,而 redux-observable 则专一于反作用处理。


参考资料

关于本系列

  • 本系列将从介绍 redux-observable 1.0 开始,阐述本身在结合 RxJS 到 Redux 中的心得体会。涉及内容会有 redux-observable 实践介绍,redux-observable 实现原理探究,最后会介绍下本身当前基于 redux-observble + dva architecture 的一个 state 管理框架 reobservable。
  • 本系列不是 RxJS 或者 Redux 入门,再也不讲述他们的基础概念,宣扬他们的核心优点。若是你搜索 RxJS 不当心进到了这个系列,对 RxJS 和 FRP 程序设计产生了兴趣,那么入门我会推荐:
  • 本系列更不是教程,只是介绍本身在 Redux 中应用 RxJS 的一些思路,但愿更多人能指出当中存在的误区,或者交流更优雅的实践。
  • 由衷的感谢实践路上一些师兄的帮助,尤为感谢腾讯云的 questguo 学长在模式上的指导。reobservable 脱胎于腾讯云 questguo 主导的 React 框架 —— TCFF,期待将来 TCFF 的开源。
  • 感谢小雨的设计支援。
相关文章
相关标签/搜索