/*
 * A single nginx event (read or write side of a connection, a timer, an
 * AIO completion, ...).  The field descriptions below are translated from
 * the original annotations in this file.
 */
struct ngx_event_s {
    /*
     * Event context.  Usually points to the owning ngx_connection_t;
     * when file AIO is enabled it may point to an ngx_event_aio_t instead.
     */
    void            *data;

    /*
     * 1 means the event is writable; usually this means the corresponding
     * TCP connection is able to send data.
     */
    unsigned         write:1;

    /*
     * 1 means this event can create new connections.  Normally only the
     * read event of each ngx_listening_t in the cycle's listening array
     * has this flag set.
     */
    unsigned         accept:1;

    /*
     * used to detect the stale events in kqueue, rtsig, and epoll
     *
     * Only the event driver modules use this flag; event consumers need
     * not care about it.  While a batch of events is being processed,
     * handling an earlier event may close a connection that a later event
     * of the same batch still refers to; comparing "instance" lets the
     * driver skip such stale events.
     */
    unsigned         instance:1;

    /*
     * the event was passed or would be passed to a kernel;
     * in aio mode - operation was posted.
     *
     * 1 means the event is active, 0 means inactive.  The event driver
     * behaves differently when adding, deleting and processing events
     * depending on this flag.  Code that uses events normally does not
     * change it directly.
     */
    unsigned         active:1;

    /*
     * 1 means the event is disabled.  Only meaningful for the kqueue and
     * rtsig drivers; it has no meaning for epoll.
     */
    unsigned         disabled:1;

    /*
     * the ready event; in aio mode 0 means that no operation can be posted
     *
     * 1 means the event is ready, i.e. its handler is allowed to process
     * it.  The HTTP framework frequently checks this flag to decide
     * whether a request can be received or a response can be sent.
     */
    unsigned         ready:1;

    /*
     * Only meaningful for kqueue, eventport and similar drivers; it has no
     * meaning for epoll on Linux.
     */
    unsigned         oneshot:1;

    /* aio operation is complete (used when handling asynchronous AIO events) */
    unsigned         complete:1;

    /*
     * eof:   the stream being processed has ended
     * error: an error occurred while processing the event
     */
    unsigned         eof:1;
    unsigned         error:1;

    /*
     * timedout: 1 means the event timed out and the handler should perform
     * its timeout handling.  Both timedout and timer_set are used by the
     * timer machinery.
     */
    unsigned         timedout:1;
    unsigned         timer_set:1;

    /* 1 means handling of the event must be delayed; used only for rate limiting */
    unsigned         delayed:1;

    /*
     * 1 means deferred accept: the connection is not created right after
     * the TCP three-way handshake but only once data has actually arrived.
     */
    unsigned         deferred_accept:1;

    /* the pending eof reported by kqueue, epoll or in aio chain operation */
    /*
     * 1 means the end of the stream is pending; per the original note this
     * is only related to the kqueue and aio mechanisms.
     */
    unsigned         pending_eof:1;

    /* accept / read / write event currently sitting in a posted queue */
    unsigned         posted:1;

    /* closed */
    unsigned         closed:1;

    /* to test on worker exit */
    unsigned         channel:1;
    unsigned         resolver:1;

    unsigned         cancelable:1;

#if (NGX_HAVE_KQUEUE)
    unsigned         kq_vnode:1;

    /* the pending errno reported by kqueue */
    int              kq_errno;
#endif

    /*
     * kqueue only:
     *   accept:     number of sockets that wait to be accepted
     *   read:       bytes to read when event is ready
     *               or lowat when event is set with NGX_LOWAT_EVENT flag
     *   write:      available space in buffer when event is ready
     *               or lowat when event is set with NGX_LOWAT_EVENT flag
     *
     * epoll with EPOLLRDHUP:
     *   accept:     1 if accept many, 0 otherwise
     *   read:       1 if there can be data to read, 0 otherwise
     *
     * iocp: TODO
     *
     * otherwise:
     *   accept:     1 if accept many, 0 otherwise
     */

#if (NGX_HAVE_KQUEUE) || (NGX_HAVE_IOCP)
    int              available;
#else
    unsigned         available:1;
#endif

    /* the handler invoked when this event fires */
    ngx_event_handler_pt  handler;

#if (NGX_HAVE_IOCP)
    ngx_event_ovlp_t ovlp;
#endif

    ngx_uint_t       index;

    ngx_log_t       *log;

    ngx_rbtree_node_t   timer;

    /* the posted queue */
    ngx_queue_t      queue;

#if 0

    /* the threads support */

    /*
     * the event thread context, we store it here
     * if $(CC) does not understand __thread declaration
     * and pthread_getspecific() is too costly
     */

    void            *thr_ctx;

#if (NGX_EVENT_T_PADDING)

    /* event should not cross cache line in SMP */

    uint32_t         padding[NGX_EVENT_T_PADDING];
#endif
#endif
};
/*
 * State of one file AIO operation.  It embeds an ngx_event_t ("event");
 * the annotation on ngx_event_s.data in this file says that event's data
 * pointer may refer back to this structure when file AIO is enabled.
 */
struct ngx_event_aio_s {
    void                      *data;
    ngx_event_handler_pt       handler;
    ngx_file_t                *file;

    ngx_fd_t                   fd;

#if (NGX_HAVE_AIO_SENDFILE || NGX_COMPAT)
    ssize_t                  (*preload_handler)(ngx_buf_t *file);
#endif

#if (NGX_HAVE_EVENTFD)
    /* NOTE(review): presumably the raw completion result -- confirm against the AIO code */
    int64_t                    res;
#endif

#if !(NGX_HAVE_EVENTFD) || (NGX_TEST_BUILD_EPOLL)
    ngx_err_t                  err;
    size_t                     nbytes;
#endif

    ngx_aiocb_t                aiocb;
    ngx_event_t                event;
};
/*
 * Table of operations every event driver module (epoll, kqueue, ...)
 * provides to the event framework.
 */
typedef struct {
    /*
     * Add / remove an event to / from the OS event mechanism (epoll,
     * kqueue, ...), so that process_events() can (or can no longer)
     * report it once it fires.
     */
    ngx_int_t  (*add)(ngx_event_t *ev, ngx_int_t event, ngx_uint_t flags);
    ngx_int_t  (*del)(ngx_event_t *ev, ngx_int_t event, ngx_uint_t flags);

    /*
     * Enable / disable an event.  Per the original note the event
     * framework does not call these at present, and most drivers implement
     * them exactly like add / del.
     */
    ngx_int_t  (*enable)(ngx_event_t *ev, ngx_int_t event, ngx_uint_t flags);
    ngx_int_t  (*disable)(ngx_event_t *ev, ngx_int_t event, ngx_uint_t flags);

    /*
     * Add / remove a whole connection, i.e. both its read and its write
     * event, to / from the event mechanism.
     */
    ngx_int_t  (*add_conn)(ngx_connection_t *c);
    ngx_int_t  (*del_conn)(ngx_connection_t *c, ngx_uint_t flags);

    ngx_int_t  (*notify)(ngx_event_handler_pt handler);

    /*
     * Core of event dispatching: collects the events reported by the OS
     * and either runs their handlers or posts them to the nginx event
     * queues.  Per the original note it is called only from
     * ngx_process_events_and_timers() in the normal work loop.
     */
    ngx_int_t  (*process_events)(ngx_cycle_t *cycle, ngx_msec_t timer,
                                 ngx_uint_t flags);

    /* Initialise / shut down the event driver module (e.g. set up epoll). */
    ngx_int_t  (*init)(ngx_cycle_t *cycle, ngx_msec_t timer);
    void       (*done)(ngx_cycle_t *cycle);
} ngx_event_actions_t;
/*
 * Per-process start-up hook of the event core module.
 *
 * Decides whether the accept mutex is used, initialises the posted-event
 * queues and the timer, calls init() of the selected event driver module,
 * allocates the connection / read-event / write-event arrays and links the
 * connections into the free list, then prepares one connection per
 * listening socket and installs its accept handler.
 *
 * Returns NGX_OK on success and NGX_ERROR on failure; the process exits
 * with status 2 if the event driver cannot be initialised.
 */
static ngx_int_t
ngx_event_process_init(ngx_cycle_t *cycle)
{
    ngx_uint_t           m, i;
    ngx_event_t         *rev, *wev;
    ngx_listening_t     *ls;
    ngx_connection_t    *c, *next, *old;
    ngx_core_conf_t     *ccf;
    ngx_event_conf_t    *ecf;
    ngx_event_module_t  *module;

    ccf = (ngx_core_conf_t *) ngx_get_conf(cycle->conf_ctx, ngx_core_module);
    ecf = ngx_event_get_conf(cycle->conf_ctx, ngx_event_core_module);

    if (ccf->master && ccf->worker_processes > 1 && ecf->accept_mutex) {
        ngx_use_accept_mutex = 1;
        ngx_accept_mutex_held = 0;
        ngx_accept_mutex_delay = ecf->accept_mutex_delay;

    } else {
        ngx_use_accept_mutex = 0;
    }

    /* initialise the nginx posted-event queues */
    ngx_queue_init(&ngx_posted_accept_events);
    ngx_queue_init(&ngx_posted_events);

    if (ngx_event_timer_init(cycle->log) == NGX_ERROR) {
        return NGX_ERROR;
    }

    /*
     * Initialise the event driver selected by configuration (ecf->use),
     * e.g. for epoll this is where epoll_create() happens.
     */
    for (m = 0; cycle->modules[m]; m++) {
        if (cycle->modules[m]->type != NGX_EVENT_MODULE) {
            continue;
        }

        if (cycle->modules[m]->ctx_index != ecf->use) {
            continue;
        }

        module = cycle->modules[m]->ctx;

        if (module->actions.init(cycle, ngx_timer_resolution) != NGX_OK) {
            /* fatal */
            exit(2);
        }

        break;
    }

    if (ngx_timer_resolution && !(ngx_event_flags & NGX_USE_TIMER_EVENT)) {
        ngx_log_error(NGX_LOG_WARN, cycle->log, 0,
                      "the \"timer_resolution\" directive is not supported "
                      "with the configured event method, ignored");
        ngx_timer_resolution = 0;
    }

    cycle->connections =
        ngx_alloc(sizeof(ngx_connection_t) * cycle->connection_n, cycle->log);
    if (cycle->connections == NULL) {
        return NGX_ERROR;
    }

    c = cycle->connections;

    cycle->read_events = ngx_alloc(sizeof(ngx_event_t) * cycle->connection_n,
                                   cycle->log);
    if (cycle->read_events == NULL) {
        return NGX_ERROR;
    }

    rev = cycle->read_events;
    for (i = 0; i < cycle->connection_n; i++) {
        rev[i].closed = 1;
        rev[i].instance = 1;
    }

    cycle->write_events = ngx_alloc(sizeof(ngx_event_t) * cycle->connection_n,
                                    cycle->log);
    if (cycle->write_events == NULL) {
        return NGX_ERROR;
    }

    wev = cycle->write_events;
    for (i = 0; i < cycle->connection_n; i++) {
        wev[i].closed = 1;
    }

    /*
     * Thread all connections into a singly linked free list through their
     * data pointer, walking backwards so the list starts at element 0.
     */
    i = cycle->connection_n;
    next = NULL;

    do {
        i--;

        c[i].data = next;
        c[i].read = &cycle->read_events[i];
        c[i].write = &cycle->write_events[i];
        c[i].fd = (ngx_socket_t) -1;

        next = &c[i];
    } while (i);

    cycle->free_connections = next;
    cycle->free_connection_n = cycle->connection_n;

    /* for each listening socket */

    ls = cycle->listening.elts;
    for (i = 0; i < cycle->listening.nelts; i++) {

        c = ngx_get_connection(ls[i].fd, cycle->log);

        if (c == NULL) {
            return NGX_ERROR;
        }

        c->type = ls[i].type;
        c->log = &ls[i].log;

        c->listening = &ls[i];
        ls[i].connection = c;

        rev = c->read;

        rev->log = c->log;
        rev->accept = 1;

        if (!(ngx_event_flags & NGX_USE_IOCP_EVENT)) {
            if (ls[i].previous) {

                /*
                 * delete the old accept events that were bound to
                 * the previous listening object's connection
                 */

                old = ls[i].previous->connection;

                if (ngx_del_event(old->read, NGX_READ_EVENT, NGX_CLOSE_EVENT)
                    == NGX_ERROR)
                {
                    return NGX_ERROR;
                }

                old->fd = (ngx_socket_t) -1;
            }
        }

        /* stream sockets are accepted, any other type goes to recvmsg */
        rev->handler = (c->type == SOCK_STREAM) ? ngx_event_accept
                                                : ngx_event_recvmsg;

        /*
         * With the accept mutex in use the listening socket is not
         * registered here; that is done by ngx_trylock_accept_mutex()
         * once this process holds the mutex.
         */
        if (ngx_use_accept_mutex) {
            continue;
        }

        /* no accept mutex: register the listening socket right away */
        if (ngx_add_event(rev, NGX_READ_EVENT, 0) == NGX_ERROR) {
            return NGX_ERROR;
        }
    }

    return NGX_OK;
}
ngx_trylock_accept_mutex:只有在获取到 accept_mutex 锁之后才进行 **accept 事件注册**
ngx_process_events:处理事件
释放 accept_mutex 锁
ngx_event_process_posted:处理 posted 队列中的事件
ngx_process_events_and_timers()
/*
 * One iteration of the event loop.
 *
 * Picks the wait timeout, tries to take the accept mutex when it is in
 * use, lets the event driver collect events via ngx_process_events(),
 * runs the posted accept events, releases the accept mutex, expires timers
 * if time has advanced, and finally runs the remaining posted events.
 */
void
ngx_process_events_and_timers(ngx_cycle_t *cycle)
{
    ngx_uint_t  flags;
    ngx_msec_t  timer, delta;

    if (ngx_timer_resolution) {
        timer = NGX_TIMER_INFINITE;
        flags = 0;

    } else {
        timer = ngx_event_find_timer();
        flags = NGX_UPDATE_TIME;
    }

    if (ngx_use_accept_mutex) {
        if (ngx_accept_disabled > 0) {
            ngx_accept_disabled--;

        } else {
            /*
             * Try to take the accept mutex; on success the listening
             * sockets get registered with the event driver.
             */
            if (ngx_trylock_accept_mutex(cycle) == NGX_ERROR) {
                return;
            }

            if (ngx_accept_mutex_held) {
                /* holder of the mutex only queues events, see below */
                flags |= NGX_POST_EVENTS;

            } else {
                /* not the holder: retry no later than accept_mutex_delay */
                if (timer == NGX_TIMER_INFINITE
                    || timer > ngx_accept_mutex_delay)
                {
                    timer = ngx_accept_mutex_delay;
                }
            }
        }
    }

    delta = ngx_current_msec;

    /*
     * Collect ready events; with NGX_POST_EVENTS set they are queued on
     * the posted queues instead of being handled immediately.
     */
    (void) ngx_process_events(cycle, timer, flags);

    delta = ngx_current_msec - delta;

    ngx_log_debug1(NGX_LOG_DEBUG_EVENT, cycle->log, 0,
                   "timer delta: %M", delta);

    /* accept events first, while the accept mutex is still held */
    ngx_event_process_posted(cycle, &ngx_posted_accept_events);

    if (ngx_accept_mutex_held) {
        ngx_shmtx_unlock(&ngx_accept_mutex);
    }

    if (delta) {
        ngx_event_expire_timers();
    }

    ngx_event_process_posted(cycle, &ngx_posted_events);
}
/*
 * Try to take the accept mutex without blocking.
 *
 * If the lock is obtained, the listening sockets are registered with the
 * event driver (unless this process already held the mutex and
 * ngx_accept_events is 0) and ngx_accept_mutex_held is set.  If the lock
 * is not obtained but this process held it before, the listening sockets
 * are unregistered again.
 *
 * Returns NGX_OK, or NGX_ERROR if enabling / disabling the accept events
 * failed.
 */
ngx_int_t
ngx_trylock_accept_mutex(ngx_cycle_t *cycle)
{
    if (ngx_shmtx_trylock(&ngx_accept_mutex)) {

        ngx_log_debug0(NGX_LOG_DEBUG_EVENT, cycle->log, 0,
                       "accept mutex locked");

        /* already registered during a previous iteration */
        if (ngx_accept_mutex_held && ngx_accept_events == 0) {
            return NGX_OK;
        }

        if (ngx_enable_accept_events(cycle) == NGX_ERROR) {
            ngx_shmtx_unlock(&ngx_accept_mutex);
            return NGX_ERROR;
        }

        ngx_accept_events = 0;
        ngx_accept_mutex_held = 1;

        return NGX_OK;
    }

    ngx_log_debug1(NGX_LOG_DEBUG_EVENT, cycle->log, 0,
                   "accept mutex lock failed: %ui", ngx_accept_mutex_held);

    if (ngx_accept_mutex_held) {
        if (ngx_disable_accept_events(cycle, 0) == NGX_ERROR) {
            return NGX_ERROR;
        }

        ngx_accept_mutex_held = 0;
    }

    return NGX_OK;
}
/*
 * Drain a posted-event queue: repeatedly take the event at the head of
 * "posted", remove it from the queue and invoke its handler, until the
 * queue is empty.
 */
void
ngx_event_process_posted(ngx_cycle_t *cycle, ngx_queue_t *posted)
{
    ngx_event_t  *event;
    ngx_queue_t  *head;

    for ( ;; ) {

        if (ngx_queue_empty(posted)) {
            break;
        }

        head = ngx_queue_head(posted);
        event = ngx_queue_data(head, ngx_event_t, queue);

        ngx_log_debug1(NGX_LOG_DEBUG_EVENT, cycle->log, 0,
                       "posted event %p", event);

        /* unlink before the call, so the handler may post the event again */
        ngx_delete_posted_event(event);

        event->handler(event);
    }
}
/*
 * Append event "ev" to the posted queue "q" unless it is already posted
 * (ev->posted set); in the latter case only a debug message is logged.
 *
 * The body is wrapped in do { ... } while (0) so that the macro behaves
 * as a single statement and "if (x) ngx_post_event(ev, q); else ..." is
 * parsed as written.  Invoke it with a trailing semicolon.
 */
#define ngx_post_event(ev, q)                                                 \
    do {                                                                      \
        if (!(ev)->posted) {                                                  \
            (ev)->posted = 1;                                                 \
            ngx_queue_insert_tail(q, &(ev)->queue);                           \
                                                                              \
            ngx_log_debug1(NGX_LOG_DEBUG_CORE, (ev)->log, 0,                  \
                           "post event %p", ev);                              \
                                                                              \
        } else {                                                              \
            ngx_log_debug1(NGX_LOG_DEBUG_CORE, (ev)->log, 0,                  \
                           "update posted event %p", ev);                     \
        }                                                                     \
    } while (0)
/*
 * Register the read event of every listening socket with the event driver.
 * Listening entries without a connection, or whose read event is already
 * active, are skipped.
 *
 * Returns NGX_OK, or NGX_ERROR as soon as one registration fails.
 */
static ngx_int_t
ngx_enable_accept_events(ngx_cycle_t *cycle)
{
    ngx_uint_t         i;
    ngx_listening_t   *ls;
    ngx_connection_t  *c;

    ls = cycle->listening.elts;
    for (i = 0; i < cycle->listening.nelts; i++) {

        c = ls[i].connection;

        if (c == NULL || c->read->active) {
            continue;
        }

        /* with the epoll driver this ends up in ngx_epoll_add_event() */
        if (ngx_add_event(c->read, NGX_READ_EVENT, 0) == NGX_ERROR) {
            return NGX_ERROR;
        }
    }

    return NGX_OK;
}
/*
 * Read handler of a listening stream socket: accepts pending connections.
 *
 * For every accepted socket a free ngx_connection_t is taken, a memory
 * pool, a copy of the peer address and a private log are allocated, the
 * socket is switched to the blocking mode required by the event method,
 * the I/O callbacks are set and finally the listening socket's handler
 * (ls->handler) is called for the new connection.  Per the original note
 * that handler is installed by the http, stream or mail module.
 *
 * The loop continues while ev->available is non-zero.  On any failure for
 * the current socket the function cleans up and returns.
 */
void
ngx_event_accept(ngx_event_t *ev)
{
    socklen_t          socklen;
    ngx_err_t          err;
    ngx_log_t         *log;
    ngx_uint_t         level;
    ngx_socket_t       s;
    ngx_event_t       *rev, *wev;
    ngx_sockaddr_t     sa;
    ngx_listening_t   *ls;
    ngx_connection_t  *c, *lc;
    ngx_event_conf_t  *ecf;

    if (ev->timedout) {
        /* timer armed below after EMFILE/ENFILE fired: accept again */
        if (ngx_enable_accept_events((ngx_cycle_t *) ngx_cycle) != NGX_OK) {
            return;
        }

        ev->timedout = 0;
    }

    ecf = ngx_event_get_conf(ngx_cycle->conf_ctx, ngx_event_core_module);

    if (!(ngx_event_flags & NGX_USE_KQUEUE_EVENT)) {
        ev->available = ecf->multi_accept;
    }

    lc = ev->data;
    ls = lc->listening;
    ev->ready = 0;

    ngx_log_debug2(NGX_LOG_DEBUG_EVENT, ev->log, 0,
                   "accept on %V, ready: %d", &ls->addr_text, ev->available);

    do {
        socklen = sizeof(ngx_sockaddr_t);

        s = accept(lc->fd, &sa.sockaddr, &socklen);

        if (s == (ngx_socket_t) -1) {
            err = ngx_socket_errno;

            if (err == NGX_EAGAIN) {
                ngx_log_debug0(NGX_LOG_DEBUG_EVENT, ev->log, err,
                               "accept() not ready");
                return;
            }

            level = NGX_LOG_ALERT;

            if (err == NGX_ECONNABORTED) {
                level = NGX_LOG_ERR;

            } else if (err == NGX_EMFILE || err == NGX_ENFILE) {
                level = NGX_LOG_CRIT;
            }

            ngx_log_error(level, ev->log, err, "accept() failed");

            if (err == NGX_ECONNABORTED) {
                if (ngx_event_flags & NGX_USE_KQUEUE_EVENT) {
                    ev->available--;
                }

                if (ev->available) {
                    continue;
                }
            }

            if (err == NGX_EMFILE || err == NGX_ENFILE) {
                /* out of descriptors: stop accepting for a while */
                if (ngx_disable_accept_events((ngx_cycle_t *) ngx_cycle, 1)
                    != NGX_OK)
                {
                    return;
                }

                if (ngx_use_accept_mutex) {
                    if (ngx_accept_mutex_held) {
                        ngx_shmtx_unlock(&ngx_accept_mutex);
                        ngx_accept_mutex_held = 0;
                    }

                    ngx_accept_disabled = 1;

                } else {
                    ngx_add_timer(ev, ecf->accept_mutex_delay);
                }
            }

            return;
        }

        /*
         * Becomes positive once fewer than 1/8 of the connections are
         * free; ngx_process_events_and_timers() then skips taking the
         * accept mutex.
         */
        ngx_accept_disabled = ngx_cycle->connection_n / 8
                              - ngx_cycle->free_connection_n;

        c = ngx_get_connection(s, ev->log);

        if (c == NULL) {
            if (ngx_close_socket(s) == -1) {
                ngx_log_error(NGX_LOG_ALERT, ev->log, ngx_socket_errno,
                              ngx_close_socket_n " failed");
            }

            return;
        }

        c->type = SOCK_STREAM;

        c->pool = ngx_create_pool(ls->pool_size, ev->log);
        if (c->pool == NULL) {
            ngx_close_accepted_connection(c);
            return;
        }

        if (socklen > (socklen_t) sizeof(ngx_sockaddr_t)) {
            socklen = sizeof(ngx_sockaddr_t);
        }

        c->sockaddr = ngx_palloc(c->pool, socklen);
        if (c->sockaddr == NULL) {
            ngx_close_accepted_connection(c);
            return;
        }

        ngx_memcpy(c->sockaddr, &sa, socklen);

        log = ngx_palloc(c->pool, sizeof(ngx_log_t));
        if (log == NULL) {
            ngx_close_accepted_connection(c);
            return;
        }

        /* set a blocking mode for iocp and non-blocking mode for others */

        if (ngx_inherited_nonblocking) {
            if (ngx_event_flags & NGX_USE_IOCP_EVENT) {
                if (ngx_blocking(s) == -1) {
                    ngx_log_error(NGX_LOG_ALERT, ev->log, ngx_socket_errno,
                                  ngx_blocking_n " failed");
                    ngx_close_accepted_connection(c);
                    return;
                }
            }

        } else {
            if (!(ngx_event_flags & NGX_USE_IOCP_EVENT)) {
                if (ngx_nonblocking(s) == -1) {
                    ngx_log_error(NGX_LOG_ALERT, ev->log, ngx_socket_errno,
                                  ngx_nonblocking_n " failed");
                    ngx_close_accepted_connection(c);
                    return;
                }
            }
        }

        *log = ls->log;

        c->recv = ngx_recv;
        c->send = ngx_send;
        c->recv_chain = ngx_recv_chain;
        c->send_chain = ngx_send_chain;

        c->log = log;
        c->pool->log = log;

        c->socklen = socklen;
        c->listening = ls;
        c->local_sockaddr = ls->sockaddr;
        c->local_socklen = ls->socklen;

        rev = c->read;
        wev = c->write;

        wev->ready = 1;

        if (ngx_event_flags & NGX_USE_IOCP_EVENT) {
            rev->ready = 1;
        }

        if (ev->deferred_accept) {
            rev->ready = 1;
        }

        rev->log = log;
        wev->log = log;

        /*
         * TODO: MT: - ngx_atomic_fetch_add()
         *             or protection by critical section or light mutex
         *
         * TODO: MP: - allocated in a shared memory
         *           - ngx_atomic_fetch_add()
         *             or protection by critical section or light mutex
         */

        c->number = ngx_atomic_fetch_add(ngx_connection_counter, 1);

        if (ls->addr_ntop) {
            c->addr_text.data = ngx_pnalloc(c->pool, ls->addr_text_max_len);
            if (c->addr_text.data == NULL) {
                ngx_close_accepted_connection(c);
                return;
            }

            c->addr_text.len = ngx_sock_ntop(c->sockaddr, c->socklen,
                                             c->addr_text.data,
                                             ls->addr_text_max_len, 0);
            if (c->addr_text.len == 0) {
                ngx_close_accepted_connection(c);
                return;
            }
        }

        /* whole-connection registration is skipped for the epoll method */
        if (ngx_add_conn && (ngx_event_flags & NGX_USE_EPOLL_EVENT) == 0) {
            if (ngx_add_conn(c) == NGX_ERROR) {
                ngx_close_accepted_connection(c);
                return;
            }
        }

        log->data = NULL;
        log->handler = NULL;

        /* hand the connection over to the listening socket's handler */
        ls->handler(c);

        if (ngx_event_flags & NGX_USE_KQUEUE_EVENT) {
            ev->available--;
        }

    } while (ev->available);
}
特点:
例如:启用 epoll 模块时的事件结构:将所有的 I/O 操作都封装成一个 nginx 事件,然后在 worker 进程中循环调用
ngx_event_module_t ngx_epoll_module_ctx = { &epoll_name, ngx_epoll_create_conf, /* create configuration */ ngx_epoll_init_conf, /* init configuration */ //nginx的事件处理函数 { ngx_epoll_add_event, /* add an event */ ngx_epoll_del_event, /* delete an event */ ngx_epoll_add_event, /* enable an event */ ngx_epoll_del_event, /* disable an event */ ngx_epoll_add_connection, /* add an connection */ ngx_epoll_del_connection, /* delete an connection */ ngx_epoll_process_events, /* process the events */ ngx_epoll_init, /* init the events */ ngx_epoll_done, /* done the events */ } };
/*
 * init() action of the epoll event driver.
 *
 * Creates the epoll instance if none exists yet (ep == -1), (re)allocates
 * event_list when the configured number of events grew, and installs the
 * epoll action table as the active ngx_event_actions.
 *
 * Returns NGX_OK on success, NGX_ERROR if epoll_create() or the allocation
 * fails.  The "timer" argument is not used here.
 */
static ngx_int_t
ngx_epoll_init(ngx_cycle_t *cycle, ngx_msec_t timer)
{
    ngx_epoll_conf_t  *epcf;

    /* configuration of the epoll module */
    epcf = ngx_event_get_conf(cycle->conf_ctx, ngx_epoll_module);

    if (ep == -1) {
        ep = epoll_create(cycle->connection_n / 2);

        if (ep == -1) {
            ngx_log_error(NGX_LOG_EMERG, cycle->log, ngx_errno,
                          "epoll_create() failed");
            return NGX_ERROR;
        }
    }

    if (nevents < epcf->events) {
        if (event_list) {
            ngx_free(event_list);
        }

        event_list = ngx_alloc(sizeof(struct epoll_event) * epcf->events,
                               cycle->log);
        if (event_list == NULL) {
            return NGX_ERROR;
        }
    }

    nevents = epcf->events;

    ngx_io = ngx_os_io;

    /* from now on the framework dispatches through the epoll actions */
    ngx_event_actions = ngx_epoll_module_ctx.actions;

    return NGX_OK;
}
/*
 * Register an nginx event (one direction of a connection's socket) with
 * epoll.
 *
 * ev    - the read or write event of a connection (ev->data is the
 *         ngx_connection_t)
 * event - NGX_READ_EVENT or the write event mask
 * flags - extra epoll flags OR-ed into the event mask
 *
 * If the opposite direction of the same connection is already active the
 * descriptor is known to epoll, so EPOLL_CTL_MOD is used and the mask of
 * the opposite direction is preserved; otherwise EPOLL_CTL_ADD is used.
 *
 * Returns NGX_OK and marks the event active, or NGX_ERROR if epoll_ctl()
 * fails.
 */
static ngx_int_t
ngx_epoll_add_event(ngx_event_t *ev, ngx_int_t event, ngx_uint_t flags)
{
    int                  op;
    uint32_t             events, prev;
    ngx_event_t         *e;
    ngx_connection_t    *c;
    struct epoll_event   ee;

    c = ev->data;

    events = (uint32_t) event;

    if (event == NGX_READ_EVENT) {
        e = c->write;
        prev = EPOLLOUT;        /* mask to keep if the write side is active */

    } else {
        e = c->read;
        /*
         * Mask to keep if the read side is active.  Without this
         * assignment "prev" would be read uninitialised below.
         */
        prev = EPOLLIN|EPOLLRDHUP;
    }

    if (e->active) {
        op = EPOLL_CTL_MOD;     /* fd already registered: modify its mask */
        events |= prev;

    } else {
        op = EPOLL_CTL_ADD;     /* first registration of this fd */
    }

    ee.events = events | (uint32_t) flags;

    /* the low pointer bit carries "instance" for stale-event detection */
    ee.data.ptr = (void *) ((uintptr_t) c | ev->instance);

    ngx_log_debug3(NGX_LOG_DEBUG_EVENT, ev->log, 0,
                   "epoll add event: fd:%d op:%d ev:%08XD",
                   c->fd, op, ee.events);

    if (epoll_ctl(ep, op, c->fd, &ee) == -1) {
        ngx_log_error(NGX_LOG_ALERT, ev->log, ngx_errno,
                      "epoll_ctl(%d, %d) failed", op, c->fd);
        return NGX_ERROR;
    }

    ev->active = 1;

    return NGX_OK;
}
/*
 * process_events() action of the epoll event driver.
 *
 * Waits in epoll_wait() for at most "timer" milliseconds, updates the
 * cached time when requested, and then walks the returned events.  For
 * each ready read / write event the handler is either called immediately
 * or, when NGX_POST_EVENTS is set in "flags", the event is queued on
 * ngx_posted_accept_events (accept events) or ngx_posted_events.
 *
 * Returns NGX_OK on success or timeout, NGX_ERROR if epoll_wait() failed
 * or returned no events although no timeout was set.
 */
static ngx_int_t
ngx_epoll_process_events(ngx_cycle_t *cycle, ngx_msec_t timer, ngx_uint_t flags)
{
    int                events;
    uint32_t           revents;
    ngx_int_t          instance, i;
    ngx_uint_t         level;
    ngx_err_t          err;
    ngx_event_t       *rev, *wev;
    ngx_queue_t       *queue;
    ngx_connection_t  *c;

    /* NGX_TIMER_INFINITE == INFTIM */

    ngx_log_debug1(NGX_LOG_DEBUG_EVENT, cycle->log, 0,
                   "epoll timer: %M", timer);

    events = epoll_wait(ep, event_list, (int) nevents, timer);

    err = (events == -1) ? ngx_errno : 0;

    if (flags & NGX_UPDATE_TIME || ngx_event_timer_alarm) {
        ngx_time_update();
    }

    if (err) {
        if (err == NGX_EINTR) {

            /* interrupted by the timer signal: not an error */
            if (ngx_event_timer_alarm) {
                ngx_event_timer_alarm = 0;
                return NGX_OK;
            }

            level = NGX_LOG_INFO;

        } else {
            level = NGX_LOG_ALERT;
        }

        ngx_log_error(level, cycle->log, err, "epoll_wait() failed");
        return NGX_ERROR;
    }

    if (events == 0) {
        if (timer != NGX_TIMER_INFINITE) {
            return NGX_OK;
        }

        ngx_log_error(NGX_LOG_ALERT, cycle->log, 0,
                      "epoll_wait() returned no events without timeout");
        return NGX_ERROR;
    }

    for (i = 0; i < events; i++) {
        c = event_list[i].data.ptr;

        /* split the tagged pointer stored by ngx_epoll_add_event() */
        instance = (uintptr_t) c & 1;
        c = (ngx_connection_t *) ((uintptr_t) c & (uintptr_t) ~1);

        rev = c->read;

        if (c->fd == -1 || rev->instance != instance) {

            /*
             * the stale event from a file descriptor
             * that was just closed in this iteration
             */

            ngx_log_debug1(NGX_LOG_DEBUG_EVENT, cycle->log, 0,
                           "epoll: stale event %p", c);
            continue;
        }

        revents = event_list[i].events;

        ngx_log_debug3(NGX_LOG_DEBUG_EVENT, cycle->log, 0,
                       "epoll: fd:%d ev:%04XD d:%p",
                       c->fd, revents, event_list[i].data.ptr);

        if (revents & (EPOLLERR|EPOLLHUP)) {
            ngx_log_debug2(NGX_LOG_DEBUG_EVENT, cycle->log, 0,
                           "epoll_wait() error on fd:%d ev:%04XD",
                           c->fd, revents);

            /*
             * if the error events were returned, add EPOLLIN and EPOLLOUT
             * to handle the events at least in one active handler
             */

            revents |= EPOLLIN|EPOLLOUT;
        }

        /* read side */
        if ((revents & EPOLLIN) && rev->active) {
            rev->ready = 1;

            if (flags & NGX_POST_EVENTS) {
                /* accept events get their own queue, processed first */
                queue = rev->accept ? &ngx_posted_accept_events
                                    : &ngx_posted_events;

                ngx_post_event(rev, queue);

            } else {
                /* not posting: run the handler right away */
                rev->handler(rev);
            }
        }

        wev = c->write;

        /* write side */
        if ((revents & EPOLLOUT) && wev->active) {

            if (c->fd == -1 || wev->instance != instance) {

                /*
                 * the stale event from a file descriptor
                 * that was just closed in this iteration
                 */

                ngx_log_debug1(NGX_LOG_DEBUG_EVENT, cycle->log, 0,
                               "epoll: stale event %p", c);
                continue;
            }

            wev->ready = 1;

            if (flags & NGX_POST_EVENTS) {
                ngx_post_event(wev, &ngx_posted_events);

            } else {
                wev->handler(wev);
            }
        }
    }

    return NGX_OK;
}
/*
 * Set up a freshly accepted connection for HTTP.
 *
 *  1. allocates the ngx_http_connection_t and stores it in c->data;
 *  2. finds the address configuration matching the local address of the
 *     connection (the last address of a port is the "*" wildcard);
 *  3. installs the HTTP log context;
 *  4. chooses the read handler (plain HTTP, HTTP/2 or SSL handshake) and
 *     an empty write handler;
 *  5. if data is already readable, runs the read handler directly, or
 *     posts the read event when the accept mutex is in use; otherwise arms
 *     the post-accept timer and registers the read event with the event
 *     driver.
 *
 * On any failure the connection is closed.
 */
void
ngx_http_init_connection(ngx_connection_t *c)
{
    ngx_uint_t              i;
    ngx_event_t            *rev;
    struct sockaddr_in     *sin;
    ngx_http_port_t        *port;
    ngx_http_in_addr_t     *addr;
    ngx_http_log_ctx_t     *ctx;
    ngx_http_connection_t  *hc;
#if (NGX_HAVE_INET6)
    struct sockaddr_in6    *sin6;
    ngx_http_in6_addr_t    *addr6;
#endif

    hc = ngx_pcalloc(c->pool, sizeof(ngx_http_connection_t));
    if (hc == NULL) {
        ngx_http_close_connection(c);
        return;
    }

    c->data = hc;

    /* find the server configuration for the address:port */

    port = c->listening->servers;

    if (port->naddrs > 1) {

        /*
         * there are several addresses on this port and one of them
         * is an "*:port" wildcard so getsockname() in ngx_http_server_addr()
         * is required to determine a server address
         */

        if (ngx_connection_local_sockaddr(c, NULL, 0) != NGX_OK) {
            ngx_http_close_connection(c);
            return;
        }

        switch (c->local_sockaddr->sa_family) {

#if (NGX_HAVE_INET6)
        case AF_INET6:
            sin6 = (struct sockaddr_in6 *) c->local_sockaddr;

            addr6 = port->addrs;

            /* the last address is "*" */

            for (i = 0; i < port->naddrs - 1; i++) {
                if (ngx_memcmp(&addr6[i].addr6, &sin6->sin6_addr, 16) == 0) {
                    break;
                }
            }

            hc->addr_conf = &addr6[i].conf;

            break;
#endif

        default: /* AF_INET */
            sin = (struct sockaddr_in *) c->local_sockaddr;

            addr = port->addrs;

            /* the last address is "*" */

            for (i = 0; i < port->naddrs - 1; i++) {
                if (addr[i].addr == sin->sin_addr.s_addr) {
                    break;
                }
            }

            hc->addr_conf = &addr[i].conf;

            break;
        }

    } else {

        switch (c->local_sockaddr->sa_family) {

#if (NGX_HAVE_INET6)
        case AF_INET6:
            addr6 = port->addrs;
            hc->addr_conf = &addr6[0].conf;
            break;
#endif

        default: /* AF_INET */
            addr = port->addrs;
            hc->addr_conf = &addr[0].conf;
            break;
        }
    }

    /* the default server configuration for the address:port */
    hc->conf_ctx = hc->addr_conf->default_server->ctx;

    ctx = ngx_palloc(c->pool, sizeof(ngx_http_log_ctx_t));
    if (ctx == NULL) {
        ngx_http_close_connection(c);
        return;
    }

    ctx->connection = c;
    ctx->request = NULL;
    ctx->current_request = NULL;

    c->log->connection = c->number;
    c->log->handler = ngx_http_log_error;
    c->log->data = ctx;
    c->log->action = "waiting for request";

    c->log_error = NGX_ERROR_INFO;

    rev = c->read;
    rev->handler = ngx_http_wait_request_handler;   /* read handler */
    c->write->handler = ngx_http_empty_handler;     /* write handler */

#if (NGX_HTTP_V2)
    if (hc->addr_conf->http2) {
        rev->handler = ngx_http_v2_init;
    }
#endif

#if (NGX_HTTP_SSL)
    {
    ngx_http_ssl_srv_conf_t  *sscf;

    sscf = ngx_http_get_module_srv_conf(hc->conf_ctx, ngx_http_ssl_module);

    if (sscf->enable || hc->addr_conf->ssl) {

        c->log->action = "SSL handshaking";

        if (hc->addr_conf->ssl && sscf->ssl.ctx == NULL) {
            ngx_log_error(NGX_LOG_ERR, c->log, 0,
                          "no \"ssl_certificate\" is defined "
                          "in server listening on SSL port");
            ngx_http_close_connection(c);
            return;
        }

        hc->ssl = 1;

        rev->handler = ngx_http_ssl_handshake;      /* SSL handshake handler */
    }
    }
#endif

    if (hc->addr_conf->proxy_protocol) {
        hc->proxy_protocol = 1;
        c->log->action = "reading PROXY protocol";
    }

    if (rev->ready) {
        /* the deferred accept(), iocp */

        if (ngx_use_accept_mutex) {
            /* defer to the posted queue instead of running it here */
            ngx_post_event(rev, &ngx_posted_events);
            return;
        }

        /* run the read handler directly */
        rev->handler(rev);
        return;
    }

    /* nothing to read yet: wait at most post_accept_timeout for data */
    ngx_add_timer(rev, c->listening->post_accept_timeout);
    ngx_reusable_connection(c, 1);

    /* register the connection's read event with the event driver */
    if (ngx_handle_read_event(rev, 0) != NGX_OK) {
        ngx_http_close_connection(c);
        return;
    }
}
/*
 * Read handler of an HTTP connection that has not produced a request yet.
 *
 *  1. closes the connection on timeout or when c->close is set;
 *  2. makes sure c->buffer exists and has client_header_buffer_size bytes
 *     of storage;
 *  3. reads from the connection; on NGX_AGAIN it re-arms the timer and the
 *     read event, releases the buffer storage and returns;
 *  4. if a PROXY protocol header is expected, parses it first; when the
 *     header consumed everything that was read, the buffer is reset and
 *     the read event is posted again;
 *  5. creates the request object, switches the read handler to
 *     ngx_http_process_request_line and calls it for the data just read.
 */
static void
ngx_http_wait_request_handler(ngx_event_t *rev)
{
    u_char                    *p;
    size_t                     size;
    ssize_t                    n;
    ngx_buf_t                 *b;
    ngx_connection_t          *c;
    ngx_http_connection_t     *hc;
    ngx_http_core_srv_conf_t  *cscf;

    c = rev->data;

    ngx_log_debug0(NGX_LOG_DEBUG_HTTP, c->log, 0, "http wait request handler");

    if (rev->timedout) {
        ngx_log_error(NGX_LOG_INFO, c->log, NGX_ETIMEDOUT, "client timed out");
        ngx_http_close_connection(c);
        return;
    }

    if (c->close) {
        ngx_http_close_connection(c);
        return;
    }

    hc = c->data;
    cscf = ngx_http_get_module_srv_conf(hc->conf_ctx, ngx_http_core_module);

    size = cscf->client_header_buffer_size;

    b = c->buffer;

    if (b == NULL) {
        b = ngx_create_temp_buf(c->pool, size);
        if (b == NULL) {
            ngx_http_close_connection(c);
            return;
        }

        c->buffer = b;

    } else if (b->start == NULL) {

        /* storage was released on an earlier NGX_AGAIN: allocate it again */
        b->start = ngx_palloc(c->pool, size);
        if (b->start == NULL) {
            ngx_http_close_connection(c);
            return;
        }

        b->pos = b->start;
        b->last = b->start;
        b->end = b->last + size;
    }

    n = c->recv(c, b->last, size);

    if (n == NGX_AGAIN) {

        if (!rev->timer_set) {
            ngx_add_timer(rev, c->listening->post_accept_timeout);
            ngx_reusable_connection(c, 1);
        }

        if (ngx_handle_read_event(rev, 0) != NGX_OK) {
            ngx_http_close_connection(c);
            return;
        }

        /*
         * We are trying to not hold c->buffer's memory for an idle connection.
         */

        if (ngx_pfree(c->pool, b->start) == NGX_OK) {
            b->start = NULL;
        }

        return;
    }

    if (n == NGX_ERROR) {
        ngx_http_close_connection(c);
        return;
    }

    if (n == 0) {
        ngx_log_error(NGX_LOG_INFO, c->log, 0,
                      "client closed connection");
        ngx_http_close_connection(c);
        return;
    }

    b->last += n;

    if (hc->proxy_protocol) {
        hc->proxy_protocol = 0;

        /* returns the position just past the PROXY protocol header */
        p = ngx_proxy_protocol_read(c, b->pos, b->last);

        if (p == NULL) {
            ngx_http_close_connection(c);
            return;
        }

        b->pos = p;

        if (b->pos == b->last) {
            /* only the header has arrived so far: wait for the request */
            c->log->action = "waiting for request";
            b->pos = b->start;
            b->last = b->start;
            ngx_post_event(rev, &ngx_posted_events);
            return;
        }
    }

    c->log->action = "reading client request line";

    ngx_reusable_connection(c, 0);

    c->data = ngx_http_create_request(c);
    if (c->data == NULL) {
        ngx_http_close_connection(c);
        return;
    }

    /* later read events go straight to the request line parser */
    rev->handler = ngx_http_process_request_line;

    /* parse what has been read so far */
    ngx_http_process_request_line(rev);
}