dbt 的模型,如何用 airflow 來做任務排程工作?
dbt 官方有提供文件說明:
我們是用 dbt core 來開發,並且用了他提到的 BashOperator 來實作。
當時我們考慮過另一個 Astronomer 提供的 Cosmos 工具,而這個第三方工具主要的優勢在於,他可以在 airflow 內將 dbt 內部的任務展開,譬如說我們要運行 dbt_task_1 >> dbt_task_2 >> dbt_task_3,如果用 BashOperator 是看不出來任務各自的執行狀況的,而用這個第三方工具就可以在此展現優勢。
更甚的是,他可以在 dbt 的任務中,插入其他的任務。
什麼意思呢?
假設在 dbt 的模型之間,有一段資料的轉換需要特殊的轉碼方式,或是像是我們有在 pipeline 中串接一些 LLM 工具來做資料的解析,譬如說有些文本資料我們需要將其分類,這時候就需要其他的任務來達成,不是 sql code 能處理的範圍,而 Cosmos 正是在此刻派上用場,他可以在 dbt 模型間加入 python 或其他方式處理的任務。
聽起來挺不錯的,不過由於又是一套新工具,而我們當時有需求的情境比較少,因此就先用馬上就能解決的 BashOperator 來處理,或許在未來 pipeline 中需要更多 LLM 加入時,再做考量。
BashOperator 的部分,基本上部署過 dbt core 的服務之後,要在 airflow 中處理就變得簡單許多,因為基本上就只是換個地方做而已。
把 dbt repo 的內容,拉至 airflow 運行的容器中,就用原先在 dbt core 開發時在 CLI 使用的那些指令來觸發相對應的任務即可,簡單輕便。
將 dbt 的任務排在所有原始資料更新完後再處理,確保所有上游資料都已更新,才進行動作。而 dbt 的下游,有我們用來產出報表的 Metabase、以及一些小型的微服務,在 dbt 執行完畢、將 BigQuery 的資料表更新後,即會自動讀取到新的資料,不需要額外的心思來檢查。
比想像中的簡單!