推薦學習這個C++開源項目

大家好,我是唐唐,祝大家中秋快樂。
最近發(fā)現(xiàn)了適合C++開發(fā)者進階的開源項目,這個項目的名字叫workflow,項目地址如下:https://github.com/sogou/workflow
workflow是搜狗公司的服務器引擎,幾乎搜狗所有的后端C++服務和其他幾十家公司都在使用這個引擎,每日處理超百億請求。
其實去年在purecpp大會就聽過一個美女架構(gòu)師大佬介紹過這個項目,當時就感慨這個項目怎么這么牛逼
。最近聽到好多次別人提起這個項目,于是我就仔細看了看項目的介紹,好家伙,一入源碼深似海,研究了半個多月源碼,由衷佩服作者超強的架構(gòu)能力,明白了自己和架構(gòu)師之間巨大的差距,還是得多學習啊,肝。
目錄
框架有什么特點?
框架能做什么?
我為什么要推薦這個開源項目?
框架有什么特點?
用戶體驗相當好:接口簡潔,支持常用協(xié)議,使用簡單,具體怎么簡單我下面會介紹;
性能好:不單網(wǎng)絡(luò)、磁盤IO、CPU計算等,workflow著眼于所有異步資源都盡可能全部調(diào)起,有相當充足的測試數(shù)據(jù)證明該框架的性能較目前主流的服務端框架更好;
穩(wěn)定性高:搜狗和其他好多公司都在使用這個引擎,穩(wěn)定性肯定高啊,大家也可以自己去查數(shù)據(jù),我就不貼了;
支持多種平臺:項目支持Linux、macOS、Windows、Android等操作系統(tǒng)。
解放用戶生產(chǎn)力:用戶接觸到的只有任務(Task)和任務流(series)兩種概念,框架將資源高度封裝,用戶無需接觸到線程池、連接池、文件IO與各種異步通知機制等。用戶無需關(guān)心內(nèi)部細節(jié),可以將更多精力用在實現(xiàn)復雜的業(yè)務邏輯上。
設(shè)計理念新穎:源碼值得學習,我也是看了這個項目的源碼后才推薦給大家的,看完才知道,原來代碼可以這么寫,繼承可以這么玩。
框架能做什么?
框架能做的事情很多,我這里只介紹一些個人認為比較重要的功能,更多功能還需要大家自行解鎖。
輕松的搭建server:不用多說,服務端框架如果不能搭建server那還玩啥了,但使用這個框架非常方便,以http server為例,只需要簡單幾行代碼即可:
int main() {WFHttpServer server([](WFHttpTask *task) {task->get_resp()->append_output_body("Hello World!");});if (server.start(8888) == 0) { // start server on port 8888getchar(); // press "Enter" to end.server.stop();}return 0;}
輕松高效的發(fā)起客戶端請求:項目號稱可作為萬能異步客戶端,目前支持http,redis,mysql、websocket和kafka協(xié)議,下面是官方給出的一個mysql的客戶端示例:
int main(int argc, char *argv[]) {...WFMySQLTask *task = WFTaskFactory::create_mysql_task(url, RETRY_MAX, mysql_callback);task->get_req()->set_query("SHOW TABLES;");...task->start();...}
以往的C++ server需要訪問mysql時,可能使用的是傳統(tǒng)的客戶端。在一個線程下以同步阻塞的方式等待數(shù)據(jù)到來。如果有多個網(wǎng)絡(luò)請求希望并發(fā),那么用戶需要管理好多個mysql cli對象。
workflow完美的解決了這一系列問題,把所有這種用戶請求交給內(nèi)部的poller線程統(tǒng)一管理,實現(xiàn)了高效的非阻塞IO行為,提升了server作為客戶端請求數(shù)據(jù)時的性能表現(xiàn)。再也不用擔心這種客戶端行為影響server整體的性能。
再舉個例子,如果想要完成個wget的功能,只需要在WFHttpTask的回調(diào)函數(shù)中解析http返回的消息體即可:
int main(int argc, char *argv[]) {WFHttpTask *task;std::string url = argv[1];url = "http://" + url;task = WFTaskFactory::create_http_task(url, REDIRECT_MAX, RETRY_MAX,wget_callback);protocol::HttpRequest *req = task->get_req();req->add_header_pair("Accept", "*/*");req->add_header_pair("User-Agent", "Wget/1.14 (linux-gnu)");req->add_header_pair("Connection", "close");task->start();wait_group.wait();return 0;}
wget首先發(fā)起http請求,在wget_callback中處理http的響應消息體:
void wget_callback(WFHttpTask *task) {protocol::HttpRequest *req = task->get_req();protocol::HttpResponse *resp = task->get_resp();int state = task->get_state();int error = task->get_error();std::string name;std::string value;protocol::HttpHeaderCursor req_cursor(req);while (req_cursor.next(name, value))fprintf(stderr, "%s: %s\r\n", name.c_str(), value.c_str());fprintf(stderr, "\r\n");/* Print response header. */fprintf(stderr, "%s %s %s\r\n", resp->get_http_version(),resp->get_status_code(),resp->get_reason_phrase());protocol::HttpHeaderCursor resp_cursor(resp);while (resp_cursor.next(name, value))fprintf(stderr, "%s: %s\r\n", name.c_str(), value.c_str());fprintf(stderr, "\r\n");/* Print response body. */const void *body;size_t body_len;resp->get_parsed_body(&body, &body_len);fwrite(body, 1, body_len, stdout);fflush(stdout);fprintf(stderr, "\nSuccess. Press Ctrl-C to exit.\n");}
通過workflow輕松的就完成了wget的功能。
支持自定義協(xié)議client/server:用戶可構(gòu)建自己的RPC系統(tǒng),搜狗有個開源項目srpc就是以這個框架為基礎(chǔ)實現(xiàn)的。
可構(gòu)建異步任務流:支持串聯(lián),支持并聯(lián),支持串并聯(lián)的組合體,也支持復雜的DAG結(jié)構(gòu)。
異步IO:在Linux系統(tǒng)下可作為文件異步IO工具使用,性能超過任何標準調(diào)用。
通信與計算一體化:多數(shù)框架都著重于網(wǎng)絡(luò)IO的效率問題,而計算與任務調(diào)度等需要用戶自己實現(xiàn),workflow會自動對任務進行調(diào)度,打通網(wǎng)絡(luò)和磁盤等資源,特別適合需要網(wǎng)絡(luò)通信的重計算模塊。
我為什么要推薦這個項目?
主要就一點:值得學習,適合C++開發(fā)者進階,那具體學習什么?
學習系統(tǒng)的設(shè)計,所謂初級重實現(xiàn),中級重炫技,高級重設(shè)計。
在作者的設(shè)計理念中,一切業(yè)務邏輯皆是任務,多個任務會組成任務流,任務流可組成圖,這個圖可能是串聯(lián)圖,可能是并聯(lián)圖,也有可能是串并聯(lián)圖,類似于這種:

也有可能是這種復雜的DAG圖:

當然圖的層次結(jié)構(gòu)可由用戶自定義,個人認為框架最牛逼的一點就是支持動態(tài)創(chuàng)建任務流。
使用下面這段代碼可以很直觀友好的構(gòu)造出圖的結(jié)構(gòu),你有沒有很好奇這段代碼是怎么實現(xiàn)的?
WFGraphNode a, b, c, d;a-->b;a-->c;b-->d;c-->d;
這里先賣個關(guān)子,感興趣的自己去看吧,總貼人家代碼也不好。
我認為項目最值得大家學習的就是架構(gòu)的設(shè)計,特別任務與任務流的設(shè)計,我現(xiàn)在還沒看完代碼,畫不出架構(gòu)的設(shè)計圖(也怕畫錯了),只能籠統(tǒng)的說一句牛逼,因為確實驚艷到我了。
貼一個workflow的關(guān)于Task的架構(gòu)圖:

再簡單的貼一個定時器Task的實現(xiàn)代碼給大家看看:
項目里所有的Task都通過工廠創(chuàng)建:
static WFTimerTask *create_timer_task(const std::string& timer_name,unsigned int microseconds,timer_callback_t callback);
看下WFTimerTask的設(shè)計:
class WFTimerTask : public SleepRequest {public:void start() {assert(!series_of(this));Workflow::start_series_work(this, nullptr);}void dismiss() {assert(!series_of(this));delete this;}protected:virtual SubTask *done() {if (this->callback)this->callback(this);}};
再看下Workflow::start_series_work()的方法:
inline void Workflow::start_series_work(SubTask *first, series_callback_t callback) {new SeriesWork(first, std::move(callback));first->dispatch();}
然后是SleepRequest:
class SleepRequest : public SubTask, public SleepSession {public:virtual void dispatch() {if (this->scheduler->sleep(this) < 0) {this->state = SS_STATE_ERROR;this->error = errno;this->subtask_done();}}protected:CommScheduler *scheduler;virtual void handle(int state, int error) {this->state = state;this->error = error;this->subtask_done();}};
再看下scheduler中的這個sleep()方法:
int Communicator::sleep(SleepSession *session) {struct timespec value;if (session->duration(&value) >= 0) {if (mpoller_add_timer(&value, session, this->mpoller) >= 0)return 0;}return -1;}
然后是SubTask:
class SubTask {public:virtual void dispatch() = 0;private:virtual SubTask *done() = 0;protected:void subtask_done();};
然后是subtask_done()方法的實現(xiàn):
void SubTask::subtask_done() {SubTask *cur = this;ParallelTask *parent;SubTask **entry;while (1) {parent = cur->parent;entry = cur->entry;cur = cur->done();xxxbreak;}}
然后是SleepSession:
class SleepSession {private:virtual int duration(struct timespec *value) = 0;virtual void handle(int state, int error) = 0;public:virtual ~SleepSession() { }friend class Communicator;};
看了這么多源碼,那WFTimerTask是如何實現(xiàn)的定時功能呢?
我總結(jié)了下面幾步:
● 步驟一
用戶調(diào)用WFTimerTask的start();
● 步驟二
start()中調(diào)用Workflow::start_series_work()方法;
● 步驟三
start_series_work()中調(diào)用SubTask的dispatch()方法,這個dispatch()方法由SubTask的子類SleepRequest(WFTimerTask的父類)實現(xiàn);
● 步驟四
SleepRequest類的dispath()方法會調(diào)用scheduler->sleep()方法;
● 步驟五
sleep()方法會調(diào)用SleepSession的duration()方法獲取具體sleep的時間,框架內(nèi)部用了timerfd把超時時間交給操作系統(tǒng),時間到了會通知框架層,進而觸發(fā)SleepSession中的handle()調(diào)用;
● 步驟六
handle()的實現(xiàn)中會調(diào)用subtask_done()方法;
● 步驟七
subtask_done()中會調(diào)用SubTask中的done()方法;
● 步驟八
這個done()方法具體由WFTimerTask覆蓋,實現(xiàn)中會調(diào)用到具體時間后觸發(fā)的回調(diào)函數(shù)。
乍一看可能感覺非常麻煩,為什么實現(xiàn)一個普通的定時功能會搞這么多繼承關(guān)系,但你真正看了源碼后就會發(fā)現(xiàn),項目抽象出的所有Task,比如計數(shù)器Task、文件IOTask、網(wǎng)絡(luò)Task、MySQLTask等,都是通過這種SubTask、XXXRequest、XXXSession的形式來實現(xiàn),后期再來個XXXTask可以很方便的擴展,這才是優(yōu)秀項目該有的架構(gòu),真的佩服。
讀者們,你們可以設(shè)計出這么高端的架構(gòu)嗎?反正我要肝完這個項目,也推薦給大家一起學習。
最后總結(jié)了該項目中個人認為值得我們學習的地方:
小總結(jié)
接口的設(shè)計:項目的接入極其簡單,幾行代碼就可搭建個client或者server,幾行代碼也可構(gòu)建出簡單的任務流圖,可用于處理復雜的業(yè)務邏輯;
架構(gòu)的設(shè)計:項目中的各種類是如何派生的,作者的設(shè)計思路是怎么樣的;
網(wǎng)絡(luò)通信:項目沒有使用任何網(wǎng)絡(luò)框架,而是使用網(wǎng)絡(luò)裸接口進行網(wǎng)絡(luò)通信,我們都知道在大型項目中使用網(wǎng)絡(luò)裸接口進行網(wǎng)絡(luò)通信需要處理很多異常條件,這里值得學習一波;
任務流的封裝:為什么可以動態(tài)的構(gòu)建任務流的串并聯(lián)圖,并在項目內(nèi)部靈活的調(diào)度呢?
文件I/O:項目號稱內(nèi)部文件I/O操作比標準調(diào)用性能還好,它是怎么做到的?
內(nèi)存的管理:項目沒有使用任何智能指針,卻能管理好內(nèi)存問題,這是個技術(shù)活,當然,也得益于這優(yōu)秀的架構(gòu)設(shè)計。
我發(fā)現(xiàn)workflow團隊對這個項目相當重視,還特意建了個QQ交流群(群號碼是618773193),對此項目有任何問題都可以在這個群里探討,也方便了我們學習,真的不錯。
參考資料
https://zhuanlan.zhihu.com/p/358869362
https://zhuanlan.zhihu.com/p/165638263
項目地址如下:https://github.com/sogou/workflow,也可以點擊閱讀原文直達。
在訪問GitHub遇到困難時,可使用他們的Gitee官方倉庫:https://gitee.com/sogou/workflow
感覺這個項目值得學習的話就給人家個star,不要白嫖哈,對項目團隊來說也是一種認可和鼓勵。
點擊閱讀原文直達workflow倉庫。
↓ ↓ ↓
