<kbd id="afajh"><form id="afajh"></form></kbd>
<strong id="afajh"><dl id="afajh"></dl></strong>
    <del id="afajh"><form id="afajh"></form></del>
        1. <th id="afajh"><progress id="afajh"></progress></th>
          <b id="afajh"><abbr id="afajh"></abbr></b>
          <th id="afajh"><progress id="afajh"></progress></th>

          Apache Airflow數(shù)據(jù)管道監(jiān)控工具

          聯(lián)合創(chuàng)作 · 2023-09-19 02:29

          Airflow 被 Airbnb 內(nèi)部用來創(chuàng)建、監(jiān)控和調(diào)整數(shù)據(jù)管道。任何工作流都可以在這個(gè)使用 Python 編寫的平臺上運(yùn)行(目前加入 Apache 基金會孵化器)。

          Airflow 允許工作流開發(fā)人員輕松創(chuàng)建、維護(hù)和周期性地調(diào)度運(yùn)行工作流(即有向無環(huán)圖或成為DAGs)的工具。在Airbnb中,這些工作流包括了如數(shù)據(jù)存儲、增長分析、Email發(fā)送、A/B測試等等這些跨越多部門的用例。這個(gè)平臺擁有和 Hive、Presto、MySQL、HDFS、Postgres和S3交互的能力,并且提供了鉤子使得系統(tǒng)擁有很好地?cái)U(kuò)展性。除了一個(gè)命令行界面,該工具還提供了一個(gè)    基于Web的用戶界面讓您可以可視化管道的依賴關(guān)系、監(jiān)控進(jìn)度、觸發(fā)任務(wù)等。 

          Airflow 包含如下組件:

          • 一個(gè)元數(shù)據(jù)庫(MySQL或Postgres)

          • 一組Airflow工作節(jié)點(diǎn)

          • 一個(gè)調(diào)節(jié)器(Redis或RabbitMQ)

          • 一個(gè)Airflow Web服務(wù)器

          截圖:

          管道定義示例:

          """
          Code that goes along with the Airflow tutorial located at:
          https://github.com/airbnb/airflow/blob/master/airflow/example_dags/tutorial.py
          """
          from airflow import DAG
          from airflow.operators.bash_operator import BashOperator
          from datetime import datetime, timedelta
          
          
          default_args = {
              'owner': 'airflow',
              'depends_on_past': False,
              'start_date': datetime(2015, 6, 1),
              'email': ['[email protected]'],
              'email_on_failure': False,
              'email_on_retry': False,
              'retries': 1,
              'retry_delay': timedelta(minutes=5),
              # 'queue': 'bash_queue',
              # 'pool': 'backfill',
              # 'priority_weight': 10,
              # 'end_date': datetime(2016, 1, 1),
          }
          
          dag = DAG('tutorial', default_args=default_args)
          
          # t1, t2 and t3 are examples of tasks created by instantiating operators
          t1 = BashOperator(
              task_id='print_date',
              bash_command='date',
              dag=dag)
          
          t2 = BashOperator(
              task_id='sleep',
              bash_command='sleep 5',
              retries=3,
              dag=dag)
          
          templated_command = """
              {% for i in range(5) %}
                  echo "{{ ds }}"
                  echo "{{ macros.ds_add(ds, 7)}}"
                  echo "{{ params.my_param }}"
              {% endfor %}
          """
          
          t3 = BashOperator(
              task_id='templated',
              bash_command=templated_command,
              params={'my_param': 'Parameter I passed in'},
              dag=dag)
          
          t2.set_upstream(t1)
          t3.set_upstream(t1)
          瀏覽 27
          點(diǎn)贊
          評論
          收藏
          分享

          手機(jī)掃一掃分享

          編輯 分享
          舉報(bào)
          評論
          圖片
          表情
          推薦
          點(diǎn)贊
          評論
          收藏
          分享

          手機(jī)掃一掃分享

          編輯 分享
          舉報(bào)
          <kbd id="afajh"><form id="afajh"></form></kbd>
          <strong id="afajh"><dl id="afajh"></dl></strong>
            <del id="afajh"><form id="afajh"></form></del>
                1. <th id="afajh"><progress id="afajh"></progress></th>
                  <b id="afajh"><abbr id="afajh"></abbr></b>
                  <th id="afajh"><progress id="afajh"></progress></th>
                  九色毛片| 偷拍福利| 天天爽天天摸 | 777国产偷窥盗摄精品 | 亚洲男女操逼 |