Flutter | 状态管理拓展篇——RxDart(四)

前言

在前一篇文章向你们介绍了一种新的状态管理方式——BLoC,它在分离咱们的ui逻辑与业务逻辑上表现十分优秀。可是在最后咱们发现了一个问题。react

bloc是一个典型的观察者模式,咱们以counter bloc举例,在A,B页面都存在观察者,它们监听的是同一个广播流,当咱们pop B页面,回到A页面这个操做不会出现任何问题,而当咱们再次进入B页面的时候却发现,它显示了初始值0,而不是咱们想要的value,只有等咱们再次按下按钮时,它才能刷新得到实际的value。

Stream很棒,可是还不够强大git

因此今天要给你们简单介绍下ReactiveX的dart 实现——RxDart,它极大的扩展了Stream的功能,可以让咱们在使用bloc的时候更加游刃有余。github

在正式开始介绍前,我但愿您已经阅读并理解了stream的相关知识,后面的内容都基于此。若是您还未了解过dart:stream 的话,我建议您先阅读这篇文章:Dart:什么是Streamapi

RxDart

ReactiveX是什么

ReactiveX是一个强大的库,用于经过使用可观察序列来编写异步基于事件的程序。它突破了语言平台的限制,让咱们编写异步程序就像在自家花园散步那样 easy。我相信你必定会爱上它!缓存

基本概念

Dart:什么是Stream这篇文章中,我用到一个模型来理解stream里面到底发生了什么。今天咱们仍是利用这个模型来看看,在rxdart中它是什么样的。app

这个模式的关键思惟在于观察者的无状态。咱们平时调用方法的时候必定是很清楚咱们何时调用,并马上会返回一个预想的结果。

可是在这里,咱们中间进行处理的时候,彻底是处于异步状态的,也就是说没法马上返回一个值。咱们不知道stream何时会“吐”出处理结果,因此必需要一个观察者来守着这个出口。异步

当有事件/数据流出时,观察者捕捉到了这个事件并解析处理。ide

  • Subject实现并扩展了StreamController,它符合StreamController的全部规范。假如您以前使用的StreamController,那么你能够直接替换为Subject。你能够把它想像成streamController。
  • Observable实现并扩展了Stream。它将经常使用的stream和streamTransformer组合成了很是好用的api。你能够把它想像成stream。

可观察对象——Observable

建立Observavle

你能够把stream直接包装成Observablepost

var obs = Observable(Stream.fromIterable([1,2,3,4,5]));
  
  obs.listen(print);
复制代码

输出:1 2 3 4 5ui

经过Future建立:fromFuture

var obs = Observable.fromFuture(new Future.value("Hello"));
 
  obs.listen(print); 
复制代码

输出:Hello

经过Iterable建立:fromIterable

var obs = Observable.fromInterable([1,2,3,4,5]);

obs.listen(print);
复制代码

输出:1 2 3 4 5

让流的“吐”出间隔一段时间:interval

interval方法可以让流“吐出数据”后间隔一段时间再吐下一个数据。

var obs = Observable(Stream.fromIterable([1,2,3,4,5]))
    .interval(new Duration(seconds: 1));

  obs.listen(print);
复制代码

输出:1 ... 2 ... 3 ... 4 ... 5

其中...表明停顿了一秒。

迭代地处理数据:map

map方法可以让咱们迭代的处理每个数据并返回一个新的数据

var obs = Observable(Stream.fromIterable([1,2,3,4,5]))
    .map((item)=>++item);
    
obs.listen(print);
复制代码

输出:2 3 4 5 6

扩展流:expand

expand方法可以让咱们把把每一个item扩展至多个流

var obs = Observable(Stream.fromIterable([1,2,3,4,5]))
   .expand((item)=> [item,item.toDouble()]);

 obs.listen(print);
复制代码

输出:1 1.0 2 2.0 3 3.0 4 4.0 5 5.0

这里咱们将每一个数据扩展成【item,item.toDouble】你能够扩展成任意组的流。假如这是一个广播Observable,并被屡次收听,那么他能够单独调用expand并扩展本身。

合并流:merge

merge方法可以让咱们合并多个流,请注意输出。

var obs = Observable.merge([
    Stream.fromIterable([1,2,3]),
    Stream.fromIterable([4,5,6]),
    Stream.fromIterable([7,8,9]),
  ]);

  obs.listen(print);
复制代码

输出:1 4 7 2 5 8 3 6 9

顺序执行多个流:concat

concat方法可以让咱们按照顺序执行一组流,当一组流执行完毕后,再开始执行下一组。

var obs = Observable.concat([
    Stream.fromIterable([1,2,3]),
    Stream.fromIterable([4,5,6]),
    Stream.fromIterable([7,8,9]),
  ]);

  obs.listen(print);
复制代码

输出:1 2 3 4 5 6 7 8 9

检查每个item:every

every会检查每一个item是否符合要求,而后它将会返回一个可以被转化为 Observable 的 AsObservableFuture< bool>。

var obs = Observable.fromIterable([1,2,3,4,5]);

  obs.every((x)=> x < 10).asObservable().listen(print);
复制代码

输出结果:true

关于Observable你还须要知道这些

  • Dart中 Observables 默认是单一订阅。若是您尝试两次收听它,则会抛出 StateError 。你可使用工厂方法或者 asBroadcastStream 将其转化为多订阅流。
var obs = Observable(Stream.fromIterable([1,2,3,4,5])).asBroadcastStream();
复制代码
  • 不少方法的返回值并非一个 Single 也不是一个 Observable 而是必须返回一个Dart的 Future。幸运的是你很容易找到一些方法,把他们转化成回 stream
  • 出现错误时,Dart中的Stream不会默认关闭。可是在Rxdart中,Error会致使Observable终止,除非它被运算符拦截。
  • 默认状况下Dart中Stream是异步的,而Observables默认是同步的。
  • 在处理多订阅Observable的时候,onListen方法只有在第一次会被调用。且各个订阅者之间不会互相干涉。
var obs = Observable(Stream.fromIterable([1,2,3,4,5])).asBroadcastStream();

//第一个订阅者
  obs.interval(Duration(seconds: 1)).map((item) => ++item).listen(print);
//第二个订阅者
  obs.listen(print);
复制代码

输出:1 2 3 4 5 2 3 4 5 6

以上是一些比较常见的Observable的使用方法,它并不完整,我将会在之后持续的更新这篇文章,并完整介绍它的功能

加强版StreamController——Subject

普通广播流控制器:PublishSubject

PublishSubject就是一个普通广播版StreamController,你能够屡次收听,默认是sync是false,也就是说里面是一个AsyncBroadcastStreamController 异步广播流。

缓存最新一次事件的广播流控制器:BehaviorSubject

BehaviorSubject也是一个广播流,可是它能记录下最新一次的事件,并在新的收听者收听的时候将记录下的事件做为第一帧发送给收听者。

还记得咱们文章开头的那个小问题吗?在B页面从新收听的时候,获取不到最新的事件,必须等咱们从新触发流才能够获得正确的值。

我发誓我绝对不是为了凑篇幅🤣

ok,咱们如今用BehaviorSubject替换掉咱们的StreamCroller

//var _countController = StreamController.broadcast<int>();

var _subject = BehaviorSubject<int>();
复制代码

真的就是这么简单,无缝替换😆

代码已上传github,让咱们来看看效果

再来看两个例子,相信你们会对BehaviorSubject理解更深入

例1

final subject = new BehaviorSubject<int>();

  subject.add(1);
  subject.add(2);
  subject.add(3);

  subject.stream.listen(print); // prints 3
  subject.stream.listen(print); // prints 3
  subject.stream.listen(print);
复制代码

输出:3 3 3

因为咱们在add(3)以后才开始收听,因此将会收到最新的value。

例2

final subject = new BehaviorSubject<int>(seedValue: 1);

  subject.stream.listen(print); // prints 1
  subject.stream.listen(print); // prints 1
  subject.stream.listen(print);
复制代码

输出:1 1 1

seedValue做为初始值,在后面有收听者的时候一样会把它当成最后一次的value发送给收听者。

缓存更多事件的广播流控制器:ReplaySubject

ReplaySubject可以缓存更多的值,默认状况下将会缓存全部值,并在新的收听的时候将记录下的事件做为第一帧发送给收听者。

final subject = ReplaySubject<int>();

  subject.add(1);
  subject.add(2);
  subject.add(3);
  
  subject.stream.listen(print); // prints 1
  subject.stream.listen(print); // prints 1
  subject.stream.listen(print);
复制代码

输出:1 1 1 2 2 2 3 3 3

你还能够经过maxSize控制缓存个数

final subject = ReplaySubject<int>(maxSize: 2);

  subject.add(1);
  subject.add(2);
  subject.add(3);

  subject.stream.listen(print); // prints 1
  subject.stream.listen(print); // prints 1
  subject.stream.listen(print);
复制代码

输出:2 2 2 3 3 3

自定义你的Subject

你能够经过自定义一个新的subject继承至Subject类来得到更加个性化的功能。这里就不举栗子了。😝

Subject的释放

当你再也不收听Subject,或者Subject再也不使用时,请务必释放它。你能够调用subscription的cancel()方法让某个听众取消收听,或者Subject.close(),关闭整个流。

了解更多

下面有一些优秀的文章可以给您更多参考

写在最后

以上即是RxDart篇的所有内容,它只是介绍了部分RxDart的功能,我在以后会逐渐完善它,最终整理完整。

RxDart十分强大,它让你在处理大量异步事件的时候感受很是温馨。我相信每个开发者在了解过它以后必定会喜欢上这个好用的库。

若是你在使用rxdart时候有任何好的idea,或是query,欢迎在下方评论区以及个人邮箱1652219550a@gmail.com留言,我会在24小时内与您联系!

下一篇文章将会是flutter状态管理总结篇,敬请关注。

相关文章
相关标签/搜索