Getting Started with Flink 02: Installation and Deployment
Flink supports several deployment modes:
Local — single-machine mode, for learning and testing
Standalone — standalone cluster mode using Flink's built-in cluster, for development and test environments
Standalone HA — standalone high-availability cluster mode using Flink's built-in cluster, for development and test environments
On YARN — compute resources are managed centrally by Hadoop YARN, for production environments
1 Local Mode
1.1 How it works

A Flink program is submitted by the JobClient.
The JobClient hands the job to the JobManager.
The JobManager coordinates resource allocation and job execution. Once resources are allocated, the task is handed to the appropriate TaskManager.
The TaskManager starts a thread to run the task and reports status changes back to the JobManager, such as started, in progress, or finished.
When the job completes, the result is sent back to the client (JobClient).
1.2 Setup
Download the release
https://archive.apache.org/dist/flink/
Upload flink-1.13.2-bin-scala_2.11.tgz to the target directory on cdh68
Unpack it
[song@cdh68 ~]$ tar -zxvf /data20/download/flink-1.13.2-bin-scala_2.11.tgz -C ~/app/
If you run into permission problems, fix them:
[song@cdh68 ~]$ chmod -R 755 /home/song/app/flink-1.13.2
Configure the environment variables
[song@cdh68 ~]$ cat .bashrc
# set flink env
export FLINK_HOME=/home/song/app/flink-1.13.2
export PATH=$FLINK_HOME/bin:$PATH
[song@cdh68 ~]$ source .bashrc
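A quick sanity check that the variables took effect (the expected output assumes the layout above):
[song@cdh68 ~]$ echo $FLINK_HOME
/home/song/app/flink-1.13.2
[song@cdh68 ~]$ which flink
/home/song/app/flink-1.13.2/bin/flink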
1.3 Testing
Prepare a test file
[song@cdh68 ~]$ vim data/flink/words.txt
Hello Java
Hello Scala
Hello Flink
Start the local Flink cluster
[song@cdh68 ~]$ app/flink-1.13.2/bin/start-cluster.sh
Setting HBASE_CONF_DIR=/etc/hbase/conf because no HBASE_CONF_DIR was set.
Starting cluster.
Setting HBASE_CONF_DIR=/etc/hbase/conf because no HBASE_CONF_DIR was set.
Setting HBASE_CONF_DIR=/etc/hbase/conf because no HBASE_CONF_DIR was set.
Starting standalonesession daemon on host cdh68.bigdata.com.
Setting HBASE_CONF_DIR=/etc/hbase/conf because no HBASE_CONF_DIR was set.
Setting HBASE_CONF_DIR=/etc/hbase/conf because no HBASE_CONF_DIR was set.
Starting taskexecutor daemon on host cdh68.bigdata.com.
Even though the environment variables are configured, it is best to start Flink with the absolute path: many big-data components ship their own start-cluster.sh, and this avoids picking up the wrong one.
jps should now show the following two processes:
TaskManagerRunner
StandaloneSessionClusterEntrypoint
Open the Flink Web UI
http://cdh68:8081

In Flink, a slot can be thought of as a resource group: Flink runs a program in parallel by splitting it into subtasks and assigning those subtasks to slots.
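For illustration (not part of the original steps): the total number of slots is TaskManagers × taskmanager.numberOfTaskSlots, and a job submitted with -p occupies that many slots. The output file name below is hypothetical.
[song@cdh68 ~]$ flink run -p 1 app/flink-1.13.2/examples/batch/WordCount.jar \
--input data/flink/words.txt --output data/flink/wordOut_p1.txt
# -p 1 fits in the single default slot of the local cluster; a larger -p
# would have to wait until enough slots are free.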
Run the official example
[song@cdh68 ~]$ flink run app/flink-1.13.2/examples/batch/WordCount.jar --input data/flink/words.txt --output data/flink/wordOut.txt
[song@cdh68 ~]$ cat data/flink/wordOut.txt
flink 1
hello 3
java 1
scala 1
The --output file does not need to be created in advance.
Stop Flink
[song@cdh68 ~]$ app/flink-1.13.2/bin/stop-cluster.sh
Setting HBASE_CONF_DIR=/etc/hbase/conf because no HBASE_CONF_DIR was set.
Setting HBASE_CONF_DIR=/etc/hbase/conf because no HBASE_CONF_DIR was set.
Setting HBASE_CONF_DIR=/etc/hbase/conf because no HBASE_CONF_DIR was set.
Stopping taskexecutor daemon (pid: 132331) on host cdh68.bigdata.com.
Setting HBASE_CONF_DIR=/etc/hbase/conf because no HBASE_CONF_DIR was set.
Setting HBASE_CONF_DIR=/etc/hbase/conf because no HBASE_CONF_DIR was set.
Stopping standalonesession daemon (pid: 131975) on host cdh68.bigdata.com.
Start the interactive Scala shell (at the moment, none of the Scala 2.12 binary releases ship the Scala Shell):
[song@cdh68 ~]$ start-scala-shell.sh local
Run the following command:
scala> benv.readTextFile("/home/song/data/flink/words.txt").flatMap(_.split(" ")).map((_,1)).groupBy(0).sum(1).print()
(Flink,1)
(Hello,3)
(Java,1)
(Scala,1)
Exit the shell:
scala> :q
good bye ..
2 Standalone Cluster Mode
2.1 How it works

The client submits a job to the JobManager.
The JobManager requests the resources the job needs and manages both the job and those resources.
The JobManager distributes tasks to the TaskManagers for execution.
The TaskManagers report their status to the JobManager periodically.
2.2 Setup
Cluster plan:
Server cdh68 (master + worker): JobManager + TaskManager
Server cdh69 (worker): TaskManager
Server cdh70 (worker): TaskManager
Edit flink-conf.yaml
[song@cdh68 ~]$ vim app/flink-1.13.2/conf/flink-conf.yaml
Change the following:
jobmanager.rpc.address: cdh68
jobmanager.memory.process.size: 4096m
#taskmanager.memory.process.size: 1728m
taskmanager.memory.flink.size: 16384m
taskmanager.numberOfTaskSlots: 4
web.submit.enable: true
# History server
jobmanager.archive.fs.dir: hdfs://nameservice1/flink/completed-jobs/
historyserver.web.address: cdh68
historyserver.web.port: 8082
historyserver.archive.fs.dir: hdfs://nameservice1/flink/completed-jobs/
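The archive directory on HDFS is not necessarily created automatically; a hedged preparatory step (path taken from the config above):
[song@cdh68 ~]$ hdfs dfs -mkdir -p /flink/completed-jobs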
Edit masters
[song@cdh68 ~]$ vim app/flink-1.13.2/conf/masters
cdh68:8081
Edit workers
[song@cdh68 ~]$ vim app/flink-1.13.2/conf/workers
cdh68
cdh69
cdh70
Add the HADOOP_CONF_DIR environment variable
[song@cdh68 ~]$ vim .bashrc
# set hadoop env
export HADOOP_CONF_DIR=/etc/hadoop/conf
[song@cdh68 ~]$ source .bashrc
[song@cdh68 ~]$ echo $HADOOP_CONF_DIR
/etc/hadoop/conf
This variable must be set on every node.
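A sketch for applying it to the other nodes, assuming passwordless ssh between the hosts (hostnames from the cluster plan above):
[song@cdh68 ~]$ for h in cdh69 cdh70; do
  ssh $h 'echo "export HADOOP_CONF_DIR=/etc/hadoop/conf" >> ~/.bashrc'
done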
Distribute the installation
[song@cdh68 ~]$ scp -r app/flink-1.13.2/ cdh69:~/app/
[song@cdh68 ~]$ scp -r app/flink-1.13.2/ cdh70:~/app/
2.3 Testing
Start the cluster by running the following on cdh68:
[song@cdh68 ~]$ app/flink-1.13.2/bin/start-cluster.sh
Starting standalonesession daemon on host cdh68.bigdata.com.
Starting taskexecutor daemon on host cdh68.bigdata.com.
Starting taskexecutor daemon on host cdh69.bigdata.com.
Starting taskexecutor daemon on host cdh70.bigdata.com.
# Or start the daemons individually
[song@cdh68 ~]$ app/flink-1.13.2/bin/jobmanager.sh ((start|start-foreground) cluster)|stop|stop-all
[song@cdh68 ~]$ app/flink-1.13.2/bin/taskmanager.sh start|start-foreground|stop|stop-all
Start the history server (not that useful here; the Flink Web UI shows the same information)
[song@cdh68 ~]$ app/flink-1.13.2/bin/historyserver.sh start
Starting historyserver daemon on host cdh68.bigdata.com.
Open the Flink UI, or check with jps
http://cdh68:8081/#/overview
TaskManager page: shows how many TaskManagers the cluster currently has, and each TaskManager's slots, memory, and CPU cores.
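The same information is exposed through Flink's REST API; a hedged example using the standard taskmanagers endpoint:
[song@cdh68 ~]$ curl http://cdh68:8081/taskmanagers
# returns a JSON list of the registered TaskManagers and their slot counts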

Run the official test case
[song@cdh68 ~]$ hdfs dfs -put data/flink/words.txt /data/wordcount/input
[song@cdh68 ~]$ flink run app/flink-1.13.2/examples/batch/WordCount.jar \
--input hdfs://nameservice1/data/wordcount/input/words.txt \
--output hdfs://nameservice1/data/wordcount/output/wordCount.txt
[song@cdh68 ~]$ hdfs dfs -cat /data/wordcount/output/wordCount.txt
flink 1
hello 3
java 1
scala 1
Stop the Flink cluster
[song@cdh68 ~]$ app/flink-1.13.2/bin/stop-cluster.sh
Stopping taskexecutor daemon (pid: 164979) on host cdh68.bigdata.com.
Stopping taskexecutor daemon (pid: 147217) on host cdh69.bigdata.com.
Stopping taskexecutor daemon (pid: 226601) on host cdh70.bigdata.com.
Stopping standalonesession daemon (pid: 164456) on host cdh68.bigdata.com.
3 Standalone HA Cluster Mode
This mode is only covered briefly here; production environments generally use Flink on YARN.
3.1 How it works

The architecture above has an obvious single point of failure (SPOF): the JobManager. The JobManager is in charge of task scheduling and resource allocation, so if it fails unexpectedly, the consequences are easy to imagine.
With the help of ZooKeeper, a standalone Flink cluster can run several live JobManagers at the same time, only one of which is active while the others stand by. When the active JobManager is lost (for example, it goes down or crashes), ZooKeeper elects a new JobManager from the standbys to take over the cluster.
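Once HA is configured (see 3.2 below), the election data lives in ZooKeeper and can be inspected; a hedged sketch, assuming the default /flink root znode:
zkCli.sh -server node1:2181 ls /flink
# lists the cluster ids registered under the HA root path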
3.2 Setup
Cluster plan
Server node1 (master + worker): JobManager + TaskManager
Server node2 (master + worker): JobManager + TaskManager
Server node3 (worker): TaskManager
Start ZooKeeper
zkServer.sh status
zkServer.sh stop
zkServer.sh start
Start HDFS
/export/server/hadoop/sbin/start-dfs.sh
Stop the Flink cluster
/export/server/flink/bin/stop-cluster.sh
Edit flink-conf.yaml
vim /export/server/flink/conf/flink-conf.yaml
# On top of the Standalone configuration, add the following
# For HA, use the filesystem as snapshot storage
state.backend: filesystem
# Checkpoint directory, so snapshots can be saved to HDFS
state.backend.fs.checkpointdir: hdfs://node1:8020/flink-checkpoints
# Use ZooKeeper for high availability
high-availability: zookeeper
# Store JobManager metadata in HDFS
high-availability.storageDir: hdfs://node1:8020/flink/ha/
# ZooKeeper quorum addresses
high-availability.zookeeper.quorum: node1:2181,node2:2181,node3:2181
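Two optional keys are often set alongside these so that several clusters can share one ZooKeeper ensemble; both are standard Flink options, and the values here are purely illustrative:
high-availability.zookeeper.path.root: /flink
high-availability.cluster-id: /standalone_ha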
Edit masters
node1:8081
node2:8081
Sync to the other nodes
scp -r /export/server/flink/conf/flink-conf.yaml node2:/export/server/flink/conf/
scp -r /export/server/flink/conf/flink-conf.yaml node3:/export/server/flink/conf/
scp -r /export/server/flink/conf/masters node2:/export/server/flink/conf/
scp -r /export/server/flink/conf/masters node3:/export/server/flink/conf/
Edit flink-conf.yaml on node2
vim /export/server/flink/conf/flink-conf.yaml
jobmanager.rpc.address: node2
Restart the Flink cluster; run on node1:
/export/server/flink/bin/stop-cluster.sh
/export/server/flink/bin/start-cluster.sh
Checking with jps shows that no Flink processes have started.
Check the log:
cat /export/server/flink/log/flink-root-standalonesession-0-node1.log
It contains the following error:

This is because, since Flink 1.8, the official Flink distribution no longer bundles the jar that integrates with HDFS.
Download the jar, put it into Flink's lib directory, and distribute it so that Flink can operate on Hadoop.
Download from:
https://flink.apache.org/downloads.html

Put it into the lib directory
cd /export/server/flink/lib
Distribute it
for i in {2..3};
do
scp -r flink-shaded-hadoop-2-uber-2.7.5-10.0.jar node$i:$PWD;
done
Restart the Flink cluster; run on node1:
/export/server/flink/bin/start-cluster.sh
Checking with jps now shows all three machines are OK.
3.3 Testing
Open the Web UI
http://node1:8081/#/job-manager/config
http://node2:8081/#/job-manager/config
Run the WordCount example
/export/server/flink/bin/flink run /export/server/flink/examples/batch/WordCount.jar
Kill one of the masters
Run WordCount again; it still executes normally
/export/server/flink/bin/flink run /export/server/flink/examples/batch/WordCount.jar
Stop the cluster
/export/server/flink/bin/stop-cluster.sh
4 Flink on YARN Mode
4.1 How it works
4.1.1 Why Flink on YARN?
In real-world development, Flink is most often run on YARN, for the following reasons:
YARN resources are allocated on demand, which improves cluster utilization
YARN jobs have priorities, and jobs run according to their priority
The YARN scheduler automatically handles failover of the individual roles:
Both the JobManager process and the TaskManager processes are monitored by the YARN NodeManager. If the JobManager process exits abnormally, the YARN ResourceManager reschedules the JobManager onto another machine. If a TaskManager process exits abnormally, the JobManager is notified and requests resources from the YARN ResourceManager again to restart the TaskManager.
4.1.2 How does Flink interact with YARN?


The client uploads the jar and the configuration files to HDFS.
The client submits the job to the YARN ResourceManager and requests resources.
The ResourceManager allocates a container and starts the ApplicationMaster; the ApplicationMaster then loads Flink's jar and configuration, builds the environment, and starts the JobManager.
The JobManager and the ApplicationMaster run in the same container.
Once they have started successfully, the ApplicationMaster knows the address of the JobManager (its own host).
It generates a new Flink configuration file for the TaskManagers (so that they can connect to the JobManager).
This configuration file is also uploaded to HDFS.
In addition, the ApplicationMaster container serves Flink's web interface.
All ports allocated by YARN are ephemeral, which allows users to run multiple Flink clusters in parallel.
The ApplicationMaster requests worker resources from the ResourceManager; the NodeManagers load Flink's jar and configuration, build the environment, and start the TaskManagers.
Once started, each TaskManager sends heartbeats to the JobManager and waits for it to assign tasks.
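A Flink cluster running on YARN shows up as an ordinary YARN application; a quick check from the shell:
yarn application -list
# the Flink cluster appears with its application id, state, and tracking URL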
4.1.3 The three deployment modes
Session mode


Characteristics: resources must be requested in advance, starting the JobManager and TaskManagers up front
Advantage: jobs do not need to request resources at every submission; they reuse the pre-allocated resources, which improves execution efficiency
Disadvantage: resources are not released when a job finishes, so they stay occupied
Use case: scenarios with frequent job submissions and many small jobs
Per-Job mode


Characteristics: every job submission requests resources anew
Advantage: resources are released as soon as the job finishes and never stay occupied
Disadvantage: every submission must request resources, which takes time and hurts execution efficiency
Use case: scenarios with few jobs, or with large jobs
Application mode
Characteristics: like Per-Job, each application gets its own cluster, but the application's main() method runs on the cluster side (in the JobManager) rather than on the client, which offloads jar download and job-graph generation from the client.
4.2 Setup
Make sure that the HADOOP_CLASSPATH environment variable is set up (it can be checked by running echo $HADOOP_CLASSPATH). If not, set it up using export HADOOP_CLASSPATH=`hadoop classpath`
Disable YARN's memory check
vim /export/server/hadoop/etc/hadoop/yarn-site.xml
Add:
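The XML snippet itself is missing here; based on the description below (a thread that checks each task's virtual-memory usage, default true), the property being disabled is presumably YARN's standard yarn.nodemanager.vmem-check-enabled:
<property>
    <name>yarn.nodemanager.vmem-check-enabled</name>
    <value>false</value>
</property>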
Explanation:
This controls whether a thread checks the amount of virtual memory each task is actually using; if a task exceeds its allocation, it is killed outright. The default is true.
It has to be disabled here because Flink on YARN easily exceeds the memory limit, and YARN would then kill the job automatically.
Sync to the other nodes
scp -r /export/server/hadoop/etc/hadoop/yarn-site.xml node2:/export/server/hadoop/etc/hadoop/yarn-site.xml
scp -r /export/server/hadoop/etc/hadoop/yarn-site.xml node3:/export/server/hadoop/etc/hadoop/yarn-site.xml
Restart YARN
/export/server/hadoop/sbin/stop-yarn.sh
/export/server/hadoop/sbin/start-yarn.sh
4.3 Testing
4.3.1 Session mode
yarn-session.sh (allocates the resources) + flink run (submits the job)
Start a Flink session on YARN; run the following on cdh68:
[song@cdh68 ~]$ yarn-session.sh --detached
2021-08-30 18:13:47,565 INFO org.apache.flink.yarn.YarnClusterDescriptor [] - Deploying cluster, current state ACCEPTED
JobManager Web Interface: http://cdh69.bigdata.com:33201
2021-08-30 18:13:54,039 INFO org.apache.flink.yarn.cli.FlinkYarnSessionCli [] - The Flink YARN session cluster has been started in detached mode. In order to stop Flink gracefully, use the following command:
$ echo "stop" | ./bin/yarn-session.sh -id application_1625993468363_0037
If this should not be possible, then you can also kill Flink via YARN's web interface or via:
$ yarn application -kill application_1625993468363_0037
Note that killing Flink might not clean up all job artifacts and temporary files.
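yarn-session.sh also accepts resource flags at startup; a hedged example (flag names from the Flink 1.13 CLI, values purely illustrative):
[song@cdh68 ~]$ yarn-session.sh -d -nm flink-session -jm 2048 -tm 4096 -s 4
# -d detached, -nm application name, -jm/-tm JobManager/TaskManager memory (MB), -s slots per TaskManager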
Check the YARN UI
http://cdh68:8088/cluster

Submit a job with flink run:
[song@cdh68 ~]$ hdfs dfs -rm /data/wordcount/output/wordCount.txt
[song@cdh68 ~]$ flink run -t yarn-session \
-Dyarn.application.id=application_1625993468363_0037 \
app/flink-1.13.2/examples/batch/WordCount.jar \
--input hdfs://nameservice1/data/wordcount/input/words.txt \
--output hdfs://nameservice1/data/wordcount/output/wordCount.txt
[song@cdh68 ~]$ hdfs dfs -cat /data/wordcount/output/wordCount.txt
flink 1
hello 3
java 1
scala 1
While the session is running, more small jobs can be submitted the same way.
The ApplicationMaster link above leads into Flink's own web UI.


Shut down the yarn-session:
[song@cdh68 ~]$ yarn application -kill application_1625993468363_0037
The session mode will create a hidden YARN properties file in /tmp/.yarn-properties-<username>, which will be picked up for cluster discovery by the command line interface when submitting a job.
[song@cdh68 ~]$ rm -rf /tmp/.yarn-properties-song
4.3.2 Per-Job (detached) mode
Submit the job directly
[song@cdh68 ~]$ hdfs dfs -rm /data/wordcount/output/wordCount.txt
Note: the -yjm/-ytm/-ys shortcuts belong to the legacy -m yarn-cluster syntax; with -t yarn-per-job the equivalent settings are passed as -D options:
[song@cdh68 ~]$ flink run -t yarn-per-job \
-Djobmanager.memory.process.size=4096m \
-Dtaskmanager.memory.process.size=16384m \
-Dtaskmanager.numberOfTaskSlots=4 \
/home/song/app/flink-1.13.2/examples/batch/WordCount.jar \
--input hdfs://nameservice1/data/wordcount/input/words.txt \
--output hdfs://nameservice1/data/wordcount/output/wordCount.txt
Check the YARN UI
http://cdh68:8088/cluster


Note
In earlier versions, when switching from Flink on YARN back to standalone mode, an error on submission could require deleting:
[song@cdh68 ~]$ rm -rf /tmp/.yarn-properties-song
because by default the client looks up the JobManager from the yarn-session information already recorded for the YARN cluster.
4.3.3 Application (detached) mode
Submit the job directly
[song@cdh68 ~]$ hdfs dfs -rm /data/wordcount/output/wordCount.txt
As above, run-application does not accept the -y shortcuts; resources are passed as -D options:
[song@cdh68 ~]$ flink run-application -t yarn-application \
-Djobmanager.memory.process.size=4096m \
-Dtaskmanager.memory.process.size=16384m \
-Dtaskmanager.numberOfTaskSlots=4 \
/home/song/app/flink-1.13.2/examples/batch/WordCount.jar \
--input hdfs://nameservice1/data/wordcount/input/words.txt \
--output hdfs://nameservice1/data/wordcount/output/wordCount.txt
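In application mode the client no longer executes main(), and dependencies can even be served from HDFS. A hedged variant using the standard yarn.provided.lib.dirs option (the HDFS paths below are hypothetical):
[song@cdh68 ~]$ flink run-application -t yarn-application \
-Dyarn.provided.lib.dirs="hdfs://nameservice1/flink/libs" \
hdfs://nameservice1/flink/jobs/WordCount.jar \
--input hdfs://nameservice1/data/wordcount/input/words.txt \
--output hdfs://nameservice1/data/wordcount/output/wordCount2.txt
# /flink/libs would hold the Flink dist jars; the job jar itself is fetched from HDFS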
Check the YARN UI
http://cdh68:8088/cluster

