Flutter 详解 (6、深刻了解Stream)

Future

Future有三种状态未完成完成带有值完成带有异常,使用Future能够简化事件任务。 假如你有一个按钮,点击以后开始下载图片,首先事件循环机制会处理你的点击事件,而后开始下载图片,当下载完成,你可使用then来注册回调,而后获取到图片并显示出来。html

一般咱们不会直接建立,网络下载图片会返回一个Future,文件I/O会返回一个Future,那咱们怎么建立一个呢?只须要关键字async就表示该函数异步执行,返回类型是Future<T>git

Future<String> getStr()async{
  var str = HttpRequest.getString('www.fgyong.cn');
  return str;
}

使用http请求地址www.fgyong.cn获取数据,而后返回。github

如何接收文本呢?数组

其实很简单,只须要使用await关键字便可,用来注册then回调。浏览器

main(List<String> args) async {
  String string = await getStr();
  print(string);
}

等同于:服务器

main(List<String> args) async {
  getStr().then((value) {
    print(value);
  });
}

官方比较推荐前者,由于前者看起来很像同步函数,少了层层嵌套,方便开发者理解代码。
网络下载想延迟动画隐藏时间,可使用Future.delayed()网络

await Future.delayed(Duration(seconds: 2), () {
hideAnimation();
});

若是已经带有值的想异步去执行,那么可使用Future.value()多线程

Stream

dart:async库包含对许多Dart API很重要的两种类型:StreamFuture。若是Future表示单个计算的结果,则流是一系列结果。您侦听流以获取有关结果(数据和错误)以及流关闭的通知。您还能够在收听流时暂停播放或在流完成以前中止收听。闭包

如何使用Stream

流能够经过多种方式建立,后续在仔细讲解,可是它们均可以以相同的方式使用:异步for循环(一般仅称为await for)遍历流的事件,如for循环迭代遍历。例如:异步

Future<int> sumStream(Stream<int> stream) async {
  var sum = 0;
  await for (var value in stream) {
    sum += value;
  }
  return sum;
}

此代码仅接收整数事件流中的每一个事件,将它们相加,而后返回(和)其和。当循环主体结束时,函数将暂停,直到下一个事件到达或流完成为止。 该函数标记有async关键字,在使用await for循环时须要此关键字。 如下示例经过使用async *函数生成简单的整数流来测试前面的代码:

import 'dart:async';

Future<int> sumStream(Stream<int> stream) async {
  var sum = 0;
  await for (var value in stream) {
    sum += value;
  }
  return sum;
}

Stream<int> countStream(int to) async* {
  for (int i = 1; i <= to; i++) {
    yield i;
  }
}

main() async {
  var stream = countStream(10);
  var sum = await sumStream(stream);
  print(sum); // 55
}

当流中没有更多事件时,就完成流,并通知接收事件的代码,就像通知新事件到来同样。使用await for循环读取事件时,流完成后循环中止。 在某些状况下,流完成以前会发生错误;多是网络从远程服务器上获取文件时发生故障,或者建立事件的代码存在错误,可是有人须要了解它。 流还能够传递错误事件,就像传递数据事件同样。大多数流将在出现第一个错误后中止,但有可能传递多个错误的流以及在发生错误事件后传递更多数据的流。在本文档中,咱们仅讨论最多产生一个错误的流。 当使用await for读取流时,循环语句会引起错误。这也结束了循环。您可使用try-catch捕获错误。如下示例在循环迭代器等于时引起错误 4:

import 'dart:async';

Future<int> sumStream(Stream<int> stream) async {
  var sum = 0;
  try {
    await for (var value in stream) {
      sum += value;
    }
  } catch (e) {
    return -1;
  }
  return sum;
}

Stream<int> countStream(int to) async* {
  for (int i = 1; i <= to; i++) {
    if (i == 4) {
      throw new Exception('Intentional exception');
    } else {
      yield i;
    }
  }
}

main() async {
  var stream = countStream(10);
  var sum = await sumStream(stream);
  print(sum); // -1
}

两种Stream

单一订阅流 最多见的流包含一系列事件,这些事件是较大总体的一部分。事件必须以正确的顺序传递,而且不能丢失任何事件。这是您在读取文件或接收Web请求时得到的流。 这样的流只能被收听一次。稍后再次收听可能意味着错过了最初的事件,而后其他部分毫无心义。当您开始收听时,数据将被提取并以块的形式提供。 广播流 另外一种流是针对能够一次处理的单个消息的。例如,这种流可用于浏览器中的鼠标事件。 您能够随时开始收听这样的流,而且在收听时会触发事件。多个收听者能够同时收听,而且您能够在取消上一个订阅以后稍后再次收听。

Stream的方法

Future<T> get first;
Future<bool> get isEmpty;
Future<T> get last;
Future<int> get length;
Future<T> get single;
Future<bool> any(bool Function(T element) test);
Future<bool> contains(Object needle);
Future<E> drain<E>([E futureValue]);
Future<T> elementAt(int index);
Future<bool> every(bool Function(T element) test);
Future<T> firstWhere(bool Function(T element) test, {T Function() orElse});
Future<S> fold<S>(S initialValue, S Function(S previous, T element) combine);
Future forEach(void Function(T element) action);
Future<String> join([String separator = ""]);
Future<T> lastWhere(bool Function(T element) test, {T Function() orElse});
Future pipe(StreamConsumer<T> streamConsumer);
Future<T> reduce(T Function(T previous, T element) combine);
Future<T> singleWhere(bool Function(T element) test, {T Function() orElse});
Future<List<T>> toList();
Future<Set<T>> toSet();

这么多方法基本均可以使用await for来循环

Future<bool> contains(Object needle) async {
  await for (var event in this) {
    if (event == needle) return true;
  }
  return false;
}

Future forEach(void Function(T element) action) async {
  await for (var event in this) {
    action(event);
  }
}

Future<List<T>> toList() async {
  final result = <T>[];
  await this.forEach(result.add);
  return result;
}

Future<String> join([String separator = ""]) async =>
    (await this.toList()).join(separator);

修改Stream

Stream<R> cast<R>();
Stream<S> expand<S>(Iterable<S> Function(T element) convert);
Stream<S> map<S>(S Function(T event) convert);
Stream<T> skip(int count);
Stream<T> skipWhile(bool Function(T element) test);
Stream<T> take(int count);
Stream<T> takeWhile(bool Function(T element) test);
Stream<T> where(bool Function(T event) test);

这些基本都是使用闭包过滤流的内容,固然也能够转换内容。

监听Stream

最后是监听,当流改变时会触发监听listen(),全部的流均可以被监听。

StreamSubscription<T> listen(void Function(T event) onData,
    {Function onError, void Function() onDone, bool cancelOnError});

建立本身的Stream

建立Stream大概有三种,以下所示:

  • 转换Stram
  • 使用async*建立Stream
  • 使用StreamController建立

转换Stram

常常有些Stream包含的值不是咱们想要的,那么就须要咱们转换一下了,例如咱们把数字转成字符串.

Future<String> _toString() async {
    var s = await _stream().map((event) => event.toString()).join('|');
    return s;
  }

也能够转成数组

Future<List> _toList() async {
    var s = await _stream().toList();
    return s;
  }

使用async*建立Stream

建立新流的一种方法是使用异步生成器(async *)函数。在调用该函数时建立该流,而且在侦听该流时该函数的主体开始运行。函数返回时,流关闭。在函数返回以前,它可使用yieldyield *语句在流上发出事件。

这是一个原始示例,该示例会按期发射数字:

Stream<int> timedCounter(Duration interval, [int maxCount]) async* {
  int i = 0;
  while (true) {
    await Future.delayed(interval);
    yield i++;
    if (i == maxCount) break;
  }
}

此函数返回一个Stream。当收听该流时,主体开始运行。它反复延迟请求的时间间隔,而后产生下一个数字。若是忽略count参数,则循环上没有中止条件,所以流永远输出愈来愈大的数字-或直到侦听器取消其订阅为止。 当侦听器取消时(经过在listen()方法返回的StreamSubscription对象上调用cancel()),则主体下一次到达yield语句时,yield将充当return语句。执行全部封闭的finally块,而后函数退出。若是函数尝试在退出前产生一个值,则该操做将失败并充当返回值。 当函数最终退出时,由cancel()方法返回的Future完成。若是函数以错误退出,则Future会以该错误结束;不然,它以null结束。 另外一个更有用的示例是一个转换序列的函数:

Stream<T> streamFromFutures<T>(Iterable<Future<T>> futures) async* {
  for (var future in futures) {
    var result = await future;
    yield result;
  }
}

使用yield*

使用yield*来调用其余的函数取下一个值,当不获取的时候,则不运行。

Stream<int> _stream() async* {
    if (_count < 10) {
      yield _count++;
      await Future.delayed(Duration(seconds: 1));
      sleep(Duration(seconds: 1));
      yield* _getDataFromServer();
    }
  }

yield*yield区别是后者直接返回一个固定的值,而前者返回是的是一个函数。前者多用于分流,把一个Stream<T>分为其余的Stream<R>Stream<E>

使用StreamController建立stream

_streamController = StreamController();
// 监听
_streamController.stream.listen((event) { })
/// 添加数据
_streamController.add('data');

StreamControlelr的类图以下所示,他们从SinkStreamSink都只是到导入接口,并没有直接继承关系,在抽象类方面按照功能解耦,最终由StreamController整合,负责添加的是StreamControlelr,负责监听的是Stream,最终在当前的Zone中执行回调Zone.runUnaryGuarded()Zone相似一个沙盒环境,在APP启动的时候建立。

原理

StreamController中初始化,直接调用了_SyncStreamController,具体功能所有在_SyncStreamController(同步)或_AsyncStreamController(异步)中实现。

factory StreamController(
  {void onListen(),
  void onPause(),
  void onResume(),
  onCancel(),
  bool sync: false}) {
return sync
    ? new _SyncStreamController<T>(onListen, onPause, onResume, onCancel)
    : new _AsyncStreamController<T>(onListen, onPause, onResume, onCancel);
}

最终在_BufferingStreamSubscription类中,实现了
onDoneonError函数,他们分别在类实例化的时候注册回调函数。

void onData(void handleData(T event)) {
  handleData ??= _nullDataHandler;//默认值
  _onData = _zone.registerUnaryCallback<dynamic, T>(handleData);
}
void onDone(void handleDone()) {
  handleDone ??= _nullDoneHandler;
  _onDone = _zone.registerCallback(handleDone);
}

那么listener函数是在什么时候添加的呢?
在获取stream_ControllerStream(),最终listener函数是_StreamImpl extends Steam中实现的,代码以下:

StreamSubscription<T> listen(void onData(T data),
      {Function onError, void onDone(), bool cancelOnError}) {
    cancelOnError = identical(true, cancelOnError);
    StreamSubscription<T> subscription =
        _createSubscription(onData, onError, onDone, cancelOnError);
    _onListen(subscription);
    return subscription;
  }
  
/// 建立一个订阅者
StreamSubscription<T> _createSubscription(void onData(T data),
    Function onError, void onDone(), bool cancelOnError) {
  return new _BufferingStreamSubscription<T>(
      onData, onError, onDone, cancelOnError);
}

最终监听是执行的_BufferingStreamSubscription_onData(void HandleData(T event)),这里是使用了_zone.registerUnaryCallback来注册回调函数,不然在调用的时候会报错。

void onData(void handleData(T event)) {
  handleData ??= _nullDataHandler;
  _onData = _zone.registerUnaryCallback<dynamic, T>(handleData);
}

stream.add()函数执行了_add(),源码以下

void _add(T data) {
  assert(!_isClosed);
  if (_isCanceled) return;
  if (_canFire) {
    _sendData(data);
  } else {
    _addPending(new _DelayedData<T>(data));
  }
}

当状态已关闭,直接断言,当状态已取消,返回操做,当能够发送数据,则执行_sendData()函数,该函数才是最终发送数据,执行listen操做的关键函数,源码以下:

void _sendData(T data) {
  assert(!_isCanceled);
  assert(!_isPaused);
  assert(!_inCallback);
  bool wasInputPaused = _isInputPaused;
  _state |= _STATE_IN_CALLBACK;//取出来第六位
  _zone.runUnaryGuarded(_onData, data);
  _state &= ~_STATE_IN_CALLBACK;// 舍弃第六位
  _checkState(wasInputPaused);
}

经过final Zone _zone = Zone.current;获取当前的Zone来执行已经注册的回调_zone.runUnaryGuarded(_onData, data),执行完毕,使用_state&=~_STATE_IN_CALLBACK来保存当前状态,使用&=~舍弃第6位数字,_STATE_IN_CALLBACK值为32,那么低第六位是1,~_STATE_IN_CALLBACK,除了第六位,剩下的都是1,而后&=来取出来其余的位数值保存下来。

而后在_checkState(wasInutPaused)来肯定在执行callback中间并没有状态改变。

那么再执行完毕_sendDone()的时候也是如此,首先判断是否已经取消,没取消的话,执行_onDone回调。

void _sendDone() {
    assert(!_isCanceled);
    assert(!_isPaused);
    assert(!_inCallback);

    void sendDone() {
      // If the subscription has been canceled while waiting for the cancel
      // future to finish we must not report the done event.
      if (!_waitsForCancel) return;
      _state |= (_STATE_CANCELED | _STATE_CLOSED | _STATE_IN_CALLBACK);
      _zone.runGuarded(_onDone);
      _state &= ~_STATE_IN_CALLBACK;
    }

    _cancel();
    _state |= _STATE_WAIT_FOR_CANCEL;
    if (_cancelFuture != null &&
        !identical(_cancelFuture, Future._nullFuture)) {
      _cancelFuture.whenComplete(sendDone);
    } else {
      sendDone();
    }
  }

事件流就是在前期使用Zone注册,在add的时候使用Zone调用已经注册好的回调,广播是循环调用

关键函数:

/// 注册回调
_zone.registerUnaryCallback<dynamic, T>(handleData)

/// 注册不带参数的回调

_zone.registerCallback(R callback())

/// 执行 附带参数的回调

_zone.runUnaryGuarded(_onData, data)

更多API能够查看官方源码

参考

文章汇总

<<Dart 异步与多线程>>

<<Flutter 详解(1、深刻了解状态管理--ScopeModel)>>

<<Flutter 详解(2、深刻了解状态管理--Redux)>>

<<Flutter 详解(3、深刻了解状态管理--Provider)>>

<<Flutter 详解(4、深刻了解状态管理--BLoC)>>

<<Flutter 详解 (5、深刻了解--Key)>>

<<Flutter 详解 (6、深刻了解--Stream>>

相关文章
相关标签/搜索