<kbd id="afajh"><form id="afajh"></form></kbd>
<strong id="afajh"><dl id="afajh"></dl></strong>
    <del id="afajh"><form id="afajh"></form></del>
        1. <th id="afajh"><progress id="afajh"></progress></th>
          <b id="afajh"><abbr id="afajh"></abbr></b>
          <th id="afajh"><progress id="afajh"></progress></th>

          ?Flink 教程 | PyFlink 教程(二):PyFlink Table API - Python 自定義函數(shù)

          共 15916字,需瀏覽 32分鐘

           ·

          2021-06-07 06:27

          背景:Python 自定義函數(shù)是 PyFlink Table API 中最重要的功能之一,其允許用戶在 PyFlink Table API 中使用 Python 語言開發(fā)的自定義函數(shù),極大地拓寬了 Python Table API 的使用范圍。


          目前 Python 自定義函數(shù)的功能已經(jīng)非常完善,支持多種類型的自定義函數(shù),比如 UDF(scalar function)、UDTF(table function)、UDAF(aggregate function),UDTAF(table aggregate function,1.13 支持)、Panda UDF、Pandas UDAF 等。接下來,我們詳細(xì)介紹一下如何在 PyFlink Table API 作業(yè)中使用 Python 自定義函數(shù)。


          Tips:點擊文閱讀原文即可查看更多技術(shù)干貨~

           GitHub 地址 
          https://github.com/apache/flink
          歡迎大家給 Flink 點贊送 star~

          一、Python 自定義函數(shù)基礎(chǔ)


          根據(jù)輸入 / 輸出數(shù)據(jù)的行數(shù),F(xiàn)link Table API & SQL 中,自定義函數(shù)可以分為以下幾類:

          自定義函數(shù)
          Single Row Input
          Multiple Row Input
          Single Row Output
          ScalarFunction
          AggregateFunction
          Multiple Row Output
          TableFunction
          TableAggregateFunction

          PyFlink 針對以上四種類型的自定義函數(shù)都提供了支持,接下來,我們分別看一下每種類型的自定義函數(shù)如何使用。

          1. Python UDF


          Python UDF,即 Python ScalarFunction,針對每一條輸入數(shù)據(jù),僅產(chǎn)生一條輸出數(shù)據(jù)。比如以下示例,展示了通過多種方式,來定義名字為 "sub_string" 的 Python UDF:

          from pyflink.table.udf import udf, FunctionContext, ScalarFunctionfrom pyflink.table import DataTypes
          方式一:@udf(result_type=DataTypes.STRING())def sub_string(s: str, begin: int, end: int): return s[begin:end]
          方式二:sub_string = udf(lambda s, begin, end: s[begin:end], result_type=DataTypes.STRING())
          方式三:class SubString(object): def __call__(self, s: str, begin: int, end: int): return s[begin:end]
          sub_string = udf(SubString(), result_type=DataTypes.STRING())
          方式四:def sub_string(s: str, begin: int, end: int): return s[begin:end]
          sub_string_begin_1 = udf(functools.partial(sub_string, begin=1), result_type=DataTypes.STRING())
          方式五:class SubString(ScalarFunction): def open(self, function_context: FunctionContext): pass
          def eval(self, s: str, begin: int, end: int): return s[begin:end]
          sub_string = udf(SubString(), result_type=DataTypes.STRING())

          說明:

          • 需要通過名字為 “ udf ” 的裝飾器,聲明這是一個 scalar function;

          • 需要通過裝飾器中的 result_type 參數(shù),聲明 scalar function 的結(jié)果類型;

          • 上述方式五,通過繼承 ScalarFunction 的方式來定義 Python UDF 有以下用處:

            • ScalarFunction 的基類 UserDefinedFunction 中定義了一個 open 方法,該方法只在作業(yè)初始化時執(zhí)行一次,因此可以利用該方法,做一些初始化工作,比如加載機(jī)器學(xué)習(xí)模型、連接外部服務(wù)等。
            • 此外,還可以通過 open 方法中的 function_context 參數(shù),注冊及使用 metrics。

          env_settings = EnvironmentSettings.new_instance().in_streaming_mode().use_blink_planner().build()t_env = StreamTableEnvironment.create(environment_settings=env_settings)
          table = t_env.from_elements([("hello", 1), ("world", 2), ("flink", 3)], ['a', 'b'])table.select(sub_string(table.a, 1, 3))

          2. Python UDTF


          Python UDTF,即 Python TableFunction,針對每一條輸入數(shù)據(jù),Python UDTF 可以產(chǎn)生 0 條、1 條或者多條輸出數(shù)據(jù),此外,一條輸出數(shù)據(jù)可以包含多個列。比如以下示例,定義了一個名字為 split 的Python UDF,以指定字符串為分隔符,將輸入字符串切分成兩個字符串:

          from pyflink.table.udf import udtffrom pyflink.table import DataTypes
          @udtf(result_types=[DataTypes.STRING(), DataTypes.STRING()])def split(s: str, sep: str): splits = s.split(sep) yield splits[0], splits[1]

          說明:

          • 需要通過名字為 “ udtf ” 的裝飾器,聲明這是一個 table function;

          • 需要通過裝飾器中的 result_types 參數(shù),聲明 table function 的結(jié)果類型。由于 table function 每條輸出可以包含多個列,result_types 需要指定所有輸出列的類型;

          • Python UDTF 的定義,也支持 Python UDF 章節(jié)中所列出的多種定義方式,這里只展示了其中一種。

          定義完 Python UDTF 之后,可以直接在 Python Table API 中使用:

          env_settings = EnvironmentSettings.new_instance().in_streaming_mode().use_blink_planner().build()t_env = StreamTableEnvironment.create(environment_settings=env_settings)
          table = t_env.from_elements([("hello|word", 1), ("abc|def", 2)], ['a', 'b'])
          table.join_lateral(split(table.a, '|').alias("c1, c2"))table.left_outer_join_lateral(split(table.a, '|').alias("c1, c2"))


          3. Python UDAF


          Python UDAF,即 Python AggregateFunction。Python UDAF 用來針對一組數(shù)據(jù)進(jìn)行聚合運算,比如同一個 window 下的多條數(shù)據(jù)、或者同一個 key 下的多條數(shù)據(jù)等。針對同一組輸入數(shù)據(jù),Python AggregateFunction 產(chǎn)生一條輸出數(shù)據(jù)。比如以下示例,定義了一個名字為 weighted_avg 的 Python UDAF:

          from pyflink.common import Rowfrom pyflink.table import AggregateFunction, DataTypesfrom pyflink.table.udf import udaf

          class WeightedAvg(AggregateFunction):
          def create_accumulator(self): # Row(sum, count) return Row(0, 0)
          def get_value(self, accumulator: Row) -> float: if accumulator[1] == 0: return 0 else: return accumulator[0] / accumulator[1]
          def accumulate(self, accumulator: Row, value, weight): accumulator[0] += value * weight accumulator[1] += weight
          def retract(self, accumulator: Row, value, weight): accumulator[0] -= value * weight accumulator[1] -= weight

          weighted_avg = udaf(f=WeightedAvg(), result_type=DataTypes.DOUBLE(), accumulator_type=DataTypes.ROW([ DataTypes.FIELD("f0", DataTypes.BIGINT()), DataTypes.FIELD("f1", DataTypes.BIGINT())]))

          說明:

          • 需要通過名字為 “ udaf ” 的裝飾器,聲明這是一個 aggregate function,

          • 需要分別通過裝飾器中的 result_type 及 accumulator_type 參數(shù),聲明 aggregate function 的結(jié)果類型及 accumulator 類型;

          • create_accumulator,get_value 和 accumulate 這 3 個方法必須要定義,retract 方法可以根據(jù)需要定義,詳細(xì)信息可以參見 Flink 官方文檔 [1];需要注意的是,由于必須定義 create_accumulator,get_value 和 accumulate 這 3 個方法,Python UDAF 只能通過繼承AggregateFunction 的方式進(jìn)行定義(Pandas UDAF 沒有這方面的限制)。

          定義完 Python UDAF 之后,可以在 Python Table API 中這樣使用:

          env_settings = EnvironmentSettings.new_instance().in_streaming_mode().use_blink_planner().build()t_env = StreamTableEnvironment.create(environment_settings=env_settings)
          t = t_env.from_elements([(1, 2, "Lee"), (3, 4, "Jay"), (5, 6, "Jay"), (7, 8, "Lee")], ["value", "count", "name"])
          t.group_by(t.name).select(weighted_avg(t.value, t.count).alias("avg"))


          4. Python UDTAF


          Python UDTAF,即 Python TableAggregateFunction。Python UDTAF 用來針對一組數(shù)據(jù)進(jìn)行聚合運算,比如同一個 window 下的多條數(shù)據(jù)、或者同一個 key 下的多條數(shù)據(jù)等,與 Python UDAF 不同的是,針對同一組輸入數(shù)據(jù),Python UDTAF 可以產(chǎn)生 0 條、1 條、甚至多條輸出數(shù)據(jù)。

          以下示例,定義了一個名字為 Top2 的 Python UDTAF:

          from pyflink.common import Rowfrom pyflink.table import DataTypesfrom pyflink.table.udf import udtaf, TableAggregateFunction
          class Top2(TableAggregateFunction):
          def create_accumulator(self): # 存儲當(dāng)前最大的兩個值 return [None, None]
          def accumulate(self, accumulator, input_row): if input_row[0] is not None: # 新的輸入值最大 if accumulator[0] is None or input_row[0] > accumulator[0]: accumulator[1] = accumulator[0] accumulator[0] = input_row[0] # 新的輸入值次大 elif accumulator[1] is None or input_row[0] > accumulator[1]: accumulator[1] = input_row[0]
          def emit_value(self, accumulator): yield Row(accumulator[0]) if accumulator[1] is not None: yield Row(accumulator[1])
          top2 = udtaf(f=Top2(), result_type=DataTypes.ROW([DataTypes.FIELD("a", DataTypes.BIGINT())]), accumulator_type=DataTypes.ARRAY(DataTypes.BIGINT()))

          說明:

          • Python UDTAF 功能是 Flink 1.13 之后支持的新功能;

          • create_accumulator,accumulate 和 emit_value 這 3 個方法必須定義,此外 TableAggregateFunction 中支持 retract、merge 等方法,可以根據(jù)需要選擇是否定義,詳細(xì)信息可以參見 Flink 官方文檔[2]。

          定義完 Python UDTAF 之后,可以在 Python Table API 中這樣使用:

          env_settings = EnvironmentSettings.new_instance().in_streaming_mode().use_blink_planner().build()t_env = StreamTableEnvironment.create(environment_settings=env_settings)
          t = t_env.from_elements([(1, 'Hi', 'Hello'), (3, 'Hi', 'hi'), (5, 'Hi2', 'hi'), (2, 'Hi', 'Hello'), (7, 'Hi', 'Hello')], ['a', 'b', 'c'])
          t_env.execute_sql(""" CREATE TABLE my_sink ( word VARCHAR, `sum` BIGINT ) WITH ( 'connector' = 'print' ) """)
          result = t.group_by(t.b).flat_aggregate(top2).select("b, a").execute_insert("my_sink")
          # 1)等待作業(yè)執(zhí)行結(jié)束,用于local執(zhí)行,否則可能作業(yè)尚未執(zhí)行結(jié)束,該腳本已退出,會導(dǎo)致minicluster過早退出# 2)當(dāng)作業(yè)通過detach模式往remote集群提交時,比如YARN/Standalone/K8s等,需要移除該方法result.wait()

          當(dāng)執(zhí)行以上程序,可以看到類似如下輸出:

          11> +I[Hi, 7]10> +I[Hi2, 5]11> +I[Hi, 3]

          說明:

          • Python UDTAF 只能用于 Table API,不能用于 SQL 語句中;

          • flat_aggregate 的結(jié)果包含了原始的 grouping 列以及 UDTAF(top 2)的輸出,因此,可以在 select 中訪問列 “ b ”。

          二、Python 自定義函數(shù)進(jìn)階


          1. 在純 SQL 作業(yè)中使用 Python 自定義函數(shù)


          Flink SQL 中的 CREATE FUNCTION 語句支持注冊 Python 自定義函數(shù),因此用戶除了可以在 PyFlink Table API 作業(yè)中使用 Python 自定義函數(shù)之外,還可以在純 SQL 作業(yè)中使用 Python 自定義函數(shù)。

          CREATE TEMPORARY FUNCTION sub_string AS 'test_udf.sub_string' LANGUAGE PYTHON
          CREATE TABLE source ( a VARCHAR) WITH ( 'connector' = 'datagen');
          CREATE TABLE sink ( a VARCHAR) WITH ( 'connector' = 'print');
          INSERT INTO sinkSELECT sub_string(a, 1, 3)FROM source;


          2. 在 Java 作業(yè)中使用 Python 自定義函數(shù)


          用戶可以通過 DDL 的方式注冊 Python 自定義函數(shù),這意味著,用戶也可以在 Java Table API 作業(yè)中使用 Python 自定義函數(shù),比如:

          TableEnvironment tEnv = TableEnvironment.create( EnvironmentSettings.newInstance().useBlinkPlanner().inBatchMode().build());tEnv.executeSql("CREATE TEMPORARY FUNCTION sub_string AS 'test_udf.sub_string' LANGUAGE PYTHON");tEnv.createTemporaryView("source", tEnv.fromValues("hello", "world", "flink").as("a"));tEnv.executeSql("SELECT sub_string(a) FROM source").collect();

          詳細(xì)示例可以參見 PyFlink Playground [3]。

          該功能的一個重要用處是將 Java 算子與 Python 算子混用。用戶可以使用 Java 語言來開發(fā)絕大部分的作業(yè)邏輯,當(dāng)作業(yè)邏輯中的某些部分必須使用 Python 語言來編寫時,可以通過如上方式來調(diào)用使用 Python 語言編寫的自定義函數(shù)。

          如果是 DataStream 作業(yè),可以先將 DataStream 轉(zhuǎn)換成 Table,然后再通過上述方式,調(diào)用 Python 語言編寫的自定義函數(shù)。

          3. 依賴管理


          在 Python 自定義函數(shù)中訪問第三方 Python 庫是非常常見的需求,另外,在機(jī)器學(xué)習(xí)預(yù)測場景中,用戶也可能需要在 Python 自定義函數(shù)中加載一個機(jī)器學(xué)習(xí)模型。當(dāng)我們通過 local 模式執(zhí)行 PyFlink 作業(yè)時,可以將第三方 Python 庫安裝在本地 Python 環(huán)境中,或者將機(jī)器學(xué)習(xí)模型下載到本地;然而當(dāng)我們將 PyFlink 作業(yè)提交到遠(yuǎn)程執(zhí)行的時候,這也可能會出現(xiàn)一些問題:

          • 第三方 Python 庫如何被 Python 自定義函數(shù)訪問。不同的作業(yè),對于 Python 庫的版本要求是不一樣的,將第三方 Python 庫預(yù)安裝到集群的 Python 環(huán)境中,只適用于安裝一些公共的依賴,不能解決不同作業(yè)對于 Python 依賴個性化的需求;

          • 機(jī)器學(xué)習(xí)模型或者數(shù)據(jù)文件,如何分發(fā)到集群節(jié)點,并最終被 Python 自定義函數(shù)訪問。

          除此之外,依賴可能還包括 JAR 包等,PyFlink 中針對各種依賴提供了多種解決方案:

          依賴
          類型
          解決方案
          用途描述
          示例(flink run)
          flink run參數(shù)
          配置項
          API
          作業(yè)入口文件
          -py / --python
          指定作業(yè)的入口文件,只能是.py文件
          -py file:///path/to/table_api_demo.py
          作業(yè)入口entry module
          -pym / --pyModule
          指定作業(yè)的entry module,功能和
          --python類似,可用于當(dāng)作業(yè)的Python文件為zip包等情況,無法通過
          --python指定的時候,相比
          --python來說,更通用
          -pym table_api_demo-pyfs file:///path/to/table_api_demo.py
          Python三方庫文件
          -pyfs / --pyFiles
          python.files
          add_python_file
          指定一個到多個Python文件(.py/.zip/.whl等,逗號分割),這些Python文件在作業(yè)執(zhí)行時,會放到Python進(jìn)程的PYTHONPATH中,可以在Python自定義函數(shù)中直接訪問
          -pyfs file:///path/to/table_api_demo.py,file:///path/to/deps.zip
          存檔文件
          -pyarch /--pyArchives
          python.archives
          add_python_archive
          指定一個到多個存檔文件(逗號分割),這些存檔文件,在作業(yè)執(zhí)行的時候,會被解壓,并放到Python進(jìn)程的工作目錄,可以通過相對路徑的方式進(jìn)行訪問
          -pyarchfile:///path/to/venv.zip
          Python解釋器路徑
          -pyexec / --pyExecutable
          python.executable
          set_python_executable
          指定作業(yè)執(zhí)行時,所使用的Python解釋器路徑
          -pyarchfile:///path/to/venv.zip-pyexec venv.zip/venv/bin/python3
          requirements文件
          -pyreq / --pyRequirements
          python.requirements
          set_python_requirements
          指定requirements文件,requirements文件中定義了作業(yè)的Python三方庫依賴,作業(yè)執(zhí)行時,會根據(jù)requirements的內(nèi)容,通過pip安裝相關(guān)依賴
          -pyreq requirements.txt
          JAR包
          pipeline.classpaths,pipeline.jars
          沒有專門的API,可以通過configuration的set_string方法設(shè)置
          指定作業(yè)依賴的JAR包,通常用于指定connector JAR包
          說明:


          • 需要注意的是,Python UDF 的實現(xiàn)所在的文件,也需要在作業(yè)執(zhí)行的時候,作為依賴文件上傳;

          • 可以通過合用 “存檔文件” 與 “ Python 解釋器路徑”,指定作業(yè)使用上傳的 Python 虛擬環(huán)境來執(zhí)行,比如:

          table_env.add_python_archive("/path/to/py_env.zip")
          # 指定使用py_env.zip包中帶的python來執(zhí)行用戶自定義函數(shù),必須通過相對路徑來指定table_env.get_config().set_python_executable("py_env.zip/py_env/bin/python")

          • 推薦用戶使用 conda 來構(gòu)建 Python 虛擬環(huán)境,conda 構(gòu)建的 Python 虛擬環(huán)境包含了執(zhí)行 Python 所需的絕大多數(shù)底層庫,可以極大地避免當(dāng)本地環(huán)境與集群環(huán)境不一樣時,所構(gòu)建的 Python 虛擬環(huán)境在集群執(zhí)行時,缺少各種底層依賴庫的問題。關(guān)于如何使用 conda 構(gòu)建的 Python 虛擬環(huán)境,可以參考阿里云 VVP 文檔中 “使用 Python 三方包” 章節(jié)的介紹 [4]

          • 有些 Python 三方庫需要安裝才能使用,即并非 ”將其下載下來就可以直接放到 PYTHONPATH 中引用“,針對這種類型的 Python 三方庫,有兩種解決方案:

            • 將其安裝在 Python 虛擬環(huán)境之中,指定作業(yè)運行使用所構(gòu)建的 Python 虛擬環(huán)境;
            • 找一臺與集群環(huán)境相同的機(jī)器(或 docker),安裝所需的 Python 三方庫,然后將安裝文件打包。該方式相對于 Python 虛擬環(huán)境來說,打包文件比較小。詳情可以參考阿里云 VVP 文檔中 “使用自定義的 Python 虛擬環(huán)境” 章節(jié)的介紹 [5]。

          4. 調(diào)試


          PyFlink 支持用戶通過遠(yuǎn)程調(diào)試的方式,來調(diào)試 Python 自定義函數(shù),具體方法可以參見文章 “如何從 0 到 1 開發(fā) PyFlink API 作業(yè)”  [6] 中 “遠(yuǎn)程調(diào)試” 章節(jié)的介紹。

          另外,用戶還可以在 Python 自定義函數(shù)中,通過 logging 的方式,打印日志。需要注意的是,日志輸出需要在 TaskManager 的日志文件中查看,而不是當(dāng)前 console。具體使用方式,請參見 “如何從 0 到 1 開發(fā) PyFlink API 作業(yè)”  [6] 中 “自定義日志” 章節(jié)的介紹。需要注意的是,當(dāng)通過 local 方式運行作業(yè)的時候,TM 的日志位于 PyFlink 的安裝目錄,比如:

          >>> import pyflink
          ['/Users/dianfu/venv/pyflink-usecases/lib/python3.8/site-packages/pyflink']

          5. 調(diào)優(yōu)


          Python 自定義函數(shù)的性能在很大程度上取決于 Python 自定義函數(shù)自身的實現(xiàn),如果遇到性能問題,您首先需要想辦法盡可能優(yōu)化 Python 自定義函數(shù)的實現(xiàn)。

          除此之外,Python 自定義函數(shù)的性能也受以下參數(shù)取值的影響。

          參數(shù)
          說明
          python.fn-execution.bundle.size
          Python自定義函數(shù)的執(zhí)行是異步的,在作業(yè)執(zhí)行過程中,Java算子將數(shù)據(jù)異步發(fā)送給Python進(jìn)程進(jìn)行處理。Java算子在將數(shù)據(jù)發(fā)送給Python進(jìn)程之前,會先將數(shù)據(jù)緩存起來,到達(dá)一定閾值之后,再發(fā)送給Python進(jìn)程。python.fn-execution.bundle.size參數(shù)可用來控制可緩存的數(shù)據(jù)最大條數(shù),默認(rèn)值為100000。
          python.fn-execution.bundle.time
          用來控制數(shù)據(jù)的最大緩存時間。當(dāng)緩存的數(shù)據(jù)條數(shù)到達(dá)python.fn-execution.bundle.size定義的閾值或緩存時間到達(dá)python.fn-execution.bundle.time定義的閾值時,會觸發(fā)緩存數(shù)據(jù)的計算。默認(rèn)值為1000,單位是毫秒。
          python.fn-execution.arrow.batch.size
          用來控制當(dāng)使用Pandas UDF時,一個arrow batch可容納的數(shù)據(jù)最大條數(shù),默認(rèn)值為10000。說明 python.fn-execution.arrow.batch.size參數(shù)值不能大于python.fn-execution.bundle.size參數(shù)值。

          說明:

          • checkpoint 時,會觸發(fā)緩存數(shù)據(jù)的計算,因此當(dāng)上述參數(shù)配置的值過大時,可能會導(dǎo)致checkpoint 時需要處理過多的數(shù)據(jù),從而導(dǎo)致 checkpoint 時間過長,甚至?xí)?dǎo)致 checkpoint 失敗。當(dāng)遇到作業(yè)的 checkpoint 時間比較長的問題時,可以嘗試減少上述參數(shù)的取值。


          三、常見問題


          1)Python 自定義函數(shù)的實際返回值類型與 result_type 中聲明的類型不一致,該問題會導(dǎo)致 Java 算子在收到 Python 自定義函數(shù)的執(zhí)行結(jié)果,進(jìn)行反序列化時報錯,錯誤堆棧類似:


          Caused by: java.io.EOFException at java.io.DataInputStream.readInt(DataInputStream.java:392) ~[?:1.8.0_261] at org.apache.flink.table.runtime.typeutils.StringDataSerializer.deserializeInternal(StringDataSerializer.java:91) ~[flink-table-blink_2.11-1.12.0.jar:1.12.0] at org.apache.flink.table.runtime.typeutils.StringDataSerializer.deserialize(StringDataSerializer.java:87) ~[flink-table-blink_2.11-1.12.0.jar:1.12.0] at org.apache.flink.table.runtime.typeutils.StringDataSerializer.deserialize(StringDataSerializer.java:36) ~[flink-table-blink_2.11-1.12.0.jar:1.12.0] at org.apache.flink.table.runtime.typeutils.serializers.python.MapDataSerializer.deserializeInternal(MapDataSerializer.java:124) ~[flink-python_2.11-1.12.0.jar:1.12.0] at org.apache.flink.table.runtime.typeutils.serializers.python.MapDataSerializer.deserialize(MapDataSerializer.java:107) ~[flink-python_2.11-1.12.0.jar:1.12.0] at org.apache.flink.table.runtime.typeutils.serializers.python.MapDataSerializer.deserialize(MapDataSerializer.java:46) ~[flink-python_2.11-1.12.0.jar:1.12.0] at org.apache.flink.table.runtime.typeutils.serializers.python.RowDataSerializer.deserialize(RowDataSerializer.java:104) ~[flink-python_2.11-1.12.0.jar:1.12.0] at org.apache.flink.table.runtime.typeutils.serializers.python.RowDataSerializer.deserialize(RowDataSerializer.java:49) ~[flink-python_2.11-1.12.0.jar:1.12.0]    at org.apache.flink.table.runtime.operators.python.scalar.RowDataPythonScalarFunctionOperator.emitResult(RowDataPythonScalarFunctionOperator.java:84) ~[flink-python_2.11-1.12.0.jar:1.12.0]

          2)在 Python 自定義函數(shù)的 init 方法里實例化了一個不能被 cloudpickle 序列化的對象。

          在提交作業(yè)的時候,PyFlink 會通過 cloudpickle 序列化 Python 自定義函數(shù),若 Python 自定義函數(shù)包含不能被 cloudpickle 序列化的對象,則會遇到類似錯誤:TypeError: can't pickle xxx,可以將這種變量放在 open 方法里初始化。

          3)在 Python 自定義函數(shù)的 init 方法里 load 一個非常大的數(shù)據(jù)文件。

          由于在提交作業(yè)的時候,PyFlink 會通過 cloudpickle 序列化 Python 自定義函數(shù),若在 init 方法里 load 一個非常大的數(shù)據(jù)文件,則整個數(shù)據(jù)文件都會被序列化并作為 Python 自定義函數(shù)實現(xiàn)的一部分,若數(shù)據(jù)文件非常大,可能會導(dǎo)致作業(yè)執(zhí)行失敗,可以將 load 數(shù)據(jù)文件的操作放在 open 方法里執(zhí)行。

          4)客戶端 Python 環(huán)境與集群端 Python 環(huán)境不一致,比如 Python 版本不一致、PyFlink 版本不一致(大版本需要保持一致,比如都為 1.12.x)等。


          四、總結(jié)


          在這篇文章中,我們主要介紹了各種 Python 自定義函數(shù)的定義及使用方式,以及 Python 依賴管理、 Python 自定義函數(shù)調(diào)試及調(diào)優(yōu)等方面的信息,希望可以幫助用戶了解 Python 自定義函數(shù)。接下來,我們會繼續(xù)推出 PyFlink 系列文章,幫助 PyFlink 用戶深入了解 PyFlink 中各種功能、應(yīng)用場景、最佳實踐等。


          另外,阿里云實時計算生態(tài)團(tuán)隊長期招聘優(yōu)秀大數(shù)據(jù)人才(包括實習(xí)+社招),我們的工作包括:

          • 實時機(jī)器學(xué)習(xí):支持機(jī)器學(xué)習(xí)場景下實時特征工程和 AI 引擎配合,基于 Apache Flink 及其生態(tài)打造實時機(jī)器學(xué)習(xí)的標(biāo)準(zhǔn),推動例如搜索、推薦、廣告、風(fēng)控等場景的全面實時化;


          • 大數(shù)據(jù) + AI 一體化:包括編程語言一體化 (PyFlink 相關(guān)工作),執(zhí)行引擎集成化 (TF on Flink),工作流及管理一體化(Flink AI Flow)。

          如果你對開源、大數(shù)據(jù)或者 AI 感興趣,請發(fā)簡歷到:[email protected]

          此外,也歡迎大家加入 “PyFlink交流群”,交流 PyFlink 相關(guān)的問題。


          引用鏈接:


          [1] https://ci.apache.org/projects/flink/flink-docs-release-1.12/dev/python/table-api-users-guide/udfs/python_udfs.html#aggregate-functions
          [2] https://ci.apache.org/projects/flink/flink-docs-release-1.13/docs/dev/python/table/udfs/python_udfs/#table-aggregate-functions
          [3] https://github.com/pyflink/playgrounds#7-python-udf-used-in-java-table-api-jobs
          [4] https://help.aliyun.com/document_detail/207351.html?spm=a2c4g.11186623.6.687.1fe76f50loCz96#title-09r-29j-9d7
          [5] https://help.aliyun.com/document_detail/207351.html?spm=a2c4g.11186623.6.687.4b18419aCuhgmq#title-r01-50c-j82
          [6] https://mp.weixin.qq.com/s/GyFTjQl6ch8jc733mpCP7Q

          更多 Flink 相關(guān)技術(shù)問題,可掃碼加入社區(qū)釘釘交流群~



          ▼ 關(guān)注「Flink 中文社區(qū)」,獲取更多技術(shù)干貨 


           戳我,查看更多技術(shù)干貨~
          瀏覽 125
          點贊
          評論
          收藏
          分享

          手機(jī)掃一掃分享

          分享
          舉報
          評論
          圖片
          表情
          推薦
          點贊
          評論
          收藏
          分享

          手機(jī)掃一掃分享

          分享
          舉報
          <kbd id="afajh"><form id="afajh"></form></kbd>
          <strong id="afajh"><dl id="afajh"></dl></strong>
            <del id="afajh"><form id="afajh"></form></del>
                1. <th id="afajh"><progress id="afajh"></progress></th>
                  <b id="afajh"><abbr id="afajh"></abbr></b>
                  <th id="afajh"><progress id="afajh"></progress></th>
                  五月婷婷性爱 | 亚洲日本黄色三级电影在线观看 | 91丨露脸丨熟女精品 | 久久豆花| 国产高清无码在线不卡视频 |