<kbd id="afajh"><form id="afajh"></form></kbd>
<strong id="afajh"><dl id="afajh"></dl></strong>
    <del id="afajh"><form id="afajh"></form></del>
        1. <th id="afajh"><progress id="afajh"></progress></th>
          <b id="afajh"><abbr id="afajh"></abbr></b>
          <th id="afajh"><progress id="afajh"></progress></th>

          Flink入門 02.安裝部署

          共 15262字,需瀏覽 31分鐘

           ·

          2021-09-04 23:47

          Flink支持多種安裝模式

          • Local—本地單機模式,學(xué)習(xí)測試時使用

          • Standalone—獨立集群模式,F(xiàn)link自帶集群,開發(fā)測試環(huán)境使用

          • StandaloneHA—獨立集群高可用模式,F(xiàn)link自帶集群,開發(fā)測試環(huán)境使用

          • On Yarn—計算資源統(tǒng)一由Hadoop YARN管理,生產(chǎn)環(huán)境使用

          1 Local本地模式

          1.1 原理

          1. Flink程序由JobClient進(jìn)行提交

          2. JobClient將作業(yè)提交給JobManager

          3. JobManager負(fù)責(zé)協(xié)調(diào)資源分配和作業(yè)執(zhí)行。資源分配完成后,任務(wù)將提交給相應(yīng)的TaskManager

          4. TaskManager啟動一個線程以開始執(zhí)行。TaskManager會向JobManager報告狀態(tài)更改,如開始執(zhí)行,正在進(jìn)行或已完成。

          5. 作業(yè)執(zhí)行完成后,結(jié)果將發(fā)送回客戶端(JobClient)

          1.2 操作

          1. 下載安裝包

            https://archive.apache.org/dist/flink/

          2. 上傳flink-1.13.2-bin-scala_2.11.tgz到cdh68的指定目錄

          3. 解壓

            [song@cdh68 ~]$ tar -zxvf /data20/download/flink-1.13.2-bin-scala_2.11.tgz -C ~/app/
          4. 如果出現(xiàn)權(quán)限問題,需要修改權(quán)限

            [song@cdh68 ~]$ chmod -R 755 /home/song/app/flink-1.13.2
          5. 配置環(huán)境變量

            [song@cdh68 ~]$ cat .bashrc 
            # set flink env
            export FLINK_HOME=/home/song/app/flink-1.13.2
            export PATH=$FLINK_HOME/bin:$PATH
            [song@cdh68 ~]$ source .bashrc 

          1.3 測試

          1. 準(zhǔn)備文件

            [song@cdh68 ~]$ vim data/flink/words.txt
            Hello Java
            Hello Scala
            Hello Flink
          2. 啟動Flink本地集群

            [song@cdh68 ~]$ app/flink-1.13.2/bin/start-cluster.sh 
            Setting HBASE_CONF_DIR=/etc/hbase/conf because no HBASE_CONF_DIR was set.
            Starting cluster.
            Setting HBASE_CONF_DIR=/etc/hbase/conf because no HBASE_CONF_DIR was set.
            Setting HBASE_CONF_DIR=/etc/hbase/conf because no HBASE_CONF_DIR was set.
            Starting standalonesession daemon on host cdh68.bigdata.com.
            Setting HBASE_CONF_DIR=/etc/hbase/conf because no HBASE_CONF_DIR was set.
            Setting HBASE_CONF_DIR=/etc/hbase/conf because no HBASE_CONF_DIR was set.
            Starting taskexecutor daemon on host cdh68.bigdata.com.

            雖然配置了環(huán)境變量,但是最好使用絕對路徑啟動,因為好多大數(shù)據(jù)組件,都有start-cluster.sh命令,避免沖突

          3. 使用jps可以查看到下面兩個進(jìn)程

            • TaskManagerRunner
            • StandaloneSessionClusterEntrypoint
          4. 訪問Flink的Web UI

            http://cdh68:8081

            slot在Flink里面可以認(rèn)為是資源組,F(xiàn)link是通過將任務(wù)分成子任務(wù)并且將這些子任務(wù)分配到slot來并行執(zhí)行程序。

          5. 執(zhí)行官方示例

            [song@cdh68 ~]$ flink run app/flink-1.13.2/examples/batch/WordCount.jar --input data/flink/words.txt --output data/flink/wordOut.txt
            [song@cdh68 ~]$ cat data/flink/wordOut.txt 
            flink 1
            hello 3
            java 1
            scala 1

            --output不用提前創(chuàng)建

          6. 停止Flink

            [song@cdh68 ~]$ app/flink-1.13.2/bin/stop-cluster.sh 
            Setting HBASE_CONF_DIR=/etc/hbase/conf because no HBASE_CONF_DIR was set.
            Setting HBASE_CONF_DIR=/etc/hbase/conf because no HBASE_CONF_DIR was set.
            Setting HBASE_CONF_DIR=/etc/hbase/conf because no HBASE_CONF_DIR was set.
            Stopping taskexecutor daemon (pid: 132331) on host cdh68.bigdata.com.
            Setting HBASE_CONF_DIR=/etc/hbase/conf because no HBASE_CONF_DIR was set.
            Setting HBASE_CONF_DIR=/etc/hbase/conf because no HBASE_CONF_DIR was set.
            Stopping standalonesession daemon (pid: 131975) on host cdh68.bigdata.com.
          7. 啟動shell交互式窗口(目前所有Scala 2.12版本的安裝包暫時都不支持 Scala Shell)

            [song@cdh68 ~]$ start-scala-shell.sh local

            執(zhí)行如下命令:

            scala> benv.readTextFile("/home/song/data/flink/words.txt").flatMap(_.split(" ")).map((_,1)).groupBy(0).sum(1).print()
            (Flink,1)
            (Hello,3)
            (Java,1)
            (Scala,1)

            退出shell:

            scala> :q
             good bye ..

          2 Standalone獨立集群模式

          2.1 原理

          1. client客戶端提交任務(wù)給JobManager

          2. JobManager負(fù)責(zé)申請任務(wù)運行所需要的資源并管理任務(wù)和資源,

          3. JobManager分發(fā)任務(wù)給TaskManager執(zhí)行

          4. TaskManager定期向JobManager匯報狀態(tài)

          2.2 操作

          1. 集群規(guī)劃:

            服務(wù)器: cdh68(Master + Slave): JobManager + TaskManager

            服務(wù)器: cdh69(Slave): TaskManager

            服務(wù)器: cdh70(Slave): TaskManager

          2. 修改flink-conf.yaml

            [song@cdh68 ~]$ vim app/flink-1.13.2/conf/flink-conf.yaml 

            修改內(nèi)容如下:

            jobmanager.rpc.address: cdh68
            jobmanager.memory.process.size: 4096m

            #taskmanager.memory.process.size: 1728m
            taskmanager.memory.flink.size: 16384m
            taskmanager.numberOfTaskSlots: 4

            web.submit.enable: true

            #歷史服務(wù)器
            jobmanager.archive.fs.dir: hdfs://nameservice1/flink/completed-jobs/
            historyserver.web.address: cdh68
            historyserver.web.port: 8082
            historyserver.archive.fs.dir: hdfs://nameservice1/flink/completed-jobs/
          3. 修改masters

            [song@cdh68 ~]$ vim app/flink-1.13.2/conf/masters 
            cdh68:8081
          4. 修改workers

            [song@cdh68 ~]$ vim app/flink-1.13.2/conf/workers 
            cdh68
            cdh69
            cdh70
          5. 添加HADOOP_CONF_DIR環(huán)境變量

            [song@cdh68 ~]$ vim .bashrc 
            # set hadoop env
            export HADOOP_CONF_DIR=/etc/hadoop/conf
            [song@cdh68 ~]$ source .bashrc 
            [song@cdh68 ~]$ echo $HADOOP_CONF_DIR
            /etc/hadoop/conf

            每臺節(jié)點都要添加

          6. 分發(fā)

            [song@cdh68 ~]$ scp -r app/flink-1.13.2/ cdh69:~/app/
            [song@cdh68 ~]$ scp -r app/flink-1.13.2/ cdh70:~/app/

          2.3 測試

          1. 啟動集群,在cdh68上執(zhí)行如下命令

            [song@cdh68 ~]$ app/flink-1.13.2/bin/start-cluster.sh 
            Starting standalonesession daemon on host cdh68.bigdata.com.
            Starting taskexecutor daemon on host cdh68.bigdata.com.
            Starting taskexecutor daemon on host cdh69.bigdata.com.
            Starting taskexecutor daemon on host cdh70.bigdata.com.

            #
             或者單獨啟動
            [song@cdh68 ~]$ app/flink-1.13.2/bin/jobmanager.sh ((start|start-foreground) cluster)|stop|stop-all
            [song@cdh68 ~]$ app/flink-1.13.2/bin/taskmanager.sh start|start-foreground|stop|stop-all
          2. 啟動歷史服務(wù)器(沒啥用,直接看Flink Web UI即可)

            [song@cdh68 ~]$ app/flink-1.13.2/bin/historyserver.sh start
            Starting historyserver daemon on host cdh68.bigdata.com.
          3. 訪問Flink UI界面或使用jps查看

            http://cdh68:8081/#/overview

            TaskManager界面:可以查看到當(dāng)前Flink集群中有多少個TaskManager,每個TaskManager的slots、內(nèi)存、CPU Core是多少

          4. 執(zhí)行官方測試案例

            [song@cdh68 ~]$ hdfs dfs -put data/flink/words.txt /data/wordcount/input 
            [song@cdh68 ~]$ flink run app/flink-1.13.2/examples/batch/WordCount.jar \
            --input hdfs://nameservice1/data/wordcount/input/words.txt \
            --output hdfs://nameservice1/data/wordcount/output/wordCount.txt
            [song@cdh68 ~]$ hdfs dfs -cat /data/wordcount/output/wordCount.txt 
            flink 1
            hello 3
            java 1
            scala 1
          5. 停止Flink集群

            [song@cdh68 ~]$ app/flink-1.13.2/bin/stop-cluster.sh 
            Stopping taskexecutor daemon (pid: 164979) on host cdh68.bigdata.com.
            Stopping taskexecutor daemon (pid: 147217) on host cdh69.bigdata.com.
            Stopping taskexecutor daemon (pid: 226601) on host cdh70.bigdata.com.
            Stopping standalonesession daemon (pid: 164456) on host cdh68.bigdata.com.

          3 Standalone-HA高可用集群模式

          此處不想多介紹,生產(chǎn)環(huán)境一般使用flink on yarn

          3.1 原理

          從之前的架構(gòu)中我們可以很明顯的發(fā)現(xiàn) JobManager 有明顯的單點問題(SPOF,single point of failure)。JobManager 肩負(fù)著任務(wù)調(diào)度以及資源分配,一旦 JobManager 出現(xiàn)意外,其后果可想而知。

          在 Zookeeper 的幫助下,一個 Standalone的Flink集群會同時有多個活著的 JobManager,其中只有一個處于工作狀態(tài),其他處于 Standby 狀態(tài)。當(dāng)工作中的 JobManager 失去連接后(如宕機或 Crash),Zookeeper 會從 Standby 中選一個新的 JobManager 來接管 Flink 集群。

          3.2 操作

          1. 集群規(guī)劃

            • 服務(wù)器: node1(Master + Slave): JobManager + TaskManager

            • 服務(wù)器: node2(Master + Slave): JobManager + TaskManager

            • 服務(wù)器: node3(Slave): TaskManager

          2. 啟動ZooKeeper

            zkServer.sh status
            zkServer.sh stop
            zkServer.sh start
          3. 啟動HDFS

            /export/serves/hadoop/sbin/start-dfs.sh
          4. 停止Flink集群

            /export/server/flink/bin/stop-cluster.sh
          5. 修改flink-conf.yaml

            vim /export/server/flink/conf/flink-conf.yaml
            # 在Standalone基礎(chǔ)上,增加如下內(nèi)容

            #
             開啟HA,使用文件系統(tǒng)作為快照存儲
            state.backend: filesystem
            # 啟用檢查點,可以將快照保存到HDFS
            state.backend.fs.checkpointdir: hdfs://node1:8020/flink-checkpoints
            # 使用zookeeper搭建高可用
            high-availability: zookeeper
            # 存儲JobManager的元數(shù)據(jù)到HDFS
            high-availability.storageDir: hdfs://node1:8020/flink/ha/
            # 配置ZK集群地址
            high-availability.zookeeper.quorum: node1:2181,node2:2181,node3:2181
          6. 修改masters

            node1:8081
            node2:8081
          7. 同步

            scp -r /export/server/flink/conf/flink-conf.yaml node2:/export/server/flink/conf/
            scp -r /export/server/flink/conf/flink-conf.yaml node3:/export/server/flink/conf/
            scp -r /export/server/flink/conf/masters node2:/export/server/flink/conf/
            scp -r /export/server/flink/conf/masters node3:/export/server/flink/conf/
          8. 修改node2上的flink-conf.yaml

            vim /export/server/flink/conf/flink-conf.yaml
            jobmanager.rpc.address: node2
          9. 重新啟動Flink集群,node1上執(zhí)行

            /export/server/flink/bin/stop-cluster.sh
            /export/server/flink/bin/start-cluster.sh
          10. 使用jps命令查看

            發(fā)現(xiàn)沒有Flink相關(guān)進(jìn)程被啟動

          11. 查看日志

            cat /export/server/flink/log/flink-root-standalonesession-0-node1.log

            發(fā)現(xiàn)如下錯誤:

            因為在Flink1.8版本后,Flink官方提供的安裝包里沒有整合HDFS的jar

          12. 下載jar包并在Flink的lib目錄下放入該jar包并分發(fā)使Flink能夠支持對Hadoop的操作

            下載地址:

            https://flink.apache.org/downloads.html

            放入lib目錄

            cd /export/server/flink/lib

            分發(fā)

            for i in {2..3}; 
            do 
             scp -r flink-shaded-hadoop-2-uber-2.7.5-10.0.jar node$i:$PWD; 
            done
          13. 重新啟動Flink集群,node1上執(zhí)行

            /export/server/flink/bin/start-cluster.sh
          14. 使用jps命令查看,發(fā)現(xiàn)三臺機器已經(jīng)ok

          3.3 測試

          1. 訪問WebUI

            http://node1:8081/#/job-manager/config

            http://node2:8081/#/job-manager/config

          2. 執(zhí)行wc

            /export/server/flink/bin/flink run /export/server/flink/examples/batch/WordCount.jar
          3. kill掉其中一個master

          4. 重新執(zhí)行wc,還是可以正常執(zhí)行

            /export/server/flink/bin/flink run /export/server/flink/examples/batch/WordCount.jar 
          5. 停止集群

            /export/server/flink/bin/stop-cluster.sh

          4   Flink On Yarn模式

          4.1   原理

          4.1.1   為什么使用Flink On Yarn?

          在實際開發(fā)中,使用Flink時,更多的使用方式是Flink On Yarn模式,原因如下:

          1. Yarn的資源可以按需使用,提高集群的資源利用率

          2. Yarn的任務(wù)有優(yōu)先級,根據(jù)優(yōu)先級運行作業(yè)

          3. 基于Yarn調(diào)度系統(tǒng),能夠自動化地處理各個角色的 Failover(容錯)

            • JobManager 進(jìn)程和 TaskManager 進(jìn)程都由 Yarn NodeManager 監(jiān)控
            • 如果 JobManager 進(jìn)程異常退出,則 Yarn ResourceManager 會重新調(diào)度 JobManager 到其他機器
            • 如果 TaskManager 進(jìn)程異常退出,JobManager 會收到消息并重新向 Yarn ResourceManager 申請資源,重新啟動 TaskManager

          4.1.2   Flink如何和Yarn進(jìn)行交互?

          1. Client上傳jar包和配置文件到HDFS集群上

          2. Client向Yarn ResourceManager提交任務(wù)并申請資源

          3. ResourceManager分配Container資源并啟動ApplicationMaster,然后AppMaster加載Flink的Jar包和配置構(gòu)建環(huán)境,啟動JobManager

            • JobManager和ApplicationMaster運行在同一個container上。

            • 一旦他們被成功啟動,AppMaster就知道JobManager的地址(AM它自己所在的機器)。

            • 它就會為TaskManager生成一個新的Flink配置文件(他們就可以連接到JobManager)。

            • 這個配置文件也被上傳到HDFS上。

            • 此外,AppMaster容器也提供了Flink的web服務(wù)接口。

            • YARN所分配的所有端口都是臨時端口,這允許用戶并行執(zhí)行多個Flink

          4. ApplicationMaster向ResourceManager申請工作資源,NodeManager加載Flink的Jar包和配置構(gòu)建環(huán)境并啟動TaskManager

          5. TaskManager啟動后向JobManager發(fā)送心跳包,并等待JobManager向其分配任務(wù)

          4.1.3   三種方式

          Session模式

          特點:需要事先申請資源,啟動JobManager和TaskManger

          優(yōu)點:不需要每次遞交作業(yè)申請資源,而是使用已經(jīng)申請好的資源,從而提高執(zhí)行效率

          缺點:作業(yè)執(zhí)行完成以后,資源不會被釋放,因此一直會占用系統(tǒng)資源

          應(yīng)用場景:適合作業(yè)遞交比較頻繁的場景,小作業(yè)比較多的場景

          Per-Job模式

          特點:每次遞交作業(yè)都需要申請一次資源

          優(yōu)點:作業(yè)運行完成,資源會立刻被釋放,不會一直占用系統(tǒng)資源

          缺點:每次遞交作業(yè)都需要申請資源,會影響執(zhí)行效率,因為申請資源需要消耗時間

          應(yīng)用場景:適合作業(yè)比較少的場景、大作業(yè)的場景

          Application 模式

          Flink Application Mode

          4.2   操作

          Make sure that the HADOOP_CLASSPATH environment variable is set up (it can be checked by running echo $HADOOP_CLASSPATH). If not, set it up using

          export HADOOP_CLASSPATH=`hadoop classpath`
          1. 關(guān)閉yarn的內(nèi)存檢查

            vim /export/server/hadoop/etc/hadoop/yarn-site.xml

            添加:

          yarn.nodemanager.pmem-check-enabledfalseyarn.nodemanager.vmem-check-enabledfalse```

          說明:

          是否啟動一個線程檢查每個任務(wù)正使用的虛擬內(nèi)存量,如果任務(wù)超出分配值,則直接將其殺掉,默認(rèn)是true。

          在這里面我們需要關(guān)閉,因為對于flink使用yarn模式下,很容易內(nèi)存超標(biāo),這個時候yarn會自動殺掉job

          1. 同步

            scp -r /export/server/hadoop/etc/hadoop/yarn-site.xml node2:/export/server/hadoop/etc/hadoop/yarn-site.xml  
            scp -r /export/server/hadoop/etc/hadoop/yarn-site.xml node3:/export/server/hadoop/etc/hadoop/yarn-site.xml  
          2. 重啟yarn

            /export/server/hadoop/sbin/stop-yarn.sh
            /export/server/hadoop/sbin/start-yarn.sh

          4.3   測試

          4.3.1   Session模式

          yarn-session.sh(開辟資源) + flink run(提交任務(wù))

          1. 在yarn上啟動一個Flink會話,cdh68上執(zhí)行以下命令

            [song@cdh68 ~]$ yarn-session.sh --detached
            2021-08-30 18:13:47,565 INFO  org.apache.flink.yarn.YarnClusterDescriptor                  [] - Deploying cluster, current state ACCEPTED
            JobManager Web Interface: http://cdh69.bigdata.com:33201
            2021-08-30 18:13:54,039 INFO  org.apache.flink.yarn.cli.FlinkYarnSessionCli                [] - The Flink YARN session cluster has been started in detached mode. In order to stop Flink gracefully, use the following command:
            echo "stop" | ./bin/yarn-session.sh -id application_1625993468363_0037
            If this should not be possible, then you can also kill Flink via YARN's web interface or via:
            $ yarn application -kill application_1625993468363_0037
            Note that killing Flink might not clean up all job artifacts and temporary files.
          2. 查看UI界面

            http://cdh68:8088/cluster

          3. 使用flink run提交任務(wù):

            [song@cdh68 ~]$ hdfs dfs -rm /data/wordcount/output/wordCount.txt

            [song@cdh68 ~]$ flink run -t yarn-session \
            -Dyarn.application.id=application_1625993468363_0037 \
            app/flink-1.13.2/examples/batch/WordCount.jar \
            --input hdfs://nameservice1/data/wordcount/input/words.txt \
            --output hdfs://nameservice1/data/wordcount/output/wordCount.txt  

            [song@cdh68 ~]$ hdfs dfs -cat /data/wordcount/output/wordCount.txt
            flink 1
            hello 3
            java 1
            scala 1

            運行完之后可以繼續(xù)運行其他的小任務(wù)

          4. 通過上方的ApplicationMaster可以進(jìn)入Flink的管理界面

          5. 關(guān)閉yarn-session:

            [song@cdh68 ~]$ yarn application -kill application_1625993468363_0037

            The session mode will create a hidden YARN properties file in /tmp/.yarn-properties-<username>, which will be picked up for cluster discovery by the command line interface when submitting a job.

            [song@cdh68 ~]$ rm -rf /tmp/.yarn-properties-song

          4.3.2   Per-Job分離模式

          1. 直接提交job

            [song@cdh68 ~]$ hdfs dfs -rm /data/wordcount/output/wordCount.txt

            [song@cdh68 ~]$ flink run -t yarn-per-job \
            -yjm 4096 \
            -ytm 16384 \
            -ys 4 \
            /home/song/app/flink-1.13.2/examples/batch/WordCount.jar \
            --input hdfs://nameservice1/data/wordcount/input/words.txt \
            --output hdfs://nameservice1/data/wordcount/output/wordCount.txt 
          2. 查看UI界面

            http://cdh68:8088/cluster

          3. 注意

            在之前版本中如果使用的是flink on yarn方式,想切換回standalone模式的話,如果報錯需要刪除:

            [song@cdh68 ~]$ rm -rf /tmp/.yarn-properties-song

            因為默認(rèn)查找當(dāng)前yarn集群中已有的yarn-session信息中的jobmanager

          4.3.3   Application分離模式

          1. 直接提交job

            [song@cdh68 ~]$ hdfs dfs -rm /data/wordcount/output/wordCount.txt

            [song@cdh68 ~]$ flink run-application -t yarn-application \
            -yjm 4096 \
            -ytm 16384 \
            -ys 4 \
            /home/song/app/flink-1.13.2/examples/batch/WordCount.jar \
            --input hdfs://nameservice1/data/wordcount/input/words.txt \
            --output hdfs://nameservice1/data/wordcount/output/wordCount.txt 
          2. 查看UI界面

            http://cdh68:8088/cluster


          歡迎關(guān)注微信公眾號:大數(shù)據(jù)AI


          瀏覽 58
          點贊
          評論
          收藏
          分享

          手機掃一掃分享

          分享
          舉報
          評論
          圖片
          表情
          推薦
          點贊
          評論
          收藏
          分享

          手機掃一掃分享

          分享
          舉報
          <kbd id="afajh"><form id="afajh"></form></kbd>
          <strong id="afajh"><dl id="afajh"></dl></strong>
            <del id="afajh"><form id="afajh"></form></del>
                1. <th id="afajh"><progress id="afajh"></progress></th>
                  <b id="afajh"><abbr id="afajh"></abbr></b>
                  <th id="afajh"><progress id="afajh"></progress></th>
                  欧美黑人操逼 | 蜜桃AV鲁一鲁 | 土豪胖哥酒店微信高价的御范气质身材苗条匀称 | 囯产精品久久久久久久久久久久 | 少女爱操B|