<kbd id="afajh"><form id="afajh"></form></kbd>
<strong id="afajh"><dl id="afajh"></dl></strong>
    <del id="afajh"><form id="afajh"></form></del>
        1. <th id="afajh"><progress id="afajh"></progress></th>
          <b id="afajh"><abbr id="afajh"></abbr></b>
          <th id="afajh"><progress id="afajh"></progress></th>

          深度解析單線程的 Redis 如何做到每秒數(shù)萬 QPS 的超高處理能力!

          共 13981字,需瀏覽 28分鐘

           ·

          2022-05-26 01:30

          今天開篇先給大家講個飛哥自己的小故事。我在學(xué)校和剛畢業(yè)頭一年主要從事的客戶端開發(fā),那時候?qū)Ψ?wù)器端編程還不擅長。

          有一次去面試服務(wù)器端崗位,面試官問我有一個連接過來,你該怎么編程處理它。我答道:“主線程收到請求后,創(chuàng)建一個子線程處理?!?面試官接著問,那如果有一千個連接同時來呢?我說“那就多創(chuàng)建一點線程,搞個線程池”。面試官繼續(xù)追問如果一萬個呢?我答道:“......不會...”。

          事實上,服務(wù)器端只需要單線程可以達到非常高的處理能力,Redis 就是一個非常好的例子。僅僅靠單線程就可以支撐起每秒數(shù)萬 QPS 的高處理能力。今天我們就來帶大家看看 Redis 核心網(wǎng)絡(luò)模塊的內(nèi)部實現(xiàn),學(xué)習(xí)下 Redis 是如何做到如此的高性能的!

          一、理解多路復(fù)用原理

          在開始介紹 Redis 之前,我想有必要先來簡單介紹下 epoll。

          在傳統(tǒng)的同步阻塞網(wǎng)絡(luò)編程模型里(沒有協(xié)程以前),性能上不來的根本原因在于進程線程都是笨重的家伙。讓一個進(線)程只處理一個用戶請求確確實實是有點浪費了。

          先拋開高內(nèi)存開銷不說,在海量的網(wǎng)絡(luò)請求到來的時候,光是頻繁的進程線程上下文就讓 CPU 疲于奔命了。

          如果把進程比作牧羊人,一個進(線)程同時只能處理一個用戶請求,相當于一個人只能看一只羊,放完這一只才能放下一只。如果同時來了 1000 只羊,那就得 1000 個人去放,這人力成本是非常高的。

          性能提升思路很簡單,就是讓很多的用戶連接來復(fù)用同一個進(線)程,這就是多路復(fù)用。多路指的是許許多多個用戶的網(wǎng)絡(luò)連接。復(fù)用指的是對進(線)程的復(fù)用。換到牧羊人的例子里,就是一群羊只要一個牧羊人來處理就行了。

          不過復(fù)用實現(xiàn)起來是需要特殊的 socket 事件管理機制的,最典型和高效的方案就是 epoll。放到牧羊人的例子來,epoll 就相當于一只牧羊犬。

          在 epoll 的系列函數(shù)里, epoll_create 用于創(chuàng)建一個 epoll 對象,epoll_ctl 用來給 epoll 對象添加或者刪除一個 socket。epoll_wait 就是查看它當前管理的這些 socket 上有沒有可讀可寫事件發(fā)生。

          當網(wǎng)卡上收到數(shù)據(jù)包后,Linux 內(nèi)核進行一系列的處理后把數(shù)據(jù)放到 socket 的接收隊列。然后會檢查是否有 epoll 在管理它,如果是則在 epoll 的就緒隊列中插入一個元素。epoll_wait 的操作就非常的簡單了,就是到 epoll 的就緒隊列上來查詢有沒有事件發(fā)生就行了。關(guān)于 epoll 這只“牧羊犬”的工作原理參見深入揭秘 epoll 是如何實現(xiàn) IO 多路復(fù)用的 (Javaer 習(xí)慣把基于 epoll 的網(wǎng)絡(luò)開發(fā)模型叫做 NIO)

          在基于 epoll 的編程中,和傳統(tǒng)的函數(shù)調(diào)用思路不同的是,我們并不能主動調(diào)用某個 API 來處理。因為無法知道我們想要處理的事件啥時候發(fā)生。所以只好提前把想要處理的事件的處理函數(shù)注冊到一個事件分發(fā)器上去。當事件發(fā)生的時候,由這個事件分發(fā)器調(diào)用回調(diào)函數(shù)進行處理。這類基于實現(xiàn)注冊事件分發(fā)器的開發(fā)模式也叫 Reactor 模型。

          二、Redis 服務(wù)啟動初始化

          理解了 epoll 原理后,我們再來實際看 Redis 具體是如何使用 epoll 的。直接在 Github 上就可以非常方便地獲取 Redis 的源碼。我們切到 5.0.0 版本來看單線程版本的實現(xiàn)(多線程我們改天再講)。

          #?git?clone?https://github.com/redis/redis
          #?cd?redis
          #?git?checkout?-b?5.0.0?5.0.0

          其中整個 Redis 服務(wù)的代碼總?cè)肟谠?src/server.c 文件中,我把入口函數(shù)的核心部分摘了出來,如下。

          //file:?src/server.c
          int?main(int?argc,?char?**argv)?{
          ????......
          ????//?啟動初始化
          ????initServer();
          ????//?運行事件處理循環(huán),一直到服務(wù)器關(guān)閉為止
          ????aeMain(server.el);
          }

          其實整個 Redis 的工作過程,就只需要理解清楚 main 函數(shù)中調(diào)用的 initServer 和 aeMain 這兩個函數(shù)就足夠了。

          本節(jié)中我們重點介紹 initServer,在下一節(jié)介紹事件處理循環(huán) aeMain。在 initServer 這個函數(shù)內(nèi),Redis 做了這么三件重要的事情。

          • 創(chuàng)建一個 epoll 對象
          • 對配置的監(jiān)聽端口進行 listen
          • 把 listen socket 讓 epoll 給管理起來
          //file:?src/server.c
          void?initServer()?{
          ????//?2.1.1?創(chuàng)建?epoll
          ????server.el?=?aeCreateEventLoop(server.maxclients+CONFIG_FDSET_INCR);

          ????//?2.1.2?綁定監(jiān)聽服務(wù)端口
          ????listenToPort(server.port,server.ipfd,&server.ipfd_count);

          ????//?2.1.3?注冊?accept?事件處理器
          ????for?(j?=?0;?j?????????aeCreateFileEvent(server.el,?server.ipfd[j],?AE_READABLE,
          ????????????acceptTcpHandler,NULL);
          ????}
          ????...
          }

          接下來我們分別來看。

          2.1 創(chuàng)建 epoll 對象

          本小節(jié)的邏輯看起來貌似不短,但其實只是創(chuàng)建了一個 epoll 對象出來而已。

          創(chuàng)建 epoll 對象的邏輯在 aeCreateEventLoop 中,在創(chuàng)建完后,Redis 將其保存在 redisServer 的 aeEventLoop 成員中,以備后續(xù)使用。

          struct?redisServer?{
          ????...
          ????aeEventLoop?*el;
          }

          我們來看 aeCreateEventLoop 詳細邏輯。Redis 在操作系統(tǒng)提供的 epoll 對象基礎(chǔ)上又封裝了一個 eventLoop 出來,所以創(chuàng)建的時候是先申請和創(chuàng)建 eventLoop。

          //file:src/ae.c
          aeEventLoop?*aeCreateEventLoop(int?setsize)?{
          ????aeEventLoop?*eventLoop;
          ????eventLoop?=?zmalloc(sizeof(*eventLoop);

          ????//將來的各種回調(diào)事件就都會存在這里
          ????eventLoop->events?=?zmalloc(sizeof(aeFileEvent)*setsize);
          ????......

          ????aeApiCreate(eventLoop);
          ????return?eventLoop;
          }

          在 eventLoop 里,我們稍微注意一下 eventLoop->events,將來在各種事件注冊的時候都會保存到這個數(shù)組里。

          //file:src/ae.h
          typedef?struct?aeEventLoop?{
          ????......
          ????aeFileEvent?*events;?/*?Registered?events?*/
          }

          具體創(chuàng)建 epoll 的過程在 ae_epoll.c 文件下的 aeApiCreate 中。在這里,真正調(diào)用了 epoll_create

          //file:src/ae_epoll.c
          static?int?aeApiCreate(aeEventLoop?*eventLoop)?{
          ????aeApiState?*state?=?zmalloc(sizeof(aeApiState));
          ????state->epfd?=?epoll_create(1024);?
          ????eventLoop->apidata?=?state;
          ????return?0;
          }

          2.2 綁定監(jiān)聽服務(wù)端口

          我們再來看 Redis 中的 listen 過程,它在 listenToPort 函數(shù)中。雖然調(diào)用鏈條很長,但其實主要就是執(zhí)行了個簡單 listen 而已。

          //file:?src/redis.c
          int?listenToPort(int?port,?int?*fds,?int?*count)?{
          ????for?(j?=?0;?j?0;?j++)?{
          ????????fds[*count]?=?anetTcpServer(server.neterr,port,NULL,
          ????????????????server.tcp_backlog);
          ????}
          }

          Redis 是支持開啟多個端口的,所以在 listenToPort 中我們看到是啟用一個循環(huán)來調(diào)用 anetTcpServer。在 anetTcpServer 中,逐步會展開調(diào)用,直到執(zhí)行到 bind 和 listen 系統(tǒng)調(diào)用。

          //file:src/anet.c
          int?anetTcpServer(char?*err,?int?port,?char?*bindaddr,?int?backlog)
          {
          ????return?_anetTcpServer(err,?port,?bindaddr,?AF_INET,?backlog);
          }
          static?int?_anetTcpServer(......)
          {
          ????//?設(shè)置端口重用
          ????anetSetReuseAddr(err,s)
          ????//?監(jiān)聽
          ????anetListen(err,s,p->ai_addr,p->ai_addrlen,backlog)
          }
          static?int?anetListen(......)?{
          ????bind(s,sa,len);
          ????listen(s,?backlog);
          ????......
          }

          2.3 注冊事件回調(diào)函數(shù)

          我們回頭再看一下 initServer,它調(diào)用 aeCreateEventLoop 創(chuàng)建了 epoll,調(diào)用 listenToPort 進行了服務(wù)端口的 bind 和 listen。接著就開始調(diào)用 aeCreateFileEvent 來注冊一個 accept 事件處理器。

          //file:?src/server.c
          void?initServer()?{
          ????//?2.1.1?創(chuàng)建?epoll
          ????server.el?=?aeCreateEventLoop(server.maxclients+CONFIG_FDSET_INCR);

          ????//?2.1.2?監(jiān)聽服務(wù)端口
          ????listenToPort(server.port,server.ipfd,&server.ipfd_count);

          ????//?2.1.3?注冊?accept?事件處理器
          ????for?(j?=?0;?j?????????aeCreateFileEvent(server.el,?server.ipfd[j],?AE_READABLE,
          ????????????acceptTcpHandler,NULL);
          ????}
          ????...
          }

          我們來注意看調(diào)用 aeCreateFileEvent 時傳的重要參數(shù)是 acceptTcpHandler,它表示將來在 listen socket 上有新用戶連接到達的時候,該函數(shù)將被調(diào)用執(zhí)行。我們來看 aeCreateFileEvent 具體代碼。

          //file:?src/ae.c
          int?aeCreateFileEvent(aeEventLoop?*eventLoop,?int?fd,?int?mask,
          ????????aeFileProc?*proc,?void?*clientData)

          {
          ????//?取出一個文件事件結(jié)構(gòu)
          ????aeFileEvent?*fe?=?&eventLoop->events[fd];

          ????//?監(jiān)聽指定?fd?的指定事件
          ????aeApiAddEvent(eventLoop,?fd,?mask);

          ????//?設(shè)置文件事件類型,以及事件的處理器
          ????fe->mask?|=?mask;
          ????if?(mask?&?AE_READABLE)?fe->rfileProc?=?proc;
          ????if?(mask?&?AE_WRITABLE)?fe->wfileProc?=?proc;

          ????//?私有數(shù)據(jù)
          ????fe->clientData?=?clientData;
          }

          函數(shù) aeCreateFileEvent 一開始,從 eventLoop->events 獲取了一個 aeFileEvent 對象。在 2.1 中我們介紹過 eventLoop->events 數(shù)組,注冊的各種事件處理器會保存在這個地方。

          接下來調(diào)用 aeApiAddEvent。這個函數(shù)其實就是對 epoll_ctl 的一個封裝。主要就是實際執(zhí)行 epoll_ctl EPOLL_CTL_ADD。

          //file:src/ae_epoll.c
          static?int?aeApiAddEvent(aeEventLoop?*eventLoop,?int?fd,?int?mask)?{
          ????//?add?or?mod
          ????int?op?=?eventLoop->events[fd].mask?==?AE_NONE??
          ????????????EPOLL_CTL_ADD?:?EPOLL_CTL_MOD;
          ????......

          ????//?epoll_ctl?添加事件
          ????epoll_ctl(state->epfd,op,fd,&ee);
          ????return?0;
          }

          每一個 eventLoop->events 元素都指向一個 aeFileEvent 對象。在這個對象上,設(shè)置了三個關(guān)鍵東西

          • rfileProc:讀事件回調(diào)
          • wfileProc:寫事件回調(diào)
          • clientData:一些額外的擴展數(shù)據(jù)

          將來 當 epoll_wait 發(fā)現(xiàn)某個 fd 上有事件發(fā)生的時候,這樣 redis 首先根據(jù) fd 到 eventLoop->events 中查找 aeFileEvent 對象,然后再看 rfileProc、wfileProc 就可以找到讀、寫回調(diào)處理函數(shù)。

          回頭看 initServer 調(diào)用 aeCreateFileEvent 時傳參來看。

          //file:?src/server.c
          void?initServer()?{
          ????......
          ????//?2.1.3?注冊?accept?事件處理器
          ????for?(j?=?0;?j?????????aeCreateFileEvent(server.el,?server.ipfd[j],?AE_READABLE,
          ????????????acceptTcpHandler,NULL);
          ????}
          }

          listen fd 對應(yīng)的讀回調(diào)函數(shù) rfileProc 事實上就被設(shè)置成了 acceptTcpHandler,寫回調(diào)沒有設(shè)置,私有數(shù)據(jù) client_data 也為 null。

          三、Redis 事件處理循環(huán)

          在上一節(jié)介紹完了 Redis 的啟動初始化過程,創(chuàng)建了 epoll,也進行了綁定監(jiān)聽,也注冊了 accept 事件處理函數(shù)為 acceptTcpHandler。

          //file:?src/server.c
          int?main(int?argc,?char?**argv)?{
          ????......
          ????//?啟動初始化
          ????initServer();
          ????//?運行事件處理循環(huán),一直到服務(wù)器關(guān)閉為止
          ????aeMain(server.el);
          }

          接下來,Redis 就會進入 aeMain 開始進行真正的用戶請求處理了。在 aeMain 函數(shù)中,是一個無休止的循環(huán)。在每一次的循環(huán)中,要做如下幾件事情。

          • 通過 epoll_wait 發(fā)現(xiàn) listen socket 以及其它連接上的可讀、可寫事件
          • 若發(fā)現(xiàn) listen socket 上有新連接到達,則接收新連接,并追加到 epoll 中進行管理
          • 若發(fā)現(xiàn)其它 socket 上有命令請求到達,則讀取和處理命令,把命令結(jié)果寫到緩存中,加入寫任務(wù)隊列
          • 每一次進入 epoll_wait 前都調(diào)用 beforesleep 來將寫任務(wù)隊列中的數(shù)據(jù)實際進行發(fā)送
          • 如若有首次未發(fā)送完畢的,當寫事件發(fā)生時繼續(xù)發(fā)送
          //file:src/ae.c
          void?aeMain(aeEventLoop?*eventLoop)?{

          ????eventLoop->stop?=?0;
          ????while?(!eventLoop->stop)?{

          ????????//?如果有需要在事件處理前執(zhí)行的函數(shù),那么運行它
          ????????//?3.4?beforesleep?處理寫任務(wù)隊列并實際發(fā)送之
          ????????if?(eventLoop->beforesleep?!=?NULL)
          ????????????eventLoop->beforesleep(eventLoop);

          ????????//?開始等待事件并處理
          ????????//?3.1?epoll_wait?發(fā)現(xiàn)事件
          ????????//?3.2?處理新連接請求
          ????????//?3.3?處理客戶連接上的可讀事件
          ????????aeProcessEvents(eventLoop,?AE_ALL_EVENTS);
          ????}
          }

          以上就是 aeMain 函數(shù)的核心邏輯所在,接下來我們分別對如上提到的四件事情進行詳細的闡述。

          3.1 ?epoll_wait 發(fā)現(xiàn)事件

          Redis 不管有多少個用戶連接,都是通過 epoll_wait 來統(tǒng)一發(fā)現(xiàn)和管理其上的可讀(包括 liisten socket 上的 accept事件)、可寫事件的。甚至連 timer,也都是交給 epoll_wait 來統(tǒng)一管理的。

          每當 epoll_wait 發(fā)現(xiàn)特定的事件發(fā)生的時候,就會調(diào)用相應(yīng)的事先注冊好的事件處理函數(shù)進行處理。我們來詳細看 aeProcessEvents 對 epoll_wait 的封裝。

          //file:src/ae.c
          int?aeProcessEvents(aeEventLoop?*eventLoop,?int?flags)
          {
          ????//?獲取最近的時間事件
          ????tvp?=?xxx

          ????//?處理文件事件,阻塞時間由?tvp?決定
          ????numevents?=?aeApiPoll(eventLoop,?tvp);
          ????for?(j?=?0;?j?????????//?從已就緒數(shù)組中獲取事件
          ????????aeFileEvent?*fe?=?&eventLoop->events[eventLoop->fired[j].fd];

          ????????//如果是讀事件,并且有讀回調(diào)函數(shù)
          ????????fe->rfileProc()

          ????????//如果是寫事件,并且有寫回調(diào)函數(shù)
          ????????fe->wfileProc()
          ????}
          }

          //file:?src/ae_epoll.c
          static?int?aeApiPoll(aeEventLoop?*eventLoop,?struct?timeval?*tvp)?{
          ????//?等待事件
          ????aeApiState?*state?=?eventLoop->apidata;
          ????epoll_wait(state->epfd,state->events,eventLoop->setsize,
          ????????????tvp???(tvp->tv_sec*1000?+?tvp->tv_usec/1000)?:?-1);
          ????...
          }

          aeProcessEvents 就是調(diào)用 epoll_wait 來發(fā)現(xiàn)事件。當發(fā)現(xiàn)有某個 fd 上事件發(fā)生以后,則調(diào)為其事先注冊的事件處理器函數(shù) rfileProc 和 wfileProc。

          3.2 處理新連接請求

          我們假設(shè)現(xiàn)在有新用戶連接到達了。前面在我們看到 listen socket 上的 rfileProc 注冊的是 acceptTcpHandler。也就是說,如果有連接到達的時候,會回調(diào)到 acceptTcpHandler。

          在 acceptTcpHandler 中,主要做了幾件事情

          • 調(diào)用 accept 系統(tǒng)調(diào)用把用戶連接給接收回來
          • 為這個新連接創(chuàng)建一個唯一 redisClient 對象
          • 將這個新連接添加到 epoll,并注冊一個讀事件處理函數(shù)

          接下來讓我們看上面這三件事情都分別是如何被處理的。

          //file:src/networking.c
          void?acceptTcpHandler(aeEventLoop?*el,?int?fd,?...)?{
          ????cfd?=?anetTcpAccept(server.neterr,?fd,?cip,?...);
          ????acceptCommonHandler(cfd,0);
          }

          在 anetTcpAccept 中執(zhí)行非常的簡單,就是調(diào)用 accept 把連接接收回來。

          //file:?src/anet.c
          int?anetTcpAccept(......)?{
          ????anetGenericAccept(err,s,(struct?sockaddr*)&sa,&salen)
          }
          static?int?anetGenericAccept(......)?{
          ????fd?=?accept(s,sa,len)
          }

          接下來在 acceptCommonHandler 為這個新的客戶端連接 socket,創(chuàng)建一個 redisClient 對象。

          //file:?src/networking.c
          static?void?acceptCommonHandler(int?fd,?int?flags)?{
          ????//?創(chuàng)建?redisClient?對象
          ????redisClient?*c;
          ????c?=?createClient(fd);
          ????......
          }

          在 createClient 中,創(chuàng)建 client 對象,并且為該用戶連接注冊了讀事件處理器。

          //file:src/networking.c
          redisClient?*createClient(int?fd)?{

          ????//?為用戶連接創(chuàng)建?client?對象
          ????redisClient?*c?=?zmalloc(sizeof(redisClient));

          ????if?(fd?!=?-1)?{
          ????????...

          ????????//?為用戶連接注冊讀事件處理器
          ????????aeCreateFileEvent(server.el,fd,AE_READABLE,
          ????????????readQueryFromClient,?c)
          ????}
          ????...
          }

          關(guān)于 aeCreateFileEvent 的處理過程這里就不贅述了,詳情參見 2.3 節(jié)。其效果就是將該用戶連接 socket fd 對應(yīng)的讀處理函數(shù)設(shè)置為 readQueryFromClient, 并且設(shè)置私有數(shù)據(jù)為 redisClient c。

          3.3 處理客戶連接上的可讀事件

          現(xiàn)在假設(shè)該用戶連接有命令到達了,就假設(shè)用戶發(fā)送了GET XXXXXX_KEY 命令。那么在 Redis 的時間循環(huán)中調(diào)用 epoll_wait 發(fā)現(xiàn)該連接上有讀時間后,會調(diào)用在上一節(jié)中討論的為其注冊的讀處理函數(shù) readQueryFromClient。

          在讀處理函數(shù) readQueryFromClient 中主要做了這么幾件事情。

          • 解析并查找命令
          • 調(diào)用命令處理
          • 添加寫任務(wù)到隊列
          • 將輸出寫到緩存等待發(fā)送

          我們來詳細地看 readQueryFromClient 的代碼。在 readQueryFromClient 中會調(diào)用 processInputBuffer,然后進入 processCommand 對命令進行處理。其調(diào)用鏈如下:

          //file:?src/networking.c
          void?readQueryFromClient(aeEventLoop?*el,?int?fd,?void?*privdata,?...)?{
          ????redisClient?*c?=?(redisClient*)?privdata;
          ????processInputBufferAndReplicate(c);
          }

          void?processInputBufferAndReplicate(client?*c)?{
          ????...
          ????processInputBuffer(c);
          }

          //?處理客戶端輸入的命令內(nèi)容
          void?processInputBuffer(redisClient?*c)?{
          ????//?執(zhí)行命令,
          ????processCommand(c);
          }

          我們再來詳細看 processCommand 。

          //file:
          int?processCommand(redisClient?*c)?{?
          ????//?查找命令,并進行命令合法性檢查,以及命令參數(shù)個數(shù)檢查
          ????c->cmd?=?c->lastcmd?=?lookupCommand(c->argv[0]->ptr);

          ????......

          ????//?處理命令
          ????//?如果是?MULTI?事務(wù),則入隊,否則調(diào)用?call?直接處理
          ????if?(c->flags?&?CLIENT_MULTI?&&?...)
          ????{
          ????????queueMultiCommand(c);
          ????}?else?{
          ????????call(c,CMD_CALL_FULL);
          ????????...
          ????}
          ????return?C_OK;
          }

          我們先忽略 queueMultiCommand,直接看核心命令處理方法 call。

          //file:src/server.c
          void?call(client?*c,?int?flags)?{
          ????//?查找處理命令,
          ????struct?redisCommand?*real_cmd?=?c->cmd;
          ????//?調(diào)用命令處理函數(shù)
          ????c->cmd->proc(c);
          ????......
          }

          在 server.c 中定義了每一個命令對應(yīng)的處理函數(shù)

          //file:src/server.c
          struct?redisCommand?redisCommandTable[]?=?{
          ????{"module",moduleCommand,-2,"as",0,NULL,0,0,0,0,0},
          ????{"get",getCommand,2,"rF",0,NULL,1,1,1,0,0},
          ????{"set",setCommand,-3,"wm",0,NULL,1,1,1,0,0},
          ????{"setnx",setnxCommand,3,"wmF",0,NULL,1,1,1,0,0},
          ????{"setex",setexCommand,4,"wm",0,NULL,1,1,1,0,0},
          ????......

          ????{"mget",mgetCommand,-2,"rF",0,NULL,1,-1,1,0,0},
          ????{"rpush",rpushCommand,-3,"wmF",0,NULL,1,1,1,0,0},
          ????{"lpush",lpushCommand,-3,"wmF",0,NULL,1,1,1,0,0},
          ????{"rpushx",rpushxCommand,-3,"wmF",0,NULL,1,1,1,0,0},
          ????......
          }

          對于 get 命令來說,其對應(yīng)的命令處理函數(shù)就是 getCommand。也就是說當處理 GET 命令執(zhí)行到 c->cmd->proc 的時候會進入到 getCommand 函數(shù)中來。

          //file:?src/t_string.c
          void?getCommand(client?*c)?{
          ????getGenericCommand(c);
          }
          int?getGenericCommand(client?*c)?{
          ????robj?*o;

          ????if?((o?=?lookupKeyReadOrReply(c,c->argv[1],shared.null[c->resp]))?==?NULL)
          ????????return?C_OK;
          ????...
          ????addReplyBulk(c,o);
          ????return?C_OK;
          }

          getGenericCommand 方法會調(diào)用 lookupKeyReadOrReply 來從內(nèi)存中查找對應(yīng)的 key值。如果找不到,則直接返回 C_OK;如果找到了,調(diào)用 addReplyBulk 方法將值添加到輸出緩沖區(qū)中。

          //file:?src/networking.c
          void?addReplyBulk(client?*c,?robj?*obj)?{
          ????addReplyBulkLen(c,obj);
          ????addReply(c,obj);
          ????addReply(c,shared.crlf);
          }

          其主題是調(diào)用 addReply 來設(shè)置回復(fù)數(shù)據(jù)。在 addReply 方法中做了兩件事情:

          • prepareClientToWrite 判斷是否需要返回數(shù)據(jù),并且將當前 client 添加到等待寫返回數(shù)據(jù)隊列中。
          • 調(diào)用 _addReplyToBuffer 和 _addReplyObjectToList 方法將返回值寫入到輸出緩沖區(qū)中,等待寫入 socekt
          //file:src/networking.c
          void?addReply(client?*c,?robj?*obj)?{
          ????if?(prepareClientToWrite(c)?!=?C_OK)?return;

          ????if?(sdsEncodedObject(obj))?{
          ????????if?(_addReplyToBuffer(c,obj->ptr,sdslen(obj->ptr))?!=?C_OK)
          ????????????_addReplyStringToList(c,obj->ptr,sdslen(obj->ptr));
          ????}?else?{
          ????????......????????
          ????}
          }

          先來看 prepareClientToWrite 的詳細實現(xiàn),

          //file:?src/networking.c
          int?prepareClientToWrite(client?*c)?{
          ????......
          ????if?(!clientHasPendingReplies(c)?&&?!(c->flags?&?CLIENT_PENDING_READ))
          ????????clientInstallWriteHandler(c);
          }

          //file:src/networking.c
          void?clientInstallWriteHandler(client?*c)?{
          ????c->flags?|=?CLIENT_PENDING_WRITE;
          ????listAddNodeHead(server.clients_pending_write,c);
          }

          其中 server.clients_pending_write 就是我們說的任務(wù)隊列,隊列中的每一個元素都是有待寫返回數(shù)據(jù)的 client 對象。在 prepareClientToWrite 函數(shù)中,把 client 添加到任務(wù)隊列 server.clients_pending_write 里就算完事。

          接下再來 _addReplyToBuffer,該方法是向固定緩存中寫,如果寫不下的話就繼續(xù)調(diào)用 _addReplyStringToList 往鏈表里寫。簡單起見,我們只看 _addReplyToBuffer 的代碼。

          //file:src/networking.c
          int?_addReplyToBuffer(client?*c,?const?char?*s,?size_t?len)?{
          ????......
          ????//?拷貝到?client?對象的?Response?buffer?中
          ????memcpy(c->buf+c->bufpos,s,len);
          ????c->bufpos+=len;
          ????return?C_OK;
          }

          3.4 beforesleep 處理寫任務(wù)隊列

          回想在 aeMain 函數(shù)中,每次在進入 aeProcessEvents 前都需要先進行 beforesleep 處理。這個函數(shù)名字起的怪怪的,但實際上大有用處。

          //file:src/ae.c
          void?aeMain(aeEventLoop?*eventLoop)?{
          ????eventLoop->stop?=?0;
          ????while?(!eventLoop->stop)?{
          ????????// beforesleep 處理寫任務(wù)隊列并實際發(fā)送之
          ????????if?(eventLoop->beforesleep?!=?NULL)
          ????????????eventLoop->beforesleep(eventLoop);

          ????????aeProcessEvents(eventLoop,?AE_ALL_EVENTS);
          ????}
          }

          該函數(shù)處理了許多工作,其中一項便是遍歷發(fā)送任務(wù)隊列,并將 client 發(fā)送緩存區(qū)中的處理結(jié)果通過 write 發(fā)送到客戶端手中。

          我們來看下 beforeSleep 的實際源碼。

          //file:src/server.c
          void?beforeSleep(struct?aeEventLoop?*eventLoop)?{
          ????......
          ????handleClientsWithPendingWrites();
          }
          //file:src/networking.c
          int?handleClientsWithPendingWrites(void)?{
          ????listIter?li;
          ????listNode?*ln;
          ????int?processed?=?listLength(server.clients_pending_write);

          ????//遍歷寫任務(wù)隊列?server.clients_pending_write
          ????listRewind(server.clients_pending_write,&li);
          ????while((ln?=?listNext(&li)))?{
          ????????client?*c?=?listNodeValue(ln);
          ????????c->flags?&=?~CLIENT_PENDING_WRITE;
          ????????listDelNode(server.clients_pending_write,ln);

          ????????//實際將?client?中的結(jié)果數(shù)據(jù)發(fā)送出去
          ????????writeToClient(c->fd,c,0)

          ????????//如果一次發(fā)送不完則準備下一次發(fā)送
          ????????if?(clientHasPendingReplies(c))?{
          ????????????//注冊一個寫事件處理器,等待?epoll_wait?發(fā)現(xiàn)可寫后再處理?
          ????????????aeCreateFileEvent(server.el,?c->fd,?ae_flags,
          ????????????????sendReplyToClient,?c);
          ????????}
          ????????......
          ????}
          }

          在 handleClientsWithPendingWrites 中,遍歷了發(fā)送任務(wù)隊列 server.clients_pending_write,并調(diào)用 writeToClient 進行實際的發(fā)送處理。

          值得注意的是,發(fā)送 write 并不總是能一次性發(fā)送完的。假如要發(fā)送的結(jié)果太大,而系統(tǒng)為每個 socket 設(shè)置的發(fā)送緩存區(qū)又是有限的。

          在這種情況下,clientHasPendingReplies 判斷仍然有未發(fā)送完的數(shù)據(jù)的話,就需要注冊一個寫事件處理函數(shù)到 epoll 上。等待 epoll 發(fā)現(xiàn)該 socket 可寫的時候再次調(diào)用 sendReplyToClient進行發(fā)送。

          //file:src/networking.c
          int?writeToClient(int?fd,?client?*c,?int?handler_installed)?{
          ????while(clientHasPendingReplies(c))?{
          ????????//?先發(fā)送固定緩沖區(qū)
          ????????if?(c->bufpos?>?0)?{
          ????????????nwritten?=?write(fd,c->buf+c->sentlen,c->bufpos-c->sentlen);
          ????????????if?(nwritten?<=?0)?break;
          ????????????......

          ????????//?再發(fā)送回復(fù)鏈表中數(shù)據(jù)
          ????????}?else?{
          ????????????o?=?listNodeValue(listFirst(c->reply));
          ????????????nwritten?=?write(fd,?o->buf?+?c->sentlen,?objlen?-?c->sentlen);
          ????????????......
          ????????}
          ????}
          }

          writeToClient 中的主要邏輯就是調(diào)用 write 系統(tǒng)調(diào)用讓內(nèi)核幫其把數(shù)據(jù)發(fā)送出去即可。由于每個命令的處理結(jié)果大小是不固定的。所以 Redis 采用的做法用固定的 buf + 可變鏈表來儲存結(jié)果字符串。這里自然發(fā)送的時候就需要分別對固定緩存區(qū)和鏈表來進行發(fā)送了。

          四、高性能 Redis 網(wǎng)絡(luò)原理總結(jié)

          Redis 服務(wù)器端只需要單線程可以達到非常高的處理能力,每秒可以達到數(shù)萬 QPS 的高處理能力。如此高性能的程序其實就是對 Linux 提供的多路復(fù)用機制 epoll 的一個較為完美的運用而已。

          在 Redis 源碼中,核心邏輯其實就是兩個,一個是 initServer 啟動服務(wù),另外一個就是 aeMain 事件循環(huán)。把這兩個函數(shù)弄懂了,Redis 就吃透一大半了。

          //file:?src/server.c
          int?main(int?argc,?char?**argv)?{
          ????......
          ????//?啟動初始化
          ????initServer();
          ????//?運行事件處理循環(huán),一直到服務(wù)器關(guān)閉為止
          ????aeMain(server.el);
          }

          在 initServer 這個函數(shù)內(nèi),Redis 做了這么三件重要的事情。

          • 創(chuàng)建一個 epoll 對象
          • 對配置的監(jiān)聽端口進行 listen
          • 把 listen socket 讓 epoll 給管理起來

          在 aeMain 函數(shù)中,是一個無休止的循環(huán),它是 Redis 中最重要的部分。在每一次的循環(huán)中,要做的事情可以總結(jié)為如下圖。

          • 通過 epoll_wait 發(fā)現(xiàn) listen socket 以及其它連接上的可讀、可寫事件
          • 若發(fā)現(xiàn) listen socket 上有新連接到達,則接收新連接,并追加到 epoll 中進行管理
          • 若發(fā)現(xiàn)其它 socket 上有命令請求到達,則讀取和處理命令,把命令結(jié)果寫到緩存中,加入寫任務(wù)隊列
          • 每一次進入 epoll_wait 前都調(diào)用 beforesleep 來將寫任務(wù)隊列中的數(shù)據(jù)實際進行發(fā)送

          其實事件分發(fā)器還處理了一個不明顯的邏輯,那就是如果 beforesleep 在將結(jié)果寫回給客戶端的時候,如果由于內(nèi)核 socket 發(fā)送緩存區(qū)過小而導(dǎo)致不能一次發(fā)送完畢的時候,也會注冊一個寫事件處理器。等到 epoll_wait 發(fā)現(xiàn)對應(yīng)的 socket 可寫的時候,再執(zhí)行 write 寫處理。

          整個 Redis 的網(wǎng)絡(luò)核心模塊就在咱們這一篇文章中都敘述透了(剩下的 Redis 就是對各種數(shù)據(jù)結(jié)構(gòu)的建立和處理了)。相信吃透這一篇對于你對網(wǎng)絡(luò)編程的理解會有極大的幫助!

          還等什么,快把這篇文章也分享給你身邊和你一樣愛好深度技術(shù)的好友吧!

          瀏覽 64
          點贊
          評論
          收藏
          分享

          手機掃一掃分享

          分享
          舉報
          評論
          圖片
          表情
          推薦
          點贊
          評論
          收藏
          分享

          手機掃一掃分享

          分享
          舉報
          <kbd id="afajh"><form id="afajh"></form></kbd>
          <strong id="afajh"><dl id="afajh"></dl></strong>
            <del id="afajh"><form id="afajh"></form></del>
                1. <th id="afajh"><progress id="afajh"></progress></th>
                  <b id="afajh"><abbr id="afajh"></abbr></b>
                  <th id="afajh"><progress id="afajh"></progress></th>
                  欧美日韩免费 | 美女全裸18禁 | 人人看人人摸人人干 | 亚洲中文无码视频 | 青娱乐在线视频自拍好爽好舒服啊 |