iT邦幫忙

2023 iThome 鐵人賽

DAY 11
0
AI & Data

Airflow 是什麼? 能吃嗎 ? 數據水管工的超級蘑菇系列 第 11

[Day11] 當我們同在一起 - Airflow Task Group

  • 分享至 

  • xImage
  •  

TaskGroup 簡介

TaskGroup 是一種在 Airflow 中用來組織和管理多個 Task 的方法,透過將任務分類成不同群組,讓我們 DAG 當中的任務(Tasks)不會雜亂無章,能夠更容易地設計、維護和監控整個工作流程,可以想像成是網頁書籤資料夾,方便我們整理多個單獨的網頁。

優點:可重複使用的輪子、清晰組織架構、維護性好

Step By Step 實作 TaskGroup

  • 今天實作會再詳細說明每一步的細節,應該就是最後一次,之後實作都只會說明新學的部分了/images/emoticon/emoticon41.gif

Step 1:匯入必要的模塊和類別

from airflow import DAG
from airflow.operators.dummy import DummyOperator
from airflow.utils.task_group import TaskGroup
  • DummyOperator 是 Airflow 練習會常常用的 Operator,用途就是「Do Nothing」/images/emoticon/emoticon01.gif
  • Airflow 2.4.0 之後要改用 from airflow.operators.empty import EmptyOperator

Step 2:初始化 DAG

with DAG(
    dag_id = 'task_group_dag',
    start_date = datetime(2023, 9, 26),
    schedule_interval=None
) as dag:

Step 3:創建 TaskGroup

可以使用 TaskGroup 的 Class 創建群組

with DAG(
    ...
) as dag:
    with TaskGroup(group_id='my_task_group') as my_task_group:

Step 4:定義 TaskGroup 內的任務

with TaskGroup...:
    task_1 = DummyOperator(task_id='task_1', dag=my_task_group)
    task_2 = DummyOperator(task_id='task_2', dag=my_task_group)
    task_3 = DummyOperator(task_id='task_3', dag=my_task_group)
  • 所以 with TaskGroup 會在 with DAG( 的裡面,巢狀縮排的概念。

Step 5:定義 TaskGroup 的順序

with TaskGroup...:
    task_1 = ...
    ...
    task_1 >> task_2 >> task_3
  • task_2 在 task_1 完成後執行,而 task_3 在 task_2 完成後執行

Step 6:建立 DAG 的主要邏輯

start_task = DummyOperator(task_id='start_task', dag=dag)
end_task = DummyOperator(task_id='end_task', dag=dag)

start_task >> my_task_group >> end_task
  • 這樣就完成了在 Airflow DAG 中創建 TaskGroup 的步驟,我們就把 task1task2task3 都包進 my_task_group 當中了
  • 執行上就會先執行 start_task,接著完成 my_task_group 當中任務,最後在執行 end_task

打開 Airflow Web UI 確認

Step1:開啟 Web UI

  • 只會看到三個任務
  • 在 Web UI 上,你有兩種方式展開折疊的任務群組,點擊 Grid View 的下拉選單或是點擊 Graph 當中的 TaskGroup
    https://ithelp.ithome.com.tw/upload/images/20230926/20135427ZdnPZ8HEc3.png

Step 2:展開任務

  • 展開之後就會看到 TaskGroup 當中的 task1task2task3
    https://ithelp.ithome.com.tw/upload/images/20230926/20135427w9nItaOMvv.png

Step3 Trigger

我執行了兩次,第二次故意在 TaskGroup 中的 task2 標記 fail,可以看到整個 TaskGroup 的狀態(state)就會是 fail,所以當 Task 被綁再一起成為生命共同體 TaskGroup,他們就是要共同進退的,一個人失敗就整組失敗,沒有人可以當小組報告的冗員/images/emoticon/emoticon17.gif

https://ithelp.ithome.com.tw/upload/images/20230926/20135427b7liHuei4O.png

今天的完整程式碼

from airflow import DAG
from datetime import datetime
from airflow.operators.dummy import DummyOperator
from airflow.utils.task_group import TaskGroup


with DAG(
    dag_id = 'task_group_dag',
    start_date = datetime(2023, 9, 26),
    schedule_interval=None
) as dag:
    with TaskGroup(group_id='my_task_group') as tg1:
        task_1 = DummyOperator(task_id='task_1')
        task_2 = DummyOperator(task_id='task_2')
        task_3 = DummyOperator(task_id='task_3')

        task_1 >> task_2 >> task_3

    start_task = DummyOperator(task_id='start_task')
    end_task = DummyOperator(task_id='end_task')

    start_task >> tg1 >> end_task

上一篇
[Day10] Airflow Variable 變數的神秘魔法
下一篇
[Day12] 零地點突破‧改-Airflow Taskflow API(上)
系列文
Airflow 是什麼? 能吃嗎 ? 數據水管工的超級蘑菇30
圖片
  直播研討會
圖片
{{ item.channelVendor }} {{ item.webinarstarted }} |
{{ formatDate(item.duration) }}
直播中

尚未有邦友留言

立即登入留言