Flink 特性 | Flink 1.12 資源管理新特性回顧
內(nèi)存管理 資源調(diào)度 擴(kuò)展資源框架 未來規(guī)劃 總結(jié)
GitHub 地址 
一、內(nèi)存管理


1. 本地內(nèi)存(Managed Memory)
一方面是 slot 級別的預(yù)算規(guī)劃,它可以保證作業(yè)運行過程中不會因為內(nèi)存不足,造成某些算子或者任務(wù)無法運行;也不會因為預(yù)留了過多的內(nèi)存沒有使用造成資源浪費。同時 Flink 能保證當(dāng)任務(wù)運行結(jié)束時準(zhǔn)確將內(nèi)存釋放,確保 Task Manager 執(zhí)行新任務(wù)時有足夠的內(nèi)存可用。
另一方面,資源適應(yīng)性也是托管內(nèi)存很重要的特性之一,指算子對于內(nèi)存的需求是動態(tài)可調(diào)整的。具備了適應(yīng)性,算子就不會因為給予任務(wù)過多的內(nèi)存造成資源使用上的浪費,也不會因為提供的內(nèi)存相對較少導(dǎo)致整個作業(yè)無法運行,使內(nèi)存的運用保持在一定的合理范圍內(nèi)。
當(dāng)然,在內(nèi)存分配相對比較少情況下,作業(yè)會受到一定限制,例如需要通過頻繁的落盤保證作業(yè)的運行,這樣可能會影響性能。
RocksDB 狀態(tài)后端:在流計算的場景中,每個 Slot 會使用 State 的 Operator,從而共享同一底層 的 RocksDB 緩存; Flink 內(nèi)置算子:包含批處理、Table SQL、DataSet API 等算子,每個算子有獨立的資源預(yù)算,不會相互共享; Python 進(jìn)程:用戶使用 PyFlink,使用 Python 語言定義 UDF 時需要啟動 Python 的虛擬機進(jìn)程。
2. Job Graph 編譯階段
■ 2.1 作業(yè)的 Job Graph 編譯階段
第一個問題是:slot 當(dāng)中到底有哪些算子或者任務(wù)會同時執(zhí)行。這個問題關(guān)系到在一個查詢作業(yè)中如何對內(nèi)存進(jìn)行規(guī)劃,是否還有其他的任務(wù)需要使用 management memory,從而把相應(yīng)的內(nèi)存留出來。在流式的作業(yè)中,這個問題是比較簡單的,因為我們需要所有的算子同時執(zhí)行,才能保證上游產(chǎn)出的數(shù)據(jù)能被下游及時的消費掉,這個數(shù)據(jù)才能夠在整個 job grep 當(dāng)中流動起來。但是如果我們是在批處理的一些場景當(dāng)中,實際上我們會存在兩種數(shù)據(jù) shuffle 的模式。
一種是 pipeline 的模式,這種模式跟流式是一樣的,也就是我們前面說到的 bounded stream 處理方式,同樣需要上游和下游的算子同時運行,上游隨時產(chǎn)出,下游隨時消費。

另外一種是所謂的 batch 的 blocking的方式,它要求上游把數(shù)據(jù)全部產(chǎn)出,并且落盤結(jié)束之后,下游才能開始讀數(shù)據(jù)。
這兩種模式會影響到哪些任務(wù)可以同時執(zhí)行。目前在 Flink 當(dāng)中,根據(jù)作業(yè)拓?fù)鋱D中的一個邊的類型 (如圖上)。我們劃分出了定義的一個概念叫做 pipelined region,也就是全部都由 pipeline 的邊鎖連通起來的一個子圖,我們把這個子圖識別出來,用來判斷哪些 task 會同時執(zhí)行。
第二個問題是:slot 當(dāng)中到底有哪些使用場景?我們剛才介紹了三種 manage memory 的使用場景。在這個階段,對于流式作業(yè),可能會出現(xiàn) Python UDF 以及 Stateful Operator。這個階段當(dāng)中我們需要注意的是,這里并不能肯定 State Operator 一定會用到 management memory,因為這跟它的狀態(tài)類型是相關(guān)的。
如果它使用了 RocksDB State Operator,是需要使用 manage memory 的;
但是如果它使用的是 Heap State Backend,則并不需要。
第三個問題:對于 batch 的作業(yè),我們除了需要清楚有哪些使用場景,還需要清楚一件事情,就是前面提到過 batch 的 operator。它使用 management memory 是以一種算子獨享的方式,而不是以 slot 為單位去進(jìn)行共享。我們需要知道不同的算子應(yīng)該分別分配多少內(nèi)存,這個事情目前是由 Flink 的計劃作業(yè)來自動進(jìn)行設(shè)置的。
然而,作業(yè)在編譯的階段,其實并不知道狀態(tài)的類型,這里是需要去注意的地方。
■ 2.2 執(zhí)行階段

其中一個是 RocksDB State Backend,有了第一步的判斷之后,第二步我們會根據(jù)用戶的配置,去決定不同使用方式之間怎么樣去共享 slot 的 management memory。
在這個 Steaming 的例子當(dāng)中,我們定義的 Python 的權(quán)重是 30%,State Backend 的權(quán)重是 70%。在這樣的情況下,如果只有 Python,Python 的部分自然是使用 100% 的內(nèi)存(Streaming 的 Heap State Backend 分支);
而對于第二種情況(Streaming 的 RocksDB State Backend 分支),B、C 的這兩個 Operator 共用 30% 的內(nèi)存用于 Python 的 UDF,另外 C 再獨享 70% 的內(nèi)存用于 RocksDB State Backend。最后 Flink 會根據(jù) Task manager 的資源配置,一個 slot 當(dāng)中有多少 manager memory 來決定每個 operator 實際可以用的內(nèi)存的數(shù)量。

3. 參數(shù)配置
| 配置參數(shù) | 默認(rèn)值 | 備注 | |
|---|---|---|---|
| 大小 | taskmanager.memory.managed.size | / | 絕對大小 |
| 權(quán)重 | taskmanager.memory.managed.fraction | 0.4 | 相對大小(占用Flink)總內(nèi)存比例 |
| taskmanager.memory.managed.consumer-weight | DATAPROC:70,PYTHON:30 | 多種用途并存時候分配權(quán)重 |
一種是絕對值的配置方式;
還有一種是作為 Task Manager 總內(nèi)存的一個相對值的配置方式。
taskmanager.memory.managed.consumer-weight 是一個新加的配置項,它的數(shù)據(jù)類型是 map 的類型,也就是說我們在這里面實際上是給了一個 key 冒號 value,然后逗號再加上下一組 key 冒號 value 的這樣的一個數(shù)據(jù)的結(jié)構(gòu)。這里面我們目前支持兩種 consumer 的 key:
一個是 DATAPROC, DATAPROC 既包含了流處理當(dāng)中的狀態(tài)后端 State Backend 的內(nèi)存,也包含了批處理當(dāng)中的 Batch Operator;
另外一種是 Python。
二、 資源調(diào)度
部分資源調(diào)度相關(guān)的 Feature 是其他版本或者郵件列表里面大家詢問較多的,這里我們也做對應(yīng)的介紹。
1. 最大 Slot 數(shù)

2. TaskManager 容錯

3. 任務(wù)平鋪分布

第一,這個參數(shù)我們只針對 Standalone 模式,因為在 yarn 跟 k8s 的模式下,實際上是根據(jù)你作業(yè)的需求來決定起多少 task manager 的,所以是先有了需求再有 TaskManager,而不是先有 task manager,再有 slot 的調(diào)度需求。
在每次調(diào)度任務(wù)的時候,實際上只能看到當(dāng)前注冊上來的那一個 TaskManager,F(xiàn)link 沒辦法全局的知道后面還有多少 TaskManager 會注冊上來,這也是很多人在問的一個問題,就是為什么特性打開了之后好像并沒有起到一個很好的效果。
第二個需要注意的點是,這里面我們只能決定每一個 TaskManager 上有多少空閑 slot,然而并不能夠決定每個 operator 有不同的并發(fā)數(shù),F(xiàn)link 并不能決定說每個 operator 是否在 TaskManager 上是一個均勻的分布,因為在 flink 的資源調(diào)度邏輯當(dāng)中,在整個 slot 的 allocation 這一層是完全看不到 task 的。
三、擴(kuò)展資源框架
1. 背景
一個是 Flink AI Extended 的項目,是基于 Flink 的深度學(xué)習(xí)擴(kuò)展框架,目前支持 TensorFlow、PyTorch 等框架的集成,它使用戶可以將 TensorFlow 當(dāng)做一個算子,放在 Flink 任務(wù)中。
另一個是 Alink,它是一個基于 Flink 的通用算法平臺,里面也內(nèi)置了很多常用的機器學(xué)習(xí)算法。
2. 使用擴(kuò)展資源
需要支持該類擴(kuò)展資源的配置與調(diào)度。用戶可以在配置中指明對這類擴(kuò)展資源的需求,如每個 TaskManager 上需要有一塊 GPU 卡,并且當(dāng) Flink 被部署在 Kubernetes/Yarn 這類資源底座上時,需要將用戶對擴(kuò)展資源的需求進(jìn)行轉(zhuǎn)發(fā),以保證申請到的 Container/Pod 中存在對應(yīng)的擴(kuò)展資源。
需要向算子提供運行時的擴(kuò)展資源信息。用戶在自定義算子中可能需要一些運行時的信息才能使用擴(kuò)展資源,以 GPU 為例,算子需要知道它內(nèi)部的模型可以部署在那一塊 GPU 卡上,因此,需要向算子提供這些信息。
3. 擴(kuò)展資源框架使用方法
首先為該擴(kuò)展資源設(shè)置相關(guān)配置;
然后為所需的擴(kuò)展資源準(zhǔn)備擴(kuò)展資源框架中的插件;
最后在算子中,從 RuntimeContext 來獲取擴(kuò)展資源的信息并使用這些資源。
■ 3.1 配置參數(shù)
# 定義擴(kuò)展資源名稱,“gpu”external-resources: gpu# 定義每個 TaskManager 所需的 GPU 數(shù)量external-resource.gpu.amount: 1# 定義Yarn或Kubernetes中擴(kuò)展資源的配置鍵external-resource.gpu.yarn.config-key: yarn.io/gpuexternal-resource.gpu.kubernetes.config-key: nvidia.com/gpu# 定義插件 GPUDriver 的工廠類。external-resource.gpu.driver-factory.class:org.apache.flink.externalresource.gpu.GPUDriverFactory
對于任何擴(kuò)展資源,用戶首先需要將它的名稱加入 "external-resources" 中,這個名稱也會被用作該擴(kuò)展資源其他相關(guān)配置的前綴來使用。示例中,我們定義了一種名為 "gpu" 的資源。
在調(diào)度層,目前支持用戶在 TaskManager 的粒度來配置擴(kuò)展資源需求。示例中,我們定義每個 TaskManager 上的 GPU 設(shè)備數(shù)為 1。
將 Flink 部署在 Kubernetes 或是 Yarn 上時,我們需要配置擴(kuò)展資源在對應(yīng)的資源底座上的配置鍵,以便 Flink 對資源需求進(jìn)行轉(zhuǎn)發(fā)。示例中展示了 GPU 對應(yīng)的配置。
如果提供了插件,則需要將插件的工廠類名放入配置中。
■ 3.2 前置準(zhǔn)備
在 Standalone 模式下,集群管理員需要保證 GPU 資源對 TaskManager 進(jìn)程可見; 在 Kubernetes 模式下,需要集群支持 Device Plugin[6],對應(yīng)的 Kubernetes 版本為 1.10,并且在集群中安裝了 GPU 對應(yīng)的插件; 在 Yarn 模式下,GPU 調(diào)度需要集群 Hadoop 版本在 2.10 或 3.1 以上,并正確配置了 resource-types.xml 等文件。
■ 3.3 擴(kuò)展資源框架插件
public interface ExternalResourceDriverFactory {/*** 根據(jù)提供的設(shè)置創(chuàng)建擴(kuò)展資源的Driver*/ExternalResourceDriver createExternalResourceDriver(Configuration config) throws Exception;}public interface ExternalResourceDriver {/*** 獲取所需數(shù)量的擴(kuò)展資源信息*/Set<? extends ExternalResourceInfo> retrieveResourceInfo(long amount) throws Exception;}
4. GPU 插件
當(dāng)調(diào)用腳本時,所需要的 GPU 數(shù)量將作為第一個參數(shù)輸入,之后為用戶自定義參數(shù)列表; 若腳本執(zhí)行正常,則輸出 GPU Index 列表,以逗號分隔; 若腳本出錯或執(zhí)行結(jié)果不符合預(yù)期,則腳本以非零值退出,這會導(dǎo)致 TaskManager 初始化失敗,并在日志中打印腳本的錯誤信息。
5. 在算子中獲取擴(kuò)展資源信息
public class ExternalResourceMapFunction extends RichMapFunction<String, String> {private static finalRESOURCE_NAME="gpu";public String map(String value) {Set<ExternalResourceInfo> gpuInfos = getRuntimeContext().getExternalResourceInfos(RESOURCE_NAME);List<String> indexes = gpuInfos.stream().map(gpuInfo -> gpuInfo.getProperty("index").get()).collect(Collectors.toList());// Map function with GPU// ...}}
6. MNIST Demo

class MNISTClassifier extends RichMapFunction<List<Float>, Integer> {public void open(Configuration parameters) {//獲取GPU信息并且選擇第一塊GPUSet<ExternalResourceInfo> externalResourceInfos = getRuntimeContext().getExternalResourceInfos(resourceName);final Optional<String> firstIndexOptional = externalResourceInfos.iterator().next().getProperty("index");// 使用第一塊GPU的index初始化JCUDA組件JCuda.cudaSetDevice(Integer.parseInt(firstIndexOptional.get()));JCublas.cublasInit();}}
class MNISTClassifier extends RichMapFunction<List<Float>, Integer> {public Integer map(List<Float> value) {// 使用Jucblas做矩陣算法JCublas.cublasSgemv('n', DIMENSIONS.f1, DIMENSIONS.f0, 1.0f,matrixPointer, DIMENSIONS.f1, inputPointer, 1, 0.0f, outputPointer, 1);// 獲得乘法結(jié)果并得出該圖所表示的數(shù)字JCublas.cublasGetVector(DIMENSIONS.f1, Sizeof.FLOAT, outputPointer, 1, Pointer.to(output), 1);JCublas.cublasFree(inputPointer);JCublas.cublasFree(outputPointer);int result = 0;for (int i = 0; i < DIMENSIONS.f1; ++i) {result = output[i] > output[result] ? i : result;}return result;}}
四、未來計劃
除了上文介紹的這些已經(jīng)發(fā)布的特性外,Apache Flink 社區(qū)也正在積極準(zhǔn)備更多資源管理方面的優(yōu)化特性,在未來的版本中將陸續(xù)和大家見面。
被動資源調(diào)度模式:托管內(nèi)存使得 Flink 任務(wù)可以靈活地適配不同的 TaskManager/Slot 資源,充分利用可用資源,為計算任務(wù)提供給定資源限制下的最佳算力。但用戶仍需指定計算任務(wù)的并行度,F(xiàn)link 需要申請到滿足該并行度數(shù)量的 TaskManager/Slot 才能順利執(zhí)行。被動資源調(diào)度將使 Flink 能夠根據(jù)可用資源動態(tài)改變并行度,在資源不足時能夠 best effort 進(jìn)行數(shù)據(jù)處理,同時在資源充足時恢復(fù)到指定的并行度保障處理性能。 細(xì)粒度資源管理:Flink 目前基于 Slot 的資源管理與調(diào)度機制,認(rèn)為所有的 Slot 都具有相同的規(guī)格。對于一些復(fù)雜的規(guī)模化生產(chǎn)任務(wù),往往需要將計算任務(wù)拆分成多個子圖,每個子圖單獨使用一個 Slot 執(zhí)行。當(dāng)子圖間的資源需求差異較大時,使用相同規(guī)格的 Slot 往往難以滿足資源效率方面的需求,特別是對于 GPU 這類成本較高的擴(kuò)展資源。細(xì)粒度資源管理允許用戶為作業(yè)的子圖指定資源需求,F(xiàn)link 會根據(jù)資源需求使用不同規(guī)格的 TaskManager/Slot 執(zhí)行計算任務(wù),從而優(yōu)化資源效率。
五、總結(jié)
首先從本地內(nèi)存、Job Graph 編譯階段、執(zhí)行階段來解答每個流程的內(nèi)存管理以及內(nèi)存分配細(xì)節(jié),通過新的參數(shù)配置控制 TaskManager的內(nèi)存分配; 然后從大家平時遇到資源調(diào)度相關(guān)問題,包括最大 Slot 數(shù)使用,如何進(jìn)行 TaskManager 進(jìn)行容錯,任務(wù)如何通過任務(wù)平鋪均攤?cè)蝿?wù)資源; 最后在機器學(xué)習(xí)和深度學(xué)習(xí)領(lǐng)域常常用到 GPU 進(jìn)行加速計算,通過解釋 Flink 在 1.12 版本如何使用擴(kuò)展資源框架和演示 Demo, 給我們展示了資源擴(kuò)展的使用。再針對資源利用率方面提出 2 個社區(qū)未來正在做的計劃,包括被動資源模式和細(xì)粒度的資源管理。
六、附錄

戳我,查看更多技術(shù)干貨~