<kbd id="afajh"><form id="afajh"></form></kbd>
<strong id="afajh"><dl id="afajh"></dl></strong>
    <del id="afajh"><form id="afajh"></form></del>
        1. <th id="afajh"><progress id="afajh"></progress></th>
          <b id="afajh"><abbr id="afajh"></abbr></b>
          <th id="afajh"><progress id="afajh"></progress></th>

          如何從 0 到 1 開發(fā) PyFlink API 作業(yè)

          共 27815字,需瀏覽 56分鐘

           ·

          2021-05-02 05:41

          摘要:Apache Flink 作為當(dāng)前最流行的流批統(tǒng)一的計(jì)算引擎,在實(shí)時(shí) ETL、事件處理、數(shù)據(jù)分析、CEP、實(shí)時(shí)機(jī)器學(xué)習(xí)等領(lǐng)域都有著廣泛的應(yīng)用。從 Flink 1.9 開始,Apache Flink 社區(qū)開始在原有的 Java、Scala、SQL 等編程語言的基礎(chǔ)之上,提供對于 Python 語言的支持。經(jīng)過 Flink 1.9 ~ 1.12 以及即將發(fā)布的 1.13 版本的多個(gè)版本的開發(fā),目前 PyFlink API 的功能已經(jīng)日趨完善,可以滿足絕大多數(shù)情況下 Python 用戶的需求。接下來,我們以 Flink 1.12 為例,介紹如何使用 Python 語言,通過 PyFlink API 來開發(fā) Flink 作業(yè)內(nèi)容包括:

          1. 環(huán)境準(zhǔn)備
          2. 作業(yè)開發(fā)
          3. 作業(yè)提交
          4. 問題排查
          5. 總結(jié)

          Tips:點(diǎn)擊文末閱讀原文查看更多技術(shù)干貨~

           GitHub 地址 
          https://github.com/apache/flink
          歡迎大家給 Flink 點(diǎn)贊送 star~

          一、環(huán)境準(zhǔn)備


          第一步:安裝 Python


          PyFlink 僅支持 Python 3.5+,您首先需要確認(rèn)您的開發(fā)環(huán)境是否已安裝了 Python 3.5+,如果沒有的話,需要先安裝 Python 3.5+。

          第二步:安裝 JDK


          我們知道 Flink 的運(yùn)行是使用 Java 語言開發(fā)的,所以為了執(zhí)行 Flink 作業(yè),您還需要安裝 JDK。Flink 提供了對于 JDK 8 以及 JDK 11 的全面支持,您需要確認(rèn)您的開發(fā)環(huán)境中是否已經(jīng)安裝了上述版本的 JDK,如果沒有的話,需要先安裝 JDK。

          第三步:安裝 PyFlink


          接下來需要安裝 PyFlink,可以通過以下命令進(jìn)行安裝:

          # 創(chuàng)建Python虛擬環(huán)境python3 -m pip install virtualenvvirtualenv -p `which python3` venv
          # 使用上述創(chuàng)建的Python虛擬環(huán)境./venv/bin/activate
          # 安裝PyFlink 1.12python3 -m pip install apache-flink==1.12.2

          二、作業(yè)開發(fā)


          PyFlink Table API 作業(yè)


          我們首先介紹一下如何開發(fā) PyFlink Table API 作業(yè)。

          ■ 1)創(chuàng)建 TableEnvironment 對象


          對于 Table API 作業(yè)來說,用戶首先需要?jiǎng)?chuàng)建一個(gè) TableEnvironment 對象。以下示例定義了一個(gè) TableEnvironment 對象,使用該對象的定義的作業(yè),運(yùn)行在流模式,且使用 blink planner 執(zhí)行。


          env_settings = EnvironmentSettings.new_instance().in_streaming_mode().use_blink_planner().build()t_env = StreamTableEnvironment.create(environment_settings=env_settings)


          ■ 2)配置作業(yè)的執(zhí)行參數(shù)


          可以通過以下方式,配置作業(yè)的執(zhí)行參數(shù)。以下示例將作業(yè)的默認(rèn)并發(fā)度設(shè)置為 4。

          t_env.get_config().get_configuration().set_string('parallelism.default', '4')


          ■ 3)創(chuàng)建數(shù)據(jù)源表


          接下來,需要為作業(yè)創(chuàng)建一個(gè)數(shù)據(jù)源表。PyFlink 中提供了多種方式來定義數(shù)據(jù)源表。

          方式一:from_elements

          PyFlink 支持用戶從一個(gè)給定列表,創(chuàng)建源表。以下示例定義了包含了 3 行數(shù)據(jù)的表:[("hello", 1), ("world", 2), ("flink", 3)],該表有 2 列,列名分別為 a 和 b,類型分別為 VARCHAR 和 BIGINT。

          tab = t_env.from_elements([("hello", 1), ("world", 2), ("flink", 3)], ['a', 'b'])

          說明:

          • 這種方式通常用于測試階段,可以快速地創(chuàng)建一個(gè)數(shù)據(jù)源表,驗(yàn)證作業(yè)邏輯。

          • from_elements 方法可以接收多個(gè)參數(shù),其中第一個(gè)參數(shù)用于指定數(shù)據(jù)列表,列表中的每一個(gè)元素必須為 tuple 類型;第二個(gè)參數(shù)用于指定表的 schema。

          方式二:DDL

          除此之外,數(shù)據(jù)也可以來自于一個(gè)外部的數(shù)據(jù)源。以下示例定義了一個(gè)名字為 my_source,類型為 datagen 的表,表中有兩個(gè)類型為 VARCHAR 的字段。

          t_env.execute_sql("""        CREATE TABLE my_source (          a VARCHAR,          b VARCHAR        ) WITH (          'connector' = 'datagen',          'number-of-rows' = '10'        )    """)
          tab = t_env.from_path('my_source')

          說明:

          • 通過 DDL 的方式來定義數(shù)據(jù)源表是目前最推薦的方式,且所有 Java Table API & SQL 中支持的 connector,都可以通過 DDL 的方式,在 PyFlink Table API 作業(yè)中使用,詳細(xì)的 connector 列表請參見 Flink 官方文檔 [1]。

          • 當(dāng)前僅有部分 connector 的實(shí)現(xiàn)包含在 Flink 官方提供的發(fā)行包中,比如 FileSystem,DataGen、Print、BlackHole 等,大部分 connector 的實(shí)現(xiàn)當(dāng)前沒有包含在 Flink 官方提供的發(fā)行包中,比如 Kafka、ES 等。針對沒有包含在 Flink 官方提供的發(fā)行包中的 connector,如果需要在 PyFlink 作業(yè)中使用,用戶需要顯式地指定相應(yīng) FAT JAR,比如針對 Kafka,需要使用 JAR 包 [2],JAR 包可以通過如下方式指定:

          # 注意:file:///前綴不能省略t_env.get_config().get_configuration().set_string("pipeline.jars", "file:///my/jar/path/flink-sql-connector-kafka_2.11-1.12.0.jar")

          方式三:catalog

          hive_catalog = HiveCatalog("hive_catalog")t_env.register_catalog("hive_catalog", hive_catalog)t_env.use_catalog("hive_catalog")
          # 假設(shè)hive catalog中已經(jīng)定義了一個(gè)名字為source_table的表tab = t_env.from_path('source_table')

          這種方式和 DDL 的方式類似,只不過表的定義事先已經(jīng)注冊到了 catalog 中了,不需要在作業(yè)中重新再定義一遍。

          4)定義作業(yè)的計(jì)算邏輯


          方式一:通過 Table API

          得到 source 表之后,接下來就可以使用 Table API 中提供的各種操作,定義作業(yè)的計(jì)算邏輯,對表進(jìn)行各種變換,比如:

          @udf(result_type=DataTypes.STRING())def sub_string(s: str, begin: int, end: int):   return s[begin:end]
          transformed_tab = tab.select(sub_string(col('a'), 2, 4))

          方式二:通過 SQL 語句

          除了可以使用 Table API 中提供的各種操作之外,也可以直接通過 SQL 語句來對表進(jìn)行變換,比如上述邏輯,也可以通過 SQL 語句來實(shí)現(xiàn):

          t_env.create_temporary_function("sub_string", sub_string)transformed_tab = t_env.sql_query("SELECT sub_string(a, 2, 4) FROM %s" % tab)


          說明:


          • TableEnvironment 中提供了多種方式用于執(zhí)行 SQL 語句,其用途略有不同:

          方法名使用說明

          sql_query

          用來執(zhí)行 SELECT 語句

          sql_update

          用來執(zhí)行 INSERT 語句 / CREATE TABLE 語句。該方法已經(jīng)被 deprecate,推薦使用 execute_sql 或者create_statement_set 替代。

          create_statement_set

          用來執(zhí)行多條 SQL 語句,可以通過該方法編寫 multi-sink 的作業(yè)。

          execute_sql

          用來執(zhí)行單條 SQL 語句。execute_sql VS create_statement_set: 前者只能執(zhí)行單條 SQL 語句,后者可用于執(zhí)行多條 SQL 語句 execute_sql VS sql_query:前者可用于執(zhí)行各種類型的 SQL 語句,比如 DDL、 DML、DQL、SHOW、DESCRIBE、EXPLAIN、USE 等,后者只能執(zhí)行 DQL 語句即使是 DQL 語句,兩者的行為也不一樣。前者會(huì)生成 Flink 作業(yè),觸發(fā)表數(shù)據(jù)的計(jì)算,返回 TableResult 類型,后者并不觸發(fā)計(jì)算,僅對表進(jìn)行邏輯變換,返回 Table 類型


          5)查看執(zhí)行計(jì)劃

          用戶在開發(fā)或者調(diào)試作業(yè)的過程中,可能需要查看作業(yè)的執(zhí)行計(jì)劃,可以通過如下方式。

          方式一:Table.explain

          比如,當(dāng)我們需要知道 transformed_tab 當(dāng)前的執(zhí)行計(jì)劃時(shí),可以執(zhí)行:print(transformed_tab.explain()),得到如下輸出:

          == Abstract Syntax Tree ==LogicalProject(EXPR$0=[sub_string($0, 2, 4)])+- LogicalTableScan(table=[[default_catalog, default_database, Unregistered_TableSource_582508460, source: [PythonInputFormatTableSource(a)]]])
          == Optimized Logical Plan ==PythonCalc(select=[sub_string(a, 2, 4) AS EXPR$0])+- LegacyTableSourceScan(table=[[default_catalog, default_database, Unregistered_TableSource_582508460, source: [PythonInputFormatTableSource(a)]]], fields=[a])
          == Physical Execution Plan ==Stage 1 : Data Source content : Source: PythonInputFormatTableSource(a)
          Stage 2 : Operator content : SourceConversion(table=[default_catalog.default_database.Unregistered_TableSource_582508460, source: [PythonInputFormatTableSource(a)]], fields=[a]) ship_strategy : FORWARD
          Stage 3 : Operator content : StreamExecPythonCalc ship_strategy : FORWARD

          方式二:TableEnvironment.explain_sql

          方式一適用于查看某一個(gè) table 的執(zhí)行計(jì)劃,有時(shí)候并沒有一個(gè)現(xiàn)成的 table 對象可用,比如:

          print(t_env.explain_sql("INSERT INTO my_sink SELECT * FROM %s " % transformed_tab))

          其執(zhí)行計(jì)劃如下所示:

          == Abstract Syntax Tree ==LogicalSink(table=[default_catalog.default_database.my_sink], fields=[EXPR$0])+- LogicalProject(EXPR$0=[sub_string($0, 2, 4)])   +- LogicalTableScan(table=[[default_catalog, default_database, Unregistered_TableSource_1143388267, source: [PythonInputFormatTableSource(a)]]])
          == Optimized Logical Plan ==Sink(table=[default_catalog.default_database.my_sink], fields=[EXPR$0])+- PythonCalc(select=[sub_string(a, 2, 4) AS EXPR$0]) +- LegacyTableSourceScan(table=[[default_catalog, default_database, Unregistered_TableSource_1143388267, source: [PythonInputFormatTableSource(a)]]], fields=[a])
          == Physical Execution Plan ==Stage 1 : Data Source content : Source: PythonInputFormatTableSource(a)
          Stage 2 : Operator content : SourceConversion(table=[default_catalog.default_database.Unregistered_TableSource_1143388267, source: [PythonInputFormatTableSource(a)]], fields=[a]) ship_strategy : FORWARD
          Stage 3 : Operator content : StreamExecPythonCalc ship_strategy : FORWARD
          Stage 4 : Data Sink content : Sink: Sink(table=[default_catalog.default_database.my_sink], fields=[EXPR$0]) ship_strategy : FORWARD

          6)寫出結(jié)果數(shù)據(jù)


          方式一:通過 DDL

          和創(chuàng)建數(shù)據(jù)源表類似,也可以通過 DDL 的方式來創(chuàng)建結(jié)果表。

          t_env.execute_sql("""        CREATE TABLE my_sink (          `sum` VARCHAR        ) WITH (          'connector' = 'print'        )    """)
          table_result = transformed_tab.execute_insert('my_sink')

          說明:

          • 當(dāng)使用 print 作為 sink 時(shí),作業(yè)結(jié)果會(huì)打印到標(biāo)準(zhǔn)輸出中。如果不需要查看輸出,也可以使用 blackhole 作為 sink。

          方式二:collect

          也可以通過 collect 方法,將 table 的結(jié)果收集到客戶端,并逐條查看。

          table_result = transformed_tab.execute()with table_result.collect() as results:    for result in results:        print(result)

          說明:

          • 該方式可以方便地將 table 的結(jié)果收集到客戶端并查看。
          • 由于數(shù)據(jù)最終會(huì)收集到客戶端,所以最好限制一下數(shù)據(jù)條數(shù),比如:
          transformed_tab.limit(10).execute(),限制只收集 10 條數(shù)據(jù)到客戶端。

          方式三:to_pandas

          也可以通過 to_pandas 方法,將 table 的結(jié)果轉(zhuǎn)換成 pandas.DataFrame 并查看。

          result = transformed_tab.to_pandas()print(result)

          可以看到如下輸出:

            _c00  321  e62  8b3  be4  4f5  b46  a67  498  359  6b

          說明:

          • 該方式與 collect 類似,也會(huì)將 table 的結(jié)果收集到客戶端,所以最好限制一下結(jié)果數(shù)據(jù)的條數(shù)。

          ■ 7)總結(jié)


          完整的作業(yè)示例如下:

          from pyflink.table import DataTypes, EnvironmentSettings, StreamTableEnvironmentfrom pyflink.table.expressions import colfrom pyflink.table.udf import udf

          def table_api_demo(): env_settings = EnvironmentSettings.new_instance().in_streaming_mode().use_blink_planner().build() t_env = StreamTableEnvironment.create(environment_settings=env_settings) t_env.get_config().get_configuration().set_string('parallelism.default', '4')
          t_env.execute_sql(""" CREATE TABLE my_source ( a VARCHAR, b VARCHAR ) WITH ( 'connector' = 'datagen', 'number-of-rows' = '10' ) """)
          tab = t_env.from_path('my_source')
          @udf(result_type=DataTypes.STRING()) def sub_string(s: str, begin: int, end: int): return s[begin:end]
          transformed_tab = tab.select(sub_string(col('a'), 2, 4))
          t_env.execute_sql(""" CREATE TABLE my_sink ( `sum` VARCHAR ) WITH ( 'connector' = 'print' ) """)
          table_result = transformed_tab.execute_insert('my_sink')
          # 1)等待作業(yè)執(zhí)行結(jié)束,用于local執(zhí)行,否則可能作業(yè)尚未執(zhí)行結(jié)束,該腳本已退出,會(huì)導(dǎo)致minicluster過早退出 # 2)當(dāng)作業(yè)通過detach模式往remote集群提交時(shí),比如YARN/Standalone/K8s等,需要移除該方法 table_result.wait()

          if __name__ == '__main__': table_api_demo()

          執(zhí)行結(jié)果如下:

          4> +I(a1)3> +I(b0)2> +I(b1)1> +I(37)3> +I(74)4> +I(3d)1> +I(07)2> +I(f4)1> +I(7f)2> +I(da)


          PyFlink DataStream API 作業(yè)


          1)創(chuàng)建 StreamExecutionEnvironment 對象

          對于 DataStream API 作業(yè)來說,用戶首先需要定義一個(gè) StreamExecutionEnvironment 對象。

          env = StreamExecutionEnvironment.get_execution_environment()

          2)配置作業(yè)的執(zhí)行參數(shù)

          可以通過以下方式,配置作業(yè)的執(zhí)行參數(shù)。以下示例將作業(yè)的默認(rèn)并發(fā)度設(shè)置為4。

          env.set_parallelism(4)

          3)創(chuàng)建數(shù)據(jù)源

          接下來,需要為作業(yè)創(chuàng)建一個(gè)數(shù)據(jù)源。PyFlink 中提供了多種方式來定義數(shù)據(jù)源。

          方式一:from_collection

          PyFlink 支持用戶從一個(gè)列表創(chuàng)建源表。以下示例定義了包含了 3 行數(shù)據(jù)的表:[(1, 'aaa|bb'), (2, 'bb|a'), (3, 'aaa|a')],該表有 2 列,列名分別為 a 和 b,類型分別為 VARCHAR 和 BIGINT。

          ds = env.from_collection(        collection=[(1, 'aaa|bb'), (2, 'bb|a'), (3, 'aaa|a')],        type_info=Types.ROW([Types.INT(), Types.STRING()]))

          說明:

          • 這種方式通常用于測試階段,可以方便地創(chuàng)建一個(gè)數(shù)據(jù)源。
          • from_collection 方法可以接收兩個(gè)參數(shù),其中第一個(gè)參數(shù)用于指定數(shù)據(jù)列表;第二個(gè)參數(shù)用于指定數(shù)據(jù)的類型。

          方式二:使用 PyFlink DataStream API 中定義的 connector

          此外,也可以使用 PyFlink DataStream API 中已經(jīng)支持的 connector,需要注意的是,1.12 中僅提供了 Kafka connector 的支持。

          deserialization_schema = JsonRowDeserializationSchema.builder() \    .type_info(type_info=Types.ROW([Types.INT(), Types.STRING()])).build()
          kafka_consumer = FlinkKafkaConsumer( topics='test_source_topic', deserialization_schema=deserialization_schema, properties={'bootstrap.servers': 'localhost:9092', 'group.id': 'test_group'})
          ds = env.add_source(kafka_consumer)

          說明:

          • Kafka connector 當(dāng)前沒有包含在 Flink 官方提供的發(fā)行包中,如果需要在PyFlink 作業(yè)中使用,用戶需要顯式地指定相應(yīng) FAT JAR [2],JAR 包可以通過如下方式指定:

          # 注意:file:///前綴不能省略env.add_jars("file:///my/jar/path/flink-sql-connector-kafka_2.11-1.12.0.jar")


          • 即使是 PyFlink DataStream API 作業(yè),也推薦使用 Table & SQL connector 中打包出來的 FAT JAR,可以避免遞歸依賴的問題。

          方式三:使用 PyFlink Table API 中定義的 connector

          以下示例定義了如何將 Table & SQL 中支持的 connector 用于 PyFlink DataStream API 作業(yè)。

          t_env = StreamTableEnvironment.create(stream_execution_environment=env)
          t_env.execute_sql(""" CREATE TABLE my_source ( a INT, b VARCHAR ) WITH ( 'connector' = 'datagen', 'number-of-rows' = '10' ) """)
          ds = t_env.to_append_stream( t_env.from_path('my_source'), Types.ROW([Types.INT(), Types.STRING()]))

          說明:

          • 由于當(dāng)前 PyFlink DataStream API 中 built-in 支持的 connector 種類還比較少,推薦通過這種方式來創(chuàng)建 PyFlink DataStream API 作業(yè)中使用的數(shù)據(jù)源表,這樣的話,所有 PyFlink Table API 中可以使用的 connector,都可以在 PyFlink DataStream API 作業(yè)中使用。

          • 需要注意的是,TableEnvironment 需要通過以下方式創(chuàng)建 StreamTableEnvironment.create(stream_execution_environment=env),以使得 PyFlink DataStream API 與 PyFlink Table API 共享同一個(gè) StreamExecutionEnvironment 對象。

          4)定義計(jì)算邏輯

          生成數(shù)據(jù)源對應(yīng)的 DataStream 對象之后,接下來就可以使用 PyFlink DataStream API 中定義的各種操作,定義計(jì)算邏輯,對 DataStream 對象進(jìn)行變換了,比如:

          def split(s):    splits = s[1].split("|")    for sp in splits:       yield s[0], sp
          ds = ds.map(lambda i: (i[0] + 1, i[1])) \ .flat_map(split) \ .key_by(lambda i: i[1]) \ .reduce(lambda i, j: (i[0] + j[0], i[1]))


          5)寫出結(jié)果數(shù)據(jù)


          方式一:print

          可以調(diào)用 DataStream 對象上的 print 方法,將 DataStream 的結(jié)果打印到標(biāo)準(zhǔn)輸出中,比如:

          ds.print()

          方式二:使用 PyFlink DataStream API 中定義的 connector

          可以直接使用 PyFlink DataStream API 中已經(jīng)支持的 connector,需要注意的是,1.12 中提供了對于 FileSystem、JDBC、Kafka connector 的支持,以 Kafka 為例:

          serialization_schema = JsonRowSerializationSchema.builder() \    .with_type_info(type_info=Types.ROW([Types.INT(), Types.STRING()])).build()
          kafka_producer = FlinkKafkaProducer( topic='test_sink_topic', serialization_schema=serialization_schema, producer_config={'bootstrap.servers': 'localhost:9092', 'group.id': 'test_group'})
          ds.add_sink(kafka_producer)

          說明:

          • JDBC、Kafka connector 當(dāng)前沒有包含在 Flink 官方提供的發(fā)行包中,如果需要在 PyFlink 作業(yè)中使用,用戶需要顯式地指定相應(yīng) FAT JAR,比如 Kafka connector 可以使用 JAR 包 [2],JAR 包可以通過如下方式指定:

          # 注意:file:///前綴不能省略env.add_jars("file:///my/jar/path/flink-sql-connector-kafka_2.11-1.12.0.jar")


          • 推薦使用 Table & SQL connector 中打包出來的 FAT JAR,可以避免遞歸依賴的問題。

          方式三:使用 PyFlink Table API 中定義的 connector

          以下示例展示了如何將 Table & SQL 中支持的 connector,用作 PyFlink DataStream API 作業(yè)的 sink。

          # 寫法一:ds類型為Types.ROWdef split(s):    splits = s[1].split("|")    for sp in splits:        yield Row(s[0], sp)
          ds = ds.map(lambda i: (i[0] + 1, i[1])) \ .flat_map(split, Types.ROW([Types.INT(), Types.STRING()])) \ .key_by(lambda i: i[1]) \ .reduce(lambda i, j: Row(i[0] + j[0], i[1]))
          # 寫法二:ds類型為Types.TUPLEdef split(s): splits = s[1].split("|") for sp in splits: yield s[0], sp
          ds = ds.map(lambda i: (i[0] + 1, i[1])) \ .flat_map(split, Types.TUPLE([Types.INT(), Types.STRING()])) \ .key_by(lambda i: i[1]) \ .reduce(lambda i, j: (i[0] + j[0], i[1]))
          # 將ds寫出到sinkt_env.execute_sql(""" CREATE TABLE my_sink ( a INT, b VARCHAR ) WITH ( 'connector' = 'print' ) """)
          table = t_env.from_data_stream(ds)table_result = table.execute_insert("my_sink")

          說明:

          • 需要注意的是,t_env.from_data_stream(ds) 中的 ds 對象的 result type 類型必須是復(fù)合類型 Types.ROW 或者 Types.TUPLE,這也就是為什么需要顯式聲明作業(yè)計(jì)算邏輯中 flat_map 操作的 result 類型

          • 作業(yè)的提交,需要通過 PyFlink Table API 中提供的作業(yè)提交方式進(jìn)行提交

          • 由于當(dāng)前 PyFlink DataStream API 中支持的 connector 種類還比較少,推薦通過這種方式來定義 PyFlink DataStream API 作業(yè)中使用的數(shù)據(jù)源表,這樣的話,所有 PyFlink Table API 中可以使用的 connector,都可以作為 PyFlink DataStream API 作業(yè)的 sink。

          7)總結(jié)

          完整的作業(yè)示例如下:

          方式一(適合調(diào)試):

          from pyflink.common.typeinfo import Typesfrom pyflink.datastream import StreamExecutionEnvironment

          def data_stream_api_demo(): env = StreamExecutionEnvironment.get_execution_environment() env.set_parallelism(4)
          ds = env.from_collection( collection=[(1, 'aaa|bb'), (2, 'bb|a'), (3, 'aaa|a')], type_info=Types.ROW([Types.INT(), Types.STRING()]))
          def split(s): splits = s[1].split("|") for sp in splits: yield s[0], sp
          ds = ds.map(lambda i: (i[0] + 1, i[1])) \ .flat_map(split) \ .key_by(lambda i: i[1]) \ .reduce(lambda i, j: (i[0] + j[0], i[1]))
          ds.print()
          env.execute()

          if __name__ == '__main__': data_stream_api_demo()

          執(zhí)行結(jié)果如下:

          3> (2, 'aaa')3> (2, 'bb')3> (6, 'aaa')3> (4, 'a')3> (5, 'bb')3> (7, 'a')

          方式二(適合線上作業(yè)):

          from pyflink.common.typeinfo import Typesfrom pyflink.datastream import StreamExecutionEnvironmentfrom pyflink.table import StreamTableEnvironment

          def data_stream_api_demo(): env = StreamExecutionEnvironment.get_execution_environment() t_env = StreamTableEnvironment.create(stream_execution_environment=env) env.set_parallelism(4)
          t_env.execute_sql(""" CREATE TABLE my_source ( a INT, b VARCHAR ) WITH ( 'connector' = 'datagen', 'number-of-rows' = '10' ) """)
          ds = t_env.to_append_stream( t_env.from_path('my_source'), Types.ROW([Types.INT(), Types.STRING()]))
          def split(s): splits = s[1].split("|") for sp in splits: yield s[0], sp
          ds = ds.map(lambda i: (i[0] + 1, i[1])) \ .flat_map(split, Types.TUPLE([Types.INT(), Types.STRING()])) \ .key_by(lambda i: i[1]) \ .reduce(lambda i, j: (i[0] + j[0], i[1]))
          t_env.execute_sql(""" CREATE TABLE my_sink ( a INT, b VARCHAR ) WITH ( 'connector' = 'print' ) """)
          table = t_env.from_data_stream(ds) table_result = table.execute_insert("my_sink")
          # 1)等待作業(yè)執(zhí)行結(jié)束,用于local執(zhí)行,否則可能作業(yè)尚未執(zhí)行結(jié)束,該腳本已退出,會(huì)導(dǎo)致minicluster過早退出 # 2)當(dāng)作業(yè)通過detach模式往remote集群提交時(shí),比如YARN/Standalone/K8s等,需要移除該方法 table_result.wait()

          if __name__ == '__main__': data_stream_api_demo()


          三、作業(yè)提交


          Flink 提供了多種作業(yè)部署方式,比如 local、standalone、YARN、K8s 等,PyFlink 也支持上述作業(yè)部署方式,請參考 Flink 官方文檔 [3],了解更多詳細(xì)信息。


          local


          說明:使用該方式執(zhí)行作業(yè)時(shí),會(huì)啟動(dòng)一個(gè) minicluster,作業(yè)會(huì)提交到 minicluster 中執(zhí)行,該方式適合作業(yè)開發(fā)階段。

          示例:python3 table_api_demo.py

          standalone


          說明:使用該方式執(zhí)行作業(yè)時(shí),作業(yè)會(huì)提交到一個(gè)遠(yuǎn)端的 standalone 集群。

          示例:

          ./bin/flink run --jobmanager localhost:8081 --python table_api_demo.py

          YARN Per-Job


          說明:使用該方式執(zhí)行作業(yè)時(shí),作業(yè)會(huì)提交到一個(gè)遠(yuǎn)端的 YARN 集群。

          示例:

          ./bin/flink run --target yarn-per-job --python table_api_demo.py

          K8s application mode


          說明:使用該方式執(zhí)行作業(yè)時(shí),作業(yè)會(huì)提交到 K8s 集群,以 application mode 的方式執(zhí)行。

          示例:

          ./bin/flink run-application \    --target kubernetes-application \    --parallelism 8 \    -Dkubernetes.cluster-id=<ClusterId> \    -Dtaskmanager.memory.process.size=4096m \    -Dkubernetes.taskmanager.cpu=2 \    -Dtaskmanager.numberOfTaskSlots=4 \    -Dkubernetes.container.image=<PyFlinkImageName> \
          --pyModule table_api_demo \    --pyFiles file:///path/to/table_api_demo.py

          參數(shù)說明


          除了上面提到的參數(shù)之外,通過 flink run 提交的時(shí)候,還有其它一些和 PyFlink 作業(yè)相關(guān)的參數(shù)。

          參數(shù)名用途描述示例

          -py / --python

          指定作業(yè)的入口文件

          -py file:///path/to/table_api_demo.py

          -pym / --pyModule

          指定作業(yè)的 entry module,功能和--python類似,可用于當(dāng)作業(yè)的 Python 文件為 zip 包,無法通過--python 指定時(shí),相比--python 來說,更通用

          -pym table_api_demo -pyfs file:///path/to/table_api_demo.py

          -pyfs / --pyFiles

          指定一個(gè)到多個(gè) Python 文件(.py/.zip等,逗號分割),這些 Python 文件在作業(yè)執(zhí)行的時(shí)候,會(huì)放到 Python 進(jìn)程的 PYTHONPATH 中,可以在 Python 自定義函數(shù)中訪問到

          -pyfs file:///path/to/table_api_demo.py,file:///path/to/deps.zip

          -pyarch / --pyArchives

          指定一個(gè)到多個(gè)存檔文件(逗號分割),這些存檔文件,在作業(yè)執(zhí)行的時(shí)候,會(huì)被解壓之后,放到 Python 進(jìn)程的 workspace 目錄,可以通過相對路徑的方式進(jìn)行訪問

          -pyarch file:///path/to/venv.zip

          -pyexec / --pyExecutable

          指定作業(yè)執(zhí)行的時(shí)候,Python 進(jìn)程的路徑

          -pyarch file:///path/to/venv.zip -pyexec venv.zip/venv/bin/python3

          -pyreq / --pyRequirements

          指定 requirements 文件,requirements 文件中定義了作業(yè)的依賴

          -pyreq requirements.txt


          四、問題排查


          當(dāng)我們剛剛上手 PyFlink 作業(yè)開發(fā)的時(shí)候,難免會(huì)遇到各種各樣的問題,學(xué)會(huì)如何排查問題是非常重要的。接下來,我們介紹一些常見的問題排查手段。


          client 端異常輸出


          PyFlink 作業(yè)也遵循 Flink 作業(yè)的提交方式,作業(yè)首先會(huì)在 client 端編譯成 JobGraph,然后提交到 Flink 集群執(zhí)行。如果作業(yè)編譯有問題,會(huì)導(dǎo)致在 client 端提交作業(yè)的時(shí)候就拋出異常,此時(shí)可以在 client 端看到類似這樣的輸出:

          Traceback (most recent call last):  File "/Users/dianfu/code/src/github/pyflink-usecases/datastream_api_demo.py", line 50, in <module>    data_stream_api_demo()  File "/Users/dianfu/code/src/github/pyflink-usecases/datastream_api_demo.py", line 45, in data_stream_api_demo    table_result = table.execute_insert("my_")  File "/Users/dianfu/venv/pyflink-usecases/lib/python3.8/site-packages/pyflink/table/table.py", line 864, in execute_insert    return TableResult(self._j_table.executeInsert(table_path, overwrite))  File "/Users/dianfu/venv/pyflink-usecases/lib/python3.8/site-packages/py4j/java_gateway.py", line 1285, in __call__    return_value = get_return_value(  File "/Users/dianfu/venv/pyflink-usecases/lib/python3.8/site-packages/pyflink/util/exceptions.py", line 162, in deco    raise java_exceptionpyflink.util.exceptions.TableException: Sink `default_catalog`.`default_database`.`my_` does not exists     at org.apache.flink.table.planner.delegation.PlannerBase.translateToRel(PlannerBase.scala:247)     at org.apache.flink.table.planner.delegation.PlannerBase$$anonfun$1.apply(PlannerBase.scala:159)     at org.apache.flink.table.planner.delegation.PlannerBase$$anonfun$1.apply(PlannerBase.scala:159)     at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)     at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)     at scala.collection.Iterator$class.foreach(Iterator.scala:891)     at scala.collection.AbstractIterator.foreach(Iterator.scala:1334)     at scala.collection.IterableLike$class.foreach(IterableLike.scala:72)     at scala.collection.AbstractIterable.foreach(Iterable.scala:54)     at scala.collection.TraversableLike$class.map(TraversableLike.scala:234)     at scala.collection.AbstractTraversable.map(Traversable.scala:104)     at org.apache.flink.table.planner.delegation.PlannerBase.translate(PlannerBase.scala:159)     at org.apache.flink.table.api.internal.TableEnvironmentImpl.translate(TableEnvironmentImpl.java:1329)     at org.apache.flink.table.api.internal.TableEnvironmentImpl.executeInternal(TableEnvironmentImpl.java:676)     at org.apache.flink.table.api.internal.TableImpl.executeInsert(TableImpl.java:572)     at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)     at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)     at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)     at java.lang.reflect.Method.invoke(Method.java:498)     at org.apache.flink.api.python.shaded.py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)     at org.apache.flink.api.python.shaded.py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)     at org.apache.flink.api.python.shaded.py4j.Gateway.invoke(Gateway.java:282)     at org.apache.flink.api.python.shaded.py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)     at org.apache.flink.api.python.shaded.py4j.commands.CallCommand.execute(CallCommand.java:79)     at org.apache.flink.api.python.shaded.py4j.GatewayConnection.run(GatewayConnection.java:238)     at java.lang.Thread.run(Thread.java:748)
          Process finished with exit code 1

          比如上述報(bào)錯(cuò)說明作業(yè)中使用的名字為"my_"的表不存在。

          TaskManager 日志文件


          有些錯(cuò)誤直到作業(yè)運(yùn)行的過程中才會(huì)發(fā)生,比如臟數(shù)據(jù)或者 Python 自定義函數(shù)的實(shí)現(xiàn)問題等,針對這種錯(cuò)誤,通常需要查看 TaskManager 的日志文件,比如以下錯(cuò)誤反映用戶在 Python 自定義函數(shù)中訪問的 opencv 庫不存在。

          Caused by: java.lang.RuntimeException: Error received from SDK harness for instruction 2: Traceback (most recent call last):  File "/Users/dianfu/venv/pyflink-usecases/lib/python3.8/site-packages/apache_beam/runners/worker/sdk_worker.py", line 253, in _execute    response = task()  File "/Users/dianfu/venv/pyflink-usecases/lib/python3.8/site-packages/apache_beam/runners/worker/sdk_worker.py", line 310, in <lambda>    lambda: self.create_worker().do_instruction(request), request)  File "/Users/dianfu/venv/pyflink-usecases/lib/python3.8/site-packages/apache_beam/runners/worker/sdk_worker.py", line 479, in do_instruction    return getattr(self, request_type)(  File "/Users/dianfu/venv/pyflink-usecases/lib/python3.8/site-packages/apache_beam/runners/worker/sdk_worker.py", line 515, in process_bundle    bundle_processor.process_bundle(instruction_id))  File "/Users/dianfu/venv/pyflink-usecases/lib/python3.8/site-packages/apache_beam/runners/worker/bundle_processor.py", line 977, in process_bundle    input_op_by_transform_id[element.transform_id].process_encoded(  File "/Users/dianfu/venv/pyflink-usecases/lib/python3.8/site-packages/apache_beam/runners/worker/bundle_processor.py", line 218, in process_encoded    self.output(decoded_value)  File "apache_beam/runners/worker/operations.py", line 330, in apache_beam.runners.worker.operations.Operation.output  File "apache_beam/runners/worker/operations.py", line 332, in apache_beam.runners.worker.operations.Operation.output  File "apache_beam/runners/worker/operations.py", line 195, in apache_beam.runners.worker.operations.SingletonConsumerSet.receive  File "pyflink/fn_execution/beam/beam_operations_fast.pyx", line 71, in pyflink.fn_execution.beam.beam_operations_fast.FunctionOperation.process  File "pyflink/fn_execution/beam/beam_operations_fast.pyx", line 85, in pyflink.fn_execution.beam.beam_operations_fast.FunctionOperation.process  File "pyflink/fn_execution/coder_impl_fast.pyx", line 83, in pyflink.fn_execution.coder_impl_fast.DataStreamFlatMapCoderImpl.encode_to_stream  File "/Users/dianfu/code/src/github/pyflink-usecases/datastream_api_demo.py", line 26, in split    import cv2ModuleNotFoundError: No module named 'cv2'
          at org.apache.beam.runners.fnexecution.control.FnApiControlClient$ResponseStreamObserver.onNext(FnApiControlClient.java:177) at org.apache.beam.runners.fnexecution.control.FnApiControlClient$ResponseStreamObserver.onNext(FnApiControlClient.java:157) at org.apache.beam.vendor.grpc.v1p26p0.io.grpc.stub.ServerCalls$StreamingServerCallHandler$StreamingServerCallListener.onMessage(ServerCalls.java:251) at org.apache.beam.vendor.grpc.v1p26p0.io.grpc.ForwardingServerCallListener.onMessage(ForwardingServerCallListener.java:33) at org.apache.beam.vendor.grpc.v1p26p0.io.grpc.Contexts$ContextualizedServerCallListener.onMessage(Contexts.java:76) at org.apache.beam.vendor.grpc.v1p26p0.io.grpc.internal.ServerCallImpl$ServerStreamListenerImpl.messagesAvailableInternal(ServerCallImpl.java:309) at org.apache.beam.vendor.grpc.v1p26p0.io.grpc.internal.ServerCallImpl$ServerStreamListenerImpl.messagesAvailable(ServerCallImpl.java:292) at org.apache.beam.vendor.grpc.v1p26p0.io.grpc.internal.ServerImpl$JumpToApplicationThreadServerStreamListener$1MessagesAvailable.runInContext(ServerImpl.java:782) at org.apache.beam.vendor.grpc.v1p26p0.io.grpc.internal.ContextRunnable.run(ContextRunnable.java:37) at org.apache.beam.vendor.grpc.v1p26p0.io.grpc.internal.SerializingExecutor.run(SerializingExecutor.java:123) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) ... 1 more

          說明:

          • local 模式下,TaskManager 的 log 位于 PyFlink 的安裝目錄下:site-packages/pyflink/log/,也可以通過如下命令找到:


            >>> import pyflink 

            >>> print(pyflink.path)

            ['/Users/dianfu/venv/pyflink-usecases/lib/python3.8/site-packages/pyflink'],則log文件位于/Users/dianfu/venv/pyflink-usecases/lib/python3.8/site-packages/pyflink/log目錄下


          自定義日志


          有時(shí)候,異常日志的內(nèi)容并不足以幫助我們定位問題,此時(shí)可以考慮在 Python 自定義函數(shù)中打印一些日志信息。PyFlink 支持用戶在 Python 自定義函數(shù)中通過 logging 的方式輸出 log,比如:

          def split(s):    import logging    logging.info("s: " + str(s))    splits = s[1].split("|")    for sp in splits:        yield s[0], sp

          通過上述方式,split 函數(shù)的輸入?yún)?shù),會(huì)打印到 TaskManager 的日志文件中。


          遠(yuǎn)程調(diào)試


          PyFlink 作業(yè),在運(yùn)行過程中,會(huì)啟動(dòng)一個(gè)獨(dú)立的 Python 進(jìn)程執(zhí)行 Python 自定義函數(shù),所以如果需要調(diào)試 Python 自定義函數(shù),需要通過遠(yuǎn)程調(diào)試的方式進(jìn)行,可以參見[4],了解如何在 Pycharm 中進(jìn)行 Python 遠(yuǎn)程調(diào)試。

          1)在 Python 環(huán)境中安裝 pydevd-pycharm:

          pip install pydevd-pycharm~=203.7717.65

          2)在 Python 自定義函數(shù)中設(shè)置遠(yuǎn)程調(diào)試參數(shù):

          def split(s):    import pydevd_pycharm    pydevd_pycharm.settrace('localhost', port=6789, stdoutToServer=True, stderrToServer=True)    splits = s[1].split("|")    for sp in splits:        yield s[0], sp

          3)按照 Pycharm 中遠(yuǎn)程調(diào)試的步驟,進(jìn)行操作即可,可以參見[4],也可以參考博客[5]中“代碼調(diào)試”部分的介紹。

          說明:Python 遠(yuǎn)程調(diào)試功能只在 Pycharm 的 professional 版才支持。

          社區(qū)用戶郵件列表


          如果通過以上步驟之后,問題還未解決,也可以訂閱 Flink 用戶郵件列表 [6],將問題發(fā)送到 Flink 用戶郵件列表。需要注意的是,將問題發(fā)送到郵件列表時(shí),盡量將問題描述清楚,最好有可復(fù)現(xiàn)的代碼及數(shù)據(jù),可以參考一下這個(gè)郵件[7]。

          釘釘群


          此外,也歡迎大家加入“PyFlink交流群”,交流 PyFlink 相關(guān)的問題。


          五、總結(jié)


          在這篇文章中,我們主要介紹了 PyFlink API 作業(yè)的環(huán)境準(zhǔn)備、作業(yè)開發(fā)、作業(yè)提交、問題排查等方面的信息,希望可以幫助用戶使用 Python 語言快速構(gòu)建一個(gè) Flink 作業(yè),希望對大家有所幫助。接下來,我們會(huì)繼續(xù)推出 PyFlink 系列文章,幫助 PyFlink 用戶深入了解 PyFlink 中各種功能、應(yīng)用場景、最佳實(shí)踐等。

          另外,我們推出一個(gè)調(diào)查問卷,希望大家積極參與這個(gè)問卷,幫助我們更好的去整理 PyFlink 相關(guān)學(xué)習(xí)資料。填完問卷后即可參與抽獎(jiǎng) Flink 定制款 Polo 衫,4 月 30 日中午 12:00 準(zhǔn)時(shí)開獎(jiǎng)。


          引用鏈接


          [1] https://ci.apache.org/projects/flink/flink-docs-release-1.12/dev/table/connectors/
          [2] https://repo.maven.apache.org/maven2/org/apache/flink/flink-sql-connector-kafka_2.11/1.12.0/flink-sql-connector-kafka_2.11-1.12.0.jar
          [3] https://ci.apache.org/projects/flink/flink-docs-release-1.12/deployment/cli.html#submitting-pyflink-jobs
          [4] https://www.jetbrains.com/help/pycharm/remote-debugging-with-product.html#remote-debug-config
          [5] https://mp.weixin.qq.com/s?__biz=MzIzMDMwNTg3MA==&mid=2247485386&idx=1&sn=da24e5200d72e0627717494c22d0372e&chksm=e8b43eebdfc3b7fdbd10b49e6749cb761b7aa5f8ddc90b34eb3170119a8bbb3ddd7327acb712&scene=178&cur_album_id=1386152464113811456#rd
          [6] https://flink.apache.org/community.html#mailing-lists
          [7] http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/PyFlink-called-already-closed-and-NullPointerException-td42997.html

          更多 Flink 相關(guān)技術(shù)問題,可掃碼加入社區(qū)釘釘交流



          ▼ 關(guān)注「Flink 中文社區(qū)」,獲取更多技術(shù)干貨 



            戳我,查看更多技術(shù)干貨! 

          瀏覽 84
          點(diǎn)贊
          評論
          收藏
          分享

          手機(jī)掃一掃分享

          分享
          舉報(bào)
          評論
          圖片
          表情
          推薦
          點(diǎn)贊
          評論
          收藏
          分享

          手機(jī)掃一掃分享

          分享
          舉報(bào)
          <kbd id="afajh"><form id="afajh"></form></kbd>
          <strong id="afajh"><dl id="afajh"></dl></strong>
            <del id="afajh"><form id="afajh"></form></del>
                1. <th id="afajh"><progress id="afajh"></progress></th>
                  <b id="afajh"><abbr id="afajh"></abbr></b>
                  <th id="afajh"><progress id="afajh"></progress></th>
                  久久人人爽人人爽人人片aV东京热 | 久久午夜无码鲁丝片精品 | 91丨豆花丨成人 熟女 | 一级A片调教打屁股 | 日韩在线中文字幕视频 |