libuv我在今年四月份的时候开始接触,一开始也遇到了不少坑,但后来理解并遵照了它的设计思想,一切就变得很方便。这几天开始着手精读它的源码,本着记录本身的学习痕迹,也但愿能增长别人搜索相关问题结果数的目的,所以就有了这些东西,这个系列至少会有四篇,后续再说吧。
html
那么它是什么,一个高效轻量的跨平台异步io库,在linux下,它整合了libevent,在windows下,它用iocp重写了一套。它有那些功能,以下面这幅官网上的图所示:linux
它的总体结构基于事件循环,简单的说就是外部的接口实际上是对内层的一个个请求,并无作真正的事,这些请求都存储在内部一个请求队列中,在事件循环中,再从请求队列中取出他们,而后作具体的事情,作完了利用回调函数通知调用者,这样一来,全部的外部接口均可以变成异步的。windows
它的世界有三类元素:api
它的实现基于一个假设,运行于单线程.异步
下面主要来分析它的主体和经常使用部分,循环,timer,tcp,udp,fs,特殊的uv_handle_t.此次分析是基于其windows部分的,由于我在windows上用它比较多,不过其不一样平台上的思路是一致的,只是具体实现上采用的系统api不一样罢了.库版本基于1.7.0版本.socket
uv_loop_tasync
先来看定义,在include/uv.h的1459行:tcp
struct uv_loop_s { /* User data - use this for whatever. */
void* data; /* Loop reference counting. */ unsigned int active_handles; void* handle_queue[2]; void* active_reqs[2]; /* Internal flag to signal loop stop. */ unsigned int stop_flag; UV_LOOP_PRIVATE_FIELDS }; #define UV_LOOP_PRIVATE_FIELDS \
/* The loop's I/O completion port */ \ HANDLE iocp; \ /* The current time according to the event loop. in msecs. */ \ uint64_t time; \ /* Tail of a single-linked circular queue of pending reqs. If the queue */ \ /* is empty, tail_ is NULL. If there is only one item, */ \ /* tail_->next_req == tail_ */ \ uv_req_t* pending_reqs_tail; \ /* Head of a single-linked list of closed handles */ \ uv_handle_t* endgame_handles; \ /* The head of the timers tree */ \ struct uv_timer_tree_s timers; \ /* Lists of active loop (prepare / check / idle) watchers */ \ uv_prepare_t* prepare_handles; \ uv_check_t* check_handles; \ uv_idle_t* idle_handles; \ /* This pointer will refer to the prepare/check/idle handle whose */ \ /* callback is scheduled to be called next. This is needed to allow */ \ /* safe removal from one of the lists above while that list being */ \ /* iterated over. */ \ uv_prepare_t* next_prepare_handle; \ uv_check_t* next_check_handle; \ uv_idle_t* next_idle_handle; \ /* This handle holds the peer sockets for the fast variant of uv_poll_t */ \ SOCKET poll_peer_sockets[UV_MSAFD_PROVIDER_COUNT]; \ /* Counter to keep track of active tcp streams */ \ unsigned int active_tcp_streams; \ /* Counter to keep track of active udp streams */ \ unsigned int active_udp_streams; \ /* Counter to started timer */ \ uint64_t timer_counter; \ /* Threadpool */ \ void* wq[2]; \ uv_mutex_t wq_mutex; \ uv_async_t wq_async;
active_handles是uv_loop_t的引用计数,每投递一个uv_handle_t或uv_req_t都会使其递增,结束一个请求都会使其递减。
handle_queue是uv_handle_t族的队列。
active_queue是uv_req_t族的队列。
data是用户数据域。
uv_prepare_t,uv_check_t,uv_idle_t,uv_async_t是个特殊的uv_handle_t,随后会说明。
pending_reqs_tail是用来装已经处理过的请求的队列。
endgame_handles是用来装执行了关闭的uv_handle_t族链表。
timers是计时器结构,用最小堆实现。
iocp是完成端口的句柄。
这个结构外部除了data外,其余都不该该使用。与其直接有关的接口有两个, uv_loop_init和uv_run。
先来看uv_loop_init,在/src/core.c的126行:
int uv_loop_init(uv_loop_t* loop) { int err; /* Initialize libuv itself first */ uv__once_init(); /* Create an I/O completion port */ loop->iocp = CreateIoCompletionPort(INVALID_HANDLE_VALUE, NULL, 0, 1); if (loop->iocp == NULL) return uv_translate_sys_error(GetLastError()); /* To prevent uninitialized memory access, loop->time must be initialized * to zero before calling uv_update_time for the first time. */ loop->time = 0; uv_update_time(loop); QUEUE_INIT(&loop->wq); QUEUE_INIT(&loop->handle_queue); QUEUE_INIT(&loop->active_reqs); loop->active_handles = 0; loop->pending_reqs_tail = NULL; loop->endgame_handles = NULL; RB_INIT(&loop->timers); loop->check_handles = NULL; loop->prepare_handles = NULL; loop->idle_handles = NULL; loop->next_prepare_handle = NULL; loop->next_check_handle = NULL; loop->next_idle_handle = NULL; memset(&loop->poll_peer_sockets, 0, sizeof loop->poll_peer_sockets); loop->active_tcp_streams = 0; loop->active_udp_streams = 0; loop->timer_counter = 0; loop->stop_flag = 0; err = uv_mutex_init(&loop->wq_mutex); if (err) goto fail_mutex_init; err = uv_async_init(loop, &loop->wq_async, uv__work_done); if (err) goto fail_async_init; uv__handle_unref(&loop->wq_async); loop->wq_async.flags |= UV__HANDLE_INTERNAL; return 0; fail_async_init: uv_mutex_destroy(&loop->wq_mutex); fail_mutex_init: CloseHandle(loop->iocp); loop->iocp = INVALID_HANDLE_VALUE; return err; }
这里初始化了本身各个字段,并用uv__once_init模拟了pthread_once_t初始化了库的各个子系统。函数
再来看看uv_run,这是核心函数,库的入口与发动机,在src/core.c的372行:oop
1 int uv_run(uv_loop_t *loop, uv_run_mode mode) { 2 DWORD timeout; 3 int r; 4 int ran_pending; 5 void (*poll)(uv_loop_t* loop, DWORD timeout); 6
7 if (pGetQueuedCompletionStatusEx) 8 poll = &uv_poll_ex; 9 else
10 poll = &uv_poll; 11
12 r = uv__loop_alive(loop); 13 if (!r) 14 uv_update_time(loop); 15
16 while (r != 0 && loop->stop_flag == 0) { 17 uv_update_time(loop); 18 uv_process_timers(loop); 19
20 ran_pending = uv_process_reqs(loop); 21 uv_idle_invoke(loop); 22 uv_prepare_invoke(loop); 23
24 timeout = 0; 25 if ((mode == UV_RUN_ONCE && !ran_pending) || mode == UV_RUN_DEFAULT) 26 timeout = uv_backend_timeout(loop); 27
28 (*poll)(loop, timeout); 29
30 uv_check_invoke(loop); 31 uv_process_endgames(loop); 32
33 if (mode == UV_RUN_ONCE) { 34 /* UV_RUN_ONCE implies forward progress: at least one callback must have 35 * been invoked when it returns. uv__io_poll() can return without doing 36 * I/O (meaning: no callbacks) when its timeout expires - which means we 37 * have pending timers that satisfy the forward progress constraint. 38 * 39 * UV_RUN_NOWAIT makes no guarantees about progress so it's omitted from 40 * the check. 41 */
42 uv_process_timers(loop); 43 } 44
45 r = uv__loop_alive(loop); 46 if (mode == UV_RUN_ONCE || mode == UV_RUN_NOWAIT) 47 break; 48 } 49
50 /* The if statement lets the compiler compile it to a conditional store. 51 * Avoids dirtying a cache line. 52 */
53 if (loop->stop_flag != 0) 54 loop->stop_flag = 0; 55
56 return r; 57 }
循环的流程用官方一张图来表示:
这些具体的处理函数也基本是各个子系统的入口函数,在后面分析各个子系统时会详细说明,这里只分析它的三种运行模式,也就是mode参数指定的:
1 int uv_backend_timeout(const uv_loop_t* loop) { 2 if (loop->stop_flag != 0) 3 return 0; 4
5 if (!uv__has_active_handles(loop) && !uv__has_active_reqs(loop)) 6 return 0; 7
8 if (loop->pending_reqs_tail) 9 return 0; 10
11 if (loop->endgame_handles) 12 return 0; 13
14 if (loop->idle_handles) 15 return 0; 16
17 return uv__next_timeout(loop);
是这样的,A有任何可处理的请求,返回0.B取最小计时器事件的时间与当前时间的差值。C没有计时器事件返回INFINITE,直到有一个完成端口事件。
最后来讲一下uv_loopt_t是如何退出的,A.stop_flag为1.B.没有任何请求(也就是uv_loop_alive的判断)。用uv_stop能够设置stop_flag为1。
未完待续,下一篇讲四大特殊uv_handle_t。