iT邦幫忙

2023 iThome 鐵人賽

DAY 21
1
AI & Data

Airflow 是什麼? 能吃嗎 ? 數據水管工的超級蘑菇系列 第 21

[Day21] 坐上 Airflow 時光機回到過去-Catchup 和 Backfill

  • 分享至 

  • xImage
  •  

今日練習程式

import pendulum
from datetime import datetime
import requests
from airflow.decorators import dag, task

local_tz = pendulum.timezone("Asia/Taipei") 


@dag(
    dag_id="show_today_activity",
    schedule_interval="0 9 * * *",
    start_date=datetime(2023, 10, 1, tzinfo=local_tz),
    catchup=True
)
def run():
    @task
    def today_activity(logical_date=None):
        res = requests.get("https://www.boredapi.com/api/activity")
        return str(logical_date) + ":" + res.text
    today_activity()

run()

有發現之前的範例程式也會加上 catchup=False嗎 ?如果沒加一定會設定 start_date 是今天,然後schedule=None,原因就是因為 catchup 的預設是 True ,所以只要把日期設成過去,然後沒有設定catchup=False,一不小心就會執行很多 DAG RUN,雖然等等還是會介紹 catchup ,先講結論就是永遠設定成 False ,省事+少挖坑,要補過去資料就用 backfill,不要ㄍㄟ猴~

Catchup

Catchup 中文叫做追趕、跟上的意思,但在 Airflow 當中的意思是從現在的時間往回追到你設定的 start_date,而且是「自動的」,不需要 trigger

catchup = True

例如上方範例,今天是10/6,但是我設定 start_date 是 10/1,而且今天是我第一次我第一次執行,所以就會執行從 10/1-10/6,如果沒執行過,並不會執行任何追趕喔,但當你執行第一次的 DAG run,就會自動跑起來了!

catchup = False

如果 catchup 沒有被啟用,執行 dag 時只會跑最新的一次

用法

如同上方只要把 catchup=True 即可,或是不設定 catchup 也可以,因為預設就是 True

@dag(
    dag_id="show_today_activity",
    schedule_interval=None,
    start_date=pendulum.datetime(2023, 10, 1, tz="Asia/Taipei"),
    catchup=True,
)

強烈建議:固定設為 catchup=False,避免意料之外的執行,會在 DAG RUN 大海中被淹死,而且如果還在測試階段,會很崩潰因為無法將 catchup 暫停,只能去刪除或清除 DagRun 狀態,然後趕快設定成 False😅

Backfill

介紹

Backfill 的中文就是回填,可以手動回填資料,並且能執行 DAG 任意日期區間,想要回填到比 start_date 更早也可以,例如正在幫客戶開發 5/1 要開始執行的 DAG,但是 3/1 到 4/30 的資料客戶也要執行,那就可以直接執行 backfill 3/1 到 4/30

如果 3/1 到 4/30 之間已經有執行過 3/10,那在正常情況下執行 backfill 時就會略過 3/10,除非執行時加上--reset-dagruns 才會每個 dagrun都在執行一次,下面會在說明有哪些參數可以用和怎麼用

記得這邊用天舉例只是為了簡單清楚一些,到底日期內有幾個 dag要 執行,或是說有幾個 DagRun 的實例,都是依照 schedule 的間隔決定的

用法

step 1: 進入 docker 容器

docker exec -it  {webserver_container_id} bash 

step2執行 airflow backfill 回填數據

airflow dags backfill --start-date START_DATE --end-date END_DATE {dag_id}

--start-date--end-date 可以縮寫成 -s-e
以上方舉例就是要執行 3/1-4/30 如下:

airflow dags backfill -s "2023-03-01" -e "2023-05-01" show_today_activity

!!! 為什麼 end 是 "2023-05-01" ,因為 start 是指 data_interval_start,end 是指 data_interval_end

如果不懂 data_interval_startdata_interval_end ,可以回到昨天查看或留言~

Backfill 和 Catchup 差很多嗎?

其實 catchup 就是自動執行 backfill 從 start_date 到今天,不能進行任何進階設定,就是偵測到沒有執行的 DagRun 就會執行,但只要執行過了,就不會執行了,不管上一次到底是成功或失敗,那 backfill 呢?就是什麼都能自訂,自訂執行日期,自訂執行過去失敗的 DagRun,比較表如下:

Backfill Catchup
啟動方式 手動 自動
預設 沒給日期,預設抓今天 預設是True,自動抓到start_date
回填的日期區間 可以自訂 不能自訂
最早的回填日期 無限制 start_date
能不能給參數 不能
推薦與否 推薦 不推薦

Backfill 的好用參數

backfill 可以在執行的時候給參數,我自己常用的如下:

參數 意思
--continue-on-failures 過程中有任務失敗還是會繼續
--rerun-failed-tasks 自動執行日期區間內的失敗任務
--reset-dagruns 重新執行日期區間內的所有DagRun,不管過去有沒有執行過
-B, --run-backwards 從最新的日期往回執行
-t, --task-regex 執行特定的任務,用regex篩選

來源:airflow 官方文件

範例

airflow dags backfill -s "2023-03-01" -e "2023-05-01" --reset-dagruns show_today_activity

Web UI 檢查

我們可以到 Web UI 即時監控,執行完之後可以看到在 Grid View 上面會有一個返回的箭頭,代表這些 DagRun 是用 backfill 執行的

結語

今天先到這邊吧~明天要來講 Airflow 的 Debug 和 Testing 了


上一篇
[Day20] Airflow Scheduler 排程爬坑筆記(下)
下一篇
[Day22] Airflow牙起來-Debug 和 Testing (1)
系列文
Airflow 是什麼? 能吃嗎 ? 數據水管工的超級蘑菇30
圖片
  直播研討會
圖片
{{ item.channelVendor }} {{ item.webinarstarted }} |
{{ formatDate(item.duration) }}
直播中

1 則留言

0
Shawn
iT邦新手 5 級 ‧ 2023-10-07 11:05:40

發現又忘記說明 今日練習程式 內容在幹嘛?會補在隔天喔~Day22 也是用一樣的程式來實作/images/emoticon/emoticon12.gif

我要留言

立即登入留言