我們盡可能地把 Airflow 的開發與 dbt 的工作流程做更清楚的隔離。
我們組織的需求大概就是日更、週更、月更的資料流,原先我們使用 BigQuery 時,是將所有的 procedure(資料模型)在 Airflow 中做任務安排,日更 pipeline 中,我們就把所有需要每日更新的資料模型放在該 pipeline 中,安排好 dependency,讓他逐一觸發,週更及月更依此類推。
類似以下方法:
dag = DAG(
'daily_bigquery_pipeline_procedures',
default_args=default_args,
description='A daily BigQuery pipeline calling stored procedures',
schedule_interval='@daily', # 每日執行
)
task1 = BigQueryExecuteQueryOperator(
task_id='call_stored_procedure_task1',
sql="CALL `your_project.your_dataset.procedure_name1`()",
dag=dag,
)
task2 = BigQueryExecuteQueryOperator(
task_id='call_stored_procedure_task2',
sql="CALL `your_project.your_dataset.procedure_name2`()",
dag=dag,
)
task3 = BigQueryExecuteQueryOperator(
task_id='call_stored_procedure_task3',
sql="CALL `your_project.your_dataset.procedure_name3`()",
dag=dag,
)
task1 >> task2 >> task3
在我們一開始的設計中,Airflow 負責排程,並調用 BigQuery 的 stored procedures 來進行資料處理。這樣的設計雖然功能上可行,但隨著我們的 pipeline 變得更加複雜,每次更新時都需要同步調整 Airflow 的任務設置與 BigQuery 的模型,這樣就產生了排程工具與資料轉換工具之間的高度耦合性。
高耦合性的問題:
每次模型更新或調整時,我們都必須同步更新 Airflow pipeline 中的依賴關係和 BigQuery stored procedures,這樣的操作不僅容易出錯,還會增加維護的成本,尤其當我們的資料流從日更擴展到週更、月更甚至其他更複雜的時間排程時,這樣的問題變得更加明顯。
這次轉往 dbt 時,我們解決了這個問題。
它讓我們在 Airflow 中所定義的排程不需要依賴具體的模型或過程,而是透過 標籤(tags) 來對資料模型進行分類和管理。
我們在 Airflow 的不同的任務中,放進 dbt 的 tags,在日更任務中,我們的 BashOperator 就會去執行:dbt build —select +tag:daily
,其他則是 weekly, monthly,或是若有更頻繁的需求,可以使用 hourly 等標籤來做任務管理。
在利用標籤來管理 dbt 的資料模型下,我們想要調整資料模型的更新頻率時,只需要修改 dbt 資料模型的標籤,Airflow 觸發任務時就會自己抓到相對應的模型來做更新,再也不需要每次都兩頭做修改。
範例如下:
dag = DAG(
'dbt_tag_based_pipeline',
default_args=default_args,
description='A dbt pipeline using tags for daily, weekly, and monthly builds',
schedule_interval='@daily', # 每日執行
)
daily_task = BashOperator(
task_id='dbt_daily_task',
bash_command='dbt build --select +tag:daily',
dag=dag,
)
daily_task
輕鬆完成!方便調整!
而這個方法也可以擴充至其他的情境,譬如今天有一個新的活動、專案或企劃,有另外的更新頻率需求,我們可以為其設定一個新的標籤,依照其需求安排另外一個任務來做管理,無論是新增、調整或是移除,都不會干擾到原先的 pipeline 規劃。
切記,需要人工同時做多項項目的修正,才能完成的任務,都不是好的任務安排模式。這正是自動化 pipeline 要解決的核心問題。