<kbd id="afajh"><form id="afajh"></form></kbd>
<strong id="afajh"><dl id="afajh"></dl></strong>
    <del id="afajh"><form id="afajh"></form></del>
        1. <th id="afajh"><progress id="afajh"></progress></th>
          <b id="afajh"><abbr id="afajh"></abbr></b>
          <th id="afajh"><progress id="afajh"></progress></th>

          Airflow 快速學(xué)習(xí)入門

          共 6895字,需瀏覽 14分鐘

           ·

          2021-04-29 20:37

          作者:Corwien

          來源:SegmentFault 思否社區(qū)

          一、Airflow簡介

          Airflow 是一個使用 Python 語言編寫的 Data Pipeline 調(diào)度和監(jiān)控工作流的平臺。

          Airflow 是通過 DAG(Directed acyclic graph 有向無環(huán)圖)來管理任務(wù)流程的任務(wù)調(diào)度工具,不需要知道業(yè)務(wù)數(shù)據(jù)的具體內(nèi)容,設(shè)置任務(wù)的依賴關(guān)系即可實現(xiàn)任務(wù)調(diào)度。

          這個平臺擁有和 Hive、Presto、MySQL、HDFS、Postgres 等數(shù)據(jù)源之間交互的能力,并且提供了鉤子(hook)使其擁有很好地擴展性。除了使用命令行,該工具還提供了一個 WebUI 可以可視化的查看依賴關(guān)系、監(jiān)控進(jìn)度、觸發(fā)任務(wù)等。

          Airflow 的架構(gòu)

          在一個可擴展的生產(chǎn)環(huán)境中,Airflow 含有以下組件:

          元數(shù)據(jù)庫:這個數(shù)據(jù)庫存儲有關(guān)任務(wù)狀態(tài)的信息。

          調(diào)度器:Scheduler 是一種使用 DAG 定義結(jié)合元數(shù)據(jù)中的任務(wù)狀態(tài)來決定哪些任務(wù)需要被執(zhí)行以及任務(wù)執(zhí)行優(yōu)先級的過程。調(diào)度器通常作為服務(wù)運行。

          執(zhí)行器:Executor 是一個消息隊列進(jìn)程,它被綁定到調(diào)度器中,用于確定實際執(zhí)行每個任務(wù)計劃的工作進(jìn)程。有不同類型的執(zhí)行器,每個執(zhí)行器都使用一個指定工作進(jìn)程的類來執(zhí)行任務(wù)。例如,LocalExecutor 使用與調(diào)度器進(jìn)程在同一臺機器上運行的并行進(jìn)程執(zhí)行任務(wù)。其他像 CeleryExecutor 的執(zhí)行器使用存在于獨立的工作機器集群中的工作進(jìn)程執(zhí)行任務(wù)。

          Workers:這些是實際執(zhí)行任務(wù)邏輯的進(jìn)程,由正在使用的執(zhí)行器確定。


          Airflow 解決哪些問題

          通常,在一個運維系統(tǒng),數(shù)據(jù)分析系統(tǒng),或測試系統(tǒng)等大型系統(tǒng)中,我們會有各種各樣的依賴需求。包括但不限于:
          時間依賴:任務(wù)需要等待某一個時間點觸發(fā)。
          外部系統(tǒng)依賴:任務(wù)依賴外部系統(tǒng)需要調(diào)用接口去訪問。
          任務(wù)間依賴:任務(wù) A 需要在任務(wù) B 完成后啟動,兩個任務(wù)互相間會產(chǎn)生影響。
          資源環(huán)境依賴:任務(wù)消耗資源非常多, 或者只能在特定的機器上執(zhí)行。
          crontab 可以很好地處理定時執(zhí)行任務(wù)的需求,但僅能管理時間上的依賴。
          Airflow 是一種 WMS,即:它將任務(wù)以及它們的依賴看作代碼,按照那些計劃規(guī)范任務(wù)執(zhí)行,并在實際工作進(jìn)程之間分發(fā)需執(zhí)行的任務(wù)。
          Airflow 提供了一個用于顯示當(dāng)前活動任務(wù)和過去任務(wù)狀態(tài)的優(yōu)秀 UI,并允許用戶手動管理任務(wù)的執(zhí)行和狀態(tài)。
          Airflow 中的工作流是具有方向性依賴的任務(wù)集合。
          具體說就是 Airflow 的核心概念 DAG(有向無環(huán)圖)—— 來表現(xiàn)工作流。
          DAG 中的每個節(jié)點都是一個任務(wù),DAG 中的邊表示的是任務(wù)之間的依賴(強制為有向無環(huán),因此不會出現(xiàn)循環(huán)依賴,從而導(dǎo)致無限執(zhí)行循環(huán))。
          Airflow 在 ETL 上的實踐
          ETL,是英文 Extract,Transform,Load 的縮寫,用來描述將數(shù)據(jù)從來源端經(jīng)過抽?。╡xtract)、轉(zhuǎn)換(transform)、加載(load)至目的端的過程。ETL 一詞較常用在數(shù)據(jù)倉庫,Airflow 在解決 ETL 任務(wù)各種依賴問題上的能力恰恰是我們所需要的。
          在現(xiàn)階段的實踐中,我們使用 Airflow 來同步各個數(shù)據(jù)源數(shù)據(jù)到數(shù)倉,同時定時執(zhí)行一些批處理任務(wù)及帶有數(shù)據(jù)依賴、資源依賴關(guān)系的計算腳本。
          本文立意于科普介紹,故在后面的用例中只介紹了 BashOperator,PythonOperator這倆個最為易用且在我們?nèi)粘J褂弥凶顬槌R姷?Operator。
          Airflow 同時也具有不錯的集群擴展能力,可使用 CeleryExecuter 以及多個 Pool 來提高任務(wù)并發(fā)度。
          Airflow在 CeleryExecuter 下可以使用不同的用戶啟動 Worker,不同的 Worker 監(jiān)聽不同的 Queue,這樣可以解決用戶權(quán)限依賴問題。Worker 也可以啟動在多個不同的機器上,解決機器依賴的問題。
          Airflow 可以為任意一個 Task 指定一個抽象的 Pool,每個 Pool 可以指定一個 Slot 數(shù)。每當(dāng)一個 Task 啟動時,就占用一個 Slot,當(dāng) Slot 數(shù)占滿時,其余的任務(wù)就處于等待狀態(tài)。這樣就解決了資源依賴問題。

          二、安裝及使用

          假設(shè):你已經(jīng)安裝好了 Python 及配置好了其包管理工具 pip。

          1、安裝airflow

          pip install apache-airflow
          在安裝airflow的時候可能會報錯:
          Cannot uninstall 'PyYAML'. It is a distutils installed project and thus we cannot
          忽略掉 PyYAML
          # 親測可用
          pip install apache-airflow --ignore-installed PyYAML
          安裝成功后查看命令:
          [root@quant ~]# airflow -h
          usage: airflow [-h] GROUP_OR_COMMAND ...

          positional arguments:
            GROUP_OR_COMMAND

              Groups:
                celery         Celery components
                config         View configuration
                connections    Manage connections
                dags           Manage DAGs
                db             Database operations
                kubernetes     Tools to help run the KubernetesExecutor
                pools          Manage pools
                providers      Display providers
                roles          Manage roles
                tasks          Manage tasks
                users          Manage users
                variables      Manage variables

              Commands:
                cheat-sheet    Display cheat sheet
                info           Show information about current Airflow and environment
                kerberos       Start a kerberos ticket renewer
                plugins        Dump information about loaded plugins
                rotate-fernet-key
                               Rotate encrypted connection credentials and variables
                scheduler      Start a scheduler instance
                sync-perm      Update permissions for existing roles and DAGs
                version        Show the version
                webserver      Start a Airflow webserver instance

          optional arguments:
            -h, --help         show this help message and exit
          [root@quant ~]#

          2、初始化數(shù)據(jù)庫

          # initialize the database
          airflow db init
          報這樣的錯誤:
          ImportError: Something is wrong with the numpy installation. While importing we detected an older version of numpy
          解決方案:
          如報錯信息所說

          先卸載numpy:pip uninstall numpy
          再卸載numpy,直到卸載到提示信息顯示,此時完全已經(jīng)沒有numpy了為止
          下載numpy:pip install numpy
          此時應(yīng)該可用;
          若不可用,查看python安裝目錄下的libs文件夾,刪除掉其中的另一個dll文件,應(yīng)該可用。

          3、添加用戶

          airflow users create \
              --username admin \
              --firstname Corwien \
              --lastname Wong \
              --role Admin \
              --email [email protected]


          創(chuàng)建的用戶密碼為:quant

          4、啟動web服務(wù)

          # start the web server, default port is 8080
          airflow webserver --port 8080a

          5、啟動定時任務(wù)

          # start the scheduler
          # open a new terminal or else run webserver with ``-D`` option to run it as a daemon
          airflow scheduler

          # visit localhost:8080 in the browser and use the admin account you just
          # created to login. Enable the example_bash_operator dag in the home page



          點擊左下角閱讀原文,到 SegmentFault 思否社區(qū) 和文章作者展開更多互動和交流,掃描下方”二維碼“或在“公眾號后臺回復(fù)“ 入群 ”即可加入我們的技術(shù)交流群,收獲更多的技術(shù)文章~

          - END -

          瀏覽 54
          點贊
          評論
          收藏
          分享

          手機掃一掃分享

          分享
          舉報
          評論
          圖片
          表情
          推薦
          點贊
          評論
          收藏
          分享

          手機掃一掃分享

          分享
          舉報
          <kbd id="afajh"><form id="afajh"></form></kbd>
          <strong id="afajh"><dl id="afajh"></dl></strong>
            <del id="afajh"><form id="afajh"></form></del>
                1. <th id="afajh"><progress id="afajh"></progress></th>
                  <b id="afajh"><abbr id="afajh"></abbr></b>
                  <th id="afajh"><progress id="afajh"></progress></th>
                  亚洲中文字幕在线免费视频 | www.激情网站 | 青春草在线视频免费观看网站 | 国产AV直播 | 亚洲色老板 |