Airflow 基礎(chǔ) | Airflow 基礎(chǔ)系列-01:Airflow組件
Apache Airflow 微信公眾號(hào)是由一群Airflow的愛(ài)好者一起維護(hù)的,我們旨在普及Airflow知識(shí),給廣大Airflow用戶搭建一個(gè)交流的平臺(tái)。
前期我們會(huì)發(fā)布一些Airflow的基本知識(shí)文章。本文是Apache Airflow 公眾號(hào)的開(kāi)篇之作,主要介紹Airflow的整體架構(gòu)以及主要的組件。
Airflow 架構(gòu)
下圖是Airflow的架構(gòu)圖,

我們可以看到Airflow主要有4個(gè)組件:
WebServer
Scheduler
Executor
Database
WebServer - Airflow UI
WebServer 實(shí)際上是一個(gè)Python的Flask app。你可以在Airflow WebServer的UI界面上看到調(diào)度作業(yè)的狀態(tài)。作業(yè)的信息都存儲(chǔ)在數(shù)據(jù)庫(kù)里,WebServer負(fù)責(zé)查詢數(shù)據(jù)庫(kù)并在頁(yè)面上展示作業(yè)信息。WebServer 組件也負(fù)責(zé)讀取并展示Remote的作業(yè)日志(Airflow的作業(yè)日志可以存放在S3,Google Cloud Storage,AzureBlobs,ElasticsSearch等等)。
Scheduler -?作業(yè)調(diào)度器
Scheduler 是一個(gè)多線程的Python進(jìn)程。Scheduler 通過(guò)檢查DAG的task依賴關(guān)系以及數(shù)據(jù)庫(kù)里各個(gè)task的狀態(tài)來(lái)決定接下來(lái)跑哪個(gè)task,什么時(shí)候跑以及在哪里跑。
Executor?- 作業(yè)執(zhí)行器
Airflow?支持以下4種類型的 Executor
SequentialExecutor 按照線性的方式運(yùn)行task,沒(méi)有并發(fā)和并行。一般用在開(kāi)發(fā)環(huán)境里用。
LocalExecutor 支持并行和多行程,一般用在單節(jié)點(diǎn)的Airflow里。
CeleryExecutor 是在分布式環(huán)境下執(zhí)行器。但是依賴第三方的message queue組件來(lái)調(diào)度task到worker節(jié)點(diǎn),message queue可以用Redis,RabbitMQ。
KubernetesExecutor 是 Airfow 1.10新引入的一個(gè)執(zhí)行器,主要用在K8s環(huán)境里。
Metadata Database - 元數(shù)據(jù)數(shù)據(jù)庫(kù)
元數(shù)據(jù)數(shù)據(jù)庫(kù)可以是任何支持 SQLAlchemy的數(shù)據(jù)庫(kù)(比如Postgres,MySql)。Scheduler 通過(guò)修改數(shù)據(jù)庫(kù)來(lái)更新task狀態(tài),WebServer會(huì)讀取數(shù)據(jù)庫(kù)來(lái)展示作業(yè)狀態(tài)

Airflow 是如何調(diào)度的
Schduler 會(huì)掃描dags文件夾,在元數(shù)據(jù)數(shù)據(jù)庫(kù)里創(chuàng)建DAG對(duì)應(yīng)的記錄。根據(jù)配置,每一個(gè)DAG都會(huì)分配若干個(gè)進(jìn)程。
每個(gè)進(jìn)程都會(huì)掃描對(duì)應(yīng)的DAG文件,根據(jù)調(diào)度配置參數(shù)創(chuàng)建DagRuns。然后每一個(gè)滿足被調(diào)度條件的Task都會(huì)實(shí)例出來(lái)一個(gè)TaskInstance,TaskInstance會(huì)被初始化為Scheduled的狀態(tài),并在數(shù)據(jù)庫(kù)里更新。
Scheduler 進(jìn)程查詢數(shù)據(jù)庫(kù)拿到所有Scheduled狀態(tài)的tasks,并把他們發(fā)送到Executor(對(duì)應(yīng)TaskInstance的狀態(tài)更新為QUEUED)
Worker會(huì)從queue里拉取task并執(zhí)行。TaskInstance的狀態(tài)由QUEUED轉(zhuǎn)變?yōu)?RUNNING
當(dāng)一個(gè)task結(jié)束了,worker會(huì)把task狀態(tài)更新為對(duì)應(yīng)的結(jié)束狀態(tài)(FINISHED,F(xiàn)AILED,等等),Scheduler會(huì)在數(shù)據(jù)庫(kù)里更新對(duì)應(yīng)的狀態(tài)。
# https://github.com/apache/incubator-airflow/blob/2d50ba43366f646e9391a981083623caa12e8967/airflow/jobs.py#L1386def _process_dags(self, dagbag, dags, tis_out):"""Iterates over the dags and processes them. Processing includes:1. Create appropriate DagRun(s) in the DB.2. Create appropriate TaskInstance(s) in the DB.3. Send emails for tasks that have missed SLAs.:param dagbag: a collection of DAGs to process:type dagbag: models.DagBag:param dags: the DAGs from the DagBag to process:type dags: DAG:param tis_out: A queue to add generated TaskInstance objects:type tis_out: multiprocessing.Queue[TaskInstance]:return: None"""for dag in dags:dag = dagbag.get_dag(dag.dag_id)if dag.is_paused:self.log.info("Not processing DAG %s since it's paused", dag.dag_id)continueif not dag:self.log.error("DAG ID %s was not found in the DagBag", dag.dag_id)continueself.log.info("Processing %s", dag.dag_id)dag_run = self.create_dag_run(dag)if dag_run:self.log.info("Created %s", dag_run)self._process_task_instances(dag, tis_out)self.manage_slas(dag)models.DagStat.update([d.dag_id for d in dags])
Airflow 組件配置
配置Airflow 組件之間的交互是同一個(gè)airflow.cfg 文件控制的,這個(gè)文件里本身有對(duì)各種配置的說(shuō)明文檔,這里我們介紹一些常用的配置項(xiàng):
Parallelism
以下3個(gè)參數(shù)可以用來(lái)控制task的并行度。
?parallelism,?dag_concurrency?and?max_active_runs_per_dag?
parallelism是指airflow executor最多運(yùn)行的task數(shù)。dag_concurrenty 是指單個(gè)dag最多運(yùn)行的task數(shù)。
max_active_runs_per_dag?是指最多的dag運(yùn)行實(shí)例。
Scheduler
job_heartbeat_sec 是指task接受外部kill signal的頻率(比如你在airflow web頁(yè)面kill task),默認(rèn)是5秒鐘
scheduler_heartbeat_sec 是指scheduler 觸發(fā)新task的時(shí)間隔間,默認(rèn)是5秒。
還有更多配置會(huì)在后面的文章里介紹,也希望大家多多支持Airflow公眾號(hào),后續(xù)我們更新更多有關(guān)Airflow的文章。
本文翻譯 https://www.astronomer.io/guides/airflow-components
