Flink1.12集成Hive打造自己的批流一體數(shù)倉
? ?簡介小編在去年之前分享過參與的實時數(shù)據(jù)平臺的建設,關于實時數(shù)倉也進行過分享。客觀的說,我們當時做不到批流一體,小編當時的方案是將實時消息數(shù)據(jù)每隔15分鐘文件同步到離線數(shù)據(jù)平臺,然后用同一套SQL代碼進行離線入庫操作。
但是隨著 Flink1.12版本的發(fā)布,F(xiàn)link使用HiveCatalog可以通過批或者流的方式來處理Hive中的表。這就意味著Flink既可以作為Hive的一個批處理引擎,也可以通過流處理的方式來讀寫Hive中的表,從而為實時數(shù)倉的應用和流批一體的落地實踐奠定了堅實的基礎。
Flink 與 Hive 的集成包含兩個層面。
一是利用了 Hive 的 MetaStore 作為持久化的 Catalog,用戶可通過HiveCatalog將不同會話中的 Flink 元數(shù)據(jù)存儲到 Hive Metastore 中。例如,用戶可以使用HiveCatalog將其 Kafka 表或 Elasticsearch 表存儲在 Hive Metastore 中,并后續(xù)在 SQL 查詢中重新使用它們。
二是利用 Flink 來讀寫 Hive 的表。
HiveCatalog的設計提供了與 Hive 良好的兼容性,用戶可以”開箱即用”的訪問其已有的 Hive 數(shù)倉。您不需要修改現(xiàn)有的 Hive Metastore,也不需要更改表的數(shù)據(jù)位置或分區(qū)。
Flink1.12 對Hive的支持
從 1.11.0 開始,在使用 Hive 方言時,F(xiàn)link 允許用戶用 Hive 語法來編寫 SQL 語句。通過提供與 Hive 語法的兼容性,我們旨在改善與 Hive 的互操作性,并減少用戶需要在 Flink 和 Hive 之間切換來執(zhí)行不同語句的情況。
Flink 支持的 Hive 版本如下圖所示:

某些功能是否可用取決于您使用的 Hive 版本,這些限制不是由 Flink 所引起的:
Hive 內(nèi)置函數(shù)在使用 Hive-1.2.0 及更高版本時支持。
列約束,也就是 PRIMARY KEY 和 NOT NULL,在使用 Hive-3.1.0 及更高版本時支持。
更改表的統(tǒng)計信息,在使用 Hive-1.2.0 及更高版本時支持。
DATE列統(tǒng)計信息,在使用 Hive-1.2.0 及更高版時支持。
使用 Hive-2.0.x 版本時不支持寫入 ORC 表。
要與 Hive 集成,我們需要在 Flink 下的/lib/目錄中添加一些額外的依賴包, 以便通過 Table API 或 SQL Client 與 Hive 進行交互。
Apache Hive 是基于 Hadoop 之上構建的, 首先您需要 Hadoop 的依賴:
export HADOOP_CLASSPATH=`hadoop classpath`有兩種添加 Hive 依賴項的方法。第一種是使用 Flink 提供的 Hive Jar包。我們根據(jù)使用的 Metastore 的版本來選擇對應的 Hive jar。第二個方式是分別添加每個所需的 jar 包。如果您使用的 Hive 版本尚未在此處列出,則第二種方法會更適合。
注意:建議優(yōu)先使用 Flink 提供的 Hive jar 包。僅在 Flink 提供的 Hive jar 不滿足您的需求時,再考慮使用分開添加 jar 包的方式。
本文我們使用的Flink和Hive版本是1.12+2.3.6,集成Hive時還需要一些額外的Jar包依賴,將其放置在Flink安裝目錄下的lib文件夾下,這樣我們才能通過 Table API 或 SQL Client 與 Hive 進行交互。
下圖列舉了Hive版本相對應的Jar包:

此外,我們還需要添加下面兩個jar包:
flink-connector-hive_2.11-1.12.0.jar和hive-exec-2.3.6.jar。其中hive-exec-2.3.6.jar包我們可以在Hive安裝路徑下的lib文件夾中找到。官網(wǎng)給出了下載地址,大家可以參考:https://ci.apache.org/projects/flink/flink-docs-release-1.12/zh/dev/table/connectors/hive/#%E4%BE%9D%E8%B5%96%E9%A1%B9
如果你需要構建工程,那么只需要在pom.xml中新增以下依賴即可:
org.apache.flink
flink-connector-hive_2.11
1.12.0
provided
org.apache.flink
flink-table-api-java-bridge_2.11
1.12.0
provided
org.apache.hive
hive-exec
${hive.version}
provided
使用Blink Planner連接Hive
請大家注意,F(xiàn)link 1.12版本中雖然 HiveCatalog 不需要特定的 planner,但讀寫Hive表僅適用于 Blink planner。因此,強烈建議您在連接到Hive倉庫時使用 Blink planner。
EnvironmentSettings settings = EnvironmentSettings.newInstance().useBlinkPlanner().build();
TableEnvironment tableEnv = TableEnvironment.create(settings);
String name = "myhive";
String defaultDatabase = "mydatabase";
String hiveConfDir = "/opt/hive-conf";
HiveCatalog hive = new HiveCatalog(name, defaultDatabase, hiveConfDir);
tableEnv.registerCatalog("myhive", hive);
#### 然后我們就可以愉快的對Hive表進行操作了,例如:
// 隨便讀取一些數(shù)據(jù)
String mySql = "select * from my_table limit 10'";
TableResult result = tableEnv.executeSql(mySql);
System.out.println(result.getJobClient().get().getJobStatus());下表列出了通過 YAML 文件或 DDL 定義 HiveCatalog 時所支持的參數(shù)。后續(xù)的版本規(guī)劃中將會支持在 Flink 中創(chuàng)建 Hive 表,視圖,分區(qū)和函數(shù)的DDL。

使用 FlinkCli 連接Hive
我們把三個必須依賴的包放到 Flink 的lib目錄下:
flink-sql-connector-hive-2.3.6
flink-connector-hive_2.11-1.12.0.jar
hive-exec-2.3.4.jar然后,最重要的一步來了我們需要修改 conf/sql-cli-defaults.yaml配置文件:
execution:
planner: blink
type: streaming
...
current-catalog: myhive # set the HiveCatalog as the current catalog of the session
current-database: mydatabase
catalogs:
- name: myhive
type: hive
hive-conf-dir: /opt/hive-conf # contains hive-site.xml然后就可以愉快的玩耍了:
##命令行啟動
bin/sql-client.sh embedded
使用Hive Dialect
Flink 目前支持兩種 SQL 方言: default 和 hive。你需要先切換到 Hive 方言,然后才能使用 Hive 語法編寫。下面介紹如何使用 SQL 客戶端和 Table API 設置方言。還要注意,你可以為執(zhí)行的每個語句動態(tài)切換方言。無需重新啟動會話即可使用其他方言。
方言切換
SQL 方言可以通過 table.sql-dialect 屬性指定。我們需要在sql-client-defaults.yaml配置文件中進行配置:
execution:
planner: blink
type: batch
result-mode: table
configuration:
table.sql-dialect: hive同樣我們也可以在 SQL 客戶端啟動后設置方言:
Flink SQL> set table.sql-dialect=hive; -- to use hive dialect
[INFO] Session property has been set.
Flink SQL> set table.sql-dialect=default; -- to use default dialect
[INFO] Session property has been set.Table API中使用Dialect
EnvironmentSettings settings = EnvironmentSettings.newInstance().useBlinkPlanner()...build();
TableEnvironment tableEnv = TableEnvironment.create(settings);
// to use hive dialect
tableEnv.getConfig().setSqlDialect(SqlDialect.HIVE);
// to use default dialect
tableEnv.getConfig().setSqlDialect(SqlDialect.DEFAULT);以下是使用 Hive 方言的一些注意事項。
Hive 方言只能用于操作 Hive 表,不能用于一般表。Hive 方言應與HiveCatalog一起使用。
雖然所有 Hive 版本支持相同的語法,但是一些特定的功能是否可用仍取決于你使用的Hive 版本。例如,更新數(shù)據(jù)庫位置 只在 Hive-2.4.0 或更高版本支持。
Hive 和 Calcite 有不同的保留關鍵字集合。例如,default 是 Calcite 的保留關鍵字,卻不是 Hive 的保留關鍵字。即使使用 Hive 方言, 也必須使用反引號引用此類關鍵字才能將其用作標識符。
由于擴展的查詢語句的不兼容性,在 Flink 中創(chuàng)建的視圖是不能在 Hive 中查詢的。
使用Hive UDF
在 Flink SQL 和 Table API 中,可以通過系統(tǒng)內(nèi)置的 HiveModule 來使用 Hive 內(nèi)置函數(shù),
String name = "myhive";
String version = "2.3.4";
tableEnv.loadModue(name, new HiveModule(version));在 Flink 中用戶可以使用 Hive 里已經(jīng)存在的 UDF 函數(shù)。
支持的 UDF 類型包括:
UDF
GenericUDF
GenericUDTF
UDAF
GenericUDAFResolver2
在進行查詢規(guī)劃和執(zhí)行時,Hive UDF 和 GenericUDF 函數(shù)會自動轉換成 Flink 中的 ScalarFunction,GenericUDTF 會被自動轉換成 Flink 中的 TableFunction,UDAF 和 GenericUDAFResolver2 則轉換成 Flink 聚合函數(shù)(AggregateFunction)。
想要使用 Hive UDF 函數(shù),需要如下幾步:
通過 Hive Metastore 將帶有 UDF 的 HiveCatalog 設置為當前會話的 catalog 后端。
將帶有 UDF 的 jar 包放入 Flink classpath 中,并在代碼中引入。
使用 Blink planner。
假設我們在 Hive Metastore 中已經(jīng)注冊了下面的 UDF 函數(shù):
/**
* 注冊為 'myudf' 的簡單 UDF 測試類.
*/
public class TestHiveSimpleUDF extends UDF {
public IntWritable evaluate(IntWritable i) {
return new IntWritable(i.get());
}
public Text evaluate(Text text) {
return new Text(text.toString());
}
}
/**
* 注冊為 'mygenericudf' 的普通 UDF 測試類
*/
public class TestHiveGenericUDF extends GenericUDF {
@Override
public ObjectInspector initialize(ObjectInspector[] arguments) throws UDFArgumentException {
checkArgument(arguments.length == 2);
checkArgument(arguments[1] instanceof ConstantObjectInspector);
Object constant = ((ConstantObjectInspector) arguments[1]).getWritableConstantValue();
checkArgument(constant instanceof IntWritable);
checkArgument(((IntWritable) constant).get() == 1);
if (arguments[0] instanceof IntObjectInspector ||
arguments[0] instanceof StringObjectInspector) {
return arguments[0];
} else {
throw new RuntimeException("Not support argument: " + arguments[0]);
}
}
@Override
public Object evaluate(DeferredObject[] arguments) throws HiveException {
return arguments[0].get();
}
@Override
public String getDisplayString(String[] children) {
return "TestHiveGenericUDF";
}
}
/**
* 注冊為 'mygenericudtf' 的字符串分割 UDF 測試類
*/
public class TestHiveUDTF extends GenericUDTF {
@Override
public StructObjectInspector initialize(ObjectInspector[] argOIs) throws UDFArgumentException {
checkArgument(argOIs.length == 2);
// TEST for constant arguments
checkArgument(argOIs[1] instanceof ConstantObjectInspector);
Object constant = ((ConstantObjectInspector) argOIs[1]).getWritableConstantValue();
checkArgument(constant instanceof IntWritable);
checkArgument(((IntWritable) constant).get() == 1);
return ObjectInspectorFactory.getStandardStructObjectInspector(
Collections.singletonList("col1"),
Collections.singletonList(PrimitiveObjectInspectorFactory.javaStringObjectInspector));
}
@Override
public void process(Object[] args) throws HiveException {
String str = (String) args[0];
for (String s : str.split(",")) {
forward(s);
forward(s);
}
}
@Override
public void close() {
}
}在 Hive CLI 中,可以查詢到已經(jīng)注冊的 UDF 函數(shù):
hive> show functions;
OK
......
mygenericudf
myudf
myudtf此時,用戶如果想使用這些 UDF,在 SQL 中就可以這樣寫:
Flink SQL> select mygenericudf(myudf(name), 1) as a, mygenericudf(myudf(age), 1) as b, s from mysourcetable, lateral table(myudtf(name, 1)) as T(s);此外,F(xiàn)link1.12有了一個巨大的突破就是和Hive進行維表Join。我們在官網(wǎng)上可以看到如下信息:
Flink support tracking the latest partition(version) of temporal table automatically in processing time temporal join, the latest partition(version) is defined by ‘streaming-source.partition-order’ option, This is the most common user cases that use Hive table as dimension table in a Flink stream application job.
NOTE: This feature is only support in Flink STREAMING Mode.
Flink 1.12 支持了 Hive 最新的分區(qū)作為時態(tài)表的功能,可以通過 SQL 的方式直接關聯(lián) Hive 分區(qū)表的最新分區(qū),并且會自動監(jiān)聽最新的 Hive 分區(qū),當監(jiān)控到新的分區(qū)后,會自動地做維表數(shù)據(jù)的全量替換。
Flink支持的是processing-time的temporal join,也就是說總是與最新版本的時態(tài)表進行JOIN。另外,F(xiàn)link既支持非分區(qū)表的temporal join,又支持分區(qū)表的temporal join。對于分區(qū)表而言,F(xiàn)link會監(jiān)聽Hive表的最新分區(qū)數(shù)據(jù)。值得注意的是,F(xiàn)link尚不支持 event-time temporal join。同時給出了一個案例:
-- Assume the data in hive table is updated per day, every day contains the latest and complete dimension data
SET table.sql-dialect=hive;
CREATE TABLE dimension_table (
product_id STRING,
product_name STRING,
unit_price DECIMAL(10, 4),
pv_count BIGINT,
like_count BIGINT,
comment_count BIGINT,
update_time TIMESTAMP(3),
update_user STRING,
...
) PARTITIONED BY (pt_year STRING, pt_month STRING, pt_day STRING) TBLPROPERTIES (
-- using default partition-name order to load the latest partition every 12h (the most recommended and convenient way)
'streaming-source.enable' = 'true',
'streaming-source.partition.include' = 'latest',
'streaming-source.monitor-interval' = '12 h',
'streaming-source.partition-order' = 'partition-name', -- option with default value, can be ignored.
-- using partition file create-time order to load the latest partition every 12h
'streaming-source.enable' = 'true',
'streaming-source.partition.include' = 'latest',
'streaming-source.partition-order' = 'create-time',
'streaming-source.monitor-interval' = '12 h'
-- using partition-time order to load the latest partition every 12h
'streaming-source.enable' = 'true',
'streaming-source.partition.include' = 'latest',
'streaming-source.monitor-interval' = '12 h',
'streaming-source.partition-order' = 'partition-time',
'partition.time-extractor.kind' = 'default',
'partition.time-extractor.timestamp-pattern' = '$pt_year-$pt_month-$pt_day 00:00:00'
);
SET table.sql-dialect=default;
CREATE TABLE orders_table (
order_id STRING,
order_amount DOUBLE,
product_id STRING,
log_ts TIMESTAMP(3),
proctime as PROCTIME()
) WITH (...);
-- streaming sql, kafka temporal join a hive dimension table. Flink will automatically reload data from the
-- configured latest partition in the interval of 'streaming-source.monitor-interval'.
SELECT * FROM orders_table AS order
JOIN dimension_table FOR SYSTEM_TIME AS OF o.proctime AS dim
ON order.product_id = dim.product_id;萬事俱備只欠東風
在 Flink 1.11 之前,F(xiàn)link 對接 Hive 會做些批處理的計算,并且只支持離線的場景。離線的場景一個問題是延遲比較大,批作業(yè)的調(diào)度一般都會通過一些調(diào)度的框架去調(diào)度。這樣其實延遲會有累加的作用。例如第一個 job 跑完,才能去跑第二個 job...這樣依次執(zhí)行。所以端對端的延遲就是所有 job 的疊加。
然而隨著Flink在1.12中對Hive的友好支持情況變得不一樣了。在 Flink中文網(wǎng)上,社區(qū)分享了阿里巴巴之信和天離兩位同學關于建設 Flink 批流一體的實時數(shù)倉應用:

例如 Online 的一些數(shù)據(jù),可以用 Flink 做 ETL,去實時的寫入 Hive。當數(shù)據(jù)寫入 Hive之后,可以進一步接一個新的 Flink job,來做實時的查詢或者近似實時的查詢,可以很快的返回結果。同時,其他的 Flink job 還可以利用寫入 Hive 數(shù)倉的數(shù)據(jù)作為維表,來跟其它線上的數(shù)據(jù)進行關聯(lián)整合,來得到分析的結果。
此時我們的典型的架構就變成了:

一個典型的Demo實現(xiàn)如下:
#### 初始化環(huán)境
val streamEnv = StreamExecutionEnvironment.getExecutionEnvironment
streamEnv.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
streamEnv.setParallelism(3)
val tableEnvSettings = EnvironmentSettings.newInstance()
.useBlinkPlanner()
.inStreamingMode()
.build()
val tableEnv = StreamTableEnvironment.create(streamEnv, tableEnvSettings)
tableEnv.getConfig.getConfiguration.set(ExecutionCheckpointingOptions.CHECKPOINTING_MODE, CheckpointingMode.EXACTLY_ONCE)
tableEnv.getConfig.getConfiguration.set(ExecutionCheckpointingOptions.CHECKPOINTING_INTERVAL, Duration.ofSeconds(20))
#### 連接Hive
val name = "myhive";
val defaultDatabase = "mydatabase";
val hiveConfDir = "/opt/hive-conf";
val catalog = new HiveCatalog(name, defaultDatabase, hiveConfDir);
tableEnv.registerCatalog("myhive", hive);
tableEnv.useCatalog(catalogName);
#### 讀寫hive
tableEnv.executeSql("CREATE DATABASE IF NOT EXISTS kafka_stream")
tableEnv.executeSql("DROP TABLE IF EXISTS kafka_stream.kafka_source_topic")
tableEnv.executeSql(
"""
|CREATE TABLE kafka_stream.kafka_source_topic (
| ts BIGINT,
| userId BIGINT,
| username STRING,
| gender STRING,
| procTime AS PROCTIME(),
| eventTime AS TO_TIMESTAMP(FROM_UNIXTIME(ts / 1000,'yyyy-MM-dd HH:mm:ss')),
| WATERMARK FOR eventTime AS eventTime - INTERVAL '15' SECOND
|) WITH (
| 'connector' = 'kafka',
| 'topic' = 'kafka_source_topic',
| 'properties.bootstrap.servers' = 'localhost:9092'
| 'properties.group.id' = 'flink_hive',
| 'scan.startup.mode' = 'latest-offset',
| 'format' = 'json',
| 'json.fail-on-missing-field' = 'false',
| 'json.ignore-parse-errors' = 'true'
|)
""".stripMargin
)
####其他操作如Hive建表、消費源數(shù)據(jù)寫入Kafka分區(qū)等關于Flink讀寫Hive的詳細實現(xiàn),小編會單獨開文章進行詳細介紹。

Flink實戰(zhàn) - Binlog日志并對接Kafka實戰(zhàn)
