iT邦幫忙

2023 iThome 鐵人賽

DAY 16
0

昨天說明了 DAG 以及 Task,今天我們繼續說明另一個很重要的 Airflow 元件 -- Relationships 關聯關係。

在我們做的第一個 DAG 當中 Relationships 的程式其實就只有短短一行,但是他卻非常的重要,因著這一行程式,DAG 裡面所描述的 Task 彼此之間有了關聯,也可以讓 Airflow 知道該如何執行各個Task。

extract_task >> transform_task >> load_task

Relationships 定義方法

Relationships 的寫法,我們可以用另外一種寫法如下,也可以使用 Taskflow 的寫法(未來會介紹),只不過目前大部分開發皆是以上面的 bitshift 方法進行開發,因為這種方法相對簡單也易於閱讀理解。

extract.set_downstream(transform)
load.set_upstream(transform)

多重關聯

除了單一的 Relationships 之外,Task 可以有多重的關係,使得工作流程可以有許多的變化。

extract_task >> transform_task >> load_task

上面的程式碼為本來的程式碼,如果來看看如果我們想要在 load_task 之後加上兩個 task : end_task1, end_task2 要怎麼做。在這邊的範例我們使用 DummyOperator 建立 Task,DummyOperator 是一個空的 Operator 常常使用當作狀態的處理。
首先我們在原來的 first_dag.py import DummyOperator

from airflow.operators.dummy_operator import DummyOperator

引入 DummyOperator 之後我們建立再兩個 Task

end_task1 = DummyOperator(task_id="end_task1", dag=dag)
end_task2 = DummyOperator(task_id="end_task2", dag=dag)

最後將 Relationships 更改為:

    extract_task >> transform_task >> load_task
    load_task >> end_task1
    load_task >> end_task2

如此我們可以看到 Airflow 的流程變為以下流程
https://ithelp.ithome.com.tw/upload/images/20231001/20140477kWy0XI8ZtT.png

如此我們可以知道,同一個 Task 是可以對於不只一個 Task 建立 Relationships。了解這些特性後,可以幫助我們更有彈性的開發 DAG。

而上述的 Relationships 其實也可以有不同寫法:

extract_task >> transform_task >> load_task >> end_task1
load_task >> end_task2

或是我們可以將兩個 Task 以中括號合在一起進行宣告:

extract_task >> transform_task >> load_task >> [end_task1,end_task2]

了解不同寫法可以幫助我們寫出更易於理解、更好維護的程式碼。

執行條件

在我們目前開發的 DAG 當中,一個 Task 做完了,就會接著做被 Relationships 所定義的下一個 Task。一切都是美麗美好而且自然,但是現實場景真的會這麼美好嗎?
如果 Task 失敗的話怎麼辦呢?如果Task 失敗但是其他 Task 也持續想要執行呢?現實開發中會遇到種種的問題。而好消息是,Airflow 本身也強大到可以很好的處理各種狀況。在Relationships 中,其實有一個很重要的設定叫做 trigger_rules。一開始我們沒有設定 trigger_rules up,因為本身他預設為 "all_success",意思是要前面有 Relationships 都需要 success 的狀態下這個 Task 才會被觸發。
而另外 Airflow 有其他的 trigger rules 可以進行定義:

  • all_success (default):所有上游 Task 都成功完成
  • all_failed:所有上游 Task 處於失敗或上游失敗狀態
  • all_done:所有上游 Task 皆已完成執行
  • all_skipped:所有上游 Task 均處於跳過狀態
  • one_failed:至少有一個上游任務失敗(不等待所有上游任務完成)
  • one_success:至少有一個上游任務成功(不等待所有上游任務完成)
  • one_done:至少有一個上游 Task 成功或失敗
  • none_failed:所有上游 Task 都沒有失敗以及其上游也都沒有失敗 - 也就是說,所有上游 Task 都已成功或已跳過
  • none_failed_min_one_success:所有上游 Task 都沒有失敗及其上游也都沒有失敗,並且至少有一個上游 Task 成功
  • none_skipped:沒有上游 Task 處於跳過狀態, 也就是說,所有上游任務均處於成功、失敗或上游失敗狀態
  • always:完全沒有依賴關係,可以隨時運行此任務

透過這些 trigger_rules 進行排列組合之後,我們可以組裝成最適合專案情景的 DAG。

了解了 Relationships 對我們在編寫 DAG 的時候是非常重要的,因為 各個 Task 有彼此的順序關係和相依關係,透過 Relationships 可以準確地對流程進行定義。


上一篇
『Day15』如何使用 DAG , Task
下一篇
『Day17』DAG 執行方式與排程
系列文
Data pipeline 建起來!用 Airflow 開發你的 Data pipeline30
圖片
  直播研討會
圖片
{{ item.channelVendor }} {{ item.webinarstarted }} |
{{ formatDate(item.duration) }}
直播中

尚未有邦友留言

立即登入留言