20000字,詳解大廠實(shí)時(shí)數(shù)倉(cāng)建設(shè)(好文收藏)

來源:五分鐘學(xué)大數(shù)據(jù)
一、實(shí)時(shí)數(shù)倉(cāng)建設(shè)背景
1. 實(shí)時(shí)需求日趨迫切
目前各大公司的產(chǎn)品需求和內(nèi)部決策對(duì)于數(shù)據(jù)實(shí)時(shí)性的要求越來越迫切,需要實(shí)時(shí)數(shù)倉(cāng)的能力來賦能。傳統(tǒng)離線數(shù)倉(cāng)的數(shù)據(jù)時(shí)效性是 T+1,調(diào)度頻率以天為單位,無法支撐實(shí)時(shí)場(chǎng)景的數(shù)據(jù)需求。即使能將調(diào)度頻率設(shè)置成小時(shí),也只能解決部分時(shí)效性要求不高的場(chǎng)景,對(duì)于實(shí)效性要求很高的場(chǎng)景還是無法優(yōu)雅的支撐。因此實(shí)時(shí)使用數(shù)據(jù)的問題必須得到有效解決。
2. 實(shí)時(shí)技術(shù)日趨成熟
實(shí)時(shí)計(jì)算框架已經(jīng)經(jīng)歷了三代發(fā)展,分別是:Storm、SparkStreaming、Flink,計(jì)算框架越來越成熟。一方面,實(shí)時(shí)任務(wù)的開發(fā)已經(jīng)能通過編寫 SQL 的方式來完成,在技術(shù)層面能很好地繼承離線數(shù)倉(cāng)的架構(gòu)設(shè)計(jì)思想;另一方面,在線數(shù)據(jù)開發(fā)平臺(tái)所提供的功能對(duì)實(shí)時(shí)任務(wù)開發(fā)、調(diào)試、運(yùn)維的支持也日漸趨于成熟,開發(fā)成本逐步降低,有助于去做這件事。
二、實(shí)時(shí)數(shù)倉(cāng)建設(shè)目的
1. 解決傳統(tǒng)數(shù)倉(cāng)的問題
從目前數(shù)倉(cāng)建設(shè)的現(xiàn)狀來看,實(shí)時(shí)數(shù)倉(cāng)是一個(gè)容易讓人產(chǎn)生混淆的概念,根據(jù)傳統(tǒng)經(jīng)驗(yàn)分析,數(shù)倉(cāng)有一個(gè)重要的功能,即能夠記錄歷史。通常,數(shù)倉(cāng)都是希望從業(yè)務(wù)上線的第一天開始有數(shù)據(jù),然后一直記錄到現(xiàn)在。但實(shí)時(shí)流處理技術(shù),又是強(qiáng)調(diào)當(dāng)前處理狀態(tài)的一個(gè)技術(shù),結(jié)合當(dāng)前一線大廠的建設(shè)經(jīng)驗(yàn)和滴滴在該領(lǐng)域的建設(shè)現(xiàn)狀,我們嘗試把公司內(nèi)實(shí)時(shí)數(shù)倉(cāng)建設(shè)的目的定位為,以數(shù)倉(cāng)建設(shè)理論和實(shí)時(shí)技術(shù),解決由于當(dāng)前離線數(shù)倉(cāng)數(shù)據(jù)時(shí)效性低解決不了的問題。
現(xiàn)階段我們要建設(shè)實(shí)時(shí)數(shù)倉(cāng)的主要原因是:
公司業(yè)務(wù)對(duì)于數(shù)據(jù)的實(shí)時(shí)性越來越迫切,需要有實(shí)時(shí)數(shù)據(jù)來輔助完成決策; 實(shí)時(shí)數(shù)據(jù)建設(shè)沒有規(guī)范,數(shù)據(jù)可用性較差,無法形成數(shù)倉(cāng)體系,資源大量浪費(fèi); 數(shù)據(jù)平臺(tái)工具對(duì)整體實(shí)時(shí)開發(fā)的支持也日漸趨于成熟,開發(fā)成本降低。
2. 實(shí)時(shí)數(shù)倉(cāng)的應(yīng)用場(chǎng)景
實(shí)時(shí) OLAP 分析; 實(shí)時(shí)數(shù)據(jù)看板; 實(shí)時(shí)業(yè)務(wù)監(jiān)控; 實(shí)時(shí)數(shù)據(jù)接口服務(wù)。
三、實(shí)時(shí)數(shù)倉(cāng)建設(shè)方案
接下來我們分析下目前實(shí)時(shí)數(shù)倉(cāng)建設(shè)比較好的幾個(gè)案例,希望這些案例能夠給大家?guī)硪恍﹩l(fā)。
1. 滴滴順風(fēng)車實(shí)時(shí)數(shù)倉(cāng)案例
滴滴數(shù)據(jù)團(tuán)隊(duì)建設(shè)的實(shí)時(shí)數(shù)倉(cāng),基本滿足了順風(fēng)車業(yè)務(wù)方在實(shí)時(shí)側(cè)的各類業(yè)務(wù)需求,初步建立起順風(fēng)車實(shí)時(shí)數(shù)倉(cāng),完成了整體數(shù)據(jù)分層,包含明細(xì)數(shù)據(jù)和匯總數(shù)據(jù),統(tǒng)一了 DWD 層,降低了大數(shù)據(jù)資源消耗,提高了數(shù)據(jù)復(fù)用性,可對(duì)外輸出豐富的數(shù)據(jù)服務(wù)。
數(shù)倉(cāng)具體架構(gòu)如下圖所示:

從數(shù)據(jù)架構(gòu)圖來看,順風(fēng)車實(shí)時(shí)數(shù)倉(cāng)和對(duì)應(yīng)的離線數(shù)倉(cāng)有很多類似的地方。例如分層結(jié)構(gòu);比如 ODS 層,明細(xì)層,匯總層,乃至應(yīng)用層,他們命名的模式可能都是一樣的。但仔細(xì)比較不難發(fā)現(xiàn),兩者有很多區(qū)別:
與離線數(shù)倉(cāng)相比,實(shí)時(shí)數(shù)倉(cāng)的層次更少一些:
從目前建設(shè)離線數(shù)倉(cāng)的經(jīng)驗(yàn)來看,數(shù)倉(cāng)的數(shù)據(jù)明細(xì)層內(nèi)容會(huì)非常豐富,處理明細(xì)數(shù)據(jù)外一般還會(huì)包含輕度匯總層的概念,另外離線數(shù)倉(cāng)中應(yīng)用層數(shù)據(jù)在數(shù)倉(cāng)內(nèi)部,但實(shí)時(shí)數(shù)倉(cāng)中,app 應(yīng)用層數(shù)據(jù)已經(jīng)落入應(yīng)用系統(tǒng)的存儲(chǔ)介質(zhì)中,可以把該層與數(shù)倉(cāng)的表分離; 應(yīng)用層少建設(shè)的好處:實(shí)時(shí)處理數(shù)據(jù)的時(shí)候,每建一個(gè)層次,數(shù)據(jù)必然會(huì)產(chǎn)生一定的延遲; 匯總層少建的好處:在匯總統(tǒng)計(jì)的時(shí)候,往往為了容忍一部分?jǐn)?shù)據(jù)的延遲,可能會(huì)人為的制造一些延遲來保證數(shù)據(jù)的準(zhǔn)確。舉例,在統(tǒng)計(jì)跨天相關(guān)的訂單事件中的數(shù)據(jù)時(shí),可能會(huì)等到 00:00:05 或者 00:00:10 再統(tǒng)計(jì),確保 00:00 前的數(shù)據(jù)已經(jīng)全部接受到位了,再進(jìn)行統(tǒng)計(jì)。所以,匯總層的層次太多的話,就會(huì)更大的加重人為造成的數(shù)據(jù)延遲。
與離線數(shù)倉(cāng)相比,實(shí)時(shí)數(shù)倉(cāng)的數(shù)據(jù)源存儲(chǔ)不同:
在建設(shè)離線數(shù)倉(cāng)的時(shí)候,目前滴滴內(nèi)部整個(gè)離線數(shù)倉(cāng)都是建立在 Hive 表之上。但是,在建設(shè)實(shí)時(shí)數(shù)倉(cāng)的時(shí)候,同一份表,會(huì)使用不同的方式進(jìn)行存儲(chǔ)。比如常見的情況下,明細(xì)數(shù)據(jù)或者匯總數(shù)據(jù)都會(huì)存在 Kafka 里面,但是像城市、渠道等維度信息需要借助 Hbase,mysql 或者其他 KV 存儲(chǔ)等數(shù)據(jù)庫(kù)來進(jìn)行存儲(chǔ)。
接下來,根據(jù)順風(fēng)車實(shí)時(shí)數(shù)倉(cāng)架構(gòu)圖,對(duì)每一層建設(shè)做具體展開:
1. ODS 貼源層建設(shè)
根據(jù)順風(fēng)車具體場(chǎng)景,目前順風(fēng)車數(shù)據(jù)源主要包括訂單相關(guān)的 binlog 日志,冒泡和安全相關(guān)的 public 日志,流量相關(guān)的埋點(diǎn)日志等。這些數(shù)據(jù)部分已采集寫入 kafka 或 ddmq 等數(shù)據(jù)通道中,部分?jǐn)?shù)據(jù)需要借助內(nèi)部自研同步工具完成采集,最終基于順風(fēng)車數(shù)倉(cāng) ods 層建設(shè)規(guī)范分主題統(tǒng)一寫入 kafka 存儲(chǔ)介質(zhì)中。
命名規(guī)范:ODS 層實(shí)時(shí)數(shù)據(jù)源主要包括兩種。
一種是在離線采集時(shí)已經(jīng)自動(dòng)生產(chǎn)的 DDMQ 或者是 Kafka topic,這類型的數(shù)據(jù)命名方式為采集系統(tǒng)自動(dòng)生成規(guī)范為:cn-binlog-數(shù)據(jù)庫(kù)名-數(shù)據(jù)庫(kù)名 eg: cn-binlog-ihap_fangyuan-ihap_fangyuan一種是需要自己進(jìn)行采集同步到 kafka topic 中,生產(chǎn)的 topic 命名規(guī)范同離線類似:ODS 層采用: realtime_ods_binlog_{源系統(tǒng)庫(kù)/表名}/ods_log_{日志名} eg: realtime_ods_binlog_ihap_fangyuan
2. DWD 明細(xì)層建設(shè)
根據(jù)順風(fēng)車業(yè)務(wù)過程作為建模驅(qū)動(dòng),基于每個(gè)具體的業(yè)務(wù)過程特點(diǎn),構(gòu)建最細(xì)粒度的明細(xì)層事實(shí)表;結(jié)合順風(fēng)車分析師在離線側(cè)的數(shù)據(jù)使用特點(diǎn),將明細(xì)事實(shí)表的某些重要維度屬性字段做適當(dāng)冗余,完成寬表化處理,之后基于當(dāng)前順風(fēng)車業(yè)務(wù)方對(duì)實(shí)時(shí)數(shù)據(jù)的需求重點(diǎn),重點(diǎn)建設(shè)交易、財(cái)務(wù)、體驗(yàn)、安全、流量等幾大模塊;該層的數(shù)據(jù)來源于 ODS 層,通過大數(shù)據(jù)架構(gòu)提供的 Stream SQL 完成 ETL 工作,對(duì)于 binlog 日志的處理主要進(jìn)行簡(jiǎn)單的數(shù)據(jù)清洗、處理數(shù)據(jù)漂移和數(shù)據(jù)亂序,以及可能對(duì)多個(gè) ODS 表進(jìn)行 Stream Join,對(duì)于流量日志主要是做通用的 ETL 處理和針對(duì)順風(fēng)車場(chǎng)景的數(shù)據(jù)過濾,完成非結(jié)構(gòu)化數(shù)據(jù)的結(jié)構(gòu)化處理和數(shù)據(jù)的分流;該層的數(shù)據(jù)除了存儲(chǔ)在消息隊(duì)列 Kafka 中,通常也會(huì)把數(shù)據(jù)實(shí)時(shí)寫入 Druid 數(shù)據(jù)庫(kù)中,供查詢明細(xì)數(shù)據(jù)和作為簡(jiǎn)單匯總數(shù)據(jù)的加工數(shù)據(jù)源。
命名規(guī)范:DWD 層的表命名使用英文小寫字母,單詞之間用下劃線分開,總長(zhǎng)度不能超過 40 個(gè)字符,并且應(yīng)遵循下述規(guī)則:realtime_dwd_{業(yè)務(wù)/pub}_{數(shù)據(jù)域縮寫}_[{業(yè)務(wù)過程縮寫}]_[{自定義表命名標(biāo)簽縮寫}]
{業(yè)務(wù)/pub}:參考業(yè)務(wù)命名 {數(shù)據(jù)域縮寫}:參考數(shù)據(jù)域劃分部分 {自定義表命名標(biāo)簽縮寫}:實(shí)體名稱可以根據(jù)數(shù)據(jù)倉(cāng)庫(kù)轉(zhuǎn)換整合后做一定的業(yè)務(wù)抽象的名稱,該名稱應(yīng)該準(zhǔn)確表述實(shí)體所代表的業(yè)務(wù)含義 樣例:realtime_dwd_trip_trd_order_base
3. DIM 層
公共維度層,基于維度建模理念思想,建立整個(gè)業(yè)務(wù)過程的一致性維度,降低數(shù)據(jù)計(jì)算口徑和算法不統(tǒng)一風(fēng)險(xiǎn); DIM 層數(shù)據(jù)來源于兩部分:一部分是 Flink 程序?qū)崟r(shí)處理 ODS 層數(shù)據(jù)得到,另外一部分是通過離線任務(wù)出倉(cāng)得到; DIM 層維度數(shù)據(jù)主要使用 MySQL、Hbase、fusion(滴滴自研 KV 存儲(chǔ)) 三種存儲(chǔ)引擎,對(duì)于維表數(shù)據(jù)比較少的情況可以使用 MySQL,對(duì)于單條數(shù)據(jù)大小比較小,查詢 QPS 比較高的情況,可以使用 fusion 存儲(chǔ),降低機(jī)器內(nèi)存資源占用,對(duì)于數(shù)據(jù)量比較大,對(duì)維表數(shù)據(jù)變化不是特別敏感的場(chǎng)景,可以使用 HBase 存儲(chǔ)。
命名規(guī)范:DIM 層的表命名使用英文小寫字母,單詞之間用下劃線分開,總長(zhǎng)度不能超過 30 個(gè)字符,并且應(yīng)遵循下述規(guī)則:dim_{業(yè)務(wù)/pub}_{維度定義}[_{自定義命名標(biāo)簽}]:
{業(yè)務(wù)/pub}:參考業(yè)務(wù)命名 {維度定義}:參考維度命名 {自定義表命名標(biāo)簽縮寫}:實(shí)體名稱可以根據(jù)數(shù)據(jù)倉(cāng)庫(kù)轉(zhuǎn)換整合后做一定的業(yè)務(wù)抽象的名稱,該名稱應(yīng)該準(zhǔn)確表述實(shí)體所代表的業(yè)務(wù)含義 樣例:dim_trip_dri_base
4. DWM 匯總層建設(shè)
在建設(shè)順風(fēng)車實(shí)時(shí)數(shù)倉(cāng)的匯總層的時(shí)候,跟順風(fēng)車離線數(shù)倉(cāng)有很多一樣的地方,但其具體技術(shù)實(shí)現(xiàn)會(huì)存在很大不同。
第一:對(duì)于一些共性指標(biāo)的加工,比如 pv,uv,訂單業(yè)務(wù)過程指標(biāo)等,我們會(huì)在匯總層進(jìn)行統(tǒng)一的運(yùn)算,確保關(guān)于指標(biāo)的口徑是統(tǒng)一在一個(gè)固定的模型中完成。對(duì)于一些個(gè)性指標(biāo),從指標(biāo)復(fù)用性的角度出發(fā),確定唯一的時(shí)間字段,同時(shí)該字段盡可能與其他指標(biāo)在時(shí)間維度上完成拉齊,例如行中異常訂單數(shù)需要與交易域指標(biāo)在事件時(shí)間上做到拉齊。
第二:在順風(fēng)車匯總層建設(shè)中,需要進(jìn)行多維的主題匯總,因?yàn)閷?shí)時(shí)數(shù)倉(cāng)本身是面向主題的,可能每個(gè)主題會(huì)關(guān)心的維度都不一樣,所以需要在不同的主題下,按照這個(gè)主題關(guān)心的維度對(duì)數(shù)據(jù)進(jìn)行匯總,最后來算業(yè)務(wù)方需要的匯總指標(biāo)。在具體操作中,對(duì)于 pv 類指標(biāo)使用 Stream SQL 實(shí)現(xiàn) 1 分鐘匯總指標(biāo)作為最小匯總單位指標(biāo),在此基礎(chǔ)上進(jìn)行時(shí)間維度上的指標(biāo)累加;對(duì)于 uv 類指標(biāo)直接使用 druid 數(shù)據(jù)庫(kù)作為指標(biāo)匯總?cè)萜鳎鶕?jù)業(yè)務(wù)方對(duì)匯總指標(biāo)的及時(shí)性和準(zhǔn)確性的要求,實(shí)現(xiàn)相應(yīng)的精確去重和非精確去重。
第三:匯總層建設(shè)過程中,還會(huì)涉及到衍生維度的加工。在順風(fēng)車券相關(guān)的匯總指標(biāo)加工中我們使用 Hbase 的版本機(jī)制來構(gòu)建一個(gè)衍生維度的拉鏈表,通過事件流和 Hbase 維表關(guān)聯(lián)的方式得到實(shí)時(shí)數(shù)據(jù)當(dāng)時(shí)的準(zhǔn)確維度
命名規(guī)范:DWM 層的表命名使用英文小寫字母,單詞之間用下劃線分開,總長(zhǎng)度不能超過 40 個(gè)字符,并且應(yīng)遵循下述規(guī)則:realtime_dwm_{業(yè)務(wù)/pub}_{數(shù)據(jù)域縮寫}_{數(shù)據(jù)主粒度縮寫}_[{自定義表命名標(biāo)簽縮寫}]_{統(tǒng)計(jì)時(shí)間周期范圍縮寫}:
{業(yè)務(wù)/pub}:參考業(yè)務(wù)命名 {數(shù)據(jù)域縮寫}:參考數(shù)據(jù)域劃分部分 {數(shù)據(jù)主粒度縮寫}:指數(shù)據(jù)主要粒度或數(shù)據(jù)域的縮寫,也是聯(lián)合主鍵中的主要維度 {自定義表命名標(biāo)簽縮寫}:實(shí)體名稱可以根據(jù)數(shù)據(jù)倉(cāng)庫(kù)轉(zhuǎn)換整合后做一定的業(yè)務(wù)抽象的名稱,該名稱應(yīng)該準(zhǔn)確表述實(shí)體所代表的業(yè)務(wù)含義 {統(tǒng)計(jì)時(shí)間周期范圍縮寫}:1d:天增量;td:天累計(jì)(全量);1h:小時(shí)增量;th:小時(shí)累計(jì)(全量);1min:分鐘增量;tmin:分鐘累計(jì)(全量) 樣例: realtime_dwm_trip_trd_pas_bus_accum_1min
APP 應(yīng)用層
該層主要的工作是把實(shí)時(shí)匯總數(shù)據(jù)寫入應(yīng)用系統(tǒng)的數(shù)據(jù)庫(kù)中,包括用于大屏顯示和實(shí)時(shí) OLAP 的 Druid 數(shù)據(jù)庫(kù)(該數(shù)據(jù)庫(kù)除了寫入應(yīng)用數(shù)據(jù),也可以寫入明細(xì)數(shù)據(jù)完成匯總指標(biāo)的計(jì)算)中,用于實(shí)時(shí)數(shù)據(jù)接口服務(wù)的 Hbase 數(shù)據(jù)庫(kù),用于實(shí)時(shí)數(shù)據(jù)產(chǎn)品的 mysql 或者 redis 數(shù)據(jù)庫(kù)中。
命名規(guī)范:基于實(shí)時(shí)數(shù)倉(cāng)的特殊性不做硬性要求。
2. 快手實(shí)時(shí)數(shù)倉(cāng)場(chǎng)景化案例
1) 目標(biāo)及難點(diǎn)

目標(biāo)
首先由于是做數(shù)倉(cāng),因此希望所有的實(shí)時(shí)指標(biāo)都有離線指標(biāo)去對(duì)應(yīng),要求實(shí)時(shí)指標(biāo)和離線指標(biāo)整體的數(shù)據(jù)差異在 1% 以內(nèi),這是最低標(biāo)準(zhǔn)。
其次是數(shù)據(jù)延遲,其 SLA 標(biāo)準(zhǔn)是活動(dòng)期間所有核心報(bào)表場(chǎng)景的數(shù)據(jù)延遲不能超過 5 分鐘,這 5 分鐘包括作業(yè)掛掉之后和恢復(fù)時(shí)間,如果超過則意味著 SLA 不達(dá)標(biāo)。
最后是穩(wěn)定性,針對(duì)一些場(chǎng)景,比如作業(yè)重啟后,我們的曲線是正常的,不會(huì)因?yàn)樽鳂I(yè)重啟導(dǎo)致指標(biāo)產(chǎn)出一些明顯的異常。
難點(diǎn)
第一個(gè)難點(diǎn)是數(shù)據(jù)量大。每天整體的入口流量數(shù)據(jù)量級(jí)大概在萬(wàn)億級(jí)。在活動(dòng)如春晚的場(chǎng)景,QPS 峰值能達(dá)到億 / 秒。
第二個(gè)難點(diǎn)是組件依賴比較復(fù)雜。可能這條鏈路里有的依賴于 Kafka,有的依賴 Flink,還有一些依賴 KV 存儲(chǔ)、RPC 接口、OLAP 引擎等,我們需要思考在這條鏈路里如何分布,才能讓這些組件都能正常工作。
第三個(gè)難點(diǎn)是鏈路復(fù)雜。目前我們有 200+ 核心業(yè)務(wù)作業(yè),50+ 核心數(shù)據(jù)源,整體作業(yè)超過 1000。
2) 實(shí)時(shí)數(shù)倉(cāng) - 分層模型
基于上面三個(gè)難點(diǎn),來看一下數(shù)倉(cāng)架構(gòu):

如上所示:
最下層有三個(gè)不同的數(shù)據(jù)源,分別是客戶端日志、服務(wù)端日志以及 Binlog 日志;在公共基礎(chǔ)層分為兩個(gè)不同的層次,一個(gè)是 DWD 層,做明細(xì)數(shù)據(jù),另一個(gè)是 DWS 層,做公共聚合數(shù)據(jù),DIM 是我們常說的維度。我們有一個(gè)基于離線數(shù)倉(cāng)的主題預(yù)分層,這個(gè)主題預(yù)分層可能包括流量、用戶、設(shè)備、視頻的生產(chǎn)消費(fèi)、風(fēng)控、社交等。DWD 層的核心工作是標(biāo)準(zhǔn)化的清洗;DWS 層是把維度的數(shù)據(jù)和 DWD 層進(jìn)行關(guān)聯(lián),關(guān)聯(lián)之后生成一些通用粒度的聚合層次。再往上是應(yīng)用層,包括一些大盤的數(shù)據(jù),多維分析的模型以及業(yè)務(wù)專題數(shù)據(jù);最上面是場(chǎng)景。整體過程可以分為三步:
第一步是做業(yè)務(wù)數(shù)據(jù)化,相當(dāng)于把業(yè)務(wù)的數(shù)據(jù)接進(jìn)來;第二步是數(shù)據(jù)資產(chǎn)化,意思是對(duì)數(shù)據(jù)做很多的清洗,然后形成一些規(guī)則有序的數(shù)據(jù);第三步是數(shù)據(jù)業(yè)務(wù)化,可以理解數(shù)據(jù)在實(shí)時(shí)數(shù)據(jù)層面可以反哺業(yè)務(wù),為業(yè)務(wù)數(shù)據(jù)價(jià)值建設(shè)提供一些賦能。
3) 實(shí)時(shí)數(shù)倉(cāng) - 保障措施
基于上面的分層模型,來看一下整體的保障措施:

保障層面分為三個(gè)不同的部分,分別是質(zhì)量保障,時(shí)效保障以及穩(wěn)定保障。
我們先看藍(lán)色部分的質(zhì)量保障。針對(duì)質(zhì)量保障,可以看到在數(shù)據(jù)源階段,做了如數(shù)據(jù)源的亂序監(jiān)控,這是我們基于自己的 SDK 的采集做的,以及數(shù)據(jù)源和離線的一致性校準(zhǔn)。研發(fā)階段的計(jì)算過程有三個(gè)階段,分別是研發(fā)階段、上線階段和服務(wù)階段。研發(fā)階段可能會(huì)提供一個(gè)標(biāo)準(zhǔn)化的模型,基于這個(gè)模型會(huì)有一些 Benchmark,并且做離線的比對(duì)驗(yàn)證,保證質(zhì)量是一致的;上線階段更多的是服務(wù)監(jiān)控和指標(biāo)監(jiān)控;在服務(wù)階段,如果出現(xiàn)一些異常情況,先做 Flink 狀態(tài)拉起,如果出現(xiàn)了一些不符合預(yù)期的場(chǎng)景,我們會(huì)做離線的整體數(shù)據(jù)修復(fù)。
第二個(gè)是時(shí)效性保障。針對(duì)數(shù)據(jù)源,我們把數(shù)據(jù)源的延遲情況也納入監(jiān)控。在研發(fā)階段其實(shí)還有兩個(gè)事情:首先是壓測(cè),常規(guī)的任務(wù)會(huì)拿最近 7 天或者最近 14 天的峰值流量去看它是否存在任務(wù)延遲的情況;通過壓測(cè)之后,會(huì)有一些任務(wù)上線和重啟性能評(píng)估,相當(dāng)于按照 CP 恢復(fù)之后,重啟的性能是什么樣子。
最后一個(gè)是穩(wěn)定保障,這在大型活動(dòng)中會(huì)做得比較多,比如切換演練和分級(jí)保障。我們會(huì)基于之前的壓測(cè)結(jié)果做限流,目的是保障作業(yè)在超過極限的情況下,仍然是穩(wěn)定的,不會(huì)出現(xiàn)很多的不穩(wěn)定或者 CP 失敗的情況。之后我們會(huì)有兩種不同的標(biāo)準(zhǔn),一種是冷備雙機(jī)房,另外一種是熱備雙機(jī)房。冷備雙機(jī)房是:當(dāng)一個(gè)單機(jī)房掛掉,我們會(huì)從另一個(gè)機(jī)房去拉起;熱備雙機(jī)房:相當(dāng)于同樣一份邏輯在兩個(gè)機(jī)房各部署一次。以上就是我們整體的保障措施。
3) 快手場(chǎng)景問題及解決方案
1. PV/UV 標(biāo)準(zhǔn)化
1.1 場(chǎng)景
第一個(gè)問題是 PV/UV 標(biāo)準(zhǔn)化,這里有三個(gè)截圖:

第一張圖是春晚活動(dòng)的預(yù)熱場(chǎng)景,相當(dāng)于是一種玩法,第二和第三張圖是春晚當(dāng)天的發(fā)紅包活動(dòng)和直播間截圖。
在活動(dòng)進(jìn)行過程中,我們發(fā)現(xiàn) 60~70% 的需求是計(jì)算頁(yè)面里的信息,如:
這個(gè)頁(yè)面來了多少人,或者有多少人點(diǎn)擊進(jìn)入這個(gè)頁(yè)面; 活動(dòng)一共來了多少人; 頁(yè)面里的某一個(gè)掛件,獲得了多少點(diǎn)擊、產(chǎn)生了多少曝光。
1.2 方案
抽象一下這個(gè)場(chǎng)景就是下面這種 SQL:

簡(jiǎn)單來說,就是從一張表做篩選條件,然后按照維度層面做聚合,接著產(chǎn)生一些 Count 或者 Sum 操作。
基于這種場(chǎng)景,我們最開始的解決方案如上圖右邊所示。
我們用到了 Flink SQL 的 Early Fire 機(jī)制,從 Source 數(shù)據(jù)源取數(shù)據(jù),之后做了 DID 的分桶。比如最開始紫色的部分按這個(gè)做分桶,先做分桶的原因是防止某一個(gè) DID 存在熱點(diǎn)的問題。分桶之后會(huì)有一個(gè)叫做 Local Window Agg 的東西,相當(dāng)于數(shù)據(jù)分完桶之后把相同類型的數(shù)據(jù)相加。Local Window Agg 之后再按照維度進(jìn)行 Global Window Agg 的合桶,合桶的概念相當(dāng)于按照維度計(jì)算出最終的結(jié)果。Early Fire 機(jī)制相當(dāng)于在 Local Window Agg 開一個(gè)天級(jí)的窗口,然后每分鐘去對(duì)外輸出一次。
這個(gè)過程中我們遇到了一些問題,如上圖左下角所示。
在代碼正常運(yùn)行的情況下是沒有問題的,但如果整體數(shù)據(jù)存在延遲或者追溯歷史數(shù)據(jù)的情況,比如一分鐘 Early Fire 一次,因?yàn)樽匪輾v史的時(shí)候數(shù)據(jù)量會(huì)比較大,所以可能導(dǎo)致 14:00 追溯歷史,直接讀到了 14:02 的數(shù)據(jù),而 14:01 的那個(gè)點(diǎn)就被丟掉了,丟掉了以后會(huì)發(fā)生什么?

在這種場(chǎng)景下,圖中上方的曲線為 Early Fire 回溯歷史數(shù)據(jù)的結(jié)果。橫坐標(biāo)是分鐘,縱坐標(biāo)是截止到當(dāng)前時(shí)刻的頁(yè)面 UV,我們發(fā)現(xiàn)有些點(diǎn)是橫著的,意味著沒有數(shù)據(jù)結(jié)果,然后一個(gè)陡增,然后又橫著的,接著又一個(gè)陡增,而這個(gè)曲線的預(yù)期結(jié)果其實(shí)是圖中下方那種平滑的曲線。
為了解決這個(gè)問題,我們用到了 Cumulate Window 的解決方案,這個(gè)解決方案在 Flink 1.13 版本里也有涉及,其原理是一樣的。

數(shù)據(jù)開一個(gè)大的天級(jí)窗口,大窗口下又開了一個(gè)小的分鐘級(jí)窗口,數(shù)據(jù)按數(shù)據(jù)本身的 Row Time 落到分鐘級(jí)窗口。
Watermark 推進(jìn)過了窗口的 event_time,它會(huì)進(jìn)行一次下發(fā)的觸發(fā),通過這種方式可以解決回溯的問題,數(shù)據(jù)本身落在真實(shí)的窗口, Watermark 推進(jìn),在窗口結(jié)束后觸發(fā)。此外,這種方式在一定程度上能夠解決亂序的問題。比如它的亂序數(shù)據(jù)本身是一個(gè)不丟棄的狀態(tài),會(huì)記錄到最新的累計(jì)數(shù)據(jù)。最后是語(yǔ)義一致性,它會(huì)基于事件時(shí)間,在亂序不嚴(yán)重的情況下,和離線計(jì)算出來的結(jié)果一致性是相當(dāng)高的。以上是 PV/UV 一個(gè)標(biāo)準(zhǔn)化的解決方案。
2. DAU 計(jì)算
2.1 背景介紹
下面介紹一下 DAU 計(jì)算:

我們對(duì)于整個(gè)大盤的活躍設(shè)備、新增設(shè)備和回流設(shè)備有比較多的監(jiān)控。
活躍設(shè)備指的是當(dāng)天來過的設(shè)備;新增設(shè)備指的是當(dāng)天來過且歷史沒有來過的設(shè)備;回流設(shè)備指的是當(dāng)天來過且 N 天內(nèi)沒有來過的設(shè)備。但是我們計(jì)算過程之中可能需要 5~8 個(gè)這樣不同的 Topic 去計(jì)算這幾個(gè)指標(biāo)。
我們看一下離線過程中,邏輯應(yīng)該怎么算。
首先我們先算活躍設(shè)備,把這些合并到一起,然后做一個(gè)維度下的天級(jí)別去重,接著再去關(guān)聯(lián)維度表,這個(gè)維度表包括設(shè)備的首末次時(shí)間,就是截止到昨天設(shè)備首次訪問和末次訪問的時(shí)間。
得到這個(gè)信息之后,我們就可以進(jìn)行邏輯計(jì)算,然后我們會(huì)發(fā)現(xiàn)新增和回流的設(shè)備其實(shí)是活躍設(shè)備里打的一個(gè)子標(biāo)簽。新增設(shè)備就是做了一個(gè)邏輯處理,回流設(shè)備是做了 30 天的邏輯處理,基于這樣的解決方案,我們能否簡(jiǎn)單地寫一個(gè) SQL 去解決這個(gè)問題?
其實(shí)我們最開始是這么做的,但遇到了一些問題:
第一個(gè)問題是:數(shù)據(jù)源是 6~8 個(gè),而且我們大盤的口徑經(jīng)常會(huì)做微調(diào),如果是單作業(yè)的話,每次微調(diào)的過程之中都要改,單作業(yè)的穩(wěn)定性會(huì)非常差;第二個(gè)問題是:數(shù)據(jù)量是萬(wàn)億級(jí),這會(huì)導(dǎo)致兩個(gè)情況,首先是這個(gè)量級(jí)的單作業(yè)穩(wěn)定性非常差,其次是實(shí)時(shí)關(guān)聯(lián)維表的時(shí)候用的 KV 存儲(chǔ),任何一個(gè)這樣的 RPC 服務(wù)接口,都不可能在萬(wàn)億級(jí)數(shù)據(jù)量的場(chǎng)景下保證服務(wù)穩(wěn)定性;第三個(gè)問題是:我們對(duì)于時(shí)延要求比較高,要求時(shí)延小于一分鐘。整個(gè)鏈路要避免批處理,如果出現(xiàn)了一些任務(wù)性能的單點(diǎn)問題,我們還要保證高性能和可擴(kuò)容。
2.2 技術(shù)方案
針對(duì)以上問題,介紹一下我們是怎么做的:

如上圖的例子,第一步是對(duì) A B C 這三個(gè)數(shù)據(jù)源,先按照維度和 DID 做分鐘級(jí)別去重,分別去重之后得到三個(gè)分鐘級(jí)別去重的數(shù)據(jù)源,接著把它們 Union 到一起,然后再進(jìn)行同樣的邏輯操作。
這相當(dāng)于我們數(shù)據(jù)源的入口從萬(wàn)億變到了百億的級(jí)別,分鐘級(jí)別去重之后再進(jìn)行一個(gè)天級(jí)別的去重,產(chǎn)生的數(shù)據(jù)源就可以從百億變成了幾十億的級(jí)別。
在幾十億級(jí)別數(shù)據(jù)量的情況下,我們?cè)偃リP(guān)聯(lián)數(shù)據(jù)服務(wù)化,這就是一種比較可行的狀態(tài),相當(dāng)于去關(guān)聯(lián)用戶畫像的 RPC 接口,得到 RPC 接口之后,最終寫入到了目標(biāo) Topic。這個(gè)目標(biāo) Topic 會(huì)導(dǎo)入到 OLAP 引擎,供給多個(gè)不同的服務(wù),包括移動(dòng)版服務(wù),大屏服務(wù),指標(biāo)看板服務(wù)等。
這個(gè)方案有三個(gè)方面的優(yōu)勢(shì),分別是穩(wěn)定性、時(shí)效性和準(zhǔn)確性。
首先是穩(wěn)定性。松耦合可以簡(jiǎn)單理解為當(dāng)數(shù)據(jù)源 A 的邏輯和數(shù)據(jù)源 B 的邏輯需要修改時(shí),可以單獨(dú)修改。第二是任務(wù)可擴(kuò)容,因?yàn)槲覀儼阉羞壿嫴鸱值梅浅<?xì)粒度,當(dāng)一些地方出現(xiàn)了如流量問題,不會(huì)影響后面的部分,所以它擴(kuò)容比較簡(jiǎn)單,除此之外還有服務(wù)化后置和狀態(tài)可控。其次是時(shí)效性,我們做到毫秒延遲,并且維度豐富,整體上有 20+ 的維度做多維聚合。最后是準(zhǔn)確性,我們支持?jǐn)?shù)據(jù)驗(yàn)證、實(shí)時(shí)監(jiān)控、模型出口統(tǒng)一等。此時(shí)我們遇到了另外一個(gè)問題 - 亂序。對(duì)于上方三個(gè)不同的作業(yè),每一個(gè)作業(yè)重啟至少會(huì)有兩分鐘左右的延遲,延遲會(huì)導(dǎo)致下游的數(shù)據(jù)源 Union 到一起就會(huì)有亂序。
2.3 延遲計(jì)算方案
遇到上面這種有亂序的情況下,我們要怎么處理?

我們總共有三種處理方案:
第一種解決方案是用 “did + 維度 + 分鐘” 進(jìn)行去重,Value 設(shè)為 “是否來過”。比如同一個(gè) did,04:01 來了一條,它會(huì)進(jìn)行結(jié)果輸出。同樣的,04:02 和 04:04 也會(huì)進(jìn)行結(jié)果輸出。但如果 04:01 再來,它就會(huì)丟棄,但如果 04:00 來,依舊會(huì)進(jìn)行結(jié)果輸出。
這個(gè)解決方案存在一些問題,因?yàn)槲覀儼捶昼姶妫?20 分鐘的狀態(tài)大小是存 10 分鐘的兩倍,到后面這個(gè)狀態(tài)大小有點(diǎn)不太可控,因此我們又換了解決方案 2。
第二種解決方案,我們的做法會(huì)涉及到一個(gè)假設(shè)前提,就是假設(shè)不存在數(shù)據(jù)源亂序的情況。在這種情況下,key 存的是 “did + 維度”,Value 為 “時(shí)間戳”,它的更新方式如上圖所示。04:01 來了一條數(shù)據(jù),進(jìn)行結(jié)果輸出。04:02 來了一條數(shù)據(jù),如果是同一個(gè) did,那么它會(huì)更新時(shí)間戳,然后仍然做結(jié)果輸出。04:04 也是同樣的邏輯,然后將時(shí)間戳更新到 04:04,如果后面來了一條 04:01 的數(shù)據(jù),它發(fā)現(xiàn)時(shí)間戳已經(jīng)更新到 04:04,它會(huì)丟棄這條數(shù)據(jù)。這樣的做法大幅度減少了本身所需要的一些狀態(tài),但是對(duì)亂序是零容忍,不允許發(fā)生任何亂序的情況,由于我們不好解決這個(gè)問題,因此我們又想出了解決方案 3。
方案 3 是在方案 2 時(shí)間戳的基礎(chǔ)之上,加了一個(gè)類似于環(huán)形緩沖區(qū),在緩沖區(qū)之內(nèi)允許亂序。
比如 04:01 來了一條數(shù)據(jù),進(jìn)行結(jié)果輸出;04:02 來了一條數(shù)據(jù),它會(huì)把時(shí)間戳更新到 04:02,并且會(huì)記錄同一個(gè)設(shè)備在 04:01 也來過。如果 04:04 再來了一條數(shù)據(jù),就按照相應(yīng)的時(shí)間差做一個(gè)位移,最后通過這樣的邏輯去保障它能夠容忍一定的亂序。
綜合來看這三個(gè)方案:
方案 1 在容忍 16 分鐘亂序的情況下,單作業(yè)的狀態(tài)大小在 480G 左右。這種情況雖然保證了準(zhǔn)確性,但是作業(yè)的恢復(fù)和穩(wěn)定性是完全不可控的狀態(tài),因此我們還是放棄了這個(gè)方案;
方案 2 是 30G 左右的狀態(tài)大小,對(duì)于亂序 0 容忍,但是數(shù)據(jù)不準(zhǔn)確,由于我們對(duì)準(zhǔn)確性的要求非常高,因此也放棄了這個(gè)方案;
方案 3 的狀態(tài)跟方案 1 相比,它的狀態(tài)雖然變化了但是增加的不多,而且整體能達(dá)到跟方案 1 同樣的效果。方案 3 容忍亂序的時(shí)間是 16 分鐘,我們正常更新一個(gè)作業(yè)的話,10 分鐘完全足夠重啟,因此最終選擇了方案 3。
3. 運(yùn)營(yíng)場(chǎng)景
3.1 背景介紹

運(yùn)營(yíng)場(chǎng)景可分為四個(gè)部分:
第一個(gè)是數(shù)據(jù)大屏支持,包括單直播間的分析數(shù)據(jù)和大盤的分析數(shù)據(jù),需要做到分鐘級(jí)延遲,更新要求比較高;
第二個(gè)是直播看板支持,直播看板的數(shù)據(jù)會(huì)有特定維度的分析,特定人群支持,對(duì)維度豐富性要求比較高;
第三個(gè)是數(shù)據(jù)策略榜單,這個(gè)榜單主要是預(yù)測(cè)熱門作品、爆款,要求的是小時(shí)級(jí)別的數(shù)據(jù),更新要求比較低;
第四個(gè)是 C 端實(shí)時(shí)指標(biāo)展示,查詢量比較大,但是查詢模式比較固定。
下面進(jìn)行分析這 4 種不同的狀態(tài)產(chǎn)生的一些不同的場(chǎng)景。

前 3 種基本沒有什么差別,只是在查詢模式上,有的是特定業(yè)務(wù)場(chǎng)景,有的是通用業(yè)務(wù)場(chǎng)景。
針對(duì)第 3 種和第 4 種,它對(duì)于更新的要求比較低,對(duì)于吞吐的要求比較高,過程之中的曲線也不要求有一致性。第 4 種查詢模式更多的是單實(shí)體的一些查詢,比如去查詢內(nèi)容,會(huì)有哪些指標(biāo),而且對(duì) QPS 要求比較高。
3.2 技術(shù)方案
針對(duì)上方 4 種不同的場(chǎng)景,我們是如何去做的?

首先看一下基礎(chǔ)明細(xì)層 (圖中左方),數(shù)據(jù)源有兩條鏈路,其中一條鏈路是消費(fèi)的流,比如直播的消費(fèi)信息,還有觀看 / 點(diǎn)贊 / 評(píng)論。經(jīng)過一輪基礎(chǔ)清洗,然后做維度管理。上游的這些維度信息來源于 Kafka,Kafka 寫入了一些內(nèi)容的維度,放到了 KV 存儲(chǔ)里邊,包括一些用戶的維度。
這些維度關(guān)聯(lián)了之后,最終寫入 Kafka 的 DWD 事實(shí)層,這里為了做性能的提升,我們做了二級(jí)緩存的操作。
如圖中上方,我們讀取 DWD 層的數(shù)據(jù)然后做基礎(chǔ)匯總,核心是窗口維度聚合生成 4 種不同粒度的數(shù)據(jù),分別是大盤多維匯總 topic、直播間多維匯總 topic、作者多維匯總 topic、用戶多維匯總 topic,這些都是通用維度的數(shù)據(jù)。
如圖中下方,基于這些通用維度數(shù)據(jù),我們?cè)偃ゼ庸€(gè)性化維度的數(shù)據(jù),也就是 ADS 層。拿到了這些數(shù)據(jù)之后會(huì)有維度擴(kuò)展,包括內(nèi)容擴(kuò)展和運(yùn)營(yíng)維度的拓展,然后再去做聚合,比如會(huì)有電商實(shí)時(shí) topic,機(jī)構(gòu)服務(wù)實(shí)時(shí) topic 和大 V 直播實(shí)時(shí) topic。
分成這樣的兩個(gè)鏈路會(huì)有一個(gè)好處:一個(gè)地方處理的是通用維度,另一個(gè)地方處理的是個(gè)性化的維度。通用維度保障的要求會(huì)比較高一些,個(gè)性化維度則會(huì)做很多個(gè)性化的邏輯。如果這兩個(gè)耦合在一起的話,會(huì)發(fā)現(xiàn)任務(wù)經(jīng)常出問題,并且分不清楚哪個(gè)任務(wù)的職責(zé)是什么,構(gòu)建不出這樣的一個(gè)穩(wěn)定層。
如圖中右方,最終我們用到了三種不同的引擎。簡(jiǎn)單來說就是 Redis 查詢用到了 C 端的場(chǎng)景,OLAP 查詢用到了大屏、業(yè)務(wù)看板的場(chǎng)景。
3. 騰訊看點(diǎn)實(shí)時(shí)數(shù)倉(cāng)案例
騰訊看點(diǎn)業(yè)務(wù)為什么要構(gòu)建實(shí)時(shí)數(shù)倉(cāng),因?yàn)樵嫉纳蠄?bào)數(shù)據(jù)量非常大,一天上報(bào)峰值就有上萬(wàn)億條。而且上報(bào)格式混亂。缺乏內(nèi)容維度信息、用戶畫像信息,下游沒辦法直接使用。而我們提供的實(shí)時(shí)數(shù)倉(cāng),是根據(jù)騰訊看點(diǎn)信息流的業(yè)務(wù)場(chǎng)景,進(jìn)行了內(nèi)容維度的關(guān)聯(lián),用戶畫像的關(guān)聯(lián),各種粒度的聚合,下游可以非常方便的使用實(shí)時(shí)數(shù)據(jù)。
1) 方案選型

那就看下我們多維實(shí)時(shí)數(shù)據(jù)分析系統(tǒng)的方案選型,選型我們對(duì)比了行業(yè)內(nèi)的領(lǐng)先方案,選擇了最符合我們業(yè)務(wù)場(chǎng)景的方案。
第一塊是實(shí)時(shí)數(shù)倉(cāng)的選型,我們選擇的是業(yè)界比較成熟的 Lambda 架構(gòu),他的優(yōu)點(diǎn)是靈活性高、容錯(cuò)性高、成熟度高和遷移成本低;缺點(diǎn)是實(shí)時(shí)、離線數(shù)據(jù)用兩套代碼,可能會(huì)存在一個(gè)口徑修改了,另一個(gè)沒改的問題,我們每天都有做數(shù)據(jù)對(duì)賬的工作,如果有異常會(huì)進(jìn)行告警。
第二塊是實(shí)時(shí)計(jì)算引擎選型,因?yàn)?Flink 設(shè)計(jì)之初就是為了流處理,SparkStreaming 嚴(yán)格來說還是微批處理,Strom 用的已經(jīng)不多了。再看 Flink 具有 Exactly-once 的準(zhǔn)確性、輕量級(jí) Checkpoint 容錯(cuò)機(jī)制、低延時(shí)高吞吐和易用性高的特點(diǎn),我們選擇了 Flink 作為實(shí)時(shí)計(jì)算引擎。
第三塊是實(shí)時(shí)存儲(chǔ)引擎,我們的要求就是需要有維度索引、支持高并發(fā)、預(yù)聚合、高性能實(shí)時(shí)多維 OLAP 查詢。可以看到,Hbase、Tdsql 和 ES 都不能滿足要求,Druid 有一個(gè)缺陷,它是按照時(shí)序劃分 Segment,無法將同一個(gè)內(nèi)容,存放在同一個(gè) Segment 上,計(jì)算全局 TopN 只能是近似值,所以我們選擇了最近兩年大火的 MPP 數(shù)據(jù)庫(kù)引擎 ClickHouse。
2) 設(shè)計(jì)目標(biāo)與設(shè)計(jì)難點(diǎn)

我們多維實(shí)時(shí)數(shù)據(jù)分析系統(tǒng)分為三大模塊
實(shí)時(shí)計(jì)算引擎 實(shí)時(shí)存儲(chǔ)引擎 App 層
難點(diǎn)主要在前兩個(gè)模塊:實(shí)時(shí)計(jì)算引擎和實(shí)時(shí)存儲(chǔ)引擎。
千萬(wàn)級(jí)/s 的海量數(shù)據(jù)如何實(shí)時(shí)接入,并且進(jìn)行極低延遲維表關(guān)聯(lián)。 實(shí)時(shí)存儲(chǔ)引擎如何支持高并發(fā)寫入、高可用分布式和高性能索引查詢,是比較難的。
這幾個(gè)模塊的具體實(shí)現(xiàn),看一下我們系統(tǒng)的架構(gòu)設(shè)計(jì)。
3) 架構(gòu)設(shè)計(jì)

前端采用的是開源組件 Ant Design,利用了 Nginx 服務(wù)器,部署靜態(tài)頁(yè)面,并反向代理了瀏覽器的請(qǐng)求到后臺(tái)服務(wù)器上。
后臺(tái)服務(wù)是基于騰訊自研的 RPC 后臺(tái)服務(wù)框架寫的,并且會(huì)進(jìn)行一些二級(jí)緩存。
實(shí)時(shí)數(shù)倉(cāng)部分,分為了接入層、實(shí)時(shí)計(jì)算層和實(shí)時(shí)數(shù)倉(cāng)存儲(chǔ)層。
接入層主要是從千萬(wàn)級(jí)/s 的原始消息隊(duì)列中,拆分出不同行為數(shù)據(jù)的微隊(duì)列,拿看點(diǎn)的視頻來說,拆分過后,數(shù)據(jù)就只有百萬(wàn)級(jí)/s 了;
實(shí)時(shí)計(jì)算層主要負(fù)責(zé),多行行為流水?dāng)?shù)據(jù)進(jìn)行行轉(zhuǎn)列,實(shí)時(shí)關(guān)聯(lián)用戶畫像數(shù)據(jù)和內(nèi)容維度數(shù)據(jù);
實(shí)時(shí)數(shù)倉(cāng)存儲(chǔ)層主要是設(shè)計(jì)出符合看點(diǎn)業(yè)務(wù)的,下游好用的實(shí)時(shí)消息隊(duì)列。我們暫時(shí)提供了兩個(gè)消息隊(duì)列,作為實(shí)時(shí)數(shù)倉(cāng)的兩層。一層 DWM 層是內(nèi)容 ID-用戶 ID 粒度聚合的,就是一條數(shù)據(jù)包含內(nèi)容 ID-用戶 ID 還有 B 側(cè)內(nèi)容數(shù)據(jù)、C 側(cè)用戶數(shù)據(jù)和用戶畫像數(shù)據(jù);另一層是 DWS 層,是內(nèi)容 ID 粒度聚合的,一條數(shù)據(jù)包含內(nèi)容 ID,B 側(cè)數(shù)據(jù)和 C 側(cè)數(shù)據(jù)。可以看到內(nèi)容 ID-用戶 ID 粒度的消息隊(duì)列流量進(jìn)一步減小到十萬(wàn)級(jí)/s,內(nèi)容 ID 粒度的更是萬(wàn)級(jí)/s,并且格式更加清晰,維度信息更加豐富。
實(shí)時(shí)存儲(chǔ)部分分為實(shí)時(shí)寫入層、OLAP 存儲(chǔ)層和后臺(tái)接口層。
實(shí)時(shí)寫入層主要是負(fù)責(zé) Hash 路由將數(shù)據(jù)寫入; OLAP 存儲(chǔ)層利用 MPP 存儲(chǔ)引擎,設(shè)計(jì)符合業(yè)務(wù)的索引和物化視圖,高效存儲(chǔ)海量數(shù)據(jù); 后臺(tái)接口層提供高效的多維實(shí)時(shí)查詢接口。
4) 實(shí)時(shí)計(jì)算
這個(gè)系統(tǒng)最復(fù)雜的兩塊,實(shí)時(shí)計(jì)算和實(shí)時(shí)存儲(chǔ)。
先介紹實(shí)時(shí)計(jì)算部分:分為實(shí)時(shí)關(guān)聯(lián)和實(shí)時(shí)數(shù)倉(cāng)。
1. 實(shí)時(shí)高性能維表關(guān)聯(lián)

實(shí)時(shí)維表關(guān)聯(lián)這一塊難度在于 百萬(wàn)級(jí)/s 的實(shí)時(shí)數(shù)據(jù)流,如果直接去關(guān)聯(lián) HBase,1 分鐘的數(shù)據(jù),關(guān)聯(lián)完 HBase 耗時(shí)是小時(shí)級(jí)的,會(huì)導(dǎo)致數(shù)據(jù)延遲嚴(yán)重。
我們提出了幾個(gè)解決方案:
第一個(gè)是,在 Flink 實(shí)時(shí)計(jì)算環(huán)節(jié),先按照 1 分鐘進(jìn)行了窗口聚合,將窗口內(nèi)多行行為數(shù)據(jù)轉(zhuǎn)一行多列的數(shù)據(jù)格式,經(jīng)過這一步操作,原本小時(shí)級(jí)的關(guān)聯(lián)耗時(shí)下降到了十幾分鐘,但是還是不夠的。
第二個(gè)是,在訪問 HBase 內(nèi)容之前設(shè)置一層 Redis 緩存,因?yàn)?1000 條數(shù)據(jù)訪問 HBase 是秒級(jí)的,而訪問 Redis 是毫秒級(jí)的,訪問 Redis 的速度基本是訪問 HBase 的 1000 倍。為了防止過期的數(shù)據(jù)浪費(fèi)緩存,緩存過期時(shí)間設(shè)置成 24 小時(shí),同時(shí)通過監(jiān)聽寫 HBase Proxy 來保證緩存的一致性。這樣將訪問時(shí)間從十幾分鐘變成了秒級(jí)。
第三個(gè)是,上報(bào)過程中會(huì)上報(bào)不少非常規(guī)內(nèi)容 ID,這些內(nèi)容 ID 在內(nèi)容 HBase 中是不存儲(chǔ)的,會(huì)造成緩存穿透的問題。所以在實(shí)時(shí)計(jì)算的時(shí)候,我們直接過濾掉這些內(nèi)容 ID,防止緩存穿透,又減少一些時(shí)間。
第四個(gè)是,因?yàn)樵O(shè)置了定時(shí)緩存,會(huì)引入一個(gè)緩存雪崩的問題。為了防止雪崩,我們?cè)趯?shí)時(shí)計(jì)算中,進(jìn)行了削峰填谷的操作,錯(cuò)開設(shè)置緩存的時(shí)間。
可以看到,優(yōu)化前后,數(shù)據(jù)量從百億級(jí)減少到了十億級(jí),耗時(shí)從小時(shí)級(jí)減少到了數(shù)十秒,減少 99%。
2. 下游提供服務(wù)

實(shí)時(shí)數(shù)倉(cāng)的難度在于:它處于比較新的領(lǐng)域,并且各個(gè)公司各個(gè)業(yè)務(wù)差距比較大,怎么能設(shè)計(jì)出方便,好用,符合看點(diǎn)業(yè)務(wù)場(chǎng)景的實(shí)時(shí)數(shù)倉(cāng)是有難度的。
先看一下實(shí)時(shí)數(shù)倉(cāng)做了什么,實(shí)時(shí)數(shù)倉(cāng)對(duì)外就是幾個(gè)消息隊(duì)列,不同的消息隊(duì)列里面存放的就是不同聚合粒度的實(shí)時(shí)數(shù)據(jù),包括內(nèi)容 ID、用戶 ID、C 側(cè)行為數(shù)據(jù)、B 側(cè)內(nèi)容維度數(shù)據(jù)和用戶畫像數(shù)據(jù)等。
我們是怎么搭建實(shí)時(shí)數(shù)倉(cāng)的,就是上面介紹的實(shí)時(shí)計(jì)算引擎的輸出,放到消息隊(duì)列中保存,可以提供給下游多用戶復(fù)用。
我們可以看下,在我們建設(shè)實(shí)時(shí)數(shù)據(jù)倉(cāng)庫(kù)前后,開發(fā)一個(gè)實(shí)時(shí)應(yīng)用的區(qū)別。沒有數(shù)倉(cāng)的時(shí)候,我們需要消費(fèi)千萬(wàn)級(jí)/s 的原始隊(duì)列,進(jìn)行復(fù)雜的數(shù)據(jù)清洗,然后再進(jìn)行用戶畫像關(guān)聯(lián)、內(nèi)容維度關(guān)聯(lián),才能拿到符合要求格式的實(shí)時(shí)數(shù)據(jù),開發(fā)和擴(kuò)展的成本都會(huì)比較高,如果想開發(fā)一個(gè)新的應(yīng)用,又要走一遍這個(gè)流程。有了數(shù)倉(cāng)之后,如果想開發(fā)內(nèi)容 ID 粒度的實(shí)時(shí)應(yīng)用,就直接申請(qǐng) TPS 萬(wàn)級(jí)/s 的 DWS 層的消息隊(duì)列。開發(fā)成本變低很多,資源消耗小很多,可擴(kuò)展性也強(qiáng)很多。
看個(gè)實(shí)際例子,開發(fā)我們系統(tǒng)的實(shí)時(shí)數(shù)據(jù)大屏,原本需要進(jìn)行如上所有操作,才能拿到數(shù)據(jù)。現(xiàn)在只需要消費(fèi) DWS 層消息隊(duì)列,寫一條 Flink SQL 即可,僅消耗 2 個(gè) CPU 核心,1G 內(nèi)存。
可以看到,以 50 個(gè)消費(fèi)者為例,建立實(shí)時(shí)數(shù)倉(cāng)前后,下游開發(fā)一個(gè)實(shí)時(shí)應(yīng)用,可以減少 98%的資源消耗。包括計(jì)算資源,存儲(chǔ)資源,人力成本和開發(fā)人員學(xué)習(xí)接入成本等等。并且消費(fèi)者越多,節(jié)省越多。就拿 Redis 存儲(chǔ)這一部分來說,一個(gè)月就能省下上百萬(wàn)人民幣。
5) 實(shí)時(shí)存儲(chǔ)
介紹完實(shí)時(shí)計(jì)算,再來介紹實(shí)時(shí)存儲(chǔ)。
這塊分為三個(gè)部分來介紹
第一是 分布式-高可用 第二是 海量數(shù)據(jù)-寫入 第三是 高性能-查詢
1. 分布式-高可用

我們這里聽取的是 Clickhouse 官方的建議,借助 ZK 實(shí)現(xiàn)高可用的方案。數(shù)據(jù)寫入一個(gè)分片,僅寫入一個(gè)副本,然后再寫 ZK,通過 ZK 告訴同一個(gè)分片的其他副本,其他副本再過來拉取數(shù)據(jù),保證數(shù)據(jù)一致性。
這里沒有選用消息隊(duì)列進(jìn)行數(shù)據(jù)同步,是因?yàn)?ZK 更加輕量級(jí)。而且寫的時(shí)候,任意寫一個(gè)副本,其它副本都能夠通過 ZK 獲得一致的數(shù)據(jù)。而且就算其它節(jié)點(diǎn)第一次來獲取數(shù)據(jù)失敗了,后面只要發(fā)現(xiàn)它跟 ZK 上記錄的數(shù)據(jù)不一致,就會(huì)再次嘗試獲取數(shù)據(jù),保證一致性。
2. 海量數(shù)據(jù)-寫入

數(shù)據(jù)寫入遇到的第一個(gè)問題是,海量數(shù)據(jù)直接寫入 Clickhouse 的話,會(huì)導(dǎo)致 ZK 的 QPS 太高,解決方案是改用 Batch 方式寫入。Batch 設(shè)置多大呢,Batch 太小的話緩解不了 ZK 的壓力,Batch 也不能太大,不然上游內(nèi)存壓力太大,通過實(shí)驗(yàn),最終我們選用了大小幾十萬(wàn)的 Batch。
第二個(gè)問題是,隨著數(shù)據(jù)量的增長(zhǎng),單 QQ 看點(diǎn)的視頻內(nèi)容每天可能寫入百億級(jí)的數(shù)據(jù),默認(rèn)方案是寫一張分布式表,這就會(huì)造成單臺(tái)機(jī)器出現(xiàn)磁盤的瓶頸,尤其是 Clickhouse 底層運(yùn)用的是 Mergetree,原理類似于 HBase、RocketsDB 的底層 LSM-Tree。在合并的過程中會(huì)存在寫放大的問題,加重磁盤壓力。峰值每分鐘幾千萬(wàn)條數(shù)據(jù),寫完耗時(shí)幾十秒,如果正在做 Merge,就會(huì)阻塞寫入請(qǐng)求,查詢也會(huì)非常慢。我們做的兩個(gè)優(yōu)化方案:一是對(duì)磁盤做 Raid,提升磁盤的 IO;二是在寫入之前進(jìn)行分表,直接分開寫入到不同的分片上,磁盤壓力直接變?yōu)?1/N。
第三個(gè)問題是,雖然我們寫入按照分片進(jìn)行了劃分,但是這里引入了一個(gè)分布式系統(tǒng)常見的問題,就是局部的 Top 并非全局 Top 的問題。比如同一個(gè)內(nèi)容 ID 的數(shù)據(jù)落在了不同的分片上,計(jì)算全局 Top100 閱讀的內(nèi)容 ID,有一個(gè)內(nèi)容 ID 在分片 1 上是 Top100,但是在其它分片上不是 Top100,導(dǎo)致匯總的時(shí)候,會(huì)丟失一部分?jǐn)?shù)據(jù),影響最終結(jié)果。我們做的優(yōu)化是在寫入之前加上一層路由,將同一個(gè)內(nèi)容 ID 的記錄,全部路由到同一個(gè)分片上,解決了該問題。
介紹完寫入,下一步介紹 Clickhouse 的高性能存儲(chǔ)和查詢。
3. 高性能-存儲(chǔ)-查詢
Clickhouse 高性能查詢的一個(gè)關(guān)鍵點(diǎn)是稀疏索引。稀疏索引這個(gè)設(shè)計(jì)就很有講究,設(shè)計(jì)得好可以加速查詢,設(shè)計(jì)不好反而會(huì)影響查詢效率。我根據(jù)我們的業(yè)務(wù)場(chǎng)景,因?yàn)槲覀兊牟樵兇蟛糠侄际菚r(shí)間和內(nèi)容 ID 相關(guān)的,比如說,某個(gè)內(nèi)容,過去 N 分鐘在各個(gè)人群表現(xiàn)如何?我按照日期,分鐘粒度時(shí)間和內(nèi)容 ID 建立了稀疏索引。針對(duì)某個(gè)內(nèi)容的查詢,建立稀疏索引之后,可以減少 99%的文件掃描。
還有一個(gè)問題就是,我們現(xiàn)在數(shù)據(jù)量太大,維度太多。拿 QQ 看點(diǎn)的視頻內(nèi)容來說,一天流水有上百億條,有些維度有幾百個(gè)類別。如果一次性把所有維度進(jìn)行預(yù)聚合,數(shù)據(jù)量會(huì)指數(shù)膨脹,查詢反而變慢,并且會(huì)占用大量?jī)?nèi)存空間。我們的優(yōu)化,針對(duì)不同的維度,建立對(duì)應(yīng)的預(yù)聚合物化視圖,用空間換時(shí)間,這樣可以縮短查詢的時(shí)間。

分布式表查詢還會(huì)有一個(gè)問題,查詢單個(gè)內(nèi)容 ID 的信息,分布式表會(huì)將查詢下發(fā)到所有的分片上,然后再返回查詢結(jié)果進(jìn)行匯總。實(shí)際上,因?yàn)樽鲞^路由,一個(gè)內(nèi)容 ID 只存在于一個(gè)分片上,剩下的分片都在空跑。針對(duì)這類查詢,我們的優(yōu)化是后臺(tái)按照同樣的規(guī)則先進(jìn)行路由,直接查詢目標(biāo)分片,這樣減少了 N-1/N 的負(fù)載,可以大量縮短查詢時(shí)間。而且由于我們是提供的 OLAP 查詢,數(shù)據(jù)滿足最終一致性即可,通過主從副本讀寫分離,可以進(jìn)一步提升性能。
我們?cè)诤笈_(tái)還做了一個(gè) 1 分鐘的數(shù)據(jù)緩存,針對(duì)相同條件查詢,后臺(tái)就直接返回了。
4. 擴(kuò)容
這里再介紹一下我們的擴(kuò)容的方案,調(diào)研了業(yè)內(nèi)的一些常見方案。
比如 HBase,原始數(shù)據(jù)都存放在 HDFS 上,擴(kuò)容只是 Region Server 擴(kuò)容,不涉及原始數(shù)據(jù)的遷移。但是 Clickhouse 的每個(gè)分片數(shù)據(jù)都是在本地,是一個(gè)比較底層存儲(chǔ)引擎,不能像 HBase 那樣方便擴(kuò)容。
Redis 是哈希槽這種類似一致性哈希的方式,是比較經(jīng)典分布式緩存的方案。Redis slot 在 Rehash 的過程中雖然存在短暫的 ask 讀不可用,但是總體來說遷移是比較方便的,從原 h[0]遷移到 h[1],最后再刪除 h[0]。但是 Clickhouse 大部分都是 OLAP 批量查詢,不是點(diǎn)查,而且由于列式存儲(chǔ),不支持刪除的特性,一致性哈希的方案不是很適合。
目前擴(kuò)容的方案是,另外消費(fèi)一份數(shù)據(jù),寫入新 Clickhouse 集群,兩個(gè)集群一起跑一段時(shí)間,因?yàn)閷?shí)時(shí)數(shù)據(jù)就保存 3 天,等 3 天之后,后臺(tái)服務(wù)直接訪問新集群。
4. 有贊實(shí)時(shí)數(shù)倉(cāng)案例
1) 分層設(shè)計(jì)
傳統(tǒng)離線數(shù)倉(cāng)的分層設(shè)計(jì)大家都很熟悉,為了規(guī)范的組織和管理數(shù)據(jù),層級(jí)劃分會(huì)比較多,在一些復(fù)雜邏輯處理場(chǎng)景還會(huì)引入臨時(shí)層落地中間結(jié)果以方便下游加工處理。實(shí)時(shí)數(shù)倉(cāng)考慮到時(shí)效性問題,分層設(shè)計(jì)需要盡量精簡(jiǎn),降低中間流程出錯(cuò)的可能性,不過總體而言,實(shí)時(shí)數(shù)倉(cāng)還是會(huì)參考離線數(shù)倉(cāng)的分層思想來設(shè)計(jì)。
實(shí)時(shí)數(shù)倉(cāng)分層架構(gòu)如下圖所示 :

- ODS(實(shí)時(shí)數(shù)據(jù)接入層)
ODS 層,即實(shí)時(shí)數(shù)據(jù)接入層,通過數(shù)據(jù)采集工具收集各個(gè)業(yè)務(wù)系統(tǒng)的實(shí)時(shí)數(shù)據(jù),對(duì)非結(jié)構(gòu)化的數(shù)據(jù)進(jìn)行結(jié)構(gòu)化處理,保存原始數(shù)據(jù),幾乎不過濾數(shù)據(jù);該層數(shù)據(jù)的主要來源有三個(gè)部分:第一部分是業(yè)務(wù)方創(chuàng)建的 NSQ 消息,第二部分是業(yè)務(wù)數(shù)據(jù)庫(kù)的 Binlog 日志,第三部分是埋點(diǎn)日志和應(yīng)用程序日志,以上三部分的實(shí)時(shí)數(shù)據(jù)最終統(tǒng)一寫入 Kafka 存儲(chǔ)介質(zhì)中。
ODS 層表命名規(guī)范:部門名稱.應(yīng)用名稱.數(shù)倉(cāng)層級(jí)主題域前綴數(shù)據(jù)庫(kù)名/消息名
例如:接入業(yè)務(wù)庫(kù)的 Binlog
實(shí)時(shí)數(shù)倉(cāng)表命名:deptname.appname.ods_subjectname_tablename
例如:接入業(yè)務(wù)方的 NSQ 消息
實(shí)時(shí)數(shù)倉(cāng)表命名:deptname.appname.ods_subjectname_msgname
- DWS(實(shí)時(shí)明細(xì)中間層)
DWS 層,即實(shí)時(shí)明細(xì)中間層,該層以業(yè)務(wù)過程作為建模驅(qū)動(dòng),基于每個(gè)具體的業(yè)務(wù)過程事件來構(gòu)建最細(xì)粒度的明細(xì)層事實(shí)表;比如交易過程,有下單事件、支付事件、發(fā)貨事件等,我們會(huì)基于這些獨(dú)立的事件來進(jìn)行明細(xì)層的構(gòu)建。在這層,事實(shí)明細(xì)數(shù)據(jù)同樣是按照離線數(shù)倉(cāng)的主題域來進(jìn)行劃分,也會(huì)采用維度建模的方式組織數(shù)據(jù),對(duì)于一些重要的維度字段,會(huì)做適當(dāng)冗余。基于有贊實(shí)時(shí)需求的場(chǎng)景,重點(diǎn)建設(shè)交易、營(yíng)銷、客戶、店鋪、商品等主題域的數(shù)據(jù)。該層的數(shù)據(jù)來源于 ODS 層,通過 FlinkSQL 進(jìn)行 ETL 處理,主要工作有規(guī)范命名、數(shù)據(jù)清洗、維度補(bǔ)全、多流關(guān)聯(lián),最終統(tǒng)一寫入 Kafka 存儲(chǔ)介質(zhì)中。
DWS 層表命名規(guī)范:部門名稱.應(yīng)用名稱.數(shù)倉(cāng)層級(jí)_主題域前綴_數(shù)倉(cāng)表命名
例如:實(shí)時(shí)事件 A 的中間層
實(shí)時(shí)數(shù)倉(cāng)表命名:deptname.appname.dws_subjectname_tablename_eventnameA
例如:實(shí)時(shí)事件 B 的中間層
實(shí)時(shí)數(shù)倉(cāng)表命名:deptname.appname.dws_subjectname_tablename_eventnameB
- DIM(實(shí)時(shí)維表層)
DIM 層,即實(shí)時(shí)維表層,用來存放維度數(shù)據(jù),主要用于實(shí)時(shí)明細(xì)中間層寬化處理時(shí)補(bǔ)全維度使用,目前該層的數(shù)據(jù)主要存儲(chǔ)于 HBase 中,后續(xù)會(huì)基于 QPS 和數(shù)據(jù)量大小提供更多合適類型的存儲(chǔ)介質(zhì)。
DIM 層表命名規(guī)范:應(yīng)用名稱_數(shù)倉(cāng)層級(jí)_主題域前綴_數(shù)倉(cāng)表命名
例如:HBase 存儲(chǔ),實(shí)時(shí)維度表
實(shí)時(shí)數(shù)倉(cāng)表命名:appname_dim_tablename
- DWA(實(shí)時(shí)匯總層)
DWA 層,即實(shí)時(shí)匯總層,該層通過 DWS 層數(shù)據(jù)進(jìn)行多維匯總,提供給下游業(yè)務(wù)方使用,在實(shí)際應(yīng)用過程中,不同業(yè)務(wù)方使用維度匯總的方式不太一樣,根據(jù)不同的需求采用不同的技術(shù)方案去實(shí)現(xiàn)。第一種方式,采用 FlinkSQL 進(jìn)行實(shí)時(shí)匯總,將結(jié)果指標(biāo)存入 HBase、MySQL 等數(shù)據(jù)庫(kù),該種方式是我們?cè)缙诓捎玫姆桨福瑑?yōu)點(diǎn)是實(shí)現(xiàn)業(yè)務(wù)邏輯比較靈活,缺點(diǎn)是聚合粒度固化,不易擴(kuò)展;第二種方式,采用實(shí)時(shí) OLAP 工具進(jìn)行匯總,該種方式是我們目前常用的方案,優(yōu)點(diǎn)是聚合粒度易擴(kuò)展,缺點(diǎn)是業(yè)務(wù)邏輯需要在中間層預(yù)處理。
DWA 層表命名規(guī)范:應(yīng)用名稱_數(shù)倉(cāng)層級(jí)_主題域前綴_聚合粒度_數(shù)據(jù)范圍
例如:HBase 存儲(chǔ),某域當(dāng)日某粒度實(shí)時(shí)匯總表
實(shí)時(shí)數(shù)倉(cāng)表命名:appname_dwa_subjectname_aggname_daily
- APP(實(shí)時(shí)應(yīng)用層)
APP 層,即實(shí)時(shí)應(yīng)用層,該層數(shù)據(jù)已經(jīng)寫入應(yīng)用系統(tǒng)的存儲(chǔ)中,例如寫入 Druid 作為 BI 看板的實(shí)時(shí)數(shù)據(jù)集;寫入 HBase、MySQL 用于提供統(tǒng)一數(shù)據(jù)服務(wù)接口;寫入 ClickHouse 用于提供實(shí)時(shí) OLAP 服務(wù)。因?yàn)樵搶臃浅YN近業(yè)務(wù),在命名規(guī)范上實(shí)時(shí)數(shù)倉(cāng)不做統(tǒng)一要求。
2) 實(shí)時(shí) ETL
實(shí)時(shí)數(shù)倉(cāng) ETL 處理過程所涉及的組件比較多,接下來盤點(diǎn)構(gòu)建實(shí)時(shí)數(shù)倉(cāng)所需要的組件以及每個(gè)組件的應(yīng)用場(chǎng)景。如下圖所示:

具體實(shí)時(shí) ETL 處理流程如下圖所示:

1. 維度補(bǔ)全
創(chuàng)建調(diào)用 Duboo 接口的 UDF 函數(shù)在實(shí)時(shí)流里補(bǔ)全維度是最便捷的使用方式,但如果請(qǐng)求量過大,對(duì) Duboo 接口壓力會(huì)過大。在實(shí)際應(yīng)用場(chǎng)景補(bǔ)全維度首選還是關(guān)聯(lián)維度表,但關(guān)聯(lián)也存在一定概率的丟失問題,為了彌補(bǔ)這種丟失,可以采用 Duboo 接口調(diào)用兜底的方式來補(bǔ)全。偽代碼如下:
create function call_dubbo as 'XXXXXXX';
create function get_json_object as 'XXXXXXX';
case
when cast( b.column as bigint) is not null
then cast( b.column as bigint)
else cast(coalesce(cast(get_json_object(call_dubbo('clusterUrl'
,'serviceName'
,'methodName'
,cast(concat('[',cast(a.column as varchar),']') as varchar)
,'key'
)
,'rootId')
as bigint)
,a.column)
as bigint) end
2. 冪等處理
實(shí)時(shí)任務(wù)在運(yùn)行過程中難免會(huì)遇到執(zhí)行異常的情況,當(dāng)任務(wù)異常重啟的時(shí)候會(huì)導(dǎo)致部分消息重新發(fā)送和消費(fèi),從而引發(fā)下游實(shí)時(shí)統(tǒng)計(jì)數(shù)據(jù)不準(zhǔn)確,為了有效避免這種情況,可以選擇對(duì)實(shí)時(shí)消息流做冪等處理,當(dāng)消費(fèi)完一條消息,將這條消息的 Key 存入 KV,如果任務(wù)異常重啟導(dǎo)致消息重新發(fā)送的時(shí)候,先從 KV 判斷該消息是否已被消費(fèi),如果已消費(fèi)就不再往下發(fā)送。偽代碼如下:
create function idempotenc as 'XXXXXXX';
insert into table
select
order_no
from
(
select
a.orderNo as order_no
, idempotenc('XXXXXXX', coalesce( order_no, '') ) as rid
from
table1
) t
where
t.rid = 0;
3. 數(shù)據(jù)驗(yàn)證
由于實(shí)時(shí)數(shù)倉(cāng)的數(shù)據(jù)是無邊界的流,相比于離線數(shù)倉(cāng)固定不變的數(shù)據(jù)更難驗(yàn)收。基于不同的場(chǎng)景,我們提供了 2 種驗(yàn)證方式,分別是:抽樣驗(yàn)證與全量驗(yàn)證。如圖 3.3 所示
抽樣驗(yàn)證方案
該方案主要應(yīng)用在數(shù)據(jù)準(zhǔn)確性驗(yàn)證上,實(shí)時(shí)匯總結(jié)果是基于存儲(chǔ)在 Kafka 的實(shí)時(shí)明細(xì)中間層計(jì)算而來,但 Kafka 本身不支持按照特定條件檢索,不支持寫查詢語(yǔ)句,再加上消息的無邊界性,統(tǒng)計(jì)結(jié)果是在不斷變化的,很難尋找參照物進(jìn)行比對(duì)。鑒于此,我們采用了持久化消息的方法,將消息落盤到 TiDB 存儲(chǔ),基于 TiDB 的能力對(duì)落盤的消息進(jìn)行檢索、查詢、匯總。編寫固定時(shí)間邊界的測(cè)試用例與相同時(shí)間邊界的業(yè)務(wù)庫(kù)數(shù)據(jù)或者離線數(shù)倉(cāng)數(shù)據(jù)進(jìn)行比對(duì)。通過以上方式,抽樣核心店鋪的數(shù)據(jù)進(jìn)行指標(biāo)準(zhǔn)確性驗(yàn)證,確保測(cè)試用例全部通過。
全量驗(yàn)證方案
該方案主要應(yīng)用在數(shù)據(jù)完整性和一致性驗(yàn)證上,在實(shí)時(shí)維度表驗(yàn)證的場(chǎng)景使用最多。大體思路:將存儲(chǔ)實(shí)時(shí)維度表的在線 HBase 集群中的數(shù)據(jù)同步到離線 HBase 集群中,再將離線 HBase 集群中的數(shù)據(jù)導(dǎo)入到 Hive 中,在限定實(shí)時(shí)維度表的時(shí)間邊界后,通過數(shù)據(jù)平臺(tái)提供的數(shù)據(jù)校驗(yàn)功能,比對(duì)實(shí)時(shí)維度表與離線維度表是否存在差異,最終確保兩張表的數(shù)據(jù)完全一致。

4. 數(shù)據(jù)恢復(fù)
實(shí)時(shí)任務(wù)一旦上線就要求持續(xù)不斷的提供準(zhǔn)確、穩(wěn)定的服務(wù)。區(qū)別于離線任務(wù)按天調(diào)度,如果離線任務(wù)出現(xiàn) Bug,會(huì)有充足的時(shí)間去修復(fù)。如果實(shí)時(shí)任務(wù)出現(xiàn) Bug,必須按照提前制定好的流程,嚴(yán)格按照步驟執(zhí)行,否則極易出現(xiàn)問題。造成 Bug 的情況有非常多,比如代碼 Bug、異常數(shù)據(jù) Bug、實(shí)時(shí)集群 Bug,如下圖展示了修復(fù)實(shí)時(shí)任務(wù) Bug 并恢復(fù)數(shù)據(jù)的流程。

5. 騰訊全場(chǎng)景實(shí)時(shí)數(shù)倉(cāng)建設(shè)案例
在數(shù)倉(cāng)體系中會(huì)有各種各樣的大數(shù)據(jù)組件,譬如 Hive/HBase/HDFS/S3,計(jì)算引擎如 MapReduce、Spark、Flink,根據(jù)不同的需求,用戶會(huì)構(gòu)建大數(shù)據(jù)存儲(chǔ)和處理平臺(tái),數(shù)據(jù)在平臺(tái)經(jīng)過處理和分析,結(jié)果數(shù)據(jù)會(huì)保存到 MySQL、Elasticsearch 等支持快速查詢的關(guān)系型、非關(guān)系型數(shù)據(jù)庫(kù)中,接下來應(yīng)用層就可以基于這些數(shù)據(jù)進(jìn)行 BI 報(bào)表開發(fā)、用戶畫像,或基于 Presto 這種 OLAP 工具進(jìn)行交互式查詢等。

1) Lambda 架構(gòu)的痛點(diǎn)
在整個(gè)過程中我們常常會(huì)用一些離線的調(diào)度系統(tǒng),定期的(T+1 或者每隔幾小時(shí))去執(zhí)行一些 Spark 分析任務(wù),做一些數(shù)據(jù)的輸入、輸出或是 ETL 工作。離線數(shù)據(jù)處理的整個(gè)過程中必然存在數(shù)據(jù)延遲的現(xiàn)象,不管是數(shù)據(jù)接入還是中間的分析,數(shù)據(jù)的延遲都是比較大的,可能是小時(shí)級(jí)也有可能是天級(jí)別的。另外一些場(chǎng)景中我們也常常會(huì)為了一些實(shí)時(shí)性的需求去構(gòu)建一個(gè)實(shí)時(shí)處理過程,比如借助 Flink+Kafka 去構(gòu)建實(shí)時(shí)的流處理系統(tǒng)。
整體上,數(shù)倉(cāng)架構(gòu)中有非常多的組件,大大增加了整個(gè)架構(gòu)的復(fù)雜性和運(yùn)維的成本。
如下圖,這是很多公司之前或者現(xiàn)在正在采用的 Lambda 架構(gòu),Lambda 架構(gòu)將數(shù)倉(cāng)分為離線層和實(shí)時(shí)層,相應(yīng)的就有批處理和流處理兩個(gè)相互獨(dú)立的數(shù)據(jù)處理流程,同一份數(shù)據(jù)會(huì)被處理兩次以上,同一套業(yè)務(wù)邏輯代碼需要適配性的開發(fā)兩次。Lambda 架構(gòu)大家應(yīng)該已經(jīng)非常熟悉了,下面我就著重介紹一下我們采用 Lambda 架構(gòu)在數(shù)倉(cāng)建設(shè)過程中遇到的一些痛點(diǎn)問題。

例如在實(shí)時(shí)計(jì)算一些用戶相關(guān)指標(biāo)的實(shí)時(shí)場(chǎng)景下,我們想看到當(dāng)前 pv、uv 時(shí),我們會(huì)將這些數(shù)據(jù)放到實(shí)時(shí)層去做一些計(jì)算,這些指標(biāo)的值就會(huì)實(shí)時(shí)呈現(xiàn)出來,但同時(shí)想了解用戶的一個(gè)增長(zhǎng)趨勢(shì),需要把過去一天的數(shù)據(jù)計(jì)算出來。這樣就需要通過批處理的調(diào)度任務(wù)來實(shí)現(xiàn),比如凌晨?jī)扇c(diǎn)的時(shí)候在調(diào)度系統(tǒng)上起一個(gè) Spark 調(diào)度任務(wù)把當(dāng)天所有的數(shù)據(jù)重新跑一遍。
很顯然在這個(gè)過程中,由于兩個(gè)過程運(yùn)行的時(shí)間是不一樣的,跑的數(shù)據(jù)卻相同,因此可能造成數(shù)據(jù)的不一致。因?yàn)槟骋粭l或幾條數(shù)據(jù)的更新,需要重新跑一遍整個(gè)離線分析的鏈路,數(shù)據(jù)更新成本很大,同時(shí)需要維護(hù)離線和實(shí)時(shí)分析兩套計(jì)算平臺(tái),整個(gè)上下兩層的開發(fā)流程和運(yùn)維成本其實(shí)都是非常高的。
為了解決 Lambda 架構(gòu)帶來的各種問題,就誕生了 Kappa 架構(gòu),這個(gè)架構(gòu)大家應(yīng)該也非常的熟悉。
2) Kappa 架構(gòu)的痛點(diǎn)
我們來講一下 Kappa 架構(gòu),如下圖,它中間其實(shí)用的是消息隊(duì)列,通過用 Flink 將整個(gè)鏈路串聯(lián)起來。Kappa 架構(gòu)解決了 Lambda 架構(gòu)中離線處理層和實(shí)時(shí)處理層之間由于引擎不一樣,導(dǎo)致的運(yùn)維成本和開發(fā)成本高昂的問題,但 Kappa 架構(gòu)也有其痛點(diǎn)。
首先,在構(gòu)建實(shí)時(shí)業(yè)務(wù)場(chǎng)景時(shí),會(huì)用到 Kappa 去構(gòu)建一個(gè)近實(shí)時(shí)的場(chǎng)景,但如果想對(duì)數(shù)倉(cāng)中間層例如 ODS 層做一些簡(jiǎn)單的 OLAP 分析或者進(jìn)一步的數(shù)據(jù)處理時(shí),如將數(shù)據(jù)寫到 DWD 層的 Kafka,則需要另外接入 Flink。同時(shí),當(dāng)需要從 DWD 層的 Kafka 把數(shù)據(jù)再導(dǎo)入到 Clickhouse,Elasticsearch,MySQL 或者是 Hive 里面做進(jìn)一步的分析時(shí),顯然就增加了整個(gè)架構(gòu)的復(fù)雜性。
其次,Kappa 架構(gòu)是強(qiáng)烈依賴消息隊(duì)列的,我們知道消息隊(duì)列本身在整個(gè)鏈路上數(shù)據(jù)計(jì)算的準(zhǔn)確性是嚴(yán)格依賴它上游數(shù)據(jù)的順序,消息隊(duì)列接的越多,發(fā)生亂序的可能性就越大。ODS 層數(shù)據(jù)一般是絕對(duì)準(zhǔn)確的,把 ODS 層的數(shù)據(jù)發(fā)送到下一個(gè) kafka 的時(shí)候就有可能發(fā)生亂序,DWD 層再發(fā)到 DWS 的時(shí)候可能又亂序了,這樣數(shù)據(jù)不一致性就會(huì)變得很嚴(yán)重。
第三,Kafka 由于它是一個(gè)順序存儲(chǔ)的系統(tǒng),順序存儲(chǔ)系統(tǒng)是沒有辦法直接在其上面利用 OLAP 分析的一些優(yōu)化策略,例如謂詞下推這類的優(yōu)化策略,在順序存儲(chǔ)的 Kafka 上來實(shí)現(xiàn)是比較困難的事情。
那么有沒有這樣一個(gè)架構(gòu),既能夠滿足實(shí)時(shí)性的需求,又能夠滿足離線計(jì)算的要求,而且還能夠減輕運(yùn)維開發(fā)的成本,解決通過消息隊(duì)列構(gòu)建 Kappa 架構(gòu)過程中遇到的一些痛點(diǎn)?答案是肯定的,后面的篇幅會(huì)詳細(xì)論述。

3) 痛點(diǎn)總結(jié)

4) Flink+Iceberg 構(gòu)建實(shí)時(shí)數(shù)倉(cāng)
1. 近實(shí)時(shí)的數(shù)據(jù)接入
前面介紹了 Iceberg 既支持讀寫分離,又支持并發(fā)讀、增量讀、小文件合并,還可以支持秒級(jí)到分鐘級(jí)的延遲,基于這些優(yōu)勢(shì)我們嘗試采用 Iceberg 這些功能來構(gòu)建基于 Flink 的實(shí)時(shí)全鏈路批流一體化的實(shí)時(shí)數(shù)倉(cāng)架構(gòu)。
如下圖所示,Iceberg 每次的 commit 操作,都是對(duì)數(shù)據(jù)的可見性的改變,比如說讓數(shù)據(jù)從不可見變成可見,在這個(gè)過程中,就可以實(shí)現(xiàn)近實(shí)時(shí)的數(shù)據(jù)記錄。

2. 實(shí)時(shí)數(shù)倉(cāng) - 數(shù)據(jù)湖分析系統(tǒng)
此前需要先進(jìn)行數(shù)據(jù)接入,比如用 Spark 的離線調(diào)度任務(wù)去跑一些數(shù)據(jù),拉取,抽取最后再寫入到 Hive 表里面,這個(gè)過程的延時(shí)比較大。有了 Iceberg 的表結(jié)構(gòu),可以中間使用 Flink,或者 spark streaming,完成近實(shí)時(shí)的數(shù)據(jù)接入。
基于以上功能,我們?cè)賮砘仡櫼幌虑懊嬗懻摰?Kappa 架構(gòu),Kappa 架構(gòu)的痛點(diǎn)上面已經(jīng)描述過,Iceberg 既然能夠作為一個(gè)優(yōu)秀的表格式,既支持 Streaming reader,又可以支持 Streaming sink,是否可以考慮將 Kafka 替換成 Iceberg?
Iceberg 底層依賴的存儲(chǔ)是像 HDFS 或 S3 這樣的廉價(jià)存儲(chǔ),而且 Iceberg 是支持 parquet、orc、Avro 這樣的列式存儲(chǔ)。有列式存儲(chǔ)的支持,就可以對(duì) OLAP 分析進(jìn)行基本的優(yōu)化,在中間層直接進(jìn)行計(jì)算。例如謂詞下推最基本的 OLAP 優(yōu)化策略,基于 Iceberg snapshot 的 Streaming reader 功能,可以把離線任務(wù)天級(jí)別到小時(shí)級(jí)別的延遲大大的降低,改造成一個(gè)近實(shí)時(shí)的數(shù)據(jù)湖分析系統(tǒng)。

在中間處理層,可以用 presto 進(jìn)行一些簡(jiǎn)單的查詢,因?yàn)?Iceberg 支持 Streaming read,所以在系統(tǒng)的中間層也可以直接接入 Flink,直接在中間層用 Flink 做一些批處理或者流式計(jì)算的任務(wù),把中間結(jié)果做進(jìn)一步計(jì)算后輸出到下游。
替換 Kafka 的優(yōu)劣勢(shì):
總的來說,Iceberg 替換 Kafka 的優(yōu)勢(shì)主要包括:
實(shí)現(xiàn)存儲(chǔ)層的流批統(tǒng)一
中間層支持 OLAP 分析
完美支持高效回溯
存儲(chǔ)成本降低
當(dāng)然,也存在一定的缺陷,如:
數(shù)據(jù)延遲從實(shí)時(shí)變成近實(shí)時(shí)
對(duì)接其他數(shù)據(jù)系統(tǒng)需要額外開發(fā)工作

秒級(jí)分析 - 數(shù)據(jù)湖加速:
由于 Iceberg 本身是將數(shù)據(jù)文件全部存儲(chǔ)在 HDFS 上的,HDFS 讀寫這塊對(duì)于秒級(jí)分析的場(chǎng)景,還是不能夠完全滿足我們的需求,所以接下去我們會(huì)在 Iceberg 底層支持 Alluxio 這樣一個(gè)緩存,借助于緩存的能力可以實(shí)現(xiàn)數(shù)據(jù)湖的加速。這塊的架構(gòu)也在我們未來的一個(gè)規(guī)劃和建設(shè)中。

推薦閱讀:
世界的真實(shí)格局分析,地球人類社會(huì)底層運(yùn)行原理
不是你需要中臺(tái),而是一名合格的架構(gòu)師(附各大廠中臺(tái)建設(shè)PPT)
企業(yè)IT技術(shù)架構(gòu)規(guī)劃方案
論數(shù)字化轉(zhuǎn)型——轉(zhuǎn)什么,如何轉(zhuǎn)?
企業(yè)10大管理流程圖,數(shù)字化轉(zhuǎn)型從業(yè)者必備!
【中臺(tái)實(shí)踐】華為大數(shù)據(jù)中臺(tái)架構(gòu)分享.pdf
華為如何實(shí)施數(shù)字化轉(zhuǎn)型(附PPT)
