import pendulum
from datetime import datetime
import requests
from airflow.decorators import dag, task
local_tz = pendulum.timezone("Asia/Taipei")
@dag(
dag_id="show_today_activity",
schedule_interval="0 9 * * *",
start_date=datetime(2023, 10, 1, tzinfo=local_tz),
catchup=True
)
def run():
@task
def today_activity(logical_date=None):
res = requests.get("https://www.boredapi.com/api/activity")
return str(logical_date) + ":" + res.text
today_activity()
run()
有發現之前的範例程式也會加上 catchup=False
嗎 ?如果沒加一定會設定 start_date 是今天
,然後schedule=None
,原因就是因為 catchup 的預設是 True ,所以只要把日期設成過去,然後沒有設定catchup=False
,一不小心就會執行很多 DAG RUN,雖然等等還是會介紹 catchup ,先講結論就是永遠設定成 False ,省事+少挖坑,要補過去資料就用 backfill,不要ㄍㄟ猴~
Catchup 中文叫做追趕、跟上的意思,但在 Airflow 當中的意思是從現在的時間往回追到你設定的 start_date,而且是「自動的」,不需要 trigger
例如上方範例,今天是10/6,但是我設定 start_date 是 10/1,而且今天是我第一次我第一次執行,所以就會執行從 10/1-10/6,如果沒執行過,並不會執行任何追趕喔,但當你執行第一次的 DAG run,就會自動跑起來了!
如果 catchup 沒有被啟用,執行 dag 時只會跑最新的一次
如同上方只要把 catchup=True 即可,或是不設定 catchup 也可以,因為預設就是 True
@dag(
dag_id="show_today_activity",
schedule_interval=None,
start_date=pendulum.datetime(2023, 10, 1, tz="Asia/Taipei"),
catchup=True,
)
強烈建議:固定設為 catchup=False,避免意料之外的執行,會在 DAG RUN 大海中被淹死,而且如果還在測試階段,會很崩潰因為無法將 catchup 暫停,只能去刪除或清除 DagRun 狀態,然後趕快設定成 False😅
Backfill 的中文就是回填,可以手動回填資料,並且能執行 DAG 任意日期區間,想要回填到比 start_date 更早也可以,例如正在幫客戶開發 5/1 要開始執行的 DAG,但是 3/1 到 4/30 的資料客戶也要執行,那就可以直接執行 backfill 3/1 到 4/30
如果 3/1 到 4/30 之間已經有執行過 3/10,那在正常情況下執行 backfill 時就會略過 3/10,除非執行時加上--reset-dagruns
才會每個 dagrun都在執行一次,下面會在說明有哪些參數可以用和怎麼用
記得這邊用天舉例只是為了簡單清楚一些,到底日期內有幾個 dag要 執行,或是說有幾個 DagRun 的實例,都是依照 schedule 的間隔決定的
docker exec -it {webserver_container_id} bash
airflow dags backfill --start-date START_DATE --end-date END_DATE {dag_id}
--start-date
和 --end-date
可以縮寫成 -s
和 -e
以上方舉例就是要執行 3/1-4/30 如下:
airflow dags backfill -s "2023-03-01" -e "2023-05-01" show_today_activity
!!! 為什麼 end 是 "2023-05-01" ,因為 start 是指 data_interval_start
,end 是指 data_interval_end
如果不懂
data_interval_start
和data_interval_end
,可以回到昨天查看或留言~
其實 catchup 就是自動執行 backfill 從 start_date 到今天,不能進行任何進階設定,就是偵測到沒有執行的 DagRun 就會執行,但只要執行過了,就不會執行了,不管上一次到底是成功或失敗,那 backfill 呢?就是什麼都能自訂,自訂執行日期,自訂執行過去失敗的 DagRun,比較表如下:
Backfill | Catchup | |
---|---|---|
啟動方式 | 手動 | 自動 |
預設 | 沒給日期,預設抓今天 | 預設是True,自動抓到start_date |
回填的日期區間 | 可以自訂 | 不能自訂 |
最早的回填日期 | 無限制 | start_date |
能不能給參數 | 能 | 不能 |
推薦與否 | 推薦 | 不推薦 |
backfill 可以在執行的時候給參數,我自己常用的如下:
參數 | 意思 |
---|---|
--continue-on-failures | 過程中有任務失敗還是會繼續 |
--rerun-failed-tasks | 自動執行日期區間內的失敗任務 |
--reset-dagruns | 重新執行日期區間內的所有DagRun,不管過去有沒有執行過 |
-B, --run-backwards | 從最新的日期往回執行 |
-t, --task-regex | 執行特定的任務,用regex篩選 |
來源:airflow 官方文件
airflow dags backfill -s "2023-03-01" -e "2023-05-01" --reset-dagruns show_today_activity
我們可以到 Web UI 即時監控,執行完之後可以看到在 Grid View 上面會有一個返回的箭頭,代表這些 DagRun 是用 backfill 執行的
今天先到這邊吧~明天要來講 Airflow 的 Debug 和 Testing 了