做为系列文章的第十一篇,本篇将很是全面带你了解 Flutter 中最关键的设计之一,深刻原理帮助你理解 Stream 全家桶,这也许是目前 Flutter 中最全面的 Stream 分析了。git
前文:github
Stream
在 Flutter 是属于很是关键的概念,在 Flutter 中,状态管理除了 InheritedWidget
以外,不管 rxdart
,Bloc
模式,flutter_redux
,fish_redux
都离不开 Stream
的封装,而事实上 Stream
并非 Flutter 中特有的,而是 Dart 中自带的逻辑。redux
通俗来讲,Stream
就是事件流或者管道,事件流相信你们并不陌生,简单的说就是:基于事件流驱动设计代码,而后监听订阅事件,并针对事件变换处理响应。缓存
而在 Flutter 中,整个 Stream
设计外部暴露的对象主要以下图,主要包含了 StreamController
、Sink
、Stream
、StreamSubscription
四个对象。bash
以下代码所示,Stream
的使用并不复杂,通常咱们只须要:异步
StreamController
,StreamSink
用作事件入口,Stream
对象用于监听,StreamSubscription
管理事件订阅,最后在不须要时关闭便可,看起来是否是很简单?class DataBloc {
///定义一个Controller
StreamController<List<String>> _dataController = StreamController<List<String>>();
///获取 StreamSink 作 add 入口
StreamSink<List<String>> get _dataSink => _dataController.sink;
///获取 Stream 用于监听
Stream<List<String>> get _dataStream => _dataController.stream;
///事件订阅对象
StreamSubscription _dataSubscription;
init() {
///监听事件
_dataSubscription = _dataStream.listen((value){
///do change
});
///改变事件
_dataSink.add(["first", "second", "three", "more"]);
}
close() {
///关闭
_dataSubscription.cancel();
_dataController.close();
}
}
复制代码
在设置好监听后,以后每次有事件变化时, listen
内的方法就会被调用,同时你还能够经过操做符对 Stream
进行变换处理。async
以下代码所示,是否是一股 rx
风扑面而来?ide
_dataStream.where(test).map(convert).transform(streamTransformer).listen(onData);
复制代码
而在 Flutter 中, 最后结合 StreamBuilder
, 就能够完成 基于事件流的异步状态控件 了!布局
StreamBuilder<List<String>>(
stream: dataStream,
initialData: ["none"],
///这里的 snapshot 是数据快照的意思
builder: (BuildContext context, AsyncSnapshot<List<String>> snapshot) {
///获取到数据,随心所欲的更新 UI
var data = snapshot.data;
return Container();
});
复制代码
那么问题来了,它们内部到底是若是实现的呢?原理是什么?各自的做用是什么?都有哪些特性呢?后面咱们将开始深刻解析这个逻辑 。post
从上面咱们知道,在 Flutter 中使用 Stream
主要有四个对象,那么这四个对象是如何“勾搭”在一块儿的?他们各自又担任什么责职呢?
首先以下图,咱们能够从进阶版的流程图上看出 整个 Stream
的内部工做流程。
Flutter中 Stream
、StreamController
、StreamSink
和 StreamSubscription
都是 abstract
对象,他们对外抽象出接口,而内部实现对象大部分都是 _
开头的如 _SyncStreamController
、ControllerStream
等私有类,在这基础上整个流程归纳起来就是:
有一个事件源叫 Stream
,为了方便控制 Stream
,官方提供了使用 StreamController
做为管理;同时它对外提供了 StreamSink
对象做为事件输入口,可经过 sink
属性访问; 又提供 stream
属性提供 Stream
对象的监听和变换,最后获得的 StreamSubscription
能够管理事件的订阅。
因此咱们能够总结出:
Stream
过程的控制,提供各种接口用于建立各类事件流。add
, addStream
等。listen
、 where
。cacenl
、pause
,同时在内部也是事件的中转关键。回到 Stream
的工做流程上,在上图中咱们知道, 经过 StreamSink.add
添加一个事件时, 事件最后会回调到 listen
中的 onData
方法,这个过程是经过 zone.runUnaryGuarded
执行的,这里 zone.runUnaryGuarded
是什么做用后面再说,咱们须要知道这个 onData
是怎么来的?
如上图,经过源码咱们知道:
一、Stream
在 listen
的时候传入了 onData
回调,这个回调会传入到 StreamSubscription
中,以后经过 zone.registerUnaryCallback
注册获得 _onData
对象( 不是前面的 onData
回调哦 )。
二、StreamSink
在添加事件是,会执行到 StreamSubscription
中的 _sendData
方法,而后经过 _zone.runUnaryGuarded(_onData, data);
执行 1 中获得的 _onData
对象,触发 listen
时传入的回调方法。
能够看出整个流程都是和 StreamSubscription
相关的,如今咱们已经知道从 事件入口到事件出口 的整个流程时怎么运做的,那么这个过程是**怎么异步执行的呢?其中频繁出现的 zone
是什么?
首先咱们须要知道,Stream 是怎么实现异步的?
这就须要说到 Dart 中的异步实现逻辑了,由于 Dart 是 单线程应用 ,和大多数单线程应用同样,Dart 是以 消息循环机制 来运行的,而这里面主要包含两个任务队列,一个是 microtask 内部队列,一个是 event 外部队列,而 microtask 的优先级又高于 event 。
默认的在 Dart 中,如 点击、滑动、IO、绘制事件 等事件都属于 event 外部队列,microtask 内部队列主要是由 Dart 内部产生,而 Stream
中的执行异步的模式就是 scheduleMicrotask
了。
由于 microtask 的优先级又高于 event ,因此若是 microtask 太多就可能会对触摸、绘制等外部事件形成阻塞卡顿哦。
以下图,就是 Stream 内部在执行异步操做过程执行流程:
那么 Zone
又是什么?它是哪里来的?
在上一篇章中说过,由于 Dart 中 Future
之类的异步操做是没法被当前代码 try/cacth
的,而在 Dart 中你能够给执行对象指定一个 Zone
,相似提供一个沙箱环境 ,而在这个沙箱内,你就能够所有能够捕获、拦截或修改一些代码行为,好比全部未被处理的异常。
那么项目中默认的 Zone
是怎么来的?在 Flutter 中,Dart 中的 Zone
启动是在 _runMainZoned
方法 ,以下代码所示 _runMainZoned
的 @pragma("vm:entry-point")
注解表示该方式是给 Engine 调用的,到这里咱们知道了 Zone
是怎么来的了。
///Dart 中
@pragma('vm:entry-point')
// ignore: unused_element
void _runMainZoned(Function startMainIsolateFunction, Function userMainFunction) {
startMainIsolateFunction((){
runZoned<Future<void>>(····);
}, null);
}
///C++ 中
if (tonic::LogIfError(tonic::DartInvokeField(
Dart_LookupLibrary(tonic::ToDart("dart:ui")), "_runMainZoned",
{start_main_isolate_function, user_entrypoint_function}))) {
FML_LOG(ERROR) << "Could not invoke the main entrypoint.";
return false;
}
复制代码
那么 zone.runUnaryGuarded
的做用是什么?相较于 scheduleMicrotask
的异步操做,官方的解释是:在此区域中使用参数执行给定操做并捕获同步错误。 相似的还有 runUnary
、 runBinaryGuarded
等,因此咱们知道前面提到的 zone.runUnaryGuarded
就是 Flutter 在运行的这个 zone 里执行已经注册的 _onData
,并捕获异常。
前面咱们说了 Stream
的内部执行流程,那么同步和异步操做时又有什么区别?具体实现时怎么样的呢?
咱们以默认 Stream
流程为例子, StreamController
的工厂建立能够经过 sync
指定同步仍是异步,默认是异步模式的。 而不管异步仍是同步,他们都是继承了 _StreamController
对象,区别仍是在于 mixins
的是哪一个 _EventDispatch
实现:
_AsyncStreamControllerDispatch
_SyncStreamControllerDispatch
上面这两个 _EventDispatch
最大的不一样就是在调用 sendData
提交事件时,是直接调用 StreamSubscription
的 _add
方法,仍是调用 _addPending(new _DelayedData<T>(data));
方法的区别。
以下图, 异步执行的逻辑就是上面说过的 scheduleMicrotask
, 在 _StreamImplEvents
中 scheduleMicrotask
执行后,会调用 _DelayedData
的 perform
,最后经过 _sendData
触发 StreamSubscription
去回调数据 。
在 Stream
中又非为广播和非广播模式,若是是广播模式中,StreamControlle
的实现是由以下所示实现的,他们的基础关系以下图所示:
_SyncBroadcastStreamController
_AsyncBroadcastStreamController
广播和非广播的区别在于调用 _createSubscription
时,内部对接口类 _StreamControllerLifecycle
的实现,同时它们的差别在于:
在 _StreamController
里判断了若是 Stream
是 _isInitialState
的,也就是订阅过的,就直接报错 "Stream has already been listened to." ,只有未订阅的才建立 StreamSubscription
。
在 _BroadcastStreamController
中,_isInitialState
的判断被去掉了,取而代之的是 isClosed
判断,而且在广播中, _sendData
是一个 forEach
执行:
_forEachListener((_BufferingStreamSubscription<T> subscription) {
subscription._add(data);
});
复制代码
Stream
是支持变换处理的,针对 Stream
咱们能够通过屡次变化来获得咱们须要的结果。那么这些变化是怎么实现的呢?
以下图所示,通常操做符变换的 Stream
实现类,都是继承了 _ForwardingStream
, 在它的内部的_ForwardingStreamSubscription
里,会经过上一个 Pre A Stream
的 listen
添加 _handleData
回调,以后在回调里再次调用新的 Current B Stream
的 _handleData
。
因此事件变化的本质就是,变换都是对 Stream
的 listen
嵌套调用组成的。
同时 Stream
还有转换为 Future
, 如 firstWhere
、 elementAt
、 reduce
等操做符方法,基本都是建立一个内部 _Future
实例,而后再 listen
的回调用调用 Future
方法返回。
以下代码所示, 在 Flutter 中经过 StreamBuilder
构建 Widget ,只需提供一个 Stream
实例便可,其中 AsyncSnapshot
对象为数据快照,经过 data
缓存了当前数据和状态,那 StreamBuilder
是如何与 Stream
关联起来的呢?
StreamBuilder<List<String>>(
stream: dataStream,
initialData: ["none"],
///这里的 snapshot 是数据快照的意思
builder: (BuildContext context, AsyncSnapshot<List<String>> snapshot) {
///获取到数据,随心所欲的更新 UI
var data = snapshot.data;
return Container();
});
复制代码
如上图所示, StreamBuilder
的调用逻辑主要在 _StreamBuilderBaseState
中,_StreamBuilderBaseState
在 initState
、didUpdateWidget
中会调用 _subscribe
方法,从而调用 Stream
的 listen
,而后经过 setState
更新UI,就是这么简单有木有?
咱们经常使用的
setState
中实际上是调用了markNeedsBuild
,markNeedsBuild
内部标记element
为diry
,而后在下一帧WidgetsBinding.drawFrame
才会被绘制,这能够看出setState
并非当即生效的哦。
其实不管从订阅或者变换均可以看出, Dart 中的 Stream
已经自带了相似 rx
的效果,可是为了让 rx
的用户们更方便的使用,ReactiveX 就封装了 rxdart
来知足用户的熟悉感,以下图所示为它们的对应关系:
在 rxdart
中, Observable
是一个 Stream
,而 Subject
继承了 Observable
也是一个 Stream
,而且 Subject
实现了 StreamController
的接口,因此它也具备 Controller 的做用。
以下代码所示是 rxdart
的简单使用,能够看出它屏蔽了外界须要对 StreamSubscription
和 StreamSink
等的认知,更符合 rx
历史用户的理解。
final subject = PublishSubject<String>();
subject.stream.listen(observerA);
subject.add("AAAA1");
subject.add("AAAA2"));
subject.stream.listen(observeB);
subject.add("BBBB1");
subject.close();
复制代码
这里咱们简单分析下,以上方代码为例,
PublishSubject
内部实际建立是建立了一个广播 StreamController<T>.broadcast
。
当咱们调用 add
或者 addStream
时,最终会调用到的仍是咱们建立的 StreamController.add
。
当咱们调用 onListen
时,也是将回调设置到 StreamController
中。
rxdart
在作变换时,咱们获取到的 Observable
就是 this,也就是 PublishSubject
自身这个 Stream
,而 Observable
一系列的变换,也是基于建立时传入的 stream
对象,好比:
@override
Observable<S> asyncMap<S>(FutureOr<S> convert(T value)) =>
Observable<S>(_stream.asyncMap(convert));
复制代码
因此咱们能够看出来,rxdart
只是对 Stream
进行了概念变换,变成了咱们熟悉的对象和操做符,而这也是为何 rxdart
能够在 StreamBuilder
中直接使用的缘由。
因此,到这里你对 Flutter 中 Stream 有全面的理解了没?
自此,第十一篇终于结束了!(///▽///)
《Flutter完整开发实战详解(1、Dart语言和Flutter基础)》
《Flutter完整开发实战详解(4、Redux、主题、国际化)》
《Flutter完整开发实战详解(6、 深刻Widget原理)》
《Flutter完整开发实战详解(10、 深刻图片加载流程)》
《Flutter完整开发实战详解(11、全面深刻理解Stream)》
《React Native 的将来与React Hooks》