前几天nodejs发布了新版本4.0,其中涉及到一个更新比较多的模块,那就是下面要介绍的timer模块。html
timers: Improved timer performance from porting the 0.12 implementation, plus minor fixes (Jeremiah Senkpiel)#2540, (Julien Gilli)nodejs/node-v0.x-archive#8751nodejs/node-v0.x-archive#8905node
以前也对timer模块有过比较多的研究,断断续续的看过这个模块在github上的一些改动,因而借着此次机会整理一下本身对timer模块的理解,和小伙伴们一块儿分享timer模块的优化过程。linux
也许你在使用nodejs开发项目时并无使用到timer模块,诸如setTimeout以及setInterval和setImediate方法等等。但若是你开发的是web项目,那么你的项目中必定涉及到了timer模块。c++
细心的同窗在平时的http接口开发调试中可能会注意到,每一个http的request header里都有一个Connection:keep-alive
标识,这是http/1.1开始引入的,表示客户端须要和服务端一直保持着tcp链接。固然了,这个链接不能就这么一直保持着,因此通常都会有一个超时时间,超过这个时间客户端尚未发送新的http请求,那么服务器就须要自动断开从而继续为其余客户端提供服务。git
nodejs提供的http服务器即是采用timer模块来知足这种请求,每个新的链接到来构造出一个socket对象,便会调用socket.setTimeout设置一个定时器用于超时后自动断开链接。github
在nodejs开发的web项目中,timer模块的使用频率是很是高的,每个新的链接到来都会设置它的超时时间,并且每一个链接的超时时间都同样,在http server中默认是2 * 60 * 1000ms。nodejs使用c++包裹的Timer对象来实现定时器功能,下面的代码示例了使用Timer对象来实现一个很是简单的定时器。web
const Timer = process.binding('timer_wrap').Timer; const kOnTimeout = Timer.kOnTimeout | 0; var mySetTimeout = function (fn, ms) { var timer = new Timer(); timer.start(ms, 0); timer[kOnTimeout] = fn; return timer; } var myClearTimeout = function(timer){ if(timer && timer.close) { timer.close(); } } mySetTimeout(function() { console.log('timeout!'); },1000);
那咱们是否就能够用上面实现的mySetTimeout来对每一个socket进行超时操做呢算法
mySetTimeout(function(){socket.close();},2 * 60 * 1000);
能够是能够,可是这样真的好吗?设想咱们作的是一个很是棒的产品,天天好几百万上千万的用户,高峰期在2 * 60 * 1000ms这段时间内会产生很是多的新链接,必然会建立很是多的Timer对象,这个开销还真不小!编程
nodejs在设计之初就很是很是注重性能,因此像上面这种这么简单的方案必然是不能接受的。api
实际上在这2分钟以内,nodejs中的timer模块只会建立一个Timer对象,一个Timer对象如何来知足这么多链接的超时处理呢?
timer模块会使用一个链表来保存全部超时时间相同的对象,每一个对象中都会存储开始时间_idleStart以及超时时间_idleTimeout。链表中第一个加入的对象必定会比后面加入的对象先超时,当第一个对象超时完成处理后,从新计算下一个对象是否已经到时或者还有多久到时,以前建立的Timer对象便会再次启动并设置新的超时时间,直到当链表上全部的对象都已经完成超时处理,此时便会关闭这个Timer对象。
经过这种巧妙的设计,使得一个Timer对象获得了最大的重用,从而极大的提高了timer模块的性能。这一场景其实在libev中已早有研究 http://pod.tst.eu/http://cvs.schmorp.de/libev/ev.pod#Be_smart_about_timeouts
上面说到timer模块经过c++提供的Timer对象,最终生成setTimeout以及setInterval等函数暴露给用户使用。那Timer对象是如何实现的呢,下面咱们就来一探究竟。
熟悉linux网络编程的同窗必定据说过epoll吧,
epoll是什么?按照man手册的说法:是为处理大批量句柄而做了改进的poll。固然,这不是2.6内核才有的,它是在2.5.44内核中被引进的(epoll(4) is a new API introduced in Linux kernel 2.5.44),它几乎具有了以前所说的一切优势,被公认为Linux2.6下性能最好的多路I/O就绪通知方法。
其中有这么一个函数
int epoll_wait(int epfd, struct epoll_event * events, int maxevents, int timeout);
收集在epoll监控的事件中已经发送的事件。参数events是分配好的epoll_event结构体数组,epoll将会把发生的事件赋值到events数组中(events不能够是空指针,内核只负责把数据复制到这个events数组中,不会去帮助咱们在用户态中分配内存)。maxevents告以内核这个events有多大,这个 maxevents的值不能大于建立epoll_create()时的size,参数timeout是超时时间(毫秒,0会当即返回,-1将不肯定,也有说法说是永久阻塞)。若是函数调用成功,返回对应I/O上已准备好的文件描述符数目,如返回0表示已超时。
当咱们监听一个fd上的事件时,能够设置等待事件发生的超时时间。利用这个特性即可以很是简单的实现一个定时器功能。
因为我使用的是mac系统,因此就用kqueue来代替epoll(它们之间很是类似,具体的详细介绍以及使用方法感兴趣的能够自行查阅相关资料)
#include <stdio.h> #include <stdlib.h> #include <sys/types.h> #include <sys/event.h> #include <sys/time.h> #include <netinet/in.h> #include <arpa/inet.h> #include <errno.h> const int MAX_EVENT_COUNT = 5000; int main() { struct timeval t_start,t_end; int fd = -1;//构造一个不会有任何事件发生的fd int kq = kqueue(); struct kevent changes[1]; EV_SET(&changes[0], fd, EVFILT_READ, EV_ADD, 0, 0, NULL); int timeout = 3500; struct kevent events[MAX_EVENT_COUNT]; struct timespec spec; spec.tv_sec = timeout / 1000; spec.tv_nsec = (timeout % 1000) * 1000000; gettimeofday(&t_start, NULL); kevent(kq, NULL, 0, events, MAX_EVENT_COUNT, &spec); gettimeofday(&t_end, NULL); printf("timeout = %d, run time is %ld\n", timeout, t_end.tv_sec*1000+t_end.tv_usec/1000 - (t_start.tv_sec*1000+t_start.tv_usec/1000)); return 0; }
至此咱们便利用kqueue实现了一个很是简单很是底层的定时器。
前面讲到,在http server中每个新的链接并不会真的就去建立一个Timer对象。一样,在nodejs底层的定时器中,并不会每次建立一个Timer对象就在kqueue上注册一个事件等待超时。优化的思路和nodejs中的timer模块很类似,只不过如今不能保证每一个定时器的超时时间都同样。
定时器有一个很是显著的特征,超时时间最短的定时器必定最早触发,假设咱们有不少的定时任务,每一个任务的执行时间都不一样。当第一个定时器超时后,便从这些任务中查找出已经到点的任务并执行对应的超时处理,而后再从新计算余下任务中最早执行的时间,并根据这个时间再次开启一个定时器。
对应的算法需求就是每次都须要查找集合中最小的元素,显然二叉堆中的最小堆(父结点的键值老是小于或等于任何一个子节点的键值)是最适合不过的一种数据结构了。因为最小的元素老是处于根节点,咱们能够以O(1)时间找到最小值。对于插入操做,在最坏的状况下,新插入的节点须要不断的和它的父节点进行交换,直到它为根节点为止。假设堆的高度为h, 二叉树最多有2^(h+1) - 1 个 节点. 所以新插入一个节点最多须要log(n+1) -1 次比较,其算法复杂度为O(logn)。
libuv中已经实现了一个最小二叉堆的算法https://github.com/joyent/libuv/blob/master/src/heap-inl.h, 下面咱们就用这个算法来实现一个支持设置不一样超时时间的定时器。
#include <stdio.h> #include <stdlib.h> #include <sys/types.h> #include <sys/event.h> #include <sys/time.h> #include <netinet/in.h> #include <arpa/inet.h> #include <errno.h> //https://github.com/joyent/libuv/blob/master/src/heap-inl.h #include "heap-inl.h" #define container_of(ptr, type, member) \ ((type *) ((char *) (ptr) - offsetof(type, member))) const int MAX_EVENT_COUNT = 5000; typedef struct { struct heap_node* heap_node[3]; int value; }node_t; static int less_than(const struct heap_node* ha, const struct heap_node* hb) { const node_t* a; const node_t* b; a = container_of(ha, const node_t, heap_node); b = container_of(hb, const node_t, heap_node); if (a->value < b->value) return 1; return 0; } int main() { struct heap *heap_p = malloc(sizeof(node_t)); heap_init(heap_p); int a[] = {10,9,8,6,7,3,5,4,2}; int len = sizeof(a)/sizeof(int); for(int i=0;i<len;i++){ node_t *node_p = malloc(sizeof(node_t)); node_p->value = a[i]*1000; heap_insert(heap_p, (struct heap_node*)node_p->heap_node, less_than); } int fd = -1; int kq = kqueue(); struct kevent changes[1]; EV_SET(&changes[0], fd, EVFILT_READ, EV_ADD, 0, 0, NULL); struct kevent events[MAX_EVENT_COUNT]; struct timeval t_start,t_end; while(heap_p->nelts) { node_t *node_p = container_of(heap_p->min, node_t, heap_node); struct timespec spec; spec.tv_sec = node_p->value / 1000; spec.tv_nsec = (node_p->value % 1000) * 1000000; gettimeofday(&t_start, NULL); kevent(kq, NULL, 0, events, MAX_EVENT_COUNT, &spec); gettimeofday(&t_end, NULL); printf("timeout = %d, run time is %ld\n", node_p->value, t_end.tv_sec*1000+t_end.tv_usec/1000 - (t_start.tv_sec*1000+t_start.tv_usec/1000)); heap_dequeue(heap_p, less_than); } printf("timer is over!\n"); return 0; }
执行gcc timer.c -o timer && ./timer后输出
timeout = 2000, run time is 2004 timeout = 3000, run time is 3000 timeout = 4000, run time is 4003 timeout = 5000, run time is 5005 timeout = 6000, run time is 6005 timeout = 7000, run time is 7005 timeout = 8000, run time is 8004 timeout = 9000, run time is 9000 timeout = 10000, run time is 10005 timer is over!
能够看到咱们设置的9个定时器都预期执行了,除了有5ms之内的误差。这就是nodejs中最底层的定时器实现了。
咱们再回到nodejs中的timer模块,为了避免影响到nodejs中的event loop,timer模块专门提供了一些内部的api(timers._unrefActive)给像socket这样的对象使用。
timer内部会维护一个unrefList链表以及一个unrefTimer Timer对象,当有新的超时任务到来时便会添加到unrefList中,超时后便从unrefList中取出任务执行。
在最初的设计中,每次执行_unrefActive添加任务时都会维持着unrefList的顺序,保证超时时间最小的处于前面。这样在定时器超时后即可以以最快的速度处理超时任务并设置下一个定时器,可是在添加任务时最坏的状况下须要遍历unrefList链表中的全部节点。具体实现可参考https://github.com/nodejs/node/blob/5abd4ac079b390467360d671a186a061b5aba736/lib/timers.js
很显然,在web开发中创建链接是最频繁的操做,那么向unrefList链表中添加节点也就很是频繁了,并且最开始设置的定时器其实最后真正会超时的很是少,由于中间涉及到io的正常操做时便会取消定时器。因此问题就变成最耗性能的操做很是频繁,而几乎不花时间的操做却不多被执行到。
针对这种状况,如何解决呢?目前在node社区主要有2种方案。
主要思路就是将对unrefList链表的遍历操做,移到unrefTimeout定时器超时处理中。这样每次查找出已经超时的任务就须要花比较多的时间了O(n),可是插入操做却变得很是简单O(1),而插入节点正是最频繁的操做。
原理和libuv中的timer实现同样,添加和查找一个节点都能达到O(log(n))的复杂度(找出最小节点自己很快,可是删除它须要O(log(n))的复杂度),可以在两者之间保持一个很好的平衡。
这2种方案都有比较详细benchamark数据, 具体可参考https://github.com/nodejs/node-v0.x-archive/wiki/Optimizing-_unrefActive
在高并发链接到来而且不多有实际的超时事件发生时unrefList使用没有排序的链表来存储超时任务时性能是很是棒的。可是一旦出现不少超时事件都发生的状况下,对超时事件的处理会再次变成一个瓶颈。
而使用二叉堆来存储超时任务时,当有大量超时事件发生时性能会比链表好不少,没有超时事件触发时性能比链表稍差。
可见nodejs在不一样的场景中使用的定时器实现也不都同样。当咱们本身在实际的开发时,若是须要使用到定时器功能,不妨好好思考下哪一种方案更适合业务场景,可以最大的提高timer模块的性能。