Airflow 快速學(xué)習(xí)入門
作者:Corwien
來源:SegmentFault 思否社區(qū)
一、Airflow簡介
Airflow 是一個使用 Python 語言編寫的 Data Pipeline 調(diào)度和監(jiān)控工作流的平臺。
Airflow 是通過 DAG(Directed acyclic graph 有向無環(huán)圖)來管理任務(wù)流程的任務(wù)調(diào)度工具,不需要知道業(yè)務(wù)數(shù)據(jù)的具體內(nèi)容,設(shè)置任務(wù)的依賴關(guān)系即可實現(xiàn)任務(wù)調(diào)度。
這個平臺擁有和 Hive、Presto、MySQL、HDFS、Postgres 等數(shù)據(jù)源之間交互的能力,并且提供了鉤子(hook)使其擁有很好地擴展性。除了使用命令行,該工具還提供了一個 WebUI 可以可視化的查看依賴關(guān)系、監(jiān)控進(jìn)度、觸發(fā)任務(wù)等。
Airflow 的架構(gòu)
在一個可擴展的生產(chǎn)環(huán)境中,Airflow 含有以下組件:
元數(shù)據(jù)庫:這個數(shù)據(jù)庫存儲有關(guān)任務(wù)狀態(tài)的信息。
調(diào)度器:Scheduler 是一種使用 DAG 定義結(jié)合元數(shù)據(jù)中的任務(wù)狀態(tài)來決定哪些任務(wù)需要被執(zhí)行以及任務(wù)執(zhí)行優(yōu)先級的過程。調(diào)度器通常作為服務(wù)運行。
執(zhí)行器:Executor 是一個消息隊列進(jìn)程,它被綁定到調(diào)度器中,用于確定實際執(zhí)行每個任務(wù)計劃的工作進(jìn)程。有不同類型的執(zhí)行器,每個執(zhí)行器都使用一個指定工作進(jìn)程的類來執(zhí)行任務(wù)。例如,LocalExecutor 使用與調(diào)度器進(jìn)程在同一臺機器上運行的并行進(jìn)程執(zhí)行任務(wù)。其他像 CeleryExecutor 的執(zhí)行器使用存在于獨立的工作機器集群中的工作進(jìn)程執(zhí)行任務(wù)。
Workers:這些是實際執(zhí)行任務(wù)邏輯的進(jìn)程,由正在使用的執(zhí)行器確定。

Airflow 解決哪些問題
二、安裝及使用
1、安裝airflow
pip install apache-airflow
Cannot uninstall 'PyYAML'. It is a distutils installed project and thus we cannot
PyYAML# 親測可用
pip install apache-airflow --ignore-installed PyYAML
[root@quant ~]# airflow -h
usage: airflow [-h] GROUP_OR_COMMAND ...
positional arguments:
GROUP_OR_COMMAND
Groups:
celery Celery components
config View configuration
connections Manage connections
dags Manage DAGs
db Database operations
kubernetes Tools to help run the KubernetesExecutor
pools Manage pools
providers Display providers
roles Manage roles
tasks Manage tasks
users Manage users
variables Manage variables
Commands:
cheat-sheet Display cheat sheet
info Show information about current Airflow and environment
kerberos Start a kerberos ticket renewer
plugins Dump information about loaded plugins
rotate-fernet-key
Rotate encrypted connection credentials and variables
scheduler Start a scheduler instance
sync-perm Update permissions for existing roles and DAGs
version Show the version
webserver Start a Airflow webserver instance
optional arguments:
-h, --help show this help message and exit
[root@quant ~]#
2、初始化數(shù)據(jù)庫
# initialize the database
airflow db init
ImportError: Something is wrong with the numpy installation. While importing we detected an older version of numpy
如報錯信息所說
再卸載numpy,直到卸載到提示信息顯示,此時完全已經(jīng)沒有numpy了為止
下載numpy:pip install numpy
此時應(yīng)該可用;
若不可用,查看python安裝目錄下的libs文件夾,刪除掉其中的另一個dll文件,應(yīng)該可用。
3、添加用戶
airflow users create \
--username admin \
--firstname Corwien \
--lastname Wong \
--role Admin \
--email [email protected]

quant4、啟動web服務(wù)
# start the web server, default port is 8080
airflow webserver --port 8080a

5、啟動定時任務(wù)
# start the scheduler
# open a new terminal or else run webserver with ``-D`` option to run it as a daemon
airflow scheduler
# visit localhost:8080 in the browser and use the admin account you just
# created to login. Enable the example_bash_operator dag in the home page


