經過前幾天的介紹,相信大家應該都有一個基本的概念輪廓,知道如何透過 Airflow DAG 建立所需要的 Data pipeline了。今天來談談 DAG 如何執行以及其排程。
我們執行 first_dag 的時候是使用 Airflow 的介面進行操作,點擊 DAG 右方的三角形按鈕之後選擇 "Trigger Dag" 即可執行。
這種手動執行方式可以讓我們很直覺且很即時的執行 DAG ,手動執行的方式使用在有些任務並非固定排定時間執行,需要夠過人為決定時間執行的任務當中使用。除此之外,也常常在測試開發使用。然而當 Data pipeline 是固定一定的時間會執行時,就不建議使用手動執行,於是就不得不要提到排程執行的方式了。
Data pipeline 許多的使用情境是需要固定一段時間進行資料的搬運,有可能是每天或每周。過去我在處理這類型需求的時候,一開始都使用 cronjob 進行處理,而開始開發 Airflow DAG 中,覺得很不錯的是 Airflow 也有完整的 Schedule 功能可以讓我們進行設定並使用。
在昨天有提到在設定 Airflow DAG 時可以帶入許多的參數進行設定。要設定 Schedule 的話,其中一種方式是在 schedule_interval 參數中設定 Airflow 定義的區間模式。我們可以參考下面所寫的 Airflow 的設定。
舉例來說,如果我把我們的 first_dag 的 schedule_interval 設定為 "@daily",則我們可以看到在圖形介面上,DAG 的 Schedule 欄位會改動(下圖),而 DAG 則會每天執行一次。
schedule_interval = "@daily",
另外我們也可以以 Timetable 的方式進行設定,在 schedule 的參數中加入 Timetable 的設定:
schedule=CronTriggerTimetable(
"0 9 * * *",
timezone="UTC",
),
DAG 的 Schedule 欄位會改動(下圖),而 DAG 則會每天早上九點執行。
我們可以選擇適合的方式排定排程執行。透過排程執行能讓我們很有節奏的管理 Data pipeline 的執行時機。
上面所描述的兩種都算是主動的執行方式。不管是手動,或是排定排程都是系統自己本身主動進行。然而有些狀況會需要是由其他進行系統進行觸發,此時可以使用 Airflow REST API 的功能進行外部的觸發。Airflow REST API 目前的功能已經十分強大,未來有機會我們再更完整的跟大家介紹,在這邊由於今天的主題是 DAG 執行方式與排程,所以我只和大家觸發 Airflow DAG 的方式。
如果要透過外部觸發我們可以 使用 POST request 打下面這個 API 位置。
{Airflow address}/dags/{dag_id}/dagRuns
並帶入以下相關參數
{
"dag_run_id": "string",
"logical_date": "2023-08-24T14:15:22Z",
"execution_date": "2023-08-24T14:15:22Z",
"conf": { },
"note": "string"
}
如此就可以輕鬆的從外部觸發 Airflow DAG。
今天介紹了 DAG 的執行方式與排程,了解了各種做法之後,我們針對各種情境做出良好的設計。