為什么單線程的 Redis 如何做到每秒數(shù)萬 QPS ?

圖解網(wǎng)站:https://xiaolincoding.com/
大家好,我是小林。
之前分享過網(wǎng)絡(luò)模型的相關(guān):
在網(wǎng)站(https://xiaolincoding.com)的這部分:

上面的文章比較偏理論,很多同學(xué)反饋有沒有實(shí)際的案例進(jìn)行學(xué)習(xí)。
今天就跟大家用 Redis 的源碼,帶大家分析下單線程的 Redis 如何做到每秒數(shù)萬 QPS 的超高處理能力的。當(dāng)然,最好有我上面說的那兩篇文章的基礎(chǔ),再來看本篇會比較好點(diǎn)。
今天跟大家分享 Redis 核心網(wǎng)絡(luò)模塊的內(nèi)部實(shí)現(xiàn),學(xué)習(xí)下 Redis 是如何做到如此的高性能的!
一、理解多路復(fù)用原理
在開始介紹 Redis 之前,我想有必要先來簡單介紹下 epoll。
在傳統(tǒng)的同步阻塞網(wǎng)絡(luò)編程模型里(沒有協(xié)程以前),性能上不來的根本原因在于進(jìn)程線程都是笨重的家伙。讓一個進(jìn)(線)程只處理一個用戶請求確確實(shí)實(shí)是有點(diǎn)浪費(fèi)了。

先拋開高內(nèi)存開銷不說,在海量的網(wǎng)絡(luò)請求到來的時候,光是頻繁的進(jìn)程線程上下文就讓 CPU 疲于奔命了。

如果把進(jìn)程比作牧羊人,一個進(jìn)(線)程同時只能處理一個用戶請求,相當(dāng)于一個人只能看一只羊,放完這一只才能放下一只。如果同時來了 1000 只羊,那就得 1000 個人去放,這人力成本是非常高的。

性能提升思路很簡單,就是讓很多的用戶連接來復(fù)用同一個進(jìn)(線)程,這就是多路復(fù)用。多路指的是許許多多個用戶的網(wǎng)絡(luò)連接。復(fù)用指的是對進(jìn)(線)程的復(fù)用。換到牧羊人的例子里,就是一群羊只要一個牧羊人來處理就行了。
不過復(fù)用實(shí)現(xiàn)起來是需要特殊的 socket 事件管理機(jī)制的,最典型和高效的方案就是 epoll。放到牧羊人的例子來,epoll 就相當(dāng)于一只牧羊犬。
在 epoll 的系列函數(shù)里, epoll_create 用于創(chuàng)建一個 epoll 對象,epoll_ctl 用來給 epoll 對象添加或者刪除一個 socket。epoll_wait 就是查看它當(dāng)前管理的這些 socket 上有沒有可讀可寫事件發(fā)生。

當(dāng)網(wǎng)卡上收到數(shù)據(jù)包后,Linux 內(nèi)核進(jìn)行一系列的處理后把數(shù)據(jù)放到 socket 的接收隊(duì)列。然后會檢查是否有 epoll 在管理它,如果是則在 epoll 的就緒隊(duì)列中插入一個元素。epoll_wait 的操作就非常的簡單了,就是到 epoll 的就緒隊(duì)列上來查詢有沒有事件發(fā)生就行了。關(guān)于 epoll 這只“牧羊犬”的工作原理參見深入揭秘 epoll 是如何實(shí)現(xiàn) IO 多路復(fù)用的 (Javaer 習(xí)慣把基于 epoll 的網(wǎng)絡(luò)開發(fā)模型叫做 NIO)
在基于 epoll 的編程中,和傳統(tǒng)的函數(shù)調(diào)用思路不同的是,我們并不能主動調(diào)用某個 API 來處理。因?yàn)闊o法知道我們想要處理的事件啥時候發(fā)生。所以只好提前把想要處理的事件的處理函數(shù)注冊到一個事件分發(fā)器上去。當(dāng)事件發(fā)生的時候,由這個事件分發(fā)器調(diào)用回調(diào)函數(shù)進(jìn)行處理。這類基于實(shí)現(xiàn)注冊事件分發(fā)器的開發(fā)模式也叫 Reactor 模型。
二、Redis 服務(wù)啟動初始化
理解了 epoll 原理后,我們再來實(shí)際看 Redis 具體是如何使用 epoll 的。直接在 Github 上就可以非常方便地獲取 Redis 的源碼。我們切到 5.0.0 版本來看單線程版本的實(shí)現(xiàn)(多線程我們改天再講)。
#?git?clone?https://github.com/redis/redis
#?cd?redis
#?git?checkout?-b?5.0.0?5.0.0
其中整個 Redis 服務(wù)的代碼總?cè)肟谠?src/server.c 文件中,我把入口函數(shù)的核心部分摘了出來,如下。
//file:?src/server.c
int?main(int?argc,?char?**argv)?{
????......
????//?啟動初始化
????initServer();
????//?運(yùn)行事件處理循環(huán),一直到服務(wù)器關(guān)閉為止
????aeMain(server.el);
}
其實(shí)整個 Redis 的工作過程,就只需要理解清楚 main 函數(shù)中調(diào)用的 initServer 和 aeMain 這兩個函數(shù)就足夠了。
本節(jié)中我們重點(diǎn)介紹 initServer,在下一節(jié)介紹事件處理循環(huán) aeMain。在 initServer 這個函數(shù)內(nèi),Redis 做了這么三件重要的事情。

創(chuàng)建一個 epoll 對象 對配置的監(jiān)聽端口進(jìn)行 listen 把 listen socket 讓 epoll 給管理起來
//file:?src/server.c
void?initServer()?{
????//?2.1.1?創(chuàng)建?epoll
????server.el?=?aeCreateEventLoop(server.maxclients+CONFIG_FDSET_INCR);
????//?2.1.2?綁定監(jiān)聽服務(wù)端口
????listenToPort(server.port,server.ipfd,&server.ipfd_count);
????//?2.1.3?注冊?accept?事件處理器
????for?(j?=?0;?j?????????aeCreateFileEvent(server.el,?server.ipfd[j],?AE_READABLE,
????????????acceptTcpHandler,NULL);
????}
????...
}
接下來我們分別來看。
2.1 創(chuàng)建 epoll 對象
本小節(jié)的邏輯看起來貌似不短,但其實(shí)只是創(chuàng)建了一個 epoll 對象出來而已。
創(chuàng)建 epoll 對象的邏輯在 aeCreateEventLoop 中,在創(chuàng)建完后,Redis 將其保存在 redisServer 的 aeEventLoop 成員中,以備后續(xù)使用。
struct?redisServer?{
????...
????aeEventLoop?*el;
}
我們來看 aeCreateEventLoop 詳細(xì)邏輯。Redis 在操作系統(tǒng)提供的 epoll 對象基礎(chǔ)上又封裝了一個 eventLoop 出來,所以創(chuàng)建的時候是先申請和創(chuàng)建 eventLoop。
//file:src/ae.c
aeEventLoop?*aeCreateEventLoop(int?setsize)?{
????aeEventLoop?*eventLoop;
????eventLoop?=?zmalloc(sizeof(*eventLoop);
????//將來的各種回調(diào)事件就都會存在這里
????eventLoop->events?=?zmalloc(sizeof(aeFileEvent)*setsize);
????......
????aeApiCreate(eventLoop);
????return?eventLoop;
}
在 eventLoop 里,我們稍微注意一下 eventLoop->events,將來在各種事件注冊的時候都會保存到這個數(shù)組里。
//file:src/ae.h
typedef?struct?aeEventLoop?{
????......
????aeFileEvent?*events;?/*?Registered?events?*/
}
具體創(chuàng)建 epoll 的過程在 ae_epoll.c 文件下的 aeApiCreate 中。在這里,真正調(diào)用了 epoll_create
//file:src/ae_epoll.c
static?int?aeApiCreate(aeEventLoop?*eventLoop)?{
????aeApiState?*state?=?zmalloc(sizeof(aeApiState));
????state->epfd?=?epoll_create(1024);?
????eventLoop->apidata?=?state;
????return?0;
}
2.2 綁定監(jiān)聽服務(wù)端口
我們再來看 Redis 中的 listen 過程,它在 listenToPort 函數(shù)中。雖然調(diào)用鏈條很長,但其實(shí)主要就是執(zhí)行了個簡單 listen 而已。
//file:?src/redis.c
int?listenToPort(int?port,?int?*fds,?int?*count)?{
????for?(j?=?0;?j?0;?j++)?{
????????fds[*count]?=?anetTcpServer(server.neterr,port,NULL,
????????????????server.tcp_backlog);
????}
}
Redis 是支持開啟多個端口的,所以在 listenToPort 中我們看到是啟用一個循環(huán)來調(diào)用 anetTcpServer。在 anetTcpServer 中,逐步會展開調(diào)用,直到執(zhí)行到 bind 和 listen 系統(tǒng)調(diào)用。
//file:src/anet.c
int?anetTcpServer(char?*err,?int?port,?char?*bindaddr,?int?backlog)
{
????return?_anetTcpServer(err,?port,?bindaddr,?AF_INET,?backlog);
}
static?int?_anetTcpServer(......)
{
????//?設(shè)置端口重用
????anetSetReuseAddr(err,s)
????//?監(jiān)聽
????anetListen(err,s,p->ai_addr,p->ai_addrlen,backlog)
}
static?int?anetListen(......)?{
????bind(s,sa,len);
????listen(s,?backlog);
????......
}
2.3 注冊事件回調(diào)函數(shù)
我們回頭再看一下 initServer,它調(diào)用 aeCreateEventLoop 創(chuàng)建了 epoll,調(diào)用 listenToPort 進(jìn)行了服務(wù)端口的 bind 和 listen。接著就開始調(diào)用 aeCreateFileEvent 來注冊一個 accept 事件處理器。
//file:?src/server.c
void?initServer()?{
????//?2.1.1?創(chuàng)建?epoll
????server.el?=?aeCreateEventLoop(server.maxclients+CONFIG_FDSET_INCR);
????//?2.1.2?監(jiān)聽服務(wù)端口
????listenToPort(server.port,server.ipfd,&server.ipfd_count);
????//?2.1.3?注冊?accept?事件處理器
????for?(j?=?0;?j?????????aeCreateFileEvent(server.el,?server.ipfd[j],?AE_READABLE,
????????????acceptTcpHandler,NULL);
????}
????...
}
我們來注意看調(diào)用 aeCreateFileEvent 時傳的重要參數(shù)是 acceptTcpHandler,它表示將來在 listen socket 上有新用戶連接到達(dá)的時候,該函數(shù)將被調(diào)用執(zhí)行。我們來看 aeCreateFileEvent 具體代碼。
//file:?src/ae.c
int?aeCreateFileEvent(aeEventLoop?*eventLoop,?int?fd,?int?mask,
????????aeFileProc?*proc,?void?*clientData)
{
????//?取出一個文件事件結(jié)構(gòu)
????aeFileEvent?*fe?=?&eventLoop->events[fd];
????//?監(jiān)聽指定?fd?的指定事件
????aeApiAddEvent(eventLoop,?fd,?mask);
????//?設(shè)置文件事件類型,以及事件的處理器
????fe->mask?|=?mask;
????if?(mask?&?AE_READABLE)?fe->rfileProc?=?proc;
????if?(mask?&?AE_WRITABLE)?fe->wfileProc?=?proc;
????//?私有數(shù)據(jù)
????fe->clientData?=?clientData;
}
函數(shù) aeCreateFileEvent 一開始,從 eventLoop->events 獲取了一個 aeFileEvent 對象。在 2.1 中我們介紹過 eventLoop->events 數(shù)組,注冊的各種事件處理器會保存在這個地方。
接下來調(diào)用 aeApiAddEvent。這個函數(shù)其實(shí)就是對 epoll_ctl 的一個封裝。主要就是實(shí)際執(zhí)行 epoll_ctl EPOLL_CTL_ADD。
//file:src/ae_epoll.c
static?int?aeApiAddEvent(aeEventLoop?*eventLoop,?int?fd,?int?mask)?{
????//?add?or?mod
????int?op?=?eventLoop->events[fd].mask?==?AE_NONE??
????????????EPOLL_CTL_ADD?:?EPOLL_CTL_MOD;
????......
????//?epoll_ctl?添加事件
????epoll_ctl(state->epfd,op,fd,&ee);
????return?0;
}
每一個 eventLoop->events 元素都指向一個 aeFileEvent 對象。在這個對象上,設(shè)置了三個關(guān)鍵東西
rfileProc:讀事件回調(diào) wfileProc:寫事件回調(diào) clientData:一些額外的擴(kuò)展數(shù)據(jù)
將來 當(dāng) epoll_wait 發(fā)現(xiàn)某個 fd 上有事件發(fā)生的時候,這樣 redis 首先根據(jù) fd 到 eventLoop->events 中查找 aeFileEvent 對象,然后再看 rfileProc、wfileProc 就可以找到讀、寫回調(diào)處理函數(shù)。
回頭看 initServer 調(diào)用 aeCreateFileEvent 時傳參來看。
//file:?src/server.c
void?initServer()?{
????......
????//?2.1.3?注冊?accept?事件處理器
????for?(j?=?0;?j?????????aeCreateFileEvent(server.el,?server.ipfd[j],?AE_READABLE,
????????????acceptTcpHandler,NULL);
????}
}
listen fd 對應(yīng)的讀回調(diào)函數(shù) rfileProc 事實(shí)上就被設(shè)置成了 acceptTcpHandler,寫回調(diào)沒有設(shè)置,私有數(shù)據(jù) client_data 也為 null。
三、Redis 事件處理循環(huán)
在上一節(jié)介紹完了 Redis 的啟動初始化過程,創(chuàng)建了 epoll,也進(jìn)行了綁定監(jiān)聽,也注冊了 accept 事件處理函數(shù)為 acceptTcpHandler。
//file:?src/server.c
int?main(int?argc,?char?**argv)?{
????......
????//?啟動初始化
????initServer();
????//?運(yùn)行事件處理循環(huán),一直到服務(wù)器關(guān)閉為止
????aeMain(server.el);
}
接下來,Redis 就會進(jìn)入 aeMain 開始進(jìn)行真正的用戶請求處理了。在 aeMain 函數(shù)中,是一個無休止的循環(huán)。在每一次的循環(huán)中,要做如下幾件事情。

通過 epoll_wait 發(fā)現(xiàn) listen socket 以及其它連接上的可讀、可寫事件 若發(fā)現(xiàn) listen socket 上有新連接到達(dá),則接收新連接,并追加到 epoll 中進(jìn)行管理 若發(fā)現(xiàn)其它 socket 上有命令請求到達(dá),則讀取和處理命令,把命令結(jié)果寫到緩存中,加入寫任務(wù)隊(duì)列 每一次進(jìn)入 epoll_wait 前都調(diào)用 beforesleep 來將寫任務(wù)隊(duì)列中的數(shù)據(jù)實(shí)際進(jìn)行發(fā)送 如若有首次未發(fā)送完畢的,當(dāng)寫事件發(fā)生時繼續(xù)發(fā)送
//file:src/ae.c
void?aeMain(aeEventLoop?*eventLoop)?{
????eventLoop->stop?=?0;
????while?(!eventLoop->stop)?{
????????//?如果有需要在事件處理前執(zhí)行的函數(shù),那么運(yùn)行它
????????//?3.4?beforesleep?處理寫任務(wù)隊(duì)列并實(shí)際發(fā)送之
????????if?(eventLoop->beforesleep?!=?NULL)
????????????eventLoop->beforesleep(eventLoop);
????????//?開始等待事件并處理
????????//?3.1?epoll_wait?發(fā)現(xiàn)事件
????????//?3.2?處理新連接請求
????????//?3.3?處理客戶連接上的可讀事件
????????aeProcessEvents(eventLoop,?AE_ALL_EVENTS);
????}
}
以上就是 aeMain 函數(shù)的核心邏輯所在,接下來我們分別對如上提到的四件事情進(jìn)行詳細(xì)的闡述。
3.1 ?epoll_wait 發(fā)現(xiàn)事件
Redis 不管有多少個用戶連接,都是通過 epoll_wait 來統(tǒng)一發(fā)現(xiàn)和管理其上的可讀(包括 liisten socket 上的 accept事件)、可寫事件的。甚至連 timer,也都是交給 epoll_wait 來統(tǒng)一管理的。

每當(dāng) epoll_wait 發(fā)現(xiàn)特定的事件發(fā)生的時候,就會調(diào)用相應(yīng)的事先注冊好的事件處理函數(shù)進(jìn)行處理。我們來詳細(xì)看 aeProcessEvents 對 epoll_wait 的封裝。
//file:src/ae.c
int?aeProcessEvents(aeEventLoop?*eventLoop,?int?flags)
{
????//?獲取最近的時間事件
????tvp?=?xxx
????//?處理文件事件,阻塞時間由?tvp?決定
????numevents?=?aeApiPoll(eventLoop,?tvp);
????for?(j?=?0;?j?????????//?從已就緒數(shù)組中獲取事件
????????aeFileEvent?*fe?=?&eventLoop->events[eventLoop->fired[j].fd];
????????//如果是讀事件,并且有讀回調(diào)函數(shù)
????????fe->rfileProc()
????????//如果是寫事件,并且有寫回調(diào)函數(shù)
????????fe->wfileProc()
????}
}
//file:?src/ae_epoll.c
static?int?aeApiPoll(aeEventLoop?*eventLoop,?struct?timeval?*tvp)?{
????//?等待事件
????aeApiState?*state?=?eventLoop->apidata;
????epoll_wait(state->epfd,state->events,eventLoop->setsize,
????????????tvp???(tvp->tv_sec*1000?+?tvp->tv_usec/1000)?:?-1);
????...
}
aeProcessEvents 就是調(diào)用 epoll_wait 來發(fā)現(xiàn)事件。當(dāng)發(fā)現(xiàn)有某個 fd 上事件發(fā)生以后,則調(diào)為其事先注冊的事件處理器函數(shù) rfileProc 和 wfileProc。
3.2 處理新連接請求
我們假設(shè)現(xiàn)在有新用戶連接到達(dá)了。前面在我們看到 listen socket 上的 rfileProc 注冊的是 acceptTcpHandler。也就是說,如果有連接到達(dá)的時候,會回調(diào)到 acceptTcpHandler。
在 acceptTcpHandler 中,主要做了幾件事情

調(diào)用 accept 系統(tǒng)調(diào)用把用戶連接給接收回來 為這個新連接創(chuàng)建一個唯一 redisClient 對象 將這個新連接添加到 epoll,并注冊一個讀事件處理函數(shù)
接下來讓我們看上面這三件事情都分別是如何被處理的。
//file:src/networking.c
void?acceptTcpHandler(aeEventLoop?*el,?int?fd,?...)?{
????cfd?=?anetTcpAccept(server.neterr,?fd,?cip,?...);
????acceptCommonHandler(cfd,0);
}
在 anetTcpAccept 中執(zhí)行非常的簡單,就是調(diào)用 accept 把連接接收回來。
//file:?src/anet.c
int?anetTcpAccept(......)?{
????anetGenericAccept(err,s,(struct?sockaddr*)&sa,&salen)
}
static?int?anetGenericAccept(......)?{
????fd?=?accept(s,sa,len)
}
接下來在 acceptCommonHandler 為這個新的客戶端連接 socket,創(chuàng)建一個 redisClient 對象。
//file:?src/networking.c
static?void?acceptCommonHandler(int?fd,?int?flags)?{
????//?創(chuàng)建?redisClient?對象
????redisClient?*c;
????c?=?createClient(fd);
????......
}
在 createClient 中,創(chuàng)建 client 對象,并且為該用戶連接注冊了讀事件處理器。
//file:src/networking.c
redisClient?*createClient(int?fd)?{
????//?為用戶連接創(chuàng)建?client?對象
????redisClient?*c?=?zmalloc(sizeof(redisClient));
????if?(fd?!=?-1)?{
????????...
????????//?為用戶連接注冊讀事件處理器
????????aeCreateFileEvent(server.el,fd,AE_READABLE,
????????????readQueryFromClient,?c)
????}
????...
}
關(guān)于 aeCreateFileEvent 的處理過程這里就不贅述了,詳情參見 2.3 節(jié)。其效果就是將該用戶連接 socket fd 對應(yīng)的讀處理函數(shù)設(shè)置為 readQueryFromClient, 并且設(shè)置私有數(shù)據(jù)為 redisClient c。
3.3 處理客戶連接上的可讀事件
現(xiàn)在假設(shè)該用戶連接有命令到達(dá)了,就假設(shè)用戶發(fā)送了GET XXXXXX_KEY 命令。那么在 Redis 的時間循環(huán)中調(diào)用 epoll_wait 發(fā)現(xiàn)該連接上有讀時間后,會調(diào)用在上一節(jié)中討論的為其注冊的讀處理函數(shù) readQueryFromClient。

在讀處理函數(shù) readQueryFromClient 中主要做了這么幾件事情。
解析并查找命令 調(diào)用命令處理 添加寫任務(wù)到隊(duì)列 將輸出寫到緩存等待發(fā)送
我們來詳細(xì)地看 readQueryFromClient 的代碼。在 readQueryFromClient 中會調(diào)用 processInputBuffer,然后進(jìn)入 processCommand 對命令進(jìn)行處理。其調(diào)用鏈如下:
//file:?src/networking.c
void?readQueryFromClient(aeEventLoop?*el,?int?fd,?void?*privdata,?...)?{
????redisClient?*c?=?(redisClient*)?privdata;
????processInputBufferAndReplicate(c);
}
void?processInputBufferAndReplicate(client?*c)?{
????...
????processInputBuffer(c);
}
//?處理客戶端輸入的命令內(nèi)容
void?processInputBuffer(redisClient?*c)?{
????//?執(zhí)行命令,
????processCommand(c);
}
我們再來詳細(xì)看 processCommand 。
//file:
int?processCommand(redisClient?*c)?{?
????//?查找命令,并進(jìn)行命令合法性檢查,以及命令參數(shù)個數(shù)檢查
????c->cmd?=?c->lastcmd?=?lookupCommand(c->argv[0]->ptr);
????......
????//?處理命令
????//?如果是?MULTI?事務(wù),則入隊(duì),否則調(diào)用?call?直接處理
????if?(c->flags?&?CLIENT_MULTI?&&?...)
????{
????????queueMultiCommand(c);
????}?else?{
????????call(c,CMD_CALL_FULL);
????????...
????}
????return?C_OK;
}
我們先忽略 queueMultiCommand,直接看核心命令處理方法 call。
//file:src/server.c
void?call(client?*c,?int?flags)?{
????//?查找處理命令,
????struct?redisCommand?*real_cmd?=?c->cmd;
????//?調(diào)用命令處理函數(shù)
????c->cmd->proc(c);
????......
}
在 server.c 中定義了每一個命令對應(yīng)的處理函數(shù)
//file:src/server.c
struct?redisCommand?redisCommandTable[]?=?{
????{"module",moduleCommand,-2,"as",0,NULL,0,0,0,0,0},
????{"get",getCommand,2,"rF",0,NULL,1,1,1,0,0},
????{"set",setCommand,-3,"wm",0,NULL,1,1,1,0,0},
????{"setnx",setnxCommand,3,"wmF",0,NULL,1,1,1,0,0},
????{"setex",setexCommand,4,"wm",0,NULL,1,1,1,0,0},
????......
????{"mget",mgetCommand,-2,"rF",0,NULL,1,-1,1,0,0},
????{"rpush",rpushCommand,-3,"wmF",0,NULL,1,1,1,0,0},
????{"lpush",lpushCommand,-3,"wmF",0,NULL,1,1,1,0,0},
????{"rpushx",rpushxCommand,-3,"wmF",0,NULL,1,1,1,0,0},
????......
}
對于 get 命令來說,其對應(yīng)的命令處理函數(shù)就是 getCommand。也就是說當(dāng)處理 GET 命令執(zhí)行到 c->cmd->proc 的時候會進(jìn)入到 getCommand 函數(shù)中來。
//file:?src/t_string.c
void?getCommand(client?*c)?{
????getGenericCommand(c);
}
int?getGenericCommand(client?*c)?{
????robj?*o;
????if?((o?=?lookupKeyReadOrReply(c,c->argv[1],shared.null[c->resp]))?==?NULL)
????????return?C_OK;
????...
????addReplyBulk(c,o);
????return?C_OK;
}
getGenericCommand 方法會調(diào)用 lookupKeyReadOrReply 來從內(nèi)存中查找對應(yīng)的 key值。如果找不到,則直接返回 C_OK;如果找到了,調(diào)用 addReplyBulk 方法將值添加到輸出緩沖區(qū)中。
//file:?src/networking.c
void?addReplyBulk(client?*c,?robj?*obj)?{
????addReplyBulkLen(c,obj);
????addReply(c,obj);
????addReply(c,shared.crlf);
}
其主題是調(diào)用 addReply 來設(shè)置回復(fù)數(shù)據(jù)。在 addReply 方法中做了兩件事情:
prepareClientToWrite 判斷是否需要返回?cái)?shù)據(jù),并且將當(dāng)前 client 添加到等待寫返回?cái)?shù)據(jù)隊(duì)列中。 調(diào)用 _addReplyToBuffer 和 _addReplyObjectToList 方法將返回值寫入到輸出緩沖區(qū)中,等待寫入 socekt
//file:src/networking.c
void?addReply(client?*c,?robj?*obj)?{
????if?(prepareClientToWrite(c)?!=?C_OK)?return;
????if?(sdsEncodedObject(obj))?{
????????if?(_addReplyToBuffer(c,obj->ptr,sdslen(obj->ptr))?!=?C_OK)
????????????_addReplyStringToList(c,obj->ptr,sdslen(obj->ptr));
????}?else?{
????????......????????
????}
}
先來看 prepareClientToWrite 的詳細(xì)實(shí)現(xiàn),
//file:?src/networking.c
int?prepareClientToWrite(client?*c)?{
????......
????if?(!clientHasPendingReplies(c)?&&?!(c->flags?&?CLIENT_PENDING_READ))
????????clientInstallWriteHandler(c);
}
//file:src/networking.c
void?clientInstallWriteHandler(client?*c)?{
????c->flags?|=?CLIENT_PENDING_WRITE;
????listAddNodeHead(server.clients_pending_write,c);
}
其中 server.clients_pending_write 就是我們說的任務(wù)隊(duì)列,隊(duì)列中的每一個元素都是有待寫返回?cái)?shù)據(jù)的 client 對象。在 prepareClientToWrite 函數(shù)中,把 client 添加到任務(wù)隊(duì)列 server.clients_pending_write 里就算完事。
接下再來 _addReplyToBuffer,該方法是向固定緩存中寫,如果寫不下的話就繼續(xù)調(diào)用 _addReplyStringToList 往鏈表里寫。簡單起見,我們只看 _addReplyToBuffer 的代碼。
//file:src/networking.c
int?_addReplyToBuffer(client?*c,?const?char?*s,?size_t?len)?{
????......
????//?拷貝到?client?對象的?Response?buffer?中
????memcpy(c->buf+c->bufpos,s,len);
????c->bufpos+=len;
????return?C_OK;
}
3.4 beforesleep 處理寫任務(wù)隊(duì)列
回想在 aeMain 函數(shù)中,每次在進(jìn)入 aeProcessEvents 前都需要先進(jìn)行 beforesleep 處理。這個函數(shù)名字起的怪怪的,但實(shí)際上大有用處。
//file:src/ae.c
void?aeMain(aeEventLoop?*eventLoop)?{
????eventLoop->stop?=?0;
????while?(!eventLoop->stop)?{
????????// beforesleep 處理寫任務(wù)隊(duì)列并實(shí)際發(fā)送之
????????if?(eventLoop->beforesleep?!=?NULL)
????????????eventLoop->beforesleep(eventLoop);
????????aeProcessEvents(eventLoop,?AE_ALL_EVENTS);
????}
}
該函數(shù)處理了許多工作,其中一項(xiàng)便是遍歷發(fā)送任務(wù)隊(duì)列,并將 client 發(fā)送緩存區(qū)中的處理結(jié)果通過 write 發(fā)送到客戶端手中。

我們來看下 beforeSleep 的實(shí)際源碼。
//file:src/server.c
void?beforeSleep(struct?aeEventLoop?*eventLoop)?{
????......
????handleClientsWithPendingWrites();
}
//file:src/networking.c
int?handleClientsWithPendingWrites(void)?{
????listIter?li;
????listNode?*ln;
????int?processed?=?listLength(server.clients_pending_write);
????//遍歷寫任務(wù)隊(duì)列?server.clients_pending_write
????listRewind(server.clients_pending_write,&li);
????while((ln?=?listNext(&li)))?{
????????client?*c?=?listNodeValue(ln);
????????c->flags?&=?~CLIENT_PENDING_WRITE;
????????listDelNode(server.clients_pending_write,ln);
????????//實(shí)際將?client?中的結(jié)果數(shù)據(jù)發(fā)送出去
????????writeToClient(c->fd,c,0)
????????//如果一次發(fā)送不完則準(zhǔn)備下一次發(fā)送
????????if?(clientHasPendingReplies(c))?{
????????????//注冊一個寫事件處理器,等待?epoll_wait?發(fā)現(xiàn)可寫后再處理?
????????????aeCreateFileEvent(server.el,?c->fd,?ae_flags,
????????????????sendReplyToClient,?c);
????????}
????????......
????}
}
在 handleClientsWithPendingWrites 中,遍歷了發(fā)送任務(wù)隊(duì)列 server.clients_pending_write,并調(diào)用 writeToClient 進(jìn)行實(shí)際的發(fā)送處理。
值得注意的是,發(fā)送 write 并不總是能一次性發(fā)送完的。假如要發(fā)送的結(jié)果太大,而系統(tǒng)為每個 socket 設(shè)置的發(fā)送緩存區(qū)又是有限的。
在這種情況下,clientHasPendingReplies 判斷仍然有未發(fā)送完的數(shù)據(jù)的話,就需要注冊一個寫事件處理函數(shù)到 epoll 上。等待 epoll 發(fā)現(xiàn)該 socket 可寫的時候再次調(diào)用 sendReplyToClient進(jìn)行發(fā)送。
//file:src/networking.c
int?writeToClient(int?fd,?client?*c,?int?handler_installed)?{
????while(clientHasPendingReplies(c))?{
????????//?先發(fā)送固定緩沖區(qū)
????????if?(c->bufpos?>?0)?{
????????????nwritten?=?write(fd,c->buf+c->sentlen,c->bufpos-c->sentlen);
????????????if?(nwritten?<=?0)?break;
????????????......
????????//?再發(fā)送回復(fù)鏈表中數(shù)據(jù)
????????}?else?{
????????????o?=?listNodeValue(listFirst(c->reply));
????????????nwritten?=?write(fd,?o->buf?+?c->sentlen,?objlen?-?c->sentlen);
????????????......
????????}
????}
}
writeToClient 中的主要邏輯就是調(diào)用 write 系統(tǒng)調(diào)用讓內(nèi)核幫其把數(shù)據(jù)發(fā)送出去即可。由于每個命令的處理結(jié)果大小是不固定的。所以 Redis 采用的做法用固定的 buf + 可變鏈表來儲存結(jié)果字符串。這里自然發(fā)送的時候就需要分別對固定緩存區(qū)和鏈表來進(jìn)行發(fā)送了。
四、高性能 Redis 網(wǎng)絡(luò)原理總結(jié)
Redis 服務(wù)器端只需要單線程可以達(dá)到非常高的處理能力,每秒可以達(dá)到數(shù)萬 QPS 的高處理能力。如此高性能的程序其實(shí)就是對 Linux 提供的多路復(fù)用機(jī)制 epoll 的一個較為完美的運(yùn)用而已。
在 Redis 源碼中,核心邏輯其實(shí)就是兩個,一個是 initServer 啟動服務(wù),另外一個就是 aeMain 事件循環(huán)。把這兩個函數(shù)弄懂了,Redis 就吃透一大半了。
//file:?src/server.c
int?main(int?argc,?char?**argv)?{
????......
????//?啟動初始化
????initServer();
????//?運(yùn)行事件處理循環(huán),一直到服務(wù)器關(guān)閉為止
????aeMain(server.el);
}
在 initServer 這個函數(shù)內(nèi),Redis 做了這么三件重要的事情。
創(chuàng)建一個 epoll 對象 對配置的監(jiān)聽端口進(jìn)行 listen 把 listen socket 讓 epoll 給管理起來
在 aeMain 函數(shù)中,是一個無休止的循環(huán),它是 Redis 中最重要的部分。在每一次的循環(huán)中,要做的事情可以總結(jié)為如下圖。

通過 epoll_wait 發(fā)現(xiàn) listen socket 以及其它連接上的可讀、可寫事件 若發(fā)現(xiàn) listen socket 上有新連接到達(dá),則接收新連接,并追加到 epoll 中進(jìn)行管理 若發(fā)現(xiàn)其它 socket 上有命令請求到達(dá),則讀取和處理命令,把命令結(jié)果寫到緩存中,加入寫任務(wù)隊(duì)列 每一次進(jìn)入 epoll_wait 前都調(diào)用 beforesleep 來將寫任務(wù)隊(duì)列中的數(shù)據(jù)實(shí)際進(jìn)行發(fā)送
其實(shí)事件分發(fā)器還處理了一個不明顯的邏輯,那就是如果 beforesleep 在將結(jié)果寫回給客戶端的時候,如果由于內(nèi)核 socket 發(fā)送緩存區(qū)過小而導(dǎo)致不能一次發(fā)送完畢的時候,也會注冊一個寫事件處理器。等到 epoll_wait 發(fā)現(xiàn)對應(yīng)的 socket 可寫的時候,再執(zhí)行 write 寫處理。
整個 Redis 的網(wǎng)絡(luò)核心模塊就在咱們這一篇文章中都敘述透了(剩下的 Redis 就是對各種數(shù)據(jù)結(jié)構(gòu)的建立和處理了)。相信吃透這一篇對于你對網(wǎng)絡(luò)編程的理解會有極大的幫助!
