一般咱们在讨论 Node.js 的时候都会涉及到异步这个特性。实际上 Node.js 在执行异步调用的时候,不一样的场景下有着不一样的处理方式。本文将经过 libuv 源码来分析 Node.js 是如何经过 libuv 的线程池完成异步调用。本文描述的 Node.js 版本为 v11.15.0,libuv 版本为 1.24.0 。javascript
如下面的代码为例,它经过调用 fs.access 来异步地判断文件是否存在并在回调中打印日志,在 Node.js 中这是一个典型的异步调用。html
const fs = require('fs') const cb = function (err) { console.log(`Is myfile exists: ${!err}`) } fs.access('myfile', cb)
在分析上面这段代码的调用过程以前,咱们先来了解一些 libuv 概念。java
主动经过 libuv 发起的操做被 libuv 称为[请求]请求,libuv 的线程池做用于如下 4 种枚举的异步请求:node
UV_FS
: fs 模块的异步函数(除了 uv_fs_req_cleanup ),fs.access、fs.stat 等。UV_GETADDRINFO
:dns 模块的异步函数,dns.lookup 等。UV_GETNAMEINFO
:dns 模块的异步函数,dns.lookupService 等。UV_WORK
:zlib 模块的 zlib.unzip、zlib.gzip 等;在 Node.js 的 Addon(C/C++) 中经过 uv_queue_work 建立的多线程请求。其它的 UV_CONNECT
、UV_WRITE
、UDP_SEND
等则并不会经过线程池去执行。linux
这 4 种枚举请求 libuv 内部把它们分为 3 种[任务类型]任务类型:git
UV__WORK_CPU
:CPU 密集型,UV_WORK
类型的请求被定义为这种类型。所以根据这个分类,不推荐在 uv_queue_work
中作 I/O 密集的操做。UV__WORK_FAST_IO
:快 IO 型,UV_FS
类型的请求被定义为这种类型。UV__WORK_SLOW_IO
:慢 IO 型,UV_GETADDRINFO
和 UV_GETNAMEINFO
类型的请求被定义为这种类型。UV__WORK_SLOW_IO
执行不一样于 UV__WORK_CPU
与 UV__WORK_FAST_IO
,libuv 执行它的时候流程会有些差别,这个后面会提到。github
libuv 经过init_threads 函数初始化线程池,初始化时会根据一个名为 UV_THREADPOOL_SIZE 的环境变量来初始化内部线程池的大小,线程最大数量为 128 ,默认为 4 。若是以单进程的架构去部署服务,能够根据服务器 CPU 的核心数量及业务状况来设置线程池大小,达到资源利用的最大化。uv loop 线程在建立 worker 线程时,会初始化如下变量:算法
UV__WORK_CPU
和 UV__WORK_FAST_IO
类型的请求后将其插到此队列的尾部,并经过 uv_cond_signal 唤醒 worker 线程去处理,这是线程池请求的主队列。UV__WORK_SLOW_IO
类型的请求后将其插到此队列的尾部。worker 线程的入口函数均为 worker 函数,这个咱们后面再说。 init_threads 实现以下:c#
static void init_threads(void) { unsigned int i; const char* val; uv_sem_t sem; // 6-23 行初始化线程池大小 nthreads = ARRAY_SIZE(default_threads); val = getenv("UV_THREADPOOL_SIZE"); // 根据环境变量设置线程池大小 if (val != NULL) nthreads = atoi(val); if (nthreads == 0) nthreads = 1; if (nthreads > MAX_THREADPOOL_SIZE) nthreads = MAX_THREADPOOL_SIZE; threads = default_threads; if (nthreads > ARRAY_SIZE(default_threads)) { threads = uv__malloc(nthreads * sizeof(threads[0])); if (threads == NULL) { nthreads = ARRAY_SIZE(default_threads); threads = default_threads; } } // 初始化条件变量 if (uv_cond_init(&cond)) abort(); // 初始化互斥量 if (uv_mutex_init(&mutex)) abort(); // 初始化队列和节点 QUEUE_INIT(&wq); // 工做队列 QUEUE_INIT(&slow_io_pending_wq); // 慢 I/O 队列 QUEUE_INIT(&run_slow_work_message); // 若是有慢 I/O 请求,将此节点做为标志位插入到 wq 中 // 初始化信号量 if (uv_sem_init(&sem, 0)) abort(); // 后续线程同步须要依赖这个信号量,所以这个信号量建立失败了则终止进程 // 建立 worker 线程 for (i = 0; i < nthreads; i++) if (uv_thread_create(threads + i, worker, &sem)) // 初始化 worker 线程 abort(); // woker 线程建立错误缘由为 EAGAIN、EINVAL、EPERM 其中之一,具体请参考 man3 // 等待 worker 建立完成 for (i = 0; i < nthreads; i++) uv_sem_wait(&sem); // 等待 worker 线程建立完毕 // 回收信号量资源 uv_sem_destroy(&sem); }
libuv 有两个函数能够建立多线程请求:api
uv__work_submit 函数主要作 2 件事:
实现以下:
void uv__work_submit(uv_loop_t* loop, struct uv__work* w, enum uv__work_kind kind, void (*work)(struct uv__work* w), void (*done)(struct uv__work* w, int status)) { // 在收到请求后才开始初始化线程池,可是只会初始化一次 uv_once(&once, init_once); w->loop = loop; w->work = work; w->done = done; post(&w->wq, kind); } static void init_once(void) { // fork 后子进程的 mutex 、condition variables 等 pthread 变量的状态是父进程 fork 时的复制,因此子进程建立时须要重置状态 // 具体请参考 http://man7.org/linux/man-pages/man2/fork.2.html if (pthread_atfork(NULL, NULL, &reset_once)) abort(); // 初始化线程池 init_threads(); } static void reset_once(void) { // 重置 once 变量 uv_once_t child_once = UV_ONCE_INIT; memcpy(&once, &child_once, sizeof(child_once)); }
post 函数主要作 2 件事:
判断请求的请求类型是不是 UV__WORK_SLOW_IO
:
slow_io_pending_wq
的尾部,同时在请求队列 wq
的尾部插入一个 run_slow_work_message
节点做为标志位,告知请求队列 wq
当前存在慢 I/O 请求。wq
尾部。并发的慢 I/O 的请求数量不会超过线程池大小的一半,这样作的好处是避免多个慢 I/O 的请求在某段时间内把全部线程都占满,致使其它可以快速执行的请求须要排队。
post 函数实现以下:
static void post(QUEUE* q, enum uv__work_kind kind) { // 加锁 uv_mutex_lock(&mutex); if (kind == UV__WORK_SLOW_IO) { /* 插入到慢 I/O 队列中 */ QUEUE_INSERT_TAIL(&slow_io_pending_wq, q); /* 若是 run_slow_work_message 节点不为空表明其已在 wq 队列中,无需再次插入 */ if (!QUEUE_EMPTY(&run_slow_work_message)) { uv_mutex_unlock(&mutex); return; } // 不在 wq 队列中则将 run_slow_work_message 做为标志位插到 wq 尾部 q = &run_slow_work_message; } // 将请求插到请求队列尾部 QUEUE_INSERT_TAIL(&wq, q); // 若是有空闲的线程,唤醒某一个去执行请求 if (idle_threads > 0) uv_cond_signal(&cond); // 唤醒一个 worker 线程 uv_mutex_unlock(&mutex); }
worker 线程的入口函数 worker 在线程建立好并初始化完成后将按照下面的步骤不断的循环:
static void worker(void* arg) { struct uv__work* w; QUEUE* q; int is_slow_work; // 通知 uv loop 线程此 worker 线程已建立完毕 uv_sem_post((uv_sem_t*) arg); arg = NULL; uv_mutex_lock(&mutex); // 经过这个死循环来不断的执行请求 for (;;) { /* 这个 while 有2个判断 1. 在多核处理器下,pthread_cond_signal 可能会激活多于一个线程,经过一个 while 来避免这种状况致使的问题,具体请参考 https://linux.die.net/man/3/pthread_cond_signal 2. 限制慢 I/O 请求的数量小于线程数量的一半 */ while (QUEUE_EMPTY(&wq) || (QUEUE_HEAD(&wq) == &run_slow_work_message && QUEUE_NEXT(&run_slow_work_message) == &wq && slow_io_work_running >= slow_work_thread_threshold())) { idle_threads += 1; // worker 线程初始化完成或没有请求执行时进入阻塞状态,直到被新的请求唤醒 uv_cond_wait(&cond, &mutex); idle_threads -= 1; } // 唤醒而且达到执行请求的条件后取出队列头部的请求 q = QUEUE_HEAD(&wq); // 若是头部请求是退出,则跳出循环,结束 worker 线程 if (q == &exit_message) { // 继续唤醒其它 worker 去结束线程 uv_cond_signal(&cond); uv_mutex_unlock(&mutex); break; } // 将这个请求节点从请求队列 wq 中移除 QUEUE_REMOVE(q); QUEUE_INIT(q); is_slow_work = 0; // 若是这个请求是慢 I/O 的标志位 if (q == &run_slow_work_message) { /* 控制慢 I/O 请求数量,超过则插到队列尾部,等待前面的请求执行完 */ if (slow_io_work_running >= slow_work_thread_threshold()) { QUEUE_INSERT_TAIL(&wq, q); continue; } /* 判断慢 I/O 请求队列中是否有请求,请求有可能被取消 */ if (QUEUE_EMPTY(&slow_io_pending_wq)) continue; is_slow_work = 1; slow_io_work_running++; // 取出慢 I/O 请求队列中头部的请求 q = QUEUE_HEAD(&slow_io_pending_wq); QUEUE_REMOVE(q); QUEUE_INIT(q); // 若是慢 I/O 请求队列中还有请求,则将 run_slow_work_message 这个标志位从新插到请求队列 wq 的尾部 if (!QUEUE_EMPTY(&slow_io_pending_wq)) { QUEUE_INSERT_TAIL(&wq, &run_slow_work_message); if (idle_threads > 0) uv_cond_signal(&cond); // 唤醒一个线程继续执行 } } uv_mutex_unlock(&mutex); w = QUEUE_DATA(q, struct uv__work, wq); // 上面处理了这多,终于在这里开始执行请求的函数了 w->work(w); uv_mutex_lock(&w->loop->wq_mutex); w->work = NULL; // 为保证线程安全,请求执行完后不会当即回调请求,而是将完成的请求插到已完成的请求队列中,在uv loop 线程完成回调 QUEUE_INSERT_TAIL(&w->loop->wq, &w->wq); // 经过 uv_async_send 同步 uv loop 线程:线程池完成了一个请求 uv_async_send(&w->loop->wq_async); uv_mutex_unlock(&w->loop->wq_mutex); uv_mutex_lock(&mutex); if (is_slow_work) { slow_io_work_running--; } } }
在 uv_loop_init 时,线程池的 [wq_async]wq_async 句柄经过 uv_async_init 初始化并插入到 uv loop 的 async_handles 队列中,而后在 uv loop 线程中遍历 async_handles 队列并完成回调。
worker 线程 和 uv loop 线程经过 uv_async_send 进行同步,而uv_async_send 只作了一件事:向 async_wfd 句柄写了一个长度为 1 个字节的字符串(只有 \0
这个字符)。
uv_async_send 实现以下:
int uv_async_send(uv_async_t* handle) { if (ACCESS_ONCE(int, handle->pending) != 0) return 0; // cmpxchgi 函数设置标志位,若是已经设置过则不会重复调用 uv__async_send if (cmpxchgi(&handle->pending, 0, 1) == 0) uv__async_send(handle->loop); return 0; } static void uv__async_send(uv_loop_t* loop) { const void* buf; ssize_t len; int fd; int r; buf = ""; len = 1; fd = loop->async_wfd; #if defined(__linux__) if (fd == -1) { static const uint64_t val = 1; buf = &val; len = sizeof(val); fd = loop->async_io_watcher.fd; /* eventfd */ } #endif do r = write(fd, buf, len); // 向 fd 写入内容 while (r == -1 && errno == EINTR); if (r == len) return; if (r == -1) if (errno == EAGAIN || errno == EWOULDBLOCK) return; abort(); }
对 async_wfd 写内容为何能作到同步呢?实际上在 worker 线程对 async_wfd 写入时,uv loop 线程同时也在不断的循环去接收处理各类各样的事件或请求,其中就包括对 async_wfd 可读事件的监听。
uv loop 是在 uv_run 函数中执行的,它在 Node.js 启动时 被调用, uv_run 实现以下:
int uv_run(uv_loop_t* loop, uv_run_mode mode) { int timeout; int r; int ran_pending; r = uv__loop_alive(loop); if (!r) uv__update_time(loop); while (r != 0 && loop->stop_flag == 0) { // 更新计时器时间 uv__update_time(loop); // 回调超时的计时器,setTimeout、setInterval 都是由这个函数回调 uv__run_timers(loop); // 处理某些没有在 uv__io_poll 完成的回调 ran_pending = uv__run_pending(loop); // 官方解释:Idle handle is needed only to stop the event loop from blocking in poll. // 实际上 napi 中某些函数好比 napi_call_threadsafe_function 会往 idle 队列中插入回调,而后在这个阶段执行 uv__run_idle(loop); // process._startProfilerIdleNotifier 的回调 uv__run_prepare(loop); timeout = 0; if ((mode == UV_RUN_ONCE && !ran_pending) || mode == UV_RUN_DEFAULT) timeout = uv_backend_timeout(loop); // 计算 uv__io_poll 超时时间,算法请参考 https://github.com/libuv/libuv/blob/v1.24.0/src/unix/core.c#L318 // 对 async_wfd 可读的监听在 uv__io_poll 这个函数中 // 第二个参数 timeout 为上面计算出来,用来设置 epoll_wait 等函数等待 I/O 事件的时间 uv__io_poll(loop, timeout); // setImmediate 的回调 // ps: 我的以为从实现上讲 setImmediate 和 nextTick 应该互换名字 :-) uv__run_check(loop); // 关闭句柄是个异步操做 // 通常结束 uv loop 时会先调用 uv_walk 遍历全部句柄并关闭它们,而后再执行一次 uv loop 经过这个函数来完成关闭,最后再调用 uv_loop_close,不然的话会出现内存泄露 uv__run_closing_handles(loop); if (mode == UV_RUN_ONCE) { /* UV_RUN_ONCE implies forward progress: at least one callback must have * been invoked when it returns. uv__io_poll() can return without doing * I/O (meaning: no callbacks) when its timeout expires - which means we * have pending timers that satisfy the forward progress constraint. * * UV_RUN_NOWAIT makes no guarantees about progress so it's omitted from * the check. */ uv__update_time(loop); uv__run_timers(loop); } r = uv__loop_alive(loop); if (mode == UV_RUN_ONCE || mode == UV_RUN_NOWAIT) break; } /* The if statement lets gcc compile it to a conditional store. Avoids * dirtying a cache line. */ if (loop->stop_flag != 0) loop->stop_flag = 0; return r; }
能够看到 uv loop 里面其实就是在不断的循环去更新计时器、处理各类类型的回调、轮询 I/O 事件,Node.js 的异步即是经过 uv loop 完成的。
libuv 的异步采用的是 Reactor 模型进行多路复用,在 uv__io_poll 中去处理 I/O 相关的事件, uv__io_poll 在不一样的平台下经过 epoll、kqueue 等不一样的方式实现。因此当往 async_wfd 写入内容时,在 uv__io_poll 中将会轮询到 async_wfd 可读的事件,这个事件仅仅是用来通知 uv loop 线程: 非 uv loop 线程有回调须要在 uv loop 线程执行。
当轮询到 async_wfd 可读后,uv__io_poll 会回调对应的函数 uv__async_io,它主要作了下面 2 件事:
实现以下:
static void uv__async_io(uv_loop_t* loop, uv__io_t* w, unsigned int events) { char buf[1024]; ssize_t r; QUEUE queue; QUEUE* q; uv_async_t* h; assert(w == &loop->async_io_watcher); // 这个 for 循环用来确认是否有 uv_async_send 调用 for (;;) { r = read(w->fd, buf, sizeof(buf)); if (r == sizeof(buf)) continue; if (r != -1) break; if (errno == EAGAIN || errno == EWOULDBLOCK) break; if (errno == EINTR) continue; abort(); } // 交换 loop->async_handle 和 queue内容,避免在遍历 loop->async_handles 时有新的 async_handle 插入到队列 // loop->async_handles 队列中除了线程池的句柄还有其它的 QUEUE_MOVE(&loop->async_handles, &queue); while (!QUEUE_EMPTY(&queue)) { q = QUEUE_HEAD(&queue); h = QUEUE_DATA(q, uv_async_t, queue); QUEUE_REMOVE(q); // 将 uv_async_t 从新插入到 loop->async_handles 中,uv_async_t 须要手动调用 uv__async_stop 才会从队列中移除 QUEUE_INSERT_TAIL(&loop->async_handles, q); // 确认这个 async_handle 是否须要回调 if (cmpxchgi(&h->pending, 1, 0) == 0) continue; if (h->async_cb == NULL) continue; // 调用经过 uv_async_init 初始化 uv_async_t 时绑定的回调函数 // 线程池的 uv_async_t 是在 uv_loop_init 时初始化的,它绑定的回调是 uv__work_done // 所以若是 h == loop->wq_async,这里 h->async_cb 实际是调用了 uv__work_done(h); // 详情请参考 https://github.com/libuv/libuv/blob/v1.24.0/src/unix/loop.c#L88 h->async_cb(h); } }
调用线程池的 h->async_cb
后会回到线程池的 uv__work_done 函数:
void uv__work_done(uv_async_t* handle) { struct uv__work* w; uv_loop_t* loop; QUEUE* q; QUEUE wq; int err; loop = container_of(handle, uv_loop_t, wq_async); uv_mutex_lock(&loop->wq_mutex); // 清空已完成的 loop->wq 队列 QUEUE_MOVE(&loop->wq, &wq); uv_mutex_unlock(&loop->wq_mutex); while (!QUEUE_EMPTY(&wq)) { q = QUEUE_HEAD(&wq); QUEUE_REMOVE(q); w = container_of(q, struct uv__work, wq); // 若是在回调前调用了 uv_cancel 取消请求,则即便请求已经执行完,依旧算出错 err = (w->work == uv__cancelled) ? UV_ECANCELED : 0; w->done(w, err); } }
最后经过 w->done(w, err)
回调 uv__fs_done,并由 uv__fs_done 回调 JS 函数:
static void uv__fs_done(struct uv__work* w, int status) { uv_fs_t* req; req = container_of(w, uv_fs_t, work_req); uv__req_unregister(req->loop, req); // 若是取消了则抛出异常 if (status == UV_ECANCELED) { assert(req->result == 0); req->result = UV_ECANCELED; } // 回调 JS req->cb(req); }
以上就是 libuv 是线程池从建立到执行多线程请求的过程。
再回到文章开头提到的代码,咱们来分析它的调用过程。
const fs = require('fs') const cb = function (err) { console.log(`Is myfile exists: ${!err}`) } fs.access('myfile', cb)
假设线程池大小为 2 ,下面描述了执行 fs.access 时 3 个线程的状态(略过了 Node.js 启动和 JavaScript 和 Native 函数调用过程),时间轴从上到下:
空白
表明处于阻塞状态,-
表明线程还没有启动
uv loop thread | worker thread 1 | worker thread 2 |
---|---|---|
[fs.access]fs.access | - | - |
JavaScript 经过 v8 调用 Native 函数 | - | - |
uv_fs_access | - | - |
uv__work_submit | - | - |
init_threads | worker | worker |
uv_sem_wait | uv_sem_post | uv_sem_post |
uv_cond_wait | uv_cond_wait | |
uv_cond_signal | ||
uv__io_poll | access | |
uv__io_poll | ||
uv__io_poll | uv_async_send | |
uv__io_poll | uv_cond_wait | |
uv__io_poll | ||
uv__async_io | ||
uv__work_done | ||
uv__fs_done | ||
Native 经过 v8 回调 JavaScript 函数 | ||
cb | ||
console.log(`Is myfile exists: ${exists}`) |
能够看到调用过程以下:
worker thread 1
去执行请求,同时 uv loop 线程不断的轮询是否完成了请求。worker thread 1
同步的调用 access 函数判断目标文件是否可读。worker thread 1
经过 uv_async_send 同步 uv loop 线程请求已完成,同时自身进入阻塞状态,等待新的请求将其唤醒。整个过程因为没有新的请求进来, worker thread 2
始终处于阻塞状态。
经过对 fs.access 的调用过程分析,咱们了解了 libuv 是如何经过线程池进行异步调用的。另外也能够看到针对不一样的平台,libuv 对 uv__io_poll 的实现是不一样的,后面咱们将介绍 uv__io_poll 实现异步 I/O 的方式。