iT邦幫忙

1

Python & Airflow 學習筆記_Schedule 以及 Trigger Rule

  • 分享至 

  • xImage
  •  

這篇文章主要是在討論在 Schedule 的設定以及 Trigger Rule,如果有問題歡迎留言討論

一、任務排程 Schedule_Interval

在 airflow 當中,我們可以透過設定 DAG 的 schedule_interval 屬性來讓指定的 DAG 於指定的時間自動執行,而 官方文件 上有提供了一些預設的設定參數,如下圖
https://ithelp.ithome.com.tw/upload/images/20220821/20144024ZOewA8SPZ4.jpg

另外也可以透過 這個網站 來快速的察看自己想要設定任務的執行時間,只要將網頁上產出的 crontab 直接以字串型態貼到 schedule_interval 內即可

下面附上範例程式

@dag(start_date=datetime.today(), tags=['user'], schedule_interval="@daily")
def python_operator_demo_dag():
    start_task = EmptyOperator(task_id=TaskID.start_task_id)

    crawl_task = PythonOperator(task_id=TaskID.crawler_task_id,
                                python_callable=crawl_ptt)

    end_task = EmptyOperator(task_id=TaskID.end_task_id)

    start_task >> crawl_task >> end_task

create_python_dag = python_operator_demo_dag()

上述程式碼可以看到我們在 dag 的 decorator 當中設定了 schedule_interval 參數,讓任務於每天的 00:00 自動執行一次,對於使用建立 DAG 物件方式的朋友也同樣要設定相同的屬性才會有自動執行的作用

https://ithelp.ithome.com.tw/upload/images/20220821/201440242z9G2U2Jou.jpg

二、觸發條件 Trigger Rule

這篇文章 裡面有提到 Trigger Rule 這個參數,這個參數主要是用來控制 Operator 的觸發條件,直接沿用前篇文章的範例做講解

在下面的範例當中,可以看到為了讓 end_task 會被執行,筆者這邊用了 NONE_FAILED 條件,表示當沒有任務失敗的時候,都會被執行到,確保有一個完整的運行流程,當然 EmptyOperator 本身並不會執行任何事情,就單純只是寫個流程圖好看用的

@dag(start_date=datetime.today(), tags=['user'])
def branch_operator_demo_dag():
    start_task = EmptyOperator(task_id=TaskID.start_task_id)

    crawl_task = BranchPythonOperator(task_id=TaskID.crawler_task_id,
                                      python_callable=crawl_ptt_branch)

    success_task = PythonOperator(task_id=TaskID.success_task_id,
                                  python_callable=success_crawl)

    failed_task = PythonOperator(task_id=TaskID.failed_task_id,
                                 python_callable=failed_crawl)

    end_task = EmptyOperator(task_id=TaskID.end_task_id,
                             trigger_rule=TriggerRule.NONE_FAILED)

    start_task >> crawl_task >> [success_task, failed_task] >> end_task

為了避免打錯字我們可以利用 airflow 套件本身提供的 class 進行 trigger_rule 的設定,直接利用下方程式進行 import
from airflow.utils.trigger_rule import TriggerRule

進去套件裡面查看,可以發現 TriggerRule 這個 class 有提供一些預設的觸發條件,下方程式碼為 airflow 套件內部的程式碼,可以去 這個網站 進行下載以及查看

class TriggerRule(str, Enum):
    ALL_SUCCESS = 'all_success'
    ALL_FAILED = 'all_failed'
    ALL_DONE = 'all_done'
    ONE_SUCCESS = 'one_success'
    ONE_FAILED = 'one_failed'
    NONE_FAILED = 'none_failed'
    NONE_FAILED_OR_SKIPPED = 'none_failed_or_skipped'
    NONE_SKIPPED = 'none_skipped'
    DUMMY = 'dummy'
    ALWAYS = 'always'
    NONE_FAILED_MIN_ONE_SUCCESS = "none_failed_min_one_success"
    ALL_SKIPPED = 'all_skipped'

基本上預設的條件意思都是很簡單的英文,直接進行英翻中即可對應,至於使用方式則是直接 import 這個 class 後直接坐使用即可,如下範例

end_task = EmptyOperator(task_id=TaskID.end_task_id,
                         trigger_rule=**TriggerRule.NONE_FAILED**)

圖片
  直播研討會
圖片
{{ item.channelVendor }} {{ item.webinarstarted }} |
{{ formatDate(item.duration) }}
直播中

尚未有邦友留言

立即登入留言