快手基于 Flink 構(gòu)建實(shí)時(shí)數(shù)倉(cāng)場(chǎng)景化實(shí)踐
一、快手實(shí)時(shí)計(jì)算場(chǎng)景

公司級(jí)別的核心數(shù)據(jù):包括公司經(jīng)營(yíng)大盤(pán),實(shí)時(shí)核心日?qǐng)?bào),以及移動(dòng)版數(shù)據(jù)。相當(dāng)于團(tuán)隊(duì)會(huì)有公司的大盤(pán)指標(biāo),以及各個(gè)業(yè)務(wù)線,比如視頻相關(guān)、直播相關(guān),都會(huì)有一個(gè)核心的實(shí)時(shí)看板;
大型活動(dòng)實(shí)時(shí)指標(biāo):其中最核心的內(nèi)容是實(shí)時(shí)大屏。例如快手的春晚活動(dòng),我們會(huì)有一個(gè)總體的大屏去看總體活動(dòng)現(xiàn)狀。一個(gè)大型的活動(dòng)會(huì)分為 N 個(gè)不同的模塊,我們對(duì)每一個(gè)模塊不同的玩法會(huì)有不同的實(shí)時(shí)數(shù)據(jù)看板;
運(yùn)營(yíng)部分的數(shù)據(jù):運(yùn)營(yíng)數(shù)據(jù)主要包括兩方面,一個(gè)是創(chuàng)作者,另一個(gè)是內(nèi)容。對(duì)于創(chuàng)作者和內(nèi)容,在運(yùn)營(yíng)側(cè),比如上線一個(gè)大 V 的活動(dòng),我們想看到一些信息如直播間的實(shí)時(shí)現(xiàn)狀,以及直播間對(duì)于大盤(pán)的牽引情況?;谶@個(gè)場(chǎng)景,我們會(huì)做各種實(shí)時(shí)大屏的多維數(shù)據(jù),以及大盤(pán)的一些數(shù)據(jù)。
此外,這塊還包括運(yùn)營(yíng)策略的支撐,比如我們可能會(huì)實(shí)時(shí)發(fā)掘一些熱點(diǎn)內(nèi)容和熱點(diǎn)創(chuàng)作者,以及目前的一些熱點(diǎn)情況。我們基于這些熱點(diǎn)情況輸出策略,這個(gè)也是我們需要提供的一些支撐能力;
最后還包括 C 端數(shù)據(jù)展示,比如現(xiàn)在快手里有創(chuàng)作者中心和主播中心,這里會(huì)有一些如主播關(guān)播的關(guān)播頁(yè),關(guān)播頁(yè)的實(shí)時(shí)數(shù)據(jù)有一部分也是我們做的。
實(shí)時(shí)特征:包含搜索推薦特征和廣告實(shí)時(shí)特征。
二、快手實(shí)時(shí)數(shù)倉(cāng)架構(gòu)及保障措施
1. 目標(biāo)及難點(diǎn)

■ 1.1 目標(biāo)
首先由于我們是做數(shù)倉(cāng)的,因此希望所有的實(shí)時(shí)指標(biāo)都有離線指標(biāo)去對(duì)應(yīng),要求實(shí)時(shí)指標(biāo)和離線指標(biāo)整體的數(shù)據(jù)差異在 1% 以?xún)?nèi),這是最低標(biāo)準(zhǔn)。
其次是數(shù)據(jù)延遲,其 SLA 標(biāo)準(zhǔn)是活動(dòng)期間所有核心報(bào)表場(chǎng)景的數(shù)據(jù)延遲不能超過(guò) 5 分鐘,這 5 分鐘包括作業(yè)掛掉之后和恢復(fù)時(shí)間,如果超過(guò)則意味著 SLA 不達(dá)標(biāo)。
最后是穩(wěn)定性,針對(duì)一些場(chǎng)景,比如作業(yè)重啟后,我們的曲線是正常的,不會(huì)因?yàn)樽鳂I(yè)重啟導(dǎo)致指標(biāo)產(chǎn)出一些明顯的異常。
■ 1.2 難點(diǎn)
第一個(gè)難點(diǎn)是數(shù)據(jù)量大。每天整體的入口流量數(shù)據(jù)量級(jí)大概在萬(wàn)億級(jí)。在活動(dòng)如春晚的場(chǎng)景,QPS 峰值能達(dá)到億 / 秒。
第二個(gè)難點(diǎn)是組件依賴(lài)比較復(fù)雜??赡苓@條鏈路里有的依賴(lài)于 Kafka,有的依賴(lài) Flink,還有一些依賴(lài) KV 存儲(chǔ)、RPC 接口、OLAP 引擎等,我們需要思考在這條鏈路里如何分布,才能讓這些組件都能正常工作。
第三個(gè)難點(diǎn)是鏈路復(fù)雜。目前我們有 200+ 核心業(yè)務(wù)作業(yè),50+ 核心數(shù)據(jù)源,整體作業(yè)超過(guò) 1000。
2. 實(shí)時(shí)數(shù)倉(cāng) - 分層模型

最下層有三個(gè)不同的數(shù)據(jù)源,分別是客戶(hù)端日志、服務(wù)端日志以及 Binlog 日志;
在公共基礎(chǔ)層分為兩個(gè)不同的層次,一個(gè)是 DWD 層,做明細(xì)數(shù)據(jù),另一個(gè)是 DWS 層,做公共聚合數(shù)據(jù),DIM 是我們常說(shuō)的維度。我們有一個(gè)基于離線數(shù)倉(cāng)的主題預(yù)分層,這個(gè)主題預(yù)分層可能包括流量、用戶(hù)、設(shè)備、視頻的生產(chǎn)消費(fèi)、風(fēng)控、社交等。
DWD 層的核心工作是標(biāo)準(zhǔn)化的清洗;
DWS 層是把維度的數(shù)據(jù)和 DWD 層進(jìn)行關(guān)聯(lián),關(guān)聯(lián)之后生成一些通用粒度的聚合層次。
再往上是應(yīng)用層,包括一些大盤(pán)的數(shù)據(jù),多維分析的模型以及業(yè)務(wù)專(zhuān)題數(shù)據(jù);
最上面是場(chǎng)景。
第一步是做業(yè)務(wù)數(shù)據(jù)化,相當(dāng)于把業(yè)務(wù)的數(shù)據(jù)接進(jìn)來(lái);
第二步是數(shù)據(jù)資產(chǎn)化,意思是對(duì)數(shù)據(jù)做很多的清洗,然后形成一些規(guī)則有序的數(shù)據(jù);
第三步是數(shù)據(jù)業(yè)務(wù)化,可以理解數(shù)據(jù)在實(shí)時(shí)數(shù)據(jù)層面可以反哺業(yè)務(wù),為業(yè)務(wù)數(shù)據(jù)價(jià)值建設(shè)提供一些賦能。
3. 實(shí)時(shí)數(shù)倉(cāng) - 保障措施

我們先看藍(lán)色部分的質(zhì)量保障。針對(duì)質(zhì)量保障,可以看到在數(shù)據(jù)源階段,做了如數(shù)據(jù)源的亂序監(jiān)控,這是我們基于自己的 SDK 的采集做的,以及數(shù)據(jù)源和離線的一致性校準(zhǔn)。研發(fā)階段的計(jì)算過(guò)程有三個(gè)階段,分別是研發(fā)階段、上線階段和服務(wù)階段。
研發(fā)階段可能會(huì)提供一個(gè)標(biāo)準(zhǔn)化的模型,基于這個(gè)模型會(huì)有一些 Benchmark,并且做離線的比對(duì)驗(yàn)證,保證質(zhì)量是一致的;
上線階段更多的是服務(wù)監(jiān)控和指標(biāo)監(jiān)控;
在服務(wù)階段,如果出現(xiàn)一些異常情況,先做 Flink 狀態(tài)拉起,如果出現(xiàn)了一些不符合預(yù)期的場(chǎng)景,我們會(huì)做離線的整體數(shù)據(jù)修復(fù)。
第二個(gè)是時(shí)效性保障。針對(duì)數(shù)據(jù)源,我們把數(shù)據(jù)源的延遲情況也納入監(jiān)控。在研發(fā)階段其實(shí)還有兩個(gè)事情:
首先是壓測(cè),常規(guī)的任務(wù)會(huì)拿最近 7 天或者最近 14 天的峰值流量去看它是否存在任務(wù)延遲的情況;
通過(guò)壓測(cè)之后,會(huì)有一些任務(wù)上線和重啟性能評(píng)估,相當(dāng)于按照 CP 恢復(fù)之后,重啟的性能是什么樣子。
最后一個(gè)是穩(wěn)定保障。這在大型活動(dòng)中會(huì)做得比較多,比如切換演練和分級(jí)保障。我們會(huì)基于之前的壓測(cè)結(jié)果做限流,目的是保障作業(yè)在超過(guò)極限的情況下,仍然是穩(wěn)定的,不會(huì)出現(xiàn)很多的不穩(wěn)定或者 CP 失敗的情況。之后我們會(huì)有兩種不同的標(biāo)準(zhǔn),一種是冷備雙機(jī)房,另外一種是熱備雙機(jī)房。
冷備雙機(jī)房是:當(dāng)一個(gè)單機(jī)房掛掉,我們會(huì)從另一個(gè)機(jī)房去拉起;
熱備雙機(jī)房:相當(dāng)于同樣一份邏輯在兩個(gè)機(jī)房各部署一次。
三、快手場(chǎng)景問(wèn)題及解決方案
1. PV/UV 標(biāo)準(zhǔn)化
■ 1.1 場(chǎng)景

這個(gè)頁(yè)面來(lái)了多少人,或者有多少人點(diǎn)擊進(jìn)入這個(gè)頁(yè)面;
活動(dòng)一共來(lái)了多少人;
頁(yè)面里的某一個(gè)掛件,獲得了多少點(diǎn)擊、產(chǎn)生了多少曝光。
■ 1.2 方案



Watermark 推進(jìn)過(guò)了窗口的 event_time,它會(huì)進(jìn)行一次下發(fā)的觸發(fā),通過(guò)這種方式可以解決回溯的問(wèn)題,數(shù)據(jù)本身落在真實(shí)的窗口, Watermark 推進(jìn),在窗口結(jié)束后觸發(fā)。
此外,這種方式在一定程度上能夠解決亂序的問(wèn)題。比如它的亂序數(shù)據(jù)本身是一個(gè)不丟棄的狀態(tài),會(huì)記錄到最新的累計(jì)數(shù)據(jù)。
最后是語(yǔ)義一致性,它會(huì)基于事件時(shí)間,在亂序不嚴(yán)重的情況下,和離線計(jì)算出來(lái)的結(jié)果一致性是相當(dāng)高的。
2. DAU 計(jì)算
■ 2.1 背景介紹

活躍設(shè)備指的是當(dāng)天來(lái)過(guò)的設(shè)備;
新增設(shè)備指的是當(dāng)天來(lái)過(guò)且歷史沒(méi)有來(lái)過(guò)的設(shè)備;
回流設(shè)備指的是當(dāng)天來(lái)過(guò)且 N 天內(nèi)沒(méi)有來(lái)過(guò)的設(shè)備。
第一個(gè)問(wèn)題是:數(shù)據(jù)源是 6~8 個(gè),而且我們大盤(pán)的口徑經(jīng)常會(huì)做微調(diào),如果是單作業(yè)的話,每次微調(diào)的過(guò)程之中都要改,單作業(yè)的穩(wěn)定性會(huì)非常差;
第二個(gè)問(wèn)題是:數(shù)據(jù)量是萬(wàn)億級(jí),這會(huì)導(dǎo)致兩個(gè)情況,首先是這個(gè)量級(jí)的單作業(yè)穩(wěn)定性非常差,其次是實(shí)時(shí)關(guān)聯(lián)維表的時(shí)候用的 KV 存儲(chǔ),任何一個(gè)這樣的 RPC 服務(wù)接口,都不可能在萬(wàn)億級(jí)數(shù)據(jù)量的場(chǎng)景下保證服務(wù)穩(wěn)定性;
第三個(gè)問(wèn)題是:我們對(duì)于時(shí)延要求比較高,要求時(shí)延小于一分鐘。整個(gè)鏈路要避免批處理,如果出現(xiàn)了一些任務(wù)性能的單點(diǎn)問(wèn)題,我們還要保證高性能和可擴(kuò)容。
■ 2.2 技術(shù)方案

首先是穩(wěn)定性。松耦合可以簡(jiǎn)單理解為當(dāng)數(shù)據(jù)源 A 的邏輯和數(shù)據(jù)源 B 的邏輯需要修改時(shí),可以單獨(dú)修改。第二是任務(wù)可擴(kuò)容,因?yàn)槲覀儼阉羞壿嫴鸱值梅浅?/span>細(xì)粒度,當(dāng)一些地方出現(xiàn)了如流量問(wèn)題,不會(huì)影響后面的部分,所以它擴(kuò)容比較簡(jiǎn)單,除此之外還有服務(wù)化后置和狀態(tài)可控。
其次是時(shí)效性,我們做到毫秒延遲,并且維度豐富,整體上有 20+ 的維度做多維聚合。
最后是準(zhǔn)確性,我們支持?jǐn)?shù)據(jù)驗(yàn)證、實(shí)時(shí)監(jiān)控、模型出口統(tǒng)一等。
■ 2.3 延遲計(jì)算方案

第一種解決方案是用 “did + 維度 + 分鐘” 進(jìn)行去重,Value 設(shè)為 “是否來(lái)過(guò)”。比如同一個(gè) did,04:01 來(lái)了一條,它會(huì)進(jìn)行結(jié)果輸出。同樣的,04:02 和 04:04 也會(huì)進(jìn)行結(jié)果輸出。但如果 04:01 再來(lái),它就會(huì)丟棄,但如果 04:00 來(lái),依舊會(huì)進(jìn)行結(jié)果輸出。
這個(gè)解決方案存在一些問(wèn)題,因?yàn)槲覀儼捶昼姶?,?20 分鐘的狀態(tài)大小是存 10 分鐘的兩倍,到后面這個(gè)狀態(tài)大小有點(diǎn)不太可控,因此我們又換了解決方案 2。
第二種解決方案,我們的做法會(huì)涉及到一個(gè)假設(shè)前提,就是假設(shè)不存在數(shù)據(jù)源亂序的情況。在這種情況下,key 存的是 “did + 維度”,Value 為 “時(shí)間戳”,它的更新方式如上圖所示。
04:01 來(lái)了一條數(shù)據(jù),進(jìn)行結(jié)果輸出。04:02 來(lái)了一條數(shù)據(jù),如果是同一個(gè) did,那么它會(huì)更新時(shí)間戳,然后仍然做結(jié)果輸出。04:04 也是同樣的邏輯,然后將時(shí)間戳更新到 04:04,如果后面來(lái)了一條 04:01 的數(shù)據(jù),它發(fā)現(xiàn)時(shí)間戳已經(jīng)更新到 04:04,它會(huì)丟棄這條數(shù)據(jù)。
這樣的做法大幅度減少了本身所需要的一些狀態(tài),但是對(duì)亂序是零容忍,不允許發(fā)生任何亂序的情況,由于我們不好解決這個(gè)問(wèn)題,因此我們又想出了解決方案 3。
方案 3 是在方案 2 時(shí)間戳的基礎(chǔ)之上,加了一個(gè)類(lèi)似于環(huán)形緩沖區(qū),在緩沖區(qū)之內(nèi)允許亂序。
比如 04:01 來(lái)了一條數(shù)據(jù),進(jìn)行結(jié)果輸出;04:02 來(lái)了一條數(shù)據(jù),它會(huì)把時(shí)間戳更新到 04:02,并且會(huì)記錄同一個(gè)設(shè)備在 04:01 也來(lái)過(guò)。如果 04:04 再來(lái)了一條數(shù)據(jù),就按照相應(yīng)的時(shí)間差做一個(gè)位移,最后通過(guò)這樣的邏輯去保障它能夠容忍一定的亂序。
方案 1 在容忍 16 分鐘亂序的情況下,單作業(yè)的狀態(tài)大小在 480G 左右。這種情況雖然保證了準(zhǔn)確性,但是作業(yè)的恢復(fù)和穩(wěn)定性是完全不可控的狀態(tài),因此我們還是放棄了這個(gè)方案;
方案 2 是 30G 左右的狀態(tài)大小,對(duì)于亂序 0 容忍,但是數(shù)據(jù)不準(zhǔn)確,由于我們對(duì)準(zhǔn)確性的要求非常高,因此也放棄了這個(gè)方案;
方案 3 的狀態(tài)跟方案 1 相比,它的狀態(tài)雖然變化了但是增加的不多,而且整體能達(dá)到跟方案 1 同樣的效果。方案 3 容忍亂序的時(shí)間是 16 分鐘,我們正常更新一個(gè)作業(yè)的話,10 分鐘完全足夠重啟,因此最終選擇了方案 3。
3. 運(yùn)營(yíng)場(chǎng)景
■ 3.1 背景介紹

第一個(gè)是數(shù)據(jù)大屏支持,包括單直播間的分析數(shù)據(jù)和大盤(pán)的分析數(shù)據(jù),需要做到分鐘級(jí)延遲,更新要求比較高;
第二個(gè)是直播看板支持,直播看板的數(shù)據(jù)會(huì)有特定維度的分析,特定人群支持,對(duì)維度豐富性要求比較高;
第三個(gè)是數(shù)據(jù)策略榜單,這個(gè)榜單主要是預(yù)測(cè)熱門(mén)作品、爆款,要求的是小時(shí)級(jí)別的數(shù)據(jù),更新要求比較低;
第四個(gè)是 C 端實(shí)時(shí)指標(biāo)展示,查詢(xún)量比較大,但是查詢(xún)模式比較固定。

■ 3.2 技術(shù)方案

首先看一下基礎(chǔ)明細(xì)層 (圖中左方),數(shù)據(jù)源有兩條鏈路,其中一條鏈路是消費(fèi)的流,比如直播的消費(fèi)信息,還有觀看 / 點(diǎn)贊 / 評(píng)論。經(jīng)過(guò)一輪基礎(chǔ)清洗,然后做維度管理。上游的這些維度信息來(lái)源于 Kafka,Kafka 寫(xiě)入了一些內(nèi)容的維度,放到了 KV 存儲(chǔ)里邊,包括一些用戶(hù)的維度。
這些維度關(guān)聯(lián)了之后,最終寫(xiě)入 Kafka 的 DWD 事實(shí)層,這里為了做性能的提升,我們做了二級(jí)緩存的操作。
如圖中上方,我們讀取 DWD 層的數(shù)據(jù)然后做基礎(chǔ)匯總,核心是窗口維度聚合生成 4 種不同粒度的數(shù)據(jù),分別是大盤(pán)多維匯總 topic、直播間多維匯總 topic、作者多維匯總 topic、用戶(hù)多維匯總 topic,這些都是通用維度的數(shù)據(jù)。
如圖中下方,基于這些通用維度數(shù)據(jù),我們?cè)偃ゼ庸€(gè)性化維度的數(shù)據(jù),也就是 ADS 層。拿到了這些數(shù)據(jù)之后會(huì)有維度擴(kuò)展,包括內(nèi)容擴(kuò)展和運(yùn)營(yíng)維度的拓展,然后再去做聚合,比如會(huì)有電商實(shí)時(shí) topic,機(jī)構(gòu)服務(wù)實(shí)時(shí) topic 和大 V 直播實(shí)時(shí) topic。
分成這樣的兩個(gè)鏈路會(huì)有一個(gè)好處:一個(gè)地方處理的是通用維度,另一個(gè)地方處理的是個(gè)性化的維度。通用維度保障的要求會(huì)比較高一些,個(gè)性化維度則會(huì)做很多個(gè)性化的邏輯。如果這兩個(gè)耦合在一起的話,會(huì)發(fā)現(xiàn)任務(wù)經(jīng)常出問(wèn)題,并且分不清楚哪個(gè)任務(wù)的職責(zé)是什么,構(gòu)建不出這樣的一個(gè)穩(wěn)定層。
如圖中右方,最終我們用到了三種不同的引擎。簡(jiǎn)單來(lái)說(shuō)就是 Redis 查詢(xún)用到了 C 端的場(chǎng)景,OLAP 查詢(xún)用到了大屏、業(yè)務(wù)看板的場(chǎng)景。
四、未來(lái)規(guī)劃

第一部分是實(shí)時(shí)保障體系完善:
一方面做一些大型的活動(dòng),包括春晚活動(dòng)以及后續(xù)常態(tài)化的活動(dòng)。針對(duì)這些活動(dòng)如何去保障,我們有一套規(guī)范去做平臺(tái)化的建設(shè);
第二個(gè)是分級(jí)保障標(biāo)準(zhǔn)制定,哪些作業(yè)是什么樣的保障級(jí)別 / 標(biāo)準(zhǔn),會(huì)有一個(gè)標(biāo)準(zhǔn)化的說(shuō)明;
第三個(gè)是引擎平臺(tái)能力推動(dòng)解決,包括 Flink 任務(wù)的一些引擎,在這上面我們會(huì)有一個(gè)平臺(tái),基于這個(gè)平臺(tái)去做規(guī)范、標(biāo)準(zhǔn)化的推動(dòng)。
第二部分是實(shí)時(shí)數(shù)倉(cāng)內(nèi)容構(gòu)建:
一方面是場(chǎng)景化方案的輸出,比如針對(duì)活動(dòng)會(huì)有一些通用化的方案,而不是每次活動(dòng)都開(kāi)發(fā)一套新的解決方案;
另一方面是內(nèi)容數(shù)據(jù)層次沉淀,比如現(xiàn)在的數(shù)據(jù)內(nèi)容建設(shè),在厚度方面有一些場(chǎng)景的缺失,包括內(nèi)容如何更好地服務(wù)于上游的場(chǎng)景。
第三部分是 Flink SQL 場(chǎng)景化構(gòu)建,包括 SQL 持續(xù)推廣、SQL 任務(wù)穩(wěn)定性和 SQL 任務(wù)資源利用率。我們?cè)陬A(yù)估資源的過(guò)程中,會(huì)考慮比如在什么樣 QPS 的場(chǎng)景下, SQL 用什么樣的解決方案,能支撐到什么情況。Flink SQL 可以大幅減少人效,但是在這個(gè)過(guò)程中,我們想讓業(yè)務(wù)操作更加簡(jiǎn)單。
第四部分是批流一體探索。實(shí)時(shí)數(shù)倉(cāng)的場(chǎng)景其實(shí)就是做離線 ETL 計(jì)算加速,我們會(huì)有很多小時(shí)級(jí)別的任務(wù),針對(duì)這些任務(wù),每次批處理的時(shí)候有一些邏輯可以放到流處理去解決,這對(duì)于離線數(shù)倉(cāng) SLA 體系的提升十分巨大。
