Unveiling the Internals of High-Performance Servers

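I. Introduction
We often hear the term "high-performance server", but what does it actually mean? In plain terms: it handles events quickly and efficiently, uses few server resources, and relies on techniques such as I/O multiplexing. Achieving this is hard, though; it takes a well-designed architecture and the right low-level interfaces.
This article covers Linux only. On Windows, look into IOCP instead, which is not discussed here.
Most mainstream high-performance servers wrap the epoll interface and use it for event handling. Why is epoll suitable as the event layer of a high-performance server? Let's start from the kernel source and find out.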
II. Reading the Source
Two key structures
The eventpoll structure:
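/*
 * This structure is stored in file->private_data.
 *
 * struct eventpoll is the core of epoll. Its most important members:
 * 1. struct rb_root rbr: root of a red-black tree holding the socket
 *    events we are interested in. When epoll_ctl() adds a socket event,
 *    the kernel wraps what we passed in a struct epitem and inserts it
 *    into this tree.
 * 2. struct list_head rdllist: doubly linked list of ready events; these
 *    are what epoll_wait() returns to the user.
 * 3. struct file *file: pointer to the file structure of the epoll file.
 *
 * Note: this definition predates the functions quoted later, which call
 * write_lock_irq(&ep->lock) and rb_erase_cached(..., &ep->rbr); in that
 * newer code "lock" is an rwlock_t and "rbr" a struct rb_root_cached.
 */
struct eventpoll {
    // Spinlock: locking with it inside the kernel lets several threads
    // (processes) operate on this structure safely at the same time.
    // It mainly protects the ready list
    spinlock_t lock;
    // This mutex ensures that a file descriptor is not removed while
    // the epoll event loop is using it
    struct mutex mtx;
    // Wait queue used by epoll_wait; involved in waking up tasks
    wait_queue_head_t wq;
    // Wait queue used by file->poll; involved in waking up tasks
    wait_queue_head_t poll_wait;
    // List of ready file descriptors (doubly linked list)
    struct list_head rdllist;
    // Red-black tree organizing the file descriptors this epoll watches
    struct rb_root rbr;
    // While ready events are being transferred to user space, items
    // that become ready in the meantime are chained onto this list
    struct epitem *ovflist;
    // The user that created this epoll instance
    struct user_struct *user;
    // The struct file that represents this epoll instance
    struct file *file;
    // The next two members are an optimization for loop detection
    int visited;
    struct list_head visited_list_link;
};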
The epitem structure:
// One epitem exists for each file added to an epoll instance
struct epitem {
    // Node linking this item into the eventpoll red-black tree
    struct rb_node rbn;
    // Node linking this item into eventpoll.rdllist
    struct list_head rdllink;
    // Pointer used to chain this item on ovflist
    struct epitem *next;
    /* File descriptor information (fd + file); the red-black tree key */
    struct epoll_filefd ffd;
    /* Number of active wait queue attached to poll operations */
    int nwait;
    // List of wait queue entries (eppoll_entry) for this file.
    // Several events may be monitored on the same file, and they may
    // belong to different wait queues (depending on how the file type
    // implements poll), so a list is needed
    struct list_head pwqlist;
    // The eventpoll that owns this epitem
    struct eventpoll *ep;
    /* List header used to link this item to the "struct file" items list */
    struct list_head fllink;
    /* The event description passed in through epoll_ctl (event mask and user data) */
    struct epoll_event event;
};
int epoll_create(int size);
Purpose: creates an epoll instance and returns a handle (file descriptor) for it.
Source:
SYSCALL_DEFINE1(epoll_create, int, size)
{
    if (size <= 0)
        return -EINVAL;

    return do_epoll_create(0);
}
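As the source shows, the size argument has no real effect: it is only checked for being greater than 0 and is never passed on to do_epoll_create.
I have read elsewhere that the underlying implementation used to be a hash table and is now a red-black tree, and that the parameter was kept for compatibility. I have not verified that history myself, so take it as background only. (The epoll_create(2) man page does state that size has been ignored since Linux 2.6.8 but must still be greater than zero.)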
Next, do_epoll_create:
static int do_epoll_create(int flags)
{
    int error, fd;
    struct eventpoll *ep = NULL;
    struct file *file;

    /* Check the EPOLL_* constant for consistency. */
    BUILD_BUG_ON(EPOLL_CLOEXEC != O_CLOEXEC);

    if (flags & ~EPOLL_CLOEXEC)
        return -EINVAL;
    /*
     * Create the internal data structure ("struct eventpoll").
     */
    error = ep_alloc(&ep);
    if (error < 0)
        return error;
    /*
     * Creates all the items needed to setup an eventpoll file. That is,
     * a file structure and a free file descriptor.
     */
    // Get an unused file descriptor, i.e. a free slot in the fd array
    fd = get_unused_fd_flags(O_RDWR | (flags & O_CLOEXEC));
    if (fd < 0) {
        error = fd;
        goto out_free_ep;
    }
    // Create a file named "[eventpoll]" and get its struct file pointer;
    // this file represents the epoll instance
    file = anon_inode_getfile("[eventpoll]", &eventpoll_fops, ep,
                 O_RDWR | (flags & O_CLOEXEC));
    if (IS_ERR(file)) {
        error = PTR_ERR(file);
        goto out_free_fd;
    }
    ep->file = file;
    // Install file into the fd array slot reserved above
    fd_install(fd, file);
    return fd;

out_free_fd:
    put_unused_fd(fd);
out_free_ep:
    ep_free(ep);
    return error;
}
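Here error = ep_alloc(&ep); allocates the eventpoll structure and initializes it.
To sum up, creating an epoll instance consists of initializing the eventpoll structure and associating it with a file and a file descriptor.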
int epoll_ctl(int epfd, int op, int fd, struct epoll_event *event);
Purpose: registers, modifies, or removes the events monitored on an epoll instance.
Source:
SYSCALL_DEFINE4(epoll_ctl, int, epfd, int, op, int, fd,
        struct epoll_event __user *, event)
{
    struct epoll_event epds;

    // EPOLL_CTL_DEL carries no event, so ep_op_has_event() is false and
    // nothing is copied. For add and modify, the epoll_event structure
    // is copied from user space into kernel space; a failed copy is
    // reported as -EFAULT
    if (ep_op_has_event(op) &&
        copy_from_user(&epds, event, sizeof(struct epoll_event)))
        return -EFAULT;

    return do_epoll_ctl(epfd, op, fd, &epds, false);
}
Now the function do_epoll_ctl:
int do_epoll_ctl(int epfd, int op, int fd, struct epoll_event *epds,
         bool nonblock)
{
    int error;
    int full_check = 0;
    struct fd f, tf;
    struct eventpoll *ep;
    struct epitem *epi;
    struct eventpoll *tep = NULL;

    // validation omitted
    .....

    epi = ep_find(ep, tf.file, fd);

    error = -EINVAL;
    switch (op) {
    // add
    case EPOLL_CTL_ADD:
        if (!epi) {
            epds->events |= EPOLLERR | EPOLLHUP;
            error = ep_insert(ep, epds, tf.file, fd, full_check);
        } else
            error = -EEXIST;
        break;
    // delete
    case EPOLL_CTL_DEL:
        if (epi)
            error = ep_remove(ep, epi);
        else
            error = -ENOENT;
        break;
    // modify
    case EPOLL_CTL_MOD:
        if (epi) {
            if (!(epi->event.events & EPOLLEXCLUSIVE)) {
                epds->events |= EPOLLERR | EPOLLHUP;
                error = ep_modify(ep, epi, epds);
            }
        } else
            error = -ENOENT;
        break;
    }
    if (tep != NULL)
        mutex_unlock(&tep->mtx);
    mutex_unlock(&ep->mtx);

error_tgt_fput:
    if (full_check) {
        clear_tfile_check_list();
        mutex_unlock(&epmutex);
    }

    fdput(tf);
error_fput:
    fdput(f);
error_return:

    return error;
}
Most of do_epoll_ctl is validation of the file descriptors; it then adds, removes, or modifies the watch on fd according to op. Here we only look at the add path:
// Add an fd to be monitored to the epoll instance
static int ep_insert(struct eventpoll *ep, const struct epoll_event *event,
struct file *tfile, int fd, int full_check)
{
int error, pwake = 0;
__poll_t revents;
long user_watches;
struct epitem *epi;
struct ep_pqueue epq;
lockdep_assert_irqs_enabled();
user_watches = atomic_long_read(&ep->user->epoll_watches);
if (unlikely(user_watches >= max_user_watches))
return -ENOSPC;
// Allocate and initialize the epitem
if (!(epi = kmem_cache_alloc(epi_cache, GFP_KERNEL)))
return -ENOMEM;
/* Item initialization follow here ... */
INIT_LIST_HEAD(&epi->rdllink);
INIT_LIST_HEAD(&epi->fllink);
INIT_LIST_HEAD(&epi->pwqlist);
// Record the owning eventpoll in the epitem's ep member
epi->ep = ep;
// Store the monitored file descriptor and its struct file in the epitem's ffd member
ep_set_ffd(&epi->ffd, tfile, fd);
// Save the events the caller is interested in for this fd
epi->event = *event;
epi->nwait = 0;
epi->next = EP_UNACTIVE_PTR;
if (epi->event.events & EPOLLWAKEUP) {
error = ep_create_wakeup_source(epi);
if (error)
goto error_create_wakeup_source;
} else {
RCU_INIT_POINTER(epi->ws, NULL);
}
/* Initialize the poll table using the queue callback */
epq.epi = epi;
// Register ep_ptable_queue_proc as the queue callback in epq.pt
init_poll_funcptr(&epq.pt, ep_ptable_queue_proc);
/*
* Attach the item to the poll hooks and get current event bits.
* We can safely use the file* here because its usage count has
* been increased by the caller of this function. Note that after
* this operation completes, the poll callback can start hitting
* the new item.
*/
// This ends up calling ep_ptable_queue_proc, which registers a callback
// on the file's wait queue head; the call returns the file's current state
revents = ep_item_poll(epi, &epq.pt, 1);
/*
* We have to check if something went wrong during the poll wait queue
* install process. Namely an allocation for a wait queue failed due
* high memory pressure.
*/
error = -ENOMEM;
if (epi->nwait < 0)
goto error_unregister;
/* Add the current item to the list of active epoll hook for this file */
// Append the epitem to the tail of the file's f_ep_links list
spin_lock(&tfile->f_lock);
list_add_tail_rcu(&epi->fllink, &tfile->f_ep_links);
spin_unlock(&tfile->f_lock);
/*
* Add the current item to the RB tree. All RB tree operations are
* protected by "mtx", and ep_insert() is called with "mtx" held.
*/
// Insert the epitem into ep's red-black tree
ep_rbtree_insert(ep, epi);
/* now check if we've created too many backpaths */
error = -EINVAL;
if (full_check && reverse_path_check())
goto error_remove_epi;
/* We have to drop the new item inside our item list to keep track of it */
write_lock_irq(&ep->lock);
/* record NAPI ID of new item if present */
ep_set_busy_poll_napi_id(epi);
/* If the file is already "ready" we drop it inside the ready list */
// If the monitored file is already ready and the epitem is not yet on the ready list, add it to the ready list
if (revents && !ep_is_linked(epi)) {
list_add_tail(&epi->rdllink, &ep->rdllist);
ep_pm_stay_awake(epi);
/* Notify waiting tasks that events are available */
if (waitqueue_active(&ep->wq))
// Wake up the task that is sleeping in sys_epoll_wait
wake_up(&ep->wq);
if (waitqueue_active(&ep->poll_wait))
pwake++;
}
write_unlock_irq(&ep->lock);
atomic_long_inc(&ep->user->epoll_watches);
/* We have to call this outside the lock */
if (pwake)
ep_poll_safewake(ep, NULL);
return 0;
error_remove_epi:
spin_lock(&tfile->f_lock);
list_del_rcu(&epi->fllink);
spin_unlock(&tfile->f_lock);
rb_erase_cached(&epi->rbn, &ep->rbr);
error_unregister:
ep_unregister_pollwait(ep, epi);
/*
* We need to do this because an event could have been arrived on some
* allocated wait queue. Note that we don't care about the ep->ovflist
* list, since that is used/cleaned only inside a section bound by "mtx".
* And ep_insert() is called with "mtx" held.
*/
write_lock_irq(&ep->lock);
if (ep_is_linked(epi))
list_del_init(&epi->rdllink);
write_unlock_irq(&ep->lock);
wakeup_source_unregister(ep_wakeup_source(epi));
error_create_wakeup_source:
kmem_cache_free(epi_cache, epi);
return error;
}
Most of the work here is binding and attaching the event. When the socket later has an event ready, ep_poll_callback is called; it puts the item on the ready list and wakes up epoll_wait.
int epoll_wait(int epfd, struct epoll_event *events, int maxevents, int timeout);
Purpose: waits for events on the file descriptors monitored by the epoll instance.
Source:
SYSCALL_DEFINE4(epoll_wait, int, epfd, struct epoll_event __user *, events,
        int, maxevents, int, timeout)
{
    return do_epoll_wait(epfd, events, maxevents, timeout);
}
Let's go straight to do_epoll_wait:
static int do_epoll_wait(int epfd, struct epoll_event __user *events,
             int maxevents, int timeout)
{
    int error;
    struct fd f;
    struct eventpoll *ep;

    /* The maximum number of event must be greater than zero */
    if (maxevents <= 0 || maxevents > EP_MAX_EVENTS)
        return -EINVAL;

    /* Verify that the area passed by the user is writeable */
    if (!access_ok(events, maxevents * sizeof(struct epoll_event)))
        return -EFAULT;

    /* Get the "struct file *" for the eventpoll file */
    // Get the struct file of the epoll instance;
    // the eventpoll is then reached through that struct file
    f = fdget(epfd);
    if (!f.file)
        return -EBADF;

    /*
     * We have to check that the file structure underneath the fd
     * the user passed to us _is_ an eventpoll file.
     */
    error = -EINVAL;
    if (!is_file_epoll(f.file))
        goto error_fput;

    /*
     * At this point it is safe to assume that the "private_data" contains
     * our own data structure.
     */
    // Get the eventpoll structure from private_data
    ep = f.file->private_data;

    /* Time to fish for events ... */
    // Wait for events to arrive
    error = ep_poll(ep, events, maxevents, timeout);

error_fput:
    fdput(f);
    return error;
}
The core is evidently in ep_poll, so let's look at it:
static int ep_poll(struct eventpoll *ep, struct epoll_event __user *events,
int maxevents, long timeout)
{
int res = 0, eavail, timed_out = 0;
u64 slack = 0;
wait_queue_entry_t wait;
ktime_t expires, *to = NULL;
lockdep_assert_irqs_enabled();
// timeout > 0: if the ready list is empty, block until the timeout expires
if (timeout > 0) {
struct timespec64 end_time = ep_set_mstimeout(timeout);
slack = select_estimate_accuracy(&end_time);
to = &expires;
*to = timespec64_to_ktime(end_time);
// timeout == 0: non-blocking
} else if (timeout == 0) {
/*
* Avoid the unnecessary trip to the wait queue loop, if the
* caller specified a non blocking operation. We still need
* lock because we could race and not see an epi being added
* to the ready list while in irq callback. Thus incorrectly
* returning 0 back to userspace.
*/
timed_out = 1;
write_lock_irq(&ep->lock);
eavail = ep_events_available(ep);
write_unlock_irq(&ep->lock);
goto send_events;
}
fetch_events:
// Are events ready, or is rdllist currently being scanned? If not, try busy polling first
if (!ep_events_available(ep))
ep_busy_loop(ep, timed_out);
eavail = ep_events_available(ep);
if (eavail)
goto send_events;
/*
* Busy poll timed out. Drop NAPI ID for now, we can add
* it back in when we have moved a socket with a valid NAPI
* ID onto the ready list.
*/
ep_reset_busy_poll_napi_id(ep);
do {
/*
* Internally init_wait() uses autoremove_wake_function(),
* thus wait entry is removed from the wait queue on each
* wakeup. Why it is important? In case of several waiters
* each new wakeup will hit the next waiter, giving it the
* chance to harvest new event. Otherwise wakeup can be
* lost. This is also good performance-wise, because on
* normal wakeup path no need to call __remove_wait_queue()
* explicitly, thus ep->lock is not taken, which halts the
* event delivery.
*/
init_wait(&wait);
write_lock_irq(&ep->lock);
/*
* Barrierless variant, waitqueue_active() is called under
* the same lock on wakeup ep_poll_callback() side, so it
* is safe to avoid an explicit barrier.
*/
// ep_poll_callback() must be able to wake this task up,
// which is why the task state is set to TASK_INTERRUPTIBLE
__set_current_state(TASK_INTERRUPTIBLE);
/*
* Do the final check under the lock. ep_scan_ready_list()
* plays with two lists (->rdllist and ->ovflist) and there
* is always a race when both lists are empty for short
* period of time although events are pending, so lock is
* important.
*/
eavail = ep_events_available(ep);
if (!eavail) {
if (signal_pending(current))
res = -EINTR;
else
__add_wait_queue_exclusive(&ep->wq, &wait);
}
write_unlock_irq(&ep->lock);
if (eavail || res)
break;
if (!schedule_hrtimeout_range(to, slack, HRTIMER_MODE_ABS)) {
timed_out = 1;
break;
}
/* We were woken up, thus go and try to harvest some events */
eavail = 1;
} while (0);
// Woken up
__set_current_state(TASK_RUNNING);
if (!list_empty_careful(&wait.entry)) {
write_lock_irq(&ep->lock);
__remove_wait_queue(&ep->wq, &wait);
write_unlock_irq(&ep->lock);
}
send_events:
if (fatal_signal_pending(current)) {
/*
* Always short-circuit for fatal signals to allow
* threads to make a timely exit without the chance of
* finding more events available and fetching
* repeatedly.
*/
res = -EINTR;
}
/*
* Try to transfer events to user space. In case we get 0 events and
* there's still timeout left over, we go trying again in search of
* more luck.
*/
/* If all is well and events are available, copy them to user space */
if (!res && eavail &&
!(res = ep_send_events(ep, events, maxevents)) && !timed_out)
goto fetch_events;
return res;
}
ep_send_events() wraps the user-supplied buffer in an ep_send_events_data structure and then calls ep_scan_ready_list(), which copies the events on the ready list into that user-space buffer. User space then reads the results and handles them.
static int ep_send_events(struct eventpoll *ep,
struct epoll_event __user *events, int maxevents)
{
struct ep_send_events_data esed;
esed.maxevents = maxevents;
esed.events = events;
ep_scan_ready_list(ep, ep_send_events_proc, &esed, 0, false);
return esed.res;
}
static __poll_t ep_send_events_proc(struct eventpoll *ep, struct list_head *head,
void *priv)
{
struct ep_send_events_data *esed = priv;
__poll_t revents;
struct epitem *epi, *tmp;
struct epoll_event __user *uevent = esed->events;
struct wakeup_source *ws;
poll_table pt;
init_poll_funcptr(&pt, NULL);
esed->res = 0;
/*
* We can loop without lock because we are passed a task private list.
* Items cannot vanish during the loop because ep_scan_ready_list() is
* holding "mtx" during this call.
*/
lockdep_assert_held(&ep->mtx);
list_for_each_entry_safe(epi, tmp, head, rdllink) {
if (esed->res >= esed->maxevents)
break;
/*
* Activate ep->ws before deactivating epi->ws to prevent
* triggering auto-suspend here (in case we reactive epi->ws
* below).
*
* This could be rearranged to delay the deactivation of epi->ws
* instead, but then epi->ws would temporarily be out of sync
* with ep_is_linked().
*/
ws = ep_wakeup_source(epi);
if (ws) {
if (ws->active)
__pm_stay_awake(ep->ws);
__pm_relax(ws);
}
list_del_init(&epi->rdllink);
/*
* If the event mask intersect the caller-requested one,
* deliver the event to userspace. Again, ep_scan_ready_list()
* is holding ep->mtx, so no operations coming from userspace
* can change the item.
*/
revents = ep_item_poll(epi, &pt, 1);
if (!revents)
continue;
// Copy the returned events and the user's data to user space
if (__put_user(revents, &uevent->events) ||
__put_user(epi->event.data, &uevent->data)) {
// Copy failed: put the epitem back on the ready list
list_add(&epi->rdllink, head);
ep_pm_stay_awake(epi);
if (!esed->res)
esed->res = -EFAULT;
return 0;
}
esed->res++;
uevent++;
if (epi->event.events & EPOLLONESHOT)
epi->event.events &= EP_PRIVATE_BITS;
else if (!(epi->event.events & EPOLLET)) {
/*
* If this file has been added with Level
* Trigger mode, we need to insert back inside
* the ready list, so that the next call to
* epoll_wait() will check again the events
* availability. At this point, no one can insert
* into ep->rdllist besides us. The epoll_ctl()
* callers are locked out by
* ep_scan_ready_list() holding "mtx" and the
* poll callback will queue them in ep->ovflist.
*/
list_add_tail(&epi->rdllink, &ep->rdllist);
ep_pm_stay_awake(epi);
}
}
return 0;
}
The __put_user calls show how data gets from the kernel to user space: it is copied with __put_user, and shared memory is not involved at all. Many blog posts get this wrong, so please take note.
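III. Summary
Please forgive the limits of my time and skill. I read the epoll source because I saw disagreement online about how data is copied between the kernel and user space, so this was only a quick pass, and I plan to read it more carefully later. Even so, I learned a lot, and a short summary follows. (Part of it comes from my own reading of the source and part from material collected online; corrections are welcome.)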
epoll_create
The argument passed to epoll_create only has to be greater than 0; it is otherwise unused. The call allocates the eventpoll structure and initializes it: the wait queues, the ready list, and the root of the red-black tree.
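epoll_ctl
1. Copy the epoll_event structure into kernel space and check that the fd being added supports the poll operation (epoll, poll, and select I/O multiplexing all require it).
2. Get the eventpoll object: ep = f.file->private_data;
3. Dispatch on op: add, modify, or delete. The red-black tree in the eventpoll structure is first searched for the fd. An add proceeds only if the fd is not found; otherwise it fails with a duplicate error (EEXIST). Modify and delete require the fd to be found.
4. On insert, create an epitem for the fd and initialize its members, and set the callback that poll_wait will use. The fd's file_operations->poll function is then called; it ends up in poll_wait, which completes the registration by adding an entry with ep_poll_callback() as its wake-up function to the device's wait queue. From this point the epitem is tied to the socket, and ep_poll_callback() is invoked whenever the socket's state changes. Finally, the epitem is inserted into the red-black tree.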
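epoll_wait
Check whether the ready list of the eventpoll object is empty, that is, whether there is anything to do. If there is nothing, initialize a wait queue entry, set the task state to interruptible sleep (TASK_INTERRUPTIBLE), and check for a pending signal: if there is one, return with EINTR right away; otherwise add the task to ep->wq and sleep via schedule_hrtimeout_range(). After a timeout or a wake-up, the task removes itself from the wait queue and starts copying events to user space. The copy first moves the ready list onto an intermediate list, then walks that list and copies each event to user space; every item that is level-triggered is put back on the ready list.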
How data is copied between user space and the kernel
From user space to the kernel: copy_from_user. From the kernel to user space: __put_user.
Note that many blogs claim this data is passed through shared memory. That is wrong; don't believe it.
Why ET and LT mode behave differently
else if (!(epi->event.events & EPOLLET)) {
    /*
     * If this file has been added with Level
     * Trigger mode, we need to insert back inside
     * the ready list, so that the next call to
     * epoll_wait() will check again the events
     * availability. At this point, no one can insert
     * into ep->rdllist besides us. The epoll_ctl()
     * callers are locked out by
     * ep_scan_ready_list() holding "mtx" and the
     * poll callback will queue them in ep->ovflist.
     */
    list_add_tail(&epi->rdllink, &ep->rdllist);
    ep_pm_stay_awake(epi);
}
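This checks whether the event mask contains the EPOLLET bit. If it does not (level-triggered mode), the epitem is put back on epoll's rdllist, so the next epoll_wait() call finds it again, re-polls the file, and returns immediately to user space if the condition still holds. In edge-triggered mode the item is not re-queued, so it is reported again only after a new event puts it back on the ready list.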
Why epoll is efficient (compared with select). Source: https://www.cnblogs.com/apprentice89/p/3234677.html
The calling convention alone already shows one advantage of epoll over select/poll: every select/poll call must pass the full set of monitored fds to the kernel, which means copying the fd list from user space to the kernel on each call, and that is inefficient when there are many fds. A call to epoll_wait (which plays the role of select/poll) does not pass the fd list again, because epoll_ctl has already told the kernel which fds to monitor (epoll_ctl does not copy all fds each time either; it only applies incremental changes). So once epoll_create has been called, the kernel holds a data structure for the monitored fds, and each epoll_ctl call merely maintains that structure. In addition, the kernel uses the slab allocator to give epoll fast data structures:
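In the kernel, everything is a file. epoll therefore registers a file system with the kernel to hold the monitored fds, and calling epoll_create creates a file node in this virtual epoll file system (in the source quoted above, this is the anonymous file created by anon_inode_getfile()). This file is not a regular file; it serves only epoll. When the kernel initializes epoll (at boot), it also sets up epoll's own kernel caches for the fds we want to monitor. The fds are kept in a red-black tree, which supports fast lookup, insertion, and deletion. The cache is a slab cache built on contiguous physical pages: put simply, memory objects of the required size are allocated in advance, and each use takes a free, already allocated object.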
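The third advantage of epoll: even after epoll_ctl has registered a million fds, epoll_wait still returns quickly and hands us only the fds on which events occurred. The reason is that epoll_create, besides creating the file node and the red-black tree that stores the fds passed in by later epoll_ctl calls, also creates a list for ready events. epoll_wait only has to check whether this list contains anything: if it does, it returns; if not, it sleeps, and once the timeout expires it returns even if the list is still empty. That makes epoll_wait very efficient. Moreover, even when we monitor a very large number of fds, usually only a few of them are ready at any one time, so epoll_wait copies only a small number of events from the kernel to user space. How is the ready list maintained? When epoll_ctl runs, besides putting the fd into the red-black tree of the epoll file object, it also registers a callback on the file's wait queue (ep_poll_callback in the source above), telling the kernel to put the fd on the ready list when an event arrives for it. So when data arrives on an fd (a socket, for example), the kernel copies the data from the device (the network card, for example) into kernel memory and then inserts the fd into the ready list.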
