<kbd id="afajh"><form id="afajh"></form></kbd>
<strong id="afajh"><dl id="afajh"></dl></strong>
    <del id="afajh"><form id="afajh"></form></del>
        1. <th id="afajh"><progress id="afajh"></progress></th>
          <b id="afajh"><abbr id="afajh"></abbr></b>
          <th id="afajh"><progress id="afajh"></progress></th>

          Flink+Iceberg環(huán)境搭建及生產(chǎn)問題處理

          共 14408字,需瀏覽 29分鐘

           ·

          2022-07-31 18:42

          全網(wǎng)最全大數(shù)據(jù)面試提升手冊!

          概述

          作為實時計算的新貴,F(xiàn)link受到越來越多公司的青睞,它強大的流批一體的處理能力可以很好地解決流處理和批處理需要構(gòu)建實時和離線兩套處理平臺的問題,可以通過一套Flink處理完成,降低成本,F(xiàn)link結(jié)合數(shù)據(jù)湖的處理方式可以滿足我們實時數(shù)倉和離線數(shù)倉的需求,構(gòu)建一套數(shù)據(jù)湖,存儲多樣化的數(shù)據(jù),實現(xiàn)離線查詢和實時查詢的需求。目前數(shù)據(jù)湖方面有Hudi和Iceberg,Hudi屬于相對成熟的數(shù)據(jù)湖方案,主要用于增量的數(shù)據(jù)處理,它跟spark結(jié)合比較緊密,F(xiàn)link結(jié)合Hudi的方案目前應用不多。Iceberg屬于數(shù)據(jù)湖的后起之秀,可以實現(xiàn)高性能的分析與可靠的數(shù)據(jù)管理,目前跟Flink集合方面相對較好。

          安裝

          本次主要基于flink+iceberg進行環(huán)境搭建。

          1.安裝flink

          安裝并啟動hadoop、hive等相關(guān)環(huán)境。

          下載flink安裝包,解壓后安裝:

          下載地址: https://archive.apache.org/dist/flink/flink-1.11.3/

          wget https://downloads.apache.org/flink/flink-1.11.1/flink-1.11.1-bin-scala_2.12.tgz
          tar xzvf flink-1.11.1-bin-scala_2.12.tgz

          導入hadoop的環(huán)境包,flink-sql會使用到hdfs和hive等相關(guān)依賴包進行通訊。

          export HADOOP_CLASSPATH=$HADOOP_HOME/bin/hadoop classpath

          啟動flink集群

          ./bin/start-cluster.sh

          注:這里會遇到第一個坑,iceberg-0.11.1支持的是flink1.11的版本,如果使用過高的版本,會報一堆找不到類和方法的異常(因為flink1.12版本刪掉了許多API)。請使用Flink1.11.x版本進行安裝。

          2.下載Iceberg環(huán)境包

          主要是/iceberg-flink-runtime-xxx.jar和flink-sql-connector-hive-2.3.6_2.11-1.11.0.jar兩個jar包。

          下載地址:

          https://repo.maven.apache.org/maven2/org/apache/iceberg/iceberg-flink-runtime/0.11.1/

          https://mvnrepository.com/artifact/org.apache.flink/flink-connector-hive

          3.啟動Flink-sql

          執(zhí)行命令啟動flink-sql。

          ./bin/sql-client.sh embedded
          -j /iceberg-flink-runtime-xxx.jar
          -j /flink-sql-connector-hive-2.3.6_2.11-1.11.0.jar
          shell

          4.創(chuàng)建Catalog

          Flink支持hadoop、hive、自定義三種Catalog。這里以Hive為例。

          注:這里會遇到第二個坑,iceberg和flink當前版本支持的是hive2.3.x的版本,推薦安裝hive2.3.8版本。不然也會遇到一堆找不到方法和類的異常。

          執(zhí)行命令,創(chuàng)建hive類型的Catalog。

          CREATE CATALOG hive_catalog WITH (
            'type'='iceberg',
            'catalog-type'='hive',
            'uri'='thrift://server1:9083',
            'clients'='5',
            'property-version'='1',
            'warehouse'='hdfs://server1/user/hive/warehouse'
          );

          創(chuàng)建成功后的提示

          5.創(chuàng)建表

          創(chuàng)建DataBase:

          create iceberg_db;
          use iceberg_db;

          創(chuàng)建表:

          CREATE TABLE test (
              id BIGINT COMMENT 'unique id',
              busi_date STRING

          6.插入數(shù)據(jù)和Flink任務(wù)執(zhí)行情況

          執(zhí)行sql插入數(shù)據(jù)。

          可以在Flink任務(wù)中看到相應的Job。

          7.Iceberg組件介紹

          IcebergStreamWriter

          主要用來寫入記錄到對應的 avro、parquet、orc 文件,生成一個對應的 Iceberg DataFile,并發(fā)送給下游算子。

          另外一個叫做 IcebergFilesCommitter,主要用來在 checkpoint 到來時把所有的 DataFile 文件收集起來,并提交 Transaction 到 Apache Iceberg,完成本次 checkpoint 的數(shù)據(jù)寫入,生成 DataFile。

          IcebergFilesCommitter

          為每個 checkpointId 維護了一個 DataFile 文件列表,即 map,這樣即使中間有某個 checkpoint 的 transaction 提交失敗了,它的 DataFile 文件仍然維護在 State 中,依然可以通過后續(xù)的 checkpoint 來提交數(shù)據(jù)到 Iceberg 表中。

          在Flink的任務(wù)日志中,可以看到對應IcebergStreamWriter和IcebergFilesCommitter的信息,以及snap的ID(3509023638495847835)。

          8.Iceberg文件結(jié)構(gòu)介紹

          在HDFS系統(tǒng)中觀察Iceberg的整個目錄結(jié)構(gòu),可以看到分為data和metadata兩個目錄,對應開篇介紹的Iceberg文件結(jié)構(gòu)。

          下圖中可看到Iceberg文件包含了數(shù)據(jù)文件、元數(shù)據(jù)和快照、manifest清單和manifest。

          觀察Iceberg的表元數(shù)據(jù)文件

          hadoop dfs text /user/iceberg_hive/warehouse/iceberg_db.db/test/metadata/00004-afffb920-e788-437e-80f0-4187a42ae74b.metadata.json

          可以看到對應的快照信息,表的版本、更新時間戳、manifest清單文件地址等信息。具體的字段描述可以參考官網(wǎng)介紹:https://iceberg.apache.org/spec/#iceberg-table-spec

          這里可以看到剛剛Flink任務(wù)插入的快照信息(3509023638495847835)

          觀察manifest清單和manifest文件

          9.分區(qū)表

          采集分區(qū)表并插入數(shù)據(jù)。

          CREATE TABLE t_partition (
              id BIGINT COMMENT 'unique id',
              busi_date STRING
          ) PARTITIONED BY (busi_date);

          可以看到表文件通過分區(qū)目錄進行了劃分,提高查詢效率。

          10.Iceberg執(zhí)行計劃

          11.通過Flink代碼的方式操作Iceberg

          package com.hyr.flink.iceberg

          import org.apache.flink.configuration.{Configuration, RestOptions}
          import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
          import org.apache.flink.table.api.bridge.scala.StreamTableEnvironment

          object IcebergDemo {

            def main(args: Array[String]): Unit = {
              val conf: Configuration = new Configuration()
              // 自定義web端口
              conf.setInteger(RestOptions.PORT, 9000)
              val streamEnv: StreamExecutionEnvironment = StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(conf)
              streamEnv.setParallelism(1)
              val tenv = StreamTableEnvironment.create(streamEnv)

              // add hadoop config file
              tenv.executeSql("CREATE CATALOG hive_catalog WITH (\n  'type'='iceberg',\n  'catalog-type'='hive',\n  'uri'='thrift://server1:9083',\n  'clients'='5',\n  'property-version'='1',\n  'warehouse'='hdfs://server1:8020/user/hive/warehouse'\n)");

              tenv.useCatalog("hive_catalog");
              tenv.executeSql("show databases").print()
              tenv.useDatabase("iceberg_db")
              tenv.executeSql("show tables").print()
              tenv.executeSql("select id from test").print() 
            }
          }

          完整的一個表元數(shù)據(jù)信息文件:

          {
            "format-version" : 1,
            "table-uuid" : "cfa12929-0f4c-475c-aca0-7c9cc411a1ac",
            "location" : "hdfs://master/user/iceberg_hive/warehouse/iceberg_db.db/test",
            "last-updated-ms" : 1622771727393,
            "last-column-id" : 2,
            "schema" : {
              "type" : "struct",
              "fields" : [ {
                "id" : 1,
                "name" : "id",
                "required" : false,
                "type" : "long"
              }, {
                "id" : 2,
                "name" : "data",
                "required" : false,
                "type" : "string"
              } ]
            },
            "partition-spec" : [ ],
            "default-spec-id" : 0,
            "partition-specs" : [ {
              "spec-id" : 0,
              "fields" : [ ]
            } ],
            "default-sort-order-id" : 0,
            "sort-orders" : [ {
              "order-id" : 0,
              "fields" : [ ]
            } ],
            "properties" : { },
            "current-snapshot-id" : 555628243696744305,
            "snapshots" : [ {
              "snapshot-id" : 8531001366494199026,
              "timestamp-ms" : 1622770732247,
              "summary" : {
                "operation" : "append",
                "flink.job-id" : "371316a9b274ff09f85957afe730e25d",
                "flink.max-committed-checkpoint-id" : "9223372036854775807",
                "added-data-files" : "1",
                "added-records" : "1",
                "added-files-size" : "637",
                "changed-partition-count" : "1",
                "total-records" : "1",
                "total-data-files" : "1",
                "total-delete-files" : "0",
                "total-position-deletes" : "0",
                "total-equality-deletes" : "0"
              },
              "manifest-list" : "hdfs://master/user/iceberg_hive/warehouse/iceberg_db.db/test/metadata/snap-8531001366494199026-1-63278140-aca4-4cd7-bffc-1e7d0e4b4b1b.avro"
            }, {
              "snapshot-id" : 626484522728673979,
              "parent-snapshot-id" : 8531001366494199026,
              "timestamp-ms" : 1622770733546,
              "summary" : {
                "operation" : "append",
                "flink.job-id" : "bbdcfb52195a0b0b556c6a167fc3de9f",
                "flink.max-committed-checkpoint-id" : "9223372036854775807",
                "added-data-files" : "1",
                "added-records" : "1",
                "added-files-size" : "636",
                "changed-partition-count" : "1",
                "total-records" : "2",
                "total-data-files" : "2",
                "total-delete-files" : "0",
                "total-position-deletes" : "0",
                "total-equality-deletes" : "0"
              },
              "manifest-list" : "hdfs://master/user/iceberg_hive/warehouse/iceberg_db.db/test/metadata/snap-626484522728673979-1-a79e0789-dbc0-4a45-b57d-73575cdccb1d.avro"
            }, {
              "snapshot-id" : 4382866461439510817,
              "parent-snapshot-id" : 626484522728673979,
              "timestamp-ms" : 1622770735121,
              "summary" : {
                "operation" : "append",
                "flink.job-id" : "947829c23ca09fba470204f5b146c191",
                "flink.max-committed-checkpoint-id" : "9223372036854775807",
                "added-data-files" : "1",
                "added-records" : "1",
                "added-files-size" : "637",
                "changed-partition-count" : "1",
                "total-records" : "3",
                "total-data-files" : "3",
                "total-delete-files" : "0",
                "total-position-deletes" : "0",
                "total-equality-deletes" : "0"
              },
              "manifest-list" : "hdfs://master/user/iceberg_hive/warehouse/iceberg_db.db/test/metadata/snap-4382866461439510817-1-1d78fbb1-7c97-4c3a-b47d-deef81273d0e.avro"
            }, {
              "snapshot-id" : 555628243696744305,
              "parent-snapshot-id" : 4382866461439510817,
              "timestamp-ms" : 1622771727393,
              "summary" : {
                "operation" : "append",
                "flink.job-id" : "f30e7cd040204f737ba8aaf0350340f7",
                "flink.max-committed-checkpoint-id" : "9223372036854775807",
                "added-data-files" : "1",
                "added-records" : "1",
                "added-files-size" : "637",
                "changed-partition-count" : "1",
                "total-records" : "4",
                "total-data-files" : "4",
                "total-delete-files" : "0",
                "total-position-deletes" : "0",
                "total-equality-deletes" : "0"
              },
              "manifest-list" : "hdfs://master/user/iceberg_hive/warehouse/iceberg_db.db/test/metadata/snap-555628243696744305-1-194acdcb-abdf-4acd-8bc1-5de6d4bb76b0.avro"
            } ],
            "snapshot-log" : [ {
              "timestamp-ms" : 1622770732247,
              "snapshot-id" : 8531001366494199026
            }, {
              "timestamp-ms" : 1622770733546,
              "snapshot-id" : 626484522728673979
            }, {
              "timestamp-ms" : 1622770735121,
              "snapshot-id" : 4382866461439510817
            }, {
              "timestamp-ms" : 1622771727393,
              "snapshot-id" : 555628243696744305
            } ],
            "metadata-log" : [ {
              "timestamp-ms" : 1622770665028,
              "metadata-file" : "hdfs://master/user/iceberg_hive/warehouse/iceberg_db.db/test/metadata/00000-8bec1008-0d2d-4dac-82a6-387d9354b2bc.metadata.json"
            }, {
              "timestamp-ms" : 1622770732247,
              "metadata-file" : "hdfs://master/user/iceberg_hive/warehouse/iceberg_db.db/test/metadata/00001-66552574-1458-40a6-8ebc-2d5f2c58a65e.metadata.json"
            }, {
              "timestamp-ms" : 1622770733546,
              "metadata-file" : "hdfs://master/user/iceberg_hive/warehouse/iceberg_db.db/test/metadata/00002-1b1fb277-d165-46cd-a3ee-f2b6c358213f.metadata.json"
            }, {
              "timestamp-ms" : 1622770735121,
              "metadata-file" : "hdfs://master/user/iceberg_hive/warehouse/iceberg_db.db/test/metadata/00003-56ffa5b6-bac5-4a1b-9e8d-2f36fe379610.metadata.json"
            } ]
          }

          如果這個文章對你有幫助,不要忘記 「在看」 「點贊」 「收藏」 三連啊喂!

          2022年全網(wǎng)首發(fā)|大數(shù)據(jù)專家級技能模型與學習指南(勝天半子篇)
          互聯(lián)網(wǎng)最壞的時代可能真的來了
          我在B站讀大學,大數(shù)據(jù)專業(yè)
          我們在學習Flink的時候,到底在學習什么?
          193篇文章暴揍Flink,這個合集你需要關(guān)注一下
          Flink生產(chǎn)環(huán)境TOP難題與優(yōu)化,阿里巴巴藏經(jīng)閣YYDS
          Flink CDC我吃定了耶穌也留不住他!| Flink CDC線上問題小盤點
          我們在學習Spark的時候,到底在學習什么?
          在所有Spark模塊中,我愿稱SparkSQL為最強!
          硬剛Hive | 4萬字基礎(chǔ)調(diào)優(yōu)面試小總結(jié)
          數(shù)據(jù)治理方法論和實踐小百科全書
          標簽體系下的用戶畫像建設(shè)小指南
          4萬字長文 | ClickHouse基礎(chǔ)&實踐&調(diào)優(yōu)全視角解析
          【面試&個人成長】2021年過半,社招和校招的經(jīng)驗之談
          大數(shù)據(jù)方向另一個十年開啟 |《硬剛系列》第一版完結(jié)
          我寫過的關(guān)于成長/面試/職場進階的文章
          當我們在學習Hive的時候在學習什么?「硬剛Hive續(xù)集」
          瀏覽 64
          點贊
          評論
          收藏
          分享

          手機掃一掃分享

          分享
          舉報
          評論
          圖片
          表情
          推薦
          點贊
          評論
          收藏
          分享

          手機掃一掃分享

          分享
          舉報
          <kbd id="afajh"><form id="afajh"></form></kbd>
          <strong id="afajh"><dl id="afajh"></dl></strong>
            <del id="afajh"><form id="afajh"></form></del>
                1. <th id="afajh"><progress id="afajh"></progress></th>
                  <b id="afajh"><abbr id="afajh"></abbr></b>
                  <th id="afajh"><progress id="afajh"></progress></th>
                  一道本高清无码在线看 | 天堂网av在线 | 色色色色综合 | 爱爱199极品 | 无码视频在线免费播放 |