先说结论,V8引擎初始化一个默认Platform,会建立一个线程池,而其中有个线程是用于处理相似于setTimeout的延时任务
另外附一些图,包括继承树、关键属性归属、纯逻辑工做流程,对代码木得兴趣的看完图能够X掉了。
上一篇讲了V8初始化默认Platform对象时会作三件事,其中生成空白DefaultPlatform、获取线程池大小已经讲过了,剩下线程启动相关的内容。
写以前花了10几分钟学了下mac下C++的线程,对API有一个初步了解,给一个简单的例子,大概流程以下。
const int stack_size = 1 * 1024 * 512;
int tmp = 0;
void* add(void* number){
tmp = tmp + *(int*)number;
printf("tmp: %i\n", tmp);
return nullptr;
};
int main(int argc, const char * argv[]) {
pthread_t pt;
pthread_attr_t attr;
memset(&attr, 0, sizeof(attr));
pthread_attr_init(&attr);
pthread_attr_setstacksize(&attr, stack_size);
int num = 5;
int* ptr = #
int ret = pthread_create(&pt, &attr, add, ptr);
if(ret != 0) printf("cannot create thread");
return 0;
}复制代码
经过几个步骤,就能够建立一条线程来处理任务,启动后的输出就懒得截图了,反正就是打印一个5。
有了上面的例子,能够慢慢来看V8初始化时多线程的启动过程,首先是入门方法。
void DefaultPlatform::EnsureBackgroundTaskRunnerInitialized() {
base::MutexGuard guard(&lock_);
if (!worker_threads_task_runner_) {
worker_threads_task_runner_ =
std::make_shared<DefaultWorkerThreadsTaskRunner>(
thread_pool_size_, time_function_for_testing_
? time_function_for_testing_
: DefaultTimeFunction);
}
}
double DefaultTimeFunction() {
return base::TimeTicks::HighResolutionNow().ToInternalValue() /
static_cast<double>(base::Time::kMicrosecondsPerSecond);
}复制代码
if中的worker_threads_task_runner是DefaultPlatform的私有属性,因为初始化时默认值为NULL,这里作一个定义赋值。第一个参数是在第二步获取的线程池大小,第二个参数是一个计数方法,默认引用以前Time模块里的东西,返回硬件时间戳,具体实现能够看我以前写的。
接下来看DefaultWorkerThreadsTaskRunner类的构造函数,接受2个参数。
DefaultWorkerThreadsTaskRunner::DefaultWorkerThreadsTaskRunner(
uint32_t thread_pool_size, TimeFunction time_function)
: queue_(time_function),
time_function_(time_function),
thread_pool_size_(thread_pool_size) {
for (uint32_t i = 0; i < thread_pool_size; ++i) {
thread_pool_.push_back(base::make_unique<WorkerThread>(this));
}
}复制代码
用2个参数初始化了3个属性,而且根据size往线程池中添加线程,thread_pool_这个属性用vector在管理,push_back至关于JS的push,当成数组来理解就好了。
添加的WorkerThread类是在DefaultWorkerThreadsTaskRunner里面的一个私有内部类,继承于Thread,单纯的用来管理线程。C++的this比较简单,没有JS那么多概念,就是一个指向当前对象的指针,来看一下线程类的构造函数。
DefaultWorkerThreadsTaskRunner::WorkerThread::WorkerThread(DefaultWorkerThreadsTaskRunner* runner)
: Thread(Options("V8 DefaultWorkerThreadsTaskRunner WorkerThread")),
runner_(runner) {
Start();
}复制代码
这里同时调用了父类构造函数并初始化自己的属性,runner就是上面那个对象自己。这个构造函数长得比较奇怪,其中Options类是Thread的内部类,有一个接受一个类型为字符串的构造函数,而Thread的构造函数只接受Options类型,因此会这样,代码以下。
class Thread {
public:
// Opaque data type for thread-local storage keys.
using LocalStorageKey = int32_t;
class Options {
public:
Options() : name_("v8:<unknown>"), stack_size_(0) {}
explicit Options(const char* name, int stack_size = 0)
: name_(name), stack_size_(stack_size) {}
// ...
};
// Create new thread.
explicit Thread(const Options& options);
// ...
}复制代码
能够简单理解这里给线程取了一个名字,在给Options命名的同时,其实也给Thread命名了,以下。
Thread::Thread(const Options& options)
: data_(new PlatformData),
stack_size_(options.stack_size()),
start_semaphore_(nullptr) {
if (stack_size_ > 0 && static_cast<size_t>(stack_size_) < PTHREAD_STACK_MIN) {
stack_size_ = PTHREAD_STACK_MIN;
}
set_name(options.name());
}
class Thread {
static const int kMaxThreadNameLength = 16;
char name_[kMaxThreadNameLength];
}
void Thread::set_name(const char* name) {
strncpy(name_, name, sizeof(name_));
name_[sizeof(name_) - 1] = '\0';
}复制代码
看注释说,因为Linux的prctl方法限制了长度,因此这里的name也最多只能保存16位,并且C++的字符串的最后一位还要留给结束符,因此理论上传入Options的超长字符串
"V8 DefaultWorkerThreadsTaskRunner WorkerThread"只有前15位做为Thread的name保存下来了,也就是"V8 Defaultworke",很是戏剧性的把r给砍掉了。。。
初始化完成后,会调用Start方法启动线程,这个方法并不须要子类实现,而是基类已经定义好了,保留关键代码以下。
void Thread::Start() {
int result;
pthread_attr_t attr;
memset(&attr, 0, sizeof(attr));
result = pthread_attr_init(&attr);
size_t stack_size = stack_size_;
if (stack_size == 0) {
stack_size = 1 * 1024 * 1024;
}
if (stack_size > 0) {
result = pthread_attr_setstacksize(&attr, stack_size);
}
{
result = pthread_create(&data_->thread_, &attr, ThreadEntry, this);
}
result = pthread_attr_destroy(&attr);
}复制代码
参照一下文章开始的demo,能够看出去掉了合法性检测和宏以后,在初始化和启动线程基本上V8的形式是同样的。
简单总结一下,V8初始化了一个DefaultPlatform类,计算了一下可用线程池大小,生成了几条线程弄进线程池,而每条线程的任务就是那个ThreadEntry,这篇所有写完算了。
// 3-5
static void* ThreadEntry(void* arg) {
Thread* thread = reinterpret_cast<Thread*>(arg);
// We take the lock here to make sure that pthread_create finished first since
// we don't know which thread will run first (the original thread or the new
// one).
{ MutexGuard lock_guard(&thread->data()->thread_creation_mutex_); }
// 3-6
SetThreadName(thread->name());
// 3-7
thread->NotifyStartedAndRun();
return nullptr;
}复制代码
因为线程任务的参数定义与返回值都是void*,这里直接作一个强转。随后会加一个线程锁,由于这几个线程在初始化的时候并不须要同时执行这个任务。执行的第一个方法虽然从名字来看只是简单的给线程设置名字,可是内容却不简单。
传入SetThreadName方法的参数是以前那个被截断的字符串,看一下这个方法。
static void SetThreadName(const char* name) {
int (*dynamic_pthread_setname_np)(const char*);
*reinterpret_cast<void**>(&dynamic_pthread_setname_np) =
dlsym(RTLD_DEFAULT, "pthread_setname_np");
if (dynamic_pthread_setname_np == nullptr) return;
static const int kMaxNameLength = 63;
dynamic_pthread_setname_np(name);
}复制代码
里面用了一个很玄的api的叫dlsym,官方解释以下。
The function dlsym() takes a "handle" of a dynamic library returned by dlopen() and the null-terminated symbol name, returning the address where that symbol is loaded into memory.
大概就是根据句柄读取一个动态连接库,名字就是那个字符串,返回其在内存中的地址,因此这块的调试全是机器码,根本看不懂,最后返回的一个函数。
知道这是个函数就好了,至于怎么设置线程名字我也不太想知道。
第二步的方法名就是运行线程的任务,调用链比较长,会来回在几个类之间穿梭,调用各自属性的方法。
void NotifyStartedAndRun() {
if (start_semaphore_) start_semaphore_->Signal();
Run();
}
void DefaultWorkerThreadsTaskRunner::WorkerThread::Run() {
runner_->single_worker_thread_id_.store(base::OS::GetCurrentThreadId(), std::memory_order_relaxed);
while (std::unique_ptr<Task> task = runner_->GetNext()) {
task->Run();
}
}
std::unique_ptr<Task> DefaultWorkerThreadsTaskRunner::GetNext() {
return queue_.GetNext();
}复制代码
不理清楚,这个地方真的很麻烦,绕得很,能够看顶部的继承图。总之,最后调用的是DefaultWorkerThreadsTaskRunner类上一个类型为DelayedTaskQueue类的GetNext方法,返回类型是Task类,V8只是简单定义了一个基类,实际运行时的因此task都须要继承这个类并实现其Run方法以便线程执行。
最后的最后,GetNext的逻辑其实能够参考libuv的逻辑,机制都大同小异,方法的源码以下。
std::unique_ptr<Task> DelayedTaskQueue::GetNext() {
base::MutexGuard guard(&lock_);
for (;;) {
double now = MonotonicallyIncreasingTime();
std::unique_ptr<Task> task = PopTaskFromDelayedQueue(now);
while (task) {
task_queue_.push(std::move(task));
task = PopTaskFromDelayedQueue(now);
}
if (!task_queue_.empty()) {
std::unique_ptr<Task> result = std::move(task_queue_.front());
task_queue_.pop();
return result;
}
if (terminated_) {
queues_condition_var_.NotifyAll();
return nullptr;
}
if (task_queue_.empty() && !delayed_task_queue_.empty()) {
double wait_in_seconds = delayed_task_queue_.begin()->first - now;
base::TimeDelta wait_delta = base::TimeDelta::FromMicroseconds(base::TimeConstants::kMicrosecondsPerSecond * wait_in_seconds);
bool notified = queues_condition_var_.WaitFor(&lock_, wait_delta);
USE(notified);
} else {
queues_condition_var_.Wait(&lock_);
}
}
}复制代码