iT邦幫忙

2024 iThome 鐵人賽

DAY 12
1

https://ithelp.ithome.com.tw/upload/images/20240926/20168816kNq7Fv8YsP.png

『如果工作流程很複雜,任務的相依性很高,Airflow 還是能實現嗎?』

https://ithelp.ithome.com.tw/upload/images/20240926/201688163pd5tlDn5I.png
圖/一個 DAG 完成 RFM 分析。簡書廷製。

我們以 Day 06 提到的 RFM 模型來說明吧!節點 (node) 表示資料表,連線 (link) 表示一段資料轉換 (transformation) 的任務。
如果把這所有的任務透過一個 DAG 完成,任務相依性會長得像這樣:

...
(
    [source_to_bronze_orders, source_to_bronze_users] 
    >> [bronze_to_silver_orders, bronze_to_silver_users]
    >> aggregate_rfm
    >> calculate_rfm
    >> score_rfm
)

這是一個好的相依性設計嗎?

Airflow 官方文件的 Best Practices 提到,雖然 Airflow 能夠支援彈性且大量的 DAG,但若有太多過於複雜的 DAG,會降低排程系統的效能,應該要「降低 DAG 複雜性 (Reducing DAG complexity)」。

改善 DAG 的設計


針對 DAG 複雜度,雖然官方文件提到沒有明確指標 (metrics) 可以評估,但文件揭示了一些設計準則。
提升載入 DAG 的速度:簡言之是讓 DAG 專注於定義排程與任務相依性,Task 才去撰寫任務本身的邏輯及引入套件等。
保持 DAG 本身的結構簡單: A -> B -> C 的線性結構會比複雜的巢狀樹 (nested tree) 來得更好,因為同時運行的任務會比較少。

看起來,我們目前的設計可以進一步拆解。

https://ithelp.ithome.com.tw/upload/images/20240926/20168816xBRg4T4K2t.png
圖/多個 DAG 完成 RFM 分析。簡書廷製。

我們把 orders 和 users 的 ELT 流程分別拆成兩個 DAG,最後一個 DAG 則專心處理 RFM 分析的轉換。

  • DAG: orders_elt 負責訂單資料的 ELT 流程
  • DAG: users_elt 負責顧客資料的 ELT 流程
  • DAG: rfm_analysis 負責 RFM 分析的資料轉換

在 DAG 層級的設計,程式碼模組化的理念依然適用,在此案例下 ELT 任務與分析任務也被分離,兼顧了效能與易讀性。不過新的問題衍生了:如何確保前面資料源的 ELT 流程都完成了,才進行 RFM 分析的執行?
新的課題是跨 DAG 間的相依性設計。有幾個可能的解法都能夠在 Airflow UI 上產出相依關係線圖,讓我們一一探討。

感應器|Sensor


ExternalTaskSensor 是一個內建於 Airflow 中的感應器 (sensor),用於等待另一個 DAG 中的特定任務完成。這是處理跨 DAG 相依性最直接的方法。程式碼如下:
rfm_analysis.py

from airflow.sensors.external_task import ExternalTaskSensor

with DAG(
    dag_id='rfm_analysis'
    ...
):
 
    wait_for_orders = ExternalTaskSensor(
        task_id='wait_for_orders',
        external_dag_id='orders_elt',
        external_task_id='bronze_to_silver_orders'
    )
    wait_for_users = ExternalTaskSensor(
        task_id='wait_for_users',
        external_dag_id='users_elt',
        external_task_id='bronze_to_silver_users'
    )

    [wait_for_orders, wait_for_users] >> aggregate_rfm >> ...

https://ithelp.ithome.com.tw/upload/images/20240926/20168816A13nvYvF8p.png
圖/透過 Sensor 串接多個 DAG 完成 RFM 分析。簡書廷製。

透過 ExternalTaskSensor 的持續感應,直到指定的任務都完成之後,再進行 aggregate_rfm 這個任務,就能達成一樣的效果! 

優點

  • 簡單易用,內建於 Airflow 中。
  • 支援檢查任務是否成功或失敗。
  • 適用於有多個上游對一個下游的情形。

缺點

  • 如果 DAG 執行時間很長或頻繁運行,ExternalTaskSensor 的輪詢 (Polling) 可能會對系統資源造成負擔。
  • 上下游執行頻率不一致時,比較難處理。

觸發器|Trigger


TriggerDagRunOperator 是另一個處理 DAG 之間相依的方法。這個執行單位可以用在一個任務完成後即刻觸發另一個 DAG 的執行。例如我們希望在 DAG: orders_elt 的工作流程觸發 DAG: rfm_analysis,就可以用以下程式碼達成。

orders_elt.py

from airflow.operators.trigger_dagrun import TriggerDagRunOperator

with DAG(
    dag_id='orders_elt'
) as dag:
    
    trigger_dag = TriggerDagRunOperator(
        task_id='trigger_rfm',
        trigger_dag_id='rfm_analysis',
    )

不過觸發器和感應器是恰好相反的做法,觸發器可以一個上游對多個下游。如果我們做成這樣的設計,是沒辦法保證 orders 和 users 資料都完整走完 ELT 流程才進行下游分析的。

  • DAG: orders_elt 觸發 DAG: rfm_analysis
  • DAG: users_elt 觸發 DAG: rfm_analysis

優點

  • 可以即時觸發其他 DAG。
  • 簡單直觀,適合需要在一個 DAG 完成後立刻啟動其他 DAG 的情境。

缺點

  • 無法處理複雜的 DAG 相依情境。
  • 上下游執行頻率不一致時,比較難處理。

資料集|Dataset


Airflow 2.4 引入了一個新的方法來處理跨 DAG 相依性,稱為資料集(Dataset)機制。它的運作方式是在 DAG 任務之間共享資料集,並且在資料集被更新時,自動觸發相依於該資料集的 DAG。這個方法是基於 data pipeline 中資料變化的概念來建立相依性,而不再仰賴 DAG 任務或 DAG 執行狀態。

根據我們對資料上下游關聯的理解,引入 Dataset 的概念可以將 DAG 的相依設計改寫成這樣:

from airflow import DAG
from airflow.datasets import Dataset
from airflow.providers.google.cloud.operators.bigquery import (
    BigQueryInsertJobOperator
)

silver_orders = Dataset("bigquery:///project/silver/orders")
silver_users = Dataset("bigquery://project/silver/users")

with DAG(
    dag_id="orders_elt",
) as dag:
    bronze_to_silver_orders = BigQueryInsertJobOperator(
        task_id="bronze_to_silver_orders",
        outlets=[silver_orders],
    )

with DAG(
    dag_id="users_elt",
) as dag:
    bronze_to_silver_users = BigQueryInsertJobOperator(
        task_id="bronze_to_silver_users",
        outlets=[silver_users],
    )

with DAG(
    dag_id="rfm_analysis",
    schedule=[silver_orders, silver_users],
) as dag:
    rfm_analysis = BigQueryInsertJobOperator(
        task_id="rfm_analysis",
    )

優點

  • 可以將 DAG 間的相依關係與 data pipeline 中的資料變化直接關聯。
  • 相對於 Sensor 或 Trigger 是根據執行頻率的時間驅動 (time-driven),Dataset 是一種事件驅動 (event-driven),跟資料本身的關聯更強。

缺點

  • 需要 Airflow 2.4 以上版本。
  • 能夠每個任務和單一資料表/檔案的關聯梳理清楚的的需求情境下,才較適用。

小總結|基於資料正確性而生的任務相依


Airflow 有多種方法來處理跨 DAG 間的相依性設計。

  • 感應器 (Sensor):用於等待所有上游任務完成。
  • 觸發器 (Trigger):DAG 本身完成後,觸發下游一到多個 DAG。
  • 資料集 (Dataset):基於資料變化的相依性設計,跟資料血緣 (data lineage) 的綁定更深。

在運用 Airflow 作為任務編排器 (Orchestrator) 建構 data pipeline 時,別忘了最重要的目標是確保「資料正確性」。

參考資料


  1. Airflow Documentation - Best Practice
  2. Airflow Documentation-Cross-DAG Dependencies
  3. Airflow Documentation-Data-aware scheduling
  4. 開發者筆記 | 避免 Airflow DAG 無法順利啟動的幾個原則
  5. Cross-DAG Dependencies in Apache Airflow: A Comprehensive Guide

上一篇
《資料與程式碼的交鋒》Day 11 -工作流程編排工具 Airflow
下一篇
《資料與程式碼的交鋒》Day 13 -可重現性
系列文
資料與程式碼的交鋒 - Data Engineer 與合作夥伴的協奏曲 30
圖片
  直播研討會
圖片
{{ item.channelVendor }} {{ item.webinarstarted }} |
{{ formatDate(item.duration) }}
直播中

尚未有邦友留言

立即登入留言