浅析Node.js的Event Loop

目录

浅析Node.js的Event Loop

引出问题

首先看两段代码,下面两段代码的执行结果是什么?为何?javascript

// event-loop-1.js
setTimeout(() => {
    console.log('setTimeout');
}, 0);
setImmediate(() => {
    console.log('setImmediate');
});


// event-loop-2.js
const fs = require('fs');
fs.readFile(__filename, () => {
    setTimeout(() => {
        console.log('setTimeout');
    }, 0);
    setImmediate(() => {
        console.log('setImmediate');
    });
});

也许你内心已经有了答案,可是就是不太肯定,其实这里面涉及到的知识点就是今天要说的Event Loophtml

Node.js的基本架构

在讲Event Loop以前,首先介绍一下Node.js的基本架构。提到Node.js的时候,咱们耳熟能详的是: Node.js是一个基于ChromeV8引擎的JavaScript运行时。Node.js 使用高效、轻量级的事件驱动、非阻塞 I/O 模型。那么这句话真正想要表达的是什么呢?请看下图:java

  • Node standard library: Node的标准库,也就是咱们平时所用的fs, path, http, net, stream等模块。
  • Node bindlings: 是C++与JavaScript沟通的桥梁, 封装了V8和Libuv的细节,向上层提供API。
  • 最后一层是支撑Node的关键。

使用tree -L 1能够看到Node.js源码的目录以下:node

➜  node git:(master) tree -L 1
.
├── AUTHORS
├── BSDmakefile
├── BUILDING.md
├── CHANGELOG.md
├── CODE_OF_CONDUCT.md
├── COLLABORATOR_GUIDE.md
├── CONTRIBUTING.md
├── CPP_STYLE_GUIDE.md
├── GOVERNANCE.md
├── LICENSE
├── Makefile
├── README.md
├── android-configure
├── benchmark
├── common.gypi
├── configure
├── deps
├── doc
├── lib
├── node.gyp
├── node.gypi
├── src
├── test
├── tools
└── vcbuild.bat

而比较关键的几个目录是:android

  • deps: 一些Node.js的依赖库,好比Libuv, V8等。
  • src: 包含C++的源码,即Node bindings。
  • lib: 包含JavaScript的源码,存放的是Node.js的核心模块,即fs, path, http, net, stream等模块。

Libuv

咱们知道Node.js是一个Runtime, 它拥有异步,非阻塞的模型,那么它是如何实现非阻塞的呢?答案是:Libuv。git

什么是Libuv?Libuv是一个高性能的,事件驱动的I/O库,而且提供了 跨平台(如windows, *nix)的API。简单的来讲,Node.js的异步、非阻塞I/O,底层其实是Libuv实现的。github

具体更多关于Libuv的知识这里再也不阐述,感兴趣的同窗下来能够去多了解一下。windows

Event Loop

能够参考Node.js官方文档上的这样一篇文档: The Node.js Event Loop, Timers, and process.nextTick(), 咱们能够知道:promise

When Node.js starts, it initializes the event loop, processes the provided input script (or drops into the REPL, which is not covered in this document) which may make async API calls, schedule timers, or call process.nextTick(), then begins processing the event loop.浏览器

即在Node.js启动的时候,它会初始化Event Loop, 处理提供的输入脚本, 这可能会使异步API调用,调用timers,或者调用process.nextTick, 而后开始处理事件循环。
下图简单展现了事件循环的操做顺序:

┌───────────────────────┐
┌─>│        timers         │
│  └──────────┬────────────┘
│  ┌──────────┴────────────┐
│  │     I/O callbacks     │
│  └──────────┬────────────┘
│  ┌──────────┴────────────┐
│  │     idle, prepare     │
│  └──────────┬────────────┘      ┌───────────────┐
│  ┌──────────┴────────────┐      │   incoming:   │
│  │         poll          │<─────┤  connections, │
│  └──────────┬────────────┘      │   data, etc.  │
│  ┌──────────┴────────────┐      └───────────────┘
│  │        check          │
│  └──────────┬────────────┘
│  ┌──────────┴────────────┐
└──┤    close callbacks    │
└───────────────────────┘

注意:每一个盒子被看成Event Loop的一个阶段。

每一个阶段都有一个执行回调的FIFO队列(官网这么描述的,实际上不是的,好比timers的数据结构其实是堆), 简单概述,当Event Loop进入到某个阶段的时候,就会将该阶段的队列里的回调拿出来执行,直到队列为空(实际上要复杂一点儿)。

Event Loop Phases Overview

简单的介绍一下这几个阶段所作的事情:

  • timers: 这个阶段执行由setTimeout()setInterval()调度的回调。
  • I/O callbacks: 执行几乎全部的回调,除了close callbacks以及timers调度的回调和setImmediate()调度的回调。
  • idle, prepare: 只在内部使用。
  • poll: 检索新的I/O事件,node将在适当的时候阻塞。(retrieve new I/O events; node will block here when appropriate)
  • check: setImmediate()的回调将会在这个阶段执行。
  • close callbacks: 好比socket.on('close', ...)

上面的阶段仍是笔记容易理解的,就是poll阶段的解释有点儿让人迷惑,这是什么意思呢?官方文档给出了poll阶段的做用。

Poll Phase

poll阶段有两个功能:

  • 当timers到达指定的时间后,执行指定的timer的回调(Executing scripts for timers whose threshold has elapsed, then)。
  • 处理poll队列的事件(Processing events in the poll queue)。

当进入到poll阶段,而且没有timers被调用的时候,会发生下面的状况:

  • 若是poll队列不为空,Event Loop 将同步的执行poll queue里的callback,直到queue为空或者执行的callback到达上线。
  • 若是poll队列为空,则会发生下面的状况:
    • 若是脚本调用了setImmediate(), Event Loop将会结束poll阶段而且进入到check阶段执行setImmediate()的回调。
    • 若是脚本没有被setImmediate()调用,Event Loop将会等待回调被添加到队列中,而后当即执行它们。
      当进入到poll阶段,而且调用了timers的话,会发生下面的状况:
  • 一旦poll queue是空的话,Event Loop会检查是否timers, 若是有1个或多个timers时间已经到达,Event Loop将会回到timer阶段并执行那些timer的callback(即进入到下一次tick)。

看了上面的介绍,比较I/O callbacks阶段与poll阶段,可能会感到迷惑?为何在I/O callbacks是执行几乎全部的回调,而在poll阶段也是执行回调?我找到了Libuv的官方文档:

Pending callbacks are called. All I/O callbacks are called right after polling for I/O, for the most part. There are cases, however, in which calling such a callback is deferred for the next loop iteration. If the previous iteration deferred any I/O callback it will be run at this point.

结合Libuv官方文档给出的流程图

来看, 能够翻译为:Pending callbacks(即I/O callbacks)被调用。大多数状况下,全部的I/O callbacks都是在poll for I/O(即poll phase)后理解调用的。然而,有些状况,会在下一次tick调用,之前被推迟的I/O callback会在下一次tick的I/O阶段调用。

那么通常什么样的callback会在I/O callbacks阶段被调用呢?Node.js官方有提到:

This phase executes callbacks for some system operations such as types of TCP errors. For example if a TCP socket receives ECONNREFUSED when attempting to connect, some *nix systems want to wait to report the error. This will be queued to execute in the I/O callbacks phase.

即:这个阶段对某些系统操做(好比TCP类型错误)执行回调。举个例子,若是尝试链接时,一个TCP套接字收到了ECONNREFUSED,则某些*nix系统会等待报错。这将排队在I/O callbacks阶段执行。

对于文档上的说法去一探究竟,在Node.js源码里全局搜索: ECONNREFUSED, 在node/deps/uv/src/unix/tcp.c目录下,第206行,uv__tcp_connect函数,代码以下:

int uv__tcp_connect(uv_connect_t* req,
                    uv_tcp_t* handle,
                    const struct sockaddr* addr,
                    unsigned int addrlen,
                    uv_connect_cb cb) {
  int err;
  int r;

  assert(handle->type == UV_TCP);

  if (handle->connect_req != NULL)
    return -EALREADY;  /* FIXME(bnoordhuis) -EINVAL or maybe -EBUSY. */

  err = maybe_new_socket(handle,
                         addr->sa_family,
                         UV_STREAM_READABLE | UV_STREAM_WRITABLE);
  if (err)
    return err;

  handle->delayed_error = 0;

  do {
    errno = 0;
    r = connect(uv__stream_fd(handle), addr, addrlen);
  } while (r == -1 && errno == EINTR);

  /* We not only check the return value, but also check the errno != 0.
   * Because in rare cases connect() will return -1 but the errno
   * is 0 (for example, on Android 4.3, OnePlus phone A0001_12_150227)
   * and actually the tcp three-way handshake is completed.
   */
  if (r == -1 && errno != 0) {
    if (errno == EINPROGRESS)
      ; /* not an error */
    else if (errno == ECONNREFUSED)
    /* If we get a ECONNREFUSED wait until the next tick to report the
     * error. Solaris wants to report immediately--other unixes want to
     * wait.
     */
      handle->delayed_error = -errno;
    else
      return -errno;
  }

  uv__req_init(handle->loop, req, UV_CONNECT);
  req->cb = cb;
  req->handle = (uv_stream_t*) handle;
  QUEUE_INIT(&req->queue);
  handle->connect_req = req;

  uv__io_start(handle->loop, &handle->io_watcher, POLLOUT);

  if (handle->delayed_error)
    uv__io_feed(handle->loop, &handle->io_watcher);

  return 0;
}

从上面的代码咱们能够知道,当errno === ECONNREFUSED时,会去调用uv__io_feed(handle->loop, &handle->io_watcher)方法,看一下uv__io_feed的的实现:

void uv__io_feed(uv_loop_t* loop, uv__io_t* w) {
  if (QUEUE_EMPTY(&w->pending_queue))
    QUEUE_INSERT_TAIL(&loop->pending_queue, &w->pending_queue);
}

从函数名字能够看出来,这里是在向pendingQueue插入发生错误时的回调。也就是说,I/O callbacks通常是对一些系统操做执行回调。

那么咱们能够得出结论:

  • 大部分的回调在poll阶段执行的。
  • I/O callbacks阶段通常执行的是系统操做的回调。

The Heart Of Event Loop

有了上面的知识后,咱们依然不能解决文章开头的问题。来看一下,Event Loop核心的代码

int uv_run(uv_loop_t* loop, uv_run_mode mode) {
  int timeout;
  int r;
  int ran_pending;

  r = uv__loop_alive(loop);
  if (!r)
    uv__update_time(loop);

  while (r != 0 && loop->stop_flag == 0) {
    uv__update_time(loop);
    uv__run_timers(loop);
    ran_pending = uv__run_pending(loop);
    uv__run_idle(loop);
    uv__run_prepare(loop);

    timeout = 0;
    if ((mode == UV_RUN_ONCE && !ran_pending) || mode == UV_RUN_DEFAULT)
      timeout = uv_backend_timeout(loop);

    uv__io_poll(loop, timeout);
    uv__run_check(loop);
    uv__run_closing_handles(loop);

    if (mode == UV_RUN_ONCE) {
      /* UV_RUN_ONCE implies forward progress: at least one callback must have
       * been invoked when it returns. uv__io_poll() can return without doing
       * I/O (meaning: no callbacks) when its timeout expires - which means we
       * have pending timers that satisfy the forward progress constraint.
       *
       * UV_RUN_NOWAIT makes no guarantees about progress so it's omitted from
       * the check.
       */
      uv__update_time(loop);
      uv__run_timers(loop);
    }

    r = uv__loop_alive(loop);
    if (mode == UV_RUN_ONCE || mode == UV_RUN_NOWAIT)
      break;
  }

  /* The if statement lets gcc compile it to a conditional store. Avoids
   * dirtying a cache line.
   */
  if (loop->stop_flag != 0)
    loop->stop_flag = 0;

  return r;
}

上面代码能够简化为下面的伪代码:

while(true) {
    uv__update_time(loop); // 使用Linux下的高精度Timer hrtime更新loop->time,即event loop的时间戳
    uv__run_timers(loop);
    uv__run_pending(loop);
    uv__run_idle(loop);
    uv__run_prepare(loop);
    uv__io__poll(loop, timeout);
    uv__run_check(loop);
    uv__run_closing_handlers(loop);

    // Node默认的mode是`UV_RUN_ONCE`
    if (mode == UV_RUN_ONCE) {
        uv__run_timers();
        uv__update_time(loop); // 更新loop->time
    }
}

实际上,在一次tick的时候,首先会去调用一次uv__run_timers去处理timers, 而后在最后if语句里,还会去调用uv__run_timers

我在timers的实现里, 找到下面的代码:

function Timeout(callback, after, args, isRepeat) {
  after *= 1; // coalesce to number or NaN
  if (!(after >= 1 && after <= TIMEOUT_MAX)) {
    if (after > TIMEOUT_MAX) {
      process.emitWarning(`${after} does not fit into` +
                          ' a 32-bit signed integer.' +
                          '\nTimeout duration was set to 1.',
                          'TimeoutOverflowWarning');
    }
    after = 1; // schedule on next tick, follows browser behavior
  }

  this._called = false;
  this._idleTimeout = after;
  this._idlePrev = this;
  this._idleNext = this;
  this._idleStart = null;
  // this must be set to null first to avoid function tracking
  // on the hidden class, revisit in V8 versions after 6.2
  this._onTimeout = null;
  this._onTimeout = callback;
  this._timerArgs = args;
  this._repeat = isRepeat ? after : null;
  this._destroyed = false;

  this[async_id_symbol] = ++async_id_fields[kAsyncIdCounter];
  this[trigger_async_id_symbol] = getDefaultTriggerAsyncId();
  if (async_hook_fields[kInit] > 0) {
    emitInit(this[async_id_symbol],
             'Timeout',
             this[trigger_async_id_symbol],
             this);
  }
}

也就是说,实际上setTimeout(fn, 0);最后会变为setTimeout(fn, 1);在一次tick的时候,大概的流程是这样的:

  • 首先更新loop->time(uv__update_time)
    c UV_UNUSED(static void uv__update_time(uv_loop_t* loop)) { /* Use a fast time source if available. We only need millisecond precision. */ loop->time = uv__hrtime(UV_CLOCK_FAST) / 1000000; }
  • 上面的uv__hrtime(UV_CLOCK_FAST)的值是精确到纳秒的,所以loop->time最后的结果多是大于1的,也有多是小于1的。
  • 而后uv__run_timers(loop)被调用:
    ```c
    void uv__run_timers(uv_loop_t* loop) {
    struct heap_node* heap_node;
    uv_timer_t* handle;

    for (;;) {
      heap_node = heap_min((struct heap*) &loop->timer_heap);
      if (heap_node == NULL)
        break;
    
      handle = container_of(heap_node, uv_timer_t, heap_node);
      if (handle->timeout > loop->time)
        break;
    
      uv_timer_stop(handle);
      uv_timer_again(handle);
      handle->timer_cb(handle);
    }

    }
    ```

有了上面的理解后,就能够获得文章最开始的答案了,对于event-loop-1.js:

/*
 * 若是第一次loop准备前的耗时超过1ms, 即loop->time > 1, 则会先执行setTimeout, 再执行setImmediate
 * 若是第一次loop准备前的耗时小于1ms,即loop->time < 1, 则会先执行setImediate,而后在执行setTimeout
 */
setTimeout(function() {
    console.log('setTimeout');
}, 0);

setImmediate(function() {
    console.log('setImmediate');
});

而对于event-loop-2.js:

/*
 * 因为是在回调里面调用的setTimeout, setImmediate两个函数
 * 首先在poll阶段,执行回调函数
 * 而后进入到check阶段,会执行setImmediate()的回调函数
 * 最后进入在执行setTimeout()的回调函数
 *
 */
const fs = require('fs');
fs.readFile(__filename, () => {
    setTimeout(function() {
        console.log('setTimeout');
    }, 0);
    setImmediate(function() {
        console.log('setImmediate');
    });
});

MacroTask VS MicroTask

Node.js官网文档的描述中,提到了process.nextTick(), 它不属于Libuv的部分,实际上,它是属于Node.js的一部分。

实际上,除了Libuv里面要处理的回调,在Node.js里还有另外两个queue,分别是Next Tick Queue以及MicroTask Queue

  • Next Tick Queue: 使用process.nextTick()添加的回调。
  • MicroTask Queue: 包含一些microtasks好比resolved promise callbacks

那MacroTask是什么呢?Macrotask实际上就是上面咱们遇到的那些异步任务,也被称为Task, 也就是说,有的人会将MacroTask Queue称为Task Queue

它是如何工做的?

咱们结合一张图来看看它在Event Loop是如何工做的:

在Event Loop完成一个阶段,而后到另外一个阶段以前,Event Loop将会执行这Next Tick Queue以及MicroTask Queue里面的回调, 直到这两个队列为空。一旦它们空了后,Event Loop会进入到下一个阶段。

不少人会将这两个队列都看成是MicroTask Queue, 由于它们是处于同一阶段执行的, 实际上,这两个队列执行依然是有一个前后顺序的: Next Tick Queue的优先级高于MicroTask Queue, 注意:咱们这里将两个队列称为Immediate Queue

E.g, The event loop is currently processing the immediates queue which has 5 handlers to be processed. Meanwhile, two handlers are added to the next tick queue. Once the event loop completes 5 handlers in the immediates queue, event loop will detect that there are two items to be processed in the next tick queue before moving to the close handlers queue. It will then execute all the handlers in the next tick queue and then will move to process the close handlers queue.

上面的那段话引用来自Event Loop and the Big Picture — NodeJS Event Loop Part 1, 即Event Loop在处理拥有5个handlersNext Tick Queue时,有2个handlers被添加到Next Tick Queue, 一旦5个handlers被处理完后,Event Loop会接着处理Next Tick Queue里面新增的两个handlers, 而后再处理MicroTask Queue里的回调,当Immediate Queue里面的回调都处理完成后,Event Loop将会进入到下一个阶段。举个例子:

Promise.resolve().then(() => {
  console.log('resolve1');
});

process.nextTick(function() {
  console.log('tick1');
  process.nextTick(function() {
    console.log('tick2');
  });
  process.nextTick(function() {
    console.log('tick3');
  });
});

Promise.resolve().then(() => {
  console.log('resolve2');
});

process.nextTick(function() {
  console.log('tick4');
});


Promise.resolve().then(() => {
  console.log('resolve3');
});

process.nextTick(function() {
  console.log('tick5');
});

那么上面的执行顺序是:tick1, tick4, tick5, tick2, tick3, resolve1, resolve2, resolve3。不要递归调用process.nextTick, 由于这会致使I/O starvation

推荐阅读

参考

相关文章
相关标签/搜索