iT邦幫忙

2023 iThome 鐵人賽

DAY 15
0

昨天晚成第一個 DAG 之後,是不是覺得有些成就感呢?不過相信也對其中許多概念仍然感覺霧煞煞吧,在今天我們會來說明一下 Airflow 的 DAG 以及 Task 的相關概念。

DAG 組成

如同我們在 Airflow 介紹中所說的,Airflow 是以 DAG 的概念建立 Data pipeline。還記得 DAG -- 有向無環圖的組成嗎?有向無環圖是以節點及有方向的邊組成,並且任何路徑無法讓節點重新回到自己本身的節點。那麼我們思考一下昨天開發的 DAG ,將開發 DAG 使用到幾個元件對應到定義,聰明的你應該就可以抓到在 Airflow DAG 的運作當中誰是節點的角色,誰是邊的角色了吧?沒錯!在 Airflow DAG 中,Task 成為節點的基本單位,而 Task 之間彼此的關聯則是 DAG 的邊。DAG 透過 Task 以及關聯關係就可以構成一個 Data pipline 的 DAG。

DAG 設定

在昨天的 DAG 當中,我們簡單的宣告了 DAG 並使用。在當中我們在使用 DAG 的時候,傳遞了一些參數,這些參數定義了 DAG 的執行方式。

with DAG(
    "first_dag",
    default_args={
        "retries": 1,
        "retry_delay": timedelta(minutes=5),
    },
    description="my first DAG",
    schedule=timedelta(days=1),
    start_date=datetime(2021, 1, 1),
    catchup=False,
) as dag:
  • 在 DAG 的第一個參數,我們傳入 "first_dag",它的意義為 DAG 的名稱,此名稱同時也是在 web 介面上顯示的 DAG 的名稱。
  • default_arg 定義 DAG 中的 operator 會使用到的共同參數。
  • description 則是DAG 的簡介,在 DAG 狀態頁面也會展示出來。
  • schedule 進行執行時間的設定
  • start_date 從何時開始執行 DAG
  • catchup 定義 DAG 是否執行之前遺漏的 Task

在 DAG 當中有更多細部的配置可以進行設定,可以參考 Airflow 官網給的資訊。
default_arg 相關參數可以參考:
https://airflow.apache.org/docs/apache-airflow/1.10.10/_api/airflow/models/index.html#airflow.models.BaseOperator

Task 介紹

Task 可以說是 DAG 在執行的最基本單位。Task 被定義在 DAG 的程式碼當中,並且 Task 與 Task 互相的關係,也都被定義於程式碼當中,Airflow 會解析相關設定之後運行。

Task 執行結果

在 Task 的執行結果中不會只有成功與失敗,Airflow 定義了一些不同的狀態讓開發者可以偵測其執行狀態。針對每個狀態也會有不同的顏色進行呈現,對開發人員以及維運人員是十分友善的功能。昨天的 Task 執行結果(下圖)皆為 success,所以都標示為綠色,圖片中間有一行彩色的方框,我們可以透過其顏色了解各個顏色的定義。
https://ithelp.ithome.com.tw/upload/images/20230930/20140477ere29I4lj4.png

各個 Task 階段的意義:
queued: task 已分配給執行程序,正在等待執行
running: task 正在執行中
success:task 在執行時未出現錯誤並成功完成
shutdown:外部要求當 task 正在運行時關閉
restarting:外部要求在 task 執行時重新啟動
failed:task 在執行過程中出現錯誤並運行失敗
skipped: task 任務被跳過。

透過解析 Task 狀態,開發人員以及維運人員可以了解整個 DAG 目前的狀態。

Task,Operator 傻傻分不清楚

在昨天的 DAG 中,我們有宣告了三個 Task,而每個 Task 也都是使用 Operator。

    extract_task = PythonOperator(
    task_id="extract", python_callable=extract, dag=dag,
    )

    transform_task = PythonOperator(
    task_id="transform", python_callable=transform, dag=dag,
    )
    
    load_task = PythonOperator(
    task_id="load", python_callable=load, dag=dag,
    )

那到底 Task 以及 Operator 有什麼差別呢?
在DAG 的環境當中 Operator 主要是為了執行工作而存在,我們可以看成 DAG 就是在管理一個個 Operator 的動作,以及相關順序等等。然而在 Operator本身也有些狀態需要被管理,好比說關聯關係,抑或是如果 Operator 失敗了需要重新處理等狀況,都需要有一個另外一個維度的設計來進行 Operator 的管理,而這也就是 Task 存在的目的。透過 Task 有效的管理 Operator 執行的狀態。

今天主要在說明 DAG 以及 Task,明天接著繼續說明 Task 的關聯關係~


上一篇
『Day14』第一個 Airflow Data Pipeline
下一篇
『Day16』Relationships 關聯關係
系列文
Data pipeline 建起來!用 Airflow 開發你的 Data pipeline30
圖片
  直播研討會
圖片
{{ item.channelVendor }} {{ item.webinarstarted }} |
{{ formatDate(item.duration) }}
直播中

尚未有邦友留言

立即登入留言