這篇文章主要是在討論在 Schedule 的設定以及 Trigger Rule,如果有問題歡迎留言討論
在 airflow 當中,我們可以透過設定 DAG 的 schedule_interval
屬性來讓指定的 DAG 於指定的時間自動執行,而 官方文件 上有提供了一些預設的設定參數,如下圖
另外也可以透過 這個網站 來快速的察看自己想要設定任務的執行時間,只要將網頁上產出的 crontab 直接以字串型態貼到 schedule_interval 內即可
下面附上範例程式
@dag(start_date=datetime.today(), tags=['user'], schedule_interval="@daily")
def python_operator_demo_dag():
start_task = EmptyOperator(task_id=TaskID.start_task_id)
crawl_task = PythonOperator(task_id=TaskID.crawler_task_id,
python_callable=crawl_ptt)
end_task = EmptyOperator(task_id=TaskID.end_task_id)
start_task >> crawl_task >> end_task
create_python_dag = python_operator_demo_dag()
上述程式碼可以看到我們在 dag 的 decorator 當中設定了 schedule_interval 參數,讓任務於每天的 00:00 自動執行一次,對於使用建立 DAG 物件方式的朋友也同樣要設定相同的屬性才會有自動執行的作用
在 這篇文章 裡面有提到 Trigger Rule 這個參數,這個參數主要是用來控制 Operator 的觸發條件,直接沿用前篇文章的範例做講解
在下面的範例當中,可以看到為了讓 end_task 會被執行,筆者這邊用了 NONE_FAILED 條件,表示當沒有任務失敗的時候,都會被執行到,確保有一個完整的運行流程,當然 EmptyOperator 本身並不會執行任何事情,就單純只是寫個流程圖好看用的
@dag(start_date=datetime.today(), tags=['user'])
def branch_operator_demo_dag():
start_task = EmptyOperator(task_id=TaskID.start_task_id)
crawl_task = BranchPythonOperator(task_id=TaskID.crawler_task_id,
python_callable=crawl_ptt_branch)
success_task = PythonOperator(task_id=TaskID.success_task_id,
python_callable=success_crawl)
failed_task = PythonOperator(task_id=TaskID.failed_task_id,
python_callable=failed_crawl)
end_task = EmptyOperator(task_id=TaskID.end_task_id,
trigger_rule=TriggerRule.NONE_FAILED)
start_task >> crawl_task >> [success_task, failed_task] >> end_task
為了避免打錯字我們可以利用 airflow 套件本身提供的 class 進行 trigger_rule
的設定,直接利用下方程式進行 importfrom airflow.utils.trigger_rule import TriggerRule
進去套件裡面查看,可以發現 TriggerRule 這個 class 有提供一些預設的觸發條件,下方程式碼為 airflow
套件內部的程式碼,可以去 這個網站 進行下載以及查看
class TriggerRule(str, Enum):
ALL_SUCCESS = 'all_success'
ALL_FAILED = 'all_failed'
ALL_DONE = 'all_done'
ONE_SUCCESS = 'one_success'
ONE_FAILED = 'one_failed'
NONE_FAILED = 'none_failed'
NONE_FAILED_OR_SKIPPED = 'none_failed_or_skipped'
NONE_SKIPPED = 'none_skipped'
DUMMY = 'dummy'
ALWAYS = 'always'
NONE_FAILED_MIN_ONE_SUCCESS = "none_failed_min_one_success"
ALL_SKIPPED = 'all_skipped'
基本上預設的條件意思都是很簡單的英文,直接進行英翻中即可對應,至於使用方式則是直接 import 這個 class 後直接坐使用即可,如下範例
end_task = EmptyOperator(task_id=TaskID.end_task_id,
trigger_rule=**TriggerRule.NONE_FAILED**)