Nginx (2): A Framework-Level Look at the Worker Process Flow
Once Nginx is up, several processes are running: 1. the master process, which receives user commands and reacts to them; 2. the worker processes, which handle all network events while also taking coordination commands from the master.
The master mostly deals with control commands, which we will cover later. The workers do nginx's core work: request handling and forwarding, reverse proxying, load balancing, and so on. So let's start by chewing on the worker, the hard part.
0. The worker main loop
Workers are started by the master. As forked child processes they begin with the same in-memory data as the master, but their scope of activity is much narrower, so a worker never takes over the master's role.
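Before reading the real code, here is a minimal, standalone C sketch (not nginx code; the names are made up) of the fork semantics just described: each child starts with a copy of the parent's memory and then runs its own loop, while the parent keeps supervising.

// A toy master that forks two workers (illustrative only, not nginx code)
#include <stdio.h>
#include <stdlib.h>
#include <unistd.h>
#include <sys/wait.h>

static int worker_count = 2;               /* imagine this came from nginx.conf */

static void worker_cycle(int slot)
{
    /* the child sees the same worker_count value the parent set up before fork */
    printf("worker %d (pid %d) sees worker_count=%d\n",
           slot, (int) getpid(), worker_count);
    /* a real worker would loop on events here */
    exit(0);
}

int main(void)
{
    int i;
    for (i = 0; i < worker_count; i++) {
        pid_t pid = fork();
        if (pid == 0) {                    /* child: become a worker */
            worker_cycle(i);
        } else if (pid == -1) {
            perror("fork");
        }
        /* parent: keep forking, then supervise */
    }
    while (wait(NULL) > 0) { /* master reaps its children */ }
    return 0;
}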
// unix/ngx_process_cycle.cvoidngx_master_process_cycle(ngx_cycle_t *cycle){char *title;u_char *p;size_t size;ngx_int_t i;ngx_uint_t sigio;sigset_t set;struct itimerval itv;ngx_uint_t live;ngx_msec_t delay;ngx_core_conf_t *ccf;sigemptyset(&set);sigaddset(&set, SIGCHLD);sigaddset(&set, SIGALRM);sigaddset(&set, SIGIO);sigaddset(&set, SIGINT);sigaddset(&set, ngx_signal_value(NGX_RECONFIGURE_SIGNAL));sigaddset(&set, ngx_signal_value(NGX_REOPEN_SIGNAL));sigaddset(&set, ngx_signal_value(NGX_NOACCEPT_SIGNAL));sigaddset(&set, ngx_signal_value(NGX_TERMINATE_SIGNAL));sigaddset(&set, ngx_signal_value(NGX_SHUTDOWN_SIGNAL));sigaddset(&set, ngx_signal_value(NGX_CHANGEBIN_SIGNAL));if (sigprocmask(SIG_BLOCK, &set, NULL) == -1) {ngx_log_error(NGX_LOG_ALERT, cycle->log, ngx_errno,"sigprocmask() failed");}sigemptyset(&set);size = sizeof(master_process);for (i = 0; i < ngx_argc; i++) {size += ngx_strlen(ngx_argv[i]) + 1;}title = ngx_pnalloc(cycle->pool, size);if (title == NULL) {/* fatal */exit(2);}p = ngx_cpymem(title, master_process, sizeof(master_process) - 1);for (i = 0; i < ngx_argc; i++) {*p++ = ' ';p = ngx_cpystrn(p, (u_char *) ngx_argv[i], size);}ngx_setproctitle(title);ccf = (ngx_core_conf_t *) ngx_get_conf(cycle->conf_ctx, ngx_core_module);// 啟動之后會主動啟動 worker 進程ngx_start_worker_processes(cycle, ccf->worker_processes,NGX_PROCESS_RESPAWN);ngx_start_cache_manager_processes(cycle, 0);ngx_new_binary = 0;delay = 0;sigio = 0;live = 1;for ( ;; ) {if (delay) {if (ngx_sigalrm) {sigio = 0;delay *= 2;ngx_sigalrm = 0;}ngx_log_debug1(NGX_LOG_DEBUG_EVENT, cycle->log, 0,"termination cycle: %M", delay);itv.it_interval.tv_sec = 0;itv.it_interval.tv_usec = 0;itv.it_value.tv_sec = delay / 1000;itv.it_value.tv_usec = (delay % 1000 ) * 1000;if (setitimer(ITIMER_REAL, &itv, NULL) == -1) {ngx_log_error(NGX_LOG_ALERT, cycle->log, ngx_errno,"setitimer() failed");}}ngx_log_debug0(NGX_LOG_DEBUG_EVENT, cycle->log, 0, "sigsuspend");sigsuspend(&set);ngx_time_update();ngx_log_debug1(NGX_LOG_DEBUG_EVENT, cycle->log, 0,"wake up, sigio %i", sigio);if (ngx_reap) {ngx_reap = 0;ngx_log_debug0(NGX_LOG_DEBUG_EVENT, cycle->log, 0, "reap children");live = ngx_reap_children(cycle);}if (!live && (ngx_terminate || ngx_quit)) {ngx_master_process_exit(cycle);}if (ngx_terminate) {if (delay == 0) {delay = 50;}if (sigio) {sigio--;continue;}sigio = ccf->worker_processes + 2 /* cache processes */;if (delay > 1000) {ngx_signal_worker_processes(cycle, SIGKILL);} else {ngx_signal_worker_processes(cycle,ngx_signal_value(NGX_TERMINATE_SIGNAL));}continue;}if (ngx_quit) {ngx_signal_worker_processes(cycle,ngx_signal_value(NGX_SHUTDOWN_SIGNAL));ngx_close_listening_sockets(cycle);continue;}if (ngx_reconfigure) {ngx_reconfigure = 0;if (ngx_new_binary) {ngx_start_worker_processes(cycle, ccf->worker_processes,NGX_PROCESS_RESPAWN);ngx_start_cache_manager_processes(cycle, 0);ngx_noaccepting = 0;continue;}ngx_log_error(NGX_LOG_NOTICE, cycle->log, 0, "reconfiguring");cycle = ngx_init_cycle(cycle);if (cycle == NULL) {cycle = (ngx_cycle_t *) ngx_cycle;continue;}ngx_cycle = cycle;ccf = (ngx_core_conf_t *) ngx_get_conf(cycle->conf_ctx,ngx_core_module);// 收到reconfig命令時,重啟worker 進程ngx_start_worker_processes(cycle, ccf->worker_processes,NGX_PROCESS_JUST_RESPAWN);ngx_start_cache_manager_processes(cycle, 1);/* allow new processes to start */ngx_msleep(100);live = 1;ngx_signal_worker_processes(cycle,ngx_signal_value(NGX_SHUTDOWN_SIGNAL));}if (ngx_restart) {ngx_restart = 0;// 收到重啟命令時,傳遞消息給 workerngx_start_worker_processes(cycle, 
ccf->worker_processes,NGX_PROCESS_RESPAWN);ngx_start_cache_manager_processes(cycle, 0);live = 1;}if (ngx_reopen) {ngx_reopen = 0;ngx_log_error(NGX_LOG_NOTICE, cycle->log, 0, "reopening logs");ngx_reopen_files(cycle, ccf->user);ngx_signal_worker_processes(cycle,ngx_signal_value(NGX_REOPEN_SIGNAL));}if (ngx_change_binary) {ngx_change_binary = 0;ngx_log_error(NGX_LOG_NOTICE, cycle->log, 0, "changing binary");ngx_new_binary = ngx_exec_new_binary(cycle, ngx_argv);}if (ngx_noaccept) {ngx_noaccept = 0;ngx_noaccepting = 1;ngx_signal_worker_processes(cycle,ngx_signal_value(NGX_SHUTDOWN_SIGNAL));}}}static voidngx_start_worker_processes(ngx_cycle_t *cycle, ngx_int_t n, ngx_int_t type){ngx_int_t i;ngx_channel_t ch;ngx_log_error(NGX_LOG_NOTICE, cycle->log, 0, "start worker processes");ngx_memzero(&ch, sizeof(ngx_channel_t));ch.command = NGX_CMD_OPEN_CHANNEL;// n 代表worker的進程數(shù), 在 nginx.conf 中配置for (i = 0; i < n; i++) {// 依次啟動 worker 進程,實際上就是通過fork進行子進程啟動的ngx_spawn_process(cycle, ngx_worker_process_cycle,(void *) (intptr_t) i, "worker process", type);ch.pid = ngx_processes[ngx_process_slot].pid;ch.slot = ngx_process_slot;ch.fd = ngx_processes[ngx_process_slot].channel[0];ngx_pass_open_channel(cycle, &ch);}}ngx_pid_tngx_spawn_process(ngx_cycle_t *cycle, ngx_spawn_proc_pt proc, void *data,char *name, ngx_int_t respawn){u_long on;ngx_pid_t pid;ngx_int_t s;if (respawn >= 0) {s = respawn;} else {for (s = 0; s < ngx_last_process; s++) {if (ngx_processes[s].pid == -1) {break;}}if (s == NGX_MAX_PROCESSES) {ngx_log_error(NGX_LOG_ALERT, cycle->log, 0,"no more than %d processes can be spawned",NGX_MAX_PROCESSES);return NGX_INVALID_PID;}}if (respawn != NGX_PROCESS_DETACHED) {/* Solaris 9 still has no AF_LOCAL */if (socketpair(AF_UNIX, SOCK_STREAM, 0, ngx_processes[s].channel) == -1){ngx_log_error(NGX_LOG_ALERT, cycle->log, ngx_errno,"socketpair() failed while spawning \"%s\"", name);return NGX_INVALID_PID;}ngx_log_debug2(NGX_LOG_DEBUG_CORE, cycle->log, 0,"channel %d:%d",ngx_processes[s].channel[0],ngx_processes[s].channel[1]);if (ngx_nonblocking(ngx_processes[s].channel[0]) == -1) {ngx_log_error(NGX_LOG_ALERT, cycle->log, ngx_errno,ngx_nonblocking_n " failed while spawning \"%s\"",name);ngx_close_channel(ngx_processes[s].channel, cycle->log);return NGX_INVALID_PID;}if (ngx_nonblocking(ngx_processes[s].channel[1]) == -1) {ngx_log_error(NGX_LOG_ALERT, cycle->log, ngx_errno,ngx_nonblocking_n " failed while spawning \"%s\"",name);ngx_close_channel(ngx_processes[s].channel, cycle->log);return NGX_INVALID_PID;}on = 1;if (ioctl(ngx_processes[s].channel[0], FIOASYNC, &on) == -1) {ngx_log_error(NGX_LOG_ALERT, cycle->log, ngx_errno,"ioctl(FIOASYNC) failed while spawning \"%s\"", name);ngx_close_channel(ngx_processes[s].channel, cycle->log);return NGX_INVALID_PID;}if (fcntl(ngx_processes[s].channel[0], F_SETOWN, ngx_pid) == -1) {ngx_log_error(NGX_LOG_ALERT, cycle->log, ngx_errno,"fcntl(F_SETOWN) failed while spawning \"%s\"", name);ngx_close_channel(ngx_processes[s].channel, cycle->log);return NGX_INVALID_PID;}if (fcntl(ngx_processes[s].channel[0], F_SETFD, FD_CLOEXEC) == -1) {ngx_log_error(NGX_LOG_ALERT, cycle->log, ngx_errno,"fcntl(FD_CLOEXEC) failed while spawning \"%s\"",name);ngx_close_channel(ngx_processes[s].channel, cycle->log);return NGX_INVALID_PID;}if (fcntl(ngx_processes[s].channel[1], F_SETFD, FD_CLOEXEC) == -1) {ngx_log_error(NGX_LOG_ALERT, cycle->log, ngx_errno,"fcntl(FD_CLOEXEC) failed while spawning \"%s\"",name);ngx_close_channel(ngx_processes[s].channel, cycle->log);return NGX_INVALID_PID;}ngx_channel = 
ngx_processes[s].channel[1];} else {ngx_processes[s].channel[0] = -1;ngx_processes[s].channel[1] = -1;}ngx_process_slot = s;// fork 出子進程出來pid = fork();switch (pid) {case -1:ngx_log_error(NGX_LOG_ALERT, cycle->log, ngx_errno,"fork() failed while spawning \"%s\"", name);ngx_close_channel(ngx_processes[s].channel, cycle->log);return NGX_INVALID_PID;case 0:ngx_parent = ngx_pid;ngx_pid = ngx_getpid();// 子進程將調(diào)用傳入的處理方法,worker 則會進入循環(huán)處理事件邏輯中// 即 ngx_worker_process_cycle 循環(huán)proc(cycle, data);break;default:break;}ngx_log_error(NGX_LOG_NOTICE, cycle->log, 0, "start %s %P", name, pid);ngx_processes[s].pid = pid;ngx_processes[s].exited = 0;if (respawn >= 0) {return pid;}ngx_processes[s].proc = proc;ngx_processes[s].data = data;ngx_processes[s].name = name;ngx_processes[s].exiting = 0;switch (respawn) {case NGX_PROCESS_NORESPAWN:ngx_processes[s].respawn = 0;ngx_processes[s].just_spawn = 0;ngx_processes[s].detached = 0;break;case NGX_PROCESS_JUST_SPAWN:ngx_processes[s].respawn = 0;ngx_processes[s].just_spawn = 1;ngx_processes[s].detached = 0;break;case NGX_PROCESS_RESPAWN:ngx_processes[s].respawn = 1;ngx_processes[s].just_spawn = 0;ngx_processes[s].detached = 0;break;case NGX_PROCESS_JUST_RESPAWN:ngx_processes[s].respawn = 1;ngx_processes[s].just_spawn = 1;ngx_processes[s].detached = 0;break;case NGX_PROCESS_DETACHED:ngx_processes[s].respawn = 0;ngx_processes[s].just_spawn = 0;ngx_processes[s].detached = 1;break;}if (s == ngx_last_process) {ngx_last_process++;}return pid;}// os/unix/ngx_process_cycle.c// worker 主循環(huán)服務(wù)static voidngx_worker_process_cycle(ngx_cycle_t *cycle, void *data){ngx_int_t worker = (intptr_t) data;ngx_process = NGX_PROCESS_WORKER;ngx_worker = worker;ngx_worker_process_init(cycle, worker);// 進程標題 worker processngx_setproctitle("worker process");// 死循環(huán)處理 worker 事務(wù)for ( ;; ) {// 大部分邏輯在接受 master 傳遞過來折命令if (ngx_exiting) {if (ngx_event_no_timers_left() == NGX_OK) {ngx_log_error(NGX_LOG_NOTICE, cycle->log, 0, "exiting");ngx_worker_process_exit(cycle);}}ngx_log_debug0(NGX_LOG_DEBUG_EVENT, cycle->log, 0, "worker cycle");// 這是其核心任務(wù),檢測事件、處理事件ngx_process_events_and_timers(cycle);// 大部分邏輯在接受 master 傳遞過來折命令if (ngx_terminate) {ngx_log_error(NGX_LOG_NOTICE, cycle->log, 0, "exiting");ngx_worker_process_exit(cycle);}// 退出事件if (ngx_quit) {ngx_quit = 0;ngx_log_error(NGX_LOG_NOTICE, cycle->log, 0,"gracefully shutting down");ngx_setproctitle("worker process is shutting down");if (!ngx_exiting) {ngx_exiting = 1;ngx_set_shutdown_timer(cycle);ngx_close_listening_sockets(cycle);ngx_close_idle_connections(cycle);}}// reopen 事件if (ngx_reopen) {ngx_reopen = 0;ngx_log_error(NGX_LOG_NOTICE, cycle->log, 0, "reopening logs");ngx_reopen_files(cycle, -1);}}}
That is the heart of the nginx worker: an endless loop that keeps providing service. Much of the code responds to commands passed down from the master; setting that aside, the business core is ngx_process_events_and_timers.
// event/ngx_event.c// nginx worker 處理io事件和超時隊列流程voidngx_process_events_and_timers(ngx_cycle_t *cycle){ngx_uint_t flags;ngx_msec_t timer, delta;if (ngx_timer_resolution) {timer = NGX_TIMER_INFINITE;flags = 0;} else {// 獲取timertimer = ngx_event_find_timer();flags = NGX_UPDATE_TIME;#if (NGX_WIN32)/* handle signals from master in case of network inactivity */if (timer == NGX_TIMER_INFINITE || timer > 500) {timer = 500;}#endif}// 使用鎖進行 tcp 監(jiān)聽// 該鎖基于 shm 實現(xiàn),多進程共享內(nèi)存if (ngx_use_accept_mutex) {// disabled 用于優(yōu)化監(jiān)聽鎖競爭,直到 ngx_accept_disabled 小于0if (ngx_accept_disabled > 0) {ngx_accept_disabled--;} else {// 通過 shm 獲取一個進程鎖,沒搶到鎖則直接返回了// 獲取到accept鎖之后,其會注冊 read 事件監(jiān)聽,所以,當其返回后,則意味著數(shù)據(jù)就緒if (ngx_trylock_accept_mutex(cycle) == NGX_ERROR) {return;}// 獲取到鎖,設(shè)置 flagsif (ngx_accept_mutex_held) {flags |= NGX_POST_EVENTS;} else {if (timer == NGX_TIMER_INFINITE|| timer > ngx_accept_mutex_delay){timer = ngx_accept_mutex_delay;}}}}// post 事件隊列不為空,則觸發(fā)事件處理if (!ngx_queue_empty(&ngx_posted_next_events)) {ngx_event_move_posted_next(cycle);timer = 0;}delta = ngx_current_msec;// 處理事件 ngx_event_actions.process_events, 將會進行阻塞等待// 此處的 ngx_event_actions 由系統(tǒng)決定如何初始化,如 linux 下// 使用 event/modules/ngx_epoll_module.c 中的定義 ngx_event_actions = ngx_epoll_module_ctx.actions;// 而其他系統(tǒng)則另外決定, 總體來說可能有以下幾種可能// ngx_devpoll_module_ctx.actions;// ngx_epoll_module_ctx.actions;// ngx_eventport_module_ctx.actions;// ngx_iocp_module_ctx.actions;// ngx_kqueue_module_ctx.actions;// ngx_select_module_ctx.actions;// ngx_poll_module_ctx.actions;/*** 其定義樣例如下:static ngx_event_module_t ngx_select_module_ctx = {&select_name,NULL, /* create configuration */ngx_select_init_conf, /* init configuration */{ngx_select_add_event, /* add an event */ngx_select_del_event, /* delete an event */ngx_select_add_event, /* enable an event */ngx_select_del_event, /* disable an event */NULL, /* add an connection */NULL, /* delete an connection */NULL, /* trigger a notify */ngx_select_process_events, /* process the events */ngx_select_init, /* init the events */ngx_select_done /* done the events */}};*/(void) ngx_process_events(cycle, timer, flags);// 計算耗時delta = ngx_current_msec - delta;ngx_log_debug1(NGX_LOG_DEBUG_EVENT, cycle->log, 0,"timer delta: %M", delta);// 處理 posted 事件,它存放在 ngx_posted_accept_events 隊列中ngx_event_process_posted(cycle, &ngx_posted_accept_events);// 處理完事件后,釋放鎖if (ngx_accept_mutex_held) {ngx_shmtx_unlock(&ngx_accept_mutex);}// 處理超時的任務(wù)if (delta) {ngx_event_expire_timers();}// 讀寫事件將會被添加到 ngx_posted_events 隊列中ngx_event_process_posted(cycle, &ngx_posted_events);}
This is the main framework of the nginx worker:
1. First take the TCP listen lock through shared memory (shm), to avoid the thundering herd on the listening sockets;
2. The worker that gets the lock registers the accept (read) events. A worker that loses the race does not give up the iteration: it can still process other events and the connections it accepted earlier (this is what keeps workers from spinning uselessly);
3. If the ngx_posted_next_events queue is not empty, process it first;
4. Call the platform's network I/O module (select here; epoll, kqueue, etc. elsewhere) to collect I/O events;
5. After the posted accept events have been handled, release the accept lock (shm-based);
6. Process the expired-timer queue;
7. Process the ordinary read/write events of already-established sockets.
A single pass usually handles only part of the work: it may deal only with the accept, while the read is processed on the next pass or several passes later. That is the asynchronous, non-blocking model at work.
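One detail hiding behind steps 1–2 is worth calling out: before competing for the lock, a worker consults ngx_accept_disabled, which ngx_event_accept recomputes as connection_n / 8 - free_connection_n (the assignment appears in the accept code later in this article). A worker that has used more than 7/8 of its connections sits out the competition for a while, so less loaded workers pick up new connections. A tiny standalone illustration of that rule, with made-up numbers:

// Illustration of the ngx_accept_disabled rule (made-up numbers, not nginx code)
#include <stdio.h>

int main(void)
{
    long connection_n      = 1024;   /* hypothetical worker_connections       */
    long free_connection_n = 100;    /* connections still free in this worker */

    /* the same formula ngx_event_accept uses */
    long accept_disabled = connection_n / 8 - free_connection_n;

    if (accept_disabled > 0) {
        printf("busy: skip the accept mutex for %ld more iterations\n",
               accept_disabled);
    } else {
        printf("idle enough: compete for the accept mutex\n");
    }
    return 0;
}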
1. Worker sequence diagram
First, a sequence diagram of the whole worker workflow, to give an overall picture.

Next, let's walk through how nginx handles each of these details.
2. Acquiring the accept lock and registering the accept event
Because nginx achieves concurrency with multiple processes, every process needs to listen on the same ports. Without a lock, an incoming connection would wake all of them at once — the classic thundering herd. So nginx provides a lock that lets only one process listen on a port at any moment, avoiding the contention; it is implemented on top of shared memory (shm). (With threads this would be much simpler.)
// event/ngx_event_accept.cngx_int_tngx_trylock_accept_mutex(ngx_cycle_t *cycle){// 首先獲取shm鎖, 通過 shm 實現(xiàn)進程數(shù)據(jù)共享if (ngx_shmtx_trylock(&ngx_accept_mutex)) {ngx_log_debug0(NGX_LOG_DEBUG_EVENT, cycle->log, 0,"accept mutex locked");// 如果上一次就是自己執(zhí)行的accept操作, 則直接返回// 否則需要重新注冊accept監(jiān)聽if (ngx_accept_mutex_held && ngx_accept_events == 0) {return NGX_OK;}// 注冊 accept 事件if (ngx_enable_accept_events(cycle) == NGX_ERROR) {ngx_shmtx_unlock(&ngx_accept_mutex);return NGX_ERROR;}ngx_accept_events = 0;ngx_accept_mutex_held = 1;return NGX_OK;}ngx_log_debug1(NGX_LOG_DEBUG_EVENT, cycle->log, 0,"accept mutex lock failed: %ui", ngx_accept_mutex_held);if (ngx_accept_mutex_held) {// 如果沒有獲取到鎖,則將之前注冊的 accept 事件取消,避免驚群if (ngx_disable_accept_events(cycle, 0) == NGX_ERROR) {return NGX_ERROR;}ngx_accept_mutex_held = 0;}// 不管有沒有獲取到鎖, 都會執(zhí)行后續(xù)的邏輯, 因為除了 accept 外, 還有read/write事件需要處理return NGX_OK;}// core/ngx_shmtx.c, 獲取鎖,鎖的值為當前進程idngx_uint_tngx_shmtx_trylock(ngx_shmtx_t *mtx){return (*mtx->lock == 0 && ngx_atomic_cmp_set(mtx->lock, 0, ngx_pid));}// 注冊 accept 事件監(jiān)聽// event/ngx_event_accept.cngx_int_tngx_enable_accept_events(ngx_cycle_t *cycle){ngx_uint_t i;ngx_listening_t *ls;ngx_connection_t *c;ls = cycle->listening.elts;for (i = 0; i < cycle->listening.nelts; i++) {c = ls[i].connection;if (c == NULL || c->read->active) {continue;}// 注冊accept事件,READ ?// 交由 ngx_event_actions.add 處理, 實際運行由系統(tǒng)決定, 如 ngx_select_add_eventif (ngx_add_event(c->read, NGX_READ_EVENT, 0) == NGX_ERROR) {return NGX_ERROR;}}return NGX_OK;}// event/module/ngx_select_module.c// 注冊一個 io 事件監(jiān)聽, fd_setstatic ngx_int_tngx_select_add_event(ngx_event_t *ev, ngx_int_t event, ngx_uint_t flags){ngx_connection_t *c;c = ev->data;ngx_log_debug2(NGX_LOG_DEBUG_EVENT, ev->log, 0,"select add event fd:%d ev:%i", c->fd, event);if (ev->index != NGX_INVALID_INDEX) {ngx_log_error(NGX_LOG_ALERT, ev->log, 0,"select event fd:%d ev:%i is already set", c->fd, event);return NGX_OK;}if ((event == NGX_READ_EVENT && ev->write)|| (event == NGX_WRITE_EVENT && !ev->write)){ngx_log_error(NGX_LOG_ALERT, ev->log, 0,"invalid select %s event fd:%d ev:%i",ev->write ? "write" : "read", c->fd, event);return NGX_ERROR;}if (event == NGX_READ_EVENT) {FD_SET(c->fd, &master_read_fd_set);} else if (event == NGX_WRITE_EVENT) {FD_SET(c->fd, &master_write_fd_set);}if (max_fd != -1 && max_fd < c->fd) {max_fd = c->fd;}ev->active = 1;event_index[nevents] = ev;ev->index = nevents;nevents++;return NGX_OK;}
The key points are the use of shared memory and the fd_set bookkeeping.
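For readers who have not seen a shared-memory lock before, here is a minimal standalone sketch (not nginx code) of the idea behind ngx_shmtx_trylock: a lock word placed in memory shared by all workers, taken with a single compare-and-swap from 0 to the caller's pid. The helper names and the GCC/Clang __sync builtin are illustrative stand-ins for ngx_atomic_cmp_set.

// A toy cross-process trylock on a shared-memory word (illustrative, not nginx code)
#define _DEFAULT_SOURCE
#include <stdio.h>
#include <unistd.h>
#include <sys/mman.h>

typedef struct { volatile long lock; } shmtx_t;

static int shmtx_trylock(shmtx_t *mtx, long pid)
{
    /* succeed only if the word is 0 and we atomically swap our pid into it */
    return mtx->lock == 0 && __sync_bool_compare_and_swap(&mtx->lock, 0, pid);
}

static void shmtx_unlock(shmtx_t *mtx, long pid)
{
    __sync_bool_compare_and_swap(&mtx->lock, pid, 0);
}

int main(void)
{
    /* the lock word lives in memory that every forked worker would share */
    shmtx_t *mtx = mmap(NULL, sizeof(shmtx_t), PROT_READ | PROT_WRITE,
                        MAP_SHARED | MAP_ANONYMOUS, -1, 0);
    if (mtx == MAP_FAILED) {
        return 1;
    }
    mtx->lock = 0;

    long pid = (long) getpid();
    if (shmtx_trylock(mtx, pid)) {
        printf("pid %ld holds the accept lock\n", pid);
        shmtx_unlock(mtx, pid);
    }
    return 0;
}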
3. The generic posted-event queue
In ngx_process_events_and_timers we saw that, after the I/O wait returns, the posted queues are drained several times; the calls differ only in which queue they drain. How is that processing implemented?
Let's look at two things: 1. the queue data structure; 2. how the queued tasks are executed.
// 1. 隊列數(shù)據(jù)結(jié)構(gòu)// 額, 兩個循環(huán)嵌套的指針就是其結(jié)構(gòu)了typedef struct ngx_queue_s ngx_queue_t;struct ngx_queue_s {ngx_queue_t *prev;ngx_queue_t *next;};// 實際上, 此處還會有一個強制類型轉(zhuǎn)換 ngx_event_ttypedef struct ngx_event_s ngx_event_t;struct ngx_event_s {void *data;unsigned write:1;unsigned accept:1;/* used to detect the stale events in kqueue and epoll */unsigned instance:1;/** the event was passed or would be passed to a kernel;* in aio mode - operation was posted.*/unsigned active:1;unsigned disabled:1;/* the ready event; in aio mode 0 means that no operation can be posted */unsigned ready:1;unsigned oneshot:1;/* aio operation is complete */unsigned complete:1;unsigned eof:1;unsigned error:1;unsigned timedout:1;unsigned timer_set:1;unsigned delayed:1;unsigned deferred_accept:1;/* the pending eof reported by kqueue, epoll or in aio chain operation */unsigned pending_eof:1;unsigned posted:1;unsigned closed:1;/* to test on worker exit */unsigned channel:1;unsigned resolver:1;unsigned cancelable:1;#if (NGX_HAVE_KQUEUE)unsigned kq_vnode:1;/* the pending errno reported by kqueue */int kq_errno;#endif/** kqueue only:* accept: number of sockets that wait to be accepted* read: bytes to read when event is ready* or lowat when event is set with NGX_LOWAT_EVENT flag* write: available space in buffer when event is ready* or lowat when event is set with NGX_LOWAT_EVENT flag** iocp: TODO** otherwise:* accept: 1 if accept many, 0 otherwise* read: bytes to read when event is ready, -1 if not known*/int available;// 這個handler 比較重要, 它決定了本事件如何進行處理ngx_event_handler_pt handler;#if (NGX_HAVE_IOCP)ngx_event_ovlp_t ovlp;#endifngx_uint_t index;ngx_log_t *log;ngx_rbtree_node_t timer;// queue 則是存放整個隊列所有數(shù)據(jù)的地方/* the posted queue */ngx_queue_t queue;#if 0/* the threads support *//** the event thread context, we store it here* if $(CC) does not understand __thread declaration* and pthread_getspecific() is too costly*/void *thr_ctx;#if (NGX_EVENT_T_PADDING)/* event should not cross cache line in SMP */uint32_t padding[NGX_EVENT_T_PADDING];#endif#endif};// 有了數(shù)據(jù)結(jié)構(gòu)支持后, 要處理隊列就簡單了, 只需遍歷數(shù)據(jù)即可// event/ngx_event_posted.cvoidngx_event_process_posted(ngx_cycle_t *cycle, ngx_queue_t *posted){ngx_queue_t *q;ngx_event_t *ev;while (!ngx_queue_empty(posted)) {q = ngx_queue_head(posted);ev = ngx_queue_data(q, ngx_event_t, queue);ngx_log_debug1(NGX_LOG_DEBUG_EVENT, cycle->log, 0,"posted event %p", ev);// 先刪除事件,再進行處理, 這在單進程單線程下沒有問題的喲ngx_delete_posted_event(ev);// 調(diào)用 event 對應(yīng)的handler 處理事件// 所以核心在于這個 handler 的定義ev->handler(ev);}}
Although this is procedural C, the struct types behind it give the implementation a distinctly object-oriented flavor.
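The trick that makes the generic queue work is the intrusive links embedded in ngx_event_s plus the offsetof cast hidden in ngx_queue_data. A self-contained sketch of the same pattern (hypothetical names, not nginx code):

// A toy intrusive queue with container recovery via offsetof (not nginx code)
#include <stdio.h>
#include <stddef.h>

typedef struct queue_s queue_t;
struct queue_s { queue_t *prev; queue_t *next; };

typedef struct {
    int      id;          /* stands in for the real event fields    */
    queue_t  queue;       /* embedded links, like ngx_event_t.queue */
} event_t;

/* same idea as ngx_queue_data: step back from the link to the container */
#define queue_data(q, type, link) \
    ((type *) ((char *) (q) - offsetof(type, link)))

int main(void)
{
    queue_t head = { &head, &head };       /* empty sentinel */
    event_t ev = { 42, { NULL, NULL } };

    /* insert ev at the tail, like ngx_post_event */
    ev.queue.prev = head.prev;
    ev.queue.next = &head;
    head.prev->next = &ev.queue;
    head.prev = &ev.queue;

    /* drain the queue, recovering each event from its link */
    for (queue_t *q = head.next; q != &head; q = q->next) {
        event_t *e = queue_data(q, event_t, queue);
        printf("posted event id=%d\n", e->id);
    }
    return 0;
}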
4. Monitoring I/O events
As a web server and reverse proxy, nginx's core is network I/O. Depending on what the operating system offers, nginx chooses a different I/O model for event monitoring, squeezing the best performance out of the platform — arguably its winning move. Which model is used is decided at build time from the system's capabilities (see the comments in the code above for the list of candidates).
Let's look at the details through the select implementation:
// event/module/ngx_select_module.c// io 事件監(jiān)聽static ngx_int_tngx_select_process_events(ngx_cycle_t *cycle, ngx_msec_t timer,ngx_uint_t flags){int ready, nready;ngx_err_t err;ngx_uint_t i, found;ngx_event_t *ev;ngx_queue_t *queue;struct timeval tv, *tp;ngx_connection_t *c;// 獲取 max_fd, 系統(tǒng)傳值需要if (max_fd == -1) {for (i = 0; i < nevents; i++) {c = event_index[i]->data;if (max_fd < c->fd) {max_fd = c->fd;}}ngx_log_debug1(NGX_LOG_DEBUG_EVENT, cycle->log, 0,"change max_fd: %i", max_fd);}#if (NGX_DEBUG)if (cycle->log->log_level & NGX_LOG_DEBUG_ALL) {for (i = 0; i < nevents; i++) {ev = event_index[i];c = ev->data;ngx_log_debug2(NGX_LOG_DEBUG_EVENT, cycle->log, 0,"select event: fd:%d wr:%d", c->fd, ev->write);}ngx_log_debug1(NGX_LOG_DEBUG_EVENT, cycle->log, 0,"max_fd: %i", max_fd);}#endifif (timer == NGX_TIMER_INFINITE) {tp = NULL;} else {tv.tv_sec = (long) (timer / 1000);tv.tv_usec = (long) ((timer % 1000) * 1000);tp = &tv;}ngx_log_debug1(NGX_LOG_DEBUG_EVENT, cycle->log, 0,"select timer: %M", timer);work_read_fd_set = master_read_fd_set;work_write_fd_set = master_write_fd_set;// 在此處交由內(nèi)核進行處理網(wǎng)絡(luò)事件,epoll 機制,至少有一個事件到來時返回// tp 代表是否要超時退出ready = select(max_fd + 1, &work_read_fd_set, &work_write_fd_set, NULL, tp);err = (ready == -1) ? ngx_errno : 0;if (flags & NGX_UPDATE_TIME || ngx_event_timer_alarm) {// 事件結(jié)束后,先嘗試更新gmtTime 時間信息ngx_time_update();}ngx_log_debug1(NGX_LOG_DEBUG_EVENT, cycle->log, 0,"select ready %d", ready);if (err) {ngx_uint_t level;if (err == NGX_EINTR) {if (ngx_event_timer_alarm) {ngx_event_timer_alarm = 0;return NGX_OK;}level = NGX_LOG_INFO;} else {level = NGX_LOG_ALERT;}ngx_log_error(level, cycle->log, err, "select() failed");if (err == NGX_EBADF) {ngx_select_repair_fd_sets(cycle);}return NGX_ERROR;}if (ready == 0) {if (timer != NGX_TIMER_INFINITE) {return NGX_OK;}ngx_log_error(NGX_LOG_ALERT, cycle->log, 0,"select() returned no events without timeout");return NGX_ERROR;}nready = 0;// 遍歷所有事件for (i = 0; i < nevents; i++) {ev = event_index[i];c = ev->data;found = 0;// 寫事件處理if (ev->write) {if (FD_ISSET(c->fd, &work_write_fd_set)) {found = 1;ngx_log_debug1(NGX_LOG_DEBUG_EVENT, cycle->log, 0,"select write %d", c->fd);}}// 讀或accept事件else {if (FD_ISSET(c->fd, &work_read_fd_set)) {found = 1;ngx_log_debug1(NGX_LOG_DEBUG_EVENT, cycle->log, 0,"select read %d", c->fd);}}// 讀寫就緒事件 found 都為1if (found) {ev->ready = 1;ev->available = -1;// 如果是 accept 事件則取 ngx_posted_accept_events 隊列// 否則取 ngx_posted_events 隊列queue = ev->accept ? &ngx_posted_accept_events: &ngx_posted_events;// 將事件插入到相應(yīng)隊列尾部ngx_post_event(ev, queue);// 有效就緒事件+1nready++;}}// 如果兩個值不相等,則需要修正下if (ready != nready) {ngx_log_error(NGX_LOG_ALERT, cycle->log, 0,"select ready != events: %d:%d", ready, nready);ngx_select_repair_fd_sets(cycle);}return NGX_OK;}
That is the I/O event handling. Since this is the select implementation, calling the system's select() is all it takes to collect network events; which events can show up was already determined by the earlier registration work, and this code is only the executor. How efficient it is depends on the operating system's I/O model — interested readers can study the epoll implementation.
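Stripped of nginx's bookkeeping, the select path reduces to roughly the following standalone sketch (not nginx code): copy the master fd_set, call select() with the computed timer, and test readiness with FD_ISSET.

// A bare-bones select() readiness check (illustrative, not nginx code)
#include <stdio.h>
#include <unistd.h>
#include <sys/select.h>

int main(void)
{
    fd_set master_read, work_read;

    FD_ZERO(&master_read);
    FD_SET(STDIN_FILENO, &master_read);    /* stands in for the listening fds  */

    struct timeval tv = { 1, 0 };          /* timer taken from the timer tree  */
    work_read = master_read;               /* select() mutates the set it gets */

    int ready = select(STDIN_FILENO + 1, &work_read, NULL, NULL, &tv);

    if (ready > 0 && FD_ISSET(STDIN_FILENO, &work_read)) {
        printf("stdin is readable: post the event to a queue\n");
    } else if (ready == 0) {
        printf("timed out: only expired timers to process\n");
    } else if (ready == -1) {
        perror("select");
    }
    return 0;
}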
5. Handling the accept event
When the system sees a new incoming connection, it raises an accept event for the application. nginx puts the event into ngx_posted_accept_events and drains the queue with the generic routine; the handler here is ngx_event_accept, whose job is to establish the new socket connection so it can be read and written later.
// event/ngx_event_accept.c// accept 事件處理入口voidngx_event_accept(ngx_event_t *ev){socklen_t socklen;ngx_err_t err;ngx_log_t *log;ngx_uint_t level;ngx_socket_t s;ngx_event_t *rev, *wev;ngx_sockaddr_t sa;ngx_listening_t *ls;ngx_connection_t *c, *lc;ngx_event_conf_t *ecf;#if (NGX_HAVE_ACCEPT4)static ngx_uint_t use_accept4 = 1;#endifif (ev->timedout) {if (ngx_enable_accept_events((ngx_cycle_t *) ngx_cycle) != NGX_OK) {return;}ev->timedout = 0;}// 獲取配置信息ecf = ngx_event_get_conf(ngx_cycle->conf_ctx, ngx_event_core_module);if (!(ngx_event_flags & NGX_USE_KQUEUE_EVENT)) {ev->available = ecf->multi_accept;}lc = ev->data;ls = lc->listening;ev->ready = 0;ngx_log_debug2(NGX_LOG_DEBUG_EVENT, ev->log, 0,"accept on %V, ready: %d", &ls->addr_text, ev->available);// 循環(huán)處理socket數(shù)據(jù)do {socklen = sizeof(ngx_sockaddr_t);#if (NGX_HAVE_ACCEPT4)if (use_accept4) {// 調(diào)用accept() 方法接入socket連接s = accept4(lc->fd, &sa.sockaddr, &socklen, SOCK_NONBLOCK);} else {s = accept(lc->fd, &sa.sockaddr, &socklen);}#elses = accept(lc->fd, &sa.sockaddr, &socklen);#endifif (s == (ngx_socket_t) -1) {err = ngx_socket_errno;if (err == NGX_EAGAIN) {ngx_log_debug0(NGX_LOG_DEBUG_EVENT, ev->log, err,"accept() not ready");return;}level = NGX_LOG_ALERT;if (err == NGX_ECONNABORTED) {level = NGX_LOG_ERR;} else if (err == NGX_EMFILE || err == NGX_ENFILE) {level = NGX_LOG_CRIT;}#if (NGX_HAVE_ACCEPT4)ngx_log_error(level, ev->log, err,use_accept4 ? "accept4() failed" : "accept() failed");if (use_accept4 && err == NGX_ENOSYS) {use_accept4 = 0;ngx_inherited_nonblocking = 0;continue;}#elsengx_log_error(level, ev->log, err, "accept() failed");#endifif (err == NGX_ECONNABORTED) {if (ngx_event_flags & NGX_USE_KQUEUE_EVENT) {ev->available--;}if (ev->available) {continue;}}if (err == NGX_EMFILE || err == NGX_ENFILE) {if (ngx_disable_accept_events((ngx_cycle_t *) ngx_cycle, 1)!= NGX_OK){return;}if (ngx_use_accept_mutex) {if (ngx_accept_mutex_held) {ngx_shmtx_unlock(&ngx_accept_mutex);ngx_accept_mutex_held = 0;}ngx_accept_disabled = 1;} else {ngx_add_timer(ev, ecf->accept_mutex_delay);}}return;}#if (NGX_STAT_STUB)(void) ngx_atomic_fetch_add(ngx_stat_accepted, 1);#endifngx_accept_disabled = ngx_cycle->connection_n / 8- ngx_cycle->free_connection_n;// 獲取socket讀寫指針c = ngx_get_connection(s, ev->log);if (c == NULL) {if (ngx_close_socket(s) == -1) {ngx_log_error(NGX_LOG_ALERT, ev->log, ngx_socket_errno,ngx_close_socket_n " failed");}return;}c->type = SOCK_STREAM;#if (NGX_STAT_STUB)(void) ngx_atomic_fetch_add(ngx_stat_active, 1);#endif// 創(chuàng)建內(nèi)存空間c->pool = ngx_create_pool(ls->pool_size, ev->log);if (c->pool == NULL) {ngx_close_accepted_connection(c);return;}if (socklen > (socklen_t) sizeof(ngx_sockaddr_t)) {socklen = sizeof(ngx_sockaddr_t);}c->sockaddr = ngx_palloc(c->pool, socklen);if (c->sockaddr == NULL) {ngx_close_accepted_connection(c);return;}ngx_memcpy(c->sockaddr, &sa, socklen);log = ngx_palloc(c->pool, sizeof(ngx_log_t));if (log == NULL) {ngx_close_accepted_connection(c);return;}/* set a blocking mode for iocp and non-blocking mode for others */if (ngx_inherited_nonblocking) {if (ngx_event_flags & NGX_USE_IOCP_EVENT) {if (ngx_blocking(s) == -1) {ngx_log_error(NGX_LOG_ALERT, ev->log, ngx_socket_errno,ngx_blocking_n " failed");ngx_close_accepted_connection(c);return;}}} else {if (!(ngx_event_flags & NGX_USE_IOCP_EVENT)) {if (ngx_nonblocking(s) == -1) {ngx_log_error(NGX_LOG_ALERT, ev->log, ngx_socket_errno,ngx_nonblocking_n " failed");ngx_close_accepted_connection(c);return;}}}*log = ls->log;// 
創(chuàng)建各種上下文環(huán)境給到socket連接c->recv = ngx_recv;c->send = ngx_send;c->recv_chain = ngx_recv_chain;c->send_chain = ngx_send_chain;c->log = log;c->pool->log = log;c->socklen = socklen;c->listening = ls;c->local_sockaddr = ls->sockaddr;c->local_socklen = ls->socklen;#if (NGX_HAVE_UNIX_DOMAIN)if (c->sockaddr->sa_family == AF_UNIX) {c->tcp_nopush = NGX_TCP_NOPUSH_DISABLED;c->tcp_nodelay = NGX_TCP_NODELAY_DISABLED;#if (NGX_SOLARIS)/* Solaris's sendfilev() supports AF_NCA, AF_INET, and AF_INET6 */c->sendfile = 0;#endif}#endifrev = c->read;wev = c->write;wev->ready = 1;if (ngx_event_flags & NGX_USE_IOCP_EVENT) {rev->ready = 1;}if (ev->deferred_accept) {rev->ready = 1;#if (NGX_HAVE_KQUEUE || NGX_HAVE_EPOLLRDHUP)rev->available = 1;#endif}rev->log = log;wev->log = log;/** TODO: MT: - ngx_atomic_fetch_add()* or protection by critical section or light mutex** TODO: MP: - allocated in a shared memory* - ngx_atomic_fetch_add()* or protection by critical section or light mutex*/c->number = ngx_atomic_fetch_add(ngx_connection_counter, 1);#if (NGX_STAT_STUB)(void) ngx_atomic_fetch_add(ngx_stat_handled, 1);#endifif (ls->addr_ntop) {c->addr_text.data = ngx_pnalloc(c->pool, ls->addr_text_max_len);if (c->addr_text.data == NULL) {ngx_close_accepted_connection(c);return;}c->addr_text.len = ngx_sock_ntop(c->sockaddr, c->socklen,c->addr_text.data,ls->addr_text_max_len, 0);if (c->addr_text.len == 0) {ngx_close_accepted_connection(c);return;}}#if (NGX_DEBUG){ngx_str_t addr;u_char text[NGX_SOCKADDR_STRLEN];ngx_debug_accepted_connection(ecf, c);if (log->log_level & NGX_LOG_DEBUG_EVENT) {addr.data = text;addr.len = ngx_sock_ntop(c->sockaddr, c->socklen, text,NGX_SOCKADDR_STRLEN, 1);ngx_log_debug3(NGX_LOG_DEBUG_EVENT, log, 0,"*%uA accept: %V fd:%d", c->number, &addr, s);}}#endifif (ngx_add_conn && (ngx_event_flags & NGX_USE_EPOLL_EVENT) == 0) {if (ngx_add_conn(c) == NGX_ERROR) {ngx_close_accepted_connection(c);return;}}log->data = NULL;log->handler = NULL;// 處理就緒的io事件,讀寫事件,此處將會轉(zhuǎn)到 http 模塊處理ls->handler(c);if (ngx_event_flags & NGX_USE_KQUEUE_EVENT) {ev->available--;}} while (ev->available);}// http/ngx_http_request.c// 初始化socket連接, 接入 http模塊voidngx_http_init_connection(ngx_connection_t *c){ngx_uint_t i;ngx_event_t *rev;struct sockaddr_in *sin;ngx_http_port_t *port;ngx_http_in_addr_t *addr;ngx_http_log_ctx_t *ctx;ngx_http_connection_t *hc;#if (NGX_HAVE_INET6)struct sockaddr_in6 *sin6;ngx_http_in6_addr_t *addr6;#endif// 分配數(shù)據(jù)內(nèi)存hc = ngx_pcalloc(c->pool, sizeof(ngx_http_connection_t));if (hc == NULL) {ngx_http_close_connection(c);return;}c->data = hc;/* find the server configuration for the address:port */port = c->listening->servers;if (port->naddrs > 1) {/** there are several addresses on this port and one of them* is an "*:port" wildcard so getsockname() in ngx_http_server_addr()* is required to determine a server address*/if (ngx_connection_local_sockaddr(c, NULL, 0) != NGX_OK) {ngx_http_close_connection(c);return;}// 根據(jù)網(wǎng)絡(luò)類型處理switch (c->local_sockaddr->sa_family) {#if (NGX_HAVE_INET6)case AF_INET6:sin6 = (struct sockaddr_in6 *) c->local_sockaddr;addr6 = port->addrs;/* the last address is "*" */for (i = 0; i < port->naddrs - 1; i++) {if (ngx_memcmp(&addr6[i].addr6, &sin6->sin6_addr, 16) == 0) {break;}}hc->addr_conf = &addr6[i].conf;break;#endifdefault: /* AF_INET */sin = (struct sockaddr_in *) c->local_sockaddr;addr = port->addrs;/* the last address is "*" */for (i = 0; i < port->naddrs - 1; i++) {if (addr[i].addr == sin->sin_addr.s_addr) {break;}}hc->addr_conf = &addr[i].conf;break;}} 
else {switch (c->local_sockaddr->sa_family) {#if (NGX_HAVE_INET6)case AF_INET6:addr6 = port->addrs;hc->addr_conf = &addr6[0].conf;break;#endifdefault: /* AF_INET */addr = port->addrs;hc->addr_conf = &addr[0].conf;break;}}/* the default server configuration for the address:port */hc->conf_ctx = hc->addr_conf->default_server->ctx;ctx = ngx_palloc(c->pool, sizeof(ngx_http_log_ctx_t));if (ctx == NULL) {ngx_http_close_connection(c);return;}ctx->connection = c;ctx->request = NULL;ctx->current_request = NULL;c->log->connection = c->number;// 每個http server 都有自己的日志記錄控制c->log->handler = ngx_http_log_error;c->log->data = ctx;c->log->action = "waiting for request";c->log_error = NGX_ERROR_INFO;rev = c->read;// 設(shè)置接收數(shù)據(jù)處理器為 ngx_http_wait_request_handlerrev->handler = ngx_http_wait_request_handler;c->write->handler = ngx_http_empty_handler;#if (NGX_HTTP_V2)if (hc->addr_conf->http2) {rev->handler = ngx_http_v2_init;}#endif#if (NGX_HTTP_SSL){ngx_http_ssl_srv_conf_t *sscf;sscf = ngx_http_get_module_srv_conf(hc->conf_ctx, ngx_http_ssl_module);if (sscf->enable || hc->addr_conf->ssl) {hc->ssl = 1;c->log->action = "SSL handshaking";rev->handler = ngx_http_ssl_handshake;}}#endifif (hc->addr_conf->proxy_protocol) {hc->proxy_protocol = 1;c->log->action = "reading PROXY protocol";}if (rev->ready) {/* the deferred accept(), iocp */if (ngx_use_accept_mutex) {ngx_post_event(rev, &ngx_posted_events);return;}rev->handler(rev);return;}// 將rev 放入到 ngx_event_timer_rbtree 隊列中, 紅黑樹實現(xiàn)ngx_add_timer(rev, c->listening->post_accept_timeout);// 重用 connectionngx_reusable_connection(c, 1);// 處理 讀就緒事件,注冊 read 監(jiān)聽if (ngx_handle_read_event(rev, 0) != NGX_OK) {ngx_http_close_connection(c);return;}}// event/ngx_event.c// 通用處理: 讀事件邏輯ngx_int_tngx_handle_read_event(ngx_event_t *rev, ngx_uint_t flags){if (ngx_event_flags & NGX_USE_CLEAR_EVENT) {/* kqueue, epoll */if (!rev->active && !rev->ready) {if (ngx_add_event(rev, NGX_READ_EVENT, NGX_CLEAR_EVENT)== NGX_ERROR){return NGX_ERROR;}}return NGX_OK;} else if (ngx_event_flags & NGX_USE_LEVEL_EVENT) {/* select, poll, /dev/poll */if (!rev->active && !rev->ready) {// ngx_event_actions.add, 實際為 ngx_select_add_event// 注冊讀事件if (ngx_add_event(rev, NGX_READ_EVENT, NGX_LEVEL_EVENT)== NGX_ERROR){return NGX_ERROR;}return NGX_OK;}if (rev->active && (rev->ready || (flags & NGX_CLOSE_EVENT))) {if (ngx_del_event(rev, NGX_READ_EVENT, NGX_LEVEL_EVENT | flags)== NGX_ERROR){return NGX_ERROR;}return NGX_OK;}} else if (ngx_event_flags & NGX_USE_EVENTPORT_EVENT) {/* event ports */if (!rev->active && !rev->ready) {if (ngx_add_event(rev, NGX_READ_EVENT, 0) == NGX_ERROR) {return NGX_ERROR;}return NGX_OK;}if (rev->oneshot && !rev->ready) {if (ngx_del_event(rev, NGX_READ_EVENT, 0) == NGX_ERROR) {return NGX_ERROR;}return NGX_OK;}}/* iocp */return NGX_OK;}
Roughly: call the kernel's accept() to take the socket in, then call the HTTP module's init handler, which registers a read event so the request can be read later. Exactly when the actual read happens is not fixed.
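The core of ngx_event_accept is an accept-until-EAGAIN loop on a non-blocking listening socket, with each new socket made non-blocking (accept4(..., SOCK_NONBLOCK) does both in one call). A standalone sketch of that pattern (not nginx code; the port and helper names are invented):

// An accept-until-EAGAIN loop on a non-blocking listener (illustrative, not nginx code)
#include <errno.h>
#include <fcntl.h>
#include <stdio.h>
#include <unistd.h>
#include <sys/socket.h>
#include <netinet/in.h>
#include <arpa/inet.h>

static void accept_ready(int listen_fd)
{
    for ( ;; ) {
        struct sockaddr_storage sa;
        socklen_t socklen = sizeof(sa);

        int s = accept(listen_fd, (struct sockaddr *) &sa, &socklen);
        if (s == -1) {
            if (errno == EAGAIN || errno == EWOULDBLOCK) {
                return;                    /* nothing more to accept right now */
            }
            perror("accept");
            return;
        }
        fcntl(s, F_SETFL, O_NONBLOCK);     /* later reads must not block */
        printf("accepted fd %d, hand it to the protocol handler\n", s);
        close(s);                          /* a real server would keep it */
    }
}

int main(void)
{
    int listen_fd = socket(AF_INET, SOCK_STREAM, 0);
    struct sockaddr_in addr = { 0 };

    addr.sin_family = AF_INET;
    addr.sin_port = htons(8080);           /* arbitrary port for the sketch */
    addr.sin_addr.s_addr = htonl(INADDR_LOOPBACK);

    if (bind(listen_fd, (struct sockaddr *) &addr, sizeof(addr)) == -1
        || listen(listen_fd, 16) == -1)
    {
        perror("bind/listen");
        return 1;
    }
    fcntl(listen_fd, F_SETFL, O_NONBLOCK);

    accept_ready(listen_fd);               /* returns at once if nothing is pending */
    close(listen_fd);
    return 0;
}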
6. Handling the read event
After the accept handling above, nginx has registered a read event whose handler is ngx_http_wait_request_handler. When data becomes ready, control flows from the generic posted-event queue into the HTTP module to handle the I/O.
// http/ngx_http_request.c// 處理socket讀事件static voidngx_http_wait_request_handler(ngx_event_t *rev){u_char *p;size_t size;ssize_t n;ngx_buf_t *b;ngx_connection_t *c;ngx_http_connection_t *hc;ngx_http_core_srv_conf_t *cscf;c = rev->data;ngx_log_debug0(NGX_LOG_DEBUG_HTTP, c->log, 0, "http wait request handler");if (rev->timedout) {ngx_log_error(NGX_LOG_INFO, c->log, NGX_ETIMEDOUT, "client timed out");ngx_http_close_connection(c);return;}if (c->close) {ngx_http_close_connection(c);return;}hc = c->data;cscf = ngx_http_get_module_srv_conf(hc->conf_ctx, ngx_http_core_module);// 默認1024 緩沖大小size = cscf->client_header_buffer_size;b = c->buffer;// 首次接入時,創(chuàng)建初始空間if (b == NULL) {// 創(chuàng)建緩沖區(qū)接收http傳過來的數(shù)據(jù)b = ngx_create_temp_buf(c->pool, size);if (b == NULL) {ngx_http_close_connection(c);return;}c->buffer = b;} else if (b->start == NULL) {// 緩沖沖填滿,需要另外增加空間?b->start = ngx_palloc(c->pool, size);if (b->start == NULL) {ngx_http_close_connection(c);return;}b->pos = b->start;b->last = b->start;b->end = b->last + size;}// 接收數(shù)據(jù)n = c->recv(c, b->last, size);if (n == NGX_AGAIN) {if (!rev->timer_set) {ngx_add_timer(rev, c->listening->post_accept_timeout);ngx_reusable_connection(c, 1);}if (ngx_handle_read_event(rev, 0) != NGX_OK) {ngx_http_close_connection(c);return;}/** We are trying to not hold c->buffer's memory for an idle connection.*/// 如果還要等待更多數(shù)據(jù),釋放占有空間if (ngx_pfree(c->pool, b->start) == NGX_OK) {b->start = NULL;}return;}if (n == NGX_ERROR) {ngx_http_close_connection(c);return;}if (n == 0) {ngx_log_error(NGX_LOG_INFO, c->log, 0,"client closed connection");ngx_http_close_connection(c);return;}b->last += n;// 如果配置了 proxy_pass (且匹配了模式), 則直代理邏輯if (hc->proxy_protocol) {hc->proxy_protocol = 0;p = ngx_proxy_protocol_read(c, b->pos, b->last);if (p == NULL) {ngx_http_close_connection(c);return;}b->pos = p;if (b->pos == b->last) {c->log->action = "waiting for request";b->pos = b->start;b->last = b->start;ngx_post_event(rev, &ngx_posted_events);return;}}c->log->action = "reading client request line";// 設(shè)置不可重用連接ngx_reusable_connection(c, 0);// 創(chuàng)建 http 連接請求, 分配內(nèi)存空, 設(shè)置下一個 handler 等等c->data = ngx_http_create_request(c);if (c->data == NULL) {ngx_http_close_connection(c);return;}// 設(shè)置讀取數(shù)據(jù)的處理器為 ngx_http_process_request_line, 以便下次使用rev->handler = ngx_http_process_request_line;ngx_http_process_request_line(rev);}// http/ngx_http_request.c// 讀取body數(shù)據(jù),并響應(yīng)客戶端static voidngx_http_process_request_line(ngx_event_t *rev){ssize_t n;ngx_int_t rc, rv;ngx_str_t host;ngx_connection_t *c;ngx_http_request_t *r;c = rev->data;r = c->data;ngx_log_debug0(NGX_LOG_DEBUG_HTTP, rev->log, 0,"http process request line");if (rev->timedout) {ngx_log_error(NGX_LOG_INFO, c->log, NGX_ETIMEDOUT, "client timed out");c->timedout = 1;ngx_http_close_request(r, NGX_HTTP_REQUEST_TIME_OUT);return;}rc = NGX_AGAIN;for ( ;; ) {if (rc == NGX_AGAIN) {// 讀取headern = ngx_http_read_request_header(r);if (n == NGX_AGAIN || n == NGX_ERROR) {break;}}// 讀取body 數(shù)據(jù), 按照http協(xié)議解析,非常長rc = ngx_http_parse_request_line(r, r->header_in);if (rc == NGX_OK) {/* the request line has been parsed successfully */r->request_line.len = r->request_end - r->request_start;r->request_line.data = r->request_start;r->request_length = r->header_in->pos - r->request_start;ngx_log_debug1(NGX_LOG_DEBUG_HTTP, c->log, 0,"http request line: \"%V\"", &r->request_line);r->method_name.len = r->method_end - r->request_start + 1;r->method_name.data = r->request_line.data;if (r->http_protocol.data) 
{r->http_protocol.len = r->request_end - r->http_protocol.data;}// 處理 uri, 解析路徑if (ngx_http_process_request_uri(r) != NGX_OK) {break;}if (r->schema_end) {r->schema.len = r->schema_end - r->schema_start;r->schema.data = r->schema_start;}if (r->host_end) {host.len = r->host_end - r->host_start;host.data = r->host_start;rc = ngx_http_validate_host(&host, r->pool, 0);if (rc == NGX_DECLINED) {ngx_log_error(NGX_LOG_INFO, c->log, 0,"client sent invalid host in request line");ngx_http_finalize_request(r, NGX_HTTP_BAD_REQUEST);break;}if (rc == NGX_ERROR) {ngx_http_close_request(r, NGX_HTTP_INTERNAL_SERVER_ERROR);break;}if (ngx_http_set_virtual_server(r, &host) == NGX_ERROR) {break;}r->headers_in.server = host;}if (r->http_version < NGX_HTTP_VERSION_10) {if (r->headers_in.server.len == 0&& ngx_http_set_virtual_server(r, &r->headers_in.server)== NGX_ERROR){break;}ngx_http_process_request(r);break;}if (ngx_list_init(&r->headers_in.headers, r->pool, 20,sizeof(ngx_table_elt_t))!= NGX_OK){ngx_http_close_request(r, NGX_HTTP_INTERNAL_SERVER_ERROR);break;}c->log->action = "reading client request headers";rev->handler = ngx_http_process_request_headers;ngx_http_process_request_headers(rev);break;}if (rc != NGX_AGAIN) {/* there was error while a request line parsing */ngx_log_error(NGX_LOG_INFO, c->log, 0,ngx_http_client_errors[rc - NGX_HTTP_CLIENT_ERROR]);if (rc == NGX_HTTP_PARSE_INVALID_VERSION) {ngx_http_finalize_request(r, NGX_HTTP_VERSION_NOT_SUPPORTED);} else {ngx_http_finalize_request(r, NGX_HTTP_BAD_REQUEST);}break;}/* NGX_AGAIN: a request line parsing is still incomplete */if (r->header_in->pos == r->header_in->end) {rv = ngx_http_alloc_large_header_buffer(r, 1);if (rv == NGX_ERROR) {ngx_http_close_request(r, NGX_HTTP_INTERNAL_SERVER_ERROR);break;}if (rv == NGX_DECLINED) {r->request_line.len = r->header_in->end - r->request_start;r->request_line.data = r->request_start;ngx_log_error(NGX_LOG_INFO, c->log, 0,"client sent too long URI");ngx_http_finalize_request(r, NGX_HTTP_REQUEST_URI_TOO_LARGE);break;}}}// 處理請求, 響應(yīng)客戶端ngx_http_run_posted_requests(c);}// http/ngx_http_request.c// 已經(jīng)處理好的請求處理voidngx_http_run_posted_requests(ngx_connection_t *c){ngx_http_request_t *r;ngx_http_posted_request_t *pr;// 循環(huán)處理數(shù)據(jù),直到完成for ( ;; ) {if (c->destroyed) {return;}r = c->data;pr = r->main->posted_requests;if (pr == NULL) {return;}r->main->posted_requests = pr->next;r = pr->request;ngx_http_set_log_request(c->log, r);ngx_log_debug2(NGX_LOG_DEBUG_HTTP, c->log, 0,"http posted request: \"%V?%V\"", &r->uri, &r->args);// 寫客戶端r->write_event_handler(r);}}
That is a simplified view of how an HTTP request is processed. It also shows that nginx's logic is not far from what one would design by hand: read the request line, check whether any special forwarding (such as proxying) is configured, read the body, and if nothing special applies, locate the corresponding file and respond to the client directly. (How the response itself is produced is a topic for later.)
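The read side follows the usual non-blocking recv() pattern seen in ngx_http_wait_request_handler: n > 0 means data arrived (b->last advances), 0 means the client closed, and EAGAIN means re-arm the read event and wait. A self-contained sketch with a hypothetical helper (not nginx code):

// Non-blocking read with EAGAIN handling (illustrative, not nginx code)
#include <errno.h>
#include <stdio.h>
#include <unistd.h>
#include <sys/socket.h>

/* returns 1 when data arrived, 0 to keep waiting, -1 to close the connection */
static int read_request(int fd, char *buf, size_t size, size_t *used)
{
    ssize_t n = recv(fd, buf + *used, size - *used, 0);

    if (n > 0) {
        *used += (size_t) n;               /* like b->last += n; parse what we have */
        return 1;
    }
    if (n == 0) {
        return -1;                         /* client closed the connection */
    }
    if (errno == EAGAIN || errno == EWOULDBLOCK) {
        return 0;                          /* no data yet: re-arm the read event */
    }
    return -1;                             /* real error: close */
}

int main(void)
{
    int sv[2];
    if (socketpair(AF_UNIX, SOCK_STREAM, 0, sv) == -1) {
        return 1;
    }
    /* pretend sv[1] is the client and sv[0] the accepted connection */
    send(sv[1], "GET / HTTP/1.1\r\n", 16, 0);

    char buf[1024];
    size_t used = 0;
    if (read_request(sv[0], buf, sizeof(buf), &used) == 1) {
        printf("read %zu bytes: %.*s", used, (int) used, buf);
    }
    close(sv[0]);
    close(sv[1]);
    return 0;
}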
This article takes a bird's-eye view of how nginx handles requests, and I hope it helps you form an overall picture. Plenty of details remain to be worked through — stay tuned.

Author: 等你歸去來
Source: https://www.cnblogs.com/yougewe/p/13659987.html
