事件处理框架所要解决的问题是如何收集,管理,分发事件。这里所说的事件,主要以网络事件和定时器事件为主,而网络事件中又以TCP网络事件为主。因为网络事件与网卡中断处理程序,内核提供的系统调用密切相关,因此网络事件的驱动取决于不一样的操做系统平台,在同一操做系统中也受制于不一样的操做系统内核版本。所以不一样操做系统有不一样的事件驱动机制。nginx
基于模块化的设计思想,nginx对于事件处理分不一样的模块来完成。首先是ngx_events_module,它是NGX_CORE_MODULE类型的模块,主要负责配置文件events块配置项的解析;其次是ngx_event_core_module,它是NGX_EVENTS_MODULE类型的模块,这个模块会决定使用哪一种事件驱动机制,而且怎样调用事件驱动完成事件的管理;最后是ngx_epoll_module,ngx_kqueue_module,ngx_poll_module等一系列模块,这些模块实现了具体的事件驱动机制。服务器
在模块接口ngx_module_t中,有一个指向模块上下文的指针,不一样的模块采用不一样的结构体。网络
对于NGX_EVENT_MODULE类型的模块,其上下文结构体为ngx_event_module_t:数据结构
typedef struct { // 事件模块的名称 ngx_str_t * name; // 用于建立保存配置项参数的结构体 void * (*create_conf)(ngx_cycle_t * cycle); // 结合配置文件初始化配置项参数 char * (*init_conf)(ngx_cycle_t * cycle, void * conf); // 事件驱动机制的核心方法抽象 ngx_event_actions_t actions; } ngx_event_module_t; typedef struct { // 添加事件方法 负责把一个感兴趣的事件添加到事件驱动中 ngx_int_t (*add)(ngx_event_t *ev, ngx_int_t event, ngx_uint_t flags); // 删除事件方法 ngx_int_t (*del)(ngx_event_t *ev, ngx_int_t event, ngx_uint_t flags); // 启用一个事件 ngx_int_t (*enable)(ngx_event_t *ev, ngx_int_t event, ngx_uint_t flags); // 禁用一个事件 ngx_int_t (*disable)(ngx_event_t *ev, ngx_int_t event, ngx_uint_t flags); // 向事件驱动中添加一个新的链接,该链接的读写事件均会被添加到事件驱动中 ngx_int_t (*add_conn)(ngx_connection_t * c); // 从事件驱动中删除一个新的链接 ngx_int_t (*del_conn)(ngx_connection_t * c, ngx_uint_t flags); ngx_int_t (*notify)(ngx_event_handler_pt handler); // 处理事件方法 ngx_int_t (*process_event)(ngx_cycle_t *cycle, ngx_msec_t timer, ngx_uint_t flags); // 事件驱动初始化方法 ngx_int_t (*init)(ngx_cycle_t *cycle, ngx_msec_t timer); // 退出事件驱动前调用的方法 void (*done)(ngx_cycle_t *cycle); } ngx_event_actions_t;
ngx_epoll_module,ngx_kqueue_module,ngx_poll_module等模块均会实现ngx_event_actions_t中定义的方法以此实现对应的事件驱动机制。另外,在事件模块初始化的过程当中会对全局变量ngx_event_actions赋值,后续经过ngx_event_actions完成事件的管理,这样逻辑上就有了较明显的分层。框架
extern ngx_event_actions_t ngx_event_actions; #define ngx_process_events ngx_event_actions.process_events #define ngx_done_events ngx_event_actions.done #define ngx_add_event ngx_event_actions.add #define ngx_del_event ngx_event_actions.del #define ngx_add_conn ngx_event_actions.add_conn #define ngx_del_conn ngx_event_actions.del_conn #define ngx_notify ngx_event_actions.notify
事件模块的运行流程:socket
即调用ngx_event_core_module模块的ngx_event_module_init方法,在该方法中会初始化一些与系统相关的信息,例如进程打开的最大文件数,原子锁(文件锁)的初始化等。tcp
即调用ngx_event_core_module模块的ngx_event_process_init方法,在该方法中会初始化用于多进程侦听的锁,初始化选用的事件驱动机制、初始化链接池、读写事件、最后将侦听套接字做为可读事件添加到事件驱动中。模块化
经过ngx_event_actions开始事件循环。函数
每个事件都由ngx_event_t结构体来表示。该结构体中最核心的部分就是handler回调方法,它由每个事件消费模块实现,以此决定这个事件究竟如何被处理。ui
typedef struct ngx_event_s ngx_event_t; struct ngx_event_s { // 事件相关的对象, 一般指向ngx_connection_t链接对象 void * data; // 事件的处理方法, 每一个事件消费模块都会从新实现它 ngx_event_handler_pt handler; ... };
在nginx中,定义了基本的数据结构ngx_connection_t来表示链接。这个链接能够是被动链接:即客户端主动发起的,nginx服务器被动接受的tcp链接;也能够是主动链接:即nginx主动向上游服务器创建的链接,并以此链接与上游服务器通讯。主动链接由结构体ngx_peer_connection来表示,它是以ngx_connection_t结构体为基础来实现的。
typedef struct ngx_connection_s ngx_connection_t; struct ngx_connection_s { // 做为空闲链接时, 指向链接池中下一个空闲链接 // 做为非空闲链接时, 其意义由使用链接的模块定义, 例如http模块将data指向一个http请求(ngx_http_request_t) void * data; // 链接对应的读事件 ngx_event_t * read; // 链接对应的写事件 ngx_event_t * write; // 链接对应的侦听对象 ngx_listening_t * listening; // 链接的套接字句柄 ngx_socket_t fd; ... };
每一个nginx须要侦听的端口都由结构体ngx_listening_t来表示,该结构体中包含了socket套接字,侦听的ip地址,端口,以及侦听端口上成功创建新链接后的回调处理方法。
typedef struct ngx_listening_s ngx_listening_t; struct ngx_listening_s { // 侦听套接字 ngx_socket_t fd; // 侦听地址 struct sockaddr * sockaddr; // 新链接成功创建后的处理方法 ngx_connection_handler_pt handler; // 对应的链接对象 ngx_connection_t * connection; ... };
在初始化过程当中,每一个链接都会自动对应到一个读事件和写事件;将侦听套接字添加到事件驱动过程当中,为每一个侦听套接字分配一个链接,并对分配到的链接的读事件的处理函数赋值;当新链接创建后回调侦听的处理方法,在该方法中会修改新链接读写事件的处理方法,因为侦听是在不一样的模块中被初始化的,有了新链接的回调处理方法,不一样的模块就能方便的集成事件处理框架。例如http模块在解析配置构造ngx_listening_t结构体时,将其回调处理方法设置为ngx_http_init_connection,当有新链接成功创建时,该函数被回调,而且根据配置与实际状况将新链接的读写事件处理方法修改成ngx_http_wait_request_handler,ngx_http_empty_handler或其余http的处理方法。
相关源码:
static ngx_int_t ngx_event_process_init( ngx_cycle_t * cycle ) { ... // 为链接分配内存空间 cycle->connections = ngx_alloc(sizeof(ngx_connection_t)*cycle->connection_n, cycle->log); if( cycle->connections == NULL ) { return NGX_ERROR; } c = cycle->connections; // 为读事件分配内存空间并初始化 cycle->read_events = ngx_alloc(sizeof(ngx_event_t)*cycle->connection_n, cycle->log); if( cycle->read_events == NULL ) { return NGX_ERROR; } rev = cycle->read_events; for( i = 0; i < cycle->connection_n; i++ ) { rev[i].closed = 1; rev[i].instance = 1; } // 为写事件分配内存空间并初始化 cycle->write_events = ngx_alloc(sizeof(ngx_event_t)*cycle->connection_n, cycle->log); if( cycle->write_events == NULL ) { return NGX_ERROR; } wev = cycle->write_events; for( i = 0; i < cycle->connection_n; i++ ) { wev[i].closed = 1; wev[i].instance = 1; } // 每一个链接的读事件和写事件 i = cycle->connection_n; next = NULL; do { i--; c[i].data = next; c[i].read = &cycle->read_events[i]; c[i].write = &cycle->write_events[i]; c[i].fd = (ngx_socket_t)-1; } while(i); // 为每一个侦听分配一个链接, 用于事件管理 ls = cycle->listening.elts; for( i = 0 ; i < cycle->listening.nelts; i++ ) { c = ngx_get_connection(ls[i].fd, cycle->log); if( c == NULL ) { return NGX_ERROR; } c->type = ls[i].type; c->log = &ls[i].log; c->listening = &ls[i]; ls[i].connection = c; rev = c->read; rev->log = c->log; rev->accept = 1; rev->handler = (c->type == SOCK_STREAM) ? ngx_event_accept : ngx_event_recvmsg; if(ngx_add_event(rev, NGX_READ_EVENT, 0) == NGX_ERROR) { return NGX_ERROR; } } return NGX_OK; }
参考:
《深刻理解nginx》
《nginx开发从入门到精通》