iT邦幫忙

2023 iThome 鐵人賽

DAY 5
1

在上一篇文章裡有提到 PythonOperator 可能是 Airflow 裡最重要的 operator 之一,那當然要花點時間來說明一下使用方法,不過其實很簡單,大都是通用的原則。

# 定義一個Python函數,這將是我們要運行的任務
def my_python_function():
    print("Hello, Airflow!")

# 使用PythonOperator來執行我們的Python函數
task = PythonOperator(
    task_id='my_task',
    python_callable=my_python_function,
    dag=dag,
)

透過 python_callable 可以指定要執行的 function reference,然後就可以執行各種彈性的事了。當然如果想用 lambda 式的寫法也沒問題 python_callable=lambda x: print("Hello, Airflow!") ,就效果來說是等同於上面的寫法的。

想必接下來會有人問,要怎麼代入參數。畢竟在現實生活中,很多時候我們要做的工作都是有條件的。

參數代入一: op_kwargs

我們稍微改寫一下範例的 operator

task = PythonOperator(
    task_id='my_task',
    python_callable=lambda x: print(x),
	  op_kwargs={"x": "Hello, Airflow"},
    dag=dag,
)

Bingo! 這樣子我們的 task 就能 print 參數了。我知道你下一句話會說:「這樣子還是等於寫死的啊?」

Airflow 也有提供一些方便的元件,但我們晚點再討論那些,先來說參數化的好處。由於 PythonOperator 通常會指向另外一個 function,我們便可以也應該對那個 function 做測試。事實上由於 Dag 整個是由 Operator 的實例組成的,如果想測試的話需要有一些特殊的前置作業跟配合 Airflow Framework 的寫法。但對某個 function 做單元測試就不用這麼麻煩,無論是用 Unittest 或是 Pytest,大都照基本寫法就好。

參數代入二: Variables

實務上我們經常需要區分開發環境、測試環境、真實環境;最簡單的例子,就是在本機開發 ETL 時,你不會想也不應該去連到 production 資料庫,進而污染了資料。

同理,有一些參數也是依賴於環境的,或許在測試環境你只想洗出前三大的熱門商品,但在正式環境你需要 20 個。Airflow 提供了 Variables 的元件來讓我們做到這件事。

再用一次同樣的例子:

def my_python_function(x1):
    x2 = Variable.get("DEMO_X2", "Default Value")
    print(f"{x1}, {x2}!")

task = PythonOperator(
		task_id='my_task',
		python_callable=my_python_function,
		op_kwargs={
		  "x1": {{var.value.DEMO_X1}}
		},
		dag=dag,
)

這裡展示了兩種寫法,第一種是在 Python Code 裡 import Variable,便可以透過它來取得 value,並設定 default 值。第二種則是在 op_kwargs 的 {{}} 寫法,用 var.value 開頭,Airflow 便會自己去找出對應的 key 並代入 value。

要注意的事,官方並不建議在 top-level 使用 Variable.get,這會對效能造成負擔。建議要寫在 def 內。


上一篇
來寫第一個 DAG 吧 - Day4
下一篇
Airflow PythonOperator(二) - Day6
系列文
用 Airflow & Flink 來開發 ETL 吧30
圖片
  直播研討會
圖片
{{ item.channelVendor }} {{ item.webinarstarted }} |
{{ formatDate(item.duration) }}
直播中

尚未有邦友留言

立即登入留言