来源:https://zhuanlan.zhihu.com/p/50497450前端
前序:说说为啥要研究libuv,其实在好久以前(大概2年前吧)玩nodejs的时候就对这个核心库很是感兴趣,不过因为当年水平确实比较菜,大概看了看以后实在没能静下心来看下去。18年初的时候,360直播云官网作了React同构,那个时候我问本身若是真有百万并发,天天亿级的访问量有没有信心保证中间node层一次不挂(或者不出任何事故),其实我到今天仍然是没有足够底气的。缘由有两个吧:一是对nodejs和它底层的内容还远远不够了解,二是对监控层面作的不够好。咱们大概也都知道alinode,他们早在3 4年前就在nodejs上作了不少工做,好比v8内存监控等,可是比较遗憾的是alinode至今没有开源。因而乎有了个人第一篇关于libuv的文章,后面争取还会更新nodejs、v8等相关的内容。 本文从下面几个方面来介绍libuv,经过fs、net两方面介绍libuv的思想。java
首先咱们能够在libuv上找到libuv这个框架,在README.md里,咱们就能够在Build Instructions找到安装方法,做者电脑操做系统是macox(因此后面的实例也是以linux、unix为主,不会讲windows)。咱们首先把项目clone到咱们的电脑上,在项目根目录执行一下的命令,在执行过程当中可能会出现各类底层库没有安装的状况,按照提示自行安装就能够了,做者在执行 xcodebuild 的时候发现不能加上 -target All 的参数,不加的话能够顺利build过去。node
$ ./gyp_uv.py -f xcode
$ xcodebuild -ARCHS="x86_64" -project uv.xcodeproj \
-configuration Release -target All
build完成后 咱们能够在项目目录里找到 build/Release/libuv.a 文件,这个就是编译后的文件了,咱们稍后会用到。 准备工做作好以后咱们就能够建立一个C或者C++的工程了,在Mac上我通常使用xcode来编写oc、c、c++的项目。 首先建立一个C项目,这个时候咱们须要把咱们以前编译的libuv.a的文件加入到项目的依赖中,咱们在Build Phases中的 Link Binary with Libraries中添加libuv.a的路径,同时咱们须要在项目根目录引入uv.h等文件头。准备工做作好以后,咱们就开始学习怎么写标准的hello world了 哈哈哈哈。linux
#include <stdio.h>
#include <stdlib.h>
#include <uv.h>
int main() {
uv_loop_t *loop = malloc(sizeof(uv_loop_t));
uv_loop_init(loop);
printf("Now quitting.\n");
uv_run(loop, UV_RUN_DEFAULT);
uv_loop_close(loop);
free(loop);
return 0;
}
上述代码仅仅初始化了一个loop循环,并无执行任何内容,而后就close且退出了。虽然上述代码并无利用libuv的async功能,可是给咱们展现了 uv_loop_init uv_run 两个核心函数。咱们稍后会介绍他们作了什么。c++
在开始介绍整个整个libuv以前,我不得不首先介绍一个数据结构,由于这个数据结构在libuv里无处不在,这个数据结构就是--循环双向链表。 咱们在项目根目录下的src目录能够找到queue.h的头文件。不错,这个数据结构就是用宏实现的,那我让咱们一块儿来学习一下什么是链表。git
链表是一种物理存储单元上非连续、非顺序的存储结构github
双向链表其实就是头尾相连macos
看图咱们就明白了,所谓的循环链表就是把头尾相连。json
#define QUEUE_NEXT(q) (*(QUEUE **) &((*(q))[0]))
#define QUEUE_PREV(q) (*(QUEUE **) &((*(q))[1]))
#define QUEUE_PREV_NEXT(q) (QUEUE_NEXT(QUEUE_PREV(q)))
#define QUEUE_NEXT_PREV(q) (QUEUE_PREV(QUEUE_NEXT(q)))
/* Public macros. */
#define QUEUE_DATA(ptr, type, field) \
((type *) ((char *) (ptr) - offsetof(type, field)))
#define QUEUE_INIT(q) \
do { \
QUEUE_NEXT(q) = (q); \
QUEUE_PREV(q) = (q); \
} \
while (0)
上述代码我只截取了部分的实现 其实这里我只想讲两个点 1:QUEUE_NEXT 的实现windows
(*(QUEUE **) &((*(q))[0]))
在这个宏里,他为何用这个复杂的方式来实现呢? 其实他有两个目的:强制类型转换、成为左值
*(q))[0]
这个步骤是取到数组的第一个元素
(QUEUE **)
这个步骤进行强制类型转换
(*(nnn) &(xxx))
这个步骤目的就是为了使xxx成为左值
2:QUEUE_DATA 获取链表的值 巧妙的使用了地址的偏移量来完成
#include "queue.h"
#include <stdio.h>
static QUEUE* q;
static QUEUE queue;
struct user_s {
int age;
char* name;
QUEUE node;
};
int main() {
struct user_s* user;
struct user_s john;
struct user_s henry;
john.name = "john";
john.age = 44;
henry.name = "henry";
henry.age = 32;
QUEUE_INIT(&queue);
QUEUE_INIT(&john.node);
QUEUE_INIT(&henry.node);
QUEUE_INIT(&willy.node);
QUEUE_INIT(&sgy.node);
((*(&queue))[0]) = john.node;
(*(QUEUE **) &((*(&queue))[0])) = &john.node;
QUEUE_INSERT_TAIL(&queue, &john.node);
QUEUE_INSERT_TAIL(&queue, &henry.node);
q = QUEUE_HEAD(&queue);
user = QUEUE_DATA(q, struct user_s, node);
printf("Received first inserted user: %s who is %d.\n",
user->name, user->age);
QUEUE_REMOVE(q);
QUEUE_FOREACH(q, &queue) {
user = QUEUE_DATA(q, struct user_s, node);
printf("Received rest inserted users: %s who is %d.\n",
user->name, user->age);
}
return 0;
}
从上面代码能够总结出5个方法 QUEUE_INIT 队列初始化 QUEUE_INSERT_TAIL 插入到队尾 QUEUE_HEAD 头部第一个元素 QUEUE_DATA 得到元素的内容 QUEUE_REMOVE 从队列中移除元素
那双向循环链表就先简单介绍到这。
libuv为何能够这么高效呢?实际他使用了操做系统提供的高并发异步模型
linux: epoll
freebsd: kqueue
windows: iocp
每一个咱们常见的操做系统都为咱们封装了相似的高并发异步模型,那libuv其实就是对各个操做系统进行封装,最后暴露出统一的api供开发者调用,开发者不须要关系底层是什么操做系统,什么API了。 咱们来看一下同步模型和异步模型的区别
咱们在一个线程中调用网络请求,以后线程就会被阻塞,直到返回结果才能继续执行线程
在异步模型中 咱们调用网络请求后不在去直接调用accept阻塞线程,而是轮询fd是否发生变化,在返回内容后咱们在调用cb执行咱们的代码,这个过程是非阻塞的。 说了这么多咱们经过2个例子了解一下其中的原理。
咱们首先了解一下 C是如何建立socket的,以后咱们在看一下若是经过高并发异步模型来建立socket,最后咱们在了解一下 libuv下怎么建立socket。
#include <sys/types.h>
#include <sys/socket.h>
#include <stdio.h>
#include <netinet/in.h>
#include <arpa/inet.h>
#include <unistd.h>
#include <string.h>
#include <stdlib.h>
#include <fcntl.h>
#include <sys/shm.h>
#define MYPORT 8887
#define QUEUE 20
#define BUFFER_SIZE 1024
int main()
{
//定义sockfd AF_INET(IPv4) AF_INET6(IPv6) AF_LOCAL(UNIX协议) AF_ROUTE(路由套接字) AF_KEY(秘钥套接字)
// SOCK_STREAM(字节流套接字) SOCK_DGRAM
int server_sockfd = socket(AF_INET, SOCK_STREAM, 0);
///定义sockaddr_in
struct sockaddr_in server_sockaddr;
server_sockaddr.sin_family = AF_INET;
server_sockaddr.sin_port = htons(MYPORT);
server_sockaddr.sin_addr.s_addr = htonl(INADDR_ANY);
///bind,成功返回0,出错返回-1
if(bind(server_sockfd,(struct sockaddr *)&server_sockaddr,sizeof(server_sockaddr))==-1)
{
perror("bind");
exit(1);
}
printf("监听%d端口\n", MYPORT);
///listen,成功返回0,出错返回-1
if(listen(server_sockfd, QUEUE) == -1)
{
perror("listen");
exit(1);
}
///客户端套接字
char buffer[BUFFER_SIZE];
struct sockaddr_in client_addr;
socklen_t length = sizeof(client_addr);
printf("等待客户端链接\n");
///成功返回非负描述字,出错返回-1
int conn = accept(server_sockfd, (struct sockaddr*)&client_addr, &length);
if(conn<0)
{
perror("connect");
exit(1);
}
printf("客户端成功链接\n");
while(1)
{
memset(buffer,0,sizeof(buffer));
long len = recv(conn, buffer, sizeof(buffer), 0);
//客户端发送exit或者异常结束时,退出
;
if(strcmp(buffer,"exit\n")==0 || len<=0) {
printf("出现异常");
break;
}
printf("来自客户端数据:\n");
fwrite(buffer, len, 1, stdout);
send(conn, buffer, len, 0);
printf("发送给客户端数据:\n");
fwrite(buffer, len, 1, stdout);
}
close(conn);
close(server_sockfd);
return 0;
}
代码一大坨,其实上咱们简单拆分一下
第一步:建立socket 文件描述符
第二步:定义socket addr
第三步:绑定文件描述符和地址 bind
第四步:监听文件描述符 listen
第五步:等待socket返回内容 accept
第六步:接收信息 recv
因为做者电脑是macos,因此只能使用kqueue,不能使用epoll。
#include <sys/types.h>
#include <sys/socket.h>
#include <stdio.h>
#include <netinet/in.h>
#include <arpa/inet.h>
#include <unistd.h>
#include <string.h>
#include <stdlib.h>
#include <fcntl.h>
#include <sys/shm.h>
#define MYPORT 8887
#define QUEUE 20
#define BUFFER_SIZE 1024
int main()
{
// 定义sockfd AF_INET(IPv4) AF_INET6(IPv6) AF_LOCAL(UNIX协议) AF_ROUTE(路由套接字) AF_KEY(秘钥套接字)
// SOCK_STREAM(字节流套接字) SOCK_DGRAM
int server_sockfd = socket(AF_INET, SOCK_STREAM, 0);
// 定义sockaddr_in
struct sockaddr_in server_sockaddr;
server_sockaddr.sin_family = AF_INET;
server_sockaddr.sin_port = htons(MYPORT);
server_sockaddr.sin_addr.s_addr = htonl(INADDR_ANY);
// bind,成功返回0,出错返回-1
if(bind(server_sockfd,(struct sockaddr *)&server_sockaddr,sizeof(server_sockaddr))==-1)
{
perror("bind");
exit(1);
}
printf("监听%d端口\n", MYPORT);
// listen,成功返回0,出错返回-1
if(listen(server_sockfd, QUEUE) == -1)
{
perror("listen");
exit(1);
}
//建立一个消息队列并返回kqueue描述符
int kq = kqueue();
struct kevent change_list; //想要监控的事件
struct kevent event_list[10000]; //用于kevent返回
char buffer[1024];
int nevents;
// 监听sock的读事件
EV_SET(&change_list, server_sockfd, EVFILT_READ, EV_ADD | EV_ENABLE, 0, 0, 0);
while(1) {
printf("new loop...\n");
// 等待监听事件的发生
nevents = kevent(kq, &change_list, 1, event_list, 2, NULL);
if (nevents < 0) {
printf("kevent error.\n"); // 监听出错
} else if (nevents > 0) {
printf("get events number: %d\n", nevents);
for (int i = 0; i < nevents; ++i) {
printf("loop index: %d\n", i);
struct kevent event = event_list[i]; //监听事件的event数据结构
int clientfd = (int) event.ident; // 监听描述符
// 表示该监听描述符出错
if (event.flags & EV_ERROR) {
close(clientfd);
printf("EV_ERROR: %s\n", strerror(event_list[i].data));
}
// 表示sock有新的链接
if (clientfd == server_sockfd) {
printf("new connection\n");
struct sockaddr_in client_addr;
socklen_t client_addr_len = sizeof(client_addr);
int new_fd = accept(server_sockfd, (struct sockaddr *) &client_addr, &client_addr_len);
long len = recv(new_fd, buffer, sizeof(buffer), 0);
char remote[INET_ADDRSTRLEN];
printf("connected with ip: %s, port: %d\n",
inet_ntop(AF_INET, &client_addr.sin_addr, remote, INET_ADDRSTRLEN),
ntohs(client_addr.sin_port));
send(new_fd, buffer, len, 0);
}
}
}
}
return 0;
}
咱们能够看到,listen以前都是同样的,不在赘述,简化一下后面的步骤
第一步:建立 kqueue描述符
第二部:监听socket读事件 EV_SET
第三步:绑定kq 和 change_list kevent
一直while循环直到kevent返回能够的文件描述符数量 那到这里其实咱们就彻底弄懂了 如何直接用C写出高并发异步是怎么运行的。那么咱们就看看使用libuv的例子吧
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <uv.h>
#define DEFAULT_PORT 7000
#define DEFAULT_BACKLOG 128
uv_loop_t *loop;
struct sockaddr_in addr;
typedef struct {
uv_write_t req;
uv_buf_t buf;
} write_req_t;
void free_write_req(uv_write_t *req) {
write_req_t *wr = (write_req_t*) req;
free(wr->buf.base);
free(wr);
}
void alloc_buffer(uv_handle_t *handle, size_t suggested_size, uv_buf_t *buf) {
buf->base = (char*) malloc(suggested_size);
buf->len = suggested_size;
}
void on_close(uv_handle_t* handle) {
free(handle);
}
void echo_write(uv_write_t *req, int status) {
if (status) {
fprintf(stderr, "Write error %s\n", uv_strerror(status));
}
free_write_req(req);
}
void echo_read(uv_stream_t *client, ssize_t nread, const uv_buf_t *buf) {
if (nread > 0) {
write_req_t *req = (write_req_t*) malloc(sizeof(write_req_t));
req->buf = uv_buf_init(buf->base, nread);
fwrite(buf->base, 30, 1, stdout);
uv_write((uv_write_t*) req, client, &req->buf, 1, echo_write);
return;
}
if (nread < 0) {
if (nread != UV_EOF)
fprintf(stderr, "Read error %s\n", uv_err_name(nread));
uv_close((uv_handle_t*) client, on_close);
}
free(buf->base);
}
void on_new_connection(uv_stream_t *server, int status) {
if (status < 0) {
fprintf(stderr, "New connection error %s\n", uv_strerror(status));
// error!
return;
}
uv_tcp_t *client = (uv_tcp_t*) malloc(sizeof(uv_tcp_t));
uv_tcp_init(loop, client);
if (uv_accept(server, (uv_stream_t*) client) == 0) {
uv_read_start((uv_stream_t*) client, alloc_buffer, echo_read);
}
else {
uv_close((uv_handle_t*) client, on_close);
}
}
int main() {
loop = uv_default_loop();
uv_tcp_t server;
uv_tcp_init(loop, &server);
uv_ip4_addr("0.0.0.0", DEFAULT_PORT, &addr);
uv_tcp_bind(&server, (const struct sockaddr*)&addr, 0);
int r = uv_listen((uv_stream_t*) &server, DEFAULT_BACKLOG, on_new_connection);
if (r) {
fprintf(stderr, "Listen error %s\n", uv_strerror(r));
return 1;
}
return uv_run(loop, UV_RUN_DEFAULT);
}
实际上总体咱们均可以把libuv和咱们原生的c kqueue进行一一对应,发现相差很少,惟一不一样是咱们须要定义 uv_loop 这个内部循环,后面咱们在来说套循环机制。
咱们学习完了网络,那么咱们再来看看文件i/o是怎么处理的。
刚刚咱们玩转了socket来看这张图是否是很熟悉?可是发现右侧有了很大的不一样。文件操做、DNS、用户代码不是基于epoll这种模型吗? 显而易见咱们有了答案,这是为何呢?其实很简单文件的不少操做就是同步的,可是libuv为了统一异步,利用开辟线程进行文件等操做模拟了异步的过程!!原来咱们用了这么久才发现他是个骗子。哈哈!实际上是咱们学艺不精。 那其实讲到这里文件读写其实讲的差很少了,咱们仍是来看看例子吧!
#include <stdio.h>
#include <uv.h>
uv_fs_t open_req;
uv_fs_t _read;
static char buffer[1024];
static uv_buf_t iov;
void on_read(uv_fs_t *req) {
printf("%s\n",iov.base);
}
void on_open(uv_fs_t *req) {
printf("%zd\n",req->result);
iov = uv_buf_init(buffer, sizeof(buffer));
uv_fs_read(uv_default_loop(), &_read, (int)req->result,
&iov, 1, -1, on_read);
}
int main() {
const char* path = "/Users/sgy/koa/package.json";
// O_RDONLY 、 O_WRONLY 、 O_RDWR 、 O_CREAT
uv_fs_open(uv_default_loop(), &open_req, path, O_RDONLY, 0, on_open);
uv_run(uv_default_loop(), UV_RUN_DEFAULT);
uv_fs_req_cleanup(&open_req);
return 0;
}
其实libuv底层对文件open和read的操做是分开的。 看到这里文件api没啥讲的了,咱们来简单讲讲线程池。
线程池就是对线程的统一管理,预先建立出线程,若是有任务就把任务放到线程池里去执行。
经过上图咱们能够看到有任务进来首先会插入到链表中进行排队等待, 直到线程空余就会去链表中去取。 经过阅读 src/threadpool.c文件咱们能够了解 MAX_THREADPOOL_SIZE 128 最大线程为128个 default_threads[4] 默认只会开辟4个线程 若是你对底层不了解 那当你在进行大量的文件i/o时 线程池数量就是阻碍你的最大障碍。 为啥最大只能建立128个线程呢?由于大多数操做系统建立一个线程大概花费1M的内存空间,外加用户自己代码也要占用大量的内存,因此这里设置了最大128的限制。
咱们经过网络和文件了解了libuv,那么咱们来看看libuv的循环机制
uv_loop_t *loop;
loop = uv_default_loop()
uv_run(loop, UV_RUN_DEFAULT);
首先咱们会建立 loop 而后一系列的骚操做以后 最后咱们执行了uv_run 嗯嗯 那uv_run 确定是突破口了 在src/unix/core.c 文件里 咱们找到了 uv_run的定义
int uv_run(uv_loop_t* loop, uv_run_mode mode) {
int timeout;
int r;
int ran_pending;
r = uv__loop_alive(loop);
if (!r)
uv__update_time(loop);
while (r != 0 && loop->stop_flag == 0) {
uv__update_time(loop);
uv__run_timers(loop);
ran_pending = uv__run_pending(loop);
uv__run_idle(loop);
uv__run_prepare(loop);
timeout = 0;
if ((mode == UV_RUN_ONCE && !ran_pending) || mode == UV_RUN_DEFAULT)
timeout = uv_backend_timeout(loop);
uv__io_poll(loop, timeout);
uv__run_check(loop);
uv__run_closing_handles(loop);
if (mode == UV_RUN_ONCE) {
/* UV_RUN_ONCE implies forward progress: at least one callback must have
* been invoked when it returns. uv__io_poll() can return without doing
* I/O (meaning: no callbacks) when its timeout expires - which means we
* have pending timers that satisfy the forward progress constraint.
*
* UV_RUN_NOWAIT makes no guarantees about progress so it's omitted from
* the check.
*/
uv__update_time(loop);
uv__run_timers(loop);
}
r = uv__loop_alive(loop);
if (mode == UV_RUN_ONCE || mode == UV_RUN_NOWAIT)
break;
}
从代码中 咱们就能够总结出libuv的运行周期 经过while循环不断的查询 loop中是否有中止符 若是有则退出 不然就不停的进行循环。
上面的图已经清楚的描述咱们uv_run的流程了 那其中的核心 就在*uvio_poll* 中 例如在 src/unix/linux-core.c 中的uvio_poll函数 咱们就能够找到 咱们 epoll 熟悉的身影了。实现逻辑也和咱们以前使用过的差很少。
洋洋洒洒写了这么多,最后总结一下也提出本身的思考。 其实libuv底层的 actor模型是很是高效的,不少游戏服务器内核也使用actor模型,那相对于火的不行的go(协程模型) nodejs一直没有在服务端发挥它的高效呢? 我以为其实缘由很简单,由于nodejs他并不高效,我以为nodejs可以快速的被开发出来而且js运行如此高效 v8功不可没。可是成也v8败也v8,JIT优化的在好 依然和编译型语言相差甚远。 可是一点的性能是阻碍大数据等框架使用go而不是用nodejs的缘由吗?我以为其实并非,最大的缘由我以为是生态!很是多的Apache开源框架使用java编写,不少大数据使用go来承载,nodejs有什么顶级生态吗?我以为并无,他大多数面向的是前端这个群体致使他的生态的发展。 谢谢你们能看到这里,上述的心得都是近期整理的,若是有不对的地方欢迎你们多多批评。上述内容若是转载请附带原文连接,感谢。
================= End