Flutter 异步机制:microtask

以前说到 Flutter 中与异步相关的 Future,了解到,dart 是一个单线程模型的语言,当咱们使用 Future 去实现一个异步操做时,多半是利用单线程的事件循环机制来模拟,相似于 Android 中的 Looper。c++

不过 dart 中的事件循环其实仍是分两种的,也就是网上常说的两个循环队列,microtask queue 和 event queue,在介绍 Future 的那篇文章里面其实主要说到的是 event queue 这种模式的运做方式,因此打算在此再详细说下 microtask queue 的一些实现细节,以此来讲明,为什么 microtask queue 必定会优先于 event queue 执行。markdown

就拿 Future 来讲,它就能够支持跑在两种队列上,当咱们直接调用 Future()Future.delayed() 的时候,内部都是经过 Timer 实现的,其内部就会利用 ReceivePort/SendPort 进行一次事件循环,在下一次循环时调用对应的 callback,具体的调用过程在 Future 那篇里面有说。而后,当咱们调用Future.microtask建立一个 Future 时,此时它的 callback 就是被添加到 microtask 队列上的,那么 callback 的调用时机就会被提早,下面就简单看下一个 microtask 的执行过程。异步

建立任务

首先,当咱们使用 Future 建立一个 microtask 时,调用的方法为:async

// sdk/lib/async/future.dart
factory Future.microtask(FutureOr<T> computation()) {
  _Future<T> result = new _Future<T>();
  scheduleMicrotask(() {
    try {
      result._complete(computation());
    } catch (e, s) {
      _completeWithErrorCallback(result, e, s);
    }
  });
  return result;
}

// sdk/lib/async/schedule_microtask.dart
@pragma('vm:entry-point', 'call')
void scheduleMicrotask(void Function() callback) {
  _Zone currentZone = Zone._current;
  if (identical(_rootZone, currentZone)) {
    // No need to bind the callback. We know that the root's scheduleMicrotask
    // will be invoked in the root zone.
    _rootScheduleMicrotask(null, null, _rootZone, callback);
    return;
  }
  _ZoneFunction implementation = currentZone._scheduleMicrotask;
  if (identical(_rootZone, implementation.zone) &&
      _rootZone.inSameErrorZone(currentZone)) {
    _rootScheduleMicrotask(
        null, null, currentZone, currentZone.registerCallback(callback));
    return;
  }
  Zone.current.scheduleMicrotask(Zone.current.bindCallbackGuarded(callback));
}
复制代码

结构跟调用 Timer 基本一致,这里主要是调用 scheduleMicrotask 去建立一个 microtask 的;在 scheduleMicrotask 中根据 Zone 类型不一样,选择了不一样的方式建立 microtask,通常状况下,就直接看 RootZone 的实现就能够了,默认状况下,代码就是跑在 RootZone 上的。ide

// sdk/lib/async/zone.dart
void _rootScheduleMicrotask(
    Zone? self, ZoneDelegate? parent, Zone zone, void f()) {
  if (!identical(_rootZone, zone)) {
    bool hasErrorHandler = !_rootZone.inSameErrorZone(zone);
    if (hasErrorHandler) {
      f = zone.bindCallbackGuarded(f);
    } else {
      f = zone.bindCallback(f);
    }
  }
  _scheduleAsyncCallback(f);
}

// sdk/lib/async/schedule_microtask.dart
void _scheduleAsyncCallback(_AsyncCallback callback) {
  _AsyncCallbackEntry newEntry = new _AsyncCallbackEntry(callback);
  _AsyncCallbackEntry? lastCallback = _lastCallback;
  if (lastCallback == null) {
    _nextCallback = _lastCallback = newEntry;
    if (!_isInCallbackLoop) {
      _AsyncRun._scheduleImmediate(_startMicrotaskLoop);
    }
  } else {
    lastCallback.next = newEntry;
    _lastCallback = newEntry;
  }
}
复制代码

以后调用到 _scheduleAsyncCallback,这个函数是比较重要的,_nextCallback 就是 microtask queue 的的队首,_lastCallback 则是队尾,这个函数的做用就是将新的 callback 添加到 microtask 队列中,同时,当此前尚未添加过 microtask 的时候,就须要调用_AsyncRun._scheduleImmediate(_startMicrotaskLoop);开始一轮 microtask 的执行,_scheduleImmediate 是一个 external 函数,其实如今 sdk/lib/_internal/vm/lib/schedule_microtask_patch.dart 中,函数

@patch
class _AsyncRun {
  @patch
  static void _scheduleImmediate(void callback()) {
    final closure = _ScheduleImmediate._closure;
    if (closure == null) {
      throw new UnsupportedError("Microtasks are not supported");
    }
    closure(callback);
  }
}

typedef void _ScheduleImmediateClosure(void callback());

class _ScheduleImmediate {
  static _ScheduleImmediateClosure? _closure;
}

@pragma("vm:entry-point", "call")
void _setScheduleImmediateClosure(_ScheduleImmediateClosure closure) {
  _ScheduleImmediate._closure = closure;
}
复制代码

这里的大体流程就是,给 _scheduleImmediate 传一个 callback,也就是 _startMicrotaskLoop 函数,不过 _scheduleImmediate 也只是转手将 callback 给了 _ScheduleImmediate._closure 执行,可是 _ScheduleImmediate._closure 是经过 _setScheduleImmediateClosure 赋值的,因此这里还须要再看 _setScheduleImmediateClosure 是什么时候被调用。从声明看,这个函数应该是要在 dart vm 中调用的,在 vm 代码中搜索找到,在进行 isolate 初始化时,会依此调用oop

  1. DartIsolate::LoadLibraries
  2. DartRuntimeHooks::Install
  3. InitDartAsync

_setScheduleImmediateClosure 就是在这里被调用的,ui

// dart_runtime_hooks.cc
static void InitDartAsync(Dart_Handle builtin_library, bool is_ui_isolate) {
  Dart_Handle schedule_microtask;
  if (is_ui_isolate) {
    schedule_microtask =
        InvokeFunction(builtin_library, "_getScheduleMicrotaskClosure");
  } else {
    Dart_Handle isolate_lib = Dart_LookupLibrary(ToDart("dart:isolate"));
    Dart_Handle method_name =
        Dart_NewStringFromCString("_getIsolateScheduleImmediateClosure");
    schedule_microtask = Dart_Invoke(isolate_lib, method_name, 0, NULL);
  }
  Dart_Handle async_library = Dart_LookupLibrary(ToDart("dart:async"));
  Dart_Handle set_schedule_microtask = ToDart("_setScheduleImmediateClosure");
  Dart_Handle result = Dart_Invoke(async_library, set_schedule_microtask, 1,
                                   &schedule_microtask);
  PropagateIfError(result);
}
复制代码

在这里,给 set_schedule_microtask 传的参数时 schedule_microtask,这个函数则是来自于名为 _getIsolateScheduleImmediateClosure 的函数,且这就是一个 dart 函数,直接搜索,即可以在 sdk/lib/_internal/vm/lib/isolate_patch.dart 找到函数定义,this

void _isolateScheduleImmediate(void callback()) {
  assert((_pendingImmediateCallback == null) ||
      (_pendingImmediateCallback == callback));
  _pendingImmediateCallback = callback;
}

@pragma("vm:entry-point", "call")
void _runPendingImmediateCallback() {
  final callback = _pendingImmediateCallback;
  if (callback != null) {
    _pendingImmediateCallback = null;
    callback();
  }
}

/// The embedder can execute this function to get hold of
/// [_isolateScheduleImmediate] above.
@pragma("vm:entry-point", "call")
Function _getIsolateScheduleImmediateClosure() {
  return _isolateScheduleImmediate;
}
复制代码

从而得知,_ScheduleImmediate._closure 就是 _isolateScheduleImmediate,因此,callback(也就是 _startMicrotaskLoop)最后做为 _isolateScheduleImmediate 的参数调用,也就是把它赋值给 _pendingImmediateCallback。spa

执行任务

接着看 microtask 的执行,从上面得知,_pendingImmediateCallback 就是 _startMicrotaskLoop,并且 _pendingImmediateCallback 在 _runPendingImmediateCallback 函数中被调用,也就是说,当 _runPendingImmediateCallback 被调用时,便会启动新一轮 microtask 的执行。

看到 _runPendingImmediateCallback 这个名字是否有点眼熟,以前在介绍 Future 的时候,经过查看代码,了解到 dart vm 中处理消息事件,最终会调用 dart 中的 _handleMessage 函数,

@pragma("vm:entry-point", "call")
static void _handleMessage(Function handler, var message) {
  // TODO(floitsch): this relies on the fact that any exception aborts the
  // VM. Once we have non-fatal global exceptions we need to catch errors
  // so that we can run the immediate callbacks.
  handler(message);
  _runPendingImmediateCallback();
}
复制代码

在 dart 中,全部的代码其实都是经过这里调用的,不论是当咱们启动一个 Isolate,仍是经过 Future 执行异步调用。好比当咱们直接启动 dart 时,它会在 vm 中先建立好一个 isolate,而后执行它的 entry point,对于 root isolate,它的 entry point 就是 main,对于自定义的 isolate,它的 entry point 就是外部传入的函数,而执行 entry point 的方式,就是经过事件队列,能够看下面这段代码:

@pragma("vm:entry-point", "call")
void _startIsolate(
    Function entryPoint, List<String>? args, Object? message, bool isSpawnUri) {
  _delayEntrypointInvocation(entryPoint, args, message, isSpawnUri);
}

void _delayEntrypointInvocation(Function entryPoint, List<String>? args,
    Object? message, bool allowZeroOneOrTwoArgs) {
  final port = RawReceivePort();
  port.handler = (_) {
    port.close();
    if (allowZeroOneOrTwoArgs) {
      if (entryPoint is _BinaryFunction) {
        (entryPoint as dynamic)(args, message);
      } else if (entryPoint is _UnaryFunction) {
        (entryPoint as dynamic)(args);
      } else {
        entryPoint();
      }
    } else {
      entryPoint(message);
    }
  };
  port.sendPort.send(null);
}
复制代码

当 isolate 在 vm 中建立好后,c++ 就会调用 _startIsolate 启动 isolate,而在这个函数中,它依旧是经过 ReceivePort/SendPort 向事件队列中发送一个事件,以此来执行 entryPoint。

以上文字,仅为说明 _handleMessage 这个函数的调用时机,同时也是在说明 _runPendingImmediateCallback 的调用时机,也就是 _startMicrotaskLoop。

// sdk/lib/async/schedule_microtask.dart
void _startMicrotaskLoop() {
  _isInCallbackLoop = true;
  try {
    // Moved to separate function because try-finally prevents
    // good optimization.
    _microtaskLoop();
  } finally {
    _lastPriorityCallback = null;
    _isInCallbackLoop = false;
    if (_nextCallback != null) {
      _AsyncRun._scheduleImmediate(_startMicrotaskLoop);
    }
  }
}

void _microtaskLoop() {
  for (var entry = _nextCallback; entry != null; entry = _nextCallback) {
    _lastPriorityCallback = null;
    var next = entry.next;
    _nextCallback = next;
    if (next == null) _lastCallback = null;
    (entry.callback)();
  }
}
复制代码

这个函数的实现仍是挺简单的,就是直接遍历 microtask 队列去执行,有一点,当咱们在 micro task 执行过程当中再建立 microtask 的时候,因为此时 _microtaskLoop 还未结束,因此当这个 microtask 执行完以后,会继续执行新加的 microtask。不过在 _startMicrotaskLoop 中执行 _microtaskLoop 外面加了一个 try...finally 却是没太理解这里的用途,由于从这里直到 _handleMessage 都没见到捕获异常的操做。

总的来讲,以上就是 microtask 从建立到执行的过程,下面具体讲讲几种不一样的代码块具体的执行时机。

microtask 执行时机

从当前的信息了解到,在一个连续的代码中,能够有三种形式的代码调用,

  1. 直接调用
  2. 经过 microtask 调用
  3. 经过事件队列调用

这三种形式的调用,其执行前后为直接调用->microtask->事件队列, 从以前的分析中就很容易理解了,下面再总结一下。

首先,直接调用很好理解,他们都是同在一个函数下,按顺序调用。

而后,microtask 的调用是发生在当前的消息事件调用以后(从 _handleMessage 实现可知),而从以前的分析中得知,全部的关于 dart 代码的调用,其入口都是 _handleMessage,直接执行 main 函数的时候是,新启动一个 isolate 也是,而 _handleMessage 下有两个入口,一个是 handler,一个是 _runPendingImmediateCallback,而无论 microtask 是在这两个函数的哪个中建立的,都会在本次 _handleMessage 调用过程当中执行。

而第三种,消息事件调用就不同了,它必定是要等到下次甚至好几回以后的 _handleMessage 中才会调用,相比之下,它的调用很昂贵,须要通过 dart vm 的一层处理才能调用到。

总结,基于 dart 的单线程事件循环模型,咱们能够将 _handleMessage 看做一次事件循环,那么「直接调用」与「microtask」都是在当次 _handleMessage 的调用中就会调用到,而「事件队列调用」则至少要等到下次 _handleMessage 调用,由此决定了这三种代码调用的执行顺序。

相关文章
相关标签/搜索