iT邦幫忙

2023 iThome 鐵人賽

DAY 3
0

Airflow DAG 是啥?

DAG 定義

Airflow 的核心之一,一定是 DAG (Directed Acyclic Graph,有向無環圖)。如果說其他功能都還不一定會用到,DAG 跟 Task 是絕不可能避開的,避開不如就別用 Airflow 了。

DAG 是Airflow的核心元素之一,它代表了任務 (Task) 之間的依賴關係和執行順序。在一個數據管道中,通常有多個任務需要按照一定的順序執行,並且某些任務可能依賴於其他任務的輸出。DAG 的主要功能是定義這些任務之間的依賴性,以確保它們按照正確的順序執行,並在任何失敗的情況下進行重試或錯誤處理。

而所謂的有向無環,就是指這些任務之間的順序,基本上是有向的。當然你可以把幾個無關係的 task 寫在同一個 DAG,這樣一來它們彼此之間就是無向的,但真有這種情況的話是建議切成不同的 dag 以便管理。

而無環就很重要了,在數學圖論裡,環的定義是一條路徑的起始點與終點相同。在 DAG 裡,代表有兩個 Task 的執行順序互為前後。例如 A → B, B → A,這樣會造成無限的循環。

例子

在Airflow中,我們使用Python代碼來定義DAG。以下是一個簡單的DAG示例:

from airflow import DAG
from airflow.operators.python_operator import PythonOperator
from datetime import datetime

# 定義DAG
dag = DAG('my_dag', start_date=datetime(2023, 1, 1))

# 定義任務
def task_hello_world():
    print("Hello, World!")

task1 = PythonOperator(
    task_id='hello_world',
    python_callable=task_hello_world,
    dag=dag
)
task2 = PythonOperator(
    task_id='hello_world2',
    python_callable=task_hello_world,
    dag=dag
)

# 定義任務之間的依賴關係
task1 >> task2

在這個示例中,我們首先創建了一個DAG對象,並指定了它的開始日期。然後,我們定義了一個名為"hello_world"的任務,該任務將在DAG中執行。最後,我們使用DAG中的 task1 , task2 來定義了任務之間的依賴關係,這意味著 "hello_world" 任務將在DAG中執行兩次,先 1 再 2。

總結

對於數據工程師來說,熟練掌握Airflow DAG的主要工作是關鍵,以確保數據流程的穩定執行和高效管理。

由於 DAG 之間還可以再另外設定依賴,因此建議每個 DAG 都是一個完整的最小工作。所謂完整代表不能分割,而最小是不要把所有有關聯的都寫進來。舉例來說,我想要每天從 Database A 倒一筆玩家記錄進 Database B,這個工作在 DAG 內可能會被拆成兩個 task (或一個,你可以寫在一起)。但你不太會想把它寫成兩個 DAG,第一個 DAG 讀取後先寫在 local disk,由另一個 DAG 觸發。這就不會是一個完整的工作,除非你有其他的目的與需求。

如果我同時又想把每天股票的收盤價寫入 Database B,而且觸發時間跟上面的 DAG 相同,我也不會寫在同一個 dag 內。因為這增加了維護的複雜度。記得 SOLID 的單一職責原則,不要有第二個理由去改 code。


上一篇
Airflow 的安裝及啟動 - Day2
下一篇
來寫第一個 DAG 吧 - Day4
系列文
用 Airflow & Flink 來開發 ETL 吧30
圖片
  直播研討會
圖片
{{ item.channelVendor }} {{ item.webinarstarted }} |
{{ formatDate(item.duration) }}
直播中

尚未有邦友留言

立即登入留言