點擊上方 "肉眼品世界"關注, 深度價值體系傳遞

摘要:網(wǎng)易游戲資深開發(fā)工程師林小鉑為大家?guī)砭W(wǎng)易游戲基于 Flink 的流式 ETL 建設的介紹。內容包括:業(yè)務背景
專用 ETL
EntryX 通用 ETL
調優(yōu)實踐
未來規(guī)劃
網(wǎng)易游戲的基礎數(shù)據(jù)主要日志方式采集,這些日志通常是非結構化或半結構化數(shù)據(jù),需要經過數(shù)據(jù)集成 ETL 才可以入庫至實時或離線的數(shù)據(jù)倉庫。此后,業(yè)務用戶才可以方便地用 SQL 完成大部分數(shù)據(jù)計算,包括實時的 Flink SQL 和離線的 Hive 或 Spark。
網(wǎng)易游戲數(shù)據(jù)集成的數(shù)據(jù)流與大多數(shù)公司大同小異,主要有游戲客戶端日志、游戲服務端日志和其他周邊基礎的日志,比如 Nginx access log、數(shù)據(jù)庫日志等等。這些日志會被采集到統(tǒng)一的 Kafka 數(shù)據(jù)管道,然后經由 ETL 入庫服務寫入到 Hive 離線數(shù)據(jù)倉庫或者 Kafka 實時數(shù)據(jù)倉庫。這是很常見的架構,但在我們在需求方面是有一些比較特殊的情況。
網(wǎng)易游戲流式 ETL 需求特點

首先,不同于互聯(lián)網(wǎng)、金融等行業(yè)基本常用 MySQL、Postgres 等的關系型數(shù)據(jù)庫,游戲行業(yè)常常使用 MongoDB 這類 schema-free 的文檔型數(shù)據(jù)庫。這給我們 ETL 服務帶來的問題是并沒有一個線上業(yè)務的準確的 schema 可以依賴,在實際數(shù)據(jù)處理中,多字段或少字段,甚至一個字段因為玩法迭代變更為完全不同的格式,這樣的情況都是可能發(fā)生的。這樣的數(shù)據(jù)異構問題給我們 ETL 的數(shù)據(jù)清洗帶來了比較高的成本。其次,也是由于數(shù)據(jù)庫選型的原因,大部分業(yè)務的數(shù)據(jù)庫模式都遵循了反范式設計,會刻意以復雜內嵌的字段來避免表間的 join。這種情況給我們帶來的一個好處是,在數(shù)據(jù)集成階段我們不需要去實時地去 join 多個數(shù)據(jù)流,壞處則是數(shù)據(jù)結構可能會非常復雜,多層嵌套十分常見。然后,由于近年來實時數(shù)倉的流行,我們也同樣在逐步建設實時數(shù)據(jù)倉庫,所以復用現(xiàn)有的 ETL 管道,提取轉換一次,加載到實時離線兩個數(shù)據(jù)倉庫,成為一個很自然的發(fā)展方向。最后,我們的日志類型多且變更頻繁,比如一個玩法復雜的游戲,可能有 1,000 個以上的日志類型,每兩周可能就會有一次發(fā)版。在這樣的背景下 ETL 出現(xiàn)異常數(shù)據(jù)是不可避免的。因此我們需要提供完善的異常處理,讓業(yè)務可以及時得知數(shù)據(jù)異常和通過流程修復數(shù)據(jù)。
日志分類及特點

為了更好地針對不同業(yè)務使用模式優(yōu)化,我們對不同日志類型的業(yè)務提供了不同的服務。我們的日志通常分為三個類型:運營日志、業(yè)務日志和程序日志。運營日志記錄的是玩家行為事件,比如登錄帳號、領取禮包等。這類日志是最為重要日志,有固定的格式,也就是特定 header + json 的文本格式。數(shù)據(jù)的主要用途是做數(shù)據(jù)報表、數(shù)據(jù)分析還有游戲內的推薦,比如玩家的組隊匹配推薦。業(yè)務日志記錄的是玩家行為以外的業(yè)務事件,這個就比較廣泛,比如 Nginx access log、CDN 下載日志等等,這些完全沒有固定格式,可能是二進制也可能是文本。主要用途類似于運營日志,但更加豐富和定制化。程序日志記錄是程序的運行情況,也就是平時我們通過日志框架打的 INFO、ERROR 這類日志。程序日志主要用途是檢索定位運行問題,通常是寫入 ES,但有時數(shù)量過大或者需要提取指標分析時,也會寫入數(shù)據(jù)倉庫。
網(wǎng)易游戲 ETL 服務剖析

針對這些日志分類,我們具體提供了三類 ETL 入庫的服務。首先是運營日志專用的 ETL,這會根據(jù)運營日志的模式進行定制化。然后是通用的面向文本日志的 EntryX ETL 服務,它會服務于運營日志以外的所有日志。最后是 EntryX 無法支持的特殊 ETL 需求,比如有加密或者需要進行特殊轉換的數(shù)據(jù),這種情況下我們就會針對性地開發(fā) ad-hoc 作業(yè)來處理。
運營日志 ETL 發(fā)展歷程

運營日志 ETL 服務有著一個比較久的歷史。大概在 2013 年,網(wǎng)易游戲就建立了基于 Hadoop Streaming + Python 預處理/后處理的第一版離線 ETL 框架。這套框架是平穩(wěn)運行了多年。在 2017 年的時候,隨著 Spark Streaming 的嶄露頭角,我們開發(fā)了基于 Spark Streaming 的第二個版本,相當于一個 POC,但因為微批調優(yōu)困難且小文件多等問題沒有上線應用。時間來到 2018 年,當時 Flink 已經比較成熟,我們也決定將業(yè)務遷移到 Flink 上,所以我們很自然地開發(fā)了基于 Flink DataStream 的第三版運營日志 ETL 服務。這里面比較特殊的一點就是,因為長久以來我們業(yè)務方積累了很多 Python 的 ETL 腳本,然后新版最重要的一點就是要支持這些 Python UDF 的無縫遷移。
運營日志 ETL 架構

在早期 Hadoop Streaming 的版本里面,數(shù)據(jù)首先會被 dump 到 HDFS 上,然后 Hadoop Streaming 啟動 Mapper 來讀取數(shù)據(jù)并通過標準輸入的方式傳遞給 Python 腳本。Python 腳本里面會分為三個模塊:首先預處理 UDF,這里通常會進行基于字符串的替換,一般用作規(guī)范化數(shù)據(jù),比如有些海外合作廠商的時間格式可能跟我們不同,那么就可以在這里進行統(tǒng)一。預處理完的數(shù)據(jù)會進入通用的解析/轉換模塊,這里我們會根據(jù)運營日志的格式來解析數(shù)據(jù),并進行通用轉換,比如濾掉測試服數(shù)據(jù)。通用模塊之后,最后還有一個后處理模塊進行針對字段的轉換,比如常見的匯率轉換。之后數(shù)據(jù)會通過標準輸出返回給 Mapper,然后 Mapper 再將數(shù)據(jù)批量寫到 Hive 目錄中。我們用 Flink 重構后,數(shù)據(jù)源就由 HDFS 改為直接對接 Kafka,而 IO 模塊則用 Flink 的 Source/Sink Operator 來代替原本的 Mapper,然后中間通用模塊可以直接重寫為 Java,剩余的預處理和后處理則是我們需要支持 Python UDF 的地方。
Python UDF 實現(xiàn)

在具體實現(xiàn)上,我們在 Flink ProcessFunction 之上加入了 Runner 層,Runner 層負責跨語言的執(zhí)行。技術選型上是選了 Jython,而沒有選擇 Py4j,主要因為 Jython 可以直接在 JVM 里面去完成計算,不需要額外啟動 Python 進程,這樣開發(fā)和運維管理成本都比較低。而 Jython 帶來的限制,比如不支持 pandas 等基于 c 的庫,這些對于我們的 Python UDF 來說都是可接受的。整個調用鏈是,ProcessFunction 在 TaskManager 被調用時會在 open 函數(shù)延遲初始化 Runner,這是因為 Jython 是不可序列化的。Runner 初始化時會負責資源準備,包括將依賴的模塊加入 PYTHONPATH,然后根據(jù)配置反射調用 UDF 函數(shù)。調用時,對于預處理 UDF Runner 會把字符串轉化為 Jython 的 PyUnicode 類型,而對于后處理 UDF 則會把解析后的 Map 對象轉為 Jython 的 PyDcitionary,分別作為兩者的輸入。UDF 可以調用其他模塊進行計算,最終返回 PyObject,然后 Runner 再將其轉換成 Java String 或者 Map,返回給 ProcessFunction 輸出。
運營日志 ETL 運行時

剛剛是 UDF 模塊的局部視圖,我們再來看下整體的 ETL 作業(yè)視圖。首先在我們提供了通用的 Flink jar,當我們生成并提交 ETL 作業(yè)到作業(yè)平臺時,調度器會執(zhí)行通用的 main 函數(shù)構建 Flink JobGraph。這時會從我們的配置中心,也就是 ConfigServer,拉取 ETL 配置。ETL 配置中包含使用到的 Python 模塊,后端服務會掃描其中引用到的其他模塊,把它們統(tǒng)一作為資源文件通過 YARN 分發(fā)功能上傳到 HDFS 上。在 Flink JobManager 和 TaskManager 啟動時,這些 Python 資源會被 YARN 自動同步到工作目錄上備用。這就是整個作業(yè)初始化的過程。然后因為 ETL 規(guī)則的小變更是很頻繁的,比如新增一個字段或者變更一下過濾條件,如果我們每次變更都需要重啟作業(yè),那么作業(yè)重啟帶來的不可用時間會對我們的下游用戶造成比較糟糕的體驗。因此,我們對變更進行了分類,對于一些不影響 Flink JobGraph 的輕量級變更支持熱更新。實現(xiàn)的方式是每個 TaskManager 啟動一個熱更新線程,定時輪詢配置中心同步配置。
接下來介紹我們的通用 ETL 服務 EntryX。這里的通用可以分為兩層意義,首先是數(shù)據(jù)格式上的通用,支持非結構化到結構化的各種文本數(shù)據(jù),其次是用戶群體的通用,目標用戶覆蓋數(shù)據(jù)分析、數(shù)據(jù)開發(fā)等傳統(tǒng)用戶,和業(yè)務程序、策劃這些數(shù)據(jù)背景較弱的用戶。
EntryX 基本概念

先介紹 EntryX 的三個基本概念,Source、StreamingTable 和 Sink。用戶需要分別配置這個三個模塊,系統(tǒng)會根據(jù)這些自動生成 ETL 作業(yè)。
Source 是 ETL 作業(yè)的輸入源,通常是從業(yè)務端采集而來的原始日志 topic,或者是經過分發(fā)過濾后的 topic。這些 topic 可能只包含一種日志,但更多情況下會包含多種異構日志。接下來 StreamingTable,一個比較通俗的名稱就是流表。流表定義了 ETL 管道的主要元數(shù)據(jù),包括如何轉換數(shù)據(jù),還有根據(jù)轉換好的數(shù)據(jù)定義的流表 schema,將數(shù)據(jù) schema 化。流表 schema 是最為關鍵的概念,它相當于 Table DDL,主要包括字段名、字段數(shù)據(jù)類型、字段約束和表屬性等。為了更方便對接上下游,流表 schema 使用的是自研的 SQL-Like 的類型系統(tǒng),里面會支持我們一些拓展的數(shù)據(jù)類型,比如 JSON 類型。最后 Sink 負責流表到目標存儲的物理表的映射,比如映射到目標 Hive 表。這里主要需要 schema 的映射關系,比如流表哪個字段映射到目標表哪個字段,流表哪個字段用作目標 Hive 表分區(qū)字段。在底層,系統(tǒng)會自動根據(jù) schema 映射關系來提取字段,并將數(shù)據(jù)轉換為目標表的存儲格式,加載到目標表。
EntryX ETL 管道

再來看下 EntryX ETL 管道的具體實現(xiàn)。藍色部分是外部存儲系統(tǒng),而綠色部分則是 EnrtyX 的內部模塊。數(shù)據(jù)首先從對接采集的原始數(shù)據(jù) Topic 流入,經過 Source 攝入到 Filter。Filter 負責根據(jù)關鍵詞過濾數(shù)據(jù),通常來說我們要求過濾完的數(shù)據(jù)是有相同 schema 的。經過這兩步數(shù)據(jù)完成 Extract,來到 Transform 階段。Transform 第一步是解析數(shù)據(jù),也就是這里的 Parser。Parser 支持 JSON/Regex/Csv 三種解析,基本可以覆蓋所有案例。第二步是對數(shù)據(jù)進行轉換,這是由 Extender 負責的。Extender 通過內置函數(shù)或 UDF 計算衍生字段,最常見的是將 JSON 對象拉平展開,提取出內嵌字段。最后是 Formatter,F(xiàn)ormatter 會根據(jù)之前用戶定義的字段邏輯類型,將字段的值轉為對應的物理類型。比如一個邏輯類型為 BIGINT 的字段,我們在這里會統(tǒng)一轉為 Java long 的物理類型。數(shù)據(jù)完成 Transform 之后來到最后的 Load 階段。Load 第一步是決定數(shù)據(jù)應該加載到哪個表。Splitter 模塊會根據(jù)每個表的入庫條件(也就是一個表達式)來分流數(shù)據(jù),然后再到第二步的 Loader 來負責將數(shù)據(jù)寫到具體的外部存儲系統(tǒng)。目前我們支持 Hive/Kafka 兩種存儲,Hive 支持 Text/Parquet/JSON 三種格式,而 Kafka 支持 JSON 和 Avro 兩種格式。
實時離線統(tǒng)一 Schema

在 Entryx 的設計里數(shù)據(jù)可以被寫入實時和離線兩個數(shù)據(jù)倉庫,也就是說同一份數(shù)據(jù),但在不同的存儲系統(tǒng)中以不同格式表示。從 Flink SQL 的角度來說是 schema 部分相同,但 connector 和 format 不同的兩個表。而 schema 部分經常會隨業(yè)務變更,而 connector 和 format(也就是存儲系統(tǒng)和存儲格式)是相對穩(wěn)定的。那么一個很自然的想法就是,能不能將 schema 部分提取出來獨立維護?實際上,這個抽象的 schema 已經存在了,就是我們在 ETL 提取的流表 schema。在 EntryX 里面,流表 schema 是與序列化器、存儲系統(tǒng)無關的 schema,作為 Single Source of Truth。基于流表 schema,加上存儲系統(tǒng)信息和存儲格式信息,我們就可以衍生出具體的物理表的 DDL。目前我們主要是支持 Hive/Kafka,如果之后要拓展至支持 ES/HBase 表也是非常方便。
實時數(shù)據(jù)倉庫集成

EntryX 一個重要的定位是作為實時倉庫的統(tǒng)一入口。剛剛其實已經多次提到 Kafka 表,但還沒有說實時數(shù)倉是怎么做的。實時數(shù)倉的常見問題是 Kafka 并沒有原生支持 schema 元數(shù)據(jù)的持久化。目前社區(qū)的主流解決方案是基于 Hive MetaStore 來保存 Kafka 表的元數(shù)據(jù),并復用 HiveCatalog 來直接對接到 Flink SQL。
但這對于我們來說使用 Hive MetaStore 主要有幾個問題:一是在實時作業(yè)里引入 Hive 依賴并與 Hive 耦合,這是很重的依賴,導致定義的表很難被其他組件復用,包括 Flink DataStream 用戶;二是我們已經有 Kafka SaaS 平臺 Avatar 來管理物理 schema,比如 Avro schema,如果再引入 Hive MetaStore 會導致元數(shù)據(jù)的割裂。因此,我們是拓展了 Avatar 平臺的 schema 注冊中心,同時支持邏輯 schema 和物理 schema。那么實時數(shù)倉和 EntryX 的集成關系是:首先我們有 EntryX 的流表 schema,在新建 Sink 的時候調用 Avatar 的 schema 接口,根據(jù)映射關系生成邏輯 schema,而 Avatar 再根據(jù) Flink SQL 類型與物理類型的映射關系生成 topic 的物理 schema。與 Avatar schema 注冊中心配套的還有我們自研的 KafkaCatalog,它負責讀取 topic 的邏輯和物理 schema 來生成 Flink SQL 的 TableSource 或 TableSink。而對于一些 Flink SQL 以外的用戶,比如 Flink DataStream API 的用戶,他們也可以直接讀取物理 schema 來享受到數(shù)據(jù)倉庫的便利。
EntryX 運行時
和運營日志 ETL 類似,在 EntryX 運行時,系統(tǒng)會基于通用的 jar 和配置生成 Flink 作業(yè),但這里有兩種情況需要特別處理。

首先是一個 Kafka topic 往往有幾十甚至上千種日志,那么對應其實有也幾十甚至上千的流表,如果每個流表都單獨運行在一個作業(yè)里,那么一個 topic 會可能會被讀上千遍,這是非常大的浪費。因此,在作業(yè)運行時提供一個優(yōu)化策略,可以將同個 source 的不同流表合并到一個作業(yè)里跑。比如圖中,某個手游上傳了 3 種日志到 Kafka,用戶分別配置了玩家注冊、玩家登錄、領取禮包三個流表,那么我們可以這三個流表合并起來到一個作業(yè),共享同一個 Kafka Source。另外的一個優(yōu)化是,一般情況下我們可以按照之前“提取轉換一次,加載一次”的思路來將數(shù)據(jù)同時寫到 Hive 和 Kafka,但是由于 Hive 或者說 HDFS 畢竟是離線系統(tǒng),實時性比較差,寫入在一些負載比較高的 HDFS 老集群經常會出現(xiàn)反壓,同時阻塞上游,導致 Kafka 的寫入也受到影響。在這種情況下,我們通常要分離加載到實時和離線的 ETL 管道,具體會取決于業(yè)務的 SLA 還有 HDFS 的性能。接下來給大家分享下我們在 ETL 建設中的調優(yōu)實踐經驗。
HDFS 寫入調優(yōu)

首先是 HDFS 寫入的調優(yōu)。流式寫入 HDFS 場景中老生常談的一個問題便是小文件過多。通常來說小文件和實時性是魚與熊掌不可兼得。如果要延遲低,那么我們需要頻繁地滾動文件來提交數(shù)據(jù),必然導致小文件過多。小文件過多主要造成兩個問題:一從 HDFS 集群管理角度看,小文件會占用大量的文件數(shù)和 block 數(shù),浪費 NameNode 內存;二是從用戶角度看,讀寫效率都會降低,因為寫的時候要更頻繁地調用 RPC 和 flush 數(shù)據(jù),造成更多的阻塞,有時甚至造成 checkpoint 超時,而讀時則需要打開更多的文件才能讀完數(shù)據(jù)。
HDFS 寫入調優(yōu) - 數(shù)據(jù)流預分區(qū)
我們在優(yōu)化小文件問題時做的一點調優(yōu)是對數(shù)據(jù)流先做一遍預分區(qū),具體來說,便是在 Flink 作業(yè)內部先基于目標 Hive 表進行一次 keyby 分區(qū),讓同一個表的數(shù)據(jù)盡量集中在少數(shù)的幾個 subtask 上。

舉個例子,假設 Flink 作業(yè)并行度為 n,而目標 Hive 分區(qū)數(shù)為 m 個。因為每個 subtask 都有可能讀到任意分區(qū)的數(shù)據(jù),在默認的各 subtask 完全并行的情況下,每個 subtask 都會寫所有分區(qū),造成總體的寫入文件數(shù)是 n * m。假設 n 是 100,m 是 1000,按 10 分鐘滾一次文件算,每天會造成 14,400,000 個文件,這對于很多老集群來說是非常大的壓力。如果經過數(shù)據(jù)流分區(qū)的優(yōu)化之后,我們就可以限制住 Flink 并行度帶來的增長。比如我們 keyby hive 表字段,并加入范圍為 0-s 整數(shù)的鹽來避免數(shù)據(jù)傾斜,那么分區(qū)最多會被 s 個 subtask 讀寫。假設 s 是 5,比起原先 n 是 100,那么我們就將原本的文件數(shù)降低為原來 20 分之一。
基于 OperatorState 的 SLA 統(tǒng)計

第二個我想分享的是我們的 SLA 統(tǒng)計工具。背景是我們的用戶經常會通過 Web UI 來進行調試和問題的排查,比如不同 subtask 的輸入輸出數(shù)目,但這些 metric 會因為作業(yè)重啟或者 failover 而重置,因此我們開發(fā)了基于 OperatorState 的 SLA-Utils 工具來統(tǒng)計數(shù)據(jù)的輸入和分類輸出。這個工具設計得非常輕量級,可以很容易集成到我們自己的服務或者用戶的作業(yè)里面。
在 SLA-Utils 里面,我們支持了三種 metric。首先是標準的 metric,有 recordsIn/recordsOut/recordsDropped/recordsErrored,分別對應輸入記錄數(shù)/正常輸出記錄數(shù)/被過濾掉的記錄數(shù)/處理異常的記錄數(shù)。通常來說 recordsIn 就等于后面三者的總和。第二種用戶可以自定義的 metric,通??梢杂糜谟涗浉敿毜脑?,比如是 recordsEventTimeDropped 代表數(shù)據(jù)是因為 event time 被過濾的。那么上述兩種 metric 靜態(tài)的,也就是說 metric key 在作業(yè)運行前就要確定,此外 SLA-Utils 還支持在運行時動態(tài)注冊的 TTL metric。這種 metric 通常有動態(tài)生成的日期作為前綴,在經過 TTL 的時間之后被自動清理。TTL metric 主要可以用于做天級別時間窗口的統(tǒng)計。這里比較特別的一點是,因為 OperatorState 是不支持 TTL 的,SLA-Utils 是在每次進行 checkpoint 快照的時候進行一次過濾,剔除掉過期的 metric,以實現(xiàn) TTL 的效果。那么在 State 保存了 SLA 指標之后要做的就是暴露給用戶。我們目前的做法是通過 Accumulater 的方式來暴露,優(yōu)點是 Web UI 有支持,開箱即用,同時 Flink 可以自動合并不同的 subtask 的 metric。缺點在于沒有辦法利用 metric reporter 來 push 到監(jiān)控系統(tǒng),同時因為 Acuumulater 是不能在運行時動態(tài)注銷的,所以使用 TTL metric 會有內存泄漏的風險。因此,在未來我們也考慮支持 metric group 來避免這些問題。
數(shù)據(jù)容錯及恢復
最后再分享下我們在數(shù)據(jù)容錯和恢復上的實踐。

以很多最佳實踐相似,我們用 SideOutput 來收集 ETL 各環(huán)節(jié)中出錯的數(shù)據(jù),匯總到一個統(tǒng)一的錯誤流。錯誤記錄中包含我們預設的錯誤碼、原始輸入數(shù)據(jù)以及錯誤類和錯誤信息。一般情況下,錯誤數(shù)據(jù)會被分類寫入 HDFS,用戶通過監(jiān)控 HDFS 目錄可以得知數(shù)據(jù)是否正常。

那么存儲好異常數(shù)據(jù)后,下一步就是要恢復數(shù)據(jù)。這通常有兩種情況。一是數(shù)據(jù)格式異常,比如日志被截斷導致不完整或者時間戳不符合約定格式,這種情況下我們一般通過離線批作業(yè)來修復數(shù)據(jù),重新回填到原有的數(shù)據(jù)管道。二是 ETL 管道異常,比如數(shù)據(jù)實際的 schema 有變更但流表配置沒有更新,可能會導致某個字段都是空值,這時我們的處理辦法是:首先更新線上的流表配置為最新,保證不再產生更多異常數(shù)據(jù),這時 Hive 里面仍有部分分區(qū)是異常的。然后,我們發(fā)布一個獨立的補數(shù)作業(yè)來專門修復異常的數(shù)據(jù),輸出的數(shù)據(jù)會寫到一個臨時的目錄,并在 hive metastore 上切換 partition 分區(qū)的 location 來替換掉原來的異常目錄。因此這樣的一個補數(shù)流程對離線查詢的用戶來說是透明的。最后我們再在合適的時間替換掉異常分區(qū)的數(shù)據(jù)并恢復 location。
