<kbd id="afajh"><form id="afajh"></form></kbd>
<strong id="afajh"><dl id="afajh"></dl></strong>
    <del id="afajh"><form id="afajh"></form></del>
        1. <th id="afajh"><progress id="afajh"></progress></th>
          <b id="afajh"><abbr id="afajh"></abbr></b>
          <th id="afajh"><progress id="afajh"></progress></th>

          Airflow 基礎(chǔ) | Airflow 基礎(chǔ)系列-01:Airflow組件

          共 3438字,需瀏覽 7分鐘

           ·

          2021-12-27 17:05


          Apache Airflow 微信公眾號(hào)是由一群Airflow的愛(ài)好者一起維護(hù)的,我們旨在普及Airflow知識(shí),給廣大Airflow用戶搭建一個(gè)交流的平臺(tái)。


          前期我們會(huì)發(fā)布一些Airflow的基本知識(shí)文章。本文是Apache Airflow 公眾號(hào)的開(kāi)篇之作,主要介紹Airflow的整體架構(gòu)以及主要的組件。


          Airflow 架構(gòu)


          下圖是Airflow的架構(gòu)圖,

          我們可以看到Airflow主要有4個(gè)組件:

          • WebServer

          • Scheduler

          • Executor

          • Database


          WebServer - Airflow UI


          WebServer 實(shí)際上是一個(gè)Python的Flask app。你可以在Airflow WebServer的UI界面上看到調(diào)度作業(yè)的狀態(tài)。作業(yè)的信息都存儲(chǔ)在數(shù)據(jù)庫(kù)里,WebServer負(fù)責(zé)查詢數(shù)據(jù)庫(kù)并在頁(yè)面上展示作業(yè)信息。WebServer 組件也負(fù)責(zé)讀取并展示Remote的作業(yè)日志(Airflow的作業(yè)日志可以存放在S3,Google Cloud Storage,AzureBlobs,ElasticsSearch等等)。


          Scheduler -?作業(yè)調(diào)度器


          Scheduler 是一個(gè)多線程的Python進(jìn)程。Scheduler 通過(guò)檢查DAG的task依賴關(guān)系以及數(shù)據(jù)庫(kù)里各個(gè)task的狀態(tài)來(lái)決定接下來(lái)跑哪個(gè)task,什么時(shí)候跑以及在哪里跑。


          Executor?- 作業(yè)執(zhí)行器


          Airflow?支持以下4種類型的 Executor

          • SequentialExecutor 按照線性的方式運(yùn)行task,沒(méi)有并發(fā)和并行。一般用在開(kāi)發(fā)環(huán)境里用。

          • LocalExecutor 支持并行和多行程,一般用在單節(jié)點(diǎn)的Airflow里。

          • CeleryExecutor 是在分布式環(huán)境下執(zhí)行器。但是依賴第三方的message queue組件來(lái)調(diào)度task到worker節(jié)點(diǎn),message queue可以用Redis,RabbitMQ。

          • KubernetesExecutor 是 Airfow 1.10新引入的一個(gè)執(zhí)行器,主要用在K8s環(huán)境里。


          Metadata Database - 元數(shù)據(jù)數(shù)據(jù)庫(kù)


          元數(shù)據(jù)數(shù)據(jù)庫(kù)可以是任何支持 SQLAlchemy的數(shù)據(jù)庫(kù)(比如Postgres,MySql)。Scheduler 通過(guò)修改數(shù)據(jù)庫(kù)來(lái)更新task狀態(tài),WebServer會(huì)讀取數(shù)據(jù)庫(kù)來(lái)展示作業(yè)狀態(tài)





          Airflow 是如何調(diào)度的


          1. Schduler 會(huì)掃描dags文件夾,在元數(shù)據(jù)數(shù)據(jù)庫(kù)里創(chuàng)建DAG對(duì)應(yīng)的記錄。根據(jù)配置,每一個(gè)DAG都會(huì)分配若干個(gè)進(jìn)程。

          2. 每個(gè)進(jìn)程都會(huì)掃描對(duì)應(yīng)的DAG文件,根據(jù)調(diào)度配置參數(shù)創(chuàng)建DagRuns。然后每一個(gè)滿足被調(diào)度條件的Task都會(huì)實(shí)例出來(lái)一個(gè)TaskInstance,TaskInstance會(huì)被初始化為Scheduled的狀態(tài),并在數(shù)據(jù)庫(kù)里更新。

          3. Scheduler 進(jìn)程查詢數(shù)據(jù)庫(kù)拿到所有Scheduled狀態(tài)的tasks,并把他們發(fā)送到Executor(對(duì)應(yīng)TaskInstance的狀態(tài)更新為QUEUED)

          4. Worker會(huì)從queue里拉取task并執(zhí)行。TaskInstance的狀態(tài)由QUEUED轉(zhuǎn)變?yōu)?RUNNING

          5. 當(dāng)一個(gè)task結(jié)束了,worker會(huì)把task狀態(tài)更新為對(duì)應(yīng)的結(jié)束狀態(tài)(FINISHED,F(xiàn)AILED,等等),Scheduler會(huì)在數(shù)據(jù)庫(kù)里更新對(duì)應(yīng)的狀態(tài)。


          # https://github.com/apache/incubator-airflow/blob/2d50ba43366f646e9391a981083623caa12e8967/airflow/jobs.py#L1386
          def _process_dags(self, dagbag, dags, tis_out): """ Iterates over the dags and processes them. Processing includes: 1. Create appropriate DagRun(s) in the DB.
          2. Create appropriate TaskInstance(s) in the DB.
          3. Send emails for tasks that have missed SLAs.
          :param dagbag: a collection of DAGs to process :type dagbag: models.DagBag :param dags: the DAGs from the DagBag to process :type dags: DAG :param tis_out: A queue to add generated TaskInstance objects :type tis_out: multiprocessing.Queue[TaskInstance] :return: None """ for dag in dags: dag = dagbag.get_dag(dag.dag_id) if dag.is_paused: self.log.info("Not processing DAG %s since it's paused", dag.dag_id) continue
          if not dag: self.log.error("DAG ID %s was not found in the DagBag", dag.dag_id) continue
          self.log.info("Processing %s", dag.dag_id)
          dag_run = self.create_dag_run(dag) if dag_run: self.log.info("Created %s", dag_run) self._process_task_instances(dag, tis_out) self.manage_slas(dag)
          models.DagStat.update([d.dag_id for d in dags])



          Airflow 組件配置


          配置Airflow 組件之間的交互是同一個(gè)airflow.cfg 文件控制的,這個(gè)文件里本身有對(duì)各種配置的說(shuō)明文檔,這里我們介紹一些常用的配置項(xiàng):

          Parallelism

          以下3個(gè)參數(shù)可以用來(lái)控制task的并行度。

          ?parallelism,?dag_concurrency?and?max_active_runs_per_dag?

          parallelism是指airflow executor最多運(yùn)行的task數(shù)。dag_concurrenty 是指單個(gè)dag最多運(yùn)行的task數(shù)。

          max_active_runs_per_dag?是指最多的dag運(yùn)行實(shí)例。

          Scheduler

          job_heartbeat_sec 是指task接受外部kill signal的頻率(比如你在airflow web頁(yè)面kill task),默認(rèn)是5秒鐘

          scheduler_heartbeat_sec 是指scheduler 觸發(fā)新task的時(shí)間隔間,默認(rèn)是5秒。

          還有更多配置會(huì)在后面的文章里介紹,也希望大家多多支持Airflow公眾號(hào),后續(xù)我們更新更多有關(guān)Airflow的文章。

          本文翻譯 https://www.astronomer.io/guides/airflow-components


          瀏覽 177
          點(diǎn)贊
          評(píng)論
          收藏
          分享

          手機(jī)掃一掃分享

          分享
          舉報(bào)
          評(píng)論
          圖片
          表情
          推薦
          點(diǎn)贊
          評(píng)論
          收藏
          分享

          手機(jī)掃一掃分享

          分享
          舉報(bào)
          <kbd id="afajh"><form id="afajh"></form></kbd>
          <strong id="afajh"><dl id="afajh"></dl></strong>
            <del id="afajh"><form id="afajh"></form></del>
                1. <th id="afajh"><progress id="afajh"></progress></th>
                  <b id="afajh"><abbr id="afajh"></abbr></b>
                  <th id="afajh"><progress id="afajh"></progress></th>
                  大逼成人娱乐网 | 无码一级毛片免费视频播放 | 黄色一级A | 人妻精品导航 | 大香蕉插逼|