图片来源:libuvhtml
本文做者:肖思元前端
对 Node.js 的学习,不管如何都绕不开 Libuv。本文选择沿着 Libuv 的 Linux 实现的脉络对其内部一探究竟node
As an asynchronous event-driven JavaScript runtime, Node.js is designed to build scalable network applicationslinux
Node.js 做为前端同窗探索服务端业务的利器,自身是立志能够构建一个具备伸缩性的网络应用程序。目前的服务端环境主要仍是 Linux,对于另外一个主要的服务端环境 Unix,则在 API 上和 Linux 具备很高类似性,因此选择 Linux 做为起始点,说不定能够有双倍收获和双倍快乐github
下面是 libuv 官网的架构图:c#
单以 Linux 平台来看,libuv 主要工做能够简单划为两部分:数组
为了追本溯源,咱们将从 epoll 开始markdown
简单来讲,epoll 是由 Linux 内核提供的一个系统调用(system call),咱们的应用程序能够经过它:网络
咱们经过一小段伪代码来演示使用 epoll 时的核心步骤:
// 建立 epoll 实例
int epfd = epoll_create(MAX_EVENTS);
// 向 epoll 实例中添加须要监听的文件描述符,这里是 `listen_sock`
epoll_ctl_add(epfd, listen_sock, EPOLLIN | EPOLLOUT | EPOLLET);
while(1) {
// 等待来自 epoll 的通知,通知会在其中的文件描述符状态改变时
// 由系统通知应用。通知的形式以下:
//
// epoll_wait 调用不会当即返回,系统会在其中的文件描述符状态发生
// 变化时返回
//
// epoll_wait 调用返回后:
// nfds 表示发生变化的文件描述符数量
// events 会保存当前的事件,它的数量就是 nfds
int nfds = epoll_wait(epfd, events, MAX_EVENTS, -1);
// 遍历 events,对事件做出符合应用预期的响应
for (int i = 0; i < nfds; i++) {
// consume events[i]
}
}
复制代码
完整例子见 epoll-echo-server
上面的代码中已经包含了注释,能够大体归纳为下图:
因此处于 libuv 底层的 epoll 也是有「事件循环」的概念,可见事件循环并非 libuv 首创
提到 epoll,不得不提它的两种触发模式:水平触发(Level-triggered)、边缘触发(Edge-triggered)。不得不提是由于它们关系到 epoll 的事件触发机制,加上名字取得又有些晦涩
这两个术语都源自电子学领域,咱们从它们的原始含义开始理解
首先是水平触发:
上图是表示电压变化的时序图,VH 表示电压的峰值,VL 表示电话的谷值。水平触发的含义是,随着时间的变化,只要电压处于峰值,系统就会激活对应的电路(触发)
上图依然是表示电压变化的时序图,不过激活电路(触发)的条件是电压的改变,即电压由 VH -> VL、VL -> VH 的状态变化,在图中经过边来表示这个变化,即 Rising edge 和 Falling edge,因此称为 Edge-triggered 即边缘触发
咱们能够大体理解它们的形式与差异,继续结合下面的 epoll 中的表现进行理解
回到 epoll 中,水平触发和边缘触发做为原始含义的衍生,固然仍是具备相似电子学领域中的含义
咱们经过一个例子来理解,好比咱们有一个 fd(File descriptor) 表示刚创建的客户端链接,随后客户端给咱们发送了 5 bytes 的内容,
若是是水平触发:
若是是边缘触发:
咱们很难将水平触发、边缘触发的字面意思与上面的行为联系起来,好在咱们已经预先了解过它们在电子学领域的含义
水平触发,由于已是可读状态,因此它会一直触发,直到咱们读完缓冲区,且系统缓冲区没有新的客户端发送的内容;边缘触发对应的是状态的变化,每次有新的客户端发送内容,都会设置可读状态,所以只会在这个时机触发
水平触发是 epoll 默认的触发模式,而且 libuv 中使用的也是水平触发。在了解了水平触发和边缘触发的区别后,咱们其实就能够猜想 libuv 使用水平触发而不是边缘触发背后的考量:
若是是边缘触发,在 epoll 的客观能力上,咱们不被要求一次读取完缓冲区的内容(能够等到下一次客户端发送内容时继续读取)。可是实际业务中,客户端此时极可能在等待咱们的响应(能够结合 HTTP 协议理解),而咱们还在等待客户端的下一次写入,所以会陷入死锁的逻辑。由此一来,一次读取完缓冲区的内容几乎就成了边缘触发模式下的必选方式,这就不可避免的形成其余回调的等待时间变长,让 CPU 时间分配在各个回调之间显得不够均匀
epoll 并不可以做用在全部的 IO 操做上,好比文件的读写操做,就没法享受到 epoll 的便利性
因此 libuv 的工做能够大体归纳为:
回到 libuv,咱们将以 event-loop 为主要脉络,结合上文提到的 epoll,以及下面将会介绍到的线程池,继续 libuv 在 Linux 上的实现细节一探究竟
咱们将结合源码来回顾一下 event-loop 基本概念
下面这幅图也取自 libuv 官网,它描述了 event-loop 内部的工做:
单看流程图可能太抽象,下面是对应的 libuv 内部的实现 完整内容:
int uv_run(uv_loop_t* loop, uv_run_mode mode) {
int timeout;
int r;
int ran_pending;
r = uv__loop_alive(loop);
if (!r) uv__update_time(loop);
// 是循环,没错了
while (r != 0 && loop->stop_flag == 0) {
uv__update_time(loop);
// 处理 timer 队列
uv__run_timers(loop);
// 处理 pending 队列
ran_pending = uv__run_pending(loop);
// 处理 idle 队列
uv__run_idle(loop);
// 处理 prepare 队列
uv__run_prepare(loop);
// 执行 io_poll
uv__io_poll(loop, timeout);
uv__metrics_update_idle_time(loop);
// 执行 check 队列
uv__run_check(loop);
// 执行 closing 队列
uv__run_closing_handles(loop);
r = uv__loop_alive(loop);
if (mode == UV_RUN_ONCE || mode == UV_RUN_NOWAIT) break;
}
return r;
}
复制代码
之因此各类形式的回调(好比 setTimeout
)在优先级上会有差异,就在于它们使用的是不一样的队列,而不一样的队列在每次事件循环的迭代中的执行顺序不一样
按照官网的描述,它们是对 event-loop 中执行的操做的抽象,前者表示须要长期存在的操做,后者表示短暂的操做。单看文字描述可能不太好理解,咱们看一下它们的使用方式有何不一样
对于 Handle 表示的长期存在的操做来讲,它们的 API 具备相似下面的形式:
// IO 操做
int uv_poll_init_socket(uv_loop_t* loop, uv_poll_t* handle, uv_os_sock_t socket);
int uv_poll_start(uv_poll_t* handle, int events, uv_poll_cb cb);
int uv_poll_stop(uv_poll_t* poll);
// timer
int uv_timer_init(uv_loop_t* loop, uv_timer_t* handle);
int uv_timer_start(uv_timer_t* handle, uv_timer_cb cb, uint64_t timeout, uint64_t repeat);
int uv_timer_stop(uv_timer_t* handle);
复制代码
大体都有这三个步骤(并非所有):初始化 -> 开始 -> 中止
。很好理解吧,由于是长期存在的操做,它开始了就会持续被处理,因此须要安排一个「中止」的 API
而对于 Request 表示的短暂操做来讲,好比域名解析操做:
int uv_getaddrinfo(uv_loop_t* loop, uv_getaddrinfo_t* req, uv_getaddrinfo_cb getaddrinfo_cb, /* ... */);
复制代码
域名解析操做的交互形式是,咱们提交须要解析的地址,方法会返回解析的结果(这样的感受彷佛有点 HTTP 1.0 请求的样子),因此按「请求 - Request」来命名这样的操做的缘由就变得有画面感了
不过 Handle 和 Request 二者不是互斥的概念,Handle 内部实现可能也用到了 Request。由于一些宏观来看的长期操做,在每一个时间切片内是能够当作是 Request 的,好比咱们处理一个请求,能够当作是一个 Handle,而在当次的请求中,咱们极可能会作一些读取和写入的操做,这些操做就能够当作是 Request
咱们经过 timer 开放出来的 API 为线索,来分析它的内部实现:
int uv_timer_init(uv_loop_t* loop, uv_timer_t* handle);
int uv_timer_start(uv_timer_t* handle, uv_timer_cb cb, uint64_t timeout, uint64_t repeat);
int uv_timer_stop(uv_timer_t* handle);
复制代码
uv_timer_init
没有什么特殊的地方,只是初始化一下 handle
的状态,并将其添加到 loop->handle_queue
中
uv_timer_start
内部作了这些工做:
int uv_timer_start(uv_timer_t* handle, uv_timer_cb cb, uint64_t timeout, uint64_t repeat) {
uint64_t clamped_timeout;
// loop->time 表示 loop 当前的时间。loop 每次迭代开始时,会用当次时间更新该值
// clamped_timeout 就是该 timer 将来超时的时间点,这里直接计算好,这样将来就不须要
// 计算了,直接从 timers 中取符合条件的便可
if (clamped_timeout < timeout)
clamped_timeout = (uint64_t) -1;
handle->timer_cb = cb;
handle->timeout = clamped_timeout;
handle->repeat = repeat;
// 除了预先计算好的 clamped_timeout 之外,将来当 clamped_timeout 相同时,使用这里的
// 自增 start_id 做为比较条件来以为 handle 的执行前后顺序
handle->start_id = handle->loop->timer_counter++;
// 将 handle 插入到 timer_heap 中,这里的 heap 是 binary min heap,因此根节点就是
// clamped_timeout 值(或者 start_id)最小的 handle
heap_insert(timer_heap(handle->loop),
(struct heap_node*) &handle->heap_node,
timer_less_than);
// 设置 handle 的开始状态
uv__handle_start(handle);
return 0;
}
复制代码
uv_timer_stop
内部作了这些工做:
int uv_timer_stop(uv_timer_t* handle) {
if (!uv__is_active(handle))
return 0;
// 将 handle 移出 timer_heap,和 heap_insert 操做同样,除了移出以外
// 还会维护 timer_heap 以保障其始终是 binary min heap
heap_remove(timer_heap(handle->loop),
(struct heap_node*) &handle->heap_node,
timer_less_than);
// 设置 handle 的状态为中止
uv__handle_stop(handle);
return 0;
}
复制代码
到目前为止,咱们已经知道所谓的 start
和 stop
其实能够粗略地归纳为,往属性 loop->timer_heap
中插入或者移出 handle,而且这个属性使用一个名为 binary min heap 的数据结构
而后咱们再回顾上文的 uv_run
:
int uv_run(uv_loop_t* loop, uv_run_mode mode) {
// ...
while (r != 0 && loop->stop_flag == 0) {
// ...
uv__update_time(loop);
uv__run_timers(loop);
// ...
}
// ...
}
复制代码
uv__update_time
咱们已经见过了,做用就是在循环开头阶段、使用当前时间设置属性 loop->time
咱们只须要最后看一下 uv__run_timers
的内容,就能够串联整个流程:
void uv__run_timers(uv_loop_t* loop) {
struct heap_node* heap_node;
uv_timer_t* handle;
for (;;) {
// 取根节点,该值保证始终是全部待执行的 handle
// 中,最早超时的那一个
heap_node = heap_min(timer_heap(loop));
if (heap_node == NULL)
break;
handle = container_of(heap_node, uv_timer_t, heap_node);
if (handle->timeout > loop->time)
break;
// 中止、移出 handle、顺便维护 timer_heap
uv_timer_stop(handle);
// 若是是须要 repeat 的 handle,则从新加入到 timer_heap 中
// 会在下一次事件循环中、由本方法继续执行
uv_timer_again(handle);
// 执行超时 handle 其对应的回调
handle->timer_cb(handle);
}
}
复制代码
以上,就是 timer 在 Libuv 中的大体实现方式
后面咱们会看到,除了 timer 以外的 handle 都存放在名为 queue 的数据结构中,而存放 timer handle 的数据结构则为 min heap。那么咱们就来看看这样的差异选择有何深意
所谓 min heap 实际上是(如需更全面的介绍,能够参考 Binary Tree):
先看 binary tree(二元树的定义是):
进一步看 complete binary tree 的定义则是:
下面是几个例子:
complete binary tree 的例子:
18
/ \
15 30
/ \ / \
40 50 100 40
/ \ /
8 7 9
下面不是 complete binary tree,由于最后一层没有优先放满左边
18
/ \
40 30
/ \
100 40
min heap 的例子,根节点是最小值、父节点始终小于其子节点:
18
/ \
40 30
/ \
100 40
复制代码
在 libuv 中对 timer handle 所需的操做是:
clamped_timeout
最小的 timer handle而 min heap 兼顾了上面的需求:
heap 的实如今文件是 heap-inl.h,我加入了一些注释,有兴趣的同窗能够继续一探究竟
上面,咱们已经了解了每次事件循环迭代中、处于第一顺位的 timer 的处理,接下来咱们来看处在第二顺位的 pending 队列的处理:
static int uv__run_pending(uv_loop_t* loop) {
QUEUE* q;
QUEUE pq;
uv__io_t* w;
if (QUEUE_EMPTY(&loop->pending_queue))
return 0;
QUEUE_MOVE(&loop->pending_queue, &pq);
// 不断从队列中弹出元素进行操做
while (!QUEUE_EMPTY(&pq)) {
q = QUEUE_HEAD(&pq);
QUEUE_REMOVE(q);
QUEUE_INIT(q);
w = QUEUE_DATA(q, uv__io_t, pending_queue);
w->cb(loop, w, POLLOUT);
}
return 1;
}
复制代码
从源码来看,仅仅是从队列 loop->pending_queue
中不断弹出元素而后执行,而且弹出的元素是 uv__io_t
结构体的属性,从名字来看大体应该是 IO 相关的操做
另外,对 loop->pending_queue
进行插入操做的只有函数 uv__io_feed,该函数的被调用点基本是执行一些 IO 相关的收尾工做
和上文出现的 min heap 同样,queue 也是主要用到的数据结构,因此咱们在第一次见到它的时候、顺便介绍一下
min heap 的实现相对更深一些,因此提供了基于源码的注释 heap-inl.h 让感兴趣的读者深刻了解一下,而 queue 则相对就简单一些,加上源码中随处会出现操做 queue 的宏,了解这些宏到底作了什么、会让阅读源码时更加安心
接下来咱们就一块儿看看 queue 和一些经常使用的操做它的宏,首先是起始状态:
queue 在 libuv 中被设计成一个环形结构,因此起始状态就是 next
和 prev
都指向自身
接下来咱们来看一下往 queue 插入一个新的元素是怎样的形式:
上图分两部分,上半部分是已有的 queue、h 表示其当前的 head,q 是待插入的元素。下半部分是插入后的结果,图中的红色表示 prev
的通路,紫色表示 next
的通路,顺着通路咱们能够发现它们始终是一个环形结构
上图演示的 QUEUE_INSERT_TAIL
顾名思义是插入到队尾,而由于是环形结构,咱们须要修改头、尾、待插入元素三者的引用关系
再看一下移除某个元素的形式:
移除某个元素就比较简单了,就是将该元素的 prev
和 next
链接起来便可,这样链接后,就跳过了该元素,使得该元素呈现出被移除的状态(没法在通路中访问到)
继续看下链接两个队列的操做:
看上去貌似很复杂,其实就是把两个环先解开,而后首尾相连成为一个新的环便可。这里经过意识流的做图方式,使用 1
和 2
标注了代码和链接动做的对应关系
最后看一下将队列一分为二的操做:
上图一样经过意识流的做图方式,使用 1
和 2
标注了代码和链接动做的对应关系;将本来以 h
开头的 queue,在 q
处剪开,h
和 q
以前的元素相链接成为一个新的 queue;n
做为另外一个 queue 的开头,链接 q
和断开前的队列的末尾,构成另外一个 queue
上面演示了一些具备有表明性的 queue 操做,感兴趣的同窗能够继续查看 queue.h 来一探究竟
你们或许会奇怪,为何没有按照它们在事件循环中的顺序进行介绍,并且还把它们三个放在了一块儿
若是你们在源码中搜索 uv__run_idle
或者 uv__run_check
会更加奇怪,由于咱们只能找到它们的声明,甚至找不到它们的定义
其实它们都是在 loop-watcher.c 中经过宏生成的,由于它们的操做都是同样的 - 从各自的队列中取出 handle 而后执行便可
须要说明的是,你们不要被 idle 的名字迷惑了,它并非事件循环闲置的时候才会执行的队列,而是在每次时间循环迭代中,都会执行的,彻底没有 idle 之意
不过要说彻底没有 idle 之意彷佛也不是特别合适,好比 idle 和 prepare 队列在内部实现上,无非是前后执行的队列而已:
int uv_run(uv_loop_t* loop, uv_run_mode mode) {
// ...
while (r != 0 && loop->stop_flag == 0) {
// ...
uv__run_idle(loop);
uv__run_prepare(loop);
uv__io_poll(loop, timeout);
// ...
}
// ...
}
复制代码
那么如今有一个 handle,咱们但愿它在 uv__io_poll
以前执行,是添加到 idle 仍是 prepare 队列中呢?
我以为 prepare 是取「为了下面的 uv__io_poll
作准备」之意,因此若是是为了 io_poll 作准备的 handle,那么能够添加到 prepare 队列中,其他则能够添加到 idle 之中。一样的设定我以为也适用于 check,它运行在 io_poll 以后,可让用户作一些检验 IO 执行结果的工做,让任务队列更加语义化
对于 io_poll 咱们仍是从事件循环开始分析
下面是上文已经介绍过的事件循环的片断:
int uv_run(uv_loop_t* loop, uv_run_mode mode) {
// ...
while (r != 0 && loop->stop_flag == 0) {
// ...
timeout = 0;
if ((mode == UV_RUN_ONCE && !ran_pending) || mode == UV_RUN_DEFAULT)
timeout = uv_backend_timeout(loop);
uv__io_poll(loop, timeout);
// ...
}
// ...
}
复制代码
上面的代码计算了一个 timeout
用于调用 uv__io_poll(loop, timeout)
uv__io_poll
定义在 linux-core.c 中,虽然这是一个包含注释在内接近 300 行的函数,但想必你们也发现了,其中的核心逻辑就是开头演示的 epoll 的用法:
void uv__io_poll(uv_loop_t* loop, int timeout) {
while (!QUEUE_EMPTY(&loop->watcher_queue)) {
// ...
// `loop->backend_fd` 是使用 `epoll_create` 建立的 epoll 实例
epoll_ctl(loop->backend_fd, op, w->fd, &e)
// ...
}
// ...
for (;;) {
// ...
if (/* ... */) {
// ...
} else {
// ...
// `epoll_wait` 和 `epoll_pwait` 只有细微的差异,因此这里只考虑前者
nfds = epoll_wait(loop->backend_fd,
events,
ARRAY_SIZE(events),
timeout);
// ...
}
}
// ...
for (i = 0; i < nfds; i++) {
// ...
w = loop->watchers[fd];
// ...
w->cb(loop, w, pe->events);
}
}
复制代码
epoll_wait
的 timeout
参数的含义是:
-1
表示一直等到有事件产生0
则当即返回,包含调用时产生的事件milliseconds
为单位,规约到将来某个系统时间片内结合上面这些,咱们看下 uv_backend_timeout
是如何计算 timeout
的:
int uv_backend_timeout(const uv_loop_t* loop) {
// 时间循环被外部中止了,因此让 `uv__io_poll` 理解返回
// 以便尽快结束事件循环
if (loop->stop_flag != 0)
return 0;
// 没有待处理的 handle 和 request,则也不须要等待了,一样让 `uv__io_poll`
// 尽快返回
if (!uv__has_active_handles(loop) && !uv__has_active_reqs(loop))
return 0;
// idle 队列不为空,也要求 `uv__io_poll` 尽快返回,这样尽快进入下一个时间循环
// 不然会致使 idle 产生太高的延迟
if (!QUEUE_EMPTY(&loop->idle_handles))
return 0;
// 和上一步目的同样,不过这里是换成了 pending 队列
if (!QUEUE_EMPTY(&loop->pending_queue))
return 0;
// 和上一步目的同样,不过这里换成,待关闭的 handles,都是为了不目标队列产生
// 太高的延迟
if (loop->closing_handles)
return 0;
return uv__next_timeout(loop);
}
int uv__next_timeout(const uv_loop_t* loop) {
const struct heap_node* heap_node;
const uv_timer_t* handle;
uint64_t diff;
heap_node = heap_min(timer_heap(loop));
// 若是没有 timer 待处理,则能够放心的 block 住,等待事件到达
if (heap_node == NULL)
return -1; /* block indefinitely */
handle = container_of(heap_node, uv_timer_t, heap_node);
// 有 timer,且 timer 已经到了要被执行的时间内,则需让 `uv__io_poll`
// 尽快返回,以在下一个事件循环迭代内处理超时的 timer
if (handle->timeout <= loop->time)
return 0;
// 没有 timer 超时,用最小超时间减去、当前的循环时间的差值,做为超时时间
// 由于在为了这个差值时间内是没有 timer 超时的,因此能够放心 block 以等待
// epoll 事件
diff = handle->timeout - loop->time;
if (diff > INT_MAX)
diff = INT_MAX;
return (int) diff;
}
复制代码
上面的 uv__next_timeout
实现主要分为三部分:
-1
,结合本节开头对 epoll_wait
的 timeout
参数的解释,-1
会让后续的 uv__io_poll
进入 block 状态、彻底等待事件的到达handle->timeout <= loop->time
,则返回 0
,这样 uv__io_poll
不会 block 住事件循环,目的是为了快速进入下一次事件循环、以执行超时的 timerdiff
来做为 uv__io_poll
的阻塞时间不知道你们发现没有,timeout 的计算,其核心指导思想就是要尽量的让 CPU 时间可以在事件循环的屡次迭代的、多个不一样任务队列的执行、中尽量的分配均匀,避免某个类型的任务产生很高的延迟
了解了 io_poll 队列是如何执行以后,咱们经过一个 echo server 的小栗子,来对 io_poll 有个总体的认识:
uv_loop_t *loop;
void echo_write(uv_write_t *req, int status) {
// ...
// 一些无所谓有,但有所谓无的收尾工做
}
void echo_read(uv_stream_t *client, ssize_t nread, uv_buf_t buf) {
// ...
// 建立一个写入请求(上文已经介绍过 Request 和 Handle 的区别),
// 将读取的客户端内容写回给客户端,写入完成后进入回调 `echo_write`
uv_write_t *write_req = (uv_write_t*)malloc(sizeof(uv_write_t));
uv_write(write_req, client, &buf, 1, echo_write);
}
void on_new_connection(uv_stream_t *server, int status) {
// ...
// 建立 client 实例并关联到事件循环
uv_tcp_t *client = (uv_tcp_t*) malloc(sizeof(uv_tcp_t));
uv_tcp_init(loop, client);
// 与创建客户端链接,并读取客户端输入,读取完成后进入 `echo_read` 回调
if (uv_accept(server, (uv_stream_t*) client) == 0) {
uv_read_start((uv_stream_t*) client, alloc_buffer, echo_read);
}
// ...
}
int main() {
// 建立事件循环
loop = uv_default_loop();
// 建立 server 实例并关联事件循环
uv_tcp_t server;
uv_tcp_init(loop, &server);
// ...
// 绑定 server 到某个端口,并接受请求
uv_tcp_bind(&server, uv_ip4_addr("0.0.0.0", 7000));
// 新的客户端请求到达后,会进去到 `on_new_connection` 回调
uv_listen((uv_stream_t*) &server, 128, on_new_connection);
// ...
// 启动事件循环
return uv_run(loop, UV_RUN_DEFAULT);
}
复制代码
到目前为止,咱们已经确认过 io_poll 内部实现确实是使用的 epoll。在本文的开头,咱们也提到 epoll 目前并不能处理全部的 IO 操做,对于那些 epoll 不支持的 IO 操做,libuv 统一使用其内部的线程池来模拟出异步 IO。接下来咱们看看线程池的大体工做形式
由于咱们已经知道读写文件的操做是没法使用 epoll 的,那么就顺着这个线索,经过 uv_fs_read 的内部实现,找到 uv__work_submit 方法,发现是在其中初始化的线程池:
void uv__work_submit(uv_loop_t* loop, struct uv__work* w, enum uv__work_kind kind, void (*work)(struct uv__work* w), void (*done)(struct uv__work* w, int status)) {
uv_once(&once, init_once);
// ...
post(&w->wq, kind);
}
复制代码
因此线程池的建立、是一个延迟建立的单例。init_once
内部会调用 init_threads 来完成线程池初始化工做:
static uv_thread_t default_threads[4];
static void init_threads(void) {
// ...
nthreads = ARRAY_SIZE(default_threads);
val = getenv("UV_THREADPOOL_SIZE");
// ...
for (i = 0; i < nthreads; i++)
if (uv_thread_create(threads + i, worker, &sem))
abort();
// ...
}
复制代码
经过上面的实现,咱们知道默认的线程池中线程的数量是 4
,而且能够经过 UV_THREADPOOL_SIZE
环境变量从新指定该数值
除了对线程池进行单例延迟建立,uv__work_submit
固然仍是会提交任务的,这部分工做是由 post(&w->wq, kind)
完成的,咱们来看下 post 方法的实现细节:
static void post(QUEUE* q, enum uv__work_kind kind) {
uv_mutex_lock(&mutex);
// ...
// 将任务插入到 `wq` 这个线程共享的队列中
QUEUE_INSERT_TAIL(&wq, q);
// 若是有空闲线程,则通知它们开始工做
if (idle_threads > 0)
uv_cond_signal(&cond);
uv_mutex_unlock(&mutex);
}
复制代码
能够发现对于提交任务,其实就是将任务插入到线程共享队列 wq
,而且有空闲线程时才会通知它们工做。那么,若是此时没有空闲线程的话,是否是任务就被忽略了呢?答案是否,由于工做线程会在完成当前工做后,主动检查 wq
队列是否还有待完成的工做,有的话会继续完成,没有的话,则进入睡眠,等待下次被唤醒(后面会继续介绍这部分细节)
上面在建立线程的时候 uv_thread_create(threads + i, worker, &sem)
中的 worker
就是线程执行的内容,咱们来看下 worker 的大体内容:
// 线程池的 wq,提交的任务都先链到其中
static QUEUE wq;
static void worker(void* arg) {
// ...
// `idle_threads` 和 `run_slow_work_message` 这些是线程共享的,因此要加个锁
uv_mutex_lock(&mutex);
for (;;) {
// 这里的条件判断,能够大体当作是「没有任务」为 true
while (QUEUE_EMPTY(&wq) ||
(QUEUE_HEAD(&wq) == &run_slow_work_message &&
QUEUE_NEXT(&run_slow_work_message) == &wq &&
slow_io_work_running >= slow_work_thread_threshold())) {
// 轮转到当前进程时由于没有任务,则无事可作
// 空闲线程数 +1
idle_threads += 1;
// `uv_cond_wait` 内部是使用 `pthread_cond_wait` 调用后会:
// - 让线程进入等待状态,等待条件变量 `cond` 发生变动
// - 对 `mutex` 解锁
//
// 此后,其余线程中都可使用 `uv_cond_signal` 内部是 `pthread_cond_signal`
// 来广播一个条件变量 `cond` 变动的事件,操做系统内部会随机唤醒一个等待 `cond`
// 变动的线程,并在被唤醒线程的 uv_cond_wait 调用返回以前,对以前传入的 `mutex`
// 参数上锁
//
// 所以循环跳出(有任务)后,`mutex` 必定是上锁的
uv_cond_wait(&cond, &mutex);
idle_threads -= 1;
}
// ...
// 由于上锁了,因此放心进行队列的弹出操做
q = QUEUE_HEAD(&wq);
QUEUE_REMOVE(q);
// ...
// 由于已经完成了弹出,能够解锁,让其余线程能够继续操做队列
uv_mutex_unlock(&mutex);
// 利用 c 结构体的小特性,作字段偏移,拿到 `q` 所属的 `uv__work` 实例
w = QUEUE_DATA(q, struct uv__work, wq);
w->work(w);
// 下面要操做 `w->loop->wq` 因此要上锁
uv_mutex_lock(&w->loop->wq_mutex);
w->work = NULL;
// 须要看仔细,和开头部分线程池中的 wq 区别开
QUEUE_INSERT_TAIL(&w->loop->wq, &w->wq);
// 唤醒主线程的事件循环
uv_async_send(&w->loop->wq_async);
uv_mutex_unlock(&w->loop->wq_mutex);
// 这一步上锁是必须的,由于下次迭代的开头又须要
// 操做共享内存,不过没必要担忧死锁,由于它和下一次迭代
// 中的 `uv_cond_wait` 解锁操做是对应的
uv_mutex_lock(&mutex);
// ...
}
}
复制代码
上面咱们保留了相对重要的内容,并加以注释。能够大体地归纳为:
uv_cond_wait
来等待被唤醒wq
中主动找一个任务作,完成任务就唤醒主线程,由于回调须要在主线程被执行uv_cond_wait
再次进入睡眠状态uv_cond_signal
来通知操做系统作调度当线程池完成任务后,须要通知主线程执行对应的回调。通知的方式颇有意思,咱们先来看下事件循环初始化操做 uv_loop_init:
int uv_loop_init(uv_loop_t* loop) {
// ...
// 初始化 min heap 和各类队列,用于存放各式的 handles
heap_init((struct heap*) &loop->timer_heap);
QUEUE_INIT(&loop->wq);
QUEUE_INIT(&loop->idle_handles);
QUEUE_INIT(&loop->async_handles);
QUEUE_INIT(&loop->check_handles);
QUEUE_INIT(&loop->prepare_handles);
QUEUE_INIT(&loop->handle_queue);
// ...
// 调用 `epoll_create` 建立 epoll 实例
err = uv__platform_loop_init(loop);
if (err)
goto fail_platform_init;
// ...
// 用于线程池通知的初始化
err = uv_async_init(loop, &loop->wq_async, uv__work_done);
// ...
}
复制代码
上面的代码中 uv_async_init 是用于初始化线程池通知相关的工做,下面是它的函数签名:
int uv_async_init(uv_loop_t* loop, uv_async_t* handle, uv_async_cb async_cb);
复制代码
因此第三个实参 uv__work_done 实际上是一个回调函数,咱们能够看下它的内容:
void uv__work_done(uv_async_t* handle) {
struct uv__work* w;
uv_loop_t* loop;
QUEUE* q;
QUEUE wq;
int err;
loop = container_of(handle, uv_loop_t, wq_async);
uv_mutex_lock(&loop->wq_mutex);
// 将目前的 `loop->wq` 所有移动到局部变量 `wq` 中,
//
// `loop->wq` 中的内容是在上文 worker 中任务完成后使用
// `QUEUE_INSERT_TAIL(&w->loop->wq, &w->wq)` 添加的
//
// 这样尽快释放锁,让其余任务可尽快接入
QUEUE_MOVE(&loop->wq, &wq);
uv_mutex_unlock(&loop->wq_mutex);
// 遍历 `wq` 执行其中每一个任务的完成回调
while (!QUEUE_EMPTY(&wq)) {
q = QUEUE_HEAD(&wq);
QUEUE_REMOVE(q);
w = container_of(q, struct uv__work, wq);
err = (w->work == uv__cancelled) ? UV_ECANCELED : 0;
w->done(w, err);
}
}
复制代码
知道了 uv__work_done
就是负责执行任务完成回调的工做后,继续看一下 uv_async_init
的内容,看看其内部是如何使用 uv__work_done
的:
int uv_async_init(uv_loop_t* loop, uv_async_t* handle, uv_async_cb async_cb) {
// ...
// 待调查
err = uv__async_start(loop);
// ...
// 建立了一个 async handle
uv__handle_init(loop, (uv_handle_t*)handle, UV_ASYNC);
// 在目前的脉络中 `async_cb` 就是 `uv__work_done` 了
handle->async_cb = async_cb;
handle->pending = 0;
// 把 async handle 加入到队列 `loop->async_handles` 中
QUEUE_INSERT_TAIL(&loop->async_handles, &handle->queue);
// ...
}
复制代码
咱们继续看一下以前待调查的 uv__async_start 的内容:
static int uv__async_start(uv_loop_t* loop) {
// ...
// `eventfd` 能够建立一个 epoll 内部维护的 fd,该 fd 能够和其余真实的 fd(好比 socket fd)同样
// 添加到 epoll 实例中,能够监听它的可读事件,也能够对其进行写入操做,所以就用户代码就能够借助这个
// 看似虚拟的 fd 来实现的事件订阅了
err = eventfd(0, EFD_CLOEXEC | EFD_NONBLOCK);
if (err < 0)
return UV__ERR(errno);
pipefd[0] = err;
pipefd[1] = -1;
// ...
uv__io_init(&loop->async_io_watcher, uv__async_io, pipefd[0]);
uv__io_start(loop, &loop->async_io_watcher, POLLIN);
loop->async_wfd = pipefd[1];
return 0;
}
复制代码
咱们知道 epoll 是支持 socket fd 的,对于支持的 fd,epoll 的事件调度将很是的高效。而对于不支持的 IO 操做,libuv 则使用 eventfd
建立一个虚拟的 fd,继续利用 fd 的事件调度功能
咱们继续看下上面出现的 uv__io_start 的细节,来确认一下事件订阅的步骤:
void uv__io_start(uv_loop_t* loop, uv__io_t* w, unsigned int events) {
// ...
// 你们能够翻到上面 `uv__io_poll` 的部分,会发现其中有遍历 `loop->watcher_queue`
// 将其中的 fd 都加入到 epoll 实例中,以订阅它们的事件的动做
if (QUEUE_EMPTY(&w->watcher_queue))
QUEUE_INSERT_TAIL(&loop->watcher_queue, &w->watcher_queue);
// 将 fd 和对应的任务关联的操做,一样能够翻看上面的 `uv__io_poll`,当接收到事件
// 通知后,会有从 `loop->watchers` 中根据 fd 取出任务并执行其完成回调的动做
// 另外,根据 fd 确保 watcher 不会被重复添加
if (loop->watchers[w->fd] == NULL) {
loop->watchers[w->fd] = w;
loop->nfds++;
}
}
复制代码
确认了事件订阅步骤之后,咱们来看下事件回调的内容。上面的形参 w
在咱们目前的脉络中,对应的实参是 loop->async_io_watcher
,而它是经过 uv__io_init(&loop->async_io_watcher, uv__async_io, pipefd[0])
初始化的,咱们看一下 uv__io_init
的函数签名:
void uv__io_init(uv__io_t* w, uv__io_cb cb, int fd);
复制代码
因此 uv__async_io 是接收到虚拟 fd 事件的回调函数,继续看下它的内容:
static void uv__async_io(uv_loop_t* loop, uv__io_t* w, unsigned int events) {
// ...
// 确保 `w` 一定是 `loop->async_io_watcher`
assert(w == &loop->async_io_watcher);
for (;;) {
// 从中读一些内容,`w->fd` 就是上面使用 `eventfd` 建立的虚拟 fd
// 不出意外的话,通知那端的方式、必定是往这个 fd 里面写入一些内容,咱们能够后面继续确认
// 从中读取一些内容的目的是避免缓冲区被通知所用的不含实际意义的字节占满
r = read(w->fd, buf, sizeof(buf));
// ...
}
// 执行 `loop->async_handles` 队列,任务实际的回调
QUEUE_MOVE(&loop->async_handles, &queue);
while (!QUEUE_EMPTY(&queue)) {
q = QUEUE_HEAD(&queue);
h = QUEUE_DATA(q, uv_async_t, queue);
QUEUE_REMOVE(q);
QUEUE_INSERT_TAIL(&loop->async_handles, q);
// ...
h->async_cb(h);
}
}
复制代码
咱们已经知道了事件的订阅,以及事件响应的方式
接着继续确认一下事件通知是如何在线程池中触发的。uv_async_send 是唤醒主线程的开放 API,它实际上是调用的内部 API uv__async_send:
static void uv__async_send(uv_loop_t* loop) {
const void* buf;
ssize_t len;
int fd;
// ...
fd = loop->async_io_watcher.fd;
do
// 果真事件通知这一端就是往 `eventfd` 建立的虚拟 fd 写入数据
// 剩下的就是交给 epoll 高效的事件调度机制唤醒事件订阅方就能够了
r = write(fd, buf, len);
while (r == -1 && errno == EINTR);
// ...
}
复制代码
咱们最后经过一副意识流的图,对上面的线程池的流程进行小结:
上图中咱们的任务是在 uv__run_idle(loop);
执行的回调中经过 uv__work_submit
完成的,可是实际上,对于使用事件循环的应用而言,整个应用的时间片都划分在了各个不一样的队列回调中,因此实际上、从其他的队列中提交任务也是可能的
咱们开头已经介绍过,只有 Handle 才配备了关闭的 API,由于 Request 是一个短暂任务。Handle 的关闭须要使用 uv_close:
void uv_close(uv_handle_t* handle, uv_close_cb close_cb) {
assert(!uv__is_closing(handle));
handle->flags |= UV_HANDLE_CLOSING;
handle->close_cb = close_cb;
switch (handle->type) {
// 根据不一样的 handle 类型,执行各自的资源回收工做
case UV_NAMED_PIPE:
uv__pipe_close((uv_pipe_t*)handle);
break;
case UV_TTY:
uv__stream_close((uv_stream_t*)handle);
break;
case UV_TCP:
uv__tcp_close((uv_tcp_t*)handle);
break;
// ...
default:
assert(0);
}
// 添加到 `loop->closing_handles`
uv__make_close_pending(handle);
}
void uv__make_close_pending(uv_handle_t* handle) {
assert(handle->flags & UV_HANDLE_CLOSING);
assert(!(handle->flags & UV_HANDLE_CLOSED));
handle->next_closing = handle->loop->closing_handles;
handle->loop->closing_handles = handle;
}
复制代码
调用 uv_close
关闭 Handle 后,libuv 会先释放 Handle 占用的资源(好比关闭 fd),随后经过调用 uv__make_close_pending
把 handle 链接到 closing_handles
队列中,该队列会在事件循环中被 uv__run_closing_handles(loop)
调用所执行
使用了事件循环后,业务代码的执行时机都在回调中,因为 closing_handles
是最后一个被执行的队列,因此在其他队列的回调中、那些执行 uv_close
时传递的回调,都会在当次迭代中被执行
本文沿着 Libuv 的 Linux 实现的脉络对其内部实现进行了简单的探索、尝试解开 libuv 的神秘面纱。很显然,只看这篇是不够的,希望有幸能够做为想深刻了解 libuv 的起始读物。后续咱们会结合 Node.js 来探究它们内部是如何衔接的
本文发布自 网易云音乐大前端团队,文章未经受权禁止任何形式的转载。咱们常年招收前端、iOS、Android,若是你准备换工做,又刚好喜欢云音乐,那就加入咱们 grp.music-fe (at) corp.netease.com!