如何從 0 到 1 開發(fā) PyFlink API 作業(yè)
環(huán)境準(zhǔn)備 作業(yè)開發(fā) 作業(yè)提交 問題排查 總結(jié)
GitHub 地址 
一、環(huán)境準(zhǔn)備
第一步:安裝 Python
第二步:安裝 JDK
第三步:安裝 PyFlink
# 創(chuàng)建Python虛擬環(huán)境python3 -m pip install virtualenvvirtualenv -p `which python3` venv# 使用上述創(chuàng)建的Python虛擬環(huán)境./venv/bin/activate# 安裝PyFlink 1.12python3 -m pip install apache-flink==1.12.2
二、作業(yè)開發(fā)
PyFlink Table API 作業(yè)
■ 1)創(chuàng)建 TableEnvironment 對象
env_settings = EnvironmentSettings.new_instance().in_streaming_mode().use_blink_planner().build()t_env = StreamTableEnvironment.create(environment_settings=env_settings)
■ 2)配置作業(yè)的執(zhí)行參數(shù)
t_env.get_config().get_configuration().set_string('parallelism.default', '4')■ 3)創(chuàng)建數(shù)據(jù)源表
tab = t_env.from_elements([("hello", 1), ("world", 2), ("flink", 3)], ['a', 'b'])這種方式通常用于測試階段,可以快速地創(chuàng)建一個(gè)數(shù)據(jù)源表,驗(yàn)證作業(yè)邏輯。
from_elements 方法可以接收多個(gè)參數(shù),其中第一個(gè)參數(shù)用于指定數(shù)據(jù)列表,列表中的每一個(gè)元素必須為 tuple 類型;第二個(gè)參數(shù)用于指定表的 schema。
t_env.execute_sql("""CREATE TABLE my_source (a VARCHAR,b VARCHAR) WITH ('connector' = 'datagen','number-of-rows' = '10')""")tab = t_env.from_path('my_source')
通過 DDL 的方式來定義數(shù)據(jù)源表是目前最推薦的方式,且所有 Java Table API & SQL 中支持的 connector,都可以通過 DDL 的方式,在 PyFlink Table API 作業(yè)中使用,詳細(xì)的 connector 列表請參見 Flink 官方文檔 [1]。
當(dāng)前僅有部分 connector 的實(shí)現(xiàn)包含在 Flink 官方提供的發(fā)行包中,比如 FileSystem,DataGen、Print、BlackHole 等,大部分 connector 的實(shí)現(xiàn)當(dāng)前沒有包含在 Flink 官方提供的發(fā)行包中,比如 Kafka、ES 等。針對沒有包含在 Flink 官方提供的發(fā)行包中的 connector,如果需要在 PyFlink 作業(yè)中使用,用戶需要顯式地指定相應(yīng) FAT JAR,比如針對 Kafka,需要使用 JAR 包 [2],JAR 包可以通過如下方式指定:
# 注意:file:///前綴不能省略t_env.get_config().get_configuration().set_string("pipeline.jars", "file:///my/jar/path/flink-sql-connector-kafka_2.11-1.12.0.jar")
hive_catalog = HiveCatalog("hive_catalog")t_env.register_catalog("hive_catalog", hive_catalog)t_env.use_catalog("hive_catalog")# 假設(shè)hive catalog中已經(jīng)定義了一個(gè)名字為source_table的表tab = t_env.from_path('source_table')
■ 4)定義作業(yè)的計(jì)算邏輯
@udf(result_type=DataTypes.STRING())def sub_string(s: str, begin: int, end: int):return s[begin:end]transformed_tab = tab.select(sub_string(col('a'), 2, 4))
t_env.create_temporary_function("sub_string", sub_string)transformed_tab = t_env.sql_query("SELECT sub_string(a, 2, 4) FROM %s" % tab)
說明:
TableEnvironment 中提供了多種方式用于執(zhí)行 SQL 語句,其用途略有不同:
| 方法名 | 使用說明 |
|---|---|
sql_query | 用來執(zhí)行 SELECT 語句 |
sql_update | 用來執(zhí)行 INSERT 語句 / CREATE TABLE 語句。該方法已經(jīng)被 deprecate,推薦使用 execute_sql 或者create_statement_set 替代。 |
create_statement_set | 用來執(zhí)行多條 SQL 語句,可以通過該方法編寫 multi-sink 的作業(yè)。 |
execute_sql | 用來執(zhí)行單條 SQL 語句。execute_sql VS create_statement_set: 前者只能執(zhí)行單條 SQL 語句,后者可用于執(zhí)行多條 SQL 語句 execute_sql VS sql_query:前者可用于執(zhí)行各種類型的 SQL 語句,比如 DDL、 DML、DQL、SHOW、DESCRIBE、EXPLAIN、USE 等,后者只能執(zhí)行 DQL 語句即使是 DQL 語句,兩者的行為也不一樣。前者會(huì)生成 Flink 作業(yè),觸發(fā)表數(shù)據(jù)的計(jì)算,返回 TableResult 類型,后者并不觸發(fā)計(jì)算,僅對表進(jìn)行邏輯變換,返回 Table 類型 |
== Abstract Syntax Tree ==LogicalProject(EXPR$0=[sub_string($0, 2, 4)])+- LogicalTableScan(table=[[default_catalog, default_database, Unregistered_TableSource_582508460, source: [PythonInputFormatTableSource(a)]]])== Optimized Logical Plan ==PythonCalc(select=[sub_string(a, 2, 4) AS EXPR$0])+- LegacyTableSourceScan(table=[[default_catalog, default_database, Unregistered_TableSource_582508460, source: [PythonInputFormatTableSource(a)]]], fields=[a])== Physical Execution Plan ==Stage 1 : Data Sourcecontent : Source: PythonInputFormatTableSource(a)Stage 2 : Operatorcontent : SourceConversion(table=[default_catalog.default_database.Unregistered_TableSource_582508460, source: [PythonInputFormatTableSource(a)]], fields=[a])ship_strategy : FORWARDStage 3 : Operatorcontent : StreamExecPythonCalcship_strategy : FORWARD
print(t_env.explain_sql("INSERT INTO my_sink SELECT * FROM %s " % transformed_tab))== Abstract Syntax Tree ==LogicalSink(table=[default_catalog.default_database.my_sink], fields=[EXPR$0])+- LogicalProject(EXPR$0=[sub_string($0, 2, 4)])+- LogicalTableScan(table=[[default_catalog, default_database, Unregistered_TableSource_1143388267, source: [PythonInputFormatTableSource(a)]]])== Optimized Logical Plan ==Sink(table=[default_catalog.default_database.my_sink], fields=[EXPR$0])+- PythonCalc(select=[sub_string(a, 2, 4) AS EXPR$0])+- LegacyTableSourceScan(table=[[default_catalog, default_database, Unregistered_TableSource_1143388267, source: [PythonInputFormatTableSource(a)]]], fields=[a])== Physical Execution Plan ==Stage 1 : Data Sourcecontent : Source: PythonInputFormatTableSource(a)Stage 2 : Operatorcontent : SourceConversion(table=[default_catalog.default_database.Unregistered_TableSource_1143388267, source: [PythonInputFormatTableSource(a)]], fields=[a])ship_strategy : FORWARDStage 3 : Operatorcontent : StreamExecPythonCalcship_strategy : FORWARDStage 4 : Data Sinkcontent : Sink: Sink(table=[default_catalog.default_database.my_sink], fields=[EXPR$0])ship_strategy : FORWARD
■ 6)寫出結(jié)果數(shù)據(jù)
t_env.execute_sql("""CREATE TABLE my_sink (`sum` VARCHAR) WITH ('connector' = 'print')""")table_result = transformed_tab.execute_insert('my_sink')
當(dāng)使用 print 作為 sink 時(shí),作業(yè)結(jié)果會(huì)打印到標(biāo)準(zhǔn)輸出中。如果不需要查看輸出,也可以使用 blackhole 作為 sink。
table_result = transformed_tab.execute()with table_result.collect() as results:for result in results:print(result)
該方式可以方便地將 table 的結(jié)果收集到客戶端并查看。 由于數(shù)據(jù)最終會(huì)收集到客戶端,所以最好限制一下數(shù)據(jù)條數(shù),比如:
result = transformed_tab.to_pandas()print(result)
_c00 321 e62 8b3 be4 4f5 b46 a67 498 359 6b
該方式與 collect 類似,也會(huì)將 table 的結(jié)果收集到客戶端,所以最好限制一下結(jié)果數(shù)據(jù)的條數(shù)。
■ 7)總結(jié)
from pyflink.table import DataTypes, EnvironmentSettings, StreamTableEnvironmentfrom pyflink.table.expressions import colfrom pyflink.table.udf import udfdef table_api_demo():env_settings = EnvironmentSettings.new_instance().in_streaming_mode().use_blink_planner().build()t_env = StreamTableEnvironment.create(environment_settings=env_settings)t_env.get_config().get_configuration().set_string('parallelism.default', '4')t_env.execute_sql("""CREATE TABLE my_source (a VARCHAR,b VARCHAR) WITH ('connector' = 'datagen','number-of-rows' = '10')""")tab = t_env.from_path('my_source')@udf(result_type=DataTypes.STRING())def sub_string(s: str, begin: int, end: int):return s[begin:end]transformed_tab = tab.select(sub_string(col('a'), 2, 4))t_env.execute_sql("""CREATE TABLE my_sink (`sum` VARCHAR) WITH ('connector' = 'print')""")table_result = transformed_tab.execute_insert('my_sink')# 1)等待作業(yè)執(zhí)行結(jié)束,用于local執(zhí)行,否則可能作業(yè)尚未執(zhí)行結(jié)束,該腳本已退出,會(huì)導(dǎo)致minicluster過早退出# 2)當(dāng)作業(yè)通過detach模式往remote集群提交時(shí),比如YARN/Standalone/K8s等,需要移除該方法table_result.wait()if __name__ == '__main__':table_api_demo()
4> +I(a1)3> +I(b0)2> +I(b1)1> +I(37)3> +I(74)4> +I(3d)1> +I(07)2> +I(f4)1> +I(7f)2> +I(da)
PyFlink DataStream API 作業(yè)
env = StreamExecutionEnvironment.get_execution_environment()
env.set_parallelism(4)
ds = env.from_collection(collection=[(1, 'aaa|bb'), (2, 'bb|a'), (3, 'aaa|a')],type_info=Types.ROW([Types.INT(), Types.STRING()]))
這種方式通常用于測試階段,可以方便地創(chuàng)建一個(gè)數(shù)據(jù)源。 from_collection 方法可以接收兩個(gè)參數(shù),其中第一個(gè)參數(shù)用于指定數(shù)據(jù)列表;第二個(gè)參數(shù)用于指定數(shù)據(jù)的類型。
deserialization_schema = JsonRowDeserializationSchema.builder() \.type_info(type_info=Types.ROW([Types.INT(), Types.STRING()])).build()kafka_consumer = FlinkKafkaConsumer(topics='test_source_topic',deserialization_schema=deserialization_schema,properties={'bootstrap.servers': 'localhost:9092', 'group.id': 'test_group'})ds = env.add_source(kafka_consumer)
Kafka connector 當(dāng)前沒有包含在 Flink 官方提供的發(fā)行包中,如果需要在PyFlink 作業(yè)中使用,用戶需要顯式地指定相應(yīng) FAT JAR [2],JAR 包可以通過如下方式指定:
# 注意:file:///前綴不能省略env.add_jars("file:///my/jar/path/flink-sql-connector-kafka_2.11-1.12.0.jar")
即使是 PyFlink DataStream API 作業(yè),也推薦使用 Table & SQL connector 中打包出來的 FAT JAR,可以避免遞歸依賴的問題。
t_env = StreamTableEnvironment.create(stream_execution_environment=env)t_env.execute_sql("""CREATE TABLE my_source (a INT,b VARCHAR) WITH ('connector' = 'datagen','number-of-rows' = '10')""")ds = t_env.to_append_stream(t_env.from_path('my_source'),Types.ROW([Types.INT(), Types.STRING()]))
由于當(dāng)前 PyFlink DataStream API 中 built-in 支持的 connector 種類還比較少,推薦通過這種方式來創(chuàng)建 PyFlink DataStream API 作業(yè)中使用的數(shù)據(jù)源表,這樣的話,所有 PyFlink Table API 中可以使用的 connector,都可以在 PyFlink DataStream API 作業(yè)中使用。
需要注意的是,TableEnvironment 需要通過以下方式創(chuàng)建 StreamTableEnvironment.create(stream_execution_environment=env),以使得 PyFlink DataStream API 與 PyFlink Table API 共享同一個(gè) StreamExecutionEnvironment 對象。
def split(s):splits = s[1].split("|")for sp in splits:yield s[0], spds = ds.map(lambda i: (i[0] + 1, i[1])) \.flat_map(split) \.key_by(lambda i: i[1]) \.reduce(lambda i, j: (i[0] + j[0], i[1]))
■ 5)寫出結(jié)果數(shù)據(jù)
ds.print()
serialization_schema = JsonRowSerializationSchema.builder() \.with_type_info(type_info=Types.ROW([Types.INT(), Types.STRING()])).build()kafka_producer = FlinkKafkaProducer(topic='test_sink_topic',serialization_schema=serialization_schema,producer_config={'bootstrap.servers': 'localhost:9092', 'group.id': 'test_group'})ds.add_sink(kafka_producer)
JDBC、Kafka connector 當(dāng)前沒有包含在 Flink 官方提供的發(fā)行包中,如果需要在 PyFlink 作業(yè)中使用,用戶需要顯式地指定相應(yīng) FAT JAR,比如 Kafka connector 可以使用 JAR 包 [2],JAR 包可以通過如下方式指定:
# 注意:file:///前綴不能省略env.add_jars("file:///my/jar/path/flink-sql-connector-kafka_2.11-1.12.0.jar")
推薦使用 Table & SQL connector 中打包出來的 FAT JAR,可以避免遞歸依賴的問題。
# 寫法一:ds類型為Types.ROWdef split(s):splits = s[1].split("|")for sp in splits:yield Row(s[0], sp)ds = ds.map(lambda i: (i[0] + 1, i[1])) \.flat_map(split, Types.ROW([Types.INT(), Types.STRING()])) \.key_by(lambda i: i[1]) \.reduce(lambda i, j: Row(i[0] + j[0], i[1]))# 寫法二:ds類型為Types.TUPLEdef split(s):splits = s[1].split("|")for sp in splits:yield s[0], spds = ds.map(lambda i: (i[0] + 1, i[1])) \.flat_map(split, Types.TUPLE([Types.INT(), Types.STRING()])) \.key_by(lambda i: i[1]) \.reduce(lambda i, j: (i[0] + j[0], i[1]))# 將ds寫出到sinkt_env.execute_sql("""CREATE TABLE my_sink (a INT,b VARCHAR) WITH ('connector' = 'print')""")table = t_env.from_data_stream(ds)table_result = table.execute_insert("my_sink")
需要注意的是,t_env.from_data_stream(ds) 中的 ds 對象的 result type 類型必須是復(fù)合類型 Types.ROW 或者 Types.TUPLE,這也就是為什么需要顯式聲明作業(yè)計(jì)算邏輯中 flat_map 操作的 result 類型
作業(yè)的提交,需要通過 PyFlink Table API 中提供的作業(yè)提交方式進(jìn)行提交
由于當(dāng)前 PyFlink DataStream API 中支持的 connector 種類還比較少,推薦通過這種方式來定義 PyFlink DataStream API 作業(yè)中使用的數(shù)據(jù)源表,這樣的話,所有 PyFlink Table API 中可以使用的 connector,都可以作為 PyFlink DataStream API 作業(yè)的 sink。
from pyflink.common.typeinfo import Typesfrom pyflink.datastream import StreamExecutionEnvironmentdef data_stream_api_demo():env = StreamExecutionEnvironment.get_execution_environment()env.set_parallelism(4)ds = env.from_collection(collection=[(1, 'aaa|bb'), (2, 'bb|a'), (3, 'aaa|a')],type_info=Types.ROW([Types.INT(), Types.STRING()]))def split(s):splits = s[1].split("|")for sp in splits:yield s[0], spds = ds.map(lambda i: (i[0] + 1, i[1])) \.flat_map(split) \.key_by(lambda i: i[1]) \.reduce(lambda i, j: (i[0] + j[0], i[1]))ds.print()env.execute()if __name__ == '__main__':data_stream_api_demo()
3> (2, 'aaa')3> (2, 'bb')3> (6, 'aaa')3> (4, 'a')3> (5, 'bb')3> (7, 'a')
from pyflink.common.typeinfo import Typesfrom pyflink.datastream import StreamExecutionEnvironmentfrom pyflink.table import StreamTableEnvironmentdef data_stream_api_demo():env = StreamExecutionEnvironment.get_execution_environment()t_env = StreamTableEnvironment.create(stream_execution_environment=env)env.set_parallelism(4)t_env.execute_sql("""CREATE TABLE my_source (a INT,b VARCHAR) WITH ('connector' = 'datagen','number-of-rows' = '10')""")ds = t_env.to_append_stream(t_env.from_path('my_source'),Types.ROW([Types.INT(), Types.STRING()]))def split(s):splits = s[1].split("|")for sp in splits:yield s[0], spds = ds.map(lambda i: (i[0] + 1, i[1])) \.flat_map(split, Types.TUPLE([Types.INT(), Types.STRING()])) \.key_by(lambda i: i[1]) \.reduce(lambda i, j: (i[0] + j[0], i[1]))t_env.execute_sql("""CREATE TABLE my_sink (a INT,b VARCHAR) WITH ('connector' = 'print')""")table = t_env.from_data_stream(ds)table_result = table.execute_insert("my_sink")# 1)等待作業(yè)執(zhí)行結(jié)束,用于local執(zhí)行,否則可能作業(yè)尚未執(zhí)行結(jié)束,該腳本已退出,會(huì)導(dǎo)致minicluster過早退出# 2)當(dāng)作業(yè)通過detach模式往remote集群提交時(shí),比如YARN/Standalone/K8s等,需要移除該方法table_result.wait()if __name__ == '__main__':data_stream_api_demo()
三、作業(yè)提交
Flink 提供了多種作業(yè)部署方式,比如 local、standalone、YARN、K8s 等,PyFlink 也支持上述作業(yè)部署方式,請參考 Flink 官方文檔 [3],了解更多詳細(xì)信息。
local
standalone
YARN Per-Job
K8s application mode
參數(shù)說明
| 參數(shù)名 | 用途描述 | 示例 |
|---|---|---|
-py / --python | 指定作業(yè)的入口文件 | -py file:///path/to/table_api_demo.py |
-pym / --pyModule | 指定作業(yè)的 entry module,功能和--python類似,可用于當(dāng)作業(yè)的 Python 文件為 zip 包,無法通過--python 指定時(shí),相比--python 來說,更通用 | -pym table_api_demo -pyfs file:///path/to/table_api_demo.py |
-pyfs / --pyFiles | 指定一個(gè)到多個(gè) Python 文件(.py/.zip等,逗號分割),這些 Python 文件在作業(yè)執(zhí)行的時(shí)候,會(huì)放到 Python 進(jìn)程的 PYTHONPATH 中,可以在 Python 自定義函數(shù)中訪問到 | -pyfs file:///path/to/table_api_demo.py,file:///path/to/deps.zip |
-pyarch / --pyArchives | 指定一個(gè)到多個(gè)存檔文件(逗號分割),這些存檔文件,在作業(yè)執(zhí)行的時(shí)候,會(huì)被解壓之后,放到 Python 進(jìn)程的 workspace 目錄,可以通過相對路徑的方式進(jìn)行訪問 | -pyarch file:///path/to/venv.zip |
-pyexec / --pyExecutable | 指定作業(yè)執(zhí)行的時(shí)候,Python 進(jìn)程的路徑 | -pyarch file:///path/to/venv.zip -pyexec venv.zip/venv/bin/python3 |
-pyreq / --pyRequirements | 指定 requirements 文件,requirements 文件中定義了作業(yè)的依賴 | -pyreq requirements.txt |
四、問題排查
client 端異常輸出
Traceback (most recent call last):File "/Users/dianfu/code/src/github/pyflink-usecases/datastream_api_demo.py", line 50, in <module>data_stream_api_demo()File "/Users/dianfu/code/src/github/pyflink-usecases/datastream_api_demo.py", line 45, in data_stream_api_demotable_result = table.execute_insert("my_")File "/Users/dianfu/venv/pyflink-usecases/lib/python3.8/site-packages/pyflink/table/table.py", line 864, in execute_insertreturn TableResult(self._j_table.executeInsert(table_path, overwrite))File "/Users/dianfu/venv/pyflink-usecases/lib/python3.8/site-packages/py4j/java_gateway.py", line 1285, in __call__return_value = get_return_value(File "/Users/dianfu/venv/pyflink-usecases/lib/python3.8/site-packages/pyflink/util/exceptions.py", line 162, in decoraise java_exceptionpyflink.util.exceptions.TableException: Sink `default_catalog`.`default_database`.`my_` does not existsat org.apache.flink.table.planner.delegation.PlannerBase.translateToRel(PlannerBase.scala:247)at org.apache.flink.table.planner.delegation.PlannerBase$$anonfun$1.apply(PlannerBase.scala:159)at org.apache.flink.table.planner.delegation.PlannerBase$$anonfun$1.apply(PlannerBase.scala:159)at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)at scala.collection.Iterator$class.foreach(Iterator.scala:891)at scala.collection.AbstractIterator.foreach(Iterator.scala:1334)at scala.collection.IterableLike$class.foreach(IterableLike.scala:72)at scala.collection.AbstractIterable.foreach(Iterable.scala:54)at scala.collection.TraversableLike$class.map(TraversableLike.scala:234)at scala.collection.AbstractTraversable.map(Traversable.scala:104)at org.apache.flink.table.planner.delegation.PlannerBase.translate(PlannerBase.scala:159)at org.apache.flink.table.api.internal.TableEnvironmentImpl.translate(TableEnvironmentImpl.java:1329)at org.apache.flink.table.api.internal.TableEnvironmentImpl.executeInternal(TableEnvironmentImpl.java:676)at org.apache.flink.table.api.internal.TableImpl.executeInsert(TableImpl.java:572)at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)at java.lang.reflect.Method.invoke(Method.java:498)at org.apache.flink.api.python.shaded.py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)at org.apache.flink.api.python.shaded.py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)at org.apache.flink.api.python.shaded.py4j.Gateway.invoke(Gateway.java:282)at org.apache.flink.api.python.shaded.py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)at org.apache.flink.api.python.shaded.py4j.commands.CallCommand.execute(CallCommand.java:79)at org.apache.flink.api.python.shaded.py4j.GatewayConnection.run(GatewayConnection.java:238)at java.lang.Thread.run(Thread.java:748)Process finished with exit code 1
TaskManager 日志文件
Caused by: java.lang.RuntimeException: Error received from SDK harness for instruction 2: Traceback (most recent call last):File "/Users/dianfu/venv/pyflink-usecases/lib/python3.8/site-packages/apache_beam/runners/worker/sdk_worker.py", line 253, in _executeresponse = task()File "/Users/dianfu/venv/pyflink-usecases/lib/python3.8/site-packages/apache_beam/runners/worker/sdk_worker.py", line 310, in <lambda>lambda: self.create_worker().do_instruction(request), request)File "/Users/dianfu/venv/pyflink-usecases/lib/python3.8/site-packages/apache_beam/runners/worker/sdk_worker.py", line 479, in do_instructionreturn getattr(self, request_type)(File "/Users/dianfu/venv/pyflink-usecases/lib/python3.8/site-packages/apache_beam/runners/worker/sdk_worker.py", line 515, in process_bundlebundle_processor.process_bundle(instruction_id))File "/Users/dianfu/venv/pyflink-usecases/lib/python3.8/site-packages/apache_beam/runners/worker/bundle_processor.py", line 977, in process_bundleinput_op_by_transform_id[element.transform_id].process_encoded(File "/Users/dianfu/venv/pyflink-usecases/lib/python3.8/site-packages/apache_beam/runners/worker/bundle_processor.py", line 218, in process_encodedself.output(decoded_value)File "apache_beam/runners/worker/operations.py", line 330, in apache_beam.runners.worker.operations.Operation.outputFile "apache_beam/runners/worker/operations.py", line 332, in apache_beam.runners.worker.operations.Operation.outputFile "apache_beam/runners/worker/operations.py", line 195, in apache_beam.runners.worker.operations.SingletonConsumerSet.receiveFile "pyflink/fn_execution/beam/beam_operations_fast.pyx", line 71, in pyflink.fn_execution.beam.beam_operations_fast.FunctionOperation.processFile "pyflink/fn_execution/beam/beam_operations_fast.pyx", line 85, in pyflink.fn_execution.beam.beam_operations_fast.FunctionOperation.processFile "pyflink/fn_execution/coder_impl_fast.pyx", line 83, in pyflink.fn_execution.coder_impl_fast.DataStreamFlatMapCoderImpl.encode_to_streamFile "/Users/dianfu/code/src/github/pyflink-usecases/datastream_api_demo.py", line 26, in splitimport cv2ModuleNotFoundError: No module named 'cv2'at org.apache.beam.runners.fnexecution.control.FnApiControlClient$ResponseStreamObserver.onNext(FnApiControlClient.java:177)at org.apache.beam.runners.fnexecution.control.FnApiControlClient$ResponseStreamObserver.onNext(FnApiControlClient.java:157)at org.apache.beam.vendor.grpc.v1p26p0.io.grpc.stub.ServerCalls$StreamingServerCallHandler$StreamingServerCallListener.onMessage(ServerCalls.java:251)at org.apache.beam.vendor.grpc.v1p26p0.io.grpc.ForwardingServerCallListener.onMessage(ForwardingServerCallListener.java:33)at org.apache.beam.vendor.grpc.v1p26p0.io.grpc.Contexts$ContextualizedServerCallListener.onMessage(Contexts.java:76)at org.apache.beam.vendor.grpc.v1p26p0.io.grpc.internal.ServerCallImpl$ServerStreamListenerImpl.messagesAvailableInternal(ServerCallImpl.java:309)at org.apache.beam.vendor.grpc.v1p26p0.io.grpc.internal.ServerCallImpl$ServerStreamListenerImpl.messagesAvailable(ServerCallImpl.java:292)at org.apache.beam.vendor.grpc.v1p26p0.io.grpc.internal.ServerImpl$JumpToApplicationThreadServerStreamListener$1MessagesAvailable.runInContext(ServerImpl.java:782)at org.apache.beam.vendor.grpc.v1p26p0.io.grpc.internal.ContextRunnable.run(ContextRunnable.java:37)at org.apache.beam.vendor.grpc.v1p26p0.io.grpc.internal.SerializingExecutor.run(SerializingExecutor.java:123)at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)... 1 more
local 模式下,TaskManager 的 log 位于 PyFlink 的安裝目錄下:site-packages/pyflink/log/,也可以通過如下命令找到:
>>> import pyflink
>>> print(pyflink.path)
['/Users/dianfu/venv/pyflink-usecases/lib/python3.8/site-packages/pyflink'],則log文件位于/Users/dianfu/venv/pyflink-usecases/lib/python3.8/site-packages/pyflink/log目錄下
自定義日志
def split(s):import logginglogging.info("s: " + str(s))splits = s[1].split("|")for sp in splits:yield s[0], sp
遠(yuǎn)程調(diào)試
def split(s):import pydevd_pycharmpydevd_pycharm.settrace('localhost', port=6789, stdoutToServer=True, stderrToServer=True)splits = s[1].split("|")for sp in splits:yield s[0], sp
社區(qū)用戶郵件列表
釘釘群
五、總結(jié)
引用鏈接
▼ 關(guān)注「Flink 中文社區(qū)」,獲取更多技術(shù)干貨 ▼
戳我,查看更多技術(shù)干貨!
