Flutter 异步机制:Future(二),发送任务

Flutter 基于 dart 语言,dart 自己是一个单线程模型,Future 也是基于单线程的异步机制,即基于事件循环实现的异步,与多线程实现的异步并不同,比较相似于 Android 中的 Handler 机制,而所谓的异步,就是向事件循环中心发送一条消息,等待调度,在以后的某个时刻执行代码,可是这段代码仍是在当前线程执行的,因此,若是使用 Future 执行耗时任务,它可能不会阻塞当前的 UI 流程,不事后续的一些 UI 操做仍是会受到影响。 使用 Future 异步执行代码须要四个步骤:c++

  1. 建立任务
  2. 发送任务
  3. 执行任务
  4. 执行功能代码

建立任务

关于 Future 任务的建立,在应用层通常是这么写:api

new Future(() {
    doSomething();
});
复制代码

那么,从 Future 的构造函数开始,能够一窥 Future 任务建立的全过程。markdown

factory Future(FutureOr<T> computation()) {
  _Future<T> result = new _Future<T>();
  Timer.run(() {
    try {
      result._complete(computation());
    } catch (e, s) {
      _completeWithErrorCallback(result, e, s);
    }
  });
  return result;
}
复制代码

result._complete(computation()) 即最后的执行功能代码的部分,参见第四小节,Timer.run 则会一步步建立任务。多线程

static void run(void callback()) {
  new Timer(Duration.zero, callback);
}

// third_party/sdk/sdk/lib/async/timer.dart
factory Timer(Duration duration, void callback()) {
  if (Zone.current == Zone.root) {
    // No need to bind the callback. We know that the root's timer will
    // be invoked in the root zone.
    return Zone.current.createTimer(duration, callback);
  }
  return Zone.current
      .createTimer(duration, Zone.current.bindCallbackGuarded(callback));
}

// third_party/sdk/runtime/lib/timer_patch.dart
static Timer _createTimer(Duration duration, void callback()) {
  // TODO(iposva): Remove _TimerFactory and use VMLibraryHooks exclusively.
  if (_TimerFactory._factory == null) {
    _TimerFactory._factory = VMLibraryHooks.timerFactory;
  }
  if (_TimerFactory._factory == null) {
    throw new UnsupportedError("Timer interface not supported.");
  }
  int milliseconds = duration.inMilliseconds;
  if (milliseconds < 0) milliseconds = 0;
  return _TimerFactory._factory(milliseconds, (_) {
    callback();
  }, false);
}
复制代码

这里最后调用 _TimerFactory._factory 建立 Timer 实例,_TimerFactory._factory 来自于 VMLibraryHooks.timerFactory ,而 VMLibraryHooks.timerFactory 的设置时机能够一步步回溯至 InitDartInternal :app

// lib/ui/dart_runtime_hooks.cc
static void InitDartInternal(Dart_Handle builtin_library, bool is_ui_isolate) {
  Dart_Handle print = GetFunction(builtin_library, "_getPrintClosure");

  Dart_Handle internal_library = Dart_LookupLibrary(ToDart("dart:_internal"));

  Dart_Handle result =
      Dart_SetField(internal_library, ToDart("_printClosure"), print);
  PropagateIfError(result);

  if (is_ui_isolate) {
    // Call |_setupHooks| to configure |VMLibraryHooks|.
    Dart_Handle method_name = Dart_NewStringFromCString("_setupHooks");
    result = Dart_Invoke(builtin_library, method_name, 0, NULL);
    PropagateIfError(result);
  }

  Dart_Handle setup_hooks = Dart_NewStringFromCString("_setupHooks");

  Dart_Handle io_lib = Dart_LookupLibrary(ToDart("dart:io"));
  result = Dart_Invoke(io_lib, setup_hooks, 0, NULL);
  PropagateIfError(result);

  Dart_Handle isolate_lib = Dart_LookupLibrary(ToDart("dart:isolate"));
  result = Dart_Invoke(isolate_lib, setup_hooks, 0, NULL);
  PropagateIfError(result);
}

// third_party/sdk/runtime/lib/timer_impl.dart
@pragma("vm:entry-point", "call")
_setupHooks() {
  VMLibraryHooks.timerFactory = _Timer._factory;
}

static Timer _factory(
    int milliSeconds, void callback(Timer timer), bool repeating) {
  if (repeating) {
    return new _Timer.periodic(milliSeconds, callback);
  }
  return new _Timer(milliSeconds, callback);
}
复制代码

_Timer 是 Timer 的实现类,重复执行与不重复执行的 Timer 会调用不一样的构造函数,可是两者异曲同工。less

factory _Timer(int milliSeconds, void callback(Timer timer)) {
  return _createTimer(callback, milliSeconds, false);
}

factory _Timer.periodic(int milliSeconds, void callback(Timer timer)) {
  return _createTimer(callback, milliSeconds, true);
}

static Timer _createTimer(
    void callback(Timer timer), int milliSeconds, bool repeating) {
  // Negative timeouts are treated as if 0 timeout.
  if (milliSeconds < 0) {
    milliSeconds = 0;
  }
  // Add one because DateTime.now() is assumed to round down
  // to nearest millisecond, not up, so that time + duration is before
  // duration milliseconds from now. Using microsecond timers like
  // Stopwatch allows detecting that the timer fires early.
  int now = VMLibraryHooks.timerMillisecondClock();
  int wakeupTime = (milliSeconds == 0) ? now : (now + 1 + milliSeconds);

  _Timer timer =
      new _Timer._internal(callback, wakeupTime, milliSeconds, repeating);
  // Enqueue this newly created timer in the appropriate structure and
  // notify if necessary.
  timer._enqueue();
  return timer;
}
复制代码

_internal 函数是 _Timer 的构造函数,_enqueue 函数将 Timer 放入队列等待执行:异步

// third_party/sdk/runtime/lib/timer_impl.dart
void _enqueue() {
  if (_milliSeconds == 0) {
    if (_firstZeroTimer == null) {
      _lastZeroTimer = this;
      _firstZeroTimer = this;
    } else {
      _lastZeroTimer._indexOrNext = this;
      _lastZeroTimer = this;
    }
    // Every zero timer gets its own event.
    _notifyZeroHandler();
  } else {
    _heap.add(this);
    if (_heap.isFirst(this)) {
      _notifyEventHandler();
    }
  }
}
复制代码

Timer 的延迟是否为 0 是一个分界线,它会将 Timer 分别插入 _lastZeroTimer 和 _heap 中,而后调用 _notifyZeroHandler 或 _notifyEventHandler 通知目标线程处理任务,接下来就是发送任务的过程了。async

发送任务

以 _notifyZeroHandler 为例,ide

// third_party/sdk/runtime/lib/timer_impl.dart
static void _notifyZeroHandler() {
  if (_sendPort == null) {
    _createTimerHandler();
  }
  _sendPort.send(_ZERO_EVENT);
}
复制代码

首先,确保 _sendPort 的存在,而后,使用 _sendPort 发送一条 _ZERO_EVENT 消息。函数

// third_party/sdk/runtime/lib/timer_impl.dart
static void _createTimerHandler() {
  assert(_receivePort == null);
  assert(_sendPort == null);
  _receivePort = new RawReceivePort(_handleMessage);
  _sendPort = _receivePort.sendPort;
  _scheduledWakeupTime = null;
}
复制代码

_receivePort 与 _sendPort 是一对用于通讯的接口,首先调用 RawReceivePort 构造函数建立 _receivePort,而且传递了回调函数 _handleMessage ,而后从 _receivePort 中取出 _sendPort ,可见这个通讯模型的重点就是 _receivePort 的构造过程。

// third_party/sdk/runtime/lib/isolate_patch.dart
@patch
factory RawReceivePort([Function handler]) {
  _RawReceivePortImpl result = new _RawReceivePortImpl();
  result.handler = handler;
  return result;
}

factory _RawReceivePortImpl() native "RawReceivePortImpl_factory";

// third_party/sdk/runtime/lib/isolate.cc
DEFINE_NATIVE_ENTRY(RawReceivePortImpl_factory, 0, 1) {
  ASSERT(
      TypeArguments::CheckedHandle(zone, arguments->NativeArgAt(0)).IsNull());
  Dart_Port port_id = PortMap::CreatePort(isolate->message_handler());
  return ReceivePort::New(port_id, false /* not control port */);
}
复制代码

构造函数是一个 native 函数,在 native 中,首先调用 PortMap::CreatePort 建立出 Dart_Port ,而后调用 ReceivePort::New 建立 ReceivePort 实例,实例化以后,将回调函数 handler 保存到了 map 中,key 为 _get_id,这也是一个 native 函数。

建立 Dart_Port 的参数 isolate->message_handler() 的设置时机为 InitIsolate:

// third_party/sdk/runtime/vm/isolate.cc
Isolate* Isolate::InitIsolate(const char* name_prefix, IsolateGroup* isolate_group, const Dart_IsolateFlags& api_flags, bool is_vm_isolate) {


  // Setup the isolate message handler.
  MessageHandler* handler = new IsolateMessageHandler(result);
  ASSERT(handler != nullptr);
  result->set_message_handler(handler);
}
复制代码
Dart_Port PortMap::CreatePort(MessageHandler* handler) {
  ASSERT(handler != NULL);
  MutexLocker ml(mutex_);
#if defined(DEBUG)
  handler->CheckAccess();
#endif

  Entry entry;
  entry.port = AllocatePort();
  entry.handler = handler;
  entry.state = kNewPort;

  // Search for the first unused slot. Make use of the knowledge that here is
  // currently no port with this id in the port map.
  ASSERT(FindPort(entry.port) < 0);
  intptr_t index = entry.port % capacity_;
  Entry cur = map_[index];
  // Stop the search at the first found unused (free or deleted) slot.
  while (cur.port != 0) {
    index = (index + 1) % capacity_;
    cur = map_[index];
  }

  // Insert the newly created port at the index.
  ASSERT(index >= 0);
  ASSERT(index < capacity_);
  ASSERT(map_[index].port == 0);
  ASSERT((map_[index].handler == NULL) ||
         (map_[index].handler == deleted_entry_));
  if (map_[index].handler == deleted_entry_) {
    // Consuming a deleted entry.
    deleted_--;
  }
  map_[index] = entry;

  // Increment number of used slots and grow if necessary.
  used_++;
  MaintainInvariants();

  if (FLAG_trace_isolates) {
    OS::PrintErr(
        "[+] Opening port: \n"
        "\thandler: %s\n"
        "\tport: %" Pd64 "\n",
        handler->name(), entry.port);
  }

  return entry.port;
}
复制代码

在 CreatePort 中,先是调用 AllocatePort 建立一个端口(先随机生成一个,再判断这个端口没有被使用,就能够返回),而后构建出 Entry 并将其存在一个哈希表中。

RawReceivePort* ReceivePort::New(Dart_Port id, bool is_control_port, Heap::Space space) {
  ASSERT(id != ILLEGAL_PORT);
  Thread* thread = Thread::Current();
  Zone* zone = thread->zone();
  const SendPort& send_port =
      SendPort::Handle(zone, SendPort::New(id, thread->isolate()->origin_id()));

  ReceivePort& result = ReceivePort::Handle(zone);
  {
    RawObject* raw = Object::Allocate(ReceivePort::kClassId,
                                      ReceivePort::InstanceSize(), space);
    NoSafepointScope no_safepoint;
    result ^= raw;
    result.StorePointer(&result.raw_ptr()->send_port_, send_port.raw());
  }
  if (is_control_port) {
    PortMap::SetPortState(id, PortMap::kControlPort);
  } else {
    PortMap::SetPortState(id, PortMap::kLivePort);
  }
  return result.raw();
}
复制代码

在这个函数里面,先是建立了 SendPort ,而后对 ReceivePort 进行了一些初始化操做,并将 ReceivePort 的 RawObject 返回,接着在 third_party/sdk/runtime/lib/timer_impl.dart#_createTimerHandler 中还要经过 _receivePort 取得 _sendPort,最终会调用 _get_sendport,这也是一个 native 函数:

DEFINE_NATIVE_ENTRY(RawReceivePortImpl_get_sendport, 0, 1) {
  GET_NON_NULL_NATIVE_ARGUMENT(ReceivePort, port, arguments->NativeArgAt(0));
  return port.send_port();
}
复制代码

GET_NON_NULL_NATIVE_ARGUMENT 取出 ReceivePort 实例,经过 arguments->NativeArgAt(0) ,可是 _get_sendport 并无传递参数,也就是说这个参数就是 _RawReceivePortImpl 自身,也就是上面返回的 RawObject,而后返回它的 send_port 。

再回到 third_party/sdk/runtime/lib/timer_impl.dart#_notifyZeroHandler,_sendPort 调用 send 函数发送了一条消息,最终会调用 _sendInternal ,再转到 c++ 层:

// third_party/sdk/runtime/lib/isolate.cc
DEFINE_NATIVE_ENTRY(SendPortImpl_sendInternal_, 0, 2) {
  GET_NON_NULL_NATIVE_ARGUMENT(SendPort, port, arguments->NativeArgAt(0));
  // TODO(iposva): Allow for arbitrary messages to be sent.
  GET_NON_NULL_NATIVE_ARGUMENT(Instance, obj, arguments->NativeArgAt(1));

  const Dart_Port destination_port_id = port.Id();
  const bool can_send_any_object = isolate->origin_id() == port.origin_id();

  if (ApiObjectConverter::CanConvert(obj.raw())) {
    PortMap::PostMessage(
        Message::New(destination_port_id, obj.raw(), Message::kNormalPriority));
  } else {
    MessageWriter writer(can_send_any_object);
    // TODO(turnidge): Throw an exception when the return value is false?
    PortMap::PostMessage(writer.WriteMessage(obj, destination_port_id,
                                             Message::kNormalPriority));
  }
  return Object::null();
}
复制代码

首先取出 SendPort 和 Instance 实例,而后建立出 Message 实例,最后调用 PortMap::PostMessage 发送消息。

// third_party/sdk/runtime/vm/port.cc
bool PortMap::PostMessage(std::unique_ptr<Message> message, bool before_events) {
  MutexLocker ml(mutex_);
  intptr_t index = FindPort(message->dest_port());
  if (index < 0) {
    return false;
  }
  ASSERT(index >= 0);
  ASSERT(index < capacity_);
  MessageHandler* handler = map_[index].handler;
  ASSERT(map_[index].port != 0);
  ASSERT((handler != NULL) && (handler != deleted_entry_));
  handler->PostMessage(std::move(message), before_events);
  return true;
}
复制代码

根据 Message 的 dest_port 找到 index ,再从哈希表中取出 handler,这里的 handler 就是在初始化 Dart_Port 传入的 isolate->message_handler() ,也就是 IsolateMessageHandler 实例,不过 IsolateMessageHandler 并无重写 PostMessage 函数。

// third_party/sdk/runtime/vm/message_handler.cc
void MessageHandler::PostMessage(std::unique_ptr<Message> message, bool before_events) {
  Message::Priority saved_priority;

  {
    saved_priority = message->priority();
    if (message->IsOOB()) {
      oob_queue_->Enqueue(std::move(message), before_events);
    } else {
      queue_->Enqueue(std::move(message), before_events);
    }
    if (paused_for_messages_) {
      ml.Notify();
    }

    if (pool_ != nullptr && !task_running_) {
      ASSERT(!delete_me_);
      task_running_ = true;
      const bool launched_successfully = pool_->Run<MessageHandlerTask>(this);
      ASSERT(launched_successfully);
    }
  }

  // Invoke any custom message notification.
  MessageNotify(saved_priority);
}
复制代码

message 分为两种,oob 和非 oob,实际上就是优先级的区分:

// third_party/sdk/runtime/vm/message.h
typedef enum {
  kNormalPriority = 0,  // Deliver message when idle.
  kOOBPriority = 1,     // Deliver message asap.

  // Iteration.
  kFirstPriority = 0,
  kNumPriorities = 2,
} Priority;
复制代码

不一样优先级的 message 会被加入不一样的队列,oob_queue_ 和 queue_,完了调用 MessageNotify :

// third_party/sdk/runtime/vm/isolate.cc
void IsolateMessageHandler::MessageNotify(Message::Priority priority) {
  if (priority >= Message::kOOBPriority) {
    // Handle out of band messages even if the mutator thread is busy.
    I->ScheduleInterrupts(Thread::kMessageInterrupt);
  }
  Dart_MessageNotifyCallback callback = I->message_notify_callback();
  if (callback != nullptr) {
    // Allow the embedder to handle message notification.
    (*callback)(Api::CastIsolate(I));
  }
}
复制代码

此处的 priority 是 message 的 priority ,当优先级为 kOOBPriority 时,会中断当前的任务去处理这个 message 。而后从 isolate 中取出 callback 执行,再看 callback 究竟是谁。这要从 DartIsolate::Initialize 开始:

// runtime/dart_isolate.cc
bool DartIsolate::Initialize(Dart_Isolate dart_isolate, bool is_root_isolate) {

  SetMessageHandlingTaskRunner(GetTaskRunners().GetUITaskRunner(),
                               is_root_isolate);

}

void DartIsolate::SetMessageHandlingTaskRunner( fml::RefPtr<fml::TaskRunner> runner, bool is_root_isolate) {
  if (!is_root_isolate || !runner) {
    return;
  }

  message_handling_task_runner_ = runner;

  message_handler().Initialize(
      [runner](std::function<void()> task) { runner->PostTask(task); });
}
复制代码

从这里就能够看出,后续的 callback 会在 message_handling_task_runner_ 中运行,可是它实际上仍是 UITaskRunner ,因此说,Future 的异步是单线程的异步,而后再看:

// third_party/tonic/dart_message_handler.cc
void DartMessageHandler::Initialize(TaskDispatcher dispatcher) {1
  // Only can be called once.
  TONIC_CHECK(!task_dispatcher_ && dispatcher);
  task_dispatcher_ = dispatcher;
  Dart_SetMessageNotifyCallback(MessageNotifyCallback);
}

void DartMessageHandler::MessageNotifyCallback(Dart_Isolate dest_isolate) {
  auto dart_state = DartState::From(dest_isolate);
  TONIC_CHECK(dart_state);
  dart_state->message_handler().OnMessage(dart_state);
}
复制代码
// third_party/sdk/runtime/vm/dart_api_impl.cc
DART_EXPORT void Dart_SetMessageNotifyCallback( Dart_MessageNotifyCallback message_notify_callback) {
  Isolate* isolate = Isolate::Current();
  CHECK_ISOLATE(isolate);

  {
    NoSafepointScope no_safepoint_scope;
    isolate->set_message_notify_callback(message_notify_callback);
  }

  if (message_notify_callback != nullptr && isolate->HasPendingMessages()) {
    ::Dart_ExitIsolate();

    // If a new handler gets installed and there are pending messages in the
    // queue (e.g. OOB messages for doing vm service work) we need to notify
    // the newly registered callback, otherwise the embedder might never get
    // notified about the pending messages.
    message_notify_callback(Api::CastIsolate(isolate));

    ::Dart_EnterIsolate(Api::CastIsolate(isolate));
  }
}
复制代码

从以上代码能够看出,message_notify_callback 就是 MessageNotifyCallback 函数,这个函数调用了 OnMessage ,而 task_dispatcher_ ,就是上面给出的 UITaskRunner 的 dispatcher 。

// third_party/tonic/dart_message_handler.cc
void DartMessageHandler::MessageNotifyCallback(Dart_Isolate dest_isolate) {
  auto dart_state = DartState::From(dest_isolate);
  TONIC_CHECK(dart_state);
  dart_state->message_handler().OnMessage(dart_state);
}

void DartMessageHandler::OnMessage(DartState* dart_state) {
  auto task_dispatcher_ = dart_state->message_handler().task_dispatcher_;

  // Schedule a task to run on the message loop thread.
  auto weak_dart_state = dart_state->GetWeakPtr();
  task_dispatcher_([weak_dart_state]() {
    if (auto dart_state = weak_dart_state.lock()) {
      dart_state->message_handler().OnHandleMessage(dart_state.get());
    }
  });
}
复制代码

从 OnHandleMessage 开始,剩下的代码开始在 task_dispatcher_ 中执行,也能够说从这里开始,开始了接收任务阶段。

接收任务

void DartMessageHandler::OnHandleMessage(DartState* dart_state) {
  if (isolate_had_fatal_error_) {
    // Don't handle any more messages.
    return;
  }

  DartIsolateScope scope(dart_state->isolate());
  DartApiScope dart_api_scope;
  Dart_Handle result = Dart_Null();
  bool error = false;

  // On the first message, check if we should pause on isolate start.
  if (!handled_first_message()) {
    set_handled_first_message(true);
    if (Dart_ShouldPauseOnStart()) {
      // Mark that we are paused on isolate start.
      Dart_SetPausedOnStart(true);
    }
  }

  if (Dart_IsPausedOnStart()) {
      
  } else if (Dart_IsPausedOnExit()) {
      
  } else {
    // We are processing messages normally.
    result = Dart_HandleMessage();
    // If the Dart program has set a return code, then it is intending to shut
    // down by way of a fatal error, and so there is no need to emit a log
    // message.
    if (dart_state->has_set_return_code() && Dart_IsError(result) &&
        Dart_IsFatalError(result)) {
      error = true;
    } else {
      error = LogIfError(result);
    }
    dart_state->MessageEpilogue(result);
    if (!Dart_CurrentIsolate()) {
      isolate_exited_ = true;
      return;
    }
  }

}
复制代码

正常状况下会调用 Dart_HandleMessage:

// third_party/sdk/runtime/vm/dart_api_impl.cc
DART_EXPORT Dart_Handle Dart_HandleMessage() {
  Thread* T = Thread::Current();
  Isolate* I = T->isolate();
  CHECK_API_SCOPE(T);
  CHECK_CALLBACK_STATE(T);
  API_TIMELINE_BEGIN_END_BASIC(T);
  TransitionNativeToVM transition(T);
  if (I->message_handler()->HandleNextMessage() != MessageHandler::kOK) {
    return Api::NewHandle(T, T->StealStickyError());
  }
  return Api::Success();
}

// third_party/sdk/runtime/vm/message_handler.cc
MessageHandler::MessageStatus MessageHandler::HandleNextMessage() {
  // We can only call HandleNextMessage when this handler is not
  // assigned to a thread pool.
  MonitorLocker ml(&monitor_);
  ASSERT(pool_ == NULL);
  ASSERT(!delete_me_);
#if defined(DEBUG)
  CheckAccess();
#endif
  return HandleMessages(&ml, true, false);
}

MessageHandler::MessageStatus MessageHandler::HandleMessages( MonitorLocker* ml, bool allow_normal_messages, bool allow_multiple_normal_messages) {
  ASSERT(monitor_.IsOwnedByCurrentThread());

  // Scheduling of the mutator thread during the isolate start can cause this
  // thread to safepoint.
  // We want to avoid holding the message handler monitor during the safepoint
  // operation to avoid possible deadlocks, which can occur if other threads are
  // sending messages to this message handler.
  //
  // If isolate() returns nullptr [StartIsolateScope] does nothing.
  ml->Exit();
  StartIsolateScope start_isolate(isolate());
  ml->Enter();

  MessageStatus max_status = kOK;
  Message::Priority min_priority =
      ((allow_normal_messages && !paused()) ? Message::kNormalPriority
                                            : Message::kOOBPriority);
  std::unique_ptr<Message> message = DequeueMessage(min_priority);
  while (message != nullptr) {
    intptr_t message_len = message->Size();
    if (FLAG_trace_isolates) {
      OS::PrintErr(
          "[<] Handling message:\n"
          "\tlen: %" Pd
          "\n"
          "\thandler: %s\n"
          "\tport: %" Pd64 "\n",
          message_len, name(), message->dest_port());
    }

    // Release the monitor_ temporarily while we handle the message.
    // The monitor was acquired in MessageHandler::TaskCallback().
    ml->Exit();
    Message::Priority saved_priority = message->priority();
    Dart_Port saved_dest_port = message->dest_port();
    MessageStatus status = HandleMessage(std::move(message));
    if (status > max_status) {
      max_status = status;
    }
    ml->Enter();
    if (FLAG_trace_isolates) {
      OS::PrintErr(
          "[.] Message handled (%s):\n"
          "\tlen: %" Pd
          "\n"
          "\thandler: %s\n"
          "\tport: %" Pd64 "\n",
          MessageStatusString(status), message_len, name(), saved_dest_port);
    }
    // If we are shutting down, do not process any more messages.
    if (status == kShutdown) {
      ClearOOBQueue();
      break;
    }

    // Remember time since the last message. Don't consider OOB messages so
    // using Observatory doesn't trigger additional idle tasks.
    if ((FLAG_idle_timeout_micros != 0) &&
        (saved_priority == Message::kNormalPriority)) {
      idle_start_time_ = OS::GetCurrentMonotonicMicros();
    }

    // Some callers want to process only one normal message and then quit. At
    // the same time it is OK to process multiple OOB messages.
    if ((saved_priority == Message::kNormalPriority) &&
        !allow_multiple_normal_messages) {
      // We processed one normal message. Allow no more.
      allow_normal_messages = false;
    }

    // Reevaluate the minimum allowable priority. The paused state
    // may have changed as part of handling the message. We may also
    // have encountered an error during message processing.
    //
    // Even if we encounter an error, we still process pending OOB
    // messages so that we don't lose the message notification.
    min_priority = (((max_status == kOK) && allow_normal_messages && !paused())
                        ? Message::kNormalPriority
                        : Message::kOOBPriority);
    message = DequeueMessage(min_priority);
  }
  return max_status;
}
复制代码

直到 MessageHandler::HandleMessages 为止,这里又是一个 while 循环,不断调用 DequeueMessage 取出 message ,直到全部的 message 执行完毕,单个 message 的处理,则是调用 HandleMessage ,

在 HandleMessage 中首先作的是获取 msg_handler ,调用的是 DartLibraryCalls::LookupHandler ,

RawObject* DartLibraryCalls::LookupHandler(Dart_Port port_id) {
  Thread* thread = Thread::Current();
  Zone* zone = thread->zone();
  Function& function = Function::Handle(
      zone, thread->isolate()->object_store()->lookup_port_handler());
  const int kTypeArgsLen = 0;
  const int kNumArguments = 1;
  if (function.IsNull()) {
    Library& isolate_lib = Library::Handle(zone, Library::IsolateLibrary());
    ASSERT(!isolate_lib.IsNull());
    const String& class_name = String::Handle(
        zone, isolate_lib.PrivateName(Symbols::_RawReceivePortImpl()));
    const String& function_name = String::Handle(
        zone, isolate_lib.PrivateName(Symbols::_lookupHandler()));
    function = Resolver::ResolveStatic(isolate_lib, class_name, function_name,
                                       kTypeArgsLen, kNumArguments,
                                       Object::empty_array());
    ASSERT(!function.IsNull());
    thread->isolate()->object_store()->set_lookup_port_handler(function);
  }
  const Array& args = Array::Handle(zone, Array::New(kNumArguments));
  args.SetAt(0, Integer::Handle(zone, Integer::New(port_id)));
  const Object& result =
      Object::Handle(zone, DartEntry::InvokeFunction(function, args));
  return result.raw();
}
复制代码

能够看出,这就是一个典型的 c++ 调用 dart 的流程,先找到 function ,而后构建参数,最后 DartEntry::InvokeFunction 调用这个函数。从 8~20 行得知,这是 _RawReceivePortImpl 的 _lookupHandler 函数:

@pragma("vm:entry-point", "call")
static _lookupHandler(int id) {
  var result = _handlerMap[id];
  return result;
}
复制代码

根据 id 从 _handlerMap 中找到一个值返回,这个值,其实就是会调函数,设置时机以下:

void set handler(Function value) {
  _handlerMap[this._get_id()] = value;
}
复制代码

这就是在初始化 _RawReceivePortImpl 以后调用的,_get_id 返回的也正是 dest_id 。而后,HandleMessage 中对 message 分为三种状况进行处理:

  1. message 优先级为 oob
  2. message dest_port 为 kIllegalPort
  3. 正常状况

正常的处理以下:

const Object& result =
    Object::Handle(zone, DartLibraryCalls::HandleMessage(msg_handler, msg));
if (result.IsError()) {
  status = ProcessUnhandledException(Error::Cast(result));
} else {
  ASSERT(result.IsNull());
}
复制代码

DartLibraryCalls::HandleMessage:

// third_party/sdk/runtime/vm/dart_entry.cc
RawObject* DartLibraryCalls::HandleMessage(const Object& handler, const Instance& message) {
  Thread* thread = Thread::Current();
  Zone* zone = thread->zone();
  Isolate* isolate = thread->isolate();
  Function& function = Function::Handle(
      zone, isolate->object_store()->handle_message_function());
  const int kTypeArgsLen = 0;
  const int kNumArguments = 2;
  if (function.IsNull()) {
    Library& isolate_lib = Library::Handle(zone, Library::IsolateLibrary());
    ASSERT(!isolate_lib.IsNull());
    const String& class_name = String::Handle(
        zone, isolate_lib.PrivateName(Symbols::_RawReceivePortImpl()));
    const String& function_name = String::Handle(
        zone, isolate_lib.PrivateName(Symbols::_handleMessage()));
    function = Resolver::ResolveStatic(isolate_lib, class_name, function_name,
                                       kTypeArgsLen, kNumArguments,
                                       Object::empty_array());
    ASSERT(!function.IsNull());
    isolate->object_store()->set_handle_message_function(function);
  }
  const Array& args = Array::Handle(zone, Array::New(kNumArguments));
  args.SetAt(0, handler);
  args.SetAt(1, message);
#if !defined(PRODUCT)
  if (isolate->debugger()->IsStepping()) {
    // If the isolate is being debugged and the debugger was stepping
    // through code, enable single stepping so debugger will stop
    // at the first location the user is interested in.
    isolate->debugger()->SetResumeAction(Debugger::kStepInto);
  }
#endif
  const Object& result =
      Object::Handle(zone, DartEntry::InvokeFunction(function, args));
  ASSERT(result.IsNull() || result.IsError());
  return result.raw();
}
复制代码

这又是从 c++ 中调用 dart 函数,从代码可知,调用的是 _RawReceivePortImpl 的 _handleMessage 函数:

@pragma("vm:entry-point", "call")
static void _handleMessage(Function handler, var message) {
  // TODO(floitsch): this relies on the fact that any exception aborts the
  // VM. Once we have non-fatal global exceptions we need to catch errors
  // so that we can run the immediate callbacks.
  handler(message);
  _runPendingImmediateCallback();
}
复制代码

以 message 为参数,调用 handler 函数,即最初传进来的 _Timer 的 _handleMessage 函数:

// third_party/sdk/runtime/lib/timer_impl.dart
static void _handleMessage(msg) {
  var pendingTimers;
  if (msg == _ZERO_EVENT) {
    pendingTimers = _queueFromZeroEvent();
    assert(pendingTimers.length > 0);
  } else {
    assert(msg == _TIMEOUT_EVENT);
    _scheduledWakeupTime = null; // Consumed the last scheduled wakeup now.
    pendingTimers = _queueFromTimeoutEvent();
  }
  _runTimers(pendingTimers);
  // Notify the event handler or shutdown the port if no more pending
  // timers are present.
  _notifyEventHandler();
}
复制代码

这里根据 msg 的不一样从不一样的队列中取 pendingTimers ,两者分别实现以下:

static List _queueFromZeroEvent() {
  var pendingTimers = new List();
  assert(_firstZeroTimer != null);
  // Collect pending timers from the timer heap that have an expiration prior
  // to the currently notified zero timer.
  var timer;
  while (!_heap.isEmpty && (_heap.first._compareTo(_firstZeroTimer) < 0)) {
    timer = _heap.removeFirst();
    pendingTimers.add(timer);
  }
  // Append the first zero timer to the pending timers.
  timer = _firstZeroTimer;
  _firstZeroTimer = timer._indexOrNext;
  timer._indexOrNext = null;
  pendingTimers.add(timer);
  return pendingTimers;
}

static List _queueFromTimeoutEvent() {
  var pendingTimers = new List();
  if (_firstZeroTimer != null) {
    // Collect pending timers from the timer heap that have an expiration
    // prior to the next zero timer.
    // By definition the first zero timer has been scheduled before the
    // current time, meaning all timers which are "less than" the first zero
    // timer are expired. The first zero timer will be dispatched when its
    // corresponding message is delivered.
    var timer;
    while (!_heap.isEmpty && (_heap.first._compareTo(_firstZeroTimer) < 0)) {
      timer = _heap.removeFirst();
      pendingTimers.add(timer);
    }
  } else {
    // Collect pending timers from the timer heap which have expired at this
    // time.
    var currentTime = VMLibraryHooks.timerMillisecondClock();
    var timer;
    while (!_heap.isEmpty && (_heap.first._wakeupTime <= currentTime)) {
      timer = _heap.removeFirst();
      pendingTimers.add(timer);
    }
  }
  return pendingTimers;
}
复制代码

当 msg 为 _ZERO_EVENT 时,会取出一个 _firstZeroTimer 队列中的任务和 n 个 _heap 队列中达到执行时间的任务,而当 msg 不为 _ZERO_EVENT 时,则会取出 n 个执行时间先于 _firstZeroTimer 第一个任务的的任务,或者是执行时间先于当前时间的任务。取完以后则是调用 _runTimers 执行任务。

static void _runTimers(List pendingTimers) {
  // If there are no pending timers currently reset the id space before we
  // have a chance to enqueue new timers.
  if (_heap.isEmpty && (_firstZeroTimer == null)) {
    _idCount = 0;
  }

  // Fast exit if no pending timers.
  if (pendingTimers.length == 0) {
    return;
  }

  // Trigger all of the pending timers. New timers added as part of the
  // callbacks will be enqueued now and notified in the next spin at the
  // earliest.
  _handlingCallbacks = true;
  var i = 0;
  try {
    for (; i < pendingTimers.length; i++) {
      // Next pending timer.
      var timer = pendingTimers[i];
      timer._indexOrNext = null;

      // One of the timers in the pending_timers list can cancel
      // one of the later timers which will set the callback to
      // null. Or the pending zero timer has been canceled earlier.
      if (timer._callback != null) {
        var callback = timer._callback;
        if (!timer._repeating) {
          // Mark timer as inactive.
          timer._callback = null;
        } else if (timer._milliSeconds > 0) {
          var ms = timer._milliSeconds;
          int overdue =
              VMLibraryHooks.timerMillisecondClock() - timer._wakeupTime;
          if (overdue > ms) {
            int missedTicks = overdue ~/ ms;
            timer._wakeupTime += missedTicks * ms;
            timer._tick += missedTicks;
          }
        }
        timer._tick += 1;

        callback(timer);
        // Re-insert repeating timer if not canceled.
        if (timer._repeating && (timer._callback != null)) {
          timer._advanceWakeupTime();
          timer._enqueue();
        }
        // Execute pending micro tasks.
        var immediateCallback = _removePendingImmediateCallback();
        if (immediateCallback != null) {
          immediateCallback();
        }
      }
    }
  } finally {
    _handlingCallbacks = false;
    // Re-queue timers we didn't get to.
    for (i++; i < pendingTimers.length; i++) {
      var timer = pendingTimers[i];
      timer._enqueue();
    }
    _notifyEventHandler();
  }
}
复制代码

基本逻辑就是一个 for 循环,对于每个 timer,执行其 callback 函数,一步步回溯回去,就会调用到 result._complete(computation()) 函数,今后处开始,就到了上层逻辑代码。

相关文章
相关标签/搜索