<kbd id="afajh"><form id="afajh"></form></kbd>
<strong id="afajh"><dl id="afajh"></dl></strong>
    <del id="afajh"><form id="afajh"></form></del>
        1. <th id="afajh"><progress id="afajh"></progress></th>
          <b id="afajh"><abbr id="afajh"></abbr></b>
          <th id="afajh"><progress id="afajh"></progress></th>

          從linux源碼看epoll

          共 14064字,需瀏覽 29分鐘

           ·

          2021-07-17 05:07

          從linux源碼看epoll

          前言

          在linux的高性能網絡編程中,繞不開的就是epoll。和select、poll等系統(tǒng)調用相比,epoll在需要監(jiān)視大量文件描述符并且其中只有少數(shù)活躍的時候,表現(xiàn)出無可比擬的優(yōu)勢。epoll能讓內核記住所關注的描述符,并在對應的描述符事件就緒的時候,在epoll的就緒鏈表中添加這些就緒元素,并喚醒對應的epoll等待進程。
          本文就是筆者在探究epoll源碼過程中,對kernel將就緒描述符添加到epoll并喚醒對應進程的一次源碼分析(基于linux-2.6.32內核版本)。由于篇幅所限,筆者聚焦于tcp協(xié)議下socket可讀事件的源碼分析。

          簡單的epoll例子

          下面的例子,是從筆者本人用c語言寫的dbproxy中的一段代碼。由于細節(jié)過多,所以做了一些刪減。

          int init_reactor(int listen_fd,int worker_count){
          ......
          // 創(chuàng)建多個epoll fd,以充分利用多核
          for(i=0;i<worker_count;i++){
          reactor->worker_fd = epoll_create(EPOLL_MAX_EVENTS);
          }
          /* epoll add listen_fd and accept */
          // 將accept后的事件加入到對應的epoll fd中
          int client_fd = accept(listen_fd,(struct sockaddr *)&client_addr,&client_len)));
          // 將連接描述符注冊到對應的worker里面
          epoll_ctl(reactor->client_fd,EPOLL_CTL_ADD,epifd,&event);
          }
          // reactor的worker線程
          static void* rw_thread_func(void* arg){
          ......

          for(;;){
          // epoll_wait等待事件觸發(fā)
          int retval = epoll_wait(epfd,events,EPOLL_MAX_EVENTS,500);
          if(retval > 0){
          for(j=0; j < retval; j++){
          // 處理讀事件
          if(event & EPOLLIN){
          handle_ready_read_connection(conn);
          continue;
          }
          /* 處理其它事件 */
          }
          }
          }
          ......
          }

          上述代碼事實上就是實現(xiàn)了一個reactor模式中的accept與read/write處理線程,如下圖所示:

          epoll_create

          Unix的萬物皆文件的思想在epoll里面也有體現(xiàn),epoll_create調用返回一個文件描述符,此描述符掛載在anon_inode_fs(匿名inode文件系統(tǒng))的根目錄下面。讓我們看下具體的epoll_create系統(tǒng)調用源碼:

          SYSCALL_DEFINE1(epoll_create, int, size)
          {
          if (size <= 0)
          return -EINVAL;

          return sys_epoll_create1(0);
          }

          由上述源碼可見,epoll_create的參數(shù)是基本沒有意義的,kernel簡單的判斷是否為0,然后就直接就調用了sys_epoll_create1。由于linux的系統(tǒng)調用是通過(SYSCALL_DEFINE1,SYSCALL_DEFINE2……SYSCALL_DEFINE6)定義的,那么sys_epoll_create1對應的源碼即是SYSCALL_DEFINE(epoll_create1)。
          (注:受限于寄存器數(shù)量的限制,(80x86下的)kernel限制系統(tǒng)調用最多有6個參數(shù)。據ulk3所述,這是由于32位80x86寄存器的限制)
          接下來,我們就看下epoll_create1的源碼:

          SYSCALL_DEFINE1(epoll_create1, int, flags)
          {
          // kzalloc(sizeof(*ep), GFP_KERNEL),用的是內核空間
          error = ep_alloc(&ep);
          // 獲取尚未被使用的文件描述符,即描述符數(shù)組的槽位
          fd = get_unused_fd_flags(O_RDWR | (flags & O_CLOEXEC));
          // 在匿名inode文件系統(tǒng)中分配一個inode,并得到其file結構體
          // 且file->f_op = &eventpoll_fops
          // 且file->private_data = ep;
          file = anon_inode_getfile("[eventpoll]", &eventpoll_fops, ep,
          O_RDWR | (flags & O_CLOEXEC));
          // 將file填入到對應的文件描述符數(shù)組的槽里面
          fd_install(fd,file);
          ep->file = file;
          return fd;
          }

          最后epoll_create生成的文件描述符如下圖所示:

          struct eventpoll

          所有的epoll系統(tǒng)調用都是圍繞eventpoll結構體做操作,現(xiàn)簡要描述下其中的成員:

          /*
          * 此結構體存儲在file->private_data中
          */
          struct eventpoll {
          // 自旋鎖,在kernel內部用自旋鎖加鎖,就可以同時多線(進)程對此結構體進行操作
          // 主要是保護ready_list
          spinlock_t lock;
          // 這個互斥鎖是為了保證在eventloop使用對應的文件描述符的時候,文件描述符不會被移除掉
          struct mutex mtx;
          // epoll_wait使用的等待隊列,和進程喚醒有關
          wait_queue_head_t wq;
          // file->poll使用的等待隊列,和進程喚醒有關
          wait_queue_head_t poll_wait;
          // 就緒的描述符隊列
          struct list_head rdllist;
          // 通過紅黑樹來組織當前epoll關注的文件描述符
          struct rb_root rbr;
          // 在向用戶空間傳輸就緒事件的時候,將同時發(fā)生事件的文件描述符鏈入到這個鏈表里面
          struct epitem *ovflist;
          // 對應的user
          struct user_struct *user;
          // 對應的文件描述符
          struct file *file;
          // 下面兩個是用于環(huán)路檢測的優(yōu)化
          int visited;
          struct list_head visited_list_link;
          };

          本文講述的是kernel是如何將就緒事件傳遞給epoll并喚醒對應進程上,因此在這里主要聚焦于(wait_queue_head_t wq)等成員。

          epoll_ctl(add)

          我們看下epoll_ctl(EPOLL_CTL_ADD)是如何將對應的文件描述符插入到eventpoll中的。
          借助于spin_lock(自旋鎖)和mutex(互斥鎖),epoll_ctl調用可以在多個KSE(內核調度實體,即進程/線程)中并發(fā)執(zhí)行。

          SYSCALL_DEFINE4(epoll_ctl, int, epfd, int, op, int, fd,
          struct epoll_event __user *, event)
          {
          /* 校驗epfd是否是epoll的描述符 */
          // 此處的互斥鎖是為了防止并發(fā)調用epoll_ctl,即保護內部數(shù)據結構
          // 不會被并發(fā)的添加修改刪除破壞
          mutex_lock_nested(&ep->mtx, 0);
          switch (op) {
          case EPOLL_CTL_ADD:
          ...
          // 插入到紅黑樹中
          error = ep_insert(ep, &epds, tfile, fd);
          ...
          break;
          ......
          }
          mutex_unlock(&ep->mtx);
          }

          上述過程如下圖所示:

          ep_insert

          在ep_insert中初始化了epitem,然后初始化了本文關注的焦點,即事件就緒時候的回調函數(shù),代碼如下所示:

          static int ep_insert(struct eventpoll *ep, struct epoll_event *event,
          struct file *tfile, int fd)
          {
          /* 初始化epitem */
          // &epq.pt->qproc = ep_ptable_queue_proc
          init_poll_funcptr(&epq.pt, ep_ptable_queue_proc);
          // 在這里將回調函數(shù)注入
          revents = tfile->f_op->poll(tfile, &epq.pt);
          // 如果當前有事件已經就緒,那么一開始就會被加入到ready list
          // 例如可寫事件
          // 另外,在tcp內部ack之后調用tcp_check_space,最終調用sock_def_write_space來喚醒對應的epoll_wait下的進程
          if ((revents & event->events) && !ep_is_linked(&epi->rdllink)) {
          list_add_tail(&epi->rdllink, &ep->rdllist);
          // wake_up ep對應在epoll_wait下的進程
          if (waitqueue_active(&ep->wq)){
          wake_up_locked(&ep->wq);
          }
          ......
          }
          // 將epitem插入紅黑樹
          ep_rbtree_insert(ep, epi);
          ......
          }

          tfile->f_op->poll的實現(xiàn)

          向kernel更底層注冊回調函數(shù)的是tfile->f_op->poll(tfile, &epq.pt)這一句,我們來看一下對于對應的socket文件描述符,其fd=>file->f_op->poll的初始化過程:

              // 將accept后的事件加入到對應的epoll fd中
          int client_fd = accept(listen_fd,(struct sockaddr *)&client_addr,&client_len)));
          // 將連接描述符注冊到對應的worker里面
          epoll_ctl(reactor->client_fd,EPOLL_CTL_ADD,epifd,&event);

          回顧一下上述user space代碼,fd即client_fd是由tcp的listen_fd通過accept調用而來,那么我們看下accept調用鏈的關鍵路徑:

          accept
          |->accept4
          |->sock_attach_fd(newsock, newfile, flags & O_NONBLOCK);
          |->init_file(file,...,&socket_file_ops);
          |->file->f_op = fop;
          /* file->f_op = &socket_file_ops */
          |->fd_install(newfd, newfile); // 安裝fd

          那么,由accept獲得的client_fd的結構如下圖所示:

          (注:由于是tcp socket,所以這邊sock->ops=inet_stream_ops,這個初始化的過程在我的另一篇博客<<從linux源碼看socket的阻塞和非阻塞>>中,博客地址如下:
          https://my.oschina.net/alchemystar/blog/1791017)
          既然知道了tfile->f_op->poll的實現(xiàn),我們就可以看下此poll是如何將安裝回調函數(shù)的。

          回調函數(shù)的安裝

          kernel的調用路徑如下:

          sock_poll /*tfile->f_op->poll(tfile, &epq.pt)*/;
          |->sock->ops->poll
          |->tcp_poll
          /* 這邊重要的是拿到了sk_sleep用于KSE(進程/線程)的喚醒 */
          |->sock_poll_wait(file, sk->sk_sleep, wait);
          |->poll_wait
          |->p->qproc(filp, wait_address, p);
          /* p為&epq.pt,而且&epq.pt->qproc= ep_ptable_queue_proc*/
          |-> ep_ptable_queue_proc(filp,wait_address,p);

          繞了一大圈之后,我們的回調函數(shù)的安裝其實就是調用了eventpoll.c中的ep_ptable_queue_proc,而且向其中傳遞了sk->sk_sleep作為其waitqueue的head,其源碼如下所示:

          static void ep_ptable_queue_proc(struct file *file, wait_queue_head_t *whead,
          poll_table *pt)
          {
          // 取出當前client_fd對應的epitem
          struct epitem *epi = ep_item_from_epqueue(pt);
          // &pwq->wait->func=ep_poll_callback,用于回調喚醒
          // 注意,這邊不是init_waitqueue_entry,即沒有將當前KSE(current,當前進程/線程)寫入到
          // wait_queue當中,因為不一定是從當前安裝的KSE喚醒,而應該是喚醒epoll\_wait的KSE
          init_waitqueue_func_entry(&pwq->wait, ep_poll_callback);
          // 這邊的whead是sk->sk_sleep,將當前的waitqueue鏈入到socket對應的sleep列表
          add_wait_queue(whead, &pwq->wait);
          }

          這樣client_fd的結構進一步完善,如下圖所示:

          ep_poll_callback函數(shù)是喚醒對應epoll_wait的地方,我們將在后面一起講述。

          epoll_wait

          epoll_wait主要是調用了ep_poll:

          SYSCALL_DEFINE4(epoll_wait, int, epfd, struct epoll_event __user *, events,
          int, maxevents, int, timeout)
          {
          /* 檢查epfd是否是epoll\_create創(chuàng)建的fd */
          // 調用ep_poll
          error = ep_poll(ep, events, maxevents, timeout);
          ...
          }

          緊接著,我們看下ep_poll函數(shù):

          static int ep_poll(struct eventpoll *ep, struct epoll_event __user *events,
          int maxevents, long timeout)
          {
          ......
          retry:
          // 獲取spinlock
          spin_lock_irqsave(&ep->lock, flags);
          // 將當前task_struct寫入到waitqueue中以便喚醒
          // wq_entry->func = default_wake_function;
          init_waitqueue_entry(&wait, current);
          // WQ_FLAG_EXCLUSIVE,排他性喚醒,配合SO_REUSEPORT從而解決accept驚群問題
          wait.flags |= WQ_FLAG_EXCLUSIVE;
          // 鏈入到ep的waitqueue中
          __add_wait_queue(&ep->wq, &wait);
          for (;;) {
          // 設置當前進程狀態(tài)為可打斷
          set_current_state(TASK_INTERRUPTIBLE);
          // 檢查當前線程是否有信號要處理,有則返回-EINTR
          if (signal_pending(current)) {
          res = -EINTR;
          break;
          }
          spin_unlock_irqrestore(&ep->lock, flags);
          // schedule調度,讓出CPU
          jtimeout = schedule_timeout(jtimeout);
          spin_lock_irqsave(&ep->lock, flags);
          }
          // 到這里,表明超時或者有事件觸發(fā)等動作導致進程重新調度
          __remove_wait_queue(&ep->wq, &wait);
          // 設置進程狀態(tài)為running
          set_current_state(TASK_RUNNING);
          ......
          // 檢查是否有可用事件
          eavail = !list_empty(&ep->rdllist) || ep->ovflist != EP_UNACTIVE_PTR;
          ......
          // 向用戶空間拷貝就緒事件
          ep_send_events(ep, events, maxevents)
          }

          上述邏輯如下圖所示:

          ep_send_events

          ep_send_events函數(shù)主要就是調用了ep_scan_ready_list,顧名思義ep_scan_ready_list就是掃描就緒列表:

          static int ep_scan_ready_list(struct eventpoll *ep,
          int (*sproc)(struct eventpoll *,
          struct list_head *, void *),
          void *priv,
          int depth)
          {
          ...
          // 將epfd的rdllist鏈入到txlist
          list_splice_init(&ep->rdllist, &txlist);
          ...
          /* sproc = ep_send_events_proc */
          error = (*sproc)(ep, &txlist, priv);
          ...
          // 處理ovflist,即在上面sproc過程中又到來的事件
          ...
          }

          其主要調用了ep_send_events_proc:

          static int ep_send_events_proc(struct eventpoll *ep, struct list_head *head,
          void *priv)
          {
          for (eventcnt = 0, uevent = esed->events;
          !list_empty(head) && eventcnt < esed->maxevents;) {
          // 遍歷ready list
          epi = list_first_entry(head, struct epitem, rdllink);
          list_del_init(&epi->rdllink);
          // readylist只是表明當前epi有事件,具體的事件信息還是得調用對應file的poll
          // 這邊的poll即是tcp_poll,根據tcp本身的信息設置掩碼(mask)等信息 & 上興趣事件掩碼,則可以得知當前事件是否是epoll_wait感興趣的事件
          revents = epi->ffd.file->f_op->poll(epi->ffd.file, NULL) &
          epi->event.events;
          if(revents){
          /* 將event放入到用戶空間 */
          /* 處理ONESHOT邏輯 */
          // 如果不是邊緣觸發(fā),則將當前的epi重新加回到可用列表中,這樣就可以下一次繼續(xù)觸發(fā)poll,如果下一次poll的revents不為0,那么用戶空間依舊能感知 */
          else if (!(epi->event.events & EPOLLET)){
          list_add_tail(&epi->rdllink, &ep->rdllist);
          }
          /* 如果是邊緣觸發(fā),那么就不加回可用列表,因此只能等到下一個可用事件觸發(fā)的時候才會將對應的epi放到可用列表里面*/
          eventcnt++
          }
          /* 如poll出來的revents事件epoll_wait不感興趣(或者本來就沒有事件),那么也不會加回到可用列表 */
          ......
          }
          return eventcnt;
          }

          上述代碼邏輯如下所示:

          事件到來添加到epoll就緒隊列(rdllist)的過程

          經過上述章節(jié)的詳述之后,我們終于可以闡述,tcp在數(shù)據到來時是怎么加入到epoll的就緒隊列的了。

          可讀事件到來

          首先我們看下tcp數(shù)據包從網卡驅動到kernel內部tcp協(xié)議處理調用鏈:

          step1:

          網絡分組到來的內核路徑,網卡發(fā)起中斷后調用netif_rx將事件掛入CPU的等待隊列,并喚起軟中斷(soft_irq),再通過linux的軟中斷機制調用net_rx_action,如下圖所示:

          注:上圖來自PLKA(<<深入Linux內核架構>>)

          step2:

          緊接著跟蹤next_rx_action

          next_rx_action
          |-process_backlog
          ......
          |->packet_type->func 在這里我們考慮ip_rcv
          |->ipprot->handler 在這里ipprot重載為tcp_protocol
          (handler 即為tcp_v4_rcv)

          我們再看下對應的tcp_v4_rcv

          tcp_v4_rcv
          |->tcp_v4_do_rcv
          |->tcp_rcv_state_process
          |->tcp_data_queue
          |-> sk->sk_data_ready(sock_def_readable)
          |->wake_up_interruptible_sync_poll(sk->sleep,...)
          |->__wake_up
          |->__wake_up_common
          |->curr->func
          /* 這里已經被ep_insert添加為ep_poll_callback,而且設定了排它標識WQ_FLAG_EXCLUSIVE*/
          |->ep_poll_callback

          這樣,我們就看下最終喚醒epoll_wait的ep_poll_callback函數(shù):

          static int ep_poll_callback(wait_queue_t *wait, unsigned mode, int sync, void *key)
          {
          // 獲取wait對應的epitem
          struct epitem *epi = ep_item_from_wait(wait);
          // epitem對應的eventpoll結構體
          struct eventpoll *ep = epi->ep;
          // 獲取自旋鎖,保護ready_list等結構
          spin_lock_irqsave(&ep->lock, flags);
          // 如果當前epi沒有被鏈入ep的ready list,則鏈入
          // 這樣,就把當前的可用事件加入到epoll的可用列表了
          if (!ep_is_linked(&epi->rdllink))
          list_add_tail(&epi->rdllink, &ep->rdllist);
          // 如果有epoll_wait在等待的話,則喚醒這個epoll_wait進程
          // 對應的&ep->wq是在epoll_wait調用的時候通過init_waitqueue_entry(&wait, current)而生成的
          // 其中的current即是對應調用epoll_wait的進程信息task_struct
          if (waitqueue_active(&ep->wq))
          wake_up_locked(&ep->wq);
          }

          上述過程如下圖所示:

          最后wake_up_locked調用__wake_up_common,然后調用了在init_waitqueue_entry注冊的default_wake_function,調用路徑為:

          wake_up_locked
          |->__wake_up_common
          |->default_wake_function
          |->try_wake_up (wake up a thread)
          |->activate_task
          |->enqueue_task running

          將epoll_wait進程推入可運行隊列,等待內核重新調度進程,然后epoll_wait對應的這個進程重新運行后,就從schedule恢復,繼續(xù)下面的ep_send_events(向用戶空間拷貝事件并返回)。
          wake_up過程如下圖所示:

          可寫事件到來

          可寫事件的運行過程和可讀事件大同小異:
          首先,在epoll_ctl_add的時候預先會調用一次對應文件描述符的poll,如果返回事件里有可寫掩碼的時候直接調用wake_up_locked以喚醒對應的epoll_wait進程。
          然后,在tcp在底層驅動有數(shù)據到來的時候可能攜帶了ack從而可以釋放部分已經被對端接收的數(shù)據,于是觸發(fā)可寫事件,這一部分的調用鏈為:

          tcp_input.c
          tcp_v4_rcv
          |-tcp_v4_do_rcv
          |-tcp_rcv_state_process
          |-tcp_data_snd_check
          |->tcp_check_space
          |->tcp_new_space
          |->sk->sk_write_space
          /* tcp下即是sk_stream_write_space*/

          最后在此函數(shù)里面sk_stream_write_space喚醒對應的epoll_wait進程

          void sk_stream_write_space(struct sock *sk)
          {
          // 即有1/3可寫空間的時候才觸發(fā)可寫事件
          if (sk_stream_wspace(sk) >= sk_stream_min_wspace(sk) && sock) {
          clear_bit(SOCK_NOSPACE, &sock->flags);

          if (sk->sk_sleep && waitqueue_active(sk->sk_sleep))
          wake_up_interruptible_poll(sk->sk_sleep, POLLOUT |
          POLLWRNORM | POLLWRBAND)
          ......
          }
          }

          關閉描述符(close fd)

          值得注意的是,我們在close對應的文件描述符的時候,會自動調用eventpoll_release將對應的file從其關聯(lián)的epoll_fd中刪除,kernel關鍵路徑如下:

          close fd
          |->filp_close
          |->fput
          |->__fput
          |->eventpoll_release
          |->ep_remove

          所以我們在關閉對應的文件描述符后,并不需要通過epoll_ctl_del來刪掉對應epoll中相應的描述符。

          學習Linux kernel源碼組好的莫過于ULK3 《深入理解Linux內核》

          總結

          epoll作為linux下非常優(yōu)秀的事件觸發(fā)機制得到了廣泛的運用。其源碼還是比較復雜的,本文只是闡述了epoll讀寫事件的觸發(fā)機制,探究linux kernel源碼的過程非常快樂^_^

          瀏覽 79
          點贊
          評論
          收藏
          分享

          手機掃一掃分享

          分享
          舉報
          評論
          圖片
          表情
          推薦
          點贊
          評論
          收藏
          分享

          手機掃一掃分享

          分享
          舉報
          <kbd id="afajh"><form id="afajh"></form></kbd>
          <strong id="afajh"><dl id="afajh"></dl></strong>
            <del id="afajh"><form id="afajh"></form></del>
                1. <th id="afajh"><progress id="afajh"></progress></th>
                  <b id="afajh"><abbr id="afajh"></abbr></b>
                  <th id="afajh"><progress id="afajh"></progress></th>
                  亚洲成人网站第一网站 | 性无码一区二区三区无码免费 | 亚洲无码蜜桃视频 | 国产熟女在线视频 | 国产真实乱人偷精品视频 |