<kbd id="afajh"><form id="afajh"></form></kbd>
<strong id="afajh"><dl id="afajh"></dl></strong>
    <del id="afajh"><form id="afajh"></form></del>
        1. <th id="afajh"><progress id="afajh"></progress></th>
          <b id="afajh"><abbr id="afajh"></abbr></b>
          <th id="afajh"><progress id="afajh"></progress></th>

          揭開高性能服務(wù)器底層面紗

          共 38190字,需瀏覽 77分鐘

           ·

          2021-08-25 13:55

          揭開高性能服務(wù)器底層面紗

          一、前言

          我們經(jīng)常聽說高性能服務(wù)器,那什么是高性能服務(wù)器;用大白話來解釋就是說處理事件快,效率高,占用服務(wù)器資源少,多路復(fù)用等等集萬千寵愛于一身;但是,往往要想做到高性能,這是非常難的,需要一個好的優(yōu)秀的架構(gòu)和底層接口。

          這篇文章只限于 linux 平臺,對于 windows 平臺下,可以去參考下 IOCP 的用法,這里就不多說了~

          目前主流的高性能服務(wù)器底層都是封裝了 EPOLL 接口,使用 epoll 進行事件處理,為什么 epoll 可以作為高性能服務(wù)器底層事件處理?那就讓我們從源碼下手,來揭開面紗~

          二、源碼解讀

          兩個至關(guān)重要的結(jié)構(gòu)體

          eventpoll結(jié)構(gòu)體:

          /*
           * 此結(jié)構(gòu)體存儲在file->private_data中
           */
          /*
              eventpoll結(jié)構(gòu)體是epoll的核心里面存放著許多信息,主要包括
              1. struct rb_root rbr;這是一顆紅黑樹的根節(jié)點,代表著一顆紅黑樹,
              紅黑樹下面掛的是我們感興趣的socket的事件,當我們調(diào)用epoll_ctl向
              epoll添加感興趣的socket事件時,系統(tǒng)將我們的傳遞的信息封裝成
              struct epitem結(jié)構(gòu)體,然后掛到這顆紅黑樹的相應(yīng)節(jié)點上
              2.struct list_head rdllist;這是一個雙向鏈表,這個雙向鏈表中存放
              的是就緒的事件當我們調(diào)用epoll_wait的時候這些事件會返回給用戶
              3.struct file *file;文件結(jié)構(gòu)指針,指向epoll文件
              */
          struct eventpoll {
            // 自旋鎖,在kernel內(nèi)部用自旋鎖加鎖,就可以同時多線(進)程對此結(jié)構(gòu)體進行操作
            // 主要是保護ready_list
            spinlock_t lock;
            // 這個互斥鎖是為了保證在eventloop使用對應(yīng)的文件描述符的時候,文件描述符不會被移除掉
            struct mutex mtx;
            // epoll_wait使用的等待隊列,和進程喚醒有關(guān)
            wait_queue_head_t wq;
            // file->poll使用的等待隊列,和進程喚醒有關(guān)
            wait_queue_head_t poll_wait;
            // 就緒的描述符隊列,雙向鏈表
            struct list_head rdllist;
            // 通過紅黑樹來組織當前epoll關(guān)注的文件描述符
            struct rb_root rbr;
            // 在向用戶空間傳輸就緒事件的時候,將同時發(fā)生事件的文件描述符鏈入到這個鏈表里面
            struct epitem *ovflist;
            // 對應(yīng)的user
            struct user_struct *user;
            // 對應(yīng)的文件描述符
            struct file *file;
            // 下面兩個是用于環(huán)路檢測的優(yōu)化
            int visited;
            struct list_head visited_list_link;
          };

          epitem結(jié)構(gòu)體

          // 對應(yīng)于一個加入到epoll的文件  
          struct epitem {  
              // 掛載到eventpoll 的紅黑樹節(jié)點  
              struct rb_node rbn;  
              // 掛載到eventpoll.rdllist 的節(jié)點  
              struct list_head rdllink;  
              // 連接到ovflist 的指針  
              struct epitem *next;  
              /* 文件描述符信息fd + file, 紅黑樹的key */  
              struct epoll_filefd ffd;  
              /* Number of active wait queue attached to poll operations */  
              int nwait;  
              // 當前文件的等待隊列(eppoll_entry)列表  
              // 同一個文件上可能會監(jiān)視多種事件,  
              // 這些事件可能屬于不同的wait_queue中  
              // (取決于對應(yīng)文件類型的實現(xiàn)),  
              // 所以需要使用鏈表  
              struct list_head pwqlist;  
              // 當前epitem 的所有者  
              struct eventpoll *ep;  
              /* List header used to link this item to the "struct file" items list */  
              struct list_head fllink;  
              /* epoll_ctl 傳入的用戶數(shù)據(jù) */  
              struct epoll_event event;  
          };

          int epoll_create(int size);

          作用:調(diào)用epoll_create方法創(chuàng)建一個epoll的句柄

          源碼:

          SYSCALL_DEFINE1(epoll_create, int, size)
          {
            if (size <= 0)
              return -EINVAL;

            return do_epoll_create(0);
          }

          從源碼來看,其實 size 這個參數(shù)并沒有什么作用,只要大于 0 就可以了~

          我從其他地方獲取資料說的是:以前底層實現(xiàn)是哈希表,現(xiàn)在是紅黑樹,為了兼容所以才保留了這個參數(shù),也不知道真假,權(quán)當了解一下~

          接著看下do_epoll_create

          static int do_epoll_create(int flags)
          {
            int error, fd;
            struct eventpoll *ep = NULL;
            struct file *file;

            /* Check the EPOLL_* constant for consistency.  */
            BUILD_BUG_ON(EPOLL_CLOEXEC != O_CLOEXEC);

            if (flags & ~EPOLL_CLOEXEC)
              return -EINVAL;
            /*
             * Create the internal data structure ("struct eventpoll").
             */
            error = ep_alloc(&ep);
            if (error < 0)
              return error;
            /*
             * Creates all the items needed to setup an eventpoll file. That is,
             * a file structure and a free file descriptor.
             */
              // 獲取尚未被使用的文件描述符,即描述符數(shù)組的槽位
            fd = get_unused_fd_flags(O_RDWR | (flags & O_CLOEXEC));
            if (fd < 0) {
              error = fd;
              goto out_free_ep;
            }
              //創(chuàng)建一個名叫[eventpoll]的文件,并返回其文件結(jié)構(gòu)指針,這個文件代表著epoll實例
            file = anon_inode_getfile("[eventpoll]", &eventpoll_fops, ep,
                   O_RDWR | (flags & O_CLOEXEC));
            if (IS_ERR(file)) {
              error = PTR_ERR(file);
              goto out_free_fd;
            }
            ep->file = file;
              // 將file填入到對應(yīng)的文件描述符數(shù)組的槽里面
            fd_install(fd, file);
            return fd;

          out_free_fd:
            put_unused_fd(fd);
          out_free_ep:
            ep_free(ep);
            return error;
          }

          這里error = ep_alloc(&ep);是分配eventpoll結(jié)構(gòu)并進行的初始化操作;

          綜上所述,epoll 創(chuàng)建文件的過程,做了初始化和文件關(guān)聯(lián)等;

          int epoll_ctl(int epfd, int op, int fd, struct epoll_event *event);

          作用:epoll的 事件注冊函數(shù)

          源碼:

          SYSCALL_DEFINE4(epoll_ctl, int, epfd, int, op, int, fd,
              struct epoll_event __user *, event)
          {
            struct epoll_event epds;
            
               //錯誤處理:如果是刪除,且epoll_event結(jié)構(gòu)不為NULL則報錯
            //如果是更改或者添加那就需要把從用戶空間將epoll_event結(jié)構(gòu)copy到內(nèi)核空間
            if (ep_op_has_event(op) &&
                  // 復(fù)制用戶空間數(shù)據(jù)到內(nèi)核
                copy_from_user(&epds, event, sizeof(struct epoll_event)))
              return -EFAULT;

            return do_epoll_ctl(epfd, op, fd, &epds, false);
          }

          我們看下函數(shù)do_epoll_ctl

          int do_epoll_ctl(int epfd, int op, int fd, struct epoll_event *epds,
               bool nonblock)
          {
            int error;
            int full_check = 0;
            struct fd f, tf;
            struct eventpoll *ep;
            struct epitem *epi;
            struct eventpoll *tep = NULL;

              //省略校驗過程
            .....
              
            epi = ep_find(ep, tf.file, fd);

            error = -EINVAL;
            switch (op) {
                      //增加
            case EPOLL_CTL_ADD:
              if (!epi) {
                epds->events |= EPOLLERR | EPOLLHUP;
                error = ep_insert(ep, epds, tf.file, fd, full_check);
              } else
                error = -EEXIST;
              break;
                      //刪除
            case EPOLL_CTL_DEL:
              if (epi)
                error = ep_remove(ep, epi);
              else
                error = -ENOENT;
              break;
                      //修改
            case EPOLL_CTL_MOD:
              if (epi) {
                if (!(epi->event.events & EPOLLEXCLUSIVE)) {
                  epds->events |= EPOLLERR | EPOLLHUP;
                  error = ep_modify(ep, epi, epds);
                }
              } else
                error = -ENOENT;
              break;
            }
            if (tep != NULL)
              mutex_unlock(&tep->mtx);
            mutex_unlock(&ep->mtx);

          error_tgt_fput:
            if (full_check) {
              clear_tfile_check_list();
              mutex_unlock(&epmutex);
            }

            fdput(tf);
          error_fput:
            fdput(f);
          error_return:

            return error;
          }

          do_epoll_ctl函數(shù)中,做的更多的是是對文件描述符的校驗,然后根據(jù)傳入的 fd 添加進去并且監(jiān)視,這里就看一下增加的操作吧~

          //往epollfd里面添加一個監(jiān)聽fd
          static int ep_insert(struct eventpoll *ep, const struct epoll_event *event,
                   struct file *tfile, int fd, int full_check)
          {
            int error, pwake = 0;
            __poll_t revents;
            long user_watches;
            struct epitem *epi;
            struct ep_pqueue epq;

            lockdep_assert_irqs_enabled();

            user_watches = atomic_long_read(&ep->user->epoll_watches);
            if (unlikely(user_watches >= max_user_watches))
              return -ENOSPC;
              //分配和初始化 epi結(jié)構(gòu)體
            if (!(epi = kmem_cache_alloc(epi_cache, GFP_KERNEL)))
              return -ENOMEM;

            /* Item initialization follow here ... */
            INIT_LIST_HEAD(&epi->rdllink);
            INIT_LIST_HEAD(&epi->fllink);
            INIT_LIST_HEAD(&epi->pwqlist);
              //將epoll對象掛載到該fd的epitem結(jié)構(gòu)的ep成員中
            epi->ep = ep;
              //設(shè)置被監(jiān)控的文件描述符及其對應(yīng)的文件對象到epitem的ffd成員中
            ep_set_ffd(&epi->ffd, tfile, fd);
              //保存fd感興趣的事件對象
            epi->event = *event;
            epi->nwait = 0;
            epi->next = EP_UNACTIVE_PTR;
            if (epi->event.events & EPOLLWAKEUP) {
              error = ep_create_wakeup_source(epi);
              if (error)
                goto error_create_wakeup_source;
            } else {
              RCU_INIT_POINTER(epi->ws, NULL);
            }

            /* Initialize the poll table using the queue callback */
            epq.epi = epi;
              //將ep_ptable_queue_proc注冊到epq.pt中。
            init_poll_funcptr(&epq.pt, ep_ptable_queue_proc);

            /*
             * Attach the item to the poll hooks and get current event bits.
             * We can safely use the file* here because its usage count has
             * been increased by the caller of this function. Note that after
             * this operation completes, the poll callback can start hitting
             * the new item.
             */
              //  內(nèi)部會調(diào)用ep_ptable_queue_proc, 在文件對應(yīng)的wait queue head 上  
              // 注冊回調(diào)函數(shù), 并返回當前文件的狀態(tài) 
            revents = ep_item_poll(epi, &epq.pt, 1);

            /*
             * We have to check if something went wrong during the poll wait queue
             * install process. Namely an allocation for a wait queue failed due
             * high memory pressure.
             */
            error = -ENOMEM;
            if (epi->nwait < 0)
              goto error_unregister;

            /* Add the current item to the list of active epoll hook for this file */
              //把epitem插入到f_ep_links鏈表的尾部
            spin_lock(&tfile->f_lock);
            list_add_tail_rcu(&epi->fllink, &tfile->f_ep_links);
            spin_unlock(&tfile->f_lock);

            /*
             * Add the current item to the RB tree. All RB tree operations are
             * protected by "mtx", and ep_insert() is called with "mtx" held.
             */
              // 將該epitem插入到ep的紅黑樹中
            ep_rbtree_insert(ep, epi);

            /* now check if we've created too many backpaths */
            error = -EINVAL;
            if (full_check && reverse_path_check())
              goto error_remove_epi;

            /* We have to drop the new item inside our item list to keep track of it */
            write_lock_irq(&ep->lock);

            /* record NAPI ID of new item if present */
            ep_set_busy_poll_napi_id(epi);

            /* If the file is already "ready" we drop it inside the ready list */
              //如果要監(jiān)視的文件狀態(tài)已經(jīng)就緒并且還沒有加入到就緒隊列中,則將當前的epitem加入到就緒
            if (revents && !ep_is_linked(epi)) {
              list_add_tail(&epi->rdllink, &ep->rdllist);
              ep_pm_stay_awake(epi);

              /* Notify waiting tasks that events are available */
              if (waitqueue_active(&ep->wq))
                      // 通知sys_epoll_wait , 調(diào)用回調(diào)函數(shù)喚醒sys_epoll_wait 進程  
                wake_up(&ep->wq);
              if (waitqueue_active(&ep->poll_wait))
                pwake++;
            }

            write_unlock_irq(&ep->lock);

            atomic_long_inc(&ep->user->epoll_watches);

            /* We have to call this outside the lock */
            if (pwake)
              ep_poll_safewake(ep, NULL);

            return 0;

          error_remove_epi:
            spin_lock(&tfile->f_lock);
            list_del_rcu(&epi->fllink);
            spin_unlock(&tfile->f_lock);

            rb_erase_cached(&epi->rbn, &ep->rbr);

          error_unregister:
            ep_unregister_pollwait(ep, epi);

            /*
             * We need to do this because an event could have been arrived on some
             * allocated wait queue. Note that we don'
          t care about the ep->ovflist
             * list, since that is used/cleaned only inside a section bound by "mtx".
             * And ep_insert() is called with "mtx" held.
             */
            write_lock_irq(&ep->lock);
            if (ep_is_linked(epi))
              list_del_init(&epi->rdllink);
            write_unlock_irq(&ep->lock);

            wakeup_source_unregister(ep_wakeup_source(epi));

          error_create_wakeup_source:
            kmem_cache_free(epi_cache, epi);

            return error;
          }

          這里做的更多的是對事件的一個綁定和掛載操作,如果這個 socket 有事件就緒,則會調(diào)用ep_poll_callback函數(shù),這個函數(shù)負責將事件加入就緒隊列并喚醒epoll_wait;

          int epoll_wait(int epfd, struct epoll_event *events,int maxevents, int timeout);

          作用:等待在 epoll 監(jiān)控的事件中已經(jīng)發(fā)生的事件。

          源碼;

          SYSCALL_DEFINE4(epoll_wait, int, epfd, struct epoll_event __user *, events,
              int, maxevents, int, timeout)
          {
            return do_epoll_wait(epfd, events, maxevents, timeout);
          }

          直接去看下do_epoll_wait函數(shù)吧~

          static int do_epoll_wait(int epfd, struct epoll_event __user *events,
                 int maxevents, int timeout)
          {
            int error;
            struct fd f;
            struct eventpoll *ep;

            /* The maximum number of event must be greater than zero */
            if (maxevents <= 0 || maxevents > EP_MAX_EVENTS)
              return -EINVAL;

            /* Verify that the area passed by the user is writeable */
            if (!access_ok(events, maxevents * sizeof(struct epoll_event)))
              return -EFAULT;

            /* Get the "struct file *" for the eventpoll file */
              //獲取epoll的struct file
            //再通過對應(yīng)的struct file獲得eventpoll 
            f = fdget(epfd);
            if (!f.file)
              return -EBADF;

            /*
             * We have to check that the file structure underneath the fd
             * the user passed to us _is_ an eventpoll file.
             */
            error = -EINVAL;
            if (!is_file_epoll(f.file))
              goto error_fput;

            /*
             * At this point it is safe to assume that the "private_data" contains
             * our own data structure.
             */
              // 根據(jù)private_data得到eventpoll結(jié)構(gòu)
            ep = f.file->private_data;

            /* Time to fish for events ... */
              //等待事件的到來
            error = ep_poll(ep, events, maxevents, timeout);

          error_fput:
            fdput(f);
            return error;
          }

          看來核心在 ep_poll 函數(shù)呀~去看看吧

          static int ep_poll(struct eventpoll *ep, struct epoll_event __user *events,
                 int maxevents, long timeout)
          {
            int res = 0, eavail, timed_out = 0;
            u64 slack = 0;
            wait_queue_entry_t wait;
            ktime_t expires, *to = NULL;

            lockdep_assert_irqs_enabled();
            //如果就緒鏈表為空則阻塞直到timeout
            if (timeout > 0) {
              struct timespec64 end_time = ep_set_mstimeout(timeout);

              slack = select_estimate_accuracy(&end_time);
              to = &expires;
              *to = timespec64_to_ktime(end_time);
            //非阻塞
            } else if (timeout == 0) {
              /*
               * Avoid the unnecessary trip to the wait queue loop, if the
               * caller specified a non blocking operation. We still need
               * lock because we could race and not see an epi being added
               * to the ready list while in irq callback. Thus incorrectly
               * returning 0 back to userspace.
               */
              timed_out = 1;

              write_lock_irq(&ep->lock);
              eavail = ep_events_available(ep);
              write_unlock_irq(&ep->lock);

              goto send_events;
            }

          fetch_events:
            //是否有就緒事件,或正在掃描處理eventpoll中的rdllist鏈表
            if (!ep_events_available(ep))
              ep_busy_loop(ep, timed_out);

            eavail = ep_events_available(ep);
            if (eavail)
              goto send_events;

            /*
             * Busy poll timed out.  Drop NAPI ID for now, we can add
             * it back in when we have moved a socket with a valid NAPI
             * ID onto the ready list.
             */
            ep_reset_busy_poll_napi_id(ep);

            do {
              /*
               * Internally init_wait() uses autoremove_wake_function(),
               * thus wait entry is removed from the wait queue on each
               * wakeup. Why it is important? In case of several waiters
               * each new wakeup will hit the next waiter, giving it the
               * chance to harvest new event. Otherwise wakeup can be
               * lost. This is also good performance-wise, because on
               * normal wakeup path no need to call __remove_wait_queue()
               * explicitly, thus ep->lock is not taken, which halts the
               * event delivery.
               */
              init_wait(&wait);

              write_lock_irq(&ep->lock);
              /*
               * Barrierless variant, waitqueue_active() is called under
               * the same lock on wakeup ep_poll_callback() side, so it
               * is safe to avoid an explicit barrier.
               */
                  //執(zhí)行ep_poll_callback()喚醒時應(yīng)當需要將當前進程喚醒,
              //這就是我們將任務(wù)狀態(tài)設(shè)置為TASK_INTERRUPTIBLE的原因。
              __set_current_state(TASK_INTERRUPTIBLE);

              /*
               * Do the final check under the lock. ep_scan_ready_list()
               * plays with two lists (->rdllist and ->ovflist) and there
               * is always a race when both lists are empty for short
               * period of time although events are pending, so lock is
               * important.
               */
              eavail = ep_events_available(ep);
              if (!eavail) {
                if (signal_pending(current))
                  res = -EINTR;
                else
                  __add_wait_queue_exclusive(&ep->wq, &wait);
              }
              write_unlock_irq(&ep->lock);

              if (eavail || res)
                break;

              if (!schedule_hrtimeout_range(to, slack, HRTIMER_MODE_ABS)) {
                timed_out = 1;
                break;
              }

              /* We were woken up, thus go and try to harvest some events */
              eavail = 1;

            } while (0);
            //醒來
            __set_current_state(TASK_RUNNING);

            if (!list_empty_careful(&wait.entry)) {
              write_lock_irq(&ep->lock);
              __remove_wait_queue(&ep->wq, &wait);
              write_unlock_irq(&ep->lock);
            }

          send_events:
            if (fatal_signal_pending(current)) {
              /*
               * Always short-circuit for fatal signals to allow
               * threads to make a timely exit without the chance of
               * finding more events available and fetching
               * repeatedly.
               */
              res = -EINTR;
            }
            /*
             * Try to transfer events to user space. In case we get 0 events and
             * there's still timeout left over, we go trying again in search of
             * more luck.
             */
              /* 如果一切正常, 有event發(fā)生, 就開始準備數(shù)據(jù)copy給用戶空間了... */
            if (!res && eavail &&
                !(res = ep_send_events(ep, events, maxevents)) && !timed_out)
              goto fetch_events;

            return res;
          }

          ep_send_events()函數(shù)將用戶傳入的內(nèi)存簡單封裝到ep_send_events_data結(jié)構(gòu)中,然后調(diào)用ep_scan_ready_list()將就緒隊列中的事件傳入用戶空間的內(nèi)存。用戶空間訪問這個結(jié)果,進行處理。

          static int ep_send_events(struct eventpoll *ep,
                  struct epoll_event __user *events, int maxevents)
          {
            struct ep_send_events_data esed;

            esed.maxevents = maxevents;
            esed.events = events;

            ep_scan_ready_list(ep, ep_send_events_proc, &esed, 0, false);
            return esed.res;
          }
          static __poll_t ep_send_events_proc(struct eventpoll *ep, struct list_head *head,
                       void *priv)
          {
            struct ep_send_events_data *esed = priv;
            __poll_t revents;
            struct epitem *epi, *tmp;
            struct epoll_event __user *uevent = esed->events;
            struct wakeup_source *ws;
            poll_table pt;

            init_poll_funcptr(&pt, NULL);
            esed->res = 0;

            /*
             * We can loop without lock because we are passed a task private list.
             * Items cannot vanish during the loop because ep_scan_ready_list() is
             * holding "mtx" during this call.
             */
            lockdep_assert_held(&ep->mtx);

            list_for_each_entry_safe(epi, tmp, head, rdllink) {
              if (esed->res >= esed->maxevents)
                break;

              /*
               * Activate ep->ws before deactivating epi->ws to prevent
               * triggering auto-suspend here (in case we reactive epi->ws
               * below).
               *
               * This could be rearranged to delay the deactivation of epi->ws
               * instead, but then epi->ws would temporarily be out of sync
               * with ep_is_linked().
               */
              ws = ep_wakeup_source(epi);
              if (ws) {
                if (ws->active)
                  __pm_stay_awake(ep->ws);
                __pm_relax(ws);
              }

              list_del_init(&epi->rdllink);

              /*
               * If the event mask intersect the caller-requested one,
               * deliver the event to userspace. Again, ep_scan_ready_list()
               * is holding ep->mtx, so no operations coming from userspace
               * can change the item.
               */
              revents = ep_item_poll(epi, &pt, 1);
              if (!revents)
                continue;
              //把當前事件和用戶傳入的數(shù)據(jù)copy到用戶空間
              if (__put_user(revents, &uevent->events) ||
                  __put_user(epi->event.data, &uevent->data)) {
                      //復(fù)制失敗把epi重新插入到ready鏈表
                list_add(&epi->rdllink, head);
                ep_pm_stay_awake(epi);
                if (!esed->res)
                  esed->res = -EFAULT;
                return 0;
              }
              esed->res++;
              uevent++;
              if (epi->event.events & EPOLLONESHOT)
                epi->event.events &= EP_PRIVATE_BITS;
              else if (!(epi->event.events & EPOLLET)) {
                /*
                 * If this file has been added with Level
                 * Trigger mode, we need to insert back inside
                 * the ready list, so that the next call to
                 * epoll_wait() will check again the events
                 * availability. At this point, no one can insert
                 * into ep->rdllist besides us. The epoll_ctl()
                 * callers are locked out by
                 * ep_scan_ready_list() holding "mtx" and the
                 * poll callback will queue them in ep->ovflist.
                 */
                list_add_tail(&epi->rdllink, &ep->rdllist);
                ep_pm_stay_awake(epi);
              }
            }

            return 0;
          }

          看到__put_user就知道,從內(nèi)核拷貝數(shù)據(jù)到用戶空間使用了__put_user函數(shù),和所謂的共享內(nèi)存沒有一點關(guān)系,現(xiàn)在博客上面有很多錯誤,望大家修正~

          三、總結(jié)

          請大家原諒我水平和時間有限,這次閱讀 epoll 的源碼起源于在網(wǎng)上看到內(nèi)核與用戶態(tài)數(shù)據(jù)拷貝使用的方法存在爭議,所以找來 epoll 的源碼進行了粗略的閱讀,后續(xù)還會抽時間精讀一下,不過這次雖然是粗略的閱讀了 epoll 的源碼,但是收獲也很多,接下來就簡單做下總結(jié)~(這個總結(jié)有我自己看源碼得來的,也有從網(wǎng)絡(luò)上搜集的資料,如果錯誤,請大家不吝賜教)

          epoll_create

          • epoll_create傳入?yún)?shù)的時候,只要保證參數(shù)大于 0 就可以,這個參數(shù)時無用的
          • 初始化等待隊列和初始化就緒鏈表,還有初始化紅黑樹的頭結(jié)點
          • 分配eventpoll結(jié)構(gòu)并進行的初始化操作;

          epoll_ctl

          • epoll_event結(jié)構(gòu)拷貝到內(nèi)核空間中,并且判斷加入的 fd 是否支持 poll 結(jié)( epoll,poll,selectI/O 多路復(fù)用必須支持 poll 操作).
          • ep = f.file->private_data;獲取event_poll對象;
          • 通過 op 判斷事件的修改、添加、刪除操作
          • 首先在 eventpoll 結(jié)構(gòu)中的紅黑樹查找是否已經(jīng)存在了相對應(yīng)的 fd ,沒找到就支持插入操作,否則報重復(fù)的錯誤,還有修改,刪除操作。
          • 插入操作時,會創(chuàng)建一個與 fd 對應(yīng)的 epitem 結(jié)構(gòu),并且初始化相關(guān)成員,并指定調(diào)用 poll_wait 時的回調(diào)函數(shù)用于數(shù)據(jù)就緒時喚醒進程,(其內(nèi)部,初始化設(shè)備的等待隊列,將該進程注冊到等待隊列)完成這一步,
          • epitem 就跟這個 socket 關(guān)聯(lián)起來了, 當它有狀態(tài)變化時,會通過ep_poll_callback()來通知.
          • 最后調(diào)用加入的 fd 的fileoperation->poll函數(shù)(最后會調(diào)用 poll_wait 操作)用于完注冊操作,將 epitem 結(jié)構(gòu)添加到紅黑樹中。

          epoll_wait

          • 判斷 eventpoll 對象的鏈表是否為空,是否需要操作;初始化一個等待隊列,把自己掛上去,設(shè)置自己的進程狀態(tài)
          • 若是可睡眠狀態(tài).判斷是否有信號到來(有的話直接被中斷醒來,),如果沒有那就調(diào)用 schedule_timeout 進行睡眠,
          • 如果超時或者被喚醒,首先從自己初始化的等待隊列刪除,然后開始拷貝資源給用戶空間了
          • 拷貝資源則是先把就緒事件鏈表轉(zhuǎn)移到中間鏈表,然后挨個遍歷拷貝到用戶空間,并且挨個判斷其是否為水平觸發(fā),是的話再次插入到就緒鏈表

          用戶態(tài)和內(nèi)核態(tài)拷貝數(shù)據(jù)方式

          • 用戶態(tài)拷貝數(shù)據(jù)到內(nèi)核態(tài),是調(diào)用了函數(shù):copy_from_user
          • 內(nèi)核態(tài)數(shù)據(jù)拷貝到用戶態(tài),調(diào)用了函數(shù):__put_user

          這里注意,好多博客上面的說拷貝數(shù)據(jù)使用的是共享內(nèi)存,是錯誤的,千萬別信哈~~~~

          ET和LT模式不同的原理

          else if (!(epi->event.events & EPOLLET)) {
           
                  /*
                  * If this file has been added with Level
                  * Trigger mode, we need to insert back inside
                  * the ready list, so that the next call to
                  * epoll_wait() will check again the events
                  * availability. At this point, no one can insert
                  * into ep->rdllist besides us. The epoll_ctl()
                  * callers are locked out by
                  * ep_scan_ready_list() holding "mtx" and the
                  * poll callback will queue them in ep->ovflist.
                  */
                  list_add_tail(&epi->rdllink, &ep->rdllist);
                  ep_pm_stay_awake(epi);
                }

          這里會判斷事件類型是否包含了 EPOLLET 位,如果不包含的話就會將該事件對應(yīng)的 epitem 對象重新加入到 epoll 的 rdllist 鏈表中,用戶態(tài)程序下次調(diào)用epoll_wait()返回時就又能獲取該 epitem 了;等到下一次epoll_wait時, 會立即返回, 并通知給用戶空間;

          epoll 為什么高效(相比select) 來源:https://www.cnblogs.com/apprentice89/p/3234677.html

          • 僅從上面的調(diào)用方式就可以看出 epoll 比 select/poll 的一個優(yōu)勢: select/poll 每次調(diào)用都要傳遞所要監(jiān)控的所有 fd 給 select/poll 系統(tǒng)調(diào)用(這意味著每次調(diào)用都要將 fd 列表從用戶態(tài)拷貝到內(nèi)核態(tài),當 fd 數(shù)目很多時,這會造成低效)。而每次調(diào)用 epoll_wait 時(作用相當于調(diào)用 select/poll),不需要再傳遞 fd 列表給內(nèi)核,因為已經(jīng)在 epoll_ctl 中將需要監(jiān)控的 fd 告訴了內(nèi)核( epoll_ctl 不需要每次都拷貝所有的 fd,只需要進行增量式操作)。所以,在調(diào)用 epoll_create 之后,內(nèi)核已經(jīng)在內(nèi)核態(tài)開始準備數(shù)據(jù)結(jié)構(gòu)存放要監(jiān)控的 fd 了。每次 epoll_ctl 只是對這個數(shù)據(jù)結(jié)構(gòu)進行簡單的維護。
          • 此外,內(nèi)核使用了slab機制,為epoll提供了快速的數(shù)據(jù)結(jié)構(gòu):

          在內(nèi)核里,一切皆文件。所以,epoll 向內(nèi)核注冊了一個文件系統(tǒng),用于存儲上述的被監(jiān)控的 fd。當你調(diào)用 epoll_create 時,就會在這個虛擬的 epoll 文件系統(tǒng)里創(chuàng)建一個 file 結(jié)點。當然這個 file 不是普通文件,它只服務(wù)于 epoll。epoll 在被內(nèi)核初始化時(操作系統(tǒng)啟動),同時會開辟出 epoll 自己的內(nèi)核高速 cache 區(qū),用于安置每一個我們想監(jiān)控的 fd,這些 fd 會以紅黑樹的形式保存在內(nèi)核 cache 里,以支持快速的查找、插入、刪除。這個內(nèi)核高速 cache 區(qū),就是建立連續(xù)的物理內(nèi)存頁,然后在之上建立 slab 層,簡單的說,就是物理上分配好你想要的 size 的內(nèi)存對象,每次使用時都是使用空閑的已分配好的對象。

          • epoll 的第三個優(yōu)勢在于:當我們調(diào)用 epoll_ctl 往里塞入百萬個 fd 時,epoll_wait 仍然可以飛快的返回,并有效的將發(fā)生事件的 fd 給我們用戶。這是由于我們在調(diào)用 epoll_create 時,內(nèi)核除了幫我們在 epoll 文件系統(tǒng)里建了個 file 結(jié)點,在內(nèi)核 cache 里建了個紅黑樹用于存儲以后 epoll_ctl 傳來的 fd 外,還會再建立一個 list 鏈表,用于存儲準備就緒的事件,當 epoll_wait 調(diào)用時,僅僅觀察這個 list 鏈表里有沒有數(shù)據(jù)即可。有數(shù)據(jù)就返回,沒有數(shù)據(jù)就 sleep,等到 timeout 時間到后即使鏈表沒數(shù)據(jù)也返回。所以,epoll_wait 非常高效。而且,通常情況下即使我們要監(jiān)控百萬計的 fd,大多一次也只返回很少量的準備就緒 fd 而已,所以,epoll_wait 僅需要從內(nèi)核態(tài) copy 少量的 fd 到用戶態(tài)而已。那么,這個準備就緒 list 鏈表是怎么維護的呢?當我們執(zhí)行 epoll_ctl 時,除了把 fd 放到 epoll 文件系統(tǒng)里 file 對象對應(yīng)的紅黑樹上之外,還會給內(nèi)核中斷處理程序注冊一個回調(diào)函數(shù),告訴內(nèi)核,如果這個 fd 的中斷到了,就把它放到準備就緒 list 鏈表里。所以,當一個 fd (例如 socket)上有數(shù)據(jù)到了,內(nèi)核在把設(shè)備(例如網(wǎng)卡)上的數(shù)據(jù) copy 到內(nèi)核中后就來把 fd(socket)插入到準備就緒 list 鏈表里了。
          瀏覽 72
          點贊
          評論
          收藏
          分享

          手機掃一掃分享

          分享
          舉報
          評論
          圖片
          表情
          推薦
          點贊
          評論
          收藏
          分享

          手機掃一掃分享

          分享
          舉報
          <kbd id="afajh"><form id="afajh"></form></kbd>
          <strong id="afajh"><dl id="afajh"></dl></strong>
            <del id="afajh"><form id="afajh"></form></del>
                1. <th id="afajh"><progress id="afajh"></progress></th>
                  <b id="afajh"><abbr id="afajh"></abbr></b>
                  <th id="afajh"><progress id="afajh"></progress></th>
                  国产精品一二三级 | 91精品国产欧美一区二区百度云 | 操批免费视频 | 狠狠撸狠狠干 | 亚洲无码免费视频一区二区三区四虎 |