iT邦幫忙

2024 iThome 鐵人賽

DAY 17
1
AI/ ML & Data

這跟文件說的不一樣!從 0 到 1 導入 dbt 的實戰甘苦談系列 第 17

DAY 17 排程跟文件說的不一樣!談解耦 Airflow 與 dbt

  • 分享至 

  • xImage
  •  

我們盡可能地把 Airflow 的開發與 dbt 的工作流程做更清楚的隔離。

我們組織的需求大概就是日更、週更、月更的資料流,原先我們使用 BigQuery 時,是將所有的 procedure(資料模型)在 Airflow 中做任務安排,日更 pipeline 中,我們就把所有需要每日更新的資料模型放在該 pipeline 中,安排好 dependency,讓他逐一觸發,週更及月更依此類推。

類似以下方法:

dag = DAG(
    'daily_bigquery_pipeline_procedures',
    default_args=default_args,
    description='A daily BigQuery pipeline calling stored procedures',
    schedule_interval='@daily',  # 每日執行
)

task1 = BigQueryExecuteQueryOperator(
    task_id='call_stored_procedure_task1',
    sql="CALL `your_project.your_dataset.procedure_name1`()",
    dag=dag,
)

task2 = BigQueryExecuteQueryOperator(
    task_id='call_stored_procedure_task2',
    sql="CALL `your_project.your_dataset.procedure_name2`()",
    dag=dag,
)

task3 = BigQueryExecuteQueryOperator(
    task_id='call_stored_procedure_task3',
    sql="CALL `your_project.your_dataset.procedure_name3`()",
    dag=dag,
)

task1 >> task2 >> task3

在我們一開始的設計中,Airflow 負責排程,並調用 BigQuery 的 stored procedures 來進行資料處理。這樣的設計雖然功能上可行,但隨著我們的 pipeline 變得更加複雜,每次更新時都需要同步調整 Airflow 的任務設置與 BigQuery 的模型,這樣就產生了排程工具與資料轉換工具之間的高度耦合性

高耦合性的問題:

每次模型更新或調整時,我們都必須同步更新 Airflow pipeline 中的依賴關係和 BigQuery stored procedures,這樣的操作不僅容易出錯,還會增加維護的成本,尤其當我們的資料流從日更擴展到週更、月更甚至其他更複雜的時間排程時,這樣的問題變得更加明顯。

這次轉往 dbt 時,我們解決了這個問題。

它讓我們在 Airflow 中所定義的排程不需要依賴具體的模型或過程,而是透過 標籤(tags) 來對資料模型進行分類和管理。

我們在 Airflow 的不同的任務中,放進 dbt 的 tags,在日更任務中,我們的 BashOperator 就會去執行:dbt build —select +tag:daily ,其他則是 weekly, monthly,或是若有更頻繁的需求,可以使用 hourly 等標籤來做任務管理。

在利用標籤來管理 dbt 的資料模型下,我們想要調整資料模型的更新頻率時,只需要修改 dbt 資料模型的標籤,Airflow 觸發任務時就會自己抓到相對應的模型來做更新,再也不需要每次都兩頭做修改。

範例如下:

dag = DAG(
    'dbt_tag_based_pipeline',
    default_args=default_args,
    description='A dbt pipeline using tags for daily, weekly, and monthly builds',
    schedule_interval='@daily',  # 每日執行
)

daily_task = BashOperator(
    task_id='dbt_daily_task',
    bash_command='dbt build --select +tag:daily',
    dag=dag,
)

daily_task

輕鬆完成!方便調整!

而這個方法也可以擴充至其他的情境,譬如今天有一個新的活動、專案或企劃,有另外的更新頻率需求,我們可以為其設定一個新的標籤,依照其需求安排另外一個任務來做管理,無論是新增、調整或是移除,都不會干擾到原先的 pipeline 規劃。

切記,需要人工同時做多項項目的修正,才能完成的任務,都不是好的任務安排模式。這正是自動化 pipeline 要解決的核心問題。


上一篇
DAY 16 排程跟文件說的不一樣!談 dbt 與 Airflow 的協作方法
下一篇
DAY 18 Freshness 跟文件說的不一樣!談如何確保資料的「新鮮度」
系列文
這跟文件說的不一樣!從 0 到 1 導入 dbt 的實戰甘苦談30
圖片
  直播研討會
圖片
{{ item.channelVendor }} {{ item.webinarstarted }} |
{{ formatDate(item.duration) }}
直播中

尚未有邦友留言

立即登入留言