ngx_event_core_module模塊的ngx_event_process_init方法對事件模塊做了一些初始化。其中包括將“請求連接”這樣一個讀事件對應的處理方法(handler)設置為ngx_event_accept函數,并將此事件添加到epoll模塊中。當有新連接事件發生時,ngx_event_accept就會被調用。大致流程是這樣:
worker進程在ngx_worker_process_cycle方法中不斷循環調用ngx_process_events_and_timers函數處理事件,這個函數是事件處理的總入口。
ngx_process_events_and_timers會調用ngx_process_events,這是一個宏,相當于ngx_event_actions.process_events,ngx_event_actions是個全局的結構體,存儲了對應事件驅動模塊(這里是epoll模塊)的10個函數接口。所以這里就是調用了ngx_epoll_module_ctx.actions.process_events函數,也就是ngx_epoll_process_events函數來處理事件。
ngx_epoll_process_events調用linux函數接口epoll_wait獲得“有新連接”這個事件,然后調用這個事件的handler處理函數來對這個事件進行處理。
在上面已經說過handler已經被設置成了ngx_event_accept函數,所以就調用ngx_event_accept進行實際的處理。
下面分析ngx_event_accept方法,它的流程圖如下所示:
經過精簡的代碼如下,注釋中的序號對應上圖的序號:
void ngx_event_accept(ngx_event_t?*ev) { ?socklen_t??socklen; ?ngx_err_t??err; ?ngx_log_t??*log; ?ngx_uint_t??level; ?ngx_socket_t??s; ?ngx_event_t??*rev,?*wev; ?ngx_listening_t??*ls; ?ngx_connection_t?*c,?*lc; ?ngx_event_conf_t?*ecf; ?u_char??sa[ngx_sockaddrlen]; ? ?if?(ev->timedout)?{ ??if?(ngx_enable_accept_events((ngx_cycle_t?*)?ngx_cycle)?!=?ngx_ok)?{ ???return; ??} ? ??ev->timedout?=?0; ?} ? ?ecf?=?ngx_event_get_conf(ngx_cycle->conf_ctx,?ngx_event_core_module); ? ?if?(ngx_event_flags?&?ngx_use_rtsig_event)?{ ??ev->available?=?1; ? ?}?else?if?(!(ngx_event_flags?&?ngx_use_kqueue_event))?{ ??ev->available?=?ecf->multi_accept; ?} ? ?lc?=?ev->data; ?ls?=?lc->listening; ?ev->ready?=?0; ? ?do?{ ??socklen?=?ngx_sockaddrlen; ? ??/*?1、accept方法試圖建立連接,非阻塞調用?*/ ??s?=?accept(lc->fd,?(struct?sockaddr?*)?sa,?&socklen); ? ??if?(s?==?(ngx_socket_t)?-1) ??{ ???err?=?ngx_socket_errno; ? ???if?(err?==?ngx_eagain) ???{ ????/*?沒有連接,直接返回?*/ ????return; ???} ? ???level?=?ngx_log_alert; ? ???if?(err?==?ngx_econnaborted)?{ ????level?=?ngx_log_err; ? ???}?else?if?(err?==?ngx_emfile?||?err?==?ngx_enfile)?{ ????level?=?ngx_log_crit; ???} ? ???if?(err?==?ngx_econnaborted)?{ ????if?(ngx_event_flags?&?ngx_use_kqueue_event)?{ ?????ev->available--; ????} ? ????if?(ev->available)?{ ?????continue; ????} ???} ? ???if?(err?==?ngx_emfile?||?err?==?ngx_enfile)?{ ????if?(ngx_disable_accept_events((ngx_cycle_t?*)?ngx_cycle) ?????!=?ngx_ok) ????{ ?????return; ????} ? ????if?(ngx_use_accept_mutex)?{ ?????if?(ngx_accept_mutex_held)?{ ??????ngx_shmtx_unlock(&ngx_accept_mutex); ??????ngx_accept_mutex_held?=?0; ?????} ? ?????ngx_accept_disabled?=?1; ? ????}?else?{ ?????ngx_add_timer(ev,?ecf->accept_mutex_delay); ????} ???} ? ???return; ??} ? ??/*?2、設置負載均衡閾值?*/ ??ngx_accept_disabled?=?ngx_cycle->connection_n?/?8 ????????-?ngx_cycle->free_connection_n; ? ??/*?3、從連接池獲得一個連接對象?*/ ??c?=?ngx_get_connection(s,?ev->log); ? ??/*?4、為連接創建內存池?*/ ??c->pool?=?ngx_create_pool(ls->pool_size,?ev->log); ? ??c->sockaddr?=?ngx_palloc(c->pool,?socklen); ? ??ngx_memcpy(c->sockaddr,?sa,?socklen); ? ??log?=?ngx_palloc(c->pool,?sizeof(ngx_log_t)); ? ??/*?set?a?blocking?mode?for?aio?and?non-blocking?mode?for?others?*/ ??/*?5、設置套接字屬性為阻塞或非阻塞?*/ ??if?(ngx_inherited_nonblocking)?{ ???if?(ngx_event_flags?&?ngx_use_aio_event)?{ ????if?(ngx_blocking(s)?==?-1)?{ ?????ngx_log_error(ngx_log_alert,?ev->log,?ngx_socket_errno, ?????????ngx_blocking_n?"?failed"); ?????ngx_close_accepted_connection(c); ?????return; ????} ???} ? ??}?else?{ ???if?(!(ngx_event_flags?&?(ngx_use_aio_event|ngx_use_rtsig_event)))?{ ????if?(ngx_nonblocking(s)?==?-1)?{ ?????ngx_log_error(ngx_log_alert,?ev->log,?ngx_socket_errno, ?????????ngx_nonblocking_n?"?failed"); ?????ngx_close_accepted_connection(c); ?????return; ????} ???} ??} ? ??*log?=?ls->log; ? ??c->recv?=?ngx_recv; ??c->send?=?ngx_send; ??c->recv_chain?=?ngx_recv_chain; ??c->send_chain?=?ngx_send_chain; ? ??c->log?=?log; ??c->pool->log?=?log; ? ??c->socklen?=?socklen; ??c->listening?=?ls; ??c->local_sockaddr?=?ls->sockaddr; ??c->local_socklen?=?ls->socklen; ? ??c->unexpected_eof?=?1; ? ??rev?=?c->read; ??wev?=?c->write; ? ??wev->ready?=?1; ? ??if?(ngx_event_flags?&?(ngx_use_aio_event|ngx_use_rtsig_event))?{ ???/*?rtsig,?aio,?iocp?*/ ???rev->ready?=?1; ??} ? ??if?(ev->deferred_accept)?{ ???rev->ready?=?1; ? ??} ? ??rev->log?=?log; ??wev->log?=?log; ? ??/* ???*?todo:?mt:?-?ngx_atomic_fetch_add() ???*??or?protection?by?critical?section?or?light?mutex ???* ???*?todo:?mp:?-?allocated?in?a?shared?memory ???*???-?ngx_atomic_fetch_add() ???*??or?protection?by?critical?section?or?light?mutex ???*/ ? ??c->number?=?ngx_atomic_fetch_add(ngx_connection_counter,?1); ? ??if?(ls->addr_ntop)?{ ???c->addr_text.data?=?ngx_pnalloc(c->pool,?ls->addr_text_max_len); ???if?(c->addr_text.data?==?null)?{ ????ngx_close_accepted_connection(c); ????return; ???} ? ???c->addr_text.len?=?ngx_sock_ntop(c->sockaddr,?c->socklen, ????????????c->addr_text.data, ????????????ls->addr_text_max_len,?0); ???if?(c->addr_text.len?==?0)?{ ????ngx_close_accepted_connection(c); ????return; ???} ??} ? ??/*?6、將新連接對應的讀寫事件添加到epoll對象中?*/ ??if?(ngx_add_conn?&&?(ngx_event_flags?&?ngx_use_epoll_event)?==?0)?{ ???if?(ngx_add_conn(c)?==?ngx_error)?{ ????ngx_close_accepted_connection(c); ????return; ???} ??} ? ??log->data?=?null; ??log->handler?=?null; ? ??/*?7、tcp建立成功調用的方法,這個方法在ngx_listening_t結構體中?*/ ??ls->handler(c); ? ?}?while?(ev->available);?/*?available標志表示一次盡可能多的建立連接,由配置項multi_accept決定?*/ }
nginx中的“驚群”問題
nginx一般會運行多個worker進程,這些進程會同時監聽同一端口。當有新連接到來時,內核將這些進程全部喚醒,但只有一個進程能夠和客戶端連接成功,導致其它進程在喚醒時浪費了大量開銷,這被稱為“驚群”現象。nginx解決“驚群”的方法是,讓進程獲得互斥鎖ngx_accept_mutex,讓進程互斥地進入某一段臨界區。在該臨界區中,進程將它所要監聽的連接對應的讀事件添加到epoll模塊中,使得當有“新連接”事件發生時,該worker進程會作出反應。這段加鎖并添加事件的過程是在函數ngx_trylock_accept_mutex中完成的。而當其它進程也進入該函數想要添加讀事件時,發現互斥鎖被另外一個進程持有,所以它只能返回,它所監聽的事件也無法添加到epoll模塊,從而無法響應“新連接”事件。但這會出現一個問題:持有互斥鎖的那個進程在什么時候釋放互斥鎖呢?如果需要等待它處理完所有的事件才釋放鎖的話,那么會需要相當長的時間。而在這段時間內,其它worker進程無法建立新連接,這顯然是不可取的。nginx的解決辦法是:通過ngx_trylock_accept_mutex獲得了互斥鎖的進程,在獲得就緒讀/寫事件并從epoll_wait返回后,將這些事件歸類放入隊列中:
新連接事件放入ngx_posted_accept_events隊列
已有連接事件放入ngx_posted_events隊列
代碼如下:
if?(flags?&?ngx_post_events) { ?/*?延后處理這批事件?*/ ?queue?=?(ngx_event_t?**)?(rev->accept???&ngx_posted_accept_events?:?&ngx_posted_events); ? ?/*?將事件添加到延后執行隊列中?*/ ?ngx_locked_post_event(rev,?queue); } else { ?rev->handler(rev);?/*?不需要延后,則立即處理事件?*/ }
寫事件做類似處理。進程接下來處理ngx_posted_accept_events隊列中的事件,處理完后立即釋放互斥鎖,使該進程占用鎖的時間降到了最低。
nginx中的負載均衡問題
nginx中每個進程使用了一個處理負載均衡的閾值ngx_accept_disabled,它在上圖的第2步中被初始化:
ngx_accept_disabled = ngx_cycle->connection_n / 8 – ngx_cycle->free_connection_n;
它的初值為一個負數,該負數的絕對值等于總連接數的7/8.當閾值小于0時正常響應新連接事件,當閾值大于0時不再響應新連接事件,并將ngx_accept_disabled減1,代碼如下:
if?(ngx_accept_disabled?>?0) { ??ngx_accept_disabled--; } else { ?if?(ngx_trylock_accept_mutex(cycle)?==?ngx_error) ?{ ??return; ?} ?.... }
這說明,當某個進程當前的連接數達到能夠處理的總連接數的7/8時,負載均衡機制被觸發,進程停止響應新連接。