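Flutter is built on the Dart language, and Dart itself uses a single-threaded model. Future is likewise an asynchronous mechanism built on a single thread: asynchrony implemented by an event loop, which differs from asynchrony implemented by multiple threads and is closer to the Handler mechanism in Android. "Asynchronous" here means sending a message to the event loop, waiting to be scheduled, and running the code at some later moment, but that code still runs on the current thread. Consequently, if a Future is used for a time-consuming task, it may not block the current UI flow, but some later UI operations will still be affected. Executing code asynchronously with a Future takes four steps: creating the task, sending it, receiving it, and executing it.
At the application level, a Future task is usually created like this: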
new Future(() {
doSomething();
});
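To make the scheduling behaviour concrete, here is a minimal sketch that uses only the public dart:async API. The helper name recordOrder is made up for illustration. It suggests that a Future's computation runs only after the current synchronous code and any pending microtasks have finished.

```dart
import 'dart:async';

/// Records the order in which differently scheduled callbacks run.
/// `recordOrder` is a hypothetical helper, not part of any library.
Future<List<String>> recordOrder() async {
  final order = <String>[];
  // Scheduled on the event queue (the Future constructor uses Timer.run).
  Future(() => order.add('future'));
  // Scheduled on the microtask queue, which drains before the next event.
  scheduleMicrotask(() => order.add('microtask'));
  // Plain synchronous code runs first.
  order.add('sync');
  // Wait for a later event-queue task so both callbacks above have run.
  await Future(() {});
  return order;
}

Future<void> main() async {
  print((await recordOrder()).join(' -> ')); // sync -> microtask -> future
}
```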
Starting from the Future constructor, we can follow the whole process of creating a Future task.
factory Future(FutureOr<T> computation()) {
_Future<T> result = new _Future<T>();
Timer.run(() {
try {
result._complete(computation());
} catch (e, s) {
_completeWithErrorCallback(result, e, s);
}
});
return result;
}
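The factory above relies on private members (_Future, _complete). As a rough approximation only, the same shape can be sketched with the public Completer and Timer.run API. The name futureViaTimer is hypothetical, and this is not how the SDK itself is written.

```dart
import 'dart:async';

/// Hypothetical stand-in for the Future factory, built from public API only.
Future<T> futureViaTimer<T>(FutureOr<T> Function() computation) {
  final completer = Completer<T>();
  // Defer the computation to a zero-duration timer, as the factory does.
  Timer.run(() {
    try {
      completer.complete(computation());
    } catch (e, s) {
      // Errors thrown by the computation complete the future with an error.
      completer.completeError(e, s);
    }
  });
  return completer.future;
}

Future<void> main() async {
  print(await futureViaTimer(() => 6 * 7)); // 42
  try {
    await futureViaTimer<int>(() => throw StateError('boom'));
  } on StateError catch (e) {
    print('caught: ${e.message}'); // caught: boom
  }
}
```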
result._complete(computation()) is the part that finally runs the user's code (see section 4), while Timer.run creates the task step by step.
static void run(void callback()) {
new Timer(Duration.zero, callback);
}
// third_party/sdk/sdk/lib/async/timer.dart
factory Timer(Duration duration, void callback()) {
if (Zone.current == Zone.root) {
// No need to bind the callback. We know that the root's timer will
// be invoked in the root zone.
return Zone.current.createTimer(duration, callback);
}
return Zone.current
.createTimer(duration, Zone.current.bindCallbackGuarded(callback));
}
// third_party/sdk/runtime/lib/timer_patch.dart
static Timer _createTimer(Duration duration, void callback()) {
// TODO(iposva): Remove _TimerFactory and use VMLibraryHooks exclusively.
if (_TimerFactory._factory == null) {
_TimerFactory._factory = VMLibraryHooks.timerFactory;
}
if (_TimerFactory._factory == null) {
throw new UnsupportedError("Timer interface not supported.");
}
int milliseconds = duration.inMilliseconds;
if (milliseconds < 0) milliseconds = 0;
return _TimerFactory._factory(milliseconds, (_) {
callback();
}, false);
}
Finally, _TimerFactory._factory is called to create the Timer instance. _TimerFactory._factory comes from VMLibraryHooks.timerFactory, and the point at which VMLibraryHooks.timerFactory is set can be traced back step by step to InitDartInternal:
// lib/ui/dart_runtime_hooks.cc
static void InitDartInternal(Dart_Handle builtin_library, bool is_ui_isolate) {
Dart_Handle print = GetFunction(builtin_library, "_getPrintClosure");
Dart_Handle internal_library = Dart_LookupLibrary(ToDart("dart:_internal"));
Dart_Handle result =
Dart_SetField(internal_library, ToDart("_printClosure"), print);
PropagateIfError(result);
if (is_ui_isolate) {
// Call |_setupHooks| to configure |VMLibraryHooks|.
Dart_Handle method_name = Dart_NewStringFromCString("_setupHooks");
result = Dart_Invoke(builtin_library, method_name, 0, NULL);
PropagateIfError(result);
}
Dart_Handle setup_hooks = Dart_NewStringFromCString("_setupHooks");
Dart_Handle io_lib = Dart_LookupLibrary(ToDart("dart:io"));
result = Dart_Invoke(io_lib, setup_hooks, 0, NULL);
PropagateIfError(result);
Dart_Handle isolate_lib = Dart_LookupLibrary(ToDart("dart:isolate"));
result = Dart_Invoke(isolate_lib, setup_hooks, 0, NULL);
PropagateIfError(result);
}
// third_party/sdk/runtime/lib/timer_impl.dart
@pragma("vm:entry-point", "call")
_setupHooks() {
VMLibraryHooks.timerFactory = _Timer._factory;
}
static Timer _factory(
int milliSeconds, void callback(Timer timer), bool repeating) {
if (repeating) {
return new _Timer.periodic(milliSeconds, callback);
}
return new _Timer(milliSeconds, callback);
}
_Timer is the implementation class of Timer. Repeating and non-repeating timers go through different constructors, but both end up in the same _createTimer function.
factory _Timer(int milliSeconds, void callback(Timer timer)) {
return _createTimer(callback, milliSeconds, false);
}
factory _Timer.periodic(int milliSeconds, void callback(Timer timer)) {
return _createTimer(callback, milliSeconds, true);
}
static Timer _createTimer(
void callback(Timer timer), int milliSeconds, bool repeating) {
// Negative timeouts are treated as if 0 timeout.
if (milliSeconds < 0) {
milliSeconds = 0;
}
// Add one because DateTime.now() is assumed to round down
// to nearest millisecond, not up, so that time + duration is before
// duration milliseconds from now. Using microsecond timers like
// Stopwatch allows detecting that the timer fires early.
int now = VMLibraryHooks.timerMillisecondClock();
int wakeupTime = (milliSeconds == 0) ? now : (now + 1 + milliSeconds);
_Timer timer =
new _Timer._internal(callback, wakeupTime, milliSeconds, repeating);
// Enqueue this newly created timer in the appropriate structure and
// notify if necessary.
timer._enqueue();
return timer;
}
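The wakeup-time arithmetic in _createTimer can be isolated as a small pure function. This is only a sketch: computeWakeupTime is a hypothetical name, and the clock value is passed in rather than read from VMLibraryHooks.

```dart
/// Mirrors the wakeup-time arithmetic of _createTimer. `computeWakeupTime`
/// is a hypothetical name used only for this sketch.
int computeWakeupTime(int nowMs, int delayMs) {
  // Negative delays are treated as zero.
  if (delayMs < 0) delayMs = 0;
  // Non-zero delays get one extra millisecond because the clock rounds down.
  return delayMs == 0 ? nowMs : nowMs + 1 + delayMs;
}

void main() {
  print(computeWakeupTime(1000, 0)); // 1000
  print(computeWakeupTime(1000, 5)); // 1006
  print(computeWakeupTime(1000, -3)); // 1000
}
```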
_internal is the constructor of _Timer, and _enqueue puts the Timer into a queue to wait for execution:
// third_party/sdk/runtime/lib/timer_impl.dart
void _enqueue() {
if (_milliSeconds == 0) {
if (_firstZeroTimer == null) {
_lastZeroTimer = this;
_firstZeroTimer = this;
} else {
_lastZeroTimer._indexOrNext = this;
_lastZeroTimer = this;
}
// Every zero timer gets its own event.
_notifyZeroHandler();
} else {
_heap.add(this);
if (_heap.isFirst(this)) {
_notifyEventHandler();
}
}
}
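The branching in _enqueue can be modelled with a toy class. Everything here is hypothetical: a plain list stands in for the zero-timer linked list, a sorted list stands in for the real heap, and strings record which notification would be sent.

```dart
/// Toy model of the two structures used by _enqueue. All names here are
/// hypothetical; a sorted list stands in for the real binary heap.
class ToyTimer {
  ToyTimer(this.name, this.delayMs, this.wakeupTime);
  final String name;
  final int delayMs;
  final int wakeupTime;
}

class ToyTimerQueues {
  final zeroTimers = <ToyTimer>[]; // FIFO list for zero-delay timers
  final heap = <ToyTimer>[]; // kept sorted by wakeupTime
  final notifications = <String>[]; // which handler would be notified

  void enqueue(ToyTimer timer) {
    if (timer.delayMs == 0) {
      zeroTimers.add(timer);
      notifications.add('zero'); // every zero timer gets its own event
    } else {
      heap.add(timer);
      heap.sort((a, b) => a.wakeupTime.compareTo(b.wakeupTime));
      // Only a new earliest timer needs to reschedule the wakeup.
      if (identical(heap.first, timer)) notifications.add('event');
    }
  }
}

void main() {
  final q = ToyTimerQueues()
    ..enqueue(ToyTimer('a', 0, 100))
    ..enqueue(ToyTimer('b', 50, 151))
    ..enqueue(ToyTimer('c', 100, 201))
    ..enqueue(ToyTimer('d', 10, 111));
  print(q.notifications); // [zero, event, event]
  print(q.heap.map((t) => t.name).toList()); // [d, b, c]
}
```

Timer c triggers no notification because b is still the earliest entry, which matches the isFirst check in the original code.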
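Whether the Timer's delay is 0 is the dividing line: a zero-delay Timer is appended to the linked list that ends at _lastZeroTimer, and any other Timer is added to _heap. Then _notifyZeroHandler or _notifyEventHandler is called to notify the target thread to process the task. What follows is the process of sending the task.
Take _notifyZeroHandler as an example: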
// third_party/sdk/runtime/lib/timer_impl.dart
static void _notifyZeroHandler() {
if (_sendPort == null) {
_createTimerHandler();
}
_sendPort.send(_ZERO_EVENT);
}
First it makes sure that _sendPort exists, then it uses _sendPort to send a _ZERO_EVENT message.
// third_party/sdk/runtime/lib/timer_impl.dart
static void _createTimerHandler() {
assert(_receivePort == null);
assert(_sendPort == null);
_receivePort = new RawReceivePort(_handleMessage);
_sendPort = _receivePort.sendPort;
_scheduledWakeupTime = null;
}
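The same receive-port and send-port pairing is available through the public dart:isolate API. The sketch below assumes it runs on the Dart VM, and roundTrip is a hypothetical helper. It sends one message to a port owned by the current isolate and completes when the handler is invoked.

```dart
import 'dart:async';
import 'dart:isolate';

/// Sends `message` through a fresh RawReceivePort and completes with what
/// the handler receives. `roundTrip` is a hypothetical helper.
Future<Object?> roundTrip(Object? message) {
  final completer = Completer<Object?>();
  late final RawReceivePort port;
  port = RawReceivePort((Object? received) {
    port.close(); // an open port keeps the isolate alive
    completer.complete(received);
  });
  // send returns immediately; the handler runs later as an event.
  port.sendPort.send(message);
  return completer.future;
}

Future<void> main() async {
  print(await roundTrip('_ZERO_EVENT stand-in'));
}
```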
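_receivePort and _sendPort are a pair of endpoints used for communication. First the RawReceivePort constructor is called to create _receivePort, passing the callback _handleMessage; then _sendPort is obtained from _receivePort. The key to this communication model is therefore how _receivePort is constructed.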
// third_party/sdk/runtime/lib/isolate_patch.dart
@patch
factory RawReceivePort([Function handler]) {
_RawReceivePortImpl result = new _RawReceivePortImpl();
result.handler = handler;
return result;
}
factory _RawReceivePortImpl() native "RawReceivePortImpl_factory";
// third_party/sdk/runtime/lib/isolate.cc
DEFINE_NATIVE_ENTRY(RawReceivePortImpl_factory, 0, 1) {
ASSERT(
TypeArguments::CheckedHandle(zone, arguments->NativeArgAt(0)).IsNull());
Dart_Port port_id = PortMap::CreatePort(isolate->message_handler());
return ReceivePort::New(port_id, false /* not control port */);
}
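The constructor is a native function. On the native side, PortMap::CreatePort is called first to create a Dart_Port, and then ReceivePort::New creates the ReceivePort instance. After instantiation, the callback handler is saved in a map whose key is the value returned by _get_id, which is also a native function.
The argument used to create the Dart_Port, isolate->message_handler(), is set in InitIsolate: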
// third_party/sdk/runtime/vm/isolate.cc
Isolate* Isolate::InitIsolate(const char* name_prefix, IsolateGroup* isolate_group, const Dart_IsolateFlags& api_flags, bool is_vm_isolate) {
// Setup the isolate message handler.
MessageHandler* handler = new IsolateMessageHandler(result);
ASSERT(handler != nullptr);
result->set_message_handler(handler);
}
Dart_Port PortMap::CreatePort(MessageHandler* handler) {
ASSERT(handler != NULL);
MutexLocker ml(mutex_);
#if defined(DEBUG)
handler->CheckAccess();
#endif
Entry entry;
entry.port = AllocatePort();
entry.handler = handler;
entry.state = kNewPort;
// Search for the first unused slot. Make use of the knowledge that here is
// currently no port with this id in the port map.
ASSERT(FindPort(entry.port) < 0);
intptr_t index = entry.port % capacity_;
Entry cur = map_[index];
// Stop the search at the first found unused (free or deleted) slot.
while (cur.port != 0) {
index = (index + 1) % capacity_;
cur = map_[index];
}
// Insert the newly created port at the index.
ASSERT(index >= 0);
ASSERT(index < capacity_);
ASSERT(map_[index].port == 0);
ASSERT((map_[index].handler == NULL) ||
(map_[index].handler == deleted_entry_));
if (map_[index].handler == deleted_entry_) {
// Consuming a deleted entry.
deleted_--;
}
map_[index] = entry;
// Increment number of used slots and grow if necessary.
used_++;
MaintainInvariants();
if (FLAG_trace_isolates) {
OS::PrintErr(
"[+] Opening port: \n"
"\thandler: %s\n"
"\tport: %" Pd64 "\n",
handler->name(), entry.port);
}
return entry.port;
}
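The slot search in CreatePort is ordinary linear probing in an open-addressed hash table. A minimal sketch of just that loop follows, assuming a list of port ids in which 0 marks an unused slot. The function name is hypothetical, and growth, deletion markers and locking are left out.

```dart
/// Finds the slot that CreatePort-style linear probing would pick. `slots`
/// holds port ids with 0 meaning unused; names are hypothetical. Assumes at
/// least one free slot and a non-negative `port`.
int findFreeSlot(List<int> slots, int port) {
  var index = port % slots.length;
  // Step forward, wrapping around, until an unused slot is found.
  while (slots[index] != 0) {
    index = (index + 1) % slots.length;
  }
  return index;
}

void main() {
  final slots = [0, 5, 9, 0];
  print(findFreeSlot(slots, 5)); // 3 (slots 1 and 2 are taken)
  print(findFreeSlot(slots, 8)); // 0
}
```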
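CreatePort first calls AllocatePort to create a port (it generates a random one and returns it once it has confirmed that the port is not already in use), then builds an Entry and stores it in a hash table.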
RawReceivePort* ReceivePort::New(Dart_Port id, bool is_control_port, Heap::Space space) {
ASSERT(id != ILLEGAL_PORT);
Thread* thread = Thread::Current();
Zone* zone = thread->zone();
const SendPort& send_port =
SendPort::Handle(zone, SendPort::New(id, thread->isolate()->origin_id()));
ReceivePort& result = ReceivePort::Handle(zone);
{
RawObject* raw = Object::Allocate(ReceivePort::kClassId,
ReceivePort::InstanceSize(), space);
NoSafepointScope no_safepoint;
result ^= raw;
result.StorePointer(&result.raw_ptr()->send_port_, send_port.raw());
}
if (is_control_port) {
PortMap::SetPortState(id, PortMap::kControlPort);
} else {
PortMap::SetPortState(id, PortMap::kLivePort);
}
return result.raw();
}
This function first creates the SendPort, then initializes the ReceivePort and returns the ReceivePort's RawObject. Next, third_party/sdk/runtime/lib/timer_impl.dart#_createTimerHandler obtains _sendPort from _receivePort, which eventually calls _get_sendport, also a native function:
DEFINE_NATIVE_ENTRY(RawReceivePortImpl_get_sendport, 0, 1) {
GET_NON_NULL_NATIVE_ARGUMENT(ReceivePort, port, arguments->NativeArgAt(0));
return port.send_port();
}
GET_NON_NULL_NATIVE_ARGUMENT takes the ReceivePort instance from arguments->NativeArgAt(0). _get_sendport is not passed any explicit argument, so this argument is the _RawReceivePortImpl itself, that is, the RawObject returned above; its send_port is then returned.
Back in third_party/sdk/runtime/lib/timer_impl.dart#_notifyZeroHandler, _sendPort calls send to send a message, which eventually calls _sendInternal and moves to the C++ layer:
// third_party/sdk/runtime/lib/isolate.cc
DEFINE_NATIVE_ENTRY(SendPortImpl_sendInternal_, 0, 2) {
GET_NON_NULL_NATIVE_ARGUMENT(SendPort, port, arguments->NativeArgAt(0));
// TODO(iposva): Allow for arbitrary messages to be sent.
GET_NON_NULL_NATIVE_ARGUMENT(Instance, obj, arguments->NativeArgAt(1));
const Dart_Port destination_port_id = port.Id();
const bool can_send_any_object = isolate->origin_id() == port.origin_id();
if (ApiObjectConverter::CanConvert(obj.raw())) {
PortMap::PostMessage(
Message::New(destination_port_id, obj.raw(), Message::kNormalPriority));
} else {
MessageWriter writer(can_send_any_object);
// TODO(turnidge): Throw an exception when the return value is false?
PortMap::PostMessage(writer.WriteMessage(obj, destination_port_id,
Message::kNormalPriority));
}
return Object::null();
}
It first takes out the SendPort and the Instance, then creates a Message instance, and finally calls PortMap::PostMessage to send the message.
// third_party/sdk/runtime/vm/port.cc
bool PortMap::PostMessage(std::unique_ptr<Message> message, bool before_events) {
MutexLocker ml(mutex_);
intptr_t index = FindPort(message->dest_port());
if (index < 0) {
return false;
}
ASSERT(index >= 0);
ASSERT(index < capacity_);
MessageHandler* handler = map_[index].handler;
ASSERT(map_[index].port != 0);
ASSERT((handler != NULL) && (handler != deleted_entry_));
handler->PostMessage(std::move(message), before_events);
return true;
}
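The index is found from the Message's dest_port, and the handler is then taken from the hash table. This handler is the isolate->message_handler() passed in when the Dart_Port was created, that is, the IsolateMessageHandler instance. IsolateMessageHandler does not override PostMessage, however, so the base-class MessageHandler::PostMessage runs.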
// third_party/sdk/runtime/vm/message_handler.cc
void MessageHandler::PostMessage(std::unique_ptr<Message> message, bool before_events) {
Message::Priority saved_priority;
{
// Lock the handler's monitor; this is the ml notified below.
MonitorLocker ml(&monitor_);
saved_priority = message->priority();
if (message->IsOOB()) {
oob_queue_->Enqueue(std::move(message), before_events);
} else {
queue_->Enqueue(std::move(message), before_events);
}
if (paused_for_messages_) {
ml.Notify();
}
if (pool_ != nullptr && !task_running_) {
ASSERT(!delete_me_);
task_running_ = true;
const bool launched_successfully = pool_->Run<MessageHandlerTask>(this);
ASSERT(launched_successfully);
}
}
// Invoke any custom message notification.
MessageNotify(saved_priority);
}
Messages come in two kinds, OOB and non-OOB, which is really a distinction of priority:
// third_party/sdk/runtime/vm/message.h
typedef enum {
kNormalPriority = 0, // Deliver message when idle.
kOOBPriority = 1, // Deliver message asap.
// Iteration.
kFirstPriority = 0,
kNumPriorities = 2,
} Priority;
Messages of different priorities are added to different queues, oob_queue_ and queue_, after which MessageNotify is called:
// third_party/sdk/runtime/vm/isolate.cc
void IsolateMessageHandler::MessageNotify(Message::Priority priority) {
if (priority >= Message::kOOBPriority) {
// Handle out of band messages even if the mutator thread is busy.
I->ScheduleInterrupts(Thread::kMessageInterrupt);
}
Dart_MessageNotifyCallback callback = I->message_notify_callback();
if (callback != nullptr) {
// Allow the embedder to handle message notification.
(*callback)(Api::CastIsolate(I));
}
}
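The priority here is the message's priority. When it is kOOBPriority, an interrupt is scheduled so that the message is handled even while the mutator thread is busy. The callback is then taken from the isolate and invoked. To see what this callback actually is, start from DartIsolate::Initialize: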
// runtime/dart_isolate.cc
bool DartIsolate::Initialize(Dart_Isolate dart_isolate, bool is_root_isolate) {
SetMessageHandlingTaskRunner(GetTaskRunners().GetUITaskRunner(),
is_root_isolate);
}
void DartIsolate::SetMessageHandlingTaskRunner( fml::RefPtr<fml::TaskRunner> runner, bool is_root_isolate) {
if (!is_root_isolate || !runner) {
return;
}
message_handling_task_runner_ = runner;
message_handler().Initialize(
[runner](std::function<void()> task) { runner->PostTask(task); });
}
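Because every message is dispatched to one task runner, work scheduled with a Future still occupies the thread that runs all other events. The sketch below illustrates the effect with plain dart:async rather than with the Flutter engine, and timerDelayBehindBusyFuture is a hypothetical helper. A zero-delay Timer queued behind a busy Future cannot fire until the Future's computation returns.

```dart
import 'dart:async';

/// Returns how many milliseconds pass before a zero-delay Timer fires when a
/// Future scheduled just before it spins for `busy`. Hypothetical helper.
Future<int> timerDelayBehindBusyFuture(Duration busy) {
  final stopwatch = Stopwatch()..start();
  final fired = Completer<int>();
  // This "asynchronous" work still occupies the one thread running the loop.
  Future(() {
    while (stopwatch.elapsed < busy) {}
  });
  // Queued behind the Future, so it cannot fire until the loop above ends.
  Timer(Duration.zero, () => fired.complete(stopwatch.elapsedMilliseconds));
  return fired.future;
}

Future<void> main() async {
  final ms = await timerDelayBehindBusyFuture(const Duration(milliseconds: 50));
  print('timer fired after $ms ms');
}
```

In a Flutter app the delayed event could just as well be a frame callback or a touch event, which is why truly heavy work is usually moved to another isolate.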
This shows that the subsequent callback runs on message_handling_task_runner_, which is in fact the UITaskRunner. That is why the asynchrony of Future is single-threaded asynchrony. Next:
// third_party/tonic/dart_message_handler.cc
void DartMessageHandler::Initialize(TaskDispatcher dispatcher) {
// Only can be called once.
TONIC_CHECK(!task_dispatcher_ && dispatcher);
task_dispatcher_ = dispatcher;
Dart_SetMessageNotifyCallback(MessageNotifyCallback);
}
void DartMessageHandler::MessageNotifyCallback(Dart_Isolate dest_isolate) {
auto dart_state = DartState::From(dest_isolate);
TONIC_CHECK(dart_state);
dart_state->message_handler().OnMessage(dart_state);
}
// third_party/sdk/runtime/vm/dart_api_impl.cc
DART_EXPORT void Dart_SetMessageNotifyCallback( Dart_MessageNotifyCallback message_notify_callback) {
Isolate* isolate = Isolate::Current();
CHECK_ISOLATE(isolate);
{
NoSafepointScope no_safepoint_scope;
isolate->set_message_notify_callback(message_notify_callback);
}
if (message_notify_callback != nullptr && isolate->HasPendingMessages()) {
::Dart_ExitIsolate();
// If a new handler gets installed and there are pending messages in the
// queue (e.g. OOB messages for doing vm service work) we need to notify
// the newly registered callback, otherwise the embedder might never get
// notified about the pending messages.
message_notify_callback(Api::CastIsolate(isolate));
::Dart_EnterIsolate(Api::CastIsolate(isolate));
}
}
The code above shows that message_notify_callback is the MessageNotifyCallback function, which calls OnMessage, and that task_dispatcher_ is the dispatcher wrapping the UITaskRunner given above.
// third_party/tonic/dart_message_handler.cc
void DartMessageHandler::MessageNotifyCallback(Dart_Isolate dest_isolate) {
auto dart_state = DartState::From(dest_isolate);
TONIC_CHECK(dart_state);
dart_state->message_handler().OnMessage(dart_state);
}
void DartMessageHandler::OnMessage(DartState* dart_state) {
auto task_dispatcher_ = dart_state->message_handler().task_dispatcher_;
// Schedule a task to run on the message loop thread.
auto weak_dart_state = dart_state->GetWeakPtr();
task_dispatcher_([weak_dart_state]() {
if (auto dart_state = weak_dart_state.lock()) {
dart_state->message_handler().OnHandleMessage(dart_state.get());
}
});
}
Starting from OnHandleMessage, the remaining code runs on the thread behind task_dispatcher_; in other words, the task-receiving phase begins here.
void DartMessageHandler::OnHandleMessage(DartState* dart_state) {
if (isolate_had_fatal_error_) {
// Don't handle any more messages.
return;
}
DartIsolateScope scope(dart_state->isolate());
DartApiScope dart_api_scope;
Dart_Handle result = Dart_Null();
bool error = false;
// On the first message, check if we should pause on isolate start.
if (!handled_first_message()) {
set_handled_first_message(true);
if (Dart_ShouldPauseOnStart()) {
// Mark that we are paused on isolate start.
Dart_SetPausedOnStart(true);
}
}
if (Dart_IsPausedOnStart()) {
} else if (Dart_IsPausedOnExit()) {
} else {
// We are processing messages normally.
result = Dart_HandleMessage();
// If the Dart program has set a return code, then it is intending to shut
// down by way of a fatal error, and so there is no need to emit a log
// message.
if (dart_state->has_set_return_code() && Dart_IsError(result) &&
Dart_IsFatalError(result)) {
error = true;
} else {
error = LogIfError(result);
}
dart_state->MessageEpilogue(result);
if (!Dart_CurrentIsolate()) {
isolate_exited_ = true;
return;
}
}
}
Under normal circumstances Dart_HandleMessage is called:
// third_party/sdk/runtime/vm/dart_api_impl.cc
DART_EXPORT Dart_Handle Dart_HandleMessage() {
Thread* T = Thread::Current();
Isolate* I = T->isolate();
CHECK_API_SCOPE(T);
CHECK_CALLBACK_STATE(T);
API_TIMELINE_BEGIN_END_BASIC(T);
TransitionNativeToVM transition(T);
if (I->message_handler()->HandleNextMessage() != MessageHandler::kOK) {
return Api::NewHandle(T, T->StealStickyError());
}
return Api::Success();
}
// third_party/sdk/runtime/vm/message_handler.cc
MessageHandler::MessageStatus MessageHandler::HandleNextMessage() {
// We can only call HandleNextMessage when this handler is not
// assigned to a thread pool.
MonitorLocker ml(&monitor_);
ASSERT(pool_ == NULL);
ASSERT(!delete_me_);
#if defined(DEBUG)
CheckAccess();
#endif
return HandleMessages(&ml, true, false);
}
MessageHandler::MessageStatus MessageHandler::HandleMessages( MonitorLocker* ml, bool allow_normal_messages, bool allow_multiple_normal_messages) {
ASSERT(monitor_.IsOwnedByCurrentThread());
// Scheduling of the mutator thread during the isolate start can cause this
// thread to safepoint.
// We want to avoid holding the message handler monitor during the safepoint
// operation to avoid possible deadlocks, which can occur if other threads are
// sending messages to this message handler.
//
// If isolate() returns nullptr [StartIsolateScope] does nothing.
ml->Exit();
StartIsolateScope start_isolate(isolate());
ml->Enter();
MessageStatus max_status = kOK;
Message::Priority min_priority =
((allow_normal_messages && !paused()) ? Message::kNormalPriority
: Message::kOOBPriority);
std::unique_ptr<Message> message = DequeueMessage(min_priority);
while (message != nullptr) {
intptr_t message_len = message->Size();
if (FLAG_trace_isolates) {
OS::PrintErr(
"[<] Handling message:\n"
"\tlen: %" Pd
"\n"
"\thandler: %s\n"
"\tport: %" Pd64 "\n",
message_len, name(), message->dest_port());
}
// Release the monitor_ temporarily while we handle the message.
// The monitor was acquired in MessageHandler::TaskCallback().
ml->Exit();
Message::Priority saved_priority = message->priority();
Dart_Port saved_dest_port = message->dest_port();
MessageStatus status = HandleMessage(std::move(message));
if (status > max_status) {
max_status = status;
}
ml->Enter();
if (FLAG_trace_isolates) {
OS::PrintErr(
"[.] Message handled (%s):\n"
"\tlen: %" Pd
"\n"
"\thandler: %s\n"
"\tport: %" Pd64 "\n",
MessageStatusString(status), message_len, name(), saved_dest_port);
}
// If we are shutting down, do not process any more messages.
if (status == kShutdown) {
ClearOOBQueue();
break;
}
// Remember time since the last message. Don't consider OOB messages so
// using Observatory doesn't trigger additional idle tasks.
if ((FLAG_idle_timeout_micros != 0) &&
(saved_priority == Message::kNormalPriority)) {
idle_start_time_ = OS::GetCurrentMonotonicMicros();
}
// Some callers want to process only one normal message and then quit. At
// the same time it is OK to process multiple OOB messages.
if ((saved_priority == Message::kNormalPriority) &&
!allow_multiple_normal_messages) {
// We processed one normal message. Allow no more.
allow_normal_messages = false;
}
// Reevaluate the minimum allowable priority. The paused state
// may have changed as part of handling the message. We may also
// have encountered an error during message processing.
//
// Even if we encounter an error, we still process pending OOB
// messages so that we don't lose the message notification.
min_priority = (((max_status == kOK) && allow_normal_messages && !paused())
? Message::kNormalPriority
: Message::kOOBPriority);
message = DequeueMessage(min_priority);
}
return max_status;
}
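The drain loop can be modelled in a few lines. This is a sketch under stated assumptions: the body of DequeueMessage is not shown here, so the rule used below (take from the OOB queue first, and from the normal queue only while normal messages are allowed) is an assumption, and all class and method names are hypothetical. Pausing and error handling are left out.

```dart
import 'dart:collection';

enum Priority { normal, oob }

class ToyMessage {
  const ToyMessage(this.text, this.priority);
  final String text;
  final Priority priority;
}

/// Toy model of the two queues and the drain loop. The dequeue rule is an
/// assumption about DequeueMessage; all names are hypothetical.
class ToyMessageHandler {
  final _oob = Queue<ToyMessage>();
  final _normal = Queue<ToyMessage>();

  void post(ToyMessage m) =>
      (m.priority == Priority.oob ? _oob : _normal).add(m);

  ToyMessage? _dequeue(bool allowNormal) {
    if (_oob.isNotEmpty) return _oob.removeFirst();
    if (allowNormal && _normal.isNotEmpty) return _normal.removeFirst();
    return null;
  }

  /// Behaves like HandleMessages called with
  /// allow_multiple_normal_messages == false.
  List<String> handleNext() {
    final handled = <String>[];
    var allowNormal = true;
    var message = _dequeue(allowNormal);
    while (message != null) {
      handled.add(message.text);
      // After one normal message, only OOB messages may still be taken.
      if (message.priority == Priority.normal) allowNormal = false;
      message = _dequeue(allowNormal);
    }
    return handled;
  }
}

void main() {
  final handler = ToyMessageHandler()
    ..post(const ToyMessage('n1', Priority.normal))
    ..post(const ToyMessage('n2', Priority.normal))
    ..post(const ToyMessage('o1', Priority.oob));
  print(handler.handleNext()); // [o1, n1]
  print(handler.handleNext()); // [n2]
}
```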
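The chain continues down to MessageHandler::HandleMessages, which contains a while loop that keeps calling DequeueMessage to take out messages until no eligible message remains. Because HandleNextMessage passes false for allow_multiple_normal_messages, each call handles at most one normal-priority message but any number of OOB messages. Each individual message is handled by calling HandleMessage.
The first thing HandleMessage does is obtain msg_handler by calling DartLibraryCalls::LookupHandler: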
RawObject* DartLibraryCalls::LookupHandler(Dart_Port port_id) {
Thread* thread = Thread::Current();
Zone* zone = thread->zone();
Function& function = Function::Handle(
zone, thread->isolate()->object_store()->lookup_port_handler());
const int kTypeArgsLen = 0;
const int kNumArguments = 1;
if (function.IsNull()) {
Library& isolate_lib = Library::Handle(zone, Library::IsolateLibrary());
ASSERT(!isolate_lib.IsNull());
const String& class_name = String::Handle(
zone, isolate_lib.PrivateName(Symbols::_RawReceivePortImpl()));
const String& function_name = String::Handle(
zone, isolate_lib.PrivateName(Symbols::_lookupHandler()));
function = Resolver::ResolveStatic(isolate_lib, class_name, function_name,
kTypeArgsLen, kNumArguments,
Object::empty_array());
ASSERT(!function.IsNull());
thread->isolate()->object_store()->set_lookup_port_handler(function);
}
const Array& args = Array::Handle(zone, Array::New(kNumArguments));
args.SetAt(0, Integer::Handle(zone, Integer::New(port_id)));
const Object& result =
Object::Handle(zone, DartEntry::InvokeFunction(function, args));
return result.raw();
}
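This is a typical flow for calling Dart from C++: find the function, build the arguments, and finally call it with DartEntry::InvokeFunction. Lines 8 to 20 of the listing (the if (function.IsNull()) branch) show that this is the _lookupHandler function of _RawReceivePortImpl: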
@pragma("vm:entry-point", "call")
static _lookupHandler(int id) {
var result = _handlerMap[id];
return result;
}
It looks up a value in _handlerMap by id and returns it. That value is the callback, which is set as follows:
void set handler(Function value) {
_handlerMap[this._get_id()] = value;
}
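The lookup-then-invoke pattern can be sketched with an ordinary map. The names handlerMap and dispatch are hypothetical; in the VM the two steps are separate calls from C++ into Dart (_lookupHandler, then _handleMessage).

```dart
/// Toy version of the port-id-to-callback table. Names are hypothetical.
final handlerMap = <int, void Function(Object?)>{};

/// Looks up the handler registered for `portId` and invokes it with
/// `message`. Returns false when no handler is registered.
bool dispatch(int portId, Object? message) {
  final handler = handlerMap[portId];
  if (handler == null) return false; // no live port with this id
  handler(message);
  return true;
}

void main() {
  final received = <Object?>[];
  handlerMap[7] = received.add; // register a callback under port id 7
  print(dispatch(7, 'zero event')); // true
  print(dispatch(8, 'lost')); // false
  print(received); // [zero event]
}
```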
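This setter is called right after _RawReceivePortImpl is initialized, and _get_id returns exactly the destination port id. HandleMessage then handles the message in one of three ways.
The normal path is as follows: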
const Object& result =
Object::Handle(zone, DartLibraryCalls::HandleMessage(msg_handler, msg));
if (result.IsError()) {
status = ProcessUnhandledException(Error::Cast(result));
} else {
ASSERT(result.IsNull());
}
DartLibraryCalls::HandleMessage:
// third_party/sdk/runtime/vm/dart_entry.cc
RawObject* DartLibraryCalls::HandleMessage(const Object& handler, const Instance& message) {
Thread* thread = Thread::Current();
Zone* zone = thread->zone();
Isolate* isolate = thread->isolate();
Function& function = Function::Handle(
zone, isolate->object_store()->handle_message_function());
const int kTypeArgsLen = 0;
const int kNumArguments = 2;
if (function.IsNull()) {
Library& isolate_lib = Library::Handle(zone, Library::IsolateLibrary());
ASSERT(!isolate_lib.IsNull());
const String& class_name = String::Handle(
zone, isolate_lib.PrivateName(Symbols::_RawReceivePortImpl()));
const String& function_name = String::Handle(
zone, isolate_lib.PrivateName(Symbols::_handleMessage()));
function = Resolver::ResolveStatic(isolate_lib, class_name, function_name,
kTypeArgsLen, kNumArguments,
Object::empty_array());
ASSERT(!function.IsNull());
isolate->object_store()->set_handle_message_function(function);
}
const Array& args = Array::Handle(zone, Array::New(kNumArguments));
args.SetAt(0, handler);
args.SetAt(1, message);
#if !defined(PRODUCT)
if (isolate->debugger()->IsStepping()) {
// If the isolate is being debugged and the debugger was stepping
// through code, enable single stepping so debugger will stop
// at the first location the user is interested in.
isolate->debugger()->SetResumeAction(Debugger::kStepInto);
}
#endif
const Object& result =
Object::Handle(zone, DartEntry::InvokeFunction(function, args));
ASSERT(result.IsNull() || result.IsError());
return result.raw();
}
This is again a call from C++ into a Dart function; the code shows that it calls the _handleMessage function of _RawReceivePortImpl:
@pragma("vm:entry-point", "call")
static void _handleMessage(Function handler, var message) {
// TODO(floitsch): this relies on the fact that any exception aborts the
// VM. Once we have non-fatal global exceptions we need to catch errors
// so that we can run the immediate callbacks.
handler(message);
_runPendingImmediateCallback();
}
It calls handler with message as the argument; handler is the _handleMessage function of _Timer that was passed in at the start:
// third_party/sdk/runtime/lib/timer_impl.dart
static void _handleMessage(msg) {
var pendingTimers;
if (msg == _ZERO_EVENT) {
pendingTimers = _queueFromZeroEvent();
assert(pendingTimers.length > 0);
} else {
assert(msg == _TIMEOUT_EVENT);
_scheduledWakeupTime = null; // Consumed the last scheduled wakeup now.
pendingTimers = _queueFromTimeoutEvent();
}
_runTimers(pendingTimers);
// Notify the event handler or shutdown the port if no more pending
// timers are present.
_notifyEventHandler();
}
Here pendingTimers is taken from a different queue depending on msg. The two implementations are as follows:
static List _queueFromZeroEvent() {
var pendingTimers = new List();
assert(_firstZeroTimer != null);
// Collect pending timers from the timer heap that have an expiration prior
// to the currently notified zero timer.
var timer;
while (!_heap.isEmpty && (_heap.first._compareTo(_firstZeroTimer) < 0)) {
timer = _heap.removeFirst();
pendingTimers.add(timer);
}
// Append the first zero timer to the pending timers.
timer = _firstZeroTimer;
_firstZeroTimer = timer._indexOrNext;
timer._indexOrNext = null;
pendingTimers.add(timer);
return pendingTimers;
}
static List _queueFromTimeoutEvent() {
var pendingTimers = new List();
if (_firstZeroTimer != null) {
// Collect pending timers from the timer heap that have an expiration
// prior to the next zero timer.
// By definition the first zero timer has been scheduled before the
// current time, meaning all timers which are "less than" the first zero
// timer are expired. The first zero timer will be dispatched when its
// corresponding message is delivered.
var timer;
while (!_heap.isEmpty && (_heap.first._compareTo(_firstZeroTimer) < 0)) {
timer = _heap.removeFirst();
pendingTimers.add(timer);
}
} else {
// Collect pending timers from the timer heap which have expired at this
// time.
var currentTime = VMLibraryHooks.timerMillisecondClock();
var timer;
while (!_heap.isEmpty && (_heap.first._wakeupTime <= currentTime)) {
timer = _heap.removeFirst();
pendingTimers.add(timer);
}
}
return pendingTimers;
}
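The collection rule in _queueFromZeroEvent can be tried out on toy data. This sketch makes two assumptions: the ordering used by _compareTo is not shown here, so comparing by wakeup time and then by id is a guess, and a sorted list replaces the heap. All names are hypothetical.

```dart
/// Toy timer carrying just the fields the ordering needs. The comparison
/// (wakeup time first, then id) is an assumption about _compareTo.
class ToyTimer {
  const ToyTimer(this.id, this.wakeupTime);
  final int id;
  final int wakeupTime;

  int compareTo(ToyTimer other) {
    final byTime = wakeupTime - other.wakeupTime;
    return byTime != 0 ? byTime : id - other.id;
  }
}

/// Mirrors _queueFromZeroEvent: drain heap timers ordered before the first
/// zero timer, then append that zero timer. `heap` must already be sorted
/// and `zeros` must be non-empty.
List<ToyTimer> collectForZeroEvent(List<ToyTimer> heap, List<ToyTimer> zeros) {
  final pending = <ToyTimer>[];
  while (heap.isNotEmpty && heap.first.compareTo(zeros.first) < 0) {
    pending.add(heap.removeAt(0));
  }
  pending.add(zeros.removeAt(0));
  return pending;
}

void main() {
  final heap = [const ToyTimer(1, 90), const ToyTimer(2, 150)];
  final zeros = [const ToyTimer(3, 100), const ToyTimer(4, 100)];
  final pending = collectForZeroEvent(heap, zeros);
  print(pending.map((t) => t.id).toList()); // [1, 3]
  print(zeros.map((t) => t.id).toList()); // [4]
}
```

Only one zero timer is consumed per _ZERO_EVENT, which is consistent with every zero timer sending its own event in _enqueue.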
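When msg is _ZERO_EVENT, it takes one task from the _firstZeroTimer list plus any tasks in _heap that are ordered before that zero timer. When msg is not _ZERO_EVENT, it takes the tasks in _heap ordered before the first zero timer or, if there is no zero timer, the tasks whose wakeup time is no later than the current time. After collecting them, _runTimers is called to execute the tasks.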
static void _runTimers(List pendingTimers) {
// If there are no pending timers currently reset the id space before we
// have a chance to enqueue new timers.
if (_heap.isEmpty && (_firstZeroTimer == null)) {
_idCount = 0;
}
// Fast exit if no pending timers.
if (pendingTimers.length == 0) {
return;
}
// Trigger all of the pending timers. New timers added as part of the
// callbacks will be enqueued now and notified in the next spin at the
// earliest.
_handlingCallbacks = true;
var i = 0;
try {
for (; i < pendingTimers.length; i++) {
// Next pending timer.
var timer = pendingTimers[i];
timer._indexOrNext = null;
// One of the timers in the pending_timers list can cancel
// one of the later timers which will set the callback to
// null. Or the pending zero timer has been canceled earlier.
if (timer._callback != null) {
var callback = timer._callback;
if (!timer._repeating) {
// Mark timer as inactive.
timer._callback = null;
} else if (timer._milliSeconds > 0) {
var ms = timer._milliSeconds;
int overdue =
VMLibraryHooks.timerMillisecondClock() - timer._wakeupTime;
if (overdue > ms) {
int missedTicks = overdue ~/ ms;
timer._wakeupTime += missedTicks * ms;
timer._tick += missedTicks;
}
}
timer._tick += 1;
callback(timer);
// Re-insert repeating timer if not canceled.
if (timer._repeating && (timer._callback != null)) {
timer._advanceWakeupTime();
timer._enqueue();
}
// Execute pending micro tasks.
var immediateCallback = _removePendingImmediateCallback();
if (immediateCallback != null) {
immediateCallback();
}
}
}
} finally {
_handlingCallbacks = false;
// Re-queue timers we didn't get to.
for (i++; i < pendingTimers.length; i++) {
var timer = pendingTimers[i];
timer._enqueue();
}
_notifyEventHandler();
}
}
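The overdue handling for repeating timers in _runTimers is easier to follow with numbers. The sketch below isolates only that arithmetic, assuming a positive period; TickState and advance are hypothetical names, and the later _advanceWakeupTime step is not modelled.

```dart
/// Result of advancing a periodic timer; a hypothetical holder type.
class TickState {
  const TickState(this.wakeupTime, this.tick);
  final int wakeupTime;
  final int tick;
}

/// Mirrors the overdue handling in _runTimers for a repeating timer with a
/// positive period: skip whole missed periods, then count the current tick.
TickState advance(TickState state, int periodMs, int nowMs) {
  var wakeupTime = state.wakeupTime;
  var tick = state.tick;
  final overdue = nowMs - wakeupTime;
  if (overdue > periodMs) {
    final missedTicks = overdue ~/ periodMs;
    wakeupTime += missedTicks * periodMs;
    tick += missedTicks;
  }
  return TickState(wakeupTime, tick + 1);
}

void main() {
  // 35 ms late with a 10 ms period: three ticks were missed.
  final late = advance(const TickState(100, 0), 10, 135);
  print('${late.wakeupTime} ${late.tick}'); // 130 4
  // 4 ms late: nothing missed, just the current tick.
  final onTime = advance(const TickState(100, 0), 10, 104);
  print('${onTime.wakeupTime} ${onTime.tick}'); // 100 1
}
```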
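The basic logic is a for loop that runs the callback of each timer. Tracing back up the chain, this ends in the call to result._complete(computation()), and from there on execution is in the application-level code.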