昨天晚成第一個 DAG 之後,是不是覺得有些成就感呢?不過相信也對其中許多概念仍然感覺霧煞煞吧,在今天我們會來說明一下 Airflow 的 DAG 以及 Task 的相關概念。
如同我們在 Airflow 介紹中所說的,Airflow 是以 DAG 的概念建立 Data pipeline。還記得 DAG -- 有向無環圖的組成嗎?有向無環圖是以節點及有方向的邊組成,並且任何路徑無法讓節點重新回到自己本身的節點。那麼我們思考一下昨天開發的 DAG ,將開發 DAG 使用到幾個元件對應到定義,聰明的你應該就可以抓到在 Airflow DAG 的運作當中誰是節點的角色,誰是邊的角色了吧?沒錯!在 Airflow DAG 中,Task 成為節點的基本單位,而 Task 之間彼此的關聯則是 DAG 的邊。DAG 透過 Task 以及關聯關係就可以構成一個 Data pipline 的 DAG。
在昨天的 DAG 當中,我們簡單的宣告了 DAG 並使用。在當中我們在使用 DAG 的時候,傳遞了一些參數,這些參數定義了 DAG 的執行方式。
with DAG(
"first_dag",
default_args={
"retries": 1,
"retry_delay": timedelta(minutes=5),
},
description="my first DAG",
schedule=timedelta(days=1),
start_date=datetime(2021, 1, 1),
catchup=False,
) as dag:
在 DAG 當中有更多細部的配置可以進行設定,可以參考 Airflow 官網給的資訊。
default_arg 相關參數可以參考:
https://airflow.apache.org/docs/apache-airflow/1.10.10/_api/airflow/models/index.html#airflow.models.BaseOperator
Task 可以說是 DAG 在執行的最基本單位。Task 被定義在 DAG 的程式碼當中,並且 Task 與 Task 互相的關係,也都被定義於程式碼當中,Airflow 會解析相關設定之後運行。
在 Task 的執行結果中不會只有成功與失敗,Airflow 定義了一些不同的狀態讓開發者可以偵測其執行狀態。針對每個狀態也會有不同的顏色進行呈現,對開發人員以及維運人員是十分友善的功能。昨天的 Task 執行結果(下圖)皆為 success,所以都標示為綠色,圖片中間有一行彩色的方框,我們可以透過其顏色了解各個顏色的定義。
各個 Task 階段的意義:
queued: task 已分配給執行程序,正在等待執行
running: task 正在執行中
success:task 在執行時未出現錯誤並成功完成
shutdown:外部要求當 task 正在運行時關閉
restarting:外部要求在 task 執行時重新啟動
failed:task 在執行過程中出現錯誤並運行失敗
skipped: task 任務被跳過。
透過解析 Task 狀態,開發人員以及維運人員可以了解整個 DAG 目前的狀態。
在昨天的 DAG 中,我們有宣告了三個 Task,而每個 Task 也都是使用 Operator。
extract_task = PythonOperator(
task_id="extract", python_callable=extract, dag=dag,
)
transform_task = PythonOperator(
task_id="transform", python_callable=transform, dag=dag,
)
load_task = PythonOperator(
task_id="load", python_callable=load, dag=dag,
)
那到底 Task 以及 Operator 有什麼差別呢?
在DAG 的環境當中 Operator 主要是為了執行工作而存在,我們可以看成 DAG 就是在管理一個個 Operator 的動作,以及相關順序等等。然而在 Operator本身也有些狀態需要被管理,好比說關聯關係,抑或是如果 Operator 失敗了需要重新處理等狀況,都需要有一個另外一個維度的設計來進行 Operator 的管理,而這也就是 Task 存在的目的。透過 Task 有效的管理 Operator 執行的狀態。
今天主要在說明 DAG 以及 Task,明天接著繼續說明 Task 的關聯關係~