PyFlink 開發(fā) | PyFlink 開發(fā)環(huán)境利器:Zeppelin Notebook
摘要:PyFlink 作為 Flink 的 Python 語言入口,其 Python 語言的確很簡單易學(xué),但是 PyFlink 的開發(fā)環(huán)境卻不容易搭建,稍有不慎,PyFlink 環(huán)境就會亂掉,而且很難排查原因。今天給大家介紹一款能夠幫你解決這些問題的 PyFlink 開發(fā)環(huán)境利器:Zeppelin Notebook。主要內(nèi)容為:
準備工作 搭建 PyFlink 環(huán)境 總結(jié)與未來
GitHub 地址 
■ 1. 能夠在 PyFlink 客戶端使用第三方 Python 庫,比如 matplotlib:

■ 2. 可以在 PyFlink UDF 里使用第三方 Python 庫,如:

一、準備工作
Step 1.
把 flink-Python-*.jar 這個 jar 包 copy 到 Flink 的 lib 文件夾下;
把 opt/Python 這個文件夾 copy 到 Flink 的 lib 文件夾下。
Step 3.
miniconda:
https://docs.conda.io/en/latest/miniconda.html
conda pack:
https://conda.github.io/conda-pack/
mamba:
https://github.com/mamba-org/mamba
二、搭建 PyFlink 環(huán)境
Step 1. 制作 JobManager 上的 PyFlink Conda 環(huán)境
某個版本的 Python (這里用的是 3.7)
apache-flink (這里用的是 1.13.1)
jupyter,grpcio,protobuf (這三個包是 Zeppelin 需要的)
%sh
# make sure you have conda and momba installed.
# install miniconda: https://docs.conda.io/en/latest/miniconda.html
# install mamba: https://github.com/mamba-org/mamba
echo "name: pyflink_env
channels:
- conda-forge
- defaults
dependencies:
- Python=3.7
- pip
- pip:
- apache-flink==1.13.1
- jupyter
- grpcio
- protobuf
- matplotlib
- pandasql
- pandas
- scipy
- seaborn
- plotnine
" > pyflink_env.yml
mamba env remove -n pyflink_env
mamba env create -f pyflink_env.yml
%sh
rm -rf pyflink_env.tar.gz
conda pack --ignore-missing-files -n pyflink_env -o pyflink_env.tar.gz
hadoop fs -rmr /tmp/pyflink_env.tar.gz
hadoop fs -put pyflink_env.tar.gz /tmp
# The Python conda tar should be public accessible, so need to change permission here.
hadoop fs -chmod 644 /tmp/pyflink_env.tar.gz
Step 2. 制作 TaskManager 上的 PyFlink Conda 環(huán)境
某個版本的 Python (這里用的是 3.7)
apache-flink (這里用的是 1.13.1)
echo "name: pyflink_tm_env
channels:
- conda-forge
- defaults
dependencies:
- Python=3.7
- pip
- pip:
- apache-flink==1.13.1
- pandas
" > pyflink_tm_env.yml
mamba env remove -n pyflink_tm_env
mamba env create -f pyflink_tm_env.yml
%sh
rm -rf pyflink_tm_env.zip
conda pack --ignore-missing-files --zip-symlinks -n pyflink_tm_env -o pyflink_tm_env.zip
hadoop fs -rmr /tmp/pyflink_tm_env.zip
hadoop fs -put pyflink_tm_env.zip /tmp
# The Python conda tar should be public accessible, so need to change permission here.
hadoop fs -chmod 644 /tmp/pyflink_tm_env.zip
Step 3. 在 PyFlink 中使用 Conda 環(huán)境
flink.execution.mode 為 yarn-application, 本文所講的方法只適用于 yarn-application 模式;
指定 yarn.ship-archives,zeppelin.pyflink.Python 以及 zeppelin.interpreter.conda.env.name 來配置 JobManager 側(cè)的 PyFlink Conda 環(huán)境;
指定 Python.archives 以及 Python.executable 來指定 TaskManager 側(cè)的 PyFlink Conda 環(huán)境;
指定其他可選的 Flink 配置,比如這里的 flink.jm.memory 和 flink.tm.memory。
%flink.conf
flink.execution.mode yarn-application
yarn.ship-archives /mnt/disk1/jzhang/zeppelin/pyflink_env.tar.gz
zeppelin.pyflink.Python pyflink_env.tar.gz/bin/Python
zeppelin.interpreter.conda.env.name pyflink_env.tar.gz
Python.archives hdfs:///tmp/pyflink_tm_env.zip
Python.executable pyflink_tm_env.zip/bin/Python3.7
flink.jm.memory 2048
flink.tm.memory 2048
下面的例子里,可以在 PyFlink 客戶端 (JobManager 側(cè)) 使用上面創(chuàng)建的 JobManager 側(cè)的 Conda 環(huán)境,比如下邊使用了 Matplotlib。

下面的例子是在 PyFlink UDF 里使用上面創(chuàng)建的 TaskManager 側(cè) Conda 環(huán)境里的庫,比如下面在 UDF 里使用 Pandas。 
三、總結(jié)與未來
目前我們需要創(chuàng)建 2 個 Conda env ,原因是 Zeppelin 支持 tar.gz 格式,而 Flink 只支持 zip 格式。等后期兩邊統(tǒng)一之后,只要創(chuàng)建一個 Conda env 就可以;
apache-flink 現(xiàn)在包含了 Flink 的 jar 包,這就導(dǎo)致打出來的 Conda env 特別大,Yarn container 在初始化的時候耗時會比較長,這個需要 Flink 社區(qū)提供一個輕量級的 Python 包 (不包含 Flink jar 包),就可以大大減小 Conda env 的大小。
伴隨著海量數(shù)據(jù)的沖擊,數(shù)據(jù)處理分析能力在業(yè)務(wù)中的價值與日俱增,各行各業(yè)對于數(shù)據(jù)處理時效性的探索也在不斷深入,作為主打?qū)崟r計算的計算引擎 - Apache Flink 應(yīng)運而生。
為給行業(yè)帶來更多實時計算賦能實踐的思路,鼓勵廣大熱愛技術(shù)的開發(fā)者加深對 Flink 的掌握,Apache Flink 社區(qū)聯(lián)手阿里云、英特爾、阿里巴巴人工智能治理與可持續(xù)發(fā)展實驗室 (AAIG)、Occlum 聯(lián)合舉辦 "第三屆 Apache Flink 極客挑戰(zhàn)賽暨 AAIG CUP" 活動,即日起正式啟動。

▼ 掃描二維碼,了解更多賽事信息 ▼
戳我,了解 Flink 挑戰(zhàn)賽信息~
