iT邦幫忙

2023 iThome 鐵人賽

DAY 6
0

Airflow PythonOperator(二)

Param

今天延續上週說的 PythonOperator 的一些使用方法,除了 Variable 之外,Airflow 還能設定 param。一樣我們先來看一段 code


with DAG(
     "the_dag",
     params={
         "x1": "Hello"
     },
 ) as dag:

		def my_python_function(x1, **context):
				x2 = context["params"]["x2"]
				print(f"{x1}, {x2}!")
		
		task = PythonOperator(
				task_id='my_task',
				python_callable=my_python_function,
        params={"x2": "Airflow"},
				op_kwargs={
				  "x1": {{params.x1}}
				},
				dag=dag,
		)

跟 variable 類似,param 也可以透過 template 的方式代入,並且可以指定在 DAG 層級或是 Task 層級。只是它不是全域屬性,所以如果要在程式內取得,只能從 dag context 內用 dict 的方式取出。也就是這行 x2 = context["params"]["x2"]

那它跟 Variable 差在哪呢?

  1. 概念上不是依賴於環境,而是 DAG 執行期的參數。
  2. 在執行時可以代入參數去變更這次執行的內容,有時候 Variable 會有好幾個 DAG 同時使用,但 param 就是單一一個 DAG 內的變數。
  3. 在 Web UI 上可以直接輸入

讓我們配點圖片,以下是 Web UI 的按鈕

Untitled

第二個就是指定參數了,按下去之後就可以看到這個畫面:

Untitled

這裡的 env, from_env 是我這隻 DAG 用到的 param,如果是上面的範例程式應該就會顯示 x1, x2

如果是指定 param 執行的話,也可以在 DAG 的 detail 內看到當時的參數

Untitled

小結

Operator 是靈活且可以指定參數的,我們在設計時如果好好規劃,不但能方便測試,也能依環境跟需求來執行 dag。

Variable 通常會依環境不同做調整,且它是全域物件,所有 DAG 都會讀到相同的值。

Param 則是依附於 DAG,但可以另外要求執行不同參數。例如有隻 DAG 平常都是撈取過去 30 天的資料,但你臨時需要一個 60 天份的,不需要修改程式,只要在手動啟動時設定參數就好。


上一篇
Airflow PythonOperator (一) - Day5
下一篇
Airflow Variable 設定 - Day7
系列文
用 Airflow & Flink 來開發 ETL 吧30
圖片
  直播研討會
圖片
{{ item.channelVendor }} {{ item.webinarstarted }} |
{{ formatDate(item.duration) }}
直播中

尚未有邦友留言

立即登入留言