使用过node的朋友都知道,它最重要的也是最值得称道的就是使用了异步事件驱动的框架libuv,这个框架使得被称为玩具语言的JavaScript也在后端语言中占了一席之地(固然V8的高性能也是功不可没,并且libuv的代码很是优雅,很值得你们的学习。不过libuv整个框架很大,咱们不可能只经过一篇文章就能了解到它全部的东西,因此我挑选了node中最简单fs模块同步读和异步读文件的过程讲解来对libuv的一个大概过程有所了解。node
fs.readSync
这个方法我相信没有人会陌生,在node中同步读取文件,不过再不少文章中都不推荐使用这个方法,由于会形成node单线程的阻塞,对于一些比较繁忙的node实例来讲是很是不友好的,不过今天咱们不讨论这些,只讨论其中的实现。实现咱们在node工程的lib目录中找到fs.js能够看到它的代码:后端
function(fd, buffer, offset, length, position) {
if (length === 0) {
return 0;
}
return binding.read(fd, buffer, offset, length, position);
};
复制代码
其中直接调用了binding.read
其中的binding的申明是这样的binding = process.binding('fs')
,这个天然就是node的builtin_module,因此咱们直接找到src/node_flie.cc文件中。app
从node::InitFs
方法中咱们能够看到全部的方法申明,其中返回对象中的read方法对应的是static void Read(const FunctionCallbackInfo<Value>& args)
咱们来看一下他的核心代码:框架
static void Read(const FunctionCallbackInfo<Value>& args) {
//获取传入参数,并对参数进行处理
....
//将传入的buffer的内存的地址取出并用来存储read出的内容
char * buf = nullptr;
Local<Object> buffer_obj = args[1]->ToObject(env->isolate());
char *buffer_data = Buffer::Data(buffer_obj);
size_t buffer_length = Buffer::Length(buffer_obj);
...
buf = buffer_data + off;
uv_buf_t uvbuf = uv_buf_init(const_cast<char*>(buf), len);
//执行read操做
req = args[5];
if (req->IsObject()) {
ASYNC_CALL(read, req, UTF8, fd, &uvbuf, 1, pos);
} else {
SYNC_CALL(read, 0, fd, &uvbuf, 1, pos)
args.GetReturnValue().Set(SYNC_RESULT);
}
}
复制代码
从上面的代码,咱们能够看出第六个参数是个很关键的参数,若是传入了一个对象则使用异步操做,而咱们的fs.readSync
方法没有传入第六个参数随意使用的是同步操做,而且在操做完成后当即返回结果。异步
而SYNC_CALL
这个宏中具体作了什么呢,他主要是调用另一个宏:async
#define SYNC_CALL(func, path, ...) \
SYNC_DEST_CALL(func, path, nullptr, __VA_ARGS__) \
复制代码
其中__VA_ARGS__表示的是除了func和path外其余传入宏的参数,接下来咱们来看一下SYNC_DEST_CALL宏:ide
#define SYNC_DEST_CALL(func, path, dest, ...) \
fs_req_wrap req_wrap; \
env->PrintSyncTrace(); \
int err = uv_fs_ ## func(env->event_loop(), \
&req_wrap.req, \
__VA_ARGS__, \
nullptr); \
if (err < 0) { \
return env->ThrowUVException(err, #func, nullptr, path, dest); \
} \
复制代码
其中在宏命令中的 ##
标记是链接符的意思,因此这里其实就是调用uv_fs_read方法,而env->PrintSyncTrace()
是为了在node打开--trace-sync-io
时用来追踪代码中何处使用了同步io时使用,能够经过这个方法打出代码中调用同步io的位置,因此当你的代码常常发生阻塞的时候你能够经过这个来调优你的代码(固然阻塞的缘由未必是同步io形成的)。uv_fs_read方法是libuv是来读取文件的调用,咱们找到这个方法的位置,就在deps/uv/src/unix/fs.c中:函数
int uv_fs_read(uv_loop_t* loop, uv_fs_t* req,
uv_file file,
const uv_buf_t bufs[],
unsigned int nbufs,
int64_t off,
uv_fs_cb cb) {
INIT(READ);
if (bufs == NULL || nbufs == 0)
return -EINVAL;
req->file = file;
req->nbufs = nbufs;
req->bufs = req->bufsml;
if (nbufs > ARRAY_SIZE(req->bufsml))
req->bufs = uv__malloc(nbufs * sizeof(*bufs));
if (req->bufs == NULL) {
if (cb != NULL)
uv__req_unregister(loop, req);
return -ENOMEM;
}
memcpy(req->bufs, bufs, nbufs * sizeof(*bufs));
req->off = off;
POST;
}
复制代码
首先咱们来看宏调用INIT(READ)
:oop
#define INIT(subtype) \
do { \
if (req == NULL) \
return -EINVAL; \
req->type = UV_FS; \
if (cb != NULL) \
uv__req_init(loop, req, UV_FS); \
req->fs_type = UV_FS_ ## subtype; \
req->result = 0; \
req->ptr = NULL; \
req->loop = loop; \
req->path = NULL; \
req->new_path = NULL; \
req->cb = cb; \
} \
while (0)
复制代码
这是一个很明显的初始化操做,这里主要说两个最重要地方,首先是将req的loop指向node的event_loop,其次是指定了fs_type喂UV_FS_READ
这个是一个重要的标志,为后面的工做作识别。作了这些操做之后回到uv_fs_read
方法来,咱们能够看到在POST宏调用以前都是一些对参数的处理工做,这个没什么可讲的,咱们主要来看看POST宏:post
#define POST \
do { \
if (cb != NULL) { \
uv__work_submit(loop, &req->work_req, uv__fs_work, uv__fs_done); \
return 0; \
} \
else { \
uv__fs_work(&req->work_req); \
return req->result; \
} \
} \
while (0)
复制代码
从上面的代码咱们能够看到在有cb的时候调用的是uv__work_submit,这就是异步的状况下调用,等会儿咱们再讲。如今咱们先说uv__fs_work 的方法:
static void uv__fs_work(struct uv__work* w) {
int retry_on_eintr;
uv_fs_t* req;
ssize_t r;
req = container_of(w, uv_fs_t, work_req);
retry_on_eintr = !(req->fs_type == UV_FS_CLOSE);
do {
errno = 0;
#define X(type, action) \
case UV_FS_ ## type: \
r = action; \
break;
switch (req->fs_type) {
...
X(WRITE, uv__fs_buf_iter(req, uv__fs_write));
X(OPEN, uv__fs_open(req));
X(READ, uv__fs_buf_iter(req, uv__fs_read));
...
}
#undef X
} while (r == -1 && errno == EINTR && retry_on_eintr);
if (r == -1)
req->result = -errno;
else
req->result = r;
if (r == 0 && (req->fs_type == UV_FS_STAT ||
req->fs_type == UV_FS_FSTAT ||
req->fs_type == UV_FS_LSTAT)) {
req->ptr = &req->statbuf;
}
}
复制代码
这个方法由于是fs文件共用的方法,因此在其中会根据不一样的类型的来执行不一样的方法,刚刚咱们看到了在初始化req的时候给了它UV_FS_READ的type,因此会执行方法uv__fs_buf_iter(req, uv__fs_read)
,uv__fs_buf_iter方法中主要是调用了传入的第二个参数uv__fs_read
函数,这里的代码就不贴了很简单,就是普通的read(还有readv和pread)操做,不过其中有个点就是这段代码:
#if defined(_AIX)
struct stat buf;
if(fstat(req->file, &buf))
return -1;
if(S_ISDIR(buf.st_mode)) {
errno = EISDIR;
return -1;
}
#endif
复制代码
这段代码很好地解释了node文档中关于fs.readFileSync
的这一段
Note: Similar to fs.readFile(), when the path is a directory, the behavior of fs.readFileSync() is platform-specific.
// macOS, Linux, and Windows
fs.readFileSync('<directory>');
// => [Error: EISDIR: illegal operation on a directory, read <directory>]
// FreeBSD
fs.readFileSync('<directory>'); // => null, <data>
复制代码
在uv__fs_read
成功读取文件后,req->bufs中就已经有了所需的内容了,从node_file.cc的static void Read(const FunctionCallbackInfo<Value>& args)
方法中咱们能够知道req->bufs的内存所指向的则是binding.read(fd, buffer, offset, length, position)
传入的buffer内存段。这个时候就已经获得了想要读取的内容了。而咱们平时常用的fs.readFileSync
则是先打开文件获得其fd,并生成一段buffer而后调用fs.readSync
,是生成的buffer中取得文件内容再返回,简化了不少操做,因此更受到你们的青睐。到这里咱们的同步读取就已经结束了,算是很简单,由于read这些操做都是阻塞性的操做,因此对于单线程的node进程来讲确实容易遇到性能瓶颈,下面咱们来讲一下node的异步读取fs.read
函数。
异步的操做远比同步要复杂不少,咱们来一步步的了解。首先咱们先来看 ocess.nextTick(function() { callback && callback(null, 0, buffer); }); }
function wrapper(err, bytesRead) {
// Retain a reference to buffer so that it can't be GC'ed too soon.
callback && callback(err, bytesRead || 0, buffer);
}
var req = new FSReqWrap();
req.oncomplete = wrapper;
binding.read(fd, buffer, offset, length, position, req);
};
复制代码
从刚刚同步的分析中,咱们知道当bingd.read
传入第六个参数的时候则会异步执行read操做,这里就传入了第六个参数req, req = new FSReqWrap();
req是FSReqWrap = binding.FSReqWrap
的实例,因此咱们从node::InitFs
中能够看到以下代码:
Local<FunctionTemplate> fst = FunctionTemplate::New(env->isolate(), NewFSReqWrap);
fst->InstanceTemplate()->SetInternalFieldCount(1);
AsyncWrap::AddWrapMethods(env, fst);
Local<String> wrapString =
FIXED_ONE_BYTE_STRING(env->isolate(), "FSReqWrap");
fst->SetClassName(wrapString);
target->Set(wrapString, fst->GetFunction());
复制代码
上面的代码使用v8提供的API生成FSReqWrap的构造函数而void NewFSReqWrap(const FunctionCallbackInfo<Value>& args)
就会提及构造函数的内容。这个函数主要的主要工做只有一个object->SetAlignedPointerInInternalField(0, nullptr);
,不过这个只跟C++对象的嵌入有关。从以前咱们讨论过的static void Read(const FunctionCallbackInfo<Value>& args)
方法中聊到过,当传入req对象的时候回调用宏命令ASYNC_CALL,这个宏命令跟以前的SYNC_CALL同样的调用,经过ASYNC_DEST_CALL(func, req, nullptr, encoding, __VA_ARGS__)
去调用真正的逻辑,因此咱们直接来看ASYNC_DEST_CALL
的代码:
#define ASYNC_DEST_CALL(func, request, dest, encoding, ...) \
Environment* env = Environment::GetCurrent(args); \
CHECK(request->IsObject()); \
FSReqWrap* req_wrap = FSReqWrap::New(env, request.As<Object>(), \
#func, dest, encoding); \
int err = uv_fs_ ## func(env->event_loop(), \
req_wrap->req(), \
__VA_ARGS__, \
After); \
req_wrap->Dispatched(); \
if (err < 0) { \
uv_fs_t* uv_req = req_wrap->req(); \
uv_req->result = err; \
uv_req->path = nullptr; \
After(uv_req); \
req_wrap = nullptr; \
} else { \
args.GetReturnValue().Set(req_wrap->persistent()); \
}
复制代码
上面的代码咱们能够看到经过FSReqWrap::New来生成了req_wrap,这个方法的执行是node生成对象的一个基本逻辑,因此咱们着重说一下,首先咱们来看一下FSReqWrap::New
的代码:
const bool copy = (data != nullptr && ownership == COPY);
const size_t size = copy ? 1 + strlen(data) : 0;
FSReqWrap* that;
char* const storage = new char[sizeof(*that) + size];
that = new(storage) FSReqWrap(env, req, syscall, data, encoding);
if (copy)
that->data_ = static_cast<char*>(memcpy(that->inline_data(), data, size));
return that;
复制代码
这段代码咱们主要了解一下new(storage) FSReqWrap(env, req, syscall, data, encoding);
,首先咱们经过一张图来了解一下FSReqWrap的继承关系:
上图中咱们给出了一些关键对象的关键属性和方法,因此咱们能够看出FSReqWrap各个继承对象的主要做用:
1.继承ReqWrap对象的关键属性uv_fs_t,和关键方法ReqWrap<T>::Dispatched
,使用该方法中的req_.data = this;
在libuv的方法中传递自身。
2.继承AsyncWrap中的MakeCallback,这个函数会执行咱们传入的异步读取完成后的回调,在这个例子中就是使用js中经过req.oncomplete = wrapper;
传入的wrapper函数。
3.继承BaseObject对象中的关键属性Persistent<Object> persistent_handle_
和Environment* env_
,前者是v8中的持久化js对象,和Local的关系能够参见v8官方的解释:
Local handles are held on a stack and are deleted when the appropriate destructor is called. These handles' lifetime is determined by a handle scope, which is often created at the beginning of a function call. When the handle scope is deleted, the garbage collector is free to deallocate those objects previously referenced by handles in the handle scope, provided they are no longer accessible from JavaScript or other handles.
Persistent handles provide a reference to a heap-allocated JavaScript Object, just like a local handle. There are two flavors, which differ in the lifetime management of the reference they handle. Use a persistent handle when you need to keep a reference to an object for more than one function call, or when handle lifetimes do not correspond to C++ scopes.
复制代码
大概的意思就是,Local会随着在栈上分配的scope析构而被GC清理掉,可是Persistent不会。有点相似栈上分配的内存和堆上分配内存的关系,想要在超过一个function中使用就要使用Persistent的v8对象,然后者是node的执行环境,几乎囊括了node执行中所须要的一切方法和属性(这一块很是大,涉及的也不少,实在很难一两句讲清楚,跟本文讨论内容无直接联系只能略过)。
最后,在FSReqWrap的构造函数中经过Wrap(object(), this)
将咱们上面提到的Persistent<Object> persistent_handle_
持久化js对象和FSReqWrap的C++对象关联起来,这是node中最经常使用的方式(也是ebmed开发中最经常使用的技巧)。咱们回到宏ASYNC_DEST_CALL
中来,如今知道经过方法FSReqWrap::New
方法使得FSReqWrap对象实例和刚刚在js中new的req对象链接了起来,也使libuv的uv_fs_t和其实例联系了起来。这个时候就跟前面同样开始调用uv_fs_read
,此次在最后一个参数cb中传入了函数void After(uv_fs_t *req)
做为回调函数,从以前同步讨论中咱们就说过传入回调函数后状况的不一样,首先是INIT
宏中在会多一步操做,经过uv__req_init
中的QUEUE_INSERT_TAIL(&(loop)->active_reqs, &(req)->active_queue);
宏方法将req放入loop的acitve_reqs的循环链表中(libuv的循环链表实现很是的有意思,有兴趣的朋友能够参考文章:libuv queue的实现)。而在POST
中有回调的函数的状况是直接经过uv__work_submit(loop, &req->work_req, uv__fs_work, uv__fs_done)
调用来完成任务,咱们来看一下uv__work_submit
函数的代码,这个方法在deps/uv/src/threadpool中:
uv_once(&once, init_once);
w->loop = loop;
w->work = work;
w->done = done;
post(&w->wq);
复制代码
该方法首先经过uv_once在第一次调用该方法是启动几个工做线程,这些线程主要执行static void worker(void* arg)
方法:
for (;;) {
uv_mutex_lock(&mutex);
while (QUEUE_EMPTY(&wq)) {
idle_threads += 1;
uv_cond_wait(&cond, &mutex);
idle_threads -= 1;
}
q = QUEUE_HEAD(&wq);
if (q == &exit_message)
uv_cond_signal(&cond);
else {
QUEUE_REMOVE(q);
QUEUE_INIT(q);
}
uv_mutex_unlock(&mutex);
if (q == &exit_message)
break;
w = QUEUE_DATA(q, struct uv__work, wq);
w->work(w);
uv_mutex_lock(&w->loop->wq_mutex);
w->work = NULL;
QUEUE_INSERT_TAIL(&w->loop->wq, &w->wq);
uv_async_send(&w->loop->wq_async);
uv_mutex_unlock(&w->loop->wq_mutex);
}
复制代码
其中wq是一个循环链表的队列,记录了全部注册的任务,当没有任务时会经过uv_cond_wait
使该线程阻塞,而在有任务的时候会在队列中取出该任务再经过w->work(w)
执行其任务,在执行完成后会将任务注册在loop->wq
的队列中再经过uv_async_send(&w->loop->wq_async)
通知主线程从loop->wq
的队列取出该任务并执行其回调。
再回到uv__work_submit
经过work
方法咱们就知道它接下来的工做是作什么了,注册work函数也就是传入uv__fs_work
函数,这个函数咱们以前就介绍过了,这里就很少作解释了,只是在异步中是经过worker线程来完成的,不会阻塞主线程。而第二个函数则是注册完成后主线执行的回调,也就是uv__fs_done
:
req = container_of(w, uv_fs_t, work_req);
uv__req_unregister(req->loop, req);
if (status == -ECANCELED) {
assert(req->result == 0);
req->result = -ECANCELED;
}
req->cb(req);
复制代码
从中咱们能够看到,这个函数会将该任务的req从loop的acitve_reqs去去掉,而后执行传入uv_fs_read中的回调函数。而最后的post中主要是将当前任务注册到wq的列表中,并使用条件变量的uv_cond_signal
函数触发uv_cond_wait
中阻塞的函数运做起来,接着worker进程就能执行咱们刚刚说的过程了。
上面咱们讲解了大概的过程,从这个过程当中就能明白异步的读操做是如何执行的,经过使用wokrer线程来作实际的读操做,而主线程则是在worker线程完成操做后,执行回调。不过如今回过头来咱们看看,在worker线程之后是如何通知主线程呢?刚刚咱们说到了是经过uv_async_send(&w->loop->wq_async)
的调用通知的,这里咱们来看看他具体是如何作的。首先咱们要回到loop的初始化处,函数uv_loop_init
中,在这个函数中有这样一个调用: uv_async_init(loop, &loop->wq_async, uv__work_done);
。这个调用会生成一个管道,并经过如下语句:
uv__io_init(&loop->async_io_watcher, uv__async_io, pipefd[0]);
uv__io_start(loop, &loop->async_io_watcher, POLLIN);
loop->async_wfd = pipefd[1];
复制代码
实现当有数据往pipefd[1]中写时,主线会在读取数据后执行uv__async_io
的调用,在uv__async_io
中最重要的工做就是执行其async_cb,而在loop初始化的时候注册的async_cb是函数uv__work_done
:
//取数据的操做
...
while (!QUEUE_EMPTY(&wq)) {
q = QUEUE_HEAD(&wq);
QUEUE_REMOVE(q);
w = container_of(q, struct uv__work, wq);
err = (w->work == uv__cancelled) ? UV_ECANCELED : 0;
w->done(w, err);
}
复制代码
这里咱们能够看到,会从loop->wq
队列中取出放入其中全部任务,并经过w->done(w, err)
执行其回调,而刚刚在worker线程中的调用uv_async_send(&w->loop->wq_async)
便是经过往loop->async_wfd
,即上面提到的pipefd[1]写一个字节来触发整个过程。到这里最开始uv_fs_read
中注册的函数uv__fs_done
就能够执行了,而这个函数的主要任务便是调用传入uv_fs_read
的cb参数,即void After(uv_fs_t *req)
函数,这个函数处理的状况比较多,就不贴代码了惟一要讲的就是他的第一句
FSReqWrap* req_wrap = static_cast<FSReqWrap*>(req->data);
复制代码
这里就回到了咱们前面所说的经过req->data将FSReqWrap的对象实例串联起来,到这里就能顺利的经过这个实例获得以前初始化的js对象,并执行它的oncomplete函数了。回到js的代码中咱们能够看到这个函数执行的操做就是调用咱们传入的callback的函数:
callback && callback(err, bytesRead || 0, buffer);
复制代码
至此,fs.read整个异步操做就已经完成了,至于fs.readFile这个操做放在异步中就复杂了许多,先异步打开文件,再经过回调中注册异步任务取得文件的stat,最后经过回调去读取文件,并且若是文件太大不能一次读完(一次最多读8*1024的字节),会不断的回调继续读取文件,直到读完才异步关闭文件,而且经过异步关闭文件的回调执行传入的回调函数。可见为了咱们平时开发中的方便,node的开发者仍是付出了不少的努力的。
在了解了node对于文件读取的同步和异步实现后,咱们就能看出libuv的精妙之处了。特别是异步时经过子线程处理任务,再用管道通知主线执行回调的方式,真的是为node这样的单线程语言量身定作,固然可能也有同窗有疑问,主线是如何读取管道值的呢?这又是一个很大的问题,咱们只能之后的文章再来解释了。这篇文章就先到此为止了,但愿经过该文能帮助你们对node背后的逻辑会多一点了解。