?Flink 教程 | PyFlink 教程(二):PyFlink Table API - Python 自定義函數(shù)
背景:Python 自定義函數(shù)是 PyFlink Table API 中最重要的功能之一,其允許用戶在 PyFlink Table API 中使用 Python 語言開發(fā)的自定義函數(shù),極大地拓寬了 Python Table API 的使用范圍。
目前 Python 自定義函數(shù)的功能已經(jīng)非常完善,支持多種類型的自定義函數(shù),比如 UDF(scalar function)、UDTF(table function)、UDAF(aggregate function),UDTAF(table aggregate function,1.13 支持)、Panda UDF、Pandas UDAF 等。接下來,我們詳細(xì)介紹一下如何在 PyFlink Table API 作業(yè)中使用 Python 自定義函數(shù)。
GitHub 地址 
一、Python 自定義函數(shù)基礎(chǔ)
1. Python UDF
from pyflink.table.udf import udf, FunctionContext, ScalarFunctionfrom pyflink.table import DataTypes方式一:@udf(result_type=DataTypes.STRING())def sub_string(s: str, begin: int, end: int):return s[begin:end]方式二:sub_string = udf(lambda s, begin, end: s[begin:end], result_type=DataTypes.STRING())方式三:class SubString(object):def __call__(self, s: str, begin: int, end: int):return s[begin:end]sub_string = udf(SubString(), result_type=DataTypes.STRING())方式四:def sub_string(s: str, begin: int, end: int):return s[begin:end]sub_string_begin_1 = udf(functools.partial(sub_string, begin=1), result_type=DataTypes.STRING())方式五:class SubString(ScalarFunction):def open(self, function_context: FunctionContext):passdef eval(self, s: str, begin: int, end: int):return s[begin:end]sub_string = udf(SubString(), result_type=DataTypes.STRING())
需要通過名字為 “ udf ” 的裝飾器,聲明這是一個 scalar function; 需要通過裝飾器中的 result_type 參數(shù),聲明 scalar function 的結(jié)果類型; 上述方式五,通過繼承 ScalarFunction 的方式來定義 Python UDF 有以下用處: ScalarFunction 的基類 UserDefinedFunction 中定義了一個 open 方法,該方法只在作業(yè)初始化時執(zhí)行一次,因此可以利用該方法,做一些初始化工作,比如加載機(jī)器學(xué)習(xí)模型、連接外部服務(wù)等。 此外,還可以通過 open 方法中的 function_context 參數(shù),注冊及使用 metrics。
env_settings = EnvironmentSettings.new_instance().in_streaming_mode().use_blink_planner().build()t_env = StreamTableEnvironment.create(environment_settings=env_settings)table = t_env.from_elements([("hello", 1), ("world", 2), ("flink", 3)], ['a', 'b'])table.select(sub_string(table.a, 1, 3))
2. Python UDTF
from pyflink.table.udf import udtffrom pyflink.table import DataTypes@udtf(result_types=[DataTypes.STRING(), DataTypes.STRING()])def split(s: str, sep: str):splits = s.split(sep)yield splits[0], splits[1]
需要通過名字為 “ udtf ” 的裝飾器,聲明這是一個 table function; 需要通過裝飾器中的 result_types 參數(shù),聲明 table function 的結(jié)果類型。由于 table function 每條輸出可以包含多個列,result_types 需要指定所有輸出列的類型; Python UDTF 的定義,也支持 Python UDF 章節(jié)中所列出的多種定義方式,這里只展示了其中一種。
env_settings = EnvironmentSettings.new_instance().in_streaming_mode().use_blink_planner().build()t_env = StreamTableEnvironment.create(environment_settings=env_settings)table = t_env.from_elements([("hello|word", 1), ("abc|def", 2)], ['a', 'b'])table.join_lateral(split(table.a, '|').alias("c1, c2"))table.left_outer_join_lateral(split(table.a, '|').alias("c1, c2"))
3. Python UDAF
from pyflink.common import Rowfrom pyflink.table import AggregateFunction, DataTypesfrom pyflink.table.udf import udafclass WeightedAvg(AggregateFunction):def create_accumulator(self):# Row(sum, count)return Row(0, 0)def get_value(self, accumulator: Row) -> float:if accumulator[1] == 0:return 0else:return accumulator[0] / accumulator[1]def accumulate(self, accumulator: Row, value, weight):accumulator[0] += value * weightaccumulator[1] += weightdef retract(self, accumulator: Row, value, weight):accumulator[0] -= value * weightaccumulator[1] -= weightweighted_avg = udaf(f=WeightedAvg(),result_type=DataTypes.DOUBLE(),accumulator_type=DataTypes.ROW([DataTypes.FIELD("f0", DataTypes.BIGINT()),DataTypes.FIELD("f1", DataTypes.BIGINT())]))
需要通過名字為 “ udaf ” 的裝飾器,聲明這是一個 aggregate function, 需要分別通過裝飾器中的 result_type 及 accumulator_type 參數(shù),聲明 aggregate function 的結(jié)果類型及 accumulator 類型; create_accumulator,get_value 和 accumulate 這 3 個方法必須要定義,retract 方法可以根據(jù)需要定義,詳細(xì)信息可以參見 Flink 官方文檔 [1];需要注意的是,由于必須定義 create_accumulator,get_value 和 accumulate 這 3 個方法,Python UDAF 只能通過繼承AggregateFunction 的方式進(jìn)行定義(Pandas UDAF 沒有這方面的限制)。
env_settings = EnvironmentSettings.new_instance().in_streaming_mode().use_blink_planner().build()t_env = StreamTableEnvironment.create(environment_settings=env_settings)t = t_env.from_elements([(1, 2, "Lee"), (3, 4, "Jay"), (5, 6, "Jay"), (7, 8, "Lee")],["value", "count", "name"])t.group_by(t.name).select(weighted_avg(t.value, t.count).alias("avg"))
4. Python UDTAF
from pyflink.common import Rowfrom pyflink.table import DataTypesfrom pyflink.table.udf import udtaf, TableAggregateFunctionclass Top2(TableAggregateFunction):def create_accumulator(self):# 存儲當(dāng)前最大的兩個值return [None, None]def accumulate(self, accumulator, input_row):if input_row[0] is not None:# 新的輸入值最大if accumulator[0] is None or input_row[0] > accumulator[0]:accumulator[1] = accumulator[0]accumulator[0] = input_row[0]# 新的輸入值次大elif accumulator[1] is None or input_row[0] > accumulator[1]:accumulator[1] = input_row[0]def emit_value(self, accumulator):yield Row(accumulator[0])if accumulator[1] is not None:yield Row(accumulator[1])top2 = udtaf(f=Top2(),result_type=DataTypes.ROW([DataTypes.FIELD("a", DataTypes.BIGINT())]),accumulator_type=DataTypes.ARRAY(DataTypes.BIGINT()))
Python UDTAF 功能是 Flink 1.13 之后支持的新功能; create_accumulator,accumulate 和 emit_value 這 3 個方法必須定義,此外 TableAggregateFunction 中支持 retract、merge 等方法,可以根據(jù)需要選擇是否定義,詳細(xì)信息可以參見 Flink 官方文檔[2]。
env_settings = EnvironmentSettings.new_instance().in_streaming_mode().use_blink_planner().build()t_env = StreamTableEnvironment.create(environment_settings=env_settings)t = t_env.from_elements([(1, 'Hi', 'Hello'),(3, 'Hi', 'hi'),(5, 'Hi2', 'hi'),(2, 'Hi', 'Hello'),(7, 'Hi', 'Hello')],['a', 'b', 'c'])t_env.execute_sql("""CREATE TABLE my_sink (word VARCHAR,`sum` BIGINT) WITH ('connector' = 'print')""")result = t.group_by(t.b).flat_aggregate(top2).select("b, a").execute_insert("my_sink")# 1)等待作業(yè)執(zhí)行結(jié)束,用于local執(zhí)行,否則可能作業(yè)尚未執(zhí)行結(jié)束,該腳本已退出,會導(dǎo)致minicluster過早退出# 2)當(dāng)作業(yè)通過detach模式往remote集群提交時,比如YARN/Standalone/K8s等,需要移除該方法result.wait()
11> +I[Hi, 7]10> +I[Hi2, 5]11> +I[Hi, 3]
Python UDTAF 只能用于 Table API,不能用于 SQL 語句中; flat_aggregate 的結(jié)果包含了原始的 grouping 列以及 UDTAF(top 2)的輸出,因此,可以在 select 中訪問列 “ b ”。
二、Python 自定義函數(shù)進(jìn)階
1. 在純 SQL 作業(yè)中使用 Python 自定義函數(shù)
CREATE TEMPORARY FUNCTION sub_string AS 'test_udf.sub_string' LANGUAGE PYTHONCREATE TABLE source (a VARCHAR) WITH ('connector' = 'datagen');CREATE TABLE sink (a VARCHAR) WITH ('connector' = 'print');INSERT INTO sinkSELECT sub_string(a, 1, 3)FROM source;
2. 在 Java 作業(yè)中使用 Python 自定義函數(shù)
TableEnvironment tEnv = TableEnvironment.create(EnvironmentSettings.newInstance().useBlinkPlanner().inBatchMode().build());tEnv.executeSql("CREATE TEMPORARY FUNCTION sub_string AS 'test_udf.sub_string' LANGUAGE PYTHON");tEnv.createTemporaryView("source", tEnv.fromValues("hello", "world", "flink").as("a"));tEnv.executeSql("SELECT sub_string(a) FROM source").collect();
3. 依賴管理
第三方 Python 庫如何被 Python 自定義函數(shù)訪問。不同的作業(yè),對于 Python 庫的版本要求是不一樣的,將第三方 Python 庫預(yù)安裝到集群的 Python 環(huán)境中,只適用于安裝一些公共的依賴,不能解決不同作業(yè)對于 Python 依賴個性化的需求; 機(jī)器學(xué)習(xí)模型或者數(shù)據(jù)文件,如何分發(fā)到集群節(jié)點,并最終被 Python 自定義函數(shù)訪問。
需要注意的是,Python UDF 的實現(xiàn)所在的文件,也需要在作業(yè)執(zhí)行的時候,作為依賴文件上傳; 可以通過合用 “存檔文件” 與 “ Python 解釋器路徑”,指定作業(yè)使用上傳的 Python 虛擬環(huán)境來執(zhí)行,比如:
table_env.add_python_archive("/path/to/py_env.zip")# 指定使用py_env.zip包中帶的python來執(zhí)行用戶自定義函數(shù),必須通過相對路徑來指定table_env.get_config().set_python_executable("py_env.zip/py_env/bin/python")
推薦用戶使用 conda 來構(gòu)建 Python 虛擬環(huán)境,conda 構(gòu)建的 Python 虛擬環(huán)境包含了執(zhí)行 Python 所需的絕大多數(shù)底層庫,可以極大地避免當(dāng)本地環(huán)境與集群環(huán)境不一樣時,所構(gòu)建的 Python 虛擬環(huán)境在集群執(zhí)行時,缺少各種底層依賴庫的問題。關(guān)于如何使用 conda 構(gòu)建的 Python 虛擬環(huán)境,可以參考阿里云 VVP 文檔中 “使用 Python 三方包” 章節(jié)的介紹 [4] 有些 Python 三方庫需要安裝才能使用,即并非 ”將其下載下來就可以直接放到 PYTHONPATH 中引用“,針對這種類型的 Python 三方庫,有兩種解決方案:
將其安裝在 Python 虛擬環(huán)境之中,指定作業(yè)運行使用所構(gòu)建的 Python 虛擬環(huán)境; 找一臺與集群環(huán)境相同的機(jī)器(或 docker),安裝所需的 Python 三方庫,然后將安裝文件打包。該方式相對于 Python 虛擬環(huán)境來說,打包文件比較小。詳情可以參考阿里云 VVP 文檔中 “使用自定義的 Python 虛擬環(huán)境” 章節(jié)的介紹 [5]。
4. 調(diào)試
5. 調(diào)優(yōu)
checkpoint 時,會觸發(fā)緩存數(shù)據(jù)的計算,因此當(dāng)上述參數(shù)配置的值過大時,可能會導(dǎo)致checkpoint 時需要處理過多的數(shù)據(jù),從而導(dǎo)致 checkpoint 時間過長,甚至?xí)?dǎo)致 checkpoint 失敗。當(dāng)遇到作業(yè)的 checkpoint 時間比較長的問題時,可以嘗試減少上述參數(shù)的取值。
三、常見問題
1)Python 自定義函數(shù)的實際返回值類型與 result_type 中聲明的類型不一致,該問題會導(dǎo)致 Java 算子在收到 Python 自定義函數(shù)的執(zhí)行結(jié)果,進(jìn)行反序列化時報錯,錯誤堆棧類似:
Caused by: java.io.EOFExceptionat java.io.DataInputStream.readInt(DataInputStream.java:392) ~[?:1.8.0_261]at org.apache.flink.table.runtime.typeutils.StringDataSerializer.deserializeInternal(StringDataSerializer.java:91) ~[flink-table-blink_2.11-1.12.0.jar:1.12.0]at org.apache.flink.table.runtime.typeutils.StringDataSerializer.deserialize(StringDataSerializer.java:87) ~[flink-table-blink_2.11-1.12.0.jar:1.12.0]at org.apache.flink.table.runtime.typeutils.StringDataSerializer.deserialize(StringDataSerializer.java:36) ~[flink-table-blink_2.11-1.12.0.jar:1.12.0]at org.apache.flink.table.runtime.typeutils.serializers.python.MapDataSerializer.deserializeInternal(MapDataSerializer.java:124) ~[flink-python_2.11-1.12.0.jar:1.12.0]at org.apache.flink.table.runtime.typeutils.serializers.python.MapDataSerializer.deserialize(MapDataSerializer.java:107) ~[flink-python_2.11-1.12.0.jar:1.12.0]at org.apache.flink.table.runtime.typeutils.serializers.python.MapDataSerializer.deserialize(MapDataSerializer.java:46) ~[flink-python_2.11-1.12.0.jar:1.12.0]at org.apache.flink.table.runtime.typeutils.serializers.python.RowDataSerializer.deserialize(RowDataSerializer.java:104) ~[flink-python_2.11-1.12.0.jar:1.12.0]at org.apache.flink.table.runtime.typeutils.serializers.python.RowDataSerializer.deserialize(RowDataSerializer.java:49) ~[flink-python_2.11-1.12.0.jar:1.12.0]at org.apache.flink.table.runtime.operators.python.scalar.RowDataPythonScalarFunctionOperator.emitResult(RowDataPythonScalarFunctionOperator.java:84) ~[flink-python_2.11-1.12.0.jar:1.12.0]
四、總結(jié)
在這篇文章中,我們主要介紹了各種 Python 自定義函數(shù)的定義及使用方式,以及 Python 依賴管理、 Python 自定義函數(shù)調(diào)試及調(diào)優(yōu)等方面的信息,希望可以幫助用戶了解 Python 自定義函數(shù)。接下來,我們會繼續(xù)推出 PyFlink 系列文章,幫助 PyFlink 用戶深入了解 PyFlink 中各種功能、應(yīng)用場景、最佳實踐等。
實時機(jī)器學(xué)習(xí):支持機(jī)器學(xué)習(xí)場景下實時特征工程和 AI 引擎配合,基于 Apache Flink 及其生態(tài)打造實時機(jī)器學(xué)習(xí)的標(biāo)準(zhǔn),推動例如搜索、推薦、廣告、風(fēng)控等場景的全面實時化; 大數(shù)據(jù) + AI 一體化:包括編程語言一體化 (PyFlink 相關(guān)工作),執(zhí)行引擎集成化 (TF on Flink),工作流及管理一體化(Flink AI Flow)。
引用鏈接:
戳我,查看更多技術(shù)干貨~評論
圖片
表情
