<kbd id="afajh"><form id="afajh"></form></kbd>
<strong id="afajh"><dl id="afajh"></dl></strong>
    <del id="afajh"><form id="afajh"></form></del>
        1. <th id="afajh"><progress id="afajh"></progress></th>
          <b id="afajh"><abbr id="afajh"></abbr></b>
          <th id="afajh"><progress id="afajh"></progress></th>

          Flink1.12集成Hive打造自己的批流一體數(shù)倉

          共 12890字,需瀏覽 26分鐘

           ·

          2021-01-15 00:48

          點擊上方藍色字體,選擇“設為星標
          回復”資源“獲取更多資源
          ? ?簡介

          小編在去年之前分享過參與的實時數(shù)據(jù)平臺的建設,關于實時數(shù)倉也進行過分享。客觀的說,我們當時做不到批流一體,小編當時的方案是將實時消息數(shù)據(jù)每隔15分鐘文件同步到離線數(shù)據(jù)平臺,然后用同一套SQL代碼進行離線入庫操作。

          但是隨著 Flink1.12版本的發(fā)布,F(xiàn)link使用HiveCatalog可以通過批或者流的方式來處理Hive中的表。這就意味著Flink既可以作為Hive的一個批處理引擎,也可以通過流處理的方式來讀寫Hive中的表,從而為實時數(shù)倉的應用和流批一體的落地實踐奠定了堅實的基礎。

          Flink 與 Hive 的集成包含兩個層面。

          • 一是利用了 Hive 的 MetaStore 作為持久化的 Catalog,用戶可通過HiveCatalog將不同會話中的 Flink 元數(shù)據(jù)存儲到 Hive Metastore 中。例如,用戶可以使用HiveCatalog將其 Kafka 表或 Elasticsearch 表存儲在 Hive Metastore 中,并后續(xù)在 SQL 查詢中重新使用它們。

          • 二是利用 Flink 來讀寫 Hive 的表。

          HiveCatalog的設計提供了與 Hive 良好的兼容性,用戶可以”開箱即用”的訪問其已有的 Hive 數(shù)倉。您不需要修改現(xiàn)有的 Hive Metastore,也不需要更改表的數(shù)據(jù)位置或分區(qū)。

          Flink1.12 對Hive的支持

          從 1.11.0 開始,在使用 Hive 方言時,F(xiàn)link 允許用戶用 Hive 語法來編寫 SQL 語句。通過提供與 Hive 語法的兼容性,我們旨在改善與 Hive 的互操作性,并減少用戶需要在 Flink 和 Hive 之間切換來執(zhí)行不同語句的情況。

          Flink 支持的 Hive 版本如下圖所示:

          某些功能是否可用取決于您使用的 Hive 版本,這些限制不是由 Flink 所引起的:

          • Hive 內(nèi)置函數(shù)在使用 Hive-1.2.0 及更高版本時支持。

          • 列約束,也就是 PRIMARY KEY 和 NOT NULL,在使用 Hive-3.1.0 及更高版本時支持。

          • 更改表的統(tǒng)計信息,在使用 Hive-1.2.0 及更高版本時支持。

          • DATE列統(tǒng)計信息,在使用 Hive-1.2.0 及更高版時支持。

          • 使用 Hive-2.0.x 版本時不支持寫入 ORC 表。

          要與 Hive 集成,我們需要在 Flink 下的/lib/目錄中添加一些額外的依賴包, 以便通過 Table API 或 SQL Client 與 Hive 進行交互。

          Apache Hive 是基于 Hadoop 之上構建的, 首先您需要 Hadoop 的依賴:

          export HADOOP_CLASSPATH=`hadoop classpath`

          有兩種添加 Hive 依賴項的方法。第一種是使用 Flink 提供的 Hive Jar包。我們根據(jù)使用的 Metastore 的版本來選擇對應的 Hive jar。第二個方式是分別添加每個所需的 jar 包。如果您使用的 Hive 版本尚未在此處列出,則第二種方法會更適合。

          注意:建議優(yōu)先使用 Flink 提供的 Hive jar 包。僅在 Flink 提供的 Hive jar 不滿足您的需求時,再考慮使用分開添加 jar 包的方式。

          本文我們使用的Flink和Hive版本是1.12+2.3.6,集成Hive時還需要一些額外的Jar包依賴,將其放置在Flink安裝目錄下的lib文件夾下,這樣我們才能通過 Table API 或 SQL Client 與 Hive 進行交互。

          下圖列舉了Hive版本相對應的Jar包:

          此外,我們還需要添加下面兩個jar包:

          flink-connector-hive_2.11-1.12.0.jar和hive-exec-2.3.6.jar。其中hive-exec-2.3.6.jar包我們可以在Hive安裝路徑下的lib文件夾中找到。官網(wǎng)給出了下載地址,大家可以參考:https://ci.apache.org/projects/flink/flink-docs-release-1.12/zh/dev/table/connectors/hive/#%E4%BE%9D%E8%B5%96%E9%A1%B9

          如果你需要構建工程,那么只需要在pom.xml中新增以下依賴即可:



          org.apache.flink
          flink-connector-hive_2.11
          1.12.0
          provided



          org.apache.flink
          flink-table-api-java-bridge_2.11
          1.12.0
          provided




          org.apache.hive
          hive-exec
          ${hive.version}
          provided

          使用Blink Planner連接Hive

          請大家注意,F(xiàn)link 1.12版本中雖然 HiveCatalog 不需要特定的 planner,但讀寫Hive表僅適用于 Blink planner。因此,強烈建議您在連接到Hive倉庫時使用 Blink planner。

          EnvironmentSettings settings = EnvironmentSettings.newInstance().useBlinkPlanner().build();
          TableEnvironment tableEnv = TableEnvironment.create(settings);

          String name = "myhive";
          String defaultDatabase = "mydatabase";
          String hiveConfDir = "/opt/hive-conf";

          HiveCatalog hive = new HiveCatalog(name, defaultDatabase, hiveConfDir);
          tableEnv.registerCatalog("myhive", hive);

          #### 然后我們就可以愉快的對Hive表進行操作了,例如:

          // 隨便讀取一些數(shù)據(jù)
          String mySql = "select * from my_table limit 10'";

          TableResult result = tableEnv.executeSql(mySql);
          System.out.println(result.getJobClient().get().getJobStatus());

          下表列出了通過 YAML 文件或 DDL 定義 HiveCatalog 時所支持的參數(shù)。后續(xù)的版本規(guī)劃中將會支持在 Flink 中創(chuàng)建 Hive 表,視圖,分區(qū)和函數(shù)的DDL。

          使用 FlinkCli 連接Hive

          我們把三個必須依賴的包放到 Flink 的lib目錄下:

          flink-sql-connector-hive-2.3.6
          flink-connector-hive_2.11-1.12.0.jar
          hive-exec-2.3.4.jar

          然后,最重要的一步來了我們需要修改 conf/sql-cli-defaults.yaml配置文件:

          execution:
          planner: blink
          type: streaming
          ...
          current-catalog: myhive # set the HiveCatalog as the current catalog of the session
          current-database: mydatabase

          catalogs:
          - name: myhive
          type: hive
          hive-conf-dir: /opt/hive-conf # contains hive-site.xml

          然后就可以愉快的玩耍了:

          ##命令行啟動 
          bin/sql-client.sh embedded

          使用Hive Dialect

          Flink 目前支持兩種 SQL 方言: default 和 hive。你需要先切換到 Hive 方言,然后才能使用 Hive 語法編寫。下面介紹如何使用 SQL 客戶端和 Table API 設置方言。還要注意,你可以為執(zhí)行的每個語句動態(tài)切換方言。無需重新啟動會話即可使用其他方言。

          方言切換

          SQL 方言可以通過 table.sql-dialect 屬性指定。我們需要在sql-client-defaults.yaml配置文件中進行配置:

          execution:
          planner: blink
          type: batch
          result-mode: table

          configuration:
          table.sql-dialect: hive

          同樣我們也可以在 SQL 客戶端啟動后設置方言:

          Flink SQL> set table.sql-dialect=hive; -- to use hive dialect
          [INFO] Session property has been set.

          Flink SQL> set table.sql-dialect=default; -- to use default dialect
          [INFO] Session property has been set.

          Table API中使用Dialect

          EnvironmentSettings settings = EnvironmentSettings.newInstance().useBlinkPlanner()...build();
          TableEnvironment tableEnv = TableEnvironment.create(settings);
          // to use hive dialect
          tableEnv.getConfig().setSqlDialect(SqlDialect.HIVE);
          // to use default dialect
          tableEnv.getConfig().setSqlDialect(SqlDialect.DEFAULT);

          以下是使用 Hive 方言的一些注意事項。

          • Hive 方言只能用于操作 Hive 表,不能用于一般表。Hive 方言應與HiveCatalog一起使用。

          • 雖然所有 Hive 版本支持相同的語法,但是一些特定的功能是否可用仍取決于你使用的Hive 版本。例如,更新數(shù)據(jù)庫位置 只在 Hive-2.4.0 或更高版本支持。

          • Hive 和 Calcite 有不同的保留關鍵字集合。例如,default 是 Calcite 的保留關鍵字,卻不是 Hive 的保留關鍵字。即使使用 Hive 方言, 也必須使用反引號引用此類關鍵字才能將其用作標識符。

          • 由于擴展的查詢語句的不兼容性,在 Flink 中創(chuàng)建的視圖是不能在 Hive 中查詢的。

          使用Hive UDF

          在 Flink SQL 和 Table API 中,可以通過系統(tǒng)內(nèi)置的 HiveModule 來使用 Hive 內(nèi)置函數(shù),

          String name            = "myhive";
          String version = "2.3.4";
          tableEnv.loadModue(name, new HiveModule(version));

          在 Flink 中用戶可以使用 Hive 里已經(jīng)存在的 UDF 函數(shù)。

          支持的 UDF 類型包括:

          • UDF

          • GenericUDF

          • GenericUDTF

          • UDAF

          • GenericUDAFResolver2

          在進行查詢規(guī)劃和執(zhí)行時,Hive UDF 和 GenericUDF 函數(shù)會自動轉換成 Flink 中的 ScalarFunction,GenericUDTF 會被自動轉換成 Flink 中的 TableFunction,UDAF 和 GenericUDAFResolver2 則轉換成 Flink 聚合函數(shù)(AggregateFunction)。

          想要使用 Hive UDF 函數(shù),需要如下幾步:

          • 通過 Hive Metastore 將帶有 UDF 的 HiveCatalog 設置為當前會話的 catalog 后端。

          • 將帶有 UDF 的 jar 包放入 Flink classpath 中,并在代碼中引入。

          • 使用 Blink planner。

          假設我們在 Hive Metastore 中已經(jīng)注冊了下面的 UDF 函數(shù):

          /**
          * 注冊為 'myudf' 的簡單 UDF 測試類.
          */
          public class TestHiveSimpleUDF extends UDF {

          public IntWritable evaluate(IntWritable i) {
          return new IntWritable(i.get());
          }

          public Text evaluate(Text text) {
          return new Text(text.toString());
          }
          }

          /**
          * 注冊為 'mygenericudf' 的普通 UDF 測試類
          */
          public class TestHiveGenericUDF extends GenericUDF {

          @Override
          public ObjectInspector initialize(ObjectInspector[] arguments) throws UDFArgumentException {
          checkArgument(arguments.length == 2);

          checkArgument(arguments[1] instanceof ConstantObjectInspector);
          Object constant = ((ConstantObjectInspector) arguments[1]).getWritableConstantValue();
          checkArgument(constant instanceof IntWritable);
          checkArgument(((IntWritable) constant).get() == 1);

          if (arguments[0] instanceof IntObjectInspector ||
          arguments[0] instanceof StringObjectInspector) {
          return arguments[0];
          } else {
          throw new RuntimeException("Not support argument: " + arguments[0]);
          }
          }

          @Override
          public Object evaluate(DeferredObject[] arguments) throws HiveException {
          return arguments[0].get();
          }

          @Override
          public String getDisplayString(String[] children) {
          return "TestHiveGenericUDF";
          }
          }

          /**
          * 注冊為 'mygenericudtf' 的字符串分割 UDF 測試類
          */
          public class TestHiveUDTF extends GenericUDTF {

          @Override
          public StructObjectInspector initialize(ObjectInspector[] argOIs) throws UDFArgumentException {
          checkArgument(argOIs.length == 2);

          // TEST for constant arguments
          checkArgument(argOIs[1] instanceof ConstantObjectInspector);
          Object constant = ((ConstantObjectInspector) argOIs[1]).getWritableConstantValue();
          checkArgument(constant instanceof IntWritable);
          checkArgument(((IntWritable) constant).get() == 1);

          return ObjectInspectorFactory.getStandardStructObjectInspector(
          Collections.singletonList("col1"),
          Collections.singletonList(PrimitiveObjectInspectorFactory.javaStringObjectInspector));
          }

          @Override
          public void process(Object[] args) throws HiveException {
          String str = (String) args[0];
          for (String s : str.split(",")) {
          forward(s);
          forward(s);
          }
          }

          @Override
          public void close() {
          }
          }

          在 Hive CLI 中,可以查詢到已經(jīng)注冊的 UDF 函數(shù):

          hive> show functions;
          OK
          ......
          mygenericudf
          myudf
          myudtf

          此時,用戶如果想使用這些 UDF,在 SQL 中就可以這樣寫:

          Flink SQL> select mygenericudf(myudf(name), 1) as a, mygenericudf(myudf(age), 1) as b, s from mysourcetable, lateral table(myudtf(name, 1)) as T(s);

          此外,F(xiàn)link1.12有了一個巨大的突破就是和Hive進行維表Join。我們在官網(wǎng)上可以看到如下信息:

          Flink support tracking the latest partition(version) of temporal table automatically in processing time temporal join, the latest partition(version) is defined by ‘streaming-source.partition-order’ option, This is the most common user cases that use Hive table as dimension table in a Flink stream application job.
          NOTE: This feature is only support in Flink STREAMING Mode.
          Flink 1.12 支持了 Hive 最新的分區(qū)作為時態(tài)表的功能,可以通過 SQL 的方式直接關聯(lián) Hive 分區(qū)表的最新分區(qū),并且會自動監(jiān)聽最新的 Hive 分區(qū),當監(jiān)控到新的分區(qū)后,會自動地做維表數(shù)據(jù)的全量替換。
          Flink支持的是processing-time的temporal join,也就是說總是與最新版本的時態(tài)表進行JOIN。另外,F(xiàn)link既支持非分區(qū)表的temporal join,又支持分區(qū)表的temporal join。對于分區(qū)表而言,F(xiàn)link會監(jiān)聽Hive表的最新分區(qū)數(shù)據(jù)。值得注意的是,F(xiàn)link尚不支持 event-time temporal join。

          同時給出了一個案例:

          -- Assume the data in hive table is updated per day, every day contains the latest and complete dimension data
          SET table.sql-dialect=hive;
          CREATE TABLE dimension_table (
          product_id STRING,
          product_name STRING,
          unit_price DECIMAL(10, 4),
          pv_count BIGINT,
          like_count BIGINT,
          comment_count BIGINT,
          update_time TIMESTAMP(3),
          update_user STRING,
          ...
          ) PARTITIONED BY (pt_year STRING, pt_month STRING, pt_day STRING) TBLPROPERTIES (
          -- using default partition-name order to load the latest partition every 12h (the most recommended and convenient way)
          'streaming-source.enable' = 'true',
          'streaming-source.partition.include' = 'latest',
          'streaming-source.monitor-interval' = '12 h',
          'streaming-source.partition-order' = 'partition-name', -- option with default value, can be ignored.

          -- using partition file create-time order to load the latest partition every 12h
          'streaming-source.enable' = 'true',
          'streaming-source.partition.include' = 'latest',
          'streaming-source.partition-order' = 'create-time',
          'streaming-source.monitor-interval' = '12 h'

          -- using partition-time order to load the latest partition every 12h
          'streaming-source.enable' = 'true',
          'streaming-source.partition.include' = 'latest',
          'streaming-source.monitor-interval' = '12 h',
          'streaming-source.partition-order' = 'partition-time',
          'partition.time-extractor.kind' = 'default',
          'partition.time-extractor.timestamp-pattern' = '$pt_year-$pt_month-$pt_day 00:00:00'
          );

          SET table.sql-dialect=default;
          CREATE TABLE orders_table (
          order_id STRING,
          order_amount DOUBLE,
          product_id STRING,
          log_ts TIMESTAMP(3),
          proctime as PROCTIME()
          ) WITH (...);


          -- streaming sql, kafka temporal join a hive dimension table. Flink will automatically reload data from the
          -- configured latest partition in the interval of 'streaming-source.monitor-interval'.

          SELECT * FROM orders_table AS order
          JOIN dimension_table FOR SYSTEM_TIME AS OF o.proctime AS dim
          ON order.product_id = dim.product_id;

          萬事俱備只欠東風

          在 Flink 1.11 之前,F(xiàn)link 對接 Hive 會做些批處理的計算,并且只支持離線的場景。離線的場景一個問題是延遲比較大,批作業(yè)的調(diào)度一般都會通過一些調(diào)度的框架去調(diào)度。這樣其實延遲會有累加的作用。例如第一個 job 跑完,才能去跑第二個 job...這樣依次執(zhí)行。所以端對端的延遲就是所有 job 的疊加。

          然而隨著Flink在1.12中對Hive的友好支持情況變得不一樣了。在 Flink中文網(wǎng)上,社區(qū)分享了阿里巴巴之信和天離兩位同學關于建設 Flink 批流一體的實時數(shù)倉應用:

          例如 Online 的一些數(shù)據(jù),可以用 Flink 做 ETL,去實時的寫入 Hive。當數(shù)據(jù)寫入 Hive之后,可以進一步接一個新的 Flink job,來做實時的查詢或者近似實時的查詢,可以很快的返回結果。同時,其他的 Flink job 還可以利用寫入 Hive 數(shù)倉的數(shù)據(jù)作為維表,來跟其它線上的數(shù)據(jù)進行關聯(lián)整合,來得到分析的結果。

          此時我們的典型的架構就變成了:

          一個典型的Demo實現(xiàn)如下:


          #### 初始化環(huán)境
          val streamEnv = StreamExecutionEnvironment.getExecutionEnvironment
          streamEnv.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
          streamEnv.setParallelism(3)

          val tableEnvSettings = EnvironmentSettings.newInstance()
          .useBlinkPlanner()
          .inStreamingMode()
          .build()
          val tableEnv = StreamTableEnvironment.create(streamEnv, tableEnvSettings)
          tableEnv.getConfig.getConfiguration.set(ExecutionCheckpointingOptions.CHECKPOINTING_MODE, CheckpointingMode.EXACTLY_ONCE)
          tableEnv.getConfig.getConfiguration.set(ExecutionCheckpointingOptions.CHECKPOINTING_INTERVAL, Duration.ofSeconds(20))

          #### 連接Hive
          val name = "myhive";
          val defaultDatabase = "mydatabase";
          val hiveConfDir = "/opt/hive-conf";
          val catalog = new HiveCatalog(name, defaultDatabase, hiveConfDir);
          tableEnv.registerCatalog("myhive", hive);
          tableEnv.useCatalog(catalogName);

          #### 讀寫hive
          tableEnv.executeSql("CREATE DATABASE IF NOT EXISTS kafka_stream")
          tableEnv.executeSql("DROP TABLE IF EXISTS kafka_stream.kafka_source_topic")

          tableEnv.executeSql(
          """
          |CREATE TABLE kafka_stream.kafka_source_topic (
          | ts BIGINT,
          | userId BIGINT,
          | username STRING,
          | gender STRING,
          | procTime AS PROCTIME(),
          | eventTime AS TO_TIMESTAMP(FROM_UNIXTIME(ts / 1000,'yyyy-MM-dd HH:mm:ss')),
          | WATERMARK FOR eventTime AS eventTime - INTERVAL '15' SECOND
          |) WITH (
          | 'connector' = 'kafka',
          | 'topic' = 'kafka_source_topic',
          | 'properties.bootstrap.servers' = 'localhost:9092'
          | 'properties.group.id' = 'flink_hive',
          | 'scan.startup.mode' = 'latest-offset',
          | 'format' = 'json',
          | 'json.fail-on-missing-field' = 'false',
          | 'json.ignore-parse-errors' = 'true'
          |)
          """.stripMargin
          )
          ####其他操作如Hive建表、消費源數(shù)據(jù)寫入Kafka分區(qū)等

          關于Flink讀寫Hive的詳細實現(xiàn),小編會單獨開文章進行詳細介紹。


          Flink企業(yè)級面試題60連擊

          Flink實戰(zhàn) - Binlog日志并對接Kafka實戰(zhàn)

          Flink面試通關手冊

          Flink CDC 原理及生產(chǎn)實踐


          微信公眾號|import_bigdata
          編輯 |?《大數(shù)據(jù)技術與架構》

          歡迎點贊+收藏+轉發(fā)朋友圈素質三連

          文章不錯?點個【在看】吧!???
          瀏覽 41
          點贊
          評論
          收藏
          分享

          手機掃一掃分享

          分享
          舉報
          評論
          圖片
          表情
          推薦
          點贊
          評論
          收藏
          分享

          手機掃一掃分享

          分享
          舉報
          <kbd id="afajh"><form id="afajh"></form></kbd>
          <strong id="afajh"><dl id="afajh"></dl></strong>
            <del id="afajh"><form id="afajh"></form></del>
                1. <th id="afajh"><progress id="afajh"></progress></th>
                  <b id="afajh"><abbr id="afajh"></abbr></b>
                  <th id="afajh"><progress id="afajh"></progress></th>
                  天天搞天天干在线视频 | 黄色一级网址 | 中文无码熟妇人妻 | 啦啦啦www日本高清 | 操芘网站大全 |