<kbd id="afajh"><form id="afajh"></form></kbd>
<strong id="afajh"><dl id="afajh"></dl></strong>
    <del id="afajh"><form id="afajh"></form></del>
        1. <th id="afajh"><progress id="afajh"></progress></th>
          <b id="afajh"><abbr id="afajh"></abbr></b>
          <th id="afajh"><progress id="afajh"></progress></th>

          Flink狀態(tài)管理與狀態(tài)一致性(長文)

          共 27633字,需瀏覽 56分鐘

           ·

          2021-04-19 20:07

          Flink狀態(tài)

          目錄

              一、前言

              二、狀態(tài)類型

                2.1、Keyed State

                2.2、Operator State

              三、狀態(tài)橫向擴展

              四、檢查點機制

                4.1、開啟檢查點 (checkpoint)

                4.2、保存點機制 (Savepoints)

              五、狀態(tài)后端

                5.1、狀態(tài)管理器分類

                5.2、配置方式

              六、狀態(tài)一致性

                6.1、端到端(end-to-end)

                6.2、Flink+Kafka的exactly-once

                6.3、Kafka冪等性和事務(wù)

                      冪等性

                      事務(wù)

                6.4 兩階段提交協(xié)議

          一、前言

          有狀態(tài)的計算是流處理框架要實現(xiàn)的重要功能,因為稍復(fù)雜的流處理場景都需要記錄狀態(tài),然后在新流入數(shù)據(jù)的基礎(chǔ)上不斷更新狀態(tài)。下面的幾個場景都需要使用流處理的狀態(tài)功能:

          數(shù)據(jù)流中的數(shù)據(jù)有重復(fù),想對重復(fù)數(shù)據(jù)去重,需要記錄哪些數(shù)據(jù)已經(jīng)流入過應(yīng)用,當新數(shù)據(jù)流入時,根據(jù)已流入過的數(shù)據(jù)來判斷去重。
          檢查輸入流是否符合某個特定的模式,需要將之前流入的元素以狀態(tài)的形式緩存下來。比如,判斷一個溫度傳感器數(shù)據(jù)流中的溫度是否在持續(xù)上升。
          對一個時間窗口內(nèi)的數(shù)據(jù)進行聚合分析,分析一個小時內(nèi)某項指標的75分位或99分位的數(shù)值。
          一個狀態(tài)更新和獲取的流程如下圖所示,一個算子子任務(wù)接收輸入流,獲取對應(yīng)的狀態(tài),根據(jù)新的計算結(jié)果更新狀態(tài)。一個簡單的例子是對一個時間窗口內(nèi)輸入流的某個整數(shù)字段求和,那么當算子子任務(wù)接收到新元素時,會獲取已經(jīng)存儲在狀態(tài)中的數(shù)值,然后將當前輸入加到狀態(tài)上,并將狀態(tài)數(shù)據(jù)更新。

          state

          二、狀態(tài)類型

          Flink有兩種基本類型的狀態(tài):托管狀態(tài)(Managed State)和原生狀態(tài)(Raw State)。

          兩者的區(qū)別:Managed State是由Flink管理的,F(xiàn)link幫忙存儲、恢復(fù)和優(yōu)化,Raw State是開發(fā)者自己管理的,需要自己序列化。

          具體區(qū)別有:

          • 從狀態(tài)管理的方式上來說,Managed State由Flink Runtime托管,狀態(tài)是自動存儲、自動恢復(fù)的,F(xiàn)link在存儲管理和持久化上做了一些優(yōu)化。當橫向伸縮,或者說修改Flink應(yīng)用的并行度時,狀態(tài)也能自動重新分布到多個并行實例上。Raw State是用戶自定義的狀態(tài)。

          • 從狀態(tài)的數(shù)據(jù)結(jié)構(gòu)上來說,Managed State支持了一系列常見的數(shù)據(jù)結(jié)構(gòu),如ValueState、ListState、MapState等。Raw State只支持字節(jié),任何上層數(shù)據(jù)結(jié)構(gòu)需要序列化為字節(jié)數(shù)組。使用時,需要用戶自己序列化,以非常底層的字節(jié)數(shù)組形式存儲,F(xiàn)link并不知道存儲的是什么樣的數(shù)據(jù)結(jié)構(gòu)。

          • 從具體使用場景來說,絕大多數(shù)的算子都可以通過繼承Rich函數(shù)類或其他提供好的接口類,在里面使用Managed State。Raw State是在已有算子和Managed State不夠用時,用戶自定義算子時使用。
            對Managed State繼續(xù)細分,它又有兩種類型:Keyed State和Operator State。

          為了自定義Flink的算子,可以重寫Rich Function接口類,比如RichFlatMapFunction。使用Keyed State時,通過重寫Rich Function接口類,在里面創(chuàng)建和訪問狀態(tài)。對于Operator State,還需進一步實現(xiàn)CheckpointedFunction接口。

          2.1、Keyed State

          Flink 為每個鍵值維護一個狀態(tài)實例,并將具有相同鍵的所有數(shù)據(jù),都分區(qū)到同一個算子任務(wù)中,這個任務(wù)會維護和處理這個key對應(yīng)的狀態(tài)。當任務(wù)處理一條數(shù)據(jù)時,它會自動將狀態(tài)的訪問范圍限定為當前數(shù)據(jù)的key。因此,具有相同key的所有數(shù)據(jù)都會訪問相同的狀態(tài)。

          需要注意的是鍵控狀態(tài)只能在 KeyedStream 上進行使用,可以通過 stream.keyBy(…) 來得到 KeyedStream 。

          Flink 提供了以下數(shù)據(jù)格式來管理和存儲鍵控狀態(tài) (Keyed State):

          • ValueState:存儲單值類型的狀態(tài)。可以使用 update(T) 進行更新,并通過 T value() 進行檢索。

          • ListState:存儲列表類型的狀態(tài)。可以使用 add(T) 或 addAll(List) 添加元素,update(T)進行更新;并通過 get() 獲得整個列表。

          • ReducingState:用于存儲經(jīng)過 ReduceFunction 計算后的結(jié)果,使用 add(T) 增加元素。

          • AggregatingState:用于存儲經(jīng)過 AggregatingState 計算后的結(jié)果,使用 add(IN) 添加元素。

          • FoldingState:已被標識為廢棄,會在未來版本中移除,官方推薦使用 AggregatingState 代替。

          • MapState:維護 Map 類型的狀態(tài),get獲取,put更新,contains判斷包含,remove移除元素。

          public class ListStateDemo extends RichFlatMapFunction<Tuple2<StringLong>,List<Tuple2<StringLong>>>{

              private transient ListState<Tuple2<String, Long>> listState;

              @Override
              public void open(Configuration parameters) throws Exception {
                  super.open(parameters);
                  ListStateDescriptor<Tuple2<String, Long>> listStateDescriptor = new ListStateDescriptor(
                          "listState",
                          TypeInformation.of(new TypeHint<Tuple2<String, Long>>() {})
                  );
                  listState = getRuntimeContext().getListState(listStateDescriptor);
              }

              @Override
              public void flatMap(Tuple2<String, Long> value, Collector<List<Tuple2<String, Long>>> out) throws Exception {
                  List<Tuple2<String, Long>> currentListState =  Lists.newArrayList(listState.get().iterator());
                  currentListState.add(value);
                  listState.update(currentListState);


                  out.collect(currentListState);
              }

              public static void main(String[] args) throws Exception {
                  StreamExecutionEnvironment senv = StreamExecutionEnvironment.getExecutionEnvironment();
                  DataStream<Tuple2<String, Long>> dataStream = senv.fromElements(
                          Tuple2.of("a"50L),Tuple2.of("a"60L),Tuple2.of("a"70L),
                          Tuple2.of("b"50L),Tuple2.of("b"60L),Tuple2.of("b"70L),
                          Tuple2.of("c"50L),Tuple2.of("c"60L),Tuple2.of("c"70L)
                  );
                  dataStream
                          .keyBy(0)
                          .flatMap(new ListStateDemo())
                          .print();
                  senv.execute(ListStateDemo.class.getSimpleName());

              }
          }

          2.2、Operator State

          Operator State可以用在所有算子上,每個算子子任務(wù)或者說每個算子實例共享一個狀態(tài),流入這個算子子任務(wù)的數(shù)據(jù)可以訪問和更新這個狀態(tài)。

          算子狀態(tài)不能由相同或不同算子的另一個實例訪問。

          Flink為算子狀態(tài)提供三種基本數(shù)據(jù)結(jié)構(gòu):

          • ListState:存儲列表類型的狀態(tài)。

          • UnionListState:存儲列表類型的狀態(tài),與 ListState 的區(qū)別在于:如果并行度發(fā)生變化,ListState 會將該算子的所有并發(fā)的狀態(tài)實例進行匯總,然后均分給新的 Task;而 UnionListState 只是將所有并發(fā)的狀態(tài)實例匯總起來,具體的劃分行為則由用戶進行定義。

          • BroadcastState:用于廣播的算子狀態(tài)。如果一個算子有多項任務(wù),而它的每項任務(wù)狀態(tài)又都相同,那么這種特殊情況最適合應(yīng)用廣播狀態(tài)。
            假設(shè)此時不需要區(qū)分監(jiān)控數(shù)據(jù)的類型,只要有監(jiān)控數(shù)據(jù)超過閾值并達到指定的次數(shù)后,就進行報警:

          public class OperateStateDemo extends RichFlatMapFunction<Tuple2<StringLong>, List<Tuple2<StringLong>>>
                  implements CheckpointedFunction
          {

              private final int threshold;
              private transient ListState<Tuple2<String, Long>> checkpointedState;
              private List<Tuple2<String, Long>> bufferedElements;

              public OperateStateDemo(int threshold) {
                  this.threshold = threshold;
                  this.bufferedElements = new ArrayList<>();
              }

              @Override
              public void flatMap(Tuple2<String, Long> value, Collector<List<Tuple2<String, Long>>> out) throws Exception {
                  bufferedElements.add(value);
                  if(bufferedElements.size() == threshold) {
                      out.collect(bufferedElements);
                      bufferedElements.clear();
                  }
              }

              /**
               * 進行checkpoint快照
               * @param context
               * @throws Exception
               */

              @Override
              public void snapshotState(FunctionSnapshotContext context) throws Exception {
                  checkpointedState.clear();
                  for(Tuple2<String, Long> element : bufferedElements) {
                      checkpointedState.add(element);
                  }
              }

              @Override
              public void initializeState(FunctionInitializationContext context) throws Exception {
                  ListStateDescriptor<Tuple2<String, Long>> listStateDescriptor = new ListStateDescriptor(
                          "listState",
                          TypeInformation.of(new TypeHint<Tuple2<String, Long>>() {})
                  );
                  checkpointedState = context.getOperatorStateStore().getListState(listStateDescriptor);
                  // 如果是故障恢復(fù)
                  if(context.isRestored()) {
                      for(Tuple2<String, Long> element : checkpointedState.get()) {
                          bufferedElements.add(element);
                      }

                      checkpointedState.clear();
                  }
              }

              public static void main(String[] args) throws Exception {
                  StreamExecutionEnvironment senv = StreamExecutionEnvironment.getExecutionEnvironment();
                  senv.getCheckpointConfig().setCheckpointInterval(500);
                  DataStream<Tuple2<String, Long>> dataStream = senv.fromElements(
                          Tuple2.of("a"50L),Tuple2.of("a"60L),Tuple2.of("a"70L),
                          Tuple2.of("b"50L),Tuple2.of("b"60L),Tuple2.of("b"70L),
                          Tuple2.of("c"50L),Tuple2.of("c"60L),Tuple2.of("c"70L)
                  );
                  dataStream
                          .flatMap(new OperateStateDemo(2))
                          .print();
                  senv.execute(OperateStateDemo.class.getSimpleName());
              }
          }

          三、狀態(tài)橫向擴展

          狀態(tài)的橫向擴展問題主要是指修改Flink應(yīng)用的并行度,確切的說,每個算子的并行實例數(shù)或算子子任務(wù)數(shù)發(fā)生了變化,應(yīng)用需要關(guān)停或啟動一些算子子任務(wù),某份在原來某個算子子任務(wù)上的狀態(tài)數(shù)據(jù)需要平滑更新到新的算子子任務(wù)上。

          Flink的Checkpoint就是一個非常好的在各算子間遷移狀態(tài)數(shù)據(jù)的機制。算子的本地狀態(tài)將數(shù)據(jù)生成快照(snapshot),保存到分布式存儲(如HDFS)上。橫向伸縮后,算子子任務(wù)個數(shù)變化,子任務(wù)重啟,相應(yīng)的狀態(tài)從分布式存儲上重建(restore)。

          state2

          對于Keyed State和Operator State這兩種狀態(tài),他們的橫向伸縮機制不太相同。由于每個Keyed State總是與某個Key相對應(yīng),當橫向伸縮時,Key總會被自動分配到某個算子子任務(wù)上,因此Keyed State會自動在多個并行子任務(wù)之間遷移。對于一個非KeyedStream,流入算子子任務(wù)的數(shù)據(jù)可能會隨著并行度的改變而改變。如上圖所示,假如一個應(yīng)用的并行度原來為2,那么數(shù)據(jù)會被分成兩份并行地流入兩個算子子任務(wù),每個算子子任務(wù)有一份自己的狀態(tài),當并行度改為3時,數(shù)據(jù)流被拆成3支,或者并行度改為1,數(shù)據(jù)流合并為1支,此時狀態(tài)的存儲也相應(yīng)發(fā)生了變化。對于橫向伸縮問題,Operator State有兩種狀態(tài)分配方式:一種是均勻分配,另一種是將所有狀態(tài)合并,再分發(fā)給每個實例上。

          四、檢查點機制

          為了使 Flink 的狀態(tài)具有良好的容錯性,F(xiàn)link 提供了檢查點機制 (CheckPoints) 。通過檢查點機制,F(xiàn)link 定期在數(shù)據(jù)流上生成 checkpoint barrier ,當某個算子收到 barrier 時,即會基于當前狀態(tài)生成一份快照,然后再將該 barrier 傳遞到下游算子,下游算子接收到該 barrier 后,也基于當前狀態(tài)生成一份快照,依次傳遞直至到最后的 Sink 算子上。當出現(xiàn)異常后,F(xiàn)link 就可以根據(jù)最近的一次的快照數(shù)據(jù)將所有算子恢復(fù)到先前的狀態(tài)。

          4.1、開啟檢查點 (checkpoint)

          默認情況下 checkpoint 是禁用的。通過調(diào)用 StreamExecutionEnvironment 的 enableCheckpointing(n) 來啟用 checkpoint,里面的 n 是進行 checkpoint 的間隔,單位毫秒。

          Checkpoint是Flink實現(xiàn)容錯機制最核心的功能,它能夠根據(jù)配置周期性地基于Stream中各個Operator的狀態(tài)來生成Snapshot,從而將這些狀態(tài)數(shù)據(jù)定期持久化存儲下來,當Flink程序一旦意外崩潰時,重新運行程序時可以有選擇地從這些Snapshot進行恢復(fù),從而修正因為故障帶來的程序數(shù)據(jù)狀態(tài)中斷。這里,我們簡單理解一下Flink Checkpoint機制,如官網(wǎng)下圖所示:

          Flink-Checkpointing

          Checkpoint指定觸發(fā)生成時間間隔后,每當需要觸發(fā)Checkpoint時,會向Flink程序運行時的多個分布式的Stream Source中插入一個Barrier標記,這些Barrier會根據(jù)Stream中的數(shù)據(jù)記錄一起流向下游的各個Operator。當一個Operator接收到一個Barrier時,它會暫停處理Steam中新接收到的數(shù)據(jù)記錄。因為一個Operator可能存在多個輸入的Stream,而每個Stream中都會存在對應(yīng)的Barrier,該Operator要等到所有的輸入Stream中的Barrier都到達。當所有Stream中的Barrier都已經(jīng)到達該Operator,這時所有的Barrier在時間上看來是同一個時刻點(表示已經(jīng)對齊),在等待所有Barrier到達的過程中,Operator的Buffer中可能已經(jīng)緩存了一些比Barrier早到達Operator的數(shù)據(jù)記錄(Outgoing Records),這時該Operator會將數(shù)據(jù)記錄(Outgoing Records)發(fā)射(Emit)出去,作為下游Operator的輸入,最后將Barrier對應(yīng)Snapshot發(fā)射(Emit)出去作為此次Checkpoint的結(jié)果數(shù)據(jù)。

          Checkpoint 其他的屬性包括:

          • 精確一次(exactly-once)對比至少一次(at-least-once):你可以選擇向 enableCheckpointing(long interval, CheckpointingMode mode) 方法中傳入一個模式來選擇使用兩種保證等級中的哪一種。對于大多數(shù)應(yīng)用來說,精確一次是較好的選擇。至少一次可能與某些延遲超低(始終只有幾毫秒)的應(yīng)用的關(guān)聯(lián)較大。

          • checkpoint 超時:如果 checkpoint 執(zhí)行的時間超過了該配置的閾值,還在進行中的 checkpoint 操作就會被拋棄。

          • checkpoints 之間的最小時間:該屬性定義在 checkpoint 之間需要多久的時間,以確保流應(yīng)用在 checkpoint 之間有足夠的進展。如果值設(shè)置為了 5000,無論 checkpoint 持續(xù)時間與間隔是多久,在前一個 checkpoint 完成時的至少五秒后會才開始下一個 checkpoint。

          • 并發(fā) checkpoint 的數(shù)目: 默認情況下,在上一個 checkpoint 未完成(失敗或者成功)的情況下,系統(tǒng)不會觸發(fā)另一個 checkpoint。這確保了拓撲不會在 checkpoint 上花費太多時間,從而影響正常的處理流程。不過允許多個 checkpoint 并行進行是可行的,對于有確定的處理延遲(例如某方法所調(diào)用比較耗時的外部服務(wù)),但是仍然想進行頻繁的 checkpoint 去最小化故障后重跑的 pipelines 來說,是有意義的。

          • externalized checkpoints: 你可以配置周期存儲 checkpoint 到外部系統(tǒng)中。Externalized checkpoints 將他們的元數(shù)據(jù)寫到持久化存儲上并且在 job 失敗的時候不會被自動刪除。這種方式下,如果你的 job 失敗,你將會有一個現(xiàn)有的 checkpoint 去恢復(fù)。更多的細節(jié)請看 Externalized checkpoints 的部署文檔。

          • 在 checkpoint 出錯時使 task 失敗或者繼續(xù)進行 task:他決定了在 task checkpoint 的過程中發(fā)生錯誤時,是否使 task 也失敗,使失敗是默認的行為。或者禁用它時,這個任務(wù)將會簡單的把 checkpoint 錯誤信息報告給 checkpoint coordinator 并繼續(xù)運行。

          • 優(yōu)先從 checkpoint 恢復(fù)(prefer checkpoint for recovery):該屬性確定 job 是否在最新的 checkpoint 回退,即使有更近的 savepoint 可用,這可以潛在地減少恢復(fù)時間(checkpoint 恢復(fù)比 savepoint 恢復(fù)更快)。

          StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
          // 每 1000ms 開始一次 checkpoint
          env.enableCheckpointing(1000);
          // 高級選項:
          // 設(shè)置模式為精確一次 (這是默認值)
          env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
          // 確認 checkpoints 之間的時間會進行 500 ms
          env.getCheckpointConfig().setMinPauseBetweenCheckpoints(500);
          // Checkpoint 必須在一分鐘內(nèi)完成,否則就會被拋棄
          env.getCheckpointConfig().setCheckpointTimeout(60000);
          // 同一時間只允許一個 checkpoint 進行
          env.getCheckpointConfig().setMaxConcurrentCheckpoints(1);
          // 開啟在 job 中止后仍然保留的 externalized checkpoints
          env.getCheckpointConfig().enableExternalizedCheckpoints(ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
          // 允許在有更近 savepoint 時回退到 checkpoint
          env.getCheckpointConfig().setPreferCheckpointForRecovery(true);

          保存多個Checkpoint

          默認情況下,如果設(shè)置了Checkpoint選項,則Flink只保留最近成功生成的1個Checkpoint,而當Flink程序失敗時,可以從最近的這個Checkpoint來進行恢復(fù)。但是,如果我們希望保留多個Checkpoint,并能夠根據(jù)實際需要選擇其中一個進行恢復(fù),這樣會更加靈活,比如,我們發(fā)現(xiàn)最近4個小時數(shù)據(jù)記錄處理有問題,希望將整個狀態(tài)還原到4小時之前。
          Flink可以支持保留多個Checkpoint,需要在Flink的配置文件conf/flink-conf.yaml中,添加如下配置,指定最多需要保存Checkpoint的個數(shù):

          state.checkpoints.num-retained: 20

          保留了最近的20個Checkpoint。如果希望會退到某個Checkpoint點,只需要指定對應(yīng)的某個Checkpoint路徑即可實現(xiàn)。

          checkpoint-list

          從Checkpoint進行恢復(fù)

          從指定的checkpoint處啟動,最近的一個/flink/checkpoints/workFlowCheckpoint/339439e2a3d89ead4d71ae3816615281/chk-1740584啟動,通常需要先停掉當前運行的flink-session,然后通過命令啟動:

          ../bin/flink run -p 10 -s /flink/checkpoints/workFlowCheckpoint/339439e2a3d89ead4d71ae3816615281/chk-1740584/_metadata -c com.code2144.helper_wink-1.0-SNAPSHOT.jar

          可以把命令放到腳本里面,每次直接執(zhí)行checkpoint恢復(fù)腳本即可:

          checkpoint-start

          4.2、保存點機制 (Savepoints)

          保存點機制 (Savepoints)是檢查點機制的一種特殊的實現(xiàn),它允許通過手工的方式來觸發(fā) Checkpoint,并將結(jié)果持久化存儲到指定路徑中,主要用于避免 Flink 集群在重啟或升級時導(dǎo)致狀態(tài)丟失。示例如下:

          # 觸發(fā)指定id的作業(yè)的Savepoint,并將結(jié)果存儲到指定目錄下
          bin/flink savepoint :jobId [:targetDirectory]

          手動savepoint

          /app/local/flink-1.6.2/bin/flink savepoint 0409251eaff826ef2dd775b6a2d5e219  [hdfs://bigdata/path]

          成功觸發(fā)savepoint通常會提示:Savepoint completed. Path: hdfs://path...

          savepoint-trigger

          手動取消任務(wù)

          checkpoint異常停止或者手動Kill掉不一樣,對于savepoint通常是我們想要手動停止任務(wù),然后更新代碼,可以使用flink cancel ...命令:

          /app/local/flink-1.6.2/bin/flink cancel 0409251eaff826ef2dd775b6a2d5e219

          從指定savepoint啟動job

          bin/flink run -p 8 -s hdfs:///flink/savepoints/savepoint-567452-9e3587e55980 -c com.code2144.helper_workflow.HelperWorkFlowStreaming jars/BSS-ONSS-Flink-1.0-SNAPSHOT.jar

          五、狀態(tài)后端

          Flink 提供了多種 state backends,它用于指定狀態(tài)的存儲方式和位置。

          狀態(tài)可以位于 Java 的堆或堆外內(nèi)存。取決于 state backend,F(xiàn)link 也可以自己管理應(yīng)用程序的狀態(tài)。為了讓應(yīng)用程序可以維護非常大的狀態(tài),F(xiàn)link 可以自己管理內(nèi)存(如果有必要可以溢寫到磁盤)。默認情況下,所有 Flink Job 會使用配置文件 flink-conf.yaml 中指定的 state backend。

          但是,配置文件中指定的默認 state backend 會被 Job 中指定的 state backend 覆蓋。

          5.1、狀態(tài)管理器分類

          MemoryStateBackend

          默認的方式,即基于 JVM 的堆內(nèi)存進行存儲,主要適用于本地開發(fā)和調(diào)試。

          FsStateBackend

          基于文件系統(tǒng)進行存儲,可以是本地文件系統(tǒng),也可以是 HDFS 等分布式文件系統(tǒng)。需要注意而是雖然選擇使用了 FsStateBackend ,但正在進行的數(shù)據(jù)仍然是存儲在 TaskManager 的內(nèi)存中的,只有在 checkpoint 時,才會將狀態(tài)快照寫入到指定文件系統(tǒng)上。

          RocksDBStateBackend

          RocksDBStateBackend 是 Flink 內(nèi)置的第三方狀態(tài)管理器,采用嵌入式的 key-value 型數(shù)據(jù)庫 RocksDB 來存儲正在進行的數(shù)據(jù)。等到 checkpoint 時,再將其中的數(shù)據(jù)持久化到指定的文件系統(tǒng)中,所以采用 RocksDBStateBackend 時也需要配置持久化存儲的文件系統(tǒng)。之所以這樣做是因為 RocksDB 作為嵌入式數(shù)據(jù)庫安全性比較低,但比起全文件系統(tǒng)的方式,其讀取速率更快;比起全內(nèi)存的方式,其存儲空間更大,因此它是一種比較均衡的方案。

          5.2、配置方式

          Flink 支持使用兩種方式來配置后端管理器:

          第一種方式:基于代碼方式進行配置,只對當前作業(yè)生效:

          // 配置 FsStateBackend
          env.setStateBackend(new FsStateBackend("hdfs://namenode:40010/flink/checkpoints"));
          // 配置 RocksDBStateBackend
          env.setStateBackend(new RocksDBStateBackend("hdfs://namenode:40010/flink/checkpoints"));

          配置 RocksDBStateBackend 時,需要額外導(dǎo)入下面的依賴:

          <dependency>
              <groupId>org.apache.flink</groupId>
              <artifactId>flink-statebackend-rocksdb_2.11</artifactId>
              <version>1.9.0</version>
          </dependency>

          第二種方式:基于 flink-conf.yaml 配置文件的方式進行配置,對所有部署在該集群上的作業(yè)都生效:

          state.backend: filesystem
          state.checkpoints.dir: hdfs://namenode:40010/flink/checkpoints

          六、狀態(tài)一致性

          6.1、端到端(end-to-end)

          在真實應(yīng)用中,流處理應(yīng)用除了流處理器以外還包含了數(shù)據(jù)源(例如 Kafka)和輸出到持久化系統(tǒng)。

          端到端的一致性保證,意味著結(jié)果的正確性貫穿了整個流處理應(yīng)用的始終;每一個組件都保證了它自己的一致性,整個端到端的一致性級別取決于所有組件中一致性最弱的組件。具體可以劃分如下:

          • 內(nèi)部保證:依賴checkpoint

          • source 端:需要外部源可重設(shè)數(shù)據(jù)的讀取位置

          • sink 端:需要保證從故障恢復(fù)時,數(shù)據(jù)不會重復(fù)寫入外部系統(tǒng)。
            而對于sink端,又有兩種具體的實現(xiàn)方式:

          • 冪等(Idempotent)寫入:所謂冪等操作,是說一個操作,可以重復(fù)執(zhí)行很多次,但只導(dǎo)致一次結(jié)果更改,也就是說,后面再重復(fù)執(zhí)行就不起作用了。

          • 事務(wù)性(Transactional)寫入:需要構(gòu)建事務(wù)來寫入外部系統(tǒng),構(gòu)建的事務(wù)對應(yīng)著 checkpoint,等到 checkpoint 真正完成的時候,才把所有對應(yīng)的結(jié)果寫入 sink 系統(tǒng)中。

          對于事務(wù)性寫入,具體又有兩種實現(xiàn)方式:預(yù)寫日志(WAL)和兩階段提交(2PC)。Flink DataStream API 提供了GenericWriteAheadSink 模板類和 TwoPhaseCommitSinkFunction 接口,可以方便地實現(xiàn)這兩種方式的事務(wù)性寫入。

          6.2、Flink+Kafka 實現(xiàn)端到端的 exactly-once語義

          端到端的狀態(tài)一致性的實現(xiàn),需要每一個組件都實現(xiàn),對于Flink + Kafka的數(shù)據(jù)管道系統(tǒng)(Kafka進、Kafka出)而言,各組件怎樣保證exactly-once語義呢?

          • 內(nèi)部:利用checkpoint機制,把狀態(tài)存盤,發(fā)生故障的時候可以恢復(fù),保證內(nèi)部的狀態(tài)一致性

          • source:kafka consumer作為source,可以將偏移量保存下來,如果后續(xù)任務(wù)出現(xiàn)了故障,恢復(fù)的時候可以由連接器重置偏移量,重新消費數(shù)據(jù),保證一致性

          • sink:kafka producer作為sink,采用兩階段提交 sink,需要實現(xiàn)一個TwoPhaseCommitSinkFunction內(nèi)部的checkpoint機制。

          EXACTLY_ONCE語義簡稱EOS,指的是每條輸入消息只會影響最終結(jié)果一次,注意這里是影響一次,而非處理一次,F(xiàn)link一直宣稱自己支持EOS,實際上主要是對于Flink應(yīng)用內(nèi)部來說的,對于外部系統(tǒng)(端到端)則有比較強的限制

          • 外部系統(tǒng)寫入支持冪等性

          • 外部系統(tǒng)支持以事務(wù)的方式寫入

          Kafka在0.11版本之前只能保證At-Least-OnceAt-Most-Once語義,從0.11版本開始,引入了冪等發(fā)送和事務(wù),從而開始保證EXACTLY_ONCE語義。

          Maven依賴開始支持的版本生產(chǎn)/消費 類名kafka版本注意
          flink-connector-kafka-0.8_2.111.0.0FlinkKafkaConsumer08
          FlinkKafkaProducer08
          0.8.x使用Kafka內(nèi)部SimpleConsumer API. Flink把Offsets提交到ZK
          flink-connector-kafka-0.9_2.111.0.0FlinkKafkaConsumer09
          FlinkKafkaProducer09
          0.9.x使用新版Kafka  Consumer API.
          flink-connector-kafka-0.10_2.111.2.0FlinkKafkaConsumer010
          FlinkKafkaProducer010
          0.10.x支持Kafka生產(chǎn)/消費消息帶時間戳
          flink-connector-kafka-0.11_2.111.4.0FlinkKafkaConsumer011
          FlinkKafkaProducer011
          0.11.x由于0.11.x Kafka不支持scala 2.10。此連接器支持Kafka事務(wù)消息傳遞,以便為生產(chǎn)者提供exactly once語義。
          flink-connector-kafka_2.111.7.0FlinkKafkaConsumer
          FlinkKafkaProducer
          >=1.0.0高版本向后兼容。但是,對于Kafka 0.11.x和0.10.x版本,我們建議分別使用專用的flink-connector-Kafka-0.11_2.11link-connector-Kafka-0.10_2.11

          Flink在1.4.0版本引入了TwoPhaseCommitSinkFunction接口,封裝了兩階段提交邏輯,并在Kafka Sink connector中實現(xiàn)了TwoPhaseCommitSinkFunction,依賴Kafka版本為0.11+

          public class FlinkKafkaDemo {

              public static void main(String[] args) throws Exception {
                  StreamExecutionEnvironment senv = StreamExecutionEnvironment.getExecutionEnvironment();
                  senv.enableCheckpointing(1000);
                  senv.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);

                  // kafka 數(shù)據(jù)源
                  Map<String, String> config = Configuration.initConfig("commons.xml");
                  Properties kafkaProps = new Properties();
                  kafkaProps.setProperty("bootstrap.servers", config.get("kafka-ipport"));
                  kafkaProps.setProperty("group.id", config.get("kafka-groupid"));

                  SingleOutputStreamOperator<String> dataStream = senv.addSource(
                          new FlinkKafkaConsumer011(
                                  config.get("kafka-topic"),
                                  new SimpleStringSchema(),
                                  kafkaProps
                          ));

                  // sink 到 kafka
                  FlinkKafkaProducer011<String> producer011 = new FlinkKafkaProducer011<>(
                          config.get("kafka-ipport"),
                          "test-kafka-producer",
                          new SimpleStringSchema());

                  producer011.setWriteTimestampToKafka(true);
                  dataStream.map(x -> {
                      // 拋出異常
                      if("name4".equals(JSON.parseObject(x).get("name"))){
                          System.out.println("name4 exception test...");
                          // throw new RuntimeException("name4 exception test...");
                      }

                      return x;
                  }).addSink(producer011);

                  senv.execute(FlinkKafkaDemo.class.getSimpleName());

              }

          }

          Flink由JobManager協(xié)調(diào)各個TaskManager進行checkpoint存儲,checkpoint保存在 StateBackend中,默認StateBackend是內(nèi)存級的,也可以改為文件級的進行持久化保存。

          eo-post-graphic-1-1

          當 checkpoint 啟動時,JobManager 會將檢查點分界線(barrier)注入數(shù)據(jù)流;barrier會在算子間傳遞下去。

          eo-post-graphic-2-1

          每個算子會對當前的狀態(tài)做個快照,保存到狀態(tài)后端。對于source任務(wù)而言,就會把當前的offset作為狀態(tài)保存起來。下次從checkpoint恢復(fù)時,source任務(wù)可以重新提交偏移量,從上次保存的位置開始重新消費數(shù)據(jù)。

          eo-post-graphic-3-1

          每個內(nèi)部的 transform 任務(wù)遇到 barrier 時,都會把狀態(tài)存到 checkpoint 里。

          sink 任務(wù)首先把數(shù)據(jù)寫入外部 kafka,這些數(shù)據(jù)都屬于預(yù)提交的事務(wù)(還不能被消費);當遇到 barrier時,把狀態(tài)保存到狀態(tài)后端,并開啟新的預(yù)提交事務(wù)。

          eo-post-graphic-4-1

          當所有算子任務(wù)的快照完成,也就是這次的 checkpoint 完成時,JobManager 會向所有任務(wù)發(fā)通知,確認這次 checkpoint 完成。當sink 任務(wù)收到確認通知,就會正式提交之前的事務(wù),kafka 中未確認的數(shù)據(jù)就改為“已確認”,數(shù)據(jù)就真正可以被消費了。

          eo-post-graphic-6-1

          所以看到,執(zhí)行過程實際上是一個兩段式提交,每個算子執(zhí)行完成,會進行“預(yù)提交”,直到執(zhí)行完sink操作,會發(fā)起“確認提交”,如果執(zhí)行失敗,預(yù)提交會放棄掉。

          具體的兩階段提交步驟總結(jié)如下:

          • 第一條數(shù)據(jù)來了之后,開啟一個 kafka 的事務(wù)(transaction),正常寫入 kafka 分區(qū)日志但標記為未提交,這就是“預(yù)提交”, jobmanager 觸發(fā) checkpoint 操作,barrier 從 source 開始向下傳遞,遇到 barrier 的算子將狀態(tài)存入狀態(tài)后端,并通知 jobmanager

          • sink 連接器收到 barrier,保存當前狀態(tài),存入 checkpoint,通知 jobmanager,并開啟下一階段的事務(wù),用于提交下個檢查點的數(shù)據(jù)

          • jobmanager 收到所有任務(wù)的通知,發(fā)出確認信息,表示 checkpoint 完成

          • sink 任務(wù)收到 jobmanager 的確認信息,正式提交這段時間的數(shù)據(jù)

          • 外部kafka關(guān)閉事務(wù),提交的數(shù)據(jù)可以正常消費了。

          所以也可以看到,如果宕機需要通過StateBackend進行恢復(fù),只能恢復(fù)所有確認提交的操作。

          6.3、Kafka冪等性和事務(wù)

          前面表格總結(jié)的可以看出,Kafka在0.11版本之前只能保證At-Least-OnceAt-Most-Once語義,從0.11版本開始,引入了冪等發(fā)送和事務(wù),從而開始保證EXACTLY_ONCE語義。

          冪等性

          在未引入冪等性時,Kafka正常發(fā)送和重試發(fā)送消息流程圖如下:


          為了實現(xiàn)Producer的冪等語義,Kafka引入了Producer ID(即PID)和Sequence Number。每個新的Producer在初始化的時候會被分配一個唯一的PID,該PID對用戶完全透明而不會暴露給用戶。

          Producer發(fā)送每條消息對于Sequence Number會從0開始單調(diào)遞增,broker端會為每個維護一個序號,每次commit一條消息此序號加一,對于接收的每條消息,如果其序號比Broker維護的序號(即最后一次Commit的消息的序號)大1以上,則Broker會接受它,否則將其丟棄:

          • 序號比Broker維護的序號大1以上,說明存在亂序。

          • 序號比Broker維護的序號小,說明此消息以及被保存,為重復(fù)數(shù)據(jù)。

          有了冪等性,Kafka正常發(fā)送和重試發(fā)送消息流程圖如下:


          事務(wù)

          事務(wù)是指所有的操作作為一個原子,要么都成功,要么都失敗,而不會出現(xiàn)部分成功或部分失敗的可能。舉個例子,比如小明給小王轉(zhuǎn)賬1000元,那首先小明的賬戶會減去1000,然后小王的賬戶會增加1000,這兩個操作就必須作為一個事務(wù),否則就會出現(xiàn)只減不增或只增不減的問題,因此要么都失敗,表示此次轉(zhuǎn)賬失敗。要么都成功,表示此次轉(zhuǎn)賬成功。分布式下為了保證事務(wù),一般采用兩階段提交協(xié)議。

          為了解決跨session和所有分區(qū)不能EXACTLY-ONCE問題,Kafka從0.11開始引入了事務(wù)。

          為了支持事務(wù),Kafka引入了Transacation Coordinator來協(xié)調(diào)整個事務(wù)的進行,并可將事務(wù)持久化到內(nèi)部topic里,類似于offset和group的保存。

          用戶為應(yīng)用提供一個全局的Transacation ID,應(yīng)用重啟后Transacation ID不會改變。為了保證新的Producer啟動后,舊的具有相同Transaction ID的Producer即失效,每次Producer通過Transaction ID拿到PID的同時,還會獲取一個單調(diào)遞增的epoch。由于舊的Producer的epoch比新Producer的epoch小,Kafka可以很容易識別出該Producer是老的Producer并拒絕其請求。有了Transaction ID后,Kafka可保證:

          • 跨Session的數(shù)據(jù)冪等發(fā)送。當具有相同Transaction ID的新的Producer實例被創(chuàng)建且工作時,舊的Producer停止工作。

          • 跨Session的事務(wù)恢復(fù)。如果某個應(yīng)用實例宕機,新的實例可以保證任何未完成的舊的事務(wù)要么Commit要么Abort,使得新實例從一個正常狀態(tài)開始工作。

          KIP-98 對Kafka事務(wù)原理進行了詳細介紹,完整的流程圖如下:

          • Producer向任意一個brokers發(fā)送 FindCoordinatorRequest請求來獲取Transaction Coordinator的地址;

          • 找到Transaction Coordinator后,具有冪等特性的Producer必須發(fā)起InitPidRequest請求以獲取PID。

          • 調(diào)用beginTransaction()方法開啟一個事務(wù),Producer本地會記錄已經(jīng)開啟了事務(wù),但Transaction Coordinator只有在Producer發(fā)送第一條消息后才認為事務(wù)已經(jīng)開啟。

          • Consume-Transform-Produce這一階段,包含了整個事務(wù)的數(shù)據(jù)處理過程,并且包含了多種請求。

          • 提交或回滾事務(wù) 一旦上述數(shù)據(jù)寫入操作完成,應(yīng)用程序必須調(diào)用KafkaProducer的commitTransaction方法或者abortTransaction方法以結(jié)束當前事務(wù)。

          6.4 兩階段提交協(xié)議

          兩階段提交指的是一種協(xié)議,經(jīng)常用來實現(xiàn)分布式事務(wù),可以簡單理解為預(yù)提交+實際提交,一般分為協(xié)調(diào)器Coordinator(以下簡稱C)和若干事務(wù)參與者Participant(以下簡稱P)兩種角色。

          • C先將prepare請求寫入本地日志,然后發(fā)送一個prepare的請求給P

          • P收到prepare請求后,開始執(zhí)行事務(wù),如果執(zhí)行成功返回一個Yes或OK狀態(tài)給C,否則返回No,并將狀態(tài)存到本地日志。

          • C收到P返回的狀態(tài),如果每個P的狀態(tài)都是Yes,則開始執(zhí)行事務(wù)Commit操作,發(fā)Commit請求給每個P,P收到Commit請求后各自執(zhí)行Commit事務(wù)操作。如果至少一個P的狀態(tài)為No,則會執(zhí)行Abort操作,發(fā)Abort請求給每個P,P收到Abort請求后各自執(zhí)行Abort事務(wù)操作。
            :C或P把發(fā)送或接收到的消息先寫到日志里,主要是為了故障后恢復(fù)用,類似WAL


          七、鏈接文檔

          橫向擴展相關(guān)來于:Flink狀態(tài)管理詳解:Keyed State和Operator List State深度解析
          checkpoint 相關(guān)來于:Apache Flink v1.10 官方中文文檔
          狀態(tài)一致性相關(guān)來于:再忙也需要看的Flink狀態(tài)管理


          猜你喜歡
          Flink之Watermark詳解
          Flink實時計算topN熱榜
          數(shù)倉建模分層理論
          數(shù)據(jù)湖是誰?那數(shù)據(jù)倉庫又算什么?
          Hive整合Hbase
          數(shù)倉架構(gòu)發(fā)展史


          瀏覽 81
          點贊
          評論
          收藏
          分享

          手機掃一掃分享

          分享
          舉報
          評論
          圖片
          表情
          推薦
          點贊
          評論
          收藏
          分享

          手機掃一掃分享

          分享
          舉報
          <kbd id="afajh"><form id="afajh"></form></kbd>
          <strong id="afajh"><dl id="afajh"></dl></strong>
            <del id="afajh"><form id="afajh"></form></del>
                1. <th id="afajh"><progress id="afajh"></progress></th>
                  <b id="afajh"><abbr id="afajh"></abbr></b>
                  <th id="afajh"><progress id="afajh"></progress></th>
                  精品久久久国产 | 国产色婷婷免费视频 | 成人色色在线 | 日本亚洲女人在线视频 | 97成人网站 |